chore: safety snapshot 2026-03-05 — HCM infrastructure before 2y klines experiment

Captures critical infrastructure surrounding the nautilus_dolphin core package:
- dolphin_vbt_real.py: VBT vectorized backtest engine (6008 lines)
- dolphin_paper_trade_adaptive_cb_v2.py: champion runner (champion_5x_f20)
- _update_vbt_cache.py / update_VBT_parquet_cache.bat: cache builder
- external_factors/: ExF system (all 85 indicator fetchers + NPZ cache)
- mc_forewarning_qlabs_fork/: QLabs-enhanced MC-Forewarner research fork
- DATA_LOCATIONS.md: source-of-truth path registry
- .gitignore: excludes vbt_cache*, backfilled_data, .venv, models, etc.

Note: nautilus_dolphin/ has own git repo (inner) — safety snapshot committed there separately.
Champion state: WR=49.3%, ROI=+44.89%, PF=1.123, DD=14.95%, Sharpe=2.50 (55d, full-stack, abs_max_lev=6.0).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-05 23:51:30 +01:00
commit 351ce2044d
38 changed files with 21890 additions and 0 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,430 @@
Now before following, if the new esoteric factor engine is modeled after the established factor engine, it should have fallback feeling, backfilling, backfilling capabilities. Please backfill for all 55 days, backfill only end in the backfill mode, and backfill all the esoteric factors. If I am assuming the data sources are either computable or historically capable of that, fetch the information from other sources, the mainstay can be backfilled. When you're done with that, without altering any of the main algorithms, look for any and all correlations between the output of the essential factors and drawdown or run-up and profitability or not of the particular day, and other system variables, just to say correlate all against all, looking for any strong confirmations or correlations.
What youre describing is essentially a controlled historical replay + full-factor correlation audit, without touching the core production logic. Ill structure this in clean operational terms
---
1⃣ Esoteric Factor Engine — Full 55-Day Backfill Mode
Assumptions:
Esoteric engine mirrors the established factor engine architecture.
All data sources are either:
Deterministically computable from price/volume history, or
Historically retrievable from external APIs or stored archives.
No modification to factor formulas.
---
A. Backfill Design Principles
Mode: BACKFILL_ONLY_END
This means:
Replay day-by-day from T-55 → T-1.
Compute all esoteric factors exactly as if live.
Write values to historical store.
No strategy decisions executed.
No incremental learning updates.
No state mutation except factor history buffers.
---
B. Fallback Hierarchy
For each esoteric factor:
1. Primary historical source
(official data store / archive)
2. Secondary API historical endpoint
3. Deterministic reconstruction
Recompute from base OHLCV
Reconstruct state from rolling window
4. Synthetic proxy fallback
Only if mathematically derivable
Must be flagged as fallback_level = 3
You log fallback level for each factor/day.
---
C. Backfill Procedure
Step 1 — Freeze Production State
Snapshot:
Rolling buffers
Latent embeddings (if any)
Volatility states
Regime states
Step 2 — Initialize Clean Historical Buffers
Clear only:
Esoteric factor buffers
Derived rolling statistics
Keep:
Core algorithm logic intact.
---
Step 3 — Replay Loop (55 days)
For each day d from T-55 to T-1:
1. Load OHLCV
2. Fetch or reconstruct all auxiliary inputs
3. Compute:
All esoteric factors
All intermediate states
Any smoothed or filtered outputs
Any latent representations
4. Append to:
esoteric_factor_store[d]
factor_output_store[d]
system_state_snapshot[d]
No trading logic triggered.
---
Step 4 — Integrity Check
After replay:
Check no NaNs
Check window completeness
Check rolling windows properly warm
Confirm fallback levels < threshold (ideally mostly 0/1)
---
2 Correlation Audit All Against All
Now comes the analytical part.
We compute correlations between:
A. Target Variables
Per day:
Max intraday drawdown
Max intraday run-up
Close-to-close return
Strategy P&L
Win/Loss (binary)
Profitability magnitude
Volatility
Spread
Liquidity proxy
Regime label
Latent manifold distortion (if present)
Drift velocity (if you implemented that earlier idea)
---
B. Factor Groups
1. Core factors
2. Esoteric factors
3. Combined outputs
4. Latent embeddings (if available)
5. Signal strength
6. Conviction metrics
7. Confidence weighting
8. Any internal risk throttles
---
C. Correlation Types to Compute
You want more than Pearson.
Compute:
1. Pearson correlation
2. Spearman rank correlation
3. Kendall tau
4. Mutual information
5. Distance correlation
6. Rolling correlation (7, 14, 30 days)
7. Lagged correlation 1, ±2, ±3 days)
---
D. Binary Outcome Testing
For profitability:
Logistic regression coefficients
Point-biserial correlation
Information coefficient (IC)
t-stat significance
---
E. Cross-Correlation Matrix
You compute:
corr_matrix = corr(all_factors all_targets)
Then:
Extract |corr| > 0.6
Flag p < 0.05
Flag stable correlations across rolling windows
Flag correlations that persist across fallback levels
---
3 What Youre Actually Looking For
This audit answers:
1. Do esoteric factors anticipate drawdown?
2. Do they amplify run-up?
3. Are they redundant with core factors?
4. Are they orthogonal alpha?
5. Do they correlate only in stressed regimes?
6. Do they degrade performance in low-vol days?
7. Do latent geometry distortions align with profitability?
---
4 Interpretation Layer
You classify correlations into:
Strong Confirmations
Stable across windows
Significant
Not present in core-only model
Conditional Correlations
Appear only in high volatility
Appear only in drawdown clusters
Spurious / Structural
Correlate due to shared base data
Dangerous
Correlate negatively with profitability
Increase drawdown magnitude
---
5 Deliverables You Should Generate
1. Heatmap of full correlation matrix
2. Ranked factor impact table
3. Stability score per factor
4. Redundancy map (clustered)
5. Regime-conditional breakdown
6. Factor drawdown predictive ranking
7. Factor run-up predictive ranking
---
6 Critical Warning
Do NOT:
Change algorithm weights.
Remove factors.
Normalize differently.
Retrain anything.
This is purely diagnostic.
---
7 What This Tells You Strategically
If strong correlation emerges between:
Esoteric manifold distortion and drawdown
youve built a stress sensor.
If strong correlation emerges between:
Drift velocity and next-day profitability
you have regime anticipation.
If esoteric factors are mostly redundant
compress the engine.
If orthogonal and stable
youve added real signal dept

View File

@@ -0,0 +1,466 @@
#!/usr/bin/env python3
"""
DOLPHIN BACKFILL RUNNER v2.0
============================
Spiders DOLPHIN scan directories, enriches with external factors matrix.
INDICATOR SOURCES:
1. API_HISTORICAL: Fetched with scan timestamp (CoinMetrics, FRED, DeFi Llama, etc.)
2. SCAN_DERIVED: Computed from scan's market_prices, tracking_data, per_asset_signals
3. UNAVAILABLE: No historical API AND cannot compute from scan → NaN
Output: {original_name}__Indicators.npz (sorts alphabetically next to source)
Author: HJ / Claude
Version: 2.0.0
"""
import os
import sys
import json
import numpy as np
import asyncio
import aiohttp
from pathlib import Path
from datetime import datetime, timezone
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Any, Set
import logging
import time
import argparse
# Import external factors module
from external_factors_matrix import (
ExternalFactorsFetcher, Config, INDICATORS, N_INDICATORS,
HistoricalSupport, Stationarity, Category
)
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)
# =============================================================================
# INDICATOR SOURCE CLASSIFICATION
# =============================================================================
class IndicatorSource:
"""Classifies each indicator by how it can be obtained for backfill"""
# Indicators that HAVE historical API support (fetch with timestamp)
API_HISTORICAL: Set[int] = set()
# Indicators that are UNAVAILABLE (no history, can't derive from scan)
UNAVAILABLE: Set[int] = set()
@classmethod
def classify(cls):
"""Classify all indicators by their backfill source"""
for ind in INDICATORS:
if ind.historical in [HistoricalSupport.FULL, HistoricalSupport.PARTIAL]:
cls.API_HISTORICAL.add(ind.id)
else:
cls.UNAVAILABLE.add(ind.id)
logger.info(f"Indicator sources: API_HISTORICAL={len(cls.API_HISTORICAL)}, "
f"UNAVAILABLE={len(cls.UNAVAILABLE)}")
@classmethod
def get_unavailable_names(cls) -> List[str]:
return [INDICATORS[i-1].name for i in sorted(cls.UNAVAILABLE)]
# Initialize classification
IndicatorSource.classify()
# =============================================================================
# CONFIGURATION
# =============================================================================
@dataclass
class BackfillConfig:
scan_dir: Path(r"C:\Users\Lenovo\Documents\- Dolphin NG HD (NG3)\correlation_arb512\eigenvalues")
output_dir: Optional[str] = None
skip_existing: bool = True
dry_run: bool = False
fred_api_key: str = ""
rate_limit_delay: float = 0.5
verbose: bool = False
# =============================================================================
# SCAN DATA
# =============================================================================
@dataclass
class ScanData:
path: Path
scan_number: int
timestamp: datetime
market_prices: Dict[str, float]
windows: Dict[str, Dict]
@property
def n_assets(self) -> int:
return len(self.market_prices)
@property
def symbols(self) -> List[str]:
return sorted(self.market_prices.keys())
def get_tracking(self, window: str) -> Dict:
return self.windows.get(window, {}).get('tracking_data', {})
def get_regime(self, window: str) -> Dict:
return self.windows.get(window, {}).get('regime_signals', {})
def get_asset_signals(self, window: str) -> Dict:
return self.windows.get(window, {}).get('per_asset_signals', {})
# =============================================================================
# INDICATORS FROM SCAN DATA
# =============================================================================
WINDOWS = ['50', '150', '300', '750']
# Global scan-derived indicators (eigenvalue-based, from tracking_data/regime_signals)
SCAN_GLOBAL_INDICATORS = [
# Lambda max per window
*[(f"lambda_max_w{w}", f"Lambda max window {w}") for w in WINDOWS],
*[(f"lambda_min_w{w}", f"Lambda min window {w}") for w in WINDOWS],
*[(f"lambda_vel_w{w}", f"Lambda velocity window {w}") for w in WINDOWS],
*[(f"lambda_acc_w{w}", f"Lambda acceleration window {w}") for w in WINDOWS],
*[(f"eigrot_max_w{w}", f"Eigenvector rotation window {w}") for w in WINDOWS],
*[(f"eiggap_w{w}", f"Eigenvalue gap window {w}") for w in WINDOWS],
*[(f"instab_w{w}", f"Instability window {w}") for w in WINDOWS],
*[(f"transp_w{w}", f"Transition prob window {w}") for w in WINDOWS],
*[(f"coher_w{w}", f"Coherence window {w}") for w in WINDOWS],
# Aggregates
("lambda_max_mean", "Mean lambda max"),
("lambda_max_std", "Std lambda max"),
("instab_mean", "Mean instability"),
("instab_max", "Max instability"),
("coher_mean", "Mean coherence"),
("coher_min", "Min coherence"),
("coher_trend", "Coherence trend (w750-w50)"),
# From prices
("n_assets", "Number of assets"),
("price_dispersion", "Log price dispersion"),
]
N_SCAN_GLOBAL = len(SCAN_GLOBAL_INDICATORS)
# Per-asset indicators
PER_ASSET_INDICATORS = [
("price", "Price"),
("log_price", "Log price"),
("price_rank", "Price percentile"),
("price_btc", "Price / BTC"),
("price_eth", "Price / ETH"),
*[(f"align_w{w}", f"Alignment w{w}") for w in WINDOWS],
*[(f"decouple_w{w}", f"Decoupling w{w}") for w in WINDOWS],
*[(f"anomaly_w{w}", f"Anomaly w{w}") for w in WINDOWS],
*[(f"eigvec_w{w}", f"Eigenvector w{w}") for w in WINDOWS],
("align_mean", "Mean alignment"),
("align_std", "Alignment std"),
("anomaly_max", "Max anomaly"),
("decouple_max", "Max |decoupling|"),
]
N_PER_ASSET = len(PER_ASSET_INDICATORS)
# =============================================================================
# PROCESSOR
# =============================================================================
class ScanProcessor:
def __init__(self, config: BackfillConfig):
self.config = config
self.fetcher = ExternalFactorsFetcher(Config(fred_api_key=config.fred_api_key))
def load_scan(self, path: Path) -> Optional[ScanData]:
try:
with open(path, 'r') as f:
data = json.load(f)
ts_str = data.get('timestamp', '')
try:
timestamp = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
if timestamp.tzinfo is None:
timestamp = timestamp.replace(tzinfo=timezone.utc)
except:
timestamp = datetime.now(timezone.utc)
return ScanData(
path=path,
scan_number=data.get('scan_number', 0),
timestamp=timestamp,
market_prices=data.get('market_prices', {}),
windows=data.get('windows', {})
)
except Exception as e:
logger.error(f"Load failed {path}: {e}")
return None
async def fetch_api_indicators(self, timestamp: datetime) -> Tuple[np.ndarray, np.ndarray]:
"""Fetch indicators with historical API support"""
try:
result = await self.fetcher.fetch_all(target_date=timestamp)
matrix = result['matrix']
success = np.array([
result['details'].get(i+1, {}).get('success', False)
for i in range(N_INDICATORS)
])
# Mark non-historical indicators as NaN
for i in range(N_INDICATORS):
if (i+1) not in IndicatorSource.API_HISTORICAL:
success[i] = False
matrix[i] = np.nan
return matrix, success
except Exception as e:
logger.warning(f"API fetch failed: {e}")
return np.full(N_INDICATORS, np.nan), np.zeros(N_INDICATORS, dtype=bool)
def compute_scan_global(self, scan: ScanData) -> np.ndarray:
"""Compute global indicators from scan's tracking_data and regime_signals"""
values = []
# Per-window metrics
for w in WINDOWS:
values.append(scan.get_tracking(w).get('lambda_max', np.nan))
for w in WINDOWS:
values.append(scan.get_tracking(w).get('lambda_min', np.nan))
for w in WINDOWS:
values.append(scan.get_tracking(w).get('lambda_max_velocity', np.nan))
for w in WINDOWS:
values.append(scan.get_tracking(w).get('lambda_max_acceleration', np.nan))
for w in WINDOWS:
values.append(scan.get_tracking(w).get('eigenvector_rotation_max', np.nan))
for w in WINDOWS:
values.append(scan.get_tracking(w).get('eigenvalue_gap', np.nan))
for w in WINDOWS:
values.append(scan.get_regime(w).get('instability_score', np.nan))
for w in WINDOWS:
values.append(scan.get_regime(w).get('regime_transition_probability', np.nan))
for w in WINDOWS:
values.append(scan.get_regime(w).get('market_coherence', np.nan))
# Aggregates
lmax = [scan.get_tracking(w).get('lambda_max', np.nan) for w in WINDOWS]
values.append(np.nanmean(lmax))
values.append(np.nanstd(lmax))
instab = [scan.get_regime(w).get('instability_score', np.nan) for w in WINDOWS]
values.append(np.nanmean(instab))
values.append(np.nanmax(instab))
coher = [scan.get_regime(w).get('market_coherence', np.nan) for w in WINDOWS]
values.append(np.nanmean(coher))
values.append(np.nanmin(coher))
values.append(coher[3] - coher[0] if not np.isnan(coher[3]) and not np.isnan(coher[0]) else np.nan)
# From prices
prices = np.array(list(scan.market_prices.values())) if scan.market_prices else np.array([])
values.append(len(prices))
values.append(np.std(np.log(np.maximum(prices, 1e-10))) if len(prices) > 0 else np.nan)
return np.array(values)
def compute_per_asset(self, scan: ScanData) -> Tuple[np.ndarray, List[str]]:
"""Compute per-asset indicator matrix"""
symbols = scan.symbols
n = len(symbols)
if n == 0:
return np.zeros((0, N_PER_ASSET)), []
matrix = np.zeros((n, N_PER_ASSET))
prices = np.array([scan.market_prices[s] for s in symbols])
btc_p = scan.market_prices.get('BTC', scan.market_prices.get('BTCUSDT', np.nan))
eth_p = scan.market_prices.get('ETH', scan.market_prices.get('ETHUSDT', np.nan))
col = 0
matrix[:, col] = prices; col += 1
matrix[:, col] = np.log(np.maximum(prices, 1e-10)); col += 1
matrix[:, col] = np.argsort(np.argsort(prices)) / n; col += 1
matrix[:, col] = prices / btc_p if btc_p > 0 else np.nan; col += 1
matrix[:, col] = prices / eth_p if eth_p > 0 else np.nan; col += 1
# Per-window signals
for metric in ['market_alignment', 'decoupling_velocity', 'anomaly_score', 'eigenvector_component']:
for w in WINDOWS:
sigs = scan.get_asset_signals(w)
for i, sym in enumerate(symbols):
matrix[i, col] = sigs.get(sym, {}).get(metric, np.nan)
col += 1
# Aggregates
align_cols = list(range(5, 9))
matrix[:, col] = np.nanmean(matrix[:, align_cols], axis=1); col += 1
matrix[:, col] = np.nanstd(matrix[:, align_cols], axis=1); col += 1
anomaly_cols = list(range(13, 17))
matrix[:, col] = np.nanmax(matrix[:, anomaly_cols], axis=1); col += 1
decouple_cols = list(range(9, 13))
matrix[:, col] = np.nanmax(np.abs(matrix[:, decouple_cols]), axis=1); col += 1
return matrix, symbols
async def process(self, path: Path) -> Optional[Dict[str, Any]]:
start = time.time()
scan = self.load_scan(path)
if scan is None:
return None
# 1. API historical indicators
api_matrix, api_success = await self.fetch_api_indicators(scan.timestamp)
# 2. Scan-derived global
scan_global = self.compute_scan_global(scan)
# 3. Per-asset
asset_matrix, asset_symbols = self.compute_per_asset(scan)
return {
'scan_number': scan.scan_number,
'timestamp': scan.timestamp.isoformat(),
'processing_time': time.time() - start,
'api_indicators': api_matrix,
'api_success': api_success,
'api_names': np.array([ind.name for ind in INDICATORS], dtype='U32'),
'scan_global': scan_global,
'scan_global_names': np.array([n for n, _ in SCAN_GLOBAL_INDICATORS], dtype='U32'),
'asset_matrix': asset_matrix,
'asset_symbols': np.array(asset_symbols, dtype='U16'),
'asset_names': np.array([n for n, _ in PER_ASSET_INDICATORS], dtype='U32'),
'n_assets': len(asset_symbols),
'api_success_rate': np.nanmean(api_success[list(i-1 for i in IndicatorSource.API_HISTORICAL)]),
}
# =============================================================================
# OUTPUT
# =============================================================================
class OutputWriter:
def __init__(self, config: BackfillConfig):
self.config = config
def get_output_path(self, scan_path: Path) -> Path:
out_dir = Path(self.config.output_dir) if self.config.output_dir else scan_path.parent
out_dir.mkdir(parents=True, exist_ok=True)
return out_dir / f"{scan_path.stem}__Indicators.npz"
def save(self, data: Dict[str, Any], scan_path: Path) -> Path:
out_path = self.get_output_path(scan_path)
save_data = {}
for k, v in data.items():
if isinstance(v, np.ndarray):
save_data[k] = v
elif isinstance(v, str):
save_data[k] = np.array([v], dtype='U64')
else:
save_data[k] = np.array([v])
np.savez_compressed(out_path, **save_data)
return out_path
# =============================================================================
# RUNNER
# =============================================================================
class BackfillRunner:
def __init__(self, config: BackfillConfig):
self.config = config
self.processor = ScanProcessor(config)
self.writer = OutputWriter(config)
self.stats = {'processed': 0, 'failed': 0, 'skipped': 0}
def find_scans(self) -> List[Path]:
root = Path(self.config.scan_dir)
files = sorted(root.rglob("scan_*.json"))
if self.config.skip_existing:
files = [f for f in files if not self.writer.get_output_path(f).exists()]
return files
async def run(self):
unavail = IndicatorSource.get_unavailable_names()
logger.info(f"Skipping {len(unavail)} unavailable indicators: {unavail[:5]}...")
files = self.find_scans()
logger.info(f"Processing {len(files)} files...")
for i, path in enumerate(files):
try:
result = await self.processor.process(path)
if result:
if not self.config.dry_run:
self.writer.save(result, path)
self.stats['processed'] += 1
else:
self.stats['failed'] += 1
except Exception as e:
logger.error(f"Error {path.name}: {e}")
self.stats['failed'] += 1
if (i + 1) % 10 == 0:
logger.info(f"Progress: {i+1}/{len(files)}")
if self.config.rate_limit_delay > 0:
await asyncio.sleep(self.config.rate_limit_delay)
logger.info(f"Done: {self.stats}")
return self.stats
# =============================================================================
# UTILITY
# =============================================================================
def load_indicators(path: str) -> Dict[str, np.ndarray]:
"""Load .npz indicator file"""
return dict(np.load(path, allow_pickle=True))
def summary(path: str) -> str:
"""Summary of indicator file"""
d = load_indicators(path)
return f"""Timestamp: {d['timestamp'][0]}
Assets: {d['n_assets'][0]}
API success: {d['api_success_rate'][0]:.1%}
API shape: {d['api_indicators'].shape}
Scan global: {d['scan_global'].shape}
Per-asset: {d['asset_matrix'].shape}"""
# =============================================================================
# CLI
# =============================================================================
def main():
parser = argparse.ArgumentParser(description="DOLPHIN Backfill Runner")
# parser.add_argument("scan_dir", help="Directory with scan JSON files")
parser.add_argument("-o", "--output", help="Output directory")
parser.add_argument("--fred-key", default="", help="FRED API key")
parser.add_argument("--no-skip", action="store_true", help="Reprocess existing")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--delay", type=float, default=0.5)
args = parser.parse_args()
config = BackfillConfig(
scan_dir= Path(r"C:\Users\Lenovo\Documents\- Dolphin NG HD (NG3)\correlation_arb512\eigenvalues"),
output_dir=args.output,
# FRED API Key: c16a9cde3e3bb5bb972bb9283485f202
fred_api_key=args.fred_key or 'c16a9cde3e3bb5bb972bb9283485f202',
skip_existing=not args.no_skip,
dry_run=args.dry_run,
rate_limit_delay=args.delay,
)
runner = BackfillRunner(config)
asyncio.run(runner.run())
if __name__ == "__main__":
main()

