PINK Phase 0: FET -$5,990 fix batch — leverage-free PnL, true fill prices, reconcile baseline anchors
Defects fix (FET -$5,990 replay, 2026-06-11): - realized_pnl() and mark_price(): PnL = qty × Δprice, side-signed; no ×leverage inflation (was 3× every leg). - BingX MARKET fill events carry true fill price (avgPrice/lastFillPrice), never the order's nominal price (protective bound ±20-25% from mark, poisoned PnL to -$5,990 on a +$164 round-trip). - Fill routing by ORDER IDENTITY first, FSM state second — late entry-remainder fills during EXIT_WORKING no longer misclassify as exits. - Entry basis = VWAP across entry fills, not last fill price. - reconcile_from_slots / restore_state: re-anchor _last_settled_pnl / _slot_was_closed to adopted slot state (cross-restart double-book of carried PnL). - ACCOUNT_UPDATE with wallet_balance=0 dropped (margin-only frames no longer zero e_available_margin). - Foreign-fill skip on shared VST account (PRODGREEN collision filter). - exec_router TTL: entry-requote venue-truth gate (recent own fill + live exchange position probes prevent double-entry). - bingx_direct: openOrders fetched BEFORE positions (sequential ordering prevents dangerous tear → double-entries). - Dual-leverage translation via map_internal_conviction_to_exchange_leverage() (strategy conviction → integer at-exchange leverage, bankers rounding). - BLUE-parity alpha components wired: asset picker (IRP universe ranking) + alpha sizer (cubic-convex dynamic leverage, 0.5-8.0 range). - ch_writer: date_time_input_format=best_effort on insert URLs; flush error logging at WARNING with counter. - blue_parity.price_of(): hyphen-tolerant fallback (FET-USDT → FETUSDT). - Fill test updated to incremental filled_size semantics (BingX WS lastFilledQty). - Env-override base URLs, supervisord autorestart, per-asset DC histories, single-slot invariant, fill-attribution filter. Co-authored-by: CommandCodeBot <noreply@commandcode.ai>
This commit is contained in:
@@ -15,6 +15,7 @@ import inspect
|
||||
import json
|
||||
import logging
|
||||
import math
|
||||
import time
|
||||
from dataclasses import dataclass, field, replace
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
@@ -39,6 +40,11 @@ from prod.clean_arch.dita_v2.contracts import (
|
||||
TradeSide as DitaTradeSide,
|
||||
TradeStage,
|
||||
)
|
||||
from prod.clean_arch.dita_v2.exec_router import (
|
||||
ExecConfig,
|
||||
ExecutionRouter,
|
||||
MissAction,
|
||||
)
|
||||
from prod.clean_arch.dita_v2.rust_backend import ExecutionKernel
|
||||
from prod.clean_arch.persistence import PinkClickHousePersistence
|
||||
from prod.clean_arch.ports.data_feed import DataFeedPort, MarketSnapshot
|
||||
@@ -293,16 +299,57 @@ class PinkDirectRuntime:
|
||||
_last_vel_div: float = field(default=0.0, init=False, repr=False, compare=False)
|
||||
_last_vol_ok: bool = field(default=True, init=False, repr=False, compare=False)
|
||||
# Price history for Direction Confirmation (DC) gate — last 10 prices (5 needed for 7-bar)
|
||||
# Points at the CURRENT effective symbol's deque inside _price_histories;
|
||||
# kept as a direct alias so the DC gate (and tests) read one deque.
|
||||
_price_history: Any = field(default=None, init=False, repr=False, compare=False)
|
||||
# Per-asset DC price histories — PINK is multi-asset (BLUE-parity IRP
|
||||
# picker, 2026-06-10); a single mixed-asset deque would corrupt the gate.
|
||||
_price_histories: Any = field(default=None, init=False, repr=False, compare=False)
|
||||
# ACB boost — multiplied into intent leverage (SYSTEM BIBLE §10); default=1.0 (no-op)
|
||||
_last_acb_boost: float = field(default=1.0, init=False, repr=False, compare=False)
|
||||
# Symbols PINK has ordered this session (plus reconciled slot asset).
|
||||
# Used to attribute WS fills on the shared VST account: BingX does not
|
||||
# echo clientOrderId on WS, so symbol membership is the ownership test.
|
||||
_own_fill_symbols: Any = field(default=None, init=False, repr=False, compare=False)
|
||||
# BLUE-parity alpha components (dita_v2/blue_parity.py, 2026-06-10).
|
||||
# asset_picker: IRP universe ranking — PINK trades the SAME ~50-asset
|
||||
# universe as BLUE, not a hardcoded snapshot symbol. None = legacy
|
||||
# single-symbol behavior.
|
||||
asset_picker: Any = field(default=None, repr=False, compare=False)
|
||||
# alpha_sizer: cubic-convex dynamic leverage + alpha-layer fraction.
|
||||
# Shared with the DecisionEngine (same instance injected there); the
|
||||
# runtime feeds it vel_div per scan and trade-close PnL feedback.
|
||||
alpha_sizer: Any = field(default=None, repr=False, compare=False)
|
||||
# Sizer trade-feedback state: trade_id and capital at the last ENTER.
|
||||
_sizer_open_tid: str = field(default="", init=False, repr=False, compare=False)
|
||||
_sizer_entry_capital: float = field(default=0.0, init=False, repr=False, compare=False)
|
||||
# Execution router (maker/taker policy). Injectable for tests; built from
|
||||
# env at connect() when None. None/style=taker == legacy MARKET behavior.
|
||||
exec_router: Any = field(default=None, repr=False, compare=False)
|
||||
_exec_ttl_task: Optional[asyncio.Task] = field(default=None, init=False, repr=False, compare=False)
|
||||
# trade_id → submitted KernelIntent for working maker quotes (retry/fallback rebuild)
|
||||
_working_intents: Any = field(default=None, init=False, repr=False, compare=False)
|
||||
# Monotonic ts of the last OWN fill seen on the WS account stream. The TTL
|
||||
# handler refuses to re-quote within the hot window after any own fill —
|
||||
# REST venue reconcile lags WS fills by seconds (live double-entry 2026-06-10).
|
||||
_last_own_fill_mono: float = field(default=0.0, init=False, repr=False, compare=False)
|
||||
|
||||
async def connect(self, initial_capital: float = 25000.0) -> None:
|
||||
"""Connect data feed, venue, seed capital from exchange, start WS stream."""
|
||||
from collections import deque
|
||||
self._price_history = deque(maxlen=10)
|
||||
self._price_histories = {}
|
||||
self._own_fill_symbols = set()
|
||||
await self.data_feed.connect()
|
||||
venue = self.kernel.venue
|
||||
# Back-reference for the venue's reconcile/cancel paths (2026-06-10):
|
||||
# lets reconcile() fetch symbol-scoped fills for the slot's asset so
|
||||
# maker fills reach the FSM. Was referenced in bingx_venue but never
|
||||
# wired anywhere — silently dead until today.
|
||||
try:
|
||||
venue._kernel_ref = self.kernel
|
||||
except Exception:
|
||||
pass
|
||||
if hasattr(venue, "connect"):
|
||||
try:
|
||||
result = venue.connect()
|
||||
@@ -316,6 +363,12 @@ class PinkDirectRuntime:
|
||||
# 100K+ would cause a ~75K reconcile delta → capital_frozen=True.
|
||||
live_capital = await self._fetch_exchange_wallet_balance(initial_capital)
|
||||
_reconcile_position_slot(self.kernel, live_capital, slot_id=0)
|
||||
try:
|
||||
slot_asset = str(self.kernel.slot(0).asset or "") if self.kernel.max_slots > 0 else ""
|
||||
if slot_asset:
|
||||
self._own_fill_symbols.add(slot_asset.upper())
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Seed the kernel's atomic K-account from exchange truth.
|
||||
self.kernel.set_seed_capital(live_capital)
|
||||
@@ -335,7 +388,33 @@ class PinkDirectRuntime:
|
||||
self._run_account_stream(), name="pink_account_stream"
|
||||
)
|
||||
|
||||
# Execution router: maker/taker policy layer. Built from env unless a
|
||||
# test injected one. With style=taker (default) it is a pure pass-
|
||||
# through and the TTL loop has nothing to do — legacy behavior.
|
||||
if self.exec_router is None:
|
||||
self.exec_router = ExecutionRouter(ExecConfig.from_env(), logger=self.logger)
|
||||
self._working_intents = {}
|
||||
self.logger.info("EXEC_ROUTER: style=%s entry_ttl=%.1fs exit_ttl=%.1fs "
|
||||
"miss=%s retries=%d exhaust=%s post_only=%s",
|
||||
self.exec_router.config.style,
|
||||
self.exec_router.config.entry_ttl_s,
|
||||
self.exec_router.config.exit_ttl_s,
|
||||
self.exec_router.config.entry_miss,
|
||||
self.exec_router.config.entry_retries,
|
||||
self.exec_router.config.retry_exhaust,
|
||||
self.exec_router.config.post_only)
|
||||
self._exec_ttl_task = asyncio.create_task(
|
||||
self._exec_ttl_loop(), name="pink_exec_ttl"
|
||||
)
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
if self._exec_ttl_task is not None:
|
||||
self._exec_ttl_task.cancel()
|
||||
try:
|
||||
await self._exec_ttl_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._exec_ttl_task = None
|
||||
if self._account_stream_task is not None:
|
||||
self._account_stream_task.cancel()
|
||||
try:
|
||||
@@ -506,6 +585,31 @@ class PinkDirectRuntime:
|
||||
except Exception as exc:
|
||||
self.logger.warning("Fee calibration failed: %s", exc)
|
||||
|
||||
def _fill_is_ours(self, event: Any) -> bool:
|
||||
"""Attribute a WS fill on the shared VST account to PINK or a foreign
|
||||
system (PRODGREEN/BLUE/manual).
|
||||
|
||||
Order of evidence:
|
||||
1. clientOrderId prefix "p-" — definitive PINK signature (BingX WS
|
||||
currently does NOT echo it, but honour it if that changes).
|
||||
2. A non-empty foreign clientOrderId — definitively not ours.
|
||||
3. Symbol membership in the set of symbols PINK has ordered this
|
||||
session (incl. the reconciled slot asset). PINK trades multiple
|
||||
assets (BTC, TRX, ALGO, …) so this must NOT be a hardcoded symbol.
|
||||
4. Unattributable (no cid, no symbol) — process it; the
|
||||
ACCOUNT_UPDATE reseed bounds any contamination.
|
||||
"""
|
||||
cid = str(getattr(event, "client_order_id", "") or "")
|
||||
if cid.startswith("p-"):
|
||||
return True
|
||||
if cid:
|
||||
return False
|
||||
sym = str(getattr(event, "symbol", "") or "").upper()
|
||||
if not sym:
|
||||
return True
|
||||
own = self._own_fill_symbols or set()
|
||||
return sym in own
|
||||
|
||||
async def _run_account_stream(self) -> None:
|
||||
"""
|
||||
Background task: WS stream → kernel.on_account_event() → reconcile gate.
|
||||
@@ -529,6 +633,26 @@ class PinkDirectRuntime:
|
||||
try:
|
||||
async for event in stream.subscribe():
|
||||
if event.kind in {ExchangeEventKind.FULL_FILL, ExchangeEventKind.PARTIAL_FILL}:
|
||||
# Skip fills PINK does not own (e.g. PRODGREEN trading on
|
||||
# the shared VST account). BingX WS delivers all fills on
|
||||
# the listen key regardless of origin. Processing a foreign
|
||||
# fill contaminates K without a matching PINK intent,
|
||||
# causing a persistent reconcile delta. Ownership is by
|
||||
# clientOrderId / session symbol set — PINK is multi-asset
|
||||
# (BTC, TRX, ALGO, …), never a hardcoded symbol.
|
||||
if self._fill_is_ours(event):
|
||||
# Latch for the exec-router TTL handler: an own fill
|
||||
# just happened; do NOT trust a stale-flat venue
|
||||
# snapshot to justify re-quoting (2026-06-10 live
|
||||
# double-entry: REST reconcile lagged the WS fill).
|
||||
self._last_own_fill_mono = time.monotonic()
|
||||
if not self._fill_is_ours(event):
|
||||
self.logger.info(
|
||||
"Foreign fill skipped: symbol=%s qty=%s cid=%r oid=%s",
|
||||
event.symbol, event.fill_qty,
|
||||
event.client_order_id, event.order_id,
|
||||
)
|
||||
continue
|
||||
# Immediately predict+fold fee from model so K tracks E
|
||||
# without waiting for FILL_SETTLED. When FILL_SETTLED
|
||||
# arrives with the actual fee, it replaces the prediction
|
||||
@@ -583,6 +707,21 @@ class PinkDirectRuntime:
|
||||
# crash recovery + session-to-session calibration continuity.
|
||||
_persist_kernel_snapshot(self.kernel, self.logger)
|
||||
elif event.kind == ExchangeEventKind.ACCOUNT_UPDATE:
|
||||
# BingX WS also sends position/margin-only ACCOUNT_UPDATE
|
||||
# frames with no USDT balance entry; the parser yields
|
||||
# wallet_balance=0 for those. They carry no E-facts —
|
||||
# folding them zeroes e_available_margin (available_capital
|
||||
# =0.0) against a stale e_wallet and re-runs reconcile on
|
||||
# stale data (the 2026-06-09 stuck-freeze). Drop them.
|
||||
if not (event.wallet_balance and event.wallet_balance > 0):
|
||||
continue
|
||||
# Re-seed K on every balance-bearing ACCOUNT_UPDATE (poll
|
||||
# gap-backfill AND live WS). The exchange is the ledger of
|
||||
# record; the update carries the post-trade wallet balance
|
||||
# (wb = cash), so seeding from it keeps K ≈ E even when a
|
||||
# shared-account system (e.g. PRODGREEN) trades and moves
|
||||
# wb without PINK making a fill.
|
||||
self.kernel.reset_and_seed(float(event.wallet_balance))
|
||||
result = self.kernel.on_account_event({
|
||||
"kind": "ACCOUNT_UPDATE",
|
||||
"wallet_balance": event.wallet_balance,
|
||||
@@ -781,6 +920,303 @@ class PinkDirectRuntime:
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# ── Execution-router drivers ─────────────────────────────────────────────
|
||||
# The router (dita_v2/exec_router.py) is pure policy; these methods are
|
||||
# the only place its decisions touch the kernel/venue. Every await is
|
||||
# followed by a state re-check: fills race cancels on a live venue.
|
||||
|
||||
_SLOT_OPENISH = ("PARTIAL_FILL", "POSITION_OPENED", "POSITION_OPEN",
|
||||
"EXIT_REQUESTED", "EXIT_SENT", "EXIT_ACKED", "EXIT_WORKING",
|
||||
"POSITION_PARTIALLY_CLOSED")
|
||||
|
||||
def _exec_slot_view(self) -> tuple[str, str, float]:
|
||||
"""(trade_id, fsm_state_name, size) of slot 0 — tolerant of mocks."""
|
||||
try:
|
||||
slot = self.kernel.slot(0)
|
||||
except Exception:
|
||||
return "", "", 0.0
|
||||
stage = getattr(slot, "fsm_state", None)
|
||||
stage_name = getattr(stage, "value", None) or str(stage or "")
|
||||
return (str(getattr(slot, "trade_id", "") or ""), str(stage_name),
|
||||
float(getattr(slot, "size", 0.0) or 0.0))
|
||||
|
||||
def _exec_plan_for(self, decision: Any, kernel_intent: KernelIntent,
|
||||
snapshot: Any) -> Any:
|
||||
router = self.exec_router
|
||||
if router is None:
|
||||
return None
|
||||
try:
|
||||
ref = float(getattr(kernel_intent, "reference_price", 0.0) or 0.0)
|
||||
side = getattr(kernel_intent, "side", None)
|
||||
side_name = getattr(side, "value", None) or str(side or "SHORT")
|
||||
if decision.action == DecisionAction.ENTER:
|
||||
return router.plan_entry(
|
||||
trade_id=kernel_intent.trade_id, asset=kernel_intent.asset,
|
||||
position_side=side_name, reference_price=ref,
|
||||
)
|
||||
return router.plan_exit(
|
||||
trade_id=kernel_intent.trade_id, asset=kernel_intent.asset,
|
||||
position_side=side_name, reference_price=ref,
|
||||
reason=str(getattr(decision, "reason", "") or ""),
|
||||
)
|
||||
except Exception as exc:
|
||||
# Router failure must never block trading — degrade to legacy taker.
|
||||
self.logger.warning("EXEC_ROUTER plan failed (%s) — taker fallback", exc)
|
||||
return None
|
||||
|
||||
def _exec_after_submit(self, plan: Any, kernel_intent: KernelIntent,
|
||||
outcome: Any) -> None:
|
||||
"""Classify a maker submit: filled now, resting, or rejected.
|
||||
|
||||
Resting and rejected both register as working — a rejected post-only
|
||||
quote registers with an already-expired deadline so the TTL loop
|
||||
resolves it through the one shared miss/fallback path within a tick.
|
||||
"""
|
||||
router = self.exec_router
|
||||
if router is None:
|
||||
return
|
||||
try:
|
||||
tid = kernel_intent.trade_id
|
||||
slot_tid, stage, size = self._exec_slot_view()
|
||||
filled = (
|
||||
(plan.action == "ENTER" and slot_tid == tid and size > 0.0
|
||||
and stage in self._SLOT_OPENISH)
|
||||
or (plan.action == "EXIT" and (
|
||||
slot_tid != tid or stage in ("POSITION_CLOSED", "CLOSED",
|
||||
"TRADE_TERMINAL_WRITTEN", "IDLE")))
|
||||
)
|
||||
if filled:
|
||||
self._emit("exec_router", event="immediate_fill", action=plan.action,
|
||||
trade_id=tid, reason=plan.reason)
|
||||
return
|
||||
rejected = stage in ("ORDER_REJECTED", "EXIT_REJECTED")
|
||||
wo = router.register_working(
|
||||
trade_id=tid, asset=kernel_intent.asset,
|
||||
position_side=(getattr(kernel_intent.side, "value", None)
|
||||
or str(kernel_intent.side)),
|
||||
plan=plan,
|
||||
base_trade_id=plan.metadata.get("base_trade_id") or tid,
|
||||
retry_n=int(plan.metadata.get("retry_n", 0) or 0),
|
||||
)
|
||||
self._working_intents[tid] = kernel_intent
|
||||
if rejected:
|
||||
wo.deadline = router.clock() # resolve immediately via TTL loop
|
||||
self._emit("exec_router", event="working", action=plan.action,
|
||||
trade_id=tid, limit_price=plan.limit_price,
|
||||
ttl_s=plan.ttl_s, rejected=rejected, reason=plan.reason)
|
||||
self.logger.info("EXEC_ROUTER working %s %s @ %.10g ttl=%.1fs%s",
|
||||
plan.action, tid, plan.limit_price, plan.ttl_s,
|
||||
" (post-only REJECTED — instant resolve)" if rejected else "")
|
||||
except Exception as exc:
|
||||
self.logger.warning("EXEC_ROUTER after-submit failed: %s", exc)
|
||||
|
||||
async def _exec_cancel_working(self, trade_id: str, *, reason: str) -> None:
|
||||
"""Cancel a working quote via the kernel (idempotent on the venue)."""
|
||||
router = self.exec_router
|
||||
intent = (self._working_intents or {}).get(trade_id)
|
||||
if router is None or router.working(trade_id) is None:
|
||||
return
|
||||
try:
|
||||
base = intent
|
||||
cancel_intent = KernelIntent(
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
intent_id=f"{trade_id}-cxl",
|
||||
trade_id=trade_id,
|
||||
slot_id=0,
|
||||
asset=(base.asset if base is not None else ""),
|
||||
side=(base.side if base is not None else DitaTradeSide.SHORT),
|
||||
action=KernelCommandType.CANCEL,
|
||||
reference_price=(base.reference_price if base is not None else 0.0),
|
||||
target_size=(base.target_size if base is not None else 0.0),
|
||||
leverage=(base.leverage if base is not None else 1.0),
|
||||
reason=f"exec_router:{reason}",
|
||||
)
|
||||
await self.kernel.process_intent_async(cancel_intent)
|
||||
except Exception as exc:
|
||||
self.logger.warning("EXEC_ROUTER cancel %s failed: %s", trade_id, exc)
|
||||
router.note_cancel(trade_id)
|
||||
(self._working_intents or {}).pop(trade_id, None)
|
||||
self._emit("exec_router", event="cancel", trade_id=trade_id, reason=reason)
|
||||
|
||||
def _exec_safe_to_requote(self, wo: Any) -> bool:
|
||||
"""True only when the venue is provably flat for a re-quote.
|
||||
|
||||
Fails SAFE (returns False) on: a recent own fill (REST reconcile lags
|
||||
WS fills by seconds), any live exchange position, or any probe error.
|
||||
"""
|
||||
if time.monotonic() - (self._last_own_fill_mono or 0.0) < 5.0:
|
||||
return False
|
||||
try:
|
||||
venue = self.kernel.venue
|
||||
rows = venue.open_positions() if hasattr(venue, "open_positions") else []
|
||||
for row in rows or []:
|
||||
qty = abs(float(row.get("positionAmt") or row.get("positionQty")
|
||||
or row.get("qty") or 0.0))
|
||||
if qty > 1e-9:
|
||||
return False
|
||||
except Exception as exc:
|
||||
self.logger.warning("EXEC_ROUTER requote probe failed (%s) — fail safe", exc)
|
||||
return False
|
||||
return True
|
||||
|
||||
async def _exec_ttl_loop(self) -> None:
|
||||
"""1 s sweep: resolve expired maker quotes (fill-check → cancel →
|
||||
miss policy / exit escalation). Scan cadence (~10 s) is too coarse
|
||||
for 5–8 s TTLs, hence the dedicated task."""
|
||||
try:
|
||||
while True:
|
||||
await asyncio.sleep(1.0)
|
||||
router = self.exec_router
|
||||
if router is None:
|
||||
continue
|
||||
for wo in router.expired():
|
||||
try:
|
||||
await self._handle_expired_working(wo)
|
||||
except Exception as exc:
|
||||
self.logger.error("EXEC_ROUTER expiry handler failed for %s: %s",
|
||||
wo.trade_id, exc, exc_info=True)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
|
||||
async def _handle_expired_working(self, wo: Any) -> None:
|
||||
router = self.exec_router
|
||||
if router is None or router.working(wo.trade_id) is None:
|
||||
return # already resolved (fill/cancel notification raced us)
|
||||
|
||||
# 1. Drain any late venue events first — the quote may already be filled.
|
||||
await self.pump_venue_events()
|
||||
if router.working(wo.trade_id) is None:
|
||||
return
|
||||
|
||||
def _entry_filled() -> bool:
|
||||
slot_tid, stage, size = self._exec_slot_view()
|
||||
return slot_tid == wo.trade_id and size > 0.0 and stage in self._SLOT_OPENISH
|
||||
|
||||
def _exit_done() -> bool:
|
||||
slot_tid, stage, size = self._exec_slot_view()
|
||||
return (slot_tid != wo.trade_id or size <= 0.0
|
||||
or stage in ("POSITION_CLOSED", "CLOSED",
|
||||
"TRADE_TERMINAL_WRITTEN", "IDLE"))
|
||||
|
||||
# 2. Cancel the quote (idempotent; CANCEL_REJECT on a filled order is
|
||||
# harmless). For a partially-filled entry this cancels the remainder.
|
||||
intent = (self._working_intents or {}).get(wo.trade_id)
|
||||
try:
|
||||
cancel_intent = KernelIntent(
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
intent_id=f"{wo.trade_id}-ttlcxl",
|
||||
trade_id=wo.trade_id,
|
||||
slot_id=0,
|
||||
asset=wo.asset,
|
||||
side=(intent.side if intent is not None else DitaTradeSide.SHORT),
|
||||
action=KernelCommandType.CANCEL,
|
||||
reference_price=(intent.reference_price if intent is not None else 0.0),
|
||||
target_size=(intent.target_size if intent is not None else 0.0),
|
||||
leverage=(intent.leverage if intent is not None else 1.0),
|
||||
reason="exec_router:ttl_expired",
|
||||
)
|
||||
await self.kernel.process_intent_async(cancel_intent)
|
||||
except Exception as exc:
|
||||
self.logger.warning("EXEC_ROUTER ttl-cancel %s failed: %s", wo.trade_id, exc)
|
||||
await self.pump_venue_events()
|
||||
|
||||
# 3. Re-classify after the cancel round-trip (fill may have raced it).
|
||||
if wo.action == "ENTER" and _entry_filled():
|
||||
router.note_fill(wo.trade_id)
|
||||
(self._working_intents or {}).pop(wo.trade_id, None)
|
||||
self._emit("exec_router", event="fill_after_ttl", trade_id=wo.trade_id)
|
||||
return
|
||||
if wo.action == "EXIT" and _exit_done():
|
||||
router.note_fill(wo.trade_id)
|
||||
(self._working_intents or {}).pop(wo.trade_id, None)
|
||||
self._emit("exec_router", event="fill_after_ttl", trade_id=wo.trade_id)
|
||||
return
|
||||
|
||||
router.note_cancel(wo.trade_id)
|
||||
base_intent = (self._working_intents or {}).pop(wo.trade_id, None)
|
||||
|
||||
# 4. EXIT: never strand a position — escalate to MARKET, same trade_id.
|
||||
if wo.action == "EXIT":
|
||||
_tid, plan = router.market_fallback_plan(wo)
|
||||
if base_intent is not None and not _exit_done():
|
||||
market_exit = replace(
|
||||
base_intent,
|
||||
intent_id=f"{wo.trade_id}-mkt",
|
||||
order_type="MARKET", limit_price=0.0,
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
metadata={**(base_intent.metadata or {}),
|
||||
"_time_in_force": "GTC",
|
||||
"_exec_reason": plan.reason},
|
||||
)
|
||||
market_exit = self._exit_intent_from_slot(market_exit)
|
||||
self.logger.warning("EXEC_ROUTER exit TTL → MARKET fallback %s", wo.trade_id)
|
||||
self._emit("exec_router", event="exit_market_fallback", trade_id=wo.trade_id)
|
||||
await self.kernel.process_intent_async(market_exit)
|
||||
await self.pump_venue_events()
|
||||
return
|
||||
|
||||
# 5. ENTER miss policy: skip | retry (bounded) | market.
|
||||
action = router.entry_miss_action(wo)
|
||||
self._emit("exec_router", event="entry_miss", trade_id=wo.trade_id,
|
||||
action=action, retry_n=wo.retry_n)
|
||||
if action == MissAction.SKIP or base_intent is None:
|
||||
self.logger.info("EXEC_ROUTER entry miss %s → skip", wo.trade_id)
|
||||
return
|
||||
slot_tid, stage, size = self._exec_slot_view()
|
||||
if size > 0.0 or stage in self._SLOT_OPENISH:
|
||||
# Slot occupied (raced fill of remainder / another trade) — never
|
||||
# double-enter.
|
||||
self.logger.warning("EXEC_ROUTER entry miss %s: slot busy (%s) — skip",
|
||||
wo.trade_id, stage)
|
||||
return
|
||||
# Venue-truth gate (live double-entry fix, 2026-06-10): the kernel slot
|
||||
# can show flat while the venue holds a position the REST reconcile has
|
||||
# not surfaced yet. Re-quote ONLY when the venue is provably flat AND
|
||||
# no own fill landed in the hot window. Ambiguity → skip; a skipped
|
||||
# entry is always safe, a doubled position is not.
|
||||
if not self._exec_safe_to_requote(wo):
|
||||
self.logger.warning(
|
||||
"EXEC_ROUTER entry miss %s: venue not provably flat "
|
||||
"(recent own fill or live position) — skip requote", wo.trade_id)
|
||||
self._emit("exec_router", event="requote_blocked", trade_id=wo.trade_id)
|
||||
return
|
||||
ref = 0.0
|
||||
try:
|
||||
# Multi-asset: prefer the working order's OWN asset history; the
|
||||
# alias deque may track a different effective symbol.
|
||||
hist = None
|
||||
if self._price_histories is not None:
|
||||
hist = self._price_histories.get(str(wo.asset or "").upper())
|
||||
if not hist:
|
||||
hist = self._price_history
|
||||
if hist:
|
||||
ref = float(hist[-1])
|
||||
except Exception:
|
||||
ref = 0.0
|
||||
if ref <= 0.0:
|
||||
ref = float(wo.plan.limit_price or 0.0)
|
||||
if action == MissAction.RETRY:
|
||||
new_tid, plan = router.retry_plan(wo, reference_price=ref)
|
||||
else: # MARKET
|
||||
new_tid, plan = router.market_fallback_plan(wo)
|
||||
new_intent = replace(
|
||||
base_intent,
|
||||
trade_id=new_tid, intent_id=new_tid,
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
reference_price=(ref if ref > 0.0 else base_intent.reference_price),
|
||||
order_type=plan.order_type,
|
||||
limit_price=float(plan.limit_price or 0.0),
|
||||
metadata={**(base_intent.metadata or {}),
|
||||
"_time_in_force": ("PostOnly" if plan.post_only else "GTC"),
|
||||
"_exec_reason": plan.reason},
|
||||
)
|
||||
self.logger.info("EXEC_ROUTER entry %s → %s as %s", wo.trade_id,
|
||||
action, new_tid)
|
||||
outcome = await self.kernel.process_intent_async(new_intent)
|
||||
if plan.is_maker:
|
||||
self._exec_after_submit(plan, new_intent, outcome)
|
||||
await self.pump_venue_events()
|
||||
|
||||
async def pump_venue_events(
|
||||
self, snapshot: Any | None = None, *, market_state: Any = None
|
||||
) -> int:
|
||||
@@ -823,6 +1259,30 @@ class PinkDirectRuntime:
|
||||
) != KernelDiagnosticCode.DUPLICATE_EVENT:
|
||||
applied.append(event)
|
||||
|
||||
# Resolve working maker quotes against applied venue truth so the
|
||||
# TTL loop never cancels an order the venue already terminalised.
|
||||
if self.exec_router is not None:
|
||||
for event in applied:
|
||||
tid = str(getattr(event, "trade_id", "") or "")
|
||||
if not tid or self.exec_router.working(tid) is None:
|
||||
continue
|
||||
kind = getattr(getattr(event, "kind", None), "value", "") or str(
|
||||
getattr(event, "kind", "") or "")
|
||||
status = getattr(getattr(event, "status", None), "value", "") or str(
|
||||
getattr(event, "status", "") or "")
|
||||
if "FULL_FILL" in kind or status == "FILLED":
|
||||
self.exec_router.note_fill(tid)
|
||||
(self._working_intents or {}).pop(tid, None)
|
||||
elif "CANCEL_ACK" in kind or status in ("CANCELED", "CANCELLED"):
|
||||
# Venue-side cancel (incl. post-only reject surfacing via
|
||||
# reconcile): do NOT drop the working order here — pull its
|
||||
# deadline to now so the TTL loop resolves it through the
|
||||
# one shared miss/escalation path (retry/market/skip per
|
||||
# config; exits always escalate to MARKET).
|
||||
wo = self.exec_router.working(tid)
|
||||
if wo is not None:
|
||||
wo.deadline = self.exec_router.clock()
|
||||
|
||||
if applied and self.persistence is not None:
|
||||
slot_dict = self.kernel.slot(0).to_dict() if self.kernel.max_slots > 0 else {}
|
||||
persist_snapshot = snapshot
|
||||
@@ -846,6 +1306,26 @@ class PinkDirectRuntime:
|
||||
# Exits are never frozen — only new ENTERs are blocked on reconcile ERROR.
|
||||
if getattr(self, "_enter_frozen", False):
|
||||
return "account reconcile ERROR — new ENTERs frozen until K≈E restored"
|
||||
# SINGLE-SLOT INVARIANT (2026-06-10, after two live double-entries):
|
||||
# never ENTER while the exchange shows ANY open position. A filled
|
||||
# maker entry vanishes from openOrders; the symbol-less reconcile
|
||||
# fetches no fills, reads the disappearance as a cancel, frees the
|
||||
# slot, and the next scan re-enters → 2x position. The exchange
|
||||
# position list is the truth that survives that misclassification.
|
||||
if time.monotonic() - (getattr(self, "_last_own_fill_mono", 0.0) or 0.0) < 5.0:
|
||||
return "own fill within hot window — entry deferred until state settles"
|
||||
try:
|
||||
venue = self.kernel.venue
|
||||
rows = venue.open_positions() if hasattr(venue, "open_positions") else []
|
||||
for row in rows or []:
|
||||
qty = abs(float(row.get("positionAmt") or row.get("positionQty")
|
||||
or row.get("qty") or 0.0))
|
||||
if qty > 1e-9:
|
||||
return (f"exchange holds open position "
|
||||
f"({row.get('symbol')} qty={qty}) — single-slot ENTER blocked")
|
||||
except Exception as exc:
|
||||
self.logger.warning("entry venue-position probe failed (%s) — blocking ENTER", exc)
|
||||
return "venue position probe failed — fail safe, no ENTER without proof"
|
||||
"""Return why an ENTER's sizing inputs are unsafe, or None if sound.
|
||||
|
||||
notional = capital × fraction × leverage is self-limiting; the only way
|
||||
@@ -891,6 +1371,110 @@ class PinkDirectRuntime:
|
||||
exit_size = policy_size if policy_ok else 0.0
|
||||
return replace(kernel_intent, target_size=exit_size)
|
||||
|
||||
def _effective_snapshot(self, snapshot: MarketSnapshot) -> tuple[MarketSnapshot, str]:
|
||||
"""Retarget the regime snapshot onto the TRADE asset (BLUE parity).
|
||||
|
||||
The data feed delivers the BTC-anchored eigen scan (regime signal:
|
||||
vel_div, irp). The TRADE asset is a separate concern — BLUE selects
|
||||
it per signal from the scan's ~50-asset universe via IRP (SYSTEM
|
||||
BIBLE §5). Rules, in priority order:
|
||||
|
||||
1. Slot occupied → the slot's asset, priced from the universe
|
||||
payload. Exits MUST evaluate the position's own price, never
|
||||
the regime anchor's.
|
||||
2. Flat + picker warm → IRP top candidate for the SHORT regime.
|
||||
3. Flat + picker cold / no candidate → entries suppressed
|
||||
(returned block reason; BLUE has no BTC fallback when IRP is
|
||||
enabled).
|
||||
|
||||
Returns (snapshot, enter_block_reason). Empty reason = entries OK.
|
||||
Also feeds the picker/sizer one observation per NEW scan (deduped on
|
||||
scan_number inside the components).
|
||||
"""
|
||||
import dataclasses as _dc
|
||||
|
||||
payload = snapshot.scan_payload if isinstance(snapshot.scan_payload, dict) else {}
|
||||
scan_number = int(payload.get("scan_number") or snapshot.scan_number or 0)
|
||||
vel_div = payload.get("vel_div")
|
||||
if vel_div is None:
|
||||
vel_div = snapshot.velocity_divergence
|
||||
if self.alpha_sizer is not None and vel_div is not None:
|
||||
try:
|
||||
self.alpha_sizer.observe(vel_div, scan_number)
|
||||
except Exception:
|
||||
pass
|
||||
if self.asset_picker is None:
|
||||
return snapshot, ""
|
||||
try:
|
||||
self.asset_picker.observe(payload, scan_number)
|
||||
except Exception as exc:
|
||||
self.logger.warning("asset picker observe failed: %s", exc)
|
||||
|
||||
# 1. Open/working slot → follow its asset.
|
||||
slot_asset = ""
|
||||
try:
|
||||
if self.kernel.max_slots > 0:
|
||||
slot = self.kernel.slot(0)
|
||||
if not slot.is_free():
|
||||
slot_asset = str(getattr(slot, "asset", "") or "").upper()
|
||||
except Exception:
|
||||
slot_asset = ""
|
||||
if slot_asset:
|
||||
if slot_asset == str(snapshot.symbol).upper():
|
||||
return snapshot, ""
|
||||
px = self.asset_picker.price_of(slot_asset)
|
||||
if px is None or px <= 0:
|
||||
# No universe price for the slot asset (e.g. adopted stray).
|
||||
# Exits must not be evaluated at the anchor's price — a BTC
|
||||
# price against a stray's entry price would fire a bogus SL.
|
||||
self.logger.error(
|
||||
"no universe price for open slot asset %s — policy step "
|
||||
"degraded to HOLD (exit eval needs the asset's own price)",
|
||||
slot_asset)
|
||||
return snapshot, f"all:no price for open slot asset {slot_asset}"
|
||||
return _dc.replace(snapshot, symbol=slot_asset, price=float(px)), ""
|
||||
|
||||
# 2./3. Flat → IRP pick.
|
||||
if not self.asset_picker.warm:
|
||||
return snapshot, "IRP picker warming up — universe history incomplete"
|
||||
choice = None
|
||||
try:
|
||||
choice = self.asset_picker.pick(direction=-1)
|
||||
except Exception as exc:
|
||||
self.logger.warning("asset picker rank failed: %s", exc)
|
||||
if choice is None:
|
||||
return snapshot, "no IRP candidate passed gates (BLUE: no fallback asset)"
|
||||
asset, px, ars = choice
|
||||
asset = str(asset).upper()
|
||||
if asset == str(snapshot.symbol).upper():
|
||||
return snapshot, ""
|
||||
return _dc.replace(snapshot, symbol=asset, price=float(px)), ""
|
||||
|
||||
def _sizer_trade_feedback(self, acc: dict, slot_dict: dict) -> None:
|
||||
"""Close-out detection → feed realized PnL into the alpha layers.
|
||||
|
||||
Capital-delta PnL (net of fees) — the kernel's capital is the
|
||||
authoritative ledger, and bucket/streak multipliers only need the
|
||||
sign and rough magnitude.
|
||||
"""
|
||||
if self.alpha_sizer is None or not self._sizer_open_tid:
|
||||
return
|
||||
open_tid = str(slot_dict.get("trade_id") or "") if slot_dict else ""
|
||||
still_open = (
|
||||
open_tid == self._sizer_open_tid
|
||||
and float(slot_dict.get("size") or 0.0) > 0
|
||||
and not slot_dict.get("closed", False)
|
||||
)
|
||||
if still_open:
|
||||
return
|
||||
pnl = float(acc.get("capital") or 0.0) - self._sizer_entry_capital
|
||||
self._sizer_open_tid = ""
|
||||
try:
|
||||
self.alpha_sizer.record_close(pnl)
|
||||
self.logger.info("alpha sizer feedback: trade closed pnl=%.4f", pnl)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def step(self, snapshot: MarketSnapshot) -> Decision:
|
||||
"""Single policy + execution cycle.
|
||||
|
||||
@@ -903,6 +1487,11 @@ class PinkDirectRuntime:
|
||||
6. Persist
|
||||
"""
|
||||
market_state = self._update_market_state_runtime(snapshot)
|
||||
# BLUE-parity retarget (2026-06-10): regime signal stays BTC-anchored,
|
||||
# the TRADE asset comes from the slot (exits) or the IRP picker
|
||||
# (entries). Must run BEFORE the fill pump / policy so every consumer
|
||||
# below sees the trade asset's own price.
|
||||
snapshot, _enter_block = self._effective_snapshot(snapshot)
|
||||
# Drain any late fills BEFORE the policy reads slot/account state, so a
|
||||
# resting LIMIT that filled since the last cycle is reflected.
|
||||
await self.pump_venue_events(snapshot, market_state=market_state)
|
||||
@@ -939,7 +1528,16 @@ class PinkDirectRuntime:
|
||||
closed=False,
|
||||
)
|
||||
|
||||
# Price history for DC gate — update before decide() so current tick is included
|
||||
# Sizer feedback: detect a trade that closed since the last cycle.
|
||||
self._sizer_trade_feedback(acc, slot_dict)
|
||||
|
||||
# Price history for DC gate — per effective asset (multi-asset since
|
||||
# 2026-06-10). _price_history aliases the current asset's deque so the
|
||||
# DC gate below reads a single-asset series.
|
||||
if self._price_histories is not None:
|
||||
from collections import deque as _deque
|
||||
_sym = str(snapshot.symbol or "").upper()
|
||||
self._price_history = self._price_histories.setdefault(_sym, _deque(maxlen=10))
|
||||
if self._price_history is not None and snapshot.price and snapshot.price > 0:
|
||||
self._price_history.append(float(snapshot.price))
|
||||
|
||||
@@ -955,6 +1553,15 @@ class PinkDirectRuntime:
|
||||
# dc_skip_contradicts = True → rising price during short window = HOLD.
|
||||
dc_blocked = self._dc_contradicts()
|
||||
decision = self.decision_engine.decide(snapshot, context, legacy_position)
|
||||
if _enter_block:
|
||||
_block_all = _enter_block.startswith("all:")
|
||||
if decision.action == DecisionAction.ENTER or (
|
||||
_block_all and decision.action == DecisionAction.EXIT
|
||||
):
|
||||
import dataclasses
|
||||
decision = dataclasses.replace(decision, action=DecisionAction.HOLD, reason="ASSET_PICKER_BLOCK")
|
||||
self.logger.info("action blocked by asset picker: %s (vel_div=%.4f scan=%d)",
|
||||
_enter_block, self._last_vel_div, self._last_scan_number)
|
||||
if dc_blocked and decision.action == DecisionAction.ENTER:
|
||||
import dataclasses
|
||||
decision = dataclasses.replace(decision, action=DecisionAction.HOLD_DC_CONTRADICTED, reason="DC_CONTRADICT")
|
||||
@@ -971,10 +1578,15 @@ class PinkDirectRuntime:
|
||||
intent = plan.intent
|
||||
|
||||
# ACB boost (SYSTEM BIBLE §10): multiply intent leverage by the current boost
|
||||
# factor from acb_processor_service. Capped at exchange_leverage_cap (3x).
|
||||
# factor from acb_processor_service. Capped at the STRATEGY max leverage
|
||||
# (decision config) — intent.leverage is fractional conviction; the
|
||||
# integer exchange cap is applied separately at the venue boundary
|
||||
# (map_internal_conviction_to_exchange_leverage). The old hardcoded
|
||||
# min(3.0, …) silently clamped BLUE-parity conviction.
|
||||
if self._last_acb_boost != 1.0 and intent is not None:
|
||||
import dataclasses as _dc
|
||||
boosted_lev = min(3.0, max(1.0, float(intent.leverage or 1.0) * self._last_acb_boost))
|
||||
_lev_cap = float(getattr(self.decision_engine.config, "max_leverage", 3.0) or 3.0)
|
||||
boosted_lev = min(_lev_cap, max(0.5, float(intent.leverage or 1.0) * self._last_acb_boost))
|
||||
intent = _dc.replace(intent, leverage=boosted_lev)
|
||||
|
||||
if decision.action in {DecisionAction.ENTER, DecisionAction.EXIT}:
|
||||
@@ -1014,8 +1626,63 @@ class PinkDirectRuntime:
|
||||
# overshoot an open position.
|
||||
kernel_intent = self._exit_intent_from_slot(kernel_intent)
|
||||
|
||||
# ── Execution router: decide HOW this intent reaches the venue ──
|
||||
# (taker MARKET vs post-only maker LIMIT). Policy only — sizing,
|
||||
# signal and TP logic above are untouched. plan=None or
|
||||
# style=taker leaves kernel_intent exactly as built (legacy path).
|
||||
exec_plan = self._exec_plan_for(decision, kernel_intent, snapshot)
|
||||
if exec_plan is not None and exec_plan.suppress:
|
||||
# Duplicate guard: a working maker quote already represents
|
||||
# this action. Do not double-submit.
|
||||
self.logger.info("EXEC_ROUTER suppress %s %s: %s",
|
||||
decision.action.value, kernel_intent.trade_id,
|
||||
exec_plan.reason)
|
||||
self._emit("exec_router", event="suppress",
|
||||
action=decision.action.value,
|
||||
trade_id=kernel_intent.trade_id, reason=exec_plan.reason)
|
||||
return decision
|
||||
if exec_plan is not None and exec_plan.metadata.get("preempt_working"):
|
||||
# Urgent exit preempting a resting maker exit: cancel the
|
||||
# quote first so the MARKET close cannot double-fill.
|
||||
await self._exec_cancel_working(kernel_intent.trade_id,
|
||||
reason="urgent_exit_preempt")
|
||||
if exec_plan is not None and exec_plan.order_type == "LIMIT":
|
||||
kernel_intent = replace(
|
||||
kernel_intent,
|
||||
order_type="LIMIT",
|
||||
limit_price=float(exec_plan.limit_price),
|
||||
metadata={
|
||||
**(kernel_intent.metadata or {}),
|
||||
"_time_in_force": "PostOnly" if exec_plan.post_only else "GTC",
|
||||
"_exec_reason": exec_plan.reason,
|
||||
},
|
||||
)
|
||||
|
||||
# Register the symbol BEFORE the order can fill so the account
|
||||
# stream attributes the resulting WS fill to PINK.
|
||||
try:
|
||||
if intent.asset and self._own_fill_symbols is not None:
|
||||
self._own_fill_symbols.add(str(intent.asset).upper())
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Alpha-layer feedback: remember which bucket sized this ENTER and
|
||||
# the capital baseline, so the close can be credited back.
|
||||
if decision.action == DecisionAction.ENTER and self.alpha_sizer is not None:
|
||||
try:
|
||||
self.alpha_sizer.note_entry()
|
||||
self._sizer_open_tid = str(kernel_intent.trade_id or "")
|
||||
self._sizer_entry_capital = float(context.capital)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
outcome = await self.kernel.process_intent_async(kernel_intent)
|
||||
|
||||
# Maker quotes that did not terminally fill register as working
|
||||
# so the TTL loop can cancel/retry/fall back.
|
||||
if exec_plan is not None and exec_plan.is_maker:
|
||||
self._exec_after_submit(exec_plan, kernel_intent, outcome)
|
||||
|
||||
# Locate the source of any non-finite intent the kernel rejected:
|
||||
# log the full upstream provenance (snapshot price, account capital,
|
||||
# leverage, sizing) so a numerical error can be traced to its origin
|
||||
|
||||
Reference in New Issue
Block a user