Files
DOLPHIN/external_factors/realtime_exf_service.py
hjnormey 01c19662cb initial: import DOLPHIN baseline 2026-04-21 from dolphinng5_predict working tree
Includes core prod + GREEN/BLUE subsystems:
- prod/ (BLUE harness, configs, scripts, docs)
- nautilus_dolphin/ (GREEN Nautilus-native impl + dvae/ preserved)
- adaptive_exit/ (AEM engine + models/bucket_assignments.pkl)
- Observability/ (EsoF advisor, TUI, dashboards)
- external_factors/ (EsoF producer)
- mc_forewarning_qlabs_fork/ (MC regime/envelope)

Excludes runtime caches, logs, backups, and reproducible artifacts per .gitignore.
2026-04-21 16:58:38 +02:00

1045 lines
41 KiB
Python
Executable File

#!/usr/bin/env python3
"""
REAL-TIME EXTERNAL FACTORS SERVICE v1.0
========================================
Production-grade, HFT-optimized external factors service.
Key design decisions (empirically validated 2026-02-27, 54-day backtest):
- Per-indicator adaptive polling at native API resolution
- Uniform lag=1 day (ROBUST: +3.10% ROI, -2.02% DD, zero overfit risk)
- Binary gating (no confidence weighting - empirically validated)
- Never blocks consumer: get_indicators() returns cached data in <1ms
- Dual output: NPZ (legacy) + Arrow (new)
Empirical validation vs baseline (54-day backtest):
N: No ACB: ROI=+7.51%, DD=18.34%
A: Current (lag=0 daily avg): ROI=+9.33%, DD=12.04% <-- current production
L1: Uniform lag=1: ROI=+12.43%, DD=10.02% <-- THIS SERVICE DEFAULT
MO: Mixed optimal lags: ROI=+13.31%, DD=9.10% <-- experimental (needs 80+ days)
MS: Mixed + synth intra-day: ROI=+16.00%, DD=9.92% <-- future (needs VBT changes)
TODO (ordered by priority):
1. [CRITICAL] Re-validate lag=1 with 80+ days of data for statistical robustness
2. [HIGH] Fix the 50 dead indicators (see DEAD_INDICATORS below)
3. [HIGH] Test each repaired indicator isolated against ACB & alpha engine
4. [HIGH] Move from per-day ACB to intra-day continuous ACB once VBT supports it
5. [MED] Switch to per-indicator optimal lags once 80+ days available
6. [MED] Implement adaptive variance estimator for poll interval tuning
7. [MED] Add Arrow dual output (schema defined, writer implemented)
8. [LOW] FRED indicators: handle weekend/holiday gaps (fill-forward last value)
9. [LOW] CoinMetrics indicators: fix parse_cm returning 0 (API may need auth)
10.[LOW] Tune system sync to never generate signals with stale/missing data
"""
import asyncio
import aiohttp
import numpy as np
import time
import logging
import json
from pathlib import Path
from datetime import datetime, timezone
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Any
from collections import deque, defaultdict
from enum import Enum
import threading
logger = logging.getLogger(__name__)
# =====================================================================
# INDICATOR METADATA (from empirical analysis)
# =====================================================================
@dataclass
class IndicatorMeta:
    """Per-indicator configuration derived from empirical testing.

    NOTE: entries in INDICATORS are constructed positionally, so the field
    order below is part of the contract — do not reorder fields.
    """
    name: str               # Indicator key (matches its key in INDICATORS)
    source: str             # API provider
    url: str                # Real-time endpoint
    parser: str             # Parser method name
    poll_interval_s: float  # Native update rate (seconds)
    optimal_lag_days: int   # Information discount lag (empirically measured)
    lag_correlation: float  # Pearson r at optimal lag
    lag_pvalue: float       # Statistical significance
    acb_critical: bool      # Used by ACB v2/v3
    category: str           # derivatives/onchain/macro/etc
