"""
convnext_5s_sensor.py — Inference wrapper for the 5s ConvNeXt-1D β-TCVAE.

Usage
-----
    sensor = ConvNext5sSensor(model_path)

    z_mu, z_post_std = sensor.encode_raw(arr)
    # arr:        (C_IN, T_WIN) float64
    # z_mu:       (z_dim,) float64 — latent mean
    # z_post_std: float — mean posterior std (>1 = wide/uncertain)

    z_mu, z_post_std = sensor.encode_scan_window(arr2d)
    # arr2d: (T_WIN, C_IN) or (T_WIN, 7) — from scan parquet rows
    # If 7 columns, proxy_B is appended as ch7.

Key differences from the 1m sensor (convnext_sensor.py):
  - Model path: convnext_model_5s.json
  - C_IN = 8 (7 FEATURE + proxy_B — NO ExF channels)
  - No dvol_btc, fng, funding_btc channels
  - FEATURE_COLS are the same 7 features as the 1m sensor
  - proxy_B = instability_50 - v750_lambda_max_velocity (ch7, same formula as 1m)

Architecture:
  ConvNeXtVAE C_in=8 T_in=32 z_dim=32 base_ch=32 n_blocks=3

Input channels:
  ch0 v50_lambda_max_velocity
  ch1 v150_lambda_max_velocity
  ch2 v300_lambda_max_velocity
  ch3 v750_lambda_max_velocity
  ch4 vel_div
  ch5 instability_50
  ch6 instability_150
  ch7 proxy_B (= instability_50 - v750_lambda_max_velocity)
"""

import os
import sys
import json

import numpy as np

# Make the sibling module importable when this file is loaded from elsewhere.
_DVAE_DIR = os.path.dirname(os.path.abspath(__file__))
if _DVAE_DIR not in sys.path:
    sys.path.insert(0, _DVAE_DIR)

from convnext_dvae import ConvNeXtVAE

FEATURE_COLS = [
    'v50_lambda_max_velocity',
    'v150_lambda_max_velocity',
    'v300_lambda_max_velocity',
    'v750_lambda_max_velocity',
    'vel_div',
    'instability_50',
    'instability_150',
]

T_WIN = 32
C_IN = 8  # 7 FEATURE + proxy_B (no ExF)

# Column / channel positions derived from FEATURE_COLS so the proxy_B formula
# stays tied to the feature names instead of bare integers.
_N_FEATURES = len(FEATURE_COLS)                              # 7
_COL_INSTABILITY_50 = FEATURE_COLS.index('instability_50')   # 5
_COL_V750 = FEATURE_COLS.index('v750_lambda_max_velocity')   # 3
_CH_PROXY_B = _N_FEATURES                                    # 7: proxy_B follows the features


