Files
DOLPHIN/prod/certify_final_20m.py

160 lines
6.1 KiB
Python
Raw Permalink Normal View History

import os
import sys
import time
import json
import asyncio
import logging
import threading
import numpy as np
from datetime import datetime, timezone
from pathlib import Path
# Correct sys.path
ROOT_DIR = Path(__file__).parent.parent
sys.path.insert(0, str(ROOT_DIR / "nautilus_dolphin"))
sys.path.insert(0, str(ROOT_DIR))
# Use production components
from external_factors.ob_stream_service import OBStreamService
from nautilus_dolphin.nautilus.hz_ob_provider import HZOBProvider
from nautilus_dolphin.nautilus.ob_features import OBFeatureEngine
import psutil
# Configuration
TEST_DURATION_SEC = 1200 # 20 minutes
ASSETS = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "BNBUSDT", "XRPUSDT"]
POLL_INTERVAL = 0.1 # 100ms (System Bible v4.1 Target)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger("GoldCert")
class LiveGoldCertifier:
def __init__(self):
self.streamer = OBStreamService(assets=ASSETS)
self.provider = HZOBProvider(assets=ASSETS)
self.engine = OBFeatureEngine(self.provider)
self.latencies = []
self.e2e_latencies = []
self.mem_logs = []
self.running = True
async def run_streamer(self):
logger.info("Starting Live Streamer...")
try:
await self.streamer.stream()
except Exception as e:
logger.error(f"Streamer Error: {e}")
async def push_to_hz(self):
"""Bridge between Streamer and Hazelcast (since streamer is a service)."""
import hazelcast
client = hazelcast.HazelcastClient(cluster_name="dolphin", cluster_members=["dolphin.taile8ad92.ts.net:5701"])
hz_map = client.get_map("DOLPHIN_FEATURES")
logger.info("HZ Bridge Active (ASYNC PUSH).")
while self.running:
for asset in ASSETS:
snap = await self.streamer.get_depth_buckets(asset)
if snap:
# snap is a dict with np.arrays
# HZOBProvider expects a JSON string or dict that it can parse
# We'll push the raw dict (json serializable except arrays)
serializable = snap.copy()
serializable["bid_notional"] = snap["bid_notional"].tolist()
serializable["ask_notional"] = snap["ask_notional"].tolist()
serializable["bid_depth"] = snap["bid_depth"].tolist()
serializable["ask_depth"] = snap["ask_depth"].tolist()
hz_map.put(f"asset_{asset}_ob", json.dumps(serializable))
await asyncio.sleep(0.1) # 10Hz HZ update
async def certify_loop(self):
logger.info(f"GOLD CERTIFICATION STARTING: {TEST_DURATION_SEC}s duration.")
start_wall = time.time()
process = psutil.Process(os.getpid())
mem_start = process.memory_info().rss / (1024*1024)
bar_idx = 0
while time.time() - start_wall < TEST_DURATION_SEC:
loop_start = time.perf_counter()
# Step 1: Engine processes live data from HZ
self.engine.step_live(ASSETS, bar_idx)
# Step 2: Measure Lateny
# E2E: Loop Finish - Snap Creation Time
# Snap Latency: Now - snap["timestamp"]
for asset in ASSETS:
snap = self.provider.get_snapshot(asset, time.time())
if snap:
snap_lat = (time.time() - snap.timestamp) * 1000
self.latencies.append(snap_lat)
loop_end = time.perf_counter()
self.e2e_latencies.append((loop_end - loop_start) * 1000)
if bar_idx % 1000 == 0:
mem_now = process.memory_info().rss / (1024*1024)
self.mem_logs.append(mem_now)
logger.info(f"[CERT] T+{int(time.time()-start_wall)}s | Bar {bar_idx} | Mem: {mem_now:.2f}MB | Lat: {np.mean(self.latencies[-10:] if self.latencies else [0]):.2f}ms")
bar_idx += 1
await asyncio.sleep(POLL_INTERVAL)
self.running = False
duration = time.time() - start_wall
mem_end = process.memory_info().rss / (1024*1024)
print("\n" + "="*80)
print("GOLD FINAL LIVE CERTIFICATION COMPLETED")
print("="*80)
print(f"Wall Clock Duration: {duration:.2f}s")
print(f"Total Bars Processed: {bar_idx}")
print(f"Mean Snap Latency: {np.mean(self.latencies):.2f}ms")
print(f"P99 Snap Latency: {np.percentile(self.latencies, 99):.2f}ms")
print(f"Mean Engine E2E: {np.mean(self.e2e_latencies):.4f}ms")
print(f"Memory Progression: {self.mem_logs[0]:.2f} -> {self.mem_logs[-1]:.2f} MB (Delta: {mem_end - mem_start:.2f}MB)")
print(f"Rate Limit Status: {self.streamer.rate_limits}")
print("="*80)
if mem_end - mem_start > 300:
print("VULNERABILITY: Significant memory growth detected.")
else:
print("STABILITY: Memory profile confirmed safe.")
# Verify Integrity (random check)
logger.info("Verifying Data Integrity...")
for asset in ASSETS:
snap_hz = self.provider.get_snapshot(asset, time.time())
snap_raw = await self.streamer.get_depth_buckets(asset)
if snap_hz and snap_raw:
if abs(snap_hz.timestamp - snap_raw["timestamp"]) < 1.0:
logger.info(f"Integrity {asset}: OK")
else:
logger.warning(f"Integrity {asset}: Timestamp mismatch!")
def start(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Start streamer, HZ bridge, and certification loop
tasks = [
self.run_streamer(),
self.push_to_hz(),
self.certify_loop()
]
try:
loop.run_until_complete(asyncio.gather(*tasks))
except KeyboardInterrupt:
self.running = False
finally:
loop.close()
if __name__ == "__main__":
cert = LiveGoldCertifier()
cert.start()