# Empirically measured optimal lags (from lag_correlation_analysis):
# dvol_btc: lag=1, r=-0.4919, p=0.0002 (strongest)
# taker: lag=1, r=-0.4105, p=0.0034
# dvol_eth: lag=1, r=-0.4246, p=0.0015
# funding_btc: lag=5, r=+0.3892, p=0.0057 (slow propagation)
# ls_btc: lag=0, r=+0.2970, p=0.0362 (immediate)
# funding_eth: lag=3, r=+0.2026, p=0.1539 (not significant)
# vix: lag=1, r=-0.2044, p=0.2700 (not significant)
# fng: lag=5, r=-0.1923, p=0.1856 (not significant)
# Registry of all live indicators. Positional args follow IndicatorMeta field
# order: (name, source, url, parser, poll_interval_s, optimal_lag_days,
# lag_correlation, lag_pvalue, acb_critical, category).
INDICATORS = {
    # BINANCE DERIVATIVES (rate limit: 1200/min)
    'funding_btc': IndicatorMeta('funding_btc', 'binance',
        'https://fapi.binance.com/fapi/v1/fundingRate?symbol=BTCUSDT&limit=1',
        'parse_binance_funding', 28800, 5, 0.3892, 0.0057, True, 'derivatives'),
    'funding_eth': IndicatorMeta('funding_eth', 'binance',
        'https://fapi.binance.com/fapi/v1/fundingRate?symbol=ETHUSDT&limit=1',
        'parse_binance_funding', 28800, 3, 0.2026, 0.1539, True, 'derivatives'),
    'oi_btc': IndicatorMeta('oi_btc', 'binance',
        'https://fapi.binance.com/fapi/v1/openInterest?symbol=BTCUSDT',
        'parse_binance_oi', 300, 0, 0, 1.0, False, 'derivatives'),
    'oi_eth': IndicatorMeta('oi_eth', 'binance',
        'https://fapi.binance.com/fapi/v1/openInterest?symbol=ETHUSDT',
        'parse_binance_oi', 300, 0, 0, 1.0, False, 'derivatives'),
    'ls_btc': IndicatorMeta('ls_btc', 'binance',
        'https://fapi.binance.com/futures/data/globalLongShortAccountRatio?symbol=BTCUSDT&period=5m&limit=1',
        'parse_binance_ls', 300, 0, 0.2970, 0.0362, True, 'derivatives'),
    'ls_eth': IndicatorMeta('ls_eth', 'binance',
        'https://fapi.binance.com/futures/data/globalLongShortAccountRatio?symbol=ETHUSDT&period=5m&limit=1',
        'parse_binance_ls', 300, 0, 0, 1.0, False, 'derivatives'),
    'ls_top': IndicatorMeta('ls_top', 'binance',
        'https://fapi.binance.com/futures/data/topLongShortAccountRatio?symbol=BTCUSDT&period=5m&limit=1',
        'parse_binance_ls', 300, 0, 0, 1.0, False, 'derivatives'),
    'taker': IndicatorMeta('taker', 'binance',
        'https://fapi.binance.com/futures/data/takerlongshortRatio?symbol=BTCUSDT&period=5m&limit=1',
        'parse_binance_taker', 300, 1, -0.4105, 0.0034, True, 'derivatives'),
    'basis': IndicatorMeta('basis', 'binance',
        'https://fapi.binance.com/fapi/v1/premiumIndex?symbol=BTCUSDT',
        'parse_binance_basis', 0.5, 0, 0, 1.0, False, 'derivatives'),  # 0.5s AGGRESSIVE OVERSAMPLING
    'imbal_btc': IndicatorMeta('imbal_btc', 'binance',
        'https://api.binance.com/api/v3/depth?symbol=BTCUSDT&limit=100',
        'parse_imbal', 0.5, 0, 0, 1.0, False, 'microstructure'),  # 0.5s AGGRESSIVE OVERSAMPLING
    'imbal_eth': IndicatorMeta('imbal_eth', 'binance',
        'https://api.binance.com/api/v3/depth?symbol=ETHUSDT&limit=100',
        'parse_imbal', 0.5, 0, 0, 1.0, False, 'microstructure'),  # 0.5s AGGRESSIVE OVERSAMPLING
    'spread': IndicatorMeta('spread', 'binance',
        'https://api.binance.com/api/v3/ticker/bookTicker?symbol=BTCUSDT',
        'parse_spread', 0.5, 0, 0, 1.0, False, 'microstructure'),  # 0.5s AGGRESSIVE OVERSAMPLING
    # DERIBIT (rate limit: 100/10s)
    # NOTE: Deribit API now requires start_timestamp and end_timestamp parameters.
    # URLs are built dynamically in _build_deribit_url() — the 'url' field here is a
    # '<kind>:<instrument>' spec, not a fetchable endpoint.
    'dvol_btc': IndicatorMeta('dvol_btc', 'deribit',
        'dvol:BTC',
        'parse_deribit_dvol', 60, 1, -0.4919, 0.0002, True, 'derivatives'),
    'dvol_eth': IndicatorMeta('dvol_eth', 'deribit',
        'dvol:ETH',
        'parse_deribit_dvol', 60, 1, -0.4246, 0.0015, True, 'derivatives'),
    'fund_dbt_btc': IndicatorMeta('fund_dbt_btc', 'deribit',
        'funding:BTC-PERPETUAL',
        'parse_deribit_fund', 28800, 0, 0, 1.0, False, 'derivatives'),
    'fund_dbt_eth': IndicatorMeta('fund_dbt_eth', 'deribit',
        'funding:ETH-PERPETUAL',
        'parse_deribit_fund', 28800, 0, 0, 1.0, False, 'derivatives'),
    # MACRO (FRED, rate limit: 120/min) — 'url' is the FRED series id; full URL
    # is assembled in _build_fred_url().
    'vix': IndicatorMeta('vix', 'fred', 'VIXCLS', 'parse_fred', 21600, 1, -0.2044, 0.27, True, 'macro'),
    'dxy': IndicatorMeta('dxy', 'fred', 'DTWEXBGS', 'parse_fred', 21600, 0, 0, 1.0, False, 'macro'),
    'us10y': IndicatorMeta('us10y', 'fred', 'DGS10', 'parse_fred', 21600, 0, 0, 1.0, False, 'macro'),
    'sp500': IndicatorMeta('sp500', 'fred', 'SP500', 'parse_fred', 21600, 0, 0, 1.0, False, 'macro'),
    'fedfunds': IndicatorMeta('fedfunds', 'fred', 'DFF', 'parse_fred', 86400, 0, 0, 1.0, False, 'macro'),
    # SENTIMENT
    'fng': IndicatorMeta('fng', 'alternative', 'https://api.alternative.me/fng/?limit=1',
        'parse_fng', 21600, 5, -0.1923, 0.1856, True, 'sentiment'),
    # ON-CHAIN (blockchain.info)
    'hashrate': IndicatorMeta('hashrate', 'blockchain', 'https://blockchain.info/q/hashrate',
        'parse_bc', 1800, 0, 0, 1.0, False, 'onchain'),
    # DEFI (DeFi Llama)
    'tvl': IndicatorMeta('tvl', 'defillama', 'https://api.llama.fi/v2/historicalChainTvl',
        'parse_dl_tvl', 21600, 0, 0, 1.0, False, 'defi'),
    # LIQUIDATIONS (Coinglass) — aggregate forced liquidation data
    # poll_interval=3600s (hourly update), lag=1 day (same conservative default as dvol)
    # lag_correlation and lag_pvalue to be measured empirically (set 0/1.0 as placeholders)
    'liq_vol_24h': IndicatorMeta(
        'liq_vol_24h', 'coinglass',
        'https://open-api.coinglass.com/public/v2/liquidation_chart?symbol=BTC&interval=1h&limit=24',
        'parse_coinglass_liq_vol', 3600, 1, 0.0, 1.0, False, 'liquidations'),
    'liq_long_ratio': IndicatorMeta(
        'liq_long_ratio', 'coinglass',
        'https://open-api.coinglass.com/public/v2/liquidation_chart?symbol=BTC&interval=1h&limit=24',
        'parse_coinglass_liq_ratio', 3600, 1, 0.0, 1.0, False, 'liquidations'),
    'liq_z_score': IndicatorMeta(
        'liq_z_score', 'coinglass',
        '',  # derived — no direct endpoint; computed in backfiller, set 0 in real-time until history accumulated
        'parse_noop', 3600, 1, 0.0, 1.0, False, 'liquidations'),
    'liq_percentile': IndicatorMeta(
        'liq_percentile', 'coinglass',
        '',  # derived — same as above
        'parse_noop', 3600, 1, 0.0, 1.0, False, 'liquidations'),
}
# Rate limits per provider (requests per second).
# Consumed in two places: RealTimeExFService._run() sizes per-provider
# semaphores as max(1, int(rate * 2)), and _fetch_url() derives an
# inter-request delay of 1/rate for providers without a semaphore.
RATE_LIMITS = {
    'binance': 20.0,      # 1200/min
    'deribit': 10.0,      # 100/10s
    'fred': 2.0,          # 120/min
    'alternative': 0.5,
    'blockchain': 0.5,
    'defillama': 1.0,
    'coinmetrics': 0.15,  # 10/min
    'coinglass': 0.5,     # 1 req/2s — free tier conservative limit
}
# =====================================================================
# INDICATOR STATE
# =====================================================================
@dataclass
class IndicatorState:
    """Live state for a single indicator.

    Mutated only by the background fetch loops while holding the service
    lock; consumers read snapshots through get_indicators()/status().
    """
    # Latest parsed value; NaN until the first successful fetch.
    value: float = np.nan
    fetched_at: float = 0.0  # monotonic time of last success (0.0 = never)
    # Wall-clock timestamp of last successful fetch, for logs/diagnostics.
    fetched_utc: Optional[datetime] = None
    # True once at least one fetch has parsed to a usable value.
    success: bool = False
    # Last error tag ("fetch_failed", "zero_value", or exception text).
    error: str = ""
    fetch_count: int = 0
    fail_count: int = 0
    # History buffer for lag support — one snapshot appended per daily
    # rotation (see RealTimeExFService._daily_rotation), capped at 10 days.
    daily_history: deque = field(default_factory=lambda: deque(maxlen=10))
