diff --git a/prod/clean_arch/violet/divergence.py b/prod/clean_arch/violet/divergence.py new file mode 100644 index 0000000..51b0503 --- /dev/null +++ b/prod/clean_arch/violet/divergence.py @@ -0,0 +1,220 @@ +"""VIOLET V1: scan-vs-venue price divergence monitor (the FET detector). + +The 2026-06-11 FET incident entered on a scan price of 0.2176 while the +venue traded 0.1878 — a 15% phantom divergence the system had no way to +see. This monitor continuously samples, per asset, the gap between the +scan-plane price (the alpha-side eigen-scan universe) and the execution +venue's mid (BingX bookTicker), and journals one row per (scan, asset) to +``dolphin_violet.violet_feed_divergence``. + +Venue mid sources: +- WS (default): ``prod.bingx.market_stream.BingxMarketStream`` + bookTicker subscription per symbol; +- REST fallback (``DOLPHIN_VIOLET_VENUE_MID_MODE=rest``): 1 s poll of the + public quote endpoint — kept because the WS client is unexercised + in-tree; harden or drop at V2. + +Only PUBLIC data — runs even when VIOLET is DARK (no API keys). +""" + +from __future__ import annotations + +import asyncio +import logging +import time +from dataclasses import dataclass +from typing import Any, Callable, Dict, List, Optional + +from .clock import PlaneClock, mono_ns + +LOGGER = logging.getLogger("violet.divergence") + +Sink = Callable[[str, dict], None] + + +def to_bingx_symbol(asset: str) -> str: + """BTCUSDT → BTC-USDT (idempotent for already-hyphenated input).""" + a = str(asset or "").upper() + if "-" in a or not a.endswith("USDT"): + return a + return f"{a[:-4]}-USDT" + + +def from_bingx_symbol(symbol: str) -> str: + """BTC-USDT → BTCUSDT.""" + return str(symbol or "").upper().replace("-", "") + + +@dataclass +class _VenueMid: + mid: float + mono: int + seq: int + + +class FeedDivergenceMonitor: + """Samples scan-vs-venue divergence and journals it. + + ``on_venue_tick`` is fed by the WS/REST source; ``on_scan`` is called by + the launcher's main loop with each data-feed snapshot. A divergence row + is emitted only when the venue mid is fresher than the venue plane's + staleness budget — a stale mid must never masquerade as live divergence. + """ + + TABLE = "violet_feed_divergence" + + def __init__( + self, + *, + sink: Sink, + scan_clock: PlaneClock, + venue_clock: PlaneClock, + session_id: str, + table: str = TABLE, + logger: logging.Logger | None = None, + ) -> None: + self.sink = sink + self.scan_clock = scan_clock + self.venue_clock = venue_clock + self.session_id = str(session_id) + self.table = table + self.logger = logger or LOGGER + self._mids: Dict[str, _VenueMid] = {} + self._stream: Any = None + self._stream_task: Optional[asyncio.Task] = None + self._rest_task: Optional[asyncio.Task] = None + self.rows_emitted = 0 + + # ── venue side ─────────────────────────────────────────────────────────── + + def on_venue_tick(self, symbol: str, bid: float, ask: float) -> None: + try: + b, a = float(bid), float(ask) + except (TypeError, ValueError): + return + if not (b > 0 and a > 0): + return + seq = self.venue_clock.tick() + self._mids[from_bingx_symbol(symbol)] = _VenueMid( + mid=(b + a) / 2.0, mono=mono_ns(), seq=seq + ) + + async def _on_ws_frame(self, payload: dict) -> None: + """BingX bookTicker frame: {dataType: 'BTC-USDT@bookTicker', + data: {... 'b': bid, 'a': ask ...}} (field names per BingX swap WS).""" + try: + data_type = str(payload.get("dataType", "") or "") + if "@bookTicker" not in data_type: + return + symbol = data_type.split("@", 1)[0] + data = payload.get("data") or {} + bid = data.get("b") or data.get("bidPrice") or data.get("bestBid") + ask = data.get("a") or data.get("askPrice") or data.get("bestAsk") + if bid is None or ask is None: + return + self.on_venue_tick(symbol, float(bid), float(ask)) + except Exception as exc: # noqa: BLE001 — never kill the stream + self.logger.debug("divergence WS frame parse skipped: %s", exc) + + async def _rest_poll_loop(self, symbols: List[str], rest_base_url: str) -> None: + import aiohttp + + url = rest_base_url.rstrip("/") + "/openApi/swap/v2/quote/bookTicker" + async with aiohttp.ClientSession() as session: + while True: + for sym in symbols: + try: + async with session.get( + url, params={"symbol": to_bingx_symbol(sym)}, + timeout=aiohttp.ClientTimeout(total=5), + ) as resp: + body = await resp.json(content_type=None) + data = (body or {}).get("data") or {} + # quote payloads use bidPrice/askPrice (REST shape) + book = data.get("book_ticker") or data + bid = book.get("bid_price") or book.get("bidPrice") or book.get("b") + ask = book.get("ask_price") or book.get("askPrice") or book.get("a") + if bid is not None and ask is not None: + self.on_venue_tick(sym, float(bid), float(ask)) + except Exception as exc: # noqa: BLE001 + self.logger.debug("divergence REST poll %s failed: %s", sym, exc) + await asyncio.sleep(1.0) + + async def start( + self, + symbols: List[str], + *, + mode: str = "ws", + ws_url: str = "wss://open-api-swap.bingx.com/swap-market", + rest_base_url: str = "https://open-api.bingx.com", + ) -> None: + if mode == "rest": + self._rest_task = asyncio.create_task( + self._rest_poll_loop(symbols, rest_base_url), + name="violet_divergence_rest", + ) + self.logger.info("divergence: REST mid poll started for %d symbols", len(symbols)) + return + from prod.bingx.market_stream import BingxMarketStream + + self._stream = BingxMarketStream(ws_url=ws_url, on_event=self._on_ws_frame) + for sym in symbols: + self._stream.subscribe(f"{to_bingx_symbol(sym)}@bookTicker") + self._stream_task = asyncio.create_task( + self._stream.run_forever(), name="violet_divergence_ws" + ) + self.logger.info("divergence: WS bookTicker started for %d symbols", len(symbols)) + + async def stop(self) -> None: + for task in (self._stream_task, self._rest_task): + if task is not None: + task.cancel() + try: + await task + except (asyncio.CancelledError, Exception): + pass + self._stream_task = self._rest_task = None + + # ── scan side ───────────────────────────────────────────────────────────── + + def on_scan(self, snapshot: Any) -> int: + """Called once per data-feed snapshot. Returns rows emitted.""" + payload = getattr(snapshot, "scan_payload", None) + if not isinstance(payload, dict): + return 0 + assets = payload.get("assets") or [] + prices = payload.get("asset_prices") or [] + if not (isinstance(assets, list) and isinstance(prices, list) and assets): + return 0 + scan_seq = self.scan_clock.tick() + now = mono_ns() + emitted = 0 + for asset, price in zip(assets, prices): + try: + scan_price = float(price) + except (TypeError, ValueError): + continue + if scan_price <= 0: + continue + key = str(asset).upper() + vm = self._mids.get(key) + if vm is None: + continue + # Stale venue mid must never masquerade as live divergence. + if (now - vm.mono) > self.venue_clock.staleness_budget_ns: + continue + divergence_bps = (vm.mid - scan_price) / scan_price * 1e4 + self.sink(self.table, { + "ts": int(time.time() * 1000), # DateTime64(3) + "session_id": self.session_id, + "asset": key, + "scan_price": scan_price, + "venue_mid": vm.mid, + "divergence_bps": divergence_bps, + "scan_seq": int(scan_seq), + "venue_seq": int(vm.seq), + "mono_ns": int(now), + }) + emitted += 1 + self.rows_emitted += emitted + return emitted diff --git a/prod/clean_arch/violet/observe_guard.py b/prod/clean_arch/violet/observe_guard.py new file mode 100644 index 0000000..ad57123 --- /dev/null +++ b/prod/clean_arch/violet/observe_guard.py @@ -0,0 +1,71 @@ +"""VIOLET observe-only hard guard (Stage V1). + +``ObserveOnlyVenue`` wraps any venue adapter and makes order placement +structurally impossible: ``submit``/``submit_async``/``cancel``/ +``cancel_async`` raise ``ObserveOnlyViolation`` and log CRITICAL. Every +other attribute (including assignment — e.g. the runtime's +``venue._kernel_ref = kernel``) delegates to the wrapped adapter, so +account streams, reconcile reads and connection management work unchanged. + +This is the hard guarantee, independent of policy configuration: even if a +policy step were ever wired into the V1 runtime by mistake, no order can +reach the exchange. +""" + +from __future__ import annotations + +import logging +from typing import Any + +LOGGER = logging.getLogger("violet.observe_guard") + +_BLOCKED = ("submit", "submit_async", "cancel", "cancel_async") +_SELF_ATTRS = ("_inner", "_logger") + + +class ObserveOnlyViolation(RuntimeError): + """An order-placement call reached the observe-only venue guard.""" + + +class ObserveOnlyVenue: + """Delegating wrapper that refuses order placement.""" + + def __init__(self, inner: Any, logger: logging.Logger | None = None) -> None: + object.__setattr__(self, "_inner", inner) + object.__setattr__(self, "_logger", logger or LOGGER) + + # -- blocked surface ----------------------------------------------------- + + def _refuse(self, name: str, *args: Any, **kwargs: Any) -> None: + msg = ( + f"OBSERVE-ONLY VIOLATION: venue.{name}() called — VIOLET V1 must " + f"never place or cancel orders (args={args!r:.200})" + ) + object.__getattribute__(self, "_logger").critical(msg) + raise ObserveOnlyViolation(msg) + + def submit(self, *a: Any, **k: Any) -> Any: + self._refuse("submit", *a, **k) + + async def submit_async(self, *a: Any, **k: Any) -> Any: + self._refuse("submit_async", *a, **k) + + def cancel(self, *a: Any, **k: Any) -> Any: + self._refuse("cancel", *a, **k) + + async def cancel_async(self, *a: Any, **k: Any) -> Any: + self._refuse("cancel_async", *a, **k) + + # -- delegation ---------------------------------------------------------- + + def __getattr__(self, name: str) -> Any: + return getattr(object.__getattribute__(self, "_inner"), name) + + def __setattr__(self, name: str, value: Any) -> None: + if name in _SELF_ATTRS: + object.__setattr__(self, name, value) + else: + setattr(object.__getattribute__(self, "_inner"), name, value) + + def __repr__(self) -> str: # pragma: no cover — debug helper + return f"ObserveOnlyVenue({object.__getattribute__(self, '_inner')!r})" diff --git a/prod/clean_arch/violet/test_violet_dark_idle.py b/prod/clean_arch/violet/test_violet_dark_idle.py new file mode 100644 index 0000000..047209b --- /dev/null +++ b/prod/clean_arch/violet/test_violet_dark_idle.py @@ -0,0 +1,109 @@ +"""V1: dark-idle behavior — no keys ⇒ idle loop, exec adapter never built.""" + +from __future__ import annotations + +import asyncio +import sys + +sys.path.insert(0, "/mnt/dolphinng5_predict") +sys.path.insert(0, "/mnt/dolphinng5_predict/nautilus_dolphin") + +import pytest + + +def _clear_keys(monkeypatch): + monkeypatch.delenv("BINGX_VIOLET_API_KEY", raising=False) + monkeypatch.delenv("BINGX_VIOLET_SECRET_KEY", raising=False) + + +def test_keys_present_detection(monkeypatch): + import prod.launch_dolphin_violet as lv + + _clear_keys(monkeypatch) + assert lv._violet_keys_present() is False + monkeypatch.setenv("BINGX_VIOLET_API_KEY", "k") + assert lv._violet_keys_present() is False # secret still missing + monkeypatch.setenv("BINGX_VIOLET_SECRET_KEY", "s") + assert lv._violet_keys_present() is True + monkeypatch.setenv("BINGX_VIOLET_API_KEY", " ") + assert lv._violet_keys_present() is False # whitespace is absent + + +def test_dark_run_never_builds_exec_adapter(monkeypatch): + """Without keys, run() must enter the dark loop and never construct the + observe runtime (and therefore never the BingX exec adapter).""" + import prod.launch_dolphin_violet as lv + + _clear_keys(monkeypatch) + monkeypatch.setenv("DOLPHIN_VIOLET_DARK_DIVERGENCE", "0") # strict idle + monkeypatch.setattr(lv, "_preflight_clickhouse", lambda: []) + + def boom(): + raise AssertionError("observe runtime must not be built while DARK") + + monkeypatch.setattr(lv, "_build_observe_runtime", boom) + + warnings: list[str] = [] + monkeypatch.setattr( + lv.LOGGER, "warning", lambda msg, *a: warnings.append(msg % a if a else str(msg)) + ) + + async def drive(): + task = asyncio.ensure_future(lv.run()) + await asyncio.sleep(0.3) # boot path reaches the dark loop + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + asyncio.run(drive()) + assert any("VIOLET DARK" in w for w in warnings), warnings + + +def test_missing_tables_idles_dark_naming_apply_cmd(monkeypatch): + import prod.launch_dolphin_violet as lv + + _clear_keys(monkeypatch) + monkeypatch.setattr(lv, "_preflight_clickhouse", lambda: ["violet_feed_divergence"]) + crits: list[str] = [] + monkeypatch.setattr( + lv.LOGGER, "critical", lambda msg, *a: crits.append(msg % a if a else str(msg)) + ) + + async def drive(): + task = asyncio.ensure_future(lv.run()) + await asyncio.sleep(0.3) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + asyncio.run(drive()) + assert crits and "apply_violet_ddl.py" in crits[0] + assert "violet_feed_divergence" in crits[0] + + +def test_violet_env_hard_sets_namespaces_and_snapshot_path(monkeypatch): + import prod.launch_dolphin_violet as lv + import prod.clean_arch.runtime.pink_direct as pd + + monkeypatch.setenv("DOLPHIN_STATE_MAP", "DOLPHIN_STATE_PINK") # poisoned env + original = pd._KERNEL_STATE_PATH + try: + lv._apply_violet_env() + import os + + assert os.environ["DOLPHIN_STATE_MAP"] == "DOLPHIN_STATE_VIOLET" + assert os.environ["DITA_V2_PREFIX"] == "violet" + assert os.environ["DOLPHIN_BINGX_ALLOW_MAINNET"] == "0" + # kernel snapshot file repointed away from PINK's fixed path + assert pd._KERNEL_STATE_PATH == lv._KERNEL_STATE_PATH_VIOLET + assert "pink" not in str(pd._KERNEL_STATE_PATH) + finally: + pd._KERNEL_STATE_PATH = original + + +if __name__ == "__main__": + raise SystemExit(pytest.main([__file__, "-v"])) diff --git a/prod/clean_arch/violet/test_violet_divergence.py b/prod/clean_arch/violet/test_violet_divergence.py new file mode 100644 index 0000000..fb31fc7 --- /dev/null +++ b/prod/clean_arch/violet/test_violet_divergence.py @@ -0,0 +1,119 @@ +"""V1: FeedDivergenceMonitor — row shape vs DDL, sign, staleness, seqs.""" + +from __future__ import annotations + +import re +import sys +from pathlib import Path +from types import SimpleNamespace + +sys.path.insert(0, "/mnt/dolphinng5_predict") + +import pytest + +from prod.clean_arch.violet.clock import PlaneClock, mono_ns +from prod.clean_arch.violet.divergence import ( + FeedDivergenceMonitor, + from_bingx_symbol, + to_bingx_symbol, +) + +DDL_PATH = Path( + "/mnt/dolphinng5_predict/prod/clickhouse/violet/20_violet_feed_divergence.sql" +) + + +def _mk(sink_rows, venue_budget_ns=2_000_000_000): + return FeedDivergenceMonitor( + sink=lambda table, row: sink_rows.append((table, row)), + scan_clock=PlaneClock("scan", 12_000_000_000), + venue_clock=PlaneClock("venue", venue_budget_ns), + session_id="sess-test", + ) + + +def _snapshot(assets, prices): + return SimpleNamespace(scan_payload={"assets": assets, "asset_prices": prices}) + + +def test_symbol_mapping_round_trip(): + assert to_bingx_symbol("FETUSDT") == "FET-USDT" + assert to_bingx_symbol("FET-USDT") == "FET-USDT" # idempotent + assert from_bingx_symbol("FET-USDT") == "FETUSDT" + + +def test_row_keys_exactly_match_ddl_columns(): + """Parse the shipped DDL: emitted row keys must equal the column set.""" + ddl = DDL_PATH.read_text() + cols = set(re.findall(r"`(\w+)`", ddl)) + rows = [] + m = _mk(rows) + m.on_venue_tick("FET-USDT", 0.1877, 0.1879) + m.on_scan(_snapshot(["FETUSDT"], [0.2176])) + assert len(rows) == 1 + table, row = rows[0] + assert table == "violet_feed_divergence" + assert set(row.keys()) == cols, (set(row.keys()) ^ cols) + + +def test_bps_sign_convention_venue_above_scan_positive(): + rows = [] + m = _mk(rows) + m.on_venue_tick("BTC-USDT", 101.0, 101.0) + m.on_scan(_snapshot(["BTCUSDT"], [100.0])) + assert rows[0][1]["divergence_bps"] == pytest.approx(100.0) # +1% = +100bps + rows.clear() + m.on_venue_tick("BTC-USDT", 99.0, 99.0) + m.on_scan(_snapshot(["BTCUSDT"], [100.0])) + assert rows[0][1]["divergence_bps"] == pytest.approx(-100.0) + + +def test_fet_incident_magnitude(): + """The motivating case: scan 0.2176 vs venue ~0.1878 ⇒ ~ -1369 bps.""" + rows = [] + m = _mk(rows) + m.on_venue_tick("FET-USDT", 0.1877, 0.1879) + m.on_scan(_snapshot(["FETUSDT"], [0.2176])) + bps = rows[0][1]["divergence_bps"] + assert bps == pytest.approx((0.1878 - 0.2176) / 0.2176 * 1e4, rel=1e-6) + assert bps < -1300 + + +def test_stale_venue_mid_suppressed(): + rows = [] + m = _mk(rows, venue_budget_ns=1) # everything is stale + m.on_venue_tick("BTC-USDT", 100.0, 100.0) + import time + + time.sleep(0.001) + m.on_scan(_snapshot(["BTCUSDT"], [100.0])) + assert rows == [] # no phantom divergence + + +def test_seq_propagation_and_no_mid_no_row(): + rows = [] + m = _mk(rows) + m.on_venue_tick("BTC-USDT", 100.0, 100.0) + m.on_venue_tick("BTC-USDT", 100.2, 100.2) # venue_seq advances to 2 + m.on_scan(_snapshot(["BTCUSDT", "ETHUSDT"], [100.0, 2000.0])) + assert len(rows) == 1 # ETH has no venue mid + row = rows[0][1] + assert row["venue_seq"] == 2 + assert row["scan_seq"] == 1 + m.on_scan(_snapshot(["BTCUSDT"], [100.1])) + assert rows[-1][1]["scan_seq"] == 2 + + +def test_garbage_inputs_ignored(): + rows = [] + m = _mk(rows) + m.on_venue_tick("BTC-USDT", 0.0, -1.0) # invalid quotes ignored + m.on_venue_tick("BTC-USDT", "x", None) # type garbage ignored + m.on_scan(_snapshot(["BTCUSDT"], ["nan-ish", 0.0])) + m.on_scan(SimpleNamespace(scan_payload=None)) + m.on_scan(SimpleNamespace()) # no payload attr at all + assert rows == [] + + +if __name__ == "__main__": + raise SystemExit(pytest.main([__file__, "-v"])) diff --git a/prod/clean_arch/violet/test_violet_observe_guard.py b/prod/clean_arch/violet/test_violet_observe_guard.py new file mode 100644 index 0000000..4a95c9a --- /dev/null +++ b/prod/clean_arch/violet/test_violet_observe_guard.py @@ -0,0 +1,121 @@ +"""V1: ObserveOnlyVenue — refusal + delegation contract.""" + +from __future__ import annotations + +import asyncio +import sys + +sys.path.insert(0, "/mnt/dolphinng5_predict") +sys.path.insert(0, "/mnt/dolphinng5_predict/nautilus_dolphin") + +import pytest + +from prod.clean_arch.violet.observe_guard import ObserveOnlyVenue, ObserveOnlyViolation + + +class _Inner: + def __init__(self): + self.connected = False + self.calls = [] + + def submit(self, intent): + self.calls.append(("submit", intent)) + return ["should-never-happen"] + + async def submit_async(self, intent): + self.calls.append(("submit_async", intent)) + return ["should-never-happen"] + + def cancel(self, order, reason=""): + self.calls.append(("cancel", order)) + return [] + + async def cancel_async(self, order, reason=""): + self.calls.append(("cancel_async", order)) + return [] + + async def connect(self): + self.connected = True + + async def subscribe(self): + if False: # pragma: no cover — generator shape + yield None + + +@pytest.mark.asyncio +async def test_submit_and_cancel_raise_and_never_reach_inner(): + inner = _Inner() + guard = ObserveOnlyVenue(inner) + with pytest.raises(ObserveOnlyViolation): + guard.submit({"x": 1}) + with pytest.raises(ObserveOnlyViolation): + guard.cancel(object()) + with pytest.raises(ObserveOnlyViolation): + await guard.submit_async({}) + with pytest.raises(ObserveOnlyViolation): + await guard.cancel_async(object()) + assert inner.calls == [] + + +@pytest.mark.asyncio +async def test_attribute_get_delegates(): + inner = _Inner() + guard = ObserveOnlyVenue(inner) + assert guard.connected is False + await guard.connect() + assert inner.connected is True and guard.connected is True + + +def test_attribute_set_delegates_to_inner(): + """The runtime does `venue._kernel_ref = kernel` — assignment must land + on the wrapped adapter, not the guard.""" + inner = _Inner() + guard = ObserveOnlyVenue(inner) + sentinel = object() + guard._kernel_ref = sentinel + assert getattr(inner, "_kernel_ref") is sentinel + assert not hasattr(type(guard), "_kernel_ref") + + +def test_wrapped_mock_venue_full_kernel_drive_never_submits(): + """Real ExecutionKernel over a wrapped MOCK venue: an ENTER intent is + refused at the venue boundary; the kernel's venue-submit-failure + rollback converts the refusal into a synthetic REJECT (slot returns to + IDLE) and the inner venue NEVER sees the order.""" + from prod.clean_arch.dita_v2.launcher import build_launcher_bundle + from prod.clean_arch.dita_v2.contracts import ( + KernelCommandType, + KernelIntent, + TradeSide, + TradeStage, + ) + from datetime import datetime, timezone + + bundle = build_launcher_bundle(venue_mode="MOCK", max_slots=1) + kernel = bundle.kernel + inner = kernel.venue + submitted = [] + orig_submit = inner.submit + inner.submit = lambda *a, **k: submitted.append(a) or orig_submit(*a, **k) + kernel.venue = ObserveOnlyVenue(inner) + + intent = KernelIntent( + timestamp=datetime.now(timezone.utc), + intent_id="i-1", trade_id="T-OBS-1", slot_id=0, + asset="BTCUSDT", side=TradeSide.SHORT, + action=KernelCommandType.ENTER, + reference_price=100.0, target_size=1.0, leverage=1.0, + exit_leg_ratios=(1.0,), reason="test", metadata={}, + stage=TradeStage.INTENT_CREATED, + ) + outcome = kernel.process_intent(intent) + assert submitted == [] # nothing reached the venue + assert kernel.slot(0).is_free() # FSM rolled back to IDLE + assert any( + "VENUE_SUBMIT_ERROR" in str(e.reason) + for e in outcome.emitted_events + ), "expected the synthetic REJECT from the guard refusal" + + +if __name__ == "__main__": + raise SystemExit(pytest.main([__file__, "-v"])) diff --git a/prod/launch_dolphin_violet.py b/prod/launch_dolphin_violet.py new file mode 100644 index 0000000..753cc64 --- /dev/null +++ b/prod/launch_dolphin_violet.py @@ -0,0 +1,322 @@ +#!/usr/bin/env python3 +"""DOLPHIN VIOLET launcher — Stage V1: observe-only runtime (DARK until keys). + +VIOLET is the staged rebuild of PINK-on-DITAv2 (charter: +violet-subsecond-rebuild-plan). V1 scope, all enforced here: + +- OWN namespaces everywhere: ClickHouse ``dolphin_violet``, Hazelcast + ``DOLPHIN_STATE_VIOLET``/``DOLPHIN_PNL_VIOLET``, Zinc prefix ``violet``, + trader id ``DOLPHIN-VIOLET-001``. Namespace isolation is a stage gate. +- OBSERVE-ONLY: the venue adapter is wrapped in ``ObserveOnlyVenue`` (order + placement raises) AND the policy step loop is never invoked. The account + stream folds fills/balances into the kernel and journals them; nothing is + ever submitted. +- DARK by default: without ``BINGX_VIOLET_API_KEY``/``BINGX_VIOLET_SECRET_KEY`` + the service idles with a periodic WARNING. The scan-vs-venue divergence + monitor (public data only) runs even while dark + (``DOLPHIN_VIOLET_DARK_DIVERGENCE=1`` default). +- DDL discipline: this launcher NEVER creates ClickHouse objects. If the + ``dolphin_violet`` tables are missing it logs CRITICAL naming the apply + command (prod/clickhouse/violet/apply_violet_ddl.py) and idles dark. + +Code-reuse decision (V1): import-and-parameterize from +``prod.launch_dolphin_pink`` — its import is side-effect-free (sys.path + +dotenv only; ``_apply_pink_env`` is never invoked at import). A shared +launcher-core extraction is deliberately deferred to V2, when VIOLET starts +trading and the loops converge (V2 work item per the approved plan). +""" + +from __future__ import annotations + +import asyncio +import logging +import os +import sys +import urllib.parse +import urllib.request +import uuid +from pathlib import Path + +sys.path.insert(0, "/mnt/dolphinng5_predict") +sys.path.insert(0, "/mnt/dolphinng5_predict/nautilus_dolphin") + +# Side-effect-free helpers shared with the PINK launcher (verified: importing +# the module mutates no environment). +from prod.launch_dolphin_pink import ( # noqa: E402 + _build_data_feed, + _env_bool, + _resolve_bingx_allow_mainnet, + _resolve_bingx_environment, + _resolve_bingx_exchange_leverage_cap, + _resolve_bingx_recv_window_ms, +) + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s violet %(name)s: %(message)s", +) +LOGGER = logging.getLogger("launch_dolphin_violet") + +VIOLET_DEFAULTS = { + "strategy_name": "violet", + "state_map": "DOLPHIN_STATE_VIOLET", + "pnl_map": "DOLPHIN_PNL_VIOLET", + "trader_id": "DOLPHIN-VIOLET-001", + "journal_strategy": "violet", + "journal_db": "dolphin_violet", +} + +# Tables that must exist BEFORE this service may emit a single row. +REQUIRED_TABLES = ( + "account_events", "status_snapshots", "trade_events", "trade_exit_legs", + "trade_reconstruction", "position_state", "policy_events", + "anomaly_events", "v7_decision_events", "violet_feed_divergence", +) + +DDL_APPLY_CMD = ( + "python3 /mnt/dolphinng5_predict/prod/clickhouse/violet/apply_violet_ddl.py" +) + +_KERNEL_STATE_PATH_VIOLET = Path("/tmp/.violet_kernel_state.json") + + +def _apply_violet_env() -> None: + """HARD-set (not setdefault) every namespace knob before any import that + reads them. DITA_V2_PREFIX drives Zinc region names (violet_intent/ + _state/_control — disjoint from pink by construction).""" + os.environ["DITA_V2_PREFIX"] = "violet" + os.environ["DOLPHIN_STRATEGY_NAME"] = VIOLET_DEFAULTS["strategy_name"] + os.environ["DOLPHIN_STATE_MAP"] = VIOLET_DEFAULTS["state_map"] + os.environ["DOLPHIN_PNL_MAP"] = VIOLET_DEFAULTS["pnl_map"] + os.environ["DOLPHIN_JOURNAL_STRATEGY"] = VIOLET_DEFAULTS["journal_strategy"] + os.environ["DOLPHIN_JOURNAL_DB"] = VIOLET_DEFAULTS["journal_db"] + os.environ["DOLPHIN_TRADER_ID"] = VIOLET_DEFAULTS["trader_id"] + os.environ["DOLPHIN_BINGX_ENV"] = os.environ.get("DOLPHIN_BINGX_ENV", "VST") + os.environ["DOLPHIN_BINGX_ALLOW_MAINNET"] = "0" # V1 is VST-only, hard + # Repoint the kernel snapshot file: pink_direct's module constant is a + # fixed path (/tmp/.pink_kernel_state.json) — without this, VIOLET would + # restore PINK's kernel state and overwrite it on fills. + # Deferred to V2 (approved plan): parameterize the path in pink_direct. + import prod.clean_arch.runtime.pink_direct as _pd + + _pd._KERNEL_STATE_PATH = _KERNEL_STATE_PATH_VIOLET + + +def _violet_keys_present() -> bool: + return bool( + (os.environ.get("BINGX_VIOLET_API_KEY") or "").strip() + and (os.environ.get("BINGX_VIOLET_SECRET_KEY") or "").strip() + ) + + +def build_bingx_exec_client_config(): + """VIOLET's exchange config — its OWN credentials, violet journaling.""" + from prod.clean_arch.adapters.bingx_direct import ( + BingxExecClientConfig, + BingxInstrumentProviderConfig, + ) + + return BingxExecClientConfig( + api_key=os.environ.get("BINGX_VIOLET_API_KEY"), + secret_key=os.environ.get("BINGX_VIOLET_SECRET_KEY"), + environment=_resolve_bingx_environment(), + allow_mainnet=False, # V1 hard guarantee + recv_window_ms=_resolve_bingx_recv_window_ms(), + default_leverage=int(os.environ.get("DOLPHIN_BINGX_DEFAULT_LEVERAGE", "1")), + exchange_leverage_cap=_resolve_bingx_exchange_leverage_cap(), + prefer_websocket=False, + sizing_mode=os.environ.get("DOLPHIN_BINGX_SIZING_MODE", "testnet"), + journal_strategy=VIOLET_DEFAULTS["journal_strategy"], + journal_db=VIOLET_DEFAULTS["journal_db"], + instrument_provider=BingxInstrumentProviderConfig(load_all=True), + ) + + +def _preflight_clickhouse() -> list[str]: + """SELECT-probe every required dolphin_violet table. NEVER creates.""" + missing: list[str] = [] + ch_url = os.environ.get("CH_URL", "http://localhost:8123") + user = os.environ.get("CH_USER", "dolphin") + password = os.environ.get("CH_PASS", "dolphin_ch_2026") + for table in REQUIRED_TABLES: + q = urllib.parse.quote_plus( + f"SELECT 0 FROM dolphin_violet.{table} LIMIT 0" + ) + req = urllib.request.Request(f"{ch_url}/?query={q}", method="POST") + req.add_header("X-ClickHouse-User", user) + req.add_header("X-ClickHouse-Key", password) + try: + urllib.request.urlopen(req, timeout=5).read() + except Exception: + missing.append(table) + return missing + + +def _build_observe_runtime(): + """Bundle + observe guard + violet persistence/HZ. NO policy execution.""" + from prod.ch_writer import ch_put_violet + from prod.clean_arch.dita.decision import ( + DecisionConfig, + DecisionEngine, + IntentEngine, + ) + from prod.clean_arch.dita_v2.hazelcast_projection import PinkHzStateWriter + from prod.clean_arch.dita_v2.launcher import build_launcher_bundle + from prod.clean_arch.persistence.pink_clickhouse import PinkClickHousePersistence + from prod.clean_arch.runtime.pink_direct import PinkDirectRuntime + from prod.clean_arch.violet.observe_guard import ObserveOnlyVenue + + bundle = build_launcher_bundle( + venue_mode="BINGX", max_slots=1, bingx_config=build_bingx_exec_client_config() + ) + kernel = bundle.kernel + # HARD observe-only guarantee, independent of policy config. + kernel.venue = ObserveOnlyVenue(kernel.venue) + + cfg = DecisionConfig( + vel_div_threshold=-1.0, # unreachable — V1 never decides anyway + allow_short=False, + allow_long=False, + policy_version="violet_observe_v1", + ) + persistence = PinkClickHousePersistence( + kernel.account, sink=ch_put_violet, v7_sink=ch_put_violet + ) + hz_state_writer = None + try: + hz_state_writer = PinkHzStateWriter( + cluster=os.environ.get("HZ_CLUSTER", "dolphin"), + host=os.environ.get("HZ_HOST", "localhost:5701"), + state_map_name=VIOLET_DEFAULTS["state_map"], + pnl_map_name=VIOLET_DEFAULTS["pnl_map"], + ) + except Exception: + pass # HZ down → still observes; TUI falls back to CH + + return PinkDirectRuntime( + data_feed=_build_data_feed(), + kernel=kernel, + decision_engine=DecisionEngine(cfg), + intent_engine=IntentEngine(cfg), + persistence=persistence, + market_state_runtime=None, + hz_state_writer=hz_state_writer, + ) + + +def _build_divergence(sink=None): + from prod.ch_writer import ch_put_violet + from prod.clean_arch.violet.clock import ( + PlaneClock, + SCAN_STALENESS_NS, + VENUE_STALENESS_NS, + ) + from prod.clean_arch.violet.divergence import FeedDivergenceMonitor + + return FeedDivergenceMonitor( + sink=sink or ch_put_violet, + scan_clock=PlaneClock("scan", SCAN_STALENESS_NS), + venue_clock=PlaneClock("venue", VENUE_STALENESS_NS), + session_id=uuid.uuid4().hex, + ) + + +async def _divergence_driver(divergence, data_feed, poll_s: float) -> None: + """Shared scan-sampling loop for both DARK and observe modes.""" + started = False + symbol = os.environ.get("DOLPHIN_VIOLET_SNAPSHOT_SYMBOL", "BTCUSDT") + mode = os.environ.get("DOLPHIN_VIOLET_VENUE_MID_MODE", "ws").lower() + while True: + try: + snapshot = await data_feed.get_latest_snapshot(symbol) + if snapshot is not None: + payload = getattr(snapshot, "scan_payload", None) or {} + assets = payload.get("assets") or [] + if not started and assets: + await divergence.start( + [str(a).upper() for a in assets], mode=mode + ) + started = True + if started: + divergence.on_scan(snapshot) + except Exception as exc: # noqa: BLE001 — sampling must never die + LOGGER.debug("divergence scan sample failed: %s", exc) + await asyncio.sleep(poll_s) + + +async def _dark_idle_loop(divergence_task: asyncio.Task | None) -> None: + msg = ( + "VIOLET DARK — BINGX_VIOLET_API_KEY/BINGX_VIOLET_SECRET_KEY not set; " + "idling (divergence sampling %s)." + % ("running" if divergence_task is not None else "off") + ) + LOGGER.warning(msg) + while True: + await asyncio.sleep(300) + LOGGER.warning(msg) + + +async def run() -> None: + _apply_violet_env() + poll_s = float(os.environ.get("DOLPHIN_VIOLET_POLL_INTERVAL_SEC", "1.0")) + + missing = _preflight_clickhouse() + if missing: + LOGGER.critical( + "dolphin_violet tables MISSING: %s — this launcher never creates " + "tables (DDL-before-code discipline). Run: %s — idling dark.", + missing, DDL_APPLY_CMD, + ) + while True: # dark, emitting nothing + await asyncio.sleep(300) + LOGGER.critical("still missing dolphin_violet tables: %s", missing) + + divergence_task = None + if _env_bool("DOLPHIN_VIOLET_DARK_DIVERGENCE", True) or _violet_keys_present(): + divergence = _build_divergence() + divergence_task = asyncio.create_task( + _divergence_driver(divergence, _build_data_feed(), poll_s), + name="violet_divergence_driver", + ) + + if not _violet_keys_present(): + await _dark_idle_loop(divergence_task) + return + + runtime = _build_observe_runtime() + LOGGER.info( + "VIOLET observe-only runtime starting: trader=%s zinc_prefix=violet " + "hz=%s/%s ch=dolphin_violet", + VIOLET_DEFAULTS["trader_id"], VIOLET_DEFAULTS["state_map"], + VIOLET_DEFAULTS["pnl_map"], + ) + await runtime.connect() + symbol = os.environ.get("DOLPHIN_VIOLET_SNAPSHOT_SYMBOL", "BTCUSDT") + account_sync_s = float(os.environ.get("DOLPHIN_VIOLET_ACCOUNT_SYNC_SEC", "60")) + last_sync = 0.0 + loop = asyncio.get_running_loop() + while True: + try: + snapshot = await runtime.data_feed.get_latest_snapshot(symbol) + now = loop.time() + if snapshot is not None and now - last_sync >= account_sync_s: + await runtime.reconcile_account(snapshot) + last_sync = now + # OBSERVE-ONLY: runtime.step() is NEVER called — no decisions, + # no intents, no orders. The account stream + persistence run + # as connect()-spawned background tasks. + except Exception as exc: # noqa: BLE001 — observe loop must survive + LOGGER.warning("observe loop iteration failed: %s", exc) + await asyncio.sleep(poll_s) + + +def main() -> int: + try: + asyncio.run(run()) + except KeyboardInterrupt: + LOGGER.info("VIOLET shutdown (SIGINT)") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/prod/supervisor/dolphin-supervisord.conf b/prod/supervisor/dolphin-supervisord.conf index 5bac60f..ee973b3 100644 --- a/prod/supervisor/dolphin-supervisord.conf +++ b/prod/supervisor/dolphin-supervisord.conf @@ -106,6 +106,28 @@ stdout_logfile_backups=10 redirect_stderr=true environment=PYTHONPATH="/mnt/dolphinng5_predict:/mnt/dolphinng5_predict/nautilus_dolphin",PREFECT_API_URL="http://localhost:4200/api",PYTHONUNBUFFERED="1",DOLPHIN_DATA_VENUE="BINANCE",DOLPHIN_EXEC_VENUE="BINGX",DOLPHIN_BINGX_ENV="VST",DOLPHIN_BINGX_ALLOW_MAINNET="0",DOLPHIN_TRADER_ID="DOLPHIN-PINK-001",DOLPHIN_BINGX_DEFAULT_LEVERAGE="1",DOLPHIN_BINGX_PREFER_WEBSOCKET="1",DOLPHIN_BINGX_RECV_WINDOW_MS="60000",DOLPHIN_PINK_PHASE="single_leg",DOLPHIN_PINK_VOL_P60_THRESHOLD="0.00008000",DOLPHIN_BINGX_BASE_URL_BACKUP="https://open-api-vst.bingx.com",DOLPHIN_PINK_EXEC_STYLE="maker_both",DOLPHIN_PINK_MAKER_ENTRY_TTL_S="8",DOLPHIN_PINK_MAKER_EXIT_TTL_S="5",DOLPHIN_PINK_MAKER_ENTRY_MISS="retry",DOLPHIN_PINK_MAKER_ENTRY_RETRIES="1",DOLPHIN_PINK_MAKER_RETRY_EXHAUST="skip",DOLPHIN_PINK_MAKER_OFFSET_TICKS="1",DOLPHIN_PINK_POST_ONLY="1" +; ── VIOLET — Stage V1 observe-only rebuild runtime ────────────────────────── +; DARK by design: BINGX_VIOLET_API_KEY/SECRET deliberately NOT set here — +; they belong in /mnt/dolphinng5_predict/.env once the operator provisions +; the dedicated VST subaccount. Until then the service idles with a periodic +; WARNING; the (public-data) scan-vs-venue divergence sampler still runs. +; Namespaces: CH dolphin_violet, HZ DOLPHIN_STATE_VIOLET, Zinc prefix violet. +[program:dolphin_violet] +command=/home/dolphin/siloqy_env/bin/python3 /mnt/dolphinng5_predict/prod/launch_dolphin_violet.py +directory=/mnt/dolphinng5_predict/prod +autostart=false +autorestart=true +startsecs=15 +startretries=3 +stopwaitsecs=30 +stopasgroup=true +killasgroup=true +stdout_logfile=%(ENV_DOLPHIN_LOG_ROOT)s/supervisor/dolphin_live_violet.log +stdout_logfile_maxbytes=50MB +stdout_logfile_backups=10 +redirect_stderr=true +environment=PYTHONPATH="/mnt/dolphinng5_predict:/mnt/dolphinng5_predict/nautilus_dolphin",PYTHONUNBUFFERED="1",DOLPHIN_DATA_VENUE="BINANCE",DOLPHIN_EXEC_VENUE="BINGX",DOLPHIN_BINGX_ENV="VST",DOLPHIN_BINGX_ALLOW_MAINNET="0",DOLPHIN_TRADER_ID="DOLPHIN-VIOLET-001",DITA_V2_PREFIX="violet",DOLPHIN_BINGX_RECV_WINDOW_MS="60000",DOLPHIN_VIOLET_DARK_DIVERGENCE="1",DOLPHIN_VIOLET_VENUE_MID_MODE="ws" + ; DITAv2 — supervised kernel, launched separately from the legacy PINK/BLUE stack. [program:dita_v2] command=/home/dolphin/siloqy_env/bin/python3 /mnt/dolphinng5_predict/prod/launch_dita_v2.py