From 6d08e97e285773e349110d16d8bccee5b3655e8b Mon Sep 17 00:00:00 2001 From: Codex Date: Tue, 16 Jun 2026 12:03:20 +0200 Subject: [PATCH] BLUE hardening: spool-poison guards, dead-session clock fix, HZ black-box, RETRACT race-safety MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Seven uncommitted production fixes to BLUE's main runner that the LIVE process has already been running since the 2026-06-15 17:23 restart (file mtime 17:17, pid started 17:23). Each fix answers a documented incident; committing now so they survive in history and a stray checkout can't silently revert running-config code on the next restart. 1. bars_held = max(0, int(...)) at BOTH journal sites (terminal + sub-day). CH column is UInt16 — a negative value poisons the spool with a head-of-line jam (incident 2026-06-12: bars_held=-106). 2. entry_bar = int(restored_entry_bar) at BOTH reconstruction sites; NEVER from chain_meta. trade_reconstruction payloads carry the DEAD session's bar counter, so the old override reinstated the stale clock frame the re-anchor exists to fix → negative bars_held → same UInt16 spool poison (zombie-trade resurrections, incident 2026-06-12). restored_entry_bar already encodes hold continuity via stored_bars in THIS session's frame. 3. capital parse handles list/ledger-style payloads: when the restore blob is a list of update rows, take the latest dict row instead of falling through to {} and losing the capital anchor. 4. _connect_hz routes the `hazelcast` logger to stderr at INFO. The silent-HZ-death investigation found ZERO client log lines because nothing routed them; without this the reactor's health is invisible. 5. _dump_blackbox(reason): forensic thread dump before a watchdog restart — lifecycle.is_running, active_connections, every thread's stack, and a flag when any hazelcast/reactor-named thread is MISSING (= reactor died, the prime suspect for the silent 40min–8h client deaths). print()-only, CIFS-safe. _watchdog_restart calls it first. 6. _drain_runtime_commands / _process_runtime_commands gain `*, allow_retract=True`; the heartbeat path drains with allow_retract=False and re-queues any RETRACT commands. A RETRACT can force a terminal close that must run through the scan-thread close finalizer, so the heartbeat must not race it. 7. +import traceback (for the black-box stack dumps). Co-Authored-By: Claude Opus 4.8 --- prod/nautilus_event_trader.py | 124 +++++++++++++++++++++++++++++++--- 1 file changed, 114 insertions(+), 10 deletions(-) diff --git a/prod/nautilus_event_trader.py b/prod/nautilus_event_trader.py index 304f63c..bb36844 100644 --- a/prod/nautilus_event_trader.py +++ b/prod/nautilus_event_trader.py @@ -10,6 +10,7 @@ import os import time import signal import threading +import traceback import urllib.request import uuid from dataclasses import replace @@ -1300,7 +1301,12 @@ class DolphinLiveTrader: if not raw: return None try: - data = json.loads(raw) if isinstance(raw, str) else (raw if isinstance(raw, dict) else {}) + data = json.loads(raw) if isinstance(raw, str) else raw + if isinstance(data, list): + # ledger-style payload (list of update rows): use the latest row + data = next((e for e in reversed(data) if isinstance(e, dict)), {}) + if not isinstance(data, dict): + data = {} capital = float(data.get("capital", 0) or 0) if capital >= 1.0 and math.isfinite(capital): return capital, data @@ -1850,7 +1856,13 @@ class DolphinLiveTrader: "notional": notional, "notional_entry": notional, "leverage": leverage, - "entry_bar": int(chain_meta.get("entry_bar", restored_entry_bar) if chain_recon else restored_entry_bar), + # NEVER take entry_bar from chain_meta: trade_reconstruction + # payloads carry the DEAD session's bar counter, so the + # override reinstated the stale clock frame the re-anchor + # exists to fix (negative bars_held → UInt16 spool poison, + # incident 2026-06-12). restored_entry_bar already encodes + # hold continuity via stored_bars in THIS session's frame. + "entry_bar": int(restored_entry_bar), "entry_ts": int(chain_meta.get("entry_ts", entry_ts_us) or entry_ts_us) if chain_recon else entry_ts_us, "retraction_legs": int(chain_meta.get("retraction_legs", chain_meta.get("chain_seq", 0)) or 0) if chain_recon else 0, "realized_pnl_legs_total": float(chain_meta.get("realized_pnl_legs_total", 0.0) or 0.0) if chain_recon else 0.0, @@ -2112,7 +2124,11 @@ class DolphinLiveTrader: "notional": notional, "notional_entry": notional, "leverage": leverage, - "entry_bar": int(chain_meta.get("entry_bar", restored_entry_bar) if chain_recon else restored_entry_bar), + # NEVER take entry_bar from chain_meta: trade_reconstruction + # payloads carry the DEAD session's bar counter — the override + # reinstated the stale clock frame the re-anchor exists to fix + # (negative bars_held → UInt16 spool poison, incident 2026-06-12). + "entry_bar": int(restored_entry_bar), "entry_ts": int(chain_meta.get("entry_ts", 0) or 0) if chain_recon else 0, "retraction_legs": int(chain_meta.get("retraction_legs", chain_meta.get("chain_seq", 0)) or 0) if chain_recon else 0, "realized_pnl_legs_total": float(chain_meta.get("realized_pnl_legs_total", 0.0) or 0.0) if chain_recon else 0.0, @@ -2421,6 +2437,17 @@ class DolphinLiveTrader: def _connect_hz(self): log("Connecting to Hazelcast...") import hazelcast + import logging as _logging + # Client lifecycle events (connection added/removed, heartbeat, + # reconnect attempts) at INFO to stderr — the 2026-06-12 silent-death + # investigation found ZERO client log lines because nothing routed + # them; without this the reactor's health is invisible. + _hz_logger = _logging.getLogger("hazelcast") + if not _hz_logger.handlers: + _h = _logging.StreamHandler() + _h.setFormatter(_logging.Formatter("%(asctime)s HZCLIENT %(levelname)s %(name)s: %(message)s")) + _hz_logger.addHandler(_h) + _hz_logger.setLevel(_logging.INFO) self.hz_client = hazelcast.HazelcastClient( cluster_name=HZ_CLUSTER, cluster_members=[HZ_HOST], @@ -2474,7 +2501,11 @@ class DolphinLiveTrader: ), ) if self.control_map is not None: - self._drain_runtime_commands() + # RETRACT can produce a forced terminal close which must + # run through the scan-thread close finalizer. The + # heartbeat may still apply non-exit commands while scans + # are quiet, but it must leave RETRACT queued. + self._drain_runtime_commands(allow_retract=False) except Exception as e: # Never route heartbeat failures through the mounted trade log: # if that filesystem is sick, the exception handler must still @@ -2522,7 +2553,49 @@ class DolphinLiveTrader: except Exception: return None + def _dump_blackbox(self, reason: str): + """Forensic dump before a watchdog restart — answers WHY the HZ client + died (incidents: silent client death every 40min–8h, no exception ever + reaches stderr; prime suspect is the hazelcast reactor thread, which + runs I/O + future completion + event dispatch + heartbeat manager, so + its death is silent by construction). print() only — CIFS-safe.""" + try: + import sys as _sys + now_iso = datetime.now(timezone.utc).isoformat() + print(f"[{now_iso}] BLACKBOX dump ({reason}):", flush=True) + try: + running_flag = self.hz_client.lifecycle_service.is_running() + except Exception as exc: + running_flag = f"err:{exc}" + print(f" hz_client.lifecycle.is_running={running_flag}", flush=True) + try: + cm = getattr(self.hz_client, "_connection_manager", None) + conns = getattr(cm, "active_connections", None) + print(f" active_connections={conns!r}", flush=True) + except Exception as exc: + print(f" connection introspect failed: {exc}", flush=True) + frames = _sys._current_frames() + for th in threading.enumerate(): + frame = frames.get(th.ident) + hz_mark = " " if "hazelcast" in th.name.lower() or "reactor" in th.name.lower() else "" + print(f" THREAD {th.name} daemon={th.daemon} alive={th.is_alive()}{hz_mark}", flush=True) + if frame is not None: + for fl in traceback.format_stack(frame): + for ln in fl.rstrip().splitlines(): + print(f" {ln}", flush=True) + # any hazelcast-named thread MISSING from the enumeration = reactor died + hz_threads = [t.name for t in threading.enumerate() + if "hazelcast" in t.name.lower() or "reactor" in t.name.lower()] + print(f" hazelcast-ish threads present: {hz_threads or 'NONE — reactor thread is DEAD'}", + flush=True) + except Exception as exc: + print(f" BLACKBOX dump failed: {exc}", flush=True) + def _watchdog_restart(self, reason: str): + try: + self._dump_blackbox(reason) + except Exception: + pass print(f"[{datetime.now(timezone.utc).isoformat()}] " f"WATCHDOG_RESTART: {reason} — exiting {WATCHDOG_EXIT_CODE} for " f"supervisord respawn (capital/position restore on boot)", flush=True) @@ -3694,7 +3767,12 @@ class DolphinLiveTrader: ) return None, "PARTIAL_OK" - def _process_runtime_commands(self, prices_dict: dict) -> dict | None: + def _process_runtime_commands( + self, + prices_dict: dict, + *, + allow_retract: bool = True, + ) -> dict | None: """Drain BLUE runtime commands from control plane and apply retractions.""" if self.control_map is None: return None @@ -3706,7 +3784,22 @@ class DolphinLiveTrader: queue = json.loads(raw) if isinstance(raw, str) else list(raw) if not isinstance(queue, list) or not queue: return None - self.control_map.blocking().put(key, json.dumps([])) + if allow_retract: + self.control_map.blocking().put(key, json.dumps([])) + else: + deferred = [ + cmd for cmd in queue + if isinstance(cmd, dict) + and str(cmd.get("action", "") or "").upper() == "RETRACT" + ] + queue = [ + cmd for cmd in queue + if not ( + isinstance(cmd, dict) + and str(cmd.get("action", "") or "").upper() == "RETRACT" + ) + ] + self.control_map.blocking().put(key, json.dumps(deferred)) except Exception as e: log(f"RUNTIME_CMD read failed: {e}") return None @@ -3752,14 +3845,22 @@ class DolphinLiveTrader: continue return forced_exit - def _drain_runtime_commands(self, prices_dict: dict | None = None) -> dict | None: + def _drain_runtime_commands( + self, + prices_dict: dict | None = None, + *, + allow_retract: bool = True, + ) -> dict | None: """Serialize queue draining so the scan and heartbeat paths do not race.""" lock = getattr(self, "_runtime_command_lock", None) if lock is None: lock = threading.Lock() self._runtime_command_lock = lock with lock: - return self._process_runtime_commands(dict(prices_dict or self._last_prices_dict or {})) + return self._process_runtime_commands( + dict(prices_dict or self._last_prices_dict or {}), + allow_retract=allow_retract, + ) def _compute_vol_ok(self, scan): assets = scan.get('assets', []) @@ -4457,7 +4558,9 @@ class DolphinLiveTrader: "beta_at_entry": pending['beta_at_entry'], "posture": pending['posture'], "leverage": pending['leverage'], - "bars_held": int(x.get('bars_held', 0) or 0), + # CH column is UInt16 — a negative value poisons the spool + # (head-of-line jam, incident 2026-06-12: bars_held=-106) + "bars_held": max(0, int(x.get('bars_held', 0) or 0)), "regime_signal": 0, "tp_threshold": float(self.eng.exit_manager.fixed_tp_pct), "execution_quality_json": json.dumps(execution_quality, default=str), @@ -4807,7 +4910,8 @@ class DolphinLiveTrader: "beta_at_entry": float(pending.get('beta_at_entry', 0) or 0), "posture": pending.get('posture', ''), "leverage": float(pending.get('leverage', 0) or 0), - "bars_held": int(subday_exit.get('bars_held', 0) or 0), + # CH column is UInt16 — negative poisons the spool + "bars_held": max(0, int(subday_exit.get('bars_held', 0) or 0)), "regime_signal": 0, "execution_quality_json": json.dumps(execution_quality, default=str), "market_state_bundle_json": str(pending.get("market_state_bundle_json", "") or ""),