Files
siloqy/prod/clean_arch/dita_v2/zinc_plane.py
Codex 3d7b00e28d Snapshot PINK DITAv2 system + Sprint 0 flaw-fix verification
First commit of the previously-untracked PINK-on-DITAv2 migration system
(execution moves to the Rust kernel; policy stays on legacy DITA, so Alpha
Engine algorithmic integrity is preserved). BLUE is untouched.

Sprint 0 (safety snapshot + flaw-fix verification, MARKET single-leg scope):
- Verified Rust FSM fixes (flaws 2,4,10,11,13) by source read of lib.rs.
- Hardened 5 vacuous/guarded assertions in test_flaws.py so each flaw test
  genuinely exercises its fix. Most important: Flaw 5 now asserts capital
  moves by EXACTLY realized PnL (was entering/exiting at the same price).
- Offline suites: 533 passed, 0 failed (35 flaws + 402 kernel/accounting/
  bridge + 96 runtime/persistence/multi-exit/restart/seams).
- GATE PASS: MARKET-path-critical flaws 1,2,5 confirmed fixed + green.
- Added SPRINT0_FLAW_VERIFICATION.md report and _rust_kernel/.gitignore
  (excludes Rust target/ build artifacts).

LIMIT/partial-fill remain explicitly out of scope (MARKET-only bring-up).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-30 18:26:43 +02:00

136 lines
4.6 KiB
Python

"""Python prototype of the Zinc hot-path plane.
This is an in-memory stand-in for the eventual Zinc-backed shared memory
regions. The interface is explicit so the implementation can be swapped later
without touching the kernel logic.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Mapping, Optional, Protocol
import threading
import time
from .contracts import KernelIntent, TradeSlot
from .control import KernelControlSnapshot
class ZincPlane(Protocol):
    """Structural interface of the hot-path plane: intents, slot state, control.

    Three channels are exposed (intent, state, control).  Each has a write
    side, and a ``wait_on_*`` / ``notify_*`` pair for signalling changes.
    Any object providing these methods satisfies the protocol.
    """

    def publish_intent(self, intent: KernelIntent) -> None:
        """Hand *intent* to the plane's intent channel."""

    def write_slot(self, slot: TradeSlot) -> None:
        """Store *slot* in the plane's state channel."""

    def read_slots(self) -> List[TradeSlot]:
        """Return the trade slots currently held by the plane."""

    def update_control(self, control: KernelControlSnapshot) -> None:
        """Replace the plane's control snapshot with *control*."""

    def read_control(self) -> KernelControlSnapshot:
        """Return the plane's current control snapshot."""

    def wait_on_intent(self, timeout_ms: int = 1000) -> bool:
        """Wait for an intent-channel signal; the bool reports whether one arrived."""

    def notify_intent(self) -> None:
        """Signal the intent channel."""

    def wait_on_state(self, timeout_ms: int = 1000) -> bool:
        """Wait for a state-channel signal; the bool reports whether one arrived."""

    def notify_state(self) -> None:
        """Signal the state channel."""

    def wait_on_control(self, timeout_ms: int = 1000) -> bool:
        """Wait for a control-channel signal; the bool reports whether one arrived."""

    def notify_control(self) -> None:
        """Signal the control channel."""