# =====================================================================
# PARSERS (same as external_factors_matrix.py, inlined for independence)
# =====================================================================
class Parsers:
    """Stateless response parsers, one static method per API payload shape.

    Convention: each parser accepts the decoded JSON payload (or raw text)
    and returns a float, falling back to a neutral default on malformed
    input instead of raising where feasible: 0.0 for magnitudes, 1.0 for
    ratios, 50.0 for FNG, 0.5 for the liquidation long-ratio. Unexpected
    exceptions that do escape are caught by _fetch_indicator().

    Fixes vs previous revision:
    - bare `except:` clauses narrowed to (TypeError, ValueError) so that
      SystemExit/KeyboardInterrupt are no longer swallowed;
    - parse_deribit_fund: empty result list now returns 0.0 instead of
      raising TypeError via float([]).
    """

    @staticmethod
    def parse_binance_funding(d):
        # Payload: list of {'fundingRate': str, ...}; endpoint requested with limit=1.
        return float(d[0]['fundingRate']) if isinstance(d, list) and d else 0.0

    @staticmethod
    def parse_binance_oi(d):
        # Two shapes: history list (sumOpenInterest) or spot dict (openInterest).
        if isinstance(d, list) and d:
            return float(d[-1].get('sumOpenInterest', 0))
        return float(d.get('openInterest', 0)) if isinstance(d, dict) else 0.0

    @staticmethod
    def parse_binance_ls(d):
        # Neutral ratio default is 1.0 (longs == shorts).
        return float(d[-1]['longShortRatio']) if isinstance(d, list) and d else 1.0

    @staticmethod
    def parse_binance_taker(d):
        return float(d[-1]['buySellRatio']) if isinstance(d, list) and d else 1.0

    @staticmethod
    def parse_binance_basis(d):
        # Annualize: 3 funding periods/day * 365 days.
        return float(d.get('lastFundingRate', 0)) * 365 * 3 if isinstance(d, dict) else 0.0

    @staticmethod
    def parse_deribit_dvol(d):
        # result.data rows are OHLC-style candles; index 4 is the close.
        if isinstance(d, dict) and 'result' in d:
            r = d['result']
            if isinstance(r, dict) and 'data' in r and r['data']:
                return float(r['data'][-1][4]) if len(r['data'][-1]) > 4 else 0.0
        return 0.0

    @staticmethod
    def parse_deribit_fund(d):
        if isinstance(d, dict) and 'result' in d:
            r = d['result']
            if isinstance(r, list):
                # get_funding_rate_history: list of periods, take the latest.
                # FIX: empty list previously hit float([]) -> TypeError.
                return float(r[-1].get('interest_8h', 0)) if r else 0.0
            # Scalar fallback kept for older endpoint variants.
            return float(r)
        return 0.0

    @staticmethod
    def parse_fred(d):
        if isinstance(d, dict) and 'observations' in d and d['observations']:
            v = d['observations'][-1].get('value', '.')
            if v != '.':  # FRED encodes missing observations as '.'
                try:
                    return float(v)
                except (TypeError, ValueError):
                    pass
        return 0.0

    @staticmethod
    def parse_fng(d):
        # Fear & Greed index; 50.0 = neutral default.
        return float(d['data'][0]['value']) if isinstance(d, dict) and 'data' in d and d['data'] else 50.0

    @staticmethod
    def parse_bc(d):
        # blockchain.info /q/ endpoints return a bare number (possibly as text);
        # chart endpoints return {'values': [{'x':..., 'y':...}, ...]}.
        if isinstance(d, (int, float)):
            return float(d)
        if isinstance(d, str):
            try:
                return float(d)
            except (TypeError, ValueError):
                pass
        if isinstance(d, dict) and 'values' in d and d['values']:
            return float(d['values'][-1].get('y', 0))
        return 0.0

    @staticmethod
    def parse_dl_tvl(d):
        if isinstance(d, list) and d:
            return float(d[-1].get('tvl', 0))
        return 0.0

    @staticmethod
    def parse_coinglass_liq_vol(d):
        """
        Coinglass /public/v2/liquidation_chart response:
        {
          "code": "0",
          "data": {
            "liquidationMap": [
              {"t": <unix_ms>, "longLiquidationUsd": 1234567.0, "shortLiquidationUsd": 890123.0},
              ...
            ]
          }
        }
        Returns log10(total_usd + 1) over all bars (24h sum).
        Returns 0.0 on any error.
        """
        import math
        try:
            bars = d.get('data', {}).get('liquidationMap', [])
            if not bars:
                return 0.0
            total = sum(
                float(b.get('longLiquidationUsd', 0)) + float(b.get('shortLiquidationUsd', 0))
                for b in bars
            )
            return math.log10(total + 1.0)
        except Exception:
            return 0.0

    @staticmethod
    def parse_coinglass_liq_ratio(d):
        """
        Same endpoint as parse_coinglass_liq_vol.
        Returns long_liq / (long_liq + short_liq) over all bars.
        Returns 0.5 (neutral) on error or zero total.
        """
        try:
            bars = d.get('data', {}).get('liquidationMap', [])
            if not bars:
                return 0.5
            long_total = sum(float(b.get('longLiquidationUsd', 0)) for b in bars)
            short_total = sum(float(b.get('shortLiquidationUsd', 0)) for b in bars)
            total = long_total + short_total
            return long_total / total if total > 0 else 0.5
        except Exception:
            return 0.5

    @staticmethod
    def parse_imbal(d):
        # Order-book imbalance in [-1, 1] over the top 50 levels per side;
        # 0.0 is a legitimate balanced-book value (see _fetch_indicator).
        if isinstance(d, dict):
            bv = sum(float(b[1]) for b in d.get('bids', [])[:50])
            av = sum(float(a[1]) for a in d.get('asks', [])[:50])
            t = bv + av
            return (bv - av) / t if t > 0 else 0.0
        return 0.0

    @staticmethod
    def parse_spread(d):
        # Bid/ask spread in basis points.
        if isinstance(d, dict):
            try:
                b = float(d.get('bidPrice', d.get('bid', 0)))
                a = float(d.get('askPrice', d.get('ask', 0)))
                return (a - b) / b * 10000 if b > 0 else 0.0
            except (TypeError, ValueError):
                pass
        return 0.0

    @staticmethod
    def parse_noop(d):
        """Placeholder for derived indicators (liq_z_score, liq_percentile).
        These are computed by the backfiller, not fetched in real-time."""
        return 0.0
