"""VIOLET trade/slot comparison harness. This is the missing V3 comparison unit from the main spec: collapse shadow decisions into episode-sized runs, collapse raw trade rows into terminal trade episodes, and compare them without requiring live execution wiring. VIOLET-only. BLUE is untouched. """ from __future__ import annotations from collections import defaultdict from collections.abc import Iterable, Mapping from typing import Any, Optional from pydantic import Field from .domain import StrictModel, Symbol, typed def _coerce_int(value: Any, default: Optional[int] = None) -> Optional[int]: try: if value is None: return default out = int(value) return out if out >= 0 else default except (TypeError, ValueError): return default def _coerce_float(value: Any, default: Optional[float] = None) -> Optional[float]: try: if value is None: return default out = float(value) return out if out == out and out not in (float("inf"), float("-inf")) else default except (TypeError, ValueError): return default def _coerce_text(value: Any, default: str = "") -> str: if value is None: return default text = str(value).strip() return text if text else default def _row_asset(row: Mapping[str, Any]) -> str: return _coerce_text(row.get("asset") or row.get("symbol") or row.get("instrument")).upper() def _row_side(row: Mapping[str, Any]) -> str: return _coerce_text(row.get("side") or row.get("direction") or row.get("trade_side")).upper() def _row_scan_number(row: Mapping[str, Any]) -> Optional[int]: return _coerce_int(row.get("scan_number") or row.get("scan") or row.get("scan_idx")) def _row_ts_ms(row: Mapping[str, Any]) -> Optional[int]: candidates = ( row.get("ts"), row.get("ts_ms"), row.get("timestamp"), row.get("exit_ts"), row.get("entry_ts"), row.get("mono_ns"), ) for value in candidates: ts = _coerce_int(value) if ts is not None: return ts if value is None or value != row.get("mono_ns") else ts // 1_000_000 return None def _row_trade_id(row: Mapping[str, Any]) -> str: return _coerce_text( row.get("trade_id") or row.get("id") or row.get("slot_id") or row.get("episode_id"), ) class DecisionEpisode(StrictModel): asset: Symbol side: str = Field(min_length=1, max_length=16) first_scan_number: int = Field(ge=0) last_scan_number: int = Field(ge=0) first_ts_ms: int = Field(ge=0) last_ts_ms: int = Field(ge=0) row_count: int = Field(ge=1) actuated_count: int = Field(ge=0) max_conviction_leverage: float = Field(ge=0.0, allow_inf_nan=False) last_target_exposure: float = Field(ge=0.0, allow_inf_nan=False) class TradeEpisode(StrictModel): trade_id: str = Field(min_length=1) asset: Symbol side: str = Field(min_length=1, max_length=16) entry_ts_ms: int = Field(ge=0) exit_ts_ms: int = Field(ge=0) row_count: int = Field(ge=1) bars_held: Optional[int] = Field(default=None, ge=0) net_pnl: Optional[float] = Field(default=None, allow_inf_nan=False) terminal_reason: Optional[str] = None class EpisodeMatch(StrictModel): asset: Symbol side: str = Field(min_length=1, max_length=16) decision_episode: DecisionEpisode trade_episode: TradeEpisode start_gap_ms: int = Field(ge=0) end_gap_ms: int = Field(ge=0) row_gap: int = Field(ge=0) bars_gap: Optional[int] = Field(default=None, ge=0) class TradeSlotComparison(StrictModel): decision_episodes: list[DecisionEpisode] trade_episodes: list[TradeEpisode] matches: list[EpisodeMatch] decision_only: list[DecisionEpisode] trade_only: list[TradeEpisode] def _collapse_decision_rows( rows: Iterable[Mapping[str, Any]], *, max_scan_gap: int = 1, ) -> list[DecisionEpisode]: grouped: dict[tuple[str, str], list[Mapping[str, Any]]] = defaultdict(list) for row in rows: asset = _row_asset(row) side = _row_side(row) scan_number = _row_scan_number(row) if not asset or not side or scan_number is None: continue grouped[(asset, side)].append(row) episodes: list[DecisionEpisode] = [] for (asset, side), bucket in grouped.items(): bucket = sorted( bucket, key=lambda row: ( _row_scan_number(row) or 0, _row_ts_ms(row) or 0, ), ) current: list[Mapping[str, Any]] = [] prev_scan: Optional[int] = None for row in bucket: scan_number = _row_scan_number(row) if scan_number is None: continue if current and prev_scan is not None and scan_number > prev_scan + max_scan_gap: episodes.append(_decision_episode_from_rows(asset, side, current)) current = [] current.append(row) prev_scan = scan_number if current: episodes.append(_decision_episode_from_rows(asset, side, current)) return sorted(episodes, key=lambda ep: (ep.first_ts_ms, ep.asset, ep.side, ep.first_scan_number)) def _decision_episode_from_rows( asset: str, side: str, rows: list[Mapping[str, Any]], ) -> DecisionEpisode: scans = [sn for sn in (_row_scan_number(r) for r in rows) if sn is not None] times = [ts for ts in (_row_ts_ms(r) for r in rows) if ts is not None] conv = [ value for value in (_coerce_float(r.get("conviction_leverage"), None) for r in rows) if value is not None ] exposure = [ value for value in (_coerce_float(r.get("target_exposure"), None) for r in rows) if value is not None ] actuated = sum(1 for r in rows if bool(r.get("actuated"))) return DecisionEpisode( asset=asset, side=side, first_scan_number=min(scans), last_scan_number=max(scans), first_ts_ms=min(times) if times else 0, last_ts_ms=max(times) if times else 0, row_count=len(rows), actuated_count=actuated, max_conviction_leverage=max(conv) if conv else 0.0, last_target_exposure=exposure[-1] if exposure else 0.0, ) def _collapse_trade_rows(rows: Iterable[Mapping[str, Any]]) -> list[TradeEpisode]: grouped: dict[str, list[Mapping[str, Any]]] = defaultdict(list) for row in rows: trade_id = _row_trade_id(row) asset = _row_asset(row) side = _row_side(row) if not trade_id or not asset or not side: continue grouped[trade_id].append(row) episodes: list[TradeEpisode] = [] for trade_id, bucket in grouped.items(): bucket = sorted(bucket, key=lambda row: (_row_ts_ms(row) or 0, _coerce_int(row.get("scan_number")) or 0)) first = bucket[0] last = bucket[-1] entry_ts = _row_ts_ms(first) or 0 exit_ts = _row_ts_ms(last) or entry_ts bars = None for row in reversed(bucket): bars = _coerce_int(row.get("bars_held")) if bars is not None: break net_pnl = None for row in reversed(bucket): net_pnl = _coerce_float(row.get("net_pnl") or row.get("pnl"), None) if net_pnl is not None: break reason = None for row in reversed(bucket): reason = _coerce_text(row.get("reason") or row.get("exit_reason"), "") if reason: break episodes.append( TradeEpisode( trade_id=trade_id, asset=_row_asset(first), side=_row_side(first), entry_ts_ms=entry_ts, exit_ts_ms=exit_ts, row_count=len(bucket), bars_held=bars, net_pnl=net_pnl, terminal_reason=reason or None, ) ) return sorted(episodes, key=lambda ep: (ep.entry_ts_ms, ep.asset, ep.side, ep.trade_id)) def _match_episodes( decisions: list[DecisionEpisode], trades: list[TradeEpisode], ) -> tuple[list[EpisodeMatch], list[DecisionEpisode], list[TradeEpisode]]: by_key: dict[tuple[str, str], list[TradeEpisode]] = defaultdict(list) for trade in trades: by_key[(trade.asset, trade.side)].append(trade) for bucket in by_key.values(): bucket.sort(key=lambda ep: (ep.entry_ts_ms, ep.exit_ts_ms, ep.trade_id)) matches: list[EpisodeMatch] = [] decision_only: list[DecisionEpisode] = [] used_trade_ids: set[str] = set() for decision in decisions: bucket = by_key.get((decision.asset, decision.side), []) candidate = None for trade in bucket: if trade.trade_id in used_trade_ids: continue candidate = trade break if candidate is None: decision_only.append(decision) continue used_trade_ids.add(candidate.trade_id) matches.append( EpisodeMatch( asset=decision.asset, side=decision.side, decision_episode=decision, trade_episode=candidate, start_gap_ms=abs(decision.first_ts_ms - candidate.entry_ts_ms), end_gap_ms=abs(decision.last_ts_ms - candidate.exit_ts_ms), row_gap=abs(decision.row_count - candidate.row_count), bars_gap=( abs(decision.row_count - candidate.bars_held) if candidate.bars_held is not None else None ), ) ) trade_only = [trade for trade in trades if trade.trade_id not in used_trade_ids] return matches, decision_only, trade_only @typed def compare_trade_slot_granularity( decision_rows: Iterable[Mapping[str, Any]], trade_rows: Iterable[Mapping[str, Any]], *, max_scan_gap: int = 1, ) -> TradeSlotComparison: """Collapse both surfaces to episodes and compare them at slot granularity.""" decisions = _collapse_decision_rows(decision_rows, max_scan_gap=max_scan_gap) trades = _collapse_trade_rows(trade_rows) matches, decision_only, trade_only = _match_episodes(decisions, trades) return TradeSlotComparison( decision_episodes=decisions, trade_episodes=trades, matches=matches, decision_only=decision_only, trade_only=trade_only, )