Order Book Feature (OBF) Subsystem — Complete Technical Reference
Status: Production-ready under Prefect — Sprint 1 hardening complete
Last updated: 2026-03-22 (sprint 1 complete — all P0/P1/P2 reliability gaps closed)
Test coverage: 120 unit tests + 56 E2E tests — all passing (18 new tests added in sprint 1)
Data sources studied: All files in external_factors/, nautilus_dolphin/nautilus_dolphin/nautilus/, alpha_engine/execution/, prod/
Table of Contents
- Why This Subsystem Exists
- System Architecture Overview
- Data Flow — End to End
- Layer 1 — OBStreamService (Live L2 Book)
- Layer 2 — OBSnapshot and OBProvider Hierarchy
- Layer 3 — OBFeatureEngine (Batch Backtesting)
- Layer 3B — LiveOBFeatureEngine (Streaming Production)
- Sub-system 1 — Per-Asset Placement Features
- Sub-system 2 — Per-Asset Signal Features
- Sub-system 3 — Cross-Asset Market Signal
- Sub-system 4 — Macro Regime / Market Structure
- Layer 4 — OBPlacer (Execution Advice)
- SmartPlacer Integration
- BacktestAdapter and Fill Simulator
- Prefect Flow — obf_prefect_flow
- AsyncOBThread — Async/Sync Bridge
- OBFPersistenceService — Parquet Layer
- Hazelcast Integration
- Timing, Rate Limits, and Lag Detection
- Parquet Schema — Complete Field Reference
- Neutral Defaults and Graceful Degradation
- Import Architecture and the Shim Loader
- Test Architecture
- Invariants That Must Never Change
- Expected Performance Characteristics
- Future: Nautilus Trader Integration
- Deployment
- Known Caveats and Edge Cases
1. Why This Subsystem Exists
The alpha engine makes directional predictions on crypto futures. Those predictions need to be executed. Execution is not free:
| Method | Binance Futures fee | Notes |
|---|---|---|
| Taker (market order) | 0.05% per side | Immediate fill, crosses spread |
| Maker (limit order) | 0.02% per side | Passive fill, may not fill |
On a $9,375 leveraged position, the fee difference per side is 0.03% × $9,375 = $2.81. Over hundreds of trades per week this is material. But a maker order that never fills is an opportunity cost equal to the entire signal's expected value.
The OBF subsystem answers two questions per trade signal:
- Is the book liquid enough to get a maker fill? (Sub-systems 1 and 2)
- Is the order book confirming or contradicting the signal direction? (Sub-systems 2, 3, and 4)
These answers feed SmartPlacer, which decides: TAKER / MAKER / PATIENT_MAKER / SKIP.
Additionally, the raw order book data is persisted to Parquet for:
- Backtesting future strategies against real L2 microstructure
- Calibrating fill probability models
- Detecting market structure regimes in strategy development
2. System Architecture Overview
Binance Futures WebSocket
wss://fstream.binance.com/stream
streams: btcusdt@depth@100ms + ethusdt@depth@100ms + solusdt@depth@100ms
│
▼
┌────────────────────────────────────┐
│ OBStreamService │ external_factors/ob_stream_service.py
│ In-memory L2 book (price→qty) │
│ REST snapshot on init/reconnect │
│ asyncio event loop │
└────────────────┬───────────────────┘
│ get_depth_buckets() → dict with 5-bucket notional arrays
▼
┌────────────────────────────────────┐
│ AsyncOBThread │ prod/obf_prefect_flow.py
│ daemon threading.Thread │
│ Bridges async → sync for Prefect │
│ get_depth_buckets_sync() → dict │
└────────────────┬───────────────────┘
│ raw_snaps dict per asset
▼
┌────────────────────────────────────┐
│ LiveOBFeatureEngine │ prod/obf_persistence.py
│ Incremental rolling computation │
│ 4 sub-systems per asset │
│ Sub-3: cross-asset market signal │
│ Sub-4: macro regime │
└─────────┬──────────────────────────┘
│ │
▼ ▼
┌────────────────────┐ ┌─────────────────────┐
│ Hazelcast │ │ OBFPersistenceService│
│ DOLPHIN_FEATURES │ │ flush 300s │
│ obf_latest │ │ first flush 60s │
│ asset_X_ob │ │ /mnt/ng6_data/ │
│ circuit breaker │ │ ob_features/ │
│ preflight probe │ │ cleanup >7d │
└────────────────────┘ └─────────────────────┘
│
▼
┌────────────────────┐
│ HZOBProvider │ nautilus_dolphin/.../hz_ob_provider.py
│ lazy connect │ discovers assets from HZ key set
│ OBPlacer │ nautilus_dolphin/.../ob_placer.py
│ SmartPlacer │ alpha_engine/execution/smart_placer.py
└────────────────────┘
│
▼
PlacementDecision: MAKER / TAKER / PATIENT_MAKER / SKIP
For backtesting, the path is different:
ob_data/{ASSET}-bookDepth-{DATE}.csv
│
▼
CSVOBProvider → OBFeatureEngine.preload_date()
│ (batch, pre-computes all features for one date)
▼
engine.get_placement(asset, bar_idx)
engine.get_signal(asset, bar_idx)
engine.get_market(bar_idx)
engine.get_macro()
3. Data Flow — End to End
3.1 Production (live) path — every 0.5 s
t=0.000 WS event received by OBStreamService (100ms cadence from Binance)
t=0.000 _apply_event() updates in-memory bid/ask dicts
t=0.500 Prefect flow wakes on poll_interval_s=0.5
t=0.500 get_depth_buckets_sync(asset) called for each asset
└── asyncio.run_coroutine_threadsafe(..., timeout=0.5s)
└── get_depth_buckets() reads sorted bids/asks, bins into 5 pct buckets
t=0.500 local_ts = time.time() stamped
t=0.500 LiveOBFeatureEngine.update(raw_snaps) called
├── Sub-1: depth_1pct_usd, depth_quality, fill_probability, spread_proxy_bps
├── Sub-2: imbalance, imbalance_ma5, imbalance_persistence, depth_asymmetry, withdrawal_velocity
├── Sub-3: median_imbalance, agreement_pct, depth_pressure
└── Sub-4: depth_velocity, cascade_count, acceleration, regime_signal
t=0.501 circuit breaker check: if HZ open, skip HZ entirely this cycle
t=0.501 hz_push_obf_task.submit(client, "asset_BTCUSDT_ob", ...) × 3 [fire-and-forget]
t=0.502 hz_push_obf_task.submit(client, "obf_latest", ...).result(timeout=1.5) [blocks]
t=0.502 _write_local_cache(consolidated) — atomic JSON to ob_cache/latest_ob_features.json
t=0.502 pushed_at = time.time()
t=0.502 lag_s = pushed_at − local_ts (typically < 10 ms on same machine)
t=0.502 persist.update_snapshot(asset, row) for each asset [O(1), lock-protected]
t=0.500 (first flush after 60s, then every 300s) OBFPersistenceService._flush() → Parquet
t=0.500 after flush: _cleanup_old_partitions() prunes dirs older than MAX_FILE_AGE_DAYS=7
3.2 Backtesting path
CSVOBProvider._load_asset(asset)
└── reads {ASSET}-bookDepth-{DATE}.csv
└── filters to files where: cutoff_start <= file_date < reference_date
└── pivots 10 rows/snapshot → arrays of shape [N, 5]
└── caches in self._cache["{asset}_{reference_date}"]
OBFeatureEngine.preload_date("2025-01-16", ["BTCUSDT"])
└── iterates all N snapshots in O(N) pass
└── builds rolling imb_hist and dep_hist arrays incrementally
└── computes all numba kernels per snapshot
└── stores in _preloaded_placement[asset][snap_idx], _preloaded_signal[asset][snap_idx]
└── computes cross-asset features at each snap_idx
└── updates _preloaded_macro at each snap_idx (uses latest)
└── _preloaded = True
During backtest bar loop:
engine.get_placement("BTCUSDT", bar_idx)
└── _resolve_idx(): if bar_idx < 1e9 → snap_idx = bar_idx // 6
(VBT 5s bars : OB 30s snapshots = 6:1)
└── returns _preloaded_placement["BTCUSDT"][snap_idx]
4. Layer 1 — OBStreamService (Live L2 Book)
Source: external_factors/ob_stream_service.py
4.1 Initialization Protocol (Binance Diff Book)
Binance Futures requires a specific protocol to build a synchronized L2 book from diff streams. The service implements this exactly:
- Start buffering WS events immediately, before the REST snapshot is ready.
- Fetch REST snapshot via GET /fapi/v1/depth?symbol={ASSET}&limit=1000.
  - Weight cost: 20 per request. Used only on init and reconnect, so it does not consume rate limit in steady state.
  - Returns lastUpdateId (the sequence number of the snapshot).
- Replay buffered events, discarding any with event['u'] <= last_update_id (already reflected in snapshot).
- Mark initialized[asset] = True.
This protocol ensures the local book is exactly synchronized with Binance's server-side book at the time of snapshot. Any events with u > lastUpdateId that arrived while the REST call was in flight are buffered and applied in order.
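The replay step can be sketched as a simple filter. This is a minimal illustration, not the service's actual code: `events_to_replay` is a hypothetical helper name, and events are assumed to be dicts carrying the final update ID under `'u'`.

```python
from typing import Dict, List


def events_to_replay(buffered: List[Dict], last_update_id: int) -> List[Dict]:
    """Keep only buffered diff events that are newer than the REST snapshot.

    Events with u <= last_update_id are already reflected in the snapshot.
    The survivors are returned in ascending update-ID order.
    """
    fresh = [e for e in buffered if e["u"] > last_update_id]
    return sorted(fresh, key=lambda e: e["u"])


buffered = [{"u": 98}, {"u": 100}, {"u": 101}, {"u": 105}]
replay = events_to_replay(buffered, last_update_id=100)
print([e["u"] for e in replay])  # [101, 105]
```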
4.2 Update Application
Every WebSocket diff event carries:
- 'b': list of [price_str, qty_str] bid updates
- 'a': list of [price_str, qty_str] ask updates
- 'u': final update ID
For each price/qty pair:
- qty == 0.0 → delete that price level from the dict
- qty > 0.0 → upsert that price level
This is the standard Binance L2 maintenance algorithm. The result is a bids: Dict[float, float] and asks: Dict[float, float] that are exact replicas of the exchange's book state.
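A minimal sketch of the delete/upsert rule, assuming the book is held as two plain `Dict[float, float]` maps. The helper names `apply_side` and `apply_event` are illustrative and are not taken from `ob_stream_service.py`.

```python
from typing import Dict, List


def apply_side(book: Dict[float, float], updates: List[List[str]]) -> None:
    """Apply one side of a diff event to a price -> qty dict, in place."""
    for price_str, qty_str in updates:
        price, qty = float(price_str), float(qty_str)
        if qty == 0.0:
            book.pop(price, None)  # level removed from the book
        else:
            book[price] = qty      # insert a new level or overwrite an old one


def apply_event(bids: Dict[float, float], asks: Dict[float, float], event: dict) -> None:
    """Apply the bid ('b') and ask ('a') updates of a single diff event."""
    apply_side(bids, event["b"])
    apply_side(asks, event["a"])


bids = {100.0: 1.5, 99.5: 2.0}
asks = {100.5: 1.0}
apply_event(
    bids,
    asks,
    {"b": [["100.0", "0"], ["99.0", "3.0"]], "a": [["100.5", "0.4"]], "u": 42},
)
print(bids, asks)
```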
4.3 Reconnection Handling
On websockets.exceptions.ConnectionClosed:
- All assets reset to initialized[a] = False.
- REST snapshots refetched concurrently for all assets using the shared session.
- WS reconnection attempted after reconnect_delay seconds (exponential backoff).
Exponential backoff (P1-2):
- reconnect_delay starts at 3s, doubles each failure: 3→6→12→24→48→96→120→120…
- Capped at _RECONNECT_MAX_S = 120s with ±1s random jitter per attempt
- Resets to 3s if the connection stayed live for ≥ _RECONNECT_STABLE_S = 60s
- Prevents REST request storms under sustained exchange instability (2400 weight/min limit)
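The backoff schedule can be reproduced with a few lines. This is a sketch under stated assumptions: `_RECONNECT_BASE_S` and the function names are hypothetical, and clamping the jittered delay at zero is an illustrative choice rather than documented behaviour.

```python
import random

_RECONNECT_BASE_S = 3.0     # hypothetical name for the 3 s starting delay
_RECONNECT_MAX_S = 120.0
_RECONNECT_STABLE_S = 60.0


def next_delay(current: float) -> float:
    """Double the delay after a failure, capped at the maximum."""
    return min(current * 2.0, _RECONNECT_MAX_S)


def with_jitter(delay: float) -> float:
    """Add +/- 1 s of uniform jitter (never negative)."""
    return max(0.0, delay + random.uniform(-1.0, 1.0))


def delay_after_disconnect(current: float, connected_for_s: float) -> float:
    """Reset to the base delay after a stable session, otherwise back off."""
    if connected_for_s >= _RECONNECT_STABLE_S:
        return _RECONNECT_BASE_S
    return next_delay(current)


delay = _RECONNECT_BASE_S
schedule = [delay]
for _ in range(7):
    delay = next_delay(delay)
    schedule.append(delay)
print(schedule)  # [3.0, 6.0, 12.0, 24.0, 48.0, 96.0, 120.0, 120.0]
```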
WS stall detection (P0-2):
- self.last_event_ts: float stamped on every received WS message
- is_stale(threshold_s=30.0) -> bool returns True if no events for >30s
- Hot loop checks ob_thread.is_stale() every LOG_STATUS_EVERY=120 cycles and logs an error
During the reconnection window, get_depth_buckets() returns None (book not initialized). The flow handles this gracefully by skipping the affected assets for that cycle.
4.4 Depth Bucket Extraction
get_depth_buckets(asset) converts the raw price→qty dicts into the 5-bucket format needed downstream.
Crossed book guard (P0-3): Before computing, validates best_bid < best_ask. If the book is crossed (best_bid >= best_ask) — which can occur during reconnection, partial snapshot application, or exchange instability — the method returns None instead of computing corrupted features. Corrupted features would otherwise propagate to HZ and the Parquet archive.
For each bid level at price p, quantity q:
dist_pct = (mid - p) / mid × 100
idx = int(dist_pct) ← truncated, so idx 0 = [0%, 1%), idx 1 = [1%, 2%), etc.
if idx < max_depth_pct (=5):
bid_notional[idx] += p × q ← USD value
bid_depth[idx] += q ← quantity in asset units
else: break ← bids sorted descending, safe to break
Same logic for asks (sorted ascending, dist_pct = (p - mid) / mid × 100).
Returns:
{
"timestamp": time.time(), # local extract wall clock
"asset": "BTCUSDT",
"bid_notional": np.array([...]), # [5] float64, USD at 0-1%, 1-2%, 2-3%, 3-4%, 4-5% below mid
"ask_notional": np.array([...]), # [5] float64, USD at 0-1%, 1-2%, ... above mid
"bid_depth": np.array([...]), # [5] float64, asset qty at each band
"ask_depth": np.array([...]), # [5] float64, asset qty at each band
"best_bid": float, # highest bid price
"best_ask": float, # lowest ask price
"spread_bps": float, # (ask - bid) / mid × 10,000
}
Important: Index 0 corresponds to the nearest-to-mid band (within 1% of mid). Index 4 is the farthest (4–5% from mid). This is consistent throughout all downstream consumers.
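The bucketing and the crossed-book guard can be sketched in plain Python. This illustration makes several assumptions: it uses lists instead of NumPy arrays, omits the `timestamp` and `asset` fields and all locking, and the names `bucket_side` and `depth_buckets` are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

MAX_DEPTH_PCT = 5  # number of 1% bands on each side


def bucket_side(levels: List[Tuple[float, float]], mid: float, is_bid: bool):
    """Bin (price, qty) levels, sorted from best price outward, into 1% bands."""
    notional = [0.0] * MAX_DEPTH_PCT
    depth = [0.0] * MAX_DEPTH_PCT
    for price, qty in levels:
        dist_pct = ((mid - price) if is_bid else (price - mid)) / mid * 100.0
        idx = int(dist_pct)        # truncation: idx 0 covers [0%, 1%)
        if idx >= MAX_DEPTH_PCT:
            break                  # levels are sorted, so the rest are farther out
        notional[idx] += price * qty
        depth[idx] += qty
    return notional, depth


def depth_buckets(bids: Dict[float, float], asks: Dict[float, float]) -> Optional[dict]:
    if not bids or not asks:
        return None
    best_bid, best_ask = max(bids), min(asks)
    if best_bid >= best_ask:
        return None                # crossed book: refuse to compute features
    mid = (best_bid + best_ask) / 2.0
    bid_n, bid_d = bucket_side(sorted(bids.items(), reverse=True), mid, True)
    ask_n, ask_d = bucket_side(sorted(asks.items()), mid, False)
    return {
        "bid_notional": bid_n, "ask_notional": ask_n,
        "bid_depth": bid_d, "ask_depth": ask_d,
        "best_bid": best_bid, "best_ask": best_ask,
        "spread_bps": (best_ask - best_bid) / mid * 10_000.0,
    }


snap = depth_buckets({99.5: 2.0, 97.5: 1.0, 90.0: 5.0}, {100.5: 1.0, 103.5: 2.0})
print(snap["bid_notional"], snap["ask_notional"])
```

In this example the 90.0 bid sits 10% below mid, so it falls outside the five bands and contributes nothing.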
4.5 Thread Safety
The service uses per-asset asyncio.Lock objects. All reads and writes to bids[asset] and asks[asset] are protected. This is important because get_depth_buckets() sorts the full dict while WS events continue arriving.
5. Layer 2 — OBSnapshot and OBProvider Hierarchy
Source: nautilus_dolphin/nautilus_dolphin/nautilus/ob_provider.py
5.1 OBSnapshot Dataclass
@dataclass
class OBSnapshot:
timestamp: float # Unix seconds
asset: str # e.g. "BTCUSDT"
bid_notional: np.ndarray # [5] float64, USD within each 1% band below mid (per band, not cumulative from mid)
ask_notional: np.ndarray # [5] float64, USD within each 1% band above mid (per band, not cumulative from mid)
bid_depth: np.ndarray # [5] float64, asset qty at each band
ask_depth: np.ndarray # [5] float64, asset qty at each band
All notional arrays use the same indexing convention as OBStreamService:
- [0] = within 1% of mid (the "near" band, most liquid)
- [4] = 4–5% from mid (the "far" band, less liquid)
Notional values represent the total USD value within that percentage band, not cumulative from mid. For example, bid_notional[2] is the total USD of bids between 2% and 3% below mid.
5.2 OBProvider Abstract Base Class
class OBProvider(ABC):
@abstractmethod
def get_snapshot(self, asset: str, timestamp: float) -> Optional[OBSnapshot]: ...
@abstractmethod
def get_assets(self) -> List[str]: ...
@abstractmethod
def get_all_timestamps(self, asset: str) -> np.ndarray: ...
Three concrete implementations:
| Class | Source | Use case |
|---|---|---|
| CSVOBProvider | ob_data/{ASSET}-bookDepth-{DATE}.csv | Historical replay, backtesting |
| MockOBProvider | Synthetic, fully configurable | Unit testing |
| HZOBProvider | Hazelcast asset_{ASSET}_ob key | Live alpha engine reads |
5.3 CSVOBProvider
CSV format: One row per price level per timestamp:
timestamp, percentage, depth, notional
2025-01-15 00:00:05, -5, 4697.736, 444223397.14
2025-01-15 00:00:05, -4, 2103.451, 199827634.85
...
2025-01-15 00:00:05, +1, 891.234, 84667183.21
...
10 rows per snapshot (5 bid levels: -1 to -5, 5 ask levels: +1 to +5). The percentage sign follows the convention: negative = bid side (below mid), positive = ask side.
Index mapping:
- percentage = -1 → idx = 0 (near bid)
- percentage = -5 → idx = 4 (far bid)
- percentage = +1 → idx = 0 (near ask)
- percentage = +5 → idx = 4 (far ask)
Date filtering (anti-leakage): When reference_date is set, _load_asset() only loads CSV files where:
reference_date − 7 days <= file_date < reference_date
Files from reference_date itself are excluded. This prevents the feature engine from seeing future data when computing the median depth reference used in normalization. The caller must pass "2025-01-16" to use data from 2025-01-15.
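The window test is a half-open interval on dates. A minimal sketch, assuming file dates have already been parsed from the filenames; `in_window` is a hypothetical helper, not a method of `CSVOBProvider`.

```python
from datetime import date, timedelta


def in_window(file_date: date, reference_date: date, lookback_days: int = 7) -> bool:
    """True if reference_date - lookback_days <= file_date < reference_date."""
    cutoff_start = reference_date - timedelta(days=lookback_days)
    return cutoff_start <= file_date < reference_date


reference = date(2025, 1, 16)
print(in_window(date(2025, 1, 15), reference))  # True: previous day is usable
print(in_window(date(2025, 1, 16), reference))  # False: reference day is excluded
```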
Lookup: Binary search (bisect.bisect_left) on the sorted timestamp array. O(log N) per query. With 2880 snapshots/day (30s intervals) this is effectively instant.
Tolerance: tolerance_s=30.0 — if the nearest snapshot is more than 30 s away from the requested timestamp, None is returned.
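The lookup and tolerance check can be combined as follows. This is a sketch assuming timestamps are Unix seconds in a sorted list; `nearest_index` is a hypothetical name, and returning an index rather than an `OBSnapshot` is a simplification.

```python
import bisect
from typing import List, Optional


def nearest_index(timestamps: List[float], ts: float,
                  tolerance_s: float = 30.0) -> Optional[int]:
    """Index of the snapshot closest to ts, or None if it is too far away."""
    if not timestamps:
        return None
    pos = bisect.bisect_left(timestamps, ts)
    # The nearest element is either just before or just at the insertion point.
    candidates = [i for i in (pos - 1, pos) if 0 <= i < len(timestamps)]
    best = min(candidates, key=lambda i: abs(timestamps[i] - ts))
    if abs(timestamps[best] - ts) > tolerance_s:
        return None
    return best


grid = [0.0, 30.0, 60.0, 90.0]
print(nearest_index(grid, 44.0))   # 1 (30.0 is 14 s away, 60.0 is 16 s away)
print(nearest_index(grid, 200.0))  # None (nearest snapshot is 110 s away)
```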
5.4 MockOBProvider
Generates synthetic snapshots with configurable imbalance_bias and depth_scale.
The mathematical relationship between imbalance_bias and the computed imbalance is exact:
bid_mult = 1 + bias
ask_mult = 1 − bias
total_bid = base × Σ(level_weights) × bid_mult = base × 15 × (1 + bias)
total_ask = base × Σ(level_weights) × ask_mult = base × 15 × (1 − bias)
imbalance = (bid − ask) / (bid + ask)
= (15(1+b) − 15(1−b)) / (15(1+b) + 15(1−b))
= 2b × 15 / (2 × 15)
= b
Therefore compute_imbalance_nb(bid_notional, ask_notional) == imbalance_bias exactly. This makes tests deterministic.
level_weights = [1.0, 2.0, 3.0, 4.0, 5.0] — depth increases with distance from mid, reflecting realistic order book shape where more passive orders sit further out.
imbalance_biases: Dict[str, float] allows per-asset overrides, so you can test cross-asset disagreement scenarios.
num_snapshots=2880 with 30 s spacing models a full trading day (2880 × 30 = 86400 s).
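The identity between `imbalance_bias` and the computed imbalance can be checked numerically. The sketch below uses plain Python in place of the numba kernel, the function names are illustrative, and the comparison allows for floating-point rounding.

```python
LEVEL_WEIGHTS = [1.0, 2.0, 3.0, 4.0, 5.0]


def mock_notionals(base: float, bias: float):
    """Synthetic per-band notionals: bids scaled by (1 + bias), asks by (1 - bias)."""
    bid = [base * w * (1.0 + bias) for w in LEVEL_WEIGHTS]
    ask = [base * w * (1.0 - bias) for w in LEVEL_WEIGHTS]
    return bid, ask


def imbalance(bid, ask) -> float:
    """(sum(bid) - sum(ask)) / (sum(bid) + sum(ask)), with a zero-depth guard."""
    total_bid, total_ask = sum(bid), sum(ask)
    total = total_bid + total_ask
    if total < 1e-9:
        return 0.0
    return (total_bid - total_ask) / total


for bias in (-0.5, 0.0, 0.2, 0.9):
    bid, ask = mock_notionals(1_000_000.0, bias)
    print(bias, imbalance(bid, ask))
```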
5.5 HZOBProvider
Reads live data from Hazelcast key asset_{ASSET}_ob. Used by the production alpha engine to construct OBSnapshot objects for the OBFeatureEngine. Not used in the OBF flow itself — the flow pushes to HZ; consumers pull via this provider.
Lazy connection (P1-4): The HZ client is not created at construction time. _ensure_connected() is called on first use (first get_snapshot() or get_assets() call). This prevents failures during unit tests and avoids connection race conditions at startup.
Dynamic asset discovery (P1-4): get_assets() scans HZ key set for keys matching asset_*_ob pattern. This reflects the actual assets being pushed by the OBF flow, eliminating the prior hardcoded list that included BNBUSDT (not tracked). Alternatively, an explicit assets: List[str] list can be passed to the constructor for deterministic, fast operation.
Error handling: Separate json.JSONDecodeError catch logs the specific field; all other exceptions log and return None. No exception propagates to the caller.
6. Layer 3 — OBFeatureEngine (Batch Backtesting)
Source: nautilus_dolphin/nautilus_dolphin/nautilus/ob_features.py
6.1 Design Philosophy
The batch OBFeatureEngine is designed for fast backtesting: it pre-computes all features for an entire date in a single O(N) pass, then serves results in O(1) via dict lookup. This is the right design for vectorbt integration where all bars are processed up front.
In production, LiveOBFeatureEngine (Section 7) handles incremental streaming.
6.2 State
_imbalance_history: Dict[str, deque] # per-asset rolling imbalance values
_depth_1pct_history: Dict[str, deque] # per-asset rolling 1% depth values
_market_depth_history: deque # aggregate depth across all assets
_median_depth_ref: Dict[str, float] # computed in preload, used for normalization
_preloaded_placement: Dict[str, Dict[int, OBPlacementFeatures]]
_preloaded_signal: Dict[str, Dict[int, OBSignalFeatures]]
_preloaded_market: Dict[int, OBMarketFeatures]
_preloaded_macro_map: Dict[int, OBMacroFeatures] # per snap_idx (P1-3 fix)
_ts_to_idx: Dict[str, Dict[float, int]] # timestamp → snap index map
bar_to_snap_ratio: int = 6 # configurable (P2-7)
6.3 preload_date() Algorithm
1. Set provider.reference_date = date_str (enables date filtering in CSVOBProvider)
2. For each asset:
a. Load all N snapshots
b. Compute depths: depths[i] = bid_notional[i][0] + ask_notional[i][0]
c. _median_depth_ref[asset] = median(depths) ← normalization reference
3. For snap_idx in range(N):
For each asset:
a. Compute raw features via numba kernels (imb, d1pct, dq, fp, sp, da)
b. Append to imb_hist[asset], dep_hist[asset]
c. Compute rolling features (ma5, persistence, velocity)
d. Store in _preloaded_placement[asset][snap_idx]
e. Store in _preloaded_signal[asset][snap_idx]
f. Collect (imb, velocity) for cross-asset computation
Compute Sub-3 (market agreement) from all assets' imbalances
Compute Sub-4 (macro regime) from all assets' velocities
Update _preloaded_market[snap_idx]
Update _preloaded_macro_map[snap_idx] = OBMacroFeatures(...) ← per-timestamp (P1-3)
4. _preloaded = True
6.4 Index Resolution
The _resolve_idx() method handles two modes:
Timestamp mode (timestamp_or_idx > 1e9): Direct lookup in _ts_to_idx, or binary search for nearest.
Bar index mode (integer < 1e9): Converts VBT 5-second bar indices to OB snapshot indices using bar_idx // self.bar_to_snap_ratio. Defaults assume:
- VBT bars: 5 s granularity
- OB snapshots: 30 s granularity
- Ratio:
bar_to_snap_ratio = 6bars per snapshot (configurable via constructor, P2-7)
This integer division alignment means every OB feature is used for exactly 6 consecutive VBT bars. The features are refreshed every 30 s, which matches the Binance bookDepth snapshot cadence used when building CSV data.
Per-timestamp macro (P1-3): get_macro(timestamp_or_idx) now resolves the correct snapshot index and returns _preloaded_macro_map[snap_idx]. Previously it returned only the last computed value regardless of bar index — silently wrong for any non-terminal bar in a backtest.
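The two resolution modes can be sketched as a single function. This is an illustration only: `resolve_idx` is a standalone stand-in for `_resolve_idx()`, it takes the sorted timestamp list as an argument, and it skips the direct `_ts_to_idx` lookup by going straight to the binary search.

```python
import bisect
from typing import List, Optional


def resolve_idx(timestamp_or_idx: float, sorted_ts: List[float],
                bar_to_snap_ratio: int = 6) -> Optional[int]:
    """Map a Unix timestamp or a bar index to an OB snapshot index."""
    if timestamp_or_idx > 1e9:
        # Timestamp mode: pick the nearest snapshot by binary search.
        pos = bisect.bisect_left(sorted_ts, timestamp_or_idx)
        candidates = [i for i in (pos - 1, pos) if 0 <= i < len(sorted_ts)]
        if not candidates:
            return None
        return min(candidates, key=lambda i: abs(sorted_ts[i] - timestamp_or_idx))
    # Bar index mode: 6 five-second bars share one 30-second snapshot.
    return int(timestamp_or_idx) // bar_to_snap_ratio


print([resolve_idx(bar, []) for bar in (0, 5, 6, 17)])  # [0, 0, 1, 2]
```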
7. Layer 3B — LiveOBFeatureEngine (Streaming Production)
Source: prod/obf_persistence.py → class LiveOBFeatureEngine
7.1 Why a Separate Class Exists
OBFeatureEngine.preload_date() requires all data upfront. In production:
- Data arrives one snapshot at a time, every 0.5 s.
- There is no future data to preload.
- The median_depth_ref must be bootstrapped from the first N live observations.
LiveOBFeatureEngine solves all three problems.
7.2 Rolling State
_imb_hist: Dict[str, deque(maxlen=DEPTH_LOOKBACK=60)] # 60 × 0.5 s = 30 s
_dep_hist: Dict[str, deque(maxlen=DEPTH_LOOKBACK=60)] # same
_median_ref: Dict[str, float] # calibrated once after MEDIAN_CALIB_N=60 samples
_calib_buf: Dict[str, list] # accumulates first 60 depth values
_calibrated: Dict[str, bool] # True after calibration
_market_dep: deque(maxlen=60) # aggregate depth for acceleration computation
7.3 Median Calibration
For the first 60 snapshots (MEDIAN_CALIB_N = 60), _calib_buf[asset] accumulates depth_1pct values. After 60 samples, _median_ref[asset] is set to np.median(calib_buf) and _calibrated[asset] = True.
Before calibration is complete, med_ref = max(d1pct, 1.0), which gives depth_quality = 1.0 exactly whenever d1pct ≥ 1.0 (no normalization until the reference is stable). The 60-sample window is 30 seconds at 0.5 s polling, a reasonable warm-up period.
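A minimal sketch of the warm-up logic for a single asset. The class name `MedianCalibrator` and its method are hypothetical, and the text does not say whether the 60th sample already uses the median; this sketch assumes it does.

```python
from statistics import median

MEDIAN_CALIB_N = 60


class MedianCalibrator:
    """Bootstraps a median depth reference from the first N observations."""

    def __init__(self) -> None:
        self._buf = []
        self.median_ref = None  # set once, after MEDIAN_CALIB_N samples

    @property
    def calibrated(self) -> bool:
        return self.median_ref is not None

    def reference_for(self, d1pct: float) -> float:
        """Record one depth sample and return the reference to normalize it with."""
        if self.median_ref is None:
            self._buf.append(d1pct)
            if len(self._buf) >= MEDIAN_CALIB_N:
                self.median_ref = median(self._buf)  # frozen from here on
        if self.median_ref is None:
            return max(d1pct, 1.0)  # pre-calibration fallback
        return self.median_ref


cal = MedianCalibrator()
refs = [cal.reference_for(float(v)) for v in range(1, 61)]
print(refs[0], refs[58], refs[59], cal.calibrated)  # 1.0 59.0 30.5 True
```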
7.4 update() Return Structure
{
"per_asset": {
"BTCUSDT": {
"depth_1pct_usd": float, # Sub-1
"depth_quality": float,
"fill_probability": float,
"spread_proxy_bps": float,
"imbalance": float, # Sub-2
"imbalance_ma5": float,
"imbalance_persistence": float,
"depth_asymmetry": float,
"withdrawal_velocity": float,
},
"ETHUSDT": { ... },
"SOLUSDT": { ... },
},
"market": {
"median_imbalance": float, # Sub-3
"agreement_pct": float,
"depth_pressure": float,
},
"macro": {
"depth_velocity": float, # Sub-4
"cascade_count": int,
"acceleration": float,
"regime_signal": int, # -1=CALM, 0=NEUTRAL, 1=STRESS
}
}
If snap = None for an asset (book not yet initialized), per_asset[asset] = None and that asset is excluded from cross-asset computations.
8. Sub-system 1 — Per-Asset Placement Features
Numba kernels: all @njit(cache=True) for fast JIT compilation with on-disk caching.
8.1 compute_imbalance_nb(bid_notional, ask_notional)
$$\text{imbalance} = \frac{\sum bid\_notional - \sum ask\_notional}{\sum bid\_notional + \sum ask\_notional}$$
- Range: [-1.0, +1.0]
- +1.0 = all depth on bid side (strong buy pressure)
- -1.0 = all depth on ask side (strong sell pressure)
- 0.0 if total depth < 1e-9 (guard against zero division)
- Uses entire 5-level depth vector (not just near-mid)
This is the most fundamental microstructure signal. It captures the instantaneous notional imbalance in the visible order book.
8.2 compute_depth_1pct_nb(bid_notional, ask_notional)
$$\text{depth\_1pct} = bid\_notional[0] + ask\_notional[0]$$
Total USD liquidity within ±1% of mid price. This is the "execution liquidity" — the depth that a market order of typical DOLPHIN size would encounter immediately.
On BTC, this was typically >$100M on 2025-01-15 (verified in test TestCSVOBProviderKnownValues). Thin markets show <$20M.
8.3 compute_depth_quality_nb(depth_1pct, median_ref)
$$\text{depth\_quality} = \frac{depth\_1pct}{median\_ref}$$
Normalizes current liquidity against the historical median for that asset.
- 1.0 = typical liquidity
- >1.0 = above-average liquidity (better fill probability)
- <1.0 = below-average (thinner than usual)
- Returns 1.0 if median_ref < 1e-9 (no reference yet)
8.4 compute_fill_probability_nb(depth_quality)
$$\text{fill\_probability} = 1 - e^{-k \cdot \max(0, depth\_quality)}, \quad k = 2.0$$
Exponential saturation model. Calibrated so that quality = 1.0 (typical depth) gives approximately 1 - e^{-2} ≈ 0.865 (86.5% fill probability). Rationale: at typical BTC depth, a small maker order has very high fill probability.
Key values:
| depth_quality | fill_probability |
|---|---|
| 0.0 | 0.000 (no depth) |
| 0.5 | 0.632 |
| 1.0 | 0.865 |
| 1.5 | 0.950 |
| 2.0 | 0.982 |
The OBPlacer uses fill_probability < 0.30 as a gate to force TAKER execution (thin book = don't wait for maker fill).
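The table values follow directly from the formula. The sketch below reimplements the two kernels in plain Python (no numba) to reproduce them; the function names drop the `_nb` suffix to signal that these are not the production kernels.

```python
import math

FILL_K = 2.0  # saturation constant k from the formula above


def depth_quality(depth_1pct: float, median_ref: float) -> float:
    """Current near-mid depth relative to its median; 1.0 if no reference yet."""
    if median_ref < 1e-9:
        return 1.0
    return depth_1pct / median_ref


def fill_probability(quality: float, k: float = FILL_K) -> float:
    """Exponential saturation: 1 - exp(-k * max(0, quality))."""
    return 1.0 - math.exp(-k * max(0.0, quality))


for q in (0.0, 0.5, 1.0, 1.5, 2.0):
    print(q, round(fill_probability(q), 3))
```

Under this model the 0.30 gate corresponds to a depth_quality of about 0.18 (solving 1 - e^{-2q} = 0.30), so TAKER is forced only when near-mid depth falls to roughly a fifth of its median.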
8.5 compute_spread_proxy_nb(bid_notional, ask_notional)
$$\text{ratio} = \frac{bid\_notional[0] + ask\_notional[0]}{\sum_{i=0}^{4}(bid\_notional[i] + ask\_notional[i])}$$
$$\text{spread\_proxy} = \max(0.1, (1 - ratio \times 3) \times 5)$$
This is a depth-gradient proxy for spread. The intuition: in tight markets, a large fraction of total depth sits near mid (high ratio). In wide-spread markets, near-mid depth is thin relative to the full book (low ratio). Returns approximate spread in bps-like units.
- ratio ≈ 0.3 (30% of depth within 1%) → spread_proxy ≈ 0.5 (tight)
- ratio ≈ 0.05 (5% of depth within 1%) → spread_proxy ≈ 4.25 (wide)
- Returns 10.0 if total depth < 1e-9 (no data = assume wide spread)
This is used in OBPlacer to cap limit order offset: offset = min(offset, spread_proxy_bps × 0.4) — never place our limit more than 40% of the spread from mid.
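A plain-Python rendering of the spread proxy, written to check the worked values. It is a sketch rather than the numba kernel, and the input vectors are made-up numbers chosen to hit the stated ratios.

```python
def spread_proxy(bid_notional, ask_notional) -> float:
    """Depth-gradient proxy for spread, in bps-like units."""
    total = sum(bid_notional) + sum(ask_notional)
    if total < 1e-9:
        return 10.0  # no data: assume a wide spread
    ratio = (bid_notional[0] + ask_notional[0]) / total
    return max(0.1, (1.0 - ratio * 3.0) * 5.0)


tight = spread_proxy([15.0, 35.0, 0.0, 0.0, 0.0], [15.0, 35.0, 0.0, 0.0, 0.0])  # ratio 0.30
wide = spread_proxy([2.5, 47.5, 0.0, 0.0, 0.0], [2.5, 47.5, 0.0, 0.0, 0.0])     # ratio 0.05
print(round(tight, 3), round(wide, 3))  # 0.5 4.25
```

Note that the 0.1 floor takes over once the near-mid share exceeds about 0.327, since (1 - 3 × ratio) × 5 drops below 0.1 at that point.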
8.6 OBPlacementFeatures Dataclass
@dataclass
class OBPlacementFeatures:
depth_1pct_usd: float # USD depth within ±1% of mid
depth_quality: float # vs median; 1.0 = typical
fill_probability: float # [0, 1]
spread_proxy_bps: float # approximate spread proxy
Neutral default: NEUTRAL_PLACEMENT = OBPlacementFeatures(0.0, 1.0, 0.5, 2.0)
- Zero depth (no data)
- Quality = 1.0 (don't penalize when no reference)
- Fill probability = 0.5 (uncertain)
- Spread proxy = 2.0 (moderate spread assumption)
9. Sub-system 2 — Per-Asset Signal Features
9.1 compute_depth_asymmetry_nb(bid_notional, ask_notional)
$$\text{asymmetry} = \frac{bid\_notional[0] + ask\_notional[0]}{bid\_notional[2] + ask\_notional[2]}$$
Ratio of near-mid depth (the 0–1% band, index [0]) to mid-book depth (the 2–3% band, index [2]). High values indicate depth is concentrated near mid: a tight, liquid market. Low values indicate depth is spread out or front-running is occurring.
Uses index [2] (the band ending at 3%) as the "far" reference because [4] (the 4–5% band) can be sparse in fast markets.
9.2 compute_imbalance_persistence_nb(imbalance_history, n)
current_sign = sign(imbalance_history[-1])
persistence = fraction of last n values with same sign as current_sign
- Range: [0, 1]
- 1.0 = all recent snapshots had same directional pressure
- 0.5 = random / neutral (default when history too short)
- 0.0 = all recent snapshots were opposite direction
This distinguishes sustained directional pressure from momentary spikes. A high-persistence imbalance is much stronger evidence of an incoming move than a single snapshot.
Lookback: IMBALANCE_LOOKBACK = 10 snapshots (5 s in live mode at 0.5 s, or 5 min in batch mode at 30 s).
9.3 Rolling MA5
imbalance_ma5 = mean(imbalance_history[-5:]). Simple 5-snapshot moving average. Smooths noise while being fast-reacting. In live mode at 0.5 s polling, this is a 2.5 s average.
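Both rolling features fit in a few lines of plain Python. This sketch rests on two assumptions that the text does not settle: "history too short" is taken to mean fewer than n values, and the latest value is counted in its own window, which makes the minimum of this version 1/n rather than 0.

```python
IMBALANCE_LOOKBACK = 10


def sign(x: float) -> int:
    return (x > 0) - (x < 0)


def imbalance_persistence(history, n: int = IMBALANCE_LOOKBACK) -> float:
    """Fraction of the last n imbalances sharing the sign of the latest one."""
    if len(history) < n:
        return 0.5  # neutral default (assumed cutoff: fewer than n values)
    window = history[-n:]
    current = sign(window[-1])
    return sum(1 for v in window if sign(v) == current) / n


def imbalance_ma5(history) -> float:
    """Mean of the last (up to) five imbalance values; 0.0 if empty."""
    window = history[-5:]
    return sum(window) / len(window) if window else 0.0


print(imbalance_persistence([0.2] * 10))           # 1.0: sustained bid pressure
print(imbalance_persistence([-0.1] * 9 + [0.3]))   # 0.1: a lone positive spike
```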
9.4 compute_withdrawal_velocity_nb(depth_history, lookback)
$$\text{velocity} = \frac{depth\_history[-1] - depth\_history[-1 - lookback]}{depth\_history[-1 - lookback]}$$
Fractional change in 1% depth over the lookback window. Negative = depth is being withdrawn (market makers pulling quotes). Positive = depth is building.
- DEPTH_LOOKBACK = 60 snapshots = 30 s lookback in live mode
- CASCADE_THRESHOLD = -0.10: velocity below -0.10 means ≥10% of near-mid depth was withdrawn in 30 s, a stress signal
- Returns 0.0 if fewer than 2 samples in history
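A plain-Python sketch of the velocity computation. Two details are assumptions made for illustration: when the history holds fewer than lookback + 1 samples the oldest available sample is used as the reference, and a near-zero reference returns 0.0 to avoid division by zero.

```python
DEPTH_LOOKBACK = 60
CASCADE_THRESHOLD = -0.10


def withdrawal_velocity(depth_history, lookback: int = DEPTH_LOOKBACK) -> float:
    """Fractional change in near-mid depth over the lookback window."""
    if len(depth_history) < 2:
        return 0.0
    # Assumption: clamp to the oldest sample when the window is not yet full.
    ref_idx = max(0, len(depth_history) - 1 - lookback)
    ref = depth_history[ref_idx]
    if ref < 1e-9:
        return 0.0  # assumption: guard against a zero reference
    return (depth_history[-1] - ref) / ref


history = [100.0] * 60 + [88.0]
velocity = withdrawal_velocity(history)
print(velocity, velocity < CASCADE_THRESHOLD)  # -0.12 True
```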
9.5 OBSignalFeatures Dataclass
@dataclass
class OBSignalFeatures:
imbalance: float # [-1, +1] raw directional pressure
imbalance_ma5: float # 5-snapshot smoothed
imbalance_persistence: float # [0, 1] direction consistency
depth_asymmetry: float # near/far depth ratio
withdrawal_velocity: float # rate of depth drain (negative = withdrawal)
Neutral default: NEUTRAL_SIGNAL = OBSignalFeatures(0.0, 0.0, 0.5, 1.0, 0.0)
10. Sub-system 3 — Cross-Asset Market Signal
10.1 compute_market_agreement_nb(asset_imbalances, n_assets)
This kernel operates on a vector of per-asset imbalances (one per tracked asset, e.g. [imb_BTC, imb_ETH, imb_SOL]).
Step 1: Median imbalance
Finds the median of the n_assets values (sort + select middle element). The median is more robust than mean to outlier assets.
Step 2: Agreement fraction
median_sign = sign(median)
agreement_pct = count(assets with same sign as median) / n_assets
- Returns (0.0, 0.5) if abs(median) < 1e-9: undefined direction
- Returns (median, agreement_pct) otherwise
Interpretation:
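- median > 0, agreement_pct = 1.0: all assets bid-heavy simultaneously. Strong buy-side pressure across the market. High conviction long.
- median < 0, agreement_pct = 1.0: all assets ask-heavy simultaneously. Strong sell-side pressure. High conviction short.
- agreement_pct = 0.67 (two of three): divergence, noisy market. With three assets and a non-zero median this is the lowest reachable value, because the median asset and at least one other asset always share a sign.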
10.2 Depth Pressure
depth_pressure = mean(asset_imbalances). Simpler aggregate than median — the mean captures the total directional force. Used alongside median_imbalance as a cross-check.
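The cross-asset aggregation can be sketched without numba. The function names are illustrative, and picking the upper-middle element for an even number of assets is an assumption; the text only says "sort + select middle element".

```python
def sign(x: float) -> int:
    return (x > 0) - (x < 0)


def market_agreement(imbalances):
    """Return (median_imbalance, agreement_pct) for a list of per-asset imbalances."""
    n = len(imbalances)
    if n == 0:
        return 0.0, 0.5
    median = sorted(imbalances)[n // 2]  # middle element after sorting
    if abs(median) < 1e-9:
        return 0.0, 0.5                  # undefined direction
    agree = sum(1 for v in imbalances if sign(v) == sign(median))
    return median, agree / n


def depth_pressure(imbalances) -> float:
    """Mean imbalance across assets; 0.0 for an empty list."""
    return sum(imbalances) / len(imbalances) if imbalances else 0.0


print(market_agreement([0.3, 0.1, 0.2]))   # (0.2, 1.0)
print(market_agreement([-0.4, 0.1, 0.2]))  # median 0.1, two of three agree
print(depth_pressure([-0.4, 0.1, 0.2]))    # slightly negative mean
```

The second input shows why both aggregates are kept: the median is positive while the mean is negative, because one asset is strongly ask-heavy.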
10.3 OBMarketFeatures Dataclass
@dataclass
class OBMarketFeatures:
median_imbalance: float # cross-asset median
agreement_pct: float # [0, 1] fraction agreeing
depth_pressure: float # mean imbalance across assets
Neutral default: NEUTRAL_MARKET = OBMarketFeatures(0.0, 0.5, 0.0)
11. Sub-system 4 — Macro Regime / Market Structure
11.1 compute_cascade_signal_nb(depth_velocities, n_assets, threshold)
Evaluates the vector of per-asset withdrawal_velocity values.
cascade_count = count(assets where velocity < threshold) # threshold = -0.10
mean_velocity = mean(depth_velocities)
# Regime classification:
if cascade_count >= max(2, n_assets // 2) AND mean_velocity < threshold:
regime = 1 # STRESS: majority of assets withdrawing depth
elif mean_velocity > abs(threshold):
regime = -1 # CALM: depth building across market
else:
regime = 0 # NEUTRAL
STRESS regime (1): Multiple assets simultaneously withdrawing order book depth at ≥10% rate. This typically precedes sharp moves. Maker orders should be avoided or made fast-timeout; urgency is high.
CALM regime (-1): Depth building across the market. Market makers are posting aggressively. Good conditions for patient maker execution.
NEUTRAL regime (0): No clear structural signal.
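The regime classification translates directly into Python. This is a sketch of the rule as written above, not the numba kernel; the return order `(cascade_count, mean_velocity, regime)` and the empty-input default are assumptions.

```python
CASCADE_THRESHOLD = -0.10


def cascade_signal(velocities, threshold: float = CASCADE_THRESHOLD):
    """Classify the market regime from per-asset withdrawal velocities."""
    n = len(velocities)
    if n == 0:
        return 0, 0.0, 0  # assumed neutral default for no data
    cascade_count = sum(1 for v in velocities if v < threshold)
    mean_velocity = sum(velocities) / n
    if cascade_count >= max(2, n // 2) and mean_velocity < threshold:
        regime = 1    # STRESS
    elif mean_velocity > abs(threshold):
        regime = -1   # CALM
    else:
        regime = 0    # NEUTRAL
    return cascade_count, mean_velocity, regime


print(cascade_signal([-0.15, -0.20, -0.05])[2])  # 1  (STRESS)
print(cascade_signal([0.20, 0.15, 0.12])[2])     # -1 (CALM)
print(cascade_signal([-0.15, 0.00, 0.01])[2])    # 0  (NEUTRAL)
```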
11.2 Acceleration
Second derivative of aggregate market depth:
recent_vel = (market_dep[-1] - market_dep[-10]) / market_dep[-10]
prior_vel = (market_dep[-10] - market_dep[-20]) / market_dep[-20]
acceleration = recent_vel - prior_vel
Requires at least 20 market depth samples in history. Negative acceleration (depth withdrawing and accelerating) is a precursor to STRESS regime.
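A short sketch of the acceleration term. Returning 0.0 when fewer than 20 samples are available is an assumption, chosen to match the neutral default, and `depth_acceleration` is a hypothetical name.

```python
def depth_acceleration(market_dep) -> float:
    """Difference between the recent and prior 10-step fractional depth changes."""
    if len(market_dep) < 20:
        return 0.0  # assumption: neutral value until enough history exists
    recent_vel = (market_dep[-1] - market_dep[-10]) / market_dep[-10]
    prior_vel = (market_dep[-10] - market_dep[-20]) / market_dep[-20]
    return recent_vel - prior_vel


flat = [100.0] * 20
drop = [100.0] * 19 + [90.0]  # depth stable, then a 10% drop at the end
print(depth_acceleration(flat), depth_acceleration(drop))
```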
11.3 OBMacroFeatures Dataclass
@dataclass
class OBMacroFeatures:
depth_velocity: float # mean withdrawal velocity across all assets
cascade_count: int # number of assets in withdrawal
acceleration: float # 2nd derivative of aggregate depth
regime_signal: int # -1=CALM, 0=NEUTRAL, 1=STRESS
Neutral default: NEUTRAL_MACRO = OBMacroFeatures(0.0, 0, 0.0, 0)
12. Layer 4 — OBPlacer (Execution Advice)
Source: nautilus_dolphin/nautilus_dolphin/nautilus/ob_placer.py
12.1 OBPlacementAdvice
@dataclass
class OBPlacementAdvice:
method: str # "MAKER" | "TAKER" | "PATIENT_MAKER" | "SKIP"
offset_bps: float # limit offset from mid (maker only)
timeout_s: float # max wait for maker fill
confidence: float # [0, 1] how confident in this advice
reason: str # human-readable explanation
12.2 Decision Tree
The OBPlacer.advise(placement, signal, signal_confidence, direction) method applies the following logic:
1. signal_confidence < 0.40 → SKIP("signal_too_weak")
2. signal_confidence > 0.85 → TAKER("high_confidence_taker")
(signal so strong, don't risk missing it)
3. fill_probability < 0.30 → TAKER("thin_book_taker")
(book too thin for maker, take immediately)
4. Compute effective imbalance from trade direction:
eff_imb = −imbalance if direction == SHORT else imbalance
(for SHORT: positive imbalance contradicts, negative confirms)
ob_confirmation:
eff_imb > 0.15 → +1 (confirms)
eff_imb < −0.15 → −1 (contradicts)
else → 0 (neutral)
5. Compute patience [0, 1], the inverse of signal confidence within the maker zone:
patience = 1.0 − (signal_confidence − 0.40) / (0.85 − 0.40)
(signal_confidence 0.40 → patience 1.0; signal_confidence 0.85 → patience 0.0)
OB adjustments:
ob_confirms: patience += 0.25 (can wait, OB supporting us)
ob_contradicts: patience −= 0.30 (urgent, OB fighting us)
6. Map patience to parameters:
timeout = 10 + patience × (50 − 10) # FAST_S=10, PATIENT_S=50
offset = 0 + patience × (2.0 − 0) # MIN_BPS=0, MAX_BPS=2.0
offset = min(offset, spread_proxy_bps × 0.4) # spread sanity cap
7. Choose method:
ob_confirms AND fill_prob > 0.70 → PATIENT_MAKER("ob_confirms_deep_book")
ob_contradicts → MAKER("ob_contradicts_fast_maker", timeout=10s)
else → MAKER("neutral_ob_default_maker")
8. Advice confidence (the OBPlacementAdvice.confidence field, distinct from signal_confidence):
confidence = min(1.0, fill_probability × (0.7 + 0.3 × patience))
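The eight steps can be collected into one function. This is a sketch, not `OBPlacer.advise()` itself: it takes scalar inputs instead of the feature dataclasses, clamps patience to [0, 1] after the OB adjustment (an assumption), and fills the offset, timeout and confidence of SKIP and TAKER results with placeholder values that the text does not specify.

```python
from dataclasses import dataclass


@dataclass
class Advice:
    method: str
    offset_bps: float
    timeout_s: float
    confidence: float
    reason: str


def advise(fill_probability: float, spread_proxy_bps: float, imbalance: float,
           signal_confidence: float, direction: str) -> Advice:
    # Steps 1-3: hard gates (numeric fields here are placeholders).
    if signal_confidence < 0.40:
        return Advice("SKIP", 0.0, 0.0, 0.0, "signal_too_weak")
    if signal_confidence > 0.85:
        return Advice("TAKER", 0.0, 0.0, signal_confidence, "high_confidence_taker")
    if fill_probability < 0.30:
        return Advice("TAKER", 0.0, 0.0, signal_confidence, "thin_book_taker")
    # Step 4: imbalance as seen from the trade direction.
    eff_imb = -imbalance if direction == "SHORT" else imbalance
    ob = 1 if eff_imb > 0.15 else (-1 if eff_imb < -0.15 else 0)
    # Step 5: patience from signal confidence, adjusted by OB confirmation.
    patience = 1.0 - (signal_confidence - 0.40) / (0.85 - 0.40)
    patience += {1: 0.25, -1: -0.30, 0: 0.0}[ob]
    patience = min(1.0, max(0.0, patience))  # assumption: clamp to [0, 1]
    # Step 6: map patience to timeout and offset, with the spread cap.
    timeout = 10.0 + patience * (50.0 - 10.0)
    offset = min(patience * 2.0, spread_proxy_bps * 0.4)
    # Step 7: choose the method.
    if ob == 1 and fill_probability > 0.70:
        method, reason = "PATIENT_MAKER", "ob_confirms_deep_book"
    elif ob == -1:
        method, reason, timeout = "MAKER", "ob_contradicts_fast_maker", 10.0
    else:
        method, reason = "MAKER", "neutral_ob_default_maker"
    # Step 8: confidence in the advice itself.
    confidence = min(1.0, fill_probability * (0.7 + 0.3 * patience))
    return Advice(method, offset, timeout, confidence, reason)


print(advise(0.9, 2.0, -0.25, 0.60, "SHORT"))  # ask-heavy book confirms the short
print(advise(0.9, 2.0, 0.25, 0.60, "SHORT"))   # bid-heavy book contradicts it
```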
12.3 Constants
| Constant | Value | Source |
|---|---|---|
CONFIDENCE_TAKER_THRESHOLD |
0.85 | ob_placer.py + smartplacer_constants.py |
CONFIDENCE_MAKER_THRESHOLD |
0.40 | ob_placer.py + smartplacer_constants.py |
IMBALANCE_STRONG |
0.15 | ob_placer.py |
OFFSET_MIN_BPS |
0.0 | ob_placer.py |
OFFSET_MAX_BPS |
2.0 | ob_placer.py |
TIMEOUT_FAST_S |
10 | ob_placer.py + smartplacer_constants.py |
TIMEOUT_DEFAULT_S |
25 | smartplacer_constants.py |
TIMEOUT_PATIENT_S |
50 | ob_placer.py + smartplacer_constants.py |
13. SmartPlacer Integration
Source: alpha_engine/execution/smart_placer.py + alpha_engine/execution/smartplacer_constants.py
SmartPlacer is the alpha engine's execution decision module. It is separate from OBPlacer but implements a parallel logic tree, using OB features as inputs.
13.1 ob_confirms_signal(direction, imbalance, threshold=0.15)
For SHORT: effective_imb = −imbalance
For LONG: effective_imb = +imbalance
effective_imb > threshold → CONFIRMS (OB pressure aligns with trade direction)
effective_imb < −threshold → CONTRADICTS (OB pressure opposes trade direction)
else → NEUTRAL
This function bridges OB microstructure to execution timing. Example: signal says SHORT. OB imbalance = -0.25 (ask-heavy). effective_imb = -(-0.25) = +0.25 > 0.15 → CONFIRMS. Sell pressure in the book aligns with our short, so we can be more patient.
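The sign flip described above fits in a few lines. The sketch below is illustrative only: the function name carries a `_sketch` suffix and the string return values are assumptions, since the real return type is not shown here.

```python
def ob_confirms_signal_sketch(direction, imbalance, threshold=0.15):
    # For shorts, ask-heavy (negative) imbalance is supportive, so flip the sign.
    effective_imb = -imbalance if direction == "SHORT" else imbalance
    if effective_imb > threshold:
        return "CONFIRMS"
    if effective_imb < -threshold:
        return "CONTRADICTS"
    return "NEUTRAL"
```

Calling it with `("SHORT", -0.25)` reproduces the worked example in the paragraph above; the same imbalance with `"LONG"` contradicts the trade.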
13.2 decide_placement(confidence, ob_confirmation, spread_bps, vol_regime)
Three-gate decision tree identical in structure to OBPlacer.advise() but with additional volatility regime input:
GATE 1: confidence < 0.40 → SKIP
GATE 2: confidence > 0.85 → TAKER
MAKER ZONE:
patience = 1 − (confidence − 0.40) / 0.45
OB adjustment: CONFIRMS +0.25, CONTRADICTS −0.30
Volatility: HIGH × 0.7, LOW × 1.2 (capped at 1.0)
timeout = 10 + patience × 40
offset = 0 + patience × 2.0 (capped at spread × 0.4)
fallback = ABORT if CONTRADICTS AND confidence < 0.55
= TAKER otherwise
13.3 compute_limit_price(direction, best_bid, best_ask, offset_bps)
mid = (best_bid + best_ask) / 2
offset_abs = mid × offset_bps / 10000
SHORT (selling): price = best_ask − offset_abs (undercut competing sellers)
floor: price ≥ mid × (1 + 1e-6) (must remain on ask side)
LONG (buying): price = best_bid + offset_abs (outbid competing buyers)
ceiling: price ≤ mid × (1 − 1e-6) (must remain on bid side)
The 1e-6 buffer keeps the limit price strictly on its own side of mid, so it can never reach the opposite side of the book. A limit priced at or through the opposite best quote would execute immediately as a taker order, defeating the purpose.
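A minimal sketch of the pricing rule in 13.3, assuming string direction labels; the `_sketch` suffix marks it as illustrative rather than the function in smart_placer.py.

```python
def compute_limit_price_sketch(direction, best_bid, best_ask, offset_bps):
    mid = (best_bid + best_ask) / 2.0
    offset_abs = mid * offset_bps / 10_000.0
    if direction == "SHORT":
        # Selling: step down from the best ask, but stay above mid.
        return max(best_ask - offset_abs, mid * (1 + 1e-6))
    # Buying: step up from the best bid, but stay below mid.
    return min(best_bid + offset_abs, mid * (1 - 1e-6))
```

With best_bid=100.0 and best_ask=100.2, a 5 bps offset moves the quote about 0.05 inside the touch, while a 20 bps offset would pass through mid and is clamped to the buffer instead.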
13.4 Fee Economics
| Item | Value |
|---|---|
| Taker fee | 0.05% per side |
| Maker fee | 0.02% per side |
| Fee delta | 0.03% per side |
| Leveraged notional | $9,375 (=$3,750 × 2.5×) |
| Savings per maker fill | $2.81 |
| Expected savings (25s, 76% fill rate) | $2.14 |
| Expected savings (50s, 82% fill rate) | $2.31 |
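The figures in the table follow from the fee delta and the leveraged notional. The sketch below reproduces the arithmetic; the constant and function names are illustrative, and the expected saving assumes an unfilled maker order saves nothing, which is what the table's numbers imply.

```python
TAKER_FEE_BPS = 5            # 0.05% per side
MAKER_FEE_BPS = 2            # 0.02% per side
NOTIONAL_USD = 3_750 * 2.5   # leveraged notional from the table


def maker_saving_usd(notional_usd=NOTIONAL_USD):
    # Fee delta of 3 bps applied to the notional.
    return notional_usd * (TAKER_FEE_BPS - MAKER_FEE_BPS) / 10_000


def expected_saving_usd(fill_rate, notional_usd=NOTIONAL_USD):
    # Only filled maker orders capture the saving.
    return fill_rate * maker_saving_usd(notional_usd)
```

`maker_saving_usd()` gives 2.8125 USD per fill; multiplying by the 76% and 82% fill rates gives roughly 2.14 and 2.31 USD, matching the last two rows.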
13.5 Adverse Move Abort (check_adverse_move)
After 5 s grace period, if price moves > 5 bps against our direction, the maker order is abandoned:
- SHORT: price went up 5+ bps → abort (we're trying to sell, but price is rising against us)
- LONG: price went down 5+ bps → abort
This prevents being caught in a rapidly deteriorating position while waiting for a maker fill that will never come at a meaningful price.
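The abort rule can be expressed as a signed move in basis points. This is a sketch under assumptions: the parameter names are hypothetical, the move is measured on mid prices relative to the entry mid (as in the fill simulator in 14.2), and the grace period is treated as elapsed once `elapsed_s` reaches 5 s.

```python
def check_adverse_move_sketch(direction, entry_mid, current_mid, elapsed_s,
                              grace_s=5.0, threshold_bps=5.0):
    if elapsed_s < grace_s:
        return False  # still inside the grace period
    move_bps = (current_mid - entry_mid) / entry_mid * 10_000.0
    # A rising price hurts a pending sell; a falling price hurts a pending buy.
    adverse_bps = move_bps if direction == "SHORT" else -move_bps
    return adverse_bps > threshold_bps
```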
14. BacktestAdapter and Fill Simulator
Sources: alpha_engine/execution/adapters.py, alpha_engine/execution/fill_simulator.py
Together these form the stochastic simulation layer, which must be preserved intact for ROI continuity regression testing.
14.1 BacktestAdapter.submit_order()
For MAKER orders:
fill_prob = 0.7 − (size / 100_000) − (volatility × 100)
fill_prob = clamp(fill_prob, 0.1, 0.9)
if random.random() < fill_prob:
fill at limit_price
This model captures:
- Size penalty: larger orders are harder to fill passively (fill_prob decreases linearly)
- Volatility penalty: high-vol markets have faster queue dynamics, lower passive fill probability
- Base rate of 0.70 at size=0, vol=0 (reflecting a benign market)
For LIMIT (marketable limit) and MARKET orders, fill is immediate with spread-based slippage.
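The maker fill model above is small enough to check by hand. A minimal sketch follows; the function names are illustrative, and the injectable `rng` argument is an addition made here so the draw can be made deterministic in tests.

```python
import random


def maker_fill_probability_sketch(size, volatility):
    raw = 0.7 - (size / 100_000) - (volatility * 100)
    return min(0.9, max(0.1, raw))  # clamp to [0.1, 0.9]


def maker_fills_sketch(size, volatility, rng=random.random):
    # One Bernoulli draw against the clamped probability.
    return rng() < maker_fill_probability_sketch(size, volatility)
```

For non-negative size and volatility the raw value never exceeds 0.7, so the 0.9 cap is not reached; the 0.1 floor, by contrast, binds quickly (for example at size=100000 and volatility=0.1).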
14.2 simulate_maker_fill() — Walk-Forward Simulation
More sophisticated than BacktestAdapter, designed for detailed backtesting:
1. Filter OB snapshots to window [signal_time, signal_time + timeout_s]
2. Walk forward through snapshots
3. At each snapshot, check:
a. Adverse move abort (after 5s grace):
SHORT: if (current_mid − entry_mid) / entry_mid × 10000 > 5.0 → ABORT
LONG: if (entry_mid − current_mid) / entry_mid × 10000 > 5.0 → ABORT
b. Fill condition:
SHORT: filled if best_bid >= limit_price OR approx_mid >= limit_price
LONG: filled if best_ask <= limit_price OR approx_mid <= limit_price
4. If would fill: apply fill_discount = 0.80 stochastic gate
if random.random() < 0.80 → MAKER fill at limit_price
5. If timeout expires: TIMEOUT (no fill)
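The walk-forward procedure can be sketched as a single loop. Several details here are assumptions rather than facts about fill_simulator.py: snapshots are modelled as `(ts, best_bid, best_ask)` tuples, approx_mid is taken as the bid/ask midpoint, the string outcomes are illustrative, and a snapshot that would fill but fails the stochastic gate is simply skipped so the walk continues.

```python
import random

FILL_DISCOUNT_FACTOR = 0.80  # value documented in 14.3


def simulate_maker_fill_sketch(direction, limit_price, entry_mid, signal_time,
                               timeout_s, snapshots, rng=random.random):
    deadline = signal_time + timeout_s
    for ts, best_bid, best_ask in snapshots:
        if ts < signal_time or ts > deadline:
            continue  # outside the [signal_time, signal_time + timeout_s] window
        mid = (best_bid + best_ask) / 2.0
        move_bps = (mid - entry_mid) / entry_mid * 10_000.0
        adverse_bps = move_bps if direction == "SHORT" else -move_bps
        # Step 3a: adverse move abort, only after the 5 s grace period.
        if ts - signal_time >= 5.0 and adverse_bps > 5.0:
            return "ABORT"
        # Step 3b: fill condition.
        if direction == "SHORT":
            would_fill = best_bid >= limit_price or mid >= limit_price
        else:
            would_fill = best_ask <= limit_price or mid <= limit_price
        # Step 4: queue-position gate.
        if would_fill and rng() < FILL_DISCOUNT_FACTOR:
            return "MAKER"
    return "TIMEOUT"  # Step 5
```

Passing `rng=lambda: 0.0` forces every would-fill snapshot through the gate, which is convenient when testing the price logic in isolation.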
14.3 FILL_DISCOUNT_FACTOR = 0.80 — DO NOT CHANGE
This constant is the backbone of the stochastic regression suite. It represents queue position uncertainty: even if the market price crosses our limit, we may not be at the front of the queue. The 80% discount means:
- 20% of "would-fill" scenarios are attributed to queue displacement
- This is calibrated to produce realistic maker fill rates of ~70–85%
Changing this value would break backward compatibility with all historical backtests and invalidate the ROI regression suite. It is tested explicitly: assert DEFAULT_CONFIG.FILL_DISCOUNT_FACTOR == 0.80.
15. Prefect Flow — obf_prefect_flow
Source: prod/obf_prefect_flow.py
15.1 Flow Parameters
| Parameter | Default | Description |
|---|---|---|
| warmup_s | 8.0 | Seconds to wait after WS start before first push |
| poll_interval_s | 0.5 | Target cycle time (2 Hz) |
| assets | ["BTCUSDT","ETHUSDT","SOLUSDT"] | Assets to track |
15.2 Startup Sequence
1. AsyncOBThread.start() ← daemon thread with its own asyncio loop
2. AsyncOBThread.wait_ready(10s) ← block until event loop is running
3. time.sleep(warmup_s) ← allow REST snapshots to complete
4. Log init_status (which assets are synchronized)
5. OBFPersistenceService.start() ← background Parquet flush thread
6. LiveOBFeatureEngine(assets) ← initialize rolling state
7. make_hz_client() ← connect to Hazelcast
8. _hz_preflight(client) ← probe HZ with dummy put(); retry 5× @ 3s; abort if all fail
9. Enter hot loop
The 8 s warmup is critical. The Binance REST snapshot for 3 assets takes ~200–500 ms each. The WS stream starts immediately and buffers events. After 8 s:
- All 3 REST snapshots have completed in parallel
- Buffered events have been applied
- Books are synchronized
If after warmup n_ready == 0, the flow continues but logs a warning. Per-asset dark streak counters will trigger warnings at 5 consecutive dark cycles (2.5s).
HZ Preflight (P1-8): Before entering the hot loop, _hz_preflight() writes a test key (_obf_heartbeat) to Hazelcast. If this fails after 5 retries × 3s delay, the flow aborts rather than running silently with a broken HZ client.
15.3 Hot Loop (steps A–K)
A. Per-asset: get_depth_buckets_sync() [~2 ms including asyncio round-trip]
├── _none_streak[asset] counter updated
├── warn if _none_streak == _DARK_WARN_AFTER (5 cycles = 2.5s)
└── log restore if streak clears
B. Stamp local_ts
C. feature_engine.update(raw_snaps) [~1 ms, numba kernels]
D. Build consolidated HZ payload [dict operations]
E. HZ circuit breaker check:
├── if circuit OPEN: skip E→F, decrement cooldown
└── if circuit CLOSED:
E. hz_push_obf_task.submit(asset_payload) × N [fire-and-forget]
F. hz_push_obf_task.submit(obf_latest).result(timeout=1.5) [blocks]
G. on failure: _hz_consec_failures++; open circuit if ≥5
H. _write_local_cache() atomic JSON [~1 ms]
I. Build row dicts + persist.update_snapshot() per asset [O(1) each]
J. Periodic status log (every 120 pushes = every 60s)
├── check ob_thread.is_stale(30s) — log error if WS silent
├── log top-3 error keys from push_errors dict
└── log circuit breaker state if open
K. Sleep remainder of poll_interval_s [monotonic clock]
Circuit Breaker (P0-1):
- _HZ_CIRCUIT_OPEN_AFTER = 5 consecutive consolidated push failures → circuit opens
- _HZ_CIRCUIT_RESET_AFTER = 30 cycles (~15s) cooldown → circuit closes, one probe attempt
- While circuit is open: HZ push skipped, Parquet persistence and local cache still run
- Prevents 24-second hot-loop freezes during HZ outages
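The breaker behaviour can be sketched as a small counter-based state machine. The class and method names below are hypothetical, and one detail is an assumption: after the cooldown, the failure counter is left one short of the threshold so that a single failed probe re-opens the circuit.

```python
class HzCircuitBreakerSketch:
    def __init__(self, open_after=5, reset_after=30):
        self.open_after = open_after      # consecutive failures before opening
        self.reset_after = reset_after    # cooldown length in hot-loop cycles
        self.consec_failures = 0
        self.cooldown = 0

    @property
    def is_open(self):
        return self.cooldown > 0

    def allow_push(self):
        """Call once per cycle; False means skip the HZ push this cycle."""
        if self.cooldown > 0:
            self.cooldown -= 1
            return False
        return True

    def record(self, ok):
        """Report the outcome of a consolidated push attempt."""
        if ok:
            self.consec_failures = 0
            return
        self.consec_failures += 1
        if self.consec_failures >= self.open_after:
            self.cooldown = self.reset_after
            # Assumed: one failed probe after cooldown re-opens the circuit.
            self.consec_failures = self.open_after - 1
```

In a hot loop this would wrap only the HZ push; persistence and the local cache write run regardless of what `allow_push()` returns.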
Dark Streak Counter (P0-4):
- _none_streak: Dict[str, int] per asset
- Warns at _DARK_WARN_AFTER = 5 consecutive None cycles (2.5s)
- Logs restoration when book comes back
- _all_dark_cycles counter: log.critical at _DARK_CRITICAL_AFTER = 120 (~60s, all assets dark)
15.4 Prefect Best Practices Applied
- @task(cache_policy=NO_CACHE) on hz_push_obf_task: prevents Prefect from trying to serialize the Hazelcast client object for caching. Without this, Prefect would attempt to pickle the HZ client and fail.
- retries=3, retry_delay_seconds=2 on HZ push: tolerates transient Hazelcast blips (network reconnection, leader election).
- get_run_logger() called once inside flow context, not at module level.
- log_prints=True on @flow: all print() statements inside the flow are captured into Prefect logs.
- _push_seq and _pushed_at metadata fields in every HZ payload for debugging.
15.5 Local Cache
ob_cache/latest_ob_features.json — atomic write via tmp → rename. Allows the DOLPHIN scanner (running in a separate process) to read the latest OBF state without accessing Hazelcast. PID-stamped temp file prevents collision if multiple flow instances start simultaneously.
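A minimal sketch of the tmp-then-rename pattern with a PID-stamped temp name. The function name and the exact temp-file naming are assumptions; only the pattern itself is taken from the description above.

```python
import json
import os
from pathlib import Path


def write_local_cache_sketch(payload, out_path):
    out_path = Path(out_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    # PID in the temp name keeps concurrent flow instances from colliding.
    tmp_path = out_path.with_name(f"{out_path.name}.{os.getpid()}.tmp")
    tmp_path.write_text(json.dumps(payload))
    # os.replace swaps the file in one step, so readers never see a partial file.
    os.replace(tmp_path, out_path)
    return out_path
```

A reader in another process either sees the previous complete JSON document or the new one, never a half-written file.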
16. AsyncOBThread — Async/Sync Bridge
Source: prod/obf_prefect_flow.py → class AsyncOBThread
16.1 The Problem
OBStreamService is fully async (asyncio). The Prefect flow is synchronous Python. You cannot await inside a @flow function. The solution is to run the asyncio event loop in a dedicated daemon thread.
16.2 Thread Architecture
Main thread (Prefect flow, synchronous)
│
│ AsyncOBThread(threading.Thread, daemon=True)
│ ├── self._loop = asyncio.new_event_loop()
│ ├── asyncio.set_event_loop(self._loop)
│ └── self._loop.run_until_complete(self._run_forever())
│ │
│ ├── asyncio.create_task(self.service.stream()) ← starts WS + REST
│ ├── self._ready.set() ← signals main thread
│ └── await asyncio.get_event_loop().create_future() ← parks forever
│
│ Sync API:
└── get_depth_buckets_sync(asset) → Optional[dict]
└── asyncio.run_coroutine_threadsafe(
self.service.get_depth_buckets(asset),
self._loop
).result(timeout=HZ_PUSH_INTERVAL_S) # 0.5s timeout
16.3 run_coroutine_threadsafe
This stdlib function is the correct way to call async code from a different thread. It:
- Creates a concurrent.futures.Future wrapping the coroutine
- Schedules it on self._loop (thread-safe)
- Returns the future, which .result(timeout=...) blocks on
The 0.5 s timeout matches poll_interval_s. If the book takes more than 0.5 s to respond (e.g., lock contention from a WS event storm), get_depth_buckets_sync() returns None gracefully. This asset is skipped for this cycle and picked up next cycle.
16.4 daemon=True and stop() (P2-5)
The thread is set as a daemon. When the Prefect flow exits, the finally block calls ob_thread.stop() for a graceful shutdown.
stop() signals the asyncio event loop via _stop_future.set_result(None) (thread-safe via call_soon_threadsafe). The _run_forever() coroutine is parked on await self._stop_future and wakes immediately. stop() then calls join(timeout=5.0).
Without stop(), tests that create AsyncOBThread would leave orphaned aiohttp.ClientSession objects and generate ResourceWarning: Unclosed client session noise.
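The bridge pattern described in this section can be reduced to a self-contained sketch. `AsyncBridgeSketch` and its `_fetch` coroutine are hypothetical stand-ins (the real class wraps OBStreamService); the stdlib calls are the ones named above.

```python
import asyncio
import concurrent.futures
import threading


class AsyncBridgeSketch(threading.Thread):
    def __init__(self):
        super().__init__(daemon=True)
        self._loop = asyncio.new_event_loop()
        self._ready = threading.Event()
        self._stop_future = None

    def run(self):
        asyncio.set_event_loop(self._loop)
        try:
            self._loop.run_until_complete(self._run_forever())
        finally:
            self._loop.close()

    async def _run_forever(self):
        self._stop_future = self._loop.create_future()
        self._ready.set()            # signal the main thread
        await self._stop_future      # park until stop() resolves it

    async def _fetch(self, asset, delay_s=0.0):
        # Stand-in for service.get_depth_buckets(asset).
        await asyncio.sleep(delay_s)
        return {"asset": asset}

    def wait_ready(self, timeout):
        return self._ready.wait(timeout)

    def get_sync(self, asset, timeout=0.5, delay_s=0.0):
        fut = asyncio.run_coroutine_threadsafe(
            self._fetch(asset, delay_s), self._loop)
        try:
            return fut.result(timeout=timeout)
        except concurrent.futures.TimeoutError:
            fut.cancel()             # give up on this cycle
            return None

    def stop(self):
        def _finish():
            if self._stop_future is not None and not self._stop_future.done():
                self._stop_future.set_result(None)
        self._loop.call_soon_threadsafe(_finish)
        self.join(timeout=5.0)
```

Typical use is `start()`, `wait_ready(...)`, repeated `get_sync(...)` calls from the synchronous side, then `stop()` in a `finally` block.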
17. OBFPersistenceService — Parquet Layer
Source: prod/obf_persistence.py → class OBFPersistenceService
17.1 Purpose
Builds a historical record of live order book features for future backtesting. Arrow/Parquet was chosen for:
- Zero-copy memory mapping when reading
- Columnar compression (Snappy) with predictable file sizes
- Hive partitioning compatible with Spark, DuckDB, Pandas, PyArrow Dataset API
- Schema enforcement at write time (no surprise type coercions)
17.2 Thread Safety Model
Main thread (hot loop, 2 Hz):
persist.update_snapshot(asset, row)
└── with self._lock: ← held for ~1 µs
self._history[asset].append(row)
Background thread (daemon, "obf-persist"):
every flush_interval_s (300 s):
persist._flush()
└── with self._lock: ← snapshot all deques
snapshots = {a: list(q) for a, q in self._history.items()}
(lock released immediately after copy)
└── _write_parquet(asset, rows, ...) per asset
└── build column arrays → pa.table → pq.write_table
└── atomic: write to .tmp → os.rename to .parquet
└── md5 checksum written alongside
The lock is held for the minimum possible time: only during the list(q) snapshot copy. Parquet file writing (the slow part) happens outside the lock. This ensures the hot loop is never blocked by disk I/O.
17.3 Rolling Buffer
deque(maxlen=HISTORY_MAXLEN=700) per asset. At poll_interval_s=0.5:
700 × 0.5 s = 350 s ≥ 300 s flush interval (+ 100 safety margin)
HISTORY_MAXLEN = int(FLUSH_INTERVAL_S / POLL_INTERVAL_S) + 100 = 700 (P0-5 fix).
Previous value was 360 (180s), which was less than the 300s flush interval — up to 120s of data was always silently evicted before each flush. This is now fixed: the deque always covers at least one full flush interval.
First flush timing (P1-9): _run() sleeps FIRST_FLUSH_S = 60 (not flush_interval_s = 300) before the first flush. This ensures the first Parquet file is written within 60s of startup, not 300s. Crashes within the first minute still lose up to 60s of data (WAL is a future improvement, P0-5).
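The sizing argument in 17.3 can be checked with a bounded deque directly. The helper below is illustrative (its name is not from the codebase) and assumes one row is appended per poll cycle.

```python
from collections import deque


def rows_evicted_per_flush(history_maxlen, flush_interval_s=300.0,
                           poll_interval_s=0.5):
    buf = deque(maxlen=history_maxlen)
    rows_produced = int(flush_interval_s / poll_interval_s)
    for i in range(rows_produced):
        buf.append(i)  # oldest rows fall off once maxlen is reached
    return rows_produced - len(buf)
```

With the old maxlen of 360 this reports 240 evicted rows (120 s at 0.5 s per row); with 700 it reports none.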
17.4 Atomic Write Protocol
tmp_path = out_path.with_suffix(".tmp")
pq.write_table(table, tmp_path, compression="snappy")
tmp_path.rename(out_path) # atomic on POSIX filesystems
If the process crashes mid-write, only the .tmp file is orphaned — the target .parquet file is never partially written. Readers always see complete Parquet files.
17.5 MD5 Checksum
After each Parquet file is written, an MD5 checksum file {filename}.parquet.md5 is written alongside:
digest = hashlib.md5(out_path.read_bytes()).hexdigest()
(out_dir / f"{filename}.md5").write_text(digest)
Used for:
- Data integrity verification when copying files to remote storage
- Detecting bit rot in long-term archival
- Validating downloads if files are replicated across nodes
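On the consumer side, verification is the mirror image of the write. A minimal sketch, assuming the sidecar sits next to the data file as `<name>.parquet.md5` (the layout shown in 17.6); the function name is hypothetical.

```python
import hashlib
from pathlib import Path


def verify_md5_sidecar(parquet_path):
    path = Path(parquet_path)
    sidecar = path.with_name(path.name + ".md5")
    expected = sidecar.read_text().strip()
    actual = hashlib.md5(path.read_bytes()).hexdigest()
    return actual == expected
```

MD5 is adequate here because the goal is detecting accidental corruption, not resisting deliberate tampering.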
17.6 Hive Partition Structure
/mnt/ng6_data/ob_features/
└── exchange=binance/
├── symbol=BTCUSDT/
│ └── date=2026-03-22/
│ ├── part-14-30-00.parquet
│ ├── part-14-30-00.parquet.md5
│ ├── part-14-35-00.parquet
│ └── part-14-35-00.parquet.md5
├── symbol=ETHUSDT/
│ └── date=2026-03-22/
│ └── ...
└── symbol=SOLUSDT/
└── ...
Reading with PyArrow Dataset API:
import pyarrow.dataset as ds
dataset = ds.dataset("/mnt/ng6_data/ob_features", partitioning="hive")
table = dataset.to_table(filter=ds.field("symbol") == "BTCUSDT")
Note: When reading a Parquet file directly with pq.read_table(), PyArrow infers and adds the Hive partition columns (exchange, symbol, date) to the schema automatically. These columns are NOT in OB_SCHEMA — they are partition metadata. Schema validation must use a subset check: expected_names ⊆ written_names.
18. Hazelcast Integration
Map: DOLPHIN_FEATURES
18.1 Keys Published by OBF Flow
| Key | Content | Consumers |
|---|---|---|
| asset_BTCUSDT_ob | Raw OB snapshot (bid/ask notional arrays, best bid/ask, spread) | HZOBProvider, alpha engine |
| asset_ETHUSDT_ob | Same for ETH | Same |
| asset_SOLUSDT_ob | Same for SOL | Same |
| obf_latest | Consolidated all-asset features (all 4 sub-systems flattened) | Alpha engine, scanner, monitoring |
18.2 obf_latest Payload Structure
{
"timestamp": "2026-03-22T14:30:00.000000+00:00",
"local_ts": 1742654200.123,
"compute_ts": 1742654200.124,
"assets": ["BTCUSDT", "ETHUSDT", "SOLUSDT"],
"_push_seq": 42180,
"_pushed_at": "2026-03-22T14:30:00.131000+00:00",
"_n_assets_live": 3,
"_n_assets_total": 3,
"_all_live": true,
"market_median_imbalance": 0.142,
"market_agreement_pct": 0.667,
"market_depth_pressure": 0.093,
"macro_depth_velocity": -0.023,
"macro_cascade_count": 0,
"macro_acceleration": 0.002,
"macro_regime_signal": 0,
"btcusdt_depth_1pct_usd": 183421034.50,
"btcusdt_depth_quality": 1.12,
"btcusdt_fill_probability": 0.895,
"btcusdt_spread_proxy_bps": 0.82,
"btcusdt_imbalance": 0.187,
"btcusdt_imbalance_ma5": 0.143,
"btcusdt_imbalance_persistence": 0.8,
"btcusdt_depth_asymmetry": 1.43,
"btcusdt_withdrawal_velocity": -0.012,
... (same fields for ethusdt_, solusdt_)
}
18.3 asset_{ASSET}_ob Payload Structure
{
"timestamp": 1742654200.123,
"asset": "BTCUSDT",
"bid_notional": [92340000.0, 45120000.0, 23400000.0, 14200000.0, 8100000.0],
"ask_notional": [87210000.0, 41300000.0, 21900000.0, 13800000.0, 7700000.0],
"bid_depth": [0.924, 0.451, 0.234, 0.142, 0.081],
"ask_depth": [0.872, 0.413, 0.219, 0.138, 0.077],
"best_bid": 99980.50,
"best_ask": 99981.20,
"spread_bps": 0.70,
"_pushed_at": "2026-03-22T14:30:00.131000+00:00",
"_push_seq": 126540
}
19. Timing, Rate Limits, and Lag Detection
19.1 Binance Rate Limits
| Endpoint | Weight | Usage in OBF |
|---|---|---|
| wss://fstream.binance.com/stream | N/A (push) | Continuous, no rate limit |
| GET /fapi/v1/depth?limit=1000 | 20 per request | Init only + reconnect only |
The WebSocket @depth@100ms stream is a server-push stream. Binance pushes diffs every 100ms without any request from the client. There is no rate limit on receiving WS messages.
The REST /depth endpoint (weight 20) is only called:
- Once per asset on startup (3 calls total for BTC/ETH/SOL)
- Once per asset after any WebSocket reconnection
With a 2,400 weight-per-minute limit on Binance Futures, 3 calls = 60 weight — trivial. Even with frequent reconnections, this will never approach the rate limit.
There is no polling of REST endpoints in the hot loop.
19.2 Three-Timestamp Architecture
Every row in the Parquet output carries three timestamps:
| Field | Meaning | Typical delta from previous |
|---|---|---|
| exchange_ts | Binance's timestamp in the WS event (when the exchange processed the update) | ~100ms before local_ts |
| local_ts | Wall clock when get_depth_buckets() completed (local Python extract time) | ~1–5ms after exchange_ts |
| pushed_at | Wall clock after HZ push and persistence buffer write | ~10–50ms after local_ts |
lag_s = pushed_at − local_ts measures the processing latency within one cycle: feature computation + HZ round-trip.
19.3 Drift Warning
Every LOG_STATUS_EVERY = 120 push cycles (60 seconds at 2 Hz), the flow logs:
OBF status | pushes=240 errors=0 lag_s=0.012 assets_live=3/3 files=6
If lag_s > poll_interval_s × 2 (i.e., > 1.0 s for default config):
WARNING: OBF LAG DRIFT DETECTED: lag_s=1.234 > 2×poll_interval=1.000
This indicates the system is falling behind — the push cycle is taking longer than the poll interval. Possible causes: HZ overload, slow disk I/O, CPU saturation.
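The drift rule is a single comparison. The sketch below formats the warning the way the log line above shows it; the function name is illustrative and the message text is copied from that example.

```python
def lag_drift_message(lag_s, poll_interval_s=0.5):
    limit_s = 2 * poll_interval_s
    if lag_s > limit_s:
        return (f"OBF LAG DRIFT DETECTED: lag_s={lag_s:.3f} "
                f"> 2×poll_interval={limit_s:.3f}")
    return None  # within budget, nothing to log
```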
19.4 WS-to-Local Lag
The exchange_ts vs local_ts difference reflects:
- Network latency from Binance servers (typically 5–50ms from a datacenter)
- Python asyncio scheduling overhead (~1ms)
- WS message parsing
For monitoring: if local_ts − exchange_ts > 200ms consistently, investigate network connectivity or if the event loop is blocked.
20. Parquet Schema — Complete Field Reference
Source: prod/obf_persistence.py → OB_SCHEMA
Total fields: 34. Arrow types chosen for precision vs storage tradeoff:
- float64 for timing and critical financial values (bid/ask prices, notional)
- float32 for derived features (acceptable precision, halves storage)
- int32 for integer signals
- string for asset identifier (partitioned separately, stored once per file)
| Field | Arrow Type | Description |
|---|---|---|
| Timing | | |
| exchange_ts | float64 | Unix seconds, from Binance WS event |
| local_ts | float64 | Wall clock at OB extraction |
| pushed_at | float64 | Wall clock after HZ push |
| lag_s | float32 | pushed_at − local_ts |
| Identity | | |
| asset | string | e.g. "BTCUSDT" |
| Top-of-Book | | |
| best_bid | float64 | Highest bid price at extraction time |
| best_ask | float64 | Lowest ask price at extraction time |
| spread_bps | float32 | (best_ask − best_bid) / mid × 10,000 |
| Depth Vectors | | |
| bid_notional_0 | float64 | USD bids within 0–1% of mid |
| bid_notional_1 | float64 | USD bids within 1–2% of mid |
| bid_notional_2 | float64 | USD bids within 2–3% of mid |
| bid_notional_3 | float64 | USD bids within 3–4% of mid |
| bid_notional_4 | float64 | USD bids within 4–5% of mid |
| ask_notional_0 | float64 | USD asks within 0–1% of mid |
| ask_notional_1 | float64 | USD asks within 1–2% of mid |
| ask_notional_2 | float64 | USD asks within 2–3% of mid |
| ask_notional_3 | float64 | USD asks within 3–4% of mid |
| ask_notional_4 | float64 | USD asks within 4–5% of mid |
| Sub-system 1: Placement | | |
| depth_1pct_usd | float64 | bid_notional_0 + ask_notional_0 |
| depth_quality | float32 | depth_1pct_usd / median_ref |
| fill_probability | float32 | 1 − exp(−2 × depth_quality) |
| spread_proxy_bps | float32 | Derived spread indicator |
| Sub-system 2: Signal | | |
| imbalance | float32 | (bid − ask) / (bid + ask) across all 5 levels |
| imbalance_ma5 | float32 | 5-snapshot smoothed imbalance |
| imbalance_persistence | float32 | Fraction of last 10 with same sign |
| depth_asymmetry | float32 | (near_bid+ask) / (mid_bid+ask) |
| withdrawal_velocity | float32 | Rate of depth drain over 30 s |
| Sub-system 3: Market | | |
| median_imbalance | float32 | Cross-asset median imbalance |
| agreement_pct | float32 | Fraction of assets agreeing with median |
| depth_pressure | float32 | Mean cross-asset imbalance |
| Sub-system 4: Macro | | |
| depth_velocity | float32 | Mean per-asset withdrawal velocity |
| cascade_count | int32 | Assets with velocity < −0.10 |
| acceleration | float32 | 2nd derivative of aggregate depth |
| regime_signal | int32 | −1=CALM, 0=NEUTRAL, 1=STRESS |
21. Neutral Defaults and Graceful Degradation
All four sub-systems have defined neutral defaults, returned when data is unavailable:
NEUTRAL_PLACEMENT = OBPlacementFeatures(
depth_1pct_usd=0.0, # no depth data
depth_quality=1.0, # don't penalize — assume normal
fill_probability=0.5, # uncertain
spread_proxy_bps=2.0, # moderate spread assumption
)
NEUTRAL_SIGNAL = OBSignalFeatures(
imbalance=0.0, # no directional bias
imbalance_ma5=0.0,
imbalance_persistence=0.5,
depth_asymmetry=1.0,
withdrawal_velocity=0.0,
)
NEUTRAL_MARKET = OBMarketFeatures(
median_imbalance=0.0,
agreement_pct=0.5, # 50% = uncertain, not 0%
depth_pressure=0.0,
)
NEUTRAL_MACRO = OBMacroFeatures(
depth_velocity=0.0,
cascade_count=0,
acceleration=0.0,
regime_signal=0, # NEUTRAL (not STRESS)
)
The choice of agreement_pct=0.5 for neutral (not 0.0 or 1.0) is deliberate: it represents maximum uncertainty, not confirmed disagreement or agreement.
The OBFeatureEngine returns these neutrals when:
- _preloaded = False (preload not yet run)
- Requested asset not in preloaded data
- Requested snapshot index out of range
The LiveOBFeatureEngine returns per_asset[asset] = None when the book for that asset is not yet initialized.
22. Import Architecture and the Shim Loader
22.1 The Problem
nautilus_dolphin/nautilus_dolphin/nautilus/__init__.py contains:
import nautilus_trader # ← raises RuntimeError if not installed
The modules ob_provider.py, ob_features.py, ob_placer.py use relative imports:
from .ob_provider import OBProvider, OBSnapshot
Relative imports require a proper Python package context (__package__ must be set). Simply adding the nautilus/ directory to sys.path breaks relative imports.
22.2 The Shim Solution
tests/_naut_ob_shim.py creates a fake package namespace _naut_ob using importlib.util:
import types, sys, importlib.util
from pathlib import Path
_naut_dir = Path(__file__).parent.parent / "nautilus_dolphin" / "nautilus_dolphin" / "nautilus"
# Create fake package
_p = types.ModuleType("_naut_ob")
_p.__path__ = [str(_naut_dir)]
_p.__package__ = "_naut_ob"
sys.modules["_naut_ob"] = _p
# Load each module as _naut_ob.{name}
def _load(name):
fn = f"_naut_ob.{name}"
sp = importlib.util.spec_from_file_location(fn, _naut_dir / f"{name}.py")
m = importlib.util.module_from_spec(sp)
m.__package__ = "_naut_ob" # ← makes relative imports work
sys.modules[fn] = m
sp.loader.exec_module(m)
return m
With m.__package__ = "_naut_ob", when ob_features.py executes from .ob_provider import OBSnapshot, Python resolves it as from _naut_ob.ob_provider import OBSnapshot — which exists in sys.modules because _load("ob_provider") was called first.
prod/obf_persistence.py contains the same shim logic inline in LiveOBFeatureEngine.update() as a fallback, allowing the production flow to work both with and without nautilus_trader installed.
23. Test Architecture
23.1 Unit Tests — tests/test_obf_unit.py (120 tests)
| Class | Tests | What it covers |
|---|---|---|
| TestImbalanceKernel | 7 | compute_imbalance_nb: zero case, full bias, partial, symmetric |
| TestDepthKernels | 9 | depth_1pct, depth_quality, fill_probability, spread_proxy, depth_asymmetry |
| TestRollingKernels | 10 | persistence, withdrawal_velocity, short history edge cases |
| TestMockOBProvider | 8 | bias mechanics, depth_scale, per-asset overrides, timestamps |
| TestCSVOBProviderKnownValues | 6 | Anchored against real 2025-01-15 BTC data |
| TestOBFeatureEngine | 11 | preload_date, all 4 sub-systems, neutral returns |
| TestLiveOBFeatureEngine | 10 | incremental update, calibration, market/macro, None snap handling |
| TestOBPlacer | 10 | all decision branches, OB confirmation, SKIP/TAKER/MAKER/PATIENT_MAKER |
| TestBacktestAdapterStochastic | 6 | fill_prob formula, size penalty, FILL_DISCOUNT_FACTOR invariant |
| TestSmartPlacerConstants | 4 | frozen dataclass, threshold ordering, timeout ordering |
| TestSmartPlacerDecisions | 8 | decide_placement, compute_limit_price, ob_confirms_signal |
| TestOBFFlowImport | 6 | AsyncOBThread, OBFPersistenceService, constants |
| TestHZOBProvider (new) | 7 | Deserialise HZ payload, missing key→None, malformed JSON, dtype, discovery |
| TestOBStreamServiceStaleDetection (new) | 4 | is_stale(): cold/recent/old/custom threshold |
| TestOBStreamServiceCrossedBook (new) | 4 | Crossed book→None, locked book→None, valid book→dict, uninit→None |
| TestOBStreamServiceBufferReplay (new) | 3 | Buffer drain: stale events discarded, zero-qty removal, level update |
Critical invariant test:
def test_fill_discount_is_0_80(self):
self.assertAlmostEqual(DEFAULT_CONFIG.FILL_DISCOUNT_FACTOR, 0.80)
Known-value anchor (from 2025-01-15 CSV data):
def test_btc_depth_1pct_above_100m(self):
# BTC 1% depth should be > $100M on a normal trading day
snap = self.provider.get_snapshot_by_index("BTCUSDT", 10)
d1pct = snap.bid_notional[0] + snap.ask_notional[0]
self.assertGreater(d1pct, 100_000_000)
23.2 E2E Tests — tests/test_obf_e2e.py (56 tests)
| Class | Tests | What it covers |
|---|---|---|
| TestOBStreamServiceMockState | 8 | _apply_event, zero qty removal, best bid/ask, bucket shape |
| TestFullPipeline | 7 | End-to-end: MockOBProvider → OBFeatureEngine → OBPlacer |
| TestParquetPersistence | 10 | force_flush, schema, row counts, hive partition, MD5, lag_s, multi-asset |
| TestPersistenceServiceLifecycle | 6 | start/stop, thread daemon, concurrent updates, stats |
| TestReplayPipeline | 5 | CSVOBProvider → OBFeatureEngine with real 2025-01-15 data |
| TestDualPathValidation | 5 | Live + stochastic paths run simultaneously without interference |
| TestOBFPrefectIntegration | 14 | Flow constants, AsyncOBThread, warmup, HZ key format, lag detection |
Dual-path test (the most important E2E test):
def test_live_and_stochastic_coexist(self):
# Live feature engine should not affect BacktestAdapter state
live_engine = LiveOBFeatureEngine(["BTCUSDT"])
backtest_adapter = BacktestAdapter(price_data, ...)
# Run both simultaneously
for _ in range(10):
snap = {"BTCUSDT": mock_snap}
live_result = live_engine.update(snap)
backtest_adapter.set_time(t)
fills = backtest_adapter.submit_order("BTCUSDT", "LONG", 100000, 0.1, "MAKER")
# Neither should corrupt the other's state
self.assertIsNotNone(live_result)
self.assertIsNotNone(fills)
23.3 Running Tests
# Unit tests only (fast, ~23s — numba warmup on first run)
python -m unittest tests.test_obf_unit -v
# E2E tests only (slower, ~183s — includes CSV loads and Parquet writes)
python -m unittest tests.test_obf_e2e -v
# Full suite (ExtF + EsoF + OBF)
python tests/run_all_tests.py
24. Invariants That Must Never Change
These are contractual constants embedded in tests and regression suites:
| Invariant | Value | Reason |
|---|---|---|
| FILL_DISCOUNT_FACTOR | 0.80 | Calibrated against historical fill rate data; changing it invalidates all backtest PnL history |
| CONFIDENCE_TAKER_THRESHOLD | 0.85 | Hardcoded in OBPlacer, SmartPlacer, constants; must be identical in all three |
| CONFIDENCE_MAKER_THRESHOLD | 0.40 | Same as above |
| CASCADE_THRESHOLD | −0.10 | Regime detection boundary; changing it changes which macro regimes are declared STRESS |
| MEDIAN_CALIB_N | 60 | Warmup period for live depth reference; first 60 live cycles produce depth_quality=1.0 |
| OBSnapshot array index convention | [0] = near, [4] = far | Used in every kernel and CSV parser; reversing would silently corrupt all features |
| CSV file date filter | cutoff <= file_date < reference_date | Anti-leakage; including reference_date itself would allow future data into median computation |
25. Expected Performance Characteristics
25.1 Computational Load
| Component | Time | Notes |
|---|---|---|
| get_depth_buckets_sync() per asset | ~2 ms | asyncio round-trip + sort + bin |
| LiveOBFeatureEngine.update() (3 assets) | ~1 ms | Numba kernels are JIT-compiled; first call ~2s warmup |
| hz_push_obf_task() (submit, not result) | ~0.5 ms | Non-blocking submit |
| _write_local_cache() | ~1 ms | Small JSON file |
| persist.update_snapshot() × 3 | <0.01 ms | Deque append, lock-protected |
| Total hot loop | ~5 ms | Leaving ~495 ms of sleep per 0.5 s cycle |
| OBFPersistenceService._flush() (300 assets rows × 3) | ~50 ms | Parquet write, happens in background thread |
25.2 Memory Usage
| Component | Memory |
|---|---|
| In-memory L2 book (3 assets, up to 1000 levels each) | ~6 MB |
| Rolling deques (LiveOBFeatureEngine, 3 assets, 60 samples) | ~50 KB |
| Rolling deques (OBFPersistenceService, 3 assets, 360 samples × 34 fields) | ~1.5 MB |
| OBF Parquet file (per 5-min flush, 3 assets, ~360 rows) | ~150 KB (snappy compressed) |
25.3 Parquet Storage Growth
At 2 Hz, 3 assets, 5-minute flush intervals:
Rows per file per asset: 600 (300 s × 2 Hz; 172,800 rows / 288 files)
Rows per day per asset: 2 × 60 × 60 × 24 = 172,800
Files per day per asset: 288 (one per 5 minutes)
Compressed file size: ~150 KB per 5-min file
Storage per asset/day: ~43 MB (uncompressed ~200 MB)
Storage all 3 assets/day: ~130 MB compressed
Storage per year: ~47 GB compressed
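The storage figures can be reproduced from the polling rate, the flush interval, and the per-file size estimate. The sketch below only redoes that arithmetic; the constant names are illustrative and the 150 KB file size is the estimate quoted above, not a measured value.

```python
POLL_HZ = 2
FLUSH_INTERVAL_S = 300
N_ASSETS = 3
FILE_SIZE_KB = 150            # per-file estimate from this section
SECONDS_PER_DAY = 86_400

rows_per_day = POLL_HZ * SECONDS_PER_DAY              # per asset
files_per_day = SECONDS_PER_DAY // FLUSH_INTERVAL_S   # per asset
rows_per_file = rows_per_day // files_per_day         # steady state
mb_per_asset_day = files_per_day * FILE_SIZE_KB / 1000
mb_all_assets_day = mb_per_asset_day * N_ASSETS
gb_per_year = mb_all_assets_day * 365 / 1000
```

This yields 43.2 MB per asset per day, 129.6 MB per day across three assets, and about 47.3 GB per year, in line with the rounded figures above.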
25.4 Known-Value Anchors (from 2025-01-15 BTC data)
| Metric | Expected value |
|---|---|
| depth_1pct_usd for BTC | > $100M |
| imbalance range | [-1.0, +1.0], typical |
| fill_probability at normal depth | ~0.86–0.95 |
| agreement_pct in trending market | > 0.67 (2/3 assets agree) |
| WS event latency (local_ts − exchange_ts) | 5–50ms from datacenter |
| lag_s (hot loop processing latency) | < 20ms typically |
26. Future: Nautilus Trader Integration
The user has explicitly stated: "at some point we will bring Nautilus Trader in to handle the 'actual trade execution' for the system."
The OBF subsystem is designed to accommodate this:
26.1 What Changes with Nautilus Trader
Nautilus Trader is a high-performance trading framework with:
- Its own order book management (nautilus_trader.model.orderbook)
- Direct exchange connectivity (Binance Futures connector exists)
- Native order submission and fill management
When integrated:
- OBStreamService and AsyncOBThread will be replaced by Nautilus's native BinanceFuturesDataClient
- BacktestAdapter will be replaced by Nautilus's backtesting engine
- LiveAdapter (currently a stub) will be replaced by Nautilus's BinanceFuturesExecutionClient
26.2 What Stays the Same
- OBFeatureEngine kernels (numba functions are framework-agnostic)
- OBSnapshot dataclass (Nautilus provides raw data; we still need to convert to our format)
- LiveOBFeatureEngine (logic unchanged, just fed from Nautilus's data stream)
- OBFPersistenceService (Parquet persistence is independent)
- OBPlacer and SmartPlacer (decision logic unchanged)
- FILL_DISCOUNT_FACTOR = 0.80 (preserved for regression continuity)
- All Parquet data accumulated before integration (directly usable for Nautilus backtesting)
26.3 Transition Path
The cleanest transition:
- Add a NautilusOBProvider(OBProvider) that reads from Nautilus's order book
- Add a NautilusAdapter(ExchangeAdapter) that wraps Nautilus order submission
- Run both OBStreamService and Nautilus in parallel for a validation period
- Cross-validate: LiveOBFeatureEngine.update() outputs should match between sources
- Cut over once cross-validation passes (use HZOBProvider to bridge the data)
The existing Parquet archive (/mnt/ng6_data/ob_features/) is directly usable as Nautilus's historical data source after format adaptation.
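The cross-validation step in the transition path could be sketched as a field-by-field comparison of the feature values produced from each source. The helper below is hypothetical: `compare_features`, its tolerances, and the plain-dict representation of a feature row are assumptions for illustration, not part of the existing code.

```python
import math
from typing import Dict, List, Tuple

def compare_features(
    a: Dict[str, float],
    b: Dict[str, float],
    rel_tol: float = 1e-6,
    abs_tol: float = 1e-9,
) -> List[Tuple[str, float, float]]:
    """Return (field, value_a, value_b) for every field that disagrees.

    A field missing from one side is reported with NaN for that side.
    """
    mismatches = []
    for key in sorted(set(a) | set(b)):
        va = a.get(key, math.nan)
        vb = b.get(key, math.nan)
        missing = math.isnan(va) or math.isnan(vb)
        if missing or not math.isclose(va, vb, rel_tol=rel_tol, abs_tol=abs_tol):
            mismatches.append((key, va, vb))
    return mismatches

# Illustrative rows: one as computed from OBStreamService, one from Nautilus.
legacy_row = {"imbalance": 0.12, "depth_1pct_usd": 1.2e8}
nautilus_row = {"imbalance": 0.15, "depth_1pct_usd": 1.2e8}

for field, va, vb in compare_features(legacy_row, nautilus_row):
    print(f"MISMATCH {field}: legacy={va} nautilus={vb}")
```

During the validation period a check like this would run on every cycle for every asset, with cut-over gated on the mismatch count staying at zero (or within an agreed tolerance) for the whole window.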
27. Deployment
27.1 Direct Launch (Testing / Development)
cd /mnt/dolphinng5_predict
PREFECT_API_URL=http://localhost:4200/api \
nohup python3 prod/obf_prefect_flow.py > /tmp/obf_prefect.log 2>&1 &
27.2 Prefect Deployment
# Register the deployment
prefect deploy -n obf-daemon --pool dolphin
# Or via YAML
prefect deploy --all # picks up prod/obf_deployment.yaml
prod/obf_deployment.yaml:
name: obf-daemon
entrypoint: prod/obf_prefect_flow.py:obf_prefect_flow
version: 1.0.0
tags: ["obf", "order-book", "production"]
work_pool:
  name: dolphin
  work_queue_name: default
parameters:
  warmup_s: 8
  poll_interval_s: 0.5
  assets: [BTCUSDT, ETHUSDT, SOLUSDT]
infrastructure:
  type: process
27.3 Adding an Asset
- Add the asset to assets in obf_deployment.yaml
- Add a CSV file ob_data/{ASSET}-bookDepth-{DATE}.csv for historical replay
- The Binance WS stream supports multiple assets in a single connection (comma-separated streams URL)
- No code changes required
27.4 Monitoring
Prefect UI → obf-prefect-flow → most recent run. Every 60 seconds the flow logs:
OBF status | pushes=240 errors=0 lag_s=0.012 assets_live=3/3 files=6
Critical alerts:
- errors > 0 and increasing: Hazelcast connectivity issue
- assets_live < 3: One or more books failed to initialize or lost WS connection
- lag_s > 1.0s: System is overloaded or HZ is slow
- LAG DRIFT DETECTED: Investigate immediately
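These alert rules can be checked mechanically against the status line. The parser below is a minimal sketch that assumes the log format matches the example shown above exactly; `check_status_line` and `STATUS_RE` are hypothetical names, and comparing `assets_live` against the total in the log line (rather than a fixed 3) is a generalisation made here.

```python
import re
from typing import List

# Matches the status line format shown in the monitoring example.
STATUS_RE = re.compile(
    r"pushes=(?P<pushes>\d+)\s+errors=(?P<errors>\d+)"
    r"\s+lag_s=(?P<lag_s>\d+(?:\.\d+)?)"
    r"\s+assets_live=(?P<live>\d+)/(?P<total>\d+)\s+files=(?P<files>\d+)"
)

def check_status_line(line: str, prev_errors: int = 0) -> List[str]:
    """Return the critical alerts triggered by one log line."""
    alerts: List[str] = []
    if "LAG DRIFT DETECTED" in line:
        alerts.append("lag drift detected: investigate immediately")
    match = STATUS_RE.search(line)
    if match is None:
        return alerts
    errors = int(match.group("errors"))
    lag_s = float(match.group("lag_s"))
    live = int(match.group("live"))
    total = int(match.group("total"))
    if errors > 0 and errors > prev_errors:
        alerts.append(f"errors increasing ({prev_errors} -> {errors}): check Hazelcast")
    if live < total:
        alerts.append(f"only {live}/{total} assets live: book init or WS failure")
    if lag_s > 1.0:
        alerts.append(f"lag_s={lag_s:.3f} exceeds 1.0s: overload or slow HZ")
    return alerts

healthy = "OBF status | pushes=240 errors=0 lag_s=0.012 assets_live=3/3 files=6"
degraded = "OBF status | pushes=240 errors=4 lag_s=1.500 assets_live=2/3 files=6"
print(check_status_line(healthy))
print(check_status_line(degraded, prev_errors=1))
```

The caller keeps the previous `errors` value between status cycles so that a non-zero but stable error count does not keep re-alerting.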
28. Known Caveats and Edge Cases
28.1 Pre-calibration Period (First 30 Seconds Live)
For the first 60 samples (30 seconds at 0.5 s polling), depth_quality = 1.0 for all assets because _median_ref is not yet calibrated. During this period:
- fill_probability will be 1 - e^{-2} ≈ 0.865 regardless of actual depth
- This is acceptable: it represents maximum uncertainty, which defaults to the "normal" case
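The numbers in this subsection can be checked directly. The sketch below assumes the fill probability has the general form 1 - exp(-k · depth_quality) with k = 2; only the value at depth_quality = 1.0 is stated in this section, so the general form and the function signature are assumptions for illustration.

```python
import math

POLL_INTERVAL_S = 0.5
CALIBRATION_SAMPLES = 60

def fill_probability(depth_quality: float, k: float = 2.0) -> float:
    """Assumed form: 1 - exp(-k * depth_quality). Only the k=2, quality=1.0
    case is confirmed by the text above."""
    return 1.0 - math.exp(-k * depth_quality)

calibration_window_s = CALIBRATION_SAMPLES * POLL_INTERVAL_S
pre_calibration_prob = fill_probability(1.0)

print(f"calibration window: {calibration_window_s:.0f} s")
print(f"pre-calibration fill_probability: {pre_calibration_prob:.3f}")
```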
28.2 Reconnection Gaps
After a WebSocket reconnection, affected assets return None from get_depth_buckets_sync() until the REST snapshot completes (~200–500 ms). The flow skips those assets for those cycles. The _n_assets_live counter in obf_latest will reflect this.
28.3 OBFeatureEngine Macro State (Batch Mode) — FIXED
Fixed in sprint 1 (P1-3). Previously, _preloaded_macro was a single OBMacroFeatures value.
_preloaded_macro_map: Dict[int, OBMacroFeatures] now stores a separate value for each snapshot index. get_macro(timestamp_or_idx) resolves the correct bar and returns the right macro state. Backtests at any bar index now receive the macro regime as it was at that exact time, not the end-of-day value.
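The difference between a single macro value and a per-snapshot map can be illustrated with a simplified lookup. `MacroState` and `MacroLookup` below are hypothetical stand-ins for OBMacroFeatures and the engine's internal map, and the fallback to a neutral value for unknown indices is an assumption.

```python
from dataclasses import dataclass
from typing import Dict

@dataclass(frozen=True)
class MacroState:
    """Hypothetical stand-in for OBMacroFeatures."""
    regime: str

class MacroLookup:
    """Stores one macro state per snapshot index instead of one per day."""

    def __init__(self, neutral: MacroState) -> None:
        self._map: Dict[int, MacroState] = {}
        self._neutral = neutral

    def preload(self, snap_idx: int, state: MacroState) -> None:
        self._map[snap_idx] = state

    def get_macro(self, snap_idx: int) -> MacroState:
        # Assumption: unknown indices fall back to a neutral state.
        return self._map.get(snap_idx, self._neutral)

lookup = MacroLookup(neutral=MacroState("neutral"))
lookup.preload(0, MacroState("calm"))
lookup.preload(1, MacroState("stressed"))

# A backtest at snapshot 0 sees "calm", not the later "stressed" value.
print(lookup.get_macro(0).regime, lookup.get_macro(1).regime)
```

With a single stored value, every bar in the day would have seen whichever state was written last, which leaks later information into earlier bars.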
28.4 Bar Index Alignment (6:1 ratio) — CONFIGURABLE
_resolve_idx(asset, bar_idx) uses bar_idx // self.bar_to_snap_ratio to convert VBT 5-second bar indices to OB 30-second snapshot indices. Defaults assume:
- VBT uses exactly 5 second bars
- OB snapshots are exactly 30 seconds apart
- Timestamps start at the same UTC epoch
bar_to_snap_ratio is now a constructor parameter (P2-7 fix). Default remains 6. Pass a different value if bar granularity changes.
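The index mapping is plain floor division. The standalone function below is a sketch of that rule only: the real method is _resolve_idx(asset, bar_idx) on the engine, and the input validation shown here is an addition for illustration.

```python
def resolve_snapshot_index(bar_idx: int, bar_to_snap_ratio: int = 6) -> int:
    """Map a bar index to the index of the OB snapshot that covers it."""
    if bar_idx < 0:
        raise ValueError("bar_idx must be non-negative")
    if bar_to_snap_ratio <= 0:
        raise ValueError("bar_to_snap_ratio must be positive")
    return bar_idx // bar_to_snap_ratio

# Default: 5-second bars against 30-second snapshots (ratio 6).
default_mapping = [resolve_snapshot_index(i) for i in range(13)]
print(default_mapping)

# 10-second bars against 30-second snapshots would need ratio 3.
ten_second_mapping = [resolve_snapshot_index(i, bar_to_snap_ratio=3) for i in range(7)]
print(ten_second_mapping)
```

If bar and snapshot timestamps do not start at the same epoch, the mapping is off by a constant number of bars, which floor division alone cannot detect.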
28.5 compute_market_agreement_nb with Uniform Zero Imbalance
If all(imbalance == 0.0) (e.g., all assets are perfectly balanced), median == 0.0 and the kernel returns (0.0, 0.5) — neutral agreement. This is the spec-defined behavior. Tests verify this via MockOBProvider(imbalance_bias=0.0).
28.6 Hive Partition Columns in PyArrow Schema
When reading a Hive-partitioned Parquet directory with pyarrow.dataset or pq.read_table(), PyArrow adds exchange, symbol, and date columns to the schema. These are NOT in OB_SCHEMA. Code that compares schema names must use subset checking, not equality.
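A subset check of this kind needs only the column names. The sketch below works on plain lists of names (for example, the `names` attribute of a PyArrow schema); the column names in `expected` are illustrative and are not the actual contents of OB_SCHEMA.

```python
from typing import Iterable, Set

# Columns PyArrow adds when reading the Hive-partitioned directory.
HIVE_PARTITION_COLUMNS = {"exchange", "symbol", "date"}

def schema_is_compatible(expected: Iterable[str], actual: Iterable[str]) -> bool:
    """True if every expected column is present; extra columns are allowed."""
    return set(expected) <= set(actual)

def unexpected_columns(expected: Iterable[str], actual: Iterable[str]) -> Set[str]:
    """Extra columns that are not explained by Hive partitioning."""
    return set(actual) - set(expected) - HIVE_PARTITION_COLUMNS

expected = ["imbalance", "fill_probability", "depth_1pct_usd"]  # illustrative
as_read = expected + ["exchange", "symbol", "date"]

print(as_read == expected)                      # equality check fails
print(schema_is_compatible(expected, as_read))  # subset check passes
print(unexpected_columns(expected, as_read))    # nothing unexplained
```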
28.7 CSV File Date Boundary
CSVOBProvider loads files where cutoff_start <= file_date < reference_date. To use data from date 2025-01-15, call preload_date("2025-01-16", ...). This is intentional (anti-leakage) but non-obvious. Tests document this with preload_date("2025-01-16", ["BTCUSDT"]) to access 2025-01-15 data.
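The half-open date window can be sketched as follows. `file_in_window` and the `lookback_days` parameter are hypothetical; how CSVOBProvider actually derives cutoff_start is not shown in this section.

```python
from datetime import date, timedelta

def file_in_window(file_date: date, reference_date: date, lookback_days: int = 1) -> bool:
    """Half-open window: cutoff_start <= file_date < reference_date.

    The reference date itself is excluded, so a backtest for day D never
    sees order book data recorded on day D.
    """
    cutoff_start = reference_date - timedelta(days=lookback_days)
    return cutoff_start <= file_date < reference_date

data_day = date(2025, 1, 15)

# Preloading for 2025-01-16 picks up the 2025-01-15 file ...
print(file_in_window(data_day, reference_date=date(2025, 1, 16)))
# ... but preloading for 2025-01-15 itself does not.
print(file_in_window(data_day, reference_date=date(2025, 1, 15)))
```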
29. Reliability Mechanisms (Sprint 1 Hardening)
All items below were identified in the P0/P1/P2 audit and implemented in sprint 1 (2026-03-22).
| Mechanism | Trigger | Action | Where |
|---|---|---|---|
| HZ Circuit Breaker | 5 consecutive consolidated push failures | Skip all HZ for 30 cycles (~15s), then probe again | obf_prefect_flow.py |
| HZ Preflight | Startup before hot loop | Write test key, retry 5×@3s; abort flow if all fail | obf_prefect_flow.py |
| WS Stall Watchdog | last_event_ts not updated for >30s | log.error every status cycle; is_stale() exposed for callers | ob_stream_service.py |
| Crossed Book Guard | best_bid >= best_ask | Return None from get_depth_buckets(); no features computed | ob_stream_service.py |
| Dark Streak Counter | Asset returns None for N cycles | Warn at 5 (2.5s); log restore; critical log at 120 all-dark (60s) | obf_prefect_flow.py |
| Exponential Backoff | WS reconnection failures | 3→6→12→…→120s, ±1s jitter; resets after 60s stable | ob_stream_service.py |
| Fire-and-forget pushes | Per-asset HZ pushes | submit() without .result(); only consolidated push blocks | obf_prefect_flow.py |
| Per-key error tracking | HZ push failures | push_errors: Dict[str, int]; top-3 logged | obf_prefect_flow.py |
| First flush at 60s | Startup | First Parquet file written 60s after start (not 5min) | obf_persistence.py |
| Buffer covers flush window | Continuous | HISTORY_MAXLEN=700 ≥ FLUSH_INTERVAL/POLL + margin | obf_persistence.py |
| Microsecond filename | Double flush within 1s | %H-%M-%S-%f prevents collision | obf_persistence.py |
| Partition cleanup | Each flush | Prune date= dirs older than MAX_FILE_AGE_DAYS=7 | obf_persistence.py |
| Shared aiohttp session | REST fetches | Single session for stream() lifetime; no per-call SSL handshake | ob_stream_service.py |
| Per-timestamp macro | Backtesting | _preloaded_macro_map[snap_idx], not a single end-of-day value | ob_features.py |
| Dynamic HZ asset discovery | HZOBProvider.get_assets() | Key-set scan for asset_*_ob; no hardcoded list | hz_ob_provider.py |
| AsyncOBThread.stop() | Flow shutdown/test teardown | Signal _stop_future; thread is joinable | obf_prefect_flow.py |
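Two of the mechanisms in the table, the reconnection backoff and the HZ circuit breaker, can be sketched in a few lines using the thresholds listed. Both `backoff_delay` and `CircuitBreaker` are hypothetical illustrations, not the code in ob_stream_service.py or obf_prefect_flow.py; in particular, treating the first push after the cooldown as a single probe that re-opens the breaker on failure is an assumption about what "probe again" means.

```python
import random

def backoff_delay(attempt: int, base_s: float = 3.0, cap_s: float = 120.0,
                  jitter_s: float = 1.0) -> float:
    """Delay before reconnection attempt N (0-based): 3, 6, 12, ... capped at 120s."""
    raw = min(cap_s, base_s * (2 ** attempt))
    return max(0.0, raw + random.uniform(-jitter_s, jitter_s))

class CircuitBreaker:
    """Skips pushes for a fixed number of cycles after repeated failures."""

    def __init__(self, max_failures: int = 5, cooldown_cycles: int = 30) -> None:
        self.max_failures = max_failures
        self.cooldown_cycles = cooldown_cycles
        self._failures = 0
        self._skip_remaining = 0

    def allow(self) -> bool:
        """Call once per cycle; False means skip the push this cycle."""
        if self._skip_remaining > 0:
            self._skip_remaining -= 1
            return False
        return True

    def record(self, ok: bool) -> None:
        """Report the outcome of a push that was allowed."""
        if ok:
            self._failures = 0
            return
        self._failures += 1
        if self._failures >= self.max_failures:
            self._skip_remaining = self.cooldown_cycles
            # Assumption: one failed probe after the cooldown re-opens the breaker.
            self._failures = self.max_failures - 1

# Deterministic schedule (jitter disabled) for the first seven attempts.
schedule = [backoff_delay(n, jitter_s=0.0) for n in range(7)]
print(schedule)

breaker = CircuitBreaker()
for _ in range(5):              # five consecutive failed pushes trip the breaker
    if breaker.allow():
        breaker.record(ok=False)
skipped = sum(1 for _ in range(30) if not breaker.allow())
print(f"skipped {skipped} cycles, probe allowed: {breaker.allow()}")
```

At 0.5 s polling, the 30 skipped cycles correspond to the roughly 15 s pause listed in the table.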
This document was generated from exhaustive study of all OBF-related source code and updated after sprint 1 hardening. Sources: external_factors/ob_stream_service.py, nautilus_dolphin/nautilus_dolphin/nautilus/ob_provider.py, nautilus_dolphin/nautilus_dolphin/nautilus/ob_features.py, nautilus_dolphin/nautilus_dolphin/nautilus/ob_placer.py, nautilus_dolphin/nautilus_dolphin/nautilus/hz_ob_provider.py, alpha_engine/execution/smart_placer.py, alpha_engine/execution/smartplacer_constants.py, alpha_engine/execution/adapters.py, alpha_engine/execution/fill_simulator.py, prod/obf_persistence.py, prod/obf_prefect_flow.py, prod/obf_deployment.yaml, tests/test_obf_unit.py, tests/test_obf_e2e.py, tests/_naut_ob_shim.py, scripts/verify_parquet_archive.py, ob_cache/SCHEMA.md.