# =====================================================================
# REAL-TIME SERVICE
# =====================================================================
class RealTimeExFService:
    """
    Singleton real-time external factors service.
    Design principles:
    - Never blocks: get_indicators() is pure memory read
    - Background asyncio loop fetches on per-indicator timers
    - Per-provider rate limiting via semaphores
    - History buffer per indicator for lag support
    - Thread-safe via lock on state dict

    Lifecycle: start() spawns a daemon thread running a private asyncio loop
    (_run), which launches one _indicator_loop per INDICATORS entry plus a
    _daily_rotation task; stop() flips _running and joins the thread.
    """

    def __init__(self, fred_api_key: str = ""):
        # SECURITY NOTE(review): hard-coded fallback FRED key — should be
        # sourced from env/config and rotated; kept to preserve behavior.
        self.fred_api_key = fred_api_key or 'c16a9cde3e3bb5bb972bb9283485f202'
        # One mutable IndicatorState per configured indicator.
        self.state: Dict[str, IndicatorState] = {
            name: IndicatorState() for name in INDICATORS
        }
        self._lock = threading.Lock()  # guards self.state across threads
        self._running = False
        self._loop = None    # asyncio loop owned by the background thread
        self._thread = None
        self._semaphores: Dict[str, asyncio.Semaphore] = {}  # per-provider concurrency caps
        self._session: Optional[aiohttp.ClientSession] = None
        self._current_date: str = ""  # for daily history rotation

    # ----- Consumer API (never blocks, <1ms) -----
    def get_indicators(self, apply_lag: bool = False, dual_sample: bool = True) -> Dict[str, Any]:
        """
        Get indicator values with flexible lag and dual-sampling support (T and T-24h).
        Returns both real-time (T) and structural lagged values (T-24h) for indicators.
        Args:
            apply_lag (bool): If True, the primary key 'name' will contain the lagged value.
            dual_sample (bool): If True, a secondary key '{name}_lagged' will be added.
        Returns:
            {name: value} plus optional '{name}_lagged' keys, and a '_staleness'
            sub-dict of seconds since each indicator's last successful fetch.
            Indicators with no successful fetch yet are omitted entirely.
        """
        with self._lock:
            result = {}
            staleness = {}
            now = time.monotonic()
            for name, meta in INDICATORS.items():
                st = self.state[name]
                if not st.success or np.isnan(st.value):
                    continue
                # 1. Primary Value calculation
                if apply_lag and meta.optimal_lag_days > 0:
                    lag = meta.optimal_lag_days
                    hist = list(st.daily_history)
                    if len(hist) >= lag:
                        # hist[-lag] = value snapshotted `lag` daily rotations ago
                        result[name] = hist[-lag]
                    else:
                        result[name] = st.value  # fallback: not enough history yet
                else:
                    result[name] = st.value
                # 2. Sequential Dual-Sampling (always provide T and T-24h if requested)
                if dual_sample and meta.optimal_lag_days > 0:
                    lag = meta.optimal_lag_days
                    hist = list(st.daily_history)
                    lag_key = f"{name}_lagged"
                    if len(hist) >= lag:
                        result[lag_key] = hist[-lag]
                    else:
                        result[lag_key] = st.value
                if st.fetched_at > 0:
                    staleness[name] = now - st.fetched_at
            result['_staleness'] = staleness
            return result

    def get_acb_indicators(self) -> Dict[str, float]:
        """Get only the ACB-critical indicators (with lags applied)."""
        full = self.get_indicators(apply_lag=True)
        return {k: v for k, v in full.items()
                if k in ('funding_btc', 'funding_eth', 'dvol_btc', 'dvol_eth',
                         'fng', 'vix', 'ls_btc', 'taker',
                         'mcap_bc', 'fund_dbt_btc', 'oi_btc', 'fund_dbt_eth', 'addr_btc')
                and isinstance(v, (int, float))}

    # ----- Background fetching -----
    async def _fetch_url(self, url: str, source: str) -> Optional[Any]:
        """Fetch URL with rate limiting and error handling.

        Two mutually exclusive paths: providers with a registered semaphore
        (all RATE_LIMITS keys after _run() starts) are bounded by concurrency;
        the fixed 1/rate sleep below is only reached for sources with no
        semaphore registered.
        """
        sem = self._semaphores.get(source)
        if sem:
            await sem.acquire()
            try:
                return await self._do_fetch(url)
            finally:
                sem.release()
        # Enforce rate limit delay (no-semaphore path only)
        delay = 1.0 / RATE_LIMITS.get(source, 1.0)
        await asyncio.sleep(delay)
        return await self._do_fetch(url)

    async def _do_fetch(self, url: str) -> Optional[Any]:
        """Raw HTTP fetch. Returns decoded JSON, raw text, or None on failure."""
        if not self._session:
            return None
        try:
            timeout = aiohttp.ClientTimeout(total=10)
            headers = {"User-Agent": "Mozilla/5.0"}
            async with self._session.get(url, timeout=timeout, headers=headers) as r:
                if r.status == 200:
                    ct = r.headers.get('Content-Type', '')
                    if 'json' in ct:
                        return await r.json()
                    # Some providers return JSON with a text content-type.
                    text = await r.text()
                    try: return json.loads(text)
                    except: return text  # NOTE(review): bare except; raw text is handed to parsers
                else:
                    logger.warning(f"HTTP {r.status} for {url[:60]}")
        except asyncio.TimeoutError:
            logger.debug(f"Timeout: {url[:60]}")
        except Exception as e:
            logger.debug(f"Fetch error: {e}")
        return None

    def _build_fred_url(self, series_id: str) -> str:
        """Build a FRED observations URL returning only the latest value."""
        return (f"https://api.stlouisfed.org/fred/series/observations?"
                f"series_id={series_id}&api_key={self.fred_api_key}"
                f"&file_type=json&sort_order=desc&limit=1")

    def _build_deribit_url(self, meta_url: str) -> str:
        """
        Build Deribit URL with required timestamps.
        meta_url format: 'dvol:BTC' or 'funding:BTC-PERPETUAL'
        FIXED 2026-03-22:
        - funding: use get_funding_rate_history (returns list with interest_8h per period).
          get_funding_rate_value was the agent's broken fix — it returns a scalar daily
          average ~100x smaller than the per-8h snapshot stored in NPZ ground truth.
          Parity test (test_deribit_api_parity.py --indicators fund) confirms 8/8 PASS.
        - dvol: use resolution=3600 (hourly candles) to match backfill in
          external_factors_matrix.py; resolution=60 (1-min) was wrong.
        Both parsers (parse_deribit_fund, parse_deribit_dvol) take the last entry [-1].
        """
        import time  # local import shadows module-level `time` in this scope only
        now = int(time.time())
        if meta_url.startswith('dvol:'):
            currency = meta_url.split(':')[1]
            # Last 4 hours at hourly resolution — parser takes data[-1][4] (close)
            start_ts = (now - 14400) * 1000
            end_ts = now * 1000
            return (f"https://www.deribit.com/api/v2/public/get_volatility_index_data?"
                    f"currency={currency}&resolution=3600"
                    f"&start_timestamp={start_ts}&end_timestamp={end_ts}")
        elif meta_url.startswith('funding:'):
            instrument = meta_url.split(':')[1]
            # Last 4 hours — get_funding_rate_history returns list; parser takes r[-1]['interest_8h']
            start_ts = (now - 14400) * 1000
            end_ts = now * 1000
            return (f"https://www.deribit.com/api/v2/public/get_funding_rate_history?"
                    f"instrument_name={instrument}"
                    f"&start_timestamp={start_ts}&end_timestamp={end_ts}")
        return meta_url  # Fallback

    async def _fetch_indicator(self, name: str, meta: IndicatorMeta):
        """Fetch and parse a single indicator, updating its IndicatorState."""
        # Build URL based on source
        if meta.source == 'fred':
            url = self._build_fred_url(meta.url)
        elif meta.source == 'deribit':
            url = self._build_deribit_url(meta.url)
        else:
            url = meta.url
        # Fetch
        data = await self._fetch_url(url, meta.source)
        if data is None:
            with self._lock:
                self.state[name].fail_count += 1
                self.state[name].error = "fetch_failed"
            return
        # Parse
        parser = getattr(Parsers, meta.parser, None)
        if parser is None:
            logger.error(f"No parser: {meta.parser}")
            return
        try:
            value = parser(data)
            # Most parsers return 0.0 on failure; order-book imbalance is the
            # exception — 0.0 there is a legitimate balanced-book reading.
            if value == 0.0 and 'imbal' not in name:
                with self._lock:
                    self.state[name].fail_count += 1
                    self.state[name].error = "zero_value"
                return
            with self._lock:
                self.state[name].value = value
                self.state[name].success = True
                self.state[name].fetched_at = time.monotonic()
                self.state[name].fetched_utc = datetime.now(timezone.utc)
                self.state[name].fetch_count += 1
                self.state[name].error = ""
        except Exception as e:
            with self._lock:
                self.state[name].fail_count += 1
                self.state[name].error = str(e)

    async def _indicator_loop(self, name: str, meta: IndicatorMeta):
        """Continuous poll loop for one indicator with drift-corrected timing."""
        while self._running:
            start_time = time.monotonic()
            try:
                await self._fetch_indicator(name, meta)
            except Exception as e:
                logger.error(f"Loop error {name}: {e}")
            # Best Practice: Account for fetch duration to prevent cumulative drift
            elapsed = time.monotonic() - start_time
            sleep_time = max(0.1, meta.poll_interval_s - elapsed)
            await asyncio.sleep(sleep_time)

    async def _daily_rotation(self):
        """At midnight UTC, snapshot current values into daily history."""
        while self._running:
            now = datetime.now(timezone.utc)
            date_str = now.strftime('%Y-%m-%d')
            if date_str != self._current_date:
                # Date rolled over (or first pass): snapshot every healthy value.
                with self._lock:
                    for name, st in self.state.items():
                        if st.success and not np.isnan(st.value):
                            st.daily_history.append(st.value)
                self._current_date = date_str
                logger.info(f"Daily rotation: {date_str}")
            await asyncio.sleep(60)  # check every minute

    async def _run(self):
        """Main async loop: session setup, semaphores, and all worker tasks."""
        connector = aiohttp.TCPConnector(limit=30, ttl_dns_cache=300)
        self._session = aiohttp.ClientSession(connector=connector)
        # Create rate limit semaphores
        for source, rate in RATE_LIMITS.items():
            max_concurrent = max(1, int(rate * 2))
            self._semaphores[source] = asyncio.Semaphore(max_concurrent)
        # Start per-indicator loops
        tasks = []
        for name, meta in INDICATORS.items():
            tasks.append(asyncio.create_task(self._indicator_loop(name, meta)))
        # Start daily rotation
        tasks.append(asyncio.create_task(self._daily_rotation()))
        logger.info(f"Started {len(INDICATORS)} indicator loops")
        try:
            await asyncio.gather(*tasks)
        finally:
            await self._session.close()

    def start(self):
        """Start background thread with asyncio loop. Idempotent."""
        if self._running:
            return
        self._running = True
        def _thread_target():
            self._loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._loop)
            self._loop.run_until_complete(self._run())
        self._thread = threading.Thread(target=_thread_target, daemon=True)
        self._thread.start()
        logger.info("RealTimeExFService started")

    def stop(self):
        """Stop the service.

        NOTE(review): worker loops only observe _running after their current
        sleep finishes, so shutdown can exceed the 5s join timeout; the
        thread is a daemon, so process exit is not blocked either way.
        """
        self._running = False
        if self._thread:
            self._thread.join(timeout=5)
        logger.info("RealTimeExFService stopped")

    def status(self) -> Dict[str, Any]:
        """Service health status: per-indicator counters plus ACB coverage."""
        with self._lock:
            total = len(self.state)
            ok = sum(1 for s in self.state.values() if s.success)
            acb_ok = sum(1 for name in ('funding_btc', 'funding_eth', 'dvol_btc',
                                        'dvol_eth', 'fng', 'vix', 'ls_btc', 'taker')
                         if self.state.get(name, IndicatorState()).success)
            return {
                'indicators_ok': ok,
                'indicators_total': total,
                'acb_indicators_ok': acb_ok,
                'acb_indicators_total': 8,
                'details': {name: {'value': s.value, 'success': s.success,
                                   'staleness_s': time.monotonic() - s.fetched_at if s.fetched_at > 0 else -1,
                                   'fetches': s.fetch_count, 'fails': s.fail_count}
                            for name, s in self.state.items()},
            }
