PINK: async submit + process_intent hot path; async/race flaw audit (pass 5)
N2/N3/N4 (3x Critical async bugs): - BingxVenueAdapter.submit_async(): awaits backend.submit_intent() directly in caller's event loop — no thread-pool, no asyncio.run(), no _backend_snapshot() - ExecutionKernel.process_intent_async(): same FSM guard logic as sync version; replaces venue.submit() with await venue.submit_async(); sync process_intent() untouched so all 122 tests stay green - pink_direct.step() line 952: process_intent() -> await process_intent_async() restore_state JSON parse (test fix): - ExecutionKernel.restore_state() wraps Rust FFI in try/except JSONDecodeError returns False; matches documented contract; test_restore_corrupt_json_rejected passes FLAWS doc: pass 5 table added; 21 total fixed; Z6/N5 marked resolved Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -890,6 +890,123 @@ class ExecutionKernel:
|
||||
self._record_transitions(outcome.transitions, final_slot, None)
|
||||
return final_outcome
|
||||
|
||||
async def process_intent_async(self, intent: KernelIntent) -> KernelOutcome:
|
||||
"""Async variant of process_intent for use from async step() loops.
|
||||
|
||||
Identical guard logic and Rust FSM call (both sync/fast). Only the
|
||||
venue.submit() call is replaced with await venue.submit_async() so it
|
||||
runs in the caller's event loop — no thread-pool, no cross-loop deadlock.
|
||||
|
||||
The sync process_intent() is kept for tests and backward-compat callers.
|
||||
Race safety: asyncio is single-threaded cooperative; the Rust FSM call is
|
||||
atomic (no await between FSM state change and venue call); during the
|
||||
venue.submit_async await the FSM is in ENTRY/EXIT_WORKING which blocks any
|
||||
competing ENTER via SLOT_BUSY — so no slot can be double-entered.
|
||||
"""
|
||||
self.zinc_plane.publish_intent(intent)
|
||||
if not (0 <= int(intent.slot_id) < self.max_slots):
|
||||
return KernelOutcome(
|
||||
accepted=False,
|
||||
slot_id=int(intent.slot_id),
|
||||
trade_id=intent.trade_id,
|
||||
state=TradeStage.IDLE,
|
||||
diagnostic_code=KernelDiagnosticCode.INVALID_SLOT_ID,
|
||||
details={"reason": "INVALID_SLOT_ID", "slot_id": int(intent.slot_id), "intent_id": intent.intent_id},
|
||||
)
|
||||
bad_field = _first_invalid_intent_field(intent)
|
||||
if bad_field is not None:
|
||||
name, value = bad_field
|
||||
return KernelOutcome(
|
||||
accepted=False,
|
||||
slot_id=int(intent.slot_id),
|
||||
trade_id=intent.trade_id,
|
||||
state=self._get_slot(int(intent.slot_id)).fsm_state,
|
||||
diagnostic_code=KernelDiagnosticCode.INVALID_INTENT,
|
||||
severity=KernelSeverity.WARNING,
|
||||
details={"reason": "INVALID_INTENT", "field": name, "value": str(value),
|
||||
"intent_id": intent.intent_id, "action": intent.action.value, "asset": intent.asset},
|
||||
)
|
||||
# ── Rust FSM (sync, atomic, μs-fast — no await here) ─────────────────
|
||||
payload = _intent_to_payload(intent)
|
||||
result = _get_rust().process_intent(
|
||||
self._backend, payload,
|
||||
mode=_enum_text(self.control.mode),
|
||||
verbosity=_enum_text(self.control.verbosity),
|
||||
)
|
||||
outcome = _outcome_from_payload(result["outcome"])
|
||||
self.state.refresh()
|
||||
if intent.action == KernelCommandType.ENTER and outcome.accepted:
|
||||
self._last_settled_pnl[intent.slot_id] = 0.0
|
||||
emitted_events: List[VenueEvent] = []
|
||||
all_venue_transitions: List[KernelTransition] = []
|
||||
if outcome.accepted and intent.action in {KernelCommandType.ENTER, KernelCommandType.EXIT}:
|
||||
# ── Venue I/O (async, main event loop — no cross-loop deadlock) ──
|
||||
submit_async = getattr(self.venue, "submit_async", None)
|
||||
try:
|
||||
if submit_async is not None:
|
||||
emitted_events = await submit_async(intent)
|
||||
else:
|
||||
emitted_events = self.venue.submit(intent) # fallback: mock/test venue
|
||||
except Exception as _submit_exc:
|
||||
import logging as _log
|
||||
_log.getLogger(__name__).error(
|
||||
"venue.submit_async failed (%s) — synthetic REJECTED, FSM rollback slot=%d action=%s",
|
||||
_submit_exc, intent.slot_id, intent.action.value,
|
||||
)
|
||||
emitted_events = [VenueEvent(
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
event_id=f"{intent.trade_id}:submit_error",
|
||||
trade_id=intent.trade_id, slot_id=intent.slot_id,
|
||||
kind=KernelEventKind.ORDER_REJECT, status=VenueEventStatus.REJECTED,
|
||||
venue_order_id="", venue_client_id="",
|
||||
side=intent.side, asset=intent.asset,
|
||||
price=0.0, size=float(intent.target_size or 0.0),
|
||||
filled_size=0.0, remaining_size=float(intent.target_size or 0.0),
|
||||
reason=f"VENUE_SUBMIT_ERROR:{_submit_exc}", raw_payload={},
|
||||
metadata={"intent_id": intent.intent_id, "action": intent.action.value},
|
||||
)]
|
||||
for event in emitted_events:
|
||||
evt_outcome = self.on_venue_event(event)
|
||||
all_venue_transitions.extend(evt_outcome.transitions)
|
||||
elif intent.action == KernelCommandType.CANCEL:
|
||||
slot_view = self.slot(intent.slot_id)
|
||||
if slot_view.active_exit_order is not None:
|
||||
emitted_events = self.venue.cancel(slot_view.active_exit_order, reason=intent.reason)
|
||||
elif slot_view.active_entry_order is not None and slot_view.fsm_state in {
|
||||
TradeStage.ENTRY_WORKING, TradeStage.ORDER_REQUESTED, TradeStage.ORDER_SENT, TradeStage.IDLE,
|
||||
}:
|
||||
emitted_events = self.venue.cancel(slot_view.active_entry_order, reason=intent.reason)
|
||||
else:
|
||||
emitted_events = []
|
||||
for event in emitted_events:
|
||||
evt_outcome = self.on_venue_event(event)
|
||||
all_venue_transitions.extend(evt_outcome.transitions)
|
||||
final_slot = self._get_slot(outcome.slot_id)
|
||||
rate_limit_event = next((e for e in emitted_events if e.kind == KernelEventKind.RATE_LIMITED), None)
|
||||
if rate_limit_event is not None:
|
||||
rl = dict(outcome.details)
|
||||
rl.update({"reason": rate_limit_event.reason or "RATE_LIMITED",
|
||||
"retry_after_ms": int(rate_limit_event.metadata.get("retry_after_ms", 0) or 0),
|
||||
"venue_event_kind": rate_limit_event.kind.value,
|
||||
"severity": KernelSeverity.WARNING.value, "release_eta": "few minutes", "retryable": True})
|
||||
outcome = KernelOutcome(accepted=False, slot_id=outcome.slot_id, trade_id=outcome.trade_id,
|
||||
state=final_slot.fsm_state, diagnostic_code=KernelDiagnosticCode.RATE_LIMITED,
|
||||
severity=KernelSeverity.WARNING, transitions=outcome.transitions,
|
||||
emitted_events=outcome.emitted_events, details=rl)
|
||||
all_transitions = list(outcome.transitions) + all_venue_transitions
|
||||
final_outcome = KernelOutcome(
|
||||
accepted=outcome.accepted, slot_id=outcome.slot_id, trade_id=final_slot.trade_id,
|
||||
state=final_slot.fsm_state, diagnostic_code=outcome.diagnostic_code,
|
||||
transitions=tuple(all_transitions), emitted_events=tuple(emitted_events), details=dict(outcome.details),
|
||||
)
|
||||
slots = [self._get_slot(i) for i in range(self.max_slots)]
|
||||
self.account.observe_slots(slots)
|
||||
current = self._get_slot(final_slot.slot_id)
|
||||
self.projection.write_slot(current)
|
||||
self.zinc_plane.write_slot(current)
|
||||
self._record_transitions(outcome.transitions, final_slot, None)
|
||||
return final_outcome
|
||||
|
||||
def on_venue_event(self, event: VenueEvent) -> KernelOutcome:
|
||||
result = _get_rust().on_venue_event(
|
||||
self._backend,
|
||||
@@ -1021,7 +1138,10 @@ class ExecutionKernel:
|
||||
|
||||
Safe to call on a fresh kernel (e.g. after startup) before any trades.
|
||||
"""
|
||||
return _get_rust().restore_state(self._backend, json_str)
|
||||
try:
|
||||
return _get_rust().restore_state(self._backend, json_str)
|
||||
except (ValueError, json.JSONDecodeError):
|
||||
return False
|
||||
|
||||
def is_capital_frozen(self) -> bool:
|
||||
"""Return True if the kernel's capital is frozen (reconcile ERROR active).
|
||||
|
||||
Reference in New Issue
Block a user