from __future__ import annotations import hashlib import json from dataclasses import dataclass, field from typing import Dict, List import math import random import pytest EPS = 1e-9 @dataclass class ExitLeg: trade_id: str chain_root_trade_id: str exit_seq: int exit_leg_id: str chain_prev_leg_id: str chain_head_leg_id: str command_id: str fraction: float qty: float exit_price: float fee: float net_pnl: float remaining_after: float reason: str chain_token: str @dataclass class ParentTrade: trade_id: str side: str # SHORT | LONG entry_price: float entry_qty: float remaining_qty: float realized_pnl_total: float = 0.0 realized_fees_total: float = 0.0 exit_seq: int = 0 status: str = "OPEN" # OPEN | PARTIALLY_CLOSED | CLOSED version: int = 0 legs: List[ExitLeg] = field(default_factory=list) chain_root_trade_id: str = "" chain_head_leg_id: str = "" chain_prev_leg_id: str = "" chain_token: str = "" def _chain_token(payload: dict) -> str: return hashlib.sha256(json.dumps(payload, sort_keys=True, separators=(",", ":"), default=str).encode()).hexdigest() class MiniRetractionRuntime: """ Contract-reference runtime: - all partial exits route through one handler - idempotent by command_id - financial accumulation by executed leg """ def __init__(self, fee_rate: float = 0.00055): self.fee_rate = fee_rate self.capital = 25_000.0 self.trades: Dict[str, ParentTrade] = {} self.applied_commands: Dict[str, ExitLeg] = {} def open_trade(self, trade_id: str, side: str, entry_price: float, qty: float) -> None: assert trade_id not in self.trades t = ParentTrade( trade_id=trade_id, side=side, entry_price=entry_price, entry_qty=qty, remaining_qty=qty, ) t.chain_root_trade_id = trade_id t.chain_head_leg_id = f"{trade_id}:open" t.chain_prev_leg_id = "" t.chain_token = _chain_token({ "trade_id": trade_id, "chain_root_trade_id": trade_id, "chain_head_leg_id": t.chain_head_leg_id, "chain_prev_leg_id": "", "chain_seq": 0, "side": side, "entry_price": entry_price, "entry_qty": qty, "remaining_qty": qty, "realized_pnl_total": 0.0, "realized_fees_total": 0.0, }) self.trades[trade_id] = t def retract( self, trade_id: str, *, command_id: str, fraction: float, exit_price: float, reason: str, ) -> ExitLeg | None: if command_id in self.applied_commands: return self.applied_commands[command_id] if not (0 < fraction <= 1.0): return None t = self.trades.get(trade_id) if not t or t.status == "CLOSED": return None expected = _chain_token({ "trade_id": t.trade_id, "chain_root_trade_id": t.chain_root_trade_id or t.trade_id, "chain_head_leg_id": t.chain_head_leg_id or f"{t.trade_id}:open", "chain_prev_leg_id": t.chain_prev_leg_id or "", "chain_seq": t.exit_seq, "side": t.side, "entry_price": t.entry_price, "entry_qty": t.entry_qty, "remaining_qty": t.remaining_qty, "realized_pnl_total": t.realized_pnl_total, "realized_fees_total": t.realized_fees_total, }) if t.chain_token and t.chain_token != expected: return None requested_qty = t.remaining_qty * fraction qty = min(max(requested_qty, 0.0), t.remaining_qty) if qty <= EPS: return None sign = -1.0 if t.side == "SHORT" else 1.0 gross = sign * (exit_price - t.entry_price) * qty fee = self.fee_rate * (t.entry_price * qty + exit_price * qty) net = gross - fee t.exit_seq += 1 t.version += 1 t.remaining_qty = max(0.0, t.remaining_qty - qty) t.realized_pnl_total += net t.realized_fees_total += fee self.capital += net t.status = "CLOSED" if t.remaining_qty <= EPS else "PARTIALLY_CLOSED" prev_head = t.chain_head_leg_id or f"{t.trade_id}:open" t.chain_prev_leg_id = prev_head t.chain_head_leg_id = f"{t.trade_id}:x{t.exit_seq:03d}" t.chain_token = _chain_token({ "trade_id": t.trade_id, "chain_root_trade_id": t.chain_root_trade_id or t.trade_id, "chain_head_leg_id": t.chain_head_leg_id, "chain_prev_leg_id": prev_head, "chain_seq": t.exit_seq, "side": t.side, "entry_price": t.entry_price, "entry_qty": t.entry_qty, "remaining_qty": t.remaining_qty, "realized_pnl_total": t.realized_pnl_total, "realized_fees_total": t.realized_fees_total, }) leg = ExitLeg( trade_id=t.trade_id, chain_root_trade_id=t.chain_root_trade_id or t.trade_id, exit_seq=t.exit_seq, exit_leg_id=f"{t.trade_id}:x{t.exit_seq:03d}", chain_prev_leg_id=prev_head, chain_head_leg_id=t.chain_head_leg_id, command_id=command_id, fraction=fraction, qty=qty, exit_price=exit_price, fee=fee, net_pnl=net, remaining_after=t.remaining_qty, reason=reason, chain_token=t.chain_token, ) t.legs.append(leg) self.applied_commands[command_id] = leg return leg def _assert_parent_invariants(t: ParentTrade) -> None: total_qty = sum(l.qty for l in t.legs) assert total_qty <= t.entry_qty + EPS assert math.isclose(t.remaining_qty, max(0.0, t.entry_qty - total_qty), abs_tol=1e-8) assert math.isclose(t.realized_pnl_total, sum(l.net_pnl for l in t.legs), rel_tol=0, abs_tol=1e-8) assert math.isclose(t.realized_fees_total, sum(l.fee for l in t.legs), rel_tol=0, abs_tol=1e-8) if t.legs: assert t.chain_head_leg_id == t.legs[-1].exit_leg_id assert t.chain_token == t.legs[-1].chain_token if t.remaining_qty <= EPS: assert t.status == "CLOSED" elif t.legs: assert t.status == "PARTIALLY_CLOSED" else: assert t.status == "OPEN" def test_retract_default_half_then_close_preserves_lineage_and_math() -> None: rt = MiniRetractionRuntime() rt.open_trade("T1", "SHORT", 100.0, 10.0) l1 = rt.retract("T1", command_id="c1", fraction=0.5, exit_price=99.0, reason="HOTKEY_RETRACT") assert l1 is not None assert l1.exit_leg_id == "T1:x001" assert l1.qty == 5.0 assert l1.chain_prev_leg_id == "T1:open" assert l1.chain_head_leg_id == "T1:x001" l2 = rt.retract("T1", command_id="c2", fraction=1.0, exit_price=98.5, reason="HOTKEY_RETRACT") assert l2 is not None assert l2.exit_leg_id == "T1:x002" assert l2.qty == 5.0 assert l2.chain_prev_leg_id == "T1:x001" assert l2.chain_head_leg_id == "T1:x002" t = rt.trades["T1"] _assert_parent_invariants(t) assert t.status == "CLOSED" assert t.exit_seq == 2 def test_idempotent_command_does_not_double_execute() -> None: rt = MiniRetractionRuntime() rt.open_trade("T2", "SHORT", 50.0, 20.0) first = rt.retract("T2", command_id="dup", fraction=0.5, exit_price=49.8, reason="V7_RETRACT") second = rt.retract("T2", command_id="dup", fraction=0.5, exit_price=49.7, reason="V7_RETRACT") assert first is not None assert second is not None assert first.exit_leg_id == second.exit_leg_id assert len(rt.trades["T2"].legs) == 1 _assert_parent_invariants(rt.trades["T2"]) def test_invalid_fraction_rejected() -> None: rt = MiniRetractionRuntime() rt.open_trade("T3", "SHORT", 10.0, 10.0) assert rt.retract("T3", command_id="a", fraction=0.0, exit_price=9.9, reason="HOTKEY_RETRACT") is None assert rt.retract("T3", command_id="b", fraction=1.5, exit_price=9.9, reason="HOTKEY_RETRACT") is None assert len(rt.trades["T3"].legs) == 0 _assert_parent_invariants(rt.trades["T3"]) def test_long_side_accounting_sign_is_correct() -> None: rt = MiniRetractionRuntime() rt.open_trade("T4", "LONG", 100.0, 2.0) leg = rt.retract("T4", command_id="c", fraction=1.0, exit_price=101.0, reason="HOTKEY_RETRACT") assert leg is not None assert leg.net_pnl > 0 _assert_parent_invariants(rt.trades["T4"]) def test_over_many_random_partial_exits_invariants_hold() -> None: rng = random.Random(1337) rt = MiniRetractionRuntime() rt.open_trade("T5", "SHORT", 120.0, 30.0) for i in range(200): t = rt.trades["T5"] if t.status == "CLOSED": break # Biased toward smaller retractions; occasionally force full close frac = 1.0 if i % 37 == 0 else max(0.01, min(0.99, rng.random() * 0.6)) px = 120.0 - rng.random() * 2.0 rt.retract("T5", command_id=f"cmd-{i}", fraction=frac, exit_price=px, reason="V7_RETRACT") _assert_parent_invariants(rt.trades["T5"]) t = rt.trades["T5"] # Must eventually close under forced 1.0 fractions assert t.status == "CLOSED" _assert_parent_invariants(t) def test_capital_updates_are_leg_immediate() -> None: rt = MiniRetractionRuntime() start = rt.capital rt.open_trade("T6", "SHORT", 200.0, 4.0) l1 = rt.retract("T6", command_id="r1", fraction=0.5, exit_price=199.0, reason="HOTKEY_RETRACT") assert l1 is not None mid = rt.capital assert not math.isclose(mid, start, abs_tol=1e-12) l2 = rt.retract("T6", command_id="r2", fraction=1.0, exit_price=198.5, reason="HOTKEY_RETRACT") assert l2 is not None assert math.isclose(rt.capital, start + l1.net_pnl + l2.net_pnl, rel_tol=0, abs_tol=1e-8) def test_command_on_closed_trade_is_noop() -> None: rt = MiniRetractionRuntime() rt.open_trade("T7", "SHORT", 100.0, 1.0) rt.retract("T7", command_id="x1", fraction=1.0, exit_price=99.7, reason="HOTKEY_RETRACT") t = rt.trades["T7"] assert t.status == "CLOSED" n = len(t.legs) out = rt.retract("T7", command_id="x2", fraction=0.5, exit_price=99.6, reason="HOTKEY_RETRACT") assert out is None assert len(t.legs) == n _assert_parent_invariants(t) def test_tampered_chain_head_is_rejected() -> None: rt = MiniRetractionRuntime() rt.open_trade("T8", "SHORT", 75.0, 8.0) first = rt.retract("T8", command_id="c1", fraction=0.5, exit_price=74.5, reason="HOTKEY_RETRACT") assert first is not None rt.trades["T8"].chain_head_leg_id = "T8:x999" second = rt.retract("T8", command_id="c2", fraction=0.5, exit_price=74.0, reason="HOTKEY_RETRACT") assert second is None with pytest.raises(AssertionError): _assert_parent_invariants(rt.trades["T8"])