# =====================================================================
# ACB v3 - LAG-AWARE (drop-in replacement for v2)
# =====================================================================
def calculate_adaptive_cut_v3(ext_factors: dict, config: dict = None) -> tuple:
    """
    ACB v3: identical decision logic to v2, but intended to be fed
    lag-adjusted indicator values (the lag adjustment happens upstream in
    RealTimeExFService.get_acb_indicators()). The innovation is the data
    pipeline, not this function. For backtests, manually construct
    ext_factors with lagged values.

    Returns:
        (cut, signals, severity, details)
    """
    from dolphin_paper_trade_adaptive_cb_v2 import ACBV2_CONFIG as DEFAULT_CONFIG
    cfg = config or DEFAULT_CONFIG
    if not ext_factors or not cfg.get('enabled', True):
        return cfg.get('base_cut', 0.30), 0, 0, {'status': 'disabled'}

    th = cfg['thresholds']
    signals = 0
    severity = 0
    details = {}

    # --- Signal 1: funding rate (bearish confirmation) ---
    funding = ext_factors.get('funding_btc', 0)
    if funding < th['funding_btc_very_bearish']:
        signals += 1
        severity += 2
        details['funding'] = f'{funding:.6f} (very bearish)'
    elif funding < th['funding_btc_bearish']:
        signals += 1
        severity += 1
        details['funding'] = f'{funding:.6f} (bearish)'
    else:
        details['funding'] = f'{funding:.6f} (neutral)'

    # --- Signal 2: DVOL (volatility confirmation) ---
    dvol = ext_factors.get('dvol_btc', 50)
    if dvol > th['dvol_extreme']:
        signals += 1
        severity += 2
        details['dvol'] = f'{dvol:.1f} (extreme)'
    elif dvol > th['dvol_elevated']:
        signals += 1
        severity += 1
        details['dvol'] = f'{dvol:.1f} (elevated)'
    else:
        details['dvol'] = f'{dvol:.1f} (normal)'

    # --- Signal 3: Fear & Greed, only when confirmed by funding or DVOL ---
    fng = ext_factors.get('fng', 50)
    confirmed = funding < 0 or dvol > 55
    if confirmed and fng < th['fng_extreme_fear']:
        signals += 1
        severity += 1
        details['fng'] = f'{fng:.1f} (extreme fear, confirmed)'
    elif confirmed and fng < th['fng_fear']:
        signals += 0.5
        severity += 0.5
        details['fng'] = f'{fng:.1f} (fear, confirmed)'
    else:
        details['fng'] = f'{fng:.1f} (neutral or unconfirmed)'

    # --- Signal 4: taker buy/sell ratio (strongest single predictor) ---
    taker = ext_factors.get('taker', 1.0)
    if taker < th['taker_selling']:
        signals += 1
        severity += 2
        details['taker'] = f'{taker:.3f} (heavy selling)'
    elif taker < th['taker_mild_selling']:
        signals += 0.5
        severity += 1
        details['taker'] = f'{taker:.3f} (mild selling)'
    else:
        details['taker'] = f'{taker:.3f} (neutral)'

    # --- Map (signals, severity) onto the v2 cut tiers ---
    if signals >= 3:
        cut = 0.75 if severity >= 5 else 0.65
    elif signals >= 2:
        cut = 0.55 if severity >= 3 else 0.45
    elif signals >= 1:
        cut = 0.30
    else:
        cut = 0.0

    details['signals'] = signals
    details['severity'] = severity
    details['version'] = 'v3_lag_aware'
    return cut, signals, severity, details
