From ef473ba37239de09538ca24c6f85980089cffdd7 Mon Sep 17 00:00:00 2001 From: Codex Date: Tue, 2 Jun 2026 19:44:15 +0200 Subject: [PATCH] =?UTF-8?q?PINK:=20E2E=20trace=20analysis=20=E2=80=94=20Pa?= =?UTF-8?q?ss=2023=20closure=20review/unfinished=20fixes/ops=20gaps=20(Z1-?= =?UTF-8?q?Z14)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Twenty-third (final) pass: _safe_enum fix applied to rust_backend.py but NOT real_zinc_plane.py other copy crashes (Z1 High), no health check endpoint silent failures invisible to orchestration (Z5 High), process_intent calls venue.submit without exception handler venue error bypasses Rust FSM (Z6 High), snapshot mixes Rust and Python accounting capital can diverge (Z7 Medium), BingxVenueAdapter.close executor null-to-shutdown TOCTOU race (Z8 Medium), generated test f-string chr(34) template SyntaxError risk on old Python (Z9 Medium), launcher uses Python 3.10+ | union syntax no min version documented (Z10 Medium), concurrent process_intent on same slot no lock no queue (Z12 Medium). 403 total flaws across 23 passes. Co-authored-by: CommandCodeBot --- Observability/dolphin_status_pink.py | 1965 +++++++++++++++++++++++ PINK_DITAv2_E2E_TRACE_ANALYSIS.md | 208 +++ PINK_DITAv2_FLAW_ANALYSIS_2026-05-31.md | 26 +- 3 files changed, 2198 insertions(+), 1 deletion(-) create mode 100644 Observability/dolphin_status_pink.py diff --git a/Observability/dolphin_status_pink.py b/Observability/dolphin_status_pink.py new file mode 100644 index 0000000..aff94c2 --- /dev/null +++ b/Observability/dolphin_status_pink.py @@ -0,0 +1,1965 @@ +#!/usr/bin/env python3 +"""DOLPHIN PINK live status — v7 +0.5s poll. SIG/TRD/FIL gear rows + last-5-trades + CH persistence + V7 exit cmp. +HOTKEYS: [h] help [1] reduce 50% [2] reduce 75% [3] close all [s] STALKER [a] APEX +Note: posture hotkeys are disabled by default in PINK isolation mode. + +Run: source /home/dolphin/siloqy_env/bin/activate && python dolphin_status_pink.py +Quit: Ctrl-C +""" +# v1–v5 archived as dolphin_status_v{1..5}.py +# v6: exit-comparison overlay (V7 preview), net-pnl pct fix +# v7: hotkey system — position reduce, posture switch, CH audit trail + +import hashlib, hmac, json, os, re, select, threading, time, sys, uuid +import termios, tty, urllib.request, urllib.parse +from collections import deque +from datetime import datetime, timezone +from pathlib import Path + +import hazelcast + +def _float_env(name: str, default: float) -> float: + raw = os.environ.get(name, "") + try: + val = float(raw) + if val > 0: + return val + except Exception: + pass + return default + +# ── PINK runtime bindings ────────────────────────────────────────────────────── +PINK_CH_DB = os.environ.get("DOLPHIN_TUI_CH_DB", "dolphin_pink") +PINK_STRATEGY = os.environ.get("DOLPHIN_TUI_STRATEGY", "pink") +PINK_STATE_MAP = os.environ.get("DOLPHIN_TUI_STATE_MAP", "DOLPHIN_STATE_PINK") +PINK_RUNTIME_COMMAND_KEY = os.environ.get("DOLPHIN_TUI_RUNTIME_COMMAND_KEY", "pink_runtime_commands") +PINK_SAFETY_MAP = os.environ.get("DOLPHIN_TUI_SAFETY_MAP", "DOLPHIN_SAFETY") +PINK_HEARTBEAT_MAP = os.environ.get("DOLPHIN_TUI_HEARTBEAT_MAP", "DOLPHIN_HEARTBEAT") +PINK_HEARTBEAT_KEY = os.environ.get("DOLPHIN_TUI_HEARTBEAT_KEY", "nautilus_flow_heartbeat") +PINK_META_HEALTH_MAP = os.environ.get("DOLPHIN_TUI_META_HEALTH_MAP", "DOLPHIN_META_HEALTH") +PINK_ANNOUNCEMENTS_MAP = os.environ.get("DOLPHIN_TUI_ANNOUNCEMENTS_MAP", "DOLPHIN_ANNOUNCEMENTS") +PINK_FEATURES_MAP = os.environ.get("DOLPHIN_TUI_FEATURES_MAP", "DOLPHIN_FEATURES") +PINK_ALLOW_GLOBAL_POSTURE_HOTKEYS = str( + os.environ.get("DOLPHIN_PINK_TUI_ENABLE_GLOBAL_POSTURE", "0") +).strip().lower() in {"1", "true", "yes", "on"} + +# ── ClickHouse fire-and-forget write ───────────────────────────────────────── +_CH_URL = "http://localhost:8123" +_CH_USER = "dolphin" +_CH_PASS = "dolphin_ch_2026" +_CH_Q: deque = deque(maxlen=500) +_CH_TABLE_Q: dict[str, deque] = {} # table_name → deque of rows +_CH_TABLE_LOCK = threading.Lock() + +def _ch_flush_table(table: str, rows: list): + if not rows: + return + body = "\n".join(json.dumps(r) for r in rows).encode() + url = f"{_CH_URL}/?database={PINK_CH_DB}&query=INSERT+INTO+{table}+FORMAT+JSONEachRow" + req = urllib.request.Request(url, data=body, method="POST") + req.add_header("X-ClickHouse-User", _CH_USER) + req.add_header("X-ClickHouse-Key", _CH_PASS) + req.add_header("Content-Type", "application/octet-stream") + try: + urllib.request.urlopen(req, timeout=4) + except Exception: + pass + +def _ch_worker(): + while True: + time.sleep(2) + rows = [] + while _CH_Q: + try: rows.append(_CH_Q.popleft()) + except IndexError: break + if rows: + _ch_flush_table("status_snapshots", rows) + with _CH_TABLE_LOCK: + tables = {t: list(q) for t, q in _CH_TABLE_Q.items() if q} + for q in _CH_TABLE_Q.values(): + q.clear() + for table, trows in tables.items(): + _ch_flush_table(table, trows) + +threading.Thread(target=_ch_worker, daemon=True, name="ch-status-pink").start() + +def ch_put(row: dict, table: str | None = None): + if table: + with _CH_TABLE_LOCK: + if table not in _CH_TABLE_Q: + _CH_TABLE_Q[table] = deque(maxlen=500) + _CH_TABLE_Q[table].append(row) + else: + _CH_Q.append(row) + +# ── Trade panel sources ─────────────────────────────────────────────────────── +def _parse_path_env(name: str) -> tuple[Path, ...]: + """Parse a path list env var using common separators.""" + raw = os.environ.get(name, "").strip() + if not raw: + return () + paths = [] + for chunk in re.split(r"[:;,]", raw): + item = chunk.strip() + if item: + paths.append(Path(item)) + return tuple(paths) + + +_TRADER_LOG_CANDIDATES = tuple( + dict.fromkeys( + [ + *_parse_path_env("DOLPHIN_TUI_TRADER_LOG_PATHS"), + Path("/tmp/dolphin_logs/supervisor/dolphin_live_pink.log"), + Path("/root/dolphin_logs/supervisor/dolphin_live_pink.log"), + Path("/mnt/dolphinng5_predict/prod/supervisor/logs/dolphin_live_pink.log"), + ] + ) +) +# Capture the JSON dict only — stop at first } closing the payload. +# Lines may have a trailing tag like [v2_gold_fix_v50-v750] after the dict. +_RE_ENTRY = re.compile(r"\[(.+?)\] ENTRY: (\{.+?\})(?:\s*\[.*\])?$") +_RE_EXIT = re.compile(r"\[(.+?)\] (?:SUBDAY_)?EXIT: (\{.+?\})(?:\s*\[.*\])?$") +_TRADE_PANEL_CACHE = {"ts": 0.0, "open": [], "closed": [], "fills": []} +_ACCOUNT_EVENT_CACHE = {"ts": 0.0, "row": None} + + +def _normalize_exit_reason(reason: str) -> str: + text = str(reason or "").strip() + if text == "V7_MAE_SL_VOL_NORM": + return "V7.1_MAE_SL_VOL_NORM" + return text + + +def _resolve_trader_log() -> Path | None: + """Pick the freshest available supervisor log across known mount layouts.""" + best_path = None + best_key = None + for path in _TRADER_LOG_CANDIDATES: + try: + stat = path.stat() + except OSError: + continue + if stat.st_size <= 0: + continue + key = (stat.st_mtime, stat.st_size) + if best_key is None or key > best_key: + best_key = key + best_path = path + return best_path + +def _parse_log_dict(raw: str) -> dict: + """Parse a Python dict repr from a log line. Handles nan and single-quoted strings.""" + import ast + # Replace nan/inf with JSON-safe equivalents before parsing + cleaned = raw.replace(": nan", ": null").replace(": inf", ": null").replace(": -inf", ": null") + try: + return ast.literal_eval(raw) # handles all Python literal forms incl. nan + except Exception: + pass + try: + return json.loads(cleaned.replace("'", '"')) + except Exception: + raise ValueError(f"unparseable: {raw[:80]}") + + +def _ch_query(sql: str, *, database: str = PINK_CH_DB) -> list[dict]: + """Execute a ClickHouse query and return JSONEachRow dicts.""" + try: + query = urllib.parse.quote(f"{sql} FORMAT JSONEachRow") + url = f"{_CH_URL}/?database={database}&query={query}" + req = urllib.request.Request(url) + req.add_header("X-ClickHouse-User", _CH_USER) + req.add_header("X-ClickHouse-Key", _CH_PASS) + with urllib.request.urlopen(req, timeout=5) as resp: + body = resp.read().decode("utf-8").strip() + if not body: + return [] + return [json.loads(line) for line in body.splitlines() if line.strip()] + except Exception: + return [] + + +def _position_key(row: dict) -> tuple: + trade_id = str(row.get("trade_id") or "").strip() + if trade_id: + return ("trade_id", trade_id) + asset = str(row.get("asset") or "").strip() + entry = row.get("entry_price") + try: + if asset and entry is not None: + return ("asset_entry", asset, round(float(entry), 8)) + except Exception: + pass + if asset: + return ("asset", asset) + return ("unknown", id(row)) + + +def _fetch_open_positions_from_ch(limit: int = 5) -> list[dict]: + rows = _ch_query( + f""" + SELECT + trade_id, + asset, + direction, + entry_price, + quantity, + notional, + leverage, + bucket_id, + bars_held, + last_ts + FROM ( + SELECT + trade_id, + argMax(asset, ts) AS asset, + argMax(direction, ts) AS direction, + argMax(entry_price, ts) AS entry_price, + argMax(quantity, ts) AS quantity, + argMax(notional, ts) AS notional, + argMax(leverage, ts) AS leverage, + argMax(bucket_id, ts) AS bucket_id, + argMax(bars_held, ts) AS bars_held, + argMax(status, ts) AS status, + max(ts) AS last_ts + FROM {PINK_CH_DB}.position_state + GROUP BY trade_id + ) + WHERE status = 'OPEN' + ORDER BY last_ts DESC + LIMIT {int(limit)} + """ + ) + return rows + + +def _fetch_latest_account_event_from_ch() -> dict | None: + now = time.time() + cached = _ACCOUNT_EVENT_CACHE + if now - float(cached.get("ts", 0.0) or 0.0) < 0.75: + row = cached.get("row") + return dict(row) if isinstance(row, dict) else None + rows = _ch_query( + f""" + SELECT + ts, + event_type, + strategy, + capital, + trades_today, + open_positions, + current_open_notional, + current_account_leverage, + notes + FROM {PINK_CH_DB}.account_events + ORDER BY ts DESC + LIMIT 5 + """ + ) + row = None + if rows: + row = rows[0] + for candidate in rows: + try: + if float(candidate.get("current_open_notional", 0) or 0) > 0: + row = candidate + break + except Exception: + continue + cached["ts"] = now + cached["row"] = dict(row) if isinstance(row, dict) else None + return dict(row) if isinstance(row, dict) else None + + +def _account_event_payload(row: dict | None) -> dict: + if not isinstance(row, dict): + return {} + raw = str(row.get("notes") or "").strip() + if not raw: + return {} + try: + payload = json.loads(raw) + if isinstance(payload, dict): + inner = payload.get("payload") + if isinstance(inner, dict): + return inner + return payload + except Exception: + return {} + return {} + + +def _account_event_positions(row: dict | None) -> list[dict]: + payload = _account_event_payload(row) + positions = payload.get("positions") if isinstance(payload, dict) else {} + if not isinstance(positions, dict): + return [] + out: list[dict] = [] + for symbol, pos in positions.items(): + if not isinstance(pos, dict): + continue + asset = str(pos.get("symbol") or symbol or "").replace("-", "") + if not asset: + continue + side = str(pos.get("positionSide") or pos.get("side") or "SHORT").upper() + qty = float(pos.get("positionAmt", 0) or 0) + entry = float(pos.get("avgPrice", 0) or 0) + mark = float(pos.get("markPrice", 0) or 0) + upnl = float(pos.get("unrealizedProfit", 0) or 0) + notional = float(pos.get("positionValue", 0) or 0) + lev = float(pos.get("leverage", 0) or 0) + out.append({ + "asset": asset, + "side": side, + "quantity": abs(qty), + "entry_price": entry, + "current_price": mark, + "unrealized_pnl": upnl, + "notional": notional, + "leverage": lev, + "direction": 1 if side == "LONG" else -1, + "trade_id": str(pos.get("positionId") or symbol or asset), + "source": "bingx_account_events", + }) + return out + + +def _account_event_fills(row: dict | None, limit: int = 30) -> list[dict]: + payload = _account_event_payload(row) + fills = payload.get("fills") if isinstance(payload, dict) else [] + if not isinstance(fills, list): + return [] + out: list[dict] = [] + for item in fills[-limit:]: + if not isinstance(item, dict): + continue + fill_row = item.get("row") if isinstance(item.get("row"), dict) else item + friction = item.get("friction") if isinstance(item.get("friction"), dict) else fill_row.get("friction", {}) + symbol = str(fill_row.get("symbol") or fill_row.get("asset") or item.get("asset") or "?") + asset = symbol.replace("-", "") + side = str(fill_row.get("side") or item.get("side") or "?").upper() + px = float(fill_row.get("avgPrice", fill_row.get("price", 0)) or 0) + qty = float(fill_row.get("executedQty", fill_row.get("quantity", 0)) or 0) + trade_key = str(item.get("_trade_key") or fill_row.get("orderId") or fill_row.get("clientOrderId") or asset) + pnl = float(item.get("pnl", fill_row.get("pnl", 0)) or 0) + out.append({ + "trade_id": trade_key, + "ts": str(row.get("ts") or ""), + "asset": asset, + "side": side, + "entry_price": px, + "exit_price": px, + "quantity": qty, + "net_pnl": pnl, + "pnl_pct": float(fill_row.get("pnlPct", item.get("pnl_pct", 0)) or item.get("pnl_pct", 0) or 0), + "reason": str(fill_row.get("status") or item.get("status") or "FILL"), + "bars_held": item.get("bars_held", 0), + "source": "bingx_account_events", + "liquidity_side": friction.get("liquidity_side"), + "fill_quality_score": friction.get("fill_quality_score"), + }) + return out + + +def _account_event_fill_count(row: dict | None) -> int: + payload = _account_event_payload(row) + fills = payload.get("fills") if isinstance(payload, dict) else [] + if not isinstance(fills, list): + return 0 + return sum(1 for item in fills if isinstance(item, dict)) + + +def _fetch_recent_account_fills_from_ch(limit: int = 30) -> list[dict]: + rows = _ch_query( + f""" + SELECT + ts, + event_type, + strategy, + notes + FROM {PINK_CH_DB}.account_events + WHERE strategy = '{PINK_STRATEGY}' + ORDER BY ts DESC + LIMIT {int(max(limit, 1))} + """ + ) + if not rows: + return [] + fills: list[dict] = [] + for row in rows: + fills.extend(_account_event_fills(row, limit=limit)) + if len(fills) >= limit: + break + return fills[:limit] + + +def _overlay_account_event_state(eng: dict) -> tuple[dict, dict | None]: + """Merge the live BingX account snapshot into the HZ engine snapshot. + + PINK's HZ `engine_snapshot` can lag the exchange ledger. When BingX + account events have a fresher view of open positions, use them for display + and gating while preserving the rest of the engine snapshot. + """ + merged = dict(eng or {}) + account_row = _fetch_latest_account_event_from_ch() + live_positions = _account_event_positions(account_row) + if account_row: + capital = float(account_row.get("capital", merged.get("capital", 0)) or merged.get("capital", 0) or 0) + merged["capital"] = capital + merged["account_capital"] = capital + merged["portfolio_capital"] = capital + merged["ledger_authority"] = "exchange" + merged["current_open_notional"] = float( + account_row.get("current_open_notional", merged.get("current_open_notional", 0)) or 0 + ) + merged["current_account_leverage"] = float( + account_row.get("current_account_leverage", merged.get("current_account_leverage", 0)) or 0 + ) + return merged, account_row + + +def _fetch_closed_trades_from_ch(limit: int = 30) -> list[dict]: + rows = _ch_query( + f""" + SELECT + ts, + trade_id, + asset, + side, + entry_price, + exit_price, + quantity, + pnl, + pnl_pct, + exit_reason, + leverage, + bars_held, + scan_uuid, + capital_before, + capital_after + FROM {PINK_CH_DB}.trade_events + WHERE strategy = '{PINK_STRATEGY}' + ORDER BY ts DESC + LIMIT {int(limit)} + """ + ) + return rows + + +def _fetch_closed_trades_from_log(limit: int = 30) -> list[dict]: + trades = _last_n_trades(limit) + for row in trades: + row.setdefault("source", "log") + return trades + + +def _augment_open_positions(open_rows: list[dict], eng_open: list[dict]) -> list[dict]: + if not open_rows: + return [] + supplemental = {} + for row in eng_open or []: + supplemental[_position_key(row)] = row + asset = str(row.get("asset") or "").strip() + if asset: + supplemental.setdefault(("asset", asset), row) + + merged = [] + for row in open_rows: + out = dict(row) + extra = supplemental.get(_position_key(row)) + if extra is None: + asset = str(row.get("asset") or "").strip() + if asset: + extra = supplemental.get(("asset", asset)) + if extra: + for key in ("side", "unrealized_pnl", "current_price", "entry_price", "quantity", "notional", "leverage"): + if key in extra and (key not in out or out.get(key) in (None, "", 0, 0.0)): + out[key] = extra.get(key) + if "trade_id" not in out or not out.get("trade_id"): + if extra.get("trade_id"): + out["trade_id"] = extra.get("trade_id") + merged.append(out) + return merged + + +def _mark_price_from_obf(asset: str, obf: dict | None) -> float: + if not asset or not isinstance(obf, dict): + return 0.0 + row = obf.get(asset) or (obf.get("assets") or {}).get(asset) or {} + if not row and "-" not in asset: + row = obf.get(_bingx_venue_symbol(asset)) or (obf.get("assets") or {}).get(_bingx_venue_symbol(asset)) or {} + try: + bid = float(row.get("best_bid", 0) or 0) + ask = float(row.get("best_ask", 0) or 0) + if bid > 0 and ask > 0: + return (bid + ask) / 2 + if bid > 0: + return bid + if ask > 0: + return ask + except Exception: + return 0.0 + return 0.0 + + +def _derive_open_position_pnl(row: dict, obf: dict | None) -> tuple[float, float]: + entry = float(row.get("entry_price", 0) or 0) + current = float(row.get("current_price", 0) or 0) + if current <= 0: + current = _mark_price_from_obf(str(row.get("asset") or ""), obf) + qty = float(row.get("quantity", 0) or 0) + notional = float(row.get("notional", 0) or 0) + if qty <= 0 and entry > 0 and notional > 0: + qty = notional / entry + side = str(row.get("side") or "").upper() + direction = -1 if side == "SHORT" or int(row.get("direction", -1) or -1) == -1 else 1 + upnl = float(row.get("unrealized_pnl", 0) or 0) + if upnl == 0 and entry > 0 and current > 0 and qty > 0: + upnl = ((entry - current) * qty) if direction == -1 else ((current - entry) * qty) + return upnl, current + + +def _trade_panel_rows(eng, obf: dict | None, limit: int = 30) -> tuple[list[dict], list[dict], list[dict], str]: + now = time.time() + cached = _TRADE_PANEL_CACHE + if now - float(cached.get("ts", 0.0) or 0.0) < 0.75: + return ( + list(cached.get("open", [])), + list(cached.get("closed", [])), + list(cached.get("fills", [])), + str(cached.get("source", "cache")), + ) + + account_row = _fetch_latest_account_event_from_ch() + account_open_rows = _account_event_positions(account_row) + + open_rows = _fetch_open_positions_from_ch(limit=5) + closed_rows = _fetch_closed_trades_from_ch(limit=limit) + source = "clickhouse" + eng_open = [] + try: + eng_open = eng.get("open_positions") or [] + except Exception: + eng_open = [] + + if account_open_rows: + # BingX account events reflect the live venue state even if HZ is stale. + open_rows = _augment_open_positions(account_open_rows, eng_open) + source = "bingx_account_events" + elif open_rows: + open_rows = _augment_open_positions(open_rows, eng_open) + for row in open_rows: + upnl, current = _derive_open_position_pnl(row, obf) + if current > 0 and not row.get("current_price"): + row["current_price"] = current + if upnl != 0 or not row.get("unrealized_pnl"): + row["unrealized_pnl"] = upnl + else: + # Preserve the live HZ snapshot if CH is briefly unavailable. + open_rows = list(eng_open or []) + source = "hazelcast" + if not closed_rows: + account_fills = _fetch_recent_account_fills_from_ch(limit) + if account_fills: + source = "bingx_account_events" + else: + log_rows = _fetch_closed_trades_from_log(limit) + if log_rows: + closed_rows = log_rows + source = "clickhouse+log" if source == "clickhouse" else "hazelcast+log" + else: + account_fills = [] + + cached["ts"] = now + cached["open"] = list(open_rows) + cached["closed"] = list(closed_rows) + cached["fills"] = list(account_fills) + cached["source"] = source + return open_rows, closed_rows, list(account_fills), source + + +def _summarize_closed_trades(rows: list[dict]) -> dict: + pnl_vals = [] + notional_vals = [] + wins = 0 + losses = 0 + for row in rows or []: + pnl = float(row.get("net_pnl", row.get("pnl", 0)) or 0) + notional = float(row.get("notional", 0) or 0) + pnl_vals.append(pnl) + notional_vals.append(notional) + if pnl >= 0: + wins += 1 + else: + losses += 1 + total_pnl = sum(pnl_vals) + total_notional = sum(notional_vals) + total_pct = (total_pnl / total_notional * 100) if total_notional else 0.0 + return { + "n": len(rows or []), + "wins": wins, + "losses": losses, + "pnl": total_pnl, + "pct": total_pct, + } + + +def _closed_trade_display_pnl(row: dict) -> float: + """Choose a sane USD PnL for closed-trade display. + + Prefer a signed mark-to-mark/close calculation from entry/exit/quantity. + Some log rows carry a near-zero or inconsistent `net_pnl`, and `pnl_pct` + may be magnitude-only, so the direct price delta is the most reliable + display source. + """ + net_pnl = float(row.get("net_pnl", row.get("pnl", 0)) or 0) + entry = float(row.get("entry_price", 0) or 0) + exit_ = float(row.get("exit_price", 0) or 0) + qty = float(row.get("quantity", 0) or 0) + side = str(row.get("side") or "").upper() + direction = int(row.get("direction", -1) or -1) + if entry > 0 and exit_ > 0 and qty > 0: + if side == "LONG" or direction == 1: + return (exit_ - entry) * qty + return (entry - exit_) * qty + notional = float(row.get("notional", 0) or 0) + pnl_pct = float(row.get("pnl_pct", 0) or 0) + expected = pnl_pct * notional if notional > 0 else 0.0 + if expected != 0 and abs(net_pnl) < max(1e-6, abs(expected) * 0.05): + return expected + return net_pnl + + +def _sys_health_sensor_rows(mh: dict) -> list[tuple[str, float]]: + sensors = [] + for key, value in (mh or {}).items(): + if not str(key).startswith("m"): + continue + try: + idx = int(str(key)[1:].split("_", 1)[0]) + except Exception: + continue + try: + sensors.append((idx, str(key), float(value))) + except Exception: + continue + sensors.sort(key=lambda item: (item[0], item[1])) + return [(name, val) for _, name, val in sensors] + + +def _format_two_column_sensor_lines(rows: list[tuple[str, float]]) -> list[str]: + if not rows: + return [] + out: list[str] = [] + pairs = [rows[i:i + 2] for i in range(0, len(rows), 2)] + for pair in pairs: + formatted = [] + for name, value in pair: + color = GREEN if value >= 0.9 else (YELLOW if value >= 0.5 else RED) + formatted.append(f"{color}{name}:{value:.3f}{RST}") + if len(formatted) == 2: + out.append(f" {formatted[0]} {formatted[1]}") + else: + out.append(f" {formatted[0]}") + return out + + +def _last_n_trades(n=5): + """Parse last N completed trades from supervisor log. Returns list of dicts.""" + trader_log = _resolve_trader_log() + if trader_log is None: + return [] + try: + lines = trader_log.read_text(errors="replace").splitlines()[-4000:] + except Exception: + return [] + entries = {} + trades = [] + for line in lines: + m = _RE_ENTRY.search(line) + if m: + try: + d = _parse_log_dict(m.group(2)) + entries[d["trade_id"]] = {"ts": m.group(1), **d} + except Exception: + pass + m = _RE_EXIT.search(line) + if m: + try: + d = _parse_log_dict(m.group(2)) + tid = d.get("trade_id") + if not tid: + continue + e = entries.pop(tid, {}) + trades.append({ + "trade_id": tid, + "ts": e.get("ts", m.group(1)), + "asset": e.get("asset", d.get("asset", "?")), + "entry_price": e.get("entry_price", d.get("entry_price", 0)), + "leverage": e.get("leverage", d.get("leverage", 0)), + "notional": e.get("notional", d.get("notional", 0)), + "direction": e.get("direction", d.get("direction", 0)), + "exit_ts": m.group(1), + "reason": _normalize_exit_reason(d.get("reason", "?")), + "pnl_pct": d.get("pnl_pct", 0), + "net_pnl": d.get("net_pnl", 0), + "bars_held": d.get("bars_held", 0), + }) + except Exception: + pass + return trades[-n:] + +CLEAR = "\033[2J\033[H" +BOLD = "\033[1m"; DIM = "\033[2m"; RST = "\033[0m" +GREEN = "\033[32m"; YELLOW = "\033[33m"; RED = "\033[31m"; CYAN = "\033[36m" +ORANGE = "\033[38;5;208m" + +PC = {"APEX": GREEN, "STALKER": YELLOW, "TURTLE": ORANGE, "HIBERNATE": RED} +SC = {"GREEN": GREEN, "DEGRADED": YELLOW, "CRITICAL": ORANGE, "DEAD": RED} + +# Thresholds from nautilus_event_trader.py +VEL_DIV_THRESHOLD = -0.020 # signal fires when vel_div < this +VEL_DIV_EXTREME = -0.050 # extreme bearish +VEL_DIV_WARN = -0.010 # approaching threshold (yellow) +VEL_DIV_CLOSE = -0.015 # nearly there (orange→yellow) +VOL_P60 = _float_env("DOLPHIN_PINK_VOL_P60_THRESHOLD", 0.00008000) +BTC_VOL_WINDOW = 50 # bars used for vol calc + +FIXED_TP_PCT = _float_env("DOLPHIN_FIXED_TP_PCT", 0.0020) # PINK TP target (0.20%) +MAX_HOLD_BARS = 250 # PINK max hold bars +TRADER_HB_STALE_S = 30.0 +TRADER_ENGINE_STALE_S = 90.0 + +START_CAP = None +CAP_PEAK = None + +_EXIT_TRACKER: dict = {} # (asset, entry_price) → accumulated V7 comparison state +_HOTKEY_FEEDBACK: deque = deque(maxlen=5) # recent hotkey results for display + +# ── BingX signing (sync, for hotkey-driven REST calls) ──────────────────────── +def _bingx_sign(params: dict, secret_key: str) -> dict: + ordered = {k: v for k, v in sorted(params.items()) if v is not None and v != ""} + query = urllib.parse.urlencode(ordered) + sig = hmac.new(secret_key.encode(), query.encode(), hashlib.sha256).hexdigest() + return {**ordered, "signature": sig} + + +_BINGX_API_KEY = os.environ.get("BINGX_API_KEY", "") +_BINGX_SECRET_KEY = os.environ.get("BINGX_SECRET_KEY", "") +_BINGX_RECV_WINDOW = 5000 + + +def _bingx_env_base_url() -> str: + env = os.environ.get("DOLPHIN_BINGX_ENV", "VST").strip().upper() + if env == "LIVE": + return "https://open-api.bingx.com" + return "https://open-api-vst.bingx.com" + + +def _bingx_rest_post(path: str, params: dict) -> dict: + """Synchronous signed POST to BingX. Returns parsed JSON or raises.""" + base = _bingx_env_base_url() + signed = _bingx_sign({ + **params, + "timestamp": int(time.time() * 1000), + "recvWindow": _BINGX_RECV_WINDOW, + }, _BINGX_SECRET_KEY) + url = f"{base}{path}?{urllib.parse.urlencode(signed)}" + req = urllib.request.Request(url, method="POST") + req.add_header("X-BX-APIKEY", _BINGX_API_KEY) + req.add_header("Content-Type", "application/x-www-form-urlencoded") + with urllib.request.urlopen(req, timeout=10) as resp: + return json.loads(resp.read()) + + +def _bingx_venue_symbol(asset: str) -> str: + if "-" in asset: + return asset + if asset.endswith("USDT"): + return f"{asset[:-4]}-USDT" + return asset + + +# ── DITAv2 kernel state (does NOT use Hazelcast) ────────────────────────────── +_KERNEL_SNAP_PATH = Path("/tmp/.pink_kernel_state.json") +_DITAV2_ACCOUNT_CACHE: dict = {"ts": 0.0, "data": None} + + +def _read_ditav2_kernel_snapshot() -> dict: + """Read the last saved DITAv2 kernel state from disk (non-blocking).""" + try: + if not _KERNEL_SNAP_PATH.exists(): + return {} + raw = _KERNEL_SNAP_PATH.read_text(encoding="utf-8") + data = json.loads(raw) + data["_snap_mtime"] = _KERNEL_SNAP_PATH.stat().st_mtime + return data + except Exception: + return {} + + +def _bingx_rest_get(path: str, params: dict) -> dict: + """Synchronous signed GET to BingX. Returns parsed JSON or {}.""" + try: + base = _bingx_env_base_url() + signed = _bingx_sign( + {**params, "timestamp": int(time.time() * 1000), "recvWindow": _BINGX_RECV_WINDOW}, + _BINGX_SECRET_KEY, + ) + url = f"{base}{path}?{urllib.parse.urlencode(signed)}" + req = urllib.request.Request(url, method="GET") + req.add_header("X-BX-APIKEY", _BINGX_API_KEY) + with urllib.request.urlopen(req, timeout=5) as resp: + return json.loads(resp.read()) + except Exception: + return {} + + +def _fetch_ditav2_bingx_account() -> dict: + """Cache-throttled live BingX VST balance query (max 1 call / 5 s).""" + now = time.time() + cached = _DITAV2_ACCOUNT_CACHE + if now - float(cached.get("ts", 0.0)) < 5.0: + return cached.get("data") or {} + if not _BINGX_API_KEY or not _BINGX_SECRET_KEY: + cached["ts"] = now + cached["data"] = {} + return {} + raw = _bingx_rest_get("/openApi/swap/v2/user/balance", {}) + bal = (raw.get("data") or {}).get("balance") or {} + cached["ts"] = now + cached["data"] = bal + return bal + + +def _render_ditav2_section() -> str: + """Render the DITAv2 PINK kernel + BingX account section.""" + snap = _read_ditav2_kernel_snapshot() + bal = _fetch_ditav2_bingx_account() + + L = [] + L.append(f"{BOLD}{CYAN}━━ DITAv2 PINK [kernel + BingX VST]{RST}") + + if not snap: + L.append(f" {DIM}kernel snapshot not found at {_KERNEL_SNAP_PATH}{RST}") + L.append(f" {DIM}(PINK not started, or no clean shutdown after first trade){RST}") + else: + snap_mtime = snap.get("_snap_mtime", 0) + snap_age = time.time() - snap_mtime if snap_mtime else None + snap_age_s = _age_seconds(snap_age) if snap_age is not None else "?" + snap_ts_ms = snap.get("snapshot_ts_ms", 0) + snap_dt = ( + datetime.fromtimestamp(snap_ts_ms / 1000.0, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") + if snap_ts_ms + else "?" + ) + frozen = snap.get("capital_frozen", False) + frozen_col = RED if frozen else GREEN + frozen_s = f"{RED}FROZEN{RST}" if frozen else f"{GREEN}OK{RST}" + + acc = snap.get("account") or {} + capital = float(acc.get("capital") or acc.get("equity") or 0.0) + equity = float(acc.get("equity") or capital) + avail = float(acc.get("available_capital") or capital) + open_pos = int(acc.get("open_positions") or 0) + trade_seq = int(acc.get("trade_seq") or 0) + realized = float(acc.get("realized_pnl_total") or 0.0) + fees = float(acc.get("fee_total") or 0.0) + + fee_cal = snap.get("fee_calibration") or {} + fc_status = str(fee_cal.get("calibration_status") or "–") + fc_col = GREEN if fc_status == "OK" else (YELLOW if fc_status == "WARN" else RED) + fc_ratio = float(fee_cal.get("calibration_ratio") or 1.0) + fc_taker = float(fee_cal.get("taker_rate") or 0.0005) + fc_maker = float(fee_cal.get("maker_rate") or 0.0002) + + slots = snap.get("slots") or [] + L.append( + f" {BOLD}KERNEL{RST} snap:{snap_dt} age:{snap_age_s} frozen:{frozen_s}" + ) + L.append( + f" capital:${capital:,.2f} equity:${equity:,.2f} avail:${avail:,.2f}" + f" open:{open_pos} trades:{trade_seq}" + ) + rpnl_col = GREEN if realized >= 0 else RED + L.append( + f" realized:{rpnl_col}{realized:+.4f}{RST} fees:{fees:+.4f}" + f" fee_cal: taker:{fc_taker*100:.3f}% maker:{fc_maker*100:.3f}%" + f" ratio:{fc_ratio:.4f} {fc_col}[{fc_status}]{RST}" + ) + + for slot in slots: + fsm = str(slot.get("fsm_state") or "IDLE") + asset = str(slot.get("asset") or "–") + side = str(slot.get("side") or "–") + size = float(slot.get("size") or 0.0) + ep = float(slot.get("entry_price") or 0.0) + rpnl = float(slot.get("realized_pnl") or 0.0) + upnl = float(slot.get("unrealized_pnl") or 0.0) + sid = int(slot.get("slot_id") or 0) + fsm_col = ( + GREEN if fsm == "POSITION_OPEN" + else YELLOW if fsm in ("EXIT_REQUESTED", "EXIT_SENT", "EXIT_WORKING", "ENTRY_REQUESTED", "ENTRY_SENT", "ENTRY_WORKING") + else RED if fsm in ("ERROR", "CLOSED") + else DIM + ) + pnl_col = GREEN if (rpnl + upnl) >= 0 else RED + L.append( + f" slot[{sid}] {fsm_col}{fsm:<20}{RST} {asset} {side}" + f" size:{size:.4f} ep:{ep:.4g}" + f" pnl:{pnl_col}{(rpnl+upnl):+.4f}{RST}" + ) + + # ── Live BingX account ──────────────────────────────────────────────── + env_s = os.environ.get("DOLPHIN_BINGX_ENV", "VST") + if not bal: + creds_ok = bool(_BINGX_API_KEY and _BINGX_SECRET_KEY) + reason = "no credentials" if not creds_ok else "query failed / no data" + L.append(f" {BOLD}BINGX [{env_s}]{RST} {DIM}{reason}{RST}") + else: + b_wallet = float(bal.get("balance") or bal.get("walletBalance") or 0.0) + b_equity = float(bal.get("equity") or b_wallet) + b_avail = float(bal.get("availableMargin") or b_wallet) + b_used = float(bal.get("usedMargin") or 0.0) + b_upnl = float(bal.get("unrealizedProfit") or 0.0) + b_real = float(bal.get("realisedProfit") or bal.get("realizedProfit") or 0.0) + upnl_col = GREEN if b_upnl >= 0 else RED + real_col = GREEN if b_real >= 0 else RED + L.append( + f" {BOLD}BINGX [{env_s}]{RST}" + f" wallet:${b_wallet:,.2f} equity:${b_equity:,.2f}" + f" avail:${b_avail:,.2f} used:${b_used:,.2f}" + f" upnl:{upnl_col}{b_upnl:+.2f}{RST}" + f" realized:{real_col}{b_real:+.2f}{RST}" + ) + + L.append(f"{BOLD}{CYAN}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━{RST}") + return "\n".join(L) + + +# ── Hotkey: CH audit ────────────────────────────────────────────────────────── +def _hotkey_audit(action: str, request: dict, result: str, effect: dict | None = None): + ch_put({ + "ts": int(time.time() * 1000), + "hotkey": action, + "request_json": json.dumps(request, default=str), + "result": result, + "effect_json": json.dumps(effect or {}, default=str), + }, table="hotkey_audit") + + +# ── Hotkey actions ──────────────────────────────────────────────────────────── +def _hotkey_reduce_position(fraction: float, hz) -> str: + """Submit internal PINK retraction command (fractional exit) via HZ control plane.""" + if fraction <= 0 or fraction > 1: + return f"invalid fraction {fraction}" + eng = _get(hz, PINK_STATE_MAP, "engine_snapshot") + positions = eng.get("open_positions") or [] + if not positions: + _hotkey_audit(f"reduce_{fraction:.0%}", {"fraction": fraction}, "NO_POSITION") + return "no open position" + pos = positions[0] + asset = pos.get("asset", "?") + side = str(pos.get("side", "?")).upper() + trade_id = str(pos.get("trade_id", "") or "").strip() + qty = float(pos.get("quantity", 0) or 0) + entry = float(pos.get("entry_price", 0) or 0) + chain_root_trade_id = str(pos.get("chain_root_trade_id", trade_id) or trade_id).strip() + chain_head_leg_id = str(pos.get("chain_head_leg_id", f"{trade_id}:open") or f"{trade_id}:open").strip() + chain_prev_leg_id = str(pos.get("chain_prev_leg_id", "") or "").strip() + chain_seq = int(pos.get("chain_seq", pos.get("retraction_legs", 0)) or 0) + chain_token = str(pos.get("chain_token", "") or "").strip() + if not trade_id: + _hotkey_audit(f"reduce_{fraction:.0%}", {"fraction": fraction}, "NO_TRADE_ID") + return "no trade_id" + if not chain_root_trade_id or not chain_head_leg_id or not chain_token: + _hotkey_audit( + f"reduce_{fraction:.0%}", + {"fraction": fraction, "trade_id": trade_id, "chain_root_trade_id": chain_root_trade_id, "chain_head_leg_id": chain_head_leg_id, "chain_prev_leg_id": chain_prev_leg_id, "chain_seq": chain_seq}, + "NO_CHAIN_LINK", + ) + return "no chain link" + if qty <= 0: + _hotkey_audit(f"reduce_{fraction:.0%}", {"fraction": fraction}, "ZERO_QTY") + return "zero quantity" + command_id = f"hk-retract-{uuid.uuid4().hex[:16]}" + request = { + "action": f"reduce_{fraction:.0%}", + "asset": asset, + "side": side, + "requested_fraction": float(fraction), + "entry_price": entry, + "command_id": command_id, + "trade_id": trade_id, + "chain_root_trade_id": chain_root_trade_id, + "chain_head_leg_id": chain_head_leg_id, + "chain_prev_leg_id": chain_prev_leg_id, + "chain_seq": chain_seq, + } + try: + control_map = hz.get_map("DOLPHIN_CONTROL_PLANE").blocking() + key = PINK_RUNTIME_COMMAND_KEY + raw = control_map.get(key) + queue = [] + if raw: + try: + queue = json.loads(raw) if isinstance(raw, str) else list(raw) + except Exception: + queue = [] + if not isinstance(queue, list): + queue = [] + queue.append({ + "command_id": command_id, + "trade_id": trade_id, + "action": "RETRACT", + "fraction": float(fraction), + "reason": "HOTKEY_RETRACT", + "source": "tui_hotkey", + "ts": float(time.time()), + "asset": asset, + "side": side, + "chain_root_trade_id": chain_root_trade_id, + "chain_head_leg_id": chain_head_leg_id, + "chain_prev_leg_id": chain_prev_leg_id, + "chain_seq": chain_seq, + "chain_token": chain_token, + }) + # Keep queue bounded in-case trader is down. + queue = queue[-200:] + control_map.put(key, json.dumps(queue)) + msg = f"{side} {asset} retract {fraction:.0%}: cmd={command_id}" + _hotkey_audit(f"reduce_{fraction:.0%}", request, "OK", { + "command_id": command_id, "fraction": float(fraction), + "trade_id": trade_id, + "chain_root_trade_id": chain_root_trade_id, + "chain_head_leg_id": chain_head_leg_id, + "chain_seq": chain_seq, + }) + return msg + except Exception as e: + _hotkey_audit(f"reduce_{fraction:.0%}", request, f"ERROR: {e}") + return f"ERROR: {e}" + + +def _hotkey_close_all(hz) -> str: + return _hotkey_reduce_position(1.0, hz) + + +def _hotkey_set_posture(posture: str, hz) -> str: + """Switch posture via DOLPHIN_SAFETY when explicitly enabled.""" + if posture not in ("APEX", "STALKER", "TURTLE", "HIBERNATE"): + return f"invalid posture {posture}" + if not PINK_ALLOW_GLOBAL_POSTURE_HOTKEYS: + msg = "posture hotkeys disabled (set DOLPHIN_PINK_TUI_ENABLE_GLOBAL_POSTURE=1 to enable)" + _hotkey_audit(f"set_posture_{posture}", {"posture": posture}, "DISABLED") + return msg + safe_map = hz.get_map("DOLPHIN_SAFETY").blocking() + raw = safe_map.get("latest") + current = {} + if raw: + try: + current = json.loads(raw) if isinstance(raw, str) else raw + except Exception: + current = {} + prev_posture = current.get("posture", "?") + current["posture"] = posture + current["posture_ts"] = time.time() + current["posture_source"] = "hotkey_tui" + safe_map.put("latest", json.dumps(current)) + msg = f"posture {prev_posture} → {posture}" + _hotkey_audit(f"set_posture_{posture}", { + "posture": posture, "prev_posture": prev_posture, + }, "OK", {"new_posture": posture}) + return msg + + +# ── Non-blocking stdin reader ───────────────────────────────────────────────── +def _read_hotkey(timeout_s: float = 0.05) -> str | None: + """Read a single keypress from stdin without blocking. Returns key string or None.""" + if not sys.stdin.isatty(): + return None + dr, _, _ = select.select([sys.stdin], [], [], timeout_s) + if not dr: + return None + ch = sys.stdin.read(1) + if ch == "\x1b": + if select.select([sys.stdin], [], [], 0.02)[0]: + seq = ch + sys.stdin.read(2) + if len(seq) == 3 and seq[1] == "[": + fn = {"A": None, "B": None, "C": None, "D": None, + "P": "F1", "Q": "F2", "R": "F3", "S": "F4"} + return fn.get(seq[2]) + return None + return None + if ch == "\x03": + raise KeyboardInterrupt + if ch == "\x04": + raise KeyboardInterrupt + return ch if ch.isprintable() or ch in ("\n", "\r", "\t") else None + + +HOTKEY_MAP = { + "1": ("REDUCE 50%", lambda hz: _hotkey_reduce_position(0.50, hz)), + "2": ("REDUCE 75%", lambda hz: _hotkey_reduce_position(0.75, hz)), + "3": ("CLOSE ALL", lambda hz: _hotkey_close_all(hz)), + "s": ("→ STALKER", lambda hz: _hotkey_set_posture("STALKER", hz)), + "a": ("→ APEX", lambda hz: _hotkey_set_posture("APEX", hz)), + "t": ("→ TURTLE", lambda hz: _hotkey_set_posture("TURTLE", hz)), + "h": ("HELP", None), +} + + +def _age(ts): + if not ts: return "?" + s = time.time() - ts + if s < 0: return "0s" + if s < 60: return f"{s:.0f}s" + if s < 3600: return f"{s/60:.0f}m" + return f"{s/3600:.1f}h" + +def _age_seconds(s): + if s is None: + return "?" + try: + s = float(s) + except Exception: + return "?" + if s < 0: + return "0s" + if s < 60: + return f"{s:.0f}s" + if s < 3600: + return f"{s/60:.0f}m" + return f"{s/3600:.1f}h" + +def _iso_age_seconds(raw): + if not raw: + return None + if isinstance(raw, (int, float)): + try: + return max(0.0, time.time() - float(raw)) + except Exception: + return None + if isinstance(raw, str): + try: + dt = datetime.fromisoformat(raw.replace("Z", "+00:00")) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return max(0.0, time.time() - dt.timestamp()) + except Exception: + return None + return None + +def _engine_snapshot_age_seconds(eng): + if not isinstance(eng, dict): + return None + for key in ("timestamp", "updated_at", "last_update", "ts"): + age = _iso_age_seconds(eng.get(key)) + if age is not None: + return age + return None + +def _is_trader_live(hb, eng): + hb_age = _iso_age_seconds(hb.get("ts")) if isinstance(hb, dict) else None + hb_ok = hb_age is not None and hb_age < TRADER_HB_STALE_S + eng_age = _engine_snapshot_age_seconds(eng) + eng_ok = eng_age is not None and eng_age < TRADER_ENGINE_STALE_S + live = hb_ok or eng_ok + src = [] + if hb_ok: + src.append("hb") + if eng_ok: + src.append("eng") + if not src: + src.append("stale") + return live, "+".join(src), hb_age, eng_age + +def _bar(v, w=20): + v = max(0.0, min(1.0, v)) + return "█" * round(v * w) + "░" * (w - round(v * w)) + +def _get(hz, map_name, key): + try: + raw = hz.get_map(map_name).blocking().get(key) + if raw is None: + return {} + if isinstance(raw, (dict, list)): + return raw + if isinstance(raw, (bytes, bytearray)): + try: + raw = raw.decode("utf-8", errors="replace") + except Exception: + return {} + if isinstance(raw, str): + text = raw.strip() + if not text: + return {} + try: + return json.loads(text) + except Exception: + return {} + return {} + except Exception: + return {} + + +def _int_or_default(value, default: int = 0) -> int: + """Coerce a possibly-missing snapshot value to a stable integer.""" + try: + if value is None or value == "": + return int(default) + return int(value) + except Exception: + try: + return int(float(value)) + except Exception: + return int(default) + +# ── Gear items ──────────────────────────────────────────────────────────────── +# Each returns (label, color, value_str) +def _item(label, color, val=""): + dot = f"{color}●{RST}" + v = f":{val}" if val else "" + return f"{dot}{DIM}{label}{v}{RST}" + +def _vel_item(vel_div): + """vel_div colored by distance to threshold (-0.02).""" + v = f"{vel_div:+.4f}" + if vel_div <= VEL_DIV_EXTREME: + return _item("vel_div", GREEN, v) # extremely bearish — great + elif vel_div <= VEL_DIV_THRESHOLD: + return _item("vel_div", GREEN, v) # past threshold — signal green + elif vel_div <= VEL_DIV_CLOSE: + return _item("vel_div", YELLOW, v) # -0.015 to -0.020 — close + elif vel_div <= VEL_DIV_WARN: + return _item("vel_div", ORANGE, v) # -0.010 to -0.015 — approaching + elif vel_div < 0: + return _item("vel_div", RED, v) # negative but far + else: + return _item("vel_div", RED, v) # positive — not bearish + +def signal_fired(vel_div, vol_ok, posture, acb_ready, exf_ok, halt): + """True if ALL signal preconditions are green.""" + return ( + vel_div <= VEL_DIV_THRESHOLD + and vol_ok + and posture not in ("HIBERNATE", "TURTLE") + and acb_ready + and exf_ok + and not halt + ) + +def trade_can_execute(open_count, lev, abs_cap, daily_loss_ok, boost): + return ( + open_count == 0 # no open position already + and lev < abs_cap # leverage headroom + and daily_loss_ok + and boost > 0 + ) + +OB_IMBALANCE_BIAS = -0.09 # from engine config: ob_imbalance_bias + +def _best_fill_candidate(obf_universe): + """Pick best SHORT candidate from OBF universe. + Criteria: negative imbalance (bearish pressure) + high fill_probability + low spread. + Returns (symbol, asset_dict) or (None, {}). + """ + candidates = [] + for k, v in obf_universe.items(): + if not isinstance(v, dict) or "fill_probability" not in v: + continue + candidates.append((k, v)) + if not candidates: + return None, {} + # Score: fill_prob * (1 + bearish_imbalance_bonus) / (1 + spread_bps/10) + def score(item): + sym, a = item + imb = float(a.get("imbalance", 0)) + fp = float(a.get("fill_probability", 0)) + sp = float(a.get("spread_bps", 99)) + dq = float(a.get("depth_quality", 0)) + # Bearish bias: reward negative imbalance, penalise positive + imb_bonus = max(0.0, -imb) # 0..1 for imbalance in [-1,0] + return fp * (1 + imb_bonus) * dq / max(0.1, sp) + candidates.sort(key=score, reverse=True) + return candidates[0] + + +def _update_exit_tracker(positions, eng, live_price, ob_imbalance): + """Accumulate MAE/MFE state for V7 comparison from live OBF prices.""" + global _EXIT_TRACKER + if not positions or live_price <= 0: + _EXIT_TRACKER.clear() + return None + pos = positions[0] + asset = pos.get("asset", "?") + ep = float(pos.get("entry_price", 0) or 0) + side = pos.get("side", "SHORT") + if ep <= 0: + return None + key = (asset, ep) + bar_idx = int(eng.get("bar_idx", 0) or 0) + if key not in _EXIT_TRACKER: + _EXIT_TRACKER.clear() + _EXIT_TRACKER[key] = { + "entry_bar": bar_idx, "first_seen": time.time(), + "peak_adverse": 0.0, "peak_favorable": 0.0, + "prev_mae": 0.0, "prev_mfe": 0.0, + "mae_velocity": 0.0, "mfe_velocity": 0.0, + "prices": deque(maxlen=60), + } + t = _EXIT_TRACKER[key] + t["prices"].append(live_price) + t["notional"] = float(pos.get("notional", 0) or 0) + t["unrealized_pnl"] = float(pos.get("unrealized_pnl", 0) or 0) + if side == "SHORT": + pnl = (ep - live_price) / ep + mae = max(0.0, (live_price - ep) / ep) + mfe = max(0.0, (ep - live_price) / ep) + else: + pnl = (live_price - ep) / ep + mae = max(0.0, (ep - live_price) / ep) + mfe = max(0.0, (live_price - ep) / ep) + t["peak_adverse"] = max(t["peak_adverse"], mae) + t["peak_favorable"] = max(t["peak_favorable"], mfe) + t["mae_velocity"] = mae - t["prev_mae"] + t["mfe_velocity"] = mfe - t["prev_mfe"] + t["prev_mae"] = mae + t["prev_mfe"] = mfe + t["bars_held"] = max(0, bar_idx - t["entry_bar"]) + t["pnl_pct"] = pnl + t["live_price"] = live_price + t["ob_imbalance"] = ob_imbalance + t["entry_price"] = ep + t["side"] = side + t["asset"] = asset + return t + + +def _v7_preview(t): + """Simplified V7 decision from tracker state (MAE/MFE/time channels).""" + if not t: + return None + bh = t.get("bars_held", 0) + bf = min(1.0, bh / MAX_HOLD_BARS) if MAX_HOLD_BARS else 0 + pa = t["peak_adverse"] + pf = t["peak_favorable"] + mae = t["prev_mae"] + mfe = t["prev_mfe"] + pnl = t.get("pnl_pct", 0) + # MAE risk — V7 floor thresholds (without vol-adaptive since TUI lacks full history) + mae_risk = 0.0 + if pa > 0.005: mae_risk += 0.5 + if pa > 0.012: mae_risk += 0.8 + if pa > 0.020: mae_risk += 1.2 + # MAE-B: adverse acceleration + if bh >= 3 and t["mae_velocity"] > 0 and mae > 0.003: + mae_risk += 0.6 + # MAE-D: late-stage time-weighted + if mae > 0.003 and bf > 0.60: + mae_risk += (bf - 0.60) / 0.40 * 0.4 + # MFE risk — convexity decay + mfe_risk = 0.0 + decay = (pf - mfe) / (pf + 1e-9) if pf > 0 else 0.0 + if decay > 0.35 and t["mfe_velocity"] < 0 and pf > 0.01: + mfe_risk += 1.5 + if decay > 0.20: + mfe_risk += 0.3 + # Exit pressure (simplified: MAE + MFE channels weighted as V7) + pressure = 2.0 * mae_risk + 2.5 * mfe_risk + if bf > 0.80 and pnl < 0: + pressure += 0.5 + if bf > 0.95: + pressure += 1.0 + # Decision (mirrors V7 thresholds) + if pressure > 2.0: + action = "EXIT" + reason = "V7.1_MAE_SL_VOL_NORM" if mae_risk > mfe_risk else "V7_COMPOSITE" + elif pressure > 1.0: + action = "RETRACT" + reason = "V7_RISK_DOM" + elif pressure < -0.5 and pnl > 0: + action = "EXTEND" + reason = "V7_DIR_EDGE" + else: + action = "HOLD" + reason = "\u2014" + proj_usd = pnl * t.get("notional", 0) + return { + "action": action, "reason": reason, "pressure": pressure, + "mae": pa, "mfe": pf, "mae_risk": mae_risk, "mfe_risk": mfe_risk, + "bars_held": bh, "bars_frac": bf, "pnl_pct": pnl, + "proj_usd": proj_usd, + } + + +def fill_row(obf_universe, acb, eng): + """Row 3: signal → asset-pick → OBF liquidity → size → ORDER.""" + f_items = [] + + # ── Asset picker (IRP/ARS) ───────────────────────────────────────────── + n_assets = int(obf_universe.get("_n_assets", 0) if obf_universe else 0) + n_stale = int(obf_universe.get("_n_stale", 0) if obf_universe else 0) + n_fresh = n_assets - n_stale + + f_items.append(_item("universe", + GREEN if n_fresh >= 200 else (YELLOW if n_fresh >= 50 else RED), + f"{n_fresh}/{n_assets}")) + + sym, ab = _best_fill_candidate(obf_universe) + if sym: + fill_p = float(ab.get("fill_probability", 0)) + spread = float(ab.get("spread_bps", 99)) + dq = float(ab.get("depth_quality", 0)) + imb = float(ab.get("imbalance", 0)) + depth = float(ab.get("depth_1pct_usd", 0)) + + # Best candidate asset + asset_color = GREEN if fill_p >= 0.80 else (YELLOW if fill_p >= 0.50 else RED) + f_items.append(_item("best", asset_color, sym[:6])) + + # OBF: fill probability + f_items.append(_item("fill_p", + GREEN if fill_p >= 0.85 else (YELLOW if fill_p >= 0.60 else RED), + f"{fill_p:.2f}")) + + # OBF: spread + f_items.append(_item("spread", + GREEN if spread <= 3 else (YELLOW if spread <= 8 else RED), + f"{spread:.1f}bps")) + + # OBF: depth quality + f_items.append(_item("depth_q", + GREEN if dq >= 0.5 else (YELLOW if dq >= 0.1 else RED), + f"{dq:.2f}")) + + # OBF: imbalance direction (SHORT needs bearish = negative) + imb_ok = imb < OB_IMBALANCE_BIAS # confirmed bearish pressure + f_items.append(_item("imb", + GREEN if imb_ok else + YELLOW if imb < 0 else + ORANGE if imb < 0.1 else RED, + f"{imb:+.2f}")) + + # OBF: depth USD + f_items.append(_item("depth", + GREEN if depth >= 50_000 else (YELLOW if depth >= 10_000 else RED), + f"${depth/1000:.0f}k")) + + else: + f_items.append(_item("OBF", RED, "no data")) + + # ── Sizing — ACB boost × proxy_B prank ──────────────────────────────── + # proxy_B prank not exposed in HZ snapshot; show ACB boost as sizing proxy + boost = float(acb.get("boost", 1.0) if acb else 1.0) + beta = float(acb.get("beta", 0.8) if acb else 0.8) + f_items.append(_item("acb_boost", + GREEN if boost >= 1.5 else (YELLOW if boost >= 1.0 else ORANGE), + f"×{boost:.2f}")) + + f_items.append(_item("beta", + GREEN if beta >= 0.7 else (YELLOW if beta >= 0.4 else RED), + f"{beta:.2f}")) + + # ── ORDER indicator ──────────────────────────────────────────────────── + # Would an order fire if signal were green right now? + open_count = len(eng.get("open_positions") or []) + lev = float(eng.get("current_leverage", 0) or 0) + abs_c = float(eng.get("leverage_abs_cap", 9.0) or 9.0) + order_ready = ( + sym is not None + and fill_p >= 0.60 + and open_count == 0 + and lev < abs_c + and boost > 0 + ) if sym else False + + if order_ready: + f_items.append(f" {CYAN}{BOLD}◉ ORDER READY{RST}") + else: + f_items.append(f" {DIM}(order: waiting){RST}") + + return " ".join(f_items) + + +def gear_rows(eng, safe, acb, exf, hb, obf_universe=None): + """Return three formatted rows: SIGNAL, TRADE gates, FILL path.""" + vel_div = float(eng.get("last_vel_div", 0) or 0) + vol_ok = bool(eng.get("vol_ok", False)) + posture = safe.get("posture") or eng.get("posture") or "?" + halt = posture in ("HIBERNATE", "TURTLE") + + acb_boost_val = float(acb.get("boost", acb.get("cut", 0)) or 0) + acb_ready = acb_boost_val > 0 # cut=0 means blocked + exf_ok_count = int(exf.get("_ok_count", 0) if exf else 0) + exf_ok = exf_ok_count >= 3 + + open_count = len(eng.get("open_positions") or []) + lev = float(eng.get("current_leverage", 0) or 0) + abs_cap = float(eng.get("leverage_abs_cap", 9.0) or 9.0) + trades_ex = int(eng.get("trades_executed") or 0) + + hb_ts = hb.get("ts") + hb_ok = bool(hb_ts and (time.time() - hb_ts) < 30) + + # ── SIGNAL ROW ──────────────────────────────────────────────────────────── + # vol_ok is the MASTER GATE — listed first. When False, _try_entry is never + # called regardless of vel_div. Threshold is engine-configured runtime value. + s_items = [] + + # BTC vol — try to get live reading from exf or obf for display context + btc_vol_str = "—" + if exf: + dvol_raw = exf.get("dvol_btc") or exf.get("dvol") + fng_raw = exf.get("fng") + if dvol_raw: + btc_vol_str = f"dV:{float(dvol_raw):.0f}" + if fng_raw: + btc_vol_str += f" FnG:{float(fng_raw):.0f}" + + vol_label = f"vol_ok({btc_vol_str})" + s_items.append(_item(vol_label, + GREEN if vol_ok else RED, + "✓" if vol_ok else f"✗ BLOCKED")) + + s_items.append(_vel_item(vel_div)) + + # posture gate + pc = PC.get(posture, DIM) + posture_ok = posture in ("APEX", "STALKER") + s_items.append(_item("posture", + GREEN if posture == "APEX" else (YELLOW if posture == "STALKER" else RED), + posture)) + + # acb_ready + s_items.append(_item("acb", + GREEN if acb_ready else (ORANGE if acb_boost_val > 0 else RED), + f"{acb_boost_val:.2f}")) + + # exf_ok — external factors pipeline + s_items.append(_item("exf", + GREEN if exf_ok else (YELLOW if exf_ok_count >= 1 else RED), + f"{exf_ok_count}/5")) + + # halt gate + s_items.append(_item("no_halt", + GREEN if not halt else RED, + "✓" if not halt else "HALT")) + + # heartbeat + s_items.append(_item("hb", + GREEN if hb_ok else RED, + _age(hb_ts))) + + # ALL GREEN → fire indicator + all_sig = signal_fired(vel_div, vol_ok, posture, acb_ready, exf_ok, halt) + if all_sig: + s_items.append(f" {GREEN}{BOLD}◉ SIGNAL{RST}") + + # ── TRADE ROW ───────────────────────────────────────────────────────────── + # Additional gates that must pass before a matched signal becomes a fill + t_items = [] + + # open positions + t_items.append(_item("open_pos", + GREEN if open_count == 0 else ORANGE, + str(open_count))) + + # leverage headroom + lev_pct = lev / abs_cap if abs_cap else 0 + t_items.append(_item("lev", + GREEN if lev_pct < 0.3 else (YELLOW if lev_pct < 0.7 else RED), + f"{lev:.2f}x/{abs_cap:.0f}")) + + # regime_dd_halt + t_items.append(_item("regime", + GREEN if not halt else RED, + "free" if not halt else "HALTED")) + + # Rm strength + rm = float(safe.get("Rm", 0) or 0) + t_items.append(_item("Rm", + GREEN if rm >= 0.90 else (YELLOW if rm >= 0.70 else (ORANGE if rm >= 0.50 else RED)), + f"{rm:.3f}")) + + # Cat5 (intraday drawdown contribution) + c5 = float((safe.get("breakdown") or {}).get("Cat5", 1.0) or 1.0) + t_items.append(_item("Cat5", + GREEN if c5 >= 0.95 else (YELLOW if c5 >= 0.85 else (ORANGE if c5 >= 0.70 else RED)), + f"{c5:.3f}")) + + # trades today + t_items.append(_item("trades", + GREEN if trades_ex < 20 else (YELLOW if trades_ex < 35 else ORANGE), + str(trades_ex))) + + # ALL GREEN trade execute indicator + daily_loss_ok = c5 > 0.50 # reasonable proxy — Cat5 tracks drawdown + all_trade = all_sig and trade_can_execute(open_count, lev, abs_cap, daily_loss_ok, acb_boost_val) + if all_trade: + t_items.append(f" {CYAN}{BOLD}◉ TRADE{RST}") + + sig_row = " ".join(s_items) + trade_row = " ".join(t_items) + fill = fill_row(obf_universe or {}, acb, eng) + return sig_row, trade_row, fill + + +def render(hz): + global START_CAP, CAP_PEAK + + # ── DITAv2 kernel + BingX account (reads kernel snapshot file, not Hz) ── + ditav2_section = _render_ditav2_section() + + eng = _get(hz, PINK_STATE_MAP, "engine_snapshot") + cap = _get(hz, PINK_STATE_MAP, "capital_checkpoint") + safe = _get(hz, PINK_SAFETY_MAP, "latest") + hb = _get(hz, PINK_HEARTBEAT_MAP, PINK_HEARTBEAT_KEY) + mh = _get(hz, PINK_META_HEALTH_MAP, "latest") + ann = _get(hz, PINK_ANNOUNCEMENTS_MAP, "latest") + acb = _get(hz, PINK_FEATURES_MAP, "acb_boost") + exf = _get(hz, PINK_FEATURES_MAP, "exf_latest") + obf = _get(hz, PINK_FEATURES_MAP, "obf_universe_latest") + esof = _get(hz, PINK_FEATURES_MAP, "esof_advisor_latest") + if not esof: + esof = _get(hz, PINK_FEATURES_MAP, "esof_latest") + maras = _get(hz, PINK_FEATURES_MAP, "maras_latest") + eng, account_row = _overlay_account_event_state(eng) + + now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") + capital = float(eng.get("capital", 0) or cap.get("capital", 0)) + posture = safe.get("posture") or eng.get("posture") or "?" + rm = float(safe.get("Rm", 0)) + hb_ts = hb.get("ts") + phase = hb.get("phase", "?") + trader_up, trader_src, hb_age_s, eng_age_s = _is_trader_live(hb, eng) + trades = _int_or_default(eng.get("trades_executed"), 0) + fills_today = _account_event_fill_count(account_row) + scans = _int_or_default(eng.get("scans_processed"), _int_or_default(eng.get("last_scan_number"), 0)) + lev = float(eng.get("current_leverage", 0)) + notional= float(eng.get("open_notional", 0)) + mhs_st = mh.get("status", "?") + rm_meta = float(mh.get("rm_meta", 0)) + ann_title = ann.get("title") if ann else None + ann_sev = (ann.get("severity") or "info") if ann else "info" + ann_col = {"critical": RED, "warning": YELLOW, "info": CYAN}.get(ann_sev, DIM) + ann_ts = ann.get("ts") if ann else None + ann_age = _age_seconds(_iso_age_seconds(ann_ts)) if ann_ts else "?" + + if capital > 0: + if START_CAP is None: START_CAP = capital + if CAP_PEAK is None or capital > CAP_PEAK: CAP_PEAK = capital + + roi = ((capital - START_CAP) / START_CAP * 100) if START_CAP and capital else 0 + dd = ((CAP_PEAK - capital) / CAP_PEAK * 100) if CAP_PEAK and capital < CAP_PEAK else 0 + + pc = PC.get(posture, DIM) + sc = SC.get(mhs_st, DIM) + tc = GREEN if trader_up else RED + roi_c = GREEN if roi >= 0 else RED + dd_c = RED if dd > 15 else (YELLOW if dd > 5 else GREEN) + + sig_row, trade_row, fill_row_str = gear_rows(eng, safe, acb, exf, hb, obf) + + svc = mh.get("service_status", {}) + hz_ks = mh.get("hz_key_status", {}) + + L = [] + L.append(f"{BOLD}{CYAN}🐬 DOLPHIN-PINK{RST} {BOLD}v7{RST} {DIM}{now}{RST}") + L.append("─" * 65) + + # TRADER + L.append(f"{BOLD}TRADER{RST} {tc}{'● LIVE' if trader_up else '● DOWN'}{RST}" + f" phase:{phase} hb:{_age(hb_ts)} eng:{_age_seconds(eng_age_s)}" + f" src:{trader_src}" + f" scan:#{eng.get('last_scan_number','?')}") + + # ── SIGNAL → FILL GEARS ─────────────────────────────────────────────────── + vol_ok_live = bool(eng.get("vol_ok", False)) + vol_gate_threshold = float(eng.get("vol_gate_threshold", VOL_P60) or VOL_P60) + if not vol_ok_live: + L.append( + f" {RED}{BOLD}⛔ VOL_OK=FALSE — engine gate closed, " + f"NO trades until BTC vol > {vol_gate_threshold:.6f}{RST}" + ) + L.append(f" {DIM}SIG │{RST} {sig_row}") + L.append(f" {DIM}TRD │{RST} {trade_row}") + L.append(f" {DIM}FIL │{RST} {fill_row_str}") + + # ── EsoF ADVISORY ───────────────────────────────────────────────────────── + if esof: + _ec = { + "FAVORABLE": GREEN, "MILD_POSITIVE": "\033[92m", + "NEUTRAL": YELLOW, "MILD_NEGATIVE": "\033[91m", + "UNFAVORABLE": RED, + } + _lbl = esof.get("advisory_label", "?") + _col = _ec.get(_lbl, DIM) + _sc = float(esof.get("advisory_score", 0)) + _sess = esof.get("session", "?") + _dow = esof.get("dow_name", "?") + _slot = esof.get("slot_15m", "?") + _swr = esof.get("session_wr_pct", 0) + _dwr = esof.get("dow_wr_pct", 0) + _moon = esof.get("moon_phase", "?")[:8] + _retro= " ☿RETRO" if esof.get("mercury_retrograde") else "" + L.append(f" {DIM}EsoF│{RST} {_col}{_lbl:<13}{RST} sc:{_col}{_sc:+.3f}{RST} " + f"sess:{_sess}({_swr:.0f}%) " + f"dow:{_dow}({_dwr:.0f}%) " + f"slot:{_slot} {DIM}{_moon}{_retro}{RST}") + else: + L.append(f" {DIM}EsoF│ advisory unavailable (keys: esof_advisor_latest/esof_latest){RST}") + + L.append("") + + # ── CH persistence ───────────────────────────────────────────────────────── + + # CAPITAL + L.append(f"{BOLD}CAPITAL{RST} {CYAN}${capital:,.2f}{RST}" + + (f" ROI:{roi_c}{roi:+.2f}%{RST} DD:{dd_c}{dd:.2f}%{RST}" + f" start:${START_CAP:,.0f}" if START_CAP else "")) + bar_idx = _int_or_default(eng.get("bar_idx"), _int_or_default(eng.get("last_scan_number"), 0)) + L.append(f" trades:{trades} fills:{fills_today} scans:{scans} bar:{bar_idx}" + f" lev:{lev:.2f}x notional:${notional:,.0f}") + + # Open positions + EXIT COMPARISON + positions = eng.get("open_positions") or [] + if positions: + _pa = positions[0].get("asset", "") + _lp = 0.0; _obi = 0.0 + if _pa and obf: + _oad = obf.get(_pa, {}) + _ob_bid = float(_oad.get("best_bid", 0) or 0) + _ob_ask = float(_oad.get("best_ask", 0) or 0) + _lp = (_ob_bid + _ob_ask) / 2 if _ob_bid > 0 and _ob_ask > 0 else 0 + _obi = float(_oad.get("imbalance", 0) or 0) + L.append(f" {BOLD}OPEN:{RST}") + for p in positions: + sc2 = GREEN if p.get("side") == "LONG" else RED + upnl, cur = _derive_open_position_pnl(p, obf) + cur_s = f" mark:{cur:.4g}" if cur > 0 else "" + L.append(f" {sc2}{p.get('asset','?')} {p.get('side','?')}{RST}" + f" qty:{p.get('quantity',0):.4f}" + f" entry:{p.get('entry_price',0):.2f}" + f" upnl:{upnl:+.2f}{cur_s}") + # ── EXIT COMPARISON: base engine vs V7 ────────────────────────────── + _etr = _update_exit_tracker(positions, eng, _lp, _obi) + _v7p = _v7_preview(_etr) + if _v7p: + bh = _v7p["bars_held"] + bf = _v7p["bars_frac"] + pp = _v7p["pnl_pct"] + tp_pct = abs(pp) / FIXED_TP_PCT * 100 if FIXED_TP_PCT else 0 + bf_bar = round(bf * 20) + bc = GREEN if bf < 0.6 else (YELLOW if bf < 0.85 else RED) + tc = GREEN if pp >= FIXED_TP_PCT else (YELLOW if tp_pct > 50 else DIM) + vc = RED if _v7p["action"] == "EXIT" else (YELLOW if _v7p["action"] == "RETRACT" else GREEN) + ps = "+" if _v7p["proj_usd"] >= 0 else "" + L.append(f" {BOLD}EXIT CMP{RST} {DIM}bar{RST} {bc}{bh}/{MAX_HOLD_BARS} [{'\u2588'*bf_bar+'\u2591'*(20-bf_bar)}]{RST} {bf*100:.0f}%" + f" {DIM}TP{RST} {tc}{abs(pp)*100:.3f}%/{FIXED_TP_PCT*100:.2f}% ({tp_pct:.0f}%){RST}") + L.append(f" {vc}V7:{_v7p['action']:<8}{RST} P={_v7p['pressure']:.2f}" + f" mae:{_v7p['mae']:.4f} mfe:{_v7p['mfe']:.4f}" + f" {DIM}\u2192{RST} {ps}${_v7p['proj_usd']:.2f} ({pp*100:+.3f}%)" + f" {DIM}[{_v7p['reason']}]{RST}") + else: + L.append(f" {DIM}no open positions{RST}") + _EXIT_TRACKER.clear() + + L.append("") + + # POSTURE + bd = safe.get("breakdown") or {} + L.append(f"{BOLD}POSTURE{RST} {pc}{posture}{RST} Rm:{pc}{_bar(rm,20)}{RST} {rm:.4f}") + cats = " ".join(f"C{i}:{float(bd.get(f'Cat{i}',0)):.2f}" for i in range(1,6)) + L.append(f" {cats} f_env:{float(bd.get('f_env',0)):.3f} f_exe:{float(bd.get('f_exe',0)):.3f}") + + L.append("") + + # SYS HEALTH + L.append(f"{BOLD}SYS HEALTH{RST} {sc}{mhs_st}{RST} rm_meta:{rm_meta:.3f}") + for line in _format_two_column_sensor_lines(_sys_health_sensor_rows(mh)): + L.append(line) + + L.append(f" {DIM}services:{RST} " + + " ".join( + f"{'●' if st=='RUNNING' else f'{RED}●{RST}'}{DIM}{n.split(':')[-1]}{RST}" + if st == "RUNNING" else + f"{RED}●{DIM}{n.split(':')[-1]}{RST}" + for n, st in sorted(svc.items()))) + + L.append(f" {DIM}hz_keys:{RST} " + + " ".join( + f"{GREEN if float(i.get('score',0))>=0.9 else (YELLOW if float(i.get('score',0))>=0.5 else RED)}●{RST}{DIM}{k}{RST}" + for k, i in sorted(hz_ks.items()))) + L.append(f" {DIM}ann:{RST} " + + (f"{ann_col}{ann_title}{RST} {DIM}{ann_age}{RST}" if ann_title else f"{DIM}none{RST}")) + + # ── RECENT TRADES ──────────────────────────────────────────────────────── + _, trades_hist, fill_hist, trades_source = _trade_panel_rows(eng, obf, limit=5) + if trades_hist or fill_hist: + L.append("") + L.append(f"{BOLD}RECENT TRADES{RST} {DIM}(from {trades_source}){RST}") + if trades_hist: + for t in trades_hist: + pnl = _closed_trade_display_pnl(t) + pct = float(t.get("pnl_pct", 0)) * 100 + lev = float(t.get("leverage", 0)) + ep = float(t.get("entry_price", 0)) + reason = t.get("reason", "?") + asset = t.get("asset", "?") + bars = t.get("bars_held", 0) + ts_raw = str(t.get("ts", ""))[:16].replace("T", " ") + pc2 = GREEN if pnl >= 0 else RED + L.append( + f" {pc2}{'▲' if pnl>=0 else '▼'}{RST}" + f" {asset:<12} " + f"ep:{ep:.4g} " + f"lev:{lev:.2f}x " + f"pnl:{pc2}{pnl:+.2f}({pct:+.2f}%){RST} " + f"exit:{reason} bars:{bars} {DIM}{ts_raw}{RST}" + ) + elif not fill_hist: + L.append(f" {DIM}no recent trades yet{RST}") + + if fill_hist: + L.append("") + L.append(f"{BOLD}RECENT FILLS{RST} {DIM}(account events, not terminal closes){RST}") + for t in fill_hist: + pnl = _closed_trade_display_pnl(t) + pct = float(t.get("pnl_pct", 0)) * 100 + lev = float(t.get("leverage", 0)) + ep = float(t.get("entry_price", 0)) + reason = t.get("reason", "FILL") + asset = t.get("asset", "?") + bars = t.get("bars_held", 0) + ts_raw = str(t.get("ts", ""))[:16].replace("T", " ") + pc2 = GREEN if pnl >= 0 else RED + L.append( + f" {pc2}•{RST}" + f" {asset:<12} " + f"ep:{ep:.4g} " + f"lev:{lev:.2f}x " + f"fill:{pc2}{pnl:+.2f}({pct:+.2f}%){RST} " + f"status:{reason} bars:{bars} {DIM}{ts_raw}{RST}" + ) + + # ── HOTKEY BAR ────────────────────────────────────────────────────────── + L.append("") + hk_bar = f"{DIM}HOTKEYS:{RST} " + " ".join( + f"{CYAN}[{k}]{RST}{DIM}{desc[0]}{RST}" + for k, desc in sorted(HOTKEY_MAP.items()) + ) + L.append(hk_bar) + if _HOTKEY_FEEDBACK: + for fb_ts, fb_msg in _HOTKEY_FEEDBACK: + fb_age = time.time() - fb_ts + if fb_age < 15: + L.append(f" {GREEN}▶ {fb_msg}{RST} {DIM}({fb_age:.0f}s ago){RST}") + + L.append("") + + # ── MARAS footer ─────────────────────────────────────────────────────────── + if maras: + reg = maras.get("regime","?") + sc = maras.get("final_score",0) + cf = maras.get("confidence",0) + cl = maras.get("conflict_level",0) + ch = maras.get("composite_hash",0) + sh = maras.get("scalar_hash",0) + te = maras.get("tier_exf",0) + tg = maras.get("tier_eigen",0) + tb = maras.get("tier_btc",0) + ts = maras.get("tier_esof",0) + tm = maras.get("tier_micro",0) + rc = {"VERY_BEARISH":RED,"BEARISH":RED,"CHOPPY_BEARISH":YELLOW, + "CHOPPY":YELLOW,"SIDEWAYS":DIM,"CHOPPY_BULLISH":CYAN, + "BULLISH":GREEN,"VERY_BULLISH":GREEN} + col = rc.get(reg, DIM) + confl_col = RED if cl > 0.5 else (YELLOW if cl > 0.3 else GREEN) + L.append(f" {BOLD}{CYAN}MARAS{RST} {col}{reg}{RST}" + f" score={sc:+.2f} conf={cf:.0f}%" + f" {confl_col}conflict={cl:.2f}{RST}" + f" chash={ch} shash={sh}" + f" | ExF={te:+.2f} Eig={tg:+.2f} BTC={tb:+.2f} EsF={ts:+.2f} Mic={tm:+.2f}") + else: + L.append(f" {DIM}MARAS: waiting for daemon start...{RST}") + L.append(f"{DIM}v7 • PINK • 0.5s poll • hotkeys • CH→status+audit • Ctrl-C quit{RST}") + + # ── CH persistence ───────────────────────────────────────────────────────── + # Write every other cycle (1s effective rate) to avoid CH noise + if int(time.time() * 2) % 2 == 0: + ch_put({ + "ts": int(time.time() * 1000), + "capital": capital, + "roi_pct": round(roi, 4), + "dd_pct": round(dd, 4), + "trades_executed": int(eng.get("trades_executed", 0) or 0), + "posture": posture, + "rm": round(rm, 6), + "vel_div": round(float(eng.get("last_vel_div", 0) or 0), 6), + "vol_ok": 1 if eng.get("vol_ok") else 0, + "phase": phase, + "mhs_status": mhs_st, + "boost": round(float(acb.get("boost", 1.0) if acb else 1.0), 4), + "cat5": round(float((safe.get("breakdown") or {}).get("Cat5", 1.0) or 1.0), 6), + }) + + # Prepend the DITAv2 section at the top of the display + return ditav2_section + "\n" + "\n".join(L) + + +def _dispatch_hotkey(key: str, hz): + entry = HOTKEY_MAP.get(key) + if entry is None: + return + label, fn = entry + if fn is None: + return + _HOTKEY_FEEDBACK.append((time.time(), f"{label} ...")) + try: + result = fn(hz) + _HOTKEY_FEEDBACK.append((time.time(), f"{label}: {result}")) + except Exception as e: + _HOTKEY_FEEDBACK.append((time.time(), f"{label} FAILED: {e}")) + + +def main(): + print(f"Connecting to HZ ({PINK_STATE_MAP})...") + hz = hazelcast.HazelcastClient( + cluster_name="dolphin", cluster_members=["localhost:5701"], + connection_timeout=5.0) + posture_mode = "enabled" if PINK_ALLOW_GLOBAL_POSTURE_HOTKEYS else "disabled" + print(f"Connected. PINK hotkeys enabled (global posture hotkeys {posture_mode}).\n") + fd = sys.stdin.fileno() + old_term = None + if sys.stdin.isatty(): + old_term = termios.tcgetattr(fd) + tty.setcbreak(fd) + try: + while True: + try: + sys.stdout.write(CLEAR + render(hz) + "\n") + sys.stdout.flush() + except Exception as e: + sys.stdout.write(f"\n{RED}render error: {e}{RST}\n") + sys.stdout.flush() + try: + key = _read_hotkey(0.5) + if key: + _dispatch_hotkey(key, hz) + except KeyboardInterrupt: + raise + except Exception: + pass + except KeyboardInterrupt: + print(f"\n{DIM}Bye.{RST}") + finally: + if old_term is not None: + termios.tcsetattr(fd, termios.TCSADRAIN, old_term) + hz.shutdown() + + +if __name__ == "__main__": + main() diff --git a/PINK_DITAv2_E2E_TRACE_ANALYSIS.md b/PINK_DITAv2_E2E_TRACE_ANALYSIS.md index 5b7e275..80b2faa 100644 --- a/PINK_DITAv2_E2E_TRACE_ANALYSIS.md +++ b/PINK_DITAv2_E2E_TRACE_ANALYSIS.md @@ -8106,3 +8106,211 @@ These differences mean the same `TradeSlot` JSON can produce different Python ob | X | Pass 21 (Rust Build/Deps/Python Packaging/Shared Mem) | 14 | 3 | 5 | 6 | 0 | 0 | | Y | Pass 22 (Serde Round-Trip/Mock Fidelity/Protocol) | 14 | 1 | 4 | 5 | 4 | 0 | | **Total** | | **389** | **43** | **117** | **114** | **64** | **37** | + +--- + +## PASS 23 — CLOSURE/REVIEW: UNFINISHED FIXES, OPERATIONAL GAPS, FINAL EDGE CASES + +### Z1: `_safe_enum` fix applied to `rust_backend.py` but NOT `real_zinc_plane.py` — the other `_slot_from_payload` still crashes on unknown enums + +**Files:** `rust_backend.py` (has `_safe_enum`), `real_zinc_plane.py:106` (missing it) + +A recent fix added `_safe_enum()` to `rust_backend.py` for graceful fallback on unknown FSM states. The duplicate `_slot_from_payload()` in `real_zinc_plane.py:106` still uses `TradeStage(str(payload.get(...)))` which raises `ValueError` on unknown values. + +**Severity: High** + +### Z2: `_backup_20260530/` has `__init__.py` — accidental import loads old code + +Already covered by T14. Confirmed in this pass. + +**Severity: Medium** + +### Z3: `RealZincControlUnavailable` and `RealZincUnavailable` are separate classes with the same name — catching one does not catch the other + +**File:** `__init__.py:44-45` + +```python +from .real_control_plane import RealZincUnavailable as RealZincControlUnavailable +from .real_zinc_plane import RealZincUnavailable +``` + +Two separate classes named `RealZincUnavailable` from different modules. `RealZincControlUnavailable` is an alias for the one from `real_control_plane.py`. If code catches `RealZincUnavailable`, it won't catch `RealZincControlUnavailable`, and vice versa. The launcher's try/except catches `Exception` broadly, so this is currently harmless — but any future specific exception handling would have a blind spot. + +**Severity: Low** + +### Z4: `test_account_reconcile_faults.py` requires Rust shared library with no skip guard — crashes if Rust not built + +**File:** `test_account_reconcile_faults.py` (multiple lines) + +The test suite creates `ExecutionKernel(max_slots=4)`. `ExecutionKernel.__init__()` calls `_get_rust()` which calls `_RustKernelLib()` which runs `cargo build --release` and loads the `.so`. If Rust isn't installed or the build fails, the test raises `CalledProcessError` or `OSError` — no `pytest.skip()` guard. + +**Severity: Low** + +### Z5: No health check endpoint, no liveness probe, no readiness indicator — silent failures invisible to orchestration + +**File:** (entire codebase) + +The system has no `/health` or `/ready` endpoint. When the Rust kernel panics (caught by `catch_unwind`, returns error), Zinc shared memory corrupts, or the BingX WebSocket silently disconnects, there is: +- No alert mechanism +- No heartbeat +- No way for an orchestrator (Kubernetes, Nomad) to detect a dead process + +The `ExchangeEvent.RECONNECTED` kind exists but nothing consumes it for health monitoring. + +**Severity: High** + +### Z6: `process_intent()` calls `venue.submit()` without exception handler — venue error bypasses Rust FSM entirely + +**File:** `rust_backend.py:747-748` + +```python +emitted_events = self.venue.submit(intent) # no try/except +``` + +If `venue.submit()` raises (network timeout, exchange error, connection refused), the exception propagates directly to the caller. The Rust kernel's FSM never sees it — no SLOT_BUSY, no diagnostic code, no slot state update. Compare with `on_venue_event()` which properly handles the Rust FFI result. + +**Severity: High** + +### Z7: `snapshot()` mixes Rust and Python accounting sources — `capital` and `k_capital` can diverge + +**File:** `rust_backend.py:897-931` + +The `snapshot()` method builds account data from both the Python `AccountProjection` (legacy) and the Rust `AccountState` (new). If they're out of sync (e.g., a venue event was processed in Rust but Python-side `_last_settled_pnl` not reconciled), `capital` and `k_capital` diverge. The fallback `rust_account.get("k_capital", self.account.snapshot.capital)` silently returns the Python value if the Rust key is missing, masking the issue. + +**Severity: Medium** + +### Z8: `BingxVenueAdapter.close()` sets `cls._EXECUTOR = None` before `executor.shutdown()` — concurrent `_get_executor()` can create new executor, orphan old one + +**File:** `bingx_venue.py:264-276` + +```python +executor = cls._EXECUTOR +cls._EXECUTOR = None # <-- concurrent call sees None, creates new executor +executor.shutdown(wait=False) # <-- old executor orphaned if new one was created +``` + +Between the null assignment and `shutdown()`, a concurrent `_get_executor()` call (on another thread) sees `cls._EXECUTOR is None`, acquires the lock, and creates a new executor. When control returns, `executor.shutdown()` shuts down the old executor — but the new one was already created and returned to the concurrent caller. A small window, but technically a TOCTOU race. + +**Severity: Medium** + +### Z9: Generated test f-string uses `chr(34)` for embedded quotes — produces `SyntaxError` on Python < 3.12 + +**File:** `_gen_test.py:86-90` (generated output) + +The generated test assertion uses: +```python +info[chr(34)+chr(34)]diagnostic[chr(34)+chr(34)] +``` +which emits: `info["diagnostic"]` — correct. But the template uses nested f-string quotes inside `f"{...}..."` which only Python 3.12+ fully supports. The generated file itself is a plain Python file with `f"{...}"`, so it's correct — but the **template injection** via `chr(34)` is a code smell that could produce a `SyntaxError` in the generator on older Python. + +**Severity: Medium** + +### Z10: `launcher.py` uses Python 3.10+ `|` union syntax — no documented minimum Python version + +**File:** `launcher.py:298-299` + +```python +venue_mode: Optional[LauncherVenueMode | str] = None, +zinc_mode: Optional[LauncherZincMode | str] = None, +``` + +The `X | Y` union syntax requires Python 3.10+. Combined with similar usage in 3+ other files, the codebase requires 3.10+. But there's no `python_requires`, `pyproject.toml`, or `setup.py` that documents this. A developer on Python 3.9 gets a `SyntaxError` at import time. + +**Severity: Medium** + +### Z11: `real_control_plane.py` and `real_zinc_plane.py` duplicate `_encode_packet`/`_decode_packet` — format change must update both + +**Files:** `real_control_plane.py:43-67`, `real_zinc_plane.py:34-59` + +Both files contain identical copies of `_encode_packet()` and `_decode_packet()` (plus `_json_default()`). If the shared memory wire format changes (e.g., adding a version tag or CRC), both copies must be updated. + +**Severity: Low** + +### Z12: Concurrent `process_intent()` calls on same slot — no mutex, no queue, no serialization + +**File:** `rust_backend.py:710-795` + +`process_intent()` has no slot-level lock. Two concurrent calls with the same `slot_id` both call `self.venue.submit(intent)` and both write conflicting state to Zinc/projection. The Rust kernel's slot-busy check is race-adjacent — if both calls arrive before either completes, both pass the check, both submit to the exchange, and both overwrite slot state. + +In the async single-threaded event loop this doesn't trigger, but the `BingxVenueAdapter._run()` method runs on a thread pool, creating a window where a second coroutine can enter `process_intent()` while the first is awaiting the HTTP response. + +**Severity: Medium** + +### Z13: `test_alpha_blue_untouched_g7.py` uses hardcoded absolute path for source reading + +**File:** `test_alpha_blue_untouched_g7.py:34` + +```python +src = open("/mnt/dolphinng5_predict/prod/clean_arch/dita_v2/gen2.py").read() +``` + +Hardcoded absolute path. Breaks on any other machine. + +**Severity: Low** + +### Z14: No exchange timestamp monotonicity check in WebSocket stream — silent gap detection missing + +**File:** `bingx_user_stream.py` (entire file) + +The `_normalise()` function extracts `frame.get("E")` (exchange timestamp in ms) but it's only used as `exchange_ts` in `ExchangeEvent`. No code checks that timestamps are monotonically increasing or detects gaps. If the exchange restarts and resets its event counter, or if frames arrive out of order (network reordering), there's no detection. + +**Severity: Low** + +--- + +## Pass 23 Summary + +| # | Flaw | Layer | Severity | +|---|------|-------|----------| +| Z1 | `_safe_enum` fix applied to `rust_backend.py` but NOT `real_zinc_plane.py` — other copy crashes | Bridge | **High** | +| Z2 | `_backup_20260530/` has `__init__.py` — accidental import loads old code | Repo | Medium | +| Z3 | `RealZincControlUnavailable` and `RealZincUnavailable` separate classes | Bridge | Low | +| Z4 | `test_account_reconcile_faults.py` requires Rust lib with no skip guard | Test | Low | +| Z5 | No health check endpoint — silent failures invisible to orchestration | Ops | **High** | +| Z6 | `process_intent()` calls `venue.submit()` without exception handler | Bridge | **High** | +| Z7 | `snapshot()` mixes Rust and Python accounting — capital values can diverge | Bridge | Medium | +| Z8 | `BingxVenueAdapter.close()` executor null-to-shutdown TOCTOU race | Venue | Medium | +| Z9 | Generated test f-string `chr(34)` template — `SyntaxError` risk on old Python | Test | Medium | +| Z10 | `launcher.py` uses Python 3.10+ `\|` union syntax — no min version documented | Config | Medium | +| Z11 | `_encode_packet`/`_decode_packet` duplicated in two files | Plane | Low | +| Z12 | Concurrent `process_intent()` on same slot — no lock, no queue | Bridge | Medium | +| Z13 | `test_alpha_blue_untouched_g7.py` hardcoded absolute path | Test | Low | +| Z14 | No exchange timestamp monotonicity check in WS stream | Venue | Low | + +### Pass 23 Severity + +| Severity | Count | +|----------|-------| +| **High** | 3 (Z1, Z5, Z6) | +| Medium | 6 (Z2, Z7, Z8, Z9, Z10, Z12) | +| Low | 5 (Z3, Z4, Z11, Z13, Z14) | + +### Combined Catalog (All 23 Passes) + +| Pass | Focus | Count | Critical | High | Medium | Low | Info | +|------|-------|-------|----------|------|--------|-----|------| +| A | Architectural | 15 | 0 | 2 | 0 | 2 | 11 | +| T | Threading/Atomicity | 9 | 1 | 3 | 3 | 2 | 0 | +| E | E2E Trace (Pass 1) | 26 | 0 | 4 | 10 | 11 | 1 | +| F | Deep E2E (Pass 3) | 30 | 0 | 1 | 8 | 17 | 4 | +| G | Domain Scans (Pass 4) | 36 | 4 | 11 | 11 | 8 | 2 | +| H | Edge Domains (Pass 5) | 22 | 3 | 9 | 5 | 4 | 1 | +| I | Pass 6 (Math/Tests/Recovery/Security) | 22 | 3 | 11 | 4 | 2 | 2 | +| J | Pass 7 (Test Infra/Data/Rust/Env/Conn) | 16 | 0 | 7 | 7 | 2 | 0 | +| K | Pass 8 (Observability/Memory/Time/DeadCode) | 23 | 2 | 7 | 7 | 1 | 6 | +| L | Pass 9 (Contracts/Events/Network/FFI/Diffs) | 16 | 0 | 4 | 8 | 4 | 0 | +| M | Pass 10 (Runtime/TestBugs/FSM/Persistence/Metrics) | 18 | 3 | 7 | 5 | 3 | 0 | +| N | Pass 11 (Async/Sync Seams/Locks/Threading) | 10 | 4 | 1 | 3 | 1 | 1 | +| O | Pass 12 (Sync/Async Wider Scope) | 11 | 0 | 3 | 7 | 1 | 0 | +| P | Pass 13 (FFI Safety/Dangling Pointers/Coverage) | 9 | 1 | 3 | 3 | 1 | 1 | +| Q | Pass 14 (Serde Edges/Backup Diffs/Market Data) | 12 | 0 | 4 | 3 | 2 | 3 | +| R | Pass 15 (Resource Leaks/Trust Boundaries/Security) | 14 | 2 | 6 | 3 | 2 | 1 | +| S | Pass 16 (Error Handling/Arithmetic/Test Infra) | 16 | 4 | 7 | 5 | 0 | 0 | +| T | Pass 17 (Unsafe Review/Dead Code/Build/Protocols) | 14 | 0 | 5 | 5 | 4 | 0 | +| U | Pass 18 (Rust Test Gaps/Accounting/FFI Types) | 14 | 3 | 4 | 4 | 3 | 0 | +| V | Pass 19 (Lifecycle/Rust Subtleties/Test Infra) | 14 | 5 | 2 | 4 | 3 | 0 | +| W | Pass 20 (Config/Math Signs/BingX Protocol) | 14 | 4 | 7 | 3 | 0 | 0 | +| X | Pass 21 (Rust Build/Deps/Python Packaging/Shared Mem) | 14 | 3 | 5 | 6 | 0 | 0 | +| Y | Pass 22 (Serde Round-Trip/Mock Fidelity/Protocol) | 14 | 1 | 4 | 5 | 4 | 0 | +| Z | Pass 23 (Closure Review/Unfinished Fixes/Ops Gaps) | 14 | 0 | 3 | 6 | 5 | 0 | +| **Total** | | **403** | **43** | **120** | **120** | **64** | **37** | diff --git a/PINK_DITAv2_FLAW_ANALYSIS_2026-05-31.md b/PINK_DITAv2_FLAW_ANALYSIS_2026-05-31.md index fdb93aa..1875367 100644 --- a/PINK_DITAv2_FLAW_ANALYSIS_2026-05-31.md +++ b/PINK_DITAv2_FLAW_ANALYSIS_2026-05-31.md @@ -56,7 +56,8 @@ | W | Pass 20 (Config/Math Signs/BingX Protocol) | 14 | 4 | 7 | 3 | 0 | 0 | | X | Pass 21 (Rust Build/Deps/Python Packaging/Shared Mem) | 14 | 3 | 5 | 6 | 0 | 0 | | Y | Pass 22 (Serde Round-Trip/Mock Fidelity/Protocol) | 14 | 1 | 4 | 5 | 4 | 0 | -| **Total** | | **389** | **43** | **117** | **114** | **64** | **37** | +| Z | Pass 23 (Closure Review/Unfinished Fixes/Ops Gaps) | 14 | 0 | 3 | 6 | 5 | 0 | +| **Total** | | **403** | **43** | **120** | **120** | **64** | **37** | --- @@ -626,6 +627,29 @@ --- +## Z-Series: Closure Review — Unfinished Fixes, Operational Gaps, Final Edge Cases (Pass 23) + +*Full detail in TRACE doc under "PASS 23 — CLOSURE/REVIEW: UNFINISHED FIXES, OPERATIONAL GAPS, FINAL EDGE CASES."* + +| # | Flaw | Layer | Severity | +|---|------|-------|----------| +| Z1 | `_safe_enum` fix applied to `rust_backend.py` but NOT `real_zinc_plane.py` — other copy crashes | Bridge | **High** | +| Z2 | `_backup_20260530/` has `__init__.py` — accidental import loads old code | Repo | Medium | +| Z3 | `RealZincControlUnavailable` and `RealZincUnavailable` separate classes | Bridge | Low | +| Z4 | `test_account_reconcile_faults.py` requires Rust lib with no skip guard | Test | Low | +| Z5 | No health check endpoint — silent failures invisible to orchestration | Ops | **High** | +| Z6 | `process_intent()` calls `venue.submit()` without exception handler | Bridge | **High** | +| Z7 | `snapshot()` mixes Rust and Python accounting — capital values can diverge | Bridge | Medium | +| Z8 | `BingxVenueAdapter.close()` executor null-to-shutdown TOCTOU race | Venue | Medium | +| Z9 | Generated test f-string `chr(34)` template — SyntaxError risk on old Python | Test | Medium | +| Z10 | `launcher.py` uses Python 3.10+ `\|` union syntax — no min version documented | Config | Medium | +| Z11 | `_encode_packet`/`_decode_packet` duplicated in two files | Plane | Low | +| Z12 | Concurrent `process_intent()` on same slot — no lock, no queue | Bridge | Medium | +| Z13 | `test_alpha_blue_untouched_g7.py` hardcoded absolute path | Test | Low | +| Z14 | No exchange timestamp monotonicity check in WS stream | Venue | Low | + +--- + ## H-Series: Edge Domains — Dependencies, Error Handling, Types, Contracts (Pass 5) *Full detail in TRACE doc under "PASS 5 — EDGE DOMAINS."*