2026-06-03 13:26:36 +02:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
|
from typing import Any, Optional, Protocol
|
|
|
|
|
|
|
|
|
|
from .contracts import KernelTransition, TradeSlot
|
|
|
|
|
from .control import KernelControlSnapshot
|
|
|
|
|
from .journal import _transition_row
|
|
|
|
|
from .projection import build_position_state_row
|
|
|
|
|
from .utils import json_safe
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── Fire-and-forget Hz write helpers ─────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def _hz_write_no_wait(hz_map: Any, key: str, value: str) -> None:
|
|
|
|
|
"""Submit Hz write to the client's internal thread pool. Never blocks.
|
|
|
|
|
|
|
|
|
|
.put() without .blocking() returns a hazelcast Future immediately.
|
|
|
|
|
The Future is intentionally discarded — the network write is already
|
|
|
|
|
queued in the Hz client's thread pool and is not cancelled by GC.
|
|
|
|
|
Hz writes are observability-only; any failure must never affect trading.
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
hz_map.put(key, value)
|
|
|
|
|
except Exception:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _json_encode(payload: dict) -> str:
|
|
|
|
|
return json.dumps(payload, separators=(",", ":"), ensure_ascii=False, default=str)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _utcnow_iso() -> str:
|
|
|
|
|
return datetime.now(timezone.utc).isoformat()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _today_iso() -> str:
|
|
|
|
|
return datetime.now(timezone.utc).date().isoformat()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class HazelcastClientLike(Protocol):
|
|
|
|
|
def get_map(self, name: str): ...
|
|
|
|
|
def get_topic(self, name: str): ...
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class HazelcastProjector:
|
|
|
|
|
"""Durable BLUE/PINK-compatible projection mirror."""
|
|
|
|
|
|
|
|
|
|
def __init__(
|
|
|
|
|
self,
|
|
|
|
|
client: HazelcastClientLike | None = None,
|
|
|
|
|
*,
|
|
|
|
|
active_slots_map: str = "dita_active_slots",
|
|
|
|
|
events_topic: str = "dita_trade_events",
|
|
|
|
|
) -> None:
|
|
|
|
|
self.client = client
|
|
|
|
|
self.active_slots_map = active_slots_map
|
|
|
|
|
self.events_topic = events_topic
|
|
|
|
|
|
|
|
|
|
def publish_slot(self, slot: TradeSlot) -> None:
|
|
|
|
|
if self.client is None:
|
|
|
|
|
return
|
|
|
|
|
self.client.get_map(self.active_slots_map).put(slot.trade_id, build_position_state_row(slot))
|
|
|
|
|
|
|
|
|
|
def publish_event(self, event_type: str, payload: dict[str, Any]) -> None:
|
|
|
|
|
if self.client is None:
|
|
|
|
|
return
|
|
|
|
|
topic = self.client.get_topic(self.events_topic)
|
|
|
|
|
topic.publish(
|
|
|
|
|
json.dumps(
|
|
|
|
|
{"event_type": event_type, "payload": json_safe(payload)},
|
|
|
|
|
ensure_ascii=False,
|
|
|
|
|
sort_keys=True,
|
|
|
|
|
default=str,
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class HazelcastRowWriter:
|
|
|
|
|
"""Callback bridge for ``HazelcastProjection`` writer hooks."""
|
|
|
|
|
|
|
|
|
|
def __init__(self, client: HazelcastClientLike) -> None:
|
|
|
|
|
self.client = client
|
|
|
|
|
|
|
|
|
|
def __call__(self, name: str, row: dict[str, Any]) -> None:
|
|
|
|
|
if name.endswith("trade_events"):
|
|
|
|
|
self.client.get_topic(name).publish(
|
|
|
|
|
json.dumps(row, ensure_ascii=False, sort_keys=True, default=str)
|
|
|
|
|
)
|
|
|
|
|
return
|
|
|
|
|
if name.endswith("control"):
|
|
|
|
|
key = "control"
|
|
|
|
|
else:
|
|
|
|
|
key = str(row.get("trade_id", row.get("slot_id", row.get("event_id", ""))))
|
|
|
|
|
self.client.get_map(name).put(key, json_safe(row))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── PINK DITAv2 non-blocking Hz state writer ──────────────────────────────────
|
|
|
|
|
|
|
|
|
|
class PinkHzStateWriter:
|
|
|
|
|
"""Non-blocking Hz writer for PINK DITAv2 kernel state.
|
|
|
|
|
|
|
|
|
|
Dedicated Hz client (separate from the data-feed read client).
|
|
|
|
|
All writes are fire-and-forget: .put() returns a Future that is intentionally
|
|
|
|
|
discarded. A failed write = missed TUI update only — never affects trading.
|
|
|
|
|
|
|
|
|
|
BLUE-compatible schema (same shape as DOLPHIN_STATE_BLUE) written to
|
|
|
|
|
DOLPHIN_STATE_PINK / DOLPHIN_PNL_PINK — no overlap with BLUE maps.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(
|
|
|
|
|
self,
|
|
|
|
|
cluster: str,
|
|
|
|
|
host: str,
|
|
|
|
|
state_map_name: str,
|
|
|
|
|
pnl_map_name: str,
|
|
|
|
|
) -> None:
|
|
|
|
|
import hazelcast
|
|
|
|
|
self._client = hazelcast.HazelcastClient(
|
|
|
|
|
cluster_name=cluster,
|
|
|
|
|
cluster_members=[host],
|
|
|
|
|
)
|
|
|
|
|
# Non-blocking proxies (.put() returns Future, does NOT block)
|
|
|
|
|
self._state_map = self._client.get_map(state_map_name)
|
|
|
|
|
self._pnl_map = self._client.get_map(pnl_map_name)
|
|
|
|
|
|
|
|
|
|
def write_engine_snapshot(
|
|
|
|
|
self,
|
|
|
|
|
slot_dict: dict,
|
|
|
|
|
acc_dict: dict,
|
|
|
|
|
posture: str = "APEX",
|
|
|
|
|
our_leverage: float = 0.0,
|
PINK: TUI Hz fix + DC gate + ACB boost + 10 new tests (104/104 green)
TUI Hz fix:
- hazelcast_projection.py: write_engine_snapshot now writes all NAUTILUS-era
field aliases (trades_executed, current_leverage, open_positions as list,
last_scan_number, last_vel_div, vol_ok, open_notional) so gear_rows/capital
panel work with no TUI changes.
- dolphin_status_pink.py: _normalize_eng_for_tui() safety-net translation added;
render() uses it on every Hz read.
DC gate (SYSTEM BIBLE §4.2, champion config):
- pink_direct.py: _dc_contradicts() — 7-tick lookback, 0.75 bps threshold.
Rising price (chg > 0.75 bps) blocks ENTER via dataclasses.replace(HOLD, DC_CONTRADICT).
Price history deque initialized in connect(); dc_skip_contradicts=True enforced.
ACB boost (SYSTEM BIBLE §10):
- hazelcast_feed.py: fix wrong key "latest_acb" → "acb_boost" (DOLPHIN_FEATURES key
written by acb_processor_service.py).
- pink_direct.py: _last_acb_boost read from scan_payload["acb_boost"] first (scan
bridge may embed it), then Hz direct fallback. Applied to intent.leverage via
dataclasses.replace() after IntentEngine.plan(), capped at 3x.
- _last_scan_number, _last_vel_div, _last_vol_ok tracked from scan_payload.
OBF gate: NOT implemented. OBF shards (DOLPHIN_FEATURES_SHARD_*) require new
Hz map connections + symbol routing. Gap documented; requires separate decision.
Tests: TestDCGate (5) + TestNormalizeEngForTui (5) — 10 new, 104 total, all green.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-03 14:00:48 +02:00
|
|
|
scan_number: int = 0,
|
|
|
|
|
vel_div: float = 0.0,
|
|
|
|
|
vol_ok: bool = True,
|
2026-06-03 13:26:36 +02:00
|
|
|
) -> None:
|
PINK: TUI Hz fix + DC gate + ACB boost + 10 new tests (104/104 green)
TUI Hz fix:
- hazelcast_projection.py: write_engine_snapshot now writes all NAUTILUS-era
field aliases (trades_executed, current_leverage, open_positions as list,
last_scan_number, last_vel_div, vol_ok, open_notional) so gear_rows/capital
panel work with no TUI changes.
- dolphin_status_pink.py: _normalize_eng_for_tui() safety-net translation added;
render() uses it on every Hz read.
DC gate (SYSTEM BIBLE §4.2, champion config):
- pink_direct.py: _dc_contradicts() — 7-tick lookback, 0.75 bps threshold.
Rising price (chg > 0.75 bps) blocks ENTER via dataclasses.replace(HOLD, DC_CONTRADICT).
Price history deque initialized in connect(); dc_skip_contradicts=True enforced.
ACB boost (SYSTEM BIBLE §10):
- hazelcast_feed.py: fix wrong key "latest_acb" → "acb_boost" (DOLPHIN_FEATURES key
written by acb_processor_service.py).
- pink_direct.py: _last_acb_boost read from scan_payload["acb_boost"] first (scan
bridge may embed it), then Hz direct fallback. Applied to intent.leverage via
dataclasses.replace() after IntentEngine.plan(), capped at 3x.
- _last_scan_number, _last_vel_div, _last_vol_ok tracked from scan_payload.
OBF gate: NOT implemented. OBF shards (DOLPHIN_FEATURES_SHARD_*) require new
Hz map connections + symbol routing. Gap documented; requires separate decision.
Tests: TestDCGate (5) + TestNormalizeEngForTui (5) — 10 new, 104 total, all green.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-03 14:00:48 +02:00
|
|
|
"""Write full engine state. Called after every kernel mutation (non-blocking).
|
|
|
|
|
|
|
|
|
|
Field names mirror DOLPHIN_STATE_BLUE["engine_snapshot"] where possible so
|
|
|
|
|
the existing PINK TUI panels (gear_rows, capital panel, etc.) work without
|
|
|
|
|
modification. DITAv2-specific fields are additive.
|
|
|
|
|
"""
|
|
|
|
|
open_pos_int = int(acc_dict.get("open_positions", 0))
|
|
|
|
|
trade_seq = int(acc_dict.get("trade_seq", 0))
|
|
|
|
|
size = float(slot_dict.get("size") or 0.0)
|
|
|
|
|
ep = float(slot_dict.get("entry_price") or 0.0)
|
|
|
|
|
open_notional = size * ep
|
2026-06-03 13:26:36 +02:00
|
|
|
payload: dict[str, Any] = {
|
PINK: TUI Hz fix + DC gate + ACB boost + 10 new tests (104/104 green)
TUI Hz fix:
- hazelcast_projection.py: write_engine_snapshot now writes all NAUTILUS-era
field aliases (trades_executed, current_leverage, open_positions as list,
last_scan_number, last_vel_div, vol_ok, open_notional) so gear_rows/capital
panel work with no TUI changes.
- dolphin_status_pink.py: _normalize_eng_for_tui() safety-net translation added;
render() uses it on every Hz read.
DC gate (SYSTEM BIBLE §4.2, champion config):
- pink_direct.py: _dc_contradicts() — 7-tick lookback, 0.75 bps threshold.
Rising price (chg > 0.75 bps) blocks ENTER via dataclasses.replace(HOLD, DC_CONTRADICT).
Price history deque initialized in connect(); dc_skip_contradicts=True enforced.
ACB boost (SYSTEM BIBLE §10):
- hazelcast_feed.py: fix wrong key "latest_acb" → "acb_boost" (DOLPHIN_FEATURES key
written by acb_processor_service.py).
- pink_direct.py: _last_acb_boost read from scan_payload["acb_boost"] first (scan
bridge may embed it), then Hz direct fallback. Applied to intent.leverage via
dataclasses.replace() after IntentEngine.plan(), capped at 3x.
- _last_scan_number, _last_vel_div, _last_vol_ok tracked from scan_payload.
OBF gate: NOT implemented. OBF shards (DOLPHIN_FEATURES_SHARD_*) require new
Hz map connections + symbol routing. Gap documented; requires separate decision.
Tests: TestDCGate (5) + TestNormalizeEngForTui (5) — 10 new, 104 total, all green.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-03 14:00:48 +02:00
|
|
|
# Core (BLUE-compatible names)
|
2026-06-03 13:26:36 +02:00
|
|
|
"strategy": "pink",
|
|
|
|
|
"capital": acc_dict.get("capital", 0.0),
|
|
|
|
|
"equity": acc_dict.get("equity", 0.0),
|
|
|
|
|
"available_capital": acc_dict.get("available_capital", 0.0),
|
|
|
|
|
"pnl": acc_dict.get("realized_pnl_total", 0.0),
|
|
|
|
|
"fee_total": acc_dict.get("fee_total", 0.0),
|
|
|
|
|
"posture": posture,
|
|
|
|
|
"capital_frozen": bool(acc_dict.get("capital_frozen", False)),
|
PINK: TUI Hz fix + DC gate + ACB boost + 10 new tests (104/104 green)
TUI Hz fix:
- hazelcast_projection.py: write_engine_snapshot now writes all NAUTILUS-era
field aliases (trades_executed, current_leverage, open_positions as list,
last_scan_number, last_vel_div, vol_ok, open_notional) so gear_rows/capital
panel work with no TUI changes.
- dolphin_status_pink.py: _normalize_eng_for_tui() safety-net translation added;
render() uses it on every Hz read.
DC gate (SYSTEM BIBLE §4.2, champion config):
- pink_direct.py: _dc_contradicts() — 7-tick lookback, 0.75 bps threshold.
Rising price (chg > 0.75 bps) blocks ENTER via dataclasses.replace(HOLD, DC_CONTRADICT).
Price history deque initialized in connect(); dc_skip_contradicts=True enforced.
ACB boost (SYSTEM BIBLE §10):
- hazelcast_feed.py: fix wrong key "latest_acb" → "acb_boost" (DOLPHIN_FEATURES key
written by acb_processor_service.py).
- pink_direct.py: _last_acb_boost read from scan_payload["acb_boost"] first (scan
bridge may embed it), then Hz direct fallback. Applied to intent.leverage via
dataclasses.replace() after IntentEngine.plan(), capped at 3x.
- _last_scan_number, _last_vel_div, _last_vol_ok tracked from scan_payload.
OBF gate: NOT implemented. OBF shards (DOLPHIN_FEATURES_SHARD_*) require new
Hz map connections + symbol routing. Gap documented; requires separate decision.
Tests: TestDCGate (5) + TestNormalizeEngForTui (5) — 10 new, 104 total, all green.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-03 14:00:48 +02:00
|
|
|
"updated_at": _utcnow_iso(),
|
|
|
|
|
# TUI-compatible aliases (NAUTILUS-era field names expected by gear_rows etc.)
|
|
|
|
|
"trades_executed": trade_seq,
|
|
|
|
|
"current_leverage": our_leverage,
|
|
|
|
|
"leverage_abs_cap": 3.0,
|
|
|
|
|
"open_notional": open_notional,
|
|
|
|
|
"open_positions": [slot_dict] if open_pos_int > 0 else [],
|
|
|
|
|
"last_scan_number": scan_number,
|
|
|
|
|
"scans_processed": scan_number,
|
|
|
|
|
"last_vel_div": vel_div,
|
|
|
|
|
"vol_ok": vol_ok,
|
|
|
|
|
"bar_idx": scan_number,
|
|
|
|
|
# DITAv2-native fields
|
|
|
|
|
"trade_seq": trade_seq,
|
2026-06-03 13:26:36 +02:00
|
|
|
"our_leverage": our_leverage,
|
|
|
|
|
"slot": slot_dict,
|
|
|
|
|
}
|
|
|
|
|
_hz_write_no_wait(self._state_map, "engine_snapshot", _json_encode(payload))
|
PINK: TUI Hz fix + DC gate + ACB boost + 10 new tests (104/104 green)
TUI Hz fix:
- hazelcast_projection.py: write_engine_snapshot now writes all NAUTILUS-era
field aliases (trades_executed, current_leverage, open_positions as list,
last_scan_number, last_vel_div, vol_ok, open_notional) so gear_rows/capital
panel work with no TUI changes.
- dolphin_status_pink.py: _normalize_eng_for_tui() safety-net translation added;
render() uses it on every Hz read.
DC gate (SYSTEM BIBLE §4.2, champion config):
- pink_direct.py: _dc_contradicts() — 7-tick lookback, 0.75 bps threshold.
Rising price (chg > 0.75 bps) blocks ENTER via dataclasses.replace(HOLD, DC_CONTRADICT).
Price history deque initialized in connect(); dc_skip_contradicts=True enforced.
ACB boost (SYSTEM BIBLE §10):
- hazelcast_feed.py: fix wrong key "latest_acb" → "acb_boost" (DOLPHIN_FEATURES key
written by acb_processor_service.py).
- pink_direct.py: _last_acb_boost read from scan_payload["acb_boost"] first (scan
bridge may embed it), then Hz direct fallback. Applied to intent.leverage via
dataclasses.replace() after IntentEngine.plan(), capped at 3x.
- _last_scan_number, _last_vel_div, _last_vol_ok tracked from scan_payload.
OBF gate: NOT implemented. OBF shards (DOLPHIN_FEATURES_SHARD_*) require new
Hz map connections + symbol routing. Gap documented; requires separate decision.
Tests: TestDCGate (5) + TestNormalizeEngForTui (5) — 10 new, 104 total, all green.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-03 14:00:48 +02:00
|
|
|
# Compact "latest" — same shape as BLUE's DOLPHIN_STATE_BLUE["latest"]
|
2026-06-03 13:26:36 +02:00
|
|
|
_hz_write_no_wait(self._state_map, "latest", _json_encode({
|
|
|
|
|
"strategy": "pink",
|
|
|
|
|
"capital": payload["capital"],
|
|
|
|
|
"date": _today_iso(),
|
|
|
|
|
"pnl": payload["pnl"],
|
PINK: TUI Hz fix + DC gate + ACB boost + 10 new tests (104/104 green)
TUI Hz fix:
- hazelcast_projection.py: write_engine_snapshot now writes all NAUTILUS-era
field aliases (trades_executed, current_leverage, open_positions as list,
last_scan_number, last_vel_div, vol_ok, open_notional) so gear_rows/capital
panel work with no TUI changes.
- dolphin_status_pink.py: _normalize_eng_for_tui() safety-net translation added;
render() uses it on every Hz read.
DC gate (SYSTEM BIBLE §4.2, champion config):
- pink_direct.py: _dc_contradicts() — 7-tick lookback, 0.75 bps threshold.
Rising price (chg > 0.75 bps) blocks ENTER via dataclasses.replace(HOLD, DC_CONTRADICT).
Price history deque initialized in connect(); dc_skip_contradicts=True enforced.
ACB boost (SYSTEM BIBLE §10):
- hazelcast_feed.py: fix wrong key "latest_acb" → "acb_boost" (DOLPHIN_FEATURES key
written by acb_processor_service.py).
- pink_direct.py: _last_acb_boost read from scan_payload["acb_boost"] first (scan
bridge may embed it), then Hz direct fallback. Applied to intent.leverage via
dataclasses.replace() after IntentEngine.plan(), capped at 3x.
- _last_scan_number, _last_vel_div, _last_vol_ok tracked from scan_payload.
OBF gate: NOT implemented. OBF shards (DOLPHIN_FEATURES_SHARD_*) require new
Hz map connections + symbol routing. Gap documented; requires separate decision.
Tests: TestDCGate (5) + TestNormalizeEngForTui (5) — 10 new, 104 total, all green.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-03 14:00:48 +02:00
|
|
|
"trades": trade_seq,
|
2026-06-03 13:26:36 +02:00
|
|
|
"posture": posture,
|
|
|
|
|
"updated_at": payload["updated_at"],
|
|
|
|
|
}))
|
|
|
|
|
|
|
|
|
|
def write_daily_pnl(self, acc_dict: dict, posture: str = "APEX") -> None:
|
|
|
|
|
"""Write per-date PnL row. Called on trade close only."""
|
|
|
|
|
_hz_write_no_wait(self._pnl_map, _today_iso(), _json_encode({
|
|
|
|
|
"pnl": acc_dict.get("realized_pnl_total", 0.0),
|
|
|
|
|
"capital": acc_dict.get("capital", 0.0),
|
|
|
|
|
"trades": int(acc_dict.get("trade_seq", 0)),
|
|
|
|
|
"posture": posture,
|
|
|
|
|
}))
|
|
|
|
|
|
|
|
|
|
def close(self) -> None:
|
|
|
|
|
try:
|
|
|
|
|
self._client.shutdown()
|
|
|
|
|
except Exception:
|
|
|
|
|
pass
|