"""Python prototype of the Zinc hot-path plane. This is an in-memory stand-in for the eventual Zinc-backed shared memory regions. The interface is explicit so the implementation can be swapped later without touching the kernel logic. """ from __future__ import annotations from dataclasses import dataclass, field from typing import Any, Dict, Iterable, List, Mapping, Optional, Protocol import threading import time from .contracts import KernelIntent, TradeSlot from .control import KernelControlSnapshot class ZincPlane(Protocol): """Hot-path plane for intents, state and control.""" def publish_intent(self, intent: KernelIntent) -> None: ... def write_slot(self, slot: TradeSlot) -> None: ... def read_slots(self) -> List[TradeSlot]: ... def update_control(self, control: KernelControlSnapshot) -> None: ... def read_control(self) -> KernelControlSnapshot: ... def wait_on_intent(self, timeout_ms: int = 1000) -> bool: ... def notify_intent(self) -> None: ... def wait_on_state(self, timeout_ms: int = 1000) -> bool: ... def notify_state(self) -> None: ... def wait_on_control(self, timeout_ms: int = 1000) -> bool: ... def notify_control(self) -> None: ... 
@dataclass
class InMemoryZincPlane:
    """Simple in-memory Zinc lookalike for Python prototype tests.

    Three regions are kept in plain Python containers:

    * ``intent_region`` -- list of published intents, in publish order.
    * ``state_region`` -- trade slots keyed by ``int(slot.slot_id)``.
    * ``control_region`` -- latest control snapshot, or ``None`` if never set.

    Each region has a sequence counter that is bumped on every write or
    explicit ``notify_*`` call, plus an "observed" counter recording the last
    sequence value a successful ``wait_on_*`` call returned for. A single
    ``threading.Condition`` guards all regions and counters; every method that
    touches them, readers included, holds it. The default ``Condition`` wraps
    an ``RLock``, so these methods may be called by a thread that already
    holds ``_signal``.
    """

    intent_region: List[KernelIntent] = field(default_factory=list)
    state_region: Dict[int, TradeSlot] = field(default_factory=dict)
    control_region: Optional[KernelControlSnapshot] = None
    # Per-region change counters, bumped by writers and notify_* calls.
    _intent_seq: int = field(default=0, init=False, repr=False)
    _state_seq: int = field(default=0, init=False, repr=False)
    _control_seq: int = field(default=0, init=False, repr=False)
    # Last counter value acknowledged by a successful wait_on_* call.
    _intent_observed_seq: int = field(default=0, init=False, repr=False)
    _state_observed_seq: int = field(default=0, init=False, repr=False)
    _control_observed_seq: int = field(default=0, init=False, repr=False)
    # One condition variable shared by all three regions.
    _signal: threading.Condition = field(default_factory=threading.Condition, init=False, repr=False)

    def publish_intent(self, intent: KernelIntent) -> None:
        """Append ``intent`` to the intent region and wake all waiters."""
        with self._signal:
            self.intent_region.append(intent)
            self._intent_seq += 1
            self._signal.notify_all()

    def write_slot(self, slot: TradeSlot) -> None:
        """Store ``slot`` under ``int(slot.slot_id)`` and wake all waiters."""
        with self._signal:
            self.state_region[int(slot.slot_id)] = slot
            self._state_seq += 1
            self._signal.notify_all()

    def read_slots(self) -> List[TradeSlot]:
        """Return a new list of the stored slots, ordered by slot id."""
        # Hold the same lock the writers hold so the key scan and the value
        # lookups see one consistent view of the region.
        with self._signal:
            return [self.state_region[key] for key in sorted(self.state_region)]

    def update_control(self, control: KernelControlSnapshot) -> None:
        """Replace the control snapshot with ``control`` and wake all waiters."""
        with self._signal:
            self.control_region = control
            self._control_seq += 1
            self._signal.notify_all()

    def read_control(self) -> KernelControlSnapshot:
        """Return the current control snapshot.

        If no snapshot has been set, a freshly constructed
        ``KernelControlSnapshot()`` is returned instead.
        """
        # Read the field exactly once, under the lock, so the None check and
        # the returned value cannot come from two different writes.
        with self._signal:
            current = self.control_region
        if current is None:
            return KernelControlSnapshot()
        return current

    def wait_on_intent(self, timeout_ms: int = 1000) -> bool:
        """Wait for an intent change; True if one was seen, False on timeout."""
        return self._wait_for_change("_intent_seq", "_intent_observed_seq", timeout_ms)

    def notify_intent(self) -> None:
        """Bump the intent counter and wake all waiters without publishing."""
        with self._signal:
            self._intent_seq += 1
            self._signal.notify_all()

    def wait_on_state(self, timeout_ms: int = 1000) -> bool:
        """Wait for a state change; True if one was seen, False on timeout."""
        return self._wait_for_change("_state_seq", "_state_observed_seq", timeout_ms)

    def notify_state(self) -> None:
        """Bump the state counter and wake all waiters without writing a slot."""
        with self._signal:
            self._state_seq += 1
            self._signal.notify_all()

    def wait_on_control(self, timeout_ms: int = 1000) -> bool:
        """Wait for a control change; True if one was seen, False on timeout."""
        return self._wait_for_change("_control_seq", "_control_observed_seq", timeout_ms)

    def notify_control(self) -> None:
        """Bump the control counter and wake all waiters without an update."""
        with self._signal:
            self._control_seq += 1
            self._signal.notify_all()

    def _wait_for_change(self, seq_attr: str, observed_attr: str, timeout_ms: Optional[int]) -> bool:
        """Block until the counter ``seq_attr`` differs from ``observed_attr``.

        Args:
            seq_attr: Name of the region's sequence-counter attribute.
            observed_attr: Name of the matching "last observed" attribute.
            timeout_ms: Maximum wait in milliseconds. ``None`` or a negative
                value waits indefinitely; ``0`` checks once without blocking.

        Returns:
            True if a change was pending or arrived in time (the observed
            counter is then advanced to the current sequence value), False if
            the timeout expired first.
        """
        wait_forever = timeout_ms is None or timeout_ms < 0
        # The deadline is fixed before taking the lock, so time spent
        # acquiring the lock counts against the caller's timeout.
        deadline = None if wait_forever else time.monotonic() + max(0.0, timeout_ms / 1000.0)
        with self._signal:
            # Baseline is captured once on entry; the loop also absorbs
            # wake-ups caused by changes to the other regions, which share
            # this condition variable.
            observed = getattr(self, observed_attr)
            while getattr(self, seq_attr) == observed:
                if deadline is None:
                    self._signal.wait()
                    continue
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                self._signal.wait(timeout=remaining)
            setattr(self, observed_attr, getattr(self, seq_attr))
            return True