2026-06-12 16:09:04 +02:00
|
|
|
"""VIOLET V1: scan-vs-venue price divergence monitor (the FET detector).
|
|
|
|
|
|
|
|
|
|
The 2026-06-11 FET incident entered on a scan price of 0.2176 while the
|
|
|
|
|
venue traded 0.1878 — a 15% phantom divergence the system had no way to
|
|
|
|
|
see. This monitor continuously samples, per asset, the gap between the
|
|
|
|
|
scan-plane price (the alpha-side eigen-scan universe) and the execution
|
|
|
|
|
venue's mid (BingX bookTicker), and journals one row per (scan, asset) to
|
|
|
|
|
``dolphin_violet.violet_feed_divergence``.
|
|
|
|
|
|
|
|
|
|
Venue mid sources:
|
|
|
|
|
- WS (default): ``prod.bingx.market_stream.BingxMarketStream``
|
|
|
|
|
bookTicker subscription per symbol;
|
|
|
|
|
- REST fallback (``DOLPHIN_VIOLET_VENUE_MID_MODE=rest``): 1 s poll of the
|
|
|
|
|
public quote endpoint — kept because the WS client is unexercised
|
|
|
|
|
in-tree; harden or drop at V2.
|
|
|
|
|
|
|
|
|
|
Only PUBLIC data — runs even when VIOLET is DARK (no API keys).
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
|
import logging
|
|
|
|
|
import time
|
|
|
|
|
from dataclasses import dataclass
|
|
|
|
|
from typing import Any, Callable, Dict, List, Optional
|
|
|
|
|
|
VIOLET V2a: V-TYPES domain layer + hypothesis properties + divergence reject-at-source
domain.py: refined scalar aliases (BarsHeld kills the bars_held=-106
UInt16 poison class by construction), DivergenceRow (DDL-shaped, frozen,
extra=forbid), ExecDriverSettings (env boundary for the V2 driver; ttl
override exists because the shared router clamps TTLs >= 0.5s),
ExecGateReport schema, beartype 'typed' decorator with
DOLPHIN_VIOLET_BEARTYPE=0 kill-switch.
divergence.py: rows now parse through DivergenceRow before the sink —
malformed rows die at the source with a rate-limited WARNING + counter,
never at the head of the CH spool.
Properties (hypothesis, derandomized): ExecutionRouter state machine
(fill/retry mutual exclusion via pop-semantics, R1 exit escalation same
trade_id, bounded retry chains, <=1 working ENTER), LatencyHistogram
percentile laws (member-of-samples, monotone, extremes), DivergenceRow
parse laws. 34 new tests; violet suite 64 green; router 77 green; zero
shared-file edits.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-13 00:08:18 +02:00
|
|
|
from pydantic import ValidationError
|
|
|
|
|
|
2026-06-12 16:09:04 +02:00
|
|
|
from .clock import PlaneClock, mono_ns
|
VIOLET V2a: V-TYPES domain layer + hypothesis properties + divergence reject-at-source
domain.py: refined scalar aliases (BarsHeld kills the bars_held=-106
UInt16 poison class by construction), DivergenceRow (DDL-shaped, frozen,
extra=forbid), ExecDriverSettings (env boundary for the V2 driver; ttl
override exists because the shared router clamps TTLs >= 0.5s),
ExecGateReport schema, beartype 'typed' decorator with
DOLPHIN_VIOLET_BEARTYPE=0 kill-switch.
divergence.py: rows now parse through DivergenceRow before the sink —
malformed rows die at the source with a rate-limited WARNING + counter,
never at the head of the CH spool.
Properties (hypothesis, derandomized): ExecutionRouter state machine
(fill/retry mutual exclusion via pop-semantics, R1 exit escalation same
trade_id, bounded retry chains, <=1 working ENTER), LatencyHistogram
percentile laws (member-of-samples, monotone, extremes), DivergenceRow
parse laws. 34 new tests; violet suite 64 green; router 77 green; zero
shared-file edits.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-13 00:08:18 +02:00
|
|
|
from .domain import DivergenceRow
|
2026-06-12 16:09:04 +02:00
|
|
|
|
|
|
|
|
LOGGER = logging.getLogger("violet.divergence")
|
|
|
|
|
|
|
|
|
|
Sink = Callable[[str, dict], None]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def to_bingx_symbol(asset: str) -> str:
|
|
|
|
|
"""BTCUSDT → BTC-USDT (idempotent for already-hyphenated input)."""
|
|
|
|
|
a = str(asset or "").upper()
|
|
|
|
|
if "-" in a or not a.endswith("USDT"):
|
|
|
|
|
return a
|
|
|
|
|
return f"{a[:-4]}-USDT"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def from_bingx_symbol(symbol: str) -> str:
|
|
|
|
|
"""BTC-USDT → BTCUSDT."""
|
|
|
|
|
return str(symbol or "").upper().replace("-", "")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class _VenueMid:
|
|
|
|
|
mid: float
|
|
|
|
|
mono: int
|
|
|
|
|
seq: int
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class FeedDivergenceMonitor:
|
|
|
|
|
"""Samples scan-vs-venue divergence and journals it.
|
|
|
|
|
|
|
|
|
|
``on_venue_tick`` is fed by the WS/REST source; ``on_scan`` is called by
|
|
|
|
|
the launcher's main loop with each data-feed snapshot. A divergence row
|
|
|
|
|
is emitted only when the venue mid is fresher than the venue plane's
|
|
|
|
|
staleness budget — a stale mid must never masquerade as live divergence.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
TABLE = "violet_feed_divergence"
|
|
|
|
|
|
|
|
|
|
def __init__(
|
|
|
|
|
self,
|
|
|
|
|
*,
|
|
|
|
|
sink: Sink,
|
|
|
|
|
scan_clock: PlaneClock,
|
|
|
|
|
venue_clock: PlaneClock,
|
|
|
|
|
session_id: str,
|
|
|
|
|
table: str = TABLE,
|
|
|
|
|
logger: logging.Logger | None = None,
|
|
|
|
|
) -> None:
|
|
|
|
|
self.sink = sink
|
|
|
|
|
self.scan_clock = scan_clock
|
|
|
|
|
self.venue_clock = venue_clock
|
|
|
|
|
self.session_id = str(session_id)
|
|
|
|
|
self.table = table
|
|
|
|
|
self.logger = logger or LOGGER
|
|
|
|
|
self._mids: Dict[str, _VenueMid] = {}
|
|
|
|
|
self._stream: Any = None
|
|
|
|
|
self._stream_task: Optional[asyncio.Task] = None
|
|
|
|
|
self._rest_task: Optional[asyncio.Task] = None
|
|
|
|
|
self.rows_emitted = 0
|
VIOLET V2a: V-TYPES domain layer + hypothesis properties + divergence reject-at-source
domain.py: refined scalar aliases (BarsHeld kills the bars_held=-106
UInt16 poison class by construction), DivergenceRow (DDL-shaped, frozen,
extra=forbid), ExecDriverSettings (env boundary for the V2 driver; ttl
override exists because the shared router clamps TTLs >= 0.5s),
ExecGateReport schema, beartype 'typed' decorator with
DOLPHIN_VIOLET_BEARTYPE=0 kill-switch.
divergence.py: rows now parse through DivergenceRow before the sink —
malformed rows die at the source with a rate-limited WARNING + counter,
never at the head of the CH spool.
Properties (hypothesis, derandomized): ExecutionRouter state machine
(fill/retry mutual exclusion via pop-semantics, R1 exit escalation same
trade_id, bounded retry chains, <=1 working ENTER), LatencyHistogram
percentile laws (member-of-samples, monotone, extremes), DivergenceRow
parse laws. 34 new tests; violet suite 64 green; router 77 green; zero
shared-file edits.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-13 00:08:18 +02:00
|
|
|
self.rows_rejected = 0
|
2026-06-12 16:09:04 +02:00
|
|
|
|
|
|
|
|
# ── venue side ───────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def on_venue_tick(self, symbol: str, bid: float, ask: float) -> None:
|
|
|
|
|
try:
|
|
|
|
|
b, a = float(bid), float(ask)
|
|
|
|
|
except (TypeError, ValueError):
|
|
|
|
|
return
|
|
|
|
|
if not (b > 0 and a > 0):
|
|
|
|
|
return
|
|
|
|
|
seq = self.venue_clock.tick()
|
|
|
|
|
self._mids[from_bingx_symbol(symbol)] = _VenueMid(
|
|
|
|
|
mid=(b + a) / 2.0, mono=mono_ns(), seq=seq
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
async def _on_ws_frame(self, payload: dict) -> None:
|
|
|
|
|
"""BingX bookTicker frame: {dataType: 'BTC-USDT@bookTicker',
|
|
|
|
|
data: {... 'b': bid, 'a': ask ...}} (field names per BingX swap WS)."""
|
|
|
|
|
try:
|
|
|
|
|
data_type = str(payload.get("dataType", "") or "")
|
|
|
|
|
if "@bookTicker" not in data_type:
|
|
|
|
|
return
|
|
|
|
|
symbol = data_type.split("@", 1)[0]
|
|
|
|
|
data = payload.get("data") or {}
|
|
|
|
|
bid = data.get("b") or data.get("bidPrice") or data.get("bestBid")
|
|
|
|
|
ask = data.get("a") or data.get("askPrice") or data.get("bestAsk")
|
|
|
|
|
if bid is None or ask is None:
|
|
|
|
|
return
|
|
|
|
|
self.on_venue_tick(symbol, float(bid), float(ask))
|
|
|
|
|
except Exception as exc: # noqa: BLE001 — never kill the stream
|
|
|
|
|
self.logger.debug("divergence WS frame parse skipped: %s", exc)
|
|
|
|
|
|
|
|
|
|
async def _rest_poll_loop(self, symbols: List[str], rest_base_url: str) -> None:
|
|
|
|
|
import aiohttp
|
|
|
|
|
|
|
|
|
|
url = rest_base_url.rstrip("/") + "/openApi/swap/v2/quote/bookTicker"
|
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
|
|
|
while True:
|
|
|
|
|
for sym in symbols:
|
|
|
|
|
try:
|
|
|
|
|
async with session.get(
|
|
|
|
|
url, params={"symbol": to_bingx_symbol(sym)},
|
|
|
|
|
timeout=aiohttp.ClientTimeout(total=5),
|
|
|
|
|
) as resp:
|
|
|
|
|
body = await resp.json(content_type=None)
|
|
|
|
|
data = (body or {}).get("data") or {}
|
|
|
|
|
# quote payloads use bidPrice/askPrice (REST shape)
|
|
|
|
|
book = data.get("book_ticker") or data
|
|
|
|
|
bid = book.get("bid_price") or book.get("bidPrice") or book.get("b")
|
|
|
|
|
ask = book.get("ask_price") or book.get("askPrice") or book.get("a")
|
|
|
|
|
if bid is not None and ask is not None:
|
|
|
|
|
self.on_venue_tick(sym, float(bid), float(ask))
|
|
|
|
|
except Exception as exc: # noqa: BLE001
|
|
|
|
|
self.logger.debug("divergence REST poll %s failed: %s", sym, exc)
|
|
|
|
|
await asyncio.sleep(1.0)
|
|
|
|
|
|
|
|
|
|
async def start(
|
|
|
|
|
self,
|
|
|
|
|
symbols: List[str],
|
|
|
|
|
*,
|
|
|
|
|
mode: str = "ws",
|
|
|
|
|
ws_url: str = "wss://open-api-swap.bingx.com/swap-market",
|
|
|
|
|
rest_base_url: str = "https://open-api.bingx.com",
|
|
|
|
|
) -> None:
|
|
|
|
|
if mode == "rest":
|
|
|
|
|
self._rest_task = asyncio.create_task(
|
|
|
|
|
self._rest_poll_loop(symbols, rest_base_url),
|
|
|
|
|
name="violet_divergence_rest",
|
|
|
|
|
)
|
|
|
|
|
self.logger.info("divergence: REST mid poll started for %d symbols", len(symbols))
|
|
|
|
|
return
|
|
|
|
|
from prod.bingx.market_stream import BingxMarketStream
|
|
|
|
|
|
|
|
|
|
self._stream = BingxMarketStream(ws_url=ws_url, on_event=self._on_ws_frame)
|
|
|
|
|
for sym in symbols:
|
|
|
|
|
self._stream.subscribe(f"{to_bingx_symbol(sym)}@bookTicker")
|
|
|
|
|
self._stream_task = asyncio.create_task(
|
|
|
|
|
self._stream.run_forever(), name="violet_divergence_ws"
|
|
|
|
|
)
|
|
|
|
|
self.logger.info("divergence: WS bookTicker started for %d symbols", len(symbols))
|
|
|
|
|
|
|
|
|
|
async def stop(self) -> None:
|
|
|
|
|
for task in (self._stream_task, self._rest_task):
|
|
|
|
|
if task is not None:
|
|
|
|
|
task.cancel()
|
|
|
|
|
try:
|
|
|
|
|
await task
|
|
|
|
|
except (asyncio.CancelledError, Exception):
|
|
|
|
|
pass
|
|
|
|
|
self._stream_task = self._rest_task = None
|
|
|
|
|
|
|
|
|
|
# ── scan side ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def on_scan(self, snapshot: Any) -> int:
|
|
|
|
|
"""Called once per data-feed snapshot. Returns rows emitted."""
|
|
|
|
|
payload = getattr(snapshot, "scan_payload", None)
|
|
|
|
|
if not isinstance(payload, dict):
|
|
|
|
|
return 0
|
|
|
|
|
assets = payload.get("assets") or []
|
|
|
|
|
prices = payload.get("asset_prices") or []
|
|
|
|
|
if not (isinstance(assets, list) and isinstance(prices, list) and assets):
|
|
|
|
|
return 0
|
|
|
|
|
scan_seq = self.scan_clock.tick()
|
|
|
|
|
now = mono_ns()
|
|
|
|
|
emitted = 0
|
|
|
|
|
for asset, price in zip(assets, prices):
|
|
|
|
|
try:
|
|
|
|
|
scan_price = float(price)
|
|
|
|
|
except (TypeError, ValueError):
|
|
|
|
|
continue
|
|
|
|
|
if scan_price <= 0:
|
|
|
|
|
continue
|
|
|
|
|
key = str(asset).upper()
|
|
|
|
|
vm = self._mids.get(key)
|
|
|
|
|
if vm is None:
|
|
|
|
|
continue
|
|
|
|
|
# Stale venue mid must never masquerade as live divergence.
|
|
|
|
|
if (now - vm.mono) > self.venue_clock.staleness_budget_ns:
|
|
|
|
|
continue
|
|
|
|
|
divergence_bps = (vm.mid - scan_price) / scan_price * 1e4
|
VIOLET V2a: V-TYPES domain layer + hypothesis properties + divergence reject-at-source
domain.py: refined scalar aliases (BarsHeld kills the bars_held=-106
UInt16 poison class by construction), DivergenceRow (DDL-shaped, frozen,
extra=forbid), ExecDriverSettings (env boundary for the V2 driver; ttl
override exists because the shared router clamps TTLs >= 0.5s),
ExecGateReport schema, beartype 'typed' decorator with
DOLPHIN_VIOLET_BEARTYPE=0 kill-switch.
divergence.py: rows now parse through DivergenceRow before the sink —
malformed rows die at the source with a rate-limited WARNING + counter,
never at the head of the CH spool.
Properties (hypothesis, derandomized): ExecutionRouter state machine
(fill/retry mutual exclusion via pop-semantics, R1 exit escalation same
trade_id, bounded retry chains, <=1 working ENTER), LatencyHistogram
percentile laws (member-of-samples, monotone, extremes), DivergenceRow
parse laws. 34 new tests; violet suite 64 green; router 77 green; zero
shared-file edits.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-13 00:08:18 +02:00
|
|
|
# V-TYPES: parse, don't validate — a malformed row dies here,
|
|
|
|
|
# never at the head of the CH spool (the bars_held lesson).
|
|
|
|
|
try:
|
|
|
|
|
row = DivergenceRow(
|
|
|
|
|
ts=int(time.time() * 1000), # DateTime64(3)
|
|
|
|
|
session_id=self.session_id,
|
|
|
|
|
asset=key,
|
|
|
|
|
scan_price=scan_price,
|
|
|
|
|
venue_mid=vm.mid,
|
|
|
|
|
divergence_bps=divergence_bps,
|
|
|
|
|
scan_seq=int(scan_seq),
|
|
|
|
|
venue_seq=int(vm.seq),
|
|
|
|
|
mono_ns=int(now),
|
|
|
|
|
)
|
|
|
|
|
except ValidationError as exc:
|
|
|
|
|
self.rows_rejected += 1
|
|
|
|
|
if self.rows_rejected == 1 or self.rows_rejected % 1000 == 0:
|
|
|
|
|
self.logger.warning(
|
|
|
|
|
"divergence row REJECTED at source (#%d) asset=%s: %s",
|
|
|
|
|
self.rows_rejected, key, exc,
|
|
|
|
|
)
|
|
|
|
|
continue
|
|
|
|
|
self.sink(self.table, row.model_dump())
|
2026-06-12 16:09:04 +02:00
|
|
|
emitted += 1
|
|
|
|
|
self.rows_emitted += emitted
|
|
|
|
|
return emitted
|