#!/usr/bin/env python3
"""Continuous Arrow to Parquet converter.

Run this and let it work - it processes batches with progress.
"""
import sys
import time
from pathlib import Path
from datetime import datetime

# Platform-aware paths (dolphin_paths resolves Win vs Linux)
sys.path.insert(0, str(Path(__file__).parent.parent / 'nautilus_dolphin'))
from dolphin_paths import get_arb512_storage_root, get_klines_dir, get_project_root

ARROW_BASE = get_arb512_storage_root() / 'arrow_scans'
OUTPUT_DIR = get_klines_dir()
LOG_FILE = get_project_root() / 'prod' / 'convert_log.txt'


def log(msg):
    """Print *msg* with a [HH:MM:SS] prefix and append the same line to LOG_FILE."""
    ts = datetime.now().strftime('%H:%M:%S')
    line = f'[{ts}] {msg}'
    print(line, flush=True)
    # FIX: create the log directory if missing so a fresh checkout cannot
    # crash every log() call (and with it the whole run).
    LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
    with open(LOG_FILE, 'a') as f:
        f.write(line + '\n')


def get_dates():
    """Return (arrow_dates, parquet_dates) as sets of date strings.

    Arrow dates come from per-date directory names shaped like 'YYYY-MM-DD'
    (length-10 check); parquet dates come from existing output file stems.
    """
    arrow = set(d.name for d in ARROW_BASE.iterdir() if d.is_dir() and len(d.name) == 10)
    parquet = set(f.stem for f in OUTPUT_DIR.glob('*.parquet'))
    return arrow, parquet


