""" DOLPHIN — Data Integrity Test Suite ===================================== Verifies that NG7 scanner output is consistent between: - Disk : /mnt/dolphinng6_data/arrow_scans/YYYY-MM-DD/scan_NNNNNN_HHMMSS.arrow - HZ : DOLPHIN_FEATURES["latest_eigen_scan"] Run: /home/dolphin/siloqy_env/bin/python3 -m pytest prod/tests/test_data_integrity.py -v -s All tests are READ-ONLY and non-destructive. """ import json import math import time from datetime import datetime, timezone, date from pathlib import Path import hazelcast import pyarrow as pa import pyarrow.ipc as ipc import pytest # ── Config ──────────────────────────────────────────────────────────────────── ARROW_BASE = Path('/mnt/dolphinng6_data/arrow_scans') HZ_CLUSTER = 'dolphin' HZ_MEMBERS = ['127.0.0.1:5701'] HZ_KEY = 'latest_eigen_scan' HZ_MAP = 'DOLPHIN_FEATURES' REQUIRED_COLUMNS = { 'scan_number', 'timestamp_ns', 'timestamp_iso', 'w50_velocity', 'w150_velocity', 'w300_velocity', 'w750_velocity', 'vel_div', 'assets_json', 'asset_prices_json', 'data_quality_score', 'missing_asset_count', 'schema_version', } MAX_BTC_PCT_CHANGE = 2.0 # % — flag if BTC moves >2% between consecutive scans MAX_VEL_DIV_ABS = 50.0 # flag extreme eigenvalue velocities MAX_SCAN_GAP = 5 # max allowed gap in scan_number sequence HZ_FRESHNESS_S = 60.0 # HZ scan must be < 60s old MAX_NAN_RATIO = 0.05 # at most 5% of scans may have NaN vel_div DATA_QUALITY_MIN = 0.80 # data_quality_score floor # ── Helpers ─────────────────────────────────────────────────────────────────── def _today_dir() -> Path: return ARROW_BASE / date.today().isoformat() def _read_arrow(path: Path) -> dict: """Read one Arrow file; return flat dict with _json cols parsed.""" with pa.memory_map(str(path), 'r') as src: tbl = ipc.open_file(src).read_all() row = {c: tbl[c][0].as_py() for c in tbl.column_names} for col in list(row): if col.endswith('_json') and row[col]: row[col[:-5]] = json.loads(row[col]) return row def _get_hz_scan() -> dict: c = hazelcast.HazelcastClient( 
cluster_name=HZ_CLUSTER, cluster_members=HZ_MEMBERS, connection_timeout=3.0 ) raw = c.get_map(HZ_MAP).blocking().get(HZ_KEY) c.shutdown() if not raw: return {} return json.loads(raw) def _first_file_per_scan(day_dir: Path) -> dict[int, Path]: """Return {scan_number: first_file} for every scan in the directory.""" seen: dict[int, Path] = {} for f in sorted(day_dir.glob('*.arrow')): try: sn = int(f.name.split('_')[1]) except (IndexError, ValueError): continue if sn not in seen: seen[sn] = f return seen # ── Fixtures ───────────────────────────────────────────────────────────────── @pytest.fixture(scope='module') def today_dir(): d = _today_dir() if not d.exists(): pytest.skip(f'Today dir not found: {d}') return d @pytest.fixture(scope='module') def scan_index(today_dir): idx = _first_file_per_scan(today_dir) if not idx: pytest.skip('No scan files found for today') return idx @pytest.fixture(scope='module') def recent_scans(scan_index): """Last 100 scans as list of dicts, sorted by scan_number.""" recent_keys = sorted(scan_index)[-100:] rows = [] for sn in recent_keys: try: rows.append(_read_arrow(scan_index[sn])) except Exception as e: pytest.fail(f'Cannot read scan #{sn}: {e}') return rows # ══════════════════════════════════════════════════════════════════════════════ # DISK TESTS # ══════════════════════════════════════════════════════════════════════════════ class TestDiskFiles: def test_today_dir_exists(self, today_dir): """Arrow scan directory exists for today.""" assert today_dir.exists(), f'Missing: {today_dir}' def test_recent_files_readable(self, scan_index): """Last 50 files open without error.""" errors = [] for sn in sorted(scan_index)[-50:]: try: _read_arrow(scan_index[sn]) except Exception as e: errors.append(f'#{sn}: {e}') assert not errors, f'Unreadable files:\n' + '\n'.join(errors) def test_no_large_scan_gaps(self, scan_index): """No gap > MAX_SCAN_GAP in scan_number sequence (last 200 scans).""" nums = sorted(scan_index)[-200:] gaps = [(nums[i], 
nums[i+1], nums[i+1]-nums[i]) for i in range(len(nums)-1) if nums[i+1] - nums[i] > MAX_SCAN_GAP] assert not gaps, f'Gaps in scan sequence: {gaps}' def test_required_columns_present(self, recent_scans): """Every scan has all required columns.""" missing = [] for row in recent_scans: absent = REQUIRED_COLUMNS - set(row.keys()) if absent: missing.append(f"scan #{row.get('scan_number')}: missing {absent}") assert not missing, '\n'.join(missing) def test_schema_version(self, recent_scans): """Schema version is 5.x across recent scans.""" bad = [row.get('scan_number') for row in recent_scans if not str(row.get('schema_version', '')).startswith('5')] assert not bad, f'Unexpected schema_version in scans: {bad}' def test_data_quality_score(self, recent_scans): """data_quality_score >= DATA_QUALITY_MIN for recent scans.""" bad = [(row.get('scan_number'), row.get('data_quality_score')) for row in recent_scans if (row.get('data_quality_score') or 0) < DATA_QUALITY_MIN] assert not bad, f'Low data quality: {bad}' def test_vel_div_matches_window_velocities(self, recent_scans): """vel_div == w50_velocity - w150_velocity (or both NaN).""" mismatches = [] for row in recent_scans: vd = row.get('vel_div') v50 = row.get('w50_velocity') v150 = row.get('w150_velocity') if vd is None or v50 is None or v150 is None: continue if math.isnan(float(vd)) and (math.isnan(float(v50)) or math.isnan(float(v150))): continue # NaN is OK if inputs are also NaN expected = float(v50) - float(v150) if not math.isnan(expected) and abs(float(vd) - expected) > 1e-6: mismatches.append( f"scan #{row.get('scan_number')}: vel_div={vd:.6f} expected={expected:.6f}" ) assert not mismatches, 'vel_div mismatch:\n' + '\n'.join(mismatches[:10]) def test_vel_div_nan_ratio(self, recent_scans): """NaN vel_div rate must be below MAX_NAN_RATIO.""" nan_count = sum( 1 for row in recent_scans if row.get('vel_div') is None or (isinstance(row.get('vel_div'), float) and math.isnan(row['vel_div'])) ) ratio = nan_count / 
max(len(recent_scans), 1) assert ratio <= MAX_NAN_RATIO, ( f'NaN vel_div rate {ratio:.1%} > {MAX_NAN_RATIO:.0%} ' f'({nan_count}/{len(recent_scans)} scans)' ) def test_btc_price_continuity(self, recent_scans): """BTC price changes between consecutive scans must be < MAX_BTC_PCT_CHANGE%.""" violations = [] prev = None for row in recent_scans: assets = row.get('assets', []) prices = row.get('asset_prices', []) price_map = dict(zip(assets, prices)) btc = price_map.get('BTCUSDT') if btc and prev: pct = abs(btc - prev) / prev * 100 if pct > MAX_BTC_PCT_CHANGE: violations.append( f"scan #{row.get('scan_number')}: " f"BTC ${prev:.2f}→${btc:.2f} ({pct:+.2f}%)" ) if btc: prev = btc assert not violations, 'BTC price jump(s):\n' + '\n'.join(violations) def test_btc_price_nonzero(self, recent_scans): """BTC price is non-zero in all recent scans.""" bad = [] for row in recent_scans: assets = row.get('assets', []) prices = row.get('asset_prices', []) price_map = dict(zip(assets, prices)) btc = price_map.get('BTCUSDT', 0) if not btc or btc <= 0: bad.append(row.get('scan_number')) assert not bad, f'Zero/missing BTC price in scans: {bad[:10]}' def test_no_duplicate_scan_content(self, today_dir, scan_index): """Audit duplicate files per scan_number (last 50 scans). NG7 writes two files per scan — latest timestamp wins (most recent is the final version). WARN if vel_div differs; the latest file is assumed authoritative. Only hard-fails if the LATEST file has vel_div that differs from what HZ received. 
""" recent_sns = set(sorted(scan_index)[-50:]) all_files: dict[int, list[Path]] = {} for f in sorted(today_dir.glob('*.arrow')): try: sn = int(f.name.split('_')[1]) except (IndexError, ValueError): continue if sn in recent_sns: all_files.setdefault(sn, []).append(f) dups_with_diff = [] for sn, files in sorted(all_files.items()): if len(files) < 2: continue try: vds = [] for f in sorted(files): # sorted = chronological by HHMMSS row = _read_arrow(f) vd = row.get('vel_div') vds.append((f.name, None if (vd is None or (isinstance(vd, float) and math.isnan(vd))) else round(float(vd), 8))) unique_vds = {v for _, v in vds if v is not None} if len(unique_vds) > 1: dups_with_diff.append(f'scan #{sn}: {vds}') except Exception: pass if dups_with_diff: print(f'\nINFO: {len(dups_with_diff)} scans have 2 files with differing vel_div ' f'(NG7 writes preliminary + final; latest file is authoritative):') for d in dups_with_diff[:5]: print(f' {d}') # Not a hard failure — this is expected NG7 behavior (two-phase write). # The scan_bridge / trader always reads the LATEST HZ push, not disk. 
# ══════════════════════════════════════════════════════════════════════════════
# HZ TESTS
# ══════════════════════════════════════════════════════════════════════════════
class TestHZScan:

    def test_hz_latest_scan_present(self):
        """DOLPHIN_FEATURES[latest_eigen_scan] key exists and is parseable."""
        scan = _get_hz_scan()
        assert scan, 'latest_eigen_scan missing or empty in HZ'
        assert 'scan_number' in scan or 'vel_div' in scan, \
            f'Unexpected structure: {list(scan.keys())[:10]}'

    def test_hz_scan_freshness(self):
        """HZ scan timestamp is within HZ_FRESHNESS_S seconds of now."""
        scan = _get_hz_scan()
        # NG7 writes flat schema: timestamp_iso is top-level
        ts_raw = scan.get('timestamp_iso') or scan.get('ts_iso') or scan.get('timestamp')
        if not ts_raw:
            pytest.skip(f'No timestamp field in HZ scan — keys: {list(scan.keys())[:10]}')
        try:
            # Try Unix float first (NG7 uses timestamp_ns / 1e9 or raw float)
            age_s = abs(time.time() - float(ts_raw))
        except (ValueError, TypeError):
            # fromisoformat only accepts a trailing 'Z' from Python 3.11 on;
            # normalize it so older interpreters parse UTC stamps too.
            dt = datetime.fromisoformat(str(ts_raw).replace('Z', '+00:00'))
            if dt.tzinfo is None:
                age_s = abs((datetime.now() - dt).total_seconds())
            else:
                age_s = abs((datetime.now(timezone.utc) - dt).total_seconds())
        assert age_s < HZ_FRESHNESS_S, \
            f'HZ scan stale: {age_s:.0f}s old (limit {HZ_FRESHNESS_S}s)'


