BLUE hardening: spool-poison guards, dead-session clock fix, HZ black-box, RETRACT race-safety

Seven uncommitted production fixes to BLUE's main runner that the LIVE
process has already been running since the 2026-06-15 17:23 restart (file
mtime 17:17, pid started 17:23). Each fix answers a documented incident;
committing now so they survive in history and a stray checkout can't
silently revert running-config code on the next restart.

1. bars_held = max(0, int(...)) at BOTH journal sites (terminal + sub-day).
   CH column is UInt16 — a negative value poisons the spool with a
   head-of-line jam (incident 2026-06-12: bars_held=-106).

2. entry_bar = int(restored_entry_bar) at BOTH reconstruction sites; NEVER
   from chain_meta. trade_reconstruction payloads carry the DEAD session's
   bar counter, so the old override reinstated the stale clock frame the
   re-anchor exists to fix → negative bars_held → same UInt16 spool poison
   (zombie-trade resurrections, incident 2026-06-12). restored_entry_bar
   already encodes hold continuity via stored_bars in THIS session's frame.

3. capital parse handles list/ledger-style payloads: when the restore blob
   is a list of update rows, take the latest dict row instead of falling
   through to {} and losing the capital anchor.

4. _connect_hz routes the `hazelcast` logger to stderr at INFO. The
   silent-HZ-death investigation found ZERO client log lines because
   nothing routed them; without this the reactor's health is invisible.

5. _dump_blackbox(reason): forensic thread dump before a watchdog restart —
   lifecycle.is_running, active_connections, every thread's stack, and a
   flag when any hazelcast/reactor-named thread is MISSING (= reactor died,
   the prime suspect for the silent 40min–8h client deaths). print()-only,
   CIFS-safe. _watchdog_restart calls it first.

6. _drain_runtime_commands / _process_runtime_commands gain
   `*, allow_retract=True`; the heartbeat path drains with
   allow_retract=False and re-queues any RETRACT commands. A RETRACT can
   force a terminal close that must run through the scan-thread close
   finalizer, so the heartbeat must not race it.

7. +import traceback (for the black-box stack dumps).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Codex
2026-06-16 12:03:20 +02:00
parent 2629795a35
commit 6d08e97e28

View File

