Compare commits

...

2 Commits

2 changed files with 1431 additions and 66 deletions


@@ -412,77 +412,101 @@ class SILOQYSymbolDiscoveryActor(Actor):
self.log.error(f"Nautilus ActorExecutor: Failed to complete symbol discovery: {e}") self.log.error(f"Nautilus ActorExecutor: Failed to complete symbol discovery: {e}")
# Don't re-raise, let system continue # Don't re-raise, let system continue
async def _discover_all_symbols(self): async def _discover_all_symbols(self):
"""PRESERVED: Original Binance symbol discovery algorithm""" """PRESERVED: Original Binance symbol discovery algorithm"""
self.log.info("Starting dynamic symbol discovery from Binance...") self.log.info("Starting dynamic symbol discovery from Binance...")
url = "https://api.binance.com/api/v3/exchangeInfo" url = "https://api.binance.com/api/v3/exchangeInfo"
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
self.log.info("Fetching exchange info from Binance API...") # RATE LIMIT CHECK - Before anything else
response = await client.get(url, timeout=10) self.log.info("Checking Binance API rate limit status...")
if response.status_code == 200: time_check_url = "https://api.binance.com/api/v3/time"
self.log.info("Successfully received exchange info")
data = response.json()
# STEP 1: Collect all symbol names first try:
self.log.info("Collecting all available symbols...") rate_check_response = await client.get(time_check_url, timeout=5)
full_symbols = []
for symbol_info in data['symbols']:
if symbol_info['status'] == 'TRADING' and symbol_info['symbol'].endswith('USDT'):
symbol = symbol_info['symbol']
full_symbols.append(symbol)
# Log the full symbols list obtained from exchange if rate_check_response.status_code == 200:
self.log.info(f"Full symbols discovered from exchange: {len(full_symbols)} total") # Parse rate limit headers
self.log.info(f"First 10 symbols: {full_symbols[:10]}") headers = rate_check_response.headers
used_weight = headers.get('X-MBX-USED-WEIGHT-1M', 'Unknown')
server_time = rate_check_response.json().get('serverTime', 'Unknown')
# STEP 2: Apply throttle mode symbol limiting BEFORE processing self.log.info(f"Rate limit check passed - Used weight: {used_weight}/1200, Server time: {server_time}")
if self.throttle_mode:
symbols_to_process = full_symbols[:self.max_symbols_throttled] # Check if we're close to rate limit
self.log.warning(f"THROTTLE MODE: Will process {len(symbols_to_process)} symbols (from {len(full_symbols)} available)") if used_weight != 'Unknown' and int(used_weight) > 1000:
self.log.warning(f"HIGH RATE LIMIT USAGE: {used_weight}/1200 - Proceeding with caution")
elif rate_check_response.status_code == 429:
retry_after = rate_check_response.headers.get('Retry-After', '60')
self.log.error(f"RATE LIMITED: Must wait {retry_after} seconds before API calls")
raise Exception(f"Binance API rate limited - retry after {retry_after}s")
elif rate_check_response.status_code == 418:
self.log.error("IP BANNED: This IP address has been auto-banned by Binance")
raise Exception("IP address banned by Binance - cannot proceed")
else:
self.log.warning(f"Rate limit check returned status {rate_check_response.status_code}")
self.log.warning("Proceeding anyway, but may encounter issues")
except Exception as e:
if "rate limited" in str(e).lower() or "banned" in str(e).lower():
raise # Re-raise rate limit/ban errors
else:
self.log.warning(f"Rate limit check failed: {e}")
self.log.warning("Proceeding with symbol discovery anyway")
async with httpx.AsyncClient() as client:
self.log.info("Fetching exchange info from Binance API...")
response = await client.get(url, timeout=10)
if response.status_code == 200:
self.log.info("Successfully received exchange info")
data = response.json()
# Combined symbol discovery and tick size extraction
self.log.info("Processing symbols and extracting tick sizes...")
full_symbols = []
for symbol_info in data['symbols']:
if symbol_info['status'] == 'TRADING' and symbol_info['symbol'].endswith('USDT'):
symbol = symbol_info['symbol']
full_symbols.append(symbol)
# Extract tick size while processing # Extract tick size while processing
tick_size = None
for filter_info in symbol_info['filters']:
if filter_info['filterType'] == 'PRICE_FILTER':
tick_size = float(filter_info['tickSize'])
break
# If no PRICE_FILTER found, try other filter types
if tick_size is None:
for filter_info in symbol_info['filters']:
if filter_info['filterType'] == 'TICK_SIZE':
tick_size = float(filter_info['tickSize'])
break
# Fallback to default if still not found
if tick_size is None:
tick_size = 1e-8 # Default fallback
self.log.warning(f"No tick size found for {symbol}, using fallback {tick_size}")
self.tick_sizes[symbol] = tick_size
self.log.info(f"Processed {len(full_symbols)} symbols, extracted {len(self.tick_sizes)} tick sizes")
# Apply throttle mode symbol limiting
if self.throttle_mode:
self.symbols = full_symbols[:self.max_symbols_throttled]
self.log.warning(f"THROTTLE MODE: Limited to {len(self.symbols)} symbols (from {len(full_symbols)} available)")
else:
self.symbols = full_symbols
self.log.info(f"Discovered {len(self.symbols)} trading symbols")
self.log.info(f"First 10 symbols: {self.symbols[:10]}")
else: else:
symbols_to_process = full_symbols self.log.error(f"Failed to fetch exchange info: {response.status_code}")
raise Exception(f"Failed to fetch exchange info: {response.status_code}")
# STEP 3: Process tick sizes only for selected symbols
self.log.info("Processing symbols and extracting tick sizes...")
symbols_to_process_set = set(symbols_to_process) # For O(1) lookup
for symbol_info in data['symbols']:
symbol = symbol_info['symbol']
# Only process symbols that are in our selected list
if symbol not in symbols_to_process_set:
continue
# Extract tick size while processing
tick_size = None
for filter_info in symbol_info['filters']:
if filter_info['filterType'] == 'PRICE_FILTER':
tick_size = float(filter_info['tickSize'])
break
# If no PRICE_FILTER found, try other filter types
if tick_size is None:
for filter_info in symbol_info['filters']:
if filter_info['filterType'] == 'TICK_SIZE':
tick_size = float(filter_info['tickSize'])
break
# Fallback to default if still not found
if tick_size is None:
tick_size = 1e-8 # Default fallback
self.log.warning(f"No tick size found for {symbol}, using fallback {tick_size}")
self.tick_sizes[symbol] = tick_size
# Set final symbols list
self.symbols = symbols_to_process
self.log.info(f"Processed {len(symbols_to_process)} symbols, extracted {len(self.tick_sizes)} tick sizes")
self.log.info(f"Discovered {len(self.symbols)} trading symbols")
self.log.info(f"First 10 symbols: {self.symbols[:10]}")
else:
self.log.error(f"Failed to fetch exchange info: {response.status_code}")
raise Exception(f"Failed to fetch exchange info: {response.status_code}")
async def _fetch_stats_and_reconstruct_candles(self): async def _fetch_stats_and_reconstruct_candles(self):
"""PRESERVED: All original rate limiting with Nautilus async patterns""" """PRESERVED: All original rate limiting with Nautilus async patterns"""
@@ -1446,7 +1470,7 @@ def test_siloqy_actors_with_nautilus_process_management():
"candle_interval_ms": 15 * 60 * 1000, "candle_interval_ms": 15 * 60 * 1000,
"throttle_mode": True, # ENABLED: Safe for dual instance testing "throttle_mode": True, # ENABLED: Safe for dual instance testing
"throttle_rate_limit_seconds": 10.0, # 10s between batches (vs 2.5s) "throttle_rate_limit_seconds": 10.0, # 10s between batches (vs 2.5s)
"max_symbols_throttled": 400 # Only 100 symbols (vs 2000+) "max_symbols_throttled": 414 # Only 100 symbols (vs 2000+)
} }
) )
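
For context, these throttle settings feed the symbol-limiting branch in the first hunk (full_symbols[:self.max_symbols_throttled]). A small illustrative sketch, not part of the commit, of how those values cap the working set; apply_throttle is a hypothetical helper name.

from typing import List

def apply_throttle(full_symbols: List[str], throttle_mode: bool, max_symbols_throttled: int) -> List[str]:
    """Hypothetical helper mirroring the throttle branch in the first hunk."""
    if throttle_mode:
        # Cap the working set, e.g. 414 symbols instead of the full 2000+ USDT pairs
        return full_symbols[:max_symbols_throttled]
    return full_symbols

# Using the values from the test configuration above
config = {
    "throttle_mode": True,
    "throttle_rate_limit_seconds": 10.0,  # pacing between request batches
    "max_symbols_throttled": 414,
}
symbols = apply_throttle([f"SYM{i}USDT" for i in range(2000)],
                         config["throttle_mode"], config["max_symbols_throttled"])
print(len(symbols))  # 414
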

File diff suppressed because it is too large.