# Order Book Feature (OBF) Subsystem — Complete Technical Reference
**Status:** Production-ready under Prefect — Sprint 1 hardening complete
**Last updated:** 2026-03-22 (sprint 1 complete — all P0/P1/P2 reliability gaps closed)
**Test coverage:** 120 unit tests + 56 E2E tests — all passing (18 new tests added in sprint 1)
**Data sources studied:** All files in `external_factors/`, `nautilus_dolphin/nautilus_dolphin/nautilus/`, `alpha_engine/execution/`, `prod/`
---
## Table of Contents
1. [Why This Subsystem Exists](#1-why-this-subsystem-exists)
2. [System Architecture Overview](#2-system-architecture-overview)
3. [Data Flow — End to End](#3-data-flow--end-to-end)
4. [Layer 1 — OBStreamService (Live L2 Book)](#4-layer-1--obstreamservice-live-l2-book)
5. [Layer 2 — OBSnapshot and OBProvider Hierarchy](#5-layer-2--obsnapshot-and-obprovider-hierarchy)
6. [Layer 3 — OBFeatureEngine (Batch Backtesting)](#6-layer-3--obfeatureengine-batch-backtesting)
7. [Layer 3B — LiveOBFeatureEngine (Streaming Production)](#7-layer-3b--liveobfeatureengine-streaming-production)
8. [Sub-system 1 — Per-Asset Placement Features](#8-sub-system-1--per-asset-placement-features)
9. [Sub-system 2 — Per-Asset Signal Features](#9-sub-system-2--per-asset-signal-features)
10. [Sub-system 3 — Cross-Asset Market Signal](#10-sub-system-3--cross-asset-market-signal)
11. [Sub-system 4 — Macro Regime / Market Structure](#11-sub-system-4--macro-regime--market-structure)
12. [Layer 4 — OBPlacer (Execution Advice)](#12-layer-4--obplacer-execution-advice)
13. [SmartPlacer Integration](#13-smartplacer-integration)
14. [BacktestAdapter and Fill Simulator](#14-backtestadapter-and-fill-simulator)
15. [Prefect Flow — obf_prefect_flow](#15-prefect-flow--obf_prefect_flow)
16. [AsyncOBThread — Async/Sync Bridge](#16-asyncobthread--asyncsync-bridge)
17. [OBFPersistenceService — Parquet Layer](#17-obfpersistenceservice--parquet-layer)
18. [Hazelcast Integration](#18-hazelcast-integration)
19. [Timing, Rate Limits, and Lag Detection](#19-timing-rate-limits-and-lag-detection)
20. [Parquet Schema — Complete Field Reference](#20-parquet-schema--complete-field-reference)
21. [Neutral Defaults and Graceful Degradation](#21-neutral-defaults-and-graceful-degradation)
22. [Import Architecture and the Shim Loader](#22-import-architecture-and-the-shim-loader)
23. [Test Architecture](#23-test-architecture)
24. [Invariants That Must Never Change](#24-invariants-that-must-never-change)
25. [Expected Performance Characteristics](#25-expected-performance-characteristics)
26. [Future: Nautilus Trader Integration](#26-future-nautilus-trader-integration)
27. [Deployment](#27-deployment)
28. [Known Caveats and Edge Cases](#28-known-caveats-and-edge-cases)
---
## 1. Why This Subsystem Exists
The alpha engine makes directional predictions on crypto futures. Those predictions need to be *executed*. Execution is not free:
| Method | Binance Futures fee | Notes |
|---|---|---|
| Taker (market order) | 0.05% per side | Immediate fill, crosses spread |
| Maker (limit order) | 0.02% per side | Passive fill, may not fill |
On a $9,375 leveraged position, the fee difference per side is `0.03% × $9,375 = $2.81`. Over hundreds of trades per week this is material. But a maker order that *never fills* is an opportunity cost equal to the entire signal's expected value.
The OBF subsystem answers two questions per trade signal:
1. **Is the book liquid enough to get a maker fill?** (Sub-systems 1 and 2)
2. **Is the order book confirming or contradicting the signal direction?** (Sub-systems 2, 3, and 4)
These answers feed `SmartPlacer`, which decides: `TAKER` / `MAKER` / `PATIENT_MAKER` / `SKIP`.
Additionally, the raw order book data is persisted to Parquet for:
- Backtesting future strategies against real L2 microstructure
- Calibrating fill probability models
- Detecting market structure regimes in strategy development
---
## 2. System Architecture Overview
```
Binance Futures WebSocket
wss://fstream.binance.com/stream
streams: btcusdt@depth@100ms + ethusdt@depth@100ms + solusdt@depth@100ms
                 │
                 ▼
┌────────────────────────────────────┐
│ OBStreamService                    │  external_factors/ob_stream_service.py
│ In-memory L2 book (price→qty)      │
│ REST snapshot on init/reconnect    │
│ asyncio event loop                 │
└────────────────┬───────────────────┘
                 │ get_depth_buckets() → dict with 5-bucket notional arrays
                 ▼
┌────────────────────────────────────┐
│ AsyncOBThread                      │  prod/obf_prefect_flow.py
│ daemon threading.Thread            │
│ Bridges async → sync for Prefect   │
│ get_depth_buckets_sync() → dict    │
└────────────────┬───────────────────┘
                 │ raw_snaps dict per asset
                 ▼
┌────────────────────────────────────┐
│ LiveOBFeatureEngine                │  prod/obf_persistence.py
│ Incremental rolling computation    │
│ 4 sub-systems per asset            │
│ Sub-3: cross-asset market signal   │
│ Sub-4: macro regime                │
└─────────┬──────────────────────────┘
          │                         │
          ▼                         ▼
┌────────────────────┐  ┌───────────────────────┐
│ Hazelcast          │  │ OBFPersistenceService │
│ DOLPHIN_FEATURES   │  │ flush 300s            │
│ obf_latest         │  │ first flush 60s       │
│ asset_X_ob         │  │ /mnt/ng6_data/        │
│ circuit breaker    │  │ ob_features/          │
│ preflight probe    │  │ cleanup >7d           │
└─────────┬──────────┘  └───────────────────────┘
          │
          ▼
┌────────────────────┐
│ HZOBProvider       │  nautilus_dolphin/.../hz_ob_provider.py
│ lazy connect       │  discovers assets from HZ key set
│ OBPlacer           │  nautilus_dolphin/.../ob_placer.py
│ SmartPlacer        │  alpha_engine/execution/smart_placer.py
└─────────┬──────────┘
          │
          ▼
PlacementDecision: MAKER / TAKER / PATIENT_MAKER / SKIP
```
For **backtesting**, the path is different:
```
ob_data/{ASSET}-bookDepth-{DATE}.csv
        │
        ▼
CSVOBProvider → OBFeatureEngine.preload_date()
        │  (batch, pre-computes all features for one date)
        ▼
engine.get_placement(asset, bar_idx)
engine.get_signal(asset, bar_idx)
engine.get_market(bar_idx)
engine.get_macro()
```
---
## 3. Data Flow — End to End
### 3.1 Production (live) path — every 0.5 s
```
t=0.000  WS event received by OBStreamService (100ms cadence from Binance)
t=0.000  _apply_event() updates in-memory bid/ask dicts
t=0.500  Prefect flow wakes on poll_interval_s=0.5
t=0.500  get_depth_buckets_sync(asset) called for each asset
           └── asyncio.run_coroutine_threadsafe(...).result(timeout=0.5)
                 └── get_depth_buckets() reads sorted bids/asks, bins into 5 pct buckets
t=0.500  local_ts = time.time() stamped
t=0.500  LiveOBFeatureEngine.update(raw_snaps) called
           ├── Sub-1: depth_1pct_usd, depth_quality, fill_probability, spread_proxy_bps
           ├── Sub-2: imbalance, imbalance_ma5, imbalance_persistence, depth_asymmetry, withdrawal_velocity
           ├── Sub-3: median_imbalance, agreement_pct, depth_pressure
           └── Sub-4: depth_velocity, cascade_count, acceleration, regime_signal
t=0.501  circuit breaker check: if HZ open, skip HZ entirely this cycle
t=0.501  hz_push_obf_task.submit(client, "asset_BTCUSDT_ob", ...) × 3 [fire-and-forget]
t=0.502  hz_push_obf_task.submit(client, "obf_latest", ...).result(timeout=1.5) [blocks]
t=0.502  _write_local_cache(consolidated): atomic JSON to ob_cache/latest_ob_features.json
t=0.502  pushed_at = time.time()
t=0.502  lag_s = pushed_at - local_ts (typically < 10 ms on same machine)
t=0.502  persist.update_snapshot(asset, row) for each asset [O(1), lock-protected]
t=0.500  (first flush after 60s, then every 300s) OBFPersistenceService._flush() → Parquet
t=0.500  after flush: _cleanup_old_partitions() prunes dirs older than MAX_FILE_AGE_DAYS=7
```
### 3.2 Backtesting path
```
CSVOBProvider._load_asset(asset)
  └── reads {ASSET}-bookDepth-{DATE}.csv
  └── filters to files where: cutoff_start <= file_date < reference_date
  └── pivots 10 rows/snapshot → arrays of shape [N, 5]
  └── caches in self._cache["{asset}_{reference_date}"]
OBFeatureEngine.preload_date("2025-01-16", ["BTCUSDT"])
  └── iterates all N snapshots in O(N) pass
  └── builds rolling imb_hist and dep_hist arrays incrementally
  └── computes all numba kernels per snapshot
  └── stores in _preloaded_placement[asset][snap_idx], _preloaded_signal[asset][snap_idx]
  └── computes cross-asset features at each snap_idx
  └── updates _preloaded_macro at each snap_idx (uses latest)
  └── _preloaded = True
During backtest bar loop:
  engine.get_placement("BTCUSDT", bar_idx)
    └── _resolve_idx(): if bar_idx < 1e9 → snap_idx = bar_idx // 6
        (VBT 5s bars : OB 30s snapshots = 6:1)
    └── returns _preloaded_placement["BTCUSDT"][snap_idx]
```
---
## 4. Layer 1 — OBStreamService (Live L2 Book)
**Source:** `external_factors/ob_stream_service.py`
### 4.1 Initialization Protocol (Binance Diff Book)
Binance Futures requires a specific protocol to build a synchronized L2 book from diff streams. The service implements this exactly:
1. **Start buffering WS events immediately** before the REST snapshot is ready.
2. **Fetch REST snapshot** via `GET /fapi/v1/depth?symbol={ASSET}&limit=1000`.
- Weight cost: 20 per request. Used **only on init and reconnect**, so steady-state operation consumes no REST request weight.
- Returns `lastUpdateId` (the sequence number of the snapshot).
3. **Replay buffered events**, discarding any with `event['u'] <= last_update_id` (already reflected in snapshot).
4. Mark `initialized[asset] = True`.
This protocol ensures the local book is **exactly synchronized** with Binance's server-side book at the time of snapshot. Any events with `u > lastUpdateId` that arrived while the REST call was in flight are buffered and applied in order.
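The replay step (step 3) can be sketched as below. This is a minimal illustration, not the service's code: the helper name `replay_buffered_events` and the list-of-dicts event shape are assumptions; only the `u` field and the `u <= lastUpdateId` discard rule come from the description above.

```python
from typing import Dict, List


def replay_buffered_events(last_update_id: int, buffered: List[Dict]) -> List[Dict]:
    """Return the buffered diff events that still need applying, in arrival order.

    Hypothetical helper: events whose final update ID `u` is not newer than the
    REST snapshot's lastUpdateId are already reflected in the snapshot.
    """
    return [ev for ev in buffered if ev["u"] > last_update_id]


# Events buffered while the REST snapshot (lastUpdateId = 100) was in flight.
buffered = [{"u": 98}, {"u": 100}, {"u": 101}, {"u": 105}]
to_apply = replay_buffered_events(100, buffered)
print([ev["u"] for ev in to_apply])
```

Only the events with `u` of 101 and 105 survive the filter; the other two are dropped as already covered by the snapshot.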
### 4.2 Update Application
Every WebSocket diff event carries:
- `'b'` — list of `[price_str, qty_str]` bid updates
- `'a'` — list of `[price_str, qty_str]` ask updates
- `'u'` — final update ID
For each price/qty pair:
- `qty == 0.0` → **delete** that price level from the dict
- `qty > 0.0` → **upsert** that price level
This is the standard Binance L2 maintenance algorithm. The result is a `bids: Dict[float, float]` and `asks: Dict[float, float]` that are exact replicas of the exchange's book state.
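As a rough sketch of the delete/upsert rule, assuming one side of the book is a plain `Dict[float, float]` as described (the function name `apply_side_updates` is hypothetical and locking is omitted):

```python
from typing import Dict, List


def apply_side_updates(levels: Dict[float, float], updates: List[List[str]]) -> None:
    """Apply one side ('b' or 'a') of a diff event to a price -> qty dict in place."""
    for price_str, qty_str in updates:
        price, qty = float(price_str), float(qty_str)
        if qty == 0.0:
            levels.pop(price, None)   # delete the level; ignore if already absent
        else:
            levels[price] = qty       # insert a new level or overwrite the old qty


bids = {100.0: 1.5, 99.5: 2.0}
apply_side_updates(bids, [["100.0", "0"], ["99.5", "3.25"], ["99.0", "1.0"]])
print(bids)
```

After the call the 100.0 level is gone, 99.5 carries the new quantity, and 99.0 has been added.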
### 4.3 Reconnection Handling
On `websockets.exceptions.ConnectionClosed`:
1. All assets reset to `initialized[a] = False`.
2. REST snapshots refetched concurrently for all assets using the shared session.
3. WS reconnection attempted after `reconnect_delay` seconds (exponential backoff).
**Exponential backoff (P1-2):**
- `reconnect_delay` starts at 3s, doubles each failure: 3→6→12→24→48→96→120→120…
- Capped at `_RECONNECT_MAX_S = 120s` with ±1s random jitter per attempt
- Resets to 3s if the connection stayed live for ≥ `_RECONNECT_STABLE_S = 60s`
- Prevents REST request storms under sustained exchange instability (2400 weight/min limit)
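The backoff schedule above can be reproduced with a small sketch. The constants mirror the values quoted in the list; the function names, the uniform jitter distribution, and applying jitter after the cap are assumptions made for illustration.

```python
import random

RECONNECT_BASE_S = 3.0      # initial delay
RECONNECT_MAX_S = 120.0     # cap (_RECONNECT_MAX_S)
RECONNECT_STABLE_S = 60.0   # uptime that resets the delay (_RECONNECT_STABLE_S)


def next_reconnect_delay(current_s: float, uptime_s: float) -> float:
    """Return the base delay for the next attempt, before jitter is added."""
    if uptime_s >= RECONNECT_STABLE_S:
        return RECONNECT_BASE_S                      # connection was stable: reset
    return min(current_s * 2.0, RECONNECT_MAX_S)     # otherwise double, up to the cap


def with_jitter(delay_s: float, jitter_s: float = 1.0) -> float:
    """Add uniform jitter in [-jitter_s, +jitter_s], never going below zero."""
    return max(0.0, delay_s + random.uniform(-jitter_s, jitter_s))


delays = [RECONNECT_BASE_S]
for _ in range(7):
    delays.append(next_reconnect_delay(delays[-1], uptime_s=0.0))
print(delays)
```

With no stable period in between, the loop walks through the 3, 6, 12, 24, 48, 96, 120, 120 sequence; a connection that stays up for 60 s or more drops the next delay back to 3 s.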
**WS stall detection (P0-2):**
- `self.last_event_ts: float` stamped on every received WS message
- `is_stale(threshold_s=30.0) -> bool` returns `True` if no events for >30s
- Hot loop checks `ob_thread.is_stale()` every `LOG_STATUS_EVERY=120` cycles and logs an error
During the reconnection window, `get_depth_buckets()` returns `None` (book not initialized). The flow handles this gracefully by skipping the affected assets for that cycle.
### 4.4 Depth Bucket Extraction
`get_depth_buckets(asset)` converts the raw price→qty dicts into the 5-bucket format needed downstream.
**Crossed book guard (P0-3):** Before computing, validates `best_bid < best_ask`. If the book is crossed (`best_bid >= best_ask`) — which can occur during reconnection, partial snapshot application, or exchange instability — the method returns `None` instead of computing corrupted features. Corrupted features would otherwise propagate to HZ and the Parquet archive.
```
For each bid level at price p, quantity q:
    dist_pct = (mid - p) / mid × 100
    idx = int(dist_pct)                ← truncated, so idx 0 = [0%, 1%), idx 1 = [1%, 2%), etc.
    if idx < max_depth_pct (=5):
        bid_notional[idx] += p × q     ← USD value
        bid_depth[idx] += q            ← quantity in asset units
    else: break                        ← bids sorted descending, safe to break
```
Same logic for asks (sorted ascending, `dist_pct = (p - mid) / mid × 100`).
Returns:
```python
{
    "timestamp": time.time(),          # local extract wall clock
    "asset": "BTCUSDT",
    "bid_notional": np.array([...]),   # [5] float64, USD at 0-1%, 1-2%, 2-3%, 3-4%, 4-5% below mid
    "ask_notional": np.array([...]),   # [5] float64, USD at 0-1%, 1-2%, ... above mid
    "bid_depth": np.array([...]),      # [5] float64, asset qty at each band
    "ask_depth": np.array([...]),      # [5] float64, asset qty at each band
    "best_bid": float,                 # highest bid price
    "best_ask": float,                 # lowest ask price
    "spread_bps": float,               # (ask - bid) / mid × 10,000
}
```
**Important:** Index `0` corresponds to the **nearest-to-mid** band (within 1% of mid). Index `4` is the **farthest** (4-5% from mid). This is consistent throughout all downstream consumers.
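The binning and the crossed-book guard can be sketched in plain Python as follows. This is an illustration under stated assumptions rather than the service's implementation: `bucket_book` is a hypothetical name, plain lists stand in for the NumPy arrays, the mid is taken as the average of best bid and best ask, an empty side returns `None`, and locking and timestamps are left out.

```python
from typing import Dict, Optional


def bucket_book(bids: Dict[float, float], asks: Dict[float, float],
                max_depth_pct: int = 5) -> Optional[dict]:
    """Bin a price -> qty book into 1%-wide bands around mid (index 0 = nearest)."""
    if not bids or not asks:
        return None
    best_bid, best_ask = max(bids), min(asks)
    if best_bid >= best_ask:              # crossed-book guard
        return None
    mid = (best_bid + best_ask) / 2.0
    out = {key: [0.0] * max_depth_pct
           for key in ("bid_notional", "ask_notional", "bid_depth", "ask_depth")}
    for side, levels in (("bid", bids), ("ask", asks)):
        # Walk outward from mid: bids descending, asks ascending.
        for price in sorted(levels, reverse=(side == "bid")):
            idx = int(abs(mid - price) / mid * 100.0)    # truncate to the band index
            if idx >= max_depth_pct:
                break                                    # later levels are farther out
            qty = levels[price]
            out[f"{side}_notional"][idx] += price * qty  # USD value
            out[f"{side}_depth"][idx] += qty             # asset units
    out["best_bid"], out["best_ask"] = best_bid, best_ask
    out["spread_bps"] = (best_ask - best_bid) / mid * 10_000.0
    return out


book = bucket_book({99.9: 1.0, 98.5: 2.0, 90.0: 5.0}, {100.1: 1.0, 101.6: 3.0})
print(book["bid_notional"], book["ask_notional"], round(book["spread_bps"], 3))
```

In this toy book the bid at 90.0 sits 10% below mid, so it falls outside the five bands and terminates the bid loop.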
### 4.5 Thread Safety
The service uses per-asset `asyncio.Lock` objects. All reads and writes to `bids[asset]` and `asks[asset]` are protected. This is important because `get_depth_buckets()` sorts the full dict while WS events continue arriving.
---
## 5. Layer 2 — OBSnapshot and OBProvider Hierarchy
**Source:** `nautilus_dolphin/nautilus_dolphin/nautilus/ob_provider.py`
### 5.1 OBSnapshot Dataclass
```python
@dataclass
class OBSnapshot:
    timestamp: float          # Unix seconds
    asset: str                # e.g. "BTCUSDT"
    bid_notional: np.ndarray  # [5] float64, USD per band from -1% to -5% of mid (per-band totals, not cumulative from mid)
    ask_notional: np.ndarray  # [5] float64, USD per band from +1% to +5% of mid
    bid_depth: np.ndarray     # [5] float64, asset qty at each band
    ask_depth: np.ndarray     # [5] float64, asset qty at each band
```
All notional arrays use the **same indexing convention** as `OBStreamService`:
- `[0]` = within 1% of mid (the "near" band, most liquid)
- `[4]` = 4-5% from mid (the "far" band, less liquid)
Notional values represent the **total USD value within that percentage band**, not cumulative from mid. For example, `bid_notional[2]` is the total USD of bids between 2% and 3% below mid.
### 5.2 OBProvider Abstract Base Class
```python
class OBProvider(ABC):
    @abstractmethod
    def get_snapshot(self, asset: str, timestamp: float) -> Optional[OBSnapshot]: ...
    @abstractmethod
    def get_assets(self) -> List[str]: ...
    @abstractmethod
    def get_all_timestamps(self, asset: str) -> np.ndarray: ...
```
Three concrete implementations:
| Class | Source | Use case |
|---|---|---|
| `CSVOBProvider` | `ob_data/{ASSET}-bookDepth-{DATE}.csv` | Historical replay, backtesting |
| `MockOBProvider` | Synthetic, fully configurable | Unit testing |
| `HZOBProvider` | Hazelcast `asset_{ASSET}_ob` key | Live alpha engine reads |
### 5.3 CSVOBProvider
**CSV format:** One row per price level per timestamp:
```
timestamp, percentage, depth, notional
2025-01-15 00:00:05, -5, 4697.736, 444223397.14
2025-01-15 00:00:05, -4, 2103.451, 199827634.85
...
2025-01-15 00:00:05, +1, 891.234, 84667183.21
...
```
10 rows per snapshot (5 bid levels: -1 to -5, 5 ask levels: +1 to +5). The `percentage` sign follows the convention: **negative = bid side (below mid), positive = ask side**.
Index mapping:
- `percentage = -1` → `idx = 0` (near bid)
- `percentage = -5` → `idx = 4` (far bid)
- `percentage = +1` → `idx = 0` (near ask)
- `percentage = +5` → `idx = 4` (far ask)
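The mapping reduces to "sign picks the side, magnitude minus one picks the index". A minimal sketch, where `band_index` is a hypothetical helper name and rejecting out-of-range levels is an assumption:

```python
from typing import Tuple


def band_index(percentage: int) -> Tuple[str, int]:
    """Map a CSV `percentage` value (-5..-1 or +1..+5) to (side, array index)."""
    if percentage == 0 or abs(percentage) > 5:
        raise ValueError(f"unexpected percentage level: {percentage}")
    side = "bid" if percentage < 0 else "ask"   # negative = below mid = bid side
    return side, abs(percentage) - 1            # level 1 -> idx 0, level 5 -> idx 4


for pct in (-1, -5, 1, 5):
    print(pct, band_index(pct))
```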
**Date filtering (anti-leakage):** When `reference_date` is set, `_load_asset()` only loads CSV files where:
```
reference_date - 7 days <= file_date < reference_date
```
Files from `reference_date` itself are **excluded**. This prevents the feature engine from seeing future data when computing the median depth reference used in normalization. The caller must pass `"2025-01-16"` to use data from `2025-01-15`.
**Lookup:** Binary search (`bisect.bisect_left`) on the sorted timestamp array. O(log N) per query. With 2880 snapshots/day (30s intervals) this is effectively instant.
**Tolerance:** `tolerance_s=30.0` — if the nearest snapshot is more than 30 s away from the requested timestamp, `None` is returned.
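A sketch of the lookup-with-tolerance pattern, assuming the timestamps are held in a sorted list. The function name `nearest_snapshot_index` is hypothetical, and checking both neighbours of the insertion point is one reasonable reading of "nearest snapshot", not necessarily how the provider does it.

```python
import bisect
from typing import List, Optional


def nearest_snapshot_index(timestamps: List[float], ts: float,
                           tolerance_s: float = 30.0) -> Optional[int]:
    """Return the index of the snapshot nearest to `ts`, or None if it is too far away."""
    if not timestamps:
        return None
    pos = bisect.bisect_left(timestamps, ts)
    # The nearest timestamp is either just before or at/after the insertion point.
    candidates = [i for i in (pos - 1, pos) if 0 <= i < len(timestamps)]
    best = min(candidates, key=lambda i: abs(timestamps[i] - ts))
    return best if abs(timestamps[best] - ts) <= tolerance_s else None


snaps = [0.0, 30.0, 60.0, 90.0]
print(nearest_snapshot_index(snaps, 44.0))    # nearest is the 30.0 snapshot
print(nearest_snapshot_index(snaps, 150.0))   # 60 s past the last snapshot
```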
### 5.4 MockOBProvider
Generates synthetic snapshots with configurable `imbalance_bias` and `depth_scale`.
The mathematical relationship between `imbalance_bias` and the computed imbalance is exact:
```
bid_mult = 1 + bias
ask_mult = 1 - bias
total_bid = base × Σ(level_weights) × bid_mult = base × 15 × (1 + bias)
total_ask = base × Σ(level_weights) × ask_mult = base × 15 × (1 - bias)
imbalance = (bid - ask) / (bid + ask)
          = (15(1+b) - 15(1-b)) / (15(1+b) + 15(1-b))
          = 2b × 15 / (2 × 15)
          = b
```
Therefore `compute_imbalance_nb(bid_notional, ask_notional) == imbalance_bias` exactly. This makes tests deterministic.
`level_weights = [1.0, 2.0, 3.0, 4.0, 5.0]` — depth increases with distance from mid, reflecting realistic order book shape where more passive orders sit further out.
`imbalance_biases: Dict[str, float]` allows per-asset overrides, so you can test cross-asset disagreement scenarios.
`num_snapshots=2880` with 30 s spacing models a full trading day (2880 × 30 = 86400 s).
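The identity between `imbalance_bias` and the computed imbalance is easy to check numerically. The sketch below rebuilds the synthetic notionals from the stated weights and applies the imbalance formula in plain Python; `mock_notionals` and `imbalance` are hypothetical stand-ins for the provider and the numba kernel, and equality holds up to floating-point rounding.

```python
from typing import List, Tuple

LEVEL_WEIGHTS = [1.0, 2.0, 3.0, 4.0, 5.0]   # sums to 15


def mock_notionals(base: float, bias: float) -> Tuple[List[float], List[float]]:
    """Build synthetic bid/ask notional vectors skewed by `bias`."""
    bid = [base * w * (1.0 + bias) for w in LEVEL_WEIGHTS]
    ask = [base * w * (1.0 - bias) for w in LEVEL_WEIGHTS]
    return bid, ask


def imbalance(bid: List[float], ask: List[float]) -> float:
    """(sum(bid) - sum(ask)) / (sum(bid) + sum(ask)), with a zero-depth guard."""
    total = sum(bid) + sum(ask)
    if total < 1e-9:
        return 0.0
    return (sum(bid) - sum(ask)) / total


for bias in (0.2, -0.35, 0.0):
    bid, ask = mock_notionals(1_000_000.0, bias)
    print(bias, round(imbalance(bid, ask), 12))
```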
### 5.5 HZOBProvider
Reads live data from Hazelcast key `asset_{ASSET}_ob`. Used by the production alpha engine to construct `OBSnapshot` objects for the `OBFeatureEngine`. Not used in the OBF flow itself — the flow pushes to HZ; consumers pull via this provider.
**Lazy connection (P1-4):** The HZ client is not created at construction time. `_ensure_connected()` is called on first use (first `get_snapshot()` or `get_assets()` call). This prevents failures during unit tests and avoids connection race conditions at startup.
**Dynamic asset discovery (P1-4):** `get_assets()` scans HZ key set for keys matching `asset_*_ob` pattern. This reflects the actual assets being pushed by the OBF flow, eliminating the prior hardcoded list that included `BNBUSDT` (not tracked). Alternatively, an explicit `assets: List[str]` list can be passed to the constructor for deterministic, fast operation.
**Error handling:** Separate `json.JSONDecodeError` catch logs the specific field; all other exceptions log and return `None`. No exception propagates to the caller.
---
## 6. Layer 3 — OBFeatureEngine (Batch Backtesting)
**Source:** `nautilus_dolphin/nautilus_dolphin/nautilus/ob_features.py`
### 6.1 Design Philosophy
The batch `OBFeatureEngine` is designed for **fast backtesting**: it pre-computes all features for an entire date in a single O(N) pass, then serves results in O(1) via dict lookup. This is the right design for vectorbt integration where all bars are processed up front.
In production, `LiveOBFeatureEngine` (Section 7) handles incremental streaming.
### 6.2 State
```python
_imbalance_history: Dict[str, deque] # per-asset rolling imbalance values
_depth_1pct_history: Dict[str, deque] # per-asset rolling 1% depth values
_market_depth_history: deque # aggregate depth across all assets
_median_depth_ref: Dict[str, float] # computed in preload, used for normalization
_preloaded_placement: Dict[str, Dict[int, OBPlacementFeatures]]
_preloaded_signal: Dict[str, Dict[int, OBSignalFeatures]]
_preloaded_market: Dict[int, OBMarketFeatures]
_preloaded_macro_map: Dict[int, OBMacroFeatures] # per snap_idx (P1-3 fix)
_ts_to_idx: Dict[str, Dict[float, int]] # timestamp → snap index map
bar_to_snap_ratio: int = 6 # configurable (P2-7)
```
### 6.3 preload_date() Algorithm
```
1. Set provider.reference_date = date_str (enables date filtering in CSVOBProvider)
2. For each asset:
   a. Load all N snapshots
   b. Compute depths: depths[i] = bid_notional[i][0] + ask_notional[i][0]
   c. _median_depth_ref[asset] = median(depths) ← normalization reference
3. For snap_idx in range(N):
   For each asset:
     a. Compute raw features via numba kernels (imb, d1pct, dq, fp, sp, da)
     b. Append to imb_hist[asset], dep_hist[asset]
     c. Compute rolling features (ma5, persistence, velocity)
     d. Store in _preloaded_placement[asset][snap_idx]
     e. Store in _preloaded_signal[asset][snap_idx]
     f. Collect (imb, velocity) for cross-asset computation
   Compute Sub-3 (market agreement) from all assets' imbalances
   Compute Sub-4 (macro regime) from all assets' velocities
   Update _preloaded_market[snap_idx]
   Update _preloaded_macro_map[snap_idx] = OBMacroFeatures(...) ← per-timestamp (P1-3)
4. _preloaded = True
```
### 6.4 Index Resolution
The `_resolve_idx()` method handles two modes:
**Timestamp mode** (`timestamp_or_idx > 1e9`): Direct lookup in `_ts_to_idx`, or binary search for nearest.
**Bar index mode** (integer < 1e9): Converts VBT 5-second bar indices to OB snapshot indices using `bar_idx // self.bar_to_snap_ratio`. Defaults assume:
- VBT bars: 5 s granularity
- OB snapshots: 30 s granularity
- Ratio: `bar_to_snap_ratio = 6` bars per snapshot (configurable via constructor, P2-7)
This integer division alignment means every OB feature is used for exactly 6 consecutive VBT bars. The features are refreshed every 30 s, which matches the Binance `bookDepth` snapshot cadence used when building CSV data.
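The two modes can be sketched as follows. `resolve_snap_index` is a hypothetical stand-in for `_resolve_idx()`; it covers only the exact-timestamp lookup and the integer-division path, and leaves out the nearest-match binary search mentioned above.

```python
from typing import Dict, Optional


def resolve_snap_index(timestamp_or_idx: float, ts_to_idx: Dict[float, int],
                       bar_to_snap_ratio: int = 6) -> Optional[int]:
    """Map a bar index or a Unix timestamp to an OB snapshot index."""
    if timestamp_or_idx > 1e9:
        # Timestamp mode: exact lookup only (nearest-match search omitted here).
        return ts_to_idx.get(float(timestamp_or_idx))
    # Bar index mode: integer division groups consecutive bars onto one snapshot.
    return int(timestamp_or_idx) // bar_to_snap_ratio


print([resolve_snap_index(i, {}) for i in range(13)])
print(resolve_snap_index(1736985630.0, {1736985600.0: 0, 1736985630.0: 1}))
```

Bars 0 to 5 resolve to snapshot 0, bars 6 to 11 to snapshot 1, and bar 12 starts snapshot 2, which is the "6 consecutive bars per snapshot" behaviour described above.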
**Per-timestamp macro (P1-3):** `get_macro(timestamp_or_idx)` now resolves the correct snapshot index and returns `_preloaded_macro_map[snap_idx]`. Previously it returned only the last computed value regardless of bar index — silently wrong for any non-terminal bar in a backtest.
---
## 7. Layer 3B — LiveOBFeatureEngine (Streaming Production)
**Source:** `prod/obf_persistence.py` → `class LiveOBFeatureEngine`
### 7.1 Why a Separate Class Exists
`OBFeatureEngine.preload_date()` requires all data upfront. In production:
- Data arrives one snapshot at a time, every 0.5 s.
- There is no future data to preload.
- The `median_depth_ref` must be bootstrapped from the first N live observations.
`LiveOBFeatureEngine` solves all three problems.
### 7.2 Rolling State
```python
_imb_hist: Dict[str, deque(maxlen=DEPTH_LOOKBACK=60)] # 60 × 0.5 s = 30 s
_dep_hist: Dict[str, deque(maxlen=DEPTH_LOOKBACK=60)] # same
_median_ref: Dict[str, float] # calibrated once after MEDIAN_CALIB_N=60 samples
_calib_buf: Dict[str, list] # accumulates first 60 depth values
_calibrated: Dict[str, bool] # True after calibration
_market_dep: deque(maxlen=60) # aggregate depth for acceleration computation
```
### 7.3 Median Calibration
For the first 60 snapshots (`MEDIAN_CALIB_N = 60`), `_calib_buf[asset]` accumulates `depth_1pct` values. After 60 samples, `_median_ref[asset]` is set to `np.median(calib_buf)` and `_calibrated[asset] = True`.
Before calibration is complete, `med_ref = max(d1pct, 1.0)` — this gives `depth_quality = 1.0` exactly (no normalization until the reference is stable). The 60-sample window is 30 seconds at 0.5 s polling — a reasonable warm-up period.
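A compact sketch of the warm-up logic for a single asset. The class name `MedianCalibrator` is hypothetical, and whether the real engine starts using the median on the 60th sample or only on the next one is not stated here; this sketch uses it from the calibrating sample onward.

```python
import statistics
from typing import List, Optional


class MedianCalibrator:
    """Collect the first `calib_n` depth samples, then freeze their median."""

    def __init__(self, calib_n: int = 60) -> None:
        self.calib_n = calib_n
        self._buf: List[float] = []
        self.median_ref: Optional[float] = None   # None until calibrated

    def reference_for(self, depth_1pct: float) -> float:
        """Return the normalization reference to use for this sample."""
        if self.median_ref is None:
            self._buf.append(depth_1pct)
            if len(self._buf) >= self.calib_n:
                self.median_ref = statistics.median(self._buf)   # calibrated once
        if self.median_ref is not None:
            return self.median_ref
        # Warm-up: reference equals current depth, so depth_quality is 1.0.
        return max(depth_1pct, 1.0)


calib = MedianCalibrator(calib_n=3)   # small window to keep the example short
print([calib.reference_for(d) for d in (10.0, 30.0, 20.0, 100.0)])
```

During warm-up the reference tracks the current depth; once three samples are in, it freezes at their median (20.0) and the later 100.0 sample is normalized against that.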
### 7.4 update() Return Structure
```python
{
    "per_asset": {
        "BTCUSDT": {
            "depth_1pct_usd": float,          # Sub-1
            "depth_quality": float,
            "fill_probability": float,
            "spread_proxy_bps": float,
            "imbalance": float,               # Sub-2
            "imbalance_ma5": float,
            "imbalance_persistence": float,
            "depth_asymmetry": float,
            "withdrawal_velocity": float,
        },
        "ETHUSDT": { ... },
        "SOLUSDT": { ... },
    },
    "market": {
        "median_imbalance": float,            # Sub-3
        "agreement_pct": float,
        "depth_pressure": float,
    },
    "macro": {
        "depth_velocity": float,              # Sub-4
        "cascade_count": int,
        "acceleration": float,
        "regime_signal": int,                 # -1=CALM, 0=NEUTRAL, 1=STRESS
    }
}
```
If `snap = None` for an asset (book not yet initialized), `per_asset[asset] = None` and that asset is excluded from cross-asset computations.
---
## 8. Sub-system 1 — Per-Asset Placement Features
**Numba kernels:** all `@njit(cache=True)` for fast JIT compilation with on-disk caching.
### 8.1 `compute_imbalance_nb(bid_notional, ask_notional)`
$$\text{imbalance} = \frac{\sum bid\_notional - \sum ask\_notional}{\sum bid\_notional + \sum ask\_notional}$$
- Range: `[-1.0, +1.0]`
- `+1.0` = all depth on bid side (strong buy pressure)
- `-1.0` = all depth on ask side (strong sell pressure)
- `0.0` if total depth < 1e-9 (guard against zero division)
- Uses entire 5-level depth vector (not just near-mid)
**This is the most fundamental microstructure signal.** It captures the instantaneous notional imbalance in the visible order book.
### 8.2 `compute_depth_1pct_nb(bid_notional, ask_notional)`
$$\text{depth\_1pct} = bid\_notional[0] + ask\_notional[0]$$
Total USD liquidity within ±1% of mid price. This is the "execution liquidity" — the depth that a market order of typical DOLPHIN size would encounter immediately.
On BTC, this was typically **>$100M on 2025-01-15** (verified in test `TestCSVOBProviderKnownValues`). Thin markets show <$20M.
### 8.3 `compute_depth_quality_nb(depth_1pct, median_ref)`
$$\text{depth\_quality} = \frac{depth\_1pct}{median\_ref}$$
Normalizes current liquidity against the historical median for that asset.
- `1.0` = typical liquidity
- `>1.0` = above-average liquidity (better fill probability)
- `<1.0` = below-average (thinner than usual)
- Returns `1.0` if `median_ref < 1e-9` (no reference yet)
### 8.4 `compute_fill_probability_nb(depth_quality)`
$$\text{fill\_probability} = 1 - e^{-k \cdot \max(0, depth\_quality)}, \quad k = 2.0$$
Exponential saturation model. Calibrated so that `quality = 1.0` (typical depth) gives approximately `1 - e^{-2} ≈ 0.865` (86.5% fill probability). Rationale: at typical BTC depth, a small maker order has very high fill probability.
Key values:
| depth_quality | fill_probability |
|---|---|
| 0.0 | 0.000 (no depth) |
| 0.5 | 0.632 |
| 1.0 | 0.865 |
| 1.5 | 0.950 |
| 2.0 | 0.982 |
The `OBPlacer` uses `fill_probability < 0.30` as a gate to force `TAKER` execution (thin book = don't wait for maker fill).
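The table values follow directly from the formula. The sketch below is a plain-Python restatement (not the numba kernel) that reprints them.

```python
import math


def fill_probability(depth_quality: float, k: float = 2.0) -> float:
    """Exponential saturation: 1 - exp(-k * max(0, depth_quality))."""
    return 1.0 - math.exp(-k * max(0.0, depth_quality))


for quality in (0.0, 0.5, 1.0, 1.5, 2.0):
    print(f"{quality:.1f} -> {fill_probability(quality):.3f}")
```

Solving `1 - exp(-2q) = 0.30` gives `q = -ln(0.7) / 2 ≈ 0.178`, so under this model the thin-book `TAKER` gate triggers when near-mid depth falls below roughly 18% of its median.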
### 8.5 `compute_spread_proxy_nb(bid_notional, ask_notional)`
$$\text{ratio} = \frac{bid\_notional[0] + ask\_notional[0]}{\sum_{i=0}^{4}(bid\_notional[i] + ask\_notional[i])}$$
$$\text{spread\_proxy} = \max(0.1, (1 - ratio \times 3) \times 5)$$
This is a **depth-gradient proxy for spread**. The intuition: in tight markets, a large fraction of total depth sits near mid (high ratio). In wide-spread markets, near-mid depth is thin relative to the full book (low ratio). Returns approximate spread in bps-like units.
- `ratio ≈ 0.3` (30% of depth within 1%) → spread_proxy ≈ 0.5 (tight)
- `ratio ≈ 0.05` (5% of depth within 1%) → spread_proxy ≈ 4.25 (wide)
- Returns `10.0` if total depth < 1e-9 (no data = assume wide spread)
This is used in `OBPlacer` to cap limit order offset: `offset = min(offset, spread_proxy_bps × 0.4)` — never place our limit more than 40% of the spread from mid.
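A plain-Python restatement of the proxy, using lists in place of the NumPy arrays (the function name `spread_proxy` and the sample vectors are illustrative):

```python
from typing import List


def spread_proxy(bid_notional: List[float], ask_notional: List[float]) -> float:
    """Depth-gradient spread proxy: max(0.1, (1 - 3 * ratio) * 5)."""
    total = sum(bid_notional) + sum(ask_notional)
    if total < 1e-9:
        return 10.0                               # no data: assume a wide spread
    ratio = (bid_notional[0] + ask_notional[0]) / total
    return max(0.1, (1.0 - ratio * 3.0) * 5.0)


tight = [15.0, 10.0, 10.0, 10.0, 5.0]    # 30% of depth in the nearest band
wide = [2.5, 10.0, 10.0, 15.0, 12.5]     # 5% of depth in the nearest band
print(round(spread_proxy(tight, tight), 3), round(spread_proxy(wide, wide), 3))
```

Because of the `max(0.1, ...)` floor, any book with more than about a third of its depth in the nearest band (ratio above roughly 0.327) reports the minimum proxy of 0.1.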
### 8.6 `OBPlacementFeatures` Dataclass
```python
@dataclass
class OBPlacementFeatures:
    depth_1pct_usd: float    # USD depth within ±1% of mid
    depth_quality: float     # vs median; 1.0 = typical
    fill_probability: float  # [0, 1]
    spread_proxy_bps: float  # approximate spread proxy
```
**Neutral default:** `NEUTRAL_PLACEMENT = OBPlacementFeatures(0.0, 1.0, 0.5, 2.0)`
- Zero depth (no data)
- Quality = 1.0 (don't penalize when no reference)
- Fill probability = 0.5 (uncertain)
- Spread proxy = 2.0 (moderate spread assumption)
---
## 9. Sub-system 2 — Per-Asset Signal Features
### 9.1 `compute_depth_asymmetry_nb(bid_notional, ask_notional)`
$$\text{asymmetry} = \frac{bid\_notional[0] + ask\_notional[0]}{bid\_notional[2] + ask\_notional[2]}$$
Ratio of near-mid depth (±1%) to mid-depth (±3%). High values indicate depth is **concentrated near mid** — a tight, liquid market. Low values indicate depth is spread out or front-running is occurring.
Uses index `[2]` (the 3% level) as the "far" reference because `[4]` (5%) can be sparse in fast markets.
### 9.2 `compute_imbalance_persistence_nb(imbalance_history, n)`
```
current_sign = sign(imbalance_history[-1])
persistence = fraction of last n values with same sign as current_sign
```
- Range: `[0, 1]`
- `1.0` = all recent snapshots had same directional pressure
- `0.5` = random / neutral (default when history too short)
- `0.0` = all recent snapshots were opposite direction
This distinguishes **sustained directional pressure** from momentary spikes. A high-persistence imbalance is much stronger evidence of an incoming move than a single snapshot.
Lookback: `IMBALANCE_LOOKBACK = 10` snapshots (5 s in live mode at 0.5 s, or 5 min in batch mode at 30 s).
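One possible reading of the persistence kernel in plain Python. Two details are assumptions because the text does not pin them down: "history too short" is taken to mean fewer than `n` samples, and a zero imbalance is treated as its own sign (neither positive nor negative).

```python
from typing import Sequence


def sign(x: float) -> int:
    """Return -1, 0, or +1."""
    return (x > 0) - (x < 0)


def imbalance_persistence(history: Sequence[float], n: int = 10) -> float:
    """Fraction of the last n imbalances sharing the sign of the latest one."""
    if len(history) < n:
        return 0.5                       # neutral default while history is short
    window = list(history)[-n:]
    current = sign(window[-1])
    return sum(1 for value in window if sign(value) == current) / n


print(imbalance_persistence([0.1] * 10))           # sustained bid pressure
print(imbalance_persistence([0.1, -0.1] * 5))      # alternating signs
print(imbalance_persistence([-0.2] * 9 + [0.3]))   # sudden flip on the last sample
```

Under this reading the latest sample always agrees with itself, so a sudden flip scores `1/n` rather than exactly zero.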
### 9.3 Rolling MA5
`imbalance_ma5 = mean(imbalance_history[-5:])`. Simple 5-snapshot moving average. Smooths noise while being fast-reacting. In live mode at 0.5 s polling, this is a 2.5 s average.
### 9.4 `compute_withdrawal_velocity_nb(depth_history, lookback)`
$$\text{velocity} = \frac{depth\_history[-1] - depth\_history[-1 - lookback]}{depth\_history[-1 - lookback]}$$
Fractional change in 1% depth over the lookback window. Negative = depth is being withdrawn (market makers pulling quotes). Positive = depth is building.
- `DEPTH_LOOKBACK = 60` snapshots = 30 s lookback in live mode
- **`CASCADE_THRESHOLD = -0.10`**: velocity below -0.10 means ≥10% of near-mid depth was withdrawn in 30 s — a stress signal
- Returns `0.0` if fewer than 2 samples in history
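A sketch of the velocity computation. How the real kernel behaves when the history holds fewer than `lookback + 1` samples is not spelled out here, so this version falls back to the oldest available sample; that fallback, the zero-reference guard, and the function name are assumptions.

```python
from typing import Sequence


def withdrawal_velocity(depth_history: Sequence[float], lookback: int = 60) -> float:
    """Fractional change in 1% depth between the reference sample and the latest one."""
    if len(depth_history) < 2:
        return 0.0
    # Reference is `lookback` samples back, clamped to the oldest sample available.
    ref_idx = max(0, len(depth_history) - 1 - lookback)
    ref = depth_history[ref_idx]
    if ref < 1e-9:
        return 0.0                        # avoid dividing by an empty book
    return (depth_history[-1] - ref) / ref


CASCADE_THRESHOLD = -0.10
velocity = withdrawal_velocity([100.0, 95.0, 88.0], lookback=2)
print(round(velocity, 4), velocity < CASCADE_THRESHOLD)
```

Depth falling from 100 to 88 over the window is a 12% withdrawal, which is past the cascade threshold.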
### 9.5 `OBSignalFeatures` Dataclass
```python
@dataclass
class OBSignalFeatures:
    imbalance: float              # [-1, +1] raw directional pressure
    imbalance_ma5: float          # 5-snapshot smoothed
    imbalance_persistence: float  # [0, 1] direction consistency
    depth_asymmetry: float        # near/far depth ratio
    withdrawal_velocity: float    # rate of depth drain (negative = withdrawal)
```
**Neutral default:** `NEUTRAL_SIGNAL = OBSignalFeatures(0.0, 0.0, 0.5, 1.0, 0.0)`
---
## 10. Sub-system 3 — Cross-Asset Market Signal
### 10.1 `compute_market_agreement_nb(asset_imbalances, n_assets)`
This kernel operates on a vector of per-asset imbalances (one per tracked asset, e.g. `[imb_BTC, imb_ETH, imb_SOL]`).
**Step 1: Median imbalance**
Finds the median of the n_assets values (sort + select middle element). The median is more robust than mean to outlier assets.
**Step 2: Agreement fraction**
```
median_sign = sign(median)
agreement_pct = count(assets with same sign as median) / n_assets
```
- Returns `(0.0, 0.5)` if `abs(median) < 1e-9` — undefined direction
- Returns `(median, agreement_pct)` otherwise
**Interpretation:**
- `median > 0, agreement_pct = 1.0` — all assets bid-heavy simultaneously. Strong buy-side pressure across the market. High conviction long.
- `median < 0, agreement_pct = 1.0` — all assets ask-heavy simultaneously. Strong sell-side pressure. High conviction short.
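- `agreement_pct = 0.67` (two of three): divergence, noisy market. With three assets and a nonzero median, the median itself and at least one neighbour share its sign, so 0.67 is the lowest value this kernel can return.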
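The two steps can be sketched in plain Python. The function name is hypothetical, "select middle element" is read as `sorted(values)[n // 2]` (for an even count that picks the upper of the two middle values, which is an assumption), and an empty input returning the neutral pair is also assumed.

```python
from typing import Sequence, Tuple


def sign(x: float) -> int:
    """Return -1, 0, or +1."""
    return (x > 0) - (x < 0)


def market_agreement(asset_imbalances: Sequence[float]) -> Tuple[float, float]:
    """Return (median_imbalance, agreement_pct) for a vector of per-asset imbalances."""
    n = len(asset_imbalances)
    if n == 0:
        return 0.0, 0.5
    median = sorted(asset_imbalances)[n // 2]      # sort + select middle element
    if abs(median) < 1e-9:
        return 0.0, 0.5                            # undefined direction
    median_sign = sign(median)
    agreeing = sum(1 for value in asset_imbalances if sign(value) == median_sign)
    return median, agreeing / n


print(market_agreement([0.2, 0.1, 0.3]))     # all three bid-heavy
print(market_agreement([-0.4, 0.1, 0.3]))    # one asset disagrees
print(market_agreement([0.0, -0.1, 0.2]))    # median is zero
```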
### 10.2 Depth Pressure
`depth_pressure = mean(asset_imbalances)`. Simpler aggregate than median — the mean captures the total directional force. Used alongside `median_imbalance` as a cross-check.
### 10.3 `OBMarketFeatures` Dataclass
```python
@dataclass
class OBMarketFeatures:
    median_imbalance: float  # cross-asset median
    agreement_pct: float     # [0, 1] fraction agreeing
    depth_pressure: float    # mean imbalance across assets
```
**Neutral default:** `NEUTRAL_MARKET = OBMarketFeatures(0.0, 0.5, 0.0)`
---
## 11. Sub-system 4 — Macro Regime / Market Structure
### 11.1 `compute_cascade_signal_nb(depth_velocities, n_assets, threshold)`
Evaluates the vector of per-asset `withdrawal_velocity` values.
```python
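import numpy as np

def cascade_signal(depth_velocities: np.ndarray, n_assets: int, threshold: float = -0.10):
    """Plain-NumPy restatement of the kernel's logic (illustrative, not the numba source)."""
    cascade_count = int(np.sum(depth_velocities < threshold))
    mean_velocity = float(np.mean(depth_velocities))
    # Regime classification:
    if cascade_count >= max(2, n_assets // 2) and mean_velocity < threshold:
        regime = 1    # STRESS: majority of assets withdrawing depth
    elif mean_velocity > abs(threshold):
        regime = -1   # CALM: depth building across market
    else:
        regime = 0    # NEUTRAL
    return cascade_count, mean_velocity, regime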
```
**STRESS regime (1):** Multiple assets simultaneously withdrawing order book depth at ≥10% rate. This typically precedes sharp moves. Maker orders should be avoided or made fast-timeout; urgency is high.
**CALM regime (-1):** Depth building across the market. Market makers are posting aggressively. Good conditions for patient maker execution.
**NEUTRAL regime (0):** No clear structural signal.
### 11.2 Acceleration
Second derivative of aggregate market depth:
```python
recent_vel = (market_dep[-1] - market_dep[-10]) / market_dep[-10]
prior_vel = (market_dep[-10] - market_dep[-20]) / market_dep[-20]
acceleration = recent_vel - prior_vel
```
Requires at least 20 market depth samples in history. Negative acceleration (depth withdrawing and accelerating) is a precursor to STRESS regime.
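A self-contained version of the acceleration calculation with the 20-sample requirement made explicit. Returning `0.0` when history is short or a denominator is zero is an assumption chosen to match the neutral default; the function name is hypothetical.

```python
from typing import Sequence


def depth_acceleration(market_dep: Sequence[float]) -> float:
    """Difference between the latest and the prior 10-sample depth velocity."""
    if len(market_dep) < 20:
        return 0.0                                   # not enough history yet
    latest, mid_point, oldest = market_dep[-1], market_dep[-10], market_dep[-20]
    if mid_point < 1e-9 or oldest < 1e-9:
        return 0.0                                   # guard against empty-book samples
    recent_vel = (latest - mid_point) / mid_point
    prior_vel = (mid_point - oldest) / oldest
    return recent_vel - prior_vel


steady_then_drop = [100.0] * 19 + [90.0]
print(round(depth_acceleration(steady_then_drop), 4))
```

Here depth was flat over the prior window and then fell 10% in the latest one, so the acceleration is negative.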
### 11.3 `OBMacroFeatures` Dataclass
```python
@dataclass
class OBMacroFeatures:
    depth_velocity: float  # mean withdrawal velocity across all assets
    cascade_count: int     # number of assets in withdrawal
    acceleration: float    # 2nd derivative of aggregate depth
    regime_signal: int     # -1=CALM, 0=NEUTRAL, 1=STRESS
```
**Neutral default:** `NEUTRAL_MACRO = OBMacroFeatures(0.0, 0, 0.0, 0)`
---
## 12. Layer 4 — OBPlacer (Execution Advice)
**Source:** `nautilus_dolphin/nautilus_dolphin/nautilus/ob_placer.py`
### 12.1 OBPlacementAdvice
```python
from dataclasses import dataclass

@dataclass
class OBPlacementAdvice:
    method: str        # "MAKER" | "TAKER" | "PATIENT_MAKER" | "SKIP"
    offset_bps: float  # limit offset from mid (maker only)
    timeout_s: float   # max wait for maker fill
    confidence: float  # [0, 1] how confident in this advice
    reason: str        # human-readable explanation
```
### 12.2 Decision Tree
The `OBPlacer.advise(placement, signal, signal_confidence, direction)` method applies the following logic:
```
1. signal_confidence < 0.40 → SKIP("signal_too_weak")
2. signal_confidence > 0.85 → TAKER("high_confidence_taker")
   (signal so strong, don't risk missing it)
3. fill_probability < 0.30 → TAKER("thin_book_taker")
   (book too thin for maker, take immediately)
4. Compute effective imbalance from trade direction:
   eff_imb = -imbalance if direction == SHORT else +imbalance
   (for SHORT: positive imbalance contradicts, negative confirms)
   ob_confirmation:
     eff_imb > +0.15 → +1 (confirms)
     eff_imb < -0.15 → -1 (contradicts)
     else → 0 (neutral)
5. Compute patience [0, 1], the inverse of confidence in the maker zone:
   patience = 1.0 - (confidence - 0.40) / (0.85 - 0.40)
   (confidence 0.40 → patience 1.0; confidence 0.85 → patience 0.0)
   OB adjustments:
     ob_confirms: patience += 0.25 (can wait, OB supporting us)
     ob_contradicts: patience -= 0.30 (urgent, OB fighting us)
6. Map patience to parameters:
   timeout = 10 + patience × (50 - 10)  # FAST_S=10, PATIENT_S=50
   offset = 0 + patience × (2.0 - 0)    # MIN_BPS=0, MAX_BPS=2.0
   offset = min(offset, spread_proxy_bps × 0.4)  # spread sanity cap
7. Choose method:
   ob_confirms AND fill_prob > 0.70 → PATIENT_MAKER("ob_confirms_deep_book")
   ob_contradicts → MAKER("ob_contradicts_fast_maker", timeout=10s)
   else → MAKER("neutral_ob_default_maker")
8. confidence = min(1.0, fill_probability × (0.7 + 0.3 × patience))
```
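The eight steps above can be sketched as a single function. This is an illustration under stated assumptions, not the code in `ob_placer.py`: the names `AdviceSketch` and `advise_sketch`, the flat argument list, the string direction values, the placeholder field values on the SKIP and TAKER branches, and the clamping of patience to [0, 1] are all hypothetical.

```python
from dataclasses import dataclass


@dataclass
class AdviceSketch:  # hypothetical stand-in for OBPlacementAdvice
    method: str
    offset_bps: float
    timeout_s: float
    confidence: float
    reason: str


def advise_sketch(fill_probability, spread_proxy_bps, imbalance,
                  signal_confidence, direction):
    # Steps 1-3: hard gates (numeric fields on these branches are placeholders)
    if signal_confidence < 0.40:
        return AdviceSketch("SKIP", 0.0, 0.0, 0.0, "signal_too_weak")
    if signal_confidence > 0.85:
        return AdviceSketch("TAKER", 0.0, 0.0, signal_confidence,
                            "high_confidence_taker")
    if fill_probability < 0.30:
        return AdviceSketch("TAKER", 0.0, 0.0, signal_confidence,
                            "thin_book_taker")

    # Step 4: flip the imbalance sign for shorts, then classify
    eff_imb = -imbalance if direction == "SHORT" else imbalance
    ob = 1 if eff_imb > 0.15 else (-1 if eff_imb < -0.15 else 0)

    # Step 5: patience with OB adjustment; the clamp is an assumption
    patience = 1.0 - (signal_confidence - 0.40) / (0.85 - 0.40)
    patience += 0.25 if ob == 1 else (-0.30 if ob == -1 else 0.0)
    patience = min(1.0, max(0.0, patience))

    # Step 6: map patience to timeout and offset, with the spread cap
    timeout_s = 10 + patience * (50 - 10)
    offset_bps = min(patience * 2.0, spread_proxy_bps * 0.4)

    # Step 7: choose the method
    if ob == 1 and fill_probability > 0.70:
        method, reason = "PATIENT_MAKER", "ob_confirms_deep_book"
    elif ob == -1:
        method, reason, timeout_s = "MAKER", "ob_contradicts_fast_maker", 10.0
    else:
        method, reason = "MAKER", "neutral_ob_default_maker"

    # Step 8: advice confidence
    confidence = min(1.0, fill_probability * (0.7 + 0.3 * patience))
    return AdviceSketch(method, offset_bps, timeout_s, confidence, reason)


# SHORT with an ask-heavy book (imbalance -0.25) and a deep book
confirming = advise_sketch(0.9, 1.0, -0.25, 0.60, "SHORT")
# Same trade, but the book is bid-heavy and fights the short
contradicting = advise_sketch(0.9, 1.0, 0.25, 0.60, "SHORT")
```

In the confirming case the spread cap binds: patience alone would allow an offset above 1.6 bps, but `spread_proxy_bps × 0.4` limits it to 0.4 bps.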
### 12.3 Constants
| Constant | Value | Source |
|---|---|---|
| `CONFIDENCE_TAKER_THRESHOLD` | 0.85 | ob_placer.py + smartplacer_constants.py |
| `CONFIDENCE_MAKER_THRESHOLD` | 0.40 | ob_placer.py + smartplacer_constants.py |
| `IMBALANCE_STRONG` | 0.15 | ob_placer.py |
| `OFFSET_MIN_BPS` | 0.0 | ob_placer.py |
| `OFFSET_MAX_BPS` | 2.0 | ob_placer.py |
| `TIMEOUT_FAST_S` | 10 | ob_placer.py + smartplacer_constants.py |
| `TIMEOUT_DEFAULT_S` | 25 | smartplacer_constants.py |
| `TIMEOUT_PATIENT_S` | 50 | ob_placer.py + smartplacer_constants.py |
---
## 13. SmartPlacer Integration
**Source:** `alpha_engine/execution/smart_placer.py` + `alpha_engine/execution/smartplacer_constants.py`
`SmartPlacer` is the alpha engine's execution decision module. It is **separate from OBPlacer** but implements a parallel logic tree, using OB features as inputs.
### 13.1 `ob_confirms_signal(direction, imbalance, threshold=0.15)`
```
For SHORT: effective_imb = -imbalance
For LONG:  effective_imb = +imbalance
effective_imb > +threshold → CONFIRMS (OB pressure aligns with trade direction)
effective_imb < -threshold → CONTRADICTS (OB pressure opposes trade direction)
else → NEUTRAL
```
This function bridges OB microstructure to execution timing. Example: signal says SHORT. OB imbalance = -0.25 (ask-heavy). `effective_imb = -(-0.25) = +0.25 > 0.15` → CONFIRMS. Sell pressure in the book aligns with our short, so we can be more patient.
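The rule and the worked example above can be checked with a few lines of Python. This is a minimal sketch: the name `ob_confirms_signal_sketch`, the string direction values, and the string return values are assumptions, not the actual signature in `smart_placer.py`.

```python
def ob_confirms_signal_sketch(direction, imbalance, threshold=0.15):
    """Classify whether book imbalance supports a trade direction."""
    # For shorts, ask-heavy books (negative imbalance) are supportive
    effective_imb = -imbalance if direction == "SHORT" else imbalance
    if effective_imb > threshold:
        return "CONFIRMS"
    if effective_imb < -threshold:
        return "CONTRADICTS"
    return "NEUTRAL"


short_in_ask_heavy_book = ob_confirms_signal_sketch("SHORT", -0.25)
long_in_ask_heavy_book = ob_confirms_signal_sketch("LONG", -0.25)
```

The same ask-heavy book that confirms a short contradicts a long, which is why the sign flip happens before the threshold comparison.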
### 13.2 `decide_placement(confidence, ob_confirmation, spread_bps, vol_regime)`
Three-gate decision tree identical in structure to `OBPlacer.advise()` but with additional volatility regime input:
```
GATE 1: confidence < 0.40 → SKIP
GATE 2: confidence > 0.85 → TAKER
MAKER ZONE:
  patience = 1 - (confidence - 0.40) / 0.45
  OB adjustment: CONFIRMS +0.25, CONTRADICTS -0.30
  Volatility: HIGH × 0.7, LOW × 1.2 (capped at 1.0)
  timeout = 10 + patience × 40
  offset = 0 + patience × 2.0 (capped at spread × 0.4)
  fallback = ABORT if CONTRADICTS AND confidence < 0.55
           = TAKER otherwise
```
### 13.3 `compute_limit_price(direction, best_bid, best_ask, offset_bps)`
```
mid = (best_bid + best_ask) / 2
offset_abs = mid × offset_bps / 10000
SHORT (selling): price = best_ask - offset_abs (undercut competing sellers)
  floor: price ≥ mid × (1 + 1e-6) (must remain on ask side)
LONG (buying): price = best_bid + offset_abs (outbid competing buyers)
  ceiling: price ≤ mid × (1 - 1e-6) (must remain on bid side)
```
The 1e-6 buffer keeps the limit price strictly on its own side of the mid, so it can never cross the spread. A limit order that crossed the spread would fill immediately as a taker order, defeating the purpose.
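The pricing rule above, including the mid-price guard, can be sketched as follows. The function name `compute_limit_price_sketch` and the string direction values are assumptions for illustration; the prices in the example are arbitrary.

```python
def compute_limit_price_sketch(direction, best_bid, best_ask, offset_bps):
    """Maker limit price that improves on the touch but never crosses mid."""
    mid = (best_bid + best_ask) / 2
    offset_abs = mid * offset_bps / 10_000
    if direction == "SHORT":
        # Undercut sellers, but stay on the ask side of mid
        return max(best_ask - offset_abs, mid * (1 + 1e-6))
    # Outbid buyers, but stay on the bid side of mid
    return min(best_bid + offset_abs, mid * (1 - 1e-6))


bid, ask = 99980.50, 99981.20
mid = (bid + ask) / 2
at_touch = compute_limit_price_sketch("SHORT", bid, ask, 0.0)
clamped_sell = compute_limit_price_sketch("SHORT", bid, ask, 2.0)
clamped_buy = compute_limit_price_sketch("LONG", bid, ask, 2.0)
```

With a tight spread, a 2 bps offset is far larger than the spread itself, so both clamped prices end up pinned just beside the mid rather than crossing it.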
### 13.4 Fee Economics
| | Value |
|---|---|
| Taker fee | 0.05% per side |
| Maker fee | 0.02% per side |
| Fee delta | 0.03% per side |
| Leveraged notional | $9,375 (=$3,750 × 2.5×) |
| Savings per maker fill | $2.81 |
| Expected savings (25s, 76% fill rate) | $2.14 |
| Expected savings (50s, 82% fill rate) | $2.31 |
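
The figures in the table follow from simple arithmetic, reproduced here as a short sketch. The variable and function names are illustrative, and the model assumes a maker order that does not fill saves nothing.

```python
TAKER_FEE = 0.0005   # 0.05% per side
MAKER_FEE = 0.0002   # 0.02% per side
CAPITAL_USD = 3_750
LEVERAGE = 2.5

notional = CAPITAL_USD * LEVERAGE                      # leveraged notional
saving_per_fill = notional * (TAKER_FEE - MAKER_FEE)   # about $2.81


def expected_saving(fill_rate):
    """Expected fee saving when only a fraction of maker orders fill."""
    return saving_per_fill * fill_rate


saving_25s = expected_saving(0.76)  # about $2.14
saving_50s = expected_saving(0.82)  # about $2.31
```

Doubling the timeout from 25 s to 50 s raises the expected saving by only about $0.17 per order, which is the trade-off the patience mapping is balancing against adverse-move risk.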
### 13.5 Adverse Move Abort (`check_adverse_move`)
After a 5 s grace period, if the price moves `> 5 bps` against our direction, the maker order is abandoned:
- SHORT: price went up 5+ bps → abort (we're trying to sell, but price is rising against us)
- LONG: price went down 5+ bps → abort
This prevents being caught in a rapidly deteriorating position while waiting for a maker fill that will never come at a meaningful price.
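A minimal sketch of the abort rule, assuming mid prices are compared and that the grace period is measured from order placement. The function name, parameter names, and the strict inequality on the grace period are hypothetical; only the 5 s and 5 bps values come from the description above.

```python
def check_adverse_move_sketch(direction, entry_mid, current_mid, elapsed_s,
                              grace_s=5.0, abort_bps=5.0):
    """Return True when a resting maker order should be abandoned."""
    if elapsed_s < grace_s:
        return False  # still inside the grace period
    move_bps = (current_mid - entry_mid) / entry_mid * 10_000
    # A rising price hurts a short; a falling price hurts a long
    adverse_bps = move_bps if direction == "SHORT" else -move_bps
    return adverse_bps > abort_bps


abort_short = check_adverse_move_sketch("SHORT", 100.0, 100.06, elapsed_s=6.0)
in_grace = check_adverse_move_sketch("SHORT", 100.0, 100.06, elapsed_s=3.0)
```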
---
## 14. BacktestAdapter and Fill Simulator
**Sources:** `alpha_engine/execution/adapters.py`, `alpha_engine/execution/fill_simulator.py`
These are the **stochastic simulation layer** that must be preserved intact for ROI continuity regression testing.
### 14.1 BacktestAdapter.submit_order()
For `MAKER` orders:
```python
fill_prob = 0.7 - (size / 100_000) - (volatility * 100)
fill_prob = min(0.9, max(0.1, fill_prob))  # clamp to [0.1, 0.9]
if random.random() < fill_prob:
    ...  # fill at limit_price
```
This model captures:
- **Size penalty:** larger orders are harder to fill passively (fill_prob decreases linearly)
- **Volatility penalty:** high-vol markets have faster queue dynamics, lower passive fill probability
- Base rate of 0.70 at size=0, vol=0 (reflecting a benign market)
For `LIMIT` (marketable limit) and `MARKET` orders, fill is immediate with spread-based slippage.
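The MAKER fill model above can be exercised in isolation with a seeded random generator. This is a sketch only: the function names are hypothetical, and the units of `size` and `volatility` are assumed to be whatever `BacktestAdapter` passes in.

```python
import random


def maker_fill_probability(size, volatility):
    """Base rate 0.70 minus linear size and volatility penalties, clamped."""
    raw = 0.7 - (size / 100_000) - (volatility * 100)
    return min(0.9, max(0.1, raw))


def maker_fills(size, volatility, rng):
    """One Bernoulli draw against the fill probability."""
    return rng.random() < maker_fill_probability(size, volatility)


rng = random.Random(42)  # seeded so the run is repeatable
trials = 10_000
fill_rate = sum(maker_fills(10_000, 0.001, rng) for _ in range(trials)) / trials
```

With `size=10_000` and `volatility=0.001` each penalty removes 0.1, so the empirical fill rate should settle near 0.5. Large orders or volatile markets hit the 0.1 floor rather than going negative.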
### 14.2 `simulate_maker_fill()` — Walk-Forward Simulation
More sophisticated than `BacktestAdapter`, designed for detailed backtesting:
```
1. Filter OB snapshots to window [signal_time, signal_time + timeout_s]
2. Walk forward through snapshots
3. At each snapshot, check:
   a. Adverse move abort (after 5s grace):
      SHORT: if (current_mid - entry_mid) / entry_mid × 10000 > 5.0 → ABORT
      LONG:  if (entry_mid - current_mid) / entry_mid × 10000 > 5.0 → ABORT
   b. Fill condition:
      SHORT: filled if best_bid >= limit_price OR approx_mid >= limit_price
      LONG:  filled if best_ask <= limit_price OR approx_mid <= limit_price
4. If would fill: apply fill_discount = 0.80 stochastic gate
   if random.random() < 0.80 → MAKER fill at limit_price
5. If timeout expires: TIMEOUT (no fill)
```
### 14.3 FILL_DISCOUNT_FACTOR = 0.80 — DO NOT CHANGE
This constant is the **backbone of the stochastic regression suite**. It represents queue position uncertainty: even if the market price crosses our limit, we may not be at the front of the queue. The 80% discount means:
- 20% of "would-fill" scenarios are attributed to queue displacement
- This is calibrated to produce realistic maker fill rates of ~70-85%
Changing this value would break backward compatibility with all historical backtests and invalidate the ROI regression suite. It is tested explicitly: `assert DEFAULT_CONFIG.FILL_DISCOUNT_FACTOR == 0.80`.
---
## 15. Prefect Flow — obf_prefect_flow
**Source:** `prod/obf_prefect_flow.py`
### 15.1 Flow Parameters
| Parameter | Default | Description |
|---|---|---|
| `warmup_s` | 8.0 | Seconds to wait after WS start before first push |
| `poll_interval_s` | 0.5 | Target cycle time (2 Hz) |
| `assets` | `["BTCUSDT","ETHUSDT","SOLUSDT"]` | Assets to track |
### 15.2 Startup Sequence
```
1. AsyncOBThread.start() ← daemon thread with its own asyncio loop
2. AsyncOBThread.wait_ready(10s) ← block until event loop is running
3. time.sleep(warmup_s) ← allow REST snapshots to complete
4. Log init_status (which assets are synchronized)
5. OBFPersistenceService.start() ← background Parquet flush thread
6. LiveOBFeatureEngine(assets) ← initialize rolling state
7. make_hz_client() ← connect to Hazelcast
8. _hz_preflight(client) ← probe HZ with dummy put(); retry 5× @ 3s; abort if all fail
9. Enter hot loop
```
The 8 s warmup is critical. The Binance REST snapshot for 3 assets takes ~200-500 ms each. The WS stream starts immediately and buffers events. After 8 s:
- All 3 REST snapshots have completed in parallel
- Buffered events have been applied
- Books are synchronized
If after warmup `n_ready == 0`, the flow continues but logs a warning. Per-asset dark streak counters will trigger warnings at 5 consecutive dark cycles (2.5s).
**HZ Preflight (P1-8):** Before entering the hot loop, `_hz_preflight()` writes a test key (`_obf_heartbeat`) to Hazelcast. If this fails after 5 retries × 3s delay, the flow aborts rather than running silently with a broken HZ client.
### 15.3 Hot Loop (steps A-K)
```
A. Per-asset: get_depth_buckets_sync() [~2 ms including asyncio round-trip]
├── _none_streak[asset] counter updated
├── warn if _none_streak == _DARK_WARN_AFTER (5 cycles = 2.5s)
└── log restore if streak clears
B. Stamp local_ts
C. feature_engine.update(raw_snaps) [~1 ms, numba kernels]
D. Build consolidated HZ payload [dict operations]
E. HZ circuit breaker check:
├── if circuit OPEN: skip E→F, decrement cooldown
└── if circuit CLOSED:
E. hz_push_obf_task.submit(asset_payload) × N [fire-and-forget]
F. hz_push_obf_task.submit(obf_latest).result(timeout=1.5) [blocks]
G. on failure: _hz_consec_failures++; open circuit if ≥5
H. _write_local_cache() atomic JSON [~1 ms]
I. Build row dicts + persist.update_snapshot() per asset [O(1) each]
J. Periodic status log (every 120 pushes = every 60s)
├── check ob_thread.is_stale(30s) — log error if WS silent
├── log top-3 error keys from push_errors dict
└── log circuit breaker state if open
K. Sleep remainder of poll_interval_s [monotonic clock]
```
**Circuit Breaker (P0-1):**
- `_HZ_CIRCUIT_OPEN_AFTER = 5` consecutive consolidated push failures → circuit opens
- `_HZ_CIRCUIT_RESET_AFTER = 30` cycles (~15s) cooldown → circuit closes, one probe attempt
- While circuit is open: HZ push skipped, Parquet persistence and local cache still run
- Prevents 24-second hot-loop freezes during HZ outages
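
The open/cooldown/probe cycle described above can be sketched as a small state holder. The class name and method names are hypothetical, and the choice to re-open immediately after a failed probe is an assumption; only the two thresholds come from the flow constants.

```python
class HzCircuitBreakerSketch:
    """Counts consecutive push failures and pauses pushes for a cooldown."""

    def __init__(self, open_after=5, reset_after=30):
        self.open_after = open_after    # mirrors _HZ_CIRCUIT_OPEN_AFTER
        self.reset_after = reset_after  # mirrors _HZ_CIRCUIT_RESET_AFTER
        self.failures = 0
        self.cooldown = 0

    def allow_push(self):
        """Call once per cycle. Returns False while the circuit is open."""
        if self.cooldown > 0:
            self.cooldown -= 1
            return False
        return True

    def record(self, ok):
        """Report the outcome of a push that was allowed."""
        if ok:
            self.failures = 0
            return
        self.failures += 1
        if self.failures >= self.open_after:
            # Open the circuit (or re-open it after a failed probe)
            self.cooldown = self.reset_after


breaker = HzCircuitBreakerSketch()
for _ in range(5):          # five consecutive failures open the circuit
    breaker.allow_push()
    breaker.record(False)
skipped = sum(1 for _ in range(30) if not breaker.allow_push())
probe_allowed = breaker.allow_push()  # cooldown elapsed: one probe attempt
```

Because the failure counter is only reset on success, a probe that fails re-opens the circuit for another full cooldown instead of requiring five new failures.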
**Dark Streak Counter (P0-4):**
- `_none_streak: Dict[str, int]` per asset
- Warns at `_DARK_WARN_AFTER = 5` consecutive None cycles (2.5s)
- Logs restoration when book comes back
- `_all_dark_cycles` counter: `log.critical` at `_DARK_CRITICAL_AFTER = 120` (~60s, all assets dark)
### 15.4 Prefect Best Practices Applied
- `@task(cache_policy=NO_CACHE)` on `hz_push_obf_task`: prevents Prefect from trying to serialize the Hazelcast client object for caching. Without this, Prefect would attempt to pickle the HZ client and fail.
- `retries=3, retry_delay_seconds=2` on HZ push: tolerates transient Hazelcast blips (network reconnection, leader election).
- `get_run_logger()` called once inside flow context, not at module level.
- `log_prints=True` on `@flow`: all `print()` statements inside the flow are captured into Prefect logs.
- `_push_seq` and `_pushed_at` metadata fields in every HZ payload for debugging.
### 15.5 Local Cache
`ob_cache/latest_ob_features.json` — atomic write via `tmp → rename`. Allows the DOLPHIN scanner (running in a separate process) to read the latest OBF state without accessing Hazelcast. PID-stamped temp file prevents collision if multiple flow instances start simultaneously.
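A minimal sketch of the write-then-rename pattern with a PID-stamped temp file. The function name `write_local_cache_sketch` and the temp-file naming scheme are assumptions; the real `_write_local_cache()` may differ in detail.

```python
import json
import os
import tempfile
from pathlib import Path


def write_local_cache_sketch(payload: dict, path: Path) -> None:
    """Write JSON to a PID-stamped temp file, then rename over the target."""
    tmp = path.with_name(f"{path.name}.{os.getpid()}.tmp")
    tmp.write_text(json.dumps(payload))
    os.replace(tmp, path)  # atomic when tmp and path share a filesystem


# Demonstration in a throwaway directory
with tempfile.TemporaryDirectory() as d:
    cache = Path(d) / "latest_ob_features.json"
    write_local_cache_sketch({"_push_seq": 1}, cache)
    write_local_cache_sketch({"_push_seq": 2}, cache)
    roundtrip = json.loads(cache.read_text())
    leftovers = [p.name for p in Path(d).iterdir() if p.name.endswith(".tmp")]
```

A reader in another process sees either the previous complete file or the new complete file, never a half-written one.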
---
## 16. AsyncOBThread — Async/Sync Bridge
**Source:** `prod/obf_prefect_flow.py` → `class AsyncOBThread`
### 16.1 The Problem
`OBStreamService` is fully async (`asyncio`). The Prefect flow is synchronous Python. You cannot `await` inside a `@flow` function. The solution is to run the asyncio event loop in a dedicated daemon thread.
### 16.2 Thread Architecture
```
Main thread (Prefect flow, synchronous)
│ AsyncOBThread(threading.Thread, daemon=True)
│ ├── self._loop = asyncio.new_event_loop()
│ ├── asyncio.set_event_loop(self._loop)
│ └── self._loop.run_until_complete(self._run_forever())
│ │
│ ├── asyncio.create_task(self.service.stream()) ← starts WS + REST
│ ├── self._ready.set() ← signals main thread
│ └── await asyncio.get_event_loop().create_future() ← parks forever
│ Sync API:
└── get_depth_buckets_sync(asset) → Optional[dict]
└── asyncio.run_coroutine_threadsafe(
self.service.get_depth_buckets(asset),
self._loop
).result(timeout=HZ_PUSH_INTERVAL_S) # 0.5s timeout
```
### 16.3 `run_coroutine_threadsafe`
This stdlib function is the correct way to call async code from a different thread. It:
1. Creates a `concurrent.futures.Future` wrapping the coroutine
2. Schedules it on `self._loop` (thread-safe)
3. Returns the future, which `.result(timeout=...)` blocks on
The 0.5 s timeout matches `poll_interval_s`. If the book takes more than 0.5 s to respond (e.g., lock contention from a WS event storm), `get_depth_buckets_sync()` returns `None` gracefully. This asset is skipped for this cycle and picked up next cycle.
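The bridge pattern can be demonstrated with a self-contained sketch. It is deliberately simplified: `LoopThreadSketch`, `call_sync`, and `fake_get_depth_buckets` are hypothetical names, and the loop is driven with `run_forever()` rather than the parked-future approach shown in the diagram above.

```python
import asyncio
import concurrent.futures
import threading


class LoopThreadSketch(threading.Thread):
    """Runs an asyncio event loop in a daemon thread."""

    def __init__(self):
        super().__init__(daemon=True)
        self.loop = asyncio.new_event_loop()
        self.ready_event = threading.Event()

    def run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.call_soon(self.ready_event.set)  # fires once the loop runs
        try:
            self.loop.run_forever()
            # Let cancelled tasks finish unwinding before closing
            self.loop.run_until_complete(asyncio.sleep(0))
        finally:
            self.loop.close()

    def call_sync(self, coro, timeout=0.5):
        """Run a coroutine on the loop thread; None on timeout."""
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        try:
            return future.result(timeout=timeout)
        except concurrent.futures.TimeoutError:
            future.cancel()
            return None

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.join(timeout=5.0)


async def fake_get_depth_buckets(asset, delay_s):
    await asyncio.sleep(delay_s)  # stands in for book access
    return {"asset": asset}


bridge = LoopThreadSketch()
bridge.start()
bridge.ready_event.wait(10)
fast = bridge.call_sync(fake_get_depth_buckets("BTCUSDT", 0.01))
slow = bridge.call_sync(fake_get_depth_buckets("BTCUSDT", 5.0), timeout=0.1)
bridge.stop()
```

The slow call returns `None` instead of raising, which matches the behaviour described above: the asset is skipped for one cycle and the caller carries on.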
### 16.4 `daemon=True` and `stop()` (P2-5)
The thread is set as a daemon. When the Prefect flow exits, the finally block calls `ob_thread.stop()` for a graceful shutdown.
`stop()` signals the asyncio event loop via `_stop_future.set_result(None)` (thread-safe via `call_soon_threadsafe`). The `_run_forever()` coroutine is parked on `await self._stop_future` and wakes immediately. `stop()` then calls `join(timeout=5.0)`.
Without `stop()`, tests that create `AsyncOBThread` would leave orphaned `aiohttp.ClientSession` objects and generate `ResourceWarning: Unclosed client session` noise.
---
## 17. OBFPersistenceService — Parquet Layer
**Source:** `prod/obf_persistence.py` → `class OBFPersistenceService`
### 17.1 Purpose
Builds a historical record of live order book features for future backtesting. Arrow/Parquet was chosen for:
- Zero-copy memory mapping when reading
- Columnar compression (Snappy) with predictable file sizes
- Hive partitioning compatible with Spark, DuckDB, Pandas, PyArrow Dataset API
- Schema enforcement at write time (no surprise type coercions)
### 17.2 Thread Safety Model
```
Main thread (hot loop, 2 Hz):
persist.update_snapshot(asset, row)
└── with self._lock: ← held for ~1 µs
self._history[asset].append(row)
Background thread (daemon, "obf-persist"):
every flush_interval_s (300 s):
persist._flush()
└── with self._lock: ← snapshot all deques
snapshots = {a: list(q) for a, q in self._history.items()}
(lock released immediately after copy)
└── _write_parquet(asset, rows, ...) per asset
└── build column arrays → pa.table → pq.write_table
└── atomic: write to .tmp → os.rename to .parquet
└── md5 checksum written alongside
```
The lock is held for the minimum possible time: only during the `list(q)` snapshot copy. Parquet file writing (the slow part) happens outside the lock. This ensures the hot loop is never blocked by disk I/O.
### 17.3 Rolling Buffer
`deque(maxlen=HISTORY_MAXLEN=700)` per asset. At `poll_interval_s=0.5`:
```
700 × 0.5 s = 350 s ≥ 300 s flush interval (+ 100 safety margin)
```
`HISTORY_MAXLEN = int(FLUSH_INTERVAL_S / POLL_INTERVAL_S) + 100 = 700` (P0-5 fix).
Previous value was `360` (180s), which was less than the 300s flush interval — up to 120s of data was always silently evicted before each flush. This is now fixed: the deque always covers at least one full flush interval.
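The sizing argument can be checked directly with a bounded deque. This is an illustrative sketch, assuming one row is appended per poll cycle and that a flush needs every row from the preceding interval; the helper name is hypothetical.

```python
from collections import deque

FLUSH_INTERVAL_S = 300
POLL_INTERVAL_S = 0.5
HISTORY_MAXLEN = int(FLUSH_INTERVAL_S / POLL_INTERVAL_S) + 100


def rows_evicted_before_flush(maxlen):
    """Rows silently dropped during one flush interval for a given maxlen."""
    rows_per_interval = int(FLUSH_INTERVAL_S / POLL_INTERVAL_S)
    buf = deque(maxlen=maxlen)
    for i in range(rows_per_interval):
        buf.append(i)  # oldest rows drop once maxlen is reached
    return rows_per_interval - len(buf)


lost_old = rows_evicted_before_flush(360)             # previous setting
lost_new = rows_evicted_before_flush(HISTORY_MAXLEN)  # current setting
seconds_lost_old = lost_old * POLL_INTERVAL_S
```

With the old value, 240 rows (120 s at 2 Hz) fall off the front of the deque before each flush; with the current value none do.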
**First flush timing (P1-9):** `_run()` sleeps `FIRST_FLUSH_S = 60` (not `flush_interval_s = 300`) before the first flush. This ensures the first Parquet file is written within 60s of startup, not 300s. Crashes within the first minute still lose up to 60s of data (WAL is a future improvement, P0-5).
### 17.4 Atomic Write Protocol
```python
tmp_path = out_path.with_suffix(".tmp")
pq.write_table(table, tmp_path, compression="snappy")
tmp_path.rename(out_path) # atomic on POSIX filesystems
```
If the process crashes mid-write, only the `.tmp` file is orphaned — the target `.parquet` file is never partially written. Readers always see complete Parquet files.
### 17.5 MD5 Checksum
After each Parquet file is written, an MD5 checksum file `{filename}.parquet.md5` is written alongside:
```python
digest = hashlib.md5(out_path.read_bytes()).hexdigest()
(out_dir / f"{filename}.md5").write_text(digest)
```
Used for:
- Data integrity verification when copying files to remote storage
- Detecting bit rot in long-term archival
- Validating downloads if files are replicated across nodes
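
A verification routine for these sidecar files might look like the sketch below. The function name `verify_md5_sketch` is hypothetical, and the demonstration uses stand-in bytes rather than a real Parquet file.

```python
import hashlib
import tempfile
from pathlib import Path


def verify_md5_sketch(data_path: Path) -> bool:
    """Compare a file's MD5 with the digest stored in its .md5 sidecar."""
    md5_path = data_path.with_name(data_path.name + ".md5")
    expected = md5_path.read_text().strip()
    actual = hashlib.md5(data_path.read_bytes()).hexdigest()
    return actual == expected


with tempfile.TemporaryDirectory() as d:
    data = Path(d) / "part-14-30-00.parquet"
    data.write_bytes(b"stand-in bytes, not a real Parquet file")
    sidecar = Path(d) / "part-14-30-00.parquet.md5"
    sidecar.write_text(hashlib.md5(data.read_bytes()).hexdigest())
    intact = verify_md5_sketch(data)
    data.write_bytes(b"corrupted")  # simulate bit rot or a bad copy
    corruption_detected = not verify_md5_sketch(data)
```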
### 17.6 Hive Partition Structure
```
/mnt/ng6_data/ob_features/
└── exchange=binance/
├── symbol=BTCUSDT/
│ └── date=2026-03-22/
│ ├── part-14-30-00.parquet
│ ├── part-14-30-00.parquet.md5
│ ├── part-14-35-00.parquet
│ └── part-14-35-00.parquet.md5
├── symbol=ETHUSDT/
│ └── date=2026-03-22/
│ └── ...
└── symbol=SOLUSDT/
└── ...
```
**Reading with PyArrow Dataset API:**
```python
import pyarrow.dataset as ds
dataset = ds.dataset("/mnt/ng6_data/ob_features", partitioning="hive")
table = dataset.to_table(filter=ds.field("symbol") == "BTCUSDT")
```
**Note:** When reading a Parquet file directly with `pq.read_table()`, PyArrow infers and adds the Hive partition columns (`exchange`, `symbol`, `date`) to the schema automatically. These columns are NOT in `OB_SCHEMA` — they are partition metadata. Schema validation must use a subset check: `expected_names ⊆ written_names`.
---
## 18. Hazelcast Integration
**Map:** `DOLPHIN_FEATURES`
### 18.1 Keys Published by OBF Flow
| Key | Content | Consumers |
|---|---|---|
| `asset_BTCUSDT_ob` | Raw OB snapshot (bid/ask notional arrays, best bid/ask, spread) | `HZOBProvider`, alpha engine |
| `asset_ETHUSDT_ob` | Same for ETH | Same |
| `asset_SOLUSDT_ob` | Same for SOL | Same |
| `obf_latest` | Consolidated all-asset features (all 4 sub-systems flattened) | Alpha engine, scanner, monitoring |
### 18.2 `obf_latest` Payload Structure
```json
{
"timestamp": "2026-03-22T14:30:00.000000+00:00",
"local_ts": 1742654200.123,
"compute_ts": 1742654200.124,
"assets": ["BTCUSDT", "ETHUSDT", "SOLUSDT"],
"_push_seq": 42180,
"_pushed_at": "2026-03-22T14:30:00.131000+00:00",
"_n_assets_live": 3,
"_n_assets_total": 3,
"_all_live": true,
"market_median_imbalance": 0.142,
"market_agreement_pct": 0.667,
"market_depth_pressure": 0.093,
"macro_depth_velocity": -0.023,
"macro_cascade_count": 0,
"macro_acceleration": 0.002,
"macro_regime_signal": 0,
"btcusdt_depth_1pct_usd": 183421034.50,
"btcusdt_depth_quality": 1.12,
"btcusdt_fill_probability": 0.895,
"btcusdt_spread_proxy_bps": 0.82,
"btcusdt_imbalance": 0.187,
"btcusdt_imbalance_ma5": 0.143,
"btcusdt_imbalance_persistence": 0.8,
"btcusdt_depth_asymmetry": 1.43,
"btcusdt_withdrawal_velocity": -0.012,
... (same fields for ethusdt_, solusdt_)
}
```
### 18.3 `asset_{ASSET}_ob` Payload Structure
```json
{
"timestamp": 1742654200.123,
"asset": "BTCUSDT",
"bid_notional": [92340000.0, 45120000.0, 23400000.0, 14200000.0, 8100000.0],
"ask_notional": [87210000.0, 41300000.0, 21900000.0, 13800000.0, 7700000.0],
"bid_depth": [0.924, 0.451, 0.234, 0.142, 0.081],
"ask_depth": [0.872, 0.413, 0.219, 0.138, 0.077],
"best_bid": 99980.50,
"best_ask": 99981.20,
"spread_bps": 0.70,
"_pushed_at": "2026-03-22T14:30:00.131000+00:00",
"_push_seq": 126540
}
```
---
## 19. Timing, Rate Limits, and Lag Detection
### 19.1 Binance Rate Limits
| Endpoint | Weight | Usage in OBF |
|---|---|---|
| `wss://fstream.binance.com/stream` | N/A (push) | Continuous, no rate limit |
| `GET /fapi/v1/depth?limit=1000` | 20 per request | Init only + reconnect only |
The WebSocket `@depth@100ms` stream is a **server-push** stream. Binance pushes diffs every 100ms without any request from the client. There is no rate limit on receiving WS messages.
The REST `/depth` endpoint (weight 20) is only called:
1. Once per asset on startup (3 calls total for BTC/ETH/SOL)
2. Once per asset after any WebSocket reconnection
With a 2,400 weight-per-minute limit on Binance Futures, 3 calls = 60 weight — trivial. Even with frequent reconnections, this will never approach the rate limit.
**There is no polling of REST endpoints in the hot loop.**
### 19.2 Three-Timestamp Architecture
Every row in the Parquet output carries three timestamps:
| Field | Meaning | Typical delta from previous |
|---|---|---|
| `exchange_ts` | Binance's `timestamp` in the WS event (when the exchange processed the update) | ~100ms before local_ts |
| `local_ts` | Wall clock when `get_depth_buckets()` completed (local Python extract time) | ~15ms after exchange_ts |
| `pushed_at` | Wall clock after HZ push and persistence buffer write | ~10-50ms after local_ts |
`lag_s = pushed_at - local_ts` measures the **processing latency** within one cycle: feature computation + HZ round-trip.
### 19.3 Drift Warning
Every `LOG_STATUS_EVERY = 120` push cycles (60 seconds at 2 Hz), the flow logs:
```
OBF status | pushes=240 errors=0 lag_s=0.012 assets_live=3/3 files=6
```
If `lag_s > poll_interval_s × 2` (i.e., > 1.0 s for default config):
```
WARNING: OBF LAG DRIFT DETECTED: lag_s=1.234 > 2×poll_interval=1.000
```
This indicates the system is falling behind — the push cycle is taking longer than the poll interval. Possible causes: HZ overload, slow disk I/O, CPU saturation.
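The drift check reduces to one subtraction and one comparison. The sketch below assumes both timestamps are Unix seconds; the function name `check_lag_sketch` is hypothetical and the message format simply mirrors the log line shown above.

```python
def check_lag_sketch(pushed_at, local_ts, poll_interval_s=0.5):
    """Return a warning string when lag exceeds twice the poll interval."""
    lag_s = pushed_at - local_ts
    limit = 2 * poll_interval_s
    if lag_s > limit:
        return (f"OBF LAG DRIFT DETECTED: lag_s={lag_s:.3f} "
                f"> 2×poll_interval={limit:.3f}")
    return None


healthy = check_lag_sketch(pushed_at=100.012, local_ts=100.0)
drifting = check_lag_sketch(pushed_at=101.234, local_ts=100.0)
```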
### 19.4 WS-to-Local Lag
The `exchange_ts` vs `local_ts` difference reflects:
- Network latency from Binance servers (typically 5-50ms from a datacenter)
- Python asyncio scheduling overhead (~1ms)
- WS message parsing
For monitoring: if `local_ts - exchange_ts > 200ms` consistently, investigate network connectivity or whether the event loop is blocked.
---
## 20. Parquet Schema — Complete Field Reference
**Source:** `prod/obf_persistence.py` → `OB_SCHEMA`
Total fields: 34. Arrow types chosen for precision vs storage tradeoff:
- `float64` for timing and critical financial values (bid/ask prices, notional)
- `float32` for derived features (acceptable precision, halves storage)
- `int32` for integer signals
- `string` for asset identifier (partitioned separately, stored once per file)
| Field | Arrow Type | Description |
|---|---|---|
| **Timing** | | |
| `exchange_ts` | float64 | Unix seconds, from Binance WS event |
| `local_ts` | float64 | Wall clock at OB extraction |
| `pushed_at` | float64 | Wall clock after HZ push |
| `lag_s` | float32 | `pushed_at - local_ts` |
| **Identity** | | |
| `asset` | string | e.g. "BTCUSDT" |
| **Top-of-Book** | | |
| `best_bid` | float64 | Highest bid price at extraction time |
| `best_ask` | float64 | Lowest ask price at extraction time |
| `spread_bps` | float32 | `(best_ask - best_bid) / mid × 10,000` |
| **Depth Vectors** | | |
| `bid_notional_0` | float64 | USD bids within 0-1% of mid |
| `bid_notional_1` | float64 | USD bids within 1-2% of mid |
| `bid_notional_2` | float64 | USD bids within 2-3% of mid |
| `bid_notional_3` | float64 | USD bids within 3-4% of mid |
| `bid_notional_4` | float64 | USD bids within 4-5% of mid |
| `ask_notional_0` | float64 | USD asks within 0-1% of mid |
| `ask_notional_1` | float64 | USD asks within 1-2% of mid |
| `ask_notional_2` | float64 | USD asks within 2-3% of mid |
| `ask_notional_3` | float64 | USD asks within 3-4% of mid |
| `ask_notional_4` | float64 | USD asks within 4-5% of mid |
| **Sub-system 1: Placement** | | |
| `depth_1pct_usd` | float64 | `bid_notional_0 + ask_notional_0` |
| `depth_quality` | float32 | `depth_1pct_usd / median_ref` |
| `fill_probability` | float32 | `1 - exp(-2 × depth_quality)` |
| `spread_proxy_bps` | float32 | Derived spread indicator |
| **Sub-system 2: Signal** | | |
| `imbalance` | float32 | `(bid - ask) / (bid + ask)` across all 5 levels |
| `imbalance_ma5` | float32 | 5-snapshot smoothed imbalance |
| `imbalance_persistence` | float32 | Fraction of last 10 with same sign |
| `depth_asymmetry` | float32 | `(near_bid+ask) / (mid_bid+ask)` |
| `withdrawal_velocity` | float32 | Rate of depth drain over 30 s |
| **Sub-system 3: Market** | | |
| `median_imbalance` | float32 | Cross-asset median imbalance |
| `agreement_pct` | float32 | Fraction of assets agreeing with median |
| `depth_pressure` | float32 | Mean cross-asset imbalance |
| **Sub-system 4: Macro** | | |
| `depth_velocity` | float32 | Mean per-asset withdrawal velocity |
| `cascade_count` | int32 | Assets with velocity < -0.10 |
| `acceleration` | float32 | 2nd derivative of aggregate depth |
| `regime_signal` | int32 | -1=CALM, 0=NEUTRAL, 1=STRESS |
---
## 21. Neutral Defaults and Graceful Degradation
All four sub-systems have defined neutral defaults, returned when data is unavailable:
```python
NEUTRAL_PLACEMENT = OBPlacementFeatures(
depth_1pct_usd=0.0, # no depth data
depth_quality=1.0, # don't penalize — assume normal
fill_probability=0.5, # uncertain
spread_proxy_bps=2.0, # moderate spread assumption
)
NEUTRAL_SIGNAL = OBSignalFeatures(
imbalance=0.0, # no directional bias
imbalance_ma5=0.0,
imbalance_persistence=0.5,
depth_asymmetry=1.0,
withdrawal_velocity=0.0,
)
NEUTRAL_MARKET = OBMarketFeatures(
median_imbalance=0.0,
agreement_pct=0.5, # 50% = uncertain, not 0%
depth_pressure=0.0,
)
NEUTRAL_MACRO = OBMacroFeatures(
depth_velocity=0.0,
cascade_count=0,
acceleration=0.0,
regime_signal=0, # NEUTRAL (not STRESS)
)
```
The choice of `agreement_pct=0.5` for neutral (not 0.0 or 1.0) is deliberate: it represents maximum uncertainty, not confirmed disagreement or agreement.
The `OBFeatureEngine` returns these neutrals when:
- `_preloaded = False` (preload not yet run)
- Requested `asset` not in preloaded data
- Requested snapshot index out of range
The `LiveOBFeatureEngine` returns `per_asset[asset] = None` when the book for that asset is not yet initialized.
---
## 22. Import Architecture and the Shim Loader
### 22.1 The Problem
`nautilus_dolphin/nautilus_dolphin/nautilus/__init__.py` contains:
```python
import nautilus_trader # ← raises RuntimeError if not installed
```
The modules `ob_provider.py`, `ob_features.py`, `ob_placer.py` use relative imports:
```python
from .ob_provider import OBProvider, OBSnapshot
```
Relative imports require a proper Python package context (`__package__` must be set). Simply adding the `nautilus/` directory to `sys.path` breaks relative imports.
### 22.2 The Shim Solution
**`tests/_naut_ob_shim.py`** creates a fake package namespace `_naut_ob` using `importlib.util`:
```python
import types, sys, importlib.util
from pathlib import Path

_naut_dir = Path(__file__).parent.parent / "nautilus_dolphin" / "nautilus_dolphin" / "nautilus"

# Create fake package
_p = types.ModuleType("_naut_ob")
_p.__path__ = [str(_naut_dir)]
_p.__package__ = "_naut_ob"
sys.modules["_naut_ob"] = _p

# Load each module as _naut_ob.{name}
def _load(name):
    fn = f"_naut_ob.{name}"
    sp = importlib.util.spec_from_file_location(fn, _naut_dir / f"{name}.py")
    m = importlib.util.module_from_spec(sp)
    m.__package__ = "_naut_ob"  # ← makes relative imports work
    sys.modules[fn] = m
    sp.loader.exec_module(m)
    return m
```
With `m.__package__ = "_naut_ob"`, when `ob_features.py` executes `from .ob_provider import OBSnapshot`, Python resolves it as `from _naut_ob.ob_provider import OBSnapshot` — which exists in `sys.modules` because `_load("ob_provider")` was called first.
**`prod/obf_persistence.py`** contains the same shim logic inline in `LiveOBFeatureEngine.update()` as a fallback, allowing the production flow to work both with and without `nautilus_trader` installed.
---
## 23. Test Architecture
### 23.1 Unit Tests — `tests/test_obf_unit.py` (120 tests)
| Class | Tests | What it covers |
|---|---|---|
| `TestImbalanceKernel` | 7 | `compute_imbalance_nb`: zero case, full bias, partial, symmetric |
| `TestDepthKernels` | 9 | depth_1pct, depth_quality, fill_probability, spread_proxy, depth_asymmetry |
| `TestRollingKernels` | 10 | persistence, withdrawal_velocity, short history edge cases |
| `TestMockOBProvider` | 8 | bias mechanics, depth_scale, per-asset overrides, timestamps |
| `TestCSVOBProviderKnownValues` | 6 | Anchored against real 2025-01-15 BTC data |
| `TestOBFeatureEngine` | 11 | preload_date, all 4 sub-systems, neutral returns |
| `TestLiveOBFeatureEngine` | 10 | incremental update, calibration, market/macro, None snap handling |
| `TestOBPlacer` | 10 | all decision branches, OB confirmation, SKIP/TAKER/MAKER/PATIENT_MAKER |
| `TestBacktestAdapterStochastic` | 6 | fill_prob formula, size penalty, FILL_DISCOUNT_FACTOR invariant |
| `TestSmartPlacerConstants` | 4 | frozen dataclass, threshold ordering, timeout ordering |
| `TestSmartPlacerDecisions` | 8 | decide_placement, compute_limit_price, ob_confirms_signal |
| `TestOBFFlowImport` | 6 | AsyncOBThread, OBFPersistenceService, constants |
| `TestHZOBProvider` *(new)* | 7 | Deserialise HZ payload, missing key→None, malformed JSON, dtype, discovery |
| `TestOBStreamServiceStaleDetection` *(new)* | 4 | `is_stale()`: cold/recent/old/custom threshold |
| `TestOBStreamServiceCrossedBook` *(new)* | 4 | Crossed book→None, locked book→None, valid book→dict, uninit→None |
| `TestOBStreamServiceBufferReplay` *(new)* | 3 | Buffer drain: stale events discarded, zero-qty removal, level update |
**Critical invariant test:**
```python
def test_fill_discount_is_0_80(self):
    self.assertAlmostEqual(DEFAULT_CONFIG.FILL_DISCOUNT_FACTOR, 0.80)
```
**Known-value anchor** (from 2025-01-15 CSV data):
```python
def test_btc_depth_1pct_above_100m(self):
    # BTC 1% depth should be > $100M on a normal trading day
    snap = self.provider.get_snapshot_by_index("BTCUSDT", 10)
    d1pct = snap.bid_notional[0] + snap.ask_notional[0]
    self.assertGreater(d1pct, 100_000_000)
```
### 23.2 E2E Tests — `tests/test_obf_e2e.py` (56 tests)
| Class | Tests | What it covers |
|---|---|---|
| `TestOBStreamServiceMockState` | 8 | `_apply_event`, zero qty removal, best bid/ask, bucket shape |
| `TestFullPipeline` | 7 | End-to-end: MockOBProvider → OBFeatureEngine → OBPlacer |
| `TestParquetPersistence` | 10 | force_flush, schema, row counts, hive partition, MD5, lag_s, multi-asset |
| `TestPersistenceServiceLifecycle` | 6 | start/stop, thread daemon, concurrent updates, stats |
| `TestReplayPipeline` | 5 | CSVOBProvider → OBFeatureEngine with real 2025-01-15 data |
| `TestDualPathValidation` | 5 | Live + stochastic paths run simultaneously without interference |
| `TestOBFPrefectIntegration` | 14 | Flow constants, AsyncOBThread, warmup, HZ key format, lag detection |
**Dual-path test** (the most important E2E test):
```python
def test_live_and_stochastic_coexist(self):
    # Live feature engine should not affect BacktestAdapter state
    live_engine = LiveOBFeatureEngine(["BTCUSDT"])
    backtest_adapter = BacktestAdapter(price_data, ...)
    # Run both simultaneously
    for _ in range(10):
        snap = {"BTCUSDT": mock_snap}
        live_result = live_engine.update(snap)
        backtest_adapter.set_time(t)
        fills = backtest_adapter.submit_order("BTCUSDT", "LONG", 100000, 0.1, "MAKER")
    # Neither should corrupt the other's state
    self.assertIsNotNone(live_result)
    self.assertIsNotNone(fills)
```
### 23.3 Running Tests
```bash
# Unit tests only (fast, ~23s — numba warmup on first run)
python -m unittest tests.test_obf_unit -v
# E2E tests only (slower, ~183s — includes CSV loads and Parquet writes)
python -m unittest tests.test_obf_e2e -v
# Full suite (ExtF + EsoF + OBF)
python tests/run_all_tests.py
```
---
## 24. Invariants That Must Never Change
These are contractual constants embedded in tests and regression suites:
| Invariant | Value | Reason |
|---|---|---|
| `FILL_DISCOUNT_FACTOR` | 0.80 | Calibrated against historical fill rate data; changing it invalidates all backtest PnL history |
| `CONFIDENCE_TAKER_THRESHOLD` | 0.85 | Hardcoded in OBPlacer, SmartPlacer, constants — must be identical in all three |
| `CONFIDENCE_MAKER_THRESHOLD` | 0.40 | Same as above |
| `CASCADE_THRESHOLD` | 0.10 | Regime detection boundary; changing it changes which macro regimes are declared STRESS |
| `MEDIAN_CALIB_N` | 60 | Warmup period for live depth reference; first 60 live cycles produce `depth_quality=1.0` |
| OBSnapshot array index convention | `[0]` = near, `[4]` = far | Used in every kernel and CSV parser; reversing would silently corrupt all features |
| CSV file date filter | `cutoff <= file_date < reference_date` | Anti-leakage; including `reference_date` itself would allow future data into median computation |
---
## 25. Expected Performance Characteristics
### 25.1 Computational Load
| Component | Time | Notes |
|---|---|---|
| `get_depth_buckets_sync()` per asset | ~2 ms | asyncio round-trip + sort + bin |
| `LiveOBFeatureEngine.update()` (3 assets) | ~1 ms | Numba kernels are JIT-compiled; first call ~2s warmup |
| `hz_push_obf_task()` (submit, not result) | ~0.5 ms | Non-blocking submit |
| `_write_local_cache()` | ~1 ms | Small JSON file |
| `persist.update_snapshot()` × 3 | <0.01 ms | Deque append, lock-protected |
| **Total hot loop** | **~5 ms** | Leaving ~495 ms of sleep per 0.5 s cycle |
| `OBFPersistenceService._flush()` (300 rows × 3 assets) | ~50 ms | Parquet write, happens in background thread |
### 25.2 Memory Usage
| Component | Memory |
|---|---|
| In-memory L2 book (3 assets, up to 1000 levels each) | ~6 MB |
| Rolling deques (LiveOBFeatureEngine, 3 assets, 60 samples) | ~50 KB |
| Rolling deques (OBFPersistenceService, 3 assets, 360 samples × 34 fields) | ~1.5 MB |
| OBF Parquet file (per 5-min flush, 3 assets, ~360 rows) | ~150 KB (snappy compressed) |
### 25.3 Parquet Storage Growth
At 2 Hz, 3 assets, 5-minute flush intervals:
```
Rows per file per asset: 600 (2 Hz × 300 s)
Rows per day per asset: 2 × 60 × 60 × 24 = 172,800
Files per day per asset: 288 (one per 5 minutes)
Compressed file size: ~150 KB per 5-min file
Storage per asset/day: ~43 MB (uncompressed ~200 MB)
Storage all 3 assets/day: ~130 MB compressed
Storage per year: ~47 GB compressed
```
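The arithmetic above can be reproduced with a short script. This is a back-of-the-envelope sketch only: the ~150 KB file size is the estimate quoted above (taken here as one file per asset), and the variable names are illustrative rather than taken from `obf_persistence.py`.

```python
# Back-of-the-envelope Parquet storage estimate (illustrative names only).
POLL_HZ = 2              # snapshots per second per asset
FLUSH_INTERVAL_S = 300   # one file per asset every 5 minutes
N_ASSETS = 3
FILE_SIZE_KB = 150       # estimated snappy-compressed size of one 5-min file

SECONDS_PER_DAY = 60 * 60 * 24

rows_per_day = POLL_HZ * SECONDS_PER_DAY              # per asset
files_per_day = SECONDS_PER_DAY // FLUSH_INTERVAL_S   # per asset
rows_per_file = POLL_HZ * FLUSH_INTERVAL_S            # per asset
mb_per_asset_day = files_per_day * FILE_SIZE_KB / 1000
mb_all_assets_day = mb_per_asset_day * N_ASSETS
gb_per_year = mb_all_assets_day * 365 / 1000

print(f"rows/day/asset:    {rows_per_day:,}")
print(f"files/day/asset:   {files_per_day}")
print(f"rows/file/asset:   {rows_per_file}")
print(f"MB/day/asset:      {mb_per_asset_day:.1f}")
print(f"MB/day all assets: {mb_all_assets_day:.1f}")
print(f"GB/year:           {gb_per_year:.1f}")
```

Changing `POLL_HZ`, `N_ASSETS`, or `FILE_SIZE_KB` gives a quick estimate of how the archive grows when assets are added or the polling rate changes.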
### 25.4 Known-Value Anchors (from 2025-01-15 BTC data)
| Metric | Expected value |
|---|---|
| `depth_1pct_usd` for BTC | > $100M |
| `imbalance` range | [-1.0, +1.0], typical \|imbalance\| < 0.3 |
| `fill_probability` at normal depth | ~0.86–0.95 |
| `agreement_pct` in trending market | > 0.67 (2/3 assets agree) |
| WS event latency (`local_ts - exchange_ts`) | 5–50 ms from datacenter |
| `lag_s` (hot loop processing latency) | < 20ms typically |
---
## 26. Future: Nautilus Trader Integration
The user has explicitly stated: **"at some point we *will* bring Nautilus Trader in to handle the 'actual trade execution' for the system."**
The OBF subsystem is designed to accommodate this:
### 26.1 What Changes with Nautilus Trader
Nautilus Trader is a high-performance trading framework with:
- Its own order book management (`nautilus_trader.model.orderbook`)
- Direct exchange connectivity (Binance Futures connector exists)
- Native order submission and fill management
When integrated:
- `OBStreamService` and `AsyncOBThread` will be **replaced** by Nautilus's native `BinanceFuturesDataClient`
- `BacktestAdapter` will be **replaced** by Nautilus's backtesting engine
- `LiveAdapter` (currently a stub) will be **replaced** by Nautilus's `BinanceFuturesExecutionClient`
### 26.2 What Stays the Same
- `OBFeatureEngine` kernels (numba functions are framework-agnostic)
- `OBSnapshot` dataclass (Nautilus provides raw data; we still need to convert to our format)
- `LiveOBFeatureEngine` (logic unchanged, just fed from Nautilus's data stream)
- `OBFPersistenceService` (Parquet persistence is independent)
- `OBPlacer` and `SmartPlacer` (decision logic unchanged)
- `FILL_DISCOUNT_FACTOR = 0.80` (preserved for regression continuity)
- All Parquet data accumulated before integration (directly usable for Nautilus backtesting)
### 26.3 Transition Path
The cleanest transition:
1. Add a `NautilusOBProvider(OBProvider)` that reads from Nautilus's order book
2. Add a `NautilusAdapter(ExchangeAdapter)` that wraps Nautilus order submission
3. Run both `OBStreamService` and Nautilus in parallel for a validation period
4. Cross-validate: `LiveOBFeatureEngine.update()` outputs should match between sources
5. Cut over once cross-validation passes (use `HZOBProvider` to bridge the data)
The existing Parquet archive (`/mnt/ng6_data/ob_features/`) is directly usable as Nautilus's historical data source after format adaptation.
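Step 4 of the transition path could be implemented as a tolerance-based comparison of the two feature outputs. The following is a minimal sketch, assuming each source's output for one asset can be flattened into a `dict` of feature name to float. The helper name `feature_mismatches`, the tolerances, and the sample values are hypothetical and not part of the codebase.

```python
import math
from typing import Dict, List


def feature_mismatches(
    reference: Dict[str, float],
    candidate: Dict[str, float],
    rel_tol: float = 1e-3,
    abs_tol: float = 1e-9,
) -> List[str]:
    """Return names of features that are missing on one side or differ beyond tolerance."""
    mismatches = []
    for name in sorted(set(reference) | set(candidate)):
        if name not in reference or name not in candidate:
            mismatches.append(name)  # present in only one source
        elif not math.isclose(
            reference[name], candidate[name], rel_tol=rel_tol, abs_tol=abs_tol
        ):
            mismatches.append(name)  # numeric drift between sources
    return mismatches


# Hypothetical feature values from the two data sources for one cycle.
from_stream = {"imbalance": 0.120, "fill_probability": 0.900}
from_nautilus = {"imbalance": 0.120, "fill_probability": 0.850}
print(feature_mismatches(from_stream, from_nautilus))
```

An empty list on every cycle of the validation period would be one reasonable cut-over criterion. The appropriate tolerance depends on how closely the two book feeds are synchronised.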
---
## 27. Deployment
### 27.1 Direct Launch (Testing / Development)
```bash
cd /mnt/dolphinng5_predict
PREFECT_API_URL=http://localhost:4200/api \
nohup python3 prod/obf_prefect_flow.py > /tmp/obf_prefect.log 2>&1 &
```
### 27.2 Prefect Deployment
```bash
# Register the deployment
prefect deploy -n obf-daemon --pool dolphin
# Or via YAML
prefect deploy --all # picks up prod/obf_deployment.yaml
```
`prod/obf_deployment.yaml`:
```yaml
name: obf-daemon
entrypoint: prod/obf_prefect_flow.py:obf_prefect_flow
version: 1.0.0
tags: ["obf", "order-book", "production"]
work_pool:
  name: dolphin
  work_queue_name: default
parameters:
  warmup_s: 8
  poll_interval_s: 0.5
  assets: [BTCUSDT, ETHUSDT, SOLUSDT]
infrastructure:
  type: process
```
### 27.3 Adding an Asset
1. Add the asset to `assets` in `obf_deployment.yaml`
2. Add a CSV file `ob_data/{ASSET}-bookDepth-{DATE}.csv` for historical replay
3. The Binance WS stream supports multiple assets in a single connection (combined-stream URL, with stream names separated by `/`)
4. No code changes required
### 27.4 Monitoring
Prefect UI → `obf-prefect-flow` → most recent run. Every 60 seconds the flow logs:
```
OBF status | pushes=240 errors=0 lag_s=0.012 assets_live=3/3 files=6
```
Critical alerts:
- `errors > 0` and increasing: Hazelcast connectivity issue
- `assets_live < 3`: One or more books failed to initialize or lost WS connection
- `lag_s > 1.0s`: System is overloaded or HZ is slow
- `LAG DRIFT DETECTED`: Investigate immediately
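The alert conditions above can be checked mechanically against the flow log. The sketch below assumes the status line keeps exactly the format shown in the sample. The function name `check_status_line` and the alert wording are hypothetical.

```python
import re
from typing import List

# Matches the periodic status line format shown in the sample above.
STATUS_RE = re.compile(
    r"pushes=(?P<pushes>\d+) errors=(?P<errors>\d+) "
    r"lag_s=(?P<lag_s>\d+(?:\.\d+)?) assets_live=(?P<live>\d+)/(?P<total>\d+)"
)


def check_status_line(line: str, prev_errors: int = 0) -> List[str]:
    """Return alert messages for one log line (an empty list means healthy)."""
    alerts = []
    if "LAG DRIFT DETECTED" in line:
        alerts.append("lag drift detected: investigate immediately")
    m = STATUS_RE.search(line)
    if m is None:
        return alerts  # not a status line
    errors = int(m["errors"])
    lag_s = float(m["lag_s"])
    live, total = int(m["live"]), int(m["total"])
    if errors > prev_errors:
        alerts.append(f"errors increasing ({prev_errors} -> {errors}): check Hazelcast")
    if live < total:
        alerts.append(f"only {live}/{total} assets live: check WS connections")
    if lag_s > 1.0:
        alerts.append(f"lag_s={lag_s:.3f} exceeds 1.0 s: system overloaded or HZ slow")
    return alerts


healthy = "OBF status | pushes=240 errors=0 lag_s=0.012 assets_live=3/3 files=6"
degraded = "OBF status | pushes=480 errors=4 lag_s=1.500 assets_live=2/3 files=7"
print(check_status_line(healthy))
print(check_status_line(degraded, prev_errors=2))
```

Passing the previous line's `errors` value as `prev_errors` implements the "and increasing" part of the first alert condition.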
---
## 28. Known Caveats and Edge Cases
### 28.1 Pre-calibration Period (First 30 Seconds Live)
For the first 60 samples (30 seconds at 0.5 s polling), `depth_quality = 1.0` for all assets because `_median_ref` is not yet calibrated. During this period:
- `fill_probability` will be `1 - e^{-2} ≈ 0.865` regardless of actual depth
- This is acceptable — it represents maximum uncertainty, which defaults to the "normal" case
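The quoted value can be checked numerically. The sketch below assumes an exponential form `1 - exp(-k * depth_quality)` with `k = 2`. That form is inferred from the `1 - e^{-2}` figure above and is not copied from `ob_features.py`. The function name is illustrative.

```python
import math


def fill_probability(depth_quality: float, k: float = 2.0) -> float:
    """Assumed exponential fill model: 1 - exp(-k * depth_quality)."""
    return 1.0 - math.exp(-k * depth_quality)


# During the pre-calibration period depth_quality is pinned to 1.0.
print(round(fill_probability(1.0), 3))  # 0.865
```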
### 28.2 Reconnection Gaps
After a WebSocket reconnection, affected assets return `None` from `get_depth_buckets_sync()` until the REST snapshot completes (~200–500 ms). The flow skips those assets for those cycles. The `_n_assets_live` counter in `obf_latest` will reflect this.
### 28.3 OBFeatureEngine Macro State (Batch Mode) — FIXED
~~`_preloaded_macro` is a single `OBMacroFeatures` value.~~ **Fixed in sprint 1 (P1-3).**
`_preloaded_macro_map: Dict[int, OBMacroFeatures]` now stores a separate value for each snapshot index. `get_macro(timestamp_or_idx)` resolves the correct bar and returns the right macro state. Backtests at any bar index now receive the macro regime as it was at that exact time, not the end-of-day value.
### 28.4 Bar Index Alignment (6:1 ratio) — CONFIGURABLE
`_resolve_idx(asset, bar_idx)` uses `bar_idx // self.bar_to_snap_ratio` to convert VBT 5-second bar indices to OB 30-second snapshot indices. Defaults assume:
- VBT uses **exactly 5 second** bars
- OB snapshots are **exactly 30 seconds** apart
- Timestamps start at the same UTC epoch
`bar_to_snap_ratio` is now a constructor parameter (P2-7 fix). Default remains `6`. Pass a different value if bar granularity changes.
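The index mapping is plain integer division. The standalone function below is a simplified sketch of that mapping only. The real `_resolve_idx` is a method that also takes the asset, and the name `resolve_snapshot_index` is illustrative.

```python
def resolve_snapshot_index(bar_idx: int, bar_to_snap_ratio: int = 6) -> int:
    """Map a 5-second bar index to the 30-second snapshot index that contains it."""
    if bar_to_snap_ratio <= 0:
        raise ValueError("bar_to_snap_ratio must be positive")
    return bar_idx // bar_to_snap_ratio


for bar_idx in (0, 5, 6, 11, 12):
    print(bar_idx, "->", resolve_snapshot_index(bar_idx))
```

Bars 0 through 5 map to snapshot 0, bars 6 through 11 to snapshot 1, and so on. With 10-second bars and 30-second snapshots the ratio would be 3.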
### 28.5 `compute_market_agreement_nb` with Uniform Zero Imbalance
If `all(imbalance == 0.0)` (e.g., all assets are perfectly balanced), `median == 0.0` and the kernel returns `(0.0, 0.5)` — neutral agreement. This is the spec-defined behavior. Tests verify this via `MockOBProvider(imbalance_bias=0.0)`.
### 28.6 Hive Partition Columns in PyArrow Schema
When reading a Hive-partitioned Parquet directory with `pyarrow.dataset` or `pq.read_table()`, PyArrow adds `exchange`, `symbol`, and `date` columns to the schema. These are NOT in `OB_SCHEMA`. Code that compares schema names must use subset checking, not equality.
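A subset check of this kind might look like the sketch below. It works on plain lists of column names so that it runs without PyArrow. In real code the names would typically come from the `.names` attribute of the expected and actual PyArrow schemas. The column names and the helper `missing_columns` are hypothetical.

```python
from typing import Iterable, List


def missing_columns(expected_names: Iterable[str], actual_names: Iterable[str]) -> List[str]:
    """Return expected columns absent from the actual schema, ignoring extra columns."""
    actual = set(actual_names)
    return [name for name in expected_names if name not in actual]


# Hypothetical stand-in for the column names of OB_SCHEMA.
expected = ["timestamp", "imbalance", "fill_probability"]
# Reading a Hive-partitioned directory appends the partition columns.
actual = expected + ["exchange", "symbol", "date"]

print(actual == expected)                  # equality check fails
print(missing_columns(expected, actual))   # subset check passes (nothing missing)
```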
### 28.7 CSV File Date Boundary
`CSVOBProvider` loads files where `cutoff_start <= file_date < reference_date`. To use data from date `2025-01-15`, call `preload_date("2025-01-16", ...)`. This is intentional (anti-leakage) but non-obvious. Tests document this with `preload_date("2025-01-16", ["BTCUSDT"])` to access `2025-01-15` data.
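The half-open date window can be illustrated with a short sketch. The function `select_file_dates` and the `lookback_days` parameter are hypothetical. How `CSVOBProvider` actually derives `cutoff_start` is not shown here.

```python
from datetime import date, timedelta
from typing import Iterable, List


def select_file_dates(
    file_dates: Iterable[date], reference_date: date, lookback_days: int
) -> List[date]:
    """Keep dates in the half-open window [reference_date - lookback_days, reference_date)."""
    cutoff_start = reference_date - timedelta(days=lookback_days)
    return [d for d in sorted(file_dates) if cutoff_start <= d < reference_date]


available = [date(2025, 1, 14), date(2025, 1, 15), date(2025, 1, 16)]

# Reference date 2025-01-16 makes the 2025-01-15 file eligible.
print(select_file_dates(available, date(2025, 1, 16), lookback_days=1))
# Reference date 2025-01-15 excludes the 2025-01-15 file itself.
print(select_file_dates(available, date(2025, 1, 15), lookback_days=1))
```

The strict `<` on the upper bound is what prevents same-day data from leaking into the median computation.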
---
## 29. Reliability Mechanisms (Sprint 1 Hardening)
All items below were identified in the P0/P1/P2 audit and implemented in sprint 1 (2026-03-22).
| Mechanism | Trigger | Action | Where |
|---|---|---|---|
| **HZ Circuit Breaker** | 5 consecutive consolidated push failures | Skip all HZ for 30 cycles (~15s), then probe again | `obf_prefect_flow.py` |
| **HZ Preflight** | Startup before hot loop | Write test key, retry 5×@3s; abort flow if all fail | `obf_prefect_flow.py` |
| **WS Stall Watchdog** | `last_event_ts` not updated for >30s | `log.error` every status cycle; `is_stale()` exposed for callers | `ob_stream_service.py` |
| **Crossed Book Guard** | `best_bid >= best_ask` | Return `None` from `get_depth_buckets()`; no features computed | `ob_stream_service.py` |
| **Dark Streak Counter** | Asset returns `None` for N cycles | Warn at 5 (2.5s); log restore; critical log at 120 all-dark (60s) | `obf_prefect_flow.py` |
| **Exponential Backoff** | WS reconnection failures | 3→6→12→…→120s, ±1s jitter; resets after 60s stable | `ob_stream_service.py` |
| **Fire-and-forget pushes** | Per-asset HZ pushes | `submit()` without `.result()`; only consolidated push blocks | `obf_prefect_flow.py` |
| **Per-key error tracking** | HZ push failures | `push_errors: Dict[str, int]`; top-3 logged | `obf_prefect_flow.py` |
| **First flush at 60s** | Startup | First Parquet file written 60s after start (not 5min) | `obf_persistence.py` |
| **Buffer covers flush window** | Continuous | `HISTORY_MAXLEN=700 ≥ FLUSH_INTERVAL/POLL + margin` | `obf_persistence.py` |
| **Microsecond filename** | Double flush within 1s | `%H-%M-%S-%f` prevents collision | `obf_persistence.py` |
| **Partition cleanup** | Each flush | Prune `date=` dirs older than `MAX_FILE_AGE_DAYS=7` | `obf_persistence.py` |
| **Shared aiohttp session** | REST fetches | Single session for `stream()` lifetime; no per-call SSL handshake | `ob_stream_service.py` |
| **Per-timestamp macro** | Backtesting | `_preloaded_macro_map[snap_idx]` — not a single end-of-day value | `ob_features.py` |
| **Dynamic HZ asset discovery** | `HZOBProvider.get_assets()` | Key-set scan for `asset_*_ob`; no hardcoded list | `hz_ob_provider.py` |
| **AsyncOBThread.stop()** | Flow shutdown/test teardown | Signal `_stop_future`; thread is joinable | `obf_prefect_flow.py` |
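Two of the mechanisms in the table, the HZ circuit breaker and the WS exponential backoff, can be sketched in a few lines. This is an illustration of the stated thresholds (5 failures, 30 skipped cycles, 3 s base, 120 s cap, ±1 s jitter), not the code in `obf_prefect_flow.py` or `ob_stream_service.py`. The class and function names are hypothetical, and resetting the failure count when the breaker opens is an assumption.

```python
import random


class CircuitBreaker:
    """Opens after N consecutive failures and stays open for a fixed number of cycles."""

    def __init__(self, failure_threshold: int = 5, cooldown_cycles: int = 30):
        self.failure_threshold = failure_threshold
        self.cooldown_cycles = cooldown_cycles
        self.consecutive_failures = 0
        self.cooldown_remaining = 0

    def allow(self) -> bool:
        """Call once per cycle; returns False while the breaker is open."""
        if self.cooldown_remaining > 0:
            self.cooldown_remaining -= 1
            return False
        return True  # closed, or cooldown elapsed: the next push acts as the probe

    def record(self, success: bool) -> None:
        """Record the outcome of a push attempt."""
        if success:
            self.consecutive_failures = 0
            return
        self.consecutive_failures += 1
        if self.consecutive_failures >= self.failure_threshold:
            self.cooldown_remaining = self.cooldown_cycles
            self.consecutive_failures = 0  # assumption: count restarts after cooldown


def backoff_delay(
    attempt: int, base_s: float = 3.0, cap_s: float = 120.0, jitter_s: float = 1.0
) -> float:
    """Delay before reconnect attempt `attempt` (0-based): doubling, capped, jittered."""
    delay = min(base_s * 2 ** attempt, cap_s)
    return max(0.0, delay + random.uniform(-jitter_s, jitter_s))


breaker = CircuitBreaker()
for _ in range(5):            # five consecutive failed pushes
    if breaker.allow():
        breaker.record(False)
skipped = 0
while not breaker.allow():    # count cycles skipped while the breaker is open
    skipped += 1
print("cycles skipped:", skipped)
print([backoff_delay(a, jitter_s=0.0) for a in range(7)])
```

At 0.5 s polling, 30 skipped cycles corresponds to the ~15 s pause in the table. With jitter disabled the delay sequence is 3, 6, 12, 24, 48, 96, 120.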
---
*This document was generated from exhaustive study of all OBF-related source code and updated after sprint 1 hardening. Sources: `external_factors/ob_stream_service.py`, `nautilus_dolphin/nautilus_dolphin/nautilus/ob_provider.py`, `nautilus_dolphin/nautilus_dolphin/nautilus/ob_features.py`, `nautilus_dolphin/nautilus_dolphin/nautilus/ob_placer.py`, `nautilus_dolphin/nautilus_dolphin/nautilus/hz_ob_provider.py`, `alpha_engine/execution/smart_placer.py`, `alpha_engine/execution/smartplacer_constants.py`, `alpha_engine/execution/adapters.py`, `alpha_engine/execution/fill_simulator.py`, `prod/obf_persistence.py`, `prod/obf_prefect_flow.py`, `prod/obf_deployment.yaml`, `tests/test_obf_unit.py`, `tests/test_obf_e2e.py`, `tests/_naut_ob_shim.py`, `scripts/verify_parquet_archive.py`, `ob_cache/SCHEMA.md`.*