BLUE: TP_FLOOR profit-floor + malformed-OPEN Option A (first add of trader)

TP_FLOOR (LINK 5e05eeeb, -$1,248.71): once the BASE 0.20% TP is crossed,
regression to base exits — caps the left tail of the OB cascade x1.40
TP-widening (which is logged per decision now: dynamic_tp_pct,
tp_mod_factor, cascade_count, ob_regime_signal, tp_floor_armed on
v7_decision_events). Class default OFF (champion parity); live ON via
DOLPHIN_TP_FLOOR.

Malformed-OPEN Option A (causal fix): POSITION_DUST_NOTIONAL_USD shared by
the full-close decision and the single _ps_write_open lifecycle gate (OPEN
rows can never round to zero size on disk); retract terminal leg writes its
trade_exit_legs + trade_reconstruction rows; restore reject-exhaustion halts
for unknown-corruption classes and flat-continues only for the documented
zero-size tombstone class; chain-token mismatch emits a CHAIN_TOKEN_MISMATCH
journal event; restored entry_bar preserves bars_held continuity (negative
entry_bar allowed, Int32) in both CH and HZ restore paths.

Tests: test_tp_floor.py 16/16 incl. LINK golden replay;
test_malformed_open_distal.py 11/11. Suites before/after identical except
one PRE-EXISTING failure fixed (full-close zero-size-row test).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Codex
2026-06-12 14:59:49 +02:00
parent 4929087f7a
commit 520257de7a
3 changed files with 5872 additions and 0 deletions

View File

