# AGENT SPEC: OBF Live Switchover — MockOBProvider → HZOBProvider + step_live()

**Status**: Ready to implement
**Complexity**: Medium (~150 LOC across 2 files + tests)
**Blocking**: Live capital deployment (paper trading acceptable with Mock)
**Created**: 2026-03-26

---

## 1. Background & Current State

The OBF subsystem has **all infrastructure in place** but is wired with synthetic data:

| Component | Status |
|---|---|
| `obf_prefect_flow.py` | ✅ Running — pushes live L2 snapshots to `DOLPHIN_FEATURES["asset_{ASSET}_ob"]` every ~100 ms |
| `HZOBProvider` (`hz_ob_provider.py`) | ✅ Exists — reads the correct HZ map and key format |
| `OBFeatureEngine` (`ob_features.py`) | ⚠️ Preload-only — no live streaming path |
| `nautilus_event_trader.py` | ❌ Wired to `MockOBProvider` with static biases |

**Why the switch is blocked (root cause)**: `OBFeatureEngine.preload_date()` is the only ingestion path. It calls `provider.get_all_timestamps(asset)` to enumerate all snapshots upfront. `HZOBProvider.get_all_timestamps()` correctly returns `[]` (a real-time feed has no history), so `preload_date()` with `HZOBProvider` builds empty caches, and all downstream `get_placement()` / `get_signal()` / `get_market()` calls return `None`.

---

## 2. HZ Payload Format (verified from `obf_prefect_flow.py`)

Key: `asset_{SYMBOL}_ob` in map `DOLPHIN_FEATURES`

```json
{
  "timestamp": "2026-03-26T12:34:56.789000+00:00",
  "bid_notional": [1234567.0, 987654.0, 876543.0, 765432.0, 654321.0],
  "ask_notional": [1234567.0, 987654.0, 876543.0, 765432.0, 654321.0],
  "bid_depth": [0.123, 0.456, 0.789, 1.012, 1.234],
  "ask_depth": [0.123, 0.456, 0.789, 1.012, 1.234],
  "_pushed_at": "2026-03-26T12:34:56.901000+00:00",
  "_push_seq": 1711453296901
}
```

`HZOBProvider.get_snapshot()` already parses this and normalizes `timestamp` to a Unix float (ISO→float fix applied 2026-03-26).
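
The ISO→float normalization can be illustrated with a minimal sketch. This is not the actual `HZOBProvider` code: the helper name `to_unix_seconds` is hypothetical, and treating naive timestamps as UTC is an assumption. It accepts either an ISO-8601 string such as the payload's `timestamp` or a value that is already numeric, which is passed through as a float.

```python
from datetime import datetime, timezone
from typing import Union


def to_unix_seconds(ts: Union[str, int, float]) -> float:
    """Return a Unix timestamp in seconds for an ISO-8601 string or a number.

    Hypothetical helper for illustration; not taken from hz_ob_provider.py.
    """
    if isinstance(ts, (int, float)):
        # Already numeric: pass through unchanged (as a float).
        return float(ts)
    parsed = datetime.fromisoformat(ts)
    if parsed.tzinfo is None:
        # Assumption: naive timestamps are interpreted as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.timestamp()


payload_ts = "2026-03-26T12:34:56.789000+00:00"
unix_ts = to_unix_seconds(payload_ts)
```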

---

## 3. What Needs to Be Built

### 3.1 Add `step_live()` to `OBFeatureEngine` (`ob_features.py`)

This is the **core change**. Add a new public method that:
1. Fetches fresh snapshots for all assets from the provider
2. Runs the same feature computation pipeline as `preload_date()`'s inner loop
3. Stores results in new live caches keyed by `bar_idx` (integer)
4. Updates `_median_depth_ref` incrementally via EMA

**Method signature**:
```python
def step_live(self, assets: List[str], bar_idx: int) -> None:
    """Fetch live snapshots and compute OBF features for the current bar.

    Call this ONCE per scan event, BEFORE calling engine.step_bar().
    Results are stored and retrievable via get_placement/get_signal/get_market(bar_idx).
    """
```

**Implementation steps inside `step_live()`**:

```python
# Relies on module-level imports: time, numpy as np, collections.deque
def step_live(self, assets: List[str], bar_idx: int) -> None:
    wall_ts = time.time()
    asset_imbalances = []
    asset_velocities = []

    for asset in assets:
        snap = self.provider.get_snapshot(asset, wall_ts)
        if snap is None:
            continue

        # Initialise per-asset rolling histories on first call
        if asset not in self._imbalance_history:
            self._imbalance_history[asset] = deque(maxlen=self.IMBALANCE_LOOKBACK)
        if asset not in self._depth_1pct_history:
            self._depth_1pct_history[asset] = deque(maxlen=self.DEPTH_LOOKBACK)

        # Incremental median_depth_ref via EMA
        # (alpha=0.01 → ~100-bar time constant, ~69-bar half-life)
        d1pct = compute_depth_1pct_nb(snap.bid_notional, snap.ask_notional)
        if asset not in self._median_depth_ref:
            self._median_depth_ref[asset] = d1pct
        else:
            self._median_depth_ref[asset] = (
                0.99 * self._median_depth_ref[asset] + 0.01 * d1pct
            )

        # Feature kernels (same as preload_date inner loop)
        imb = compute_imbalance_nb(snap.bid_notional, snap.ask_notional)
        dq = compute_depth_quality_nb(d1pct, self._median_depth_ref[asset])
        fp = compute_fill_probability_nb(dq)
        sp = compute_spread_proxy_nb(snap.bid_notional, snap.ask_notional)
        da = compute_depth_asymmetry_nb(snap.bid_notional, snap.ask_notional)

        self._imbalance_history[asset].append(imb)
        self._depth_1pct_history[asset].append(d1pct)

        imb_arr = np.array(self._imbalance_history[asset], dtype=np.float64)
        ma5_n = min(5, len(imb_arr))
        imb_ma5 = float(np.mean(imb_arr[-ma5_n:])) if ma5_n > 0 else imb
        persist = compute_imbalance_persistence_nb(imb_arr, self.IMBALANCE_LOOKBACK)
        dep_arr = np.array(self._depth_1pct_history[asset], dtype=np.float64)
        velocity = compute_withdrawal_velocity_nb(
            dep_arr, min(self.DEPTH_LOOKBACK, len(dep_arr) - 1)
        )

        # Store in live caches
        if asset not in self._live_placement:
            self._live_placement[asset] = {}
        if asset not in self._live_signal:
            self._live_signal[asset] = {}

        self._live_placement[asset][bar_idx] = OBPlacementFeatures(
            depth_1pct_usd=d1pct, depth_quality=dq,
            fill_probability=fp, spread_proxy_bps=sp,
        )
        self._live_signal[asset][bar_idx] = OBSignalFeatures(
            imbalance=imb, imbalance_ma5=imb_ma5,
            imbalance_persistence=persist, depth_asymmetry=da,
            withdrawal_velocity=velocity,
        )

        asset_imbalances.append(imb)
        asset_velocities.append(velocity)

    # Cross-asset macro (Sub-3 + Sub-4)
    if asset_imbalances:
        imb_arr_cross = np.array(asset_imbalances, dtype=np.float64)
        vel_arr_cross = np.array(asset_velocities, dtype=np.float64)
        n = len(asset_imbalances)
        med_imb, agreement = compute_market_agreement_nb(imb_arr_cross, n)
        cascade = compute_cascade_signal_nb(vel_arr_cross, n, self.CASCADE_THRESHOLD)

        # Update macro depth history
        if not hasattr(self, '_live_macro_depth_hist'):
            self._live_macro_depth_hist = deque(maxlen=self.DEPTH_LOOKBACK)
        agg_depth = float(np.mean([
            self._median_depth_ref.get(a, 0.0) for a in assets
        ]))
        self._live_macro_depth_hist.append(agg_depth)
        macro_dep_arr = np.array(self._live_macro_depth_hist, dtype=np.float64)
        depth_vel = compute_withdrawal_velocity_nb(
            macro_dep_arr, min(self.DEPTH_LOOKBACK, len(macro_dep_arr) - 1)
        )
        # acceleration: simple first-difference of velocity
        if not hasattr(self, '_live_macro_vel_prev'):
            self._live_macro_vel_prev = depth_vel
        accel = depth_vel - self._live_macro_vel_prev
        self._live_macro_vel_prev = depth_vel

        if not hasattr(self, '_live_macro'):
            self._live_macro = {}
        self._live_macro[bar_idx] = OBMacroFeatures(
            median_imbalance=med_imb, agreement_pct=agreement,
            depth_pressure=float(np.sum(imb_arr_cross)),
            cascade_regime=cascade,
            depth_velocity=depth_vel, acceleration=accel,
        )

    self._live_mode = True
    self._live_bar_idx = bar_idx
```
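
To make the EMA smoothing constant concrete, here is a small worked example. It is illustrative only: the helper names `ema_update` and `ema_half_life` are hypothetical and not part of `ob_features.py`. For `alpha = 0.01`, the time constant is `1/alpha = 100` bars, while the half-life (the number of bars needed to close half of a step change) is `ln(0.5)/ln(0.99)`, roughly 69 bars. The loop simulates the reference value after a sudden doubling of depth.

```python
import math


def ema_update(prev: float, value: float, alpha: float = 0.01) -> float:
    """One EMA step: new = (1 - alpha) * prev + alpha * value."""
    return (1.0 - alpha) * prev + alpha * value


def ema_half_life(alpha: float) -> float:
    """Number of steps for the remaining gap to a new level to halve."""
    return math.log(0.5) / math.log(1.0 - alpha)


half_life = ema_half_life(0.01)   # roughly 69 bars
time_constant = 1.0 / 0.01        # 100 bars

# Step response: the reference starts at 1.0, depth jumps to 2.0 and stays there.
ref = 1.0
for _ in range(69):
    ref = ema_update(ref, 2.0)
# After about one half-life the reference has covered about half of the jump.
```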

**New instance variables to initialise in `__init__`** (add after existing init):
```python
self._live_placement: Dict[str, Dict[int, OBPlacementFeatures]] = {}
self._live_signal: Dict[str, Dict[int, OBSignalFeatures]] = {}
self._live_macro: Dict[int, OBMacroFeatures] = {}
self._live_mode: bool = False
self._live_bar_idx: int = -1
self._live_macro_depth_hist: deque = deque(maxlen=self.DEPTH_LOOKBACK)
self._live_macro_vel_prev: float = 0.0
```

### 3.2 Modify `_resolve_idx()` to handle live bar lookups

In `_resolve_idx()` (currently line 549), add a live-mode branch **before** the existing logic:

```python
def _resolve_idx(self, asset: str, timestamp_or_idx: float) -> Optional[int]:
    # Live mode: bar_idx is the key directly (small integers, no ts_to_idx lookup)
    if self._live_mode:
        bar = int(timestamp_or_idx)
        if asset in self._live_placement and bar in self._live_placement[asset]:
            return bar
        # Fall back to latest known bar (graceful degradation)
        if asset in self._live_placement and self._live_placement[asset]:
            return max(self._live_placement[asset].keys())
        return None
    # ... existing preload logic unchanged below ...
```
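
The lookup-with-fallback behaviour can be exercised in isolation. The following is a minimal sketch using a plain function over a nested dict; the name `resolve_live_idx` and the placeholder string values are hypothetical and stand in for the engine's `_live_placement` cache.

```python
from typing import Dict, Optional


def resolve_live_idx(
    cache: Dict[str, Dict[int, object]], asset: str, bar: float
) -> Optional[int]:
    """Return the requested bar if cached, else the latest cached bar, else None."""
    per_asset = cache.get(asset)
    if not per_asset:
        # Unknown asset, or no bars stored yet.
        return None
    key = int(bar)
    if key in per_asset:
        return key
    # Graceful degradation: use the most recent bar that was stored.
    return max(per_asset)


live = {"BTCUSDT": {3: "features_3", 4: "features_4"}}
exact = resolve_live_idx(live, "BTCUSDT", 4.0)      # bar 4 is cached
fallback = resolve_live_idx(live, "BTCUSDT", 7.0)   # bar 7 is not, latest is used
missing = resolve_live_idx(live, "ETHUSDT", 4.0)    # asset never seen
```

One consequence of the fallback is that a caller may receive features from an older bar than the one requested; comparing the returned index with the requested one is a simple way to detect that case.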

### 3.3 Modify `get_placement()`, `get_signal()`, `get_market()`, `get_macro()` to use live caches

Each method currently reads from its preloaded cache (for `get_placement()`, `_preloaded_placement[asset][idx]`). Add a live-mode branch:

```python
def get_placement(self, asset: str, timestamp_or_idx: float) -> OBPlacementFeatures:
    # "..." below stands for the existing default arguments, left unchanged
    idx = self._resolve_idx(asset, timestamp_or_idx)
    if idx is None:
        return OBPlacementFeatures(...)  # defaults (same as today)
    if self._live_mode:
        return self._live_placement.get(asset, {}).get(idx, OBPlacementFeatures(...))
    return self._preloaded_placement.get(asset, {}).get(idx, OBPlacementFeatures(...))
```

Apply the same pattern to `get_signal()`, `get_market()`, and `get_macro()`. Note that `_live_macro` is keyed by `bar_idx` only (there is no per-asset level), so the macro lookup skips the asset dimension.

### 3.4 Update `nautilus_event_trader.py` — `_wire_obf()`

Replace `MockOBProvider` with `HZOBProvider`:

```python
def _wire_obf(self, assets):
    if not assets or self.ob_assets:
        return
    self.ob_assets = assets
    from nautilus_dolphin.nautilus.hz_ob_provider import HZOBProvider
    live_ob = HZOBProvider(
        hz_cluster=HZ_CLUSTER,
        hz_host=HZ_HOST,
        assets=assets,
    )
    self.ob_eng = OBFeatureEngine(live_ob)
    # No preload_date() call — live mode uses step_live() per scan
    self.eng.set_ob_engine(self.ob_eng)
    log(f" OBF wired: HZOBProvider, {len(assets)} assets (LIVE mode)")
```

Store `self.ob_eng` on `DolphinLiveTrader` so it can be called from `on_scan`. It should be initialised to `None` so that the guard in `on_scan` works before `_wire_obf()` has run.

### 3.5 Call `step_live()` in `on_scan()` before `step_bar()`

In `DolphinLiveTrader.on_scan()`, after `self._rollover_day()` and `_wire_obf()`, add:

```python
# Feed live OB data into OBF engine for this bar
if self.ob_eng is not None and self.ob_assets:
    self.ob_eng.step_live(self.ob_assets, self.bar_idx)
```

This must happen **before** the `eng.step_bar()` call so OBF features are fresh for this bar.

---

## 4. Live Cache Eviction (Memory Management)

`_live_placement`, `_live_signal`, and `_live_macro` are plain dicts and grow without bound. Add oldest-first eviction by `bar_idx` that keeps only the last `N=500` entries:

```python
# At end of step_live(), after storing:
MAX_LIVE_CACHE = 500
for asset in list(self._live_placement.keys()):
    if len(self._live_placement[asset]) > MAX_LIVE_CACHE:
        # Keys are bar indices; everything except the newest MAX_LIVE_CACHE is dropped
        oldest = sorted(self._live_placement[asset].keys())[:-MAX_LIVE_CACHE]
        for k in oldest:
            del self._live_placement[asset][k]
# Same for _live_signal (per asset) and _live_macro (keyed by bar_idx directly)
```
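
Because the same trimming applies to two per-asset caches and one flat cache, it may be convenient to factor it into a helper that works on a single `{bar_idx: features}` dict. The sketch below assumes a hypothetical helper named `evict_oldest`; the string values are placeholders for the feature objects. It is applied once per asset for the nested caches and once directly for the macro cache.

```python
from typing import Any, Dict


def evict_oldest(cache: Dict[int, Any], max_entries: int) -> int:
    """Delete the smallest bar_idx keys until at most max_entries remain.

    Returns the number of entries removed.
    """
    excess = len(cache) - max_entries
    if excess <= 0:
        return 0
    for key in sorted(cache)[:excess]:
        del cache[key]
    return excess


MAX_LIVE_CACHE = 500
per_asset = {"BTCUSDT": {i: f"features_{i}" for i in range(501)}}
macro = {i: f"macro_{i}" for i in range(501)}

# Nested caches: trim each asset's dict separately.
for bars in per_asset.values():
    evict_oldest(bars, MAX_LIVE_CACHE)
# Flat cache: trim directly.
removed_macro = evict_oldest(macro, MAX_LIVE_CACHE)
```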

---

## 5. Staleness Guard

If `obf_prefect_flow.py` is down, `HZOBProvider.get_snapshot()` returns `None` for all assets (graceful). `step_live()` skips assets with no snapshot. The engine then falls back to its `ob_engine is None` behaviour (random 40% pass at `ob_confirm_rate`).

Add a staleness warning log in `step_live()` if 0 snapshots were fetched for 3 or more consecutive bars:

```python
# Number of assets that returned a snapshot this bar (one imbalance is appended per fetched asset)
fetched_count = len(asset_imbalances)
if fetched_count == 0:
    self._live_stale_count = getattr(self, '_live_stale_count', 0) + 1
    if self._live_stale_count >= 3:
        logger.warning("OBF step_live: no snapshots for %d bars — OBF gate degraded to random", self._live_stale_count)
else:
    self._live_stale_count = 0
```
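
The counter logic can also be tested on its own, outside the engine. The following is a minimal sketch with a hypothetical `StaleCounter` class and logger name; it mirrors the rule described in this section (warn once the count of consecutive empty bars reaches the threshold, reset as soon as any snapshot arrives) and returns whether a warning was emitted so that a test can assert on it.

```python
import logging

logger = logging.getLogger("obf_live_sketch")


class StaleCounter:
    """Counts consecutive bars with zero snapshots and warns at a threshold."""

    def __init__(self, warn_after: int = 3) -> None:
        self.warn_after = warn_after
        self.count = 0

    def update(self, fetched_count: int) -> bool:
        """Record one bar; return True if a warning was emitted for it."""
        if fetched_count > 0:
            # Any successful fetch resets the streak.
            self.count = 0
            return False
        self.count += 1
        if self.count >= self.warn_after:
            logger.warning("no snapshots for %d consecutive bars", self.count)
            return True
        return False


guard = StaleCounter()
# Three empty bars, one bar with four snapshots, then one empty bar.
results = [guard.update(n) for n in (0, 0, 0, 4, 0)]
```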

---

## 6. Files to Modify

| File | Full Path | Change |
|---|---|---|
| `ob_features.py` | `/mnt/dolphinng5_predict/nautilus_dolphin/nautilus_dolphin/nautilus/ob_features.py` | Add `step_live()`, live caches in `__init__`, live branch in `_resolve_idx/get_*` |
| `nautilus_event_trader.py` | `/mnt/dolphinng5_predict/prod/nautilus_event_trader.py` | `_wire_obf()` → `HZOBProvider`; add `self.ob_eng`; call `ob_eng.step_live()` in `on_scan` |
| `hz_ob_provider.py` | `/mnt/dolphinng5_predict/nautilus_dolphin/nautilus_dolphin/nautilus/hz_ob_provider.py` | Timestamp ISO→float normalization (DONE 2026-03-26) |

**Do NOT modify**:
- `/mnt/dolphinng5_predict/nautilus_dolphin/nautilus_dolphin/nautilus/alpha_orchestrator.py` — `set_ob_engine()` / `get_placement()` calls unchanged
- `/mnt/dolphinng5_predict/prod/obf_prefect_flow.py` — already writing correct format
- `/mnt/dolphinng5_predict/nautilus_dolphin/nautilus_dolphin/nautilus/dolphin_actor.py` — paper mode uses `preload_date()` which stays as-is

---

## 7. Tests to Write

In `/mnt/dolphinng5_predict/nautilus_dolphin/tests/test_hz_ob_provider_live.py`:

```
test_step_live_fetches_snapshots          — mock HZOBProvider returns valid OBSnapshot
test_step_live_populates_placement_cache  — after step_live(bar_idx=5), get_placement(asset, 5.0) returns non-default
test_step_live_populates_signal_cache     — imbalance, persistence populated
test_step_live_market_features            — agreement_pct and cascade computed
test_step_live_none_snapshot_skipped      — provider returns None → asset skipped gracefully
test_step_live_stale_warning              — 3 consecutive empty → warning logged
test_step_live_cache_eviction             — after 501 bars, oldest entries deleted
test_resolve_idx_live_mode                — live mode returns bar_idx directly
test_resolve_idx_live_fallback            — unknown bar_idx → latest bar returned
test_median_depth_ema                     — _median_depth_ref converges via EMA
test_hz_ob_provider_timestamp_iso         — ISO string timestamp normalised to float
test_hz_ob_provider_timestamp_float       — float timestamp passes through unchanged
```
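
Several of these tests need a provider that returns controlled snapshots without a running Hazelcast cluster. One possible shape for such a test double is sketched below. Everything here is an assumption made for illustration: `FakeLiveProvider` and `make_snapshot` are hypothetical names, `SimpleNamespace` stands in for the real `OBSnapshot` (whose constructor is not shown in this spec), and the field names are taken from the HZ payload in section 2. The real feature kernels likely expect NumPy arrays rather than lists.

```python
from types import SimpleNamespace
from typing import Dict, List, Optional


class FakeLiveProvider:
    """Test double exposing the two provider methods this spec relies on."""

    def __init__(self, snapshots: Dict[str, Optional[SimpleNamespace]]) -> None:
        self._snapshots = snapshots
        self.calls: List[str] = []

    def get_snapshot(self, asset: str, timestamp: float) -> Optional[SimpleNamespace]:
        # Record the call so a test can assert which assets were requested.
        self.calls.append(asset)
        return self._snapshots.get(asset)

    def get_all_timestamps(self, asset: str) -> list:
        # A real-time source has no history to enumerate.
        return []


def make_snapshot(ts: float) -> SimpleNamespace:
    """Build a snapshot-like object with the payload's field names."""
    return SimpleNamespace(
        timestamp=ts,
        bid_notional=[1234567.0, 987654.0, 876543.0, 765432.0, 654321.0],
        ask_notional=[1234567.0, 987654.0, 876543.0, 765432.0, 654321.0],
        bid_depth=[0.123, 0.456, 0.789, 1.012, 1.234],
        ask_depth=[0.123, 0.456, 0.789, 1.012, 1.234],
    )


# One asset with data, one asset whose feed is missing.
provider = FakeLiveProvider({"BTCUSDT": make_snapshot(1.0), "ETHUSDT": None})
fetched = [a for a in ("BTCUSDT", "ETHUSDT") if provider.get_snapshot(a, 1.0) is not None]
```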

---

## 8. Verification After Implementation

1. Start `obf_prefect_flow.py` (confirm it is running via `supervisorctl`)
2. Check HZ: `DOLPHIN_FEATURES["asset_BTCUSDT_ob"]` has fresh data (< 10s old)
3. Run `nautilus_event_trader.py` and look for `OBF wired: HZOBProvider` in the log
4. On the first scan, confirm that `step_live()` raises no errors
5. After 10 scans: `get_placement("BTCUSDT", bar_idx)` should return non-zero `fill_probability`
6. Compare ob_edge decisions vs Mock run — expect variance (live book reacts to market)

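
The freshness check in step 2 can be scripted once the raw map value is in hand. The sketch below is a hedged illustration: it assumes the value is a JSON string shaped like the payload in section 2, uses the `_pushed_at` field as the reference time, and leaves out how the value is read from Hazelcast. The function name `snapshot_age_seconds` is hypothetical.

```python
import json
from datetime import datetime, timezone


def snapshot_age_seconds(raw_json: str, now: datetime) -> float:
    """Age of a payload in seconds, based on its `_pushed_at` field."""
    payload = json.loads(raw_json)
    pushed_at = datetime.fromisoformat(payload["_pushed_at"])
    return (now - pushed_at).total_seconds()


# Example payload fragment using the timestamps from section 2.
raw = json.dumps({
    "timestamp": "2026-03-26T12:34:56.789000+00:00",
    "_pushed_at": "2026-03-26T12:34:56.901000+00:00",
})
now = datetime(2026, 3, 26, 12, 35, 0, tzinfo=timezone.utc)
age = snapshot_age_seconds(raw, now)
is_fresh = age < 10.0  # threshold from step 2
```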

---

## 9. Data Quality Caveat (preserved from assessment 2026-03-26)

> **IMPORTANT**: Until this spec is implemented, OBF runs on `MockOBProvider` with static per-asset imbalance biases (BTC=-0.086, ETH=-0.092, BNB=+0.05, SOL=+0.05). All four OBF functional dimensions compute and produce real outputs feeding the alpha gate — but with frozen, market-unresponsive inputs. The OB cascade regime will always be CALM (no depth drain in mock data). This is acceptable for paper trading; it is NOT acceptable for live capital deployment.

---

*Created: 2026-03-26*
*Author: Claude (session Final_ND-Trader_Check)*