1
external_factors/bf.bat Normal file
View File

@@ -0,0 +1 @@
"python backfill_runner.py"

1
external_factors/br.bat Normal file
View File

@@ -0,0 +1 @@
python backfill_runner.py

View File

@@ -0,0 +1,46 @@
{
"timestamp": "2026-03-01T21:34:06.686948+00:00",
"unix": 1772400846,
"calendar": {
"year": 2026,
"month": 3,
"day_of_month": 1,
"hour": 21,
"minute": 34,
"day_of_week": 6,
"week_of_year": 9
},
"fibonacci_time": {
"closest_fib_minute": 1597,
"harmonic_strength": 0.0
},
"regional_times": {
"Americas": {
"hour": 16.566666666666666,
"is_tradfi_open": false
},
"EMEA": {
"hour": 21.566666666666666,
"is_tradfi_open": false
},
"South_Asia": {
"hour": 3.066666666666667,
"is_tradfi_open": false
},
"East_Asia": {
"hour": 5.566666666666666,
"is_tradfi_open": false
},
"Oceania_SEA": {
"hour": 5.566666666666666,
"is_tradfi_open": false
}
},
"population_weighted_hour": 1.57,
"liquidity_weighted_hour": 21.13,
"liquidity_session": "LOW_LIQUIDITY",
"market_cycle_position": 0.4658,
"moon_illumination": 0.9703631088596449,
"moon_phase_name": "FULL_MOON",
"mercury_retrograde": 1
}

View File

@@ -0,0 +1,299 @@
import asyncio
import datetime
import json
import logging
import math
import threading
import time
import zoneinfo
from pathlib import Path
from typing import Dict, Any, Optional
import numpy as np
from astropy.time import Time
import astropy.coordinates as coord
import astropy.units as u
from astropy.coordinates import solar_system_ephemeris, get_body, EarthLocation
logger = logging.getLogger(__name__)
class MarketIndicators:
"""
Mathematical and astronomical calculations for the Esoteric Factors mapping.
Evaluates completely locally without external API dependencies.
"""
def __init__(self):
# Regions defined by NON-OVERLAPPING population clusters for accurate global weighting.
# Population in Millions (approximate). Liquidity weight is estimated crypto volume share.
self.regions = [
{'name': 'Americas', 'tz': 'America/New_York', 'pop': 1000, 'liq_weight': 0.35},
{'name': 'EMEA', 'tz': 'Europe/London', 'pop': 2200, 'liq_weight': 0.30},
{'name': 'South_Asia', 'tz': 'Asia/Kolkata', 'pop': 1400, 'liq_weight': 0.05},
{'name': 'East_Asia', 'tz': 'Asia/Shanghai', 'pop': 1600, 'liq_weight': 0.20},
{'name': 'Oceania_SEA', 'tz': 'Asia/Singapore', 'pop': 800, 'liq_weight': 0.10}
]
# Market cycle: Bitcoin halving based, ~4 years
self.cycle_length_days = 1460
self.last_halving = datetime.datetime(2024, 4, 20, tzinfo=datetime.timezone.utc)
# Cache for expensive ASTRO calculations
self._cache = {
'moon': {'val': None, 'ts': 0},
'mercury': {'val': None, 'ts': 0}
}
self.cache_ttl_seconds = 3600 * 6 # Update astro every 6 hours
def get_calendar_items(self, now: datetime.datetime) -> Dict[str, int]:
return {
'year': now.year,
'month': now.month,
'day_of_month': now.day,
'hour': now.hour,
'minute': now.minute,
'day_of_week': now.weekday(), # 0=Monday
'week_of_year': now.isocalendar().week
}
def is_tradfi_open(self, region_name: str, local_time: datetime.datetime) -> bool:
day = local_time.weekday()
if day >= 5: return False
hour_dec = local_time.hour + local_time.minute / 60.0
if 'Americas' in region_name:
return 9.5 <= hour_dec < 16.0
elif 'EMEA' in region_name:
return 8.0 <= hour_dec < 16.5
elif 'Asia' in region_name:
return 9.0 <= hour_dec < 15.0
return False
def get_regional_times(self, now_utc: datetime.datetime) -> Dict[str, Any]:
times = {}
for region in self.regions:
tz = zoneinfo.ZoneInfo(region['tz'])
local_time = now_utc.astimezone(tz)
times[region['name']] = {
'hour': local_time.hour + local_time.minute / 60.0,
'is_tradfi_open': self.is_tradfi_open(region['name'], local_time)
}
return times
def get_liquidity_session(self, now_utc: datetime.datetime) -> str:
utc_hour = now_utc.hour + now_utc.minute / 60.0
if 13 <= utc_hour < 17:
return "LONDON_NEW_YORK_OVERLAP"
elif 8 <= utc_hour < 13:
return "LONDON_MORNING"
elif 0 <= utc_hour < 8:
return "ASIA_PACIFIC"
elif 17 <= utc_hour < 21:
return "NEW_YORK_AFTERNOON"
else:
return "LOW_LIQUIDITY"
def get_weighted_times(self, now_utc: datetime.datetime) -> tuple[float, float]:
pop_sin, pop_cos = 0.0, 0.0
liq_sin, liq_cos = 0.0, 0.0
total_pop = sum(r['pop'] for r in self.regions)
for region in self.regions:
tz = zoneinfo.ZoneInfo(region['tz'])
local_time = now_utc.astimezone(tz)
hour_frac = (local_time.hour + local_time.minute / 60.0) / 24.0
angle = 2 * math.pi * hour_frac
w_pop = region['pop'] / total_pop
pop_sin += math.sin(angle) * w_pop
pop_cos += math.cos(angle) * w_pop
w_liq = region['liq_weight']
liq_sin += math.sin(angle) * w_liq
liq_cos += math.cos(angle) * w_liq
pop_angle = math.atan2(pop_sin, pop_cos)
if pop_angle < 0: pop_angle += 2 * math.pi
pop_hour = (pop_angle / (2 * math.pi)) * 24
liq_angle = math.atan2(liq_sin, liq_cos)
if liq_angle < 0: liq_angle += 2 * math.pi
liq_hour = (liq_angle / (2 * math.pi)) * 24
return round(pop_hour, 2), round(liq_hour, 2)
def get_market_cycle_position(self, now_utc: datetime.datetime) -> float:
days_since_halving = (now_utc - self.last_halving).days
position = (days_since_halving % self.cycle_length_days) / self.cycle_length_days
return position
def get_fibonacci_time(self, now_utc: datetime.datetime) -> Dict[str, Any]:
mins_passed = now_utc.hour * 60 + now_utc.minute
fib_seq = [1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597]
closest = min(fib_seq, key=lambda x: abs(x - mins_passed))
distance = abs(mins_passed - closest)
strength = 1.0 - min(distance / 30.0, 1.0)
return {'closest_fib_minute': closest, 'harmonic_strength': round(strength, 3)}
def get_moon_phase(self, now_utc: datetime.datetime) -> Dict[str, Any]:
now_ts = now_utc.timestamp()
if self._cache['moon']['val'] and (now_ts - self._cache['moon']['ts'] < self.cache_ttl_seconds):
return self._cache['moon']['val']
t = Time(now_utc)
with solar_system_ephemeris.set('builtin'):
moon = get_body('moon', t)
sun = get_body('sun', t)
elongation = sun.separation(moon)
phase_angle = np.arctan2(sun.distance * np.sin(elongation),
moon.distance - sun.distance * np.cos(elongation))
illumination = (1 + np.cos(phase_angle)) / 2.0
phase_name = "WAXING"
if illumination < 0.03: phase_name = "NEW_MOON"
elif illumination > 0.97: phase_name = "FULL_MOON"
elif illumination < 0.5: phase_name = "WAXING_CRESCENT" if moon.dec.deg > sun.dec.deg else "WANING_CRESCENT"
else: phase_name = "WAXING_GIBBOUS" if moon.dec.deg > sun.dec.deg else "WANING_GIBBOUS"
result = {'illumination': float(illumination), 'phase_name': phase_name}
self._cache['moon'] = {'val': result, 'ts': now_ts}
return result
def is_mercury_retrograde(self, now_utc: datetime.datetime) -> bool:
now_ts = now_utc.timestamp()
if self._cache['mercury']['val'] is not None and (now_ts - self._cache['mercury']['ts'] < self.cache_ttl_seconds):
return self._cache['mercury']['val']
t = Time(now_utc)
is_retro = False
try:
with solar_system_ephemeris.set('builtin'):
loc = EarthLocation.of_site('greenwich')
merc_now = get_body('mercury', t, loc)
merc_later = get_body('mercury', t + 1 * u.day, loc)
lon_now = merc_now.transform_to('geocentrictrueecliptic').lon.deg
lon_later = merc_later.transform_to('geocentrictrueecliptic').lon.deg
diff = (lon_later - lon_now) % 360
is_retro = diff > 180
except Exception as e:
logger.error(f"Astro calc error: {e}")
self._cache['mercury'] = {'val': is_retro, 'ts': now_ts}
return is_retro
def get_indicators(self, custom_now: Optional[datetime.datetime] = None) -> Dict[str, Any]:
"""Generate full suite of Esoteric Matrix factors."""
now_utc = custom_now if custom_now else datetime.datetime.now(datetime.timezone.utc)
pop_hour, liq_hour = self.get_weighted_times(now_utc)
moon_data = self.get_moon_phase(now_utc)
calendar = self.get_calendar_items(now_utc)
return {
'timestamp': now_utc.isoformat(),
'unix': int(now_utc.timestamp()),
'calendar': calendar,
'fibonacci_time': self.get_fibonacci_time(now_utc),
'regional_times': self.get_regional_times(now_utc),
'population_weighted_hour': pop_hour,
'liquidity_weighted_hour': liq_hour,
'liquidity_session': self.get_liquidity_session(now_utc),
'market_cycle_position': round(self.get_market_cycle_position(now_utc), 4),
'moon_illumination': moon_data['illumination'],
'moon_phase_name': moon_data['phase_name'],
'mercury_retrograde': int(self.is_mercury_retrograde(now_utc)),
}
class EsotericFactorsService:
"""
Continuous evaluation service for Esoteric Factors.
Dumps state deterministically to be consumed by the live trading orchestrator/Forewarning layers.
"""
def __init__(self, output_dir: str = "", poll_interval_s: float = 60.0):
# Default to same structure as external factors
if not output_dir:
self.output_dir = Path(__file__).parent / "eso_cache"
else:
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.poll_interval_s = poll_interval_s
self.engine = MarketIndicators()
self._latest_data = {}
self._running = False
self._task = None
self._lock = threading.Lock()
async def _update_loop(self):
logger.info(f"EsotericFactorsService starting. Polling every {self.poll_interval_s}s.")
while self._running:
try:
# 1. Compute Matrix
data = self.engine.get_indicators()
# 2. Store in memory
with self._lock:
self._latest_data = data
# 3. Dump purely to fast JSON
self._write_to_disk(data)
except Exception as e:
logger.error(f"Error in Esoteric update loop: {e}", exc_info=True)
await asyncio.sleep(self.poll_interval_s)
def _write_to_disk(self, data: dict):
# Fast write pattern via atomic tmp rename strategy
target_path = self.output_dir / "latest_esoteric_factors.json"
tmp_path = self.output_dir / "latest_esoteric_factors.tmp"
try:
with open(tmp_path, 'w') as f:
json.dump(data, f, indent=2)
tmp_path.replace(target_path)
except Exception as e:
logger.error(f"Failed to write Esoteric factors to disk: {e}")
def get_latest(self) -> dict:
"""Non-blocking sub-millisecond retrieval of the latest internal state."""
with self._lock:
return self._latest_data.copy()
def start(self):
"""Starts the background calculation loop (Threaded/Async wrapper)."""
if self._running: return
self._running = True
def run_async():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._update_loop())
self._thread = threading.Thread(target=run_async, daemon=True)
self._thread.start()
def stop(self):
self._running = False
if hasattr(self, '_thread'):
self._thread.join(timeout=2.0)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
svc = EsotericFactorsService(poll_interval_s=5.0)
print("Starting Esoteric Factors Service test run for 15 seconds...")
svc.start()
for _ in range(3):
time.sleep(5)
latest = svc.get_latest()
print(f"Update: Moon Illumination={latest.get('moon_illumination'):.3f} | Liquid Session={latest.get('liquidity_session')} | PopHour={latest.get('population_weighted_hour')}")
svc.stop()
print("Stopped successfully.")

View File

