from __future__ import annotations

from prod.clean_arch.violet.trade_slot_compare import (
    compare_trade_slot_granularity,
)


def _decision_row(asset, side, scan_number, ts, **extra):
    """Build one actuated shadow-decision row; extra keys are passed through."""
    row = {
        "asset": asset,
        "side": side,
        "scan_number": scan_number,
        "ts": ts,
        "actuated": True,
    }
    row.update(extra)
    return row


def _trade_row(trade_id, asset, side, ts, reason, **extra):
    """Build one raw trade row; extra keys are passed through."""
    row = {
        "trade_id": trade_id,
        "asset": asset,
        "side": side,
        "ts": ts,
        "reason": reason,
    }
    row.update(extra)
    return row


def test_compare_trade_slot_granularity_collapses_and_matches():
    # Two contiguous SHORT scans (10, 11) form one episode; the LONG scan is
    # a separate (asset, side) group and therefore a second episode.
    decisions = [
        _decision_row(
            "BTCUSDT", "SHORT", 10, 1_000,
            conviction_leverage=3.0, target_exposure=30.0,
        ),
        _decision_row(
            "BTCUSDT", "SHORT", 11, 2_000,
            conviction_leverage=4.0, target_exposure=40.0,
        ),
        _decision_row(
            "BTCUSDT", "LONG", 16, 4_000,
            conviction_leverage=2.0, target_exposure=20.0,
        ),
    ]
    # "t-1" appears twice (OPEN then EXIT) and must collapse to one episode.
    trades = [
        _trade_row("t-1", "BTCUSDT", "SHORT", 900, "OPEN", bars_held=1, net_pnl=1.5),
        _trade_row("t-1", "BTCUSDT", "SHORT", 2_500, "EXIT", bars_held=2, net_pnl=4.5),
        _trade_row("t-2", "BTCUSDT", "LONG", 3_900, "EXIT", bars_held=1, net_pnl=-0.25),
    ]

    result = compare_trade_slot_granularity(decisions, trades)

    assert len(result.decision_episodes) == 2
    assert len(result.trade_episodes) == 2
    assert len(result.matches) == 2
    assert not result.decision_only
    assert not result.trade_only

    short_match = result.matches[0]
    assert short_match.asset == "BTCUSDT"
    assert short_match.side == "SHORT"

    decision_episode = short_match.decision_episode
    assert decision_episode.first_scan_number == 10
    assert decision_episode.last_scan_number == 11
    assert decision_episode.row_count == 2

    trade_episode = short_match.trade_episode
    assert trade_episode.trade_id == "t-1"
    assert trade_episode.row_count == 2
    assert trade_episode.terminal_reason == "EXIT"
    assert trade_episode.net_pnl == 4.5

    assert short_match.start_gap_ms == 100
    assert short_match.end_gap_ms == 500
    assert short_match.row_gap == 0
    assert short_match.bars_gap == 0


def test_compare_trade_slot_granularity_splits_on_scan_gap_and_ignores_bad_rows():
    # Scans 1 and 2 are contiguous; scan 8 exceeds the default gap and starts
    # a new episode. The last row has no asset and a non-numeric scan number.
    decisions = [
        _decision_row(
            "ETHUSDT", "SHORT", 1, 10,
            conviction_leverage=1.0, target_exposure=10.0,
        ),
        _decision_row(
            "ETHUSDT", "SHORT", 2, 20,
            conviction_leverage=1.1, target_exposure=11.0,
        ),
        _decision_row(
            "ETHUSDT", "SHORT", 8, 80,
            conviction_leverage=1.2, target_exposure=12.0,
        ),
        _decision_row(None, "SHORT", "bad", 90),
    ]
    trades = [
        _trade_row("x-1", "ETHUSDT", "SHORT", 15, "EXIT"),
        _trade_row("x-2", "ETHUSDT", "SHORT", 99, "EXIT"),
    ]

    result = compare_trade_slot_granularity(decisions, trades)

    assert len(result.decision_episodes) == 2
    assert [ep.row_count for ep in result.decision_episodes] == [2, 1]
    assert len(result.trade_episodes) == 2
    assert len(result.matches) == 2
    assert [m.trade_episode.trade_id for m in result.matches] == ["x-1", "x-2"]
    assert not result.decision_only
    assert not result.trade_only


def test_compare_trade_slot_granularity_handles_empty_input():
    result = compare_trade_slot_granularity([], [])

    assert result.decision_episodes == []
    assert result.trade_episodes == []
    assert result.matches == []
    assert result.decision_only == []
    assert result.trade_only == []
