diff --git a/prod/clean_arch/runtime/pink_direct.py b/prod/clean_arch/runtime/pink_direct.py index d484ec9..2b97e7a 100644 --- a/prod/clean_arch/runtime/pink_direct.py +++ b/prod/clean_arch/runtime/pink_direct.py @@ -12,7 +12,8 @@ from __future__ import annotations import inspect import logging -from dataclasses import dataclass +import math +from dataclasses import dataclass, replace from datetime import datetime, timezone from types import SimpleNamespace from typing import Any, Callable, Optional @@ -68,6 +69,13 @@ def _slot_to_position_dict(slot) -> dict[str, Any]: } +# Industry-smallest sane quote price. notional (capital × fraction × leverage) +# is self-limiting; the only unbounded step is size = notional / price, which +# overflows to inf as price -> 0. Any real perp quote is far above this floor, +# so a price below it (or non-finite) signals corrupt market data, not a trade. +_MIN_SANE_PRICE = 1e-8 + + def _decision_to_kernel_intent( decision: Decision, intent: Intent, @@ -365,6 +373,52 @@ class PinkDirectRuntime: ) return len(applied) + def _unsafe_entry_reason(self, kernel_intent: KernelIntent, context: Any) -> Optional[str]: + """Return why an ENTER's sizing inputs are unsafe, or None if sound. + + notional = capital × fraction × leverage is self-limiting; the only way + size = notional/price goes non-finite is a corrupt raw input. We reject + the OPEN (not clamp) because a corrupt sizing input is an untrustworthy + signal — better to skip the trade than open on bad math. + """ + cap = float(getattr(context, "capital", 0.0) or 0.0) + price = float(getattr(kernel_intent, "reference_price", 0.0) or 0.0) + lev = float(getattr(kernel_intent, "leverage", 0.0) or 0.0) + size = float(getattr(kernel_intent, "target_size", 0.0) or 0.0) + if not math.isfinite(cap) or cap <= 0.0: + return f"non-finite/non-positive capital={cap!r}" + if not math.isfinite(price) or price < _MIN_SANE_PRICE: + return f"price below sane floor or non-finite price={price!r} (floor={_MIN_SANE_PRICE:g})" + if not math.isfinite(lev) or lev <= 0.0: + return f"non-finite/non-positive leverage={lev!r}" + if not math.isfinite(size) or size <= 0.0: + return f"non-finite/non-positive size={size!r}" + return None + + def _exit_intent_from_slot(self, kernel_intent: KernelIntent) -> KernelIntent: + """Size an EXIT from the kernel's authoritative slot accounting. + + The close quantity is the real remaining position size (capped to it), + never an externally-computed value — so a malformed policy size can + neither strand a position (refuse to close) nor overshoot it. A + non-finite policy size falls back to the full remaining size. + """ + try: + slot_size = float(self.kernel.slot(int(kernel_intent.slot_id)).size or 0.0) + except Exception: + slot_size = 0.0 + policy_size = float(getattr(kernel_intent, "target_size", 0.0) or 0.0) + policy_ok = math.isfinite(policy_size) and policy_size > 0.0 + if slot_size > 0.0: + # Authoritative remaining size known: cap the close to it (and fall + # back to the full remaining if the policy size is malformed). + exit_size = min(policy_size, slot_size) if policy_ok else slot_size + else: + # Kernel reports no/unknown remaining size: trust the policy size + # (the kernel rejects NO_OPEN_POSITION if there is genuinely none). + exit_size = policy_size if policy_ok else 0.0 + return replace(kernel_intent, target_size=exit_size) + async def step(self, snapshot: MarketSnapshot) -> Decision: """Single policy + execution cycle. @@ -425,6 +479,41 @@ class PinkDirectRuntime: if decision.action in {DecisionAction.ENTER, DecisionAction.EXIT}: kernel_intent = _decision_to_kernel_intent(decision, intent, slot_id=0) + + if decision.action == DecisionAction.ENTER: + # Source guard: notional (capital×fraction×leverage) is self- + # limiting, so a non-finite size can only come from corrupt raw + # inputs — a non-finite capital, or a price below the industry + # floor that overflows size = notional/price. A corrupt sizing + # input is an untrustworthy signal: do NOT open (exits are never + # suppressed — they size from slot accounting below). + unsafe = self._unsafe_entry_reason(kernel_intent, context) + if unsafe is not None: + self.logger.error( + "ENTER suppressed (%s): price=%r capital=%r size=%r leverage=%r " + "floor=%g asset=%s", + unsafe, getattr(kernel_intent, "reference_price", None), context.capital, + getattr(kernel_intent, "target_size", None), + getattr(kernel_intent, "leverage", None), _MIN_SANE_PRICE, intent.asset, + ) + sp = float(getattr(snapshot, "price", 0.0) or 0.0) + if math.isfinite(sp) and sp >= _MIN_SANE_PRICE: + self.kernel.mark_price(snapshot.symbol, sp) + slot_dict = self.kernel.slot(0).to_dict() if self.kernel.max_slots > 0 else {} + acc = self.kernel.snapshot()["account"] + if self.persistence is not None: + self.persistence.persist_step( + snapshot=snapshot, decision=decision, intent=intent, outcome=None, + slot_dict=slot_dict, acc_dict=acc, phase="entry_suppressed", + market_state=market_state, + ) + return decision + else: + # EXIT: size the close from the kernel's authoritative slot + # accounting so a malformed policy size can never strand or + # overshoot an open position. + kernel_intent = self._exit_intent_from_slot(kernel_intent) + outcome = self.kernel.process_intent(kernel_intent) # Locate the source of any non-finite intent the kernel rejected: diff --git a/prod/tests/test_pink_sizing_guards.py b/prod/tests/test_pink_sizing_guards.py new file mode 100644 index 0000000..e2f96e2 --- /dev/null +++ b/prod/tests/test_pink_sizing_guards.py @@ -0,0 +1,103 @@ +"""Source-level sizing guards in the PINK algo runner (pink_direct). + +notional = capital × fraction × leverage is self-limiting (no division); the +only non-finite ingress is a corrupt raw input feeding size = notional / price. +So: +- ENTER: a non-finite capital or a price below the industry-smallest-price floor + is an untrustworthy signal -> suppress the OPEN (never trade on bad math). +- EXIT: size the close from the kernel's authoritative slot accounting, so a + malformed policy size can neither strand a position nor overshoot it. +""" + +from __future__ import annotations + +import asyncio +from dataclasses import replace +from datetime import datetime, timezone + +from prod.clean_arch.dita_v2 import ( + ExecutionKernel, InMemoryControlPlane, KernelCommandType, KernelControlSnapshot, + KernelMode, KernelVerbosity, MemoryKernelJournal, MockVenueAdapter, MockVenueScenario, + TradeSide, +) +from prod.clean_arch.dita_v2.contracts import KernelIntent +from prod.clean_arch.dita import DecisionAction, DecisionConfig, DecisionEngine, IntentEngine +from prod.clean_arch.runtime.pink_direct import PinkDirectRuntime, _MIN_SANE_PRICE +from prod.clean_arch.ports.data_feed import DataFeedPort, MarketSnapshot + + +class _StubFeed(DataFeedPort): + async def connect(self): return True + async def disconnect(self): pass + async def get_latest_snapshot(self, symbol): return None + async def subscribe_snapshots(self, callback): pass + async def get_acb_update(self): return None + def get_latency_ms(self): return 0.0 + def health_check(self): return True + + +def _runtime(capital: float): + kernel = ExecutionKernel( + control_plane=InMemoryControlPlane( + KernelControlSnapshot(mode=KernelMode.DEBUG, verbosity=KernelVerbosity.TRACE) + ), + venue=MockVenueAdapter(MockVenueScenario(emit_fill_on_submit=True, partial_fill_ratio=1.0)), + journal=MemoryKernelJournal(), + ) + kernel.account.snapshot.capital = capital + kernel.account.snapshot.peak_capital = capital if capital == capital else 25000.0 + kernel.account.snapshot.equity = capital + cfg = DecisionConfig() # capital_fraction=0.20, allow_short=True, max_leverage=5 + runtime = PinkDirectRuntime( + data_feed=_StubFeed(), kernel=kernel, + decision_engine=DecisionEngine(cfg), intent_engine=IntentEngine(cfg), + persistence=None, market_state_runtime=None, + ) + return runtime, kernel + + +def _enter_snap(price: float) -> MarketSnapshot: + return MarketSnapshot( + timestamp=datetime.now(timezone.utc), symbol="BTCUSDT", price=price, + bid=price * 0.999, ask=price * 1.001, eigenvalues=[1.0], + velocity_divergence=-0.05, irp_alignment=0.5, scan_number=1, source="test", + ) + + +def test_enter_suppressed_on_nonfinite_capital(): + runtime, kernel = _runtime(float("inf")) + decision = asyncio.run(runtime.step(_enter_snap(100.0))) + assert decision.action == DecisionAction.ENTER # policy decided to enter + assert kernel.slot(0).is_free(), "ENTER must be suppressed on non-finite capital" + + +def test_enter_suppressed_on_subfloor_price(): + runtime, kernel = _runtime(25_000.0) + decision = asyncio.run(runtime.step(_enter_snap(_MIN_SANE_PRICE / 100.0))) + assert kernel.slot(0).is_free(), "ENTER must be suppressed on a sub-floor price" + + +def test_enter_proceeds_on_sane_inputs(): + runtime, kernel = _runtime(25_000.0) + decision = asyncio.run(runtime.step(_enter_snap(100.0))) + assert decision.action == DecisionAction.ENTER + assert not kernel.slot(0).is_free(), "sane ENTER must open a position" + + +def test_exit_sizes_from_kernel_slot_accounting(): + runtime, kernel = _runtime(25_000.0) + asyncio.run(runtime.step(_enter_snap(100.0))) # open a position + slot_size = float(kernel.slot(0).size) + assert slot_size > 0.0 + + base = KernelIntent( + timestamp=datetime.now(timezone.utc), intent_id="x", trade_id=kernel.slot(0).trade_id, + slot_id=0, asset="BTCUSDT", side=TradeSide.SHORT, action=KernelCommandType.EXIT, + reference_price=100.0, target_size=999.0, leverage=1.0, exit_leg_ratios=(1.0,), reason="X", + ) + # Oversized policy size -> capped to the real remaining size. + assert abs(runtime._exit_intent_from_slot(base).target_size - slot_size) < 1e-9 + # Non-finite policy size -> full remaining size (never strands). + assert abs(runtime._exit_intent_from_slot(replace(base, target_size=float("inf"))).target_size - slot_size) < 1e-9 + # Valid partial -> respected. + assert abs(runtime._exit_intent_from_slot(replace(base, target_size=slot_size * 0.5)).target_size - slot_size * 0.5) < 1e-9