diff --git a/prod/bingx/sandbox_status.py b/prod/bingx/sandbox_status.py new file mode 100644 index 0000000..3bbb37b --- /dev/null +++ b/prod/bingx/sandbox_status.py @@ -0,0 +1,126 @@ +from __future__ import annotations + +import json +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +DEFAULT_SANDBOX_STATUS_PATH = Path("/tmp/bingx_sandbox_status.json") + + +@dataclass(frozen=True) +class BingxSandboxStatus: + """Small sidecar snapshot for BingX demo/testnet state. + + The snapshot is intentionally local-only so it can be used by tests and + operators without writing into BLUE state, ClickHouse, or production logs. + """ + + ts: str + environment: str + balance: float + equity: float + available_margin: float + unrealized_profit: float + used_margin: float + open_positions: int + open_orders: int + account_currency: str = "VST" + clean: bool = False + notes: dict[str, Any] | None = None + + def to_dict(self) -> dict[str, Any]: + return { + "ts": self.ts, + "environment": self.environment, + "account_currency": self.account_currency, + "balance": self.balance, + "equity": self.equity, + "available_margin": self.available_margin, + "unrealized_profit": self.unrealized_profit, + "used_margin": self.used_margin, + "open_positions": self.open_positions, + "open_orders": self.open_orders, + "clean": self.clean, + "notes": self.notes or {}, + } + + +def _safe_float(value: Any, default: float = 0.0) -> float: + try: + out = float(value) + except Exception: + return default + return out if out == out else default + + +def _count_positions(positions: Any) -> int: + if isinstance(positions, list): + return sum(1 for item in positions if isinstance(item, dict)) + return 0 + + +def _count_orders(open_orders: Any) -> int: + if isinstance(open_orders, dict): + orders = open_orders.get("orders") + if isinstance(orders, list): + return sum(1 for item in orders if isinstance(item, dict)) + if isinstance(open_orders, list): + return sum(1 for item in open_orders if isinstance(item, dict)) + return 0 + + +def build_sandbox_status( + *, + balance_payload: dict[str, Any], + positions_payload: Any, + open_orders_payload: Any, + environment: str = "VST", + account_currency: str = "VST", + notes: dict[str, Any] | None = None, +) -> BingxSandboxStatus: + balance_row = balance_payload.get("balance", balance_payload) if isinstance(balance_payload, dict) else {} + if not isinstance(balance_row, dict): + balance_row = {} + balance = _safe_float(balance_row.get("balance"), 0.0) + equity = _safe_float(balance_row.get("equity"), balance) + available_margin = _safe_float(balance_row.get("availableMargin"), 0.0) + unrealized_profit = _safe_float(balance_row.get("unrealizedProfit"), 0.0) + used_margin = _safe_float(balance_row.get("usedMargin"), 0.0) + open_positions = _count_positions(positions_payload) + open_orders = _count_orders(open_orders_payload) + return BingxSandboxStatus( + ts=datetime.now(timezone.utc).isoformat(), + environment=str(environment), + account_currency=str(account_currency), + balance=balance, + equity=equity, + available_margin=available_margin, + unrealized_profit=unrealized_profit, + used_margin=used_margin, + open_positions=open_positions, + open_orders=open_orders, + clean=(open_positions == 0 and open_orders == 0), + notes=notes or {}, + ) + + +def snapshot_path(path: str | Path | None = None) -> Path: + return Path(path) if path is not None else DEFAULT_SANDBOX_STATUS_PATH + + +def write_sandbox_status(status: BingxSandboxStatus, path: str | Path | None = None) -> Path: + target = snapshot_path(path) + target.write_text(json.dumps(status.to_dict(), indent=2, sort_keys=True)) + return target + + +def load_sandbox_status(path: str | Path | None = None) -> dict[str, Any] | None: + target = snapshot_path(path) + if not target.exists(): + return None + try: + return json.loads(target.read_text()) + except Exception: + return None diff --git a/prod/nautilus_event_trader.py b/prod/nautilus_event_trader.py index 4dcbc08..8f721e7 100644 --- a/prod/nautilus_event_trader.py +++ b/prod/nautilus_event_trader.py @@ -4,12 +4,14 @@ DOLPHIN Nautilus Event-Driven Trader """ import sys import json +import hashlib import math import os import time import signal import threading import urllib.request +import uuid from typing import Optional from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timezone @@ -30,11 +32,23 @@ from nautilus_dolphin.nautilus.esf_alpha_orchestrator import NDPosition from nautilus_dolphin.nautilus.adaptive_circuit_breaker import AdaptiveCircuitBreaker from nautilus_dolphin.nautilus.ob_features import OBFeatureEngine from nautilus_dolphin.nautilus.ob_provider import MockOBProvider -from nautilus_dolphin.nautilus.esof_size_gate import parse_esof_payload, esof_gate_from_payload +from nautilus_dolphin.nautilus.esof_size_gate import ( + parse_esof_payload, esof_gate_from_payload, esof_score_from_payload, + esof_size_mult_from_score, ESOF_STALE_FALLBACK_MULT, ESOF_FRESHNESS_S, +) +try: + sys.path.insert(0, '/mnt/dolphinng5_predict/Observability') + from esof_advisor import compute_esof as _compute_esof_inline +except Exception: + _compute_esof_inline = None try: from adaptive_exit.market_state_runtime import MarketStateRuntime except Exception: MarketStateRuntime = None +try: + from adaptive_exit.advanced_sl import AdvancedSLRuntime +except Exception: + AdvancedSLRuntime = None try: from adaptive_exit.sc_threshold_advisor import SCThresholdAdvisor except Exception: @@ -48,6 +62,10 @@ try: from adaptive_exit.bounce_advisor import BounceAdvisor except Exception: BounceAdvisor = None +try: + from adaptive_exit.post_win_long_overlay import PostWinExecutionFSM +except Exception: + PostWinExecutionFSM = None try: from nautilus_dolphin.nautilus.alpha_exit_v7_engine import AlphaExitEngineV7, TradeContextV7 except Exception: @@ -83,7 +101,7 @@ ENGINE_KWARGS = dict( initial_capital=25000.0, vel_div_threshold=-0.02, vel_div_extreme=-0.05, min_leverage=0.5, max_leverage=8.0, # note: create_d_liq_engine overrides to D_LIQ_SOFT_CAP=8.0 leverage_convexity=3.0, - fraction=0.20, fixed_tp_pct=0.0095, stop_pct=1.0, max_hold_bars=250, # gold spec: 250 + fraction=0.20, fixed_tp_pct=0.0020, stop_pct=1.0, max_hold_bars=250, # TP research 2026-05-11: 0.95→0.20% use_direction_confirm=True, dc_lookback_bars=7, dc_min_magnitude_bps=0.75, dc_skip_contradicts=True, dc_leverage_boost=1.0, dc_leverage_reduce=0.5, use_asset_selection=True, min_irp_alignment=0.0, # gold spec: no IRP filter @@ -118,6 +136,13 @@ def _direction_label(direction: int) -> str: return "LONG" if int(direction) == 1 else "SHORT" +def _normalize_v7_exit_reason(reason: str) -> str: + text = str(reason or "").strip() + if text == "V7_MAE_SL_VOL_NORM": + return "V7.1_MAE_SL_VOL_NORM" + return text + + def _safe_float(value, default: float = 0.0) -> float: try: out = float(value) @@ -175,7 +200,28 @@ _BUCKET_SL_PCT: dict = { # Gold-calibrated from full 5-year BTC history: 0.00026414 (stricter, ~2.7x tighter). # 2026-04-07: switched to 56-day gold window value (0.00009868) — the exact threshold # used in the T=2155 ROI=+189% backtest. More permissive; paper trading to gather data. -VOL_P60_THRESHOLD = 0.00009868 +# 2026-05-09 weekend mode: runtime-configurable lower gate for low-vol tape. +# +# Legacy references preserved: +# VOL_P60_THRESHOLD_LEGACY_MAIN = 0.00026414 +# VOL_P60_THRESHOLD_GOLD_56D = 0.00009868 +VOL_P60_THRESHOLD_LEGACY_MAIN = 0.00026414 +VOL_P60_THRESHOLD_GOLD_56D = 0.00009868 +VOL_P60_THRESHOLD_WEEKEND_DEFAULT = 0.00003 +VOL_P60_THRESHOLD_RELAXED_TEMP = 0.00015838 + + +def _vol_p60_threshold_from_env(default: float = VOL_P60_THRESHOLD_LEGACY_MAIN) -> float: + raw = os.environ.get("DOLPHIN_VOL_P60_THRESHOLD") + if raw is None: + return float(default) + try: + out = float(str(raw).strip()) + except Exception: + return float(default) + if not math.isfinite(out) or out <= 0.0: + return float(default) + return float(out) # Algorithm Versioning # v1_shakedown: v50-v150 (noise bug), loose vol gate @@ -196,6 +242,57 @@ def log(msg): with open(TRADE_LOG, 'a') as f: f.write(line + '\n') + +def _chain_digest(payload: dict) -> str: + """Stable digest for BLUE exit-chain state.""" + body = json.dumps(payload, sort_keys=True, separators=(",", ":"), default=str).encode() + return hashlib.sha256(body).hexdigest() + + +def _build_chain_state( + *, + trade_id: str, + asset: str, + side: str, + entry_price: float, + quantity: float, + notional: float, + entry_bar: int, + entry_ts: int, + retraction_legs: int = 0, + realized_pnl_legs_total: float = 0.0, + chain_root_trade_id: str | None = None, + chain_head_leg_id: str | None = None, + chain_prev_leg_id: str = "", + chain_mode: str = "LIVE", +) -> dict: + """Build a deterministic chain snapshot for the current open trade head.""" + root = str(chain_root_trade_id or trade_id or "") + seq = max(0, int(retraction_legs)) + head = str(chain_head_leg_id or (f"{trade_id}:open" if seq <= 0 else f"{trade_id}:x{seq:03d}")) + prev = str(chain_prev_leg_id or "") + anchor = { + "trade_id": str(trade_id or ""), + "chain_root_trade_id": root, + "chain_head_leg_id": head, + "chain_prev_leg_id": prev, + "chain_seq": seq, + "chain_mode": str(chain_mode or "LIVE"), + "asset": str(asset or ""), + "side": str(side or "").upper(), + "entry_price": round(float(entry_price or 0.0), 12), + "quantity": round(float(quantity or 0.0), 12), + "notional": round(float(notional or 0.0), 12), + "entry_bar": int(entry_bar or 0), + "entry_ts": int(entry_ts or 0), + "retraction_legs": seq, + "realized_pnl_legs_total": round(float(realized_pnl_legs_total or 0.0), 12), + } + anchor["chain_token"] = _chain_digest(anchor) + anchor["chain_version"] = 1 + anchor["chain_kind"] = "ROOT" if seq <= 0 else "LEG" + return anchor + class DolphinLiveTrader: def __init__(self): self.eng = None @@ -205,7 +302,9 @@ class DolphinLiveTrader: self.pnl_map = None self.state_map = None self.heartbeat_map = None + self.control_map = None self.eng_lock = threading.Lock() + self._heartbeat_stop = threading.Event() self._dedup_lock = threading.Lock() # guards atomic check-and-set on last_scan_number self._scan_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="scan") self.last_scan_number = -1 @@ -241,11 +340,20 @@ class DolphinLiveTrader: self._bounce_advisor_last_log = 0.0 self._bounce_price_history: dict[str, deque] = {} self._market_state_runtime = MarketStateRuntime() if MarketStateRuntime is not None else None + self._advanced_sl = AdvancedSLRuntime.load() if AdvancedSLRuntime is not None else None self._hibernate_protect_active: str | None = None # trade_id being protected self._bucket_assignments: dict = {} # asset → KMeans bucket_id (loaded from pkl) self._last_esof_size_mult: float = 1.0 + self._restore_failed: bool = False + self._restore_failure_reason: str = "" + self._restore_source: str = "" self.trade_direction: int = _direction_from_env() + self.vol_p60_threshold: float = _vol_p60_threshold_from_env() + self._runtime_direction: int = self.trade_direction + self._efsm = PostWinExecutionFSM() if PostWinExecutionFSM is not None else None self._trade_announcement_center = None + self._processed_retract_commands: deque = deque(maxlen=5000) + self._processed_retract_set: set[str] = set() _seed_runtime_env(ANNOUNCEMENT_RUNTIME_ENV) if ANNOUNCEMENT_CONFIG.exists(): try: @@ -258,6 +366,29 @@ class DolphinLiveTrader: except Exception as e: log(f" Position announcements: {e}") self._trade_announcement_center = None + if self._efsm is not None: + log(" EFSM: loaded (post-win LONG overlay)") + if self._advanced_sl is not None: + log(" AdvancedSL: loaded (shadow prototype)") + + def _resolve_runtime_direction(self) -> int: + """Resolve active trade direction for the next eligible entry.""" + base = int(self.trade_direction) + if base != -1 or self._efsm is None: + return base + with self.eng_lock: + has_open_position = getattr(self.eng, "position", None) is not None + if has_open_position: + return base + return 1 if int(self._efsm.pending_slots) > 0 else base + + def _apply_runtime_direction(self) -> None: + """Apply current runtime direction to the engine regime.""" + resolved = self._resolve_runtime_direction() + with self.eng_lock: + if getattr(self.eng, "regime_direction", self.trade_direction) != resolved: + self.eng.regime_direction = resolved + self._runtime_direction = resolved def _build_engine(self): log("Building NDAlphaEngine...") @@ -269,6 +400,12 @@ class DolphinLiveTrader: self.eng = create_d_liq_engine(**engine_kwargs) log(f" Engine: {type(self.eng).__name__}") log(f" Direction: {_direction_label(self.trade_direction)} ({self.trade_direction:+d})") + log( + " VOL gate threshold: " + f"{self.vol_p60_threshold:.8f} " + f"(legacy_main={VOL_P60_THRESHOLD_LEGACY_MAIN:.8f}, gold_56d={VOL_P60_THRESHOLD_GOLD_56D:.8f}, " + f"relaxed_temp={VOL_P60_THRESHOLD_RELAXED_TEMP:.7f})" + ) log(f" ACB subday exits: {'ON' if engine_kwargs['allow_subday_acb_exit'] else 'OFF'}") log(f" Leverage: soft={self.eng.base_max_leverage}x abs={self.eng.abs_max_leverage}x") @@ -297,7 +434,7 @@ class DolphinLiveTrader: 'dc_leverage_boost': 1.00, 'dc_leverage_reduce': 0.50, 'vd_trend_lookback': 10, 'min_leverage': 0.50, 'max_leverage': 8.00, # gold spec 'leverage_convexity': 3.00, 'fraction': 0.20, 'use_alpha_layers': True, - 'use_dynamic_leverage': True, 'fixed_tp_pct': 0.0095, 'stop_pct': 1.00, + 'use_dynamic_leverage': True, 'fixed_tp_pct': 0.0020, 'stop_pct': 1.00, 'max_hold_bars': 250, 'use_sp_fees': True, 'use_sp_slippage': True, # gold spec 'sp_maker_entry_rate': 0.62, 'sp_maker_exit_rate': 0.50, 'use_ob_edge': True, 'ob_edge_bps': 5.00, 'ob_confirm_rate': 0.40, @@ -419,19 +556,99 @@ class DolphinLiveTrader: return None def _sync_esof_size_gate(self) -> None: - """Update the shared engine with the current continuous EsoF size multiplier.""" + """Update the shared engine with the current continuous EsoF size multiplier. + + When the HZ payload is stale or missing (daemon died, HZ restarted), + falls back to inline computation using the canonical compute_esof() from + esof_advisor.py — single implementation, no parallel code. + """ payload = self._read_esof_payload() - score, mult = esof_gate_from_payload(payload) + score = esof_score_from_payload(payload, max_age_s=ESOF_FRESHNESS_S) + source = "hz" + + if score is None and _compute_esof_inline is not None: + try: + inline = _compute_esof_inline() + score = esof_score_from_payload(inline, max_age_s=None) + if score is not None: + source = "inline" + payload = inline + except Exception: + pass + + mult = esof_size_mult_from_score(score) with self.eng_lock: if hasattr(self.eng, "set_esof_advisory_score"): self.eng.set_esof_advisory_score(score) if mult != self._last_esof_size_mult: self._last_esof_size_mult = mult if score is None: - log(f"EsoF size gate: neutral mult={mult:.2f} (no fresh score)") + log(f"EsoF size gate: STALE-FALLBACK mult={mult:.2f} (no HZ + no inline)") + elif source == "inline": + log(f"EsoF size gate: INLINE sc={score:+.3f} mult={mult:.2f} (HZ stale)") else: log(f"EsoF size gate: sc={score:+.3f} mult={mult:.2f}") + def _sync_tp_threshold(self) -> None: + """Read live TP threshold from HZ control plane and propagate to engine. + + HZ key: DOLPHIN_FEATURES["live_tp_threshold"] → JSON {"tp_pct": 0.0020, "ts": ...} + If absent or stale, keeps the current default (0.0020 from ENGINE_KWARGS). + A tighter TP cuts open positions immediately; a wider TP extends the hold. + """ + if not self.features_map: + return + try: + raw = self.features_map.blocking().get("live_tp_threshold") + if not raw: + return + payload = json.loads(raw) if isinstance(raw, str) else raw + tp_pct = float(payload.get("tp_pct", 0)) + if tp_pct <= 0: + return + with self.eng_lock: + old = self.eng.set_live_tp_pct(tp_pct) + if abs(old - tp_pct) > 1e-6: + log(f"TP threshold: {old*100:.2f}% → {tp_pct*100:.2f}% (HZ control plane)") + except Exception: + pass + + def _inject_obf_midprice(self, prices_dict: dict) -> dict: + """Override scan price for the open position's asset with live OB mid-price. + + Scan prices are quantized to ~4 decimal places (e.g. 0.1255 vs 0.1256), + which is too coarse for a 0.20% TP on low-priced assets. The OBF universe + service has live WebSocket bid/ask at ~0.1s resolution with full precision. + This method substitutes the scan price with (best_bid + best_ask) / 2 for + the position's asset only, so TP evaluation sees the true market price. + """ + pos = self.eng.position + if pos is None or not pos.asset: + return prices_dict + try: + raw = self.features_map.blocking().get("obf_universe_latest") + if not raw: + return prices_dict + obf = json.loads(raw) + asset_data = obf.get(pos.asset) + if not asset_data or not isinstance(asset_data, dict): + return prices_dict + best_bid = float(asset_data.get("best_bid", 0) or 0) + best_ask = float(asset_data.get("best_ask", 0) or 0) + if best_bid <= 0 or best_ask <= 0: + return prices_dict + mid = (best_bid + best_ask) / 2.0 + if pos.asset in prices_dict: + scan_px = prices_dict[pos.asset] + drift = abs(mid - scan_px) / scan_px if scan_px > 0 else 1.0 + if drift > 0.05: + return prices_dict + out = dict(prices_dict) + out[pos.asset] = mid + return out + except Exception: + return prices_dict + def _sync_sc_threshold_advisor(self, scan_number: int, vel_div: float) -> None: """Shadow-only advisory layer for tracking / future threshold learning.""" if self._sc_advisor is None: @@ -651,7 +868,7 @@ class DolphinLiveTrader: "decision_seq": seq, "bars_held": int(decision.get("bars_held", 0) or 0), "action": str(decision.get("action", "UNKNOWN") or "UNKNOWN"), - "reason": str(decision.get("reason") or ""), + "reason": _normalize_v7_exit_reason(decision.get("reason") or ""), "pnl_pct": float(decision.get("pnl_pct", 0.0) or 0.0), "mfe": float(decision.get("mfe", 0.0) or 0.0), "mae": float(decision.get("mae", 0.0) or 0.0), @@ -828,6 +1045,340 @@ class DolphinLiveTrader: except Exception as e: log(f"SC_GAUGE error: {e}") + def _resolve_trade_id(self, explicit: str | None = None, *, create_if_missing: bool = False) -> str: + """Resolve a trade_id from the event, live position, or pending entry.""" + tid = str(explicit or "").strip() + if tid: + return tid + pos = getattr(self.eng, "position", None) + if pos is not None: + pos_tid = str(getattr(pos, "trade_id", "") or "").strip() + if pos_tid: + return pos_tid + if len(self._pending_entries) == 1: + pending_tid = next(iter(self._pending_entries.keys())) + if pending_tid: + return pending_tid + if create_if_missing: + return uuid.uuid4().hex[:16] + return "" + + def _query_clickhouse_tsv( + self, + sql: str, + *, + db_candidates: tuple[str, ...] = ("dolphin", "dolphin_prodgreen"), + timeout: float = 5.0, + ) -> tuple[str, str]: + """Run a small ClickHouse HTTP query and return (raw_text, db_used).""" + import base64 as _b64 + + auth = "Basic " + _b64.b64encode(b"dolphin:dolphin_ch_2026").decode() + last_exc: Exception | None = None + for db in db_candidates: + try: + req = urllib.request.Request( + f"http://localhost:8123/?database={db}", + data=sql.encode(), + headers={"Authorization": auth}, + ) + with urllib.request.urlopen(req, timeout=timeout) as r: + return r.read().decode().strip(), db + except Exception as exc: + last_exc = exc + raise last_exc or RuntimeError("ClickHouse query failed") + + def _parse_capital_blob(self, raw, source: str) -> tuple[float, dict] | None: + """Parse a HZ/JSON state blob and validate the capital payload.""" + if not raw: + return None + try: + data = json.loads(raw) if isinstance(raw, str) else (raw if isinstance(raw, dict) else {}) + capital = float(data.get("capital", 0) or 0) + if capital >= 1.0 and math.isfinite(capital): + return capital, data + log(f" restore candidate rejected from {source}: capital={capital!r}") + except Exception as exc: + log(f" restore candidate parse failed from {source}: {exc}") + return None + + def _parse_timestamp_seconds(self, value) -> float | None: + """Parse epoch/ISO timestamps into UTC epoch seconds.""" + if value is None: + return None + try: + if isinstance(value, (int, float)): + ts = float(value) + elif isinstance(value, str): + text = value.strip() + if not text: + return None + try: + ts = float(text) + except ValueError: + dt = datetime.fromisoformat(text.replace("Z", "+00:00")) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + ts = dt.timestamp() + else: + return None + if not math.isfinite(ts): + return None + # Accept millisecond epochs as well. + if ts > 1.0e12: + ts /= 1000.0 + return ts if ts > 0 else None + except Exception: + return None + + def _extract_state_timestamp(self, blob: dict) -> float | None: + """Extract the best timestamp from a state blob.""" + if not isinstance(blob, dict): + return None + for key in ("updated_at", "timestamp", "ts", "iso"): + if key not in blob: + continue + parsed = self._parse_timestamp_seconds(blob.get(key)) + if parsed is not None: + return parsed + return None + + def _mark_restore_failure(self, reason: str) -> None: + """Mark restore as failed and force the trader into halt mode.""" + self._restore_failed = True + self._restore_failure_reason = reason + try: + with self.eng_lock: + if self.eng is not None: + self.eng.regime_dd_halt = True + self.eng._day_posture = "HIBERNATE" + except Exception: + pass + log(f"RESTORE HALT: {reason}") + + def _restore_capital_from_legacy_checkpoint(self) -> bool: + """Legacy escape hatch for the old scalar checkpoint path.""" + if not _env_bool("DOLPHIN_ALLOW_LEGACY_CAPITAL_CHECKPOINT", False): + return False + + def _try_load(raw, source): + parsed = self._parse_capital_blob(raw, source) + if parsed is None: + return False + capital, _ = parsed + self.eng.capital = capital + self._restore_source = source + log(f" Capital restored from legacy {source}: ${capital:,.2f}") + return True + + try: + raw = self.state_map.blocking().get("capital_checkpoint") + if _try_load(raw, "HZ capital_checkpoint"): + return True + except Exception as e: + log(f" capital HZ legacy restore failed: {e}") + + try: + if CAPITAL_DISK_CHECKPOINT.exists(): + raw = CAPITAL_DISK_CHECKPOINT.read_text() + if _try_load(raw, "disk capital_checkpoint"): + return True + except Exception as e: + log(f" capital disk legacy restore failed: {e}") + return False + + def _restore_capital_from_state(self) -> bool: + """Restore capital from live HZ state or ledger-backed snapshots.""" + parsed_state = {} + for key, label in ( + ("latest_nautilus", "HZ latest_nautilus"), + ("engine_snapshot", "HZ engine_snapshot"), + ): + try: + raw = self.state_map.blocking().get(key) + except Exception as e: + log(f" capital {key} read failed: {e}") + raw = None + parsed = self._parse_capital_blob(raw, label) + if parsed is not None: + capital, blob = parsed + parsed_state[key] = ( + label, + capital, + blob, + self._extract_state_timestamp(blob), + ) + + day_key = datetime.now(timezone.utc).strftime('%Y-%m-%d') + if self.pnl_map is not None: + try: + raw = self.pnl_map.blocking().get(day_key) + except Exception as e: + log(f" capital pnl_map[{day_key}] read failed: {e}") + raw = None + parsed = self._parse_capital_blob(raw, f"HZ pnl[{day_key}]") + if parsed is not None: + capital, blob = parsed + parsed_state["pnl_day"] = ( + f"HZ pnl[{day_key}]", + capital, + blob, + self._extract_state_timestamp(blob), + ) + + if parsed_state: + restore_tol = max(0.0001, _safe_float(os.environ.get("DOLPHIN_CAPITAL_RESTORE_TOL_PCT"), 0.002)) + stale_lag_s = max(0.0, _safe_float(os.environ.get("DOLPHIN_CAPITAL_SEED_STALE_LAG_SEC"), 180.0)) + force_latest_seed = _env_bool("DOLPHIN_FORCE_LATEST_NAUTILUS_RESTORE", False) + + def _mismatch(a: float, b: float) -> bool: + return abs(a - b) > max(1.0, abs(a) * restore_tol) + + # Common-sense restore order: + # 1) latest_nautilus = researched replay seed / operator-confirmed seed + # 2) daily pnl map = corroborating capital sensor + # 3) engine_snapshot = live observation only + if "latest_nautilus" in parsed_state: + label, capital, latest_blob, latest_ts = parsed_state["latest_nautilus"] + reject_latest = False + reject_details: list[str] = [] + mismatch_details: list[str] = [] + + if "pnl_day" in parsed_state: + pnl_label, pnl_capital, _, pnl_ts = parsed_state["pnl_day"] + if _mismatch(pnl_capital, capital): + mismatch_details.append( + f"{pnl_label} ${pnl_capital:,.2f}" + ) + if not force_latest_seed: + if latest_ts is None and pnl_ts is not None: + reject_latest = True + reject_details.append(f"{pnl_label} has timestamp, latest_nautilus does not") + elif latest_ts is not None and pnl_ts is not None and latest_ts + stale_lag_s < pnl_ts: + reject_latest = True + reject_details.append( + f"{pnl_label} is newer by {pnl_ts - latest_ts:.1f}s" + ) + if "engine_snapshot" in parsed_state: + engine_label, engine_capital, _, engine_ts = parsed_state["engine_snapshot"] + if _mismatch(engine_capital, capital): + mismatch_details.append( + f"{engine_label} ${engine_capital:,.2f}" + ) + if not force_latest_seed: + if latest_ts is None and engine_ts is not None: + reject_latest = True + reject_details.append(f"{engine_label} has timestamp, latest_nautilus does not") + elif latest_ts is not None and engine_ts is not None and latest_ts + stale_lag_s < engine_ts: + reject_latest = True + reject_details.append( + f"{engine_label} is newer by {engine_ts - latest_ts:.1f}s" + ) + + if reject_latest: + detail = "; ".join(reject_details) if reject_details else "freshness/consistency guard" + log( + " Capital seed mismatch: ignoring stale latest_nautilus " + f"${capital:,.2f} ({detail})" + ) + else: + self.eng.capital = capital + self._restore_source = label + if mismatch_details: + log( + " Capital sensor mismatch: preferring latest_nautilus " + f"${capital:,.2f} over " + ", ".join(mismatch_details) + ) + log(f" Capital restored from {label}: ${capital:,.2f}") + return True + + if "pnl_day" in parsed_state and "engine_snapshot" in parsed_state: + pnl_label, pnl_capital, _, pnl_ts = parsed_state["pnl_day"] + eng_label, eng_capital, _, eng_ts = parsed_state["engine_snapshot"] + if _mismatch(pnl_capital, eng_capital): + if pnl_ts is not None and eng_ts is not None: + if eng_ts > pnl_ts: + log( + " Capital sensor mismatch: preferring fresher engine_snapshot " + f"${eng_capital:,.2f} over {pnl_label} ${pnl_capital:,.2f}" + ) + self.eng.capital = eng_capital + self._restore_source = eng_label + log(f" Capital restored from {eng_label}: ${eng_capital:,.2f}") + return True + elif eng_ts is not None and pnl_ts is None: + log( + " Capital sensor mismatch: preferring timestamped engine_snapshot " + f"${eng_capital:,.2f} over untimestamped {pnl_label} ${pnl_capital:,.2f}" + ) + self.eng.capital = eng_capital + self._restore_source = eng_label + log(f" Capital restored from {eng_label}: ${eng_capital:,.2f}") + return True + + if "pnl_day" in parsed_state: + label, capital, _, _ = parsed_state["pnl_day"] + self.eng.capital = capital + self._restore_source = label + log(f" Capital restored from {label}: ${capital:,.2f}") + return True + + label, capital, _, _ = parsed_state["engine_snapshot"] + self.eng.capital = capital + self._restore_source = label + log(f" Capital restored from {label}: ${capital:,.2f}") + return True + + for sql, label in ( + ( + "SELECT ts, capital, trades_executed, posture, phase " + "FROM status_snapshots ORDER BY ts DESC LIMIT 1 FORMAT TabSeparated", + "status_snapshots", + ), + ( + "SELECT ts, capital_after, capital_before, pnl, exit_reason, trade_id " + "FROM trade_events " + "WHERE strategy='blue' AND capital_after > 0 " + "ORDER BY ts DESC LIMIT 1 FORMAT TabSeparated", + "trade_events", + ), + ): + try: + raw, db = self._query_clickhouse_tsv(sql) + if not raw: + continue + cols = raw.split("\t") + capital = None + if label == "status_snapshots" and len(cols) >= 2: + capital = float(cols[1]) + elif label == "trade_events" and len(cols) >= 4: + cap_after = float(cols[1]) + cap_before = float(cols[2]) + pnl = float(cols[3]) + expected = cap_before + pnl + if math.isfinite(cap_after) and math.isfinite(expected): + if abs(cap_after - expected) <= max(1.0, abs(expected) * 0.002): + capital = cap_after + else: + log( + f" restore candidate rejected from {db}.{label}: " + f"capital_after={cap_after:.2f} expected={expected:.2f} " + f"exit_reason={cols[4] if len(cols) > 4 else ''}" + ) + if capital is not None and math.isfinite(capital) and capital >= 1.0: + self.eng.capital = capital + self._restore_source = f"{db}.{label}" + log(f" Capital restored from {db}.{label}: ${capital:,.2f}") + return True + except Exception as e: + log(f" capital {label} replay failed: {e}") + + if self._restore_capital_from_legacy_checkpoint(): + return True + + self._mark_restore_failure("no sane capital source found (HZ state and ledger replay unavailable)") + return False + # ── CH position-state persistence ───────────────────────────────────────── def _ps_write_open(self, tid: str, entry: dict): @@ -842,7 +1393,7 @@ class DolphinLiveTrader: "quantity": entry['quantity'], "notional": round(entry['quantity'] * entry['entry_price'], 4), "leverage": entry['leverage'], - "bucket_id": int(self._bucket_assignments.get(entry['asset'], -1)), + "bucket_id": int(getattr(self, "_bucket_assignments", {}).get(entry['asset'], -1)), "entry_bar": self.bar_idx, "status": "OPEN", "exit_reason": "", @@ -864,7 +1415,7 @@ class DolphinLiveTrader: "quantity": pending.get('quantity', 0.0), "notional": round(pending.get('quantity', 0.0) * pending.get('entry_price', 0.0), 4), "leverage": pending.get('leverage', 0.0), - "bucket_id": int(self._bucket_assignments.get(pending.get('asset', ''), -1)), + "bucket_id": int(getattr(self, "_bucket_assignments", {}).get(pending.get('asset', ''), -1)), "entry_bar": 0, "status": "CLOSED", "exit_reason": str(x.get('reason', 'UNKNOWN')), @@ -878,11 +1429,32 @@ class DolphinLiveTrader: """On startup: check CH for an OPEN position and restore engine state.""" try: import urllib.request, base64 as _b64 - sql = ("SELECT trade_id, asset, direction, entry_price, quantity, " - "notional, leverage, bucket_id, bars_held " - "FROM dolphin.position_state FINAL " - "WHERE status = 'OPEN' " - "ORDER BY ts DESC LIMIT 1 FORMAT TabSeparated") + # IMPORTANT: + # Never filter status='OPEN' first, otherwise stale historical OPEN rows + # can be resurrected forever even after a newer CLOSED row exists. + # Resolve latest row per trade_id first, then keep only currently-OPEN. + sql = ( + "SELECT trade_id, asset, direction, entry_price, quantity, " + "notional, leverage, bucket_id, bars_held " + "FROM (" + " SELECT " + " trade_id, " + " argMax(asset, ts) AS asset, " + " argMax(direction, ts) AS direction, " + " argMax(entry_price, ts) AS entry_price, " + " argMax(quantity, ts) AS quantity, " + " argMax(notional, ts) AS notional, " + " argMax(leverage, ts) AS leverage, " + " argMax(bucket_id, ts) AS bucket_id, " + " argMax(bars_held, ts) AS bars_held, " + " argMax(status, ts) AS status, " + " argMax(ts, ts) AS last_ts " + " FROM dolphin.position_state " + " GROUP BY trade_id" + ") " + "WHERE status = 'OPEN' " + "ORDER BY last_ts DESC LIMIT 1 FORMAT TabSeparated" + ) req = urllib.request.Request( "http://localhost:8123/?database=dolphin", data=sql.encode(), @@ -897,6 +1469,7 @@ class DolphinLiveTrader: cols = row.split('\t') if len(cols) < 9: log(f" position_state: unexpected row format: {row}") + self._mark_restore_failure("position_state row malformed") return trade_id = cols[0] @@ -909,8 +1482,67 @@ class DolphinLiveTrader: bucket_id = int(cols[7]) stored_bars = int(cols[8]) + if not trade_id.strip(): + self._mark_restore_failure("position_state row missing trade_id") + return + if not asset.strip(): + self._mark_restore_failure(f"position_state row missing asset for trade {trade_id}") + return + if direction not in (-1, 1): + self._mark_restore_failure(f"position_state row invalid direction for trade {trade_id}: {direction}") + return + if not (math.isfinite(entry_price) and entry_price > 0): + self._mark_restore_failure(f"position_state row invalid entry_price for trade {trade_id}: {entry_price}") + return + if not (math.isfinite(quantity) and quantity > 0): + self._mark_restore_failure(f"position_state row invalid quantity for trade {trade_id}: {quantity}") + return + if not (math.isfinite(notional) and notional > 0): + self._mark_restore_failure(f"position_state row invalid notional for trade {trade_id}: {notional}") + return + if not (math.isfinite(leverage) and leverage > 0): + self._mark_restore_failure(f"position_state row invalid leverage for trade {trade_id}: {leverage}") + return + if stored_bars < 0: + self._mark_restore_failure(f"position_state row invalid bars_held for trade {trade_id}: {stored_bars}") + return + derived_notional = quantity * entry_price + if math.isfinite(derived_notional) and derived_notional > 0: + if abs(notional - derived_notional) > max(1.0, abs(derived_notional) * 0.01): + log( + " position_state notional mismatch: " + f"stored={notional:.6f} derived={derived_notional:.6f} trade={trade_id} " + "— using derived value" + ) + notional = derived_notional + # Estimate entry_bar so MAX_HOLD countdown continues from where it left off restored_entry_bar = max(0, self.bar_idx - stored_bars) + chain_recon = self._load_chain_ledger_state(trade_id) + chain_meta = {} + if chain_recon: + chain_meta.update(chain_recon) + nested_chain = chain_recon.get("chain") + if isinstance(nested_chain, dict): + chain_meta.update(nested_chain) + chain_seed_pending = { + "asset": asset, + "side": 'SHORT' if direction == -1 else 'LONG', + "entry_price": entry_price, + "quantity": quantity, + "notional": notional, + "notional_entry": notional, + "leverage": leverage, + "entry_bar": int(chain_meta.get("entry_bar", restored_entry_bar) if chain_recon else restored_entry_bar), + "entry_ts": int(chain_meta.get("entry_ts", 0) or 0) if chain_recon else 0, + "retraction_legs": int(chain_meta.get("retraction_legs", chain_meta.get("chain_seq", 0)) or 0) if chain_recon else 0, + "realized_pnl_legs_total": float(chain_meta.get("realized_pnl_legs_total", 0.0) or 0.0) if chain_recon else 0.0, + } + try: + chain_state = self._chain_state_from_reconstruction(trade_id, chain_seed_pending, chain_recon) + except Exception as chain_err: + self._mark_restore_failure(str(chain_err)) + return pos = NDPosition( trade_id = trade_id, @@ -938,17 +1570,30 @@ class DolphinLiveTrader: # Rebuild _pending_entries so the exit CH write fires correctly side = 'SHORT' if direction == -1 else 'LONG' self._pending_entries[trade_id] = { + 'trade_id': trade_id, 'asset': asset, 'side': side, 'entry_price': entry_price, 'quantity': quantity, + 'notional': float(quantity * entry_price), + 'notional_entry': float(quantity * entry_price), 'leverage': leverage, 'vel_div_entry': 0.0, 'boost_at_entry': 1.0, 'beta_at_entry': 1.0, 'posture': 'RESTORED', - 'entry_ts': _ch_ts_us(), + 'entry_ts': int(chain_meta.get("entry_ts", _ch_ts_us()) or _ch_ts_us()) if chain_recon else _ch_ts_us(), 'entry_date': (self.current_day or ''), + 'retraction_legs': int(chain_state.get("chain_seq", 0) or 0), + 'realized_pnl_legs_total': float(chain_state.get("realized_pnl_legs_total", 0.0) or 0.0), + 'chain_root_trade_id': chain_state.get("chain_root_trade_id", trade_id), + 'chain_head_leg_id': chain_state.get("chain_head_leg_id", f"{trade_id}:open"), + 'chain_prev_leg_id': chain_state.get("chain_prev_leg_id", ""), + 'chain_seq': int(chain_state.get("chain_seq", 0) or 0), + 'chain_token': chain_state.get("chain_token", ""), + 'chain_mode': chain_state.get("chain_mode", "LIVE"), + 'chain_version': int(chain_state.get("chain_version", 1) or 1), + 'chain_kind': chain_state.get("chain_kind", "ROOT"), } if self._v7_exit_engine is not None: try: @@ -966,6 +1611,7 @@ class DolphinLiveTrader: except Exception as e: log(f" position_state restore error: {e}") + self._mark_restore_failure(f"position_state restore error: {e}") def _hibernate_protect_position(self): """Arm per-bucket TP+SL instead of immediate HIBERNATE_HALT. @@ -979,7 +1625,7 @@ class DolphinLiveTrader: pos = self.eng.position if pos is None: return - bucket = self._bucket_assignments.get(pos.asset, 'default') + bucket = getattr(self, "_bucket_assignments", {}).get(pos.asset, 'default') sl_pct = _BUCKET_SL_PCT.get(bucket, _BUCKET_SL_PCT['default']) tp_pct = self.eng.exit_manager.fixed_tp_pct @@ -1008,6 +1654,13 @@ class DolphinLiveTrader: self.pnl_map = self.hz_client.get_map("DOLPHIN_PNL_BLUE") self.state_map = self.hz_client.get_map("DOLPHIN_STATE_BLUE") self.heartbeat_map = self.hz_client.get_map("DOLPHIN_HEARTBEAT") + self.control_map = self.hz_client.get_map("DOLPHIN_CONTROL_PLANE") + if self._advanced_sl is not None: + try: + self._advanced_sl.bind_hz(features_map=self.features_map, state_map=self.state_map) + self._advanced_sl.publish_control_plane() + except Exception: + pass # Immediate heartbeat — prevents Cat1=0 during startup gap try: self.heartbeat_map.blocking().put('nautilus_flow_heartbeat', json.dumps({ @@ -1019,6 +1672,23 @@ class DolphinLiveTrader: except Exception: pass log(" Hz connected") + + def _heartbeat_loop(self): + """Out-of-band heartbeat writer (independent of scan loop).""" + while not self._heartbeat_stop.is_set(): + try: + if self.heartbeat_map is not None: + hb = json.dumps({ + 'ts': time.time(), + 'iso': datetime.now(timezone.utc).isoformat(), + 'run_date': self.current_day, + 'phase': 'trading', + 'flow': 'nautilus_event_trader', + }) + self.heartbeat_map.blocking().put('nautilus_flow_heartbeat', hb) + except Exception as e: + log(f" Heartbeat loop put failed: {e}") + self._heartbeat_stop.wait(10.0) def _read_posture(self): now = time.time() @@ -1075,6 +1745,327 @@ class DolphinLiveTrader: f"begin_day({today}) called with posture={posture} " f"direction={_direction_label(self.trade_direction)}" ) + + def _mark_retract_command_seen(self, command_id: str) -> None: + if not command_id or command_id in self._processed_retract_set: + return + self._processed_retract_commands.append(command_id) + self._processed_retract_set.add(command_id) + + def _build_retract_exit(self, *, trade_id: str, reason: str, bars_held: int, pnl_pct: float, net_pnl: float) -> dict: + return { + "trade_id": trade_id, + "reason": reason, + "bars_held": int(max(0, bars_held)), + "pnl_pct": float(pnl_pct), + "net_pnl": float(net_pnl), + } + + def _chain_state_for_pending( + self, + trade_id: str, + pending: dict, + *, + chain_mode: str = "LIVE", + chain_head_leg_id: str | None = None, + chain_prev_leg_id: str | None = None, + chain_seq: int | None = None, + ) -> dict: + """Return the canonical linked-list state for the current open trade head.""" + seq = int(chain_seq if chain_seq is not None else pending.get("retraction_legs", 0) or 0) + quantity = float(pending.get("quantity", 0.0) or 0.0) + entry_price = float(pending.get("entry_price", 0.0) or 0.0) + notional = float(pending.get("notional", pending.get("notional_entry", 0.0)) or 0.0) + entry_bar = int(pending.get("entry_bar", 0) or 0) + entry_ts = int(pending.get("entry_ts", 0) or 0) + realized = float(pending.get("realized_pnl_legs_total", 0.0) or 0.0) + return _build_chain_state( + trade_id=str(trade_id or ""), + asset=str(pending.get("asset", "") or ""), + side=str(pending.get("side", "") or "SHORT"), + entry_price=entry_price, + quantity=quantity, + notional=notional, + entry_bar=entry_bar, + entry_ts=entry_ts, + retraction_legs=seq, + realized_pnl_legs_total=realized, + chain_root_trade_id=str(pending.get("chain_root_trade_id", trade_id) or trade_id), + chain_head_leg_id=chain_head_leg_id or pending.get("chain_head_leg_id"), + chain_prev_leg_id=chain_prev_leg_id if chain_prev_leg_id is not None else str(pending.get("chain_prev_leg_id", "") or ""), + chain_mode=chain_mode, + ) + + def _load_chain_ledger_state(self, trade_id: str) -> dict | None: + """Load the latest reconstruction payload for a trade, if ClickHouse is reachable.""" + try: + import base64 as _b64 + escaped_tid = str(trade_id or "").replace("'", "''") + sql = ( + "SELECT event_type, event_id, payload_json " + "FROM dolphin.trade_reconstruction " + f"WHERE trade_id = '{escaped_tid}' " + "ORDER BY ts DESC LIMIT 1 FORMAT JSONEachRow" + ) + req = urllib.request.Request( + "http://localhost:8123/?database=dolphin", + data=sql.encode(), + headers={"Authorization": "Basic " + + _b64.b64encode(b"dolphin:dolphin_ch_2026").decode()}, + ) + with urllib.request.urlopen(req, timeout=5) as r: + raw = r.read().decode().strip() + if not raw: + return None + row = json.loads(raw.splitlines()[0]) + payload = json.loads(row.get("payload_json", "{}") or "{}") + payload["event_type"] = row.get("event_type", "") + payload["event_id"] = row.get("event_id", "") + return payload + except Exception: + return None + + def _chain_state_from_reconstruction(self, trade_id: str, pending: dict, recon: dict | None) -> dict: + """Merge reconstruction payload chain hints with the current live state.""" + chain_data = {} + seq = 0 + prev_leg_id = "" + head_leg_id = f"{trade_id}:open" + chain_mode = "LEGACY" + if recon: + chain_data.update(recon) + nested = recon.get("chain") + if isinstance(nested, dict): + chain_data.update(nested) + seq = int(chain_data.get("chain_seq", chain_data.get("retraction_legs", 0)) or 0) + prev_leg_id = str(chain_data.get("chain_prev_leg_id", "") or "") + head_leg_id = str(chain_data.get("chain_head_leg_id", "") or head_leg_id) + chain_mode = str(chain_data.get("chain_mode", "LIVE") or "LIVE") + if "chain_token" not in chain_data: + chain_mode = "LEGACY_REBUILT" + chain = self._chain_state_for_pending( + trade_id, + pending, + chain_mode=chain_mode, + chain_head_leg_id=head_leg_id, + chain_prev_leg_id=prev_leg_id, + chain_seq=seq, + ) + if chain_data.get("chain_token"): + expected = str(chain_data.get("chain_token", "") or "") + if expected != chain.get("chain_token"): + raise ValueError( + f"chain token mismatch for trade {trade_id}: " + f"stored={expected[:12]} derived={chain.get('chain_token','')[:12]}" + ) + return chain + + def _apply_internal_retract(self, cmd: dict, prices_dict: dict) -> tuple[dict | None, str]: + """Apply partial retraction on in-memory BLUE position; returns (forced_exit, status).""" + with self.eng_lock: + pos = getattr(self.eng, "position", None) + if pos is None: + return None, "NO_POSITION" + tid = str(getattr(pos, "trade_id", "") or "") + if not tid: + return None, "NO_TRADE_ID" + req_tid = str(cmd.get("trade_id", "") or "").strip() + if req_tid and req_tid != tid: + return None, f"TRADE_MISMATCH open={tid} cmd={req_tid}" + pending = self._pending_entries.get(tid) or {} + side = str(pending.get("side", "SHORT") or "SHORT").upper() + entry_price = float(pending.get("entry_price", getattr(pos, "entry_price", 0.0)) or 0.0) + if entry_price <= 0: + return None, "BAD_ENTRY_PRICE" + open_notional = float(getattr(pos, "notional", 0.0) or 0.0) + if open_notional <= 0: + return None, "ZERO_NOTIONAL" + frac = float(cmd.get("fraction", 0.0) or 0.0) + if not (0.0 < frac <= 1.0): + return None, "BAD_FRACTION" + expected_chain = self._chain_state_for_pending(tid, pending) + cmd_chain_token = str(cmd.get("chain_token", "") or "").strip() + cmd_chain_head = str(cmd.get("chain_head_leg_id", "") or "").strip() + cmd_chain_root = str(cmd.get("chain_root_trade_id", "") or "").strip() + cmd_chain_seq = int(cmd.get("chain_seq", expected_chain["chain_seq"]) or expected_chain["chain_seq"]) + if not cmd_chain_token or not cmd_chain_head or not cmd_chain_root: + return None, "NO_CHAIN_LINK" + if cmd_chain_root != expected_chain["chain_root_trade_id"]: + return None, f"CHAIN_ROOT_MISMATCH expected={expected_chain['chain_root_trade_id']} cmd={cmd_chain_root}" + if cmd_chain_head != expected_chain["chain_head_leg_id"] or cmd_chain_token != expected_chain["chain_token"]: + return None, ( + f"CHAIN_MISMATCH head={expected_chain['chain_head_leg_id']} " + f"seq={expected_chain['chain_seq']} token={expected_chain['chain_token'][:12]}" + ) + if cmd_chain_seq != expected_chain["chain_seq"]: + return None, ( + f"CHAIN_SEQ_MISMATCH expected={expected_chain['chain_seq']} cmd={cmd_chain_seq}" + ) + reduce_notional = min(open_notional, open_notional * frac) + if reduce_notional <= 0.0: + return None, "ZERO_REDUCE_NOTIONAL" + current_price = float(prices_dict.get(pos.asset, getattr(pos, "current_price", entry_price)) or entry_price) + if current_price <= 0: + current_price = entry_price + direction = -1.0 if side == "SHORT" else 1.0 + pnl_pct_now = direction * ((current_price - entry_price) / entry_price) + net_pnl_leg = pnl_pct_now * reduce_notional + bars_held = max(0, int(self.bar_idx - int(pending.get("entry_bar", max(0, self.bar_idx - 1)) or max(0, self.bar_idx - 1)))) + self.eng.capital = float(getattr(self.eng, "capital", 0.0) or 0.0) + net_pnl_leg + remaining_notional = max(0.0, open_notional - reduce_notional) + pos.notional = remaining_notional + pos.current_price = current_price + pos.pnl_pct = pnl_pct_now + pending.setdefault("notional_entry", float(pending.get("notional", open_notional) or open_notional)) + pending["notional"] = remaining_notional + pending["quantity"] = round((remaining_notional / entry_price), 6) if entry_price > 0 else 0.0 + pending["retraction_legs"] = int(pending.get("retraction_legs", 0) or 0) + 1 + pending["realized_pnl_legs_total"] = float(pending.get("realized_pnl_legs_total", 0.0) or 0.0) + net_pnl_leg + leg_seq = int(pending["retraction_legs"]) + leg_id = f"{tid}:x{leg_seq:03d}" + chain_state = self._chain_state_for_pending( + tid, + { + **pending, + "chain_root_trade_id": expected_chain["chain_root_trade_id"], + "chain_prev_leg_id": expected_chain["chain_head_leg_id"], + "chain_head_leg_id": leg_id, + "chain_mode": "LIVE", + }, + chain_mode="LIVE", + chain_head_leg_id=leg_id, + chain_prev_leg_id=expected_chain["chain_head_leg_id"], + chain_seq=leg_seq, + ) + self._pending_entries[tid] = pending + pending.update(chain_state) + current_bars_held = bars_held + entry_bar = int(pending.get("entry_bar", max(0, self.bar_idx - current_bars_held)) or max(0, self.bar_idx - current_bars_held)) + ch_put("position_state", { + "ts": _ch_ts_us(), + "trade_id": tid, + "asset": str(getattr(pos, "asset", pending.get("asset", ""))), + "direction": -1 if side == "SHORT" else 1, + "entry_price": entry_price, + "quantity": pending["quantity"], + "notional": round(remaining_notional, 4), + "leverage": pending.get("leverage", getattr(pos, "leverage", 0.0)), + "bucket_id": int(getattr(self, "_bucket_assignments", {}).get(pending.get("asset", ""), -1)), + "entry_bar": entry_bar, + "status": "OPEN", + "exit_reason": "", + "pnl": float(pending.get("realized_pnl_legs_total", 0.0) or 0.0), + "bars_held": current_bars_held, + }) + ch_put("trade_exit_legs", { + "ts": _ch_ts_us(), + "date": str(pending.get("entry_date", self.current_day or "")), + "strategy": "blue", + "trade_id": tid, + "chain_root_trade_id": str(chain_state.get("chain_root_trade_id", tid) or tid), + "chain_head_leg_id": str(chain_state.get("chain_head_leg_id", leg_id) or leg_id), + "chain_prev_leg_id": str(chain_state.get("chain_prev_leg_id", "") or ""), + "chain_seq": int(chain_state.get("chain_seq", leg_seq) or leg_seq), + "chain_token": str(chain_state.get("chain_token", "") or ""), + "chain_mode": str(chain_state.get("chain_mode", "LIVE") or "LIVE"), + "exit_leg_id": leg_id, + "exit_seq": leg_seq, + "command_id": str(cmd.get("command_id", "")), + "source": str(cmd.get("source", "internal")), + "reason": str(cmd.get("reason", "RETRACT")), + "asset": str(getattr(pos, "asset", pending.get("asset", ""))), + "side": side, + "entry_price": entry_price, + "exit_price": current_price, + "fraction": frac, + "exit_notional": reduce_notional, + "remaining_notional": remaining_notional, + "remaining_qty": (remaining_notional / entry_price) if entry_price > 0 else 0.0, + "pnl_pct_leg": pnl_pct_now, + "pnl_leg": net_pnl_leg, + "pnl_realized_total": float(pending.get("realized_pnl_legs_total", 0.0) or 0.0), + "bars_held": bars_held, + }) + ch_put("trade_reconstruction", { + "ts": _ch_ts_us(), + "trade_id": tid, + "event_type": "PARTIAL_EXIT", + "event_id": leg_id, + "payload_json": json.dumps({ + "command": cmd, + "entry_price": entry_price, + "exit_price": current_price, + "exit_notional": reduce_notional, + "remaining_notional": remaining_notional, + "pnl_pct_leg": pnl_pct_now, + "pnl_leg": net_pnl_leg, + "pnl_realized_total": float(pending.get("realized_pnl_legs_total", 0.0) or 0.0), + "bar_idx": int(self.bar_idx), + "chain": chain_state, + }), + }) + if remaining_notional <= 1e-9: + self.eng.position = None + try: + self.eng.exit_manager._positions.pop(tid, None) + except Exception: + pass + total_realized = float(pending.get("realized_pnl_legs_total", 0.0) or 0.0) + denom = max(float(pending.get("notional_entry", open_notional) or open_notional), 1e-12) + forced = self._build_retract_exit( + trade_id=tid, + reason=str(cmd.get("reason", "RETRACT_FULL")), + bars_held=bars_held, + pnl_pct=total_realized / denom, + net_pnl=total_realized, + ) + return forced, "FULL_CLOSE" + return None, "PARTIAL_OK" + + def _process_runtime_commands(self, prices_dict: dict) -> dict | None: + """Drain BLUE runtime commands from control plane and apply retractions.""" + if self.control_map is None: + return None + key = "blue_runtime_commands" + try: + raw = self.control_map.blocking().get(key) + if not raw: + return None + queue = json.loads(raw) if isinstance(raw, str) else list(raw) + if not isinstance(queue, list) or not queue: + return None + self.control_map.blocking().put(key, json.dumps([])) + except Exception as e: + log(f"RUNTIME_CMD read failed: {e}") + return None + forced_exit = None + for cmd in queue: + if not isinstance(cmd, dict): + continue + cid = str(cmd.get("command_id", "") or "") + if cid and cid in self._processed_retract_set: + ch_put("hotkey_audit", { + "ts": int(time.time() * 1000), + "hotkey": "RETRACT_REPLAY", + "request_json": json.dumps(cmd, default=str), + "result": "IDEMPOTENT_REPLAY", + "effect_json": json.dumps({}, default=str), + }) + continue + if str(cmd.get("action", "") or "").upper() != "RETRACT": + continue + fx, status = self._apply_internal_retract(cmd, prices_dict) + self._mark_retract_command_seen(cid) + ch_put("hotkey_audit", { + "ts": int(time.time() * 1000), + "hotkey": "RETRACT", + "request_json": json.dumps(cmd, default=str), + "result": status, + "effect_json": json.dumps({"forced_exit": bool(fx)}, default=str), + }) + if fx is not None: + forced_exit = fx + return forced_exit def _compute_vol_ok(self, scan): assets = scan.get('assets', []) @@ -1091,7 +2082,7 @@ class DolphinLiveTrader: import numpy as np arr = np.array(self.btc_prices) dvol = float(np.std(np.diff(arr) / arr[:-1])) - return dvol > VOL_P60_THRESHOLD + return dvol > float(self.vol_p60_threshold) @staticmethod def _normalize_ng7(scan: dict) -> dict: @@ -1140,14 +2131,14 @@ class DolphinLiveTrader: def on_scan(self, event): """Reactor-thread entry point — dispatches immediately to worker thread.""" - if not event.value: + if self._restore_failed or not event.value: return listener_time = time.time() self._scan_executor.submit(self._process_scan, event, listener_time) def _process_scan(self, event, listener_time): try: - if not event.value: + if self._restore_failed or not event.value: return scan = json.loads(event.value) if isinstance(event.value, str) else event.value @@ -1208,6 +2199,16 @@ class DolphinLiveTrader: if posture_now != prev_posture: if posture_now in ('TURTLE', 'HIBERNATE'): self.eng.regime_dd_halt = True # always block new entries + if posture_now == 'HIBERNATE' and self.eng.position is not None: + open_tid = str(getattr(self.eng.position, "trade_id", "") or "") + if not open_tid: + self._mark_restore_failure("HIBERNATE posture with open position missing trade_id") + return + if open_tid not in self._pending_entries: + self._mark_restore_failure( + f"HIBERNATE posture with open position missing pending entry: {open_tid}" + ) + return if (posture_now == 'HIBERNATE' and self.eng.position is not None and not self._hibernate_protect_active): @@ -1229,8 +2230,10 @@ class DolphinLiveTrader: # EsoF value gate — exposure only, no alpha or selection changes. self._sync_esof_size_gate() + self._sync_tp_threshold() self._sync_sc_threshold_advisor(scan_number=scan_number, vel_div=vel_div) self._sync_sc_gauge_advisor(scan_number=scan_number, vel_div=vel_div) + self._apply_runtime_direction() if self._market_state_runtime is not None: try: self._market_state_runtime.update_scan_state( @@ -1249,6 +2252,9 @@ class DolphinLiveTrader: except Exception as e: log(f" MarketStateRuntime scan update failed: {e}") + if self.eng.position is not None and prices_dict: + prices_dict = self._inject_obf_midprice(prices_dict) + step_start = time.time() with self.eng_lock: result = self.eng.step_bar( @@ -1277,13 +2283,26 @@ class DolphinLiveTrader: e = result['entry'] log(f"ENTRY: {e} [{ALGO_VERSION}]") # Cache entry fields for CH trade_events on exit - tid = e.get('trade_id') + tid = self._resolve_trade_id(e.get('trade_id'), create_if_missing=True) + e['trade_id'] = tid if tid: + efsm_decision = None + overlay_flip = False + if self._efsm is not None and int(e.get('direction', -1)) == 1 and int(self.trade_direction) == -1: + efsm_decision = self._efsm.tag_next_entry( + asset=str(e.get('asset', '') or ''), + entry_ts=datetime.now(timezone.utc), + metadata={"trade_id": tid}, + ) + overlay_flip = bool(efsm_decision and efsm_decision.action == "TAG" and efsm_decision.side == "LONG") self._pending_entries[tid] = { + 'trade_id': tid, 'asset': e.get('asset', ''), 'side': 'SHORT' if e.get('direction', -1) == -1 else 'LONG', 'entry_price': float(e.get('entry_price', 0) or 0), 'quantity': round(float(e.get('notional', 0) or 0) / float(e.get('entry_price', 1) or 1), 6), + 'notional': float(e.get('notional', 0) or 0), + 'notional_entry': float(e.get('notional', 0) or 0), 'leverage': float(e.get('leverage', 0) or 0), 'vel_div_entry': float(e.get('vel_div', 0) or 0), 'boost_at_entry': float(getattr(getattr(self, 'eng', None), 'acb_boost', 1.0) or 1.0), @@ -1292,9 +2311,35 @@ class DolphinLiveTrader: 'entry_ts': _ch_ts_us(), 'entry_date': (self.current_day or ''), 'entry_bar': self.bar_idx, + 'overlay_flip': overlay_flip, + 'overlay_reason': getattr(efsm_decision, "reason", "") if efsm_decision else "", + 'overlay_slot': int(getattr(efsm_decision, "consumed_slot", 0) or 0) if efsm_decision else 0, + 'retraction_legs': 0, + 'realized_pnl_legs_total': 0.0, } + self._pending_entries[tid].update(self._chain_state_for_pending( + tid, + self._pending_entries[tid], + chain_mode="LIVE", + chain_head_leg_id=f"{tid}:open", + chain_prev_leg_id="", + chain_seq=0, + )) + if overlay_flip: + log( + f"EFSM TAG: trade_id={tid} asset={e.get('asset','')} " + f"slot={self._pending_entries[tid]['overlay_slot']} " + f"reason={self._pending_entries[tid]['overlay_reason']}" + ) # Persist position to CH so restarts can recover it self._ps_write_open(tid, self._pending_entries[tid]) + ch_put("trade_reconstruction", { + "ts": _ch_ts_us(), + "trade_id": tid, + "event_type": "OPEN", + "event_id": f"{tid}:open", + "payload_json": json.dumps(self._pending_entries[tid], default=str), + }) self._announce_position_event( kind="trade_entry", severity="info", @@ -1476,6 +2521,34 @@ class DolphinLiveTrader: v750_vel=v750_vel, bar_idx=max(0, self.bar_idx - 1), ) + v7_action = str(v7dec.get("action", "") if isinstance(v7dec, dict) else getattr(v7dec, "action", "")).upper() + if v7_action == "RETRACT": + try: + cmd = { + "command_id": f"v7-retract-{uuid.uuid4().hex[:16]}", + "trade_id": tid_v7, + "action": "RETRACT", + "fraction": 0.50, + "reason": "V7_RETRACT", + "source": "v7", + "ts": float(time.time()), + "asset": pos.asset, + "chain_root_trade_id": str(pending_v7.get("chain_root_trade_id", tid_v7) or tid_v7), + "chain_head_leg_id": str(pending_v7.get("chain_head_leg_id", f"{tid_v7}:open") or f"{tid_v7}:open"), + "chain_prev_leg_id": str(pending_v7.get("chain_prev_leg_id", "") or ""), + "chain_seq": int(pending_v7.get("chain_seq", pending_v7.get("retraction_legs", 0)) or 0), + "chain_token": str(pending_v7.get("chain_token", "") or ""), + } + raw_q = self.control_map.blocking().get("blue_runtime_commands") if self.control_map else None + q = json.loads(raw_q) if isinstance(raw_q, str) and raw_q else [] + if not isinstance(q, list): + q = [] + q.append(cmd) + q = q[-200:] + if self.control_map is not None: + self.control_map.blocking().put("blue_runtime_commands", json.dumps(q)) + except Exception as e: + log(f" V7 retract enqueue failed for {tid_v7}: {e}") if self._bounce_advisor is not None: try: entry_ts_val = float(pending_v7.get('entry_ts', 0) or 0) @@ -1505,6 +2578,10 @@ class DolphinLiveTrader: except Exception as e: log(f" V7 live evaluate failed for {tid_v7}: {e}") + _forced_exit = self._process_runtime_commands(prices_dict) + if _forced_exit is not None and not result.get('exit'): + result['exit'] = _forced_exit + if result.get('exit'): x = result['exit'] tid = x.get('trade_id') @@ -1522,8 +2599,17 @@ class DolphinLiveTrader: log(f"HIBERNATE_PROTECT: closed via {x['reason']} — posture finalized HIBERNATE") else: log(f"HIBERNATE_PROTECT: closed via {x['reason']} — posture recovered to {_cur_posture}") + x['reason'] = _normalize_v7_exit_reason(x.get('reason', '')) log(f"EXIT: {x} [{ALGO_VERSION}]") - tid = x.get('trade_id') + _exit_reason_raw = str(x.get('reason', '')) + if _exit_reason_raw in ('FIXED_TP', 'HIBERNATE_TP'): + _tp_used = self.eng.exit_manager.fixed_tp_pct + _pos = self.eng.position + _bars = int(x.get('bars_held', 0) or 0) + log(f" TP_EXIT: tp_pct={_tp_used*100:.2f}% bars_held={_bars} " + f"pnl_pct={float(x.get('pnl_pct',0) or 0):+.4f}") + tid = self._resolve_trade_id(x.get('trade_id'), create_if_missing=True) + x['trade_id'] = tid pending = self._pending_entries.pop(tid, {}) if tid else {} if tid: self._v7_contexts.pop(tid, None) @@ -1582,10 +2668,28 @@ class DolphinLiveTrader: ) except Exception as e: log(f" MarketStateRuntime outcome update failed for {tid}: {e}") + if self._efsm is not None: + try: + _efsm_out = self._efsm.observe_closed_trade( + trade_id=str(tid or ""), + asset=str(pending.get("asset", "") or ""), + side=str(pending.get("side", "SHORT") or "SHORT"), + pnl=float(x.get("net_pnl", 0) or 0), + pnl_pct=float(x.get("pnl_pct", 0) or 0), + leverage=float(pending.get("leverage", 0) or 0), + closed_ts=datetime.now(timezone.utc), + was_overlay_flip=bool(pending.get("overlay_flip", False)), + metadata={"exit_reason": str(x.get("reason", "UNKNOWN"))}, + ) + if _efsm_out.action in {"ARMED", "TAG", "RESET"}: + log(f"EFSM { _efsm_out.action }: { _efsm_out.to_dict() }") + except Exception as e: + log(f" EFSM observe_closed_trade failed for {tid}: {e}") ch_put("trade_events", { "ts": _ch_ts_us(), "date": pending['entry_date'], "strategy": "blue", + "trade_id": tid, "asset": pending['asset'], "side": pending['side'], "entry_price": pending['entry_price'], @@ -1601,6 +2705,29 @@ class DolphinLiveTrader: "leverage": pending['leverage'], "bars_held": int(x.get('bars_held', 0) or 0), "regime_signal": 0, + "tp_threshold": float(self.eng.exit_manager.fixed_tp_pct), + }) + ch_put("trade_reconstruction", { + "ts": _ch_ts_us(), + "trade_id": str(tid or ""), + "event_type": "CLOSE", + "event_id": f"{tid}:close", + "payload_json": json.dumps({ + "exit": x, + "pending": pending, + "exit_price": exit_price, + "retraction_legs": int(pending.get("retraction_legs", 0) or 0), + "retraction_realized_total": float(pending.get("realized_pnl_legs_total", 0.0) or 0.0), + "chain": { + "trade_id": tid, + "chain_root_trade_id": pending.get("chain_root_trade_id", tid), + "chain_head_leg_id": pending.get("chain_head_leg_id", f"{tid}:open"), + "chain_prev_leg_id": pending.get("chain_prev_leg_id", ""), + "chain_seq": int(pending.get("retraction_legs", 0) or 0), + "chain_token": pending.get("chain_token", ""), + "chain_mode": pending.get("chain_mode", "LIVE"), + }, + }, default=str), }) # Mark position closed in CH (supersedes OPEN row via ReplacingMergeTree) self._ps_write_closed(tid, pending, x) @@ -1625,6 +2752,9 @@ class DolphinLiveTrader: "exit_reason": str(x.get("reason", "UNKNOWN")), "bars_held": int(x.get("bars_held", 0) or 0), "posture": pending.get("posture", ""), + "overlay_flip": bool(pending.get("overlay_flip", False)), + "overlay_reason": str(pending.get("overlay_reason", "")), + "overlay_slot": int(pending.get("overlay_slot", 0) or 0), }, ) # Shadow AE: record outcome for online update @@ -1654,6 +2784,7 @@ class DolphinLiveTrader: _entry_px = float(_p.get('entry_price', 0) or 0) _bars_held = max(0, int(_bar - int(_p.get('entry_bar', _bar)))) _shadow_pnl_pct = ((_entry_px - _cur) / _entry_px) if _entry_px > 0 else 0.0 + _recent_prices = self._bounce_price_path(_p['asset']) _shadow = _ae_ref.evaluate( trade_id=_tid, asset=_p['asset'], @@ -1664,6 +2795,28 @@ class DolphinLiveTrader: vel_div_now=_vel_now, ) _ae_ref.log_shadow(_shadow, pnl_pct=_shadow_pnl_pct) + if self._advanced_sl is not None: + try: + _ms_state = dict(self._market_state_runtime.latest_state) if self._market_state_runtime and getattr(self._market_state_runtime, "latest_state", None) else {} + _ms_bundle = dict(self._market_state_runtime.latest_bundle_dict) if self._market_state_runtime and getattr(self._market_state_runtime, "latest_bundle_dict", None) else {} + _v7 = dict(self._v7_decisions.get(_tid, {}) or {}) + _adv = self._advanced_sl.evaluate( + trade_id=_tid, + asset=_p['asset'], + side=str(_p.get("side", "SHORT") or "SHORT"), + entry_price=_entry_px, + current_price=_cur, + bars_held=_bars_held, + recent_prices=_recent_prices, + ae_shadow=_shadow, + v7_decision=_v7, + market_state=_ms_state, + market_bundle=_ms_bundle, + exf_snapshot=dict(self._last_exf or {}), + ) + self._advanced_sl.log_shadow(_adv, pnl_pct=_shadow_pnl_pct) + except Exception: + pass except Exception: pass threading.Thread(target=_ae_eval, daemon=True).start() @@ -1699,7 +2852,8 @@ class DolphinLiveTrader: ) if subday_exit is not None: log(f"SUBDAY_EXIT: {subday_exit} [{ALGO_VERSION}]") - tid = subday_exit.get('trade_id') + tid = self._resolve_trade_id(subday_exit.get('trade_id'), create_if_missing=True) + subday_exit['trade_id'] = tid pending = {} if tid: pending = self._pending_entries.pop(tid, {}) @@ -1753,10 +2907,28 @@ class DolphinLiveTrader: ) except Exception as e: log(f" MarketStateRuntime outcome update failed for {tid}: {e}") + if self._efsm is not None: + try: + _efsm_sub = self._efsm.observe_closed_trade( + trade_id=str(tid or ""), + asset=str(pending.get("asset", "") or ""), + side=str(pending.get("side", "SHORT") or "SHORT"), + pnl=float(subday_exit.get("net_pnl", 0) or 0), + pnl_pct=float(subday_exit.get("pnl_pct", 0) or 0), + leverage=float(pending.get("leverage", 0) or 0), + closed_ts=datetime.now(timezone.utc), + was_overlay_flip=bool(pending.get("overlay_flip", False)), + metadata={"exit_reason": str(subday_exit.get("reason", "SUBDAY_ACB_NORMALIZATION"))}, + ) + if _efsm_sub.action in {"ARMED", "TAG", "RESET"}: + log(f"EFSM { _efsm_sub.action }: { _efsm_sub.to_dict() }") + except Exception as e: + log(f" EFSM observe_closed_trade failed for {tid}: {e}") ch_put("trade_events", { "ts": _ch_ts_us(), "date": self.current_day or '', "strategy": "blue", + "trade_id": tid, "asset": pending.get('asset', subday_exit.get('asset', '')), "side": pending.get('side', 'SHORT'), "entry_price": pending.get('entry_price', 0), @@ -1783,8 +2955,8 @@ class DolphinLiveTrader: f"pnl_pct={float(subday_exit.get('pnl_pct', 0) or 0):+.3%}" ), metadata={ - "trade_id": tid, - "asset": pending.get("asset", subday_exit.get("asset", "")), + "trade_id": tid, + "asset": pending.get("asset", subday_exit.get("asset", "")), "side": pending.get("side", "SHORT"), "entry_price": pending.get("entry_price", 0), "exit_price": float(subday_exit.get("exit_price", 0) or 0), @@ -1794,6 +2966,9 @@ class DolphinLiveTrader: "exit_reason": str(subday_exit.get("reason", "SUBDAY_ACB_NORMALIZATION")), "bars_held": int(subday_exit.get("bars_held", 0) or 0), "posture": pending.get("posture", ""), + "overlay_flip": bool(pending.get("overlay_flip", False)), + "overlay_reason": str(pending.get("overlay_reason", "")), + "overlay_slot": int(pending.get("overlay_slot", 0) or 0), }, ) now = time.time() @@ -1841,40 +3016,17 @@ class DolphinLiveTrader: log(f" capital disk save failed: {e}") def _restore_capital(self): - """On startup, restore capital from HZ or disk checkpoint.""" - def _try_load(raw, source): - if not raw: - return False - try: - data = json.loads(raw) - saved = float(data.get('capital', 0)) - age_h = (time.time() - data.get('ts', 0)) / 3600 - if saved >= 1.0 and math.isfinite(saved) and age_h < 72: - self.eng.capital = saved - log(f" Capital restored from {source}: ${saved:,.2f} (age {age_h:.1f}h)") - return True - except Exception: - pass - return False + """Restore capital from live HZ state or ledger-backed snapshots. - # Primary: Hazelcast - try: - raw = self.state_map.blocking().get('capital_checkpoint') - if _try_load(raw, 'HZ'): - return - except Exception as e: - log(f" capital HZ restore failed: {e}") - - # Secondary: disk fallback - try: - if CAPITAL_DISK_CHECKPOINT.exists(): - raw = CAPITAL_DISK_CHECKPOINT.read_text() - if _try_load(raw, 'disk'): - return - except Exception as e: - log(f" capital disk restore failed: {e}") - - log(" Capital: no valid checkpoint — starting at initial_capital") + The raw scalar checkpoint is legacy-only and requires the explicit + DOLPHIN_ALLOW_LEGACY_CAPITAL_CHECKPOINT=1 escape hatch. + """ + self._restore_failed = False + self._restore_failure_reason = "" + self._restore_source = "" + if self._restore_capital_from_state(): + return + log(" Capital: no sane state source found — restore halted") def _push_state(self, scan_number, vel_div, vol_ok, posture): try: @@ -1883,13 +3035,22 @@ class DolphinLiveTrader: # Engine uses a single NDPosition object, not a list pos = getattr(self.eng, 'position', None) if pos is not None: + pending = self._pending_entries.get(getattr(pos, "trade_id", ""), {}) open_notional = float(getattr(pos, 'notional', 0) or 0) open_positions_list = [{ + 'trade_id': getattr(pos, 'trade_id', ''), 'asset': pos.asset, 'side': 'SHORT' if pos.direction == -1 else 'LONG', 'entry_price': pos.entry_price, 'quantity': round(open_notional / pos.entry_price, 6) if pos.entry_price else 0, 'notional': open_notional, + 'retraction_legs': int(pending.get('retraction_legs', 0) or 0), + 'realized_pnl_legs_total': float(pending.get('realized_pnl_legs_total', 0.0) or 0.0), + 'chain_root_trade_id': str(pending.get('chain_root_trade_id', getattr(pos, 'trade_id', '')) or getattr(pos, 'trade_id', '')), + 'chain_head_leg_id': str(pending.get('chain_head_leg_id', f"{getattr(pos, 'trade_id', '')}:open") or f"{getattr(pos, 'trade_id', '')}:open"), + 'chain_prev_leg_id': str(pending.get('chain_prev_leg_id', '') or ''), + 'chain_seq': int(pending.get('chain_seq', pending.get('retraction_legs', 0)) or 0), + 'chain_token': str(pending.get('chain_token', '') or ''), 'leverage': float(getattr(pos, 'leverage', 0) or 0), 'unrealized_pnl': round(pos.pnl_pct * open_notional, 2), }] @@ -1904,6 +3065,7 @@ class DolphinLiveTrader: 'algo_version': ALGO_VERSION, 'last_scan_number': scan_number, 'last_vel_div': vel_div, 'vol_ok': vol_ok, 'posture': posture, + 'vol_gate_threshold': float(self.vol_p60_threshold), 'scans_processed': self.scans_processed, 'trades_executed': self.trades_executed, 'bar_idx': self.bar_idx, @@ -1913,10 +3075,15 @@ class DolphinLiveTrader: 'leverage_abs_cap': getattr(self.eng, 'abs_max_leverage', 9.0), 'open_notional': round(open_notional, 2), 'current_leverage': round(cur_leverage, 4), + 'trade_direction_base': int(self.trade_direction), + 'trade_direction_runtime': int(self._runtime_direction), + 'efsm': self._efsm.snapshot() if self._efsm is not None else None, + 'advanced_sl': self._advanced_sl.snapshot_dict() if self._advanced_sl is not None else None, } future = self.state_map.put('engine_snapshot', json.dumps(snapshot)) future.add_done_callback(lambda f: None) - # Heartbeat — MHS checks age < 30s; we run every scan (~11s) + # Heartbeat — MHS checks age < 30s; force blocking put to avoid + # silent async drop/stall under client backpressure. if self.heartbeat_map is not None: hb = json.dumps({ 'ts': time.time(), @@ -1925,7 +3092,10 @@ class DolphinLiveTrader: 'phase': 'trading', 'flow': 'nautilus_event_trader', }) - self.heartbeat_map.put('nautilus_flow_heartbeat', hb) + try: + self.heartbeat_map.blocking().put('nautilus_flow_heartbeat', hb) + except Exception as hb_err: + log(f" Heartbeat put failed: {hb_err}") # Persist capital so next restart resumes from here if capital is not None and math.isfinite(capital) and capital >= 1.0: self._save_capital() @@ -1940,9 +3110,25 @@ class DolphinLiveTrader: self._build_engine() self._connect_hz() + threading.Thread(target=self._heartbeat_loop, daemon=True).start() self._restore_capital() + if self._restore_failed: + log(f"RESTORE HALT: {self._restore_failure_reason}") + self.shutdown() + return self._rollover_day() self._restore_position_state() + if self._restore_failed: + log(f"RESTORE HALT: {self._restore_failure_reason}") + self.shutdown() + return + # Seed the live snapshot immediately so engine_snapshot and + # capital_checkpoint reflect the restored capital before scan traffic. + try: + posture = self._read_posture() + self._push_state(self.bar_idx, 0.0, True, posture) + except Exception as e: + log(f" Startup seed push failed: {e}") def listener(event): self.on_scan(event) diff --git a/prod/tests/test_bingx_nautilus_execution.py b/prod/tests/test_bingx_nautilus_execution.py new file mode 100644 index 0000000..7dd0bc1 --- /dev/null +++ b/prod/tests/test_bingx_nautilus_execution.py @@ -0,0 +1,559 @@ +""" +test_bingx_nautilus_execution.py +================================ +End-to-end tests for the Nautilus execution path: + engine.step_bar() -> _exec_submit_entry() -> cache.instrument() -> order_factory -> submit_order + +Tests cover: + 1. Instrument registration from exec client into Nautilus cache + 2. _exec_submit_entry returns early when instrument missing + 3. _exec_submit_entry succeeds when instrument is in cache + 4. Data-venue fallback when exec-venue instrument not available + 5. Full order payload correctness (tags, side, quantity precision) + 6. Venue symbol mapping uses raw_symbol from cached instrument + 7. _venue_symbol fallback when instrument not in cache + 8. Integration: build_actor_config with split venues +""" + +from __future__ import annotations + +import asyncio +import math +import sys +from decimal import Decimal +from pathlib import Path +from types import SimpleNamespace +from typing import Any +from unittest.mock import MagicMock, patch + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parents[2])) +sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "nautilus_dolphin")) + +from nautilus_trader.model.enums import OrderSide, OrderType +from nautilus_trader.model.identifiers import InstrumentId +from nautilus_trader.model.objects import Currency, Price, Quantity + +from prod.bingx.enums import BINGX_VENUE, BingxEnvironment +from prod.bingx.execution import BingxExecutionClient +from prod.bingx.sandbox_status import build_sandbox_status +from prod.bingx.sandbox_status import write_sandbox_status + + +# -- Helpers ------------------------------------------------------------------- + + +def _make_bingx_instrument(symbol: str = "BTCUSDT"): + return SimpleNamespace( + id=InstrumentId.from_str(f"{symbol}.BINGX"), + instrument_id=InstrumentId.from_str(f"{symbol}.BINGX"), + symbol=SimpleNamespace(value=symbol), + raw_symbol=SimpleNamespace(value=f"{symbol[:-4]}-USDT"), + base_currency=Currency.from_str(symbol[:3]), + quote_currency=Currency.from_str("USDT"), + size_precision=3, + price_precision=2, + maker_fee=Decimal("0.0002"), + taker_fee=Decimal("0.0005"), + ) + + +def _make_binance_instrument(symbol: str = "BTCUSDT"): + return SimpleNamespace( + id=InstrumentId.from_str(f"{symbol}.BINANCE"), + instrument_id=InstrumentId.from_str(f"{symbol}.BINANCE"), + symbol=SimpleNamespace(value=symbol), + raw_symbol=SimpleNamespace(value=symbol), + base_currency=Currency.from_str(symbol[:3]), + quote_currency=Currency.from_str("USDT"), + size_precision=3, + price_precision=2, + maker_fee=Decimal("0.0002"), + taker_fee=Decimal("0.0005"), + ) + + +class FakeCache: + def __init__(self, instruments=None): + self._instruments = dict(instruments or {}) + + def instrument(self, instrument_id): + return self._instruments.get(instrument_id) + + def add_instrument(self, instrument): + self._instruments[instrument.id] = instrument + + def instruments(self): + return list(self._instruments.values()) + + def positions(self, venue=None): + return [] + + def order(self, client_order_id): + return None + + def add_currency(self, currency): + pass + + +class FakeProvider: + def __init__(self, instruments=None): + self._instruments = list(instruments or []) + + def list_all(self): + return self._instruments + + def currencies(self): + return {} + + async def initialize(self): + pass + + +async def _noop(*args, **kwargs): + pass + + +async def _persist_sandbox_status(client, *, environment: str = "VST", notes: dict[str, Any] | None = None): + balance = await client.signed_get("/openApi/swap/v2/user/balance") + positions = await client.signed_get("/openApi/swap/v2/user/positions") + open_orders = await client.signed_get("/openApi/swap/v2/trade/openOrders") + status = build_sandbox_status( + balance_payload=balance, + positions_payload=positions, + open_orders_payload=open_orders, + environment=environment, + notes=notes or {}, + ) + write_sandbox_status(status) + return status + + +def _make_connect_stub(cache, provider): + return SimpleNamespace( + _cache=cache, + _provider=provider, + _config=SimpleNamespace(prefer_websocket=False), + _log=SimpleNamespace(info=lambda *a, **kw: None, warning=lambda *a, **kw: None), + _start_pollers=lambda: None, + _refresh_account_state=_noop, + _restore_journal_snapshot=_noop, + _persist_journal_snapshot=_noop, + _await_account_registered=_noop, + ) + + +def _make_actor_stub(cache, exec_venue="BINGX", data_venue="BINANCE"): + log_messages = [] + + class Log: + def info(self, msg, *a, **kw): + log_messages.append(("info", msg)) + def warning(self, msg, *a, **kw): + log_messages.append(("warning", msg)) + def error(self, msg, *a, **kw): + log_messages.append(("error", msg)) + def debug(self, msg, *a, **kw): + log_messages.append(("debug", msg)) + + return SimpleNamespace( + cache=cache, + log=Log(), + _log_messages=log_messages, + dolphin_config={ + "engine": {"max_account_leverage": 2.0}, + "paper_trade": {"initial_capital": 25000.0}, + }, + engine=SimpleNamespace(capital=100000.0), + _last_portfolio_capital=100000.0, + _exec_venue_name=lambda: exec_venue, + _data_venue_name=lambda: data_venue, + _exec_open_positions={}, + order_factory=SimpleNamespace( + market=lambda **kw: SimpleNamespace( + instrument_id=kw.get("instrument_id"), + order_side=kw.get("order_side"), + quantity=kw.get("quantity"), + tags=kw.get("tags", []), + client_order_id=SimpleNamespace(value="test-coid"), + order_type=OrderType.MARKET, + strategy_id=SimpleNamespace(value="test-strat"), + ) + ), + submit_order=MagicMock(), + clock=SimpleNamespace(timestamp_ns=lambda: 1000), + ) + + +# -- Test: Instrument Registration -------------------------------------------- + + +class TestExecClientRegistersInstrumentsInCache: + def test_instruments_registered_after_connect(self): + inst1 = _make_bingx_instrument("BTCUSDT") + inst2 = _make_bingx_instrument("ETHUSDT") + cache = FakeCache() + provider = FakeProvider([inst1, inst2]) + stub = _make_connect_stub(cache, provider) + + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(BingxExecutionClient._connect(stub)) + finally: + loop.close() + + assert cache.instrument(InstrumentId.from_str("BTCUSDT.BINGX")) is inst1 + assert cache.instrument(InstrumentId.from_str("ETHUSDT.BINGX")) is inst2 + + def test_empty_provider_no_crash(self): + cache = FakeCache() + provider = FakeProvider([]) + stub = _make_connect_stub(cache, provider) + + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(BingxExecutionClient._connect(stub)) + finally: + loop.close() + + assert len(list(cache.instruments())) == 0 + + +# -- Test: _exec_submit_entry instrument lookup ------------------------------ + + +class TestExecSubmitEntryInstrumentLookup: + def test_returns_early_when_no_exec_instrument_no_data_fallback(self): + cache = FakeCache() + stub = _make_actor_stub(cache) + + entry = {"asset": "XLMUSDT", "direction": -1, "notional": 5000.0, "entry_price": 0.10, "trade_id": "t1", "leverage": 1.0} + prices = {"XLMUSDT": 0.10} + + result = DolphinActor._exec_submit_entry(stub, entry, prices) + + assert result is None + assert not stub.submit_order.called + error_msgs = [m for lvl, m in stub._log_messages if lvl == "error"] + assert any("not in cache" in m for m in error_msgs) + + def test_succeeds_with_exec_instrument_in_cache(self): + inst = _make_bingx_instrument("XLMUSDT") + cache = FakeCache({inst.id: inst}) + stub = _make_actor_stub(cache) + + entry = {"asset": "XLMUSDT", "direction": -1, "notional": 5000.0, "entry_price": 0.10, "trade_id": "t1", "leverage": 1.0} + prices = {"XLMUSDT": 0.10} + + DolphinActor._exec_submit_entry(stub, entry, prices) + + assert stub.submit_order.called + order = stub.submit_order.call_args[0][0] + assert str(order.instrument_id) == "XLMUSDT.BINGX" + assert order.order_side == OrderSide.SELL + assert order.tags[0] == "type:entry" + assert "direction:SHORT" in order.tags + + def test_data_venue_fallback_with_warning(self): + binance_inst = _make_binance_instrument("XLMUSDT") + cache = FakeCache({binance_inst.id: binance_inst}) + stub = _make_actor_stub(cache, exec_venue="BINGX", data_venue="BINANCE") + + entry = {"asset": "XLMUSDT", "direction": -1, "notional": 5000.0, "entry_price": 0.10, "trade_id": "t1", "leverage": 1.0} + prices = {"XLMUSDT": 0.10} + + DolphinActor._exec_submit_entry(stub, entry, prices) + + assert stub.submit_order.called + warn_msgs = [m for lvl, m in stub._log_messages if lvl == "warning"] + assert any("borrowing metadata" in m for m in warn_msgs) + + def test_info_log_on_successful_entry(self): + inst = _make_bingx_instrument("XLMUSDT") + cache = FakeCache({inst.id: inst}) + stub = _make_actor_stub(cache) + + entry = {"asset": "XLMUSDT", "direction": -1, "notional": 5000.0, "entry_price": 0.10, "trade_id": "t42", "leverage": 1.5} + prices = {"XLMUSDT": 0.10} + + DolphinActor._exec_submit_entry(stub, entry, prices) + + info_msgs = [m for lvl, m in stub._log_messages if lvl == "info"] + assert any("[EXEC] ENTRY SHORT" in m and "XLMUSDT" in m for m in info_msgs) + + def test_quantity_uses_instrument_size_precision(self): + inst = _make_bingx_instrument("BTCUSDT") + inst.size_precision = 4 + cache = FakeCache({inst.id: inst}) + stub = _make_actor_stub(cache) + + entry = {"asset": "BTCUSDT", "direction": 1, "notional": 50000.0, "entry_price": 100000.0, "trade_id": "t1", "leverage": 1.0} + prices = {"BTCUSDT": 100000.0} + + DolphinActor._exec_submit_entry(stub, entry, prices) + + order = stub.submit_order.call_args[0][0] + assert order.quantity.precision == 4 + + +# -- Test: _venue_symbol mapping --------------------------------------------- + + +class TestVenueSymbolMapping: + def test_uses_raw_symbol_when_instrument_in_cache(self): + inst = _make_bingx_instrument("TRXUSDT") + cache = FakeCache({inst.id: inst}) + stub = SimpleNamespace(_cache=cache) + result = BingxExecutionClient._venue_symbol(stub, InstrumentId.from_str("TRXUSDT.BINGX")) + assert result == "TRX-USDT" + + def test_fallback_converts_usdt_suffix(self): + stub = SimpleNamespace(_cache=FakeCache()) + result = BingxExecutionClient._venue_symbol(stub, InstrumentId.from_str("XLMUSDT.BINGX")) + assert result == "XLM-USDT" + + def test_fallback_passes_through_hyphenated(self): + stub = SimpleNamespace(_cache=FakeCache()) + result = BingxExecutionClient._venue_symbol(stub, InstrumentId.from_str("BTC-USDT.BINGX")) + assert result == "BTC-USDT" + + +# -- Test: _map_submit_order ------------------------------------------------- + + +class TestMapSubmitOrderForMarketOrder: + def test_market_sell_with_tags(self): + inst = _make_bingx_instrument("ETHUSDT") + cache = FakeCache({inst.id: inst}) + + order = SimpleNamespace( + instrument_id=InstrumentId.from_str("ETHUSDT.BINGX"), + side=OrderSide.SELL, + order_type=OrderType.MARKET, + quantity=Quantity.from_str("1.500"), + client_order_id=SimpleNamespace(value="test-cid-001"), + is_post_only=False, + is_reduce_only=False, + has_price=False, + has_trigger_price=False, + price=None, + trigger_price=None, + time_in_force=None, + tags=["type:entry", "direction:SHORT", "cm:2.50", "tid:t99"], + ) + + adapter = SimpleNamespace( + _cache=cache, + _config=SimpleNamespace(use_reduce_only=True, recv_window_ms=5000), + _venue_symbol=lambda iid: BingxExecutionClient._venue_symbol(adapter, iid), + _format_quantity=lambda q: str(q), + _format_price=lambda p: str(p), + _map_order_type=BingxExecutionClient._map_order_type, + _map_time_in_force=BingxExecutionClient._map_time_in_force, + ) + + # Rebind _venue_symbol after adapter exists so self-referencing works + adapter._venue_symbol = lambda iid: BingxExecutionClient._venue_symbol(adapter, iid) + + payload = BingxExecutionClient._map_submit_order(adapter, order) + assert payload["symbol"] == "ETH-USDT" + assert payload["side"] == "SELL" + assert payload["type"] == "MARKET" + assert payload["quantity"] == "1.500" + assert payload["clientOrderId"] == "test-cid-001" + + +# -- Test: Order tag parsing for leverage ------------------------------------ + + +class TestLeverageTagParsing: + def test_extracts_lev_tag(self): + order = SimpleNamespace(tags=["type:entry", "lev:2.50", "cm:2.50", "tid:t1"]) + assert BingxExecutionClient._parse_leverage_from_tags(order) == 2.50 + + def test_extracts_cm_tag(self): + order = SimpleNamespace(tags=["type:entry", "cm:3.00", "tid:t1"]) + assert BingxExecutionClient._parse_leverage_from_tags(order) == 3.00 + + def test_returns_none_no_tags(self): + order = SimpleNamespace(tags=[]) + assert BingxExecutionClient._parse_leverage_from_tags(order) is None + + def test_returns_none_no_leverage_tags(self): + order = SimpleNamespace(tags=["type:entry", "direction:SHORT"]) + assert BingxExecutionClient._parse_leverage_from_tags(order) is None + + +# -- Test: Split venue configuration ----------------------------------------- + + +class TestSplitVenueConfig: + def test_split_venues_preserved(self): + from prod.launch_dolphin_live import build_actor_config + cfg = build_actor_config(data_venue="BINANCE", exec_venue="BINGX") + assert cfg["data_venue"] == "BINANCE" + assert cfg["exec_venue"] == "BINGX" + assert cfg["venue"] == "BINGX" + + +# -- Import DolphinActor ----------------------------------------------------- + + +try: + from nautilus_dolphin.nautilus.dolphin_actor import DolphinActor + HAS_DOLPHIN_ACTOR = True +except ImportError: + HAS_DOLPHIN_ACTOR = False + + +@pytest.mark.skipif(not HAS_DOLPHIN_ACTOR, reason="DolphinActor not importable") +class TestDolphinActorExecSubmitEntry: + def _actor_stub(self, cache): + return _make_actor_stub(cache) + + def test_full_entry_flow_short_order(self): + inst = _make_bingx_instrument("SOLUSDT") + cache = FakeCache({inst.id: inst}) + stub = self._actor_stub(cache) + + entry = { + "asset": "SOLUSDT", "direction": -1, "notional": 3000.0, + "entry_price": 150.0, "trade_id": "trade-sol-001", + "leverage": 2.0, "vel_div": -0.035, + } + prices = {"SOLUSDT": 150.0} + + DolphinActor._exec_submit_entry(stub, entry, prices) + + assert stub.submit_order.called + order = stub.submit_order.call_args[0][0] + assert "direction:SHORT" in order.tags + assert "cm:2.00" in order.tags + assert "lev:2.00" in order.tags + assert "tid:trade-sol-001" in order.tags + + def test_full_entry_flow_long_order(self): + inst = _make_bingx_instrument("ADAUSDT") + cache = FakeCache({inst.id: inst}) + stub = self._actor_stub(cache) + + entry = { + "asset": "ADAUSDT", "direction": 1, "notional": 2000.0, + "entry_price": 0.45, "trade_id": "trade-ada-002", + "leverage": 1.0, "vel_div": -0.025, + } + prices = {"ADAUSDT": 0.45} + + DolphinActor._exec_submit_entry(stub, entry, prices) + + assert stub.submit_order.called + order = stub.submit_order.call_args[0][0] + assert order.order_side == OrderSide.BUY + assert "direction:LONG" in order.tags + + def test_caps_notional_when_near_capacity_limit(self): + inst = _make_bingx_instrument("BTCUSDT") + cache = FakeCache({inst.id: inst}) + stub = self._actor_stub(cache) + stub.engine = SimpleNamespace(capital=10.0) + stub.dolphin_config["engine"]["max_account_leverage"] = 0.01 + + entry = { + "asset": "BTCUSDT", "direction": -1, "notional": 5000.0, + "entry_price": 100000.0, "trade_id": "t1", "leverage": 1.0, + } + prices = {"BTCUSDT": 100000.0} + + DolphinActor._exec_submit_entry(stub, entry, prices) + assert stub.submit_order.called + warn_msgs = [m for lvl, m in stub._log_messages if lvl == "warning"] + assert any("capped by portfolio exposure" in m for m in warn_msgs) + + def test_skips_when_notional_zero(self): + inst = _make_bingx_instrument("BTCUSDT") + cache = FakeCache({inst.id: inst}) + stub = self._actor_stub(cache) + + entry = { + "asset": "BTCUSDT", "direction": -1, "notional": 0.0, + "entry_price": 100000.0, "trade_id": "t1", "leverage": 1.0, + } + prices = {"BTCUSDT": 100000.0} + + result = DolphinActor._exec_submit_entry(stub, entry, prices) + assert result is None + assert not stub.submit_order.called + + +# -- Live integration (requires BingX VST credentials) ----------------------- + + +@pytest.mark.skipif( + not Path("/mnt/dolphinng5_predict/.env").exists(), + reason="No .env file (no BingX credentials)", +) +class TestLiveInstrumentProvider: + def test_loads_instruments_from_vst(self): + import os + from dotenv import load_dotenv + load_dotenv("/mnt/dolphinng5_predict/.env") + + from prod.bingx.config import BingxExecClientConfig + from prod.bingx.http import BingxHttpClient + from prod.bingx.instrument_provider import BingxInstrumentProvider, BingxInstrumentProviderConfig + + async def _run(): + cfg = BingxExecClientConfig( + api_key=os.environ.get("BINGX_API_KEY", ""), + secret_key=os.environ.get("BINGX_SECRET_KEY", ""), + environment=BingxEnvironment.VST, + ) + client = BingxHttpClient(config=cfg) + provider = BingxInstrumentProvider( + client=client, + config=BingxInstrumentProviderConfig(load_all=True), + ) + await provider.initialize() + instruments = provider.list_all() + await _persist_sandbox_status(client, notes={"test": "loads_instruments_from_vst"}) + await client.close() + return instruments + + instruments = asyncio.run(_run()) + assert len(instruments) > 0 + symbols = {i.symbol.value for i in instruments} + assert "BTCUSDT" in symbols + assert "ETHUSDT" in symbols + + def test_trxusdt_instrument_has_correct_precision(self): + import os + from dotenv import load_dotenv + load_dotenv("/mnt/dolphinng5_predict/.env") + + from prod.bingx.config import BingxExecClientConfig + from prod.bingx.http import BingxHttpClient + from prod.bingx.instrument_provider import BingxInstrumentProvider, BingxInstrumentProviderConfig + + async def _run(): + cfg = BingxExecClientConfig( + api_key=os.environ.get("BINGX_API_KEY", ""), + secret_key=os.environ.get("BINGX_SECRET_KEY", ""), + environment=BingxEnvironment.VST, + ) + client = BingxHttpClient(config=cfg) + provider = BingxInstrumentProvider( + client=client, + config=BingxInstrumentProviderConfig(load_all=True), + ) + await provider.initialize() + inst = provider.find(InstrumentId.from_str("TRXUSDT.BINGX")) + await _persist_sandbox_status(client, notes={"test": "trxusdt_instrument_has_correct_precision"}) + await client.close() + return inst + + inst = asyncio.run(_run()) + assert inst is not None + assert inst.size_precision >= 1 + assert inst.price_precision >= 1 + assert inst.raw_symbol.value == "TRX-USDT" diff --git a/prod/tests/test_bingx_sandbox_status.py b/prod/tests/test_bingx_sandbox_status.py new file mode 100644 index 0000000..9cddac5 --- /dev/null +++ b/prod/tests/test_bingx_sandbox_status.py @@ -0,0 +1,71 @@ +from __future__ import annotations + +from pathlib import Path + +from prod.bingx.sandbox_status import build_sandbox_status +from prod.bingx.sandbox_status import load_sandbox_status +from prod.bingx.sandbox_status import write_sandbox_status + + +def test_build_sandbox_status_marks_clean_when_flat(): + status = build_sandbox_status( + balance_payload={ + "balance": { + "balance": "12000.5", + "equity": "12000.5", + "availableMargin": "12000.5", + "unrealizedProfit": "0", + "usedMargin": "0", + } + }, + positions_payload=[], + open_orders_payload={"orders": []}, + environment="VST", + ) + + assert status.clean is True + assert status.balance == 12000.5 + assert status.equity == 12000.5 + assert status.open_positions == 0 + assert status.open_orders == 0 + + +def test_build_sandbox_status_marks_dirty_when_positions_or_orders_exist(): + status = build_sandbox_status( + balance_payload={ + "balance": { + "balance": "12000.5", + "equity": "12500.5", + "availableMargin": "9000.5", + "unrealizedProfit": "500", + "usedMargin": "3000", + } + }, + positions_payload=[{"symbol": "BTC-USDT"}, {"symbol": "ETH-USDT"}], + open_orders_payload={"orders": [{"symbol": "BTC-USDT"}]}, + environment="VST", + ) + + assert status.clean is False + assert status.open_positions == 2 + assert status.open_orders == 1 + assert status.unrealized_profit == 500.0 + + +def test_write_and_load_sandbox_status_round_trip(tmp_path: Path): + status = build_sandbox_status( + balance_payload={"balance": {"balance": "10", "equity": "11", "availableMargin": "9", "unrealizedProfit": "1", "usedMargin": "2"}}, + positions_payload=[], + open_orders_payload=[], + environment="VST", + notes={"source": "unit-test"}, + ) + path = tmp_path / "bingx_sandbox_status.json" + write_sandbox_status(status, path) + + loaded = load_sandbox_status(path) + assert loaded is not None + assert loaded["balance"] == 10.0 + assert loaded["equity"] == 11.0 + assert loaded["clean"] is True + assert loaded["notes"]["source"] == "unit-test" diff --git a/prod/tests/test_capital_restore_selection.py b/prod/tests/test_capital_restore_selection.py new file mode 100644 index 0000000..2091605 --- /dev/null +++ b/prod/tests/test_capital_restore_selection.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +"""Tests for capital restore source selection on startup.""" + +import json +import os +from datetime import datetime, timezone +from unittest.mock import patch + +import pytest + +from prod.nautilus_event_trader import DolphinLiveTrader + + +class _MapStub: + def __init__(self, payloads): + self._payloads = payloads + + def blocking(self): + return self + + def get(self, key): + return self._payloads.get(key) + + +def _build_trader() -> DolphinLiveTrader: + trader = DolphinLiveTrader() + trader._build_engine() + trader.eng.begin_day(datetime.now(timezone.utc).strftime("%Y-%m-%d"), posture="APEX") + return trader + + +def test_restore_prefers_fresher_engine_snapshot_over_stale_latest_nautilus(): + trader = _build_trader() + trader.eng.capital = 25_000.0 + + trader.state_map = _MapStub( + { + "latest_nautilus": json.dumps( + { + "capital": 31_049.44, + "updated_at": "2026-05-13T10:52:40+00:00", + } + ), + "engine_snapshot": json.dumps( + { + "capital": 33_150.07, + "timestamp": "2026-05-13T16:20:38+00:00", + } + ), + } + ) + trader.pnl_map = _MapStub({}) + + with patch.dict(os.environ, {"DOLPHIN_CAPITAL_SEED_STALE_LAG_SEC": "180"}, clear=False): + trader._restore_capital() + + assert trader.eng.capital == pytest.approx(33_150.07, abs=0.01) + assert trader._restore_source == "HZ engine_snapshot" + + +def test_restore_can_force_latest_nautilus_override(): + trader = _build_trader() + trader.eng.capital = 25_000.0 + + trader.state_map = _MapStub( + { + "latest_nautilus": json.dumps( + { + "capital": 31_049.44, + "updated_at": "2026-05-13T10:52:40+00:00", + } + ), + "engine_snapshot": json.dumps( + { + "capital": 33_150.07, + "timestamp": "2026-05-13T16:20:38+00:00", + } + ), + } + ) + trader.pnl_map = _MapStub({}) + + with patch.dict(os.environ, {"DOLPHIN_FORCE_LATEST_NAUTILUS_RESTORE": "1"}, clear=False): + trader._restore_capital() + + assert trader.eng.capital == pytest.approx(31_049.44, abs=0.01) + assert trader._restore_source == "HZ latest_nautilus"