"""VIOLET V1: scan-vs-venue price divergence monitor (the FET detector). The 2026-06-11 FET incident entered on a scan price of 0.2176 while the venue traded 0.1878 — a 15% phantom divergence the system had no way to see. This monitor continuously samples, per asset, the gap between the scan-plane price (the alpha-side eigen-scan universe) and the execution venue's mid (BingX bookTicker), and journals one row per (scan, asset) to ``dolphin_violet.violet_feed_divergence``. Venue mid sources: - WS (default): ``prod.bingx.market_stream.BingxMarketStream`` bookTicker subscription per symbol; - REST fallback (``DOLPHIN_VIOLET_VENUE_MID_MODE=rest``): 1 s poll of the public quote endpoint — kept because the WS client is unexercised in-tree; harden or drop at V2. Only PUBLIC data — runs even when VIOLET is DARK (no API keys). """ from __future__ import annotations import asyncio import logging import time from dataclasses import dataclass from typing import Any, Callable, Dict, List, Optional from .clock import PlaneClock, mono_ns LOGGER = logging.getLogger("violet.divergence") Sink = Callable[[str, dict], None] def to_bingx_symbol(asset: str) -> str: """BTCUSDT → BTC-USDT (idempotent for already-hyphenated input).""" a = str(asset or "").upper() if "-" in a or not a.endswith("USDT"): return a return f"{a[:-4]}-USDT" def from_bingx_symbol(symbol: str) -> str: """BTC-USDT → BTCUSDT.""" return str(symbol or "").upper().replace("-", "") @dataclass class _VenueMid: mid: float mono: int seq: int class FeedDivergenceMonitor: """Samples scan-vs-venue divergence and journals it. ``on_venue_tick`` is fed by the WS/REST source; ``on_scan`` is called by the launcher's main loop with each data-feed snapshot. A divergence row is emitted only when the venue mid is fresher than the venue plane's staleness budget — a stale mid must never masquerade as live divergence. """ TABLE = "violet_feed_divergence" def __init__( self, *, sink: Sink, scan_clock: PlaneClock, venue_clock: PlaneClock, session_id: str, table: str = TABLE, logger: logging.Logger | None = None, ) -> None: self.sink = sink self.scan_clock = scan_clock self.venue_clock = venue_clock self.session_id = str(session_id) self.table = table self.logger = logger or LOGGER self._mids: Dict[str, _VenueMid] = {} self._stream: Any = None self._stream_task: Optional[asyncio.Task] = None self._rest_task: Optional[asyncio.Task] = None self.rows_emitted = 0 # ── venue side ─────────────────────────────────────────────────────────── def on_venue_tick(self, symbol: str, bid: float, ask: float) -> None: try: b, a = float(bid), float(ask) except (TypeError, ValueError): return if not (b > 0 and a > 0): return seq = self.venue_clock.tick() self._mids[from_bingx_symbol(symbol)] = _VenueMid( mid=(b + a) / 2.0, mono=mono_ns(), seq=seq ) async def _on_ws_frame(self, payload: dict) -> None: """BingX bookTicker frame: {dataType: 'BTC-USDT@bookTicker', data: {... 'b': bid, 'a': ask ...}} (field names per BingX swap WS).""" try: data_type = str(payload.get("dataType", "") or "") if "@bookTicker" not in data_type: return symbol = data_type.split("@", 1)[0] data = payload.get("data") or {} bid = data.get("b") or data.get("bidPrice") or data.get("bestBid") ask = data.get("a") or data.get("askPrice") or data.get("bestAsk") if bid is None or ask is None: return self.on_venue_tick(symbol, float(bid), float(ask)) except Exception as exc: # noqa: BLE001 — never kill the stream self.logger.debug("divergence WS frame parse skipped: %s", exc) async def _rest_poll_loop(self, symbols: List[str], rest_base_url: str) -> None: import aiohttp url = rest_base_url.rstrip("/") + "/openApi/swap/v2/quote/bookTicker" async with aiohttp.ClientSession() as session: while True: for sym in symbols: try: async with session.get( url, params={"symbol": to_bingx_symbol(sym)}, timeout=aiohttp.ClientTimeout(total=5), ) as resp: body = await resp.json(content_type=None) data = (body or {}).get("data") or {} # quote payloads use bidPrice/askPrice (REST shape) book = data.get("book_ticker") or data bid = book.get("bid_price") or book.get("bidPrice") or book.get("b") ask = book.get("ask_price") or book.get("askPrice") or book.get("a") if bid is not None and ask is not None: self.on_venue_tick(sym, float(bid), float(ask)) except Exception as exc: # noqa: BLE001 self.logger.debug("divergence REST poll %s failed: %s", sym, exc) await asyncio.sleep(1.0) async def start( self, symbols: List[str], *, mode: str = "ws", ws_url: str = "wss://open-api-swap.bingx.com/swap-market", rest_base_url: str = "https://open-api.bingx.com", ) -> None: if mode == "rest": self._rest_task = asyncio.create_task( self._rest_poll_loop(symbols, rest_base_url), name="violet_divergence_rest", ) self.logger.info("divergence: REST mid poll started for %d symbols", len(symbols)) return from prod.bingx.market_stream import BingxMarketStream self._stream = BingxMarketStream(ws_url=ws_url, on_event=self._on_ws_frame) for sym in symbols: self._stream.subscribe(f"{to_bingx_symbol(sym)}@bookTicker") self._stream_task = asyncio.create_task( self._stream.run_forever(), name="violet_divergence_ws" ) self.logger.info("divergence: WS bookTicker started for %d symbols", len(symbols)) async def stop(self) -> None: for task in (self._stream_task, self._rest_task): if task is not None: task.cancel() try: await task except (asyncio.CancelledError, Exception): pass self._stream_task = self._rest_task = None # ── scan side ───────────────────────────────────────────────────────────── def on_scan(self, snapshot: Any) -> int: """Called once per data-feed snapshot. Returns rows emitted.""" payload = getattr(snapshot, "scan_payload", None) if not isinstance(payload, dict): return 0 assets = payload.get("assets") or [] prices = payload.get("asset_prices") or [] if not (isinstance(assets, list) and isinstance(prices, list) and assets): return 0 scan_seq = self.scan_clock.tick() now = mono_ns() emitted = 0 for asset, price in zip(assets, prices): try: scan_price = float(price) except (TypeError, ValueError): continue if scan_price <= 0: continue key = str(asset).upper() vm = self._mids.get(key) if vm is None: continue # Stale venue mid must never masquerade as live divergence. if (now - vm.mono) > self.venue_clock.staleness_budget_ns: continue divergence_bps = (vm.mid - scan_price) / scan_price * 1e4 self.sink(self.table, { "ts": int(time.time() * 1000), # DateTime64(3) "session_id": self.session_id, "asset": key, "scan_price": scan_price, "venue_mid": vm.mid, "divergence_bps": divergence_bps, "scan_seq": int(scan_seq), "venue_seq": int(vm.seq), "mono_ns": int(now), }) emitted += 1 self.rows_emitted += emitted return emitted