from __future__ import annotations import json from datetime import datetime, timezone from typing import Any, Optional, Protocol from .contracts import KernelTransition, TradeSlot from .control import KernelControlSnapshot from .journal import _transition_row from .projection import build_position_state_row from .utils import json_safe # ── Fire-and-forget Hz write helpers ───────────────────────────────────────── def _hz_write_no_wait(hz_map: Any, key: str, value: str) -> None: """Submit Hz write to the client's internal thread pool. Never blocks. .put() without .blocking() returns a hazelcast Future immediately. The Future is intentionally discarded — the network write is already queued in the Hz client's thread pool and is not cancelled by GC. Hz writes are observability-only; any failure must never affect trading. """ try: hz_map.put(key, value) except Exception: pass def _json_encode(payload: dict) -> str: return json.dumps(payload, separators=(",", ":"), ensure_ascii=False, default=str) def _utcnow_iso() -> str: return datetime.now(timezone.utc).isoformat() def _today_iso() -> str: return datetime.now(timezone.utc).date().isoformat() class HazelcastClientLike(Protocol): def get_map(self, name: str): ... def get_topic(self, name: str): ... class HazelcastProjector: """Durable BLUE/PINK-compatible projection mirror.""" def __init__( self, client: HazelcastClientLike | None = None, *, active_slots_map: str = "dita_active_slots", events_topic: str = "dita_trade_events", ) -> None: self.client = client self.active_slots_map = active_slots_map self.events_topic = events_topic def publish_slot(self, slot: TradeSlot) -> None: if self.client is None: return self.client.get_map(self.active_slots_map).put(slot.trade_id, build_position_state_row(slot)) def publish_event(self, event_type: str, payload: dict[str, Any]) -> None: if self.client is None: return topic = self.client.get_topic(self.events_topic) topic.publish( json.dumps( {"event_type": event_type, "payload": json_safe(payload)}, ensure_ascii=False, sort_keys=True, default=str, ) ) class HazelcastRowWriter: """Callback bridge for ``HazelcastProjection`` writer hooks.""" def __init__(self, client: HazelcastClientLike) -> None: self.client = client def __call__(self, name: str, row: dict[str, Any]) -> None: if name.endswith("trade_events"): self.client.get_topic(name).publish( json.dumps(row, ensure_ascii=False, sort_keys=True, default=str) ) return if name.endswith("control"): key = "control" else: key = str(row.get("trade_id", row.get("slot_id", row.get("event_id", "")))) self.client.get_map(name).put(key, json_safe(row)) # ── PINK DITAv2 non-blocking Hz state writer ────────────────────────────────── class PinkHzStateWriter: """Non-blocking Hz writer for PINK DITAv2 kernel state. Dedicated Hz client (separate from the data-feed read client). All writes are fire-and-forget: .put() returns a Future that is intentionally discarded. A failed write = missed TUI update only — never affects trading. BLUE-compatible schema (same shape as DOLPHIN_STATE_BLUE) written to DOLPHIN_STATE_PINK / DOLPHIN_PNL_PINK — no overlap with BLUE maps. """ def __init__( self, cluster: str, host: str, state_map_name: str, pnl_map_name: str, ) -> None: import hazelcast self._client = hazelcast.HazelcastClient( cluster_name=cluster, cluster_members=[host], ) # Non-blocking proxies (.put() returns Future, does NOT block) self._state_map = self._client.get_map(state_map_name) self._pnl_map = self._client.get_map(pnl_map_name) def write_engine_snapshot( self, slot_dict: dict, acc_dict: dict, posture: str = "APEX", our_leverage: float = 0.0, ) -> None: """Write full engine state. Called after every kernel mutation (non-blocking).""" payload: dict[str, Any] = { "strategy": "pink", "capital": acc_dict.get("capital", 0.0), "equity": acc_dict.get("equity", 0.0), "available_capital": acc_dict.get("available_capital", 0.0), "pnl": acc_dict.get("realized_pnl_total", 0.0), "fee_total": acc_dict.get("fee_total", 0.0), "open_positions": int(acc_dict.get("open_positions", 0)), "trade_seq": int(acc_dict.get("trade_seq", 0)), "posture": posture, "capital_frozen": bool(acc_dict.get("capital_frozen", False)), "our_leverage": our_leverage, "slot": slot_dict, "updated_at": _utcnow_iso(), } _hz_write_no_wait(self._state_map, "engine_snapshot", _json_encode(payload)) # Compact "latest" key — same shape as BLUE's DOLPHIN_STATE_BLUE["latest"] _hz_write_no_wait(self._state_map, "latest", _json_encode({ "strategy": "pink", "capital": payload["capital"], "date": _today_iso(), "pnl": payload["pnl"], "trades": payload["trade_seq"], "posture": posture, "updated_at": payload["updated_at"], })) def write_daily_pnl(self, acc_dict: dict, posture: str = "APEX") -> None: """Write per-date PnL row. Called on trade close only.""" _hz_write_no_wait(self._pnl_map, _today_iso(), _json_encode({ "pnl": acc_dict.get("realized_pnl_total", 0.0), "capital": acc_dict.get("capital", 0.0), "trades": int(acc_dict.get("trade_seq", 0)), "posture": posture, })) def close(self) -> None: try: self._client.shutdown() except Exception: pass