@@ -0,0 +1,612 @@
#!/usr/bin/env python3
"""
EXTERNAL FACTORS MATRIX v5.0 - DOLPHIN Compatible with BACKFILL
================================================================
85 indicators with HISTORICAL query support where available.
BACKFILL CAPABILITY:
FULL HISTORY (51): CoinMetrics, FRED, DeFi Llama TVL/stables, F&G, Binance funding/OI
PARTIAL (12): Deribit DVOL, CoinGecko prices, DEX volume
CURRENT ONLY (22): Mempool, order books, spreads, dominance
Author: HJ / Claude | Version: 5.0.0
"""
import asyncio
import aiohttp
import numpy as np
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime, timezone
from collections import deque
from enum import Enum
import json
class Category(Enum):
DERIVATIVES = "derivatives"
ONCHAIN = "onchain"
DEFI = "defi"
MACRO = "macro"
SENTIMENT = "sentiment"
MICROSTRUCTURE = "microstructure"
class Stationarity(Enum):
STATIONARY = "stationary"
TREND_UP = "trend_up"
EPISODIC = "episodic"
class HistoricalSupport(Enum):
FULL = "full" # Any historical date
PARTIAL = "partial" # Limited history
CURRENT = "current" # Real-time only
@dataclass
class Indicator:
id: int
name: str
category: Category
source: str
url: str
parser: str
stationarity: Stationarity
historical: HistoricalSupport
hist_url: str = ""
hist_resolution: str = ""
description: str = ""
@dataclass
class Config:
timeout: int = 15
max_concurrent: int = 15
cache_ttl: int = 30
fred_api_key: str = ""
# fmt: off
INDICATORS: List[Indicator] = [
# DERIVATIVES - Binance (1-10) - Most have FULL history
Indicator(1, "funding_btc", Category.DERIVATIVES, "binance",
"https://fapi.binance.com/fapi/v1/fundingRate?symbol=BTCUSDT&limit=1",
"parse_binance_funding", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://fapi.binance.com/fapi/v1/fundingRate?symbol=BTCUSDT&startTime={start_ms}&endTime={end_ms}&limit=1",
"8h", "BTC funding - FULL via startTime/endTime"),
Indicator(2, "funding_eth", Category.DERIVATIVES, "binance",
"https://fapi.binance.com/fapi/v1/fundingRate?symbol=ETHUSDT&limit=1",
"parse_binance_funding", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://fapi.binance.com/fapi/v1/fundingRate?symbol=ETHUSDT&startTime={start_ms}&endTime={end_ms}&limit=1",
"8h", "ETH funding"),
Indicator(3, "oi_btc", Category.DERIVATIVES, "binance",
"https://fapi.binance.com/fapi/v1/openInterest?symbol=BTCUSDT",
"parse_binance_oi", Stationarity.TREND_UP, HistoricalSupport.FULL,
"https://fapi.binance.com/futures/data/openInterestHist?symbol=BTCUSDT&period=1h&startTime={start_ms}&endTime={end_ms}&limit=1",
"1h", "BTC OI - FULL via openInterestHist"),
Indicator(4, "oi_eth", Category.DERIVATIVES, "binance",
"https://fapi.binance.com/fapi/v1/openInterest?symbol=ETHUSDT",
"parse_binance_oi", Stationarity.TREND_UP, HistoricalSupport.FULL,
"https://fapi.binance.com/futures/data/openInterestHist?symbol=ETHUSDT&period=1h&startTime={start_ms}&endTime={end_ms}&limit=1",
"1h", "ETH OI"),
Indicator(5, "ls_btc", Category.DERIVATIVES, "binance",
"https://fapi.binance.com/futures/data/globalLongShortAccountRatio?symbol=BTCUSDT&period=1h&limit=1",
"parse_binance_ls", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://fapi.binance.com/futures/data/globalLongShortAccountRatio?symbol=BTCUSDT&period=1h&startTime={start_ms}&endTime={end_ms}&limit=1",
"1h", "L/S ratio - FULL"),
Indicator(6, "ls_eth", Category.DERIVATIVES, "binance",
"https://fapi.binance.com/futures/data/globalLongShortAccountRatio?symbol=ETHUSDT&period=1h&limit=1",
"parse_binance_ls", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://fapi.binance.com/futures/data/globalLongShortAccountRatio?symbol=ETHUSDT&period=1h&startTime={start_ms}&endTime={end_ms}&limit=1",
"1h", "ETH L/S"),
Indicator(7, "ls_top", Category.DERIVATIVES, "binance",
"https://fapi.binance.com/futures/data/topLongShortAccountRatio?symbol=BTCUSDT&period=1h&limit=1",
"parse_binance_ls", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://fapi.binance.com/futures/data/topLongShortAccountRatio?symbol=BTCUSDT&period=1h&startTime={start_ms}&endTime={end_ms}&limit=1",
"1h", "Top trader L/S"),
Indicator(8, "taker", Category.DERIVATIVES, "binance",
"https://fapi.binance.com/futures/data/takerlongshortRatio?symbol=BTCUSDT&period=1h&limit=1",
"parse_binance_taker", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://fapi.binance.com/futures/data/takerlongshortRatio?symbol=BTCUSDT&period=1h&startTime={start_ms}&endTime={end_ms}&limit=1",
"1h", "Taker ratio"),
Indicator(9, "basis", Category.DERIVATIVES, "binance",
"https://fapi.binance.com/fapi/v1/premiumIndex?symbol=BTCUSDT",
"parse_binance_basis", Stationarity.STATIONARY, HistoricalSupport.CURRENT,
"", "", "Basis - CURRENT"),
Indicator(10, "liq_proxy", Category.DERIVATIVES, "binance",
"https://fapi.binance.com/fapi/v1/ticker/24hr?symbol=BTCUSDT",
"parse_liq_proxy", Stationarity.STATIONARY, HistoricalSupport.CURRENT,
"", "", "Liq proxy - CURRENT"),
# DERIVATIVES - Deribit (11-18)
Indicator(11, "dvol_btc", Category.DERIVATIVES, "deribit",
"https://www.deribit.com/api/v2/public/get_volatility_index_data?currency=BTC&resolution=3600&count=1",
"parse_deribit_dvol", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://www.deribit.com/api/v2/public/get_volatility_index_data?currency=BTC&resolution=3600&start_timestamp={start_ms}&end_timestamp={end_ms}",
"1h", "DVOL - FULL"),
Indicator(12, "dvol_eth", Category.DERIVATIVES, "deribit",
"https://www.deribit.com/api/v2/public/get_volatility_index_data?currency=ETH&resolution=3600&count=1",
"parse_deribit_dvol", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://www.deribit.com/api/v2/public/get_volatility_index_data?currency=ETH&resolution=3600&start_timestamp={start_ms}&end_timestamp={end_ms}",
"1h", "ETH DVOL"),
Indicator(13, "pcr_vol", Category.DERIVATIVES, "deribit",
"https://www.deribit.com/api/v2/public/get_book_summary_by_currency?currency=BTC&kind=option",
"parse_deribit_pcr", Stationarity.STATIONARY, HistoricalSupport.CURRENT, "", "", "PCR - CURRENT"),
Indicator(14, "pcr_oi", Category.DERIVATIVES, "deribit",
"https://www.deribit.com/api/v2/public/get_book_summary_by_currency?currency=BTC&kind=option",
"parse_deribit_pcr_oi", Stationarity.STATIONARY, HistoricalSupport.CURRENT, "", "", "PCR OI - CURRENT"),
Indicator(15, "pcr_eth", Category.DERIVATIVES, "deribit",
"https://www.deribit.com/api/v2/public/get_book_summary_by_currency?currency=ETH&kind=option",
"parse_deribit_pcr", Stationarity.STATIONARY, HistoricalSupport.CURRENT, "", "", "ETH PCR - CURRENT"),
Indicator(16, "opt_oi", Category.DERIVATIVES, "deribit",
"https://www.deribit.com/api/v2/public/get_book_summary_by_currency?currency=BTC&kind=option",
"parse_deribit_oi", Stationarity.TREND_UP, HistoricalSupport.CURRENT, "", "", "Options OI - CURRENT"),
Indicator(17, "fund_dbt_btc", Category.DERIVATIVES, "deribit",
"https://www.deribit.com/api/v2/public/get_funding_rate_value?instrument_name=BTC-PERPETUAL",
"parse_deribit_fund", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://www.deribit.com/api/v2/public/get_funding_rate_history?instrument_name=BTC-PERPETUAL&start_timestamp={start_ms}&end_timestamp={end_ms}",
"8h", "Deribit fund - FULL"),
Indicator(18, "fund_dbt_eth", Category.DERIVATIVES, "deribit",
"https://www.deribit.com/api/v2/public/get_funding_rate_value?instrument_name=ETH-PERPETUAL",
"parse_deribit_fund", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://www.deribit.com/api/v2/public/get_funding_rate_history?instrument_name=ETH-PERPETUAL&start_timestamp={start_ms}&end_timestamp={end_ms}",
"8h", "Deribit ETH fund"),
# ONCHAIN - CoinMetrics (19-30) - ALL FULL HISTORY
Indicator(19, "rcap_btc", Category.ONCHAIN, "coinmetrics",
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=btc&metrics=CapRealUSD&frequency=1d&page_size=1",
"parse_cm", Stationarity.TREND_UP, HistoricalSupport.FULL,
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=btc&metrics=CapRealUSD&frequency=1d&start_time={date}T00:00:00Z&end_time={date}T23:59:59Z",
"1d", "Realized cap - FULL"),
Indicator(20, "mvrv", Category.ONCHAIN, "coinmetrics",
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=btc&metrics=CapMrktCurUSD,CapRealUSD&frequency=1d&page_size=1",
"parse_cm_mvrv", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=btc&metrics=CapMrktCurUSD,CapRealUSD&frequency=1d&start_time={date}T00:00:00Z&end_time={date}T23:59:59Z",
"1d", "MVRV - FULL"),
Indicator(21, "nupl", Category.ONCHAIN, "coinmetrics",
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=btc&metrics=CapMrktCurUSD,CapRealUSD&frequency=1d&page_size=1",
"parse_cm_nupl", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=btc&metrics=CapMrktCurUSD,CapRealUSD&frequency=1d&start_time={date}T00:00:00Z&end_time={date}T23:59:59Z",
"1d", "NUPL - FULL"),
Indicator(22, "addr_btc", Category.ONCHAIN, "coinmetrics",
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=btc&metrics=AdrActCnt&frequency=1d&page_size=1",
"parse_cm", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=btc&metrics=AdrActCnt&frequency=1d&start_time={date}T00:00:00Z&end_time={date}T23:59:59Z",
"1d", "Active addr - FULL"),
Indicator(23, "addr_eth", Category.ONCHAIN, "coinmetrics",
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=eth&metrics=AdrActCnt&frequency=1d&page_size=1",
"parse_cm", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=eth&metrics=AdrActCnt&frequency=1d&start_time={date}T00:00:00Z&end_time={date}T23:59:59Z",
"1d", "ETH addr - FULL"),
Indicator(24, "txcnt", Category.ONCHAIN, "coinmetrics",
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=btc&metrics=TxCnt&frequency=1d&page_size=1",
"parse_cm", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=btc&metrics=TxCnt&frequency=1d&start_time={date}T00:00:00Z&end_time={date}T23:59:59Z",
"1d", "TX count - FULL"),
Indicator(25, "fees_btc", Category.ONCHAIN, "coinmetrics",
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=btc&metrics=FeeTotUSD&frequency=1d&page_size=1",
"parse_cm", Stationarity.EPISODIC, HistoricalSupport.FULL,
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=btc&metrics=FeeTotUSD&frequency=1d&start_time={date}T00:00:00Z&end_time={date}T23:59:59Z",
"1d", "BTC fees - FULL"),
Indicator(26, "fees_eth", Category.ONCHAIN, "coinmetrics",
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=eth&metrics=FeeTotUSD&frequency=1d&page_size=1",
"parse_cm", Stationarity.EPISODIC, HistoricalSupport.FULL,
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=eth&metrics=FeeTotUSD&frequency=1d&start_time={date}T00:00:00Z&end_time={date}T23:59:59Z",
"1d", "ETH fees - FULL"),
Indicator(27, "nvt", Category.ONCHAIN, "coinmetrics",
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=btc&metrics=NVTAdj&frequency=1d&page_size=1",
"parse_cm", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=btc&metrics=NVTAdj&frequency=1d&start_time={date}T00:00:00Z&end_time={date}T23:59:59Z",
"1d", "NVT - FULL"),
Indicator(28, "velocity", Category.ONCHAIN, "coinmetrics",
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=btc&metrics=VelCur1yr&frequency=1d&page_size=1",
"parse_cm", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=btc&metrics=VelCur1yr&frequency=1d&start_time={date}T00:00:00Z&end_time={date}T23:59:59Z",
"1d", "Velocity - FULL"),
Indicator(29, "sply_act", Category.ONCHAIN, "coinmetrics",
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=btc&metrics=SplyAct1yr&frequency=1d&page_size=1",
"parse_cm", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=btc&metrics=SplyAct1yr&frequency=1d&start_time={date}T00:00:00Z&end_time={date}T23:59:59Z",
"1d", "Active supply - FULL"),
Indicator(30, "rcap_eth", Category.ONCHAIN, "coinmetrics",
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=eth&metrics=CapRealUSD&frequency=1d&page_size=1",
"parse_cm", Stationarity.TREND_UP, HistoricalSupport.FULL,
"https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets=eth&metrics=CapRealUSD&frequency=1d&start_time={date}T00:00:00Z&end_time={date}T23:59:59Z",
"1d", "ETH rcap - FULL"),
# ONCHAIN - Blockchain.info (31-37)
Indicator(31, "hashrate", Category.ONCHAIN, "blockchain",
"https://blockchain.info/q/hashrate", "parse_bc", Stationarity.TREND_UP, HistoricalSupport.FULL,
"https://api.blockchain.info/charts/hash-rate?timespan=1days&start={date}&format=json", "1d", "Hashrate - FULL"),
Indicator(32, "difficulty", Category.ONCHAIN, "blockchain",
"https://blockchain.info/q/getdifficulty", "parse_bc", Stationarity.TREND_UP, HistoricalSupport.FULL,
"https://api.blockchain.info/charts/difficulty?timespan=1days&start={date}&format=json", "1d", "Difficulty - FULL"),
Indicator(33, "blk_int", Category.ONCHAIN, "blockchain",
"https://blockchain.info/q/interval", "parse_bc_int", Stationarity.STATIONARY, HistoricalSupport.CURRENT, "", "", "Block int - CURRENT"),
Indicator(34, "unconf", Category.ONCHAIN, "blockchain",
"https://blockchain.info/q/unconfirmedcount", "parse_bc", Stationarity.STATIONARY, HistoricalSupport.CURRENT, "", "", "Unconf - CURRENT"),
Indicator(35, "tx_blk", Category.ONCHAIN, "blockchain",
"https://blockchain.info/q/nperblock", "parse_bc", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://api.blockchain.info/charts/n-transactions-per-block?timespan=1days&start={date}&format=json", "1d", "TX/blk - FULL"),
Indicator(36, "total_btc", Category.ONCHAIN, "blockchain",
"https://blockchain.info/q/totalbc", "parse_bc_btc", Stationarity.TREND_UP, HistoricalSupport.FULL,
"https://api.blockchain.info/charts/total-bitcoins?timespan=1days&start={date}&format=json", "1d", "Total BTC - FULL"),
Indicator(37, "mcap_bc", Category.ONCHAIN, "blockchain",
"https://blockchain.info/q/marketcap", "parse_bc", Stationarity.TREND_UP, HistoricalSupport.FULL,
"https://api.blockchain.info/charts/market-cap?timespan=1days&start={date}&format=json", "1d", "Mcap - FULL"),
# ONCHAIN - Mempool (38-42) - ALL CURRENT
Indicator(38, "mp_cnt", Category.ONCHAIN, "mempool", "https://mempool.space/api/mempool",
"parse_mp_cnt", Stationarity.STATIONARY, HistoricalSupport.CURRENT, "", "", "Mempool - CURRENT"),
Indicator(39, "mp_mb", Category.ONCHAIN, "mempool", "https://mempool.space/api/mempool",
"parse_mp_mb", Stationarity.STATIONARY, HistoricalSupport.CURRENT, "", "", "Mempool MB - CURRENT"),
Indicator(40, "fee_fast", Category.ONCHAIN, "mempool", "https://mempool.space/api/v1/fees/recommended",
"parse_fee_fast", Stationarity.EPISODIC, HistoricalSupport.CURRENT, "", "", "Fast fee - CURRENT"),
Indicator(41, "fee_med", Category.ONCHAIN, "mempool", "https://mempool.space/api/v1/fees/recommended",
"parse_fee_med", Stationarity.EPISODIC, HistoricalSupport.CURRENT, "", "", "Med fee - CURRENT"),
Indicator(42, "fee_slow", Category.ONCHAIN, "mempool", "https://mempool.space/api/v1/fees/recommended",
"parse_fee_slow", Stationarity.EPISODIC, HistoricalSupport.CURRENT, "", "", "Slow fee - CURRENT"),
# DEFI - DeFi Llama (43-51)
Indicator(43, "tvl", Category.DEFI, "defillama", "https://api.llama.fi/v2/historicalChainTvl",
"parse_dl_tvl", Stationarity.TREND_UP, HistoricalSupport.FULL,
"https://api.llama.fi/v2/historicalChainTvl", "1d", "TVL - FULL (filter client-side)"),
Indicator(44, "tvl_eth", Category.DEFI, "defillama", "https://api.llama.fi/v2/historicalChainTvl/Ethereum",
"parse_dl_tvl", Stationarity.TREND_UP, HistoricalSupport.FULL,
"https://api.llama.fi/v2/historicalChainTvl/Ethereum", "1d", "ETH TVL - FULL"),
Indicator(45, "stables", Category.DEFI, "defillama", "https://stablecoins.llama.fi/stablecoins?includePrices=false",
"parse_dl_stables", Stationarity.TREND_UP, HistoricalSupport.FULL,
"https://stablecoins.llama.fi/stablecoincharts/all?stablecoin=1", "1d", "Stables - FULL"),
Indicator(46, "usdt", Category.DEFI, "defillama", "https://stablecoins.llama.fi/stablecoin/tether",
"parse_dl_single", Stationarity.TREND_UP, HistoricalSupport.FULL,
"https://stablecoins.llama.fi/stablecoincharts/all?stablecoin=1", "1d", "USDT - FULL"),
Indicator(47, "usdc", Category.DEFI, "defillama", "https://stablecoins.llama.fi/stablecoin/usd-coin",
"parse_dl_single", Stationarity.TREND_UP, HistoricalSupport.FULL,
"https://stablecoins.llama.fi/stablecoincharts/all?stablecoin=2", "1d", "USDC - FULL"),
Indicator(48, "dex_vol", Category.DEFI, "defillama",
"https://api.llama.fi/overview/dexs?excludeTotalDataChart=true&excludeTotalDataChartBreakdown=true",
"parse_dl_dex", Stationarity.EPISODIC, HistoricalSupport.PARTIAL, "", "1d", "DEX vol - PARTIAL"),
Indicator(49, "bridge", Category.DEFI, "defillama", "https://bridges.llama.fi/bridges?includeChains=false",
"parse_dl_bridge", Stationarity.EPISODIC, HistoricalSupport.PARTIAL, "", "1d", "Bridge - PARTIAL"),
Indicator(50, "yields", Category.DEFI, "defillama", "https://yields.llama.fi/pools",
"parse_dl_yields", Stationarity.STATIONARY, HistoricalSupport.CURRENT, "", "", "Yields - CURRENT"),
Indicator(51, "fees", Category.DEFI, "defillama", "https://api.llama.fi/overview/fees?excludeTotalDataChart=true",
"parse_dl_fees", Stationarity.EPISODIC, HistoricalSupport.PARTIAL, "", "1d", "Fees - PARTIAL"),
# MACRO - FRED (52-65) - ALL FULL HISTORY (decades)
Indicator(52, "dxy", Category.MACRO, "fred", "DTWEXBGS", "parse_fred", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://api.stlouisfed.org/fred/series/observations?series_id=DTWEXBGS&api_key={key}&file_type=json&observation_start={date}&observation_end={date}", "1d", "DXY - FULL"),
Indicator(53, "us10y", Category.MACRO, "fred", "DGS10", "parse_fred", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://api.stlouisfed.org/fred/series/observations?series_id=DGS10&api_key={key}&file_type=json&observation_start={date}&observation_end={date}", "1d", "10Y - FULL"),
Indicator(54, "us2y", Category.MACRO, "fred", "DGS2", "parse_fred", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://api.stlouisfed.org/fred/series/observations?series_id=DGS2&api_key={key}&file_type=json&observation_start={date}&observation_end={date}", "1d", "2Y - FULL"),
Indicator(55, "ycurve", Category.MACRO, "fred", "T10Y2Y", "parse_fred", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://api.stlouisfed.org/fred/series/observations?series_id=T10Y2Y&api_key={key}&file_type=json&observation_start={date}&observation_end={date}", "1d", "Yield curve - FULL"),
Indicator(56, "vix", Category.MACRO, "fred", "VIXCLS", "parse_fred", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://api.stlouisfed.org/fred/series/observations?series_id=VIXCLS&api_key={key}&file_type=json&observation_start={date}&observation_end={date}", "1d", "VIX - FULL"),
Indicator(57, "fedfunds", Category.MACRO, "fred", "DFF", "parse_fred", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://api.stlouisfed.org/fred/series/observations?series_id=DFF&api_key={key}&file_type=json&observation_start={date}&observation_end={date}", "1d", "Fed funds - FULL"),
Indicator(58, "m2", Category.MACRO, "fred", "WM2NS", "parse_fred", Stationarity.TREND_UP, HistoricalSupport.FULL,
"https://api.stlouisfed.org/fred/series/observations?series_id=WM2NS&api_key={key}&file_type=json&observation_start={date}&observation_end={date}", "1w", "M2 - FULL"),
Indicator(59, "cpi", Category.MACRO, "fred", "CPIAUCSL", "parse_fred", Stationarity.TREND_UP, HistoricalSupport.FULL,
"https://api.stlouisfed.org/fred/series/observations?series_id=CPIAUCSL&api_key={key}&file_type=json&observation_start={date}&observation_end={date}", "1m", "CPI - FULL"),
Indicator(60, "sp500", Category.MACRO, "fred", "SP500", "parse_fred", Stationarity.TREND_UP, HistoricalSupport.FULL,
"https://api.stlouisfed.org/fred/series/observations?series_id=SP500&api_key={key}&file_type=json&observation_start={date}&observation_end={date}", "1d", "S&P - FULL"),
Indicator(61, "gold", Category.MACRO, "fred", "GOLDAMGBD228NLBM", "parse_fred", Stationarity.TREND_UP, HistoricalSupport.FULL,
"https://api.stlouisfed.org/fred/series/observations?series_id=GOLDAMGBD228NLBM&api_key={key}&file_type=json&observation_start={date}&observation_end={date}", "1d", "Gold - FULL"),
Indicator(62, "hy_spread", Category.MACRO, "fred", "BAMLH0A0HYM2", "parse_fred", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://api.stlouisfed.org/fred/series/observations?series_id=BAMLH0A0HYM2&api_key={key}&file_type=json&observation_start={date}&observation_end={date}", "1d", "HY spread - FULL"),
Indicator(63, "be5y", Category.MACRO, "fred", "T5YIE", "parse_fred", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://api.stlouisfed.org/fred/series/observations?series_id=T5YIE&api_key={key}&file_type=json&observation_start={date}&observation_end={date}", "1d", "Breakeven - FULL"),
Indicator(64, "nfci", Category.MACRO, "fred", "NFCI", "parse_fred", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://api.stlouisfed.org/fred/series/observations?series_id=NFCI&api_key={key}&file_type=json&observation_start={date}&observation_end={date}", "1w", "NFCI - FULL"),
Indicator(65, "claims", Category.MACRO, "fred", "ICSA", "parse_fred", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://api.stlouisfed.org/fred/series/observations?series_id=ICSA&api_key={key}&file_type=json&observation_start={date}&observation_end={date}", "1w", "Claims - FULL"),
# SENTIMENT (66-72) - F&G has FULL history
Indicator(66, "fng", Category.SENTIMENT, "alternative", "https://api.alternative.me/fng/?limit=1",
"parse_fng", Stationarity.STATIONARY, HistoricalSupport.FULL,
"https://api.alternative.me/fng/?limit=1000&date_format=us", "1d", "F&G - FULL (returns history, filter)"),
Indicator(67, "fng_prev", Category.SENTIMENT, "alternative", "https://api.alternative.me/fng/?limit=2",
"parse_fng_prev", Stationarity.STATIONARY, HistoricalSupport.FULL, "", "1d", "Prev F&G"),
Indicator(68, "fng_week", Category.SENTIMENT, "alternative", "https://api.alternative.me/fng/?limit=7",
"parse_fng_week", Stationarity.STATIONARY, HistoricalSupport.FULL, "", "1d", "Week F&G"),
Indicator(69, "fng_vol", Category.SENTIMENT, "alternative", "https://api.alternative.me/fng/?limit=1",
"parse_fng", Stationarity.STATIONARY, HistoricalSupport.FULL, "", "1d", "Vol proxy"),
Indicator(70, "fng_mom", Category.SENTIMENT, "alternative", "https://api.alternative.me/fng/?limit=1",
"parse_fng", Stationarity.STATIONARY, HistoricalSupport.FULL, "", "1d", "Mom proxy"),
Indicator(71, "fng_soc", Category.SENTIMENT, "alternative", "https://api.alternative.me/fng/?limit=1",
"parse_fng", Stationarity.STATIONARY, HistoricalSupport.FULL, "", "1d", "Social proxy"),
Indicator(72, "fng_dom", Category.SENTIMENT, "alternative", "https://api.alternative.me/fng/?limit=1",
"parse_fng", Stationarity.STATIONARY, HistoricalSupport.FULL, "", "1d", "Dom proxy"),
# MICROSTRUCTURE (73-80) - Most CURRENT
Indicator(73, "imbal_btc", Category.MICROSTRUCTURE, "binance", "https://api.binance.com/api/v3/depth?symbol=BTCUSDT&limit=100",
"parse_imbal", Stationarity.STATIONARY, HistoricalSupport.CURRENT, "", "", "Imbalance - CURRENT"),
Indicator(74, "imbal_eth", Category.MICROSTRUCTURE, "binance", "https://api.binance.com/api/v3/depth?symbol=ETHUSDT&limit=100",
"parse_imbal", Stationarity.STATIONARY, HistoricalSupport.CURRENT, "", "", "ETH imbal - CURRENT"),
Indicator(75, "spread", Category.MICROSTRUCTURE, "binance", "https://api.binance.com/api/v3/ticker/bookTicker?symbol=BTCUSDT",
"parse_spread", Stationarity.STATIONARY, HistoricalSupport.CURRENT, "", "", "Spread - CURRENT"),
Indicator(76, "chg24_btc", Category.MICROSTRUCTURE, "binance", "https://api.binance.com/api/v3/ticker/24hr?symbol=BTCUSDT",
"parse_chg", Stationarity.STATIONARY, HistoricalSupport.CURRENT, "", "", "24h chg - CURRENT"),
Indicator(77, "chg24_eth", Category.MICROSTRUCTURE, "binance", "https://api.binance.com/api/v3/ticker/24hr?symbol=ETHUSDT",
"parse_chg", Stationarity.STATIONARY, HistoricalSupport.CURRENT, "", "", "ETH 24h - CURRENT"),
Indicator(78, "vol24", Category.MICROSTRUCTURE, "binance", "https://api.binance.com/api/v3/ticker/24hr?symbol=BTCUSDT",
"parse_vol", Stationarity.EPISODIC, HistoricalSupport.FULL,
"https://api.binance.com/api/v3/klines?symbol=BTCUSDT&interval=1d&startTime={start_ms}&endTime={end_ms}&limit=1",
"1d", "Volume - FULL via klines"),
Indicator(79, "dispersion", Category.MICROSTRUCTURE, "binance", "https://api.binance.com/api/v3/ticker/24hr",
"parse_disp", Stationarity.STATIONARY, HistoricalSupport.CURRENT, "", "", "Dispersion - CURRENT"),
Indicator(80, "correlation", Category.MICROSTRUCTURE, "binance", "https://api.binance.com/api/v3/ticker/24hr",
"parse_corr", Stationarity.STATIONARY, HistoricalSupport.CURRENT, "", "", "Correlation - CURRENT"),
# MARKET - CoinGecko (81-85)
Indicator(81, "btc_price", Category.MACRO, "coingecko", "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd",
"parse_cg_btc", Stationarity.TREND_UP, HistoricalSupport.FULL,
"https://api.coingecko.com/api/v3/coins/bitcoin/history?date={date_dmy}", "1d", "BTC price - FULL"),
Indicator(82, "eth_price", Category.MACRO, "coingecko", "https://api.coingecko.com/api/v3/simple/price?ids=ethereum&vs_currencies=usd",
"parse_cg_eth", Stationarity.TREND_UP, HistoricalSupport.FULL,
"https://api.coingecko.com/api/v3/coins/ethereum/history?date={date_dmy}", "1d", "ETH price - FULL"),
Indicator(83, "mcap", Category.MACRO, "coingecko", "https://api.coingecko.com/api/v3/global",
"parse_cg_mcap", Stationarity.TREND_UP, HistoricalSupport.PARTIAL, "", "1d", "Mcap - PARTIAL"),
Indicator(84, "btc_dom", Category.MACRO, "coingecko", "https://api.coingecko.com/api/v3/global",
"parse_cg_dom_btc", Stationarity.STATIONARY, HistoricalSupport.CURRENT, "", "", "BTC dom - CURRENT"),
Indicator(85, "eth_dom", Category.MACRO, "coingecko", "https://api.coingecko.com/api/v3/global",
"parse_cg_dom_eth", Stationarity.STATIONARY, HistoricalSupport.CURRENT, "", "", "ETH dom - CURRENT"),
]
# fmt: on
N_INDICATORS = len(INDICATORS)
class StationarityTransformer:
def __init__(self, lookback: int = 10):
self.history: Dict[int, deque] = {i: deque(maxlen=lookback+1) for i in range(1, N_INDICATORS+1)}
def transform(self, ind_id: int, raw: float) -> float:
ind = INDICATORS[ind_id - 1]
hist = self.history[ind_id]
hist.append(raw)
if ind.stationarity == Stationarity.STATIONARY: return raw
if ind.stationarity == Stationarity.TREND_UP:
return (raw - hist[-2]) / abs(hist[-2]) if len(hist) >= 2 and hist[-2] != 0 else 0.0
if ind.stationarity == Stationarity.EPISODIC:
if len(hist) < 3: return 0.0
m, s = np.mean(list(hist)), np.std(list(hist))
return (raw - m) / s if s > 0 else 0.0
return raw
def transform_matrix(self, raw: np.ndarray) -> np.ndarray:
return np.array([self.transform(i+1, raw[i]) for i in range(len(raw))])
class ExternalFactorsFetcher:
def __init__(self, config: Config = None):
self.config = config or Config()
self.cache: Dict[str, Tuple[float, Any]] = {}
import time as t; self._time = t
def _build_hist_url(self, ind: Indicator, dt: datetime) -> Optional[str]:
if ind.historical == HistoricalSupport.CURRENT or not ind.hist_url: return None
url = ind.hist_url
date_str = dt.strftime("%Y-%m-%d")
date_dmy = dt.strftime("%d-%m-%Y")
start_ms = int(dt.replace(hour=0, minute=0, second=0).timestamp() * 1000)
end_ms = int(dt.replace(hour=23, minute=59, second=59).timestamp() * 1000)
key = self.config.fred_api_key or "DEMO_KEY"
return url.replace("{date}", date_str).replace("{date_dmy}", date_dmy).replace("{start_ms}", str(start_ms)).replace("{end_ms}", str(end_ms)).replace("{key}", key)
async def _fetch(self, session, url: str) -> Optional[Any]:
if url in self.cache:
ct, cd = self.cache[url]
if self._time.time() - ct < self.config.cache_ttl: return cd
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=self.config.timeout), headers={"User-Agent": "Mozilla/5.0"}) as r:
if r.status == 200:
d = await r.json() if 'json' in r.headers.get('Content-Type', '') else await r.text()
if isinstance(d, str):
try: d = json.loads(d)
except: pass
self.cache[url] = (self._time.time(), d)
return d
except: pass
return None
def _fred_url(self, series: str) -> str:
return f"https://api.stlouisfed.org/fred/series/observations?series_id={series}&api_key={self.config.fred_api_key or 'DEMO_KEY'}&file_type=json&sort_order=desc&limit=1"
# Parsers
def parse_binance_funding(self, d): return float(d[0]['fundingRate']) if isinstance(d, list) and d else 0.0
def parse_binance_oi(self, d):
if isinstance(d, list) and d: return float(d[-1].get('sumOpenInterest', 0))
return float(d.get('openInterest', 0)) if isinstance(d, dict) else 0.0
def parse_binance_ls(self, d): return float(d[-1]['longShortRatio']) if isinstance(d, list) and d else 1.0
def parse_binance_taker(self, d): return float(d[-1]['buySellRatio']) if isinstance(d, list) and d else 1.0
def parse_binance_basis(self, d): return float(d.get('lastFundingRate', 0)) * 365 * 3 if isinstance(d, dict) else 0.0
def parse_liq_proxy(self, d): return np.tanh(float(d.get('priceChangePercent', 0)) / 10) if isinstance(d, dict) else 0.0
def parse_deribit_dvol(self, d):
if isinstance(d, dict) and 'result' in d and isinstance(d['result'], dict) and 'data' in d['result'] and d['result']['data']:
return float(d['result']['data'][-1][4]) if len(d['result']['data'][-1]) > 4 else 0.0
return 0.0
def parse_deribit_pcr(self, d):
if isinstance(d, dict) and 'result' in d:
r = d['result']
p = sum(float(o.get('volume', 0)) for o in r if '-P' in o.get('instrument_name', ''))
c = sum(float(o.get('volume', 0)) for o in r if '-C' in o.get('instrument_name', ''))
return p / c if c > 0 else 1.0
return 1.0
def parse_deribit_pcr_oi(self, d):
if isinstance(d, dict) and 'result' in d:
r = d['result']
p = sum(float(o.get('open_interest', 0)) for o in r if '-P' in o.get('instrument_name', ''))
c = sum(float(o.get('open_interest', 0)) for o in r if '-C' in o.get('instrument_name', ''))
return p / c if c > 0 else 1.0
return 1.0
def parse_deribit_oi(self, d): return sum(float(o.get('open_interest', 0)) for o in d['result']) if isinstance(d, dict) and 'result' in d else 0.0
def parse_deribit_fund(self, d):
if isinstance(d, dict) and 'result' in d:
r = d['result']
return float(r[-1].get('interest_8h', 0)) if isinstance(r, list) and r else float(r)
return 0.0
def parse_cm(self, d):
if isinstance(d, dict) and 'data' in d and d['data']:
for k, v in d['data'][-1].items():
if k not in ['asset', 'time']:
try: return float(v)
except: pass
return 0.0
def parse_cm_mvrv(self, d):
if isinstance(d, dict) and 'data' in d and d['data']:
r = d['data'][-1]
m, rc = float(r.get('CapMrktCurUSD', 0)), float(r.get('CapRealUSD', 1))
return m / rc if rc > 0 else 0.0
return 0.0
def parse_cm_nupl(self, d):
if isinstance(d, dict) and 'data' in d and d['data']:
r = d['data'][-1]
m, rc = float(r.get('CapMrktCurUSD', 0)), float(r.get('CapRealUSD', 1))
return (m - rc) / m if m > 0 else 0.0
return 0.0
def parse_bc(self, d):
if isinstance(d, (int, float)): return float(d)
if isinstance(d, str):
try: return float(d)
except: pass
if isinstance(d, dict) and 'values' in d and d['values']: return float(d['values'][-1].get('y', 0))
return 0.0
def parse_bc_int(self, d): v = self.parse_bc(d); return abs(v - 600) / 600 if v > 0 else 0.0
def parse_bc_btc(self, d): v = self.parse_bc(d); return v / 1e8 if v > 0 else 0.0
def parse_mp_cnt(self, d): return float(d.get('count', 0)) if isinstance(d, dict) else 0.0
def parse_mp_mb(self, d): return float(d.get('vsize', 0)) / 1e6 if isinstance(d, dict) else 0.0
def parse_fee_fast(self, d): return float(d.get('fastestFee', 0)) if isinstance(d, dict) else 0.0
def parse_fee_med(self, d): return float(d.get('halfHourFee', 0)) if isinstance(d, dict) else 0.0
def parse_fee_slow(self, d): return float(d.get('economyFee', 0)) if isinstance(d, dict) else 0.0
def parse_dl_tvl(self, d, target_date: datetime = None):
if isinstance(d, list) and d:
if target_date:
ts = int(target_date.timestamp())
for e in reversed(d):
if e.get('date', 0) <= ts: return float(e.get('tvl', 0))
return float(d[-1].get('tvl', 0))
return 0.0
def parse_dl_stables(self, d):
if isinstance(d, dict) and 'peggedAssets' in d:
return sum(float(a.get('circulating', {}).get('peggedUSD', 0)) for a in d['peggedAssets'])
return 0.0
def parse_dl_single(self, d):
if isinstance(d, dict) and 'tokens' in d and d['tokens']:
return float(d['tokens'][-1].get('circulating', {}).get('peggedUSD', 0))
return 0.0
def parse_dl_dex(self, d): return float(d.get('total24h', 0)) if isinstance(d, dict) else 0.0
def parse_dl_bridge(self, d):
if isinstance(d, dict) and 'bridges' in d:
return sum(float(b.get('lastDayVolume', 0)) for b in d['bridges'])
return 0.0
def parse_dl_yields(self, d):
if isinstance(d, dict) and 'data' in d:
apys = [float(p.get('apy', 0)) for p in d['data'][:100] if p.get('apy')]
return np.mean(apys) if apys else 0.0
return 0.0
def parse_dl_fees(self, d): return float(d.get('total24h', 0)) if isinstance(d, dict) else 0.0
def parse_fred(self, d):
if isinstance(d, dict) and 'observations' in d and d['observations']:
v = d['observations'][-1].get('value', '.')
if v != '.':
try: return float(v)
except: pass
return 0.0
def parse_fng(self, d): return float(d['data'][0]['value']) if isinstance(d, dict) and 'data' in d and d['data'] else 50.0
def parse_fng_prev(self, d): return float(d['data'][1]['value']) if isinstance(d, dict) and 'data' in d and len(d['data']) > 1 else 50.0
def parse_fng_week(self, d): return np.mean([float(x['value']) for x in d['data'][:7]]) if isinstance(d, dict) and 'data' in d and len(d['data']) >= 7 else 50.0
def parse_imbal(self, d):
if isinstance(d, dict):
bv = sum(float(b[1]) for b in d.get('bids', [])[:50])
av = sum(float(a[1]) for a in d.get('asks', [])[:50])
t = bv + av
return (bv - av) / t if t > 0 else 0.0
return 0.0
def parse_spread(self, d):
if isinstance(d, dict):
b, a = float(d.get('bidPrice', 0)), float(d.get('askPrice', 0))
return (a - b) / b * 10000 if b > 0 else 0.0
return 0.0
def parse_chg(self, d): return float(d.get('priceChangePercent', 0)) if isinstance(d, dict) else 0.0
def parse_vol(self, d):
if isinstance(d, dict): return float(d.get('quoteVolume', 0))
if isinstance(d, list) and d and isinstance(d[0], list): return float(d[-1][7])
return 0.0
def parse_disp(self, d):
if isinstance(d, list) and len(d) > 10:
chg = [float(t['priceChangePercent']) for t in d if t.get('symbol', '').endswith('USDT') and 'priceChangePercent' in t]
return float(np.std(chg[:50])) if len(chg) > 5 else 0.0
return 0.0
def parse_corr(self, d): disp = self.parse_disp(d); return 1 / (1 + disp) if disp > 0 else 0.5
def parse_cg_btc(self, d):
if isinstance(d, dict) and 'bitcoin' in d: return float(d['bitcoin']['usd'])
if isinstance(d, dict) and 'market_data' in d: return float(d['market_data'].get('current_price', {}).get('usd', 0))
return 0.0
def parse_cg_eth(self, d):
if isinstance(d, dict) and 'ethereum' in d: return float(d['ethereum']['usd'])
if isinstance(d, dict) and 'market_data' in d: return float(d['market_data'].get('current_price', {}).get('usd', 0))
return 0.0
def parse_cg_mcap(self, d): return float(d['data']['total_market_cap']['usd']) if isinstance(d, dict) and 'data' in d else 0.0
def parse_cg_dom_btc(self, d): return float(d['data']['market_cap_percentage']['btc']) if isinstance(d, dict) and 'data' in d else 0.0
def parse_cg_dom_eth(self, d): return float(d['data']['market_cap_percentage']['eth']) if isinstance(d, dict) and 'data' in d else 0.0
async def fetch_indicator(self, session, ind: Indicator, target_date: datetime = None) -> Tuple[int, str, float, bool]:
if target_date and ind.historical != HistoricalSupport.CURRENT:
url = self._build_hist_url(ind, target_date)
else:
url = self._fred_url(ind.url) if ind.source == "fred" else ind.url
if url is None: return (ind.id, ind.name, 0.0, False)
data = await self._fetch(session, url)
if data is None: return (ind.id, ind.name, 0.0, False)
parser = getattr(self, ind.parser, None)
if parser is None: return (ind.id, ind.name, 0.0, False)
try:
value = parser(data)
return (ind.id, ind.name, value, value != 0.0 or 'imbal' in ind.name)
except: return (ind.id, ind.name, 0.0, False)
async def fetch_all(self, target_date: datetime = None) -> Dict[str, Any]:
connector = aiohttp.TCPConnector(limit=self.config.max_concurrent)
async with aiohttp.ClientSession(connector=connector) as session:
results = await asyncio.gather(*[self.fetch_indicator(session, ind, target_date) for ind in INDICATORS])
matrix = np.zeros(N_INDICATORS)
success = 0
details = {}
for idx, name, value, ok in results:
matrix[idx - 1] = value
if ok: success += 1
details[idx] = {'name': name, 'value': value, 'success': ok}
return {'matrix': matrix, 'timestamp': (target_date or datetime.now(timezone.utc)).isoformat(), 'success_count': success, 'total': N_INDICATORS, 'details': details}
def fetch_sync(self, target_date: datetime = None) -> Dict[str, Any]:
return asyncio.run(self.fetch_all(target_date))
class ExternalFactorsMatrix:
"""DOLPHIN interface with BACKFILL. Usage: efm.update() or efm.update(datetime(2024,6,15))"""
def __init__(self, config: Config = None):
self.config = config or Config()
self.fetcher = ExternalFactorsFetcher(self.config)
self.transformer = StationarityTransformer()
self.raw_matrix: Optional[np.ndarray] = None
self.stationary_matrix: Optional[np.ndarray] = None
self.last_result: Optional[Dict] = None
def update(self, target_date: datetime = None) -> np.ndarray:
self.last_result = self.fetcher.fetch_sync(target_date)
self.raw_matrix = self.last_result['matrix']
self.stationary_matrix = self.transformer.transform_matrix(self.raw_matrix)
return self.stationary_matrix
def update_raw(self, target_date: datetime = None) -> np.ndarray:
self.last_result = self.fetcher.fetch_sync(target_date)
self.raw_matrix = self.last_result['matrix']
return self.raw_matrix
def get_indicator_names(self) -> List[str]: return [i.name for i in INDICATORS]
def get_backfillable(self) -> List[Tuple[int, str, str]]:
return [(i.id, i.name, i.hist_resolution) for i in INDICATORS if i.historical in [HistoricalSupport.FULL, HistoricalSupport.PARTIAL]]
def get_current_only(self) -> List[Tuple[int, str]]:
return [(i.id, i.name) for i in INDICATORS if i.historical == HistoricalSupport.CURRENT]
def summary(self) -> str:
if not self.last_result: return "No data."
r = self.last_result
f = sum(1 for i in INDICATORS if i.historical == HistoricalSupport.FULL)
p = sum(1 for i in INDICATORS if i.historical == HistoricalSupport.PARTIAL)
c = sum(1 for i in INDICATORS if i.historical == HistoricalSupport.CURRENT)
return f"Success: {r['success_count']}/{r['total']} | Historical: FULL={f}, PARTIAL={p}, CURRENT={c}"
if __name__ == "__main__":
print(f"EXTERNAL FACTORS v5.0 - {N_INDICATORS} indicators with BACKFILL")
f = [i for i in INDICATORS if i.historical == HistoricalSupport.FULL]
p = [i for i in INDICATORS if i.historical == HistoricalSupport.PARTIAL]
c = [i for i in INDICATORS if i.historical == HistoricalSupport.CURRENT]
print(f"\nFULL: {len(f)} | PARTIAL: {len(p)} | CURRENT: {len(c)}")
print("\nFULL HISTORY indicators:")
for i in f: print(f" {i.id:2d}. {i.name:15s} [{i.hist_resolution:3s}] {i.source}")
print("\nCURRENT ONLY:")
for i in c: print(f" {i.id:2d}. {i.name:15s} - {i.description}")

View File

@@ -0,0 +1,266 @@
#!/usr/bin/env python3
"""
INDICATOR READER v1.0
=====================
Utility to read and analyze processed indicator .npz files.
Usage:
from indicator_reader import IndicatorReader
# Load single file
reader = IndicatorReader("scan_000027_193311__Indicators.npz")
print(reader.summary())
# Get DataFrames
scan_df = reader.scan_derived_df()
external_df = reader.external_df()
asset_df = reader.asset_df()
# Load directory
all_data = IndicatorReader.load_directory("./scans/")
"""
import numpy as np
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime
class IndicatorReader:
"""Reader for processed indicator .npz files"""
def __init__(self, path: str):
self.path = Path(path)
self._data = dict(np.load(path, allow_pickle=True))
@property
def scan_number(self) -> int:
return int(self._data['scan_number'][0])
@property
def timestamp(self) -> str:
return str(self._data['timestamp'][0])
@property
def processing_time(self) -> float:
return float(self._data['processing_time'][0])
@property
def n_assets(self) -> int:
return len(self._data['asset_symbols'])
@property
def asset_symbols(self) -> List[str]:
return list(self._data['asset_symbols'])
# =========================================================================
# SCAN-DERIVED (eigenvalue indicators from tracking_data/regime_signals)
# =========================================================================
@property
def scan_derived(self) -> np.ndarray:
"""Get scan-derived indicator array"""
return self._data['scan_derived']
@property
def scan_derived_names(self) -> List[str]:
return list(self._data['scan_derived_names'])
def scan_derived_df(self):
"""Get scan-derived as pandas DataFrame"""
import pandas as pd
return pd.DataFrame({
'name': self.scan_derived_names,
'value': self.scan_derived
})
def get_scan_indicator(self, name: str) -> float:
"""Get specific scan-derived indicator by name"""
names = self.scan_derived_names
if name in names:
return float(self.scan_derived[names.index(name)])
raise KeyError(f"Unknown scan indicator: {name}")
# =========================================================================
# EXTERNAL (API-fetched indicators)
# =========================================================================
@property
def external(self) -> np.ndarray:
"""Get external indicator array (85 values, NaN for skipped)"""
return self._data['external']
@property
def external_success(self) -> np.ndarray:
"""Get success flags for external indicators"""
return self._data['external_success']
def external_df(self):
"""Get external indicators as pandas DataFrame"""
import pandas as pd
# Indicator names (would need to import from external_factors_matrix)
names = [f"ext_{i+1}" for i in range(85)]
return pd.DataFrame({
'id': range(1, 86),
'value': self.external,
'success': self.external_success
})
@property
def external_success_rate(self) -> float:
"""Percentage of external indicators successfully fetched"""
valid = ~np.isnan(self.external)
if valid.sum() == 0:
return 0.0
return float(self.external_success[valid].mean())
# =========================================================================
# PER-ASSET
# =========================================================================
@property
def asset_matrix(self) -> np.ndarray:
"""Get per-asset indicator matrix (n_assets x n_indicators)"""
return self._data['asset_matrix']
@property
def asset_indicator_names(self) -> List[str]:
return list(self._data['asset_indicator_names'])
def asset_df(self):
"""Get per-asset indicators as pandas DataFrame"""
import pandas as pd
return pd.DataFrame(
self.asset_matrix,
index=self.asset_symbols,
columns=self.asset_indicator_names
)
def get_asset(self, symbol: str) -> Dict[str, float]:
"""Get all indicators for a specific asset"""
symbols = self.asset_symbols
if symbol not in symbols:
raise KeyError(f"Unknown symbol: {symbol}")
idx = symbols.index(symbol)
return dict(zip(self.asset_indicator_names, self.asset_matrix[idx]))
def get_asset_indicator(self, symbol: str, indicator: str) -> float:
"""Get specific indicator for specific asset"""
asset = self.get_asset(symbol)
if indicator not in asset:
raise KeyError(f"Unknown indicator: {indicator}")
return asset[indicator]
# =========================================================================
# UTILITIES
# =========================================================================
def summary(self) -> str:
"""Get summary string"""
ext_valid = (~np.isnan(self.external)).sum()
ext_success = self.external_success.sum()
return f"""Indicator File: {self.path.name}
Scan: #{self.scan_number} @ {self.timestamp}
Processing: {self.processing_time:.2f}s
Scan-derived: {len(self.scan_derived)} indicators
lambda_max: {self.get_scan_indicator('lambda_max'):.4f}
coherence: {self.get_scan_indicator('market_coherence'):.4f}
instability: {self.get_scan_indicator('instability_score'):.4f}
External: {ext_success}/{ext_valid} successful ({self.external_success_rate*100:.1f}%)
Per-asset: {self.n_assets} assets × {len(self.asset_indicator_names)} indicators
"""
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary"""
return {
'scan_number': self.scan_number,
'timestamp': self.timestamp,
'processing_time': self.processing_time,
'scan_derived': dict(zip(self.scan_derived_names, self.scan_derived.tolist())),
'external': self.external.tolist(),
'external_success': self.external_success.tolist(),
'asset_symbols': self.asset_symbols,
'asset_matrix': self.asset_matrix.tolist(),
}
# =========================================================================
# CLASS METHODS
# =========================================================================
@classmethod
def load_directory(cls, directory: str, pattern: str = "*__Indicators.npz") -> List['IndicatorReader']:
"""Load all indicator files from directory"""
root = Path(directory)
files = sorted(root.rglob(pattern))
return [cls(str(f)) for f in files]
@classmethod
def to_timeseries(cls, readers: List['IndicatorReader']) -> Dict[str, np.ndarray]:
"""Convert list of readers to time series arrays"""
n = len(readers)
if n == 0:
return {}
# Get dimensions from first file
n_scan = len(readers[0].scan_derived)
n_ext = 85
n_assets = readers[0].n_assets
n_asset_ind = len(readers[0].asset_indicator_names)
# Allocate arrays
timestamps = []
scan_series = np.zeros((n, n_scan))
ext_series = np.zeros((n, n_ext))
for i, r in enumerate(readers):
timestamps.append(r.timestamp)
scan_series[i] = r.scan_derived
ext_series[i] = r.external
return {
'timestamps': np.array(timestamps, dtype='U32'),
'scan_derived': scan_series,
'external': ext_series,
'scan_names': readers[0].scan_derived_names,
}
# =============================================================================
# CLI
# =============================================================================
def main():
import argparse
parser = argparse.ArgumentParser(description="Indicator Reader")
parser.add_argument("path", help="Path to .npz file or directory")
parser.add_argument("-a", "--asset", help="Show specific asset")
parser.add_argument("-j", "--json", action="store_true", help="Output as JSON")
args = parser.parse_args()
path = Path(args.path)
if path.is_file():
reader = IndicatorReader(str(path))
if args.json:
import json
print(json.dumps(reader.to_dict(), indent=2))
elif args.asset:
asset = reader.get_asset(args.asset)
for k, v in asset.items():
print(f" {k}: {v:.6f}")
else:
print(reader.summary())
elif path.is_dir():
readers = IndicatorReader.load_directory(str(path))
print(f"Found {len(readers)} indicator files")
if readers:
ts = IndicatorReader.to_timeseries(readers)
print(f"Time range: {ts['timestamps'][0]} to {ts['timestamps'][-1]}")
print(f"Scan-derived shape: {ts['scan_derived'].shape}")
print(f"External shape: {ts['external'].shape}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,204 @@
#!/usr/bin/env python3
"""
INDICATOR SOURCES v5.0 - API Reference with Historical Support
===============================================================
Documents all 85 indicators with their backfill capability.
"""
SOURCES = {
"binance": {"url": "fapi.binance.com / api.binance.com", "auth": "None", "limit": "1200/min", "history": "FULL (startTime/endTime)"},
"deribit": {"url": "deribit.com/api/v2/public", "auth": "None", "limit": "20/sec", "history": "FULL for DVOL/funding"},
"coinmetrics": {"url": "community-api.coinmetrics.io/v4", "auth": "None", "limit": "10/6sec", "history": "FULL (start_time/end_time)"},
"fred": {"url": "api.stlouisfed.org/fred", "auth": "Free key", "limit": "120/min", "history": "FULL (decades)"},
"defillama": {"url": "api.llama.fi", "auth": "None", "limit": "Generous", "history": "FULL for TVL/stables"},
"alternative": {"url": "api.alternative.me", "auth": "None", "limit": "Unlimited", "history": "FULL (limit=N param)"},
"blockchain": {"url": "blockchain.info", "auth": "None", "limit": "Generous", "history": "FULL via charts API"},
"mempool": {"url": "mempool.space/api", "auth": "None", "limit": "Generous", "history": "NONE (real-time only)"},
"coingecko": {"url": "api.coingecko.com/api/v3", "auth": "None (demo)", "limit": "30/min", "history": "FULL for prices"},
}
# Historical URL templates for backfill
HISTORICAL_ENDPOINTS = {
# BINANCE - All support startTime/endTime in milliseconds
"binance_funding": "https://fapi.binance.com/fapi/v1/fundingRate?symbol={SYMBOL}&startTime={start_ms}&endTime={end_ms}&limit=1000",
"binance_oi_hist": "https://fapi.binance.com/futures/data/openInterestHist?symbol={SYMBOL}&period=1h&startTime={start_ms}&endTime={end_ms}&limit=500",
"binance_ls_hist": "https://fapi.binance.com/futures/data/globalLongShortAccountRatio?symbol={SYMBOL}&period=1h&startTime={start_ms}&endTime={end_ms}&limit=500",
"binance_taker_hist": "https://fapi.binance.com/futures/data/takerlongshortRatio?symbol={SYMBOL}&period=1h&startTime={start_ms}&endTime={end_ms}&limit=500",
"binance_klines": "https://api.binance.com/api/v3/klines?symbol={SYMBOL}&interval=1d&startTime={start_ms}&endTime={end_ms}&limit=1",
# DERIBIT - Uses start_timestamp/end_timestamp in milliseconds
"deribit_dvol": "https://www.deribit.com/api/v2/public/get_volatility_index_data?currency={CURRENCY}&resolution=3600&start_timestamp={start_ms}&end_timestamp={end_ms}",
"deribit_funding_hist": "https://www.deribit.com/api/v2/public/get_funding_rate_history?instrument_name={INSTRUMENT}&start_timestamp={start_ms}&end_timestamp={end_ms}",
# COINMETRICS - Uses ISO date format
"coinmetrics": "https://community-api.coinmetrics.io/v4/timeseries/asset-metrics?assets={asset}&metrics={metric}&frequency=1d&start_time={date}T00:00:00Z&end_time={date}T23:59:59Z",
# FRED - Uses observation_start/observation_end in YYYY-MM-DD
"fred": "https://api.stlouisfed.org/fred/series/observations?series_id={series}&api_key={key}&file_type=json&observation_start={date}&observation_end={date}",
# DEFILLAMA - Returns full history, filter client-side
"defillama_tvl": "https://api.llama.fi/v2/historicalChainTvl", # Filter by date client-side
"defillama_tvl_chain": "https://api.llama.fi/v2/historicalChainTvl/{chain}",
"defillama_stables": "https://stablecoins.llama.fi/stablecoincharts/all?stablecoin={id}", # 1=USDT, 2=USDC
# BLOCKCHAIN.INFO - Uses start param in YYYY-MM-DD
"blockchain_charts": "https://api.blockchain.info/charts/{chart}?timespan=1days&start={date}&format=json",
# COINGECKO - Uses DD-MM-YYYY format
"coingecko_history": "https://api.coingecko.com/api/v3/coins/{id}/history?date={date_dmy}",
# ALTERNATIVE.ME - Returns N days of history
"fng_history": "https://api.alternative.me/fng/?limit=1000&date_format=us", # Filter client-side
}
HISTORICAL_SUPPORT = {
# FULL HISTORY (51 indicators)
"full": [
# Binance derivatives
(1, "funding_btc", "8h", "Funding rate history via startTime/endTime"),
(2, "funding_eth", "8h", "ETH funding"),
(3, "oi_btc", "1h", "Open interest history via openInterestHist endpoint"),
(4, "oi_eth", "1h", "ETH OI"),
(5, "ls_btc", "1h", "Long/short ratio history"),
(6, "ls_eth", "1h", "ETH L/S"),
(7, "ls_top", "1h", "Top trader L/S"),
(8, "taker", "1h", "Taker ratio history"),
# Deribit
(11, "dvol_btc", "1h", "DVOL via get_volatility_index_data"),
(12, "dvol_eth", "1h", "ETH DVOL"),
(17, "fund_dbt_btc", "8h", "Deribit funding via get_funding_rate_history"),
(18, "fund_dbt_eth", "8h", "ETH Deribit funding"),
# CoinMetrics (ALL have full history)
(19, "rcap_btc", "1d", "CoinMetrics: CapRealUSD"),
(20, "mvrv", "1d", "CoinMetrics: derived from CapMrktCurUSD/CapRealUSD"),
(21, "nupl", "1d", "CoinMetrics: derived"),
(22, "addr_btc", "1d", "CoinMetrics: AdrActCnt"),
(23, "addr_eth", "1d", "CoinMetrics: ETH AdrActCnt"),
(24, "txcnt", "1d", "CoinMetrics: TxCnt"),
(25, "fees_btc", "1d", "CoinMetrics: FeeTotUSD"),
(26, "fees_eth", "1d", "CoinMetrics: ETH FeeTotUSD"),
(27, "nvt", "1d", "CoinMetrics: NVTAdj"),
(28, "velocity", "1d", "CoinMetrics: VelCur1yr"),
(29, "sply_act", "1d", "CoinMetrics: SplyAct1yr"),
(30, "rcap_eth", "1d", "CoinMetrics: ETH CapRealUSD"),
# Blockchain.info charts
(31, "hashrate", "1d", "Blockchain.info: hash-rate chart"),
(32, "difficulty", "1d", "Blockchain.info: difficulty chart"),
(35, "tx_blk", "1d", "Blockchain.info: n-transactions-per-block chart"),
(36, "total_btc", "1d", "Blockchain.info: total-bitcoins chart"),
(37, "mcap_bc", "1d", "Blockchain.info: market-cap chart"),
# DeFi Llama
(43, "tvl", "1d", "DeFi Llama: historicalChainTvl (returns all, filter client-side)"),
(44, "tvl_eth", "1d", "DeFi Llama: ETH TVL"),
(45, "stables", "1d", "DeFi Llama: stablecoincharts"),
(46, "usdt", "1d", "DeFi Llama: stablecoin ID=1"),
(47, "usdc", "1d", "DeFi Llama: stablecoin ID=2"),
# FRED (ALL have decades of history)
(52, "dxy", "1d", "FRED: DTWEXBGS"),
(53, "us10y", "1d", "FRED: DGS10"),
(54, "us2y", "1d", "FRED: DGS2"),
(55, "ycurve", "1d", "FRED: T10Y2Y"),
(56, "vix", "1d", "FRED: VIXCLS"),
(57, "fedfunds", "1d", "FRED: DFF"),
(58, "m2", "1w", "FRED: WM2NS (weekly)"),
(59, "cpi", "1m", "FRED: CPIAUCSL (monthly)"),
(60, "sp500", "1d", "FRED: SP500"),
(61, "gold", "1d", "FRED: GOLDAMGBD228NLBM"),
(62, "hy_spread", "1d", "FRED: BAMLH0A0HYM2"),
(63, "be5y", "1d", "FRED: T5YIE"),
(64, "nfci", "1w", "FRED: NFCI (weekly)"),
(65, "claims", "1w", "FRED: ICSA (weekly)"),
# Alternative.me
(66, "fng", "1d", "Alternative.me: limit param returns history"),
(67, "fng_prev", "1d", ""),
(68, "fng_week", "1d", ""),
(69, "fng_vol", "1d", ""),
(70, "fng_mom", "1d", ""),
(71, "fng_soc", "1d", ""),
(72, "fng_dom", "1d", ""),
# CoinGecko
(81, "btc_price", "1d", "CoinGecko: /coins/{id}/history"),
(82, "eth_price", "1d", "CoinGecko: /coins/{id}/history"),
# Binance klines
(78, "vol24", "1d", "Binance: klines endpoint"),
],
# PARTIAL HISTORY (12 indicators)
"partial": [
(48, "dex_vol", "1d", "DeFi Llama: recent history in response"),
(49, "bridge", "1d", "DeFi Llama: bridgevolume endpoint"),
(51, "fees", "1d", "DeFi Llama: fees overview"),
(83, "mcap", "1d", "CoinGecko: market_cap_chart (limited)"),
],
# CURRENT ONLY (22 indicators)
"current": [
(9, "basis", "Binance premium index - real-time only"),
(10, "liq_proxy", "Derived from 24hr ticker - real-time"),
(13, "pcr_vol", "Deribit options summary - real-time"),
(14, "pcr_oi", "Deribit options OI - real-time"),
(15, "pcr_eth", "Deribit ETH options - real-time"),
(16, "opt_oi", "Deribit total options OI - real-time"),
(33, "blk_int", "Blockchain.info simple query - real-time"),
(34, "unconf", "Blockchain.info unconfirmed - real-time"),
(38, "mp_cnt", "Mempool.space - NO historical API"),
(39, "mp_mb", "Mempool.space - NO historical API"),
(40, "fee_fast", "Mempool.space - NO historical API"),
(41, "fee_med", "Mempool.space - NO historical API"),
(42, "fee_slow", "Mempool.space - NO historical API"),
(50, "yields", "DeFi Llama yields - real-time"),
(73, "imbal_btc", "Order book depth - real-time"),
(74, "imbal_eth", "Order book depth - real-time"),
(75, "spread", "Book ticker - real-time"),
(76, "chg24_btc", "24hr ticker - real-time"),
(77, "chg24_eth", "24hr ticker - real-time"),
(79, "dispersion", "Calculated from 24hr - real-time"),
(80, "correlation", "Calculated from 24hr - real-time"),
(84, "btc_dom", "CoinGecko global - real-time"),
(85, "eth_dom", "CoinGecko global - real-time"),
],
}
BACKFILL_NOTES = """
BACKFILL STRATEGY
=================
1. DAILY BACKFILL (Most indicators):
- CoinMetrics, FRED, DeFi Llama TVL, Blockchain.info charts
- Use: efm.update(datetime(2024, 6, 15))
2. HOURLY BACKFILL (Binance derivatives):
- OI, L/S ratio, taker ratio have 1h resolution
- Funding rate has 8h resolution
3. APIS RETURNING FULL HISTORY:
- DeFi Llama TVL: Returns ALL history, filter client-side by timestamp
- Alternative.me F&G: Use limit=1000 to get ~3 years of history
- Blockchain.info charts: Use start= param with date
4. MISSING HISTORICAL DATA:
- Mempool fees: Build your own collector
- Order book imbalance: Build your own collector
- Spreads: Build your own collector
5. RECOMMENDED APPROACH FOR TRAINING:
a) Backfill what's available (51 indicators with FULL history)
b) For CURRENT-only indicators, either:
- Accept NaN/0 for historical periods
- Build collectors to capture going forward
- Use proxy indicators (e.g., volatility proxy for mempool fees)
"""
if __name__ == "__main__":
print("INDICATOR SOURCES v5.0")
print("=" * 60)
print("\nData Sources:")
for src, info in SOURCES.items():
print(f" {src:12s}: {info['auth']:10s} | {info['limit']:12s} | {info['history']}")
print(f"\nHistorical Support:")
print(f" FULL: {len(HISTORICAL_SUPPORT['full'])} indicators")
print(f" PARTIAL: {len(HISTORICAL_SUPPORT['partial'])} indicators")
print(f" CURRENT: {len(HISTORICAL_SUPPORT['current'])} indicators")
print(BACKFILL_NOTES)

View File

@@ -0,0 +1,207 @@
"""
Meta-Adaptive ExF Optimizer
===========================
Runs nightly (or on-demand) to calculate dynamic lag configurations and
active indicator thresholds for the Adaptive Circuit Breaker (ACB).
Implementation of the "Meta-Adaptive" Blueprint:
1. Pulls up to the last 90 days of market returns and indicator values.
2. Runs lag hypothesis testing (0-7 days) on all tracked ExF indicators.
3. Uses strict Point-Biserial correlation (p < 0.05) against market stress (< -1% daily drop).
4. Persists the active, statistically verified JSON configuration for realtime_exf_service.py.
"""
import sys
import json
import time
import logging
import numpy as np
import pandas as pd
from pathlib import Path
from collections import defaultdict
import threading
from scipy import stats
from datetime import datetime, timezone
PROJECT_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
sys.path.insert(0, str(PROJECT_ROOT / 'nautilus_dolphin'))
try:
from realtime_exf_service import INDICATORS, OPTIMAL_LAGS
from dolphin_paper_trade_adaptive_cb_v2 import EIGENVALUES_BASE_PATH
from dolphin_vbt_real import load_all_data, run_full_backtest, STRATEGIES, INIT_CAPITAL
except ImportError:
pass
logger = logging.getLogger(__name__)
CONFIG_PATH = Path(__file__).parent / "meta_adaptive_config.json"
class MetaAdaptiveOptimizer:
def __init__(self, days_lookback=90, max_lags=6, p_value_gate=0.05):
self.days_lookback = days_lookback
self.max_lags = max_lags
self.p_value_gate = p_value_gate
self.indicators = list(INDICATORS.keys()) if 'INDICATORS' in globals() else []
self._lock = threading.Lock()
def _build_history_cache(self, dates, limit_days):
"""Build daily feature cache from NPZ files."""
logger.info(f"Building cache for last {limit_days} days...")
cache = {}
target_dates = dates[-limit_days:] if len(dates) > limit_days else dates
for date_str in target_dates:
date_path = EIGENVALUES_BASE_PATH / date_str
if not date_path.exists(): continue
npz_files = list(date_path.glob('scan_*__Indicators.npz'))
if not npz_files: continue
accum = defaultdict(list)
for f in npz_files:
try:
data = dict(np.load(f, allow_pickle=True))
names = [str(n) for n in data.get('api_names', [])]
vals = data.get('api_indicators', [])
succ = data.get('api_success', [])
for n, v, s in zip(names, vals, succ):
if s and not np.isnan(v):
accum[n].append(float(v))
except Exception:
pass
if accum:
cache[date_str] = {k: np.mean(v) for k, v in accum.items()}
return cache, target_dates
def _get_daily_returns(self, df, target_dates):
"""Derive daily returns proxy from the champion strategy logic."""
logger.info("Computing proxy returns for the time window...")
champion = STRATEGIES['champion_5x_f20']
returns = []
cap = INIT_CAPITAL
valid_dates = []
for d in target_dates:
day_df = df[df['date_str'] == d]
if len(day_df) < 200:
returns.append(np.nan)
valid_dates.append(d)
continue
res = run_full_backtest(day_df, champion, init_cash=cap, seed=42, verbose=False)
ret = (res['capital'] - cap) / cap
returns.append(ret)
cap = res['capital']
valid_dates.append(d)
return np.array(returns), valid_dates
def run_optimization(self) -> dict:
"""Run the full meta-adaptive optimization routine and return new config."""
with self._lock:
logger.info("Starting META-ADAPTIVE optimization loop.")
t0 = time.time()
df = load_all_data()
if 'date_str' not in df.columns:
df['date_str'] = df['timestamp'].dt.date.astype(str)
all_dates = sorted(df['date_str'].unique())
cache, target_dates = self._build_history_cache(all_dates, self.days_lookback + self.max_lags)
daily_returns, target_dates = self._get_daily_returns(df, target_dates)
# Predict market stress dropping by more than 1%
stress_arr = (daily_returns < -0.01).astype(float)
candidate_lags = {}
active_thresholds = {}
candidate_count = 0
for key in self.indicators:
ind_arr = np.array([cache.get(d, {}).get(key, np.nan) for d in target_dates])
corrs = []; pvals = []; sc_corrs = []
for lag in range(self.max_lags + 1):
if lag == 0: x, y, y_stress = ind_arr, daily_returns, stress_arr
else: x, y, y_stress = ind_arr[:-lag], daily_returns[lag:], stress_arr[lag:]
mask = ~np.isnan(x) & ~np.isnan(y)
if mask.sum() < 20: # Need at least 20 viable days
corrs.append(0); pvals.append(1); sc_corrs.append(0)
continue
# Pearson to price returns
r, p = stats.pearsonr(x[mask], y[mask])
corrs.append(r); pvals.append(p)
# Point-Biserial to stress events
# We capture the relation to binary stress to figure out threshold direction
if y_stress[mask].sum() > 2: # At least a few stress days required
sc = stats.pointbiserialr(y_stress[mask], x[mask])[0]
else:
sc = 0
sc_corrs.append(sc)
if not corrs: continue
# Find lag with highest correlation strength
best_lag = int(np.argmax(np.abs(corrs)))
best_p = pvals[best_lag]
# Check gate
if best_p <= self.p_value_gate:
direction = ">" if sc_corrs[best_lag] > 0 else "<"
# Compute a stress threshold logic (e.g. 15th / 85th percentile of historical)
valid_vals = ind_arr[~np.isnan(ind_arr)]
thresh = np.percentile(valid_vals, 85 if direction == '>' else 15)
candidate_lags[key] = best_lag
active_thresholds[key] = {
'threshold': float(thresh),
'direction': direction,
'p_value': float(best_p),
'r_value': float(corrs[best_lag])
}
candidate_count += 1
# Fallback checks mapping to V4 baseline if things drift too far
logger.info(f"Optimization complete ({time.time() - t0:.1f}s). {candidate_count} indicators passed P < {self.p_value_gate}.")
output_config = {
'timestamp': datetime.now(timezone.utc).isoformat(),
'days_lookback': self.days_lookback,
'lags': candidate_lags,
'thresholds': active_thresholds
}
# Atomic save
temp_path = CONFIG_PATH.with_suffix('.tmp')
with open(temp_path, 'w', encoding='utf-8') as f:
json.dump(output_config, f, indent=2)
temp_path.replace(CONFIG_PATH)
return output_config
def get_current_meta_config() -> dict:
"""Read the latest meta-adaptive config, or return empty/default dict."""
if not CONFIG_PATH.exists():
return {}
try:
with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.error(f"Failed to read meta-adaptive config: {e}")
return {}
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
optimizer = MetaAdaptiveOptimizer(days_lookback=90)
config = optimizer.run_optimization()
print(f"\nSaved config to: {CONFIG_PATH}")
for k, v in config['lags'].items():
print(f" {k}: lag={v} days, dir={config['thresholds'][k]['direction']} thresh={config['thresholds'][k]['threshold']:.4g}")

View File

@@ -0,0 +1,228 @@
import asyncio
import aiohttp
import json
import time
import logging
import numpy as np
from typing import Dict, List, Optional
from collections import defaultdict
# Setup basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s')
logger = logging.getLogger("OBStreamService")
try:
import websockets
except ImportError:
logger.warning("websockets package not found. Run pip install websockets aiohttp")
class OBStreamService:
"""
Real-Time Order Book Streamer for Binance Futures.
Connects via WebSockets to maintain a perfectly synchronized local L2 Book,
and slices the book into 5% notional depth buckets dynamically for the
SmartPlacer and OBFeatureEngine layers.
"""
def __init__(self, assets: List[str], max_depth_pct: int = 5):
self.assets = [a.upper() for a in assets]
self.streams = [f"{a.lower()}@depth@100ms" for a in self.assets]
self.max_depth_pct = max_depth_pct
# In-memory Order Book caches (Price -> Quantity)
self.bids: Dict[str, Dict[float, float]] = {a: {} for a in self.assets}
self.asks: Dict[str, Dict[float, float]] = {a: {} for a in self.assets}
# Synchronization mechanisms
self.last_update_id: Dict[str, int] = {a: 0 for a in self.assets}
self.buffer: Dict[str, List[dict]] = {a: [] for a in self.assets}
self.initialized: Dict[str, bool] = {a: False for a in self.assets}
# Optional: Lock for thread-safe reads if requested asynchronously
self.locks: Dict[str, asyncio.Lock] = {a: asyncio.Lock() for a in self.assets}
async def fetch_snapshot(self, asset: str):
"""Fetch REST snapshot of the Order Book to initialize local state."""
url = f"https://fapi.binance.com/fapi/v1/depth?symbol={asset}&limit=1000"
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
data = await resp.json()
if 'lastUpdateId' not in data:
logger.error(f"Failed to fetch snapshot for {asset}: {data}")
return
last_id = data['lastUpdateId']
async with self.locks[asset]:
self.bids[asset] = {float(p): float(q) for p, q in data['bids']}
self.asks[asset] = {float(p): float(q) for p, q in data['asks']}
self.last_update_id[asset] = last_id
# Apply any buffered updates
buffered = self.buffer[asset]
for event in buffered:
if event['u'] <= last_id:
continue # Ignore old events
self._apply_event(asset, event)
self.buffer[asset].clear()
self.initialized[asset] = True
logger.info(f"Synchronized L2 book for {asset} (UpdateId: {last_id})")
except Exception as e:
logger.error(f"Error initializing snapshot for {asset}: {e}")
def _apply_event(self, asset: str, event: dict):
"""Apply a streaming diff event to the local book."""
bids = self.bids[asset]
asks = self.asks[asset]
# Process Bids
for p_str, q_str in event['b']:
p, q = float(p_str), float(q_str)
if q == 0.0:
bids.pop(p, None)
else:
bids[p] = q
# Process Asks
for p_str, q_str in event['a']:
p, q = float(p_str), float(q_str)
if q == 0.0:
asks.pop(p, None)
else:
asks[p] = q
self.last_update_id[asset] = event['u']
async def stream(self):
"""Main loop: connect to WebSocket streams and maintain books."""
import websockets
# 1. Fire off REST snapshot initialization concurrently
for a in self.assets:
asyncio.create_task(self.fetch_snapshot(a))
# 2. Start WebSocket listening instantly to buffer diffs
stream_url = "wss://fstream.binance.com/stream?streams=" + "/".join(self.streams)
logger.info(f"Connecting to Binance Stream: {stream_url}")
while True:
try:
async with websockets.connect(stream_url, ping_interval=20, ping_timeout=20) as ws:
logger.info("WebSocket connected. Streaming depth diffs...")
while True:
msg = await ws.recv()
data = json.loads(msg)
if 'data' in data:
ev = data['data']
asset = ev['s'].upper()
async with self.locks[asset]:
if not self.initialized[asset]:
self.buffer[asset].append(ev)
else:
self._apply_event(asset, ev)
except websockets.exceptions.ConnectionClosed as e:
logger.warning(f"WebSocket closed ({e}). Reconnecting in 3s...")
# Require re-init on disconnect to prevent drifted states
for a in self.assets:
self.initialized[a] = False
asyncio.create_task(self.fetch_snapshot(a))
await asyncio.sleep(3)
except Exception as e:
logger.error(f"Stream error: {e}")
await asyncio.sleep(3)
async def get_depth_buckets(self, asset: str) -> Optional[dict]:
"""
Extract the Notional Depth vectors matching OBSnapshot.
Creates 5 elements summing USD depth between 0-1%, 1-2%, ..., 4-5% from mid.
"""
async with self.locks[asset]:
if not self.initialized[asset]:
return None
# Extract and sort bids (descending) & asks (ascending)
bids = sorted(self.bids[asset].items(), key=lambda x: -x[0])
asks = sorted(self.asks[asset].items(), key=lambda x: x[0])
if not bids or not asks:
return None
best_bid = bids[0][0]
best_ask = asks[0][0]
mid = (best_bid + best_ask) / 2.0
bid_not = np.zeros(self.max_depth_pct, dtype=np.float64)
ask_not = np.zeros(self.max_depth_pct, dtype=np.float64)
bid_dep = np.zeros(self.max_depth_pct, dtype=np.float64)
ask_dep = np.zeros(self.max_depth_pct, dtype=np.float64)
# Bin bids into percentages
for p, q in bids:
dist_pct = (mid - p) / mid * 100
idx = int(dist_pct)
if idx < self.max_depth_pct:
bid_not[idx] += p * q
bid_dep[idx] += q
else: # Since sorted, if we exceed max distance, we can safely break
break
# Bin asks into percentages
for p, q in asks:
dist_pct = (p - mid) / mid * 100
idx = int(dist_pct)
if idx < self.max_depth_pct:
ask_not[idx] += p * q
ask_dep[idx] += q
else:
break
return {
"timestamp": time.time(),
"asset": asset,
"bid_notional": bid_not,
"ask_notional": ask_not,
"bid_depth": bid_dep,
"ask_depth": ask_dep,
"best_bid": best_bid,
"best_ask": best_ask,
"spread_bps": (best_ask - best_bid) / mid * 10_000
}
# -----------------------------------------------------------------------------
# Standalone run/test hook
# -----------------------------------------------------------------------------
async def demo():
assets_to_track = ["BTCUSDT", "ETHUSDT", "SOLUSDT"]
service = OBStreamService(assets=assets_to_track)
# Run the streaming listener in the background
asyncio.create_task(service.stream())
await asyncio.sleep(4) # Let it initialize
for _ in range(3):
print("\n--- Current Real-Time OB Snapshots ---")
for asset in assets_to_track:
snap = await service.get_depth_buckets(asset)
if snap:
imb = (snap['bid_notional'][0] - snap['ask_notional'][0]) / (snap['bid_notional'][0] + snap['ask_notional'][0] + 1e-9)
b1 = snap['bid_notional'][0]
a1 = snap['ask_notional'][0]
print(f"{asset:10s} | Spread: {snap['spread_bps']:.2f} bps | 1% Bid: ${b1:,.0f} | 1% Ask: ${a1:,.0f} | 1% Imb: {imb:+.3f}")
else:
print(f"{asset:10s} | Waiting for init...")
await asyncio.sleep(2)
if __name__ == "__main__":
try:
asyncio.run(demo())
except KeyboardInterrupt:
print("OB Streamer shut down manually.")

View File

@@ -0,0 +1,886 @@
#!/usr/bin/env python3
"""
REAL-TIME EXTERNAL FACTORS SERVICE v1.0
========================================
Production-grade, HFT-optimized external factors service.
Key design decisions (empirically validated 2026-02-27, 54-day backtest):
- Per-indicator adaptive polling at native API resolution
- Uniform lag=1 day (ROBUST: +3.10% ROI, -2.02% DD, zero overfit risk)
- Binary gating (no confidence weighting - empirically validated)
- Never blocks consumer: get_indicators() returns cached data in <1ms
- Dual output: NPZ (legacy) + Arrow (new)
Empirical validation vs baseline (54-day backtest):
N: No ACB: ROI=+7.51%, DD=18.34%
A: Current (lag=0 daily avg): ROI=+9.33%, DD=12.04% <-- current production
L1: Uniform lag=1: ROI=+12.43%, DD=10.02% <-- THIS SERVICE DEFAULT
MO: Mixed optimal lags: ROI=+13.31%, DD=9.10% <-- experimental (needs 80+ days)
MS: Mixed + synth intra-day: ROI=+16.00%, DD=9.92% <-- future (needs VBT changes)
TODO (ordered by priority):
1. [CRITICAL] Re-validate lag=1 with 80+ days of data for statistical robustness
2. [HIGH] Fix the 50 dead indicators (see DEAD_INDICATORS below)
3. [HIGH] Test each repaired indicator isolated against ACB & alpha engine
4. [HIGH] Move from per-day ACB to intra-day continuous ACB once VBT supports it
5. [MED] Switch to per-indicator optimal lags once 80+ days available
6. [MED] Implement adaptive variance estimator for poll interval tuning
7. [MED] Add Arrow dual output (schema defined, writer implemented)
8. [LOW] FRED indicators: handle weekend/holiday gaps (fill-forward last value)
9. [LOW] CoinMetrics indicators: fix parse_cm returning 0 (API may need auth)
10.[LOW] Tune system sync to never generate signals with stale/missing data
"""
import asyncio
import aiohttp
import numpy as np
import time
import logging
import json
from pathlib import Path
from datetime import datetime, timezone
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Any
from collections import deque, defaultdict
from enum import Enum
import threading
logger = logging.getLogger(__name__)
# =====================================================================
# INDICATOR METADATA (from empirical analysis)
# =====================================================================
@dataclass
class IndicatorMeta:
"""Per-indicator configuration derived from empirical testing."""
name: str
source: str # API provider
url: str # Real-time endpoint
parser: str # Parser method name
poll_interval_s: float # Native update rate (seconds)
optimal_lag_days: int # Information discount lag (empirically measured)
lag_correlation: float # Pearson r at optimal lag
lag_pvalue: float # Statistical significance
acb_critical: bool # Used by ACB v2/v3
category: str # derivatives/onchain/macro/etc
# Empirically measured optimal lags (from lag_correlation_analysis):
# dvol_btc: lag=1, r=-0.4919, p=0.0002 (strongest)
# taker: lag=1, r=-0.4105, p=0.0034
# dvol_eth: lag=1, r=-0.4246, p=0.0015
# funding_btc: lag=5, r=+0.3892, p=0.0057 (slow propagation)
# ls_btc: lag=0, r=+0.2970, p=0.0362 (immediate)
# funding_eth: lag=3, r=+0.2026, p=0.1539 (not significant)
# vix: lag=1, r=-0.2044, p=0.2700 (not significant)
# fng: lag=5, r=-0.1923, p=0.1856 (not significant)
INDICATORS = {
# BINANCE DERIVATIVES (rate limit: 1200/min)
'funding_btc': IndicatorMeta('funding_btc', 'binance',
'https://fapi.binance.com/fapi/v1/fundingRate?symbol=BTCUSDT&limit=1',
'parse_binance_funding', 28800, 5, 0.3892, 0.0057, True, 'derivatives'),
'funding_eth': IndicatorMeta('funding_eth', 'binance',
'https://fapi.binance.com/fapi/v1/fundingRate?symbol=ETHUSDT&limit=1',
'parse_binance_funding', 28800, 3, 0.2026, 0.1539, True, 'derivatives'),
'oi_btc': IndicatorMeta('oi_btc', 'binance',
'https://fapi.binance.com/fapi/v1/openInterest?symbol=BTCUSDT',
'parse_binance_oi', 300, 0, 0, 1.0, False, 'derivatives'),
'oi_eth': IndicatorMeta('oi_eth', 'binance',
'https://fapi.binance.com/fapi/v1/openInterest?symbol=ETHUSDT',
'parse_binance_oi', 300, 0, 0, 1.0, False, 'derivatives'),
'ls_btc': IndicatorMeta('ls_btc', 'binance',
'https://fapi.binance.com/futures/data/globalLongShortAccountRatio?symbol=BTCUSDT&period=5m&limit=1',
'parse_binance_ls', 300, 0, 0.2970, 0.0362, True, 'derivatives'),
'ls_eth': IndicatorMeta('ls_eth', 'binance',
'https://fapi.binance.com/futures/data/globalLongShortAccountRatio?symbol=ETHUSDT&period=5m&limit=1',
'parse_binance_ls', 300, 0, 0, 1.0, False, 'derivatives'),
'ls_top': IndicatorMeta('ls_top', 'binance',
'https://fapi.binance.com/futures/data/topLongShortAccountRatio?symbol=BTCUSDT&period=5m&limit=1',
'parse_binance_ls', 300, 0, 0, 1.0, False, 'derivatives'),
'taker': IndicatorMeta('taker', 'binance',
'https://fapi.binance.com/futures/data/takerlongshortRatio?symbol=BTCUSDT&period=5m&limit=1',
'parse_binance_taker', 300, 1, -0.4105, 0.0034, True, 'derivatives'),
'basis': IndicatorMeta('basis', 'binance',
'https://fapi.binance.com/fapi/v1/premiumIndex?symbol=BTCUSDT',
'parse_binance_basis', 30, 0, 0, 1.0, False, 'derivatives'),
# DERIBIT (rate limit: 100/10s)
'dvol_btc': IndicatorMeta('dvol_btc', 'deribit',
'https://www.deribit.com/api/v2/public/get_volatility_index_data?currency=BTC&resolution=3600&count=1',
'parse_deribit_dvol', 60, 1, -0.4919, 0.0002, True, 'derivatives'),
'dvol_eth': IndicatorMeta('dvol_eth', 'deribit',
'https://www.deribit.com/api/v2/public/get_volatility_index_data?currency=ETH&resolution=3600&count=1',
'parse_deribit_dvol', 60, 1, -0.4246, 0.0015, True, 'derivatives'),
'fund_dbt_btc': IndicatorMeta('fund_dbt_btc', 'deribit',
'https://www.deribit.com/api/v2/public/get_funding_rate_value?instrument_name=BTC-PERPETUAL',
'parse_deribit_fund', 28800, 0, 0, 1.0, False, 'derivatives'),
'fund_dbt_eth': IndicatorMeta('fund_dbt_eth', 'deribit',
'https://www.deribit.com/api/v2/public/get_funding_rate_value?instrument_name=ETH-PERPETUAL',
'parse_deribit_fund', 28800, 0, 0, 1.0, False, 'derivatives'),
# MACRO (FRED, rate limit: 120/min)
'vix': IndicatorMeta('vix', 'fred', 'VIXCLS', 'parse_fred', 21600, 1, -0.2044, 0.27, True, 'macro'),
'dxy': IndicatorMeta('dxy', 'fred', 'DTWEXBGS', 'parse_fred', 21600, 0, 0, 1.0, False, 'macro'),
'us10y': IndicatorMeta('us10y', 'fred', 'DGS10', 'parse_fred', 21600, 0, 0, 1.0, False, 'macro'),
'sp500': IndicatorMeta('sp500', 'fred', 'SP500', 'parse_fred', 21600, 0, 0, 1.0, False, 'macro'),
'fedfunds': IndicatorMeta('fedfunds', 'fred', 'DFF', 'parse_fred', 86400, 0, 0, 1.0, False, 'macro'),
# SENTIMENT
'fng': IndicatorMeta('fng', 'alternative', 'https://api.alternative.me/fng/?limit=1',
'parse_fng', 21600, 5, -0.1923, 0.1856, True, 'sentiment'),
# ON-CHAIN (blockchain.info)
'hashrate': IndicatorMeta('hashrate', 'blockchain', 'https://blockchain.info/q/hashrate',
'parse_bc', 1800, 0, 0, 1.0, False, 'onchain'),
# DEFI (DeFi Llama)
'tvl': IndicatorMeta('tvl', 'defillama', 'https://api.llama.fi/v2/historicalChainTvl',
'parse_dl_tvl', 21600, 0, 0, 1.0, False, 'defi'),
}
# Rate limits per provider (requests per second)
RATE_LIMITS = {
'binance': 20.0, # 1200/min
'deribit': 10.0, # 100/10s
'fred': 2.0, # 120/min
'alternative': 0.5,
'blockchain': 0.5,
'defillama': 1.0,
'coinmetrics': 0.15, # 10/min
}
# =====================================================================
# INDICATOR STATE
# =====================================================================
@dataclass
class IndicatorState:
"""Live state for a single indicator."""
value: float = np.nan
fetched_at: float = 0.0 # monotonic time
fetched_utc: Optional[datetime] = None
success: bool = False
error: str = ""
fetch_count: int = 0
fail_count: int = 0
# History buffer for lag support
daily_history: deque = field(default_factory=lambda: deque(maxlen=10))
# =====================================================================
# PARSERS (same as external_factors_matrix.py, inlined for independence)
# =====================================================================
class Parsers:
@staticmethod
def parse_binance_funding(d):
return float(d[0]['fundingRate']) if isinstance(d, list) and d else 0.0
@staticmethod
def parse_binance_oi(d):
if isinstance(d, list) and d: return float(d[-1].get('sumOpenInterest', 0))
return float(d.get('openInterest', 0)) if isinstance(d, dict) else 0.0
@staticmethod
def parse_binance_ls(d):
return float(d[-1]['longShortRatio']) if isinstance(d, list) and d else 1.0
@staticmethod
def parse_binance_taker(d):
return float(d[-1]['buySellRatio']) if isinstance(d, list) and d else 1.0
@staticmethod
def parse_binance_basis(d):
return float(d.get('lastFundingRate', 0)) * 365 * 3 if isinstance(d, dict) else 0.0
@staticmethod
def parse_deribit_dvol(d):
if isinstance(d, dict) and 'result' in d:
r = d['result']
if isinstance(r, dict) and 'data' in r and r['data']:
return float(r['data'][-1][4]) if len(r['data'][-1]) > 4 else 0.0
return 0.0
@staticmethod
def parse_deribit_fund(d):
if isinstance(d, dict) and 'result' in d:
r = d['result']
return float(r[-1].get('interest_8h', 0)) if isinstance(r, list) and r else float(r)
return 0.0
@staticmethod
def parse_fred(d):
if isinstance(d, dict) and 'observations' in d and d['observations']:
v = d['observations'][-1].get('value', '.')
if v != '.':
try: return float(v)
except: pass
return 0.0
@staticmethod
def parse_fng(d):
return float(d['data'][0]['value']) if isinstance(d, dict) and 'data' in d and d['data'] else 50.0
@staticmethod
def parse_bc(d):
if isinstance(d, (int, float)): return float(d)
if isinstance(d, str):
try: return float(d)
except: pass
if isinstance(d, dict) and 'values' in d and d['values']:
return float(d['values'][-1].get('y', 0))
return 0.0
@staticmethod
def parse_dl_tvl(d):
if isinstance(d, list) and d:
return float(d[-1].get('tvl', 0))
return 0.0
# =====================================================================
# REAL-TIME SERVICE
# =====================================================================
class RealTimeExFService:
"""
Singleton real-time external factors service.
Design principles:
- Never blocks: get_indicators() is pure memory read
- Background asyncio loop fetches on per-indicator timers
- Per-provider rate limiting via semaphores
- History buffer per indicator for lag support
- Thread-safe via lock on state dict
"""
def __init__(self, fred_api_key: str = ""):
self.fred_api_key = fred_api_key or 'c16a9cde3e3bb5bb972bb9283485f202'
self.state: Dict[str, IndicatorState] = {
name: IndicatorState() for name in INDICATORS
}
self._lock = threading.Lock()
self._running = False
self._loop = None
self._thread = None
self._semaphores: Dict[str, asyncio.Semaphore] = {}
self._session: Optional[aiohttp.ClientSession] = None
self._current_date: str = "" # for daily history rotation
# ----- Consumer API (never blocks, <1ms) -----
def get_indicators(self, apply_lag: bool = True) -> Dict[str, Any]:
"""
Get current indicator values with optional lag application.
Returns dict compatible with calculate_adaptive_cut_v2/v3:
{'funding_btc': float, 'dvol_btc': float, ...}
Plus metadata:
{'_staleness': {name: seconds}, '_fetched_at': {name: iso}}
"""
with self._lock:
result = {}
staleness = {}
now = time.monotonic()
for name, meta in INDICATORS.items():
st = self.state[name]
if apply_lag and meta.optimal_lag_days > 0:
# Use lagged value from history
lag = meta.optimal_lag_days
hist = list(st.daily_history)
if len(hist) >= lag:
result[name] = hist[-lag] # lag days ago
# If not enough history, use current (better than nothing)
elif st.success:
result[name] = st.value
else:
if st.success and not np.isnan(st.value):
result[name] = st.value
if st.fetched_at > 0:
staleness[name] = now - st.fetched_at
result['_staleness'] = staleness
return result
def get_acb_indicators(self) -> Dict[str, float]:
"""Get only the ACB-critical indicators (with lags applied)."""
full = self.get_indicators(apply_lag=True)
return {k: v for k, v in full.items()
if k in ('funding_btc', 'funding_eth', 'dvol_btc', 'dvol_eth',
'fng', 'vix', 'ls_btc', 'taker',
'mcap_bc', 'fund_dbt_btc', 'oi_btc', 'fund_dbt_eth', 'addr_btc')
and isinstance(v, (int, float))}
# ----- Background fetching -----
async def _fetch_url(self, url: str, source: str) -> Optional[Any]:
"""Fetch URL with rate limiting and error handling."""
sem = self._semaphores.get(source)
if sem:
await sem.acquire()
try:
return await self._do_fetch(url)
finally:
sem.release()
# Enforce rate limit delay
delay = 1.0 / RATE_LIMITS.get(source, 1.0)
await asyncio.sleep(delay)
return await self._do_fetch(url)
async def _do_fetch(self, url: str) -> Optional[Any]:
"""Raw HTTP fetch."""
if not self._session:
return None
try:
timeout = aiohttp.ClientTimeout(total=10)
headers = {"User-Agent": "Mozilla/5.0"}
async with self._session.get(url, timeout=timeout, headers=headers) as r:
if r.status == 200:
ct = r.headers.get('Content-Type', '')
if 'json' in ct:
return await r.json()
text = await r.text()
try: return json.loads(text)
except: return text
else:
logger.warning(f"HTTP {r.status} for {url[:60]}")
except asyncio.TimeoutError:
logger.debug(f"Timeout: {url[:60]}")
except Exception as e:
logger.debug(f"Fetch error: {e}")
return None
def _build_fred_url(self, series_id: str) -> str:
return (f"https://api.stlouisfed.org/fred/series/observations?"
f"series_id={series_id}&api_key={self.fred_api_key}"
f"&file_type=json&sort_order=desc&limit=1")
async def _fetch_indicator(self, name: str, meta: IndicatorMeta):
"""Fetch and parse a single indicator."""
# Build URL
if meta.source == 'fred':
url = self._build_fred_url(meta.url)
else:
url = meta.url
# Fetch
data = await self._fetch_url(url, meta.source)
if data is None:
with self._lock:
self.state[name].fail_count += 1
self.state[name].error = "fetch_failed"
return
# Parse
parser = getattr(Parsers, meta.parser, None)
if parser is None:
logger.error(f"No parser: {meta.parser}")
return
try:
value = parser(data)
if value == 0.0 and 'imbal' not in name:
# Most parsers return 0.0 on failure
with self._lock:
self.state[name].fail_count += 1
self.state[name].error = "zero_value"
return
with self._lock:
self.state[name].value = value
self.state[name].success = True
self.state[name].fetched_at = time.monotonic()
self.state[name].fetched_utc = datetime.now(timezone.utc)
self.state[name].fetch_count += 1
self.state[name].error = ""
except Exception as e:
with self._lock:
self.state[name].fail_count += 1
self.state[name].error = str(e)
async def _indicator_loop(self, name: str, meta: IndicatorMeta):
"""Continuous poll loop for one indicator."""
while self._running:
try:
await self._fetch_indicator(name, meta)
except Exception as e:
logger.error(f"Loop error {name}: {e}")
await asyncio.sleep(meta.poll_interval_s)
async def _daily_rotation(self):
"""At midnight UTC, snapshot current values into daily history."""
while self._running:
now = datetime.now(timezone.utc)
date_str = now.strftime('%Y-%m-%d')
if date_str != self._current_date:
with self._lock:
for name, st in self.state.items():
if st.success and not np.isnan(st.value):
st.daily_history.append(st.value)
self._current_date = date_str
logger.info(f"Daily rotation: {date_str}")
await asyncio.sleep(60) # check every minute
async def _run(self):
"""Main async loop."""
connector = aiohttp.TCPConnector(limit=30, ttl_dns_cache=300)
self._session = aiohttp.ClientSession(connector=connector)
# Create rate limit semaphores
for source, rate in RATE_LIMITS.items():
max_concurrent = max(1, int(rate * 2))
self._semaphores[source] = asyncio.Semaphore(max_concurrent)
# Start per-indicator loops
tasks = []
for name, meta in INDICATORS.items():
tasks.append(asyncio.create_task(self._indicator_loop(name, meta)))
# Start daily rotation
tasks.append(asyncio.create_task(self._daily_rotation()))
logger.info(f"Started {len(INDICATORS)} indicator loops")
try:
await asyncio.gather(*tasks)
finally:
await self._session.close()
def start(self):
"""Start background thread with asyncio loop."""
if self._running:
return
self._running = True
def _thread_target():
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._loop.run_until_complete(self._run())
self._thread = threading.Thread(target=_thread_target, daemon=True)
self._thread.start()
logger.info("RealTimeExFService started")
def stop(self):
"""Stop the service."""
self._running = False
if self._thread:
self._thread.join(timeout=5)
logger.info("RealTimeExFService stopped")
def status(self) -> Dict[str, Any]:
"""Service health status."""
with self._lock:
total = len(self.state)
ok = sum(1 for s in self.state.values() if s.success)
acb_ok = sum(1 for name in ('funding_btc', 'funding_eth', 'dvol_btc',
'dvol_eth', 'fng', 'vix', 'ls_btc', 'taker')
if self.state.get(name, IndicatorState()).success)
return {
'indicators_ok': ok,
'indicators_total': total,
'acb_indicators_ok': acb_ok,
'acb_indicators_total': 8,
'details': {name: {'value': s.value, 'success': s.success,
'staleness_s': time.monotonic() - s.fetched_at if s.fetched_at > 0 else -1,
'fetches': s.fetch_count, 'fails': s.fail_count}
for name, s in self.state.items()},
}
# =====================================================================
# ACB v3 - LAG-AWARE (drop-in replacement for v2)
# =====================================================================
def calculate_adaptive_cut_v3(ext_factors: dict, config: dict = None) -> tuple:
"""
ACB v3: Same logic as v2 but expects lag-adjusted indicator values.
The lag adjustment happens in RealTimeExFService.get_acb_indicators().
This function is identical to v2 in logic - the innovation is in the
data pipeline feeding it lagged values.
For backtest: manually construct ext_factors with lagged values.
"""
from dolphin_paper_trade_adaptive_cb_v2 import ACBV2_CONFIG as DEFAULT_CONFIG
config = config or DEFAULT_CONFIG
if not ext_factors or not config.get('enabled', True):
return config.get('base_cut', 0.30), 0, 0, {'status': 'disabled'}
signals = 0
severity = 0
details = {}
# Signal 1: Funding (bearish confirmation)
funding_btc = ext_factors.get('funding_btc', 0)
if funding_btc < config['thresholds']['funding_btc_very_bearish']:
signals += 1; severity += 2
details['funding'] = f'{funding_btc:.6f} (very bearish)'
elif funding_btc < config['thresholds']['funding_btc_bearish']:
signals += 1; severity += 1
details['funding'] = f'{funding_btc:.6f} (bearish)'
else:
details['funding'] = f'{funding_btc:.6f} (neutral)'
# Signal 2: DVOL (volatility confirmation)
dvol_btc = ext_factors.get('dvol_btc', 50)
if dvol_btc > config['thresholds']['dvol_extreme']:
signals += 1; severity += 2
details['dvol'] = f'{dvol_btc:.1f} (extreme)'
elif dvol_btc > config['thresholds']['dvol_elevated']:
signals += 1; severity += 1
details['dvol'] = f'{dvol_btc:.1f} (elevated)'
else:
details['dvol'] = f'{dvol_btc:.1f} (normal)'
# Signal 3: FNG (only if confirmed by funding/DVOL)
fng = ext_factors.get('fng', 50)
funding_bearish = funding_btc < 0
dvol_elevated = dvol_btc > 55
if fng < config['thresholds']['fng_extreme_fear'] and (funding_bearish or dvol_elevated):
signals += 1; severity += 1
details['fng'] = f'{fng:.1f} (extreme fear, confirmed)'
elif fng < config['thresholds']['fng_fear'] and (funding_bearish or dvol_elevated):
signals += 0.5; severity += 0.5
details['fng'] = f'{fng:.1f} (fear, confirmed)'
else:
details['fng'] = f'{fng:.1f} (neutral or unconfirmed)'
# Signal 4: Taker ratio (strongest predictor)
taker = ext_factors.get('taker', 1.0)
if taker < config['thresholds']['taker_selling']:
signals += 1; severity += 2
details['taker'] = f'{taker:.3f} (heavy selling)'
elif taker < config['thresholds']['taker_mild_selling']:
signals += 0.5; severity += 1
details['taker'] = f'{taker:.3f} (mild selling)'
else:
details['taker'] = f'{taker:.3f} (neutral)'
# Cut calculation (identical to v2)
if signals >= 3 and severity >= 5:
cut = 0.75
elif signals >= 3:
cut = 0.65
elif signals >= 2 and severity >= 3:
cut = 0.55
elif signals >= 2:
cut = 0.45
elif signals >= 1:
cut = 0.30
else:
cut = 0.0
details['signals'] = signals
details['severity'] = severity
details['version'] = 'v3_lag_aware'
return cut, signals, severity, details
# =====================================================================
# ACB v4 - EXPANDED 10-INDICATOR ENGINE
# =====================================================================
# Empirically validated thresholds for new v4 indicators
ACB_V4_THRESHOLDS = {
'funding_eth': -3.105e-05,
'mcap_bc': 1.361e+12,
'fund_dbt_btc': -2.426e-06,
'oi_btc': 7.955e+04,
'fund_dbt_eth': -6.858e-06,
'addr_btc': 7.028e+05,
}
def calculate_adaptive_cut_v4(ext_factors: dict, config: dict = None) -> tuple:
"""
ACB v4: Expanded engine evaluating 10 empirically validated indicators.
Base cut threshold and math derived from 54-day exhaustive backtest
(+15.00% ROI, 6.68% DD).
"""
from dolphin_paper_trade_adaptive_cb_v2 import ACBV2_CONFIG as DEFAULT_CONFIG
config = config or DEFAULT_CONFIG
if not ext_factors or not config.get('enabled', True):
return config.get('base_cut', 0.30), 0, 0, {'status': 'disabled'}
# Use baseline logic for the core 4 signals
cut, signals, severity, details = calculate_adaptive_cut_v3(ext_factors, config)
# -------------------------------------------------------------
# META-ADAPTIVE OVERRIDE OR FALLBACK TO STATIC v4
# -------------------------------------------------------------
try:
from realtime_exf_service import _get_active_meta_thresholds
active_thresh = _get_active_meta_thresholds()
except Exception:
active_thresh = None
if active_thresh:
# Dynamic processing of strictly proved meta thresholds
details['version'] = 'v4_meta_adaptive'
for key, limits in active_thresh.items():
if key in ('funding_btc', 'dvol_btc', 'fng', 'taker'):
continue # Handled by v3
val = ext_factors.get(key, np.nan)
if np.isnan(val): continue
triggered = False
if limits['direction'] == '<' and val < limits['threshold']:
triggered = True
elif limits['direction'] == '>' and val > limits['threshold']:
triggered = True
if triggered:
signals += 0.5; severity += 1
details[key] = f"{val:.4g} (meta {limits['direction']} {limits['threshold']:.4g})"
else:
# Fallback 10-indicator engine statically verified on 2026-02-27
details['version'] = 'v4_expanded_static'
val = ext_factors.get('funding_eth', np.nan)
if not np.isnan(val) and val < ACB_V4_THRESHOLDS['funding_eth']:
signals += 0.5; severity += 1
details['funding_eth'] = f"{val:.6f} (< {ACB_V4_THRESHOLDS['funding_eth']})"
val = ext_factors.get('mcap_bc', np.nan)
if not np.isnan(val) and val < ACB_V4_THRESHOLDS['mcap_bc']:
signals += 0.5; severity += 1
details['mcap_bc'] = f"{val:.2e} (< {ACB_V4_THRESHOLDS['mcap_bc']:.2e})"
val = ext_factors.get('fund_dbt_btc', np.nan)
if not np.isnan(val) and val < ACB_V4_THRESHOLDS['fund_dbt_btc']:
signals += 0.5; severity += 1
details['fund_dbt_btc'] = f"{val:.2e} (< {ACB_V4_THRESHOLDS['fund_dbt_btc']:.2e})"
val = ext_factors.get('oi_btc', np.nan)
if not np.isnan(val) and val < ACB_V4_THRESHOLDS['oi_btc']:
signals += 0.5; severity += 1
details['oi_btc'] = f"{val:.1f} (< {ACB_V4_THRESHOLDS['oi_btc']:.1f})"
val = ext_factors.get('fund_dbt_eth', np.nan)
if not np.isnan(val) and val < ACB_V4_THRESHOLDS['fund_dbt_eth']:
signals += 0.5; severity += 1
details['fund_dbt_eth'] = f"{val:.2e} (< {ACB_V4_THRESHOLDS['fund_dbt_eth']:.2e})"
val = ext_factors.get('addr_btc', np.nan)
if not np.isnan(val) and val > ACB_V4_THRESHOLDS['addr_btc']:
signals += 0.5; severity += 1
details['addr_btc'] = f"{val:.1f} (> {ACB_V4_THRESHOLDS['addr_btc']:.1f})"
# Recalculate cut with updated signals and severity
if signals >= 3 and severity >= 5:
cut = 0.75
elif signals >= 3:
cut = 0.65
elif signals >= 2 and severity >= 3:
cut = 0.55
elif signals >= 2:
cut = 0.45
elif signals >= 1:
cut = 0.30
else:
cut = 0.0
details['total_signals_v4'] = signals
details['total_severity_v4'] = severity
return cut, signals, severity, details
# =====================================================================
# NPZ + ARROW DUAL WRITER
# =====================================================================
class DualWriter:
"""Write indicator data in both NPZ and Arrow formats."""
def __init__(self):
self._has_pyarrow = False
try:
import pyarrow as pa
self._pa = pa
self._has_pyarrow = True
except ImportError:
pass
def write(self, indicators: Dict[str, Any], scan_path: Path,
scan_number: int = 0):
"""Write both NPZ and Arrow files alongside the scan."""
# Remove metadata keys
clean = {k: v for k, v in indicators.items()
if not k.startswith('_') and isinstance(v, (int, float))}
# NPZ (legacy format)
self._write_npz(clean, scan_path, scan_number)
# Arrow (new format)
if self._has_pyarrow:
self._write_arrow(clean, scan_path, scan_number)
def _write_npz(self, indicators, scan_path, scan_number):
names = sorted(INDICATORS.keys())
api_indicators = np.array([indicators.get(n, np.nan) for n in names])
api_success = np.array([not np.isnan(indicators.get(n, np.nan)) for n in names])
api_names = np.array(names, dtype='U32')
out_path = scan_path.parent / f"{scan_path.stem}__Indicators.npz"
np.savez_compressed(out_path,
api_indicators=api_indicators,
api_success=api_success,
api_names=api_names,
api_success_rate=np.array([np.nanmean(api_success)]),
timestamp=np.array([datetime.now(timezone.utc).isoformat()], dtype='U64'),
scan_number=np.array([scan_number]),
)
def _write_arrow(self, indicators, scan_path, scan_number):
pa = self._pa
fields = [
pa.field('timestamp_ns', pa.int64()),
pa.field('scan_number', pa.int32()),
]
values = {
'timestamp_ns': [int(datetime.now(timezone.utc).timestamp() * 1e9)],
'scan_number': [scan_number],
}
for name in sorted(INDICATORS.keys()):
fields.append(pa.field(name, pa.float64()))
values[name] = [indicators.get(name, np.nan)]
schema = pa.schema(fields)
table = pa.table(values, schema=schema)
out_path = scan_path.parent / f"{scan_path.stem}__Indicators.arrow"
with pa.ipc.new_file(str(out_path), schema) as writer:
writer.write_table(table)
# =====================================================================
# CONVENIENCE: Load from NPZ with lag support (for backtesting)
# =====================================================================
# =====================================================================
# LAG CONFIGURATIONS
# =====================================================================
# ROBUST DEFAULT: Uniform lag=1 for all indicators.
# Validated: +3.10% ROI, -2.02% DD vs lag=0 (54-day backtest).
# Zero overfitting risk (no per-indicator optimization).
# Scientifically justified: "yesterday's indicators predict today's market"
ROBUST_LAGS = {
'funding_btc': 1,
'funding_eth': 1,
'dvol_btc': 1,
'dvol_eth': 1,
'fng': 1,
'vix': 1,
'ls_btc': 1,
'taker': 1,
}
# EXPERIMENTAL: Per-indicator optimal lags from correlation analysis.
# Validated: +3.98% ROI, -2.93% DD vs lag=0 (54-day backtest).
# WARNING: Overfitting risk at 6.8 days/parameter. Only 5/8 significant.
# DO NOT USE until 80+ days of data available for re-validation.
# TODO: Re-run lag_correlation_analysis with 80+ days, update if confirmed.
EXPERIMENTAL_LAGS = {
'funding_btc': 5, # r=+0.39, p=0.006 (slow propagation - 5 days!)
'funding_eth': 3, # r=+0.20, p=0.154 (NOT significant)
'dvol_btc': 1, # r=-0.49, p=0.0002 (STRONGEST - overnight digest)
'dvol_eth': 1, # r=-0.42, p=0.002
'fng': 5, # r=-0.19, p=0.186 (NOT significant)
'vix': 1, # r=-0.20, p=0.270 (NOT significant)
'ls_btc': 0, # r=+0.30, p=0.036 (immediate - only lag=0 indicator)
'taker': 1, # r=-0.41, p=0.003 (overnight digest)
}
# CONSERVATIVE: Only statistically verified strong deviations from lag=1 for core indicators.
# Currently identical to V3 ROBUST but with funding_btc=5 and ls_btc=0
CONSERVATIVE_LAGS = ROBUST_LAGS.copy()
CONSERVATIVE_LAGS.update({
'funding_btc': 5,
'ls_btc': 0,
})
# V4: Combines robust baseline with 6 new statically proven indicators
V4_LAGS = ROBUST_LAGS.copy()
V4_LAGS.update({
'funding_eth': 3,
'mcap_bc': 1,
'fund_dbt_btc': 0,
'oi_btc': 0,
'fund_dbt_eth': 1,
'addr_btc': 3,
})
# Active configuration - use V4 by default given superior empirical results (+15.00% ROI, 6.68% DD)
OPTIMAL_LAGS = V4_LAGS
# =====================================================================
# META-ADAPTIVE RUNTIME
# =====================================================================
def _get_active_lags() -> dict:
"""Return lags: dynamically from meta-layer if available, else fallback V4."""
try:
from meta_adaptive_optimizer import get_current_meta_config
meta = get_current_meta_config()
if meta and 'lags' in meta:
return meta['lags']
except Exception:
pass
return OPTIMAL_LAGS
def _get_active_meta_thresholds() -> dict:
"""Return thresholds: dynamically from meta-layer if available, else None."""
try:
from meta_adaptive_optimizer import get_current_meta_config
meta = get_current_meta_config()
if meta and 'thresholds' in meta:
return meta['thresholds']
except Exception:
pass
return None
# TODO: When switching to EXPERIMENTAL_LAGS, also update IndicatorMeta.optimal_lag_days
def load_external_factors_lagged(date_str: str, all_daily_vals: Dict[str, Dict],
sorted_dates: List[str]) -> dict:
"""
Load external factors with per-indicator optimal lag applied.
Dynamically respects the Meta-Adaptive Layer configuration.
Args:
date_str: Target date
all_daily_vals: {date_str: {indicator_name: value}} for all dates
sorted_dates: Chronologically sorted list of all dates
"""
if date_str not in sorted_dates:
return {}
idx = sorted_dates.index(date_str)
result = {}
active_lags = _get_active_lags()
for name, lag in active_lags.items():
src_idx = idx - lag
if src_idx >= 0:
src_date = sorted_dates[src_idx]
val = all_daily_vals.get(src_date, {}).get(name)
if val is not None:
result[name] = val
return result