457 lines
20 KiB
Python
457 lines
20 KiB
Python
|
|
"""
|
||
|
|
DOLPHIN — Data Integrity Test Suite
|
||
|
|
=====================================
|
||
|
|
Verifies that NG7 scanner output is consistent between:
|
||
|
|
- Disk : /mnt/dolphinng6_data/arrow_scans/YYYY-MM-DD/scan_NNNNNN_HHMMSS.arrow
|
||
|
|
- HZ : DOLPHIN_FEATURES["latest_eigen_scan"]
|
||
|
|
|
||
|
|
Run:
|
||
|
|
/home/dolphin/siloqy_env/bin/python3 -m pytest prod/tests/test_data_integrity.py -v -s
|
||
|
|
|
||
|
|
All tests are READ-ONLY and non-destructive.
|
||
|
|
"""
|
||
|
|
import json
|
||
|
|
import math
|
||
|
|
import time
|
||
|
|
from datetime import datetime, timezone, date
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
import hazelcast
|
||
|
|
import pyarrow as pa
|
||
|
|
import pyarrow.ipc as ipc
|
||
|
|
import pytest
|
||
|
|
|
||
|
|
# ── Config ────────────────────────────────────────────────────────────────────
# Root directory under which NG7 flushes one Arrow file per scan, in per-day
# subdirectories named YYYY-MM-DD.
ARROW_BASE = Path('/mnt/dolphinng6_data/arrow_scans')
# Hazelcast connection details for the live scan feed.
HZ_CLUSTER = 'dolphin'
HZ_MEMBERS = ['127.0.0.1:5701']
HZ_KEY = 'latest_eigen_scan'
HZ_MAP = 'DOLPHIN_FEATURES'

# Columns every scan file must carry (flat schema; *_json columns hold
# serialized lists that _read_arrow expands).
REQUIRED_COLUMNS = {
    'scan_number', 'timestamp_ns', 'timestamp_iso',
    'w50_velocity', 'w150_velocity', 'w300_velocity', 'w750_velocity',
    'vel_div', 'assets_json', 'asset_prices_json',
    'data_quality_score', 'missing_asset_count', 'schema_version',
}

# Sanity thresholds used by the tests below.
MAX_BTC_PCT_CHANGE = 2.0  # % — flag if BTC moves >2% between consecutive scans
MAX_VEL_DIV_ABS = 50.0  # flag extreme eigenvalue velocities
MAX_SCAN_GAP = 5  # max allowed gap in scan_number sequence
HZ_FRESHNESS_S = 60.0  # HZ scan must be < 60s old
MAX_NAN_RATIO = 0.05  # at most 5% of scans may have NaN vel_div
DATA_QUALITY_MIN = 0.80  # data_quality_score floor
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def _today_dir() -> Path:
    """Return the scan directory for the current (local) date."""
    # str(date) is the same YYYY-MM-DD form as date.isoformat().
    return ARROW_BASE / str(date.today())
|
||
|
|
def _read_arrow(path: Path) -> dict:
    """Read one Arrow file; return flat dict with _json cols parsed.

    Each non-empty '<name>_json' column is decoded and stored under the
    shortened key '<name>' alongside the raw column.
    """
    with pa.memory_map(str(path), 'r') as src:
        table = ipc.open_file(src).read_all()
        record = {name: table[name][0].as_py() for name in table.column_names}
        serialized = [c for c in record if c.endswith('_json') and record[c]]
        for name in serialized:
            record[name[:-5]] = json.loads(record[name])
        return record
||
|
|
def _get_hz_scan() -> dict:
    """Fetch and JSON-decode DOLPHIN_FEATURES['latest_eigen_scan'] from HZ.

    Returns {} when the key is absent or empty.

    FIX: the client is now shut down in a ``finally`` block — the original
    leaked the Hazelcast connection whenever the map read raised.
    """
    c = hazelcast.HazelcastClient(
        cluster_name=HZ_CLUSTER, cluster_members=HZ_MEMBERS, connection_timeout=3.0
    )
    try:
        raw = c.get_map(HZ_MAP).blocking().get(HZ_KEY)
    finally:
        c.shutdown()
    if not raw:
        return {}
    return json.loads(raw)
