Files
DOLPHIN/prod/test_async_hazards.py
hjnormey 01c19662cb initial: import DOLPHIN baseline 2026-04-21 from dolphinng5_predict working tree
Includes core prod + GREEN/BLUE subsystems:
- prod/ (BLUE harness, configs, scripts, docs)
- nautilus_dolphin/ (GREEN Nautilus-native impl + dvae/ preserved)
- adaptive_exit/ (AEM engine + models/bucket_assignments.pkl)
- Observability/ (EsoF advisor, TUI, dashboards)
- external_factors/ (EsoF producer)
- mc_forewarning_qlabs_fork/ (MC regime/envelope)

Excludes runtime caches, logs, backups, and reproducible artifacts per .gitignore.
2026-04-21 16:58:38 +02:00

122 lines
3.9 KiB
Python
Executable File

import os
import sys
import time
import json
import random
import threading
import concurrent.futures
import numpy as np
from pathlib import Path
# Correct sys.path
ROOT_DIR = Path(__file__).parent.parent
sys.path.insert(0, str(ROOT_DIR / "nautilus_dolphin"))
sys.path.insert(0, str(ROOT_DIR))
from nautilus_dolphin.nautilus.hz_ob_provider import HZOBProvider
from nautilus_dolphin.nautilus.ob_features import OBFeatureEngine
from nautilus_dolphin.nautilus.ob_provider import OBSnapshot
# Mocking Hazelcast to control race timings
class MockIMap:
def __init__(self):
self.data = {}
self.listener = None
def put(self, key, val):
self.data[key] = val
if self.listener:
# Simulate async callback delay
class Event: pass
ev = Event()
ev.key = key
ev.value = val
self.listener(ev)
def get(self, key): return self.data.get(key)
def entry_set(self): return self.data.items()
def blocking(self): return self
def add_entry_listener(self, **kwargs):
self.listener = kwargs.get('updated_func') or kwargs.get('added_func')
class MockHZClient:
def __init__(self, imap): self._imap = imap
def get_map(self, name): return self._imap
def shutdown(self): pass
class AsyncHazardTester:
def __init__(self):
self.imap = MockIMap()
self.client = MockHZClient(self.imap)
# Patch HZOBProvider to use our Mock Client
self.provider = HZOBProvider(assets=["BTCUSDT"])
self.provider._client = self.client
self.provider._imap = self.imap
# Manually register mock listener logic
self.imap.add_entry_listener(updated_func=self.provider._on_entry_updated)
self.engine = OBFeatureEngine(self.provider)
self.stop_signal = False
def writer_thread(self):
"""Hammers the map with new snapshots."""
i = 0
while not self.stop_signal:
snap = {
"timestamp": time.time(),
"asset": "BTCUSDT",
"bid_notional": [100.0 * i] * 5,
"ask_notional": [105.0 * i] * 5,
"bid_depth": [1.0] * 5,
"ask_depth": [1.0] * 5
}
self.imap.put("asset_BTCUSDT_ob", json.dumps(snap))
i += 1
if i % 100 == 0: time.sleep(0.001)
def reader_thread(self):
"""Hammers the engine step_live calling the provider."""
j = 0
errors = 0
while not self.stop_signal:
try:
self.engine.step_live(["BTCUSDT"], j)
j += 1
except Exception as e:
print(f"RACE DETECTED/ERROR: {e}")
errors += 1
if j % 500 == 0: time.sleep(0.001)
return errors
def fuzz_thread(self):
"""Injects malformed JSON occasionally."""
while not self.stop_signal:
time.sleep(random.uniform(0.01, 0.05))
bad_data = random.choice([
"{'invalid': json}",
"[]",
"null",
'{"timestamp": "BAD_DATE"}'
])
self.imap.put("asset_BTCUSDT_ob", bad_data)
def run_collision_test(self, duration=30):
print(f"STARTING ASYNC COLLISION TEST ({duration}s)...")
self.stop_signal = False
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
f_write = executor.submit(self.writer_thread)
f_read = executor.submit(self.reader_thread)
f_fuzz = executor.submit(self.fuzz_thread)
time.sleep(duration)
self.stop_signal = True
errors = f_read.result()
print(f"COLLISION TEST FINISHED. Errors: {errors}")
return errors == 0
if __name__ == "__main__":
tester = AsyncHazardTester()
success = tester.run_collision_test(duration=20)
sys.exit(0 if success else 1)