"""V1: FeedDivergenceMonitor — row shape vs DDL, sign, staleness, seqs.""" from __future__ import annotations import re import sys from pathlib import Path from types import SimpleNamespace sys.path.insert(0, "/mnt/dolphinng5_predict") import pytest from prod.clean_arch.violet.clock import PlaneClock, mono_ns from prod.clean_arch.violet.divergence import ( FeedDivergenceMonitor, from_bingx_symbol, to_bingx_symbol, ) DDL_PATH = Path( "/mnt/dolphinng5_predict/prod/clickhouse/violet/20_violet_feed_divergence.sql" ) def _mk(sink_rows, venue_budget_ns=2_000_000_000): return FeedDivergenceMonitor( sink=lambda table, row: sink_rows.append((table, row)), scan_clock=PlaneClock("scan", 12_000_000_000), venue_clock=PlaneClock("venue", venue_budget_ns), session_id="sess-test", ) def _snapshot(assets, prices): return SimpleNamespace(scan_payload={"assets": assets, "asset_prices": prices}) def test_symbol_mapping_round_trip(): assert to_bingx_symbol("FETUSDT") == "FET-USDT" assert to_bingx_symbol("FET-USDT") == "FET-USDT" # idempotent assert from_bingx_symbol("FET-USDT") == "FETUSDT" def test_row_keys_exactly_match_ddl_columns(): """Parse the shipped DDL: emitted row keys must equal the column set.""" ddl = DDL_PATH.read_text() cols = set(re.findall(r"`(\w+)`", ddl)) rows = [] m = _mk(rows) m.on_venue_tick("FET-USDT", 0.1877, 0.1879) m.on_scan(_snapshot(["FETUSDT"], [0.2176])) assert len(rows) == 1 table, row = rows[0] assert table == "violet_feed_divergence" assert set(row.keys()) == cols, (set(row.keys()) ^ cols) def test_bps_sign_convention_venue_above_scan_positive(): rows = [] m = _mk(rows) m.on_venue_tick("BTC-USDT", 101.0, 101.0) m.on_scan(_snapshot(["BTCUSDT"], [100.0])) assert rows[0][1]["divergence_bps"] == pytest.approx(100.0) # +1% = +100bps rows.clear() m.on_venue_tick("BTC-USDT", 99.0, 99.0) m.on_scan(_snapshot(["BTCUSDT"], [100.0])) assert rows[0][1]["divergence_bps"] == pytest.approx(-100.0) def test_fet_incident_magnitude(): """The motivating case: scan 0.2176 vs venue ~0.1878 ⇒ ~ -1369 bps.""" rows = [] m = _mk(rows) m.on_venue_tick("FET-USDT", 0.1877, 0.1879) m.on_scan(_snapshot(["FETUSDT"], [0.2176])) bps = rows[0][1]["divergence_bps"] assert bps == pytest.approx((0.1878 - 0.2176) / 0.2176 * 1e4, rel=1e-6) assert bps < -1300 def test_stale_venue_mid_suppressed(): rows = [] m = _mk(rows, venue_budget_ns=1) # everything is stale m.on_venue_tick("BTC-USDT", 100.0, 100.0) import time time.sleep(0.001) m.on_scan(_snapshot(["BTCUSDT"], [100.0])) assert rows == [] # no phantom divergence def test_seq_propagation_and_no_mid_no_row(): rows = [] m = _mk(rows) m.on_venue_tick("BTC-USDT", 100.0, 100.0) m.on_venue_tick("BTC-USDT", 100.2, 100.2) # venue_seq advances to 2 m.on_scan(_snapshot(["BTCUSDT", "ETHUSDT"], [100.0, 2000.0])) assert len(rows) == 1 # ETH has no venue mid row = rows[0][1] assert row["venue_seq"] == 2 assert row["scan_seq"] == 1 m.on_scan(_snapshot(["BTCUSDT"], [100.1])) assert rows[-1][1]["scan_seq"] == 2 def test_garbage_inputs_ignored(): rows = [] m = _mk(rows) m.on_venue_tick("BTC-USDT", 0.0, -1.0) # invalid quotes ignored m.on_venue_tick("BTC-USDT", "x", None) # type garbage ignored m.on_scan(_snapshot(["BTCUSDT"], ["nan-ish", 0.0])) m.on_scan(SimpleNamespace(scan_payload=None)) m.on_scan(SimpleNamespace()) # no payload attr at all assert rows == [] if __name__ == "__main__": raise SystemExit(pytest.main([__file__, "-v"]))