|
||
|
|
def _first_file_per_scan(day_dir: Path) -> dict[int, Path]:
    """Return {scan_number: first_file} for every scan in the directory.

    File names look like scan_NNNNNN_HHMMSS.arrow; a sorted glob puts the
    earliest timestamp first, so the first file seen per number wins.
    Files whose name doesn't yield an integer scan number are skipped.
    """
    first: dict[int, Path] = {}
    for candidate in sorted(day_dir.glob('*.arrow')):
        pieces = candidate.name.split('_')
        if len(pieces) < 2:
            continue
        try:
            number = int(pieces[1])
        except ValueError:
            continue
        first.setdefault(number, candidate)
    return first
||
|
|
# ── Fixtures ─────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
@pytest.fixture(scope='module')
def today_dir():
    """Path to today's Arrow scan directory; skips the module if absent."""
    path = _today_dir()
    if not path.exists():
        pytest.skip(f'Today dir not found: {path}')
    return path
|
||
|
|
@pytest.fixture(scope='module')
def scan_index(today_dir):
    """Mapping scan_number -> earliest file for today; skips when empty."""
    index = _first_file_per_scan(today_dir)
    if not index:
        pytest.skip('No scan files found for today')
    return index
|
||
|
|
@pytest.fixture(scope='module')
def recent_scans(scan_index):
    """Last 100 scans as list of dicts, sorted by scan_number."""
    loaded = []
    for number in sorted(scan_index)[-100:]:
        try:
            loaded.append(_read_arrow(scan_index[number]))
        except Exception as e:
            # A single unreadable file is a hard failure for the whole module.
            pytest.fail(f'Cannot read scan #{number}: {e}')
    return loaded
