"""Live certification harness for the order-book feature pipeline.

Runs the live order-book streamer, mirrors its depth buckets into a Hazelcast
map, and for ``TEST_DURATION_SEC`` seconds drives ``OBFeatureEngine.step_live``
while recording snapshot age, per-iteration loop time and process RSS.  A
summary report is printed at the end.
"""
import os
import sys
import time
import json
import asyncio
import logging
import threading
import numpy as np
from datetime import datetime, timezone
from pathlib import Path

# Correct sys.path
ROOT_DIR = Path(__file__).parent.parent
sys.path.insert(0, str(ROOT_DIR / "nautilus_dolphin"))
sys.path.insert(0, str(ROOT_DIR))

# Use production components
from external_factors.ob_stream_service import OBStreamService
from nautilus_dolphin.nautilus.hz_ob_provider import HZOBProvider
from nautilus_dolphin.nautilus.ob_features import OBFeatureEngine
import psutil

# Configuration
TEST_DURATION_SEC = 1200  # 20 minutes
ASSETS = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "BNBUSDT", "XRPUSDT"]
POLL_INTERVAL = 0.1  # 100ms (System Bible v4.1 Target)
HZ_PUSH_INTERVAL = 0.1  # 10Hz HZ update
HZ_CLUSTER_NAME = "dolphin"
HZ_CLUSTER_MEMBERS = ["dolphin.taile8ad92.ts.net:5701"]
HZ_MAP_NAME = "DOLPHIN_FEATURES"
MEM_LOG_EVERY_N_BARS = 1000
MEM_GROWTH_LIMIT_MB = 300

# Snapshot fields that are converted with ``.tolist()`` before json.dumps.
_ARRAY_FIELDS = ("bid_notional", "ask_notional", "bid_depth", "ask_depth")
_BYTES_PER_MB = 1024 * 1024

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger("GoldCert")


