Files
siloqy/prod/clean_arch/violet/test_violet_divergence.py

120 lines
3.7 KiB
Python
Raw Normal View History

"""V1: FeedDivergenceMonitor — row shape vs DDL, sign, staleness, seqs."""
from __future__ import annotations
import re
import sys
from pathlib import Path
from types import SimpleNamespace
sys.path.insert(0, "/mnt/dolphinng5_predict")
import pytest
from prod.clean_arch.violet.clock import PlaneClock, mono_ns
from prod.clean_arch.violet.divergence import (
FeedDivergenceMonitor,
from_bingx_symbol,
to_bingx_symbol,
)
DDL_PATH = Path(
"/mnt/dolphinng5_predict/prod/clickhouse/violet/20_violet_feed_divergence.sql"
)
def _mk(sink_rows, venue_budget_ns=2_000_000_000):
return FeedDivergenceMonitor(
sink=lambda table, row: sink_rows.append((table, row)),
scan_clock=PlaneClock("scan", 12_000_000_000),
venue_clock=PlaneClock("venue", venue_budget_ns),
session_id="sess-test",
)
def _snapshot(assets, prices):
return SimpleNamespace(scan_payload={"assets": assets, "asset_prices": prices})
def test_symbol_mapping_round_trip():
assert to_bingx_symbol("FETUSDT") == "FET-USDT"
assert to_bingx_symbol("FET-USDT") == "FET-USDT" # idempotent
assert from_bingx_symbol("FET-USDT") == "FETUSDT"
def test_row_keys_exactly_match_ddl_columns():
"""Parse the shipped DDL: emitted row keys must equal the column set."""
ddl = DDL_PATH.read_text()
cols = set(re.findall(r"`(\w+)`", ddl))
rows = []
m = _mk(rows)
m.on_venue_tick("FET-USDT", 0.1877, 0.1879)
m.on_scan(_snapshot(["FETUSDT"], [0.2176]))
assert len(rows) == 1
table, row = rows[0]
assert table == "violet_feed_divergence"
assert set(row.keys()) == cols, (set(row.keys()) ^ cols)
def test_bps_sign_convention_venue_above_scan_positive():
rows = []
m = _mk(rows)
m.on_venue_tick("BTC-USDT", 101.0, 101.0)
m.on_scan(_snapshot(["BTCUSDT"], [100.0]))
assert rows[0][1]["divergence_bps"] == pytest.approx(100.0) # +1% = +100bps
rows.clear()
m.on_venue_tick("BTC-USDT", 99.0, 99.0)
m.on_scan(_snapshot(["BTCUSDT"], [100.0]))
assert rows[0][1]["divergence_bps"] == pytest.approx(-100.0)
def test_fet_incident_magnitude():
"""The motivating case: scan 0.2176 vs venue ~0.1878 ⇒ ~ -1369 bps."""
rows = []
m = _mk(rows)
m.on_venue_tick("FET-USDT", 0.1877, 0.1879)
m.on_scan(_snapshot(["FETUSDT"], [0.2176]))
bps = rows[0][1]["divergence_bps"]
assert bps == pytest.approx((0.1878 - 0.2176) / 0.2176 * 1e4, rel=1e-6)
assert bps < -1300
def test_stale_venue_mid_suppressed():
rows = []
m = _mk(rows, venue_budget_ns=1) # everything is stale
m.on_venue_tick("BTC-USDT", 100.0, 100.0)
import time
time.sleep(0.001)
m.on_scan(_snapshot(["BTCUSDT"], [100.0]))
assert rows == [] # no phantom divergence
def test_seq_propagation_and_no_mid_no_row():
rows = []
m = _mk(rows)
m.on_venue_tick("BTC-USDT", 100.0, 100.0)
m.on_venue_tick("BTC-USDT", 100.2, 100.2) # venue_seq advances to 2
m.on_scan(_snapshot(["BTCUSDT", "ETHUSDT"], [100.0, 2000.0]))
assert len(rows) == 1 # ETH has no venue mid
row = rows[0][1]
assert row["venue_seq"] == 2
assert row["scan_seq"] == 1
m.on_scan(_snapshot(["BTCUSDT"], [100.1]))
assert rows[-1][1]["scan_seq"] == 2
def test_garbage_inputs_ignored():
rows = []
m = _mk(rows)
m.on_venue_tick("BTC-USDT", 0.0, -1.0) # invalid quotes ignored
m.on_venue_tick("BTC-USDT", "x", None) # type garbage ignored
m.on_scan(_snapshot(["BTCUSDT"], ["nan-ish", 0.0]))
m.on_scan(SimpleNamespace(scan_payload=None))
m.on_scan(SimpleNamespace()) # no payload attr at all
assert rows == []
if __name__ == "__main__":
raise SystemExit(pytest.main([__file__, "-v"]))