Files
siloqy/prod/clean_arch/violet/divergence.py

221 lines
8.6 KiB
Python
Raw Normal View History

"""VIOLET V1: scan-vs-venue price divergence monitor (the FET detector).
The 2026-06-11 FET incident entered on a scan price of 0.2176 while the
venue traded 0.1878 — a 15% phantom divergence the system had no way to
see. This monitor continuously samples, per asset, the gap between the
scan-plane price (the alpha-side eigen-scan universe) and the execution
venue's mid (BingX bookTicker), and journals one row per (scan, asset) to
``dolphin_violet.violet_feed_divergence``.
Venue mid sources:
- WS (default): ``prod.bingx.market_stream.BingxMarketStream``
bookTicker subscription per symbol;
- REST fallback (``DOLPHIN_VIOLET_VENUE_MID_MODE=rest``): 1 s poll of the
public quote endpoint — kept because the WS client is unexercised
in-tree; harden or drop at V2.
Uses only PUBLIC data, so it runs even when VIOLET is DARK (no API keys).
"""
from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional
from .clock import PlaneClock, mono_ns
# Module-level fallback logger; FeedDivergenceMonitor uses it when the caller
# does not inject its own logger.
LOGGER = logging.getLogger("violet.divergence")
# Journal sink signature: invoked as sink(table_name, row_dict) for every
# divergence row.
Sink = Callable[[str, dict], None]
def to_bingx_symbol(asset: str) -> str:
    """Convert a compact pair name to BingX's hyphenated form.

    ``BTCUSDT`` becomes ``BTC-USDT``. The input is upper-cased first; a value
    that already contains a hyphen, or that does not end in ``USDT``, is
    returned upper-cased but otherwise untouched, which makes the call
    idempotent. ``None`` or an empty value yields ``""``.

    Note: a bare ``"USDT"`` has an empty base and therefore maps to
    ``"-USDT"``.
    """
    normalized = str(asset or "").upper()
    needs_hyphen = normalized.endswith("USDT") and "-" not in normalized
    if needs_hyphen:
        base = normalized[:-4]  # strip the 4-character "USDT" quote suffix
        return base + "-USDT"
    return normalized
def from_bingx_symbol(symbol: str) -> str:
    """Collapse a BingX symbol to its compact form (``BTC-USDT`` → ``BTCUSDT``).

    The input is upper-cased and every hyphen is removed. ``None`` or an
    empty value yields ``""``.
    """
    upper = str(symbol or "").upper()
    # Splitting on "-" and re-joining drops every hyphen in one pass.
    return "".join(upper.split("-"))
@dataclass
class _VenueMid:
    """Latest accepted venue quote for one asset, as stored by the monitor."""

    # Mid price, computed as (bid + ask) / 2 when the tick is accepted.
    mid: float
    # Monotonic timestamp in nanoseconds (from mono_ns()) at acceptance time;
    # compared against the venue staleness budget on each scan.
    mono: int
    # Venue-plane clock sequence number returned by tick() for this update.
    seq: int