class LiveGoldCertifier:
    """Drive streamer, Hazelcast bridge and feature engine, and collect metrics.

    Attributes:
        latencies: Per-snapshot age in milliseconds (now - ``snap.timestamp``).
        e2e_latencies: Per-iteration duration of the certification loop body
            in milliseconds, measured with ``time.perf_counter``.
        mem_logs: RSS samples in MB, taken every ``MEM_LOG_EVERY_N_BARS`` bars.
        running: Flag polled by :meth:`push_to_hz`; cleared to stop the bridge.
    """

    def __init__(self):
        self.streamer = OBStreamService(assets=ASSETS)
        self.provider = HZOBProvider(assets=ASSETS)
        self.engine = OBFeatureEngine(self.provider)

        self.latencies = []
        self.e2e_latencies = []
        self.mem_logs = []
        self.running = True

    async def run_streamer(self) -> None:
        """Await ``self.streamer.stream()``, logging (not raising) failures."""
        logger.info("Starting Live Streamer...")
        try:
            await self.streamer.stream()
        except Exception as e:
            # Best-effort: a streamer failure is logged with its traceback but
            # does not abort the certification run.
            logger.exception(f"Streamer Error: {e}")

    @staticmethod
    def _serialize_snapshot(snap: dict) -> dict:
        """Return a shallow copy of *snap* with the array fields as lists."""
        payload = dict(snap)
        for key in _ARRAY_FIELDS:
            payload[key] = snap[key].tolist()
        return payload

    async def push_to_hz(self) -> None:
        """Bridge between Streamer and Hazelcast (since streamer is a service).

        While ``self.running`` is true, reads the depth buckets of every asset
        from the streamer and writes them as a JSON string to the Hazelcast
        map under the key ``asset_<SYMBOL>_ob``.  The client is shut down when
        the loop ends, including on cancellation or error.
        """
        import hazelcast

        client = hazelcast.HazelcastClient(
            cluster_name=HZ_CLUSTER_NAME,
            cluster_members=HZ_CLUSTER_MEMBERS,
        )
        try:
            hz_map = client.get_map(HZ_MAP_NAME)
            logger.info("HZ Bridge Active (ASYNC PUSH).")

            while self.running:
                for asset in ASSETS:
                    snap = await self.streamer.get_depth_buckets(asset)
                    if snap:
                        # NOTE(review): the value returned by put() is not
                        # inspected, so a failed write is not reported here.
                        hz_map.put(
                            f"asset_{asset}_ob",
                            json.dumps(self._serialize_snapshot(snap)),
                        )
                await asyncio.sleep(HZ_PUSH_INTERVAL)
        finally:
            client.shutdown()

    async def certify_loop(self) -> None:
        """Run the timed measurement, print the report and check integrity.

        ``self.running`` is cleared only after the integrity check so the
        Hazelcast bridge keeps publishing while its data is being compared,
        and it is cleared even if the measurement raises.
        """
        logger.info(f"GOLD CERTIFICATION STARTING: {TEST_DURATION_SEC}s duration.")
        process = psutil.Process(os.getpid())
        mem_start = self._rss_mb(process)
        try:
            duration, bar_idx = await self._measure(process)
            mem_end = self._rss_mb(process)
            self._print_report(duration, bar_idx, mem_start, mem_end)
            await self._verify_integrity()
        finally:
            self.running = False

    @staticmethod
    def _rss_mb(process) -> float:
        """Return the resident set size of *process* in MB."""
        return process.memory_info().rss / _BYTES_PER_MB

    async def _measure(self, process):
        """Step the engine every POLL_INTERVAL; return (duration_s, bars)."""
        # Monotonic clock: the run length must not be affected by wall-clock
        # adjustments.  Snapshot age below still needs wall time.
        start = time.monotonic()
        bar_idx = 0

        while time.monotonic() - start < TEST_DURATION_SEC:
            loop_start = time.perf_counter()

            # Step 1: Engine processes live data from HZ
            self.engine.step_live(ASSETS, bar_idx)

            # Step 2: Measure latency
            # Snap latency: now - snap.timestamp
            for asset in ASSETS:
                snap = self.provider.get_snapshot(asset, time.time())
                if snap:
                    self.latencies.append((time.time() - snap.timestamp) * 1000)

            # Loop time: engine step plus the snapshot reads above.
            self.e2e_latencies.append((time.perf_counter() - loop_start) * 1000)

            if bar_idx % MEM_LOG_EVERY_N_BARS == 0:
                mem_now = self._rss_mb(process)
                self.mem_logs.append(mem_now)
                recent = self.latencies[-10:] if self.latencies else [0]
                logger.info(
                    f"[CERT] T+{int(time.monotonic() - start)}s | Bar {bar_idx} | "
                    f"Mem: {mem_now:.2f}MB | Lat: {np.mean(recent):.2f}ms"
                )

            bar_idx += 1
            await asyncio.sleep(POLL_INTERVAL)

        return time.monotonic() - start, bar_idx

    def _print_report(self, duration, bar_idx, mem_start, mem_end) -> None:
        """Print the final summary; tolerates runs that collected no samples."""
        rule = "=" * 80
        print("\n" + rule)
        print("GOLD FINAL LIVE CERTIFICATION COMPLETED")
        print(rule)
        print(f"Wall Clock Duration: {duration:.2f}s")
        print(f"Total Bars Processed: {bar_idx}")
        if self.latencies:
            print(f"Mean Snap Latency: {np.mean(self.latencies):.2f}ms")
            print(f"P99 Snap Latency: {np.percentile(self.latencies, 99):.2f}ms")
        else:
            # No snapshot was ever returned by the provider.
            print("Mean Snap Latency: n/a (no snapshots observed)")
            print("P99 Snap Latency: n/a (no snapshots observed)")
        if self.e2e_latencies:
            print(f"Mean Engine E2E: {np.mean(self.e2e_latencies):.4f}ms")
        else:
            print("Mean Engine E2E: n/a (no iterations)")
        first_mem = self.mem_logs[0] if self.mem_logs else mem_start
        last_mem = self.mem_logs[-1] if self.mem_logs else mem_end
        print(
            f"Memory Progression: {first_mem:.2f} -> {last_mem:.2f} MB "
            f"(Delta: {mem_end - mem_start:.2f}MB)"
        )
        print(f"Rate Limit Status: {self.streamer.rate_limits}")
        print(rule)

        if mem_end - mem_start > MEM_GROWTH_LIMIT_MB:
            print("VULNERABILITY: Significant memory growth detected.")
        else:
            print("STABILITY: Memory profile confirmed safe.")

    async def _verify_integrity(self) -> None:
        """Compare Hazelcast and streamer snapshot timestamps per asset."""
        logger.info("Verifying Data Integrity...")
        for asset in ASSETS:
            snap_hz = self.provider.get_snapshot(asset, time.time())
            snap_raw = await self.streamer.get_depth_buckets(asset)
            if not (snap_hz and snap_raw):
                logger.warning(f"Integrity {asset}: skipped (snapshot unavailable)")
                continue
            if abs(snap_hz.timestamp - snap_raw["timestamp"]) < 1.0:
                logger.info(f"Integrity {asset}: OK")
            else:
                logger.warning(f"Integrity {asset}: Timestamp mismatch!")

    async def _run_all(self) -> None:
        """Run streamer, bridge and certification; stop helpers when done.

        Returns once ``certify_loop`` has finished.  An exception raised by
        any of the three tasks is propagated as soon as that task completes.
        Tasks still pending on exit are cancelled and awaited.
        """
        cert_task = asyncio.ensure_future(self.certify_loop())
        pending = {
            cert_task,
            asyncio.ensure_future(self.run_streamer()),
            asyncio.ensure_future(self.push_to_hz()),
        }
        try:
            while cert_task in pending:
                done, pending = await asyncio.wait(
                    pending, return_when=asyncio.FIRST_COMPLETED
                )
                for task in done:
                    task.result()  # re-raises if the task failed
        finally:
            self.running = False
            for task in pending:
                task.cancel()
            await asyncio.gather(*pending, return_exceptions=True)

    def start(self) -> None:
        """Run the whole certification on a fresh event loop (blocking)."""
        try:
            # asyncio.run cancels outstanding tasks and closes the loop.
            asyncio.run(self._run_all())
        except KeyboardInterrupt:
            self.running = False


if __name__ == "__main__":
    cert = LiveGoldCertifier()
    cert.start()