# =====================================================================
# ACB v4 - EXPANDED 10-INDICATOR ENGINE
# =====================================================================
# Empirically validated thresholds for new v4 indicators
# Trigger thresholds for the six additional v4 indicators. Direction of each
# comparison lives in calculate_adaptive_cut_v4 (addr_btc triggers above its
# threshold; all others trigger below).
ACB_V4_THRESHOLDS = {
    'funding_eth': -3.105e-05,
    'mcap_bc': 1.361e+12,
    'fund_dbt_btc': -2.426e-06,
    'oi_btc': 7.955e+04,
    'fund_dbt_eth': -6.858e-06,
    'addr_btc': 7.028e+05,
}
# Static v4 check table: (indicator, direction, value format, threshold format).
# An empty threshold format means default str() rendering — this preserves the
# original detail messages byte-for-byte.
_ACB_V4_STATIC_SPECS = (
    ('funding_eth',  '<', '.6f', ''),
    ('mcap_bc',      '<', '.2e', '.2e'),
    ('fund_dbt_btc', '<', '.2e', '.2e'),
    ('oi_btc',       '<', '.1f', '.1f'),
    ('fund_dbt_eth', '<', '.2e', '.2e'),
    ('addr_btc',     '>', '.1f', '.1f'),
)


def calculate_adaptive_cut_v4(ext_factors: dict, config: dict = None) -> tuple:
    """
    ACB v4: Expanded engine evaluating 10 empirically validated indicators.
    Base cut threshold and math derived from 54-day exhaustive backtest
    (+15.00% ROI, 6.68% DD).

    Args:
        ext_factors: {indicator_name: value}, ideally lag-adjusted
                     (see RealTimeExFService.get_acb_indicators()).
        config: ACB config dict; defaults to ACBV2_CONFIG.
    Returns:
        (cut, signals, severity, details) — same contract as v2/v3.
    """
    from dolphin_paper_trade_adaptive_cb_v2 import ACBV2_CONFIG as DEFAULT_CONFIG
    config = config or DEFAULT_CONFIG
    if not ext_factors or not config.get('enabled', True):
        return config.get('base_cut', 0.30), 0, 0, {'status': 'disabled'}
    # Core 4 signals come from the v3 engine.
    cut, signals, severity, details = calculate_adaptive_cut_v3(ext_factors, config)
    # -------------------------------------------------------------
    # META-ADAPTIVE OVERRIDE OR FALLBACK TO STATIC v4
    # -------------------------------------------------------------
    try:
        from realtime_exf_service import _get_active_meta_thresholds
        active_thresh = _get_active_meta_thresholds()
    except Exception:
        active_thresh = None
    if active_thresh:
        # Dynamic processing of strictly proved meta thresholds
        details['version'] = 'v4_meta_adaptive'
        for key, limits in active_thresh.items():
            if key in ('funding_btc', 'dvol_btc', 'fng', 'taker'):
                continue  # Handled by v3
            val = ext_factors.get(key, np.nan)
            if np.isnan(val):
                continue
            triggered = False
            if limits['direction'] == '<' and val < limits['threshold']:
                triggered = True
            elif limits['direction'] == '>' and val > limits['threshold']:
                triggered = True
            if triggered:
                signals += 0.5
                severity += 1
                details[key] = f"{val:.4g} (meta {limits['direction']} {limits['threshold']:.4g})"
    else:
        # Fallback 10-indicator engine statically verified on 2026-02-27.
        # Table-driven (was six copy-pasted blocks): same checks, same order,
        # same detail messages.
        details['version'] = 'v4_expanded_static'
        for key, direction, vfmt, tfmt in _ACB_V4_STATIC_SPECS:
            thr = ACB_V4_THRESHOLDS[key]
            val = ext_factors.get(key, np.nan)
            if np.isnan(val):
                continue
            if (val < thr) if direction == '<' else (val > thr):
                signals += 0.5
                severity += 1
                details[key] = f"{format(val, vfmt)} ({direction} {format(thr, tfmt)})"
    # Recalculate cut with updated signals and severity (same tiers as v3)
    if signals >= 3 and severity >= 5:
        cut = 0.75
    elif signals >= 3:
        cut = 0.65
    elif signals >= 2 and severity >= 3:
        cut = 0.55
    elif signals >= 2:
        cut = 0.45
    elif signals >= 1:
        cut = 0.30
    else:
        cut = 0.0
    details['total_signals_v4'] = signals
    details['total_severity_v4'] = severity
    return cut, signals, severity, details
