229 lines
9.2 KiB
Python
229 lines
9.2 KiB
Python
|
|
import asyncio
|
||
|
|
import aiohttp
|
||
|
|
import json
|
||
|
|
import time
|
||
|
|
import logging
|
||
|
|
import numpy as np
|
||
|
|
from typing import Dict, List, Optional
|
||
|
|
from collections import defaultdict
|
||
|
|
|
||
|
|
# Setup basic logging
|
||
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s')
|
||
|
|
logger = logging.getLogger("OBStreamService")
|
||
|
|
|
||
|
|
try:
|
||
|
|
import websockets
|
||
|
|
except ImportError:
|
||
|
|
logger.warning("websockets package not found. Run pip install websockets aiohttp")
|
||
|
|
|
||
|
|
class OBStreamService:
|
||
|
|
"""
|
||
|
|
Real-Time Order Book Streamer for Binance Futures.
|
||
|
|
Connects via WebSockets to maintain a perfectly synchronized local L2 Book,
|
||
|
|
and slices the book into 5% notional depth buckets dynamically for the
|
||
|
|
SmartPlacer and OBFeatureEngine layers.
|
||
|
|
"""
|
||
|
|
|
||
|
|
def __init__(self, assets: List[str], max_depth_pct: int = 5):
|
||
|
|
self.assets = [a.upper() for a in assets]
|
||
|
|
self.streams = [f"{a.lower()}@depth@100ms" for a in self.assets]
|
||
|
|
self.max_depth_pct = max_depth_pct
|
||
|
|
|
||
|
|
# In-memory Order Book caches (Price -> Quantity)
|
||
|
|
self.bids: Dict[str, Dict[float, float]] = {a: {} for a in self.assets}
|
||
|
|
self.asks: Dict[str, Dict[float, float]] = {a: {} for a in self.assets}
|
||
|
|
|
||
|
|
# Synchronization mechanisms
|
||
|
|
self.last_update_id: Dict[str, int] = {a: 0 for a in self.assets}
|
||
|
|
self.buffer: Dict[str, List[dict]] = {a: [] for a in self.assets}
|
||
|
|
self.initialized: Dict[str, bool] = {a: False for a in self.assets}
|
||
|
|
|
||
|
|
# Optional: Lock for thread-safe reads if requested asynchronously
|
||
|
|
self.locks: Dict[str, asyncio.Lock] = {a: asyncio.Lock() for a in self.assets}
|
||
|
|
|
||
|
|
async def fetch_snapshot(self, asset: str):
|
||
|
|
"""Fetch REST snapshot of the Order Book to initialize local state."""
|
||
|
|
url = f"https://fapi.binance.com/fapi/v1/depth?symbol={asset}&limit=1000"
|
||
|
|
try:
|
||
|
|
async with aiohttp.ClientSession() as session:
|
||
|
|
async with session.get(url) as resp:
|
||
|
|
data = await resp.json()
|
||
|
|
|
||
|
|
if 'lastUpdateId' not in data:
|
||
|
|
logger.error(f"Failed to fetch snapshot for {asset}: {data}")
|
||
|
|
return
|
||
|
|
|
||
|
|
last_id = data['lastUpdateId']
|
||
|
|
|
||
|
|
async with self.locks[asset]:
|
||
|
|
self.bids[asset] = {float(p): float(q) for p, q in data['bids']}
|
||
|
|
self.asks[asset] = {float(p): float(q) for p, q in data['asks']}
|
||
|
|
self.last_update_id[asset] = last_id
|
||
|
|
|
||
|
|
# Apply any buffered updates
|
||
|
|
buffered = self.buffer[asset]
|
||
|
|
for event in buffered:
|
||
|
|
if event['u'] <= last_id:
|
||
|
|
continue # Ignore old events
|
||
|
|
self._apply_event(asset, event)
|
||
|
|
|
||
|
|
self.buffer[asset].clear()
|
||
|
|
self.initialized[asset] = True
|
||
|
|
|
||
|
|
logger.info(f"Synchronized L2 book for {asset} (UpdateId: {last_id})")
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"Error initializing snapshot for {asset}: {e}")
|
||
|
|
|
||
|
|
def _apply_event(self, asset: str, event: dict):
|
||
|
|
"""Apply a streaming diff event to the local book."""
|
||
|
|
bids = self.bids[asset]
|
||
|
|
asks = self.asks[asset]
|
||
|
|
|
||
|
|
# Process Bids
|
||
|
|
for p_str, q_str in event['b']:
|
||
|
|
p, q = float(p_str), float(q_str)
|
||
|
|
if q == 0.0:
|
||
|
|
bids.pop(p, None)
|
||
|
|
else:
|
||
|
|
bids[p] = q
|
||
|
|
|
||
|
|
# Process Asks
|
||
|
|
for p_str, q_str in event['a']:
|
||
|
|
p, q = float(p_str), float(q_str)
|
||
|
|
if q == 0.0:
|
||
|
|
asks.pop(p, None)
|
||
|
|
else:
|
||
|
|
asks[p] = q
|
||
|
|
|
||
|
|
self.last_update_id[asset] = event['u']
|
||
|
|
|
||
|
|
async def stream(self):
|
||
|
|
"""Main loop: connect to WebSocket streams and maintain books."""
|
||
|
|
import websockets
|
||
|
|
|
||
|
|
# 1. Fire off REST snapshot initialization concurrently
|
||
|
|
for a in self.assets:
|
||
|
|
asyncio.create_task(self.fetch_snapshot(a))
|
||
|
|
|
||
|
|
# 2. Start WebSocket listening instantly to buffer diffs
|
||
|
|
stream_url = "wss://fstream.binance.com/stream?streams=" + "/".join(self.streams)
|
||
|
|
logger.info(f"Connecting to Binance Stream: {stream_url}")
|
||
|
|
|
||
|
|
while True:
|
||
|
|
try:
|
||
|
|
async with websockets.connect(stream_url, ping_interval=20, ping_timeout=20) as ws:
|
||
|
|
logger.info("WebSocket connected. Streaming depth diffs...")
|
||
|
|
while True:
|
||
|
|
msg = await ws.recv()
|
||
|
|
data = json.loads(msg)
|
||
|
|
|
||
|
|
if 'data' in data:
|
||
|
|
ev = data['data']
|
||
|
|
asset = ev['s'].upper()
|
||
|
|
|
||
|
|
async with self.locks[asset]:
|
||
|
|
if not self.initialized[asset]:
|
||
|
|
self.buffer[asset].append(ev)
|
||
|
|
else:
|
||
|
|
self._apply_event(asset, ev)
|
||
|
|
|
||
|
|
except websockets.exceptions.ConnectionClosed as e:
|
||
|
|
logger.warning(f"WebSocket closed ({e}). Reconnecting in 3s...")
|
||
|
|
# Require re-init on disconnect to prevent drifted states
|
||
|
|
for a in self.assets:
|
||
|
|
self.initialized[a] = False
|
||
|
|
asyncio.create_task(self.fetch_snapshot(a))
|
||
|
|
await asyncio.sleep(3)
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"Stream error: {e}")
|
||
|
|
await asyncio.sleep(3)
|
||
|
|
|
||
|
|
async def get_depth_buckets(self, asset: str) -> Optional[dict]:
|
||
|
|
"""
|
||
|
|
Extract the Notional Depth vectors matching OBSnapshot.
|
||
|
|
Creates 5 elements summing USD depth between 0-1%, 1-2%, ..., 4-5% from mid.
|
||
|
|
"""
|
||
|
|
async with self.locks[asset]:
|
||
|
|
if not self.initialized[asset]:
|
||
|
|
return None
|
||
|
|
|
||
|
|
# Extract and sort bids (descending) & asks (ascending)
|
||
|
|
bids = sorted(self.bids[asset].items(), key=lambda x: -x[0])
|
||
|
|
asks = sorted(self.asks[asset].items(), key=lambda x: x[0])
|
||
|
|
|
||
|
|
if not bids or not asks:
|
||
|
|
return None
|
||
|
|
|
||
|
|
best_bid = bids[0][0]
|
||
|
|
best_ask = asks[0][0]
|
||
|
|
mid = (best_bid + best_ask) / 2.0
|
||
|
|
|
||
|
|
bid_not = np.zeros(self.max_depth_pct, dtype=np.float64)
|
||
|
|
ask_not = np.zeros(self.max_depth_pct, dtype=np.float64)
|
||
|
|
bid_dep = np.zeros(self.max_depth_pct, dtype=np.float64)
|
||
|
|
ask_dep = np.zeros(self.max_depth_pct, dtype=np.float64)
|
||
|
|
|
||
|
|
# Bin bids into percentages
|
||
|
|
for p, q in bids:
|
||
|
|
dist_pct = (mid - p) / mid * 100
|
||
|
|
idx = int(dist_pct)
|
||
|
|
if idx < self.max_depth_pct:
|
||
|
|
bid_not[idx] += p * q
|
||
|
|
bid_dep[idx] += q
|
||
|
|
else: # Since sorted, if we exceed max distance, we can safely break
|
||
|
|
break
|
||
|
|
|
||
|
|
# Bin asks into percentages
|
||
|
|
for p, q in asks:
|
||
|
|
dist_pct = (p - mid) / mid * 100
|
||
|
|
idx = int(dist_pct)
|
||
|
|
if idx < self.max_depth_pct:
|
||
|
|
ask_not[idx] += p * q
|
||
|
|
ask_dep[idx] += q
|
||
|
|
else:
|
||
|
|
break
|
||
|
|
|
||
|
|
return {
|
||
|
|
"timestamp": time.time(),
|
||
|
|
"asset": asset,
|
||
|
|
"bid_notional": bid_not,
|
||
|
|
"ask_notional": ask_not,
|
||
|
|
"bid_depth": bid_dep,
|
||
|
|
"ask_depth": ask_dep,
|
||
|
|
"best_bid": best_bid,
|
||
|
|
"best_ask": best_ask,
|
||
|
|
"spread_bps": (best_ask - best_bid) / mid * 10_000
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
# -----------------------------------------------------------------------------
|
||
|
|
# Standalone run/test hook
|
||
|
|
# -----------------------------------------------------------------------------
|
||
|
|
async def demo():
|
||
|
|
assets_to_track = ["BTCUSDT", "ETHUSDT", "SOLUSDT"]
|
||
|
|
service = OBStreamService(assets=assets_to_track)
|
||
|
|
|
||
|
|
# Run the streaming listener in the background
|
||
|
|
asyncio.create_task(service.stream())
|
||
|
|
|
||
|
|
await asyncio.sleep(4) # Let it initialize
|
||
|
|
|
||
|
|
for _ in range(3):
|
||
|
|
print("\n--- Current Real-Time OB Snapshots ---")
|
||
|
|
for asset in assets_to_track:
|
||
|
|
snap = await service.get_depth_buckets(asset)
|
||
|
|
if snap:
|
||
|
|
imb = (snap['bid_notional'][0] - snap['ask_notional'][0]) / (snap['bid_notional'][0] + snap['ask_notional'][0] + 1e-9)
|
||
|
|
b1 = snap['bid_notional'][0]
|
||
|
|
a1 = snap['ask_notional'][0]
|
||
|
|
print(f"{asset:10s} | Spread: {snap['spread_bps']:.2f} bps | 1% Bid: ${b1:,.0f} | 1% Ask: ${a1:,.0f} | 1% Imb: {imb:+.3f}")
|
||
|
|
else:
|
||
|
|
print(f"{asset:10s} | Waiting for init...")
|
||
|
|
await asyncio.sleep(2)
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
try:
|
||
|
|
asyncio.run(demo())
|
||
|
|
except KeyboardInterrupt:
|
||
|
|
print("OB Streamer shut down manually.")
|