160 lines
6.1 KiB
Python
160 lines
6.1 KiB
Python
|
|
import os
|
||
|
|
import sys
|
||
|
|
import time
|
||
|
|
import json
|
||
|
|
import asyncio
|
||
|
|
import logging
|
||
|
|
import threading
|
||
|
|
import numpy as np
|
||
|
|
from datetime import datetime, timezone
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
# Correct sys.path
|
||
|
|
ROOT_DIR = Path(__file__).parent.parent
|
||
|
|
sys.path.insert(0, str(ROOT_DIR / "nautilus_dolphin"))
|
||
|
|
sys.path.insert(0, str(ROOT_DIR))
|
||
|
|
|
||
|
|
# Use production components
|
||
|
|
from external_factors.ob_stream_service import OBStreamService
|
||
|
|
from nautilus_dolphin.nautilus.hz_ob_provider import HZOBProvider
|
||
|
|
from nautilus_dolphin.nautilus.ob_features import OBFeatureEngine
|
||
|
|
import psutil
|
||
|
|
|
||
|
|
# Configuration
|
||
|
|
TEST_DURATION_SEC = 1200 # 20 minutes
|
||
|
|
ASSETS = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "BNBUSDT", "XRPUSDT"]
|
||
|
|
POLL_INTERVAL = 0.1 # 100ms (System Bible v4.1 Target)
|
||
|
|
|
||
|
|
logging.basicConfig(
|
||
|
|
level=logging.INFO,
|
||
|
|
format='%(asctime)s [%(levelname)s] %(message)s'
|
||
|
|
)
|
||
|
|
logger = logging.getLogger("GoldCert")
|
||
|
|
|
||
|
|
class LiveGoldCertifier:
|
||
|
|
def __init__(self):
|
||
|
|
self.streamer = OBStreamService(assets=ASSETS)
|
||
|
|
self.provider = HZOBProvider(assets=ASSETS)
|
||
|
|
self.engine = OBFeatureEngine(self.provider)
|
||
|
|
self.latencies = []
|
||
|
|
self.e2e_latencies = []
|
||
|
|
self.mem_logs = []
|
||
|
|
self.running = True
|
||
|
|
|
||
|
|
async def run_streamer(self):
|
||
|
|
logger.info("Starting Live Streamer...")
|
||
|
|
try:
|
||
|
|
await self.streamer.stream()
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"Streamer Error: {e}")
|
||
|
|
|
||
|
|
async def push_to_hz(self):
|
||
|
|
"""Bridge between Streamer and Hazelcast (since streamer is a service)."""
|
||
|
|
import hazelcast
|
||
|
|
client = hazelcast.HazelcastClient(cluster_name="dolphin", cluster_members=["dolphin.taile8ad92.ts.net:5701"])
|
||
|
|
hz_map = client.get_map("DOLPHIN_FEATURES")
|
||
|
|
|
||
|
|
logger.info("HZ Bridge Active (ASYNC PUSH).")
|
||
|
|
while self.running:
|
||
|
|
for asset in ASSETS:
|
||
|
|
snap = await self.streamer.get_depth_buckets(asset)
|
||
|
|
if snap:
|
||
|
|
# snap is a dict with np.arrays
|
||
|
|
# HZOBProvider expects a JSON string or dict that it can parse
|
||
|
|
# We'll push the raw dict (json serializable except arrays)
|
||
|
|
serializable = snap.copy()
|
||
|
|
serializable["bid_notional"] = snap["bid_notional"].tolist()
|
||
|
|
serializable["ask_notional"] = snap["ask_notional"].tolist()
|
||
|
|
serializable["bid_depth"] = snap["bid_depth"].tolist()
|
||
|
|
serializable["ask_depth"] = snap["ask_depth"].tolist()
|
||
|
|
|
||
|
|
hz_map.put(f"asset_{asset}_ob", json.dumps(serializable))
|
||
|
|
await asyncio.sleep(0.1) # 10Hz HZ update
|
||
|
|
|
||
|
|
async def certify_loop(self):
|
||
|
|
logger.info(f"GOLD CERTIFICATION STARTING: {TEST_DURATION_SEC}s duration.")
|
||
|
|
start_wall = time.time()
|
||
|
|
process = psutil.Process(os.getpid())
|
||
|
|
mem_start = process.memory_info().rss / (1024*1024)
|
||
|
|
|
||
|
|
bar_idx = 0
|
||
|
|
while time.time() - start_wall < TEST_DURATION_SEC:
|
||
|
|
loop_start = time.perf_counter()
|
||
|
|
|
||
|
|
# Step 1: Engine processes live data from HZ
|
||
|
|
self.engine.step_live(ASSETS, bar_idx)
|
||
|
|
|
||
|
|
# Step 2: Measure Lateny
|
||
|
|
# E2E: Loop Finish - Snap Creation Time
|
||
|
|
# Snap Latency: Now - snap["timestamp"]
|
||
|
|
for asset in ASSETS:
|
||
|
|
snap = self.provider.get_snapshot(asset, time.time())
|
||
|
|
if snap:
|
||
|
|
snap_lat = (time.time() - snap.timestamp) * 1000
|
||
|
|
self.latencies.append(snap_lat)
|
||
|
|
|
||
|
|
loop_end = time.perf_counter()
|
||
|
|
self.e2e_latencies.append((loop_end - loop_start) * 1000)
|
||
|
|
|
||
|
|
if bar_idx % 1000 == 0:
|
||
|
|
mem_now = process.memory_info().rss / (1024*1024)
|
||
|
|
self.mem_logs.append(mem_now)
|
||
|
|
logger.info(f"[CERT] T+{int(time.time()-start_wall)}s | Bar {bar_idx} | Mem: {mem_now:.2f}MB | Lat: {np.mean(self.latencies[-10:] if self.latencies else [0]):.2f}ms")
|
||
|
|
|
||
|
|
bar_idx += 1
|
||
|
|
await asyncio.sleep(POLL_INTERVAL)
|
||
|
|
|
||
|
|
self.running = False
|
||
|
|
duration = time.time() - start_wall
|
||
|
|
mem_end = process.memory_info().rss / (1024*1024)
|
||
|
|
|
||
|
|
print("\n" + "="*80)
|
||
|
|
print("GOLD FINAL LIVE CERTIFICATION COMPLETED")
|
||
|
|
print("="*80)
|
||
|
|
print(f"Wall Clock Duration: {duration:.2f}s")
|
||
|
|
print(f"Total Bars Processed: {bar_idx}")
|
||
|
|
print(f"Mean Snap Latency: {np.mean(self.latencies):.2f}ms")
|
||
|
|
print(f"P99 Snap Latency: {np.percentile(self.latencies, 99):.2f}ms")
|
||
|
|
print(f"Mean Engine E2E: {np.mean(self.e2e_latencies):.4f}ms")
|
||
|
|
print(f"Memory Progression: {self.mem_logs[0]:.2f} -> {self.mem_logs[-1]:.2f} MB (Delta: {mem_end - mem_start:.2f}MB)")
|
||
|
|
print(f"Rate Limit Status: {self.streamer.rate_limits}")
|
||
|
|
print("="*80)
|
||
|
|
|
||
|
|
if mem_end - mem_start > 300:
|
||
|
|
print("VULNERABILITY: Significant memory growth detected.")
|
||
|
|
else:
|
||
|
|
print("STABILITY: Memory profile confirmed safe.")
|
||
|
|
|
||
|
|
# Verify Integrity (random check)
|
||
|
|
logger.info("Verifying Data Integrity...")
|
||
|
|
for asset in ASSETS:
|
||
|
|
snap_hz = self.provider.get_snapshot(asset, time.time())
|
||
|
|
snap_raw = await self.streamer.get_depth_buckets(asset)
|
||
|
|
if snap_hz and snap_raw:
|
||
|
|
if abs(snap_hz.timestamp - snap_raw["timestamp"]) < 1.0:
|
||
|
|
logger.info(f"Integrity {asset}: OK")
|
||
|
|
else:
|
||
|
|
logger.warning(f"Integrity {asset}: Timestamp mismatch!")
|
||
|
|
|
||
|
|
def start(self):
|
||
|
|
loop = asyncio.new_event_loop()
|
||
|
|
asyncio.set_event_loop(loop)
|
||
|
|
|
||
|
|
# Start streamer, HZ bridge, and certification loop
|
||
|
|
tasks = [
|
||
|
|
self.run_streamer(),
|
||
|
|
self.push_to_hz(),
|
||
|
|
self.certify_loop()
|
||
|
|
]
|
||
|
|
|
||
|
|
try:
|
||
|
|
loop.run_until_complete(asyncio.gather(*tasks))
|
||
|
|
except KeyboardInterrupt:
|
||
|
|
self.running = False
|
||
|
|
finally:
|
||
|
|
loop.close()
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
cert = LiveGoldCertifier()
|
||
|
|
cert.start()
|