# =====================================================================
# NPZ + ARROW DUAL WRITER
# =====================================================================
class DualWriter:
    """Persist one indicator snapshot in both NPZ (legacy) and Arrow (new) formats."""

    def __init__(self):
        # Arrow output is optional: degrade gracefully when pyarrow is absent.
        self._has_pyarrow = False
        try:
            import pyarrow as pa
        except ImportError:
            pass
        else:
            self._pa = pa
            self._has_pyarrow = True

    def write(self, indicators: Dict[str, Any], scan_path: Path,
              scan_number: int = 0):
        """Write both NPZ and Arrow files alongside the scan.

        Metadata keys (leading underscore) and non-numeric values are
        dropped before serialization.
        """
        numeric = {
            key: val
            for key, val in indicators.items()
            if not key.startswith('_') and isinstance(val, (int, float))
        }
        # NPZ (legacy format)
        self._write_npz(numeric, scan_path, scan_number)
        # Arrow (new format)
        if self._has_pyarrow:
            self._write_arrow(numeric, scan_path, scan_number)

    def _write_npz(self, indicators, scan_path, scan_number):
        # Fixed, sorted column order so files are comparable across runs;
        # missing indicators become NaN with success=False.
        names = sorted(INDICATORS.keys())
        values = np.array([indicators.get(n, np.nan) for n in names])
        ok_mask = np.array([not np.isnan(indicators.get(n, np.nan)) for n in names])
        np.savez_compressed(
            scan_path.parent / f"{scan_path.stem}__Indicators.npz",
            api_indicators=values,
            api_success=ok_mask,
            api_names=np.array(names, dtype='U32'),
            api_success_rate=np.array([np.nanmean(ok_mask)]),
            timestamp=np.array([datetime.now(timezone.utc).isoformat()], dtype='U64'),
            scan_number=np.array([scan_number]),
        )

    def _write_arrow(self, indicators, scan_path, scan_number):
        pa = self._pa
        names = sorted(INDICATORS.keys())
        # Two metadata columns, then one float64 column per indicator.
        fields = [pa.field('timestamp_ns', pa.int64()),
                  pa.field('scan_number', pa.int32())]
        fields += [pa.field(n, pa.float64()) for n in names]
        columns = {
            'timestamp_ns': [int(datetime.now(timezone.utc).timestamp() * 1e9)],
            'scan_number': [scan_number],
        }
        for n in names:
            columns[n] = [indicators.get(n, np.nan)]
        schema = pa.schema(fields)
        table = pa.table(columns, schema=schema)
        out_path = scan_path.parent / f"{scan_path.stem}__Indicators.arrow"
        with pa.ipc.new_file(str(out_path), schema) as writer:
            writer.write_table(table)
