DOLPHIN/prod/docs/OBF_SUBSYSTEM.md

Order Book Feature (OBF) Subsystem — Complete Technical Reference

Status: Production-ready under Prefect — Sprint 1 hardening complete
Last updated: 2026-03-22 (sprint 1 complete — all P0/P1/P2 reliability gaps closed)
Test coverage: 120 unit tests + 56 E2E tests — all passing (18 new tests added in sprint 1)
Data sources studied: All files in external_factors/, nautilus_dolphin/nautilus_dolphin/nautilus/, alpha_engine/execution/, prod/


Table of Contents

  1. Why This Subsystem Exists
  2. System Architecture Overview
  3. Data Flow — End to End
  4. Layer 1 — OBStreamService (Live L2 Book)
  5. Layer 2 — OBSnapshot and OBProvider Hierarchy
  6. Layer 3 — OBFeatureEngine (Batch Backtesting)
  7. Layer 3B — LiveOBFeatureEngine (Streaming Production)
  8. Sub-system 1 — Per-Asset Placement Features
  9. Sub-system 2 — Per-Asset Signal Features
  10. Sub-system 3 — Cross-Asset Market Signal
  11. Sub-system 4 — Macro Regime / Market Structure
  12. Layer 4 — OBPlacer (Execution Advice)
  13. SmartPlacer Integration
  14. BacktestAdapter and Fill Simulator
  15. Prefect Flow — obf_prefect_flow
  16. AsyncOBThread — Async/Sync Bridge
  17. OBFPersistenceService — Parquet Layer
  18. Hazelcast Integration
  19. Timing, Rate Limits, and Lag Detection
  20. Parquet Schema — Complete Field Reference
  21. Neutral Defaults and Graceful Degradation
  22. Import Architecture and the Shim Loader
  23. Test Architecture
  24. Invariants That Must Never Change
  25. Expected Performance Characteristics
  26. Future: Nautilus Trader Integration
  27. Deployment
  28. Known Caveats and Edge Cases

1. Why This Subsystem Exists

The alpha engine makes directional predictions on crypto futures. Those predictions need to be executed. Execution is not free:

| Method               | Binance Futures fee | Notes                          |
|----------------------|---------------------|--------------------------------|
| Taker (market order) | 0.05% per side      | Immediate fill, crosses spread |
| Maker (limit order)  | 0.02% per side      | Passive fill, may not fill     |

On a $9,375 leveraged position, the fee difference per side is 0.03% × $9,375 = $2.81. Over hundreds of trades per week this is material. But a maker order that never fills is an opportunity cost equal to the entire signal's expected value.

The OBF subsystem answers two questions per trade signal:

  1. Is the book liquid enough to get a maker fill? (Sub-systems 1 and 2)
  2. Is the order book confirming or contradicting the signal direction? (Sub-systems 2, 3, and 4)

These answers feed SmartPlacer, which decides: TAKER / MAKER / PATIENT_MAKER / SKIP.

Additionally, the raw order book data is persisted to Parquet for:

  • Backtesting future strategies against real L2 microstructure
  • Calibrating fill probability models
  • Detecting market structure regimes in strategy development

2. System Architecture Overview

Binance Futures WebSocket
  wss://fstream.binance.com/stream
  streams: btcusdt@depth@100ms + ethusdt@depth@100ms + solusdt@depth@100ms
         │
         ▼
┌────────────────────────────────────┐
│         OBStreamService            │  external_factors/ob_stream_service.py
│  In-memory L2 book (price→qty)     │
│  REST snapshot on init/reconnect   │
│  asyncio event loop                │
└────────────────┬───────────────────┘
                 │  get_depth_buckets() → dict with 5-bucket notional arrays
                 ▼
┌────────────────────────────────────┐
│         AsyncOBThread              │  prod/obf_prefect_flow.py
│  daemon threading.Thread           │
│  Bridges async → sync for Prefect  │
│  get_depth_buckets_sync() → dict   │
└────────────────┬───────────────────┘
                 │  raw_snaps dict per asset
                 ▼
┌────────────────────────────────────┐
│       LiveOBFeatureEngine          │  prod/obf_persistence.py
│  Incremental rolling computation   │
│  4 sub-systems per asset           │
│  Sub-3: cross-asset market signal  │
│  Sub-4: macro regime               │
└─────────┬──────────────────────────┘
          │                        │
          ▼                        ▼
  ┌────────────────────┐   ┌─────────────────────┐
  │  Hazelcast         │   │  OBFPersistenceService│
  │  DOLPHIN_FEATURES  │   │  flush 300s          │
  │  obf_latest        │   │  first flush 60s     │
  │  asset_X_ob        │   │  /mnt/ng6_data/      │
  │  circuit breaker   │   │  ob_features/        │
  │  preflight probe   │   │  cleanup >7d         │
  └────────────────────┘   └─────────────────────┘
          │
          ▼
  ┌────────────────────┐
  │  HZOBProvider      │  nautilus_dolphin/.../hz_ob_provider.py
  │  lazy connect      │  discovers assets from HZ key set
  │  OBPlacer          │  nautilus_dolphin/.../ob_placer.py
  │  SmartPlacer       │  alpha_engine/execution/smart_placer.py
  └────────────────────┘
          │
          ▼
  PlacementDecision: MAKER / TAKER / PATIENT_MAKER / SKIP

For backtesting, the path is different:

ob_data/{ASSET}-bookDepth-{DATE}.csv
         │
         ▼
  CSVOBProvider → OBFeatureEngine.preload_date()
         │  (batch, pre-computes all features for one date)
         ▼
  engine.get_placement(asset, bar_idx)
  engine.get_signal(asset, bar_idx)
  engine.get_market(bar_idx)
  engine.get_macro()

3. Data Flow — End to End

3.1 Production (live) path — every 0.5 s

t=0.000  WS event received by OBStreamService (100ms cadence from Binance)
t=0.000  _apply_event() updates in-memory bid/ask dicts
t=0.500  Prefect flow wakes on poll_interval_s=0.5
t=0.500  get_depth_buckets_sync(asset) called for each asset
         └── asyncio.run_coroutine_threadsafe(..., timeout=0.5s)
             └── get_depth_buckets() reads sorted bids/asks, bins into 5 pct buckets
t=0.500  local_ts = time.time() stamped
t=0.500  LiveOBFeatureEngine.update(raw_snaps) called
         ├── Sub-1: depth_1pct_usd, depth_quality, fill_probability, spread_proxy_bps
         ├── Sub-2: imbalance, imbalance_ma5, imbalance_persistence, depth_asymmetry, withdrawal_velocity
         ├── Sub-3: median_imbalance, agreement_pct, depth_pressure
         └── Sub-4: depth_velocity, cascade_count, acceleration, regime_signal
t=0.501  circuit breaker check: if HZ open, skip HZ entirely this cycle
t=0.501  hz_push_obf_task.submit(client, "asset_BTCUSDT_ob", ...) × 3 [fire-and-forget]
t=0.502  hz_push_obf_task.submit(client, "obf_latest", ...).result(timeout=1.5) [blocks]
t=0.502  _write_local_cache(consolidated) — atomic JSON to ob_cache/latest_ob_features.json
t=0.502  pushed_at = time.time()
t=0.502  lag_s = pushed_at - local_ts  (typically < 10 ms on same machine)
t=0.502  persist.update_snapshot(asset, row) for each asset  [O(1), lock-protected]
t=0.500  (first flush after 60s, then every 300s)  OBFPersistenceService._flush() → Parquet
t=0.500  after flush: _cleanup_old_partitions() prunes dirs older than MAX_FILE_AGE_DAYS=7

3.2 Backtesting path

CSVOBProvider._load_asset(asset)
  └── reads {ASSET}-bookDepth-{DATE}.csv
  └── filters to files where: cutoff_start <= file_date < reference_date
  └── pivots 10 rows/snapshot → arrays of shape [N, 5]
  └── caches in self._cache["{asset}_{reference_date}"]

OBFeatureEngine.preload_date("2025-01-16", ["BTCUSDT"])
  └── iterates all N snapshots in O(N) pass
  └── builds rolling imb_hist and dep_hist arrays incrementally
  └── computes all numba kernels per snapshot
  └── stores in _preloaded_placement[asset][snap_idx], _preloaded_signal[asset][snap_idx]
  └── computes cross-asset features at each snap_idx
  └── updates _preloaded_macro at each snap_idx (uses latest)
  └── _preloaded = True

During backtest bar loop:
  engine.get_placement("BTCUSDT", bar_idx)
    └── _resolve_idx(): if bar_idx < 1e9 → snap_idx = bar_idx // 6
        (VBT 5s bars : OB 30s snapshots = 6:1)
    └── returns _preloaded_placement["BTCUSDT"][snap_idx]

4. Layer 1 — OBStreamService (Live L2 Book)

Source: external_factors/ob_stream_service.py

4.1 Initialization Protocol (Binance Diff Book)

Binance Futures requires a specific protocol to build a synchronized L2 book from diff streams. The service implements this exactly:

  1. Start buffering WS events immediately before the REST snapshot is ready.
  2. Fetch REST snapshot via GET /fapi/v1/depth?symbol={ASSET}&limit=1000.
    • Weight cost: 20 per request. Used only on init and reconnect. Not rate-limited in steady state.
    • Returns lastUpdateId (the sequence number of the snapshot).
  3. Replay buffered events, discarding any with event['u'] <= last_update_id (already reflected in snapshot).
  4. Mark initialized[asset] = True.

This protocol ensures the local book is exactly synchronized with Binance's server-side book at the time of snapshot. Any events with u > lastUpdateId that arrived while the REST call was in flight are buffered and applied in order.
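The buffer-and-replay step can be sketched as below. `sync_book_from_snapshot` is an illustrative name (the real service performs this inside its per-asset asyncio loop); the snapshot dict mimics the `GET /fapi/v1/depth` response shape.

```python
from typing import Dict, List, Tuple

def sync_book_from_snapshot(
    buffered_events: List[dict],
    snapshot: dict,
) -> Tuple[Dict[float, float], Dict[float, float]]:
    """Build a synchronized L2 book from a REST snapshot plus buffered diffs.

    `snapshot` mimics GET /fapi/v1/depth: {"lastUpdateId": int,
    "bids": [[price_str, qty_str], ...], "asks": [...]}.
    """
    last_update_id = snapshot["lastUpdateId"]
    bids = {float(p): float(q) for p, q in snapshot["bids"]}
    asks = {float(p): float(q) for p, q in snapshot["asks"]}
    for event in buffered_events:
        # Discard diffs already reflected in the snapshot (u <= lastUpdateId).
        if event["u"] <= last_update_id:
            continue
        for side, book in (("b", bids), ("a", asks)):
            for price_str, qty_str in event.get(side, []):
                price, qty = float(price_str), float(qty_str)
                if qty == 0.0:
                    book.pop(price, None)   # qty 0 deletes the level
                else:
                    book[price] = qty       # otherwise upsert
    return bids, asks
```

The same delete/upsert rule is reused for every subsequent live diff event (Section 4.2).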

4.2 Update Application

Every WebSocket diff event carries:

  • 'b' — list of [price_str, qty_str] bid updates
  • 'a' — list of [price_str, qty_str] ask updates
  • 'u' — final update ID

For each price/qty pair:

  • qty == 0.0 → delete that price level from the dict
  • qty > 0.0 → upsert that price level

This is the standard Binance L2 maintenance algorithm. The result is a bids: Dict[float, float] and asks: Dict[float, float] that are exact replicas of the exchange's book state.

4.3 Reconnection Handling

On websockets.exceptions.ConnectionClosed:

  1. All assets reset to initialized[a] = False.
  2. REST snapshots refetched concurrently for all assets using the shared session.
  3. WS reconnection attempted after reconnect_delay seconds (exponential backoff).

Exponential backoff (P1-2):

  • reconnect_delay starts at 3s, doubles each failure: 3→6→12→24→48→96→120→120…
  • Capped at _RECONNECT_MAX_S = 120s with ±1s random jitter per attempt
  • Resets to 3s if the connection stayed live for ≥ _RECONNECT_STABLE_S = 60s
  • Prevents REST request storms under sustained exchange instability (2400 weight/min limit)
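The backoff policy above can be sketched as a pure function; `next_reconnect_delay` is an illustrative name, with the constants taken from the bullets (3 s base, 120 s cap, 60 s stability reset, ±1 s jitter):

```python
import random

RECONNECT_BASE_S = 3.0       # initial delay
_RECONNECT_MAX_S = 120.0     # hard cap
_RECONNECT_STABLE_S = 60.0   # stable-connection reset threshold

def next_reconnect_delay(delay: float, connected_for_s: float) -> float:
    """Exponential backoff with cap, jitter, and reset after a stable run.

    Sketch of the P1-2 policy: 3 -> 6 -> 12 -> ... capped at 120 s,
    resetting to 3 s once a connection survived >= 60 s.
    """
    if connected_for_s >= _RECONNECT_STABLE_S:
        delay = RECONNECT_BASE_S               # connection was stable: reset
    else:
        delay = min(delay * 2.0, _RECONNECT_MAX_S)
    return delay + random.uniform(-1.0, 1.0)   # +/- 1 s jitter per attempt
```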

WS stall detection (P0-2):

  • self.last_event_ts: float stamped on every received WS message
  • is_stale(threshold_s=30.0) -> bool returns True if no events for >30s
  • Hot loop checks ob_thread.is_stale() every LOG_STATUS_EVERY=120 cycles and logs an error

During the reconnection window, get_depth_buckets() returns None (book not initialized). The flow handles this gracefully (skips those assets for that cycle).

4.4 Depth Bucket Extraction

get_depth_buckets(asset) converts the raw price→qty dicts into the 5-bucket format needed downstream.

Crossed book guard (P0-3): Before computing, validates best_bid < best_ask. If the book is crossed (best_bid >= best_ask) — which can occur during reconnection, partial snapshot application, or exchange instability — the method returns None instead of computing corrupted features. Corrupted features would otherwise propagate to HZ and the Parquet archive.

For each bid level at price p, quantity q:
  dist_pct = (mid - p) / mid × 100
  idx = int(dist_pct)          ← truncated, so idx 0 = [0%, 1%), idx 1 = [1%, 2%), etc.
  if idx < max_depth_pct (=5):
      bid_notional[idx] += p × q    ← USD value
      bid_depth[idx]    += q        ← quantity in asset units
  else: break                  ← bids sorted descending, safe to break

Same logic for asks (sorted ascending, dist_pct = (p - mid) / mid × 100).

Returns:

{
  "timestamp":    time.time(),       # local extract wall clock
  "asset":        "BTCUSDT",
  "bid_notional": np.array([...]),   # [5] float64, USD at 0-1%, 1-2%, 2-3%, 3-4%, 4-5% below mid
  "ask_notional": np.array([...]),   # [5] float64, USD at 0-1%, 1-2%, ... above mid
  "bid_depth":    np.array([...]),   # [5] float64, asset qty at each band
  "ask_depth":    np.array([...]),   # [5] float64, asset qty at each band
  "best_bid":     float,             # highest bid price
  "best_ask":     float,             # lowest ask price
  "spread_bps":   float,             # (ask - bid) / mid × 10,000
}

Important: Index 0 corresponds to the nearest-to-mid band (within 1% of mid). Index 4 is the farthest (4–5% from mid). This is consistent throughout all downstream consumers.
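The bucketing loop can be sketched as below. `bucket_depth` is an illustrative name; the real method also carries qty arrays and returns the full dict shown above, while this sketch returns just the notional arrays and spread.

```python
import numpy as np

def bucket_depth(bids: dict, asks: dict, max_depth_pct: int = 5):
    """Bin a price->qty book into per-percent notional bands around mid.

    Index 0 = [0%, 1%) from mid, index 4 = [4%, 5%). Returns None on a
    crossed book (best_bid >= best_ask), mirroring the P0-3 guard.
    """
    best_bid, best_ask = max(bids), min(asks)
    if best_bid >= best_ask:
        return None                               # crossed book: refuse to emit
    mid = (best_bid + best_ask) / 2.0

    bid_notional = np.zeros(max_depth_pct)
    ask_notional = np.zeros(max_depth_pct)
    for p in sorted(bids, reverse=True):          # bids descend from best
        idx = int((mid - p) / mid * 100.0)        # truncation -> band index
        if idx >= max_depth_pct:
            break                                 # sorted: all further are farther
        bid_notional[idx] += p * bids[p]          # USD value
    for p in sorted(asks):                        # asks ascend from best
        idx = int((p - mid) / mid * 100.0)
        if idx >= max_depth_pct:
            break
        ask_notional[idx] += p * asks[p]
    spread_bps = (best_ask - best_bid) / mid * 10_000.0
    return bid_notional, ask_notional, spread_bps
```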

4.5 Thread Safety

The service uses per-asset asyncio.Lock objects. All reads and writes to bids[asset] and asks[asset] are protected. This is important because get_depth_buckets() sorts the full dict while WS events continue arriving.


5. Layer 2 — OBSnapshot and OBProvider Hierarchy

Source: nautilus_dolphin/nautilus_dolphin/nautilus/ob_provider.py

5.1 OBSnapshot Dataclass

@dataclass
class OBSnapshot:
    timestamp:    float         # Unix seconds
    asset:        str           # e.g. "BTCUSDT"
    bid_notional: np.ndarray    # [5] float64 — USD per 1% band, 0–1% to 4–5% below mid
    ask_notional: np.ndarray    # [5] float64 — USD per 1% band, 0–1% to 4–5% above mid
    bid_depth:    np.ndarray    # [5] float64 — asset qty at each band
    ask_depth:    np.ndarray    # [5] float64 — asset qty at each band

All notional arrays use the same indexing convention as OBStreamService:

  • [0] = within 1% of mid (the "near" band, most liquid)
  • [4] = 4–5% from mid (the "far" band, less liquid)

Notional values represent the total USD value within that percentage band, not cumulative from mid. For example, bid_notional[2] is the total USD of bids between 2% and 3% below mid.

5.2 OBProvider Abstract Base Class

class OBProvider(ABC):
    @abstractmethod
    def get_snapshot(self, asset: str, timestamp: float) -> Optional[OBSnapshot]: ...
    @abstractmethod
    def get_assets(self) -> List[str]: ...
    @abstractmethod
    def get_all_timestamps(self, asset: str) -> np.ndarray: ...

Three concrete implementations:

| Class          | Source                               | Use case                       |
|----------------|--------------------------------------|--------------------------------|
| CSVOBProvider  | ob_data/{ASSET}-bookDepth-{DATE}.csv | Historical replay, backtesting |
| MockOBProvider | Synthetic, fully configurable        | Unit testing                   |
| HZOBProvider   | Hazelcast asset_{ASSET}_ob key       | Live alpha engine reads        |

5.3 CSVOBProvider

CSV format: One row per price level per timestamp:

timestamp,           percentage, depth,        notional
2025-01-15 00:00:05, -5,         4697.736,     444223397.14
2025-01-15 00:00:05, -4,         2103.451,     199827634.85
...
2025-01-15 00:00:05, +1,          891.234,      84667183.21
...

10 rows per snapshot (5 bid levels: -1 to -5, 5 ask levels: +1 to +5). The percentage sign follows the convention: negative = bid side (below mid), positive = ask side.

Index mapping:

  • percentage = -1 → idx = 0 (near bid)
  • percentage = -5 → idx = 4 (far bid)
  • percentage = +1 → idx = 0 (near ask)
  • percentage = +5 → idx = 4 (far ask)
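The mapping reduces to sign-for-side and |pct| − 1 for the band index; `csv_row_to_slot` is an illustrative helper, not a name from the codebase:

```python
def csv_row_to_slot(percentage: int):
    """Map a bookDepth CSV `percentage` column to (side, band index).

    Negative percentages are bids (below mid), positive are asks;
    |pct| - 1 gives the band index, so -1 -> ("bid", 0), +5 -> ("ask", 4).
    """
    side = "bid" if percentage < 0 else "ask"
    return side, abs(percentage) - 1
```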

Date filtering (anti-leakage): When reference_date is set, _load_asset() only loads CSV files where:

reference_date - 7 days <= file_date < reference_date

Files from reference_date itself are excluded. This prevents the feature engine from seeing future data when computing the median depth reference used in normalization. The caller must pass "2025-01-16" to use data from 2025-01-15.

Lookup: Binary search (bisect.bisect_left) on the sorted timestamp array. O(log N) per query. With 2880 snapshots/day (30s intervals) this is effectively instant.

Tolerance: tolerance_s=30.0 — if the nearest snapshot is more than 30 s away from the requested timestamp, None is returned.
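The lookup with tolerance can be sketched as below; `nearest_snapshot_idx` is an illustrative name for the bisect-based search described above:

```python
import bisect
import numpy as np

def nearest_snapshot_idx(timestamps: np.ndarray, ts: float,
                         tolerance_s: float = 30.0):
    """Binary-search a sorted timestamp array for the snapshot nearest `ts`.

    Returns the index, or None if the nearest snapshot is more than
    `tolerance_s` away — matching the provider's tolerance behaviour.
    """
    i = bisect.bisect_left(timestamps, ts)
    # The nearest element is at i or i-1; clamp both to valid range.
    candidates = [j for j in (i - 1, i) if 0 <= j < len(timestamps)]
    if not candidates:
        return None
    best = min(candidates, key=lambda j: abs(timestamps[j] - ts))
    return best if abs(timestamps[best] - ts) <= tolerance_s else None
```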

5.4 MockOBProvider

Generates synthetic snapshots with configurable imbalance_bias and depth_scale.

The mathematical relationship between imbalance_bias and the computed imbalance is exact:

bid_mult = 1 + bias
ask_mult = 1 - bias

total_bid = base × Σ(level_weights) × bid_mult = base × 15 × (1 + bias)
total_ask = base × Σ(level_weights) × ask_mult = base × 15 × (1 - bias)

imbalance = (bid - ask) / (bid + ask)
           = (15(1+b) - 15(1-b)) / (15(1+b) + 15(1-b))
           = 2b × 15 / (2 × 15)
           = b

Therefore compute_imbalance_nb(bid_notional, ask_notional) == imbalance_bias exactly. This makes tests deterministic.

level_weights = [1.0, 2.0, 3.0, 4.0, 5.0] — depth increases with distance from mid, reflecting realistic order book shape where more passive orders sit further out.

imbalance_biases: Dict[str, float] allows per-asset overrides, so you can test cross-asset disagreement scenarios.

num_snapshots=2880 with 30 s spacing models a full trading day (2880 × 30 = 86400 s).
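The identity can be checked numerically with a plain-numpy stand-in for the kernel (production uses the @njit version; `compute_imbalance` here is the same formula without JIT):

```python
import numpy as np

def compute_imbalance(bid_notional: np.ndarray, ask_notional: np.ndarray) -> float:
    """Pure-numpy stand-in for compute_imbalance_nb: (bid - ask) / (bid + ask)."""
    total = bid_notional.sum() + ask_notional.sum()
    if total < 1e-9:
        return 0.0                       # zero-division guard
    return float((bid_notional.sum() - ask_notional.sum()) / total)

# The mock's construction makes imbalance == bias (up to float rounding):
level_weights = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
base, bias = 1_000_000.0, 0.2
bid_notional = base * level_weights * (1 + bias)
ask_notional = base * level_weights * (1 - bias)
```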

5.5 HZOBProvider

Reads live data from Hazelcast key asset_{ASSET}_ob. Used by the production alpha engine to construct OBSnapshot objects for the OBFeatureEngine. Not used in the OBF flow itself — the flow pushes to HZ; consumers pull via this provider.

Lazy connection (P1-4): The HZ client is not created at construction time. _ensure_connected() is called on first use (first get_snapshot() or get_assets() call). This prevents failures during unit tests and avoids connection race conditions at startup.

Dynamic asset discovery (P1-4): get_assets() scans HZ key set for keys matching asset_*_ob pattern. This reflects the actual assets being pushed by the OBF flow, eliminating the prior hardcoded list that included BNBUSDT (not tracked). Alternatively, an explicit assets: List[str] list can be passed to the constructor for deterministic, fast operation.

Error handling: Separate json.JSONDecodeError catch logs the specific field; all other exceptions log and return None. No exception propagates to the caller.


6. Layer 3 — OBFeatureEngine (Batch Backtesting)

Source: nautilus_dolphin/nautilus_dolphin/nautilus/ob_features.py

6.1 Design Philosophy

The batch OBFeatureEngine is designed for fast backtesting: it pre-computes all features for an entire date in a single O(N) pass, then serves results in O(1) via dict lookup. This is the right design for vectorbt integration where all bars are processed up front.

In production, LiveOBFeatureEngine (Section 7) handles incremental streaming.

6.2 State

_imbalance_history: Dict[str, deque]    # per-asset rolling imbalance values
_depth_1pct_history: Dict[str, deque]   # per-asset rolling 1% depth values
_market_depth_history: deque            # aggregate depth across all assets
_median_depth_ref: Dict[str, float]     # computed in preload, used for normalization
_preloaded_placement: Dict[str, Dict[int, OBPlacementFeatures]]
_preloaded_signal:   Dict[str, Dict[int, OBSignalFeatures]]
_preloaded_market:   Dict[int, OBMarketFeatures]
_preloaded_macro_map: Dict[int, OBMacroFeatures]  # per snap_idx (P1-3 fix)
_ts_to_idx:           Dict[str, Dict[float, int]] # timestamp → snap index map
bar_to_snap_ratio:    int = 6                      # configurable (P2-7)

6.3 preload_date() Algorithm

1. Set provider.reference_date = date_str  (enables date filtering in CSVOBProvider)
2. For each asset:
     a. Load all N snapshots
     b. Compute depths: depths[i] = bid_notional[i][0] + ask_notional[i][0]
     c. _median_depth_ref[asset] = median(depths)  ← normalization reference
3. For snap_idx in range(N):
     For each asset:
       a. Compute raw features via numba kernels (imb, d1pct, dq, fp, sp, da)
       b. Append to imb_hist[asset], dep_hist[asset]
       c. Compute rolling features (ma5, persistence, velocity)
       d. Store in _preloaded_placement[asset][snap_idx]
       e. Store in _preloaded_signal[asset][snap_idx]
       f. Collect (imb, velocity) for cross-asset computation
     Compute Sub-3 (market agreement) from all assets' imbalances
     Compute Sub-4 (macro regime) from all assets' velocities
     Update _preloaded_market[snap_idx]
     Update _preloaded_macro_map[snap_idx] = OBMacroFeatures(...)  ← per-timestamp (P1-3)
4. _preloaded = True

6.4 Index Resolution

The _resolve_idx() method handles two modes:

Timestamp mode (timestamp_or_idx > 1e9): Direct lookup in _ts_to_idx, or binary search for nearest.

Bar index mode (integer < 1e9): Converts VBT 5-second bar indices to OB snapshot indices using bar_idx // self.bar_to_snap_ratio. Defaults assume:

  • VBT bars: 5 s granularity
  • OB snapshots: 30 s granularity
  • Ratio: bar_to_snap_ratio = 6 bars per snapshot (configurable via constructor, P2-7)

This integer division alignment means every OB feature is used for exactly 6 consecutive VBT bars. The features are refreshed every 30 s, which matches the Binance bookDepth snapshot cadence used when building CSV data.

Per-timestamp macro (P1-3): get_macro(timestamp_or_idx) now resolves the correct snapshot index and returns _preloaded_macro_map[snap_idx]. Previously it returned only the last computed value regardless of bar index — silently wrong for any non-terminal bar in a backtest.
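Both resolution modes can be sketched as below. `resolve_idx` is an illustrative simplification: it shows only the direct timestamp lookup, whereas the real `_resolve_idx()` also falls back to a nearest-timestamp binary search.

```python
def resolve_idx(timestamp_or_idx: float, ts_to_idx: dict,
                bar_to_snap_ratio: int = 6) -> int:
    """Resolve either a Unix timestamp (> 1e9) or a VBT bar index.

    Timestamps map via the ts->idx dict; small integers are 5 s bar
    indices converted to 30 s snapshot indices by integer division.
    """
    if timestamp_or_idx > 1e9:
        return ts_to_idx[timestamp_or_idx]        # direct timestamp lookup
    return int(timestamp_or_idx) // bar_to_snap_ratio
```

Integer division is what makes each OB snapshot serve exactly `bar_to_snap_ratio` consecutive bars.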


7. Layer 3B — LiveOBFeatureEngine (Streaming Production)

Source: prod/obf_persistence.py — class LiveOBFeatureEngine

7.1 Why a Separate Class Exists

OBFeatureEngine.preload_date() requires all data upfront. In production:

  • Data arrives one snapshot at a time, every 0.5 s.
  • There is no future data to preload.
  • The median_depth_ref must be bootstrapped from the first N live observations.

LiveOBFeatureEngine solves all three problems.

7.2 Rolling State

_imb_hist:  Dict[str, deque(maxlen=DEPTH_LOOKBACK=60)]   # 60 × 0.5 s = 30 s
_dep_hist:  Dict[str, deque(maxlen=DEPTH_LOOKBACK=60)]   # same
_median_ref:  Dict[str, float]    # calibrated once after MEDIAN_CALIB_N=60 samples
_calib_buf:   Dict[str, list]     # accumulates first 60 depth values
_calibrated:  Dict[str, bool]     # True after calibration
_market_dep:  deque(maxlen=60)    # aggregate depth for acceleration computation

7.3 Median Calibration

For the first 60 snapshots (MEDIAN_CALIB_N = 60), _calib_buf[asset] accumulates depth_1pct values. After 60 samples, _median_ref[asset] is set to np.median(calib_buf) and _calibrated[asset] = True.

Before calibration is complete, med_ref = max(d1pct, 1.0) — this gives depth_quality = 1.0 exactly (no normalization until the reference is stable). The 60-sample window is 30 seconds at 0.5 s polling — a reasonable warm-up period.
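The calibration step can be sketched as a small helper; `update_median_ref` is an illustrative name for logic the engine keeps in its per-asset dicts:

```python
import numpy as np

MEDIAN_CALIB_N = 60  # 60 samples at 0.5 s polling = 30 s warm-up

def update_median_ref(calib_buf: list, d1pct: float):
    """Accumulate the first 60 depth observations, then freeze the median.

    Returns (median_ref, calibrated). Before calibration the reference is
    max(d1pct, 1.0), which makes depth_quality exactly 1.0 during warm-up.
    """
    calib_buf.append(d1pct)
    if len(calib_buf) >= MEDIAN_CALIB_N:
        return float(np.median(calib_buf)), True
    return max(d1pct, 1.0), False
```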

7.4 update() Return Structure

{
  "per_asset": {
    "BTCUSDT": {
      "depth_1pct_usd":        float,   # Sub-1
      "depth_quality":         float,
      "fill_probability":      float,
      "spread_proxy_bps":      float,
      "imbalance":             float,   # Sub-2
      "imbalance_ma5":         float,
      "imbalance_persistence": float,
      "depth_asymmetry":       float,
      "withdrawal_velocity":   float,
    },
    "ETHUSDT": { ... },
    "SOLUSDT": { ... },
  },
  "market": {
    "median_imbalance": float,           # Sub-3
    "agreement_pct":    float,
    "depth_pressure":   float,
  },
  "macro": {
    "depth_velocity":  float,            # Sub-4
    "cascade_count":   int,
    "acceleration":    float,
    "regime_signal":   int,              # -1=CALM, 0=NEUTRAL, 1=STRESS
  }
}

If snap = None for an asset (book not yet initialized), per_asset[asset] = None and that asset is excluded from cross-asset computations.


8. Sub-system 1 — Per-Asset Placement Features

Numba kernels: all @njit(cache=True) for fast JIT compilation with on-disk caching.

8.1 compute_imbalance_nb(bid_notional, ask_notional)

\text{imbalance} = \frac{\sum bid\_notional - \sum ask\_notional}{\sum bid\_notional + \sum ask\_notional}
  • Range: [-1.0, +1.0]
  • +1.0 = all depth on bid side (strong buy pressure)
  • -1.0 = all depth on ask side (strong sell pressure)
  • 0.0 if total depth < 1e-9 (guard against zero division)
  • Uses entire 5-level depth vector (not just near-mid)

This is the most fundamental microstructure signal. It captures the instantaneous notional imbalance in the visible order book.

8.2 compute_depth_1pct_nb(bid_notional, ask_notional)

\text{depth\_1pct} = bid\_notional[0] + ask\_notional[0]

Total USD liquidity within ±1% of mid price. This is the "execution liquidity" — the depth that a market order of typical DOLPHIN size would encounter immediately.

On BTC, this was typically >$100M on 2025-01-15 (verified in test TestCSVOBProviderKnownValues). Thin markets show <$20M.

8.3 compute_depth_quality_nb(depth_1pct, median_ref)

\text{depth\_quality} = \frac{depth\_1pct}{median\_ref}

Normalizes current liquidity against the historical median for that asset.

  • 1.0 = typical liquidity
  • >1.0 = above-average liquidity (better fill probability)
  • <1.0 = below-average (thinner than usual)
  • Returns 1.0 if median_ref < 1e-9 (no reference yet)

8.4 compute_fill_probability_nb(depth_quality)

\text{fill\_probability} = 1 - e^{-k \cdot \max(0, depth\_quality)}, \quad k = 2.0

Exponential saturation model. Calibrated so that quality = 1.0 (typical depth) gives approximately 1 - e^{-2} ≈ 0.865 (86.5% fill probability). Rationale: at typical BTC depth, a small maker order has very high fill probability.

Key values:

| depth_quality | fill_probability |
|---------------|------------------|
| 0.0           | 0.000 (no depth) |
| 0.5           | 0.632            |
| 1.0           | 0.865            |
| 1.5           | 0.950            |
| 2.0           | 0.982            |

The OBPlacer uses fill_probability < 0.30 as a gate to force TAKER execution (thin book = don't wait for maker fill).
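The saturation model is a one-liner; this stand-in (production uses the @njit kernel) reproduces the table values:

```python
import math

def fill_probability(depth_quality: float, k: float = 2.0) -> float:
    """Exponential saturation fill model: 1 - exp(-k * max(0, quality))."""
    return 1.0 - math.exp(-k * max(0.0, depth_quality))
```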

8.5 compute_spread_proxy_nb(bid_notional, ask_notional)

\text{ratio} = \frac{bid\_notional[0] + ask\_notional[0]}{\sum_{i=0}^{4}(bid\_notional[i] + ask\_notional[i])}

\text{spread\_proxy} = \max(0.1, (1 - ratio \times 3) \times 5)

This is a depth-gradient proxy for spread. The intuition: in tight markets, a large fraction of total depth sits near mid (high ratio). In wide-spread markets, near-mid depth is thin relative to the full book (low ratio). Returns approximate spread in bps-like units.

  • ratio ≈ 0.3 (30% of depth within 1%) → spread_proxy ≈ 0.5 (tight)
  • ratio ≈ 0.05 (5% of depth within 1%) → spread_proxy ≈ 4.25 (wide)
  • Returns 10.0 if total depth < 1e-9 (no data = assume wide spread)

This is used in OBPlacer to cap limit order offset: offset = min(offset, spread_proxy_bps × 0.4) — never place our limit more than 40% of the spread from mid.
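A plain-numpy stand-in for the kernel reproduces the example values above:

```python
import numpy as np

def spread_proxy(bid_notional: np.ndarray, ask_notional: np.ndarray) -> float:
    """Depth-gradient spread proxy: thin near-mid depth relative to the
    full book implies a wide spread. Returns 10.0 when there is no data."""
    total = bid_notional.sum() + ask_notional.sum()
    if total < 1e-9:
        return 10.0                                   # no data: assume wide
    ratio = (bid_notional[0] + ask_notional[0]) / total
    return float(max(0.1, (1.0 - ratio * 3.0) * 5.0))
```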

8.6 OBPlacementFeatures Dataclass

@dataclass
class OBPlacementFeatures:
    depth_1pct_usd:   float  # USD depth within ±1% of mid
    depth_quality:    float  # vs median; 1.0 = typical
    fill_probability: float  # [0, 1]
    spread_proxy_bps: float  # approximate spread proxy

Neutral default: NEUTRAL_PLACEMENT = OBPlacementFeatures(0.0, 1.0, 0.5, 2.0)

  • Zero depth (no data)
  • Quality = 1.0 (don't penalize when no reference)
  • Fill probability = 0.5 (uncertain)
  • Spread proxy = 2.0 (moderate spread assumption)

9. Sub-system 2 — Per-Asset Signal Features

9.1 compute_depth_asymmetry_nb(bid_notional, ask_notional)

\text{asymmetry} = \frac{bid\_notional[0] + ask\_notional[0]}{bid\_notional[2] + ask\_notional[2]}

Ratio of near-mid depth (±1%) to mid-depth (±3%). High values indicate depth is concentrated near mid — a tight, liquid market. Low values indicate depth is spread out or front-running is occurring.

Uses index [2] (the 3% level) as the "far" reference because [4] (5%) can be sparse in fast markets.

9.2 compute_imbalance_persistence_nb(imbalance_history, n)

current_sign = sign(imbalance_history[-1])
persistence = fraction of last n values with same sign as current_sign
  • Range: [0, 1]
  • 1.0 = all recent snapshots had same directional pressure
  • 0.5 = random / neutral (default when history too short)
  • 0.0 = all recent snapshots were opposite direction

This distinguishes sustained directional pressure from momentary spikes. A high-persistence imbalance is much stronger evidence of an incoming move than a single snapshot.

Lookback: IMBALANCE_LOOKBACK = 10 snapshots (5 s in live mode at 0.5 s, or 5 min in batch mode at 30 s).
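The persistence computation can be sketched with a numpy stand-in for the kernel:

```python
import numpy as np

def imbalance_persistence(history: np.ndarray, n: int = 10) -> float:
    """Fraction of the last n imbalances sharing the current value's sign.

    Returns the neutral 0.5 when history is too short to judge.
    """
    if len(history) < 2:
        return 0.5
    window = history[-n:]
    current_sign = np.sign(window[-1])
    if current_sign == 0:
        return 0.5                     # flat imbalance: no defined direction
    return float(np.mean(np.sign(window) == current_sign))
```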

9.3 Rolling MA5

imbalance_ma5 = mean(imbalance_history[-5:]). Simple 5-snapshot moving average. Smooths noise while being fast-reacting. In live mode at 0.5 s polling, this is a 2.5 s average.

9.4 compute_withdrawal_velocity_nb(depth_history, lookback)

\text{velocity} = \frac{depth\_history[-1] - depth\_history[-1 - lookback]}{depth\_history[-1 - lookback]}

Fractional change in 1% depth over the lookback window. Negative = depth is being withdrawn (market makers pulling quotes). Positive = depth is building.

  • DEPTH_LOOKBACK = 60 snapshots = 30 s lookback in live mode
  • CASCADE_THRESHOLD = -0.10: velocity below -0.10 means ≥10% of near-mid depth was withdrawn in 30 s — a stress signal
  • Returns 0.0 if fewer than 2 samples in history
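A sketch of the velocity computation; clamping the reference to the oldest available sample when history is shorter than the lookback is an assumption of this sketch:

```python
def withdrawal_velocity(depth_history: list, lookback: int) -> float:
    """Fractional change in 1% depth over `lookback` snapshots.

    Negative values mean depth is being withdrawn; 0.0 when history
    is too short. Reference index is clamped to the oldest sample
    (assumption, for short histories).
    """
    if len(depth_history) < 2:
        return 0.0
    i = max(0, len(depth_history) - 1 - lookback)
    ref = depth_history[i]
    if ref < 1e-9:
        return 0.0                     # avoid division by empty-book depth
    return (depth_history[-1] - ref) / ref
```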

9.5 OBSignalFeatures Dataclass

@dataclass
class OBSignalFeatures:
    imbalance:             float  # [-1, +1] raw directional pressure
    imbalance_ma5:         float  # 5-snapshot smoothed
    imbalance_persistence: float  # [0, 1] direction consistency
    depth_asymmetry:       float  # near/far depth ratio
    withdrawal_velocity:   float  # rate of depth drain (negative = withdrawal)

Neutral default: NEUTRAL_SIGNAL = OBSignalFeatures(0.0, 0.0, 0.5, 1.0, 0.0)


10. Sub-system 3 — Cross-Asset Market Signal

10.1 compute_market_agreement_nb(asset_imbalances, n_assets)

This kernel operates on a vector of per-asset imbalances (one per tracked asset, e.g. [imb_BTC, imb_ETH, imb_SOL]).

Step 1: Median imbalance

Finds the median of the n_assets values (sort + select middle element). The median is more robust than mean to outlier assets.

Step 2: Agreement fraction

median_sign = sign(median)
agreement_pct = count(assets with same sign as median) / n_assets
  • Returns (0.0, 0.5) if abs(median) < 1e-9 — undefined direction
  • Returns (median, agreement_pct) otherwise

Interpretation:

  • median > 0, agreement_pct = 1.0 — all assets bid-heavy simultaneously. Strong buy-side pressure across the market. High conviction long.
  • median < 0, agreement_pct = 1.0 — all assets ask-heavy simultaneously. Strong sell-side pressure. High conviction short.
  • agreement_pct = 0.33 (one of three) — divergence, noisy market.

10.2 Depth Pressure

depth_pressure = mean(asset_imbalances). Simpler aggregate than median — the mean captures the total directional force. Used alongside median_imbalance as a cross-check.

10.3 OBMarketFeatures Dataclass

@dataclass
class OBMarketFeatures:
    median_imbalance: float  # cross-asset median
    agreement_pct:    float  # [0, 1] fraction agreeing
    depth_pressure:   float  # mean imbalance across assets

Neutral default: NEUTRAL_MARKET = OBMarketFeatures(0.0, 0.5, 0.0)


11. Sub-system 4 — Macro Regime / Market Structure

11.1 compute_cascade_signal_nb(depth_velocities, n_assets, threshold)

Evaluates the vector of per-asset withdrawal_velocity values.

cascade_count = count(assets where velocity < threshold)  # threshold = -0.10

mean_velocity = mean(depth_velocities)

# Regime classification:
if cascade_count >= max(2, n_assets // 2) AND mean_velocity < threshold:
    regime = 1   # STRESS: majority of assets withdrawing depth
elif mean_velocity > abs(threshold):
    regime = -1  # CALM: depth building across market
else:
    regime = 0   # NEUTRAL

STRESS regime (1): Multiple assets simultaneously withdrawing order book depth at ≥10% rate. This typically precedes sharp moves. Maker orders should be avoided or made fast-timeout; urgency is high.

CALM regime (-1): Depth building across the market. Market makers are posting aggressively. Good conditions for patient maker execution.

NEUTRAL regime (0): No clear structural signal.
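The classification above can be sketched with a numpy stand-in for the kernel; `cascade_signal` mirrors the pseudocode directly:

```python
import numpy as np

CASCADE_THRESHOLD = -0.10

def cascade_signal(depth_velocities: np.ndarray,
                   threshold: float = CASCADE_THRESHOLD):
    """Classify macro regime from per-asset withdrawal velocities.

    Returns (cascade_count, mean_velocity, regime) with regime in
    {1: STRESS, -1: CALM, 0: NEUTRAL}.
    """
    n_assets = len(depth_velocities)
    cascade_count = int(np.sum(depth_velocities < threshold))
    mean_velocity = float(np.mean(depth_velocities))
    if cascade_count >= max(2, n_assets // 2) and mean_velocity < threshold:
        regime = 1          # STRESS: majority of assets withdrawing depth
    elif mean_velocity > abs(threshold):
        regime = -1         # CALM: depth building across market
    else:
        regime = 0          # NEUTRAL
    return cascade_count, mean_velocity, regime
```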

11.2 Acceleration

Second derivative of aggregate market depth:

recent_vel = (market_dep[-1]  - market_dep[-10]) / market_dep[-10]
prior_vel  = (market_dep[-10] - market_dep[-20]) / market_dep[-20]
acceleration = recent_vel - prior_vel

Requires at least 20 market depth samples in history. Negative acceleration (depth withdrawing and accelerating) is a precursor to STRESS regime.
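The acceleration formula transcribes directly; `depth_acceleration` is an illustrative name for the computation over the `_market_dep` deque:

```python
def depth_acceleration(market_dep: list) -> float:
    """Second derivative of aggregate depth: recent 10-sample velocity
    minus the prior 10-sample velocity. 0.0 with fewer than 20 samples."""
    if len(market_dep) < 20:
        return 0.0
    recent_vel = (market_dep[-1] - market_dep[-10]) / market_dep[-10]
    prior_vel = (market_dep[-10] - market_dep[-20]) / market_dep[-20]
    return recent_vel - prior_vel
```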

11.3 OBMacroFeatures Dataclass

@dataclass
class OBMacroFeatures:
    depth_velocity: float  # mean withdrawal velocity across all assets
    cascade_count:  int    # number of assets in withdrawal
    acceleration:   float  # 2nd derivative of aggregate depth
    regime_signal:  int    # -1=CALM, 0=NEUTRAL, 1=STRESS

Neutral default: NEUTRAL_MACRO = OBMacroFeatures(0.0, 0, 0.0, 0)


12. Layer 4 — OBPlacer (Execution Advice)

Source: nautilus_dolphin/nautilus_dolphin/nautilus/ob_placer.py

12.1 OBPlacementAdvice

@dataclass
class OBPlacementAdvice:
    method:     str    # "MAKER" | "TAKER" | "PATIENT_MAKER" | "SKIP"
    offset_bps: float  # limit offset from mid (maker only)
    timeout_s:  float  # max wait for maker fill
    confidence: float  # [0, 1] how confident in this advice
    reason:     str    # human-readable explanation

12.2 Decision Tree

The OBPlacer.advise(placement, signal, signal_confidence, direction) method applies the following logic:

1. signal_confidence < 0.40 → SKIP("signal_too_weak")

2. signal_confidence > 0.85 → TAKER("high_confidence_taker")
   (signal so strong, don't risk missing it)

3. fill_probability < 0.30 → TAKER("thin_book_taker")
   (book too thin for maker, take immediately)

4. Compute effective imbalance from trade direction:
   eff_imb = -imbalance if direction == SHORT else imbalance
   (for SHORT: positive imbalance contradicts, negative confirms)

   ob_confirmation:
     eff_imb > 0.15  → +1 (confirms)
     eff_imb < -0.15 → -1 (contradicts)
     else            →  0 (neutral)

5. Compute patience [0, 1] — inverse of confidence in maker zone:
   patience = 1.0 - (confidence - 0.40) / (0.85 - 0.40)
   (confidence 0.40 → patience 1.0; confidence 0.85 → patience 0.0)

   OB adjustments:
     ob_confirms:    patience += 0.25 (can wait, OB supporting us)
     ob_contradicts: patience -= 0.30 (urgent, OB fighting us)

6. Map patience to parameters:
   timeout = 10 + patience × (50 - 10)   # FAST_S=10, PATIENT_S=50
   offset  = 0  + patience × (2.0 - 0)   # MIN_BPS=0, MAX_BPS=2.0
   offset  = min(offset, spread_proxy_bps × 0.4)  # spread sanity cap

7. Choose method:
   ob_confirms AND fill_prob > 0.70 → PATIENT_MAKER("ob_confirms_deep_book")
   ob_contradicts                  → MAKER("ob_contradicts_fast_maker", timeout=10s)
   else                            → MAKER("neutral_ob_default_maker")

8. confidence = min(1.0, fill_probability × (0.7 + 0.3 × patience))
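The eight steps above condense into a short sketch. Constants and branch names follow the pseudocode; the clamping of patience to [0, 1] after the OB adjustment, and the string encoding of direction, are assumptions:

```python
from dataclasses import dataclass

@dataclass
class OBPlacementAdvice:
    method: str
    offset_bps: float
    timeout_s: float
    confidence: float
    reason: str

def advise(imbalance, fill_probability, spread_proxy_bps,
           signal_confidence, direction):
    if signal_confidence < 0.40:                       # step 1
        return OBPlacementAdvice("SKIP", 0.0, 0.0, 0.0, "signal_too_weak")
    if signal_confidence > 0.85:                       # step 2
        return OBPlacementAdvice("TAKER", 0.0, 0.0, signal_confidence,
                                 "high_confidence_taker")
    if fill_probability < 0.30:                        # step 3: thin book
        return OBPlacementAdvice("TAKER", 0.0, 0.0, fill_probability,
                                 "thin_book_taker")
    eff_imb = -imbalance if direction == "SHORT" else imbalance   # step 4
    ob_conf = 1 if eff_imb > 0.15 else (-1 if eff_imb < -0.15 else 0)
    patience = 1.0 - (signal_confidence - 0.40) / (0.85 - 0.40)   # step 5
    if ob_conf == 1:
        patience = min(1.0, patience + 0.25)           # clamp is an assumption
    elif ob_conf == -1:
        patience = max(0.0, patience - 0.30)
    timeout = 10 + patience * (50 - 10)                # step 6
    offset = min(patience * 2.0, spread_proxy_bps * 0.4)
    if ob_conf == 1 and fill_probability > 0.70:       # step 7
        method, reason = "PATIENT_MAKER", "ob_confirms_deep_book"
    elif ob_conf == -1:
        method, reason, timeout = "MAKER", "ob_contradicts_fast_maker", 10.0
    else:
        method, reason = "MAKER", "neutral_ob_default_maker"
    conf = min(1.0, fill_probability * (0.7 + 0.3 * patience))    # step 8
    return OBPlacementAdvice(method, offset, timeout, conf, reason)
```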

12.3 Constants

| Constant | Value | Source |
|---|---|---|
| CONFIDENCE_TAKER_THRESHOLD | 0.85 | ob_placer.py + smartplacer_constants.py |
| CONFIDENCE_MAKER_THRESHOLD | 0.40 | ob_placer.py + smartplacer_constants.py |
| IMBALANCE_STRONG | 0.15 | ob_placer.py |
| OFFSET_MIN_BPS | 0.0 | ob_placer.py |
| OFFSET_MAX_BPS | 2.0 | ob_placer.py |
| TIMEOUT_FAST_S | 10 | ob_placer.py + smartplacer_constants.py |
| TIMEOUT_DEFAULT_S | 25 | smartplacer_constants.py |
| TIMEOUT_PATIENT_S | 50 | ob_placer.py + smartplacer_constants.py |

13. SmartPlacer Integration

Source: alpha_engine/execution/smart_placer.py + alpha_engine/execution/smartplacer_constants.py

SmartPlacer is the alpha engine's execution decision module. It is separate from OBPlacer but implements a parallel logic tree, using OB features as inputs.

13.1 ob_confirms_signal(direction, imbalance, threshold=0.15)

For SHORT: effective_imb = -imbalance
For LONG:  effective_imb = +imbalance

effective_imb > threshold  → CONFIRMS   (OB pressure aligns with trade direction)
effective_imb < -threshold → CONTRADICTS (OB pressure opposes trade direction)
else                       → NEUTRAL

This function bridges OB microstructure to execution timing. Example: signal says SHORT. OB imbalance = -0.25 (ask-heavy). effective_imb = -(-0.25) = +0.25 > 0.15 → CONFIRMS. Sell pressure in the book aligns with our short, so we can be more patient.
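A sketch of this bridge (direction is encoded as a string and the result as a string for illustration; the production signature may differ):

```python
def ob_confirms_signal(direction, imbalance, threshold=0.15):
    # Flip the sign for shorts so positive always means "book supports us".
    eff = -imbalance if direction == "SHORT" else imbalance
    if eff > threshold:
        return "CONFIRMS"
    if eff < -threshold:
        return "CONTRADICTS"
    return "NEUTRAL"
```

Running the document's example: direction SHORT with imbalance -0.25 yields an effective imbalance of +0.25 and therefore CONFIRMS.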

13.2 decide_placement(confidence, ob_confirmation, spread_bps, vol_regime)

Three-gate decision tree identical in structure to OBPlacer.advise() but with additional volatility regime input:

GATE 1: confidence < 0.40 → SKIP
GATE 2: confidence > 0.85 → TAKER
MAKER ZONE:
  patience = 1 - (confidence - 0.40) / 0.45
  OB adjustment: CONFIRMS +0.25, CONTRADICTS -0.30
  Volatility:    HIGH × 0.7, LOW × 1.2 (capped at 1.0)
  timeout = 10 + patience × 40
  offset  = 0  + patience × 2.0 (capped at spread × 0.4)
  fallback = ABORT if CONTRADICTS AND confidence < 0.55
           = TAKER otherwise

13.3 compute_limit_price(direction, best_bid, best_ask, offset_bps)

mid = (best_bid + best_ask) / 2
offset_abs = mid × offset_bps / 10000

SHORT (selling): price = best_ask - offset_abs  (undercut competing sellers)
                 floor: price ≥ mid × (1 + 1e-6)  (must remain on ask side)

LONG (buying):   price = best_bid + offset_abs   (outbid competing buyers)
                 ceiling: price ≤ mid × (1 - 1e-6)  (must remain on bid side)

The 1e-6 buffer ensures the limit price never crosses the spread — placing a limit inside the spread would immediately fill as a taker order, defeating the purpose.
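The pricing rule above, as a self-contained sketch:

```python
def compute_limit_price(direction, best_bid, best_ask, offset_bps):
    # Offset from the touch, with a 1e-6 buffer so the limit never
    # crosses to the wrong side of the mid (would fill as a taker).
    mid = (best_bid + best_ask) / 2.0
    offset_abs = mid * offset_bps / 10_000.0
    if direction == "SHORT":                       # selling: undercut the ask
        return max(best_ask - offset_abs, mid * (1 + 1e-6))
    return min(best_bid + offset_abs, mid * (1 - 1e-6))  # buying: outbid the bid
```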

13.4 Fee Economics

| Item | Value |
|---|---|
| Taker fee | 0.05% per side |
| Maker fee | 0.02% per side |
| Fee delta | 0.03% per side |
| Leveraged notional | $9,375 (= $3,750 × 2.5× leverage) |
| Savings per maker fill | $2.81 |
| Expected savings (25 s, 76% fill rate) | $2.14 |
| Expected savings (50 s, 82% fill rate) | $2.31 |

13.5 Adverse Move Abort (check_adverse_move)

After 5 s grace period, if price moves > 5 bps against our direction, the maker order is abandoned:

  • SHORT: price went up 5+ bps → abort (we're trying to sell, but price is rising against us)
  • LONG: price went down 5+ bps → abort

This prevents being caught in a rapidly deteriorating position while waiting for a maker fill that will never come at a meaningful price.
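The abort rule reduces to a few lines (parameter names here are illustrative, not the production signature):

```python
def check_adverse_move(direction, entry_mid, current_mid,
                       elapsed_s, grace_s=5.0, abort_bps=5.0):
    # Sketch of the adverse-move abort rule described above.
    if elapsed_s < grace_s:
        return False                     # still inside the grace period
    move_bps = (current_mid - entry_mid) / entry_mid * 10_000.0
    if direction == "SHORT":
        return move_bps > abort_bps      # price rising against a sell
    return -move_bps > abort_bps         # price falling against a buy
```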


14. BacktestAdapter and Fill Simulator

Sources: alpha_engine/execution/adapters.py, alpha_engine/execution/fill_simulator.py

These are the stochastic simulation layer that must be preserved intact for ROI continuity regression testing.

14.1 BacktestAdapter.submit_order()

For MAKER orders:

fill_prob = 0.7 - (size / 100_000) - (volatility × 100)
fill_prob = clamp(fill_prob, 0.1, 0.9)
if random.random() < fill_prob:
    fill at limit_price

This model captures:

  • Size penalty: larger orders are harder to fill passively (fill_prob decreases linearly)
  • Volatility penalty: high-vol markets have faster queue dynamics, lower passive fill probability
  • Base rate of 0.70 at size=0, vol=0 (reflecting a benign market)

For LIMIT (marketable limit) and MARKET orders, fill is immediate with spread-based slippage.
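The maker fill-probability model quoted above, with its clamp:

```python
def maker_fill_prob(size_usd, volatility):
    # Base rate 0.70, linear size penalty, volatility penalty,
    # clamped to [0.1, 0.9] as in BacktestAdapter.submit_order().
    p = 0.7 - (size_usd / 100_000.0) - (volatility * 100.0)
    return max(0.1, min(0.9, p))
```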

14.2 simulate_maker_fill() — Walk-Forward Simulation

More sophisticated than BacktestAdapter, designed for detailed backtesting:

1. Filter OB snapshots to window [signal_time, signal_time + timeout_s]
2. Walk forward through snapshots
3. At each snapshot, check:
   a. Adverse move abort (after 5s grace):
      SHORT: if (current_mid - entry_mid) / entry_mid × 10000 > 5.0 → ABORT
      LONG:  if (entry_mid - current_mid) / entry_mid × 10000 > 5.0 → ABORT
   b. Fill condition:
      SHORT: filled if best_bid >= limit_price OR approx_mid >= limit_price
      LONG:  filled if best_ask <= limit_price OR approx_mid <= limit_price
4. If would fill: apply fill_discount = 0.80 stochastic gate
   if random.random() < 0.80 → MAKER fill at limit_price
5. If timeout expires: TIMEOUT (no fill)

14.3 FILL_DISCOUNT_FACTOR = 0.80 — DO NOT CHANGE

This constant is the backbone of the stochastic regression suite. It represents queue position uncertainty: even if the market price crosses our limit, we may not be at the front of the queue. The 80% discount means:

  • 20% of "would-fill" scenarios are attributed to queue displacement
  • This is calibrated to produce realistic maker fill rates of ~70-85%

Changing this value would break backward compatibility with all historical backtests and invalidate the ROI regression suite. It is tested explicitly: assert DEFAULT_CONFIG.FILL_DISCOUNT_FACTOR == 0.80.


15. Prefect Flow — obf_prefect_flow

Source: prod/obf_prefect_flow.py

15.1 Flow Parameters

| Parameter | Default | Description |
|---|---|---|
| warmup_s | 8.0 | Seconds to wait after WS start before first push |
| poll_interval_s | 0.5 | Target cycle time (2 Hz) |
| assets | ["BTCUSDT","ETHUSDT","SOLUSDT"] | Assets to track |

15.2 Startup Sequence

1. AsyncOBThread.start()           ← daemon thread with its own asyncio loop
2. AsyncOBThread.wait_ready(10s)   ← block until event loop is running
3. time.sleep(warmup_s)            ← allow REST snapshots to complete
4. Log init_status (which assets are synchronized)
5. OBFPersistenceService.start()   ← background Parquet flush thread
6. LiveOBFeatureEngine(assets)     ← initialize rolling state
7. make_hz_client()                ← connect to Hazelcast
8. _hz_preflight(client)           ← probe HZ with dummy put(); retry 5× @ 3s; abort if all fail
9. Enter hot loop

The 8 s warmup is critical. The Binance REST snapshot for 3 assets takes ~200-500 ms each. The WS stream starts immediately and buffers events. After 8 s:

  • All 3 REST snapshots have completed in parallel
  • Buffered events have been applied
  • Books are synchronized

If after warmup n_ready == 0, the flow continues but logs a warning. Per-asset dark streak counters will trigger warnings at 5 consecutive dark cycles (2.5s).

HZ Preflight (P1-8): Before entering the hot loop, _hz_preflight() writes a test key (_obf_heartbeat) to Hazelcast. If this fails after 5 retries × 3s delay, the flow aborts rather than running silently with a broken HZ client.

15.3 Hot Loop (steps A-K)

A. Per-asset: get_depth_buckets_sync()      [~2 ms including asyncio round-trip]
   ├── _none_streak[asset] counter updated
   ├── warn if _none_streak == _DARK_WARN_AFTER (5 cycles = 2.5s)
   └── log restore if streak clears
B. Stamp local_ts
C. feature_engine.update(raw_snaps)         [~1 ms, numba kernels]
D. Build consolidated HZ payload            [dict operations]
E. HZ circuit breaker check:
   ├── if circuit OPEN: skip E→F, decrement cooldown
   └── if circuit CLOSED:
       E. hz_push_obf_task.submit(asset_payload) × N [fire-and-forget]
       F. hz_push_obf_task.submit(obf_latest).result(timeout=1.5) [blocks]
       G. on failure: _hz_consec_failures++; open circuit if ≥5
H. _write_local_cache() atomic JSON        [~1 ms]
I. Build row dicts + persist.update_snapshot() per asset  [O(1) each]
J. Periodic status log (every 120 pushes = every 60s)
   ├── check ob_thread.is_stale(30s) — log error if WS silent
   ├── log top-3 error keys from push_errors dict
   └── log circuit breaker state if open
K. Sleep remainder of poll_interval_s      [monotonic clock]

Circuit Breaker (P0-1):

  • _HZ_CIRCUIT_OPEN_AFTER = 5 consecutive consolidated push failures → circuit opens
  • _HZ_CIRCUIT_RESET_AFTER = 30 cycles (~15s) cooldown → circuit closes, one probe attempt
  • While circuit is open: HZ push skipped, Parquet persistence and local cache still run
  • Prevents 24-second hot-loop freezes during HZ outages

Dark Streak Counter (P0-4):

  • _none_streak: Dict[str, int] per asset
  • Warns at _DARK_WARN_AFTER = 5 consecutive None cycles (2.5s)
  • Logs restoration when book comes back
  • _all_dark_cycles counter: log.critical at _DARK_CRITICAL_AFTER = 120 (~60s, all assets dark)

15.4 Prefect Best Practices Applied

  • @task(cache_policy=NO_CACHE) on hz_push_obf_task: prevents Prefect from trying to serialize the Hazelcast client object for caching. Without this, Prefect would attempt to pickle the HZ client and fail.
  • retries=3, retry_delay_seconds=2 on HZ push: tolerates transient Hazelcast blips (network reconnection, leader election).
  • get_run_logger() called once inside flow context, not at module level.
  • log_prints=True on @flow: all print() statements inside the flow are captured into Prefect logs.
  • _push_seq and _pushed_at metadata fields in every HZ payload for debugging.

15.5 Local Cache

ob_cache/latest_ob_features.json — atomic write via tmp → rename. Allows the DOLPHIN scanner (running in a separate process) to read the latest OBF state without accessing Hazelcast. PID-stamped temp file prevents collision if multiple flow instances start simultaneously.
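The tmp → rename pattern can be sketched as follows. The PID-stamped temp name mirrors the collision-avoidance note; exact file naming inside _write_local_cache() may differ:

```python
import json
import os
from pathlib import Path

def write_local_cache(payload, path=Path("ob_cache/latest_ob_features.json")):
    # Write to a PID-stamped temp file, then rename over the target.
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(f".{os.getpid()}.tmp")
    tmp.write_text(json.dumps(payload))
    tmp.rename(path)  # atomic on POSIX: readers never see a partial file
```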


16. AsyncOBThread — Async/Sync Bridge

Source: prod/obf_prefect_flow.py, class AsyncOBThread

16.1 The Problem

OBStreamService is fully async (asyncio). The Prefect flow is synchronous Python. You cannot await inside a @flow function. The solution is to run the asyncio event loop in a dedicated daemon thread.

16.2 Thread Architecture

Main thread (Prefect flow, synchronous)
│
│  AsyncOBThread(threading.Thread, daemon=True)
│  ├── self._loop = asyncio.new_event_loop()
│  ├── asyncio.set_event_loop(self._loop)
│  └── self._loop.run_until_complete(self._run_forever())
│           │
│           ├── asyncio.create_task(self.service.stream())  ← starts WS + REST
│           ├── self._ready.set()                           ← signals main thread
│           └── await asyncio.get_event_loop().create_future()  ← parks forever
│
│  Sync API:
└── get_depth_buckets_sync(asset) → Optional[dict]
        └── asyncio.run_coroutine_threadsafe(
                self.service.get_depth_buckets(asset),
                self._loop
            ).result(timeout=HZ_PUSH_INTERVAL_S)  # 0.5s timeout

16.3 run_coroutine_threadsafe

This stdlib function is the correct way to call async code from a different thread. It:

  1. Creates a concurrent.futures.Future wrapping the coroutine
  2. Schedules it on self._loop (thread-safe)
  3. Returns the future, which .result(timeout=...) blocks on

The 0.5 s timeout matches poll_interval_s. If the book takes more than 0.5 s to respond (e.g., lock contention from a WS event storm), get_depth_buckets_sync() returns None gracefully. This asset is skipped for this cycle and picked up next cycle.
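A minimal demo of the bridge mechanism — an event loop running in a daemon thread, driven synchronously from the main thread. The get_depth_buckets coroutine here is a stand-in, not the real service method:

```python
import asyncio
import concurrent.futures
import threading

# Event loop parked in a daemon thread, as AsyncOBThread does.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def get_depth_buckets(asset):
    return {"asset": asset, "bid_depth": [0.9, 0.4]}

# Sync caller: schedule on the other thread's loop, block with a timeout,
# and degrade to None on timeout just like get_depth_buckets_sync().
fut = asyncio.run_coroutine_threadsafe(get_depth_buckets("BTCUSDT"), loop)
try:
    snap = fut.result(timeout=0.5)
except concurrent.futures.TimeoutError:
    snap = None  # asset skipped for this cycle, picked up next cycle
loop.call_soon_threadsafe(loop.stop)
```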

16.4 daemon=True and stop() (P2-5)

The thread is set as a daemon. When the Prefect flow exits, the finally block calls ob_thread.stop() for a graceful shutdown.

stop() signals the asyncio event loop via _stop_future.set_result(None) (thread-safe via call_soon_threadsafe). The _run_forever() coroutine is parked on await self._stop_future and wakes immediately. stop() then calls join(timeout=5.0).

Without stop(), tests that create AsyncOBThread would leave orphaned aiohttp.ClientSession objects and generate ResourceWarning: Unclosed client session noise.


17. OBFPersistenceService — Parquet Layer

Source: prod/obf_persistence.py, class OBFPersistenceService

17.1 Purpose

Builds a historical record of live order book features for future backtesting. Arrow/Parquet was chosen for:

  • Zero-copy memory mapping when reading
  • Columnar compression (Snappy) with predictable file sizes
  • Hive partitioning compatible with Spark, DuckDB, Pandas, PyArrow Dataset API
  • Schema enforcement at write time (no surprise type coercions)

17.2 Thread Safety Model

Main thread (hot loop, 2 Hz):
  persist.update_snapshot(asset, row)
    └── with self._lock:             ← held for ~1 µs
            self._history[asset].append(row)

Background thread (daemon, "obf-persist"):
  every flush_interval_s (300 s):
    persist._flush()
      └── with self._lock:           ← snapshot all deques
              snapshots = {a: list(q) for a, q in self._history.items()}
          (lock released immediately after copy)
          └── _write_parquet(asset, rows, ...) per asset
              └── build column arrays → pa.table → pq.write_table
              └── atomic: write to .tmp → os.rename to .parquet
              └── md5 checksum written alongside

The lock is held for the minimum possible time: only during the list(q) snapshot copy. Parquet file writing (the slow part) happens outside the lock. This ensures the hot loop is never blocked by disk I/O.

17.3 Rolling Buffer

deque(maxlen=HISTORY_MAXLEN=700) per asset. At poll_interval_s=0.5:

700 × 0.5 s = 350 s ≥ 300 s flush interval  (+ 100 safety margin)

HISTORY_MAXLEN = int(FLUSH_INTERVAL_S / POLL_INTERVAL_S) + 100 = 700 (P0-5 fix).

Previous value was 360 (180s), which was less than the 300s flush interval — up to 120s of data was always silently evicted before each flush. This is now fixed: the deque always covers at least one full flush interval.

First flush timing (P1-9): _run() sleeps FIRST_FLUSH_S = 60 (not flush_interval_s = 300) before the first flush. This ensures the first Parquet file is written within 60s of startup, not 300s. Crashes within the first minute still lose up to 60s of data (WAL is a future improvement, P0-5).

17.4 Atomic Write Protocol

tmp_path = out_path.with_suffix(".tmp")
pq.write_table(table, tmp_path, compression="snappy")
tmp_path.rename(out_path)       # atomic on POSIX filesystems

If the process crashes mid-write, only the .tmp file is orphaned — the target .parquet file is never partially written. Readers always see complete Parquet files.

17.5 MD5 Checksum

After each Parquet file is written, an MD5 checksum file named after the Parquet file (with a .md5 suffix appended) is written alongside:

digest = hashlib.md5(out_path.read_bytes()).hexdigest()
(out_dir / f"{out_path.name}.md5").write_text(digest)

Used for:

  • Data integrity verification when copying files to remote storage
  • Detecting bit rot in long-term archival
  • Validating downloads if files are replicated across nodes
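A verification helper can be sketched as follows (the function name is illustrative, not part of the production API):

```python
import hashlib
from pathlib import Path

def verify_parquet(parquet_path: Path) -> bool:
    # Recompute the digest and compare against the sidecar .md5 file.
    digest = hashlib.md5(parquet_path.read_bytes()).hexdigest()
    sidecar = Path(str(parquet_path) + ".md5")
    return digest == sidecar.read_text().strip()
```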

17.6 Hive Partition Structure

/mnt/ng6_data/ob_features/
└── exchange=binance/
    ├── symbol=BTCUSDT/
    │   └── date=2026-03-22/
    │       ├── part-14-30-00.parquet
    │       ├── part-14-30-00.parquet.md5
    │       ├── part-14-35-00.parquet
    │       └── part-14-35-00.parquet.md5
    ├── symbol=ETHUSDT/
    │   └── date=2026-03-22/
    │       └── ...
    └── symbol=SOLUSDT/
        └── ...

Reading with PyArrow Dataset API:

import pyarrow.dataset as ds
dataset = ds.dataset("/mnt/ng6_data/ob_features", partitioning="hive")
table = dataset.to_table(filter=ds.field("symbol") == "BTCUSDT")

Note: When reading a Parquet file directly with pq.read_table(), PyArrow infers and adds the Hive partition columns (exchange, symbol, date) to the schema automatically. These columns are NOT in OB_SCHEMA — they are partition metadata. Schema validation must use a subset check: expected_names ⊆ written_names.


18. Hazelcast Integration

Map: DOLPHIN_FEATURES

18.1 Keys Published by OBF Flow

| Key | Content | Consumers |
|---|---|---|
| asset_BTCUSDT_ob | Raw OB snapshot (bid/ask notional arrays, best bid/ask, spread) | HZOBProvider, alpha engine |
| asset_ETHUSDT_ob | Same for ETH | Same |
| asset_SOLUSDT_ob | Same for SOL | Same |
| obf_latest | Consolidated all-asset features (all 4 sub-systems flattened) | Alpha engine, scanner, monitoring |

18.2 obf_latest Payload Structure

{
  "timestamp": "2026-03-22T14:30:00.000000+00:00",
  "local_ts": 1742654200.123,
  "compute_ts": 1742654200.124,
  "assets": ["BTCUSDT", "ETHUSDT", "SOLUSDT"],
  "_push_seq": 42180,
  "_pushed_at": "2026-03-22T14:30:00.131000+00:00",
  "_n_assets_live": 3,
  "_n_assets_total": 3,
  "_all_live": true,
  "market_median_imbalance": 0.142,
  "market_agreement_pct": 0.667,
  "market_depth_pressure": 0.093,
  "macro_depth_velocity": -0.023,
  "macro_cascade_count": 0,
  "macro_acceleration": 0.002,
  "macro_regime_signal": 0,
  "btcusdt_depth_1pct_usd": 183421034.50,
  "btcusdt_depth_quality": 1.12,
  "btcusdt_fill_probability": 0.895,
  "btcusdt_spread_proxy_bps": 0.82,
  "btcusdt_imbalance": 0.187,
  "btcusdt_imbalance_ma5": 0.143,
  "btcusdt_imbalance_persistence": 0.8,
  "btcusdt_depth_asymmetry": 1.43,
  "btcusdt_withdrawal_velocity": -0.012,
  ...  (same fields for ethusdt_, solusdt_)
}

18.3 asset_{ASSET}_ob Payload Structure

{
  "timestamp": 1742654200.123,
  "asset": "BTCUSDT",
  "bid_notional": [92340000.0, 45120000.0, 23400000.0, 14200000.0, 8100000.0],
  "ask_notional": [87210000.0, 41300000.0, 21900000.0, 13800000.0, 7700000.0],
  "bid_depth":    [0.924, 0.451, 0.234, 0.142, 0.081],
  "ask_depth":    [0.872, 0.413, 0.219, 0.138, 0.077],
  "best_bid":     99980.50,
  "best_ask":     99981.20,
  "spread_bps":   0.70,
  "_pushed_at":   "2026-03-22T14:30:00.131000+00:00",
  "_push_seq":    126540
}

19. Timing, Rate Limits, and Lag Detection

19.1 Binance Rate Limits

| Endpoint | Weight | Usage in OBF |
|---|---|---|
| wss://fstream.binance.com/stream | N/A (push) | Continuous, no rate limit |
| GET /fapi/v1/depth?limit=1000 | 20 per request | Init only + reconnect only |

The WebSocket @depth@100ms stream is a server-push stream. Binance pushes diffs every 100ms without any request from the client. There is no rate limit on receiving WS messages.

The REST /depth endpoint (weight 20) is only called:

  1. Once per asset on startup (3 calls total for BTC/ETH/SOL)
  2. Once per asset after any WebSocket reconnection

With a 2,400 weight-per-minute limit on Binance Futures, 3 calls = 60 weight — trivial. Even with frequent reconnections, this will never approach the rate limit.

There is no polling of REST endpoints in the hot loop.

19.2 Three-Timestamp Architecture

Every row in the Parquet output carries three timestamps:

| Field | Meaning | Typical delta from previous |
|---|---|---|
| exchange_ts | Binance's timestamp in the WS event (when the exchange processed the update) | ~100 ms before local_ts |
| local_ts | Wall clock when get_depth_buckets() completed (local Python extract time) | ~15 ms after exchange_ts |
| pushed_at | Wall clock after HZ push and persistence buffer write | ~10-50 ms after local_ts |

lag_s = pushed_at - local_ts measures the processing latency within one cycle: feature computation + HZ round-trip.

19.3 Drift Warning

Every LOG_STATUS_EVERY = 120 push cycles (60 seconds at 2 Hz), the flow logs:

OBF status | pushes=240 errors=0 lag_s=0.012 assets_live=3/3 files=6

If lag_s > poll_interval_s × 2 (i.e., > 1.0 s for default config):

WARNING: OBF LAG DRIFT DETECTED: lag_s=1.234 > 2×poll_interval=1.000

This indicates the system is falling behind — the push cycle is taking longer than the poll interval. Possible causes: HZ overload, slow disk I/O, CPU saturation.

19.4 WS-to-Local Lag

The exchange_ts vs local_ts difference reflects:

  • Network latency from Binance servers (typically 5-50 ms from a datacenter)
  • Python asyncio scheduling overhead (~1ms)
  • WS message parsing

For monitoring: if local_ts - exchange_ts > 200 ms consistently, investigate network connectivity or check whether the event loop is blocked.


20. Parquet Schema — Complete Field Reference

Source: prod/obf_persistence.py, OB_SCHEMA

Total fields: 34. Arrow types chosen for precision vs storage tradeoff:

  • float64 for timing and critical financial values (bid/ask prices, notional)
  • float32 for derived features (acceptable precision, halves storage)
  • int32 for integer signals
  • string for asset identifier (partitioned separately, stored once per file)
| Field | Arrow Type | Description |
|---|---|---|
| **Timing** | | |
| exchange_ts | float64 | Unix seconds, from Binance WS event |
| local_ts | float64 | Wall clock at OB extraction |
| pushed_at | float64 | Wall clock after HZ push |
| lag_s | float32 | pushed_at - local_ts |
| **Identity** | | |
| asset | string | e.g. "BTCUSDT" |
| **Top-of-Book** | | |
| best_bid | float64 | Highest bid price at extraction time |
| best_ask | float64 | Lowest ask price at extraction time |
| spread_bps | float32 | (best_ask - best_bid) / mid × 10,000 |
| **Depth Vectors** | | |
| bid_notional_0 | float64 | USD bids within 0-1% of mid |
| bid_notional_1 | float64 | USD bids within 1-2% of mid |
| bid_notional_2 | float64 | USD bids within 2-3% of mid |
| bid_notional_3 | float64 | USD bids within 3-4% of mid |
| bid_notional_4 | float64 | USD bids within 4-5% of mid |
| ask_notional_0 | float64 | USD asks within 0-1% of mid |
| ask_notional_1 | float64 | USD asks within 1-2% of mid |
| ask_notional_2 | float64 | USD asks within 2-3% of mid |
| ask_notional_3 | float64 | USD asks within 3-4% of mid |
| ask_notional_4 | float64 | USD asks within 4-5% of mid |
| **Sub-system 1: Placement** | | |
| depth_1pct_usd | float64 | bid_notional_0 + ask_notional_0 |
| depth_quality | float32 | depth_1pct_usd / median_ref |
| fill_probability | float32 | 1 - exp(-2 × depth_quality) |
| spread_proxy_bps | float32 | Derived spread indicator |
| **Sub-system 2: Signal** | | |
| imbalance | float32 | (bid - ask) / (bid + ask) across all 5 levels |
| imbalance_ma5 | float32 | 5-snapshot smoothed imbalance |
| imbalance_persistence | float32 | Fraction of last 10 with same sign |
| depth_asymmetry | float32 | (near bid+ask) / (mid bid+ask) |
| withdrawal_velocity | float32 | Rate of depth drain over 30 s |
| **Sub-system 3: Market** | | |
| median_imbalance | float32 | Cross-asset median imbalance |
| agreement_pct | float32 | Fraction of assets agreeing with median |
| depth_pressure | float32 | Mean cross-asset imbalance |
| **Sub-system 4: Macro** | | |
| depth_velocity | float32 | Mean per-asset withdrawal velocity |
| cascade_count | int32 | Assets with velocity < -0.10 |
| acceleration | float32 | 2nd derivative of aggregate depth |
| regime_signal | int32 | -1=CALM, 0=NEUTRAL, 1=STRESS |

21. Neutral Defaults and Graceful Degradation

All four sub-systems have defined neutral defaults, returned when data is unavailable:

NEUTRAL_PLACEMENT = OBPlacementFeatures(
    depth_1pct_usd=0.0,     # no depth data
    depth_quality=1.0,       # don't penalize — assume normal
    fill_probability=0.5,    # uncertain
    spread_proxy_bps=2.0,    # moderate spread assumption
)

NEUTRAL_SIGNAL = OBSignalFeatures(
    imbalance=0.0,           # no directional bias
    imbalance_ma5=0.0,
    imbalance_persistence=0.5,
    depth_asymmetry=1.0,
    withdrawal_velocity=0.0,
)

NEUTRAL_MARKET = OBMarketFeatures(
    median_imbalance=0.0,
    agreement_pct=0.5,       # 50% = uncertain, not 0%
    depth_pressure=0.0,
)

NEUTRAL_MACRO = OBMacroFeatures(
    depth_velocity=0.0,
    cascade_count=0,
    acceleration=0.0,
    regime_signal=0,          # NEUTRAL (not STRESS)
)

The choice of agreement_pct=0.5 for neutral (not 0.0 or 1.0) is deliberate: it represents maximum uncertainty, not confirmed disagreement or agreement.

The OBFeatureEngine returns these neutrals when:

  • _preloaded = False (preload not yet run)
  • Requested asset not in preloaded data
  • Requested snapshot index out of range

The LiveOBFeatureEngine returns per_asset[asset] = None when the book for that asset is not yet initialized.


22. Import Architecture and the Shim Loader

22.1 The Problem

nautilus_dolphin/nautilus_dolphin/nautilus/__init__.py contains:

import nautilus_trader  # ← raises RuntimeError if not installed

The modules ob_provider.py, ob_features.py, ob_placer.py use relative imports:

from .ob_provider import OBProvider, OBSnapshot

Relative imports require a proper Python package context (__package__ must be set). Simply adding the nautilus/ directory to sys.path breaks relative imports.

22.2 The Shim Solution

tests/_naut_ob_shim.py creates a fake package namespace _naut_ob using importlib.util:

import types, sys, importlib.util
from pathlib import Path

_naut_dir = Path(__file__).parent.parent / "nautilus_dolphin" / "nautilus_dolphin" / "nautilus"

# Create fake package
_p = types.ModuleType("_naut_ob")
_p.__path__ = [str(_naut_dir)]
_p.__package__ = "_naut_ob"
sys.modules["_naut_ob"] = _p

# Load each module as _naut_ob.{name}
def _load(name):
    fn = f"_naut_ob.{name}"
    sp = importlib.util.spec_from_file_location(fn, _naut_dir / f"{name}.py")
    m = importlib.util.module_from_spec(sp)
    m.__package__ = "_naut_ob"   # ← makes relative imports work
    sys.modules[fn] = m
    sp.loader.exec_module(m)
    return m

With m.__package__ = "_naut_ob", when ob_features.py executes from .ob_provider import OBSnapshot, Python resolves it as from _naut_ob.ob_provider import OBSnapshot — which exists in sys.modules because _load("ob_provider") was called first.

prod/obf_persistence.py contains the same shim logic inline in LiveOBFeatureEngine.update() as a fallback, allowing the production flow to work both with and without nautilus_trader installed.
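The technique is easy to demonstrate in isolation with a throwaway two-module package (names here are for the demo only, not the real shim):

```python
import importlib.util
import sys
import tempfile
import types
from pathlib import Path

# Two modules in a bare directory; ob_features.py uses a relative import.
pkg_dir = Path(tempfile.mkdtemp())
(pkg_dir / "ob_provider.py").write_text("VALUE = 42\n")
(pkg_dir / "ob_features.py").write_text("from .ob_provider import VALUE\n")

# Fake package namespace, exactly as the shim does.
pkg = types.ModuleType("_demo_pkg")
pkg.__path__ = [str(pkg_dir)]
sys.modules["_demo_pkg"] = pkg

def load(name):
    fqn = f"_demo_pkg.{name}"
    spec = importlib.util.spec_from_file_location(fqn, pkg_dir / f"{name}.py")
    mod = importlib.util.module_from_spec(spec)
    mod.__package__ = "_demo_pkg"       # makes the relative import resolve
    sys.modules[fqn] = mod
    spec.loader.exec_module(mod)
    return mod

load("ob_provider")                     # dependency must be loaded first
features = load("ob_features")          # relative import now succeeds
```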


23. Test Architecture

23.1 Unit Tests — tests/test_obf_unit.py (120 tests)

| Class | Tests | What it covers |
|---|---|---|
| TestImbalanceKernel | 7 | compute_imbalance_nb: zero case, full bias, partial, symmetric |
| TestDepthKernels | 9 | depth_1pct, depth_quality, fill_probability, spread_proxy, depth_asymmetry |
| TestRollingKernels | 10 | persistence, withdrawal_velocity, short history edge cases |
| TestMockOBProvider | 8 | bias mechanics, depth_scale, per-asset overrides, timestamps |
| TestCSVOBProviderKnownValues | 6 | Anchored against real 2025-01-15 BTC data |
| TestOBFeatureEngine | 11 | preload_date, all 4 sub-systems, neutral returns |
| TestLiveOBFeatureEngine | 10 | incremental update, calibration, market/macro, None snap handling |
| TestOBPlacer | 10 | all decision branches, OB confirmation, SKIP/TAKER/MAKER/PATIENT_MAKER |
| TestBacktestAdapterStochastic | 6 | fill_prob formula, size penalty, FILL_DISCOUNT_FACTOR invariant |
| TestSmartPlacerConstants | 4 | frozen dataclass, threshold ordering, timeout ordering |
| TestSmartPlacerDecisions | 8 | decide_placement, compute_limit_price, ob_confirms_signal |
| TestOBFFlowImport | 6 | AsyncOBThread, OBFPersistenceService, constants |
| TestHZOBProvider (new) | 7 | Deserialise HZ payload, missing key → None, malformed JSON, dtype, discovery |
| TestOBStreamServiceStaleDetection (new) | 4 | is_stale(): cold/recent/old/custom threshold |
| TestOBStreamServiceCrossedBook (new) | 4 | Crossed book → None, locked book → None, valid book → dict, uninit → None |
| TestOBStreamServiceBufferReplay (new) | 3 | Buffer drain: stale events discarded, zero-qty removal, level update |

Critical invariant test:

def test_fill_discount_is_0_80(self):
    self.assertAlmostEqual(DEFAULT_CONFIG.FILL_DISCOUNT_FACTOR, 0.80)

Known-value anchor (from 2025-01-15 CSV data):

def test_btc_depth_1pct_above_100m(self):
    # BTC 1% depth should be > $100M on a normal trading day
    snap = self.provider.get_snapshot_by_index("BTCUSDT", 10)
    d1pct = snap.bid_notional[0] + snap.ask_notional[0]
    self.assertGreater(d1pct, 100_000_000)

23.2 E2E Tests — tests/test_obf_e2e.py (56 tests)

| Class | Tests | What it covers |
|---|---|---|
| TestOBStreamServiceMockState | 8 | _apply_event, zero qty removal, best bid/ask, bucket shape |
| TestFullPipeline | 7 | End-to-end: MockOBProvider → OBFeatureEngine → OBPlacer |
| TestParquetPersistence | 10 | force_flush, schema, row counts, hive partition, MD5, lag_s, multi-asset |
| TestPersistenceServiceLifecycle | 6 | start/stop, thread daemon, concurrent updates, stats |
| TestReplayPipeline | 5 | CSVOBProvider → OBFeatureEngine with real 2025-01-15 data |
| TestDualPathValidation | 5 | Live + stochastic paths run simultaneously without interference |
| TestOBFPrefectIntegration | 14 | Flow constants, AsyncOBThread, warmup, HZ key format, lag detection |

Dual-path test (the most important E2E test):

def test_live_and_stochastic_coexist(self):
    # Live feature engine should not affect BacktestAdapter state
    live_engine = LiveOBFeatureEngine(["BTCUSDT"])
    backtest_adapter = BacktestAdapter(price_data, ...)

    # Run both simultaneously
    for _ in range(10):
        snap = {"BTCUSDT": mock_snap}
        live_result = live_engine.update(snap)
        backtest_adapter.set_time(t)
        fills = backtest_adapter.submit_order("BTCUSDT", "LONG", 100000, 0.1, "MAKER")

    # Neither should corrupt the other's state
    self.assertIsNotNone(live_result)
    self.assertIsNotNone(fills)

23.3 Running Tests

# Unit tests only (fast, ~23s — numba warmup on first run)
python -m unittest tests.test_obf_unit -v

# E2E tests only (slower, ~183s — includes CSV loads and Parquet writes)
python -m unittest tests.test_obf_e2e -v

# Full suite (ExtF + EsoF + OBF)
python tests/run_all_tests.py

24. Invariants That Must Never Change

These are contractual constants embedded in tests and regression suites:

| Invariant | Value | Reason |
|---|---|---|
| FILL_DISCOUNT_FACTOR | 0.80 | Calibrated against historical fill rate data; changing it invalidates all backtest PnL history |
| CONFIDENCE_TAKER_THRESHOLD | 0.85 | Hardcoded in OBPlacer, SmartPlacer, constants — must be identical in all three |
| CONFIDENCE_MAKER_THRESHOLD | 0.40 | Same as above |
| CASCADE_THRESHOLD | -0.10 | Regime detection boundary; changing it changes which macro regimes are declared STRESS |
| MEDIAN_CALIB_N | 60 | Warmup period for live depth reference; first 60 live cycles produce depth_quality=1.0 |
| OBSnapshot array index convention | [0] = near, [4] = far | Used in every kernel and CSV parser; reversing would silently corrupt all features |
| CSV file date filter | cutoff <= file_date < reference_date | Anti-leakage; including reference_date itself would allow future data into the median computation |

25. Expected Performance Characteristics

25.1 Computational Load

| Component | Time | Notes |
|---|---|---|
| get_depth_buckets_sync() per asset | ~2 ms | asyncio round-trip + sort + bin |
| LiveOBFeatureEngine.update() (3 assets) | ~1 ms | Numba kernels are JIT-compiled; first call ~2s warmup |
| hz_push_obf_task() (submit, not result) | ~0.5 ms | Non-blocking submit |
| _write_local_cache() | ~1 ms | Small JSON file |
| persist.update_snapshot() × 3 | <0.01 ms | Deque append, lock-protected |
| Total hot loop | ~5 ms | Leaving ~495 ms of sleep per 0.5 s cycle |
| OBFPersistenceService._flush() (300 rows × 3 assets) | ~50 ms | Parquet write, happens in background thread |
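The ~495 ms of slack follows directly from the budget above. A typical pacing pattern (a sketch, not the flow's actual code) keeps the cycle period fixed regardless of how long the work takes:

```python
import time

POLL_INTERVAL_S = 0.5  # from the deployment parameters

def pace_cycle(work, poll_interval_s=POLL_INTERVAL_S):
    """Run one hot-loop cycle, then sleep off the remaining budget."""
    t0 = time.monotonic()
    work()
    elapsed = time.monotonic() - t0
    # ~0.495 s of sleep when the work takes ~5 ms; never negative
    sleep_s = max(0.0, poll_interval_s - elapsed)
    time.sleep(sleep_s)
    return elapsed, sleep_s

# Simulate a ~5 ms hot-loop body
elapsed, sleep_s = pace_cycle(lambda: time.sleep(0.005))
```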

25.2 Memory Usage

| Component | Memory |
|---|---|
| In-memory L2 book (3 assets, up to 1000 levels each) | ~6 MB |
| Rolling deques (LiveOBFeatureEngine, 3 assets, 60 samples) | ~50 KB |
| Rolling deques (OBFPersistenceService, 3 assets, 360 samples × 34 fields) | ~1.5 MB |
| OBF Parquet file (per 5-min flush, 3 assets, ~360 rows) | ~150 KB (snappy compressed) |

25.3 Parquet Storage Growth

At 2 Hz, 3 assets, 5-minute flush intervals:

```
Rows per file per asset: 360
Rows per day per asset:  2 × 60 × 60 × 24 = 172,800
Files per day per asset: 288 (one per 5 minutes)
Compressed file size:    ~150 KB per 5-min file
Storage per asset/day:   ~43 MB (uncompressed ~200 MB)
Storage all 3 assets/day: ~130 MB compressed
Storage per year:         ~47 GB compressed
```
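The growth figures can be re-derived with simple arithmetic (file size taken from the table above; decimal units):

```python
POLL_HZ = 2                 # snapshot rate
ASSETS = 3
FLUSH_INTERVAL_MIN = 5
FILE_KB = 150               # snappy-compressed size per 5-min file (approximate)

rows_per_day_per_asset = POLL_HZ * 60 * 60 * 24           # 172,800
files_per_day_per_asset = 24 * 60 // FLUSH_INTERVAL_MIN   # 288
mb_per_asset_day = files_per_day_per_asset * FILE_KB / 1000   # ~43 MB
mb_all_assets_day = mb_per_asset_day * ASSETS                 # ~130 MB
gb_per_year = mb_all_assets_day * 365 / 1000                  # ~47 GB
```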

25.4 Known-Value Anchors (from 2025-01-15 BTC data)

| Metric | Expected value |
|---|---|
| depth_1pct_usd for BTC | > $100M |
| imbalance | range [-1.0, +1.0], typical |
| fill_probability at normal depth | ~0.86–0.95 |
| agreement_pct in trending market | > 0.67 (2/3 assets agree) |
| WS event latency (local_ts − exchange_ts) | 5–50 ms from datacenter |
| lag_s (hot loop processing latency) | < 20 ms typically |

26. Future: Nautilus Trader Integration

The user has explicitly stated: "at some point we will bring Nautilus Trader in to handle the 'actual trade execution' for the system."

The OBF subsystem is designed to accommodate this:

26.1 What Changes with Nautilus Trader

Nautilus Trader is a high-performance trading framework with:

  • Its own order book management (nautilus_trader.model.orderbook)
  • Direct exchange connectivity (Binance Futures connector exists)
  • Native order submission and fill management

When integrated:

  • OBStreamService and AsyncOBThread will be replaced by Nautilus's native BinanceFuturesDataClient
  • BacktestAdapter will be replaced by Nautilus's backtesting engine
  • LiveAdapter (currently a stub) will be replaced by Nautilus's BinanceFuturesExecutionClient

26.2 What Stays the Same

  • OBFeatureEngine kernels (numba functions are framework-agnostic)
  • OBSnapshot dataclass (Nautilus provides raw data; we still need to convert to our format)
  • LiveOBFeatureEngine (logic unchanged, just fed from Nautilus's data stream)
  • OBFPersistenceService (Parquet persistence is independent)
  • OBPlacer and SmartPlacer (decision logic unchanged)
  • FILL_DISCOUNT_FACTOR = 0.80 (preserved for regression continuity)
  • All Parquet data accumulated before integration (directly usable for Nautilus backtesting)

26.3 Transition Path

The cleanest transition:

  1. Add a NautilusOBProvider(OBProvider) that reads from Nautilus's order book
  2. Add a NautilusAdapter(ExchangeAdapter) that wraps Nautilus order submission
  3. Run both OBStreamService and Nautilus in parallel for a validation period
  4. Cross-validate: LiveOBFeatureEngine.update() outputs should match between sources
  5. Cut over once cross-validation passes (use HZOBProvider to bridge the data)
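Step 4 of the transition could be sketched as a tolerance comparison over the two engines' output feature dicts. The field names below are placeholders, not the real OB_SCHEMA fields:

```python
def features_match(a: dict, b: dict, rel_tol: float = 1e-6) -> bool:
    """True if both sources produced the same fields with near-identical values."""
    if a.keys() != b.keys():
        return False
    # Relative tolerance with an absolute floor of 1.0 for near-zero values
    return all(abs(a[k] - b[k]) <= rel_tol * max(1.0, abs(a[k])) for k in a)

# Illustrative outputs from the existing engine and the Nautilus-fed engine
ours = {"imbalance": 0.1234, "depth_1pct_usd": 1.2e8}
naut = {"imbalance": 0.1234, "depth_1pct_usd": 1.2e8}
ok = features_match(ours, naut)
```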

The existing Parquet archive (/mnt/ng6_data/ob_features/) becomes usable as Nautilus's historical data source after a format-adaptation step.


27. Deployment

27.1 Direct Launch (Testing / Development)

```bash
cd /mnt/dolphinng5_predict
PREFECT_API_URL=http://localhost:4200/api \
nohup python3 prod/obf_prefect_flow.py > /tmp/obf_prefect.log 2>&1 &
```

27.2 Prefect Deployment

```bash
# Register the deployment
prefect deploy -n obf-daemon --pool dolphin

# Or via YAML
prefect deploy --all  # picks up prod/obf_deployment.yaml
```

prod/obf_deployment.yaml:

```yaml
name: obf-daemon
entrypoint: prod/obf_prefect_flow.py:obf_prefect_flow
version: 1.0.0
tags: ["obf", "order-book", "production"]
work_pool:
  name: dolphin
  work_queue_name: default
parameters:
  warmup_s: 8
  poll_interval_s: 0.5
  assets: [BTCUSDT, ETHUSDT, SOLUSDT]
infrastructure:
  type: process
```

27.3 Adding an Asset

  1. Add the asset to assets in obf_deployment.yaml
  2. Add a CSV file ob_data/{ASSET}-bookDepth-{DATE}.csv for historical replay
  3. The Binance WS stream supports multiple assets in a single connection (comma-separated streams URL)
  4. No code changes required

27.4 Monitoring

Prefect UI → obf-prefect-flow → most recent run. Every 60 seconds the flow logs:

```
OBF status | pushes=240 errors=0 lag_s=0.012 assets_live=3/3 files=6
```

Critical alerts:

  • errors > 0 and increasing: Hazelcast connectivity issue
  • assets_live < 3: One or more books failed to initialize or lost WS connection
  • lag_s > 1.0s: System is overloaded or HZ is slow
  • LAG DRIFT DETECTED: Investigate immediately
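The status line is easy to consume from external alerting. A minimal parsing sketch against the exact format logged above (the alert labels are illustrative):

```python
import re

line = "OBF status | pushes=240 errors=0 lag_s=0.012 assets_live=3/3 files=6"
fields = dict(re.findall(r"(\w+)=(\S+)", line))

errors = int(fields["errors"])
lag_s = float(fields["lag_s"])
live, total = map(int, fields["assets_live"].split("/"))

alerts = []
if errors > 0:
    alerts.append("HZ connectivity issue")   # errors > 0 and increasing
if live < total:
    alerts.append("book down")               # assets_live < 3
if lag_s > 1.0:
    alerts.append("overload or slow HZ")     # lag_s > 1.0 s
```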

28. Known Caveats and Edge Cases

28.1 Pre-calibration Period (First 30 Seconds Live)

For the first 60 samples (30 seconds at 0.5 s polling), depth_quality = 1.0 for all assets because _median_ref is not yet calibrated. During this period:

  • fill_probability will be 1 - e^{-2} ≈ 0.865 regardless of actual depth
  • This is acceptable — it represents maximum uncertainty, which defaults to the "normal" case
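Assuming fill probability follows the exponential form 1 − e^(−2 · depth_quality), which is an inference from the cited warmup value rather than a confirmed formula, the pre-calibration behavior is:

```python
import math

def fill_probability(depth_quality: float) -> float:
    # Assumed functional form, consistent with the warmup anchor 1 - e^-2 ≈ 0.865
    return 1.0 - math.exp(-2.0 * depth_quality)

# During the first 60 samples, depth_quality is pinned to 1.0
warmup_p = fill_probability(1.0)
```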

28.2 Reconnection Gaps

After a WebSocket reconnection, affected assets return None from get_depth_buckets_sync() until the REST snapshot completes (~200–500 ms). The flow skips those assets for those cycles, and the _n_assets_live counter in obf_latest reflects the reduced count.
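A sketch of how the hot loop tolerates per-asset None returns during a reconnection gap (assumed shape; in production the fetch is get_depth_buckets_sync()):

```python
def collect_live_snapshots(assets, fetch):
    """fetch(asset) returns a snapshot, or None while the book is rebuilding."""
    snaps = {a: fetch(a) for a in assets}
    live = {a: s for a, s in snaps.items() if s is not None}
    return live, len(live)  # len(live) is what _n_assets_live reports

# ETHUSDT is mid-reconnection and returns None this cycle
fetch = {"BTCUSDT": {"ok": 1}, "ETHUSDT": None, "SOLUSDT": {"ok": 1}}.get
live, n_assets_live = collect_live_snapshots(
    ["BTCUSDT", "ETHUSDT", "SOLUSDT"], fetch
)
```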

28.3 OBFeatureEngine Macro State (Batch Mode) — FIXED

Previously, _preloaded_macro held a single OBMacroFeatures value for the entire preloaded day, so every backtest bar saw the end-of-day macro state. Fixed in sprint 1 (P1-3).

_preloaded_macro_map: Dict[int, OBMacroFeatures] now stores a separate value for each snapshot index, and get_macro(timestamp_or_idx) resolves the correct bar and returns the right macro state. Backtests at any bar index now receive the macro regime as it was at that exact time, not the end-of-day value.

28.4 Bar Index Alignment (6:1 ratio) — CONFIGURABLE

_resolve_idx(asset, bar_idx) uses bar_idx // self.bar_to_snap_ratio to convert VBT 5-second bar indices to OB 30-second snapshot indices. Defaults assume:

  • VBT uses exactly 5 second bars
  • OB snapshots are exactly 30 seconds apart
  • Timestamps start at the same UTC epoch

bar_to_snap_ratio is now a constructor parameter (P2-7 fix). Default remains 6. Pass a different value if bar granularity changes.
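The index conversion itself is plain integer division; with the default ratio of 6, bars 0 through 5 all map to snapshot 0 (a minimal sketch of the _resolve_idx arithmetic):

```python
BAR_TO_SNAP_RATIO = 6  # 5 s VBT bars -> 30 s OB snapshots (default)

def resolve_idx(bar_idx: int, ratio: int = BAR_TO_SNAP_RATIO) -> int:
    """Convert a VBT bar index to the corresponding OB snapshot index."""
    return bar_idx // ratio

# First six bars share snapshot 0; bar 6 starts snapshot 1
mapped = [resolve_idx(b) for b in range(8)]
```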

28.5 compute_market_agreement_nb with Uniform Zero Imbalance

If all(imbalance == 0.0) (e.g., all assets are perfectly balanced), median == 0.0 and the kernel returns (0.0, 0.5) — neutral agreement. This is the spec-defined behavior. Tests verify this via MockOBProvider(imbalance_bias=0.0).
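A pure-Python analogue of the zero-median guard. Only the neutral branch reproduces the documented kernel behavior; the agreement computation outside the guard is illustrative, not the numba kernel's exact logic:

```python
import statistics

def market_agreement(imbalances):
    med = statistics.median(imbalances)
    if med == 0.0:
        return 0.0, 0.5  # spec-defined neutral agreement
    # Illustrative: fraction of assets whose imbalance sign matches the median
    agree = sum(1 for x in imbalances if (x > 0) == (med > 0)) / len(imbalances)
    return med, agree

neutral = market_agreement([0.0, 0.0, 0.0])  # all books perfectly balanced
```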

28.6 Hive Partition Columns in PyArrow Schema

When reading a Hive-partitioned Parquet directory with pyarrow.dataset or pq.read_table(), PyArrow adds exchange, symbol, and date columns to the schema. These are NOT in OB_SCHEMA. Code that compares schema names must use subset checking, not equality.
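The subset check, expressed with plain name sets (field names abbreviated; the real OB_SCHEMA has 34 fields):

```python
# Illustrative subset of OB_SCHEMA field names
ob_schema_names = {"ts", "imbalance", "depth_1pct_usd"}

# What PyArrow reports after reading a Hive-partitioned dataset:
# the schema gains the partition columns
read_names = ob_schema_names | {"exchange", "symbol", "date"}

schema_ok = ob_schema_names <= read_names   # correct: subset check
strict_eq = ob_schema_names == read_names   # wrong: fails on partition columns
```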

28.7 CSV File Date Boundary

CSVOBProvider loads files where cutoff_start <= file_date < reference_date. To use data from date 2025-01-15, call preload_date("2025-01-16", ...). This is intentional (anti-leakage) but non-obvious. Tests document this with preload_date("2025-01-16", ["BTCUSDT"]) to access 2025-01-15 data.
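The boundary in code form. ISO date strings compare correctly lexicographically, so the half-open window is a single comparison chain:

```python
def files_in_window(file_dates, cutoff_start, reference_date):
    """Half-open window: includes cutoff_start, excludes reference_date."""
    return [d for d in file_dates if cutoff_start <= d < reference_date]

dates = ["2025-01-13", "2025-01-14", "2025-01-15", "2025-01-16"]
# To use 2025-01-15 data, the caller must pass reference_date="2025-01-16";
# the reference date itself is never included (anti-leakage).
window = files_in_window(dates, "2025-01-14", "2025-01-16")
```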



29. Reliability Mechanisms (Sprint 1 Hardening)

All items below were identified in the P0/P1/P2 audit and implemented in sprint 1 (2026-03-22).

| Mechanism | Trigger | Action | Where |
|---|---|---|---|
| HZ Circuit Breaker | 5 consecutive consolidated push failures | Skip all HZ for 30 cycles (~15s), then probe again | obf_prefect_flow.py |
| HZ Preflight | Startup, before hot loop | Write test key, retry 5×@3s; abort flow if all fail | obf_prefect_flow.py |
| WS Stall Watchdog | last_event_ts not updated for >30s | log.error every status cycle; is_stale() exposed for callers | ob_stream_service.py |
| Crossed Book Guard | best_bid >= best_ask | Return None from get_depth_buckets(); no features computed | ob_stream_service.py |
| Dark Streak Counter | Asset returns None for N cycles | Warn at 5 (2.5s); log restore; critical log at 120 all-dark (60s) | obf_prefect_flow.py |
| Exponential Backoff | WS reconnection failures | 3→6→12→…→120s, ±1s jitter; resets after 60s stable | ob_stream_service.py |
| Fire-and-forget pushes | Per-asset HZ pushes | submit() without .result(); only consolidated push blocks | obf_prefect_flow.py |
| Per-key error tracking | HZ push failures | push_errors: Dict[str, int]; top-3 logged | obf_prefect_flow.py |
| First flush at 60s | Startup | First Parquet file written 60s after start (not 5min) | obf_persistence.py |
| Buffer covers flush window | Continuous | HISTORY_MAXLEN=700 ≥ FLUSH_INTERVAL/POLL + margin | obf_persistence.py |
| Microsecond filename | Double flush within 1s | %H-%M-%S-%f prevents collision | obf_persistence.py |
| Partition cleanup | Each flush | Prune date= dirs older than MAX_FILE_AGE_DAYS=7 | obf_persistence.py |
| Shared aiohttp session | REST fetches | Single session for stream() lifetime; no per-call SSL handshake | ob_stream_service.py |
| Per-timestamp macro | Backtesting | _preloaded_macro_map[snap_idx], not a single end-of-day value | ob_features.py |
| Dynamic HZ asset discovery | HZOBProvider.get_assets() | Key-set scan for asset_*_ob; no hardcoded list | hz_ob_provider.py |
| AsyncOBThread.stop() | Flow shutdown/test teardown | Signal _stop_future; thread is joinable | obf_prefect_flow.py |
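The backoff schedule from the table, as a sketch. The jitter is parameterized so the deterministic case is checkable; the production reconnect loop in ob_stream_service.py may differ in detail:

```python
import random

def backoff_delays(n, base=3.0, cap=120.0, jitter=1.0, rng=random.random):
    """Yield n reconnect delays: base doubling up to cap, +/- jitter seconds."""
    delay = base
    for _ in range(n):
        yield min(delay, cap) + (2.0 * rng() - 1.0) * jitter
        delay *= 2.0

# With jitter disabled: 3, 6, 12, 24, 48, 96, then capped at 120
deterministic = list(backoff_delays(7, jitter=0.0))
```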

This document was generated from exhaustive study of all OBF-related source code and updated after sprint 1 hardening. Sources: external_factors/ob_stream_service.py, nautilus_dolphin/nautilus_dolphin/nautilus/ob_provider.py, nautilus_dolphin/nautilus_dolphin/nautilus/ob_features.py, nautilus_dolphin/nautilus_dolphin/nautilus/ob_placer.py, nautilus_dolphin/nautilus_dolphin/nautilus/hz_ob_provider.py, alpha_engine/execution/smart_placer.py, alpha_engine/execution/smartplacer_constants.py, alpha_engine/execution/adapters.py, alpha_engine/execution/fill_simulator.py, prod/obf_persistence.py, prod/obf_prefect_flow.py, prod/obf_deployment.yaml, tests/test_obf_unit.py, tests/test_obf_e2e.py, tests/_naut_ob_shim.py, scripts/verify_parquet_archive.py, ob_cache/SCHEMA.md.