@@ -10,6 +10,7 @@ import os
import time import time
import signal import signal
import threading import threading
import traceback
import urllib.request import urllib.request
import uuid import uuid
from dataclasses import replace from dataclasses import replace
@@ -1300,7 +1301,12 @@ class DolphinLiveTrader:
if not raw: if not raw:
return None return None
try: try:
data = json.loads(raw) if isinstance(raw, str) else (raw if isinstance(raw, dict) else {}) data = json.loads(raw) if isinstance(raw, str) else raw
if isinstance(data, list):
# ledger-style payload (list of update rows): use the latest row
data = next((e for e in reversed(data) if isinstance(e, dict)), {})
if not isinstance(data, dict):
data = {}
capital = float(data.get("capital", 0) or 0) capital = float(data.get("capital", 0) or 0)
if capital >= 1.0 and math.isfinite(capital): if capital >= 1.0 and math.isfinite(capital):
return capital, data return capital, data
@@ -1850,7 +1856,13 @@ class DolphinLiveTrader:
"notional": notional, "notional": notional,
"notional_entry": notional, "notional_entry": notional,
"leverage": leverage, "leverage": leverage,
"entry_bar": int(chain_meta.get("entry_bar", restored_entry_bar) if chain_recon else restored_entry_bar), # NEVER take entry_bar from chain_meta: trade_reconstruction
# payloads carry the DEAD session's bar counter, so the
# override reinstated the stale clock frame the re-anchor
# exists to fix (negative bars_held → UInt16 spool poison,
# incident 2026-06-12). restored_entry_bar already encodes
# hold continuity via stored_bars in THIS session's frame.
"entry_bar": int(restored_entry_bar),
"entry_ts": int(chain_meta.get("entry_ts", entry_ts_us) or entry_ts_us) if chain_recon else entry_ts_us, "entry_ts": int(chain_meta.get("entry_ts", entry_ts_us) or entry_ts_us) if chain_recon else entry_ts_us,
"retraction_legs": int(chain_meta.get("retraction_legs", chain_meta.get("chain_seq", 0)) or 0) if chain_recon else 0, "retraction_legs": int(chain_meta.get("retraction_legs", chain_meta.get("chain_seq", 0)) or 0) if chain_recon else 0,
"realized_pnl_legs_total": float(chain_meta.get("realized_pnl_legs_total", 0.0) or 0.0) if chain_recon else 0.0, "realized_pnl_legs_total": float(chain_meta.get("realized_pnl_legs_total", 0.0) or 0.0) if chain_recon else 0.0,
@@ -2112,7 +2124,11 @@ class DolphinLiveTrader:
"notional": notional, "notional": notional,
"notional_entry": notional, "notional_entry": notional,
"leverage": leverage, "leverage": leverage,
"entry_bar": int(chain_meta.get("entry_bar", restored_entry_bar) if chain_recon else restored_entry_bar), # NEVER take entry_bar from chain_meta: trade_reconstruction
# payloads carry the DEAD session's bar counter — the override
# reinstated the stale clock frame the re-anchor exists to fix
# (negative bars_held → UInt16 spool poison, incident 2026-06-12).
"entry_bar": int(restored_entry_bar),
"entry_ts": int(chain_meta.get("entry_ts", 0) or 0) if chain_recon else 0, "entry_ts": int(chain_meta.get("entry_ts", 0) or 0) if chain_recon else 0,
"retraction_legs": int(chain_meta.get("retraction_legs", chain_meta.get("chain_seq", 0)) or 0) if chain_recon else 0, "retraction_legs": int(chain_meta.get("retraction_legs", chain_meta.get("chain_seq", 0)) or 0) if chain_recon else 0,
"realized_pnl_legs_total": float(chain_meta.get("realized_pnl_legs_total", 0.0) or 0.0) if chain_recon else 0.0, "realized_pnl_legs_total": float(chain_meta.get("realized_pnl_legs_total", 0.0) or 0.0) if chain_recon else 0.0,
@@ -2421,6 +2437,17 @@ class DolphinLiveTrader:
def _connect_hz(self): def _connect_hz(self):
log("Connecting to Hazelcast...") log("Connecting to Hazelcast...")
import hazelcast import hazelcast
import logging as _logging
# Client lifecycle events (connection added/removed, heartbeat,
# reconnect attempts) at INFO to stderr — the 2026-06-12 silent-death
# investigation found ZERO client log lines because nothing routed
# them; without this the reactor's health is invisible.
_hz_logger = _logging.getLogger("hazelcast")
if not _hz_logger.handlers:
_h = _logging.StreamHandler()
_h.setFormatter(_logging.Formatter("%(asctime)s HZCLIENT %(levelname)s %(name)s: %(message)s"))
_hz_logger.addHandler(_h)
_hz_logger.setLevel(_logging.INFO)
self.hz_client = hazelcast.HazelcastClient( self.hz_client = hazelcast.HazelcastClient(
cluster_name=HZ_CLUSTER, cluster_name=HZ_CLUSTER,
cluster_members=[HZ_HOST], cluster_members=[HZ_HOST],
@@ -2474,7 +2501,11 @@ class DolphinLiveTrader:
), ),
) )
if self.control_map is not None: if self.control_map is not None:
self._drain_runtime_commands() # RETRACT can produce a forced terminal close which must
# run through the scan-thread close finalizer. The
# heartbeat may still apply non-exit commands while scans
# are quiet, but it must leave RETRACT queued.
self._drain_runtime_commands(allow_retract=False)
except Exception as e: except Exception as e:
# Never route heartbeat failures through the mounted trade log: # Never route heartbeat failures through the mounted trade log:
# if that filesystem is sick, the exception handler must still # if that filesystem is sick, the exception handler must still
@@ -2522,7 +2553,49 @@ class DolphinLiveTrader:
except Exception: except Exception:
return None return None
def _dump_blackbox(self, reason: str):
"""Forensic dump before a watchdog restart — answers WHY the HZ client
died (incidents: silent client death every 40min8h, no exception ever
reaches stderr; prime suspect is the hazelcast reactor thread, which
runs I/O + future completion + event dispatch + heartbeat manager, so
its death is silent by construction). print() only — CIFS-safe."""
try:
import sys as _sys
now_iso = datetime.now(timezone.utc).isoformat()
print(f"[{now_iso}] BLACKBOX dump ({reason}):", flush=True)
try:
running_flag = self.hz_client.lifecycle_service.is_running()
except Exception as exc:
running_flag = f"err:{exc}"
print(f" hz_client.lifecycle.is_running={running_flag}", flush=True)
try:
cm = getattr(self.hz_client, "_connection_manager", None)
conns = getattr(cm, "active_connections", None)
print(f" active_connections={conns!r}", flush=True)
except Exception as exc:
print(f" connection introspect failed: {exc}", flush=True)
frames = _sys._current_frames()
for th in threading.enumerate():
frame = frames.get(th.ident)
hz_mark = " <HZ?>" if "hazelcast" in th.name.lower() or "reactor" in th.name.lower() else ""
print(f" THREAD {th.name} daemon={th.daemon} alive={th.is_alive()}{hz_mark}", flush=True)
if frame is not None:
for fl in traceback.format_stack(frame):
for ln in fl.rstrip().splitlines():
print(f" {ln}", flush=True)
# any hazelcast-named thread MISSING from the enumeration = reactor died
hz_threads = [t.name for t in threading.enumerate()
if "hazelcast" in t.name.lower() or "reactor" in t.name.lower()]
print(f" hazelcast-ish threads present: {hz_threads or 'NONE — reactor thread is DEAD'}",
flush=True)
except Exception as exc:
print(f" BLACKBOX dump failed: {exc}", flush=True)
def _watchdog_restart(self, reason: str): def _watchdog_restart(self, reason: str):
try:
self._dump_blackbox(reason)
except Exception:
pass
print(f"[{datetime.now(timezone.utc).isoformat()}] " print(f"[{datetime.now(timezone.utc).isoformat()}] "
f"WATCHDOG_RESTART: {reason} — exiting {WATCHDOG_EXIT_CODE} for " f"WATCHDOG_RESTART: {reason} — exiting {WATCHDOG_EXIT_CODE} for "
f"supervisord respawn (capital/position restore on boot)", flush=True) f"supervisord respawn (capital/position restore on boot)", flush=True)
@@ -3694,7 +3767,12 @@ class DolphinLiveTrader:
) )
return None, "PARTIAL_OK" return None, "PARTIAL_OK"
def _process_runtime_commands(self, prices_dict: dict) -> dict | None: def _process_runtime_commands(
self,
prices_dict: dict,
*,
allow_retract: bool = True,
) -> dict | None:
"""Drain BLUE runtime commands from control plane and apply retractions.""" """Drain BLUE runtime commands from control plane and apply retractions."""
if self.control_map is None: if self.control_map is None:
return None return None
@@ -3706,7 +3784,22 @@ class DolphinLiveTrader:
queue = json.loads(raw) if isinstance(raw, str) else list(raw) queue = json.loads(raw) if isinstance(raw, str) else list(raw)
if not isinstance(queue, list) or not queue: if not isinstance(queue, list) or not queue:
return None return None
if allow_retract:
self.control_map.blocking().put(key, json.dumps([])) self.control_map.blocking().put(key, json.dumps([]))
else:
deferred = [
cmd for cmd in queue
if isinstance(cmd, dict)
and str(cmd.get("action", "") or "").upper() == "RETRACT"
]
queue = [
cmd for cmd in queue
if not (
isinstance(cmd, dict)
and str(cmd.get("action", "") or "").upper() == "RETRACT"
)
]
self.control_map.blocking().put(key, json.dumps(deferred))
except Exception as e: except Exception as e:
log(f"RUNTIME_CMD read failed: {e}") log(f"RUNTIME_CMD read failed: {e}")
return None return None
@@ -3752,14 +3845,22 @@ class DolphinLiveTrader:
continue continue
return forced_exit return forced_exit
def _drain_runtime_commands(self, prices_dict: dict | None = None) -> dict | None: def _drain_runtime_commands(
self,
prices_dict: dict | None = None,
*,
allow_retract: bool = True,
) -> dict | None:
"""Serialize queue draining so the scan and heartbeat paths do not race.""" """Serialize queue draining so the scan and heartbeat paths do not race."""
lock = getattr(self, "_runtime_command_lock", None) lock = getattr(self, "_runtime_command_lock", None)
if lock is None: if lock is None:
lock = threading.Lock() lock = threading.Lock()
self._runtime_command_lock = lock self._runtime_command_lock = lock
with lock: with lock:
return self._process_runtime_commands(dict(prices_dict or self._last_prices_dict or {})) return self._process_runtime_commands(
dict(prices_dict or self._last_prices_dict or {}),
allow_retract=allow_retract,
)
def _compute_vol_ok(self, scan): def _compute_vol_ok(self, scan):
assets = scan.get('assets', []) assets = scan.get('assets', [])
@@ -4457,7 +4558,9 @@ class DolphinLiveTrader:
"beta_at_entry": pending['beta_at_entry'], "beta_at_entry": pending['beta_at_entry'],
"posture": pending['posture'], "posture": pending['posture'],
"leverage": pending['leverage'], "leverage": pending['leverage'],
"bars_held": int(x.get('bars_held', 0) or 0), # CH column is UInt16 — a negative value poisons the spool
# (head-of-line jam, incident 2026-06-12: bars_held=-106)
"bars_held": max(0, int(x.get('bars_held', 0) or 0)),
"regime_signal": 0, "regime_signal": 0,
"tp_threshold": float(self.eng.exit_manager.fixed_tp_pct), "tp_threshold": float(self.eng.exit_manager.fixed_tp_pct),
"execution_quality_json": json.dumps(execution_quality, default=str), "execution_quality_json": json.dumps(execution_quality, default=str),
@@ -4807,7 +4910,8 @@ class DolphinLiveTrader:
"beta_at_entry": float(pending.get('beta_at_entry', 0) or 0), "beta_at_entry": float(pending.get('beta_at_entry', 0) or 0),
"posture": pending.get('posture', ''), "posture": pending.get('posture', ''),
"leverage": float(pending.get('leverage', 0) or 0), "leverage": float(pending.get('leverage', 0) or 0),
"bars_held": int(subday_exit.get('bars_held', 0) or 0), # CH column is UInt16 — negative poisons the spool
"bars_held": max(0, int(subday_exit.get('bars_held', 0) or 0)),
"regime_signal": 0, "regime_signal": 0,
"execution_quality_json": json.dumps(execution_quality, default=str), "execution_quality_json": json.dumps(execution_quality, default=str),
"market_state_bundle_json": str(pending.get("market_state_bundle_json", "") or ""), "market_state_bundle_json": str(pending.get("market_state_bundle_json", "") or ""),