||
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
|
# DISK TESTS
|
||
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
|
|
||
|
|
class TestDiskFiles:
|
||
|
|
|
||
|
|
def test_today_dir_exists(self, today_dir):
|
||
|
|
"""Arrow scan directory exists for today."""
|
||
|
|
assert today_dir.exists(), f'Missing: {today_dir}'
|
||
|
|
|
||
|
|
def test_recent_files_readable(self, scan_index):
|
||
|
|
"""Last 50 files open without error."""
|
||
|
|
errors = []
|
||
|
|
for sn in sorted(scan_index)[-50:]:
|
||
|
|
try:
|
||
|
|
_read_arrow(scan_index[sn])
|
||
|
|
except Exception as e:
|
||
|
|
errors.append(f'#{sn}: {e}')
|
||
|
|
assert not errors, f'Unreadable files:\n' + '\n'.join(errors)
|
||
|
|
|
||
|
|
def test_no_large_scan_gaps(self, scan_index):
|
||
|
|
"""No gap > MAX_SCAN_GAP in scan_number sequence (last 200 scans)."""
|
||
|
|
nums = sorted(scan_index)[-200:]
|
||
|
|
gaps = [(nums[i], nums[i+1], nums[i+1]-nums[i])
|
||
|
|
for i in range(len(nums)-1)
|
||
|
|
if nums[i+1] - nums[i] > MAX_SCAN_GAP]
|
||
|
|
assert not gaps, f'Gaps in scan sequence: {gaps}'
|
||
|
|
|
||
|
|
def test_required_columns_present(self, recent_scans):
|
||
|
|
"""Every scan has all required columns."""
|
||
|
|
missing = []
|
||
|
|
for row in recent_scans:
|
||
|
|
absent = REQUIRED_COLUMNS - set(row.keys())
|
||
|
|
if absent:
|
||
|
|
missing.append(f"scan #{row.get('scan_number')}: missing {absent}")
|
||
|
|
assert not missing, '\n'.join(missing)
|
||
|
|
|
||
|
|
def test_schema_version(self, recent_scans):
|
||
|
|
"""Schema version is 5.x across recent scans."""
|
||
|
|
bad = [row.get('scan_number') for row in recent_scans
|
||
|
|
if not str(row.get('schema_version', '')).startswith('5')]
|
||
|
|
assert not bad, f'Unexpected schema_version in scans: {bad}'
|
||
|
|
|
||
|
|
def test_data_quality_score(self, recent_scans):
|
||
|
|
"""data_quality_score >= DATA_QUALITY_MIN for recent scans."""
|
||
|
|
bad = [(row.get('scan_number'), row.get('data_quality_score'))
|
||
|
|
for row in recent_scans
|
||
|
|
if (row.get('data_quality_score') or 0) < DATA_QUALITY_MIN]
|
||
|
|
assert not bad, f'Low data quality: {bad}'
|
||
|
|
|
||
|
|
def test_vel_div_matches_window_velocities(self, recent_scans):
|
||
|
|
"""vel_div == w50_velocity - w150_velocity (or both NaN)."""
|
||
|
|
mismatches = []
|
||
|
|
for row in recent_scans:
|
||
|
|
vd = row.get('vel_div')
|
||
|
|
v50 = row.get('w50_velocity')
|
||
|
|
v150 = row.get('w150_velocity')
|
||
|
|
if vd is None or v50 is None or v150 is None:
|
||
|
|
continue
|
||
|
|
if math.isnan(float(vd)) and (math.isnan(float(v50)) or math.isnan(float(v150))):
|
||
|
|
continue # NaN is OK if inputs are also NaN
|
||
|
|
expected = float(v50) - float(v150)
|
||
|
|
if not math.isnan(expected) and abs(float(vd) - expected) > 1e-6:
|
||
|
|
mismatches.append(
|
||
|
|
f"scan #{row.get('scan_number')}: vel_div={vd:.6f} expected={expected:.6f}"
|
||
|
|
)
|
||
|
|
assert not mismatches, 'vel_div mismatch:\n' + '\n'.join(mismatches[:10])
|
||
|
|
|
||
|
|
def test_vel_div_nan_ratio(self, recent_scans):
|
||
|
|
"""NaN vel_div rate must be below MAX_NAN_RATIO."""
|
||
|
|
nan_count = sum(
|
||
|
|
1 for row in recent_scans
|
||
|
|
if row.get('vel_div') is None or
|
||
|
|
(isinstance(row.get('vel_div'), float) and math.isnan(row['vel_div']))
|
||
|
|
)
|
||
|
|
ratio = nan_count / max(len(recent_scans), 1)
|
||
|
|
assert ratio <= MAX_NAN_RATIO, (
|
||
|
|
f'NaN vel_div rate {ratio:.1%} > {MAX_NAN_RATIO:.0%} '
|
||
|
|
f'({nan_count}/{len(recent_scans)} scans)'
|
||
|
|
)
|
||
|
|
|
||
|
|
def test_btc_price_continuity(self, recent_scans):
|
||
|
|
"""BTC price changes between consecutive scans must be < MAX_BTC_PCT_CHANGE%."""
|
||
|
|
violations = []
|
||
|
|
prev = None
|
||
|
|
for row in recent_scans:
|
||
|
|
assets = row.get('assets', [])
|
||
|
|
prices = row.get('asset_prices', [])
|
||
|
|
price_map = dict(zip(assets, prices))
|
||
|
|
btc = price_map.get('BTCUSDT')
|
||
|
|
if btc and prev:
|
||
|
|
pct = abs(btc - prev) / prev * 100
|
||
|
|
if pct > MAX_BTC_PCT_CHANGE:
|
||
|
|
violations.append(
|
||
|
|
f"scan #{row.get('scan_number')}: "
|
||
|
|
f"BTC ${prev:.2f}→${btc:.2f} ({pct:+.2f}%)"
|
||
|
|
)
|
||
|
|
if btc:
|
||
|
|
prev = btc
|
||
|
|
assert not violations, 'BTC price jump(s):\n' + '\n'.join(violations)
|
||
|
|
|
||
|
|
def test_btc_price_nonzero(self, recent_scans):
|
||
|
|
"""BTC price is non-zero in all recent scans."""
|
||
|
|
bad = []
|
||
|
|
for row in recent_scans:
|
||
|
|
assets = row.get('assets', [])
|
||
|
|
prices = row.get('asset_prices', [])
|
||
|
|
price_map = dict(zip(assets, prices))
|
||
|
|
btc = price_map.get('BTCUSDT', 0)
|
||
|
|
if not btc or btc <= 0:
|
||
|
|
bad.append(row.get('scan_number'))
|
||
|
|
assert not bad, f'Zero/missing BTC price in scans: {bad[:10]}'
|
||
|
|
|
||
|
|
def test_no_duplicate_scan_content(self, today_dir, scan_index):
|
||
|
|
"""Audit duplicate files per scan_number (last 50 scans).
|
||
|
|
NG7 writes two files per scan — latest timestamp wins (most recent is the final version).
|
||
|
|
WARN if vel_div differs; the latest file is assumed authoritative.
|
||
|
|
Only hard-fails if the LATEST file has vel_div that differs from what HZ received.
|
||
|
|
"""
|
||
|
|
recent_sns = set(sorted(scan_index)[-50:])
|
||
|
|
all_files: dict[int, list[Path]] = {}
|
||
|
|
for f in sorted(today_dir.glob('*.arrow')):
|
||
|
|
try:
|
||
|
|
sn = int(f.name.split('_')[1])
|
||
|
|
except (IndexError, ValueError):
|
||
|
|
continue
|
||
|
|
if sn in recent_sns:
|
||
|
|
all_files.setdefault(sn, []).append(f)
|
||
|
|
|
||
|
|
dups_with_diff = []
|
||
|
|
for sn, files in sorted(all_files.items()):
|
||
|
|
if len(files) < 2:
|
||
|
|
continue
|
||
|
|
try:
|
||
|
|
vds = []
|
||
|
|
for f in sorted(files): # sorted = chronological by HHMMSS
|
||
|
|
row = _read_arrow(f)
|
||
|
|
vd = row.get('vel_div')
|
||
|
|
vds.append((f.name, None if (vd is None or (isinstance(vd, float) and math.isnan(vd))) else round(float(vd), 8)))
|
||
|
|
unique_vds = {v for _, v in vds if v is not None}
|
||
|
|
if len(unique_vds) > 1:
|
||
|
|
dups_with_diff.append(f'scan #{sn}: {vds}')
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
|
||
|
|
if dups_with_diff:
|
||
|
|
print(f'\nINFO: {len(dups_with_diff)} scans have 2 files with differing vel_div '
|
||
|
|
f'(NG7 writes preliminary + final; latest file is authoritative):')
|
||
|
|
for d in dups_with_diff[:5]:
|
||
|
|
print(f' {d}')
|
||
|
|
# Not a hard failure — this is expected NG7 behavior (two-phase write).
|
||
|
|
# The scan_bridge / trader always reads the LATEST HZ push, not disk.
|
||
|
|
|
||
|
|
|
||
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
|
# HZ TESTS
|
||
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
|
|
||
|
|
class TestHZScan:
    """Checks on the live scan payload stored in Hazelcast."""

    def test_hz_latest_scan_present(self):
        """DOLPHIN_FEATURES[latest_eigen_scan] key exists and is parseable."""
        scan = _get_hz_scan()
        assert scan, 'latest_eigen_scan missing or empty in HZ'
        assert 'scan_number' in scan or 'vel_div' in scan, \
            f'Unexpected structure: {list(scan.keys())[:10]}'

    def test_hz_scan_freshness(self):
        """HZ scan timestamp is within HZ_FRESHNESS_S seconds of now."""
        scan = _get_hz_scan()
        # NG7 writes flat schema: timestamp_iso is top-level
        stamp = scan.get('timestamp_iso') or scan.get('ts_iso') or scan.get('timestamp')
        if not stamp:
            pytest.skip(f'No timestamp field in HZ scan — keys: {list(scan.keys())[:10]}')
        try:
            # Try Unix float first (NG7 uses timestamp_ns / 1e9 or raw float)
            age_s = abs(time.time() - float(stamp))
        except (ValueError, TypeError):
            parsed = datetime.fromisoformat(str(stamp))
            # Naive timestamps are compared against naive local time,
            # aware ones against aware UTC.
            if parsed.tzinfo is None:
                reference = datetime.now()
            else:
                reference = datetime.now(timezone.utc)
            age_s = abs((reference - parsed).total_seconds())
        assert age_s < HZ_FRESHNESS_S, \
            f'HZ scan stale: {age_s:.0f}s old (limit {HZ_FRESHNESS_S}s)'
