From 0d70c767e41ce43c3047acfce45d2977e156ae74 Mon Sep 17 00:00:00 2001 From: Codex Date: Fri, 8 May 2026 21:16:53 +0200 Subject: [PATCH] Wire long-capable prod alpha path --- prod/acb_processor_service.py | 200 ++ prod/docs/SYSTEM_BIBLE_v7.md | 5 + prod/launch_dolphin_live.py | 296 +++ prod/nautilus_event_trader.py | 2010 +++++++++++++++++++++ prod/paper_trade_flow.py | 658 +++++++ prod/tests/test_long_capability_layers.py | 194 ++ 6 files changed, 3363 insertions(+) create mode 100644 prod/acb_processor_service.py create mode 100644 prod/launch_dolphin_live.py create mode 100644 prod/nautilus_event_trader.py create mode 100644 prod/paper_trade_flow.py create mode 100644 prod/tests/test_long_capability_layers.py diff --git a/prod/acb_processor_service.py b/prod/acb_processor_service.py new file mode 100644 index 0000000..a9ce7d4 --- /dev/null +++ b/prod/acb_processor_service.py @@ -0,0 +1,200 @@ +""" +MIG6.1 & MIG6.2: ACB Processor Service +Watches for new scan arrivals and atomically computes/writes ACB boost +to the Hazelcast DOLPHIN_FEATURES map using CP Subsystem lock for atomicity. +""" + +import sys +import time +import json +import logging +from pathlib import Path +from datetime import datetime +import hazelcast + +HCM_DIR = Path(__file__).parent.parent + +# Use platform-independent paths from dolphin_paths +sys.path.insert(0, str(HCM_DIR)) +sys.path.insert(0, str(HCM_DIR / 'prod')) +from dolphin_paths import get_eigenvalues_path + +SCANS_DIR = get_eigenvalues_path() + +sys.path.insert(0, str(HCM_DIR / 'nautilus_dolphin')) +from nautilus_dolphin.nautilus.adaptive_circuit_breaker import AdaptiveCircuitBreaker + +logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s:%(message)s') + +class ACBProcessorService: + def __init__(self, hz_cluster="dolphin", hz_host="localhost:5701"): + try: + self.hz_client = hazelcast.HazelcastClient( + cluster_name=hz_cluster, + cluster_members=[hz_host] + ) + self.imap = self.hz_client.get_map("DOLPHIN_FEATURES").blocking() + + # Using CP Subsystem lock as per MIG6.1 + self.lock = self.hz_client.cp_subsystem.get_lock("acb_update_lock").blocking() + except Exception as e: + logging.error(f"Failed to connect to Hazelcast: {e}") + raise + + self.acb = AdaptiveCircuitBreaker() + self.acb.config.EIGENVALUES_PATH = SCANS_DIR # CRITICAL: override Windows default for Linux + self.acb.preload_w750(self._get_recent_dates(60)) + self.last_scan_count = 0 + self.last_date = None + + def _get_recent_dates(self, n=60): + try: + dirs = sorted([d.name for d in SCANS_DIR.iterdir() if d.is_dir() and len(d.name)==10]) + return dirs[-n:] + except Exception: + return [] + + def get_today_str(self): + return datetime.utcnow().strftime('%Y-%m-%d') + + def check_new_scans(self, date_str): + today_dir = SCANS_DIR / date_str + if not today_dir.exists(): + return False + + json_files = list(today_dir.glob("scan_*.json")) + count = len(json_files) + + if self.last_date != date_str: + self.last_date = date_str + self.last_scan_count = 0 + # Preload updated dates when day rolls over + self.acb.preload_w750(self._get_recent_dates(60)) + + if count > self.last_scan_count: + self.last_scan_count = count + return True + + return False + + def process_and_write(self, date_str): + """Compute ACB boost and write to HZ acb_boost. + + Preference order: + 1. HZ exf_latest — live, pre-lagged values (preferred, ~0.5 s latency) + 2. NPZ disk scan — fallback when HZ data absent or stale (>12 h) + """ + try: + boost_info = None + long_boost_info = None + + # ── HZ path (preferred) ──────────────────────────────────────────── + try: + exf_raw = self.imap.get('exf_latest') + if exf_raw: + exf_snapshot = json.loads(exf_raw) + scan_raw = self.imap.get('latest_eigen_scan') + w750_live = None + if scan_raw: + scan_data = json.loads(scan_raw) + w750_live = scan_data.get('w750_velocity') + boost_info = self.acb.get_dynamic_boost_from_hz( + date_str, exf_snapshot, w750_velocity=w750_live, direction=-1 + ) + long_boost_info = self.acb.get_dynamic_boost_from_hz( + date_str, exf_snapshot, w750_velocity=w750_live, direction=1 + ) + logging.debug( + f"ACB computed from HZ: short={boost_info['boost']:.4f} " + f"long={long_boost_info['boost']:.4f}" + ) + except ValueError as ve: + logging.warning(f"ACB HZ snapshot stale: {ve} — falling back to NPZ") + boost_info = None + except Exception as e: + logging.warning(f"ACB HZ read failed: {e} — falling back to NPZ") + boost_info = None + + # ── NPZ fallback ─────────────────────────────────────────────────── + if boost_info is None: + boost_info = self.acb.get_dynamic_boost_for_date(date_str, direction=-1) + long_boost_info = self.acb.get_dynamic_boost_for_date(date_str, direction=1) + logging.debug( + f"ACB computed from NPZ: short={boost_info['boost']:.4f} " + f"long={long_boost_info['boost']:.4f}" + ) + + payload = json.dumps(boost_info) + long_payload = json.dumps(long_boost_info or boost_info) + + # Atomic Write via CP Subsystem Lock + self.lock.lock() + try: + # Legacy key remains SHORT for BLUE/PRODGREEN compatibility. + self.imap.put("acb_boost", payload) + self.imap.put("acb_boost_short", payload) + self.imap.put("acb_boost_long", long_payload) + logging.info( + f"acb_boost updated (src={boost_info.get('source','npz')}): " + f"short={boost_info['boost']:.4f}/{boost_info['signals']:.1f}sig " + f"long={(long_boost_info or {}).get('boost', 0.0):.4f}/" + f"{(long_boost_info or {}).get('signals', 0.0):.1f}sig" + ) + try: + from ch_writer import ch_put, ts_us as _ts + ch_put("acb_state", { + "ts": _ts(), + "boost": float(boost_info.get("boost", 0)), + "beta": float(boost_info.get("beta", 0)), + "signals": float(boost_info.get("signals", 0)), + }) + except Exception: + pass + finally: + self.lock.unlock() + + except Exception as e: + logging.error(f"Error processing ACB: {e}") + + def run(self, poll_interval=1.0, hz_refresh_interval=30.0): + """Main service loop. + + Two update triggers: + 1. New scan files arrive for today → compute from HZ (preferred) or NPZ. + 2. hz_refresh_interval elapsed → re-push acb_boost from live exf_latest + even when no new scans exist (covers live-only operation days when + scan files land in a different directory or not at all). + """ + logging.info("Starting ACB Processor Service (Python CP Subsystem)...") + today = self.get_today_str() + # Write immediately on startup so acb_boost is populated from the first second + logging.info(f"Startup write for {today}") + self.process_and_write(today) + last_hz_refresh = time.monotonic() + + while True: + try: + today = self.get_today_str() + now = time.monotonic() + + # Trigger 1: new scan files + if self.check_new_scans(today): + self.process_and_write(today) + last_hz_refresh = now + + # Trigger 2: periodic HZ refresh (ensures acb_boost stays current + # even on days with no new NPZ scan files) + elif (now - last_hz_refresh) >= hz_refresh_interval: + self.process_and_write(today) + last_hz_refresh = now + + time.sleep(poll_interval) + except KeyboardInterrupt: + break + except Exception as e: + logging.error(f"Loop error: {e}") + time.sleep(5.0) + +if __name__ == "__main__": + service = ACBProcessorService() + service.run() diff --git a/prod/docs/SYSTEM_BIBLE_v7.md b/prod/docs/SYSTEM_BIBLE_v7.md index 47a961f..0d4cdf5 100644 --- a/prod/docs/SYSTEM_BIBLE_v7.md +++ b/prod/docs/SYSTEM_BIBLE_v7.md @@ -3386,4 +3386,9 @@ The canonical vel_div is `v50_lambda_max_velocity − v750_lambda_max_velocity` > 2026-05-08 lowered post-win-threshold addendum: the post-win overlay is stronger than the original narrow sample implied, but only when conditioned on realized exhaustion. Dollar-only thresholds below about `$300` are harmful. With a prior-return filter (`pnl_pct >= +0.75%`), lower thresholds become useful: e.g. `$100-$150` prior wins produced `63-67` immediate next-trade cases with about `+$2.4k` marginal delta and positive flipped-LONG PnL. High-leverage `$300-$500` wins support a next-`2`-trade rebound/cooldown signal (`+$2.7k` to `+$3.7k` marginal delta). The edge is payoff-asymmetry / loss-tail avoidance, not WR improvement, and should be researched as a guarded next-1/next-2 overlay or abstain gate. > 2026-05-08 post-win EFSM implementation addendum: the candidate BLUE overlay is now the post-win **EFSM** (**Execution FSM**) at `adaptive_exit/post_win_long_overlay.py` with tests in `prod/tests/test_post_win_long_overlay.py`. Canonical class names are `PostWinExecutionFSM` and `PostWinExecutionFSMConfig` (`PostWinLongOverlay` names remain compatibility aliases). Codified rule: `pnl_abs > $397` arms next `1` FLIP_LONG slot; `pnl_abs > $397 and lev > 8.6` arms next `2`; `0 < pnl_abs < $250 and pnl_pct >= +0.75%` arms next `1`; consumed slots reset to SHORT. Active slots cannot re-arm and overlay-flipped LONG outcomes cannot re-arm. This reset invariant is mandatory: unsafe recursive re-arm replay turned `+$1.51k` marginal delta into `-$5.43k`. V7 is side-aware but SHORT-calibrated; validate LONG overlay exits in shadow or with conservative LONG-specific settings before live use. > 2026-05-08 AlphaExitEngineV7 LONG calibration addendum: V7 threshold/gate constants are now surfaced as `AlphaExitV7Config` in `nautilus_dolphin/nautilus_dolphin/nautilus/alpha_exit_v7_engine.py`. Default `AlphaExitEngineV7()` remains the deployed SHORT-calibrated surface: `exit_pressure_threshold=2.69`, `retract_pressure_threshold=1.0`, `extend_pressure_threshold=-0.5`, vol-normalized MAE tiers `max(floor, k * rv_comp)` with `k=(3.5,7.0,12.0)` and floors `(0.005,0.012,0.025)`, and bounce soft weights `(0.15,0.35)`. A separate LONG engine can now be initialized with a different `AlphaExitV7Config` without mutating BLUE SHORT defaults. Synthetic LONG replay over BLUE V7 journal paths (`97` paths, `6,812` rows, bounce disabled because the current bounce model is SHORT-trained) found natural LONG PnL `-$328.84`; deployed default V7 improved this to `+$1.43` (`+$330.26` delta); best tested LONG proxy was reducing MFE-risk contributions by half while keeping pressure threshold `2.69`, yielding `+$205.32` (`+$534.15` delta), `36/97` exits, and `1.69%` max DD. Do not deploy this live from proxy alone; first shadow it on actual EFSM-flipped LONG contexts. Detailed method/results: `prod/docs/LONG_DETERMINISTIC_RULE_RESEARCH.md`. +> 2026-05-08 LONG-capability addendum: BLUE and PRODGREEN Alpha Engine code paths are now LONG-capable without changing the default deployed side. `short_only` remains the default everywhere. LONG is activated only by explicit config/env direction (`long`, `long_only`, `buy`, `1`, `+1`). The signal generator exposes configurable LONG thresholds (`long_vel_div_threshold=+0.01`, `long_vel_div_extreme=+0.04`) and keeps the canonical SHORT thresholds (`-0.02`, `-0.05`). `NDAlphaEngine.begin_day(..., direction=+1)` now propagates LONG semantics into signal gating, DC interpretation, IRP expected action, sizing, PnL, exit price slippage, and ACB meta-strength. The sizing trend multiplier is side-aware: negative `vel_div` trend remains favorable for SHORT; positive `vel_div` trend is favorable for LONG. +> 2026-05-08 ACBv6 side-awareness addendum: ACBv6 remains SHORT/risk-off by default and preserves the legacy cache key for default calls. LONG/risk-on ACB is opt-in via `direction=+1` and uses separate cache entries (`date|long`) so HZ prewarm cannot pollute SHORT BLUE state. SHORT signals are unchanged: bearish funding, high DVOL, fear, and taker selling. LONG signals are explicit and separate: positive funding, calm DVOL, greed/risk appetite, and taker buying. OB beta modulation is also side-aware: stress/cascade raises beta for SHORT and reduces it for LONG; calm/liquidity-building raises beta for LONG and reduces it for SHORT. +> 2026-05-08 ACB HZ keying addendum: `prod/acb_processor_service.py` now publishes `acb_boost` and `acb_boost_short` as the legacy SHORT payload and `acb_boost_long` as the LONG/risk-on payload. BLUE continues to use `acb_boost` unless explicitly run with `DOLPHIN_DIRECTION=long_only`, in which case its local prewarm calls ACB with `direction=+1`. PRODGREEN's Nautilus actor subscribes to `acb_boost_long` when its config direction is LONG, otherwise to legacy `acb_boost`. +> 2026-05-08 LONG exit-layer addendum: base TP/SL/max-hold exits were already direction-aware through signed PnL. Optional `AlphaExitManager` vel_div invalidation/exhaustion and TF-spread recovery exits are now side-aware too. They remain disabled by default unless explicitly enabled, but if enabled for a LONG invocation they now treat falling/negative `vel_div` and adverse TF-spread recovery as LONG invalidation rather than applying hidden SHORT semantics. +> 2026-05-08 validation addendum: targeted regression after LONG-capability wiring passed `prod/tests/test_long_capability_layers.py` (`9 passed`), existing ACB HZ + V7 + EFSM suites (`57 passed`), and ACB signal-threshold integrity (`11 passed`). Compile checks passed for modified Alpha/ACB/live runner files. These tests verify SHORT default preservation, explicit LONG entries, side-separated ACB caches, side-aware OB beta modulation, side-aware optional VD exits, and case-insensitive PRODGREEN direction parsing. > The retrieval spec is documented in `prod/docs/ASSET_FINGERPRINT_CANDIDATE_SYSTEM.md`. diff --git a/prod/launch_dolphin_live.py b/prod/launch_dolphin_live.py new file mode 100644 index 0000000..a5b48f1 --- /dev/null +++ b/prod/launch_dolphin_live.py @@ -0,0 +1,296 @@ +#!/usr/bin/env python3 +""" +Dolphin Live Node — DolphinActor inside NT TradingNode +======================================================= +Phase 1: paper_trading=True (live Binance Futures data, paper fills). + Validates signal parity with nautilus_event_trader.py before live exec. + +To go live (Phase 2): set paper_trading=False in build_node(). +""" +import os +import sys +import asyncio +from copy import deepcopy +from pathlib import Path + +PROJECT_ROOT = Path(__file__).parent.parent +sys.path.insert(0, str(PROJECT_ROOT / 'nautilus_dolphin')) +sys.path.insert(0, str(PROJECT_ROOT / 'prod')) +sys.path.insert(0, str(PROJECT_ROOT)) + +from dotenv import load_dotenv +load_dotenv(PROJECT_ROOT / '.env') + +from nautilus_trader.live.node import TradingNode +from nautilus_trader.config import TradingNodeConfig, LiveDataEngineConfig, CacheConfig +from nautilus_trader.adapters.binance.config import BinanceDataClientConfig +from nautilus_trader.adapters.binance.common.enums import BinanceAccountType +from nautilus_trader.adapters.binance.factories import BinanceLiveDataClientFactory +from nautilus_trader.model.identifiers import TraderId + +from prod.bingx.config import BingxExecClientConfig +from prod.bingx.data_config import BingxDataClientConfig +from prod.bingx.enums import BingxEnvironment +from prod.bingx.data_factories import BingxLiveDataClientFactory +from prod.bingx.factories import BingxLiveExecClientFactory + +# Nautilus changed this enum name across versions. +_BINANCE_USDT_FUTURES_ACCOUNT_TYPE = getattr( + BinanceAccountType, + "USDT_FUTURES", + getattr(BinanceAccountType, "USDT_FUTURE", None), +) +if _BINANCE_USDT_FUTURES_ACCOUNT_TYPE is None: + raise AttributeError("BinanceAccountType is missing both USDT_FUTURES and USDT_FUTURE") + +# --------------------------------------------------------------------------- +# Universe — 50 OBF assets. Subscribed for live quote cache (order sizing). +# Must cover the full eigen universe so _exec_submit_entry finds live quotes. +# --------------------------------------------------------------------------- +LIVE_ASSETS = [ + "BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT", "XRPUSDT", + "ADAUSDT", "DOGEUSDT", "TRXUSDT", "DOTUSDT", "MATICUSDT", + "LTCUSDT", "AVAXUSDT", "LINKUSDT", "UNIUSDT", "ATOMUSDT", + "ETCUSDT", "XLMUSDT", "BCHUSDT", "NEARUSDT", "ALGOUSDT", + "VETUSDT", "FILUSDT", "APTUSDT", "OPUSDT", "ARBUSDT", + "INJUSDT", "SUIUSDT", "SEIUSDT", "TIAUSDT", "ORDIUSDT", + "WLDUSDT", "FETUSDT", "AGIXUSDT", "RENDERUSDT", "IOTAUSDT", + "AAVEUSDT", "SNXUSDT", "CRVUSDT", "COMPUSDT", "MKRUSDT", + "ENJUSDT", "MANAUSDT", "SANDUSDT", "AXSUSDT", "GALAUSDT", + "ZECUSDT", "DASHUSDT", "XMRUSDT", "NEOUSDT", "QTUMUSDT", +] + +# --------------------------------------------------------------------------- +# DolphinActor config — gold-standard params, must match nautilus_event_trader +# --------------------------------------------------------------------------- +DOLPHIN_CONFIG = { + 'live_mode': True, + 'venue': 'BINANCE', + 'data_venue': 'BINANCE', + 'exec_venue': 'BINANCE', + 'direction': 'short_only', + 'hazelcast': { + 'host': '127.0.0.1:5701', + 'cluster': 'dolphin', + 'state_map': 'DOLPHIN_STATE_PRODGREEN', + 'imap_pnl': 'DOLPHIN_PNL_PRODGREEN', + }, + 'paper_trade': {'initial_capital': 25000.0}, + 'assets': LIVE_ASSETS, + 'engine': { + 'boost_mode': 'd_liq', + # Signal + 'vel_div_threshold': -0.020, + 'vel_div_extreme': -0.050, + # Leverage — gold spec: 8x soft / 9x hard + 'min_leverage': 0.5, + 'max_leverage': 8.0, + 'abs_max_leverage': 9.0, + 'leverage_convexity': 3.0, + 'fraction': 0.20, + 'max_account_leverage': 3.0, + # Exits — gold spec: 250 bars max hold + 'fixed_tp_pct': 0.0095, + 'stop_pct': 1.0, + 'max_hold_bars': 250, + # Direction confirm + 'use_direction_confirm': True, + 'dc_lookback_bars': 7, + 'dc_min_magnitude_bps': 0.75, + 'dc_skip_contradicts': True, + 'dc_leverage_boost': 1.0, + 'dc_leverage_reduce': 0.5, + # Asset selection — gold spec: IRP filter disabled in live + 'use_asset_selection': True, + 'min_irp_alignment': 0.0, + 'asset_selector_lookback': 10, + # Fees / slippage + 'use_sp_fees': True, + 'use_sp_slippage': True, + 'sp_maker_entry_rate': 0.62, + 'sp_maker_exit_rate': 0.50, + # OB edge + 'use_ob_edge': True, + 'ob_edge_bps': 5.0, + 'ob_confirm_rate': 0.40, + 'ob_imbalance_bias': -0.09, + 'ob_depth_scale': 1.0, + # Alpha layers + 'lookback': 100, + 'use_alpha_layers': True, + 'use_dynamic_leverage': True, + 'seed': 42, + # V7 RT exit engine (GREEN only) + 'use_exit_v7': True, + 'use_exit_v6': False, + 'v6_bar_duration_sec': 5.0, + 'bounce_model_path': str(PROJECT_ROOT / 'prod' / 'models' / 'bounce_detector_v3.pkl'), + }, + 'strategy_name': 'prodgreen', + 'vol_p60': 0.00009868, +} + + +def _env_upper(name: str, default: str = "") -> str: + return str(os.environ.get(name, default)).strip().upper() + + +def _env_bool(name: str, default: bool = False) -> bool: + raw = str(os.environ.get(name, str(default))).strip().lower() + return raw in ("1", "true", "yes", "on") + + +def _resolve_bingx_environment(value: str | None = None) -> BingxEnvironment: + name = str(value or os.environ.get("DOLPHIN_BINGX_ENV", "VST")).strip().upper() + return BingxEnvironment.LIVE if name == "LIVE" else BingxEnvironment.VST + + +def _resolve_bingx_allow_mainnet(value: str | None = None) -> bool: + if isinstance(value, bool): + return value + raw = str(value or os.environ.get("DOLPHIN_BINGX_ALLOW_MAINNET", "0")).strip().lower() + return raw in ("1", "true", "yes", "on") + + +def _resolve_bingx_recv_window_ms(value: str | None = None) -> int: + raw = str(value or os.environ.get("DOLPHIN_BINGX_RECV_WINDOW_MS", "")).strip() + try: + parsed = int(raw) + return parsed if parsed > 0 else 5_000 + except (TypeError, ValueError): + return 5_000 + + +def build_actor_config( + *, + data_venue: str | None = None, + exec_venue: str | None = None, +) -> dict: + actor_cfg = deepcopy(DOLPHIN_CONFIG) + resolved_data_venue = (data_venue or _env_upper("DOLPHIN_DATA_VENUE", actor_cfg["data_venue"])).upper() + resolved_exec_venue = (exec_venue or _env_upper("DOLPHIN_EXEC_VENUE", actor_cfg["exec_venue"])).upper() + actor_cfg["data_venue"] = resolved_data_venue + actor_cfg["exec_venue"] = resolved_exec_venue + actor_cfg["venue"] = resolved_exec_venue + actor_cfg["direction"] = os.environ.get("DOLPHIN_DIRECTION", actor_cfg.get("direction", "short_only")) + return actor_cfg + + +def build_bingx_exec_client_config( + *, + resolved_bingx_env: BingxEnvironment, + resolved_bingx_allow_mainnet: bool, + resolved_bingx_recv_window_ms: int | None, + assets: list[str] | None = None, +) -> BingxExecClientConfig: + from prod.bingx.config import BingxInstrumentProviderConfig + default_leverage = int(os.environ.get("DOLPHIN_BINGX_DEFAULT_LEVERAGE", "1")) + symbol_filters = tuple(assets) if assets else None + return BingxExecClientConfig( + api_key=os.environ.get("BINGX_API_KEY"), + secret_key=os.environ.get("BINGX_SECRET_KEY"), + environment=resolved_bingx_env, + allow_mainnet=resolved_bingx_allow_mainnet, + recv_window_ms=resolved_bingx_recv_window_ms if resolved_bingx_recv_window_ms is not None else 5_000, + default_leverage=default_leverage, + leverage_by_symbol={symbol: default_leverage for symbol in (assets or [])} if assets else None, + prefer_websocket=_env_bool("DOLPHIN_BINGX_PREFER_WEBSOCKET", True), + instrument_provider=BingxInstrumentProviderConfig( + load_all=True, + symbol_filters=symbol_filters, + ), + ) + + +def build_node( + *, + data_venue: str | None = None, + exec_venue: str | None = None, + trader_id: str | None = None, + bingx_environment: BingxEnvironment | None = None, + bingx_allow_mainnet: bool | None = None, + bingx_recv_window_ms: int | None = None, +) -> TradingNode: + resolved_bingx_env = bingx_environment or _resolve_bingx_environment() + resolved_bingx_allow_mainnet = ( + bingx_allow_mainnet if bingx_allow_mainnet is not None else _resolve_bingx_allow_mainnet() + ) + resolved_bingx_recv_window_ms = ( + bingx_recv_window_ms if bingx_recv_window_ms is not None else _resolve_bingx_recv_window_ms() + ) + if resolved_bingx_env is BingxEnvironment.LIVE and not resolved_bingx_allow_mainnet: + raise RuntimeError( + "BingX LIVE requested but DOLPHIN_BINGX_ALLOW_MAINNET is not enabled" + ) + + actor_cfg = build_actor_config(data_venue=data_venue, exec_venue=exec_venue) + actor_cfg["bingx_environment"] = str(resolved_bingx_env.value) + resolved_data_venue = actor_cfg["data_venue"] + resolved_exec_venue = actor_cfg["exec_venue"] + + data_clients = {} + exec_clients = {} + + if resolved_data_venue == "BINANCE": + api_key = os.environ["BINANCE_API_KEY"] + api_secret = os.environ["BINANCE_API_SECRET"] + data_clients["BINANCE"] = BinanceDataClientConfig( + account_type=_BINANCE_USDT_FUTURES_ACCOUNT_TYPE, + api_key=api_key, + api_secret=api_secret, + testnet=False, + ) + elif resolved_data_venue == "BINGX": + from prod.bingx.config import BingxInstrumentProviderConfig + data_clients["BINGX"] = BingxDataClientConfig( + environment=resolved_bingx_env, + allow_mainnet=resolved_bingx_allow_mainnet, + instrument_provider=BingxInstrumentProviderConfig( + load_all=True, + symbol_filters=tuple(actor_cfg.get("assets", [])), + ), + ) + else: + raise ValueError(f"Unsupported data venue: {resolved_data_venue}") + + if resolved_exec_venue == "BINGX": + exec_clients["BINGX"] = build_bingx_exec_client_config( + resolved_bingx_env=resolved_bingx_env, + resolved_bingx_allow_mainnet=resolved_bingx_allow_mainnet, + resolved_bingx_recv_window_ms=resolved_bingx_recv_window_ms, + assets=actor_cfg.get("assets"), + ) + + trader_id_value = trader_id or os.environ.get("DOLPHIN_TRADER_ID", "DOLPHIN-LIVE-001") + + from nautilus_dolphin.nautilus.dolphin_actor import DolphinActor + actor = DolphinActor(config=actor_cfg) + + node_config = TradingNodeConfig( + trader_id=TraderId(trader_id_value), + data_clients=data_clients, + exec_clients=exec_clients if exec_clients else None, + data_engine=LiveDataEngineConfig(time_bars_build_with_no_updates=False), + cache=CacheConfig(database=None), + ) + node = TradingNode(config=node_config) + + if "BINANCE" in data_clients: + node.add_data_client_factory("BINANCE", BinanceLiveDataClientFactory) + if "BINGX" in data_clients: + node.add_data_client_factory("BINGX", BingxLiveDataClientFactory) + if "BINGX" in exec_clients: + node.add_exec_client_factory("BINGX", BingxLiveExecClientFactory) + + node.trader.add_strategy(actor) + node.build() + return node + + +async def run() -> None: + node = build_node() + await node.run_async() + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/prod/nautilus_event_trader.py b/prod/nautilus_event_trader.py new file mode 100644 index 0000000..4dcbc08 --- /dev/null +++ b/prod/nautilus_event_trader.py @@ -0,0 +1,2010 @@ +#!/usr/bin/env python3 +""" +DOLPHIN Nautilus Event-Driven Trader +""" +import sys +import json +import math +import os +import time +import signal +import threading +import urllib.request +from typing import Optional +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime, timezone +from pathlib import Path +from collections import deque + +# Stablecoins / pegged assets that must never be traded +_STABLECOIN_SYMBOLS = frozenset({ + 'USDCUSDT', 'BUSDUSDT', 'FDUSDUSDT', 'USDTUSDT', 'TUSDUSDT', + 'DAIUSDT', 'FRAXUSDT', 'USDDUSDT', 'USTCUSDT', 'EURUSDT', +}) + +sys.path.insert(0, '/mnt/dolphinng5_predict') +sys.path.insert(0, '/mnt/dolphinng5_predict/nautilus_dolphin') + +from nautilus_dolphin.nautilus.proxy_boost_engine import create_d_liq_engine +from nautilus_dolphin.nautilus.esf_alpha_orchestrator import NDPosition +from nautilus_dolphin.nautilus.adaptive_circuit_breaker import AdaptiveCircuitBreaker +from nautilus_dolphin.nautilus.ob_features import OBFeatureEngine +from nautilus_dolphin.nautilus.ob_provider import MockOBProvider +from nautilus_dolphin.nautilus.esof_size_gate import parse_esof_payload, esof_gate_from_payload +try: + from adaptive_exit.market_state_runtime import MarketStateRuntime +except Exception: + MarketStateRuntime = None +try: + from adaptive_exit.sc_threshold_advisor import SCThresholdAdvisor +except Exception: + SCThresholdAdvisor = None +try: + from adaptive_exit.sc_gauge_advisor import SCGaugeAdvisor, build_obf_snapshot_from_engine +except Exception: + SCGaugeAdvisor = None + build_obf_snapshot_from_engine = None +try: + from adaptive_exit.bounce_advisor import BounceAdvisor +except Exception: + BounceAdvisor = None +try: + from nautilus_dolphin.nautilus.alpha_exit_v7_engine import AlphaExitEngineV7, TradeContextV7 +except Exception: + AlphaExitEngineV7 = None + TradeContextV7 = None + +BLUE_CH_DB = "dolphin" + +try: + from prod.ch_writer import ch_put, ts_us as _ch_ts_us +except ImportError: + def ch_put(*a, **kw): pass + def _ch_ts_us(): return 0 + +try: + from announcement_router import build_announcement_center +except ImportError: + from prod.announcement_router import build_announcement_center + +sys.path.insert(0, '/mnt/dolphinng5_predict/prod') +from dolphin_exit_handler import install_exit_handler +install_exit_handler("nautilus_trader") + +HZ_CLUSTER = "dolphin" +HZ_HOST = "127.0.0.1:5701" +EIGEN_DIR = Path('/mnt/dolphinng6_data/eigenvalues') + +CAPITAL_DISK_CHECKPOINT = Path("/tmp/dolphin_capital_checkpoint.json") +ANNOUNCEMENT_CONFIG = Path("/mnt/dolphinng5_predict/prod/configs/position_notifications_blue.json") +ANNOUNCEMENT_RUNTIME_ENV = Path("/mnt/dolphin_training/observability_notifications_blue.runtime.json") + +ENGINE_KWARGS = dict( + initial_capital=25000.0, vel_div_threshold=-0.02, vel_div_extreme=-0.05, + min_leverage=0.5, max_leverage=8.0, # note: create_d_liq_engine overrides to D_LIQ_SOFT_CAP=8.0 + leverage_convexity=3.0, + fraction=0.20, fixed_tp_pct=0.0095, stop_pct=1.0, max_hold_bars=250, # gold spec: 250 + use_direction_confirm=True, dc_lookback_bars=7, dc_min_magnitude_bps=0.75, + dc_skip_contradicts=True, dc_leverage_boost=1.0, dc_leverage_reduce=0.5, + use_asset_selection=True, min_irp_alignment=0.0, # gold spec: no IRP filter + use_sp_fees=True, use_sp_slippage=True, + sp_maker_entry_rate=0.62, sp_maker_exit_rate=0.50, + use_ob_edge=True, ob_edge_bps=5.0, ob_confirm_rate=0.40, + lookback=100, use_alpha_layers=True, use_dynamic_leverage=True, seed=42, + allow_subday_acb_exit=False, +) + + +def _env_bool(name: str, default: bool) -> bool: + raw = os.environ.get(name) + if raw is None: + return default + return str(raw).strip().lower() in {"1", "true", "yes", "on"} + + +def _direction_from_env(value: Optional[str] = None) -> int: + raw = os.environ.get("DOLPHIN_DIRECTION", "short_only") if value is None else value + text = str(raw or "short_only").strip().lower() + if text in {"short", "short_only", "sell", "-1"}: + return -1 + if text in {"long", "long_only", "buy", "+1", "1"}: + return 1 + raise ValueError( + f"Unsupported DOLPHIN_DIRECTION={raw!r}; use short_only or long_only" + ) + + +def _direction_label(direction: int) -> str: + return "LONG" if int(direction) == 1 else "SHORT" + + +def _safe_float(value, default: float = 0.0) -> float: + try: + out = float(value) + except (TypeError, ValueError): + return default + return out if math.isfinite(out) else default + + +def _flatten_env_payload(payload, prefix: str = "") -> dict: + flat = {} + if not isinstance(payload, dict): + return flat + for key, value in payload.items(): + if not isinstance(key, str) or not key.strip(): + continue + full_key = f"{prefix}_{key}" if prefix else key + if isinstance(value, dict): + flat.update(_flatten_env_payload(value, full_key)) + else: + flat[full_key.upper()] = value + return flat + + +def _seed_runtime_env(path: Path) -> None: + if not path.exists(): + return + try: + payload = json.loads(path.read_text()) + except Exception: + return + for key, value in _flatten_env_payload(payload).items(): + if key not in os.environ and value not in (None, "", "__CHANGE_ME__", "__REPLACE_ME__"): + os.environ[key] = str(value) + +BTC_VOL_WINDOW = 50 + +# Per-bucket SL % used when HIBERNATE fires while a position is open. +# Instead of immediate HIBERNATE_HALT, we arm TP (existing fixed_tp_pct) + +# a per-bucket stop-loss so the position exits cleanly rather than being +# force-closed at whatever price the halt fires at. +# Values derived from AE shadow data + bucket trade analysis (2026-04-19). +# B3 wide: shadow shows mae_norm 5-5.1 before FIXED_TP; 3.5×ATR fires on noise. +# B4 tight: 34.8% WR, 0.80 R:R — cut fast, no recovery value. +# B6 widest: extreme vol (vol_daily_pct 760-864); normal ATR excursions are large. +_BUCKET_SL_PCT: dict = { + 0: 0.015, # Low-vol high-corr nano-cap + 1: 0.012, # Med-vol low-corr mid-price (XRP/XLM class) + 2: 0.015, # Mega-cap BTC/ETH — default (not traded) + 3: 0.025, # High-vol mid-corr STAR bucket (ENJ/ADA/DOGE) — needs room + 4: 0.008, # Worst bucket (BNB/LTC/LINK) — cut fast + 5: 0.018, # High-vol low-corr micro-price (ATOM/TRX class) + 6: 0.030, # Extreme-vol mid-corr (FET/ZRX) — widest + 'default': 0.015, +} +# Gold-calibrated from full 5-year BTC history: 0.00026414 (stricter, ~2.7x tighter). +# 2026-04-07: switched to 56-day gold window value (0.00009868) — the exact threshold +# used in the T=2155 ROI=+189% backtest. More permissive; paper trading to gather data. +VOL_P60_THRESHOLD = 0.00009868 + +# Algorithm Versioning +# v1_shakedown: v50-v150 (noise bug), loose vol gate +# v2_gold_fix: CORRECTED v50-v750 macro divergence (matches parquet backtest) +ALGO_VERSION = "v2_gold_fix_v50-v750" + +# Persistent, version-tagged trade log (survives reboots; sorts by date) +_LOG_DIR = "/mnt/dolphinng5_predict/prod/logs" +os.makedirs(_LOG_DIR, exist_ok=True) +_LOG_DATE = datetime.now(timezone.utc).strftime("%Y%m%d") +TRADE_LOG = f"{_LOG_DIR}/nautilus_trader_{_LOG_DATE}_{ALGO_VERSION}.log" +running = True + +def log(msg): + ts = datetime.now(timezone.utc).isoformat() + line = f"[{ts}] {msg}" + print(line, flush=True) + with open(TRADE_LOG, 'a') as f: + f.write(line + '\n') + +class DolphinLiveTrader: + def __init__(self): + self.eng = None + self.hz_client = None + self.features_map = None + self.safety_map = None + self.pnl_map = None + self.state_map = None + self.heartbeat_map = None + self.eng_lock = threading.Lock() + self._dedup_lock = threading.Lock() # guards atomic check-and-set on last_scan_number + self._scan_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="scan") + self.last_scan_number = -1 + self.last_file_mtime = 0 + self.bar_idx = 0 + self.current_day = None + self.trades_executed = 0 + self.scans_processed = 0 + self.btc_prices = deque(maxlen=BTC_VOL_WINDOW + 2) + self.cached_posture = "APEX" + self.posture_cache_time = 0 + self.ob_assets = [] + self.ob_eng = None + self.acb = None + self.last_w750_vel = None + self._pending_entries: dict = {} # trade_id → entry snapshot (for CH trade_events) + self._last_exf: dict = {} + self._exf_log_time = 0.0 # throttle for on_exf_update logging + self._ae = None # AdaptiveExitEngine shadow (parallel, never real exits) + self._v7_exit_engine = None # AlphaExitEngineV7 live BLUE exit control + journal + self._v7_contexts: dict = {} # trade_id → TradeContextV7 + self._v7_decisions: dict = {} # trade_id → latest v7 decision + self._v7_decision_seq: dict = {} # trade_id → monotonic eval sequence + self._v7_journal_enabled: bool = _env_bool("DOLPHIN_ENABLE_V7_JOURNAL", True) + self._v7_journal_db: str = BLUE_CH_DB + self._v7_journal_table: str = "v7_decision_events" + self._v7_live_exit_enabled: bool = False + self._sc_advisor = None # SC threshold advisor (shadow-only) + self._sc_advisor_last_log = 0.0 + self._sc_gauge = None # SC bucket gauge advisor (shadow-only) + self._sc_gauge_last_log = 0.0 + self._bounce_advisor = None # inverse-ARS bounce advisor (shadow-only) + self._bounce_advisor_last_log = 0.0 + self._bounce_price_history: dict[str, deque] = {} + self._market_state_runtime = MarketStateRuntime() if MarketStateRuntime is not None else None + self._hibernate_protect_active: str | None = None # trade_id being protected + self._bucket_assignments: dict = {} # asset → KMeans bucket_id (loaded from pkl) + self._last_esof_size_mult: float = 1.0 + self.trade_direction: int = _direction_from_env() + self._trade_announcement_center = None + _seed_runtime_env(ANNOUNCEMENT_RUNTIME_ENV) + if ANNOUNCEMENT_CONFIG.exists(): + try: + self._trade_announcement_center = build_announcement_center( + ANNOUNCEMENT_CONFIG, + hz_getter=self._get_hz, + logger=None, + ) + log(" Position announcements: loaded") + except Exception as e: + log(f" Position announcements: {e}") + self._trade_announcement_center = None + + def _build_engine(self): + log("Building NDAlphaEngine...") + engine_kwargs = dict(ENGINE_KWARGS) + engine_kwargs["allow_subday_acb_exit"] = _env_bool( + "DOLPHIN_ALLOW_ACB_SUBDAY_EXIT", + bool(engine_kwargs.get("allow_subday_acb_exit", False)), + ) + self.eng = create_d_liq_engine(**engine_kwargs) + log(f" Engine: {type(self.eng).__name__}") + log(f" Direction: {_direction_label(self.trade_direction)} ({self.trade_direction:+d})") + log(f" ACB subday exits: {'ON' if engine_kwargs['allow_subday_acb_exit'] else 'OFF'}") + log(f" Leverage: soft={self.eng.base_max_leverage}x abs={self.eng.abs_max_leverage}x") + + if EIGEN_DIR.exists(): + try: + date_strings = sorted([d.name for d in EIGEN_DIR.iterdir() if d.is_dir()]) + self.acb = AdaptiveCircuitBreaker() + self.acb.preload_w750(date_strings) + self.eng.set_acb(self.acb) + log(" ACBv6: loaded") + except Exception as e: + log(f" ACBv6: {e}") + else: + self.acb = AdaptiveCircuitBreaker() + self.eng.set_acb(self.acb) + log(" ACBv6: loaded (no preload dates)") + + self.eng.set_esoteric_hazard_multiplier(0.0) # gold spec: init guard, MUST precede set_mc_forewarner + log(f" Hazard: set_esoteric_hazard_multiplier(0.0) — soft={self.eng.base_max_leverage}x") + + MC_MODELS_DIR = '/mnt/dolphinng5_predict/nautilus_dolphin/mc_results/models' + MC_BASE_CFG = { + 'trial_id': 0, 'vel_div_threshold': -0.020, 'vel_div_extreme': -0.050, + 'use_direction_confirm': True, 'dc_lookback_bars': 7, + 'dc_min_magnitude_bps': 0.75, 'dc_skip_contradicts': True, + 'dc_leverage_boost': 1.00, 'dc_leverage_reduce': 0.50, + 'vd_trend_lookback': 10, 'min_leverage': 0.50, 'max_leverage': 8.00, # gold spec + 'leverage_convexity': 3.00, 'fraction': 0.20, 'use_alpha_layers': True, + 'use_dynamic_leverage': True, 'fixed_tp_pct': 0.0095, 'stop_pct': 1.00, + 'max_hold_bars': 250, 'use_sp_fees': True, 'use_sp_slippage': True, # gold spec + 'sp_maker_entry_rate': 0.62, 'sp_maker_exit_rate': 0.50, + 'use_ob_edge': True, 'ob_edge_bps': 5.00, 'ob_confirm_rate': 0.40, + 'ob_imbalance_bias': -0.09, 'ob_depth_scale': 1.00, + 'use_asset_selection': True, 'min_irp_alignment': 0.0, + 'asset_selector_lookback': 10, 'lookback': 100, # gold spec + 'acb_beta_high': 0.80, 'acb_beta_low': 0.20, 'acb_w750_threshold_pct': 60, + } + if Path(MC_MODELS_DIR).exists(): + try: + from mc.mc_ml import DolphinForewarner + forewarner = DolphinForewarner(models_dir=MC_MODELS_DIR) + self.eng.set_mc_forewarner(forewarner, MC_BASE_CFG) + log(" MC-Forewarner: wired") + except Exception as e: + log(f" MC-Forewarner: {e}") + + try: + from adaptive_exit.adaptive_exit_engine import AdaptiveExitEngine + self._ae = AdaptiveExitEngine.load() + log(" AdaptiveExitEngine: loaded (shadow mode — no real exits)") + except Exception as e: + log(f" AdaptiveExitEngine: {e} — shadow disabled") + + if AlphaExitEngineV7 is not None and self._v7_journal_enabled: + try: + self._v7_exit_engine = AlphaExitEngineV7(bar_duration_sec=11.0) + self._ensure_v7_journal_table() + log(" AlphaExitEngineV7: loaded (live BLUE exit control + journal)") + except Exception as e: + log(f" AlphaExitEngineV7: {e} — shadow disabled") + self._v7_exit_engine = None + self._v7_live_exit_enabled = self._v7_exit_engine is not None + if self.eng is not None: + self.eng.exit_decision_provider = self._v7_live_exit_decision if self._v7_live_exit_enabled else None + + self._load_bucket_assignments() + + if SCThresholdAdvisor is not None: + try: + self._sc_advisor = SCThresholdAdvisor.load( + strategy="blue", + shadow_db=BLUE_CH_DB, + ) + log(" SCThresholdAdvisor: loaded (shadow mode — no sizing changes)") + except Exception as e: + log(f" SCThresholdAdvisor: {e} — shadow disabled") + self._sc_advisor = None + + if SCGaugeAdvisor is not None: + try: + self._sc_gauge = SCGaugeAdvisor.load( + strategy="blue", + shadow_db=BLUE_CH_DB, + ) + log(" SCGaugeAdvisor: loaded (shadow mode — no sizing changes)") + except Exception as e: + log(f" SCGaugeAdvisor: {e} — shadow disabled") + self._sc_gauge = None + + if BounceAdvisor is not None: + try: + self._bounce_advisor = BounceAdvisor.load( + strategy="blue", + shadow_db=BLUE_CH_DB, + ) + log(" BounceAdvisor: loaded (shadow mode — no execution changes)") + except Exception as e: + log(f" BounceAdvisor: {e} — shadow disabled") + self._bounce_advisor = None + + def _load_bucket_assignments(self): + """Load KMeans asset→bucket_id mapping for hibernate protection SL levels.""" + try: + import pickle + pkl_path = Path('/mnt/dolphinng5_predict/adaptive_exit/models/bucket_assignments.pkl') + with open(pkl_path, 'rb') as f: + data = pickle.load(f) + self._bucket_assignments = data.get('assignments', {}) + log(f" BucketAssignments: {len(self._bucket_assignments)} assets loaded for hibernate protection") + except Exception as e: + log(f" BucketAssignments: {e} — hibernate protect will use default SL={_BUCKET_SL_PCT['default']*100:.1f}%") + + def _announce_position_event( + self, + *, + kind: str, + severity: str, + title: str, + message: str, + metadata: dict | None = None, + ) -> None: + center = getattr(self, "_trade_announcement_center", None) + if center is None: + return + try: + center.note_event( + kind=kind, + severity=severity, + title=title, + message=message, + metadata=metadata or {}, + ) + except Exception as e: + log(f" Position announcement failed: {e}") + + def _read_esof_payload(self) -> dict | None: + """Read the freshest EsoF advisory payload from HZ, if available.""" + if not self.features_map: + return None + for key in ("esof_latest", "esof_advisor_latest"): + try: + raw = self.features_map.blocking().get(key) + except Exception: + continue + payload = parse_esof_payload(raw) + if payload: + return payload + return None + + def _sync_esof_size_gate(self) -> None: + """Update the shared engine with the current continuous EsoF size multiplier.""" + payload = self._read_esof_payload() + score, mult = esof_gate_from_payload(payload) + with self.eng_lock: + if hasattr(self.eng, "set_esof_advisory_score"): + self.eng.set_esof_advisory_score(score) + if mult != self._last_esof_size_mult: + self._last_esof_size_mult = mult + if score is None: + log(f"EsoF size gate: neutral mult={mult:.2f} (no fresh score)") + else: + log(f"EsoF size gate: sc={score:+.3f} mult={mult:.2f}") + + def _sync_sc_threshold_advisor(self, scan_number: int, vel_div: float) -> None: + """Shadow-only advisory layer for tracking / future threshold learning.""" + if self._sc_advisor is None: + return + try: + payload = self._read_esof_payload() + trade_history = getattr(self.eng, "trade_history", []) + open_tid = next(iter(self._pending_entries.keys()), "") + pending = self._pending_entries.get(open_tid, {}) if open_tid else {} + rec = self._sc_advisor.evaluate( + trade_id=str(open_tid or ""), + asset=str(pending.get("asset", "")), + sc=_safe_float(payload.get("advisory_score", payload.get("score", 0.0)) if payload else None), + vel_div=float(vel_div or 0.0), + exf_snapshot=getattr(self, "_last_exf", {}) or {}, + trade_history=trade_history, + current_mult=float(self._last_esof_size_mult or 1.0), + esof_payload=payload, + scan_number=int(scan_number or 0), + bar_idx=int(self.bar_idx), + strategy="blue", + log_shadow=True, + ) + if open_tid: + pending["sc_threshold_advisor"] = rec + pending["sc_exec_mult"] = float(self._last_esof_size_mult or 1.0) + self._pending_entries[open_tid] = pending + now = time.time() + if now - self._sc_advisor_last_log >= 300: + self._sc_advisor_last_log = now + log( + f"SC_ADVISOR: sc={rec['sc']:+.3f} cur={rec['current_mult']:.2f} " + f"rec={rec['recommended_mult']:.2f} cut={rec['recommended_sc_cut']:+.2f} " + f"conf={rec['confidence']:.2f} src={rec['decision_source']}" + ) + except Exception as e: + log(f"SC_ADVISOR error: {e}") + + def _current_obf_snapshot(self, asset: str, bar_idx: int) -> dict[str, dict]: + if build_obf_snapshot_from_engine is None or self.ob_eng is None or not asset: + return {} + try: + return build_obf_snapshot_from_engine(self.ob_eng, asset, bar_idx) + except Exception: + return {} + + def _record_bounce_prices(self, prices_dict: dict[str, float]) -> None: + """Maintain rolling price histories for the bounce advisor.""" + if not prices_dict: + return + for asset, px in prices_dict.items(): + try: + price = float(px) + except Exception: + continue + if not math.isfinite(price) or price <= 0.0: + continue + hist = self._bounce_price_history.get(asset) + if hist is None: + hist = deque(maxlen=512) + self._bounce_price_history[asset] = hist + hist.append(price) + + def _bounce_price_path(self, asset: str) -> list[float]: + hist = self._bounce_price_history.get(asset) + if not hist: + return [] + return [float(px) for px in hist if math.isfinite(float(px))] + + def _bounce_eval( + self, + *, + trade_id: str, + asset: str, + side: str, + source: str, + scan_number: int, + entry_ts: datetime | None, + current_price: float, + entry_price: float, + quantity: float, + notional: float, + leverage: float, + vel_div: float, + current_mult: float, + bars_held: int, + log_shadow: bool = True, + ) -> dict | None: + """Evaluate the bounce advisor on a rolling price path and persist the row.""" + if self._bounce_advisor is None or not trade_id or not asset: + return None + price_path = self._bounce_price_path(asset) + if len(price_path) < 3: + return None + rec = self._bounce_advisor.evaluate( + trade_id=str(trade_id), + asset=str(asset), + side=str(side or "SHORT"), + price_path=price_path, + entry_ts=entry_ts or datetime.now(timezone.utc), + entry_price=float(entry_price or 0.0), + current_price=float(current_price or 0.0), + quantity=float(quantity or 0.0), + notional=float(notional or 0.0), + leverage=float(leverage or 0.0), + current_mult=float(current_mult or 1.0), + vel_div=float(vel_div or 0.0), + scan_number=int(scan_number or 0), + bar_idx=int(self.bar_idx), + bars_held=int(max(0, bars_held)), + source=str(source or "entry"), + obf_snapshot=self._current_obf_snapshot(asset, self.bar_idx), + log_shadow=log_shadow, + use_ta=True, + use_obf=True, + ) + if rec: + rec["price_path"] = price_path[-128:] + return rec + + def _ensure_v7_journal_table(self) -> None: + """Create the V7 decision journal if it does not already exist.""" + ddl = f""" + CREATE TABLE IF NOT EXISTS {self._v7_journal_db}.{self._v7_journal_table} + ( + ts DateTime64(6, 'UTC'), + ts_day Date MATERIALIZED toDate(ts), + strategy LowCardinality(String), + source LowCardinality(String), + trade_id String, + asset LowCardinality(String), + side LowCardinality(String), + entry_price Float64, + current_price Float64, + quantity Float64, + notional Float64, + leverage Float32, + bar_idx UInt32, + decision_seq UInt32, + bars_held UInt16, + action LowCardinality(String), + reason LowCardinality(String), + pnl_pct Float32, + mfe Float32, + mae Float32, + mfe_risk Float32, + mae_risk Float32, + exit_pressure Float32, + rv_comp Float32, + mae_thresh1 Float32, + bounce_score Float32, + bounce_risk Float32, + ob_imbalance Float32, + vel_div_entry Float32, + vel_div_now Float32, + v50_vel Float32, + v750_vel Float32, + exf_funding Float32, + exf_dvol Float32, + exf_fear_greed Float32, + exf_taker Float32, + posture LowCardinality(String) + ) + ENGINE = MergeTree + PARTITION BY toYYYYMM(ts) + ORDER BY (ts_day, trade_id, decision_seq, ts) + TTL ts_day + toIntervalDay(180) + """ + try: + req = urllib.request.Request( + "http://localhost:8123/", + data=ddl.encode(), + method="POST", + ) + req.add_header("X-ClickHouse-User", "dolphin") + req.add_header("X-ClickHouse-Key", "dolphin_ch_2026") + urllib.request.urlopen(req, timeout=5).close() + except Exception as exc: + log(f"[V7_JOURNAL] table ensure failed: {exc}") + + def _record_v7_decision( + self, + *, + trade_id: str, + asset: str, + side: str, + decision: dict, + current_price: float, + ob_imbalance: float, + vel_div_now: float, + v50_vel: float, + v750_vel: float, + source: str = "scan_eval", + bar_idx: int | None = None, + ) -> None: + """Persist a V7 evaluation for observability and offline comparison.""" + if not self._v7_journal_enabled or self._v7_exit_engine is None: + return + pending = self._pending_entries.get(trade_id, {}) + seq = int(self._v7_decision_seq.get(trade_id, 0)) + 1 + self._v7_decision_seq[trade_id] = seq + entry_price = float(pending.get("entry_price", 0.0) or 0.0) + quantity = float(pending.get("quantity", 0.0) or 0.0) + row = { + "ts": _ch_ts_us(), + "strategy": "blue", + "source": source, + "trade_id": str(trade_id or ""), + "asset": str(asset or pending.get("asset", "")), + "side": str(side or pending.get("side", "")), + "entry_price": entry_price, + "current_price": float(current_price or 0.0), + "quantity": quantity, + "notional": float(quantity * entry_price), + "leverage": float(pending.get("leverage", 0.0) or 0.0), + "bar_idx": int(max(0, self.bar_idx - 1 if bar_idx is None else bar_idx)), + "decision_seq": seq, + "bars_held": int(decision.get("bars_held", 0) or 0), + "action": str(decision.get("action", "UNKNOWN") or "UNKNOWN"), + "reason": str(decision.get("reason") or ""), + "pnl_pct": float(decision.get("pnl_pct", 0.0) or 0.0), + "mfe": float(decision.get("mfe", 0.0) or 0.0), + "mae": float(decision.get("mae", 0.0) or 0.0), + "mfe_risk": float(decision.get("mfe_risk", 0.0) or 0.0), + "mae_risk": float(decision.get("mae_risk", 0.0) or 0.0), + "exit_pressure": float(decision.get("exit_pressure", 0.0) or 0.0), + "rv_comp": float(decision.get("rv_comp", 0.0) or 0.0), + "mae_thresh1": float(decision.get("mae_thresh1", 0.0) or 0.0), + "bounce_score": float(decision.get("bounce_score", 0.0) or 0.0), + "bounce_risk": float(decision.get("bounce_risk", 0.0) or 0.0), + "ob_imbalance": float(ob_imbalance or 0.0), + "vel_div_entry": float(pending.get("vel_div_entry", 0.0) or 0.0), + "vel_div_now": float(vel_div_now or 0.0), + "v50_vel": float(v50_vel or 0.0), + "v750_vel": float(v750_vel or 0.0), + "exf_funding": float(self._last_exf.get("funding", 0.0) or 0.0), + "exf_dvol": float(self._last_exf.get("dvol", 0.0) or 0.0), + "exf_fear_greed": float(self._last_exf.get("fear_greed", 0.0) or 0.0), + "exf_taker": float(self._last_exf.get("taker", 0.0) or 0.0), + "posture": str(pending.get("posture", self.cached_posture) or ""), + } + try: + ch_put(self._v7_journal_table, row) + except Exception as exc: + log(f"[V7_JOURNAL] write failed: {exc}") + + def _v7_live_exit_decision( + self, + *, + pos, + bar_idx: int, + prices: dict, + vel_div: float, + v50_vel: float, + v750_vel: float, + ) -> dict | None: + """Live BLUE exit hook backed by AlphaExitEngineV7. + + The orchestrator calls this before falling back to the base exit manager. + Returns a V7 decision dict or None if the trade cannot yet be evaluated. + """ + if self._v7_exit_engine is None or pos is None: + return None + + trade_id = str(getattr(pos, "trade_id", "") or "") + asset = str(getattr(pos, "asset", "") or "") + if not trade_id or not asset: + return None + + pending = self._pending_entries.get(trade_id, {}) + ctx_v7 = self._v7_contexts.get(trade_id) + eval_bar = max(0, int(bar_idx) - 1) + + if ctx_v7 is None: + try: + ctx_v7 = self._v7_exit_engine.make_context( + entry_price=float( + pending.get("entry_price", getattr(pos, "entry_price", 0.0)) + or getattr(pos, "entry_price", 0.0) + or 0.0 + ), + entry_bar=int(pending.get("entry_bar", eval_bar) or eval_bar), + side=1 if str(pending.get("side", "SHORT") or "SHORT") == "SHORT" else 0, + ) + if self._last_exf: + ctx_v7.set_exf( + funding=float(self._last_exf.get("funding", 0.0) or 0.0), + dvol=float(self._last_exf.get("dvol", 0.0) or 0.0), + fear_greed=float(self._last_exf.get("fear_greed", 0.0) or 0.0), + taker=float(self._last_exf.get("taker", 0.0) or 0.0), + ) + self._v7_contexts[trade_id] = ctx_v7 + self._v7_decision_seq.setdefault(trade_id, 0) + except Exception as exc: + log(f" V7 live context init failed for {trade_id}: {exc}") + return None + elif self._last_exf: + try: + ctx_v7.set_exf( + funding=float(self._last_exf.get("funding", 0.0) or 0.0), + dvol=float(self._last_exf.get("dvol", 0.0) or 0.0), + fear_greed=float(self._last_exf.get("fear_greed", 0.0) or 0.0), + taker=float(self._last_exf.get("taker", 0.0) or 0.0), + ) + except Exception: + pass + + ob_imb = 0.0 + if self.ob_eng is not None: + try: + ob_sig = self.ob_eng.get_signal(asset, float(eval_bar)) + ob_imb = float(getattr(ob_sig, "imbalance_ma5", 0.0) or 0.0) + except Exception as exc: + log(f" V7 live OB signal failed for {trade_id}: {exc}") + + cur_px = float( + prices.get(asset, getattr(pos, "current_price", 0.0)) + or getattr(pos, "current_price", 0.0) + or 0.0 + ) + if cur_px <= 0.0: + return None + + decision = self._v7_exit_engine.evaluate( + ctx_v7, + cur_px, + eval_bar, + ob_imb, + asset=asset, + ) + self._v7_decisions[trade_id] = decision + self._record_v7_decision( + trade_id=trade_id, + asset=asset, + side=str(pending.get("side", "SHORT") or "SHORT"), + decision=decision, + current_price=cur_px, + ob_imbalance=ob_imb, + vel_div_now=vel_div, + v50_vel=v50_vel, + v750_vel=v750_vel, + source="live_exit", + bar_idx=eval_bar, + ) + + action = str(decision.get("action", "HOLD") or "HOLD") + if action != "HOLD": + log( + " V7 live decision: " + f"{trade_id} {asset} action={action} reason={decision.get('reason', '')} " + f"pressure={float(decision.get('exit_pressure', 0.0) or 0.0):+.3f} " + f"pnl_pct={float(decision.get('pnl_pct', 0.0) or 0.0):+.3f}" + ) + return decision + + def _sync_sc_gauge_advisor(self, scan_number: int, vel_div: float) -> None: + """Shadow-only bucket gauge advisory surface.""" + if self._sc_gauge is None: + return + try: + payload = self._read_esof_payload() + trade_history = getattr(self.eng, "trade_history", []) + open_tid = next(iter(self._pending_entries.keys()), "") + pending = self._pending_entries.get(open_tid, {}) if open_tid else {} + asset = str(pending.get("asset", "")) + rec = self._sc_gauge.evaluate( + trade_id=str(open_tid or ""), + asset=asset, + sc=_safe_float(payload.get("advisory_score", payload.get("score", 0.0)) if payload else None), + vel_div=float(vel_div or 0.0), + exf_snapshot=getattr(self, "_last_exf", {}) or {}, + obf_snapshot=self._current_obf_snapshot(asset, self.bar_idx), + trade_history=trade_history, + current_mult=float(self._last_esof_size_mult or 1.0), + esof_payload=payload, + scan_number=int(scan_number or 0), + bar_idx=int(self.bar_idx), + strategy="blue", + log_shadow=True, + ) + if open_tid: + pending["sc_bucket_gauge"] = rec + pending["sc_bucket_gauge_exec_mult"] = float(self._last_esof_size_mult or 1.0) + self._pending_entries[open_tid] = pending + now = time.time() + if now - self._sc_gauge_last_log >= 300: + self._sc_gauge_last_log = now + log( + f"SC_GAUGE: sc={rec['sc']:+.3f} bucket={rec['bucket_id']} " + f"cur={rec['current_mult']:.2f} rec={rec['recommended_size_mult']:.2f} " + f"tp={rec['recommended_tp_mult']:.2f} hold={rec['recommended_hold_mult']:.2f} " + f"cut={rec['recommended_sc_cut']:+.2f} conf={rec['confidence']:.2f}" + ) + except Exception as e: + log(f"SC_GAUGE error: {e}") + + # ── CH position-state persistence ───────────────────────────────────────── + + def _ps_write_open(self, tid: str, entry: dict): + """Persist OPEN row to position_state on entry. Fire-and-forget via ch_put.""" + try: + ch_put("position_state", { + "ts": entry['entry_ts'], + "trade_id": tid, + "asset": entry['asset'], + "direction": -1 if entry['side'] == 'SHORT' else 1, + "entry_price": entry['entry_price'], + "quantity": entry['quantity'], + "notional": round(entry['quantity'] * entry['entry_price'], 4), + "leverage": entry['leverage'], + "bucket_id": int(self._bucket_assignments.get(entry['asset'], -1)), + "entry_bar": self.bar_idx, + "status": "OPEN", + "exit_reason": "", + "pnl": 0.0, + "bars_held": 0, + }) + except Exception as e: + log(f" position_state OPEN write failed: {e}") + + def _ps_write_closed(self, tid: str, pending: dict, x: dict): + """Persist CLOSED row to position_state on exit (supersedes OPEN row via ReplacingMergeTree).""" + try: + ch_put("position_state", { + "ts": _ch_ts_us(), + "trade_id": tid, + "asset": pending.get('asset', ''), + "direction": -1 if pending.get('side') == 'SHORT' else 1, + "entry_price": pending.get('entry_price', 0.0), + "quantity": pending.get('quantity', 0.0), + "notional": round(pending.get('quantity', 0.0) * pending.get('entry_price', 0.0), 4), + "leverage": pending.get('leverage', 0.0), + "bucket_id": int(self._bucket_assignments.get(pending.get('asset', ''), -1)), + "entry_bar": 0, + "status": "CLOSED", + "exit_reason": str(x.get('reason', 'UNKNOWN')), + "pnl": float(x.get('net_pnl', 0) or 0), + "bars_held": int(x.get('bars_held', 0) or 0), + }) + except Exception as e: + log(f" position_state CLOSED write failed: {e}") + + def _restore_position_state(self): + """On startup: check CH for an OPEN position and restore engine state.""" + try: + import urllib.request, base64 as _b64 + sql = ("SELECT trade_id, asset, direction, entry_price, quantity, " + "notional, leverage, bucket_id, bars_held " + "FROM dolphin.position_state FINAL " + "WHERE status = 'OPEN' " + "ORDER BY ts DESC LIMIT 1 FORMAT TabSeparated") + req = urllib.request.Request( + "http://localhost:8123/?database=dolphin", + data=sql.encode(), + headers={"Authorization": "Basic " + + _b64.b64encode(b"dolphin:dolphin_ch_2026").decode()}) + with urllib.request.urlopen(req, timeout=5) as r: + row = r.read().decode().strip() + if not row: + log(" position_state: no open position to restore") + return + + cols = row.split('\t') + if len(cols) < 9: + log(f" position_state: unexpected row format: {row}") + return + + trade_id = cols[0] + asset = cols[1] + direction = int(cols[2]) + entry_price = float(cols[3]) + quantity = float(cols[4]) + notional = float(cols[5]) + leverage = float(cols[6]) + bucket_id = int(cols[7]) + stored_bars = int(cols[8]) + + # Estimate entry_bar so MAX_HOLD countdown continues from where it left off + restored_entry_bar = max(0, self.bar_idx - stored_bars) + + pos = NDPosition( + trade_id = trade_id, + asset = asset, + direction = direction, + entry_price = entry_price, + entry_bar = restored_entry_bar, + notional = notional, + leverage = leverage, + fraction = notional / max(self.eng.capital * leverage, 1.0), + entry_vel_div = 0.0, + bucket_idx = 0, # signal-strength bucket (not KMeans); 0=safe default + current_price = entry_price, + ) + with self.eng_lock: + self.eng.position = pos + self.eng.exit_manager.setup_position( + trade_id, entry_price, direction, restored_entry_bar, + ) + # NOTE: do NOT arm hibernate protect here. + # _day_posture starts as 'APEX' — the posture sync block on the + # first incoming scan will detect the APEX→HIBERNATE transition + # and call _hibernate_protect_position() at the right moment. + + # Rebuild _pending_entries so the exit CH write fires correctly + side = 'SHORT' if direction == -1 else 'LONG' + self._pending_entries[trade_id] = { + 'asset': asset, + 'side': side, + 'entry_price': entry_price, + 'quantity': quantity, + 'leverage': leverage, + 'vel_div_entry': 0.0, + 'boost_at_entry': 1.0, + 'beta_at_entry': 1.0, + 'posture': 'RESTORED', + 'entry_ts': _ch_ts_us(), + 'entry_date': (self.current_day or ''), + } + if self._v7_exit_engine is not None: + try: + ctx = self._v7_exit_engine.make_context( + entry_price=entry_price, + entry_bar=restored_entry_bar, + side=1 if direction == -1 else 0, + ) + self._v7_contexts[trade_id] = ctx + self._v7_decision_seq[trade_id] = 0 + except Exception as e: + log(f" V7 live restore context failed: {e}") + log(f" position_state RESTORED: {asset} {side} entry={entry_price} " + f"notional={notional:.0f} bars_held≈{stored_bars} trade={trade_id}") + + except Exception as e: + log(f" position_state restore error: {e}") + + def _hibernate_protect_position(self): + """Arm per-bucket TP+SL instead of immediate HIBERNATE_HALT. + + Must be called under eng_lock with an open position. + Sets stop_pct_override on the live exit_manager state so the position + exits via FIXED_TP or STOP_LOSS rather than being force-closed. + Records trade_id in _hibernate_protect_active so the exit path can + re-label the reason and finalize posture once the position closes. + """ + pos = self.eng.position + if pos is None: + return + bucket = self._bucket_assignments.get(pos.asset, 'default') + sl_pct = _BUCKET_SL_PCT.get(bucket, _BUCKET_SL_PCT['default']) + tp_pct = self.eng.exit_manager.fixed_tp_pct + + # Patch the live exit_manager state for this trade_id + em_state = self.eng.exit_manager._positions.get(pos.trade_id) + if em_state is not None: + em_state['stop_pct_override'] = sl_pct + else: + # Position not registered in exit_manager (shouldn't happen, but be safe) + log(f" HIBERNATE_PROTECT: trade {pos.trade_id} not in exit_manager — arming anyway via re-setup") + self.eng.exit_manager.setup_position( + pos.trade_id, pos.entry_price, pos.direction, pos.entry_bar, + stop_pct_override=sl_pct, + ) + + self._hibernate_protect_active = pos.trade_id + log(f"HIBERNATE_PROTECT armed: {pos.asset} B{bucket} " + f"SL={sl_pct*100:.2f}% TP={tp_pct*100:.2f}% trade={pos.trade_id}") + + def _connect_hz(self): + log("Connecting to Hazelcast...") + import hazelcast + self.hz_client = hazelcast.HazelcastClient(cluster_name=HZ_CLUSTER, cluster_members=[HZ_HOST]) + self.features_map = self.hz_client.get_map("DOLPHIN_FEATURES") + self.safety_map = self.hz_client.get_map("DOLPHIN_SAFETY") + self.pnl_map = self.hz_client.get_map("DOLPHIN_PNL_BLUE") + self.state_map = self.hz_client.get_map("DOLPHIN_STATE_BLUE") + self.heartbeat_map = self.hz_client.get_map("DOLPHIN_HEARTBEAT") + # Immediate heartbeat — prevents Cat1=0 during startup gap + try: + self.heartbeat_map.blocking().put('nautilus_flow_heartbeat', json.dumps({ + 'ts': time.time(), + 'iso': datetime.now(timezone.utc).isoformat(), + 'phase': 'starting', + 'flow': 'nautilus_event_trader', + })) + except Exception: + pass + log(" Hz connected") + + def _read_posture(self): + now = time.time() + if now - self.posture_cache_time < 10: + return self.cached_posture + try: + posture_raw = self.safety_map.blocking().get("latest") or self.safety_map.blocking().get("posture") + if posture_raw: + if isinstance(posture_raw, str): + try: + parsed = json.loads(posture_raw) + self.cached_posture = parsed.get("posture", posture_raw) + except (json.JSONDecodeError, AttributeError): + self.cached_posture = posture_raw + else: + self.cached_posture = posture_raw.get("posture", "APEX") + self.posture_cache_time = now + except: + pass + return self.cached_posture + + def _rollover_day(self): + today = datetime.now(timezone.utc).strftime('%Y-%m-%d') + if today == self.current_day: + return + posture = self._read_posture() + with self.eng_lock: + if today != self.current_day: # double-checked: only one thread calls begin_day + if getattr(self, 'acb', None): + try: + exf_raw = self.features_map.blocking().get('exf_latest') if self.features_map else None + es_raw = self.features_map.blocking().get('latest_eigen_scan') if self.features_map else None + + exf_snapshot = json.loads(exf_raw) if isinstance(exf_raw, str) else (exf_raw or {}) + eigen_scan = json.loads(es_raw) if isinstance(es_raw, str) else (es_raw or {}) + + w750_vel = eigen_scan.get('w750_velocity', 0.0) + + if exf_snapshot: + self.acb.get_dynamic_boost_from_hz( + date_str=today, + exf_snapshot=exf_snapshot, + w750_velocity=float(w750_vel) if w750_vel else None, + direction=self.trade_direction, + ) + log(f"ACB: Pre-warmed cache for {today} from HZ") + except Exception as e: + log(f"ACB Rollover Error: {e}") + + self.eng.begin_day(today, posture=posture, direction=self.trade_direction) + self.bar_idx = 0 + self.current_day = today + log( + f"begin_day({today}) called with posture={posture} " + f"direction={_direction_label(self.trade_direction)}" + ) + + def _compute_vol_ok(self, scan): + assets = scan.get('assets', []) + prices = scan.get('asset_prices', []) + if not assets or not prices: + return True + prices_dict = dict(zip(assets, prices)) + btc_price = prices_dict.get('BTCUSDT') + if btc_price is None: + return True + self.btc_prices.append(float(btc_price)) + if len(self.btc_prices) < BTC_VOL_WINDOW: + return True + import numpy as np + arr = np.array(self.btc_prices) + dvol = float(np.std(np.diff(arr) / arr[:-1])) + return dvol > VOL_P60_THRESHOLD + + @staticmethod + def _normalize_ng7(scan: dict) -> dict: + """ + Promote NG7-format scan to NG5-compatible flat dict. + NG7 embeds eigenvalue windows and prices inside result{} — the engine + expects flat top-level fields. Mapping derived from continuous_convert.py: + vel_div = w50_velocity − w750_velocity (fast minus slow eigenvalue velocity) + w50_velocity = multi_window_results["50"].tracking_data.lambda_max_velocity + w750_velocity = multi_window_results["750"].tracking_data.lambda_max_velocity + assets = sorted(current_prices.keys()), BTCUSDT always last + """ + result = scan.get('result') or {} + mw = result.get('multi_window_results') or {} + + def _vel(win): + v = (mw.get(str(win)) or {}).get('tracking_data', {}).get('lambda_max_velocity') + try: + f = float(v) + return f if math.isfinite(f) else 0.0 + except (TypeError, ValueError): + return 0.0 + + v50 = _vel(50) + v150 = _vel(150) + v750 = _vel(750) + + cp = (result.get('pricing_data') or {}).get('current_prices') or {} + assets = [a for a in cp if a != 'BTCUSDT'] + if 'BTCUSDT' in cp: + assets.append('BTCUSDT') # BTC always last — matches NG5/Arrow convention + prices = [float(cp[a]) for a in assets] + + instability = float((result.get('regime_prediction') or {}) + .get('instability_score') or 0.0) + + return { + **scan, + 'vel_div': v50 - v750, + 'w50_velocity': v50, + 'w750_velocity': v750, + 'assets': assets, + 'asset_prices': prices, + 'instability_50': instability, + } + + def on_scan(self, event): + """Reactor-thread entry point — dispatches immediately to worker thread.""" + if not event.value: + return + listener_time = time.time() + self._scan_executor.submit(self._process_scan, event, listener_time) + + def _process_scan(self, event, listener_time): + try: + if not event.value: + return + + scan = json.loads(event.value) if isinstance(event.value, str) else event.value + + # Normalise NG7 format → NG5-compatible flat dict before any field access + if scan.get('version') == 'NG7': + scan = self._normalize_ng7(scan) + + scan_number = int(scan.get('scan_number') or 0) + + # Dedup: scan_number is authoritative (monotonically increasing). + # file_mtime / timestamp are unreliable across NG7 restart probes. + with self._dedup_lock: + if scan_number > 0 and scan_number <= self.last_scan_number: + return + self.last_scan_number = scan_number + self.scans_processed += 1 + + self._rollover_day() + + assets = scan.get('assets') or [] + if assets and not self.ob_assets: + self._wire_obf(assets) + + prices = scan.get('asset_prices') or [] + if assets and prices and len(assets) != len(prices): + log(f"WARN scan #{scan_number}: assets/prices mismatch " + f"({len(assets)}≠{len(prices)}) — dropped") + return + prices_dict = dict(zip(assets, prices)) if assets and prices else {} + # Remove stablecoins — they should never be selected as a trade asset + for sym in _STABLECOIN_SYMBOLS: + prices_dict.pop(sym, None) + + self._record_bounce_prices(prices_dict) + + vol_ok = self._compute_vol_ok(scan) + + vel_div = float(scan.get('vel_div') or 0.0) + if not math.isfinite(vel_div): + log(f"WARN scan #{scan_number}: non-finite vel_div={vel_div} — clamped to 0.0") + vel_div = 0.0 + + v50_vel = float(scan.get('w50_velocity') or 0.0) + v750_vel = float(scan.get('w750_velocity') or 0.0) + if not math.isfinite(v50_vel): v50_vel = 0.0 + if not math.isfinite(v750_vel): v750_vel = 0.0 + self.last_w750_vel = v750_vel + + # Feed live OB data into OBF engine for this bar (AGENT_SPEC_OBF_LIVE_SWITCHOVER) + if self.ob_eng is not None and self.ob_assets: + self.ob_eng.step_live(self.ob_assets, self.bar_idx) + + # Live posture sync — update engine posture + regime_dd_halt together + posture_now = self._read_posture() + with self.eng_lock: + prev_posture = getattr(self.eng, '_day_posture', 'APEX') + if posture_now != prev_posture: + if posture_now in ('TURTLE', 'HIBERNATE'): + self.eng.regime_dd_halt = True # always block new entries + if (posture_now == 'HIBERNATE' + and self.eng.position is not None + and not self._hibernate_protect_active): + # Position in flight: arm TP+SL instead of letting + # _manage_position() fire HIBERNATE_HALT next bar. + # _day_posture stays at prev value — no HALT fires. + self._hibernate_protect_position() + else: + self.eng._day_posture = posture_now + log(f"POSTURE_SYNC: {posture_now} — halt set") + else: + self.eng._day_posture = posture_now + self.eng.regime_dd_halt = False + if self._hibernate_protect_active: + log(f"POSTURE_SYNC: {posture_now} — posture recovered, clearing protect mode") + self._hibernate_protect_active = None + else: + log(f"POSTURE_SYNC: {posture_now} — halt lifted") + + # EsoF value gate — exposure only, no alpha or selection changes. + self._sync_esof_size_gate() + self._sync_sc_threshold_advisor(scan_number=scan_number, vel_div=vel_div) + self._sync_sc_gauge_advisor(scan_number=scan_number, vel_div=vel_div) + if self._market_state_runtime is not None: + try: + self._market_state_runtime.update_scan_state( + scan_payload=scan, + prices_dict=prices_dict, + scan_number=scan_number, + vel_div=vel_div, + v50_vel=v50_vel, + v750_vel=v750_vel, + vol_ok=vol_ok, + posture=posture_now, + exf_snapshot=getattr(self, "_last_exf", {}) or {}, + esof_payload=self._read_esof_payload(), + top_k_assets=5, + ) + except Exception as e: + log(f" MarketStateRuntime scan update failed: {e}") + + step_start = time.time() + with self.eng_lock: + result = self.eng.step_bar( + bar_idx=self.bar_idx, vel_div=vel_div, prices=prices_dict, + vol_regime_ok=vol_ok, v50_vel=v50_vel, v750_vel=v750_vel + ) + self.bar_idx += 1 + scan_to_fill_ms = (time.time() - listener_time) * 1000 + step_bar_ms = (time.time() - step_start) * 1000 + log(f"LATENCY scan #{scan_number}: scan→fill={scan_to_fill_ms:.1f}ms step_bar={step_bar_ms:.1f}ms vel_div={vel_div:.5f}") + + ch_put("eigen_scans", { + "ts": _ch_ts_us(), + "scan_number": scan_number, + "scan_uuid": str(scan.get("scan_uuid") or ""), + "vel_div": vel_div, + "w50_velocity": v50_vel, + "w750_velocity": v750_vel, + "instability_50": float(scan.get("instability_50") or 0.0), + "scan_to_fill_ms": scan_to_fill_ms, + "step_bar_ms": step_bar_ms, + }) + + if result.get('entry'): + self.trades_executed += 1 + e = result['entry'] + log(f"ENTRY: {e} [{ALGO_VERSION}]") + # Cache entry fields for CH trade_events on exit + tid = e.get('trade_id') + if tid: + self._pending_entries[tid] = { + 'asset': e.get('asset', ''), + 'side': 'SHORT' if e.get('direction', -1) == -1 else 'LONG', + 'entry_price': float(e.get('entry_price', 0) or 0), + 'quantity': round(float(e.get('notional', 0) or 0) / float(e.get('entry_price', 1) or 1), 6), + 'leverage': float(e.get('leverage', 0) or 0), + 'vel_div_entry': float(e.get('vel_div', 0) or 0), + 'boost_at_entry': float(getattr(getattr(self, 'eng', None), 'acb_boost', 1.0) or 1.0), + 'beta_at_entry': float(getattr(getattr(self, 'eng', None), 'acb_beta', 1.0) or 1.0), + 'posture': posture_now, + 'entry_ts': _ch_ts_us(), + 'entry_date': (self.current_day or ''), + 'entry_bar': self.bar_idx, + } + # Persist position to CH so restarts can recover it + self._ps_write_open(tid, self._pending_entries[tid]) + self._announce_position_event( + kind="trade_entry", + severity="info", + title=f"[BLUE] ENTRY {e.get('asset', '')} {self._pending_entries[tid]['side']}", + message=( + f"entry={float(e.get('entry_price', 0) or 0):.6f} " + f"qty={self._pending_entries[tid]['quantity']:.6f} " + f"lev={self._pending_entries[tid]['leverage']:.2f}x" + ), + metadata={ + "trade_id": tid, + "asset": self._pending_entries[tid]["asset"], + "side": self._pending_entries[tid]["side"], + "entry_price": self._pending_entries[tid]["entry_price"], + "quantity": self._pending_entries[tid]["quantity"], + "leverage": self._pending_entries[tid]["leverage"], + "vel_div_entry": self._pending_entries[tid]["vel_div_entry"], + "boost_at_entry": self._pending_entries[tid]["boost_at_entry"], + "beta_at_entry": self._pending_entries[tid]["beta_at_entry"], + "posture": self._pending_entries[tid]["posture"], + "entry_ts": self._pending_entries[tid]["entry_ts"], + }, + ) + if self._v7_exit_engine is not None: + try: + side = 1 if e.get('direction', -1) == -1 else 0 + ctx = self._v7_exit_engine.make_context( + entry_price=float(e.get('entry_price', 0) or 0), + entry_bar=max(0, self.bar_idx - 1), + side=side, + ) + if self._last_exf: + ctx.set_exf( + funding=float(self._last_exf.get('funding', 0.0) or 0.0), + dvol=float(self._last_exf.get('dvol', 0.0) or 0.0), + fear_greed=float(self._last_exf.get('fear_greed', 0.0) or 0.0), + taker=float(self._last_exf.get('taker', 0.0) or 0.0), + ) + self._v7_contexts[tid] = ctx + self._v7_decisions.pop(tid, None) + self._v7_decision_seq[tid] = 0 + except Exception as e: + log(f" V7 live context init failed for {tid}: {e}") + # Shadow AE: notify of entry (vel_div at entry bar is in scope) + if self._ae is not None: + try: + self._ae.on_entry( + trade_id=tid, + asset=e.get('asset', ''), + direction=int(e.get('direction', -1)), + entry_price=float(e.get('entry_price', 0) or 0), + vel_div_entry=vel_div, + ) + except Exception: + pass + if self._sc_advisor is not None: + try: + payload = self._read_esof_payload() + rec = self._sc_advisor.evaluate( + trade_id=tid, + asset=e.get('asset', ''), + sc=_safe_float(payload.get('advisory_score', payload.get('score', 0.0)) if payload else None), + vel_div=vel_div, + exf_snapshot=getattr(self, "_last_exf", {}) or {}, + trade_history=getattr(self.eng, 'trade_history', []), + current_mult=float(self._last_esof_size_mult or 1.0), + esof_payload=payload, + scan_number=scan_number, + bar_idx=self.bar_idx, + strategy="blue", + log_shadow=True, + ) + self._pending_entries[tid]['sc_threshold_advisor'] = rec + self._pending_entries[tid]['sc_exec_mult'] = float(self._last_esof_size_mult or 1.0) + except Exception: + pass + if self._sc_gauge is not None: + try: + payload = self._read_esof_payload() + rec = self._sc_gauge.evaluate( + trade_id=tid, + asset=e.get('asset', ''), + sc=_safe_float(payload.get('advisory_score', payload.get('score', 0.0)) if payload else None), + vel_div=vel_div, + exf_snapshot=getattr(self, "_last_exf", {}) or {}, + obf_snapshot=self._current_obf_snapshot(e.get('asset', ''), self.bar_idx), + trade_history=getattr(self.eng, 'trade_history', []), + current_mult=float(self._last_esof_size_mult or 1.0), + esof_payload=payload, + scan_number=scan_number, + bar_idx=self.bar_idx, + strategy="blue", + log_shadow=True, + ) + self._pending_entries[tid]['sc_bucket_gauge'] = rec + self._pending_entries[tid]['sc_bucket_gauge_exec_mult'] = float(self._last_esof_size_mult or 1.0) + except Exception: + pass + if self._bounce_advisor is not None: + try: + entry_ts_val = float(self._pending_entries[tid].get('entry_ts', 0) or 0) + entry_ts_dt = datetime.fromtimestamp(entry_ts_val / 1_000_000, tz=timezone.utc) if entry_ts_val else None + bounce_rec = self._bounce_eval( + trade_id=tid, + asset=str(e.get('asset', '')), + side=self._pending_entries[tid]['side'], + source="entry", + scan_number=scan_number, + entry_ts=entry_ts_dt, + current_price=float(prices_dict.get(e.get('asset', ''), e.get('entry_price', 0)) or e.get('entry_price', 0) or 0), + entry_price=float(e.get('entry_price', 0) or 0), + quantity=float(self._pending_entries[tid].get('quantity', 0) or 0), + notional=float(e.get('notional', 0) or 0), + leverage=float(e.get('leverage', 0) or 0), + vel_div=vel_div, + current_mult=float(self._last_esof_size_mult or 1.0), + bars_held=0, + log_shadow=True, + ) + if bounce_rec: + self._pending_entries[tid]['bounce_advisor_entry'] = bounce_rec + self._pending_entries[tid]['bounce_advisor_latest'] = bounce_rec + except Exception as e: + log(f" BounceAdvisor entry eval failed for {tid}: {e}") + + if (self._v7_exit_engine is not None + and self.eng is not None + and getattr(self.eng, 'position', None) is not None + and not self._v7_live_exit_enabled): + pos = self.eng.position + tid_v7 = getattr(pos, 'trade_id', '') + pending_v7 = self._pending_entries.get(tid_v7, {}) + ctx_v7 = self._v7_contexts.get(tid_v7) + if ctx_v7 is None and pending_v7: + try: + ctx_v7 = self._v7_exit_engine.make_context( + entry_price=float(pending_v7.get('entry_price', pos.entry_price) or pos.entry_price or 0.0), + entry_bar=int(pending_v7.get('entry_bar', max(0, self.bar_idx - 1)) or max(0, self.bar_idx - 1)), + side=1 if pending_v7.get('side', 'SHORT') == 'SHORT' else 0, + ) + if self._last_exf: + ctx_v7.set_exf( + funding=float(self._last_exf.get('funding', 0.0) or 0.0), + dvol=float(self._last_exf.get('dvol', 0.0) or 0.0), + fear_greed=float(self._last_exf.get('fear_greed', 0.0) or 0.0), + taker=float(self._last_exf.get('taker', 0.0) or 0.0), + ) + self._v7_contexts[tid_v7] = ctx_v7 + self._v7_decision_seq.setdefault(tid_v7, 0) + except Exception as e: + log(f" V7 live context restore failed for {tid_v7}: {e}") + ctx_v7 = None + if ctx_v7 is not None and pending_v7: + try: + if self.ob_eng is not None: + ob_sig = self.ob_eng.get_signal(pos.asset, float(max(0, self.bar_idx - 1))) + ob_imb = float(getattr(ob_sig, 'imbalance_ma5', 0.0) or 0.0) + else: + ob_imb = 0.0 + cur_px = float(prices_dict.get(pos.asset, pos.current_price) or pos.current_price or 0.0) + if cur_px > 0.0: + v7dec = self._v7_exit_engine.evaluate( + ctx_v7, + cur_px, + max(0, self.bar_idx - 1), + ob_imb, + asset=pos.asset, + ) + self._v7_decisions[tid_v7] = v7dec + self._record_v7_decision( + trade_id=tid_v7, + asset=pos.asset, + side=pending_v7.get('side', 'SHORT'), + decision=v7dec, + current_price=cur_px, + ob_imbalance=ob_imb, + vel_div_now=vel_div, + v50_vel=v50_vel, + v750_vel=v750_vel, + bar_idx=max(0, self.bar_idx - 1), + ) + if self._bounce_advisor is not None: + try: + entry_ts_val = float(pending_v7.get('entry_ts', 0) or 0) + entry_ts_dt = datetime.fromtimestamp(entry_ts_val / 1_000_000, tz=timezone.utc) if entry_ts_val else None + bounce_rec = self._bounce_eval( + trade_id=tid_v7, + asset=pos.asset, + side=pending_v7.get('side', 'SHORT'), + source="open_scan", + scan_number=scan_number, + entry_ts=entry_ts_dt, + current_price=cur_px, + entry_price=float(pending_v7.get('entry_price', pos.entry_price) or pos.entry_price or 0.0), + quantity=float(pending_v7.get('quantity', getattr(pos, 'quantity', 0.0)) or getattr(pos, 'quantity', 0.0) or 0.0), + notional=float(pending_v7.get('notional', getattr(pos, 'notional', 0.0)) or getattr(pos, 'notional', 0.0) or 0.0), + leverage=float(pending_v7.get('leverage', getattr(pos, 'leverage', 0.0)) or getattr(pos, 'leverage', 0.0) or 0.0), + vel_div=vel_div, + current_mult=float(self._last_esof_size_mult or 1.0), + bars_held=max(0, int(self.bar_idx - int(pending_v7.get('entry_bar', max(0, self.bar_idx - 1)) or max(0, self.bar_idx - 1)))), + log_shadow=True, + ) + if bounce_rec: + pending_v7['bounce_advisor_latest'] = bounce_rec + self._pending_entries[tid_v7] = pending_v7 + except Exception as e: + log(f" BounceAdvisor open-scan eval failed for {tid_v7}: {e}") + except Exception as e: + log(f" V7 live evaluate failed for {tid_v7}: {e}") + + if result.get('exit'): + x = result['exit'] + tid = x.get('trade_id') + # Hibernate-protected exits: re-label reason, finalize posture + if tid and self._hibernate_protect_active == tid: + _orig = x.get('reason', '') + _map = {'FIXED_TP': 'HIBERNATE_TP', 'STOP_LOSS': 'HIBERNATE_SL', + 'MAX_HOLD': 'HIBERNATE_MAXHOLD'} + x['reason'] = _map.get(_orig, f'HIBERNATE_{_orig}') + self._hibernate_protect_active = None + # Position closed — now safe to commit posture to HIBERNATE + _cur_posture = self._read_posture() + if _cur_posture == 'HIBERNATE': + self.eng._day_posture = 'HIBERNATE' + log(f"HIBERNATE_PROTECT: closed via {x['reason']} — posture finalized HIBERNATE") + else: + log(f"HIBERNATE_PROTECT: closed via {x['reason']} — posture recovered to {_cur_posture}") + log(f"EXIT: {x} [{ALGO_VERSION}]") + tid = x.get('trade_id') + pending = self._pending_entries.pop(tid, {}) if tid else {} + if tid: + self._v7_contexts.pop(tid, None) + self._v7_decisions.pop(tid, None) + self._v7_decision_seq.pop(tid, None) + if pending: + # exact bar price the engine exited against — prices_dict is still in scope + exit_price = float(prices_dict.get(pending['asset'], 0) or 0) + if self._sc_advisor is not None: + try: + _rec = pending.get('sc_threshold_advisor') + if _rec: + self._sc_advisor.observe_outcome( + _rec, + executed_mult=float(pending.get('sc_exec_mult', self._last_esof_size_mult) or 1.0), + pnl_pct=float(x.get('pnl_pct', 0) or 0), + exit_reason=str(x.get('reason', 'UNKNOWN')), + ) + except Exception: + pass + if self._sc_gauge is not None: + try: + _rec = pending.get('sc_bucket_gauge') + if _rec: + self._sc_gauge.observe_outcome( + _rec, + executed_mult=float(pending.get('sc_bucket_gauge_exec_mult', self._last_esof_size_mult) or 1.0), + pnl_pct=float(x.get('pnl_pct', 0) or 0), + exit_reason=str(x.get('reason', 'UNKNOWN')), + ) + except Exception: + pass + if self._bounce_advisor is not None: + try: + _bounce_rec = pending.get('bounce_advisor_entry') + if _bounce_rec: + self._bounce_advisor.observe_outcome( + _bounce_rec, + pnl_pct=float(x.get('pnl_pct', 0) or 0), + exit_reason=str(x.get('reason', 'UNKNOWN')), + ) + except Exception as e: + log(f" BounceAdvisor outcome update failed for {tid}: {e}") + if self._market_state_runtime is not None: + try: + self._market_state_runtime.online_update_from_trade( + asset=str(pending.get("asset", "")), + entry_price=float(pending.get("entry_price", 0) or 0), + exit_price=float(exit_price), + direction=-1 if str(pending.get("side", "SHORT")).upper() == "SHORT" else 1, + pnl_pct=float(x.get("pnl_pct", 0) or 0), + bars_held=int(x.get("bars_held", 0) or 0), + exit_reason=str(x.get("reason", "UNKNOWN")), + trade_id=str(tid or ""), + leverage=float(pending.get("leverage", 1.0) or 1.0), + ) + except Exception as e: + log(f" MarketStateRuntime outcome update failed for {tid}: {e}") + ch_put("trade_events", { + "ts": _ch_ts_us(), + "date": pending['entry_date'], + "strategy": "blue", + "asset": pending['asset'], + "side": pending['side'], + "entry_price": pending['entry_price'], + "exit_price": exit_price, + "quantity": pending['quantity'], + "pnl": float(x.get('net_pnl', 0) or 0), + "pnl_pct": float(x.get('pnl_pct', 0) or 0), + "exit_reason": str(x.get('reason', 'UNKNOWN')), + "vel_div_entry": pending['vel_div_entry'], + "boost_at_entry": pending['boost_at_entry'], + "beta_at_entry": pending['beta_at_entry'], + "posture": pending['posture'], + "leverage": pending['leverage'], + "bars_held": int(x.get('bars_held', 0) or 0), + "regime_signal": 0, + }) + # Mark position closed in CH (supersedes OPEN row via ReplacingMergeTree) + self._ps_write_closed(tid, pending, x) + self._announce_position_event( + kind="trade_exit", + severity="info" if float(x.get("pnl_pct", 0) or 0) >= 0 else "warning", + title=f"[BLUE] EXIT {pending.get('asset', '')} {pending.get('side', '')}", + message=( + f"reason={x.get('reason', 'UNKNOWN')} " + f"pnl={float(x.get('net_pnl', 0) or 0):+.2f} " + f"pnl_pct={float(x.get('pnl_pct', 0) or 0):+.3%}" + ), + metadata={ + "trade_id": tid, + "asset": pending.get("asset", ""), + "side": pending.get("side", ""), + "entry_price": pending.get("entry_price", 0), + "exit_price": exit_price, + "quantity": pending.get("quantity", 0), + "pnl": float(x.get("net_pnl", 0) or 0), + "pnl_pct": float(x.get("pnl_pct", 0) or 0), + "exit_reason": str(x.get("reason", "UNKNOWN")), + "bars_held": int(x.get("bars_held", 0) or 0), + "posture": pending.get("posture", ""), + }, + ) + # Shadow AE: record outcome for online update + if self._ae is not None and tid: + try: + self._ae.on_exit( + trade_id=tid, + actual_exit_reason=str(x.get('reason', 'UNKNOWN')), + pnl_pct=float(x.get('pnl_pct', 0) or 0), + ) + except Exception: + pass + + # Shadow AE: per-bar evaluate for all open trades — daemon thread, zero hot-path impact + if self._ae is not None and self._pending_entries: + _ae_ref = self._ae + _pending_snap = dict(self._pending_entries) # shallow copy under GIL + _prices_snap = dict(prices_dict) + _vel_now = vel_div + _bar = self.bar_idx + def _ae_eval(): + for _tid, _p in _pending_snap.items(): + try: + _cur = _prices_snap.get(_p['asset'], 0) or 0 + if not _cur: + continue + _entry_px = float(_p.get('entry_price', 0) or 0) + _bars_held = max(0, int(_bar - int(_p.get('entry_bar', _bar)))) + _shadow_pnl_pct = ((_entry_px - _cur) / _entry_px) if _entry_px > 0 else 0.0 + _shadow = _ae_ref.evaluate( + trade_id=_tid, + asset=_p['asset'], + direction=-1, + entry_price=_entry_px, + current_price=_cur, + bars_held=_bars_held, + vel_div_now=_vel_now, + ) + _ae_ref.log_shadow(_shadow, pnl_pct=_shadow_pnl_pct) + except Exception: + pass + threading.Thread(target=_ae_eval, daemon=True).start() + + self._push_state(scan_number, vel_div, vol_ok, self._read_posture()) + + except Exception as e: + log(f"ERROR in _process_scan: {e}") + + def on_exf_update(self, event): + if not event.value: return + snapshot = json.loads(event.value) if isinstance(event.value, str) else event.value + if not self.current_day or not self.acb: return + try: + self._last_exf = { + 'funding': float(snapshot.get('funding_btc', 0.0)), + 'dvol': float(snapshot.get('dvol_btc', 50.0)), + 'fear_greed': float(snapshot.get('fng', 50.0)), + 'taker': float(snapshot.get('taker', 0.5)), + } + w750_vel = getattr(self, 'last_w750_vel', None) + acb_info = self.acb.get_dynamic_boost_from_hz( + date_str=self.current_day, + exf_snapshot=snapshot, + w750_velocity=float(w750_vel) if w750_vel else None, + direction=self.trade_direction, + ) + with self.eng_lock: + if hasattr(self.eng, 'update_acb_boost'): + subday_exit = self.eng.update_acb_boost( + boost=acb_info['boost'], + beta=acb_info['beta'] + ) + if subday_exit is not None: + log(f"SUBDAY_EXIT: {subday_exit} [{ALGO_VERSION}]") + tid = subday_exit.get('trade_id') + pending = {} + if tid: + pending = self._pending_entries.pop(tid, {}) + if pending and self._sc_advisor is not None: + try: + _rec = pending.get('sc_threshold_advisor') + if _rec: + self._sc_advisor.observe_outcome( + _rec, + executed_mult=float(pending.get('sc_exec_mult', self._last_esof_size_mult) or 1.0), + pnl_pct=float(subday_exit.get('pnl_pct', 0) or 0), + exit_reason=str(subday_exit.get('reason', 'SUBDAY_ACB_NORMALIZATION')), + ) + except Exception: + pass + if pending and self._sc_gauge is not None: + try: + _rec_g = pending.get('sc_bucket_gauge') + if _rec_g: + self._sc_gauge.observe_outcome( + _rec_g, + executed_mult=float(pending.get('sc_bucket_gauge_exec_mult', self._last_esof_size_mult) or 1.0), + pnl_pct=float(subday_exit.get('pnl_pct', 0) or 0), + exit_reason=str(subday_exit.get('reason', 'SUBDAY_ACB_NORMALIZATION')), + ) + except Exception: + pass + if pending and self._bounce_advisor is not None: + try: + _bounce_rec = pending.get('bounce_advisor_entry') + if _bounce_rec: + self._bounce_advisor.observe_outcome( + _bounce_rec, + pnl_pct=float(subday_exit.get('pnl_pct', 0) or 0), + exit_reason=str(subday_exit.get('reason', 'SUBDAY_ACB_NORMALIZATION')), + ) + except Exception as e: + log(f" BounceAdvisor outcome update failed for {tid}: {e}") + if self._market_state_runtime is not None: + try: + self._market_state_runtime.online_update_from_trade( + asset=str(pending.get("asset", "")), + entry_price=float(pending.get("entry_price", 0) or 0), + exit_price=float(subday_exit.get("exit_price", 0) or 0), + direction=-1 if str(pending.get("side", "SHORT")).upper() == "SHORT" else 1, + pnl_pct=float(subday_exit.get("pnl_pct", 0) or 0), + bars_held=int(subday_exit.get("bars_held", 0) or 0), + exit_reason=str(subday_exit.get("reason", "SUBDAY_ACB_NORMALIZATION")), + trade_id=str(tid or ""), + leverage=float(pending.get("leverage", 1.0) or 1.0), + ) + except Exception as e: + log(f" MarketStateRuntime outcome update failed for {tid}: {e}") + ch_put("trade_events", { + "ts": _ch_ts_us(), + "date": self.current_day or '', + "strategy": "blue", + "asset": pending.get('asset', subday_exit.get('asset', '')), + "side": pending.get('side', 'SHORT'), + "entry_price": pending.get('entry_price', 0), + "exit_price": float(subday_exit.get('exit_price', 0) or 0), + "quantity": round(float(pending.get('notional', 0) or 0) / max(float(pending.get('entry_price', 1) or 1), 1e-12), 6), + "pnl": float(subday_exit.get('net_pnl', 0) or 0), + "pnl_pct": float(subday_exit.get('pnl_pct', 0) or 0), + "exit_reason": str(subday_exit.get('reason', 'SUBDAY_ACB_NORMALIZATION')), + "vel_div_entry": float(pending.get('vel_div_entry', 0) or 0), + "boost_at_entry": float(pending.get('boost_at_entry', 0) or 0), + "beta_at_entry": float(pending.get('beta_at_entry', 0) or 0), + "posture": pending.get('posture', ''), + "leverage": float(pending.get('leverage', 0) or 0), + "bars_held": int(subday_exit.get('bars_held', 0) or 0), + "regime_signal": 0, + }) + self._announce_position_event( + kind="trade_exit", + severity="info" if float(subday_exit.get("pnl_pct", 0) or 0) >= 0 else "warning", + title=f"[BLUE] EXIT {pending.get('asset', '')} {pending.get('side', '')}", + message=( + f"reason={subday_exit.get('reason', 'SUBDAY_ACB_NORMALIZATION')} " + f"pnl={float(subday_exit.get('net_pnl', 0) or 0):+.2f} " + f"pnl_pct={float(subday_exit.get('pnl_pct', 0) or 0):+.3%}" + ), + metadata={ + "trade_id": tid, + "asset": pending.get("asset", subday_exit.get("asset", "")), + "side": pending.get("side", "SHORT"), + "entry_price": pending.get("entry_price", 0), + "exit_price": float(subday_exit.get("exit_price", 0) or 0), + "quantity": round(float(pending.get("notional", 0) or 0) / max(float(pending.get("entry_price", 1) or 1), 1e-12), 6), + "pnl": float(subday_exit.get("net_pnl", 0) or 0), + "pnl_pct": float(subday_exit.get("pnl_pct", 0) or 0), + "exit_reason": str(subday_exit.get("reason", "SUBDAY_ACB_NORMALIZATION")), + "bars_held": int(subday_exit.get("bars_held", 0) or 0), + "posture": pending.get("posture", ""), + }, + ) + now = time.time() + if now - self._exf_log_time >= 300: + self._exf_log_time = now + log(f"ACB subday: boost={acb_info['boost']:.4f} beta={acb_info['beta']:.4f} " + f"signals={acb_info['signals']:.1f} src={acb_info.get('source','?')}") + # ACB_EXIT disabled: update_acb_boost() called to keep boost/beta current + # (ACBv6 intact), but SUBDAY_ACB_NORMALIZATION exits are suppressed. + except ValueError as e: + log(f"ACB Stale Data Fallback: {e}") + except Exception as e: + log(f"on_exf_update Error: {e}") + + def _wire_obf(self, assets): + if not assets or self.ob_assets: + return + self.ob_assets = assets + from nautilus_dolphin.nautilus.hz_ob_provider import HZOBProvider + live_ob = HZOBProvider( + hz_cluster=HZ_CLUSTER, + hz_host=HZ_HOST, + assets=assets, + ) + self.ob_eng = OBFeatureEngine(live_ob) + # No preload_date() call — live mode uses step_live() per scan + self.eng.set_ob_engine(self.ob_eng) + log(f" OBF wired: HZOBProvider, {len(assets)} assets (LIVE mode)") + + def _save_capital(self): + """Persist capital to HZ (primary) and disk (fallback) so restarts survive HZ loss.""" + capital = getattr(self.eng, 'capital', None) + if capital is None or not math.isfinite(capital) or capital < 1.0: + return + payload = json.dumps({'capital': capital, 'ts': time.time()}) + # Primary: Hazelcast + try: + self.state_map.blocking().put('capital_checkpoint', payload) + except Exception as e: + log(f" capital HZ save failed: {e}") + # Secondary: local disk (survives HZ restart) + try: + CAPITAL_DISK_CHECKPOINT.write_text(payload) + except Exception as e: + log(f" capital disk save failed: {e}") + + def _restore_capital(self): + """On startup, restore capital from HZ or disk checkpoint.""" + def _try_load(raw, source): + if not raw: + return False + try: + data = json.loads(raw) + saved = float(data.get('capital', 0)) + age_h = (time.time() - data.get('ts', 0)) / 3600 + if saved >= 1.0 and math.isfinite(saved) and age_h < 72: + self.eng.capital = saved + log(f" Capital restored from {source}: ${saved:,.2f} (age {age_h:.1f}h)") + return True + except Exception: + pass + return False + + # Primary: Hazelcast + try: + raw = self.state_map.blocking().get('capital_checkpoint') + if _try_load(raw, 'HZ'): + return + except Exception as e: + log(f" capital HZ restore failed: {e}") + + # Secondary: disk fallback + try: + if CAPITAL_DISK_CHECKPOINT.exists(): + raw = CAPITAL_DISK_CHECKPOINT.read_text() + if _try_load(raw, 'disk'): + return + except Exception as e: + log(f" capital disk restore failed: {e}") + + log(" Capital: no valid checkpoint — starting at initial_capital") + + def _push_state(self, scan_number, vel_div, vol_ok, posture): + try: + with self.eng_lock: + capital = getattr(self.eng, 'capital', 25000.0) + # Engine uses a single NDPosition object, not a list + pos = getattr(self.eng, 'position', None) + if pos is not None: + open_notional = float(getattr(pos, 'notional', 0) or 0) + open_positions_list = [{ + 'asset': pos.asset, + 'side': 'SHORT' if pos.direction == -1 else 'LONG', + 'entry_price': pos.entry_price, + 'quantity': round(open_notional / pos.entry_price, 6) if pos.entry_price else 0, + 'notional': open_notional, + 'leverage': float(getattr(pos, 'leverage', 0) or 0), + 'unrealized_pnl': round(pos.pnl_pct * open_notional, 2), + }] + else: + open_notional = 0.0 + open_positions_list = [] + cur_leverage = (open_notional / capital) if capital and capital > 0 and math.isfinite(capital) else 0.0 + + snapshot = { + 'capital': capital if math.isfinite(capital) else None, + 'open_positions': open_positions_list, + 'algo_version': ALGO_VERSION, + 'last_scan_number': scan_number, 'last_vel_div': vel_div, + 'vol_ok': vol_ok, 'posture': posture, + 'scans_processed': self.scans_processed, + 'trades_executed': self.trades_executed, + 'bar_idx': self.bar_idx, + 'timestamp': datetime.now(timezone.utc).isoformat(), + # Leverage envelope — for TUI slider + 'leverage_soft_cap': getattr(self.eng, 'base_max_leverage', 8.0), + 'leverage_abs_cap': getattr(self.eng, 'abs_max_leverage', 9.0), + 'open_notional': round(open_notional, 2), + 'current_leverage': round(cur_leverage, 4), + } + future = self.state_map.put('engine_snapshot', json.dumps(snapshot)) + future.add_done_callback(lambda f: None) + # Heartbeat — MHS checks age < 30s; we run every scan (~11s) + if self.heartbeat_map is not None: + hb = json.dumps({ + 'ts': time.time(), + 'iso': datetime.now(timezone.utc).isoformat(), + 'run_date': self.current_day, + 'phase': 'trading', + 'flow': 'nautilus_event_trader', + }) + self.heartbeat_map.put('nautilus_flow_heartbeat', hb) + # Persist capital so next restart resumes from here + if capital is not None and math.isfinite(capital) and capital >= 1.0: + self._save_capital() + except Exception as e: + log(f" Failed to push state: {e}") + + def run(self): + global running + log("=" * 70) + log("🐬 DOLPHIN Nautilus Event-Driven Trader Starting") + log("=" * 70) + + self._build_engine() + self._connect_hz() + self._restore_capital() + self._rollover_day() + self._restore_position_state() + + def listener(event): + self.on_scan(event) + + self.features_map.add_entry_listener( + key='latest_eigen_scan', include_value=True, + updated_func=listener, added_func=listener + ) + + def exf_listener(event): + self.on_exf_update(event) + + self.features_map.add_entry_listener( + key='exf_latest', include_value=True, + updated_func=exf_listener, added_func=exf_listener + ) + + log("✅ Hz listener registered") + log(f"🏷️ ALGO_VERSION: {ALGO_VERSION}") + log("⏳ Waiting for scans...") + + try: + while running: + time.sleep(1) + except KeyboardInterrupt: + log("Interrupted") + finally: + self.shutdown() + + def shutdown(self): + log("Shutting down...") + self._scan_executor.shutdown(wait=False) + if self.eng and self.current_day: + try: + with self.eng_lock: + summary = self.eng.end_day() + log(f"end_day: {summary}") + except Exception as e: + log(f"end_day failed: {e}") + if self._market_state_runtime is not None: + try: + self._market_state_runtime.save() + except Exception: + pass + if self.hz_client: + try: + self.hz_client.shutdown() + log("Hz disconnected") + except: + pass + log(f"🛑 Stopped. Scans: {self.scans_processed}, Trades: {self.trades_executed}") + +def signal_handler(signum, frame): + global running + log(f"Signal {signum} received") + running = False + +def main(): + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + trader = DolphinLiveTrader() + trader.run() + +if __name__ == '__main__': + main() diff --git a/prod/paper_trade_flow.py b/prod/paper_trade_flow.py new file mode 100644 index 0000000..3e46219 --- /dev/null +++ b/prod/paper_trade_flow.py @@ -0,0 +1,658 @@ +"""DOLPHIN Paper Trading — Prefect Flow. + +Runs daily at 00:05 UTC. Processes yesterday's live scan data through +the NDAlphaEngine champion stack. Logs virtual P&L to disk + Hazelcast. + +Blue deployment: champion SHORT (configs/blue.yml) +Green deployment: bidirectional SHORT+LONG (configs/green.yml) [pending LONG validation] + +Usage: + # Register flows (run once, after Prefect server is up): + PREFECT_API_URL=http://localhost:4200/api python paper_trade_flow.py --register + + # Run manually for a specific date: + PREFECT_API_URL=http://localhost:4200/api python paper_trade_flow.py \\ + --date 2026-02-25 --config configs/blue.yml +""" +import sys, json, yaml, logging, argparse, csv, urllib.request, os +from pathlib import Path +from datetime import datetime, timedelta, date, timezone +import numpy as np +import pandas as pd + +HCM_DIR = Path(__file__).parent.parent +sys.path.insert(0, str(HCM_DIR / 'nautilus_dolphin')) + +from prefect import flow, task, get_run_logger +from prefect.schedules import Cron + +import hazelcast + +logging.basicConfig(level=logging.WARNING) # suppress Prefect noise below WARNING + +# ── Paths ─────────────────────────────────────────────────────────────────────── +from dolphin_paths import get_eigenvalues_path, get_klines_dir +SCANS_DIR = get_eigenvalues_path() # platform-aware: Win → NG3 dir, Linux → /mnt/ng6_data/eigenvalues +KLINES_DIR = get_klines_dir() # vbt_cache_klines/ — NG5 parquet source (preferred) +MC_MODELS_DIR = str(HCM_DIR / 'nautilus_dolphin' / 'mc_results' / 'models') + +# Columns that are eigenvalue metadata, not asset prices +META_COLS = { + 'timestamp', 'scan_number', + 'v50_lambda_max_velocity', 'v150_lambda_max_velocity', + 'v300_lambda_max_velocity', 'v750_lambda_max_velocity', + 'vel_div', 'instability_50', 'instability_150', +} + +HZ_HOST = "localhost:5701" +HZ_CLUSTER = "dolphin" + +# Number of historical eigenvalue dates to use for ACB w750 threshold calibration +ACB_HISTORY_DAYS = 60 + + +# ── Helpers ────────────────────────────────────────────────────────────────────── + +def _get_recent_scan_dates(n: int) -> list: + """Return sorted list of up to n most-recent eigenvalue date dirs.""" + try: + dirs = sorted( + d.name for d in SCANS_DIR.iterdir() + if d.is_dir() and len(d.name) == 10 and d.name.startswith('20') + ) + return dirs[-n:] + except Exception: + return [] + + +def _fetch_btcusdt_klines_fallback(date_str: str) -> "dict[str, float]": + """Fetch BTCUSDT 1m klines from Binance futures for date_str. + + Returns a dict mapping ISO timestamp strings (minute precision) → close price. + Falls back to empty dict on any error (caller handles missing prices gracefully). + """ + try: + from datetime import timezone as tz + day_start = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=tz.utc) + day_end = day_start + timedelta(days=1) + start_ms = int(day_start.timestamp() * 1000) + end_ms = int(day_end.timestamp() * 1000) + prices: dict[str, float] = {} + # Binance returns max 1500 bars per request; 1440 bars/day fits in one call + url = ( + f"https://fapi.binance.com/fapi/v1/klines" + f"?symbol=BTCUSDT&interval=1m" + f"&startTime={start_ms}&endTime={end_ms}&limit=1500" + ) + with urllib.request.urlopen(url, timeout=10) as resp: + bars = json.loads(resp.read()) + for bar in bars: + ts_ms = int(bar[0]) + close = float(bar[4]) + ts_iso = datetime.fromtimestamp(ts_ms / 1000, tz=tz.utc).strftime("%Y-%m-%dT%H:%M") + prices[ts_iso] = close + return prices + except Exception: + return {} + + +def _load_scan_df_from_json(date_str: str) -> pd.DataFrame: + """Load daily scan JSON files → DataFrame with vel_div + asset prices. + + Each scan JSON has: + - windows: {50, 150, 300, 750} → tracking_data.lambda_max_velocity + - pricing_data.current_prices → per-asset USD prices + Returns one row per scan, sorted by scan_number. + When current_prices is empty (scanner bug), falls back to Binance 1m klines. + """ + scan_dir = SCANS_DIR / date_str + if not scan_dir.exists(): + return pd.DataFrame() + + json_files = sorted(scan_dir.glob("scan_*.json"), key=lambda f: f.name) + # Exclude the _Indicators.json companion files if any + json_files = [f for f in json_files if '__Indicators' not in f.name] + if not json_files: + return pd.DataFrame() + + total = len(json_files) + rows = [] + for i, jf in enumerate(json_files): + if i % 500 == 0 and i > 0: + print(f" [scan load] {i}/{total} files...", flush=True) + try: + with open(jf, 'r', encoding='utf-8') as fh: + data = json.load(fh) + + row = { + 'scan_number': data.get('scan_number', 0), + 'timestamp': data.get('timestamp', ''), + } + + # Eigenvalue velocity per window + windows = data.get('windows', {}) + for w_key, vel_col, inst_col in [ + ('50', 'v50_lambda_max_velocity', 'instability_50'), + ('150', 'v150_lambda_max_velocity', 'instability_150'), + ('300', 'v300_lambda_max_velocity', None), + ('750', 'v750_lambda_max_velocity', None), + ]: + w_data = windows.get(str(w_key), {}) + td = w_data.get('tracking_data', {}) + vel = td.get('lambda_max_velocity') + row[vel_col] = float(vel) if vel is not None else np.nan + if inst_col is not None: + rs = w_data.get('regime_signals', {}) + row[inst_col] = float(rs.get('instability_score', 0.0) or 0.0) + + # vel_div = w50_vel - w750_vel (canonical v2_gold_fix_v50-v750 formula) + v50 = row.get('v50_lambda_max_velocity', np.nan) + v750 = row.get('v750_lambda_max_velocity', np.nan) + row['vel_div'] = (v50 - v750) if (np.isfinite(v50) and np.isfinite(v750)) else np.nan + + # Asset prices + pricing = data.get('pricing_data', {}) + prices = pricing.get('current_prices', {}) + row.update({sym: float(px) for sym, px in prices.items() if px is not None}) + + rows.append(row) + except Exception: + continue + + if not rows: + return pd.DataFrame() + + df = pd.DataFrame(rows).sort_values('scan_number').reset_index(drop=True) + + # Fallback: if BTCUSDT prices are missing (scanner current_prices bug), + # fetch 1m klines from Binance and fill by timestamp. + if 'BTCUSDT' not in df.columns or df['BTCUSDT'].isna().all(): + btc_klines = _fetch_btcusdt_klines_fallback(date_str) + if btc_klines: + def _lookup_btc(ts_str: str) -> float: + # ts_str is ISO format from scan JSON; match to minute precision + ts_min = str(ts_str)[:16] # "YYYY-MM-DDTHH:MM" + return btc_klines.get(ts_min, np.nan) + df['BTCUSDT'] = df['timestamp'].apply(_lookup_btc) + # Forward-fill any gaps (scan timestamps between 1m bars) + df['BTCUSDT'] = df['BTCUSDT'].ffill().bfill() + + return df + + +def _load_scan_df_from_parquet(date_str: str) -> pd.DataFrame: + """Load daily scan data from vbt_cache_klines/ parquet (NG5+ preferred path). + + Returns DataFrame with vel_div + asset prices, or empty DataFrame if not available. + """ + parq_path = KLINES_DIR / f"{date_str}.parquet" + if not parq_path.exists(): + return pd.DataFrame() + try: + df = pd.read_parquet(parq_path) + if df.empty or 'vel_div' not in df.columns: + return pd.DataFrame() + return df.reset_index(drop=True) + except Exception: + return pd.DataFrame() + + +# ── Tasks ─────────────────────────────────────────────────────────────────────── + +@task(name="load_config", retries=0) +def load_config(config_path: str) -> dict: + with open(config_path) as f: + return yaml.safe_load(f) + + +@task(name="load_day_scans", retries=2, retry_delay_seconds=10) +def load_day_scans(date_str: str) -> pd.DataFrame: + """Load scan data for one date → VBT-compatible DataFrame. + + Prefers vbt_cache_klines/ parquet (NG5 native, single file, fast). + Falls back to eigenvalues/ JSON (NG3 legacy) if parquet unavailable. + Extracts vel_div, eigenvalue features, and asset prices. + """ + log = get_run_logger() + + # ── Parquet path (NG5 native — preferred) ────────────────────────────────── + df = _load_scan_df_from_parquet(date_str) + if not df.empty: + log.info(f" [parquet] Loaded {len(df)} scans for {date_str} | cols={len(df.columns)}") + else: + # ── JSON fallback (NG3 legacy) ───────────────────────────────────────── + df = _load_scan_df_from_json(date_str) + if df.empty: + log.warning(f"No usable scan data for {date_str} in {SCANS_DIR}") + return pd.DataFrame() + log.info(f" [json] Loaded {len(df)} scans for {date_str} | cols={len(df.columns)}") + + if df.empty: + log.warning(f"No usable scan data for {date_str} in {SCANS_DIR}") + return pd.DataFrame() + + # Drop rows with NaN vel_div (warmup period at start of day) + valid = df['vel_div'].notna() + n_dropped = (~valid).sum() + df = df[valid].reset_index(drop=True) + + # Verify BTCUSDT prices present (required for vol gate and DC) + if 'BTCUSDT' not in df.columns: + log.error(f"BTCUSDT prices missing from scan data on {date_str} — cannot run engine") + return pd.DataFrame() + + log.info(f" Loaded {len(df)} scans for {date_str} | cols={len(df.columns)} | " + f"vel_div range=[{df['vel_div'].min():.4f}, {df['vel_div'].max():.4f}] " + f"| {n_dropped} warmup rows dropped") + return df + + +@task(name="run_engine_day", retries=0, persist_result=False) +def run_engine_day(date_str: str, df: pd.DataFrame, engine, vol_p60: float, posture: str = 'APEX', direction: int = -1) -> dict: + """Run one day through NDAlphaEngine. Returns daily stats dict.""" + log = get_run_logger() + if df.empty or posture == 'HIBERNATE': + log.warning(f"Empty DataFrame or HIBERNATE for {date_str} — skipping.") + return {'date': date_str, 'pnl': 0.0, 'capital': engine.capital, + 'trades': 0, 'boost': 1.0, 'beta': 0.0, 'mc_status': 'NO_DATA', 'posture': posture} + + asset_cols = [c for c in df.columns if c not in META_COLS] + + # Vol gate: rolling 50-bar std of BTC returns + bp = df['BTCUSDT'].values + dvol = np.full(len(df), np.nan) + for i in range(50, len(bp)): + seg = bp[max(0, i - 50):i] + if len(seg) >= 10 and seg[0] > 0: + dvol[i] = float(np.std(np.diff(seg) / seg[:-1])) + vol_ok = np.where(np.isfinite(dvol), dvol > vol_p60, False) + + # 1. Setup day + engine.begin_day(date_str, posture=posture, direction=direction) + + # 2. Bar stream (replaces batch process_day) + for ri in range(len(df)): + row = df.iloc[ri] + vd = row.get('vel_div') + if vd is None or not np.isfinite(float(vd)): + engine._global_bar_idx += 1 + continue + + v50_raw = row.get('v50_lambda_max_velocity') + v750_raw = row.get('v750_lambda_max_velocity') + v50_val = float(v50_raw) if (v50_raw is not None and np.isfinite(float(v50_raw))) else 0.0 + v750_val = float(v750_raw) if (v750_raw is not None and np.isfinite(float(v750_raw))) else 0.0 + + prices = {} + for ac in asset_cols: + p = row.get(ac) + if p is not None and p > 0 and np.isfinite(p): + prices[ac] = float(p) + + if not prices: + engine._global_bar_idx += 1 + continue + + # OB live step: fetch HZ snapshots and compute features BEFORE step_bar(). + # This populates the live caches so get_placement/get_signal/get_market() + # return real OBF values instead of NEUTRAL defaults. + if engine.ob_engine is not None: + try: + engine.ob_engine.step_live(list(prices.keys()), ri) + except Exception: + pass # OBF degraded → NEUTRAL values, trading continues + + engine.step_bar( + bar_idx=ri, + vel_div=float(vd), + prices=prices, + vol_regime_ok=bool(vol_ok[ri]), + v50_vel=v50_val, + v750_vel=v750_val, + ) + + # 3. Finalize day + result = engine.end_day() + result['posture'] = posture + log.info(f" {date_str}: PnL={result.get('pnl', 0):+.2f} " + f"T={result.get('trades', 0)} boost={result.get('boost', 1.0):.2f}x " + f"MC={result.get('mc_status', '?')} Posture={posture}") + return result + + +@task(name="write_hz_state", retries=3, retry_delay_seconds=5, persist_result=False) +def write_hz_state(hz_host: str, hz_cluster: str, imap_name: str, key: str, value: dict): + """Write state dict to Hazelcast IMap. Creates own client per call (serialization-safe).""" + client = hazelcast.HazelcastClient(cluster_name=hz_cluster, cluster_members=[hz_host]) + try: + client.get_map(imap_name).blocking().put(key, json.dumps(value)) + finally: + client.shutdown() + + +@task(name="log_pnl", retries=0, persist_result=False) +def log_pnl(log_dir: Path, date_str: str, result: dict, capital: float): + log_dir.mkdir(parents=True, exist_ok=True) + row = {**result, 'date': date_str, 'capital': capital, + 'logged_at': datetime.now(timezone.utc).isoformat()} + log_file = log_dir / f"paper_pnl_{date_str[:7]}.jsonl" + with open(log_file, 'a') as f: + f.write(json.dumps(row) + '\n') + + +# ── Flow ──────────────────────────────────────────────────────────────────────── + +@flow(name="dolphin-paper-trade", log_prints=True) +def dolphin_paper_trade_flow(config_path: str = "configs/blue.yml", + run_date: str = None, + instrument: bool = False): + """Daily paper trading flow. Processes one day of live eigenvalue scans. + + Scheduled at 00:05 UTC — processes yesterday's data. + Run manually with run_date='YYYY-MM-DD' for backtesting or debugging. + """ + log = get_run_logger() + + cfg = load_config(config_path) + strategy_name = cfg['strategy_name'] + eng_cfg = cfg['engine'] + pt_cfg = cfg['paper_trade'] + hz_cfg = cfg['hazelcast'] + dir_str = os.environ.get('DOLPHIN_DIRECTION', cfg.get('direction', 'short_only')) + direction_val = 1 if str(dir_str).strip().lower() in ['long', 'long_only', 'buy', '+1', '1'] else -1 + + target_date = run_date or (date.today() - timedelta(days=1)).isoformat() + log.info(f"=== {strategy_name.upper()} paper trade: {target_date} ===") + + # ── Lazy imports (numba JIT happens here) ────────────────────────────────── + from nautilus_dolphin.nautilus.proxy_boost_engine import create_d_liq_engine + from nautilus_dolphin.nautilus.adaptive_circuit_breaker import AdaptiveCircuitBreaker + from mc.mc_ml import DolphinForewarner + from nautilus_dolphin.nautilus.ob_features import OBFeatureEngine + from nautilus_dolphin.nautilus.hz_ob_provider import HZOBProvider + + client = hazelcast.HazelcastClient(cluster_name=HZ_CLUSTER, cluster_members=[HZ_HOST]) + imap_state = client.get_map(hz_cfg['imap_state']).blocking() + + # ---- Restore capital ---- + STATE_KEY = f"state_{strategy_name}_{target_date}" + restored_capital = pt_cfg['initial_capital'] + peak_capital = pt_cfg['initial_capital'] + stored_state = {} + engine_state = None + try: + raw = imap_state.get(STATE_KEY) or imap_state.get('latest') or '{}' + stored_state = json.loads(raw) + if stored_state.get('strategy') == strategy_name and stored_state.get('capital', 0) > 0: + restored_capital = float(stored_state['capital']) + peak_capital = float(stored_state.get('peak_capital', restored_capital)) + engine_state = stored_state.get('engine_state') + log.info(f"[STATE] Restored capital={restored_capital:.2f} from HZ") + except Exception as e: + log.warning(f"[STATE] HZ restore failed: {e} — using config capital") + + # ── Engine — D_LIQ_GOLD config (8x/9x LiquidationGuardEngine) ─────────── + # create_d_liq_engine() overrides max_leverage→8.0 / abs_max_leverage→9.0 + # internally (D_LIQ_SOFT_CAP / D_LIQ_ABS_CAP constants), regardless of what + # eng_cfg says. This is the certified gold leverage stack. + engine = create_d_liq_engine( + initial_capital = restored_capital, + vel_div_threshold = eng_cfg['vel_div_threshold'], + vel_div_extreme = eng_cfg['vel_div_extreme'], + min_leverage = eng_cfg['min_leverage'], + max_leverage = eng_cfg.get('max_leverage', 8.0), + abs_max_leverage = eng_cfg.get('abs_max_leverage', 9.0), + leverage_convexity = eng_cfg['leverage_convexity'], + fraction = eng_cfg['fraction'], + fixed_tp_pct = eng_cfg['fixed_tp_pct'], + stop_pct = eng_cfg['stop_pct'], + max_hold_bars = eng_cfg['max_hold_bars'], + use_direction_confirm= eng_cfg['use_direction_confirm'], + dc_lookback_bars = eng_cfg['dc_lookback_bars'], + dc_min_magnitude_bps = eng_cfg['dc_min_magnitude_bps'], + dc_skip_contradicts = eng_cfg['dc_skip_contradicts'], + dc_leverage_boost = eng_cfg['dc_leverage_boost'], + dc_leverage_reduce = eng_cfg['dc_leverage_reduce'], + use_asset_selection = eng_cfg['use_asset_selection'], + min_irp_alignment = eng_cfg['min_irp_alignment'], + use_sp_fees = eng_cfg['use_sp_fees'], + use_sp_slippage = eng_cfg['use_sp_slippage'], + sp_maker_entry_rate = eng_cfg['sp_maker_entry_rate'], + sp_maker_exit_rate = eng_cfg['sp_maker_exit_rate'], + use_ob_edge = eng_cfg['use_ob_edge'], + ob_edge_bps = eng_cfg['ob_edge_bps'], + ob_confirm_rate = eng_cfg['ob_confirm_rate'], + lookback = eng_cfg['lookback'], + use_alpha_layers = eng_cfg['use_alpha_layers'], + use_dynamic_leverage = eng_cfg['use_dynamic_leverage'], + seed = eng_cfg.get('seed', 42), + ) + engine.set_esoteric_hazard_multiplier(0.0) # gold spec: hazard=0 → base_max_leverage=8.0 + + if engine_state: + try: + engine.restore_state(engine_state) + log.info("[STATE] Restored full engine state (including open positions)") + except Exception as e: + log.error(f"[STATE] Failed to restore engine state: {e}") + + # ── ACB — preload w750 from recent history for valid p60 threshold ───────── + # w750 calibration always uses NPZ history (threshold is a population statistic). + # Daily ExF factors are sourced from HZ exf_latest (pre-lagged) when available; + # fall back to NPZ disk scan if HZ data is absent or stale (>12 h). + acb = AdaptiveCircuitBreaker() + acb.config.EIGENVALUES_PATH = SCANS_DIR # CRITICAL: override Windows default for Linux + recent_dates = _get_recent_scan_dates(ACB_HISTORY_DAYS) + if target_date not in recent_dates: + recent_dates = (recent_dates + [target_date])[-ACB_HISTORY_DAYS:] + acb.preload_w750(recent_dates) + log.info(f" ACB preloaded {len(recent_dates)} dates | w750_threshold={acb._w750_threshold:.6f}") + + # ── ACB HZ warm-up: pre-load today's boost from live exf_latest ──────────── + # The ExF service applies per-indicator lag BEFORE pushing to HZ, so the + # values in exf_latest are already delay-adjusted. Do NOT re-lag them. + _acb_hz_ok = False + try: + features_map = client.get_map('DOLPHIN_FEATURES').blocking() + exf_raw = features_map.get('exf_latest') + if exf_raw: + exf_snapshot = json.loads(exf_raw) + # Live w750 from latest scan (may be absent during warmup) + scan_raw = features_map.get('latest_eigen_scan') + w750_live: float | None = None + if scan_raw: + scan_data = json.loads(scan_raw) + w750_live = scan_data.get('w750_velocity') + + boost_info = acb.get_dynamic_boost_from_hz( + target_date, exf_snapshot, w750_velocity=w750_live, direction=direction_val + ) + stale_s = boost_info.get('max_staleness_s', 0) + log.info( + f" ACB HZ: boost={boost_info['boost']:.4f} beta={boost_info['beta']:.2f} " + f"signals={boost_info['signals']:.1f} staleness={stale_s:.0f}s" + ) + _acb_hz_ok = True + else: + log.warning(" ACB HZ: exf_latest not found — falling back to NPZ disk scan") + except ValueError as _ve: + log.warning(f" ACB HZ: snapshot stale ({_ve}) — falling back to NPZ disk scan") + except Exception as _e: + log.warning(f" ACB HZ: read failed ({_e}) — falling back to NPZ disk scan") + + if not _acb_hz_ok: + # NPZ fallback: get_dynamic_boost_for_date() will read eigenvalues/ on demand + log.info(f" ACB: using NPZ disk path for {target_date}") + + # ── Data Loading & Live OB Integration ──────────────────────────────────── + df = load_day_scans(target_date) + OB_ASSETS = [c for c in df.columns if c not in META_COLS] if not df.empty else ["BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT"] + + live_ob = HZOBProvider(hz_cluster=HZ_CLUSTER, hz_host=HZ_HOST) + ob_eng = OBFeatureEngine(live_ob) + # NOTE: HZOBProvider has no historical snapshots, so preload_date() is a no-op. + # OB features are fetched live via ob_eng.step_live() before each step_bar() call. + + # ── MC-Forewarner ────────────────────────────────────────────────────────── + forewarner = DolphinForewarner(models_dir=MC_MODELS_DIR) + mc_base_cfg = { + 'trial_id': 0, 'vel_div_threshold': -0.020, 'vel_div_extreme': -0.050, + 'use_direction_confirm': True, 'dc_lookback_bars': 7, + 'dc_min_magnitude_bps': 0.75, 'dc_skip_contradicts': True, + 'dc_leverage_boost': 1.00, 'dc_leverage_reduce': 0.50, + 'vd_trend_lookback': 10, 'min_leverage': 0.50, 'max_leverage': 5.00, + 'leverage_convexity': 3.00, 'fraction': 0.20, + 'use_alpha_layers': True, 'use_dynamic_leverage': True, + 'fixed_tp_pct': 0.0095, 'stop_pct': 1.00, 'max_hold_bars': 120, + 'use_sp_fees': True, 'use_sp_slippage': True, + 'sp_maker_entry_rate': 0.62, 'sp_maker_exit_rate': 0.50, + 'use_ob_edge': True, 'ob_edge_bps': 5.00, 'ob_confirm_rate': 0.40, + 'ob_imbalance_bias': -0.09, 'ob_depth_scale': 1.00, + 'use_asset_selection': True, 'min_irp_alignment': 0.45, 'lookback': 100, + 'acb_beta_high': 0.80, 'acb_beta_low': 0.20, 'acb_w750_threshold_pct': 60, + } + + engine.set_ob_engine(ob_eng) + engine.set_acb(acb) + engine.set_mc_forewarner(forewarner, mc_base_cfg) + engine.set_esoteric_hazard_multiplier(0.0) + if instrument: + engine._bar_log_enabled = True + + # vol_p60: 60th percentile of rolling 50-bar BTC return std + # Calibrated from 55-day NG3 champion window (Dec31–Feb25). + # TODO: compute adaptively from rolling scan history (Phase MIG3) + vol_p60 = pt_cfg.get('vol_p60', 0.000099) + + # ── Run ──────────────────────────────────────────────────────────────────── + # df was already loaded above to define OB_ASSETS + + # ── DOLPHIN_SAFETY (MIG3) ────────────────────────────────────────────────── + try: + safety_ref = client.get_cp_subsystem().get_atomic_reference('DOLPHIN_SAFETY').blocking() + safety_raw = safety_ref.get() + except Exception: + safety_ref = client.get_map('DOLPHIN_SAFETY').blocking() + safety_raw = safety_ref.get('latest') + + safety_state = json.loads(safety_raw) if safety_raw else {} + posture = safety_state.get('posture', 'APEX') + Rm = safety_state.get('Rm', 1.0) + log.info(f"[SURVIVAL STACK] Posture={posture} | Rm={Rm:.3f}") + + # Apply Rm to absolute max leverage + effective_max_lev = engine.abs_max_leverage * Rm + engine.abs_max_leverage = max(1.0, effective_max_lev) + + if posture == 'STALKER': + engine.abs_max_leverage = min(engine.abs_max_leverage, 2.0) + + result = run_engine_day(target_date, df, engine, vol_p60, posture=posture, direction=direction_val) + result['strategy'] = strategy_name + result['capital'] = engine.capital + + # ── Hazelcast & State Persistence ────────────────────────────────────────── + try: + # PnL write + imap_pnl = client.get_map(hz_cfg['imap_pnl']).blocking() + imap_pnl.put(target_date, json.dumps(result)) + + # State persist write + new_peak = max(engine.capital, peak_capital) + new_drawdown = 1.0 - (engine.capital / new_peak) if new_peak > 0 else 0.0 + + new_state = { + 'strategy': strategy_name, + 'capital': engine.capital, + 'date': target_date, + 'pnl': result.get('pnl', 0.0), + 'trades': result.get('trades', 0), + 'peak_capital': new_peak, + 'drawdown': new_drawdown, + 'last_date': target_date, + 'updated_at': datetime.now(timezone.utc).isoformat(), + 'engine_state': engine.get_state(), + } + imap_state.put('latest', json.dumps(new_state)) + imap_state.put(STATE_KEY, json.dumps(new_state)) + log.info(f" HZ write OK → state & {hz_cfg['imap_pnl']}[{target_date}]") + except Exception as e: + log.error(f"[STATE] HZ persist failed: {e}") + # Fallback: write to local JSON ledger + ledger_dir = Path(__file__).parent / pt_cfg.get('log_dir', 'paper_logs') + ledger_dir.mkdir(parents=True, exist_ok=True) + ledger_path = ledger_dir / f"state_ledger_{strategy_name}.jsonl" + with open(ledger_path, 'a') as f: + f.write(json.dumps({ + 'strategy': strategy_name, 'capital': engine.capital, + 'date': target_date, 'pnl': result.get('pnl', 0.0), + 'trades': result.get('trades', 0), 'peak_capital': peak_capital, + 'drawdown': 1.0 - engine.capital / max(engine.capital, peak_capital) if max(engine.capital, peak_capital) > 0 else 0.0 + }) + '\n') + finally: + live_ob.close() + client.shutdown() + + # ── Instrumentation (--instrument flag) ──────────────────────────────────── + if instrument: + instr_dir = Path(__file__).parent / pt_cfg['log_dir'] + instr_dir.mkdir(parents=True, exist_ok=True) + trades_instr_path = instr_dir / f"E2E_trades_{target_date}.csv" + with open(trades_instr_path, 'w', newline='') as f: + cw = csv.writer(f) + cw.writerow(['trade_id', 'asset', 'direction', 'entry_price', 'exit_price', + 'entry_bar', 'exit_bar', 'bars_held', 'leverage', 'notional', + 'pnl_pct', 'pnl_absolute', 'exit_reason', 'bucket_idx']) + for t in engine.trade_history: + cw.writerow([t.trade_id, t.asset, t.direction, + f"{t.entry_price:.6f}", f"{t.exit_price:.6f}", + t.entry_bar, t.exit_bar, t.bars_held, + f"{t.leverage:.4f}", f"{t.notional:.4f}", + f"{t.pnl_pct:.8f}", f"{t.pnl_absolute:.4f}", + t.exit_reason, t.bucket_idx]) + bars_instr_path = instr_dir / f"E2E_bars_{target_date}.csv" + with open(bars_instr_path, 'w', newline='') as f: + cw = csv.writer(f) + cw.writerow(['date', 'bar_idx', 'vel_div', 'vol_ok', 'posture', + 'regime_size_mult', 'position_open', 'boost', 'beta']) + for b in engine._bar_log: + cw.writerow([target_date, b['bar_idx'], f"{b['vel_div']:.8f}", + b['vol_ok'], b['posture'], f"{b['regime_size_mult']:.6f}", + b['position_open'], f"{b['boost']:.4f}", f"{b['beta']:.2f}"]) + log.info(f" Instrumentation → {trades_instr_path.name} ({len(engine.trade_history)} trades), " + f"{bars_instr_path.name} ({len(engine._bar_log)} bars)") + + # ── Disk log ─────────────────────────────────────────────────────────────── + log_pnl(Path(__file__).parent / pt_cfg['log_dir'], target_date, result, engine.capital) + + log.info(f"=== DONE: {strategy_name} {target_date} | " + f"PnL={result.get('pnl', 0):+.2f} | Capital={engine.capital:,.2f} ===") + return result + + +# ── CLI entry point ────────────────────────────────────────────────────────────── +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='DOLPHIN paper trading flow') + parser.add_argument('--config', default='configs/blue.yml', help='Strategy config YAML') + parser.add_argument('--date', default=None, help='YYYY-MM-DD (default: yesterday)') + parser.add_argument('--register', action='store_true', help='Register Prefect deployments') + parser.add_argument('--instrument', action='store_true', help='Write per-trade + bar CSVs to log_dir') + args = parser.parse_args() + + if args.register: + from prefect.client.schemas.schedules import CronSchedule as CS + for color, cfg_path in [('blue', 'configs/blue.yml'), ('green', 'configs/green.yml')]: + abs_cfg = str(Path(__file__).parent / cfg_path) + deployment = dolphin_paper_trade_flow.to_deployment( + name=f"dolphin-paper-{color}", + parameters={"config_path": abs_cfg}, + schedule=CS(cron="5 0 * * *", timezone="UTC"), + work_pool_name="dolphin", + tags=[color, "paper-trade", "dolphin"], + ) + deployment.apply() + print(f"Registered: dolphin-paper-{color}") + else: + os.environ.setdefault('PREFECT_API_URL', 'http://localhost:4200/api') + dolphin_paper_trade_flow(config_path=args.config, run_date=args.date, + instrument=args.instrument) diff --git a/prod/tests/test_long_capability_layers.py b/prod/tests/test_long_capability_layers.py new file mode 100644 index 0000000..079c04b --- /dev/null +++ b/prod/tests/test_long_capability_layers.py @@ -0,0 +1,194 @@ +import math +import sys +from pathlib import Path +from types import SimpleNamespace + +import pytest + +ROOT = Path("/mnt/dolphinng5_predict") +sys.path.insert(0, str(ROOT / "nautilus_dolphin")) +sys.path.insert(1, str(ROOT)) +if "nautilus_dolphin" in sys.modules: + pkg = sys.modules["nautilus_dolphin"] + pkg_file = str(getattr(pkg, "__file__", "") or "") + if not pkg_file.endswith("nautilus_dolphin/nautilus_dolphin/__init__.py"): + del sys.modules["nautilus_dolphin"] + +from nautilus_dolphin.nautilus.adaptive_circuit_breaker import AdaptiveCircuitBreaker, ACBConfig +from nautilus_dolphin.nautilus.alpha_bet_sizer import AlphaBetSizer +from nautilus_dolphin.nautilus.alpha_exit_manager import AlphaExitManager +from nautilus_dolphin.nautilus.alpha_signal_generator import AlphaSignalGenerator +from nautilus_dolphin.nautilus.esf_alpha_orchestrator import NDAlphaEngine +from nautilus_dolphin.nautilus.dolphin_actor import _trade_direction_from_config + + +def test_signal_generator_long_gate_and_dc_are_side_aware(): + gen = AlphaSignalGenerator(use_direction_confirm=True) + rising_prices = [100.0, 100.1, 100.2, 100.3, 100.4, 100.5, 100.6, 101.0] + falling_prices = [101.0, 100.8, 100.6, 100.4, 100.2, 100.0, 99.8, 99.5] + + long_sig = gen.generate( + vel_div=0.025, + vel_div_history=[0.012] * 10, + asset_price_history=rising_prices, + trade_direction=1, + ) + assert long_sig.is_valid + assert long_sig.direction == 1 + assert long_sig.dc_status == "CONFIRM" + + contradicted = gen.generate( + vel_div=0.025, + vel_div_history=[0.012] * 10, + asset_price_history=falling_prices, + trade_direction=1, + ) + assert not contradicted.is_valid + assert contradicted.dc_status == "SKIP_CONTRADICT" + + +def test_bet_sizer_trend_multiplier_is_direction_aware_for_long(): + sizer = AlphaBetSizer( + base_fraction=0.20, + min_leverage=0.5, + max_leverage=8.0, + use_alpha_layers=True, + use_dynamic_leverage=True, + ) + favorable_long = sizer.calculate_size(25000, 0.025, vel_div_trend=0.02, trade_direction=1) + adverse_long = sizer.calculate_size(25000, 0.025, vel_div_trend=-0.02, trade_direction=1) + favorable_short = sizer.calculate_size(25000, -0.035, vel_div_trend=-0.02, trade_direction=-1) + adverse_short = sizer.calculate_size(25000, -0.035, vel_div_trend=0.02, trade_direction=-1) + + assert favorable_long["fraction"] > adverse_long["fraction"] + assert favorable_short["fraction"] > adverse_short["fraction"] + + +def test_ndalphaengine_enters_long_when_begin_day_direction_is_long(): + engine = NDAlphaEngine( + initial_capital=1000.0, + use_asset_selection=False, + use_direction_confirm=False, + use_sp_fees=False, + use_sp_slippage=False, + use_ob_edge=False, + use_alpha_layers=False, + use_dynamic_leverage=False, + lookback=1, + ) + engine.begin_day("2026-05-08", posture="APEX", direction=1) + + res = engine.step_bar(0, vel_div=0.025, prices={"BTCUSDT": 100.0}, vol_regime_ok=True) + + assert res["entry"] is not None + assert res["entry"]["direction"] == 1 + assert engine.position is not None + assert engine.position.direction == 1 + + +def test_ndalphaengine_short_default_preserved(): + engine = NDAlphaEngine( + initial_capital=1000.0, + use_asset_selection=False, + use_direction_confirm=False, + use_sp_fees=False, + use_sp_slippage=False, + use_ob_edge=False, + use_alpha_layers=False, + use_dynamic_leverage=False, + lookback=1, + ) + engine.begin_day("2026-05-08", posture="APEX") + + res = engine.step_bar(0, vel_div=-0.035, prices={"BTCUSDT": 100.0}, vol_regime_ok=True) + + assert res["entry"] is not None + assert res["entry"]["direction"] == -1 + + +def test_acb_short_default_and_long_cache_are_side_separated(): + acb = AdaptiveCircuitBreaker() + acb._w750_threshold = 0.001 + bullish = { + "funding_btc": 0.0002, + "dvol_btc": 30.0, + "fng": 80.0, + "taker": 1.25, + "available": True, + } + short = acb._calculate_signals(bullish) + long = acb._calculate_signals(bullish, direction=1) + + assert short["signals"] == pytest.approx(0.0) + assert long["signals"] == pytest.approx(4.0) + + snap = dict(bullish, _acb_ready=True, _staleness_s={}) + short_hz = acb.get_dynamic_boost_from_hz("2026-05-08", snap, w750_velocity=0.002, direction=-1) + long_hz = acb.get_dynamic_boost_from_hz("2026-05-08", snap, w750_velocity=0.002, direction=1) + + assert short_hz["side"] == "SHORT" + assert long_hz["side"] == "LONG" + assert short_hz["boost"] == pytest.approx(1.0) + assert long_hz["boost"] == pytest.approx(1.0 + 0.5 * math.log1p(4.0)) + assert acb.get_dynamic_boost_for_date("2026-05-08")["side"] == "SHORT" + assert acb.get_dynamic_boost_for_date("2026-05-08", direction=1)["side"] == "LONG" + + +def test_acb_short_threshold_regression_values_still_match_v6(): + acb = AdaptiveCircuitBreaker() + factors = { + "funding_btc": -0.0002, + "dvol_btc": 85.0, + "fng": 20.0, + "taker": 0.75, + "available": True, + } + + result = acb._calculate_signals(factors) + + assert result["signals"] == pytest.approx(4.0) + assert result["severity"] == 7 + + +def test_acb_ob_beta_modulation_is_side_aware(): + acb = AdaptiveCircuitBreaker() + acb._w750_threshold = 0.001 + calm_ob = SimpleNamespace( + get_macro=lambda: SimpleNamespace(regime_signal=-1, depth_velocity=0.1, cascade_count=0) + ) + stress_ob = SimpleNamespace( + get_macro=lambda: SimpleNamespace(regime_signal=1, depth_velocity=-0.3, cascade_count=2) + ) + + long_calm = acb.get_dynamic_boost_from_hz( + "2026-05-08", {"_acb_ready": True, "_staleness_s": {}}, w750_velocity=0.002, ob_engine=calm_ob, direction=1 + ) + short_calm = acb.get_dynamic_boost_from_hz( + "2026-05-09", {"_acb_ready": True, "_staleness_s": {}}, w750_velocity=0.002, ob_engine=calm_ob, direction=-1 + ) + short_stress = acb.get_dynamic_boost_from_hz( + "2026-05-10", {"_acb_ready": True, "_staleness_s": {}}, w750_velocity=0.002, ob_engine=stress_ob, direction=-1 + ) + + assert long_calm["beta"] == pytest.approx(1.0) + assert short_calm["beta"] == pytest.approx(0.68) + assert short_stress["beta"] == pytest.approx(1.0) + + +def test_exit_manager_optional_vd_exit_is_long_aware(): + manager = AlphaExitManager(vd_enabled=True, vd_consec_bars=2) + manager.setup_position("long-1", entry_price=100.0, direction=1, entry_bar=0) + + first = manager.evaluate("long-1", current_price=100.1, current_bar=1, vel_div=-0.02) + second = manager.evaluate("long-1", current_price=100.1, current_bar=2, vel_div=-0.02) + + assert first["action"] == "HOLD" + assert second["action"] == "EXIT" + assert second["reason"] == "VD_INVALIDATION" + + +def test_prodgreen_direction_parser_is_explicit_and_case_insensitive(): + assert _trade_direction_from_config("LONG_ONLY") == 1 + assert _trade_direction_from_config("short_only") == -1 + with pytest.raises(ValueError): + _trade_direction_from_config("bidirectional")