# =====================================================================
# CONVENIENCE: Load from NPZ with lag support (for backtesting)
# =====================================================================
# =====================================================================
# LAG CONFIGURATIONS
# =====================================================================
# ROBUST DEFAULT: Uniform lag=1 for all indicators.
# Validated: +3.10% ROI, -2.02% DD vs lag=0 (54-day backtest).
# Zero overfitting risk (no per-indicator optimization).
# Scientifically justified: "yesterday's indicators predict today's market"
ROBUST_LAGS = {
    'funding_btc': 1,
    'funding_eth': 1,
    'dvol_btc': 1,
    'dvol_eth': 1,
    'fng': 1,
    'vix': 1,
    'ls_btc': 1,
    'taker': 1,
}
# EXPERIMENTAL: Per-indicator optimal lags from correlation analysis.
# Validated: +3.98% ROI, -2.93% DD vs lag=0 (54-day backtest).
# WARNING: Overfitting risk at 6.8 days/parameter. Only 5/8 significant.
# DO NOT USE until 80+ days of data available for re-validation.
# TODO: Re-run lag_correlation_analysis with 80+ days, update if confirmed.
EXPERIMENTAL_LAGS = {
    'funding_btc': 5,  # r=+0.39, p=0.006 (slow propagation - 5 days!)
    'funding_eth': 3,  # r=+0.20, p=0.154 (NOT significant)
    'dvol_btc': 1,     # r=-0.49, p=0.0002 (STRONGEST - overnight digest)
    'dvol_eth': 1,     # r=-0.42, p=0.002
    'fng': 5,          # r=-0.19, p=0.186 (NOT significant)
    'vix': 1,          # r=-0.20, p=0.270 (NOT significant)
    'ls_btc': 0,       # r=+0.30, p=0.036 (immediate - only lag=0 indicator)
    'taker': 1,        # r=-0.41, p=0.003 (overnight digest)
}
# CONSERVATIVE: the ROBUST_LAGS baseline plus only the two statistically
# strong per-indicator deviations (funding_btc=5, ls_btc=0).
CONSERVATIVE_LAGS = ROBUST_LAGS.copy()
CONSERVATIVE_LAGS.update({
    'funding_btc': 5,
    'ls_btc': 0,
})
# V4: Combines robust baseline with 6 new statically proven indicators
V4_LAGS = ROBUST_LAGS.copy()
V4_LAGS.update({
    'funding_eth': 3,
    'mcap_bc': 1,
    'fund_dbt_btc': 0,
    'oi_btc': 0,
    'fund_dbt_eth': 1,
    'addr_btc': 3,
})
# Active configuration - use V4 by default given superior empirical results (+15.00% ROI, 6.68% DD)
OPTIMAL_LAGS = V4_LAGS
# =====================================================================
# META-ADAPTIVE RUNTIME
# =====================================================================
def _get_active_lags() -> dict:
    """Return the active lag table.

    Prefers the meta-adaptive layer's dynamic config when it is importable
    and provides a 'lags' entry; otherwise falls back to the static V4
    default (OPTIMAL_LAGS).
    """
    try:
        from meta_adaptive_optimizer import get_current_meta_config
        meta_cfg = get_current_meta_config()
    except Exception:
        meta_cfg = None  # meta layer unavailable — use the static fallback
    if meta_cfg and 'lags' in meta_cfg:
        return meta_cfg['lags']
    return OPTIMAL_LAGS
def _get_active_meta_thresholds() -> dict:
"""Return thresholds: dynamically from meta-layer if available, else None."""
try:
from meta_adaptive_optimizer import get_current_meta_config
meta = get_current_meta_config()
if meta and 'thresholds' in meta:
return meta['thresholds']
except Exception:
pass
return None
# TODO: When switching to EXPERIMENTAL_LAGS, also update IndicatorMeta.optimal_lag_days
def load_external_factors_lagged(date_str: str, all_daily_vals: Dict[str, Dict],
                                 sorted_dates: List[str]) -> dict:
    """
    Load external factors with per-indicator optimal lag applied.
    Dynamically respects the Meta-Adaptive Layer configuration.
    Args:
        date_str: Target date
        all_daily_vals: {date_str: {indicator_name: value}} for all dates
        sorted_dates: Chronologically sorted list of all dates
    Returns:
        {indicator_name: lagged_value}; indicators whose lagged source date
        is out of range or has no recorded value are simply omitted, and an
        unknown date_str yields {}.
    """
    # EAFP: single scan instead of `in` membership test followed by .index()
    # (the previous version scanned the list twice).
    try:
        idx = sorted_dates.index(date_str)
    except ValueError:
        return {}
    result = {}
    for name, lag in _get_active_lags().items():
        src_idx = idx - lag
        if src_idx < 0:
            continue  # not enough history for this indicator's lag
        val = all_daily_vals.get(sorted_dates[src_idx], {}).get(name)
        if val is not None:
            result[name] = val
    return result