"""Distal fix for MALFORMED_OPEN_RESTORE_BUG — regression suite (Option A). Pins the causal fix set applied 2026-06-12: 1. `_ps_write_open` is the SINGLE gate for OPEN position_state rows and enforces the lifecycle invariant (qty > 0 AND notional > dust). 2. The partial-retract path persists remainders THROUGH that gate (previously a raw ch_put bypass — the causal origin of zero-size OPEN snapshots). 3. The dust threshold (POSITION_DUST_NOTIONAL_USD) is shared between the full-close decision and the write gate, so no remainder can be "open" in memory yet round to a zero-size row on disk. 4. The terminal retract leg writes its trade_exit_legs + trade_reconstruction rows (the old early-return lost the final leg from the §38.9 replay surface). 5. Restore-candidate rejection with exhausted fallbacks marks `_restore_failed` (halt) instead of silently trading from flat (XTZ 863c21da single-slot-violation class). 6. Chain-token mismatch emits a queryable CHAIN_TOKEN_MISMATCH journal event, not just a log line. 7. Restored entry_bar preserves bars_held continuity (negative entry_bar allowed; the old max(0, ...) clamp reset the MAX_HOLD clock). """ from __future__ import annotations import json import threading import tempfile from dataclasses import dataclass import importlib.util from pathlib import Path import pytest _MOD_PATH = Path("/mnt/dolphinng5_predict/prod/nautilus_event_trader.py") _SPEC = importlib.util.spec_from_file_location("nautilus_event_trader_distal_mod", _MOD_PATH) assert _SPEC and _SPEC.loader mod = importlib.util.module_from_spec(_SPEC) _SPEC.loader.exec_module(mod) # type: ignore[arg-type] # ── harness (mirrors test_multi_exit_retraction_integration.py) ───────────── @dataclass class _Pos: trade_id: str asset: str entry_price: float notional: float current_price: float = 0.0 pnl_pct: float = 0.0 class _ExitMgr: def __init__(self) -> None: self._positions: dict[str, dict] = {} class _Eng: def __init__(self, pos: _Pos | None, capital: float = 25_000.0) -> None: self.position = pos self.capital = capital self.exit_manager = _ExitMgr() self.regime_dd_halt = False self._day_posture = "APEX" if pos is not None: self.exit_manager._positions[pos.trade_id] = {"dummy": True} class _Map: def __init__(self, initial: dict | None = None) -> None: self._d = dict(initial or {}) self._lock = threading.Lock() def blocking(self): return self def get(self, key): with self._lock: return self._d.get(key) def put(self, key, val): with self._lock: self._d[key] = val class _F: def add_done_callback(self, _cb): return None return _F() def _mk_trader() -> "mod.DolphinLiveTrader": t = object.__new__(mod.DolphinLiveTrader) tmpdir = Path(tempfile.mkdtemp(prefix="dolphin_distal_test_")) mod.CAPITAL_DISK_CHECKPOINT = tmpdir / "capital_checkpoint.json" mod.CAPITAL_CORRECTIVE_REPLAY = tmpdir / "capital_replay.json" mod.CAPITAL_UPDATE_LEDGER = tmpdir / "capital_update_ledger.json" t.eng_lock = threading.Lock() t.state_map = _Map({}) t.pnl_map = _Map({}) t.control_map = _Map({"blue_runtime_commands": "[]"}) t._processed_retract_commands = mod.deque(maxlen=5000) t._processed_retract_set = set() t._runtime_command_lock = threading.Lock() t._pending_entries = {} t._last_prices_dict = {} t.current_day = "2026-06-12" t.bar_idx = 100 t._restore_failed = False t._restore_failure_reason = "" return t def _seed_pending(t, trade_id: str, *, notional: float = 1000.0, entry_price: float = 1.0) -> None: t._pending_entries[trade_id] = { "asset": "STXUSDT", "side": "SHORT", "entry_price": entry_price, "entry_bar": 90, "entry_date": "2026-06-12", "notional": notional, "notional_entry": notional, "retraction_legs": 0, "realized_pnl_legs_total": 0.0, "leverage": 2.0, } pending = t._pending_entries[trade_id] pending.update(t._chain_state_for_pending( trade_id, pending, chain_mode="LIVE", chain_head_leg_id=f"{trade_id}:open", chain_prev_leg_id="", chain_seq=0, )) def _retract_cmd(t, trade_id: str, *, command_id: str, fraction: float) -> dict: pending = t._pending_entries[trade_id] return { "command_id": command_id, "trade_id": trade_id, "action": "RETRACT", "fraction": fraction, "reason": "HOTKEY_RETRACT", "source": "tui_hotkey", "chain_root_trade_id": pending["chain_root_trade_id"], "chain_head_leg_id": pending["chain_head_leg_id"], "chain_prev_leg_id": pending["chain_prev_leg_id"], "chain_seq": pending["chain_seq"], "chain_token": pending["chain_token"], } def _run_retract(t, rows, *, fraction: float, notional: float = 1000.0, price: float = 1.0): pos = _Pos("T-1", "STXUSDT", 1.0, notional, current_price=price) t.eng = _Eng(pos) _seed_pending(t, "T-1", notional=notional) cmd = _retract_cmd(t, "T-1", command_id=f"c-{fraction}", fraction=fraction) t.control_map.put("blue_runtime_commands", json.dumps([cmd])) return t._process_runtime_commands({"STXUSDT": price}) # ── 1+2: lifecycle invariant at the single write gate ─────────────────────── def test_ps_write_open_refuses_zero_quantity(monkeypatch): rows = [] monkeypatch.setattr(mod, "ch_put", lambda tbl, row: rows.append((tbl, row))) t = _mk_trader() ok = t._ps_write_open("T-z", { "asset": "STXUSDT", "side": "SHORT", "entry_price": 1.0, "quantity": 0.0, "leverage": 2.0, "entry_ts": 1, }) assert ok is False assert rows == [] def test_ps_write_open_refuses_dust_notional(monkeypatch): rows = [] monkeypatch.setattr(mod, "ch_put", lambda tbl, row: rows.append((tbl, row))) t = _mk_trader() # qty>0 but notional rounds to <= $0.01 (the malformed-row recipe) ok = t._ps_write_open("T-d", { "asset": "STXUSDT", "side": "SHORT", "entry_price": 0.0001, "quantity": 40.0, "leverage": 2.0, "entry_ts": 1, # notional 0.004 }) assert ok is False assert rows == [] def test_ps_write_open_accepts_valid_entry(monkeypatch): rows = [] monkeypatch.setattr(mod, "ch_put", lambda tbl, row: rows.append((tbl, row))) t = _mk_trader() ok = t._ps_write_open("T-v", { "asset": "STXUSDT", "side": "SHORT", "entry_price": 1.0, "quantity": 1000.0, "leverage": 2.0, "entry_ts": 777, }) assert ok is True assert len(rows) == 1 tbl, row = rows[0] assert tbl == "position_state" assert row["status"] == "OPEN" assert row["quantity"] > 0 and row["notional"] > 0 assert row["ts"] == 777 and row["bars_held"] == 0 # ── 2+3: retract remainder goes through the gate; thresholds aligned ──────── def test_partial_retract_open_row_never_zero_sized(monkeypatch): """Sweep fractions: every surviving OPEN row must satisfy the invariant.""" for fraction in (0.1, 0.5, 0.9, 0.99, 0.999): rows = [] monkeypatch.setattr(mod, "ch_put", lambda tbl, row, _r=rows: _r.append((tbl, row))) monkeypatch.setattr(mod, "_ch_ts_us", lambda: 123) t = _mk_trader() forced = _run_retract(t, rows, fraction=fraction) open_rows = [r for tbl, r in rows if tbl == "position_state" and r.get("status") == "OPEN"] if forced is None: assert len(open_rows) == 1, f"fraction={fraction}" assert open_rows[0]["quantity"] > 0.0 assert open_rows[0]["notional"] > mod.POSITION_DUST_NOTIONAL_USD else: assert open_rows == [], f"fraction={fraction} full-close wrote OPEN row" def test_dust_remainder_is_full_close(monkeypatch): """$1000 × 0.99999 retract leaves $0.01 — at/below dust ⇒ FULL_CLOSE, no OPEN snapshot, forced exit carries capital_already_realized.""" rows = [] monkeypatch.setattr(mod, "ch_put", lambda tbl, row: rows.append((tbl, row))) monkeypatch.setattr(mod, "_ch_ts_us", lambda: 123) t = _mk_trader() forced = _run_retract(t, rows, fraction=0.99999) assert forced is not None assert forced.get("capital_already_realized") is True assert t.eng.position is None open_rows = [r for tbl, r in rows if tbl == "position_state" and r.get("status") == "OPEN"] assert open_rows == [] def test_full_retract_writes_terminal_leg_rows(monkeypatch): """fraction=1.0: terminal leg MUST appear in trade_exit_legs and trade_reconstruction (FULL_RETRACT_EXIT) — the §38.9 replay surface.""" rows = [] monkeypatch.setattr(mod, "ch_put", lambda tbl, row: rows.append((tbl, row))) monkeypatch.setattr(mod, "_ch_ts_us", lambda: 123) t = _mk_trader() forced = _run_retract(t, rows, fraction=1.0, price=0.95) assert forced is not None and forced["reason"] == "HOTKEY_RETRACT" legs = [r for tbl, r in rows if tbl == "trade_exit_legs"] assert len(legs) == 1 assert legs[0]["exit_seq"] == 1 assert legs[0]["remaining_notional"] <= mod.POSITION_DUST_NOTIONAL_USD recon = [r for tbl, r in rows if tbl == "trade_reconstruction"] assert any(r["event_type"] == "FULL_RETRACT_EXIT" for r in recon) open_rows = [r for tbl, r in rows if tbl == "position_state" and r.get("status") == "OPEN"] assert open_rows == [] # capital realized exactly once for the leg: 5% on $1000 short = +$50 assert pytest.approx(t.eng.capital, abs=1e-6) == 25_050.0 assert pytest.approx(forced["net_pnl"], abs=1e-6) == 50.0 def test_partial_then_full_chain_keeps_all_legs(monkeypatch): """Two-leg chain: every leg lands in trade_exit_legs; totals coherent.""" rows = [] monkeypatch.setattr(mod, "ch_put", lambda tbl, row: rows.append((tbl, row))) monkeypatch.setattr(mod, "_ch_ts_us", lambda: 123) t = _mk_trader() pos = _Pos("T-1", "STXUSDT", 1.0, 1000.0, current_price=0.95) t.eng = _Eng(pos) _seed_pending(t, "T-1", notional=1000.0) cmd1 = _retract_cmd(t, "T-1", command_id="c-a", fraction=0.5) t.control_map.put("blue_runtime_commands", json.dumps([cmd1])) assert t._process_runtime_commands({"STXUSDT": 0.95}) is None cmd2 = _retract_cmd(t, "T-1", command_id="c-b", fraction=1.0) t.control_map.put("blue_runtime_commands", json.dumps([cmd2])) forced = t._process_runtime_commands({"STXUSDT": 0.95}) assert forced is not None legs = [r for tbl, r in rows if tbl == "trade_exit_legs"] assert [l["exit_seq"] for l in legs] == [1, 2] # both legs at +5%: 0.05*500 + 0.05*500 = 50 total assert pytest.approx(forced["net_pnl"], abs=1e-6) == 50.0 assert pytest.approx(legs[0]["pnl_leg"] + legs[1]["pnl_leg"], abs=1e-6) == 50.0 # ── 5: reject-exhaustion halts instead of silently trading flat ───────────── def test_restore_reject_exhaustion_marks_failure(monkeypatch): """CH returns a candidate with invalid leverage; HZ has neither a position nor flat-proof ⇒ _restore_failed must be set (halt).""" monkeypatch.setattr(mod, "ch_put", lambda *a, **k: None) class _Resp: def __init__(self, body: bytes): self._b = body def read(self): return self._b def __enter__(self): return self def __exit__(self, *a): return False # leverage = 0 → invalid candidate; fresh ts to pass staleness # columns: trade_id asset direction entry_price quantity notional # leverage bucket_id bars_held last_ts (10 fields) import datetime as _dt ts = _dt.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") row = f"T-bad\tXTZUSDT\t-1\t0.2276\t1000\t227.6\t0\t0\t14\t{ts}" import urllib.request as _ur monkeypatch.setattr(_ur, "urlopen", lambda *a, **k: _Resp(row.encode())) t = _mk_trader() t.eng = _Eng(None) t._restore_state_snapshots = {} t._parse_timestamp_seconds = mod.DolphinLiveTrader._parse_timestamp_seconds.__get__(t) t._restore_position_state() assert t._restore_failed is True assert "leverage" in t._restore_failure_reason # ── 6: chain mismatch is a queryable journal event ─────────────────────────── def test_chain_token_mismatch_emits_journal_event(monkeypatch): rows = [] monkeypatch.setattr(mod, "ch_put", lambda tbl, row: rows.append((tbl, row))) monkeypatch.setattr(mod, "_ch_ts_us", lambda: 123) t = _mk_trader() pending = { "asset": "XTZUSDT", "side": "SHORT", "entry_price": 0.2276, "quantity": 1000.0, "notional": 227.6, "notional_entry": 227.6, "leverage": 2.0, "entry_bar": 0, "retraction_legs": 0, "realized_pnl_legs_total": 0.0, } recon = {"chain_token": "deadbeef" * 8, "chain_seq": 0, "chain_head_leg_id": "T-x:open", "chain_mode": "LIVE"} chain = t._chain_state_from_reconstruction("T-x", pending, recon) assert chain["chain_mode"] == "LEGACY_REBUILT_MISMATCH" mismatch_rows = [r for tbl, r in rows if tbl == "trade_reconstruction" and r.get("event_type") == "CHAIN_TOKEN_MISMATCH"] assert len(mismatch_rows) == 1 payload = json.loads(mismatch_rows[0]["payload_json"]) assert payload["stored_token"].startswith("deadbeef") assert payload["derived_token"] != payload["stored_token"] def test_chain_token_match_emits_nothing(monkeypatch): rows = [] monkeypatch.setattr(mod, "ch_put", lambda tbl, row: rows.append((tbl, row))) t = _mk_trader() pending = { "asset": "XTZUSDT", "side": "SHORT", "entry_price": 0.2276, "quantity": 1000.0, "notional": 227.6, "notional_entry": 227.6, "leverage": 2.0, "entry_bar": 0, "retraction_legs": 0, "realized_pnl_legs_total": 0.0, } # derive the true token with the SAME chain parameters the recon will # carry (mode LIVE), then feed it back as the stored token expected = t._chain_state_for_pending( "T-y", pending, chain_mode="LIVE", chain_head_leg_id="T-y:open", chain_prev_leg_id="", chain_seq=0, ) recon = {"chain_token": expected["chain_token"], "chain_seq": 0, "chain_head_leg_id": "T-y:open", "chain_mode": "LIVE"} chain = t._chain_state_from_reconstruction("T-y", pending, recon) assert chain["chain_mode"] != "LEGACY_REBUILT_MISMATCH" assert [r for tbl, r in rows if r.get("event_type") == "CHAIN_TOKEN_MISMATCH"] == [] # ── 7: bars_held continuity across restore ────────────────────────────────── def test_restored_entry_bar_preserves_bars_held(monkeypatch): """boot bar_idx=0, stored_bars=34 ⇒ entry_bar=-34 ⇒ bars_held resumes at 34 immediately (the XTZ bars_held≈0 / MAX_HOLD-reset fix).""" captured = {} monkeypatch.setattr(mod, "ch_put", lambda *a, **k: None) class _Resp: def __init__(self, body: bytes): self._b = body def read(self): return self._b def __enter__(self): return self def __exit__(self, *a): return False import datetime as _dt ts = _dt.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") row = f"T-c\tXTZUSDT\t-1\t0.2276\t248173\t56484.4\t6.37\t0\t34\t{ts}" import urllib.request as _ur monkeypatch.setattr(_ur, "urlopen", lambda *a, **k: _Resp(row.encode())) class _RecordingExitMgr(_ExitMgr): def setup_position(self, trade_id, entry_price, direction, entry_bar, **kw): captured["entry_bar"] = entry_bar self._positions[trade_id] = {"entry_bar": entry_bar} t = _mk_trader() t.bar_idx = 0 # fresh boot t.eng = _Eng(None) t.eng.exit_manager = _RecordingExitMgr() t._restore_state_snapshots = {} t._parse_timestamp_seconds = mod.DolphinLiveTrader._parse_timestamp_seconds.__get__(t) t._load_chain_ledger_state = lambda _tid: None t._v7_exit_engine = None t._seed_posture_for_restored_position = lambda: None t._apply_catastrophic_floor_to_open_position = lambda: None t._restore_position_state() assert t._restore_failed is False assert t.eng.position is not None assert captured["entry_bar"] == -34 # bars_held = current_bar - entry_bar = 0 - (-34) = 34 → clock continues assert (t.bar_idx - captured["entry_bar"]) == 34 if __name__ == "__main__": import sys sys.exit(pytest.main([__file__, "-v"]))