class FeedDivergenceMonitor:
    """Samples scan-vs-venue divergence and journals it.

    ``on_venue_tick`` is fed by the WS/REST source; ``on_scan`` is called by
    the launcher's main loop with each data-feed snapshot. A divergence row
    is emitted only when the venue mid is fresher than the venue plane's
    staleness budget — a stale mid must never masquerade as live divergence.

    Prices that are not finite, strictly positive numbers are rejected on
    both the venue and the scan side, so a NaN/inf quote can never reach the
    journal as a divergence value.
    """

    TABLE = "violet_feed_divergence"

    def __init__(
        self,
        *,
        sink: Sink,
        scan_clock: PlaneClock,
        venue_clock: PlaneClock,
        session_id: str,
        table: str = TABLE,
        logger: logging.Logger | None = None,
    ) -> None:
        """Store collaborators; no I/O and no tasks are started here.

        Args:
            sink: Called as ``sink(table, row)`` for every divergence row.
            scan_clock: Ticked once per processed scan; its value is
                journaled as ``scan_seq``.
            venue_clock: Ticked once per accepted venue quote; also supplies
                ``staleness_budget_ns`` for the freshness check.
            session_id: Stringified and stamped on every row.
            table: Destination table name passed to ``sink``.
            logger: Optional logger; the module-level ``LOGGER`` is used
                when omitted.
        """
        self.sink = sink
        self.scan_clock = scan_clock
        self.venue_clock = venue_clock
        self.session_id = str(session_id)
        self.table = table
        self.logger = logger or LOGGER
        # Latest accepted venue mid per compact asset name (e.g. "BTCUSDT").
        self._mids: Dict[str, _VenueMid] = {}
        self._stream: Any = None
        self._stream_task: Optional[asyncio.Task] = None
        self._rest_task: Optional[asyncio.Task] = None
        # Running total of rows handed to the sink.
        self.rows_emitted = 0

    @staticmethod
    def _is_valid_price(value: float) -> bool:
        """Return True only for finite, strictly positive prices.

        NaN fails every comparison and +inf fails the upper bound, so both
        are rejected without an extra import.
        """
        return 0.0 < value < float("inf")

    # ── venue side ───────────────────────────────────────────────────────────
    def on_venue_tick(self, symbol: str, bid: float, ask: float) -> None:
        """Record the latest venue mid for ``symbol``.

        Quotes that cannot be parsed as floats, or whose bid or ask is not a
        finite positive number, are dropped without ticking the venue clock.
        ``symbol`` may be hyphenated or compact; it is stored under its
        compact upper-case form.
        """
        try:
            b, a = float(bid), float(ask)
        except (TypeError, ValueError):
            return
        if not (self._is_valid_price(b) and self._is_valid_price(a)):
            return
        seq = self.venue_clock.tick()
        self._mids[from_bingx_symbol(symbol)] = _VenueMid(
            mid=(b + a) / 2.0, mono=mono_ns(), seq=seq
        )

    async def _on_ws_frame(self, payload: dict) -> None:
        """Handle one WS frame and forward bookTicker quotes.

        BingX bookTicker frame: {dataType: 'BTC-USDT@bookTicker',
        data: {... 'b': bid, 'a': ask ...}} (field names per BingX swap WS).
        Frames of any other data type, or without both a bid and an ask, are
        ignored. Any parsing error is logged at debug level and swallowed.
        """
        try:
            data_type = str(payload.get("dataType", "") or "")
            if "@bookTicker" not in data_type:
                return
            symbol = data_type.split("@", 1)[0]
            data = payload.get("data") or {}
            # Several field spellings are tried, in order of preference.
            bid = data.get("b") or data.get("bidPrice") or data.get("bestBid")
            ask = data.get("a") or data.get("askPrice") or data.get("bestAsk")
            if bid is None or ask is None:
                return
            self.on_venue_tick(symbol, float(bid), float(ask))
        except Exception as exc:  # noqa: BLE001 — never kill the stream
            self.logger.debug("divergence WS frame parse skipped: %s", exc)

    async def _rest_poll_loop(self, symbols: List[str], rest_base_url: str) -> None:
        """Poll the public bookTicker endpoint for each symbol until cancelled.

        Symbols are queried one after another; after each full sweep the loop
        sleeps for one second. A failure for one symbol is logged at debug
        level and does not interrupt the sweep.
        """
        import aiohttp

        url = rest_base_url.rstrip("/") + "/openApi/swap/v2/quote/bookTicker"
        async with aiohttp.ClientSession() as session:
            while True:
                for sym in symbols:
                    try:
                        async with session.get(
                            url,
                            params={"symbol": to_bingx_symbol(sym)},
                            timeout=aiohttp.ClientTimeout(total=5),
                        ) as resp:
                            body = await resp.json(content_type=None)
                        data = (body or {}).get("data") or {}
                        # quote payloads use bidPrice/askPrice (REST shape)
                        book = data.get("book_ticker") or data
                        bid = book.get("bid_price") or book.get("bidPrice") or book.get("b")
                        ask = book.get("ask_price") or book.get("askPrice") or book.get("a")
                        if bid is not None and ask is not None:
                            self.on_venue_tick(sym, float(bid), float(ask))
                    except Exception as exc:  # noqa: BLE001
                        self.logger.debug("divergence REST poll %s failed: %s", sym, exc)
                await asyncio.sleep(1.0)

    async def start(
        self,
        symbols: List[str],
        *,
        mode: str = "ws",
        ws_url: str = "wss://open-api-swap.bingx.com/swap-market",
        rest_base_url: str = "https://open-api.bingx.com",
    ) -> None:
        """Start the venue-mid source as a background task.

        Args:
            symbols: Assets to follow (compact or hyphenated names).
            mode: ``"rest"`` (case-insensitive, surrounding whitespace
                ignored) selects the REST poller; any other value selects
                the WS bookTicker stream.
            ws_url: WebSocket endpoint for the stream.
            rest_base_url: Base URL for the REST poller.
        """
        # Normalise so an environment value such as "REST" is honoured rather
        # than silently falling through to the WS branch.
        if str(mode or "").strip().lower() == "rest":
            self._rest_task = asyncio.create_task(
                self._rest_poll_loop(symbols, rest_base_url),
                name="violet_divergence_rest",
            )
            self.logger.info("divergence: REST mid poll started for %d symbols", len(symbols))
            return
        # Imported lazily so REST mode does not require the WS client module.
        from prod.bingx.market_stream import BingxMarketStream

        self._stream = BingxMarketStream(ws_url=ws_url, on_event=self._on_ws_frame)
        for sym in symbols:
            self._stream.subscribe(f"{to_bingx_symbol(sym)}@bookTicker")
        self._stream_task = asyncio.create_task(
            self._stream.run_forever(), name="violet_divergence_ws"
        )
        self.logger.info("divergence: WS bookTicker started for %d symbols", len(symbols))

    async def stop(self) -> None:
        """Cancel the background venue task(s) and wait for them to finish.

        Task references are cleared before awaiting, so the monitor is left
        in a stopped state even if this coroutine is itself cancelled. A
        cancellation of the caller propagates normally; errors raised by the
        background tasks are logged at debug level rather than re-raised.
        """
        tasks = [t for t in (self._stream_task, self._rest_task) if t is not None]
        self._stream_task = self._rest_task = None
        if not tasks:
            return
        for task in tasks:
            task.cancel()
        # return_exceptions=True collects each task's outcome (including its
        # CancelledError) as a value, while a cancellation aimed at stop()
        # itself is still raised to the caller.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for task, result in zip(tasks, results):
            if isinstance(result, Exception) and not isinstance(
                result, asyncio.CancelledError
            ):
                self.logger.debug(
                    "divergence: task %s ended with %r during stop",
                    task.get_name(),
                    result,
                )

    # ── scan side ─────────────────────────────────────────────────────────────
    def on_scan(self, snapshot: Any) -> int:
        """Journal one divergence row per asset with a fresh venue mid.

        Called once per data-feed snapshot. ``snapshot.scan_payload`` must be
        a dict carrying parallel ``assets`` and ``asset_prices`` lists;
        otherwise nothing is emitted and the scan clock is not ticked. If the
        lists differ in length, only the leading pairs are processed.

        An asset is skipped when its scan price is unparseable, non-finite
        or non-positive, when no venue mid is known for it, or when the
        venue mid is older than ``venue_clock.staleness_budget_ns``.

        Returns:
            Number of rows handed to the sink for this snapshot.
        """
        payload = getattr(snapshot, "scan_payload", None)
        if not isinstance(payload, dict):
            return 0
        assets = payload.get("assets") or []
        prices = payload.get("asset_prices") or []
        if not (isinstance(assets, list) and isinstance(prices, list) and assets):
            return 0
        scan_seq = self.scan_clock.tick()
        now = mono_ns()
        budget_ns = self.venue_clock.staleness_budget_ns
        emitted = 0
        for asset, price in zip(assets, prices):
            try:
                scan_price = float(price)
            except (TypeError, ValueError):
                continue
            # NaN/inf would otherwise slip past a plain "<= 0" check and be
            # journaled as a NaN divergence.
            if not self._is_valid_price(scan_price):
                continue
            key = str(asset).upper()
            vm = self._mids.get(key)
            if vm is None:
                continue
            # Stale venue mid must never masquerade as live divergence.
            if (now - vm.mono) > budget_ns:
                continue
            divergence_bps = (vm.mid - scan_price) / scan_price * 1e4
            self.sink(self.table, {
                "ts": int(time.time() * 1000),  # DateTime64(3)
                "session_id": self.session_id,
                "asset": key,
                "scan_price": scan_price,
                "venue_mid": vm.mid,
                "divergence_bps": divergence_bps,
                "scan_seq": int(scan_seq),
                "venue_seq": int(vm.seq),
                "mono_ns": int(now),
            })
            emitted += 1
            # Counted per row so the total stays accurate if the sink raises
            # part-way through a scan.
            self.rows_emitted += 1
        return emitted