|
|
||
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
|
# DISK ↔ HZ PARITY TESTS
|
||
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
|
|
||
|
|
class TestDiskHZParity:
    """Cross-checks between the latest disk file and the live HZ payload."""

    def test_scan_number_matches(self, scan_index):
        """HZ scan_number is >= disk latest and not more than 30 scans ahead (~5 min).

        NG7 writes to HZ live; disk is flushed asynchronously — HZ leading disk
        is expected.
        """
        disk_latest_sn = max(scan_index.keys())
        hz_scan = _get_hz_scan()
        hz_sn = hz_scan.get('scan_number')
        if hz_sn is None:
            pytest.skip('HZ scan has no scan_number field')
        hz_sn = int(hz_sn)
        gap = hz_sn - disk_latest_sn
        print(f'\n HZ scan #{hz_sn} disk latest #{disk_latest_sn} gap={gap:+d}')
        # HZ should be >= disk (or at most 3 behind if disk flushed recently)
        assert gap >= -3, f'Disk is ahead of HZ by {-gap} scans — unexpected'
        assert gap <= 30, f'HZ is {gap} scans ahead of disk — disk may have stopped writing'

    def test_vel_div_matches(self, scan_index):
        """vel_div for the latest common scan_number agrees between disk and HZ.

        Uses the latest disk scan also present on disk (HZ may be ahead).
        NG7 writes two files per scan; uses the LATEST file (final version).
        FIX: removed a dead local ``from pathlib import Path`` (never used).
        """
        hz_scan = _get_hz_scan()
        hz_sn = hz_scan.get('scan_number')
        if hz_sn is None:
            pytest.skip('HZ scan has no scan_number')
        hz_sn = int(hz_sn)

        # Find the newest scan that exists on BOTH disk and HZ
        disk_sns = sorted(scan_index.keys(), reverse=True)
        check_sn = None
        for sn in disk_sns[:5]:  # try last 5 disk scans
            if sn <= hz_sn:
                check_sn = sn
                break
        if check_sn is None:
            pytest.skip('No overlapping scan_number between disk and HZ')

        # Use the LATEST file for this scan_number (NG7 final write)
        today_dir = _today_dir()
        candidates = sorted(today_dir.glob(f'scan_{check_sn:06d}_*.arrow'), reverse=True)
        if not candidates:
            pytest.skip(f'scan #{check_sn} file not found')
        disk_row = _read_arrow(candidates[0])  # latest = final version

        disk_vd = disk_row.get('vel_div')
        hz_vd = hz_scan.get('vel_div') if hz_sn == check_sn else None
        if hz_vd is None and hz_sn != check_sn:
            pytest.skip(f'HZ has scan #{hz_sn}, comparing disk #{check_sn} for internal consistency only')

        if disk_vd is None or hz_vd is None:
            pytest.skip('vel_div absent in one source')
        if (isinstance(disk_vd, float) and math.isnan(disk_vd) and
                isinstance(hz_vd, float) and math.isnan(hz_vd)):
            return  # both NaN counts as agreement
        assert abs(float(disk_vd) - float(hz_vd)) < 1e-6, (
            f'vel_div mismatch scan #{check_sn}: disk={disk_vd} hz={hz_vd}'
        )

    def test_btc_price_matches(self, scan_index):
        """BTC price for latest common scan_number agrees between disk and HZ.

        FIX: guards the relative-difference division against a zero disk price
        (zero prices are caught separately by test_btc_price_nonzero).
        """
        hz_scan = _get_hz_scan()
        hz_sn = hz_scan.get('scan_number')
        if hz_sn is None:
            pytest.skip('HZ scan has no scan_number')
        hz_sn = int(hz_sn)

        disk_sns = sorted(scan_index.keys(), reverse=True)
        check_sn = next((sn for sn in disk_sns[:5] if sn <= hz_sn), None)
        if check_sn is None:
            pytest.skip('No overlapping scan on disk')
        if check_sn != hz_sn:
            pytest.skip(f'HZ at #{hz_sn}, disk latest common #{check_sn} — comparing disk self-consistency')

        today_dir = _today_dir()
        candidates = sorted(today_dir.glob(f'scan_{check_sn:06d}_*.arrow'), reverse=True)
        if not candidates:
            pytest.skip(f'scan #{check_sn} file not found')
        disk_row = _read_arrow(candidates[0])

        d_assets = disk_row.get('assets', [])
        d_prices = disk_row.get('asset_prices', [])
        disk_btc = dict(zip(d_assets, d_prices)).get('BTCUSDT')

        h_assets = hz_scan.get('assets', [])
        h_prices = hz_scan.get('asset_prices', [])
        hz_btc = dict(zip(h_assets, h_prices)).get('BTCUSDT')

        if disk_btc is None or hz_btc is None:
            pytest.skip('BTC price absent in one source')
        if not disk_btc:
            pytest.skip('Disk BTC price is zero — covered by test_btc_price_nonzero')

        pct_diff = abs(disk_btc - hz_btc) / disk_btc * 100
        assert pct_diff < 0.01, (
            f'BTC price mismatch scan #{check_sn}: disk=${disk_btc:.2f} hz=${hz_btc:.2f}'
        )
