Files
siloqy/prod/tests/test_multi_exit_retraction_contract.py

314 lines
11 KiB
Python
Raw Normal View History

from __future__ import annotations
import hashlib
import json
from dataclasses import dataclass, field
from typing import Dict, List
import math
import random
import pytest
EPS = 1e-9
@dataclass
class ExitLeg:
trade_id: str
chain_root_trade_id: str
exit_seq: int
exit_leg_id: str
chain_prev_leg_id: str
chain_head_leg_id: str
command_id: str
fraction: float
qty: float
exit_price: float
fee: float
net_pnl: float
remaining_after: float
reason: str
chain_token: str
@dataclass
class ParentTrade:
trade_id: str
side: str # SHORT | LONG
entry_price: float
entry_qty: float
remaining_qty: float
realized_pnl_total: float = 0.0
realized_fees_total: float = 0.0
exit_seq: int = 0
status: str = "OPEN" # OPEN | PARTIALLY_CLOSED | CLOSED
version: int = 0
legs: List[ExitLeg] = field(default_factory=list)
chain_root_trade_id: str = ""
chain_head_leg_id: str = ""
chain_prev_leg_id: str = ""
chain_token: str = ""
def _chain_token(payload: dict) -> str:
return hashlib.sha256(json.dumps(payload, sort_keys=True, separators=(",", ":"), default=str).encode()).hexdigest()
class MiniRetractionRuntime:
"""
Contract-reference runtime:
- all partial exits route through one handler
- idempotent by command_id
- financial accumulation by executed leg
"""
def __init__(self, fee_rate: float = 0.00055):
self.fee_rate = fee_rate
self.capital = 25_000.0
self.trades: Dict[str, ParentTrade] = {}
self.applied_commands: Dict[str, ExitLeg] = {}
def open_trade(self, trade_id: str, side: str, entry_price: float, qty: float) -> None:
assert trade_id not in self.trades
t = ParentTrade(
trade_id=trade_id,
side=side,
entry_price=entry_price,
entry_qty=qty,
remaining_qty=qty,
)
t.chain_root_trade_id = trade_id
t.chain_head_leg_id = f"{trade_id}:open"
t.chain_prev_leg_id = ""
t.chain_token = _chain_token({
"trade_id": trade_id,
"chain_root_trade_id": trade_id,
"chain_head_leg_id": t.chain_head_leg_id,
"chain_prev_leg_id": "",
"chain_seq": 0,
"side": side,
"entry_price": entry_price,
"entry_qty": qty,
"remaining_qty": qty,
"realized_pnl_total": 0.0,
"realized_fees_total": 0.0,
})
self.trades[trade_id] = t
def retract(
self,
trade_id: str,
*,
command_id: str,
fraction: float,
exit_price: float,
reason: str,
) -> ExitLeg | None:
if command_id in self.applied_commands:
return self.applied_commands[command_id]
if not (0 < fraction <= 1.0):
return None
t = self.trades.get(trade_id)
if not t or t.status == "CLOSED":
return None
expected = _chain_token({
"trade_id": t.trade_id,
"chain_root_trade_id": t.chain_root_trade_id or t.trade_id,
"chain_head_leg_id": t.chain_head_leg_id or f"{t.trade_id}:open",
"chain_prev_leg_id": t.chain_prev_leg_id or "",
"chain_seq": t.exit_seq,
"side": t.side,
"entry_price": t.entry_price,
"entry_qty": t.entry_qty,
"remaining_qty": t.remaining_qty,
"realized_pnl_total": t.realized_pnl_total,
"realized_fees_total": t.realized_fees_total,
})
if t.chain_token and t.chain_token != expected:
return None
requested_qty = t.remaining_qty * fraction
qty = min(max(requested_qty, 0.0), t.remaining_qty)
if qty <= EPS:
return None
sign = -1.0 if t.side == "SHORT" else 1.0
gross = sign * (exit_price - t.entry_price) * qty
fee = self.fee_rate * (t.entry_price * qty + exit_price * qty)
net = gross - fee
t.exit_seq += 1
t.version += 1
t.remaining_qty = max(0.0, t.remaining_qty - qty)
t.realized_pnl_total += net
t.realized_fees_total += fee
self.capital += net
t.status = "CLOSED" if t.remaining_qty <= EPS else "PARTIALLY_CLOSED"
prev_head = t.chain_head_leg_id or f"{t.trade_id}:open"
t.chain_prev_leg_id = prev_head
t.chain_head_leg_id = f"{t.trade_id}:x{t.exit_seq:03d}"
t.chain_token = _chain_token({
"trade_id": t.trade_id,
"chain_root_trade_id": t.chain_root_trade_id or t.trade_id,
"chain_head_leg_id": t.chain_head_leg_id,
"chain_prev_leg_id": prev_head,
"chain_seq": t.exit_seq,
"side": t.side,
"entry_price": t.entry_price,
"entry_qty": t.entry_qty,
"remaining_qty": t.remaining_qty,
"realized_pnl_total": t.realized_pnl_total,
"realized_fees_total": t.realized_fees_total,
})
leg = ExitLeg(
trade_id=t.trade_id,
chain_root_trade_id=t.chain_root_trade_id or t.trade_id,
exit_seq=t.exit_seq,
exit_leg_id=f"{t.trade_id}:x{t.exit_seq:03d}",
chain_prev_leg_id=prev_head,
chain_head_leg_id=t.chain_head_leg_id,
command_id=command_id,
fraction=fraction,
qty=qty,
exit_price=exit_price,
fee=fee,
net_pnl=net,
remaining_after=t.remaining_qty,
reason=reason,
chain_token=t.chain_token,
)
t.legs.append(leg)
self.applied_commands[command_id] = leg
return leg
def _assert_parent_invariants(t: ParentTrade) -> None:
total_qty = sum(l.qty for l in t.legs)
assert total_qty <= t.entry_qty + EPS
assert math.isclose(t.remaining_qty, max(0.0, t.entry_qty - total_qty), abs_tol=1e-8)
assert math.isclose(t.realized_pnl_total, sum(l.net_pnl for l in t.legs), rel_tol=0, abs_tol=1e-8)
assert math.isclose(t.realized_fees_total, sum(l.fee for l in t.legs), rel_tol=0, abs_tol=1e-8)
if t.legs:
assert t.chain_head_leg_id == t.legs[-1].exit_leg_id
assert t.chain_token == t.legs[-1].chain_token
if t.remaining_qty <= EPS:
assert t.status == "CLOSED"
elif t.legs:
assert t.status == "PARTIALLY_CLOSED"
else:
assert t.status == "OPEN"
def test_retract_default_half_then_close_preserves_lineage_and_math() -> None:
rt = MiniRetractionRuntime()
rt.open_trade("T1", "SHORT", 100.0, 10.0)
l1 = rt.retract("T1", command_id="c1", fraction=0.5, exit_price=99.0, reason="HOTKEY_RETRACT")
assert l1 is not None
assert l1.exit_leg_id == "T1:x001"
assert l1.qty == 5.0
assert l1.chain_prev_leg_id == "T1:open"
assert l1.chain_head_leg_id == "T1:x001"
l2 = rt.retract("T1", command_id="c2", fraction=1.0, exit_price=98.5, reason="HOTKEY_RETRACT")
assert l2 is not None
assert l2.exit_leg_id == "T1:x002"
assert l2.qty == 5.0
assert l2.chain_prev_leg_id == "T1:x001"
assert l2.chain_head_leg_id == "T1:x002"
t = rt.trades["T1"]
_assert_parent_invariants(t)
assert t.status == "CLOSED"
assert t.exit_seq == 2
def test_idempotent_command_does_not_double_execute() -> None:
rt = MiniRetractionRuntime()
rt.open_trade("T2", "SHORT", 50.0, 20.0)
first = rt.retract("T2", command_id="dup", fraction=0.5, exit_price=49.8, reason="V7_RETRACT")
second = rt.retract("T2", command_id="dup", fraction=0.5, exit_price=49.7, reason="V7_RETRACT")
assert first is not None
assert second is not None
assert first.exit_leg_id == second.exit_leg_id
assert len(rt.trades["T2"].legs) == 1
_assert_parent_invariants(rt.trades["T2"])
def test_invalid_fraction_rejected() -> None:
rt = MiniRetractionRuntime()
rt.open_trade("T3", "SHORT", 10.0, 10.0)
assert rt.retract("T3", command_id="a", fraction=0.0, exit_price=9.9, reason="HOTKEY_RETRACT") is None
assert rt.retract("T3", command_id="b", fraction=1.5, exit_price=9.9, reason="HOTKEY_RETRACT") is None
assert len(rt.trades["T3"].legs) == 0
_assert_parent_invariants(rt.trades["T3"])
def test_long_side_accounting_sign_is_correct() -> None:
rt = MiniRetractionRuntime()
rt.open_trade("T4", "LONG", 100.0, 2.0)
leg = rt.retract("T4", command_id="c", fraction=1.0, exit_price=101.0, reason="HOTKEY_RETRACT")
assert leg is not None
assert leg.net_pnl > 0
_assert_parent_invariants(rt.trades["T4"])
def test_over_many_random_partial_exits_invariants_hold() -> None:
rng = random.Random(1337)
rt = MiniRetractionRuntime()
rt.open_trade("T5", "SHORT", 120.0, 30.0)
for i in range(200):
t = rt.trades["T5"]
if t.status == "CLOSED":
break
# Biased toward smaller retractions; occasionally force full close
frac = 1.0 if i % 37 == 0 else max(0.01, min(0.99, rng.random() * 0.6))
px = 120.0 - rng.random() * 2.0
rt.retract("T5", command_id=f"cmd-{i}", fraction=frac, exit_price=px, reason="V7_RETRACT")
_assert_parent_invariants(rt.trades["T5"])
t = rt.trades["T5"]
# Must eventually close under forced 1.0 fractions
assert t.status == "CLOSED"
_assert_parent_invariants(t)
def test_capital_updates_are_leg_immediate() -> None:
rt = MiniRetractionRuntime()
start = rt.capital
rt.open_trade("T6", "SHORT", 200.0, 4.0)
l1 = rt.retract("T6", command_id="r1", fraction=0.5, exit_price=199.0, reason="HOTKEY_RETRACT")
assert l1 is not None
mid = rt.capital
assert not math.isclose(mid, start, abs_tol=1e-12)
l2 = rt.retract("T6", command_id="r2", fraction=1.0, exit_price=198.5, reason="HOTKEY_RETRACT")
assert l2 is not None
assert math.isclose(rt.capital, start + l1.net_pnl + l2.net_pnl, rel_tol=0, abs_tol=1e-8)
def test_command_on_closed_trade_is_noop() -> None:
rt = MiniRetractionRuntime()
rt.open_trade("T7", "SHORT", 100.0, 1.0)
rt.retract("T7", command_id="x1", fraction=1.0, exit_price=99.7, reason="HOTKEY_RETRACT")
t = rt.trades["T7"]
assert t.status == "CLOSED"
n = len(t.legs)
out = rt.retract("T7", command_id="x2", fraction=0.5, exit_price=99.6, reason="HOTKEY_RETRACT")
assert out is None
assert len(t.legs) == n
_assert_parent_invariants(t)
def test_tampered_chain_head_is_rejected() -> None:
rt = MiniRetractionRuntime()
rt.open_trade("T8", "SHORT", 75.0, 8.0)
first = rt.retract("T8", command_id="c1", fraction=0.5, exit_price=74.5, reason="HOTKEY_RETRACT")
assert first is not None
rt.trades["T8"].chain_head_leg_id = "T8:x999"
second = rt.retract("T8", command_id="c2", fraction=0.5, exit_price=74.0, reason="HOTKEY_RETRACT")
assert second is None
with pytest.raises(AssertionError):
_assert_parent_invariants(rt.trades["T8"])