# AGENT SPEC: OBF Live Switchover — MockOBProvider → HZOBProvider + step_live() **Status**: Ready to implement **Complexity**: Medium (~150 LOC across 2 files + tests) **Blocking**: Live capital deployment (paper trading acceptable with Mock) **Created**: 2026-03-26 --- ## 1. Background & Current State The OBF subsystem has **all infrastructure in place** but is wired with synthetic data: | Component | Status | |---|---| | `obf_prefect_flow.py` | ✅ Running — pushes live L2 snapshots to `DOLPHIN_FEATURES["asset_{ASSET}_ob"]` at ~100ms | | `HZOBProvider` (`hz_ob_provider.py`) | ✅ Exists — reads the correct HZ map and key format | | `OBFeatureEngine` (`ob_features.py`) | ⚠️ Preload-only — no live streaming path | | `nautilus_event_trader.py` | ❌ Wired to `MockOBProvider` with static biases | **Root cause the switch is blocked**: `OBFeatureEngine.preload_date()` is the only ingestion path. It calls `provider.get_all_timestamps(asset)` to enumerate all snapshots upfront. `HZOBProvider.get_all_timestamps()` correctly returns `[]` (real-time has no history) — so `preload_date()` with `HZOBProvider` builds empty caches, and all downstream `get_placement/get_signal/get_market` calls return `None`. --- ## 2. HZ Payload Format (verified from `obf_prefect_flow.py`) Key: `asset_{SYMBOL}_ob` in map `DOLPHIN_FEATURES` ```json { "timestamp": "2026-03-26T12:34:56.789000+00:00", "bid_notional": [1234567.0, 987654.0, 876543.0, 765432.0, 654321.0], "ask_notional": [1234567.0, 987654.0, 876543.0, 765432.0, 654321.0], "bid_depth": [0.123, 0.456, 0.789, 1.012, 1.234], "ask_depth": [0.123, 0.456, 0.789, 1.012, 1.234], "_pushed_at": "2026-03-26T12:34:56.901000+00:00", "_push_seq": 1711453296901 } ``` `HZOBProvider.get_snapshot()` already parses this and normalizes `timestamp` to a Unix float (ISO→float fix applied 2026-03-26). --- ## 3. What Needs to Be Built ### 3.1 Add `step_live()` to `OBFeatureEngine` (`ob_features.py`) This is the **core change**. Add a new public method that: 1. Fetches fresh snapshots for all assets from the provider 2. Runs the same feature computation pipeline as `preload_date()`'s inner loop 3. Stores results in new live caches keyed by `bar_idx` (integer) 4. Updates `_median_depth_ref` incrementally via EMA **Method signature**: ```python def step_live(self, assets: List[str], bar_idx: int) -> None: """Fetch live snapshots and compute OBF features for the current bar. Call this ONCE per scan event, BEFORE calling engine.step_bar(). Results are stored and retrievable via get_placement/get_signal/get_market(bar_idx). """ ``` **Implementation steps inside `step_live()`**: ```python def step_live(self, assets: List[str], bar_idx: int) -> None: wall_ts = time.time() asset_imbalances = [] asset_velocities = [] for asset in assets: snap = self.provider.get_snapshot(asset, wall_ts) if snap is None: continue # Initialise per-asset rolling histories on first call if asset not in self._imbalance_history: self._imbalance_history[asset] = deque(maxlen=self.IMBALANCE_LOOKBACK) if asset not in self._depth_1pct_history: self._depth_1pct_history[asset] = deque(maxlen=self.DEPTH_LOOKBACK) # Incremental median_depth_ref via EMA (alpha=0.01 → ~100-bar half-life) d1pct = compute_depth_1pct_nb(snap.bid_notional, snap.ask_notional) if asset not in self._median_depth_ref: self._median_depth_ref[asset] = d1pct else: self._median_depth_ref[asset] = ( 0.99 * self._median_depth_ref[asset] + 0.01 * d1pct ) # Feature kernels (same as preload_date inner loop) imb = compute_imbalance_nb(snap.bid_notional, snap.ask_notional) dq = compute_depth_quality_nb(d1pct, self._median_depth_ref[asset]) fp = compute_fill_probability_nb(dq) sp = compute_spread_proxy_nb(snap.bid_notional, snap.ask_notional) da = compute_depth_asymmetry_nb(snap.bid_notional, snap.ask_notional) self._imbalance_history[asset].append(imb) self._depth_1pct_history[asset].append(d1pct) imb_arr = np.array(self._imbalance_history[asset], dtype=np.float64) ma5_n = min(5, len(imb_arr)) imb_ma5 = float(np.mean(imb_arr[-ma5_n:])) if ma5_n > 0 else imb persist = compute_imbalance_persistence_nb(imb_arr, self.IMBALANCE_LOOKBACK) dep_arr = np.array(self._depth_1pct_history[asset], dtype=np.float64) velocity = compute_withdrawal_velocity_nb( dep_arr, min(self.DEPTH_LOOKBACK, len(dep_arr) - 1) ) # Store in live caches if asset not in self._live_placement: self._live_placement[asset] = {} if asset not in self._live_signal: self._live_signal[asset] = {} self._live_placement[asset][bar_idx] = OBPlacementFeatures( depth_1pct_usd=d1pct, depth_quality=dq, fill_probability=fp, spread_proxy_bps=sp, ) self._live_signal[asset][bar_idx] = OBSignalFeatures( imbalance=imb, imbalance_ma5=imb_ma5, imbalance_persistence=persist, depth_asymmetry=da, withdrawal_velocity=velocity, ) asset_imbalances.append(imb) asset_velocities.append(velocity) # Cross-asset macro (Sub-3 + Sub-4) if asset_imbalances: imb_arr_cross = np.array(asset_imbalances, dtype=np.float64) vel_arr_cross = np.array(asset_velocities, dtype=np.float64) n = len(asset_imbalances) med_imb, agreement = compute_market_agreement_nb(imb_arr_cross, n) cascade = compute_cascade_signal_nb(vel_arr_cross, n, self.CASCADE_THRESHOLD) # Update macro depth history if not hasattr(self, '_live_macro_depth_hist'): self._live_macro_depth_hist = deque(maxlen=self.DEPTH_LOOKBACK) agg_depth = float(np.mean([ self._median_depth_ref.get(a, 0.0) for a in assets ])) self._live_macro_depth_hist.append(agg_depth) macro_dep_arr = np.array(self._live_macro_depth_hist, dtype=np.float64) depth_vel = compute_withdrawal_velocity_nb( macro_dep_arr, min(self.DEPTH_LOOKBACK, len(macro_dep_arr) - 1) ) # acceleration: simple first-difference of velocity if not hasattr(self, '_live_macro_vel_prev'): self._live_macro_vel_prev = depth_vel accel = depth_vel - self._live_macro_vel_prev self._live_macro_vel_prev = depth_vel if not hasattr(self, '_live_macro'): self._live_macro = {} self._live_macro[bar_idx] = OBMacroFeatures( median_imbalance=med_imb, agreement_pct=agreement, depth_pressure=float(np.sum(imb_arr_cross)), cascade_regime=cascade, depth_velocity=depth_vel, acceleration=accel, ) self._live_mode = True self._live_bar_idx = bar_idx ``` **New instance variables to initialise in `__init__`** (add after existing init): ```python self._live_placement: Dict[str, Dict[int, OBPlacementFeatures]] = {} self._live_signal: Dict[str, Dict[int, OBSignalFeatures]] = {} self._live_macro: Dict[int, OBMacroFeatures] = {} self._live_mode: bool = False self._live_bar_idx: int = -1 self._live_macro_depth_hist: deque = deque(maxlen=self.DEPTH_LOOKBACK) self._live_macro_vel_prev: float = 0.0 ``` ### 3.2 Modify `_resolve_idx()` to handle live bar lookups In `_resolve_idx()` (currently line 549), add a live-mode branch **before** the existing logic: ```python def _resolve_idx(self, asset: str, timestamp_or_idx: float) -> Optional[int]: # Live mode: bar_idx is the key directly (small integers, no ts_to_idx lookup) if self._live_mode: bar = int(timestamp_or_idx) if asset in self._live_placement and bar in self._live_placement[asset]: return bar # Fall back to latest known bar (graceful degradation) if asset in self._live_placement and self._live_placement[asset]: return max(self._live_placement[asset].keys()) return None # ... existing preload logic unchanged below ... ``` ### 3.3 Modify `get_placement()`, `get_signal()`, `get_market()`, `get_macro()` to use live caches Each method currently reads from `_preloaded_placement[asset][idx]`. Add a live-mode branch: ```python def get_placement(self, asset: str, timestamp_or_idx: float) -> OBPlacementFeatures: idx = self._resolve_idx(asset, timestamp_or_idx) if idx is None: return OBPlacementFeatures(...) # defaults (same as today) if self._live_mode: return self._live_placement.get(asset, {}).get(idx, OBPlacementFeatures(...)) return self._preloaded_placement.get(asset, {}).get(idx, OBPlacementFeatures(...)) ``` Apply same pattern to `get_signal()`, `get_market()`, `get_macro()`. ### 3.4 Update `nautilus_event_trader.py` — `_wire_obf()` Replace `MockOBProvider` with `HZOBProvider`: ```python def _wire_obf(self, assets): if not assets or self.ob_assets: return self.ob_assets = assets from nautilus_dolphin.nautilus.hz_ob_provider import HZOBProvider live_ob = HZOBProvider( hz_cluster=HZ_CLUSTER, hz_host=HZ_HOST, assets=assets, ) self.ob_eng = OBFeatureEngine(live_ob) # No preload_date() call — live mode uses step_live() per scan self.eng.set_ob_engine(self.ob_eng) log(f" OBF wired: HZOBProvider, {len(assets)} assets (LIVE mode)") ``` Store `self.ob_eng` on `DolphinLiveTrader` so it can be called from `on_scan`. ### 3.5 Call `step_live()` in `on_scan()` before `step_bar()` In `DolphinLiveTrader.on_scan()`, after `self._rollover_day()` and `_wire_obf()`, add: ```python # Feed live OB data into OBF engine for this bar if self.ob_eng is not None and self.ob_assets: self.ob_eng.step_live(self.ob_assets, self.bar_idx) ``` This must happen **before** the `eng.step_bar()` call so OBF features are fresh for this bar. --- ## 4. Live Cache Eviction (Memory Management) `_live_placement/signal/macro` grow unboundedly as dicts. Add LRU eviction — keep only the last `N=500` bar_idx entries: ```python # At end of step_live(), after storing: MAX_LIVE_CACHE = 500 for asset in list(self._live_placement.keys()): if len(self._live_placement[asset]) > MAX_LIVE_CACHE: oldest = sorted(self._live_placement[asset].keys())[:-MAX_LIVE_CACHE] for k in oldest: del self._live_placement[asset][k] # Same for _live_signal, _live_macro ``` --- ## 5. Staleness Guard If `obf_prefect_flow.py` is down, `HZOBProvider.get_snapshot()` returns `None` for all assets (graceful). `step_live()` skips assets with no snapshot. The engine falls back to `ob_engine is None` behaviour (random 40% pass at `ob_confirm_rate`). Add a staleness warning log in `step_live()` if 0 snapshots were fetched for more than 3 consecutive bars: ```python if fetched_count == 0: self._live_stale_count = getattr(self, '_live_stale_count', 0) + 1 if self._live_stale_count >= 3: logger.warning("OBF step_live: no snapshots for %d bars — OBF gate degraded to random", self._live_stale_count) else: self._live_stale_count = 0 ``` --- ## 6. Files to Modify | File | Full Path | Change | |---|---|---| | `ob_features.py` | `/mnt/dolphinng5_predict/nautilus_dolphin/nautilus_dolphin/nautilus/ob_features.py` | Add `step_live()`, live caches in `__init__`, live branch in `_resolve_idx/get_*` | | `nautilus_event_trader.py` | `/mnt/dolphinng5_predict/prod/nautilus_event_trader.py` | `_wire_obf()` → `HZOBProvider`; add `self.ob_eng`; call `ob_eng.step_live()` in `on_scan` | | `hz_ob_provider.py` | `/mnt/dolphinng5_predict/nautilus_dolphin/nautilus_dolphin/nautilus/hz_ob_provider.py` | Timestamp ISO→float normalization (DONE 2026-03-26) | **Do NOT modify**: - `/mnt/dolphinng5_predict/nautilus_dolphin/nautilus_dolphin/nautilus/alpha_orchestrator.py` — `set_ob_engine()` / `get_placement()` calls unchanged - `/mnt/dolphinng5_predict/prod/obf_prefect_flow.py` — already writing correct format - `/mnt/dolphinng5_predict/nautilus_dolphin/nautilus_dolphin/nautilus/dolphin_actor.py` — paper mode uses `preload_date()` which stays as-is --- ## 7. Tests to Write In `/mnt/dolphinng5_predict/nautilus_dolphin/tests/test_hz_ob_provider_live.py`: ``` test_step_live_fetches_snapshots — mock HZOBProvider returns valid OBSnapshot test_step_live_populates_placement_cache — after step_live(bar_idx=5), get_placement(asset, 5.0) returns non-default test_step_live_populates_signal_cache — imbalance, persistence populated test_step_live_market_features — agreement_pct and cascade computed test_step_live_none_snapshot_skipped — provider returns None → asset skipped gracefully test_step_live_stale_warning — 3 consecutive empty → warning logged test_step_live_cache_eviction — after 501 bars, oldest entries deleted test_resolve_idx_live_mode — live mode returns bar_idx directly test_resolve_idx_live_fallback — unknown bar_idx → latest bar returned test_median_depth_ema — _median_depth_ref converges via EMA test_hz_ob_provider_timestamp_iso — ISO string timestamp normalised to float test_hz_ob_provider_timestamp_float — float timestamp passes through unchanged ``` --- ## 8. Verification After Implementation 1. Start `obf_prefect_flow.py` (confirm running via supervisorctl) 2. Check HZ: `DOLPHIN_FEATURES["asset_BTCUSDT_ob"]` has fresh data (< 10s old) 3. Run `nautilus_event_trader.py` — look for `OBF wired: HZOBProvider` in log 4. On first scan, look for no errors in `step_live()` 5. After 10 scans: `get_placement("BTCUSDT", bar_idx)` should return non-zero `fill_probability` 6. Compare ob_edge decisions vs Mock run — expect variance (live book reacts to market) --- ## 9. Data Quality Caveat (preserved from assessment 2026-03-26) > **IMPORTANT**: Until this spec is implemented, OBF runs on `MockOBProvider` with static per-asset imbalance biases (BTC=-0.086, ETH=-0.092, BNB=+0.05, SOL=+0.05). All four OBF functional dimensions compute and produce real outputs feeding the alpha gate — but with frozen, market-unresponsive inputs. The OB cascade regime will always be CALM (no depth drain in mock data). This is acceptable for paper trading; it is NOT acceptable for live capital deployment. --- *Created: 2026-03-26* *Author: Claude (session Final_ND-Trader_Check)*