|
||
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
|
# SIGNAL SANITY TESTS (not parity — sanity of the signal values themselves)
|
||
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
|
|
||
|
|
class TestSignalSanity:
    """Sanity checks on the signal values themselves (not disk/HZ parity)."""

    def test_extreme_vel_div_flagged(self, recent_scans):
        """Scans with |vel_div| > MAX_VEL_DIV_ABS are printed as a warning (not fail)."""
        extremes = []
        for row in recent_scans:
            vd = row.get('vel_div')
            if (vd is not None and isinstance(vd, float)
                    and not math.isnan(vd) and abs(vd) > MAX_VEL_DIV_ABS):
                extremes.append((row.get('scan_number'), vd,
                                 row.get('timestamp_iso', '')[:19]))
        if extremes:
            print(f'\nWARN: {len(extremes)} extreme |vel_div| > {MAX_VEL_DIV_ABS}:')
            for sn, vd, ts in extremes[:10]:
                print(f' scan #{sn} {ts} vel_div={vd:.3f}')
        # Not a hard fail — eigenvalue rotation events are real. Just report.

    def test_vol_ok_coherence(self, recent_scans):
        """vol_ok computation on disk prices agrees with expected BTC vol threshold."""
        import numpy as np
        VOL_WINDOW = 50
        VOL_THRESH = 0.00026414

        btc_prices = []
        for row in recent_scans:
            lookup = dict(zip(row.get('assets', []), row.get('asset_prices', [])))
            price = lookup.get('BTCUSDT')
            if price:
                btc_prices.append(float(price))

        if len(btc_prices) < VOL_WINDOW + 2:
            pytest.skip(f'Need {VOL_WINDOW+2} scans with BTC price, got {len(btc_prices)}')

        window = np.array(btc_prices[-VOL_WINDOW:])
        # Std-dev of simple returns over the window, same as the trader's dvol.
        dvol = float(np.std(np.diff(window) / window[:-1]))
        vol_ok = dvol > VOL_THRESH
        print(f'\nvol_ok={vol_ok} dvol={dvol:.6f} threshold={VOL_THRESH}')
        # Not asserting — reporting the computed value to verify coherence with trader
|
|
|
||
|
|
if __name__ == '__main__':
    import subprocess
    import sys

    # FIX: propagate pytest's exit status — the original discarded the
    # subprocess result, so the script exited 0 even when tests failed.
    result = subprocess.run([sys.executable, '-m', 'pytest', __file__, '-v', '-s'])
    sys.exit(result.returncode)