class ConvNext5sSensor:
    """
    Inference wrapper for the 5s ConvNeXt model.

    No ExF channels — 8-channel input only. The encode methods do not modify
    any attribute of the sensor; thread-safety additionally relies on
    ``ConvNeXtVAE.encode`` treating the model weights as read-only.

    Attributes
    ----------
    model     : ConvNeXtVAE built from the ``architecture`` entry of the model file
    norm_mean : per-channel mean array, or None if absent from the model file
    norm_std  : per-channel std array, or None if absent from the model file
    epoch     : ``epoch`` entry of the model file ('?' if absent)
    val_loss  : ``val_loss`` entry of the model file (NaN if absent)
    z_dim     : latent dimensionality (32 if absent from the model file)
    """

    def __init__(self, model_path: str):
        """
        Load architecture, weights and normalisation statistics.

        Parameters
        ----------
        model_path : path to the JSON model file. It is parsed here for the
            metadata and also handed to ``ConvNeXtVAE.load``.
        """
        # JSON is UTF-8 by specification; do not depend on the platform default.
        with open(model_path, encoding='utf-8') as f:
            meta = json.load(f)

        arch = meta.get('architecture', {})
        self.model = ConvNeXtVAE(
            C_in=arch.get('C_in', C_IN),
            T_in=arch.get('T_in', T_WIN),
            z_dim=arch.get('z_dim', 32),
            base_ch=arch.get('base_ch', 32),
            n_blocks=arch.get('n_blocks', 3),
            seed=42,
        )
        self.model.load(model_path)

        self.norm_mean = (
            np.array(meta['norm_mean'], dtype=np.float64) if 'norm_mean' in meta else None
        )
        self.norm_std = (
            np.array(meta['norm_std'], dtype=np.float64) if 'norm_std' in meta else None
        )
        self.epoch = meta.get('epoch', '?')
        self.val_loss = meta.get('val_loss', float('nan'))
        self.z_dim = arch.get('z_dim', 32)

    # ── low-level: encode a (C_IN, T_WIN) array ──────────────────────────────
    def encode_raw(self, arr: np.ndarray):
        """
        Encode one window given in raw (un-normalised) units.

        Parameters
        ----------
        arr : (C_IN, T_WIN) array. It is copied, so the caller's data is
            never modified.

        Returns
        -------
        z_mu       : (z_dim,) latent mean
        z_post_std : float — mean of exp(0.5 * logvar) over the latent dims
        """
        x = arr[np.newaxis].astype(np.float64)  # (1, C, T); astype makes a copy
        if self.norm_mean is not None:
            std = self.norm_std[None, :, None]
            # A channel with zero std would turn (x - mean) / std into NaN/inf,
            # and NaN survives the clip below. Use a unit divisor there instead.
            std = np.where(std == 0.0, 1.0, std)
            x = (x - self.norm_mean[None, :, None]) / std
        np.clip(x, -6.0, 6.0, out=x)  # in place on our private copy

        z_mu, z_logvar = self.model.encode(x)  # (1, D)
        z_post_std = float(np.exp(0.5 * z_logvar).mean())
        return z_mu[0], z_post_std

    # ── high-level: encode from a 2D scan array ───────────────────────────────
    def encode_scan_window(self, arr2d: np.ndarray):
        """
        Encode a time-major window of scan rows.

        Parameters
        ----------
        arr2d : (T, C_IN) or (T, 7) array-like — rows from scan parquet.
            If arr2d has 7 columns, proxy_B
            (instability_50 - v750_lambda_max_velocity) is appended as ch7
            before encoding. Windows shorter than T_WIN are zero-padded at
            the start; longer ones keep only the most recent T_WIN rows.

        Returns
        -------
        z_mu       : (z_dim,) float64
        z_post_std : float (>1 suggests OOD regime)

        Raises
        ------
        ValueError : if arr2d is not 2-dimensional.
        """
        arr2d = np.asarray(arr2d, dtype=np.float64)
        if arr2d.ndim != 2:
            raise ValueError(
                f"arr2d must be 2-D (T, n_cols); got shape {arr2d.shape}"
            )
        n_rows, n_cols = arr2d.shape

        if n_cols == _N_FEATURES:
            # Append proxy_B = instability_50 - v750_lambda_max_velocity
            proxy_b = arr2d[:, _COL_INSTABILITY_50] - arr2d[:, _COL_V750]
            arr2d = np.concatenate([arr2d, proxy_b[:, np.newaxis]], axis=1)  # (T, 8)

        # Pad / trim to T_WIN rows (zero-pad at the start if shorter).
        if n_rows < T_WIN:
            # Pad width follows the actual column count so the concatenate
            # cannot fail on a column mismatch. The zeros are in raw units:
            # normalisation happens later, inside encode_raw.
            pad = np.zeros((T_WIN - n_rows, arr2d.shape[1]), dtype=np.float64)
            arr2d = np.concatenate([pad, arr2d], axis=0)
        else:
            arr2d = arr2d[-T_WIN:]  # keep the most recent T_WIN rows

        return self.encode_raw(arr2d.T)  # (C_IN, T_WIN)

    # ── find proxy_B dimension via correlation probe ──────────────────────────
    def find_proxy_b_dim(self, probe_windows: np.ndarray):
        """
        Find the z-dim most correlated with the mean proxy_B value (ch7 mean)
        across a set of probe windows.

        Parameters
        ----------
        probe_windows : (N, C_IN, T_WIN) array, or a sequence of N
            (C_IN, T_WIN) arrays

        Returns
        -------
        dim_idx : int   — z-dim index with highest |r| (first one on ties)
        corr    : float — Pearson r with proxy_B mean; (0, 0.0) is returned
                  when there are no windows or no dimension has |r| > 0
        """
        n_windows = len(probe_windows)
        if n_windows == 0:
            return 0, 0.0

        # Accept a plain list of windows as well as a stacked array.
        probe_windows = np.asarray(probe_windows)
        proxy_b_means = probe_windows[:, _CH_PROXY_B, :].mean(axis=1)  # (N,)

        z_mus = np.stack(
            [self.encode_raw(window)[0] for window in probe_windows], axis=0
        )  # (N, z_dim)

        # Pearson r between each z-dim and proxy_B mean. The epsilon keeps the
        # ratio finite when either series is constant.
        pb_centered = proxy_b_means - proxy_b_means.mean()
        pb_std = pb_centered.std() + 1e-12

        best_dim = 0
        best_corr = 0.0
        for d in range(z_mus.shape[1]):
            zd_centered = z_mus[:, d] - z_mus[:, d].mean()
            zd_std = zd_centered.std() + 1e-12
            r = float((pb_centered * zd_centered).mean() / (pb_std * zd_std))
            # Strict '>' keeps the first dimension on ties and skips NaN.
            if abs(r) > abs(best_corr):
                best_corr = r
                best_dim = d
        return best_dim, best_corr