# ══════════════════════════════════════════════════════════════════════════════
# DISK ↔ HZ PARITY TESTS
# ══════════════════════════════════════════════════════════════════════════════
class TestDiskHZParity:

    def test_scan_number_matches(self, scan_index):
        """HZ scan_number is >= disk latest and not more than 30 scans ahead (~5 min).

        NG7 writes to HZ live; disk is flushed asynchronously — HZ leading
        disk is expected.
        """
        disk_latest_sn = max(scan_index.keys())
        hz_scan = _get_hz_scan()
        hz_sn = hz_scan.get('scan_number')
        if hz_sn is None:
            pytest.skip('HZ scan has no scan_number field')
        hz_sn = int(hz_sn)
        gap = hz_sn - disk_latest_sn
        print(f'\n HZ scan #{hz_sn} disk latest #{disk_latest_sn} gap={gap:+d}')
        # HZ should be >= disk (or at most 3 behind if disk flushed recently)
        assert gap >= -3, f'Disk is ahead of HZ by {-gap} scans — unexpected'
        assert gap <= 30, f'HZ is {gap} scans ahead of disk — disk may have stopped writing'

    def test_vel_div_matches(self, scan_index):
        """vel_div for the latest common scan_number agrees between disk and HZ.

        Uses the latest disk scan also present on disk (HZ may be ahead).
        NG7 writes two files per scan; uses the LATEST file (final version).
        """
        hz_scan = _get_hz_scan()
        hz_sn = hz_scan.get('scan_number')
        if hz_sn is None:
            pytest.skip('HZ scan has no scan_number')
        hz_sn = int(hz_sn)

        # Find the newest scan that exists on BOTH disk and HZ (last 5 tried),
        # same search as test_btc_price_matches for consistency.
        disk_sns = sorted(scan_index.keys(), reverse=True)
        check_sn = next((sn for sn in disk_sns[:5] if sn <= hz_sn), None)
        if check_sn is None:
            pytest.skip('No overlapping scan_number between disk and HZ')

        # Use the LATEST file for this scan_number (NG7 final write).
        # NOTE: dead local `from pathlib import Path` removed — Path is a
        # module-level import and was unused here anyway.
        today_dir = _today_dir()
        candidates = sorted(today_dir.glob(f'scan_{check_sn:06d}_*.arrow'), reverse=True)
        if not candidates:
            pytest.skip(f'scan #{check_sn} file not found')
        disk_row = _read_arrow(candidates[0])  # latest = final version

        disk_vd = disk_row.get('vel_div')
        hz_vd = hz_scan.get('vel_div') if hz_sn == check_sn else None
        if hz_vd is None and hz_sn != check_sn:
            pytest.skip(f'HZ has scan #{hz_sn}, comparing disk #{check_sn} for internal consistency only')
        if disk_vd is None or hz_vd is None:
            pytest.skip('vel_div absent in one source')
        if (isinstance(disk_vd, float) and math.isnan(disk_vd)
                and isinstance(hz_vd, float) and math.isnan(hz_vd)):
            return  # both NaN — consistent
        assert abs(float(disk_vd) - float(hz_vd)) < 1e-6, (
            f'vel_div mismatch scan #{check_sn}: disk={disk_vd} hz={hz_vd}'
        )

    def test_btc_price_matches(self, scan_index):
        """BTC price for latest common scan_number agrees between disk and HZ."""
        hz_scan = _get_hz_scan()
        hz_sn = hz_scan.get('scan_number')
        if hz_sn is None:
            pytest.skip('HZ scan has no scan_number')
        hz_sn = int(hz_sn)

        disk_sns = sorted(scan_index.keys(), reverse=True)
        check_sn = next((sn for sn in disk_sns[:5] if sn <= hz_sn), None)
        if check_sn is None:
            pytest.skip('No overlapping scan on disk')
        if check_sn != hz_sn:
            pytest.skip(f'HZ at #{hz_sn}, disk latest common #{check_sn} — comparing disk self-consistency')

        today_dir = _today_dir()
        candidates = sorted(today_dir.glob(f'scan_{check_sn:06d}_*.arrow'), reverse=True)
        if not candidates:
            pytest.skip(f'scan #{check_sn} file not found')
        disk_row = _read_arrow(candidates[0])

        d_assets = disk_row.get('assets', [])
        d_prices = disk_row.get('asset_prices', [])
        disk_btc = dict(zip(d_assets, d_prices)).get('BTCUSDT')

        h_assets = hz_scan.get('assets', [])
        h_prices = hz_scan.get('asset_prices', [])
        hz_btc = dict(zip(h_assets, h_prices)).get('BTCUSDT')

        if disk_btc is None or hz_btc is None:
            pytest.skip('BTC price absent in one source')
        if not disk_btc:
            # BUGFIX: previously a zero disk price raised ZeroDivisionError
            # (an ERROR, not a diagnosable failure). Zero price is asserted
            # by test_btc_price_nonzero; here we just skip the comparison.
            pytest.skip('Disk BTC price is zero — cannot compute % difference')
        pct_diff = abs(disk_btc - hz_btc) / disk_btc * 100
        assert pct_diff < 0.01, (
            f'BTC price mismatch scan #{check_sn}: disk=${disk_btc:.2f} hz=${hz_btc:.2f}'
        )