@@ -0,0 +1,436 @@
"""Distal fix for MALFORMED_OPEN_RESTORE_BUG — regression suite (Option A).
Pins the causal fix set applied 2026-06-12:
1. `_ps_write_open` is the SINGLE gate for OPEN position_state rows and
enforces the lifecycle invariant (qty > 0 AND notional > dust).
2. The partial-retract path persists remainders THROUGH that gate
(previously a raw ch_put bypass — the causal origin of zero-size
OPEN snapshots).
3. The dust threshold (POSITION_DUST_NOTIONAL_USD) is shared between the
full-close decision and the write gate, so no remainder can be "open"
in memory yet round to a zero-size row on disk.
4. The terminal retract leg writes its trade_exit_legs +
trade_reconstruction rows (the old early-return lost the final leg
from the §38.9 replay surface).
5. Restore-candidate rejection with exhausted fallbacks marks
`_restore_failed` (halt) instead of silently trading from flat
(XTZ 863c21da single-slot-violation class).
6. Chain-token mismatch emits a queryable CHAIN_TOKEN_MISMATCH journal
event, not just a log line.
7. Restored entry_bar preserves bars_held continuity (negative entry_bar
allowed; the old max(0, ...) clamp reset the MAX_HOLD clock).
"""
from __future__ import annotations
import json
import threading
import tempfile
from dataclasses import dataclass
import importlib.util
from pathlib import Path
import pytest
_MOD_PATH = Path("/mnt/dolphinng5_predict/prod/nautilus_event_trader.py")
_SPEC = importlib.util.spec_from_file_location("nautilus_event_trader_distal_mod", _MOD_PATH)
assert _SPEC and _SPEC.loader
mod = importlib.util.module_from_spec(_SPEC)
_SPEC.loader.exec_module(mod) # type: ignore[arg-type]
# ── harness (mirrors test_multi_exit_retraction_integration.py) ─────────────
@dataclass
class _Pos:
trade_id: str
asset: str
entry_price: float
notional: float
current_price: float = 0.0
pnl_pct: float = 0.0
class _ExitMgr:
def __init__(self) -> None:
self._positions: dict[str, dict] = {}
class _Eng:
def __init__(self, pos: _Pos | None, capital: float = 25_000.0) -> None:
self.position = pos
self.capital = capital
self.exit_manager = _ExitMgr()
self.regime_dd_halt = False
self._day_posture = "APEX"
if pos is not None:
self.exit_manager._positions[pos.trade_id] = {"dummy": True}
class _Map:
def __init__(self, initial: dict | None = None) -> None:
self._d = dict(initial or {})
self._lock = threading.Lock()
def blocking(self):
return self
def get(self, key):
with self._lock:
return self._d.get(key)
def put(self, key, val):
with self._lock:
self._d[key] = val
class _F:
def add_done_callback(self, _cb):
return None
return _F()
def _mk_trader() -> "mod.DolphinLiveTrader":
t = object.__new__(mod.DolphinLiveTrader)
tmpdir = Path(tempfile.mkdtemp(prefix="dolphin_distal_test_"))
mod.CAPITAL_DISK_CHECKPOINT = tmpdir / "capital_checkpoint.json"
mod.CAPITAL_CORRECTIVE_REPLAY = tmpdir / "capital_replay.json"
mod.CAPITAL_UPDATE_LEDGER = tmpdir / "capital_update_ledger.json"
t.eng_lock = threading.Lock()
t.state_map = _Map({})
t.pnl_map = _Map({})
t.control_map = _Map({"blue_runtime_commands": "[]"})
t._processed_retract_commands = mod.deque(maxlen=5000)
t._processed_retract_set = set()
t._runtime_command_lock = threading.Lock()
t._pending_entries = {}
t._last_prices_dict = {}
t.current_day = "2026-06-12"
t.bar_idx = 100
t._restore_failed = False
t._restore_failure_reason = ""
return t
def _seed_pending(t, trade_id: str, *, notional: float = 1000.0,
entry_price: float = 1.0) -> None:
t._pending_entries[trade_id] = {
"asset": "STXUSDT",
"side": "SHORT",
"entry_price": entry_price,
"entry_bar": 90,
"entry_date": "2026-06-12",
"notional": notional,
"notional_entry": notional,
"retraction_legs": 0,
"realized_pnl_legs_total": 0.0,
"leverage": 2.0,
}
pending = t._pending_entries[trade_id]
pending.update(t._chain_state_for_pending(
trade_id, pending,
chain_mode="LIVE",
chain_head_leg_id=f"{trade_id}:open",
chain_prev_leg_id="",
chain_seq=0,
))
def _retract_cmd(t, trade_id: str, *, command_id: str, fraction: float) -> dict:
pending = t._pending_entries[trade_id]
return {
"command_id": command_id,
"trade_id": trade_id,
"action": "RETRACT",
"fraction": fraction,
"reason": "HOTKEY_RETRACT",
"source": "tui_hotkey",
"chain_root_trade_id": pending["chain_root_trade_id"],
"chain_head_leg_id": pending["chain_head_leg_id"],
"chain_prev_leg_id": pending["chain_prev_leg_id"],
"chain_seq": pending["chain_seq"],
"chain_token": pending["chain_token"],
}
def _run_retract(t, rows, *, fraction: float, notional: float = 1000.0,
price: float = 1.0):
pos = _Pos("T-1", "STXUSDT", 1.0, notional, current_price=price)
t.eng = _Eng(pos)
_seed_pending(t, "T-1", notional=notional)
cmd = _retract_cmd(t, "T-1", command_id=f"c-{fraction}", fraction=fraction)
t.control_map.put("blue_runtime_commands", json.dumps([cmd]))
return t._process_runtime_commands({"STXUSDT": price})
# ── 1+2: lifecycle invariant at the single write gate ───────────────────────
def test_ps_write_open_refuses_zero_quantity(monkeypatch):
rows = []
monkeypatch.setattr(mod, "ch_put", lambda tbl, row: rows.append((tbl, row)))
t = _mk_trader()
ok = t._ps_write_open("T-z", {
"asset": "STXUSDT", "side": "SHORT", "entry_price": 1.0,
"quantity": 0.0, "leverage": 2.0, "entry_ts": 1,
})
assert ok is False
assert rows == []
def test_ps_write_open_refuses_dust_notional(monkeypatch):
rows = []
monkeypatch.setattr(mod, "ch_put", lambda tbl, row: rows.append((tbl, row)))
t = _mk_trader()
# qty>0 but notional rounds to <= $0.01 (the malformed-row recipe)
ok = t._ps_write_open("T-d", {
"asset": "STXUSDT", "side": "SHORT", "entry_price": 0.0001,
"quantity": 40.0, "leverage": 2.0, "entry_ts": 1, # notional 0.004
})
assert ok is False
assert rows == []
def test_ps_write_open_accepts_valid_entry(monkeypatch):
rows = []
monkeypatch.setattr(mod, "ch_put", lambda tbl, row: rows.append((tbl, row)))
t = _mk_trader()
ok = t._ps_write_open("T-v", {
"asset": "STXUSDT", "side": "SHORT", "entry_price": 1.0,
"quantity": 1000.0, "leverage": 2.0, "entry_ts": 777,
})
assert ok is True
assert len(rows) == 1
tbl, row = rows[0]
assert tbl == "position_state"
assert row["status"] == "OPEN"
assert row["quantity"] > 0 and row["notional"] > 0
assert row["ts"] == 777 and row["bars_held"] == 0
# ── 2+3: retract remainder goes through the gate; thresholds aligned ────────
def test_partial_retract_open_row_never_zero_sized(monkeypatch):
"""Sweep fractions: every surviving OPEN row must satisfy the invariant."""
for fraction in (0.1, 0.5, 0.9, 0.99, 0.999):
rows = []
monkeypatch.setattr(mod, "ch_put", lambda tbl, row, _r=rows: _r.append((tbl, row)))
monkeypatch.setattr(mod, "_ch_ts_us", lambda: 123)
t = _mk_trader()
forced = _run_retract(t, rows, fraction=fraction)
open_rows = [r for tbl, r in rows
if tbl == "position_state" and r.get("status") == "OPEN"]
if forced is None:
assert len(open_rows) == 1, f"fraction={fraction}"
assert open_rows[0]["quantity"] > 0.0
assert open_rows[0]["notional"] > mod.POSITION_DUST_NOTIONAL_USD
else:
assert open_rows == [], f"fraction={fraction} full-close wrote OPEN row"
def test_dust_remainder_is_full_close(monkeypatch):
"""$1000 × 0.99999 retract leaves $0.01 — at/below dust ⇒ FULL_CLOSE,
no OPEN snapshot, forced exit carries capital_already_realized."""
rows = []
monkeypatch.setattr(mod, "ch_put", lambda tbl, row: rows.append((tbl, row)))
monkeypatch.setattr(mod, "_ch_ts_us", lambda: 123)
t = _mk_trader()
forced = _run_retract(t, rows, fraction=0.99999)
assert forced is not None
assert forced.get("capital_already_realized") is True
assert t.eng.position is None
open_rows = [r for tbl, r in rows
if tbl == "position_state" and r.get("status") == "OPEN"]
assert open_rows == []
def test_full_retract_writes_terminal_leg_rows(monkeypatch):
"""fraction=1.0: terminal leg MUST appear in trade_exit_legs and
trade_reconstruction (FULL_RETRACT_EXIT) — the §38.9 replay surface."""
rows = []
monkeypatch.setattr(mod, "ch_put", lambda tbl, row: rows.append((tbl, row)))
monkeypatch.setattr(mod, "_ch_ts_us", lambda: 123)
t = _mk_trader()
forced = _run_retract(t, rows, fraction=1.0, price=0.95)
assert forced is not None and forced["reason"] == "HOTKEY_RETRACT"
legs = [r for tbl, r in rows if tbl == "trade_exit_legs"]
assert len(legs) == 1
assert legs[0]["exit_seq"] == 1
assert legs[0]["remaining_notional"] <= mod.POSITION_DUST_NOTIONAL_USD
recon = [r for tbl, r in rows if tbl == "trade_reconstruction"]
assert any(r["event_type"] == "FULL_RETRACT_EXIT" for r in recon)
open_rows = [r for tbl, r in rows
if tbl == "position_state" and r.get("status") == "OPEN"]
assert open_rows == []
# capital realized exactly once for the leg: 5% on $1000 short = +$50
assert pytest.approx(t.eng.capital, abs=1e-6) == 25_050.0
assert pytest.approx(forced["net_pnl"], abs=1e-6) == 50.0
def test_partial_then_full_chain_keeps_all_legs(monkeypatch):
"""Two-leg chain: every leg lands in trade_exit_legs; totals coherent."""
rows = []
monkeypatch.setattr(mod, "ch_put", lambda tbl, row: rows.append((tbl, row)))
monkeypatch.setattr(mod, "_ch_ts_us", lambda: 123)
t = _mk_trader()
pos = _Pos("T-1", "STXUSDT", 1.0, 1000.0, current_price=0.95)
t.eng = _Eng(pos)
_seed_pending(t, "T-1", notional=1000.0)
cmd1 = _retract_cmd(t, "T-1", command_id="c-a", fraction=0.5)
t.control_map.put("blue_runtime_commands", json.dumps([cmd1]))
assert t._process_runtime_commands({"STXUSDT": 0.95}) is None
cmd2 = _retract_cmd(t, "T-1", command_id="c-b", fraction=1.0)
t.control_map.put("blue_runtime_commands", json.dumps([cmd2]))
forced = t._process_runtime_commands({"STXUSDT": 0.95})
assert forced is not None
legs = [r for tbl, r in rows if tbl == "trade_exit_legs"]
assert [l["exit_seq"] for l in legs] == [1, 2]
# both legs at +5%: 0.05*500 + 0.05*500 = 50 total
assert pytest.approx(forced["net_pnl"], abs=1e-6) == 50.0
assert pytest.approx(legs[0]["pnl_leg"] + legs[1]["pnl_leg"], abs=1e-6) == 50.0
# ── 5: reject-exhaustion halts instead of silently trading flat ─────────────
def test_restore_reject_exhaustion_marks_failure(monkeypatch):
"""CH returns a candidate with invalid leverage; HZ has neither a
position nor flat-proof ⇒ _restore_failed must be set (halt)."""
monkeypatch.setattr(mod, "ch_put", lambda *a, **k: None)
class _Resp:
def __init__(self, body: bytes):
self._b = body
def read(self):
return self._b
def __enter__(self):
return self
def __exit__(self, *a):
return False
# leverage = 0 → invalid candidate; fresh ts to pass staleness
# columns: trade_id asset direction entry_price quantity notional
# leverage bucket_id bars_held last_ts (10 fields)
import datetime as _dt
ts = _dt.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
row = f"T-bad\tXTZUSDT\t-1\t0.2276\t1000\t227.6\t0\t0\t14\t{ts}"
import urllib.request as _ur
monkeypatch.setattr(_ur, "urlopen", lambda *a, **k: _Resp(row.encode()))
t = _mk_trader()
t.eng = _Eng(None)
t._restore_state_snapshots = {}
t._parse_timestamp_seconds = mod.DolphinLiveTrader._parse_timestamp_seconds.__get__(t)
t._restore_position_state()
assert t._restore_failed is True
assert "leverage" in t._restore_failure_reason
# ── 6: chain mismatch is a queryable journal event ───────────────────────────
def test_chain_token_mismatch_emits_journal_event(monkeypatch):
rows = []
monkeypatch.setattr(mod, "ch_put", lambda tbl, row: rows.append((tbl, row)))
monkeypatch.setattr(mod, "_ch_ts_us", lambda: 123)
t = _mk_trader()
pending = {
"asset": "XTZUSDT", "side": "SHORT", "entry_price": 0.2276,
"quantity": 1000.0, "notional": 227.6, "notional_entry": 227.6,
"leverage": 2.0, "entry_bar": 0,
"retraction_legs": 0, "realized_pnl_legs_total": 0.0,
}
recon = {"chain_token": "deadbeef" * 8, "chain_seq": 0,
"chain_head_leg_id": "T-x:open", "chain_mode": "LIVE"}
chain = t._chain_state_from_reconstruction("T-x", pending, recon)
assert chain["chain_mode"] == "LEGACY_REBUILT_MISMATCH"
mismatch_rows = [r for tbl, r in rows
if tbl == "trade_reconstruction"
and r.get("event_type") == "CHAIN_TOKEN_MISMATCH"]
assert len(mismatch_rows) == 1
payload = json.loads(mismatch_rows[0]["payload_json"])
assert payload["stored_token"].startswith("deadbeef")
assert payload["derived_token"] != payload["stored_token"]
def test_chain_token_match_emits_nothing(monkeypatch):
rows = []
monkeypatch.setattr(mod, "ch_put", lambda tbl, row: rows.append((tbl, row)))
t = _mk_trader()
pending = {
"asset": "XTZUSDT", "side": "SHORT", "entry_price": 0.2276,
"quantity": 1000.0, "notional": 227.6, "notional_entry": 227.6,
"leverage": 2.0, "entry_bar": 0,
"retraction_legs": 0, "realized_pnl_legs_total": 0.0,
}
# derive the true token with the SAME chain parameters the recon will
# carry (mode LIVE), then feed it back as the stored token
expected = t._chain_state_for_pending(
"T-y", pending, chain_mode="LIVE",
chain_head_leg_id="T-y:open", chain_prev_leg_id="", chain_seq=0,
)
recon = {"chain_token": expected["chain_token"], "chain_seq": 0,
"chain_head_leg_id": "T-y:open", "chain_mode": "LIVE"}
chain = t._chain_state_from_reconstruction("T-y", pending, recon)
assert chain["chain_mode"] != "LEGACY_REBUILT_MISMATCH"
assert [r for tbl, r in rows
if r.get("event_type") == "CHAIN_TOKEN_MISMATCH"] == []
# ── 7: bars_held continuity across restore ──────────────────────────────────
def test_restored_entry_bar_preserves_bars_held(monkeypatch):
"""boot bar_idx=0, stored_bars=34 ⇒ entry_bar=-34 ⇒ bars_held resumes
at 34 immediately (the XTZ bars_held≈0 / MAX_HOLD-reset fix)."""
captured = {}
monkeypatch.setattr(mod, "ch_put", lambda *a, **k: None)
class _Resp:
def __init__(self, body: bytes):
self._b = body
def read(self):
return self._b
def __enter__(self):
return self
def __exit__(self, *a):
return False
import datetime as _dt
ts = _dt.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
row = f"T-c\tXTZUSDT\t-1\t0.2276\t248173\t56484.4\t6.37\t0\t34\t{ts}"
import urllib.request as _ur
monkeypatch.setattr(_ur, "urlopen", lambda *a, **k: _Resp(row.encode()))
class _RecordingExitMgr(_ExitMgr):
def setup_position(self, trade_id, entry_price, direction, entry_bar,
**kw):
captured["entry_bar"] = entry_bar
self._positions[trade_id] = {"entry_bar": entry_bar}
t = _mk_trader()
t.bar_idx = 0 # fresh boot
t.eng = _Eng(None)
t.eng.exit_manager = _RecordingExitMgr()
t._restore_state_snapshots = {}
t._parse_timestamp_seconds = mod.DolphinLiveTrader._parse_timestamp_seconds.__get__(t)
t._load_chain_ledger_state = lambda _tid: None
t._v7_exit_engine = None
t._seed_posture_for_restored_position = lambda: None
t._apply_catastrophic_floor_to_open_position = lambda: None
t._restore_position_state()
assert t._restore_failed is False
assert t.eng.position is not None
assert captured["entry_bar"] == -34
# bars_held = current_bar - entry_bar = 0 - (-34) = 34 → clock continues
assert (t.bar_idx - captured["entry_bar"]) == 34
if __name__ == "__main__":
import sys
sys.exit(pytest.main([__file__, "-v"]))