b/prod/clean_arch/violet/trade_slot_compare.py new file mode 100644 index 0000000..328a0b7 --- /dev/null +++ b/prod/clean_arch/violet/trade_slot_compare.py @@ -0,0 +1,309 @@ +"""VIOLET trade/slot comparison harness. + +This is the missing V3 comparison unit from the main spec: collapse shadow +decisions into episode-sized runs, collapse raw trade rows into terminal trade +episodes, and compare them without requiring live execution wiring. + +VIOLET-only. BLUE is untouched. +""" + +from __future__ import annotations + +from collections import defaultdict +from collections.abc import Iterable, Mapping +from typing import Any, Optional + +from pydantic import Field + +from .domain import StrictModel, Symbol, typed + + +def _coerce_int(value: Any, default: Optional[int] = None) -> Optional[int]: + try: + if value is None: + return default + out = int(value) + return out if out >= 0 else default + except (TypeError, ValueError): + return default + + +def _coerce_float(value: Any, default: Optional[float] = None) -> Optional[float]: + try: + if value is None: + return default + out = float(value) + return out if out == out and out not in (float("inf"), float("-inf")) else default + except (TypeError, ValueError): + return default + + +def _coerce_text(value: Any, default: str = "") -> str: + if value is None: + return default + text = str(value).strip() + return text if text else default + + +def _row_asset(row: Mapping[str, Any]) -> str: + return _coerce_text(row.get("asset") or row.get("symbol") or row.get("instrument")).upper() + + +def _row_side(row: Mapping[str, Any]) -> str: + return _coerce_text(row.get("side") or row.get("direction") or row.get("trade_side")).upper() + + +def _row_scan_number(row: Mapping[str, Any]) -> Optional[int]: + return _coerce_int(row.get("scan_number") or row.get("scan") or row.get("scan_idx")) + + +def _row_ts_ms(row: Mapping[str, Any]) -> Optional[int]: + candidates = ( + row.get("ts"), + row.get("ts_ms"), + row.get("timestamp"), + 
row.get("exit_ts"), + row.get("entry_ts"), + row.get("mono_ns"), + ) + for value in candidates: + ts = _coerce_int(value) + if ts is not None: + return ts if value is None or value != row.get("mono_ns") else ts // 1_000_000 + return None + + +def _row_trade_id(row: Mapping[str, Any]) -> str: + return _coerce_text( + row.get("trade_id") or row.get("id") or row.get("slot_id") or row.get("episode_id"), + ) + + +class DecisionEpisode(StrictModel): + asset: Symbol + side: str = Field(min_length=1, max_length=16) + first_scan_number: int = Field(ge=0) + last_scan_number: int = Field(ge=0) + first_ts_ms: int = Field(ge=0) + last_ts_ms: int = Field(ge=0) + row_count: int = Field(ge=1) + actuated_count: int = Field(ge=0) + max_conviction_leverage: float = Field(ge=0.0, allow_inf_nan=False) + last_target_exposure: float = Field(ge=0.0, allow_inf_nan=False) + + +class TradeEpisode(StrictModel): + trade_id: str = Field(min_length=1) + asset: Symbol + side: str = Field(min_length=1, max_length=16) + entry_ts_ms: int = Field(ge=0) + exit_ts_ms: int = Field(ge=0) + row_count: int = Field(ge=1) + bars_held: Optional[int] = Field(default=None, ge=0) + net_pnl: Optional[float] = Field(default=None, allow_inf_nan=False) + terminal_reason: Optional[str] = None + + +class EpisodeMatch(StrictModel): + asset: Symbol + side: str = Field(min_length=1, max_length=16) + decision_episode: DecisionEpisode + trade_episode: TradeEpisode + start_gap_ms: int = Field(ge=0) + end_gap_ms: int = Field(ge=0) + row_gap: int = Field(ge=0) + bars_gap: Optional[int] = Field(default=None, ge=0) + + +class TradeSlotComparison(StrictModel): + decision_episodes: list[DecisionEpisode] + trade_episodes: list[TradeEpisode] + matches: list[EpisodeMatch] + decision_only: list[DecisionEpisode] + trade_only: list[TradeEpisode] + + +def _collapse_decision_rows( + rows: Iterable[Mapping[str, Any]], + *, + max_scan_gap: int = 1, +) -> list[DecisionEpisode]: + grouped: dict[tuple[str, str], list[Mapping[str, Any]]] = 
defaultdict(list) + for row in rows: + asset = _row_asset(row) + side = _row_side(row) + scan_number = _row_scan_number(row) + if not asset or not side or scan_number is None: + continue + grouped[(asset, side)].append(row) + + episodes: list[DecisionEpisode] = [] + for (asset, side), bucket in grouped.items(): + bucket = sorted( + bucket, + key=lambda row: ( + _row_scan_number(row) or 0, + _row_ts_ms(row) or 0, + ), + ) + current: list[Mapping[str, Any]] = [] + prev_scan: Optional[int] = None + for row in bucket: + scan_number = _row_scan_number(row) + if scan_number is None: + continue + if current and prev_scan is not None and scan_number > prev_scan + max_scan_gap: + episodes.append(_decision_episode_from_rows(asset, side, current)) + current = [] + current.append(row) + prev_scan = scan_number + if current: + episodes.append(_decision_episode_from_rows(asset, side, current)) + + return sorted(episodes, key=lambda ep: (ep.first_ts_ms, ep.asset, ep.side, ep.first_scan_number)) + + +def _decision_episode_from_rows( + asset: str, + side: str, + rows: list[Mapping[str, Any]], +) -> DecisionEpisode: + scans = [sn for sn in (_row_scan_number(r) for r in rows) if sn is not None] + times = [ts for ts in (_row_ts_ms(r) for r in rows) if ts is not None] + conv = [ + value for value in (_coerce_float(r.get("conviction_leverage"), None) for r in rows) + if value is not None + ] + exposure = [ + value for value in (_coerce_float(r.get("target_exposure"), None) for r in rows) + if value is not None + ] + actuated = sum(1 for r in rows if bool(r.get("actuated"))) + return DecisionEpisode( + asset=asset, + side=side, + first_scan_number=min(scans), + last_scan_number=max(scans), + first_ts_ms=min(times) if times else 0, + last_ts_ms=max(times) if times else 0, + row_count=len(rows), + actuated_count=actuated, + max_conviction_leverage=max(conv) if conv else 0.0, + last_target_exposure=exposure[-1] if exposure else 0.0, + ) + + +def _collapse_trade_rows(rows: 
Iterable[Mapping[str, Any]]) -> list[TradeEpisode]: + grouped: dict[str, list[Mapping[str, Any]]] = defaultdict(list) + for row in rows: + trade_id = _row_trade_id(row) + asset = _row_asset(row) + side = _row_side(row) + if not trade_id or not asset or not side: + continue + grouped[trade_id].append(row) + + episodes: list[TradeEpisode] = [] + for trade_id, bucket in grouped.items(): + bucket = sorted(bucket, key=lambda row: (_row_ts_ms(row) or 0, _coerce_int(row.get("scan_number")) or 0)) + first = bucket[0] + last = bucket[-1] + entry_ts = _row_ts_ms(first) or 0 + exit_ts = _row_ts_ms(last) or entry_ts + bars = None + for row in reversed(bucket): + bars = _coerce_int(row.get("bars_held")) + if bars is not None: + break + net_pnl = None + for row in reversed(bucket): + net_pnl = _coerce_float(row.get("net_pnl") or row.get("pnl"), None) + if net_pnl is not None: + break + reason = None + for row in reversed(bucket): + reason = _coerce_text(row.get("reason") or row.get("exit_reason"), "") + if reason: + break + episodes.append( + TradeEpisode( + trade_id=trade_id, + asset=_row_asset(first), + side=_row_side(first), + entry_ts_ms=entry_ts, + exit_ts_ms=exit_ts, + row_count=len(bucket), + bars_held=bars, + net_pnl=net_pnl, + terminal_reason=reason or None, + ) + ) + + return sorted(episodes, key=lambda ep: (ep.entry_ts_ms, ep.asset, ep.side, ep.trade_id)) + + +def _match_episodes( + decisions: list[DecisionEpisode], + trades: list[TradeEpisode], +) -> tuple[list[EpisodeMatch], list[DecisionEpisode], list[TradeEpisode]]: + by_key: dict[tuple[str, str], list[TradeEpisode]] = defaultdict(list) + for trade in trades: + by_key[(trade.asset, trade.side)].append(trade) + for bucket in by_key.values(): + bucket.sort(key=lambda ep: (ep.entry_ts_ms, ep.exit_ts_ms, ep.trade_id)) + + matches: list[EpisodeMatch] = [] + decision_only: list[DecisionEpisode] = [] + used_trade_ids: set[str] = set() + + for decision in decisions: + bucket = by_key.get((decision.asset, decision.side), 
[]) + candidate = None + for trade in bucket: + if trade.trade_id in used_trade_ids: + continue + candidate = trade + break + if candidate is None: + decision_only.append(decision) + continue + used_trade_ids.add(candidate.trade_id) + matches.append( + EpisodeMatch( + asset=decision.asset, + side=decision.side, + decision_episode=decision, + trade_episode=candidate, + start_gap_ms=abs(decision.first_ts_ms - candidate.entry_ts_ms), + end_gap_ms=abs(decision.last_ts_ms - candidate.exit_ts_ms), + row_gap=abs(decision.row_count - candidate.row_count), + bars_gap=( + abs(decision.row_count - candidate.bars_held) + if candidate.bars_held is not None + else None + ), + ) + ) + + trade_only = [trade for trade in trades if trade.trade_id not in used_trade_ids] + return matches, decision_only, trade_only + + +@typed +def compare_trade_slot_granularity( + decision_rows: Iterable[Mapping[str, Any]], + trade_rows: Iterable[Mapping[str, Any]], + *, + max_scan_gap: int = 1, +) -> TradeSlotComparison: + """Collapse both surfaces to episodes and compare them at slot granularity.""" + decisions = _collapse_decision_rows(decision_rows, max_scan_gap=max_scan_gap) + trades = _collapse_trade_rows(trade_rows) + matches, decision_only, trade_only = _match_episodes(decisions, trades) + return TradeSlotComparison( + decision_episodes=decisions, + trade_episodes=trades, + matches=matches, + decision_only=decision_only, + trade_only=trade_only, + ) diff --git a/prod/docs/RECENT_VIOLET_34E_pending.md b/prod/docs/RECENT_VIOLET_34E_pending.md new file mode 100644 index 0000000..ab8b8d2 --- /dev/null +++ b/prod/docs/RECENT_VIOLET_34E_pending.md @@ -0,0 +1,83 @@ +# RECENT VIOLET 34E — trade/slot-granularity comparison harness + +This pass implemented the next numbered comparison item from the main Violet spec: +the trade/slot-granularity comparison that was still deferred after the V3.4c +live-source work and shadow journal wiring. 
+ +## What existed before + +Before this pass, Violet already had: + +- `VioletDecisionEngine` producing shadow decisions +- `VioletDecisionJournal` persisting actuated decisions to `violet_decisions` +- live-source adapters for BLUE-published factors +- full sizing parity through `VioletSizer` + +What was still missing was a dedicated comparison unit that could collapse those +journaled decisions into episode-sized runs and compare them against the terminal +trade surface at the same granularity. + +## What was added + +I added a new Violet-only comparator: + +- [prod/clean_arch/violet/trade_slot_compare.py](/mnt/dolphinng5_predict/prod/clean_arch/violet/trade_slot_compare.py) +- [prod/clean_arch/violet/test_violet_trade_slot_compare.py](/mnt/dolphinng5_predict/prod/clean_arch/violet/test_violet_trade_slot_compare.py) + +The new module provides: + +- `DecisionEpisode`: a collapsed run of contiguous shadow decisions for one + asset/side +- `TradeEpisode`: a collapsed terminal trade record grouped by `trade_id` +- `EpisodeMatch`: a paired decision/trade episode with timing and row-count gaps +- `TradeSlotComparison`: the full comparison result +- `compare_trade_slot_granularity(...)`: the top-level collapse-and-compare API + +## How it works + +The comparator is deliberately narrow: + +- it groups decision rows by `asset` + `side` +- it splits them into episodes when the scan number gap exceeds the configured + `max_scan_gap` +- it groups trade rows by `trade_id` +- it ignores malformed rows rather than failing on them +- it matches episodes by asset/side, pairing them in time order, and preserves + unmatched decision/trade episodes as `decision_only` / `trade_only` + +This is downstream of the existing shadow journal. It does not reimplement ACB, +EsoF, OB, or sizing math. It only compares the surfaces that already exist. 
+ +## Anomaly handling + +The comparator rejects or skips bad inputs instead of letting them pollute the +episode view: + +- missing asset or side +- missing or invalid scan number +- malformed or missing trade id +- non-finite numeric fields + +That keeps the harness usable against noisy journal extracts and historical trade +rows that may contain replay artifacts. + +## Tests + +The new tests cover: + +- episode collapse across contiguous decision rows +- terminal trade dedupe through `trade_id` +- scan-gap splitting +- malformed row handling +- empty-input behavior + +Verification on this pass: + +- `PYTHONPATH=/mnt/dolphinng5_predict rtk python -m pytest -q /mnt/dolphinng5_predict/prod/clean_arch/violet/test_violet_trade_slot_compare.py` +- result: `3 passed` + +## Why this is the right next step + +The main Violet plan explicitly defers trade/slot-granularity comparison until the +shadow side has a comparable execution surface. That condition is now met well +enough to build the comparison harness without touching BLUE or the live executor. +