67 production .py modules that the running PINK service imports but which were never committed: prod/bingx/ (HTTP client, market/user streams, journal, config), prod/clean_arch/ adapters/persistence/runtime/dita/dita_v2 production modules and their co-located tests. Rule going forward: every module imported by launch_dolphin_pink.py / pink_direct.py must appear in git ls-files. Excludes _backup dirs, __pycache__, and non-code files. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
218 lines
6.8 KiB
Python
218 lines
6.8 KiB
Python
"""Runtime control plane for DITAv2."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import asdict, dataclass, replace
|
|
from enum import Enum
|
|
import os
|
|
import threading
|
|
import time
|
|
from typing import Any, Dict, Mapping, Optional, Protocol
|
|
|
|
from .utils import json_safe
|
|
|
|
|
|
class KernelMode(str, Enum):
|
|
NORMAL = "NORMAL"
|
|
DEBUG = "DEBUG"
|
|
|
|
|
|
class KernelVerbosity(str, Enum):
|
|
QUIET = "QUIET"
|
|
VERBOSE = "VERBOSE"
|
|
TRACE = "TRACE"
|
|
|
|
|
|
class BackendMode(str, Enum):
|
|
MOCK = "MOCK"
|
|
BINGX = "BINGX"
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class KernelControlSnapshot:
|
|
"""Control plane state shared across the kernel."""
|
|
|
|
mode: KernelMode = KernelMode.NORMAL
|
|
verbosity: KernelVerbosity = KernelVerbosity.QUIET
|
|
backend_mode: BackendMode = BackendMode.MOCK
|
|
debug_clickhouse_enabled: bool = True
|
|
trace_transitions: bool = False
|
|
mirror_to_hazelcast: bool = True
|
|
active_slot_limit: int = 10
|
|
reconcile_on_restart: bool = True
|
|
runtime_namespace: str = "dita_v2"
|
|
strategy_namespace: str = "dita_v2"
|
|
event_namespace: str = "dita_v2"
|
|
actor_name: str = "ExecutionKernel"
|
|
exec_venue: str = "bingx"
|
|
data_venue: str = "binance"
|
|
ledger_authority: str = "exchange"
|
|
mock_fidelity_mode: str = "bingx_exact_shape"
|
|
|
|
def as_dict(self) -> Dict[str, Any]:
|
|
return dict(asdict(self))
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ControlUpdate:
|
|
"""Partial update to the control plane."""
|
|
|
|
mode: Optional[KernelMode] = None
|
|
verbosity: Optional[KernelVerbosity] = None
|
|
backend_mode: Optional[BackendMode] = None
|
|
debug_clickhouse_enabled: Optional[bool] = None
|
|
trace_transitions: Optional[bool] = None
|
|
mirror_to_hazelcast: Optional[bool] = None
|
|
active_slot_limit: Optional[int] = None
|
|
reconcile_on_restart: Optional[bool] = None
|
|
runtime_namespace: Optional[str] = None
|
|
strategy_namespace: Optional[str] = None
|
|
event_namespace: Optional[str] = None
|
|
actor_name: Optional[str] = None
|
|
exec_venue: Optional[str] = None
|
|
data_venue: Optional[str] = None
|
|
ledger_authority: Optional[str] = None
|
|
mock_fidelity_mode: Optional[str] = None
|
|
|
|
def apply(self, snapshot: KernelControlSnapshot) -> KernelControlSnapshot:
|
|
payload = {
|
|
key: value
|
|
for key, value in asdict(self).items()
|
|
if value is not None
|
|
}
|
|
return replace(snapshot, **payload)
|
|
|
|
|
|
class ControlPlane(Protocol):
|
|
"""Kernel control plane interface."""
|
|
|
|
def read(self) -> KernelControlSnapshot:
|
|
...
|
|
|
|
def update(self, update: ControlUpdate) -> KernelControlSnapshot:
|
|
...
|
|
|
|
def mirror(self) -> Mapping[str, Any]:
|
|
...
|
|
|
|
def wait(self, timeout_ms: int = 1000) -> bool:
|
|
...
|
|
|
|
def notify(self) -> None:
|
|
...
|
|
|
|
|
|
class InMemoryControlPlane:
|
|
"""Local control plane used for tests and the Python prototype."""
|
|
|
|
def __init__(self, snapshot: Optional[KernelControlSnapshot] = None):
|
|
self._snapshot = snapshot or KernelControlSnapshot()
|
|
self._mirror: Dict[str, Any] = {}
|
|
self._seq = 0
|
|
self._observed_seq = 0
|
|
self._signal = threading.Condition()
|
|
|
|
def read(self) -> KernelControlSnapshot:
|
|
return self._snapshot
|
|
|
|
def update(self, update: ControlUpdate) -> KernelControlSnapshot:
|
|
with self._signal:
|
|
self._snapshot = update.apply(self._snapshot)
|
|
self._mirror = self._snapshot.as_dict()
|
|
self._seq += 1
|
|
self._signal.notify_all()
|
|
return self._snapshot
|
|
|
|
def mirror(self) -> Mapping[str, Any]:
|
|
return dict(self._mirror)
|
|
|
|
def wait(self, timeout_ms: int = 1000) -> bool:
|
|
timeout_s = None if timeout_ms is None or timeout_ms < 0 else max(0.0, timeout_ms / 1000.0)
|
|
deadline = None if timeout_s is None else time.monotonic() + timeout_s
|
|
with self._signal:
|
|
observed = self._observed_seq
|
|
while self._seq == observed:
|
|
if deadline is None:
|
|
self._signal.wait()
|
|
continue
|
|
remaining = deadline - time.monotonic()
|
|
if remaining <= 0:
|
|
return False
|
|
self._signal.wait(timeout=remaining)
|
|
self._observed_seq = self._seq
|
|
return True
|
|
|
|
def notify(self) -> None:
|
|
with self._signal:
|
|
self._seq += 1
|
|
self._signal.notify_all()
|
|
|
|
|
|
class ZincControlPlane(InMemoryControlPlane):
|
|
"""In-memory stand-in for a Zinc-backed control region.
|
|
|
|
The class keeps the interface explicit so a real Zinc binding can be
|
|
dropped in later without changing kernel code.
|
|
"""
|
|
|
|
def __init__(self, snapshot: Optional[KernelControlSnapshot] = None):
|
|
super().__init__(snapshot=snapshot)
|
|
self.region: Dict[str, Any] = self._snapshot.as_dict()
|
|
|
|
def update(self, update: ControlUpdate) -> KernelControlSnapshot:
|
|
snapshot = super().update(update)
|
|
self.region = snapshot.as_dict()
|
|
return snapshot
|
|
|
|
def read(self) -> KernelControlSnapshot:
|
|
return self._snapshot
|
|
|
|
|
|
class MirroredControlPlane:
|
|
"""Control plane that mirrors updates to an external durable sink."""
|
|
|
|
def __init__(self, inner: ControlPlane, mirror_sink: Optional[Any] = None):
|
|
self.inner = inner
|
|
self.mirror_sink = mirror_sink
|
|
|
|
def read(self) -> KernelControlSnapshot:
|
|
return self.inner.read()
|
|
|
|
def update(self, update: ControlUpdate) -> KernelControlSnapshot:
|
|
snapshot = self.inner.update(update)
|
|
if self.mirror_sink is not None:
|
|
self.mirror_sink("dita_control_plane", dict(snapshot.as_dict()))
|
|
return snapshot
|
|
|
|
def mirror(self) -> Mapping[str, Any]:
|
|
return self.inner.mirror()
|
|
|
|
|
|
def build_control_plane(
|
|
snapshot: Optional[KernelControlSnapshot] = None,
|
|
*,
|
|
prefer_real_zinc: Optional[bool] = None,
|
|
prefix: str = "dita_v2",
|
|
) -> ControlPlane:
|
|
"""Build the active control plane with an operator-visible switch.
|
|
|
|
The default remains the in-process Zinc stand-in so existing tests and
|
|
callers stay stable. Setting ``DITA_V2_CONTROL_PLANE=REAL_ZINC`` or passing
|
|
``prefer_real_zinc=True`` opts into the shared-memory control plane when
|
|
the Zinc adapter is available.
|
|
"""
|
|
|
|
env_choice = os.environ.get("DITA_V2_CONTROL_PLANE", "").strip().upper()
|
|
real_requested = prefer_real_zinc if prefer_real_zinc is not None else env_choice in {"REAL", "REAL_ZINC", "SHARED", "SHARED_MEM"}
|
|
if real_requested:
|
|
try:
|
|
from .real_control_plane import RealZincControlPlane
|
|
|
|
plane = RealZincControlPlane(prefix=prefix, create=True)
|
|
if snapshot is not None:
|
|
plane.update(ControlUpdate(**{key: value for key, value in snapshot.as_dict().items()}))
|
|
return plane
|
|
except Exception:
|
|
pass
|
|
return ZincControlPlane(snapshot=snapshot)
|