Files
DOLPHIN/prod/docs/AGENT_SPEC_OBF_LIVE_SWITCHOVER.md

340 lines
14 KiB
Markdown
Raw Permalink Normal View History

# AGENT SPEC: OBF Live Switchover — MockOBProvider → HZOBProvider + step_live()
**Status**: Ready to implement
**Complexity**: Medium (~150 LOC across 2 files + tests)
**Blocking**: Live capital deployment (paper trading acceptable with Mock)
**Created**: 2026-03-26
---
## 1. Background & Current State
The OBF subsystem has **all infrastructure in place** but is wired with synthetic data:
| Component | Status |
|---|---|
| `obf_prefect_flow.py` | ✅ Running — pushes live L2 snapshots to `DOLPHIN_FEATURES["asset_{ASSET}_ob"]` at ~100ms |
| `HZOBProvider` (`hz_ob_provider.py`) | ✅ Exists — reads the correct HZ map and key format |
| `OBFeatureEngine` (`ob_features.py`) | ⚠️ Preload-only — no live streaming path |
| `nautilus_event_trader.py` | ❌ Wired to `MockOBProvider` with static biases |
**Root cause the switch is blocked**: `OBFeatureEngine.preload_date()` is the only ingestion path. It calls `provider.get_all_timestamps(asset)` to enumerate all snapshots upfront. `HZOBProvider.get_all_timestamps()` correctly returns `[]` (real-time has no history) — so `preload_date()` with `HZOBProvider` builds empty caches, and all downstream `get_placement/get_signal/get_market` calls return `None`.
---
## 2. HZ Payload Format (verified from `obf_prefect_flow.py`)
Key: `asset_{SYMBOL}_ob` in map `DOLPHIN_FEATURES`
```json
{
"timestamp": "2026-03-26T12:34:56.789000+00:00",
"bid_notional": [1234567.0, 987654.0, 876543.0, 765432.0, 654321.0],
"ask_notional": [1234567.0, 987654.0, 876543.0, 765432.0, 654321.0],
"bid_depth": [0.123, 0.456, 0.789, 1.012, 1.234],
"ask_depth": [0.123, 0.456, 0.789, 1.012, 1.234],
"_pushed_at": "2026-03-26T12:34:56.901000+00:00",
"_push_seq": 1711453296901
}
```
`HZOBProvider.get_snapshot()` already parses this and normalizes `timestamp` to a Unix float (ISO→float fix applied 2026-03-26).
---
## 3. What Needs to Be Built
### 3.1 Add `step_live()` to `OBFeatureEngine` (`ob_features.py`)
This is the **core change**. Add a new public method that:
1. Fetches fresh snapshots for all assets from the provider
2. Runs the same feature computation pipeline as `preload_date()`'s inner loop
3. Stores results in new live caches keyed by `bar_idx` (integer)
4. Updates `_median_depth_ref` incrementally via EMA
**Method signature**:
```python
def step_live(self, assets: List[str], bar_idx: int) -> None:
"""Fetch live snapshots and compute OBF features for the current bar.
Call this ONCE per scan event, BEFORE calling engine.step_bar().
Results are stored and retrievable via get_placement/get_signal/get_market(bar_idx).
"""
```
**Implementation steps inside `step_live()`**:
```python
def step_live(self, assets: List[str], bar_idx: int) -> None:
wall_ts = time.time()
asset_imbalances = []
asset_velocities = []
for asset in assets:
snap = self.provider.get_snapshot(asset, wall_ts)
if snap is None:
continue
# Initialise per-asset rolling histories on first call
if asset not in self._imbalance_history:
self._imbalance_history[asset] = deque(maxlen=self.IMBALANCE_LOOKBACK)
if asset not in self._depth_1pct_history:
self._depth_1pct_history[asset] = deque(maxlen=self.DEPTH_LOOKBACK)
# Incremental median_depth_ref via EMA (alpha=0.01 → ~100-bar half-life)
d1pct = compute_depth_1pct_nb(snap.bid_notional, snap.ask_notional)
if asset not in self._median_depth_ref:
self._median_depth_ref[asset] = d1pct
else:
self._median_depth_ref[asset] = (
0.99 * self._median_depth_ref[asset] + 0.01 * d1pct
)
# Feature kernels (same as preload_date inner loop)
imb = compute_imbalance_nb(snap.bid_notional, snap.ask_notional)
dq = compute_depth_quality_nb(d1pct, self._median_depth_ref[asset])
fp = compute_fill_probability_nb(dq)
sp = compute_spread_proxy_nb(snap.bid_notional, snap.ask_notional)
da = compute_depth_asymmetry_nb(snap.bid_notional, snap.ask_notional)
self._imbalance_history[asset].append(imb)
self._depth_1pct_history[asset].append(d1pct)
imb_arr = np.array(self._imbalance_history[asset], dtype=np.float64)
ma5_n = min(5, len(imb_arr))
imb_ma5 = float(np.mean(imb_arr[-ma5_n:])) if ma5_n > 0 else imb
persist = compute_imbalance_persistence_nb(imb_arr, self.IMBALANCE_LOOKBACK)
dep_arr = np.array(self._depth_1pct_history[asset], dtype=np.float64)
velocity = compute_withdrawal_velocity_nb(
dep_arr, min(self.DEPTH_LOOKBACK, len(dep_arr) - 1)
)
# Store in live caches
if asset not in self._live_placement:
self._live_placement[asset] = {}
if asset not in self._live_signal:
self._live_signal[asset] = {}
self._live_placement[asset][bar_idx] = OBPlacementFeatures(
depth_1pct_usd=d1pct, depth_quality=dq,
fill_probability=fp, spread_proxy_bps=sp,
)
self._live_signal[asset][bar_idx] = OBSignalFeatures(
imbalance=imb, imbalance_ma5=imb_ma5,
imbalance_persistence=persist, depth_asymmetry=da,
withdrawal_velocity=velocity,
)
asset_imbalances.append(imb)
asset_velocities.append(velocity)
# Cross-asset macro (Sub-3 + Sub-4)
if asset_imbalances:
imb_arr_cross = np.array(asset_imbalances, dtype=np.float64)
vel_arr_cross = np.array(asset_velocities, dtype=np.float64)
n = len(asset_imbalances)
med_imb, agreement = compute_market_agreement_nb(imb_arr_cross, n)
cascade = compute_cascade_signal_nb(vel_arr_cross, n, self.CASCADE_THRESHOLD)
# Update macro depth history
if not hasattr(self, '_live_macro_depth_hist'):
self._live_macro_depth_hist = deque(maxlen=self.DEPTH_LOOKBACK)
agg_depth = float(np.mean([
self._median_depth_ref.get(a, 0.0) for a in assets
]))
self._live_macro_depth_hist.append(agg_depth)
macro_dep_arr = np.array(self._live_macro_depth_hist, dtype=np.float64)
depth_vel = compute_withdrawal_velocity_nb(
macro_dep_arr, min(self.DEPTH_LOOKBACK, len(macro_dep_arr) - 1)
)
# acceleration: simple first-difference of velocity
if not hasattr(self, '_live_macro_vel_prev'):
self._live_macro_vel_prev = depth_vel
accel = depth_vel - self._live_macro_vel_prev
self._live_macro_vel_prev = depth_vel
if not hasattr(self, '_live_macro'):
self._live_macro = {}
self._live_macro[bar_idx] = OBMacroFeatures(
median_imbalance=med_imb, agreement_pct=agreement,
depth_pressure=float(np.sum(imb_arr_cross)),
cascade_regime=cascade,
depth_velocity=depth_vel, acceleration=accel,
)
self._live_mode = True
self._live_bar_idx = bar_idx
```
**New instance variables to initialise in `__init__`** (add after existing init):
```python
self._live_placement: Dict[str, Dict[int, OBPlacementFeatures]] = {}
self._live_signal: Dict[str, Dict[int, OBSignalFeatures]] = {}
self._live_macro: Dict[int, OBMacroFeatures] = {}
self._live_mode: bool = False
self._live_bar_idx: int = -1
self._live_macro_depth_hist: deque = deque(maxlen=self.DEPTH_LOOKBACK)
self._live_macro_vel_prev: float = 0.0
```
### 3.2 Modify `_resolve_idx()` to handle live bar lookups
In `_resolve_idx()` (currently line 549), add a live-mode branch **before** the existing logic:
```python
def _resolve_idx(self, asset: str, timestamp_or_idx: float) -> Optional[int]:
# Live mode: bar_idx is the key directly (small integers, no ts_to_idx lookup)
if self._live_mode:
bar = int(timestamp_or_idx)
if asset in self._live_placement and bar in self._live_placement[asset]:
return bar
# Fall back to latest known bar (graceful degradation)
if asset in self._live_placement and self._live_placement[asset]:
return max(self._live_placement[asset].keys())
return None
# ... existing preload logic unchanged below ...
```
### 3.3 Modify `get_placement()`, `get_signal()`, `get_market()`, `get_macro()` to use live caches
Each method currently reads from `_preloaded_placement[asset][idx]`. Add a live-mode branch:
```python
def get_placement(self, asset: str, timestamp_or_idx: float) -> OBPlacementFeatures:
idx = self._resolve_idx(asset, timestamp_or_idx)
if idx is None:
return OBPlacementFeatures(...) # defaults (same as today)
if self._live_mode:
return self._live_placement.get(asset, {}).get(idx, OBPlacementFeatures(...))
return self._preloaded_placement.get(asset, {}).get(idx, OBPlacementFeatures(...))
```
Apply same pattern to `get_signal()`, `get_market()`, `get_macro()`.
### 3.4 Update `nautilus_event_trader.py` — `_wire_obf()`
Replace `MockOBProvider` with `HZOBProvider`:
```python
def _wire_obf(self, assets):
if not assets or self.ob_assets:
return
self.ob_assets = assets
from nautilus_dolphin.nautilus.hz_ob_provider import HZOBProvider
live_ob = HZOBProvider(
hz_cluster=HZ_CLUSTER,
hz_host=HZ_HOST,
assets=assets,
)
self.ob_eng = OBFeatureEngine(live_ob)
# No preload_date() call — live mode uses step_live() per scan
self.eng.set_ob_engine(self.ob_eng)
log(f" OBF wired: HZOBProvider, {len(assets)} assets (LIVE mode)")
```
Store `self.ob_eng` on `DolphinLiveTrader` so it can be called from `on_scan`.
### 3.5 Call `step_live()` in `on_scan()` before `step_bar()`
In `DolphinLiveTrader.on_scan()`, after `self._rollover_day()` and `_wire_obf()`, add:
```python
# Feed live OB data into OBF engine for this bar
if self.ob_eng is not None and self.ob_assets:
self.ob_eng.step_live(self.ob_assets, self.bar_idx)
```
This must happen **before** the `eng.step_bar()` call so OBF features are fresh for this bar.
---
## 4. Live Cache Eviction (Memory Management)
`_live_placement/signal/macro` grow unboundedly as dicts. Add LRU eviction — keep only the last `N=500` bar_idx entries:
```python
# At end of step_live(), after storing:
MAX_LIVE_CACHE = 500
for asset in list(self._live_placement.keys()):
if len(self._live_placement[asset]) > MAX_LIVE_CACHE:
oldest = sorted(self._live_placement[asset].keys())[:-MAX_LIVE_CACHE]
for k in oldest:
del self._live_placement[asset][k]
# Same for _live_signal, _live_macro
```
---
## 5. Staleness Guard
If `obf_prefect_flow.py` is down, `HZOBProvider.get_snapshot()` returns `None` for all assets (graceful). `step_live()` skips assets with no snapshot. The engine falls back to `ob_engine is None` behaviour (random 40% pass at `ob_confirm_rate`).
Add a staleness warning log in `step_live()` if 0 snapshots were fetched for more than 3 consecutive bars:
```python
if fetched_count == 0:
self._live_stale_count = getattr(self, '_live_stale_count', 0) + 1
if self._live_stale_count >= 3:
logger.warning("OBF step_live: no snapshots for %d bars — OBF gate degraded to random", self._live_stale_count)
else:
self._live_stale_count = 0
```
---
## 6. Files to Modify
| File | Full Path | Change |
|---|---|---|
| `ob_features.py` | `/mnt/dolphinng5_predict/nautilus_dolphin/nautilus_dolphin/nautilus/ob_features.py` | Add `step_live()`, live caches in `__init__`, live branch in `_resolve_idx/get_*` |
| `nautilus_event_trader.py` | `/mnt/dolphinng5_predict/prod/nautilus_event_trader.py` | `_wire_obf()``HZOBProvider`; add `self.ob_eng`; call `ob_eng.step_live()` in `on_scan` |
| `hz_ob_provider.py` | `/mnt/dolphinng5_predict/nautilus_dolphin/nautilus_dolphin/nautilus/hz_ob_provider.py` | Timestamp ISO→float normalization (DONE 2026-03-26) |
**Do NOT modify**:
- `/mnt/dolphinng5_predict/nautilus_dolphin/nautilus_dolphin/nautilus/alpha_orchestrator.py``set_ob_engine()` / `get_placement()` calls unchanged
- `/mnt/dolphinng5_predict/prod/obf_prefect_flow.py` — already writing correct format
- `/mnt/dolphinng5_predict/nautilus_dolphin/nautilus_dolphin/nautilus/dolphin_actor.py` — paper mode uses `preload_date()` which stays as-is
---
## 7. Tests to Write
In `/mnt/dolphinng5_predict/nautilus_dolphin/tests/test_hz_ob_provider_live.py`:
```
test_step_live_fetches_snapshots — mock HZOBProvider returns valid OBSnapshot
test_step_live_populates_placement_cache — after step_live(bar_idx=5), get_placement(asset, 5.0) returns non-default
test_step_live_populates_signal_cache — imbalance, persistence populated
test_step_live_market_features — agreement_pct and cascade computed
test_step_live_none_snapshot_skipped — provider returns None → asset skipped gracefully
test_step_live_stale_warning — 3 consecutive empty → warning logged
test_step_live_cache_eviction — after 501 bars, oldest entries deleted
test_resolve_idx_live_mode — live mode returns bar_idx directly
test_resolve_idx_live_fallback — unknown bar_idx → latest bar returned
test_median_depth_ema — _median_depth_ref converges via EMA
test_hz_ob_provider_timestamp_iso — ISO string timestamp normalised to float
test_hz_ob_provider_timestamp_float — float timestamp passes through unchanged
```
---
## 8. Verification After Implementation
1. Start `obf_prefect_flow.py` (confirm running via supervisorctl)
2. Check HZ: `DOLPHIN_FEATURES["asset_BTCUSDT_ob"]` has fresh data (< 10s old)
3. Run `nautilus_event_trader.py` — look for `OBF wired: HZOBProvider` in log
4. On first scan, look for no errors in `step_live()`
5. After 10 scans: `get_placement("BTCUSDT", bar_idx)` should return non-zero `fill_probability`
6. Compare ob_edge decisions vs Mock run — expect variance (live book reacts to market)
---
## 9. Data Quality Caveat (preserved from assessment 2026-03-26)
> **IMPORTANT**: Until this spec is implemented, OBF runs on `MockOBProvider` with static per-asset imbalance biases (BTC=-0.086, ETH=-0.092, BNB=+0.05, SOL=+0.05). All four OBF functional dimensions compute and produce real outputs feeding the alpha gate — but with frozen, market-unresponsive inputs. The OB cascade regime will always be CALM (no depth drain in mock data). This is acceptable for paper trading; it is NOT acceptable for live capital deployment.
---
*Created: 2026-03-26*
*Author: Claude (session Final_ND-Trader_Check)*