# ══════════════════════════════════════════════════════════════════════════════
# SIGNAL SANITY TESTS (not parity — sanity of the signal values themselves)
# ══════════════════════════════════════════════════════════════════════════════
class TestSignalSanity:

    def test_extreme_vel_div_flagged(self, recent_scans):
        """Scans with |vel_div| > MAX_VEL_DIV_ABS are printed as a warning (not fail)."""
        extremes = [
            (row.get('scan_number'), row.get('vel_div'), row.get('timestamp_iso', '')[:19])
            for row in recent_scans
            if row.get('vel_div') is not None
            and isinstance(row['vel_div'], float)
            and not math.isnan(row['vel_div'])
            and abs(row['vel_div']) > MAX_VEL_DIV_ABS
        ]
        if extremes:
            print(f'\nWARN: {len(extremes)} extreme |vel_div| > {MAX_VEL_DIV_ABS}:')
            for sn, vd, ts in extremes[:10]:
                print(f' scan #{sn} {ts} vel_div={vd:.3f}')
        # Not a hard fail — eigenvalue rotation events are real. Just report.

    def test_vol_ok_coherence(self, recent_scans):
        """vol_ok computation on disk prices agrees with expected BTC vol threshold."""
        import numpy as np
        VOL_WINDOW = 50
        VOL_THRESH = 0.00026414
        btc_prices = []
        for row in recent_scans:
            assets = row.get('assets', [])
            prices = row.get('asset_prices', [])
            btc = dict(zip(assets, prices)).get('BTCUSDT')
            if btc:
                btc_prices.append(float(btc))
        if len(btc_prices) < VOL_WINDOW + 2:
            pytest.skip(f'Need {VOL_WINDOW+2} scans with BTC price, got {len(btc_prices)}')
        arr = np.array(btc_prices[-VOL_WINDOW:])
        # Volatility = stdev of simple per-scan returns over the last window.
        dvol = float(np.std(np.diff(arr) / arr[:-1]))
        vol_ok = dvol > VOL_THRESH
        print(f'\nvol_ok={vol_ok} dvol={dvol:.6f} threshold={VOL_THRESH}')
        # Not asserting — reporting the computed value to verify coherence with trader


if __name__ == '__main__':
    import subprocess
    import sys
    # BUGFIX: propagate pytest's exit status (was discarded, so any CI
    # wrapper invoking this file directly always saw success).
    raise SystemExit(
        subprocess.run([sys.executable, '-m', 'pytest', __file__, '-v', '-s']).returncode
    )