Files
DOLPHIN/prod/continuous_convert.py
hjnormey 01c19662cb initial: import DOLPHIN baseline 2026-04-21 from dolphinng5_predict working tree
Includes core prod + GREEN/BLUE subsystems:
- prod/ (BLUE harness, configs, scripts, docs)
- nautilus_dolphin/ (GREEN Nautilus-native impl + dvae/ preserved)
- adaptive_exit/ (AEM engine + models/bucket_assignments.pkl)
- Observability/ (EsoF advisor, TUI, dashboards)
- external_factors/ (EsoF producer)
- mc_forewarning_qlabs_fork/ (MC regime/envelope)

Excludes runtime caches, logs, backups, and reproducible artifacts per .gitignore.
2026-04-21 16:58:38 +02:00

170 lines
6.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Continuous Arrow to Parquet converter.
Run this and let it work - it processes batches with progress.
"""
import sys
import time
from pathlib import Path
from datetime import datetime
# Platform-aware paths (dolphin_paths resolves Win vs Linux)
sys.path.insert(0, str(Path(__file__).parent.parent / 'nautilus_dolphin'))
from dolphin_paths import get_arb512_storage_root, get_klines_dir, get_project_root
# Input root: get_dates() lists its sub-directories and convert_one() reads
# scan_*.arrow files from ARROW_BASE / <date>.
ARROW_BASE = get_arb512_storage_root() / 'arrow_scans'
# Output directory: convert_one() writes one <date>.parquet file here.
OUTPUT_DIR = get_klines_dir()
# Text file that log() appends every message to.
LOG_FILE = get_project_root() / 'prod' / 'convert_log.txt'
def log(msg):
    """Print a timestamped message and append the same line to LOG_FILE.

    The line has the form ``[HH:MM:SS] msg`` (time taken from
    ``datetime.now()``). Stdout is flushed on every call.
    """
    stamp = datetime.now().strftime('%H:%M:%S')
    entry = '[{}] {}'.format(stamp, msg)
    print(entry, flush=True)
    with open(LOG_FILE, 'a') as handle:
        handle.write(entry + '\n')
def get_dates():
    """Collect the date names present as Arrow input and as Parquet output.

    Returns:
        A ``(arrow, parquet)`` tuple of string sets: the names of the
        sub-directories of ARROW_BASE whose name is exactly 10 characters
        long, and the stems of the ``*.parquet`` files in OUTPUT_DIR.
    """
    arrow_dates = set()
    for entry in ARROW_BASE.iterdir():
        # Length 10 is the only name filter (presumably YYYY-MM-DD folders).
        if entry.is_dir() and len(entry.name) == 10:
            arrow_dates.add(entry.name)

    parquet_dates = {pq_file.stem for pq_file in OUTPUT_DIR.glob('*.parquet')}
    return arrow_dates, parquet_dates
def convert_one(date_str):
    """Convert one date directory of Arrow scan files into a single Parquet file.

    Every ``scan_*.arrow`` file under ``ARROW_BASE / date_str`` contributes at
    most one record, built from the first row of the file. A scan is dropped
    when the file is empty, when both the w50 and w150 velocities are zero, or
    when no BTCUSDT price is available for it. Assets in ``EXCLUDED`` are
    never written. Price columns are forward-filled, and only price columns
    that have as many non-null values as BTCUSDT are kept.

    The result is written to a temporary file in OUTPUT_DIR and then renamed
    to ``<date_str>.parquet``, so an interrupted run never leaves a truncated
    file that a later run would mistake for a finished conversion.

    Args:
        date_str: Name of the date sub-directory under ARROW_BASE; also used
            as the stem of the output file.

    Returns:
        ``(True, "rows_<n>")`` on success, otherwise ``(False, reason)`` with
        reason ``"no_arrow_files"`` or ``"no_valid_rows"``.

    Raises:
        Whatever ``DataFrame.to_parquet`` or the final rename raises; errors
        while reading an individual scan file are counted and logged instead.
    """
    import json

    import numpy as np
    import pandas as pd
    import pyarrow as pa
    import pyarrow.ipc as ipc

    date_dir = ARROW_BASE / date_str
    arrow_files = sorted(date_dir.glob('scan_*.arrow'))
    if not arrow_files:
        return False, "no_arrow_files"

    rows = []
    # Last positive price seen per asset, carried across scans of this date.
    last_prices = {}
    # Unreadable/malformed scan files are skipped, but counted and reported.
    skipped_errors = 0
    first_error = None
    CORE_COLS = ['timestamp', 'scan_number', 'v50_lambda_max_velocity',
                 'v150_lambda_max_velocity', 'v300_lambda_max_velocity',
                 'v750_lambda_max_velocity', 'vel_div', 'instability_50', 'instability_150']
    EXCLUDED = {'TUSDUSDT', 'USDCUSDT'}

    for af in arrow_files:
        try:
            with pa.memory_map(str(af), 'r') as src:
                table = ipc.open_file(src).read_all()
                if len(table) == 0:
                    continue
                # Materialise the first row as plain Python objects while the
                # memory map is still open.
                row_raw = {col: table.column(col)[0].as_py() for col in table.column_names}

            ts_ns = row_raw.get('timestamp_ns') or 0
            if ts_ns:
                # Stored as a timezone-naive timestamp derived from UTC nanoseconds.
                ts = pd.Timestamp(ts_ns, unit='ns', tz='UTC').tz_localize(None)
            else:
                ts = pd.NaT

            v50 = float(row_raw.get('w50_velocity', 0) or 0)
            v150 = float(row_raw.get('w150_velocity', 0) or 0)
            v300 = row_raw.get('w300_velocity')
            v750 = row_raw.get('w750_velocity')
            # Falls back to v50 - v150 when vel_div is missing, None or 0.
            vd = float(row_raw.get('vel_div', v50 - v150) or (v50 - v150))
            i50 = row_raw.get('w50_instability')
            i150 = row_raw.get('w150_instability')

            if v50 == 0.0 and v150 == 0.0:
                continue

            assets_raw = json.loads(row_raw.get('assets_json', '[]') or '[]')
            prices_raw = json.loads(row_raw.get('asset_prices_json', '[]') or '[]')

            price_map = {}
            for asset, price in zip(assets_raw, prices_raw):
                if asset in EXCLUDED:
                    continue
                if price is not None and float(price) > 0:
                    price_map[asset] = float(price)
                    last_prices[asset] = float(price)
                elif asset in last_prices:
                    # Missing or non-positive price: reuse the last good one.
                    price_map[asset] = last_prices[asset]

            if 'BTCUSDT' not in price_map:
                continue

            rec = {
                'timestamp': ts,
                'scan_number': int(row_raw.get('scan_number', 0) or 0),
                'v50_lambda_max_velocity': v50,
                'v150_lambda_max_velocity': v150,
                'v300_lambda_max_velocity': float(v300) if v300 is not None else np.nan,
                'v750_lambda_max_velocity': float(v750) if v750 is not None else np.nan,
                'vel_div': vd,
                'instability_50': float(i50) if i50 is not None else np.nan,
                'instability_150': float(i150) if i150 is not None else np.nan,
            }
            rec.update(price_map)
            rows.append(rec)
        except Exception as exc:
            # Best effort: one bad scan must not abort the whole date, but the
            # loss is recorded so it can be reported below.
            skipped_errors += 1
            if first_error is None:
                first_error = f'{af.name}: {type(exc).__name__}: {exc}'
            continue

    if skipped_errors:
        log(f' WARN {date_str}: skipped {skipped_errors} unreadable scan file(s); first: {first_error}')

    if not rows:
        return False, "no_valid_rows"

    df = pd.DataFrame(rows)
    df = df.sort_values('timestamp').reset_index(drop=True)

    price_cols = [c for c in df.columns if c not in CORE_COLS]
    if price_cols:
        df[price_cols] = df[price_cols].ffill()

    # Keep only price columns that are as complete as BTCUSDT after the fill.
    btc_count = df['BTCUSDT'].notna().sum()
    keep_price_cols = [c for c in price_cols if c in df.columns and df[c].notna().sum() == btc_count]
    final_cols = CORE_COLS + keep_price_cols
    df = df[[c for c in final_cols if c in df.columns]]

    out_file = OUTPUT_DIR / f"{date_str}.parquet"
    # The temp name does not match '*.parquet', so a leftover from a killed
    # run is never counted as a finished date; it is overwritten next time.
    tmp_file = OUTPUT_DIR / f"{date_str}.parquet.tmp"
    try:
        df.to_parquet(tmp_file, engine='pyarrow', compression='snappy')
        tmp_file.replace(out_file)
    finally:
        # After a successful rename the temp file no longer exists.
        if tmp_file.exists():
            tmp_file.unlink()
    return True, f"rows_{len(df)}"
def main():
    """Convert every Arrow date that has no Parquet output yet, logging progress.

    Dates are processed in sorted order. A date counts as failed when
    convert_one() returns a false status or raises; in both cases the reason
    is logged and the loop moves on, so one bad date cannot block the rest of
    the batch. A progress line is logged after every 5 dates and a summary at
    the end.
    """
    log('='*50)
    log('CONTINUOUS CONVERTER START')
    log('='*50)

    arrow, parquet = get_dates()
    to_do = sorted(arrow - parquet)
    log(f'Total Arrow: {len(arrow)}, Parquet: {len(parquet)}, To convert: {len(to_do)}')
    if not to_do:
        log('COMPLETE - nothing to do!')
        return

    total = len(to_do)
    success = 0
    failed = 0
    start_time = time.time()

    for done, date_str in enumerate(to_do, start=1):
        try:
            ok, status = convert_one(date_str)
        except Exception as exc:
            # Top-level boundary: record the error and keep the batch going.
            ok, status = False, f'error_{type(exc).__name__}: {exc}'

        if ok:
            success += 1
        else:
            failed += 1
            log(f' FAIL {date_str}: {status}')

        # Progress every 5 files
        if done % 5 == 0:
            elapsed = time.time() - start_time
            if elapsed > 0:
                per_second = done / elapsed
                rate = per_second * 60
                remaining_s = (total - done) / per_second
            else:
                rate = 0
                remaining_s = 0
            # remaining_s is in seconds, so divide by 3600 to report hours.
            log(f'Progress: {done}/{total} | Rate: {rate:.1f}/min | Est remaining: {remaining_s/3600:.1f}h | OK: {success} Fail: {failed}')

    log('='*50)
    log(f'DONE: {success}/{total} converted, {failed} failed')
    log('='*50)


if __name__ == '__main__':
    main()