Includes core prod + GREEN/BLUE subsystems:
- prod/ (BLUE harness, configs, scripts, docs)
- nautilus_dolphin/ (GREEN Nautilus-native impl + dvae/ preserved)
- adaptive_exit/ (AEM engine + models/bucket_assignments.pkl)
- Observability/ (EsoF advisor, TUI, dashboards)
- external_factors/ (EsoF producer)
- mc_forewarning_qlabs_fork/ (MC regime/envelope)

Excludes runtime caches, logs, backups, and reproducible artifacts per .gitignore.
170 lines
6.0 KiB
Python
Executable File
170 lines
6.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Continuous Arrow to Parquet converter.
|
|
Run this and let it work - it processes batches with progress.
|
|
"""
|
|
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
# Platform-aware paths (dolphin_paths resolves Win vs Linux)
|
|
sys.path.insert(0, str(Path(__file__).parent.parent / 'nautilus_dolphin'))
|
|
from dolphin_paths import get_arb512_storage_root, get_klines_dir, get_project_root
|
|
|
|
ARROW_BASE = get_arb512_storage_root() / 'arrow_scans'
|
|
OUTPUT_DIR = get_klines_dir()
|
|
LOG_FILE = get_project_root() / 'prod' / 'convert_log.txt'
|
|
|
|
def log(msg):
    """Print *msg* with an HH:MM:SS timestamp and append the same line to LOG_FILE."""
    ts = datetime.now().strftime('%H:%M:%S')
    line = f'[{ts}] {msg}'
    print(line, flush=True)
    # Re-open in append mode on every call so a restarted run never clobbers
    # earlier output; explicit encoding avoids locale-dependent text handling.
    with open(LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(line + '\n')
|
|
|
|
def get_dates():
    """Return (arrow_dates, parquet_dates): sets of date strings found on disk."""
    # Arrow side: one directory per date, named YYYY-MM-DD (10 characters).
    arrow = {entry.name
             for entry in ARROW_BASE.iterdir()
             if entry.is_dir() and len(entry.name) == 10}
    # Parquet side: one file per date; the stem is the date string.
    parquet = {path.stem for path in OUTPUT_DIR.glob('*.parquet')}
    return arrow, parquet
|
|
|
|
def convert_one(date_str):
    """Convert one date's Arrow scan files into a single Parquet file.

    Reads every ``scan_*.arrow`` file under ``ARROW_BASE/<date_str>`` (each
    file appears to hold one scan; row 0 of every column is taken), skips
    dead/warm-up scans, forward-fills asset prices across scans, and writes
    ``OUTPUT_DIR/<date_str>.parquet``.

    Returns:
        (ok, status) -- ok is True on success; status is a short reason
        string: "no_arrow_files", "no_valid_rows", or "rows_<n>".
    """
    import pandas as pd
    import numpy as np
    import pyarrow as pa
    import pyarrow.ipc as ipc
    import json

    date_dir = ARROW_BASE / date_str
    arrow_files = sorted(date_dir.glob('scan_*.arrow'))

    if not arrow_files:
        return False, "no_arrow_files"

    rows = []
    last_prices = {}  # last known good price per asset, used for forward-fill
    CORE_COLS = ['timestamp', 'scan_number', 'v50_lambda_max_velocity',
                 'v150_lambda_max_velocity', 'v300_lambda_max_velocity',
                 'v750_lambda_max_velocity', 'vel_div', 'instability_50', 'instability_150']
    # Excluded symbols (presumably stablecoin pairs with no useful signal).
    EXCLUDED = {'TUSDUSDT', 'USDCUSDT'}

    for af in arrow_files:
        try:
            with pa.memory_map(str(af), 'r') as src:
                table = ipc.open_file(src).read_all()
            if len(table) == 0:
                continue

            # One scan per file: take the first row of every column.
            row_raw = {col: table.column(col)[0].as_py() for col in table.column_names}

            ts_ns = row_raw.get('timestamp_ns') or 0
            if ts_ns:
                # Stored as UTC nanoseconds; output is tz-naive UTC.
                ts = pd.Timestamp(ts_ns, unit='ns', tz='UTC').tz_localize(None)
            else:
                ts = pd.NaT

            v50 = float(row_raw.get('w50_velocity', 0) or 0)
            v150 = float(row_raw.get('w150_velocity', 0) or 0)
            v300 = row_raw.get('w300_velocity')
            v750 = row_raw.get('w750_velocity')
            # BUGFIX: the previous `... or (v50 - v150)` discarded a stored
            # vel_div that was exactly 0.0 (falsy) and silently recomputed it.
            # Only fall back to v50 - v150 when the field is truly absent/None.
            vd_raw = row_raw.get('vel_div')
            vd = float(vd_raw) if vd_raw is not None else v50 - v150
            i50 = row_raw.get('w50_instability')
            i150 = row_raw.get('w150_instability')

            # Both core velocities zero => dead/warm-up scan; skip it.
            if v50 == 0.0 and v150 == 0.0:
                continue

            assets_raw = json.loads(row_raw.get('assets_json', '[]') or '[]')
            prices_raw = json.loads(row_raw.get('asset_prices_json', '[]') or '[]')

            price_map = {}
            for asset, price in zip(assets_raw, prices_raw):
                if asset in EXCLUDED:
                    continue
                if price is not None and float(price) > 0:
                    price_map[asset] = float(price)
                    last_prices[asset] = float(price)
                elif asset in last_prices:
                    # Missing or non-positive price: carry the last good one.
                    price_map[asset] = last_prices[asset]

            # BTCUSDT is the reference asset; rows without it are unusable.
            if 'BTCUSDT' not in price_map:
                continue

            rec = {
                'timestamp': ts,
                'scan_number': int(row_raw.get('scan_number', 0) or 0),
                'v50_lambda_max_velocity': v50,
                'v150_lambda_max_velocity': v150,
                'v300_lambda_max_velocity': float(v300) if v300 is not None else np.nan,
                'v750_lambda_max_velocity': float(v750) if v750 is not None else np.nan,
                'vel_div': vd,
                'instability_50': float(i50) if i50 is not None else np.nan,
                'instability_150': float(i150) if i150 is not None else np.nan,
            }
            rec.update(price_map)
            rows.append(rec)
        except Exception:
            # Best-effort by design: one corrupt scan file must not abort
            # the whole date. (Consider logging the filename here.)
            continue

    if not rows:
        return False, "no_valid_rows"

    df = pd.DataFrame(rows)
    df = df.sort_values('timestamp').reset_index(drop=True)

    # Forward-fill per-asset price columns across scans within the day.
    price_cols = [c for c in df.columns if c not in CORE_COLS]
    if price_cols:
        df[price_cols] = df[price_cols].ffill()

    # Keep only assets whose coverage matches BTCUSDT after the fill, so
    # every retained price column is aligned row-for-row.
    btc_count = df['BTCUSDT'].notna().sum()
    keep_price_cols = [c for c in price_cols if c in df.columns and df[c].notna().sum() == btc_count]
    final_cols = CORE_COLS + keep_price_cols
    df = df[[c for c in final_cols if c in df.columns]]

    out_file = OUTPUT_DIR / f"{date_str}.parquet"
    df.to_parquet(out_file, engine='pyarrow', compression='snappy')
    return True, f"rows_{len(df)}"
|
|
|
|
def main():
    """Convert every Arrow date that has no Parquet file yet, logging progress."""
    log('='*50)
    log('CONTINUOUS CONVERTER START')
    log('='*50)

    arrow, parquet = get_dates()
    to_do = sorted(arrow - parquet)
    log(f'Total Arrow: {len(arrow)}, Parquet: {len(parquet)}, To convert: {len(to_do)}')

    if not to_do:
        log('COMPLETE - nothing to do!')
        return

    # Process in chunks with progress
    total = len(to_do)
    success = 0
    failed = 0
    start_time = time.time()

    for i, date_str in enumerate(to_do):
        ok, status = convert_one(date_str)
        if ok:
            success += 1
        else:
            failed += 1
            log(f' FAIL {date_str}: {status}')

        # Progress every 5 files
        if (i + 1) % 5 == 0:
            elapsed = time.time() - start_time
            rate = (i + 1) / elapsed * 60 if elapsed > 0 else 0
            # remaining is in SECONDS: files left divided by files-per-second.
            remaining = (total - i - 1) / ((i + 1) / elapsed) if elapsed > 0 and i > 0 else 0
            # BUGFIX: previously displayed remaining/60 (minutes) with an 'h'
            # suffix, overstating the ETA by 60x; hours = seconds / 3600.
            log(f'Progress: {i+1}/{total} | Rate: {rate:.1f}/min | Est remaining: {remaining/3600:.1f}h | OK: {success} Fail: {failed}')

    log('='*50)
    log(f'DONE: {success}/{total} converted, {failed} failed')
    log('='*50)
|
|
|
|
# Entry point: run the converter when executed as a script.
if __name__ == '__main__':
    main()
|