def convert_one(date_str):
    """Convert one date's Arrow scan files into a single Parquet file.

    Returns (ok, status): ok is a bool, status a short machine-readable
    reason/summary string. Heavy deps are imported lazily so get_dates()
    and logging work even if pandas/pyarrow are absent.
    """
    import pandas as pd
    import numpy as np
    import pyarrow as pa
    import pyarrow.ipc as ipc
    import json

    date_dir = ARROW_BASE / date_str
    arrow_files = sorted(date_dir.glob('scan_*.arrow'))
    if not arrow_files:
        return False, "no_arrow_files"

    rows = []
    last_prices = {}  # last good price per asset, carried forward across scans
    CORE_COLS = ['timestamp', 'scan_number', 'v50_lambda_max_velocity',
                 'v150_lambda_max_velocity', 'v300_lambda_max_velocity',
                 'v750_lambda_max_velocity', 'vel_div', 'instability_50',
                 'instability_150']
    EXCLUDED = {'TUSDUSDT', 'USDCUSDT'}
    bad_files = 0  # corrupt/unreadable scan files skipped (best-effort)

    for af in arrow_files:
        try:
            # Each scan file holds a single-row table; read_all() materializes
            # it so the memory map can be closed immediately.
            with pa.memory_map(str(af), 'r') as src:
                table = ipc.open_file(src).read_all()
            if len(table) == 0:
                continue
            row_raw = {col: table.column(col)[0].as_py() for col in table.column_names}

            ts_ns = row_raw.get('timestamp_ns') or 0
            if ts_ns:
                # Stored as UTC ns; output is naive (tz dropped) for parquet.
                ts = pd.Timestamp(ts_ns, unit='ns', tz='UTC').tz_localize(None)
            else:
                ts = pd.NaT

            v50 = float(row_raw.get('w50_velocity', 0) or 0)
            v150 = float(row_raw.get('w150_velocity', 0) or 0)
            v300 = row_raw.get('w300_velocity')
            v750 = row_raw.get('w750_velocity')
            # FIX: the original used `or`, which replaced a legitimate
            # vel_div of 0.0 with (v50 - v150). Fall back only when the
            # field is genuinely missing/None.
            vd_raw = row_raw.get('vel_div')
            vd = float(vd_raw) if vd_raw is not None else (v50 - v150)
            i50 = row_raw.get('w50_instability')
            i150 = row_raw.get('w150_instability')

            # Both primary velocities zero => dead/uninitialized scan; skip.
            if v50 == 0.0 and v150 == 0.0:
                continue

            assets_raw = json.loads(row_raw.get('assets_json', '[]') or '[]')
            prices_raw = json.loads(row_raw.get('asset_prices_json', '[]') or '[]')
            price_map = {}
            for asset, price in zip(assets_raw, prices_raw):
                if asset in EXCLUDED:
                    continue
                if price is not None and float(price) > 0:
                    price_map[asset] = float(price)
                    last_prices[asset] = float(price)
                elif asset in last_prices:
                    # Missing/zero price: carry the last good value forward.
                    price_map[asset] = last_prices[asset]

            # BTCUSDT is the reference asset; rows without it are unusable.
            if 'BTCUSDT' not in price_map:
                continue

            rec = {
                'timestamp': ts,
                'scan_number': int(row_raw.get('scan_number', 0) or 0),
                'v50_lambda_max_velocity': v50,
                'v150_lambda_max_velocity': v150,
                'v300_lambda_max_velocity': float(v300) if v300 is not None else np.nan,
                'v750_lambda_max_velocity': float(v750) if v750 is not None else np.nan,
                'vel_div': vd,
                'instability_50': float(i50) if i50 is not None else np.nan,
                'instability_150': float(i150) if i150 is not None else np.nan,
            }
            rec.update(price_map)
            rows.append(rec)
        except Exception:
            # Best-effort: one corrupt scan file must not abort the whole
            # date. FIX: previously swallowed with no trace at all; now
            # counted and surfaced via the status string below.
            bad_files += 1
            continue

    if not rows:
        return False, "no_valid_rows"

    df = pd.DataFrame(rows)
    df = df.sort_values('timestamp').reset_index(drop=True)

    # Forward-fill prices so transient gaps don't punch holes mid-day.
    price_cols = [c for c in df.columns if c not in CORE_COLS]
    if price_cols:
        df[price_cols] = df[price_cols].ffill()

    # Keep only assets with full coverage: same non-null count as BTCUSDT,
    # which every kept row is guaranteed to contain (see check above).
    btc_count = df['BTCUSDT'].notna().sum()
    keep_price_cols = [c for c in price_cols
                       if c in df.columns and df[c].notna().sum() == btc_count]
    final_cols = CORE_COLS + keep_price_cols
    df = df[[c for c in final_cols if c in df.columns]]

    out_file = OUTPUT_DIR / f"{date_str}.parquet"
    df.to_parquet(out_file, engine='pyarrow', compression='snappy')

    status = f"rows_{len(df)}"
    if bad_files:
        # Backward compatible: main() ignores status on success, so the
        # suffix only shows up in logs/inspection.
        status += f"_skipped_{bad_files}"
    return True, status


def main():
    """Convert every Arrow date that has no Parquet output yet, with progress logs."""
    log('=' * 50)
    log('CONTINUOUS CONVERTER START')
    log('=' * 50)

    arrow, parquet = get_dates()
    to_do = sorted(arrow - parquet)
    log(f'Total Arrow: {len(arrow)}, Parquet: {len(parquet)}, To convert: {len(to_do)}')
    if not to_do:
        log('COMPLETE - nothing to do!')
        return

    # Process in chunks with progress
    total = len(to_do)
    success = 0
    failed = 0
    start_time = time.time()

    for i, date_str in enumerate(to_do):
        ok, status = convert_one(date_str)
        if ok:
            success += 1
        else:
            failed += 1
            log(f' FAIL {date_str}: {status}')

        # Progress every 5 files
        if (i + 1) % 5 == 0:
            elapsed = time.time() - start_time
            rate = (i + 1) / elapsed * 60 if elapsed > 0 else 0
            remaining = (total - i - 1) / ((i + 1) / elapsed) if elapsed > 0 and i > 0 else 0
            log(f'Progress: {i+1}/{total} | Rate: {rate:.1f}/min | Est remaining: {remaining/60:.1f}h | OK: {success} Fail: {failed}')

    log('=' * 50)
    log(f'DONE: {success}/{total} converted, {failed} failed')
    log('=' * 50)


if __name__ == '__main__':
    main()