#!/usr/bin/env python3 """ INDICATOR READER v1.0 ===================== Utility to read and analyze processed indicator .npz files. Usage: from indicator_reader import IndicatorReader # Load single file reader = IndicatorReader("scan_000027_193311__Indicators.npz") print(reader.summary()) # Get DataFrames scan_df = reader.scan_derived_df() external_df = reader.external_df() asset_df = reader.asset_df() # Load directory all_data = IndicatorReader.load_directory("./scans/") """ import numpy as np from pathlib import Path from typing import Dict, List, Optional, Any, Tuple from datetime import datetime class IndicatorReader: """Reader for processed indicator .npz files""" def __init__(self, path: str): self.path = Path(path) self._data = dict(np.load(path, allow_pickle=True)) @property def scan_number(self) -> int: return int(self._data['scan_number'][0]) @property def timestamp(self) -> str: return str(self._data['timestamp'][0]) @property def processing_time(self) -> float: return float(self._data['processing_time'][0]) @property def n_assets(self) -> int: return len(self._data['asset_symbols']) @property def asset_symbols(self) -> List[str]: return list(self._data['asset_symbols']) # ========================================================================= # SCAN-DERIVED (eigenvalue indicators from tracking_data/regime_signals) # ========================================================================= @property def scan_derived(self) -> np.ndarray: """Get scan-derived indicator array""" return self._data['scan_derived'] @property def scan_derived_names(self) -> List[str]: return list(self._data['scan_derived_names']) def scan_derived_df(self): """Get scan-derived as pandas DataFrame""" import pandas as pd return pd.DataFrame({ 'name': self.scan_derived_names, 'value': self.scan_derived }) def get_scan_indicator(self, name: str) -> float: """Get specific scan-derived indicator by name""" names = self.scan_derived_names if name in names: return float(self.scan_derived[names.index(name)]) raise KeyError(f"Unknown scan indicator: {name}") # ========================================================================= # EXTERNAL (API-fetched indicators) # ========================================================================= @property def external(self) -> np.ndarray: """Get external indicator array (85 values, NaN for skipped)""" return self._data['external'] @property def external_success(self) -> np.ndarray: """Get success flags for external indicators""" return self._data['external_success'] def external_df(self): """Get external indicators as pandas DataFrame""" import pandas as pd # Indicator names (would need to import from external_factors_matrix) names = [f"ext_{i+1}" for i in range(85)] return pd.DataFrame({ 'id': range(1, 86), 'value': self.external, 'success': self.external_success }) @property def external_success_rate(self) -> float: """Percentage of external indicators successfully fetched""" valid = ~np.isnan(self.external) if valid.sum() == 0: return 0.0 return float(self.external_success[valid].mean()) # ========================================================================= # PER-ASSET # ========================================================================= @property def asset_matrix(self) -> np.ndarray: """Get per-asset indicator matrix (n_assets x n_indicators)""" return self._data['asset_matrix'] @property def asset_indicator_names(self) -> List[str]: return list(self._data['asset_indicator_names']) def asset_df(self): """Get per-asset indicators as pandas DataFrame""" import pandas as pd return pd.DataFrame( self.asset_matrix, index=self.asset_symbols, columns=self.asset_indicator_names ) def get_asset(self, symbol: str) -> Dict[str, float]: """Get all indicators for a specific asset""" symbols = self.asset_symbols if symbol not in symbols: raise KeyError(f"Unknown symbol: {symbol}") idx = symbols.index(symbol) return dict(zip(self.asset_indicator_names, self.asset_matrix[idx])) def get_asset_indicator(self, symbol: str, indicator: str) -> float: """Get specific indicator for specific asset""" asset = self.get_asset(symbol) if indicator not in asset: raise KeyError(f"Unknown indicator: {indicator}") return asset[indicator] # ========================================================================= # UTILITIES # ========================================================================= def summary(self) -> str: """Get summary string""" ext_valid = (~np.isnan(self.external)).sum() ext_success = self.external_success.sum() return f"""Indicator File: {self.path.name} Scan: #{self.scan_number} @ {self.timestamp} Processing: {self.processing_time:.2f}s Scan-derived: {len(self.scan_derived)} indicators lambda_max: {self.get_scan_indicator('lambda_max'):.4f} coherence: {self.get_scan_indicator('market_coherence'):.4f} instability: {self.get_scan_indicator('instability_score'):.4f} External: {ext_success}/{ext_valid} successful ({self.external_success_rate*100:.1f}%) Per-asset: {self.n_assets} assets × {len(self.asset_indicator_names)} indicators """ def to_dict(self) -> Dict[str, Any]: """Convert to dictionary""" return { 'scan_number': self.scan_number, 'timestamp': self.timestamp, 'processing_time': self.processing_time, 'scan_derived': dict(zip(self.scan_derived_names, self.scan_derived.tolist())), 'external': self.external.tolist(), 'external_success': self.external_success.tolist(), 'asset_symbols': self.asset_symbols, 'asset_matrix': self.asset_matrix.tolist(), } # ========================================================================= # CLASS METHODS # ========================================================================= @classmethod def load_directory(cls, directory: str, pattern: str = "*__Indicators.npz") -> List['IndicatorReader']: """Load all indicator files from directory""" root = Path(directory) files = sorted(root.rglob(pattern)) return [cls(str(f)) for f in files] @classmethod def to_timeseries(cls, readers: List['IndicatorReader']) -> Dict[str, np.ndarray]: """Convert list of readers to time series arrays""" n = len(readers) if n == 0: return {} # Get dimensions from first file n_scan = len(readers[0].scan_derived) n_ext = 85 n_assets = readers[0].n_assets n_asset_ind = len(readers[0].asset_indicator_names) # Allocate arrays timestamps = [] scan_series = np.zeros((n, n_scan)) ext_series = np.zeros((n, n_ext)) for i, r in enumerate(readers): timestamps.append(r.timestamp) scan_series[i] = r.scan_derived ext_series[i] = r.external return { 'timestamps': np.array(timestamps, dtype='U32'), 'scan_derived': scan_series, 'external': ext_series, 'scan_names': readers[0].scan_derived_names, } # ============================================================================= # CLI # ============================================================================= def main(): import argparse parser = argparse.ArgumentParser(description="Indicator Reader") parser.add_argument("path", help="Path to .npz file or directory") parser.add_argument("-a", "--asset", help="Show specific asset") parser.add_argument("-j", "--json", action="store_true", help="Output as JSON") args = parser.parse_args() path = Path(args.path) if path.is_file(): reader = IndicatorReader(str(path)) if args.json: import json print(json.dumps(reader.to_dict(), indent=2)) elif args.asset: asset = reader.get_asset(args.asset) for k, v in asset.items(): print(f" {k}: {v:.6f}") else: print(reader.summary()) elif path.is_dir(): readers = IndicatorReader.load_directory(str(path)) print(f"Found {len(readers)} indicator files") if readers: ts = IndicatorReader.to_timeseries(readers) print(f"Time range: {ts['timestamps'][0]} to {ts['timestamps'][-1]}") print(f"Scan-derived shape: {ts['scan_derived'].shape}") print(f"External shape: {ts['external'].shape}") if __name__ == "__main__": main()