@dataclass
class InMemoryZincPlane:
    """Simple in-memory Zinc lookalike for Python prototype tests.

    The three regions are plain Python containers guarded by one shared
    ``threading.Condition``.  Every write or explicit ``notify_*`` bumps a
    per-channel sequence counter and wakes all waiters; ``wait_on_*``
    compares that counter with a per-channel "last observed" cursor.

    Note that the observed cursor is stored once per channel on the plane,
    not once per waiter: a change consumed by one ``wait_on_*`` call is
    marked as seen for later callers on the same channel.
    """

    intent_region: List[KernelIntent] = field(default_factory=list)
    state_region: Dict[int, TradeSlot] = field(default_factory=dict)
    control_region: Optional[KernelControlSnapshot] = None
    # Change counters, one per channel; incremented under ``_signal``.
    _intent_seq: int = field(default=0, init=False, repr=False)
    _state_seq: int = field(default=0, init=False, repr=False)
    _control_seq: int = field(default=0, init=False, repr=False)
    # Counter value last acknowledged by a successful ``wait_on_*`` call.
    _intent_observed_seq: int = field(default=0, init=False, repr=False)
    _state_observed_seq: int = field(default=0, init=False, repr=False)
    _control_observed_seq: int = field(default=0, init=False, repr=False)
    # Single lock + wake-up primitive shared by all three channels.
    _signal: threading.Condition = field(
        default_factory=threading.Condition, init=False, repr=False
    )

    def publish_intent(self, intent: KernelIntent) -> None:
        """Append *intent* to ``intent_region`` and signal the intent channel."""
        with self._signal:
            self.intent_region.append(intent)
            self._bump("_intent_seq")

    def write_slot(self, slot: TradeSlot) -> None:
        """Store *slot* under ``int(slot.slot_id)`` and signal the state channel."""
        with self._signal:
            self.state_region[int(slot.slot_id)] = slot
            self._bump("_state_seq")

    def read_slots(self) -> List[TradeSlot]:
        """Return the stored slots ordered by ascending slot id.

        The snapshot is taken under the same lock the writers hold, so a
        concurrent ``write_slot`` cannot interleave between collecting the
        keys and looking up their values.
        """
        with self._signal:
            return [self.state_region[key] for key in sorted(self.state_region)]

    def update_control(self, control: KernelControlSnapshot) -> None:
        """Replace ``control_region`` with *control* and signal the control channel."""
        with self._signal:
            self.control_region = control
            self._bump("_control_seq")

    def read_control(self) -> KernelControlSnapshot:
        """Return the current control snapshot.

        When no snapshot has been stored yet, a freshly constructed
        ``KernelControlSnapshot()`` is returned (and not stored).
        """
        # Read under the writers' lock for consistency with update_control.
        with self._signal:
            current = self.control_region
        if current is None:
            return KernelControlSnapshot()
        return current

    def wait_on_intent(self, timeout_ms: int = 1000) -> bool:
        """Wait for an unobserved intent-channel change; see ``_wait_for_change``."""
        return self._wait_for_change("_intent_seq", "_intent_observed_seq", timeout_ms)

    def notify_intent(self) -> None:
        """Signal the intent channel without writing any data."""
        with self._signal:
            self._bump("_intent_seq")

    def wait_on_state(self, timeout_ms: int = 1000) -> bool:
        """Wait for an unobserved state-channel change; see ``_wait_for_change``."""
        return self._wait_for_change("_state_seq", "_state_observed_seq", timeout_ms)

    def notify_state(self) -> None:
        """Signal the state channel without writing any data."""
        with self._signal:
            self._bump("_state_seq")

    def wait_on_control(self, timeout_ms: int = 1000) -> bool:
        """Wait for an unobserved control-channel change; see ``_wait_for_change``."""
        return self._wait_for_change("_control_seq", "_control_observed_seq", timeout_ms)

    def notify_control(self) -> None:
        """Signal the control channel without writing any data."""
        with self._signal:
            self._bump("_control_seq")

    def _bump(self, seq_attr: str) -> None:
        """Increment the counter named *seq_attr* and wake all waiters.

        Must be called with ``_signal`` held.  All channels share one
        condition, so waiters on other channels wake too and simply re-check
        their own counter.
        """
        setattr(self, seq_attr, getattr(self, seq_attr) + 1)
        self._signal.notify_all()

    def _wait_for_change(
        self, seq_attr: str, observed_attr: str, timeout_ms: Optional[int]
    ) -> bool:
        """Block until the counter *seq_attr* differs from *observed_attr*.

        Args:
            seq_attr: Name of the channel's sequence-counter attribute.
            observed_attr: Name of the channel's last-observed attribute.
            timeout_ms: Maximum wait in milliseconds.  ``None`` or a negative
                value waits indefinitely; ``0`` polls without blocking.

        Returns:
            ``True`` if a change was pending or arrived in time (the observed
            cursor is then advanced to the current counter), ``False`` on
            timeout (the cursor is left untouched).
        """
        wait_forever = timeout_ms is None or timeout_ms < 0
        timeout_s = None if wait_forever else timeout_ms / 1000.0
        with self._signal:
            baseline = getattr(self, observed_attr)
            # wait_for re-checks the predicate after every wake-up and keeps
            # track of the remaining time, replacing a manual deadline loop.
            changed = self._signal.wait_for(
                lambda: getattr(self, seq_attr) != baseline,
                timeout=timeout_s,
            )
            if changed:
                setattr(self, observed_attr, getattr(self, seq_attr))
            return changed