#!/usr/bin/env python3
"""DOLPHIN PINK live status — v7
0.5s poll. SIG/TRD/FIL gear rows + last-5-trades + CH persistence + V7 exit cmp.

HOTKEYS:
  [h] help  [1] reduce 50%  [2] reduce 75%  [3] close all
  [s] STALKER  [a] APEX
Note: posture hotkeys are disabled by default in PINK isolation mode.

Run: source /home/dolphin/siloqy_env/bin/activate && python dolphin_status_pink.py
Quit: Ctrl-C
"""
# v1–v5 archived as dolphin_status_v{1..5}.py
# v6: exit-comparison overlay (V7 preview), net-pnl pct fix
# v7: hotkey system — position reduce, posture switch, CH audit trail

import hashlib
import hmac
import json
import os
import re
import select
import sys
import termios
import threading
import time
import tty
import urllib.parse
import urllib.request
import uuid
from collections import deque
from datetime import datetime, timezone
from pathlib import Path

import hazelcast


def _float_env(name: str, default: float) -> float:
    """Return env var *name* as a strictly positive float, else *default*."""
    raw = os.environ.get(name, "")
    try:
        val = float(raw)
        if val > 0:
            return val
    except Exception:
        pass
    return default


# ── PINK runtime bindings ──────────────────────────────────────────────────────
PINK_CH_DB = os.environ.get("DOLPHIN_TUI_CH_DB", "dolphin_pink")
PINK_STRATEGY = os.environ.get("DOLPHIN_TUI_STRATEGY", "pink")
PINK_STATE_MAP = os.environ.get("DOLPHIN_TUI_STATE_MAP", "DOLPHIN_STATE_PINK")
PINK_RUNTIME_COMMAND_KEY = os.environ.get("DOLPHIN_TUI_RUNTIME_COMMAND_KEY", "pink_runtime_commands")
PINK_SAFETY_MAP = os.environ.get("DOLPHIN_TUI_SAFETY_MAP", "DOLPHIN_SAFETY")
PINK_HEARTBEAT_MAP = os.environ.get("DOLPHIN_TUI_HEARTBEAT_MAP", "DOLPHIN_HEARTBEAT")
PINK_HEARTBEAT_KEY = os.environ.get("DOLPHIN_TUI_HEARTBEAT_KEY", "nautilus_flow_heartbeat")
PINK_META_HEALTH_MAP = os.environ.get("DOLPHIN_TUI_META_HEALTH_MAP", "DOLPHIN_META_HEALTH")
PINK_ANNOUNCEMENTS_MAP = os.environ.get("DOLPHIN_TUI_ANNOUNCEMENTS_MAP", "DOLPHIN_ANNOUNCEMENTS")
PINK_FEATURES_MAP = os.environ.get("DOLPHIN_TUI_FEATURES_MAP", "DOLPHIN_FEATURES")
PINK_ALLOW_GLOBAL_POSTURE_HOTKEYS = str(
    os.environ.get("DOLPHIN_PINK_TUI_ENABLE_GLOBAL_POSTURE", "0")
).strip().lower() in {"1", "true", "yes", "on"}

# ── ClickHouse fire-and-forget write ─────────────────────────────────────────
_CH_URL = "http://localhost:8123"
_CH_USER = "dolphin"
# NOTE(review): credential is hard-coded in source; consider sourcing it from the environment.
_CH_PASS = "dolphin_ch_2026"
_CH_Q: deque = deque(maxlen=500)
_CH_TABLE_Q: dict[str, deque] = {}  # table_name → deque of rows
_CH_TABLE_LOCK = threading.Lock()


def _ch_flush_table(table: str, rows: list):
    """POST *rows* to ClickHouse table *table* as JSONEachRow; errors are swallowed."""
    if not rows:
        return
    body = "\n".join(json.dumps(r) for r in rows).encode()
    url = f"{_CH_URL}/?database={PINK_CH_DB}&query=INSERT+INTO+{table}+FORMAT+JSONEachRow"
    req = urllib.request.Request(url, data=body, method="POST")
    req.add_header("X-ClickHouse-User", _CH_USER)
    req.add_header("X-ClickHouse-Key", _CH_PASS)
    req.add_header("Content-Type", "application/octet-stream")
    try:
        # Context manager closes the HTTP response so the socket is released.
        with urllib.request.urlopen(req, timeout=4):
            pass
    except Exception:
        # Deliberately best-effort: status persistence must never break the TUI.
        pass


def _ch_worker():
    """Background loop: every 2 s drain the queues and flush them to ClickHouse."""
    while True:
        time.sleep(2)
        rows = []
        while _CH_Q:
            try:
                rows.append(_CH_Q.popleft())
            except IndexError:
                break
        if rows:
            _ch_flush_table("status_snapshots", rows)
        # Snapshot and clear under the lock; do the network I/O outside it.
        with _CH_TABLE_LOCK:
            tables = {t: list(q) for t, q in _CH_TABLE_Q.items() if q}
            for q in _CH_TABLE_Q.values():
                q.clear()
        for table, trows in tables.items():
            _ch_flush_table(table, trows)


threading.Thread(target=_ch_worker, daemon=True, name="ch-status-pink").start()


def ch_put(row: dict, table: str | None = None):
    """Queue *row* for the writer thread (default queue → ``status_snapshots``)."""
    if table:
        with _CH_TABLE_LOCK:
            if table not in _CH_TABLE_Q:
                _CH_TABLE_Q[table] = deque(maxlen=500)
            _CH_TABLE_Q[table].append(row)
    else:
        _CH_Q.append(row)


# ── Trade panel sources ───────────────────────────────────────────────────────
def _parse_path_env(name: str) -> tuple[Path, ...]:
    """Parse a path list env var using common separators."""
    raw = os.environ.get(name, "").strip()
    if not raw:
        return ()
    paths = []
    for chunk in re.split(r"[:;,]", raw):
        item = chunk.strip()
        if item:
            paths.append(Path(item))
    return tuple(paths)


# dict.fromkeys de-duplicates while keeping first-seen order.
_TRADER_LOG_CANDIDATES = tuple(
    dict.fromkeys(
        [
            *_parse_path_env("DOLPHIN_TUI_TRADER_LOG_PATHS"),
            Path("/tmp/dolphin_logs/supervisor/dolphin_live_pink.log"),
            Path("/root/dolphin_logs/supervisor/dolphin_live_pink.log"),
            Path("/mnt/dolphinng5_predict/prod/supervisor/logs/dolphin_live_pink.log"),
        ]
    )
)

# Capture the JSON dict only — stop at first } closing the payload.
# Lines may have a trailing tag like [v2_gold_fix_v50-v750] after the dict.
_RE_ENTRY = re.compile(r"\[(.+?)\] ENTRY: (\{.+?\})(?:\s*\[.*\])?$")
_RE_EXIT = re.compile(r"\[(.+?)\] (?:SUBDAY_)?EXIT: (\{.+?\})(?:\s*\[.*\])?$")

_TRADE_PANEL_CACHE = {"ts": 0.0, "open": [], "closed": [], "fills": []}
_ACCOUNT_EVENT_CACHE = {"ts": 0.0, "row": None}


def _normalize_exit_reason(reason: str) -> str:
    """Map the legacy ``V7_MAE_SL_VOL_NORM`` label to its V7.1 name."""
    text = str(reason or "").strip()
    if text == "V7_MAE_SL_VOL_NORM":
        return "V7.1_MAE_SL_VOL_NORM"
    return text


def _resolve_trader_log() -> Path | None:
    """Pick the freshest available supervisor log across known mount layouts."""
    best_path = None
    best_key = None
    for path in _TRADER_LOG_CANDIDATES:
        try:
            stat = path.stat()
        except OSError:
            continue
        if stat.st_size <= 0:
            continue
        key = (stat.st_mtime, stat.st_size)
        if best_key is None or key > best_key:
            best_key = key
            best_path = path
    return best_path


def _parse_log_dict(raw: str) -> dict:
    """Parse a Python dict repr from a log line.

    Tries, in order: a plain ``ast.literal_eval``; ``literal_eval`` after
    replacing bare ``nan``/``inf`` values with ``None``; and finally a JSON
    parse with single quotes converted to double quotes.

    Raises:
        ValueError: if none of the strategies can parse *raw*.
    """
    import ast

    try:
        return ast.literal_eval(raw)
    except Exception:
        pass
    # literal_eval rejects bare nan/inf (they are names, not literals), so
    # retry with those values mapped to None. This keeps Python-only tokens
    # such as True/False/None parseable, which the JSON fallback cannot do.
    pythonic = re.sub(r"(:\s*)-?(?:nan|inf)\b", r"\1None", raw)
    try:
        return ast.literal_eval(pythonic)
    except Exception:
        pass
    # Replace nan/inf with JSON-safe equivalents before parsing
    cleaned = raw.replace(": nan", ": null").replace(": inf", ": null").replace(": -inf", ": null")
    try:
        return json.loads(cleaned.replace("'", '"'))
    except Exception:
        raise ValueError(f"unparseable: {raw[:80]}")


def _ch_query(sql: str, *, database: str = PINK_CH_DB) -> list[dict]:
    """Execute a ClickHouse query and return JSONEachRow dicts (``[]`` on any error)."""
    try:
        query = urllib.parse.quote(f"{sql} FORMAT JSONEachRow")
        url = f"{_CH_URL}/?database={database}&query={query}"
        req = urllib.request.Request(url)
        req.add_header("X-ClickHouse-User", _CH_USER)
        req.add_header("X-ClickHouse-Key", _CH_PASS)
        with urllib.request.urlopen(req, timeout=5) as resp:
            body = resp.read().decode("utf-8").strip()
        if not body:
            return []
        return [json.loads(line) for line in body.splitlines() if line.strip()]
    except Exception:
        return []


def _position_key(row: dict) -> tuple:
    """Build a matching key: trade_id, else (asset, entry), else asset, else identity."""
    trade_id = str(row.get("trade_id") or "").strip()
    if trade_id:
        return ("trade_id", trade_id)
    asset = str(row.get("asset") or "").strip()
    entry = row.get("entry_price")
    try:
        if asset and entry is not None:
            return ("asset_entry", asset, round(float(entry), 8))
    except Exception:
        pass
    if asset:
        return ("asset", asset)
    return ("unknown", id(row))


def _fetch_open_positions_from_ch(limit: int = 5) -> list[dict]:
    """Return the latest state per trade_id from ``position_state`` where status is OPEN."""
    rows = _ch_query(
        f"""
        SELECT trade_id, asset, direction, entry_price, quantity, notional,
               leverage, bucket_id, bars_held, last_ts
        FROM (
            SELECT trade_id,
                   argMax(asset, ts) AS asset,
                   argMax(direction, ts) AS direction,
                   argMax(entry_price, ts) AS entry_price,
                   argMax(quantity, ts) AS quantity,
                   argMax(notional, ts) AS notional,
                   argMax(leverage, ts) AS leverage,
                   argMax(bucket_id, ts) AS bucket_id,
                   argMax(bars_held, ts) AS bars_held,
                   argMax(status, ts) AS status,
                   max(ts) AS last_ts
            FROM {PINK_CH_DB}.position_state
            GROUP BY trade_id
        )
        WHERE status = 'OPEN'
        ORDER BY last_ts DESC
        LIMIT {int(limit)}
        """
    )
    return rows


def _fetch_latest_account_event_from_ch() -> dict | None:
    """Return the most relevant recent ``account_events`` row (cached for 0.75 s).

    Of the five newest rows, the first with a positive ``current_open_notional``
    wins; otherwise the newest row is used. Returns ``None`` when there are no rows.
    """
    now = time.time()
    cached = _ACCOUNT_EVENT_CACHE
    if now - float(cached.get("ts", 0.0) or 0.0) < 0.75:
        row = cached.get("row")
        return dict(row) if isinstance(row, dict) else None
    # NOTE(review): unlike the fills query, this one has no strategy filter — confirm intended.
    rows = _ch_query(
        f"""
        SELECT ts, event_type, strategy, capital, trades_today, open_positions,
               current_open_notional, current_account_leverage, notes
        FROM {PINK_CH_DB}.account_events
        ORDER BY ts DESC
        LIMIT 5
        """
    )
    row = None
    if rows:
        row = rows[0]
        for candidate in rows:
            try:
                if float(candidate.get("current_open_notional", 0) or 0) > 0:
                    row = candidate
                    break
            except Exception:
                continue
    cached["ts"] = now
    cached["row"] = dict(row) if isinstance(row, dict) else None
    return dict(row) if isinstance(row, dict) else None


def _account_event_payload(row: dict | None) -> dict:
    """Decode the JSON in ``row["notes"]``; unwrap a nested ``payload`` dict if present."""
    if not isinstance(row, dict):
        return {}
    raw = str(row.get("notes") or "").strip()
    if not raw:
        return {}
    try:
        payload = json.loads(raw)
        if isinstance(payload, dict):
            inner = payload.get("payload")
            if isinstance(inner, dict):
                return inner
            return payload
    except Exception:
        return {}
    return {}


def _account_event_positions(row: dict | None) -> list[dict]:
    """Convert the payload's ``positions`` mapping into normalized position rows."""
    payload = _account_event_payload(row)
    positions = payload.get("positions") if isinstance(payload, dict) else {}
    if not isinstance(positions, dict):
        return []
    out: list[dict] = []
    for symbol, pos in positions.items():
        if not isinstance(pos, dict):
            continue
        asset = str(pos.get("symbol") or symbol or "").replace("-", "")
        if not asset:
            continue
        side = str(pos.get("positionSide") or pos.get("side") or "SHORT").upper()
        qty = float(pos.get("positionAmt", 0) or 0)
        entry = float(pos.get("avgPrice", 0) or 0)
        mark = float(pos.get("markPrice", 0) or 0)
        upnl = float(pos.get("unrealizedProfit", 0) or 0)
        notional = float(pos.get("positionValue", 0) or 0)
        lev = float(pos.get("leverage", 0) or 0)
        out.append({
            "asset": asset,
            "side": side,
            "quantity": abs(qty),
            "entry_price": entry,
            "current_price": mark,
            "unrealized_pnl": upnl,
            "notional": notional,
            "leverage": lev,
            "direction": 1 if side == "LONG" else -1,
            "trade_id": str(pos.get("positionId") or symbol or asset),
            "source": "bingx_account_events",
        })
    return out


def _account_event_fills(row: dict | None, limit: int = 30) -> list[dict]:
    """Convert the last *limit* entries of the payload's ``fills`` list into trade-like rows."""
    payload = _account_event_payload(row)
    fills = payload.get("fills") if isinstance(payload, dict) else []
    if not isinstance(fills, list):
        return []
    out: list[dict] = []
    for item in fills[-limit:]:
        if not isinstance(item, dict):
            continue
        fill_row = item.get("row") if isinstance(item.get("row"), dict) else item
        friction = item.get("friction")
        if not isinstance(friction, dict):
            friction = fill_row.get("friction")
        if not isinstance(friction, dict):
            # Guard: a null/non-dict friction would otherwise break .get() below.
            friction = {}
        symbol = str(fill_row.get("symbol") or fill_row.get("asset") or item.get("asset") or "?")
        asset = symbol.replace("-", "")
        side = str(fill_row.get("side") or item.get("side") or "?").upper()
        px = float(fill_row.get("avgPrice", fill_row.get("price", 0)) or 0)
        qty = float(fill_row.get("executedQty", fill_row.get("quantity", 0)) or 0)
        trade_key = str(
            item.get("_trade_key") or fill_row.get("orderId") or fill_row.get("clientOrderId") or asset
        )
        pnl = float(item.get("pnl", fill_row.get("pnl", 0)) or 0)
        out.append({
            "trade_id": trade_key,
            "ts": str(row.get("ts") or ""),
            "asset": asset,
            "side": side,
            # A fill carries a single price; it is reported as both entry and exit.
            "entry_price": px,
            "exit_price": px,
            "quantity": qty,
            "net_pnl": pnl,
            "pnl_pct": float(fill_row.get("pnlPct", item.get("pnl_pct", 0)) or item.get("pnl_pct", 0) or 0),
            "reason": str(fill_row.get("status") or item.get("status") or "FILL"),
            "bars_held": item.get("bars_held", 0),
            "source": "bingx_account_events",
            "liquidity_side": friction.get("liquidity_side"),
            "fill_quality_score": friction.get("fill_quality_score"),
        })
    return out


def _account_event_fill_count(row: dict | None) -> int:
    """Count the dict entries in the payload's ``fills`` list."""
    payload = _account_event_payload(row)
    fills = payload.get("fills") if isinstance(payload, dict) else []
    if not isinstance(fills, list):
        return 0
    return sum(1 for item in fills if isinstance(item, dict))


def _fetch_recent_account_fills_from_ch(limit: int = 30) -> list[dict]:
    """Collect up to *limit* fills from the newest ``account_events`` rows of this strategy."""
    rows = _ch_query(
        f"""
        SELECT ts, event_type, strategy, notes
        FROM {PINK_CH_DB}.account_events
        WHERE strategy = '{PINK_STRATEGY}'
        ORDER BY ts DESC
        LIMIT {int(max(limit, 1))}
        """
    )
    if not rows:
        return []
    fills: list[dict] = []
    for row in rows:
        fills.extend(_account_event_fills(row, limit=limit))
        if len(fills) >= limit:
            break
    return fills[:limit]


def _overlay_account_event_state(eng: dict) -> tuple[dict, dict | None]:
    """Merge the latest account-event row into a copy of the HZ engine snapshot.

    When an account row is available, its capital, open notional and account
    leverage overwrite the snapshot values and ``ledger_authority`` is set to
    ``"exchange"``; every other snapshot field is preserved.

    Returns:
        ``(merged_snapshot, account_row)``; ``account_row`` may be ``None``.
    """
    merged = dict(eng or {})
    account_row = _fetch_latest_account_event_from_ch()
    if account_row:
        capital = float(account_row.get("capital", merged.get("capital", 0)) or merged.get("capital", 0) or 0)
        merged["capital"] = capital
        merged["account_capital"] = capital
        merged["portfolio_capital"] = capital
        merged["ledger_authority"] = "exchange"
        merged["current_open_notional"] = float(
            account_row.get("current_open_notional", merged.get("current_open_notional", 0)) or 0
        )
        merged["current_account_leverage"] = float(
            account_row.get("current_account_leverage", merged.get("current_account_leverage", 0)) or 0
        )
    return merged, account_row


def _fetch_closed_trades_from_ch(limit: int = 30) -> list[dict]:
    """Return the newest *limit* rows of ``trade_events`` for this strategy."""
    rows = _ch_query(
        f"""
        SELECT ts, trade_id, asset, side, entry_price, exit_price, quantity,
               pnl, pnl_pct, exit_reason, leverage, bars_held, scan_uuid,
               capital_before, capital_after
        FROM {PINK_CH_DB}.trade_events
        WHERE strategy = '{PINK_STRATEGY}'
        ORDER BY ts DESC
        LIMIT {int(limit)}
        """
    )
    return rows


def _fetch_closed_trades_from_log(limit: int = 30) -> list[dict]:
    """Return completed trades parsed from the supervisor log, tagged ``source="log"``."""
    trades = _last_n_trades(limit)
    for row in trades:
        row.setdefault("source", "log")
    return trades


def _augment_open_positions(open_rows: list[dict], eng_open: list[dict]) -> list[dict]:
    """Fill missing/zero fields of *open_rows* from matching engine rows.

    Matching uses `_position_key` first, then falls back to asset only.
    Inputs are not mutated; copies are returned.
    """
    if not open_rows:
        return []
    supplemental = {}
    for row in eng_open or []:
        supplemental[_position_key(row)] = row
        asset = str(row.get("asset") or "").strip()
        if asset:
            supplemental.setdefault(("asset", asset), row)
    merged = []
    for row in open_rows:
        out = dict(row)
        extra = supplemental.get(_position_key(row))
        if extra is None:
            asset = str(row.get("asset") or "").strip()
            if asset:
                extra = supplemental.get(("asset", asset))
        if extra:
            for key in ("side", "unrealized_pnl", "current_price", "entry_price",
                        "quantity", "notional", "leverage"):
                if key in extra and (key not in out or out.get(key) in (None, "", 0, 0.0)):
                    out[key] = extra.get(key)
            if "trade_id" not in out or not out.get("trade_id"):
                if extra.get("trade_id"):
                    out["trade_id"] = extra.get("trade_id")
        merged.append(out)
    return merged


def _mark_price_from_obf(asset: str, obf: dict | None) -> float:
    """Return a mark price from *obf*: mid if both sides exist, else bid or ask, else 0.0."""
    if not asset or not isinstance(obf, dict):
        return 0.0
    row = obf.get(asset) or (obf.get("assets") or {}).get(asset) or {}
    if not row and "-" not in asset:
        # Retry under the dashed venue symbol.
        venue = _bingx_venue_symbol(asset)
        row = obf.get(venue) or (obf.get("assets") or {}).get(venue) or {}
    try:
        bid = float(row.get("best_bid", 0) or 0)
        ask = float(row.get("best_ask", 0) or 0)
        if bid > 0 and ask > 0:
            return (bid + ask) / 2
        if bid > 0:
            return bid
        if ask > 0:
            return ask
    except Exception:
        return 0.0
    return 0.0


def _derive_open_position_pnl(row: dict, obf: dict | None) -> tuple[float, float]:
    """Return ``(unrealized_pnl, current_price)`` for an open-position row.

    A non-zero ``unrealized_pnl`` already on the row is kept; otherwise it is
    derived from entry, current price and quantity (quantity falling back to
    ``notional / entry``). The current price falls back to the order-book mark.
    """
    entry = float(row.get("entry_price", 0) or 0)
    current = float(row.get("current_price", 0) or 0)
    if current <= 0:
        current = _mark_price_from_obf(str(row.get("asset") or ""), obf)
    qty = float(row.get("quantity", 0) or 0)
    notional = float(row.get("notional", 0) or 0)
    if qty <= 0 and entry > 0 and notional > 0:
        qty = notional / entry
    side = str(row.get("side") or "").upper()
    raw_direction = row.get("direction")
    if side == "SHORT":
        direction = -1
    elif not raw_direction:
        # No usable direction: trust an explicit LONG side instead of
        # defaulting to short (the previous behaviour inverted the PnL sign).
        direction = 1 if side == "LONG" else -1
    else:
        direction = -1 if int(raw_direction) == -1 else 1
    upnl = float(row.get("unrealized_pnl", 0) or 0)
    if upnl == 0 and entry > 0 and current > 0 and qty > 0:
        upnl = ((entry - current) * qty) if direction == -1 else ((current - entry) * qty)
    return upnl, current


def _trade_panel_rows(eng, obf: dict | None, limit: int = 30) -> tuple[list[dict], list[dict], list[dict], str]:
    """Assemble trade-panel data; results are cached for 0.75 s.

    Open positions come from account events, else ClickHouse ``position_state``,
    else the engine snapshot. Closed trades come from ClickHouse; if there are
    none, account-event fills are fetched, and failing that the supervisor log.

    Returns:
        ``(open_rows, closed_rows, account_fills, source_label)``.
    """
    now = time.time()
    cached = _TRADE_PANEL_CACHE
    if now - float(cached.get("ts", 0.0) or 0.0) < 0.75:
        return (
            list(cached.get("open", [])),
            list(cached.get("closed", [])),
            list(cached.get("fills", [])),
            str(cached.get("source", "cache")),
        )
    account_row = _fetch_latest_account_event_from_ch()
    account_open_rows = _account_event_positions(account_row)
    open_rows = _fetch_open_positions_from_ch(limit=5)
    closed_rows = _fetch_closed_trades_from_ch(limit=limit)
    source = "clickhouse"
    eng_open = []
    try:
        eng_open = eng.get("open_positions") or []
    except Exception:
        eng_open = []
    if account_open_rows:
        # BingX account events reflect the live venue state even if HZ is stale.
        open_rows = _augment_open_positions(account_open_rows, eng_open)
        source = "bingx_account_events"
    elif open_rows:
        open_rows = _augment_open_positions(open_rows, eng_open)
        for row in open_rows:
            upnl, current = _derive_open_position_pnl(row, obf)
            if current > 0 and not row.get("current_price"):
                row["current_price"] = current
            if upnl != 0 or not row.get("unrealized_pnl"):
                row["unrealized_pnl"] = upnl
    else:
        # Preserve the live HZ snapshot if CH is briefly unavailable.
        open_rows = list(eng_open or [])
        source = "hazelcast"
    if not closed_rows:
        account_fills = _fetch_recent_account_fills_from_ch(limit)
        if account_fills:
            source = "bingx_account_events"
        else:
            log_rows = _fetch_closed_trades_from_log(limit)
            if log_rows:
                closed_rows = log_rows
                source = "clickhouse+log" if source == "clickhouse" else "hazelcast+log"
    else:
        account_fills = []
    cached["ts"] = now
    cached["open"] = list(open_rows)
    cached["closed"] = list(closed_rows)
    cached["fills"] = list(account_fills)
    cached["source"] = source
    return open_rows, closed_rows, list(account_fills), source


def _summarize_closed_trades(rows: list[dict]) -> dict:
    """Aggregate count, wins/losses, total PnL and PnL as a percent of total notional."""
    pnl_vals = []
    notional_vals = []
    wins = 0
    losses = 0
    for row in rows or []:
        pnl = float(row.get("net_pnl", row.get("pnl", 0)) or 0)
        notional = float(row.get("notional", 0) or 0)
        pnl_vals.append(pnl)
        notional_vals.append(notional)
        # A zero-PnL trade is counted as a win.
        if pnl >= 0:
            wins += 1
        else:
            losses += 1
    total_pnl = sum(pnl_vals)
    total_notional = sum(notional_vals)
    total_pct = (total_pnl / total_notional * 100) if total_notional else 0.0
    return {
        "n": len(rows or []),
        "wins": wins,
        "losses": losses,
        "pnl": total_pnl,
        "pct": total_pct,
    }


def _closed_trade_display_pnl(row: dict) -> float:
    """Choose a sane USD PnL for closed-trade display.

    Prefer a signed mark-to-mark/close calculation from entry/exit/quantity.
    Some log rows carry a near-zero or inconsistent `net_pnl`, and `pnl_pct`
    may be magnitude-only, so the direct price delta is the most reliable
    display source.
    """
    net_pnl = float(row.get("net_pnl", row.get("pnl", 0)) or 0)
    entry = float(row.get("entry_price", 0) or 0)
    exit_ = float(row.get("exit_price", 0) or 0)
    qty = float(row.get("quantity", 0) or 0)
    side = str(row.get("side") or "").upper()
    direction = int(row.get("direction", -1) or -1)
    if entry > 0 and exit_ > 0 and qty > 0:
        if side == "LONG" or direction == 1:
            return (exit_ - entry) * qty
        return (entry - exit_) * qty
    notional = float(row.get("notional", 0) or 0)
    pnl_pct = float(row.get("pnl_pct", 0) or 0)
    expected = pnl_pct * notional if notional > 0 else 0.0
    # Use the pct-derived figure when net_pnl is under 5% of it.
    if expected != 0 and abs(net_pnl) < max(1e-6, abs(expected) * 0.05):
        return expected
    return net_pnl


def _sys_health_sensor_rows(mh: dict) -> list[tuple[str, float]]:
    """Extract ``m<N>[_...]`` sensor entries from *mh*, sorted by N and then by name."""
    sensors = []
    for key, value in (mh or {}).items():
        if not str(key).startswith("m"):
            continue
        try:
            idx = int(str(key)[1:].split("_", 1)[0])
        except Exception:
            continue
        try:
            sensors.append((idx, str(key), float(value)))
        except Exception:
            continue
    sensors.sort(key=lambda item: (item[0], item[1]))
    return [(name, val) for _, name, val in sensors]


def _format_two_column_sensor_lines(rows: list[tuple[str, float]]) -> list[str]:
    """Render sensors two per line, coloured green ≥0.9, yellow ≥0.5, else red."""
    if not rows:
        return []
    out: list[str] = []
    pairs = [rows[i:i + 2] for i in range(0, len(rows), 2)]
    for pair in pairs:
        formatted = []
        for name, value in pair:
            color = GREEN if value >= 0.9 else (YELLOW if value >= 0.5 else RED)
            formatted.append(f"{color}{name}:{value:.3f}{RST}")
        if len(formatted) == 2:
            out.append(f" {formatted[0]} {formatted[1]}")
        else:
            out.append(f" {formatted[0]}")
    return out


def _last_n_trades(n=5):
    """Parse last N completed trades from supervisor log. Returns list of dicts."""
    trader_log = _resolve_trader_log()
    if trader_log is None:
        return []
    try:
        lines = trader_log.read_text(errors="replace").splitlines()[-4000:]
    except Exception:
        return []
    entries = {}
    trades = []
    for line in lines:
        m = _RE_ENTRY.search(line)
        if m:
            try:
                d = _parse_log_dict(m.group(2))
                entries[d["trade_id"]] = {"ts": m.group(1), **d}
            except Exception:
                pass
        m = _RE_EXIT.search(line)
        if m:
            try:
                d = _parse_log_dict(m.group(2))
                tid = d.get("trade_id")
                if not tid:
                    continue
                # Pair the exit with its entry; entry fields take precedence.
                e = entries.pop(tid, {})
                trades.append({
                    "trade_id": tid,
                    "ts": e.get("ts", m.group(1)),
                    "asset": e.get("asset", d.get("asset", "?")),
                    "entry_price": e.get("entry_price", d.get("entry_price", 0)),
                    "leverage": e.get("leverage", d.get("leverage", 0)),
                    "notional": e.get("notional", d.get("notional", 0)),
                    "direction": e.get("direction", d.get("direction", 0)),
                    "exit_ts": m.group(1),
                    "reason": _normalize_exit_reason(d.get("reason", "?")),
                    "pnl_pct": d.get("pnl_pct", 0),
                    "net_pnl": d.get("net_pnl", 0),
                    "bars_held": d.get("bars_held", 0),
                })
            except Exception:
                pass
    return trades[-n:]


CLEAR = "\033[2J\033[H"
BOLD = "\033[1m"
DIM = "\033[2m"
RST = "\033[0m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
CYAN = "\033[36m"
ORANGE = "\033[38;5;208m"

PC = {"APEX": GREEN, "STALKER": YELLOW, "TURTLE": ORANGE, "HIBERNATE": RED}
SC = {"GREEN": GREEN, "DEGRADED": YELLOW, "CRITICAL": ORANGE, "DEAD": RED}

# Thresholds from nautilus_event_trader.py
VEL_DIV_THRESHOLD = -0.020  # signal fires when vel_div < this
VEL_DIV_EXTREME = -0.050  # extreme bearish
VEL_DIV_WARN = -0.010  # approaching threshold (yellow)
VEL_DIV_CLOSE = -0.015  # nearly there (orange→yellow)
VOL_P60 = _float_env("DOLPHIN_PINK_VOL_P60_THRESHOLD", 0.00008000)
BTC_VOL_WINDOW = 50  # bars used for vol calc
FIXED_TP_PCT = _float_env("DOLPHIN_FIXED_TP_PCT", 0.0020)  # PINK TP target (0.20%)
MAX_HOLD_BARS = 250  # PINK max hold bars
TRADER_HB_STALE_S = 30.0
TRADER_ENGINE_STALE_S = 90.0

START_CAP = None
CAP_PEAK = None
_EXIT_TRACKER: dict = {} # (asset, entry_price) → accumulated V7 comparison state _HOTKEY_FEEDBACK: deque = deque(maxlen=5) # recent hotkey results for display # ── BingX signing (sync, for hotkey-driven REST calls) ──────────────────────── def _bingx_sign(params: dict, secret_key: str) -> dict: ordered = {k: v for k, v in sorted(params.items()) if v is not None and v != ""} query = urllib.parse.urlencode(ordered) sig = hmac.new(secret_key.encode(), query.encode(), hashlib.sha256).hexdigest() return {**ordered, "signature": sig} _BINGX_API_KEY = os.environ.get("BINGX_API_KEY", "") _BINGX_SECRET_KEY = os.environ.get("BINGX_SECRET_KEY", "") _BINGX_RECV_WINDOW = 5000 def _bingx_env_base_url() -> str: env = os.environ.get("DOLPHIN_BINGX_ENV", "VST").strip().upper() if env == "LIVE": return "https://open-api.bingx.com" return "https://open-api-vst.bingx.com" def _bingx_rest_post(path: str, params: dict) -> dict: """Synchronous signed POST to BingX. Returns parsed JSON or raises.""" base = _bingx_env_base_url() signed = _bingx_sign({ **params, "timestamp": int(time.time() * 1000), "recvWindow": _BINGX_RECV_WINDOW, }, _BINGX_SECRET_KEY) url = f"{base}{path}?{urllib.parse.urlencode(signed)}" req = urllib.request.Request(url, method="POST") req.add_header("X-BX-APIKEY", _BINGX_API_KEY) req.add_header("Content-Type", "application/x-www-form-urlencoded") with urllib.request.urlopen(req, timeout=10) as resp: return json.loads(resp.read()) def _bingx_venue_symbol(asset: str) -> str: if "-" in asset: return asset if asset.endswith("USDT"): return f"{asset[:-4]}-USDT" return asset # ── DITAv2 kernel state (does NOT use Hazelcast) ────────────────────────────── _KERNEL_SNAP_PATH = Path("/tmp/.pink_kernel_state.json") _DITAV2_ACCOUNT_CACHE: dict = {"ts": 0.0, "data": None} def _read_ditav2_kernel_snapshot() -> dict: """Read the last saved DITAv2 kernel state from disk (non-blocking).""" try: if not _KERNEL_SNAP_PATH.exists(): return {} raw = 
_KERNEL_SNAP_PATH.read_text(encoding="utf-8") data = json.loads(raw) data["_snap_mtime"] = _KERNEL_SNAP_PATH.stat().st_mtime return data except Exception: return {} def _bingx_rest_get(path: str, params: dict) -> dict: """Synchronous signed GET to BingX. Returns parsed JSON or {}.""" try: base = _bingx_env_base_url() signed = _bingx_sign( {**params, "timestamp": int(time.time() * 1000), "recvWindow": _BINGX_RECV_WINDOW}, _BINGX_SECRET_KEY, ) url = f"{base}{path}?{urllib.parse.urlencode(signed)}" req = urllib.request.Request(url, method="GET") req.add_header("X-BX-APIKEY", _BINGX_API_KEY) with urllib.request.urlopen(req, timeout=5) as resp: return json.loads(resp.read()) except Exception: return {} def _fetch_ditav2_bingx_account() -> dict: """Cache-throttled live BingX VST balance query (max 1 call / 5 s).""" now = time.time() cached = _DITAV2_ACCOUNT_CACHE if now - float(cached.get("ts", 0.0)) < 5.0: return cached.get("data") or {} if not _BINGX_API_KEY or not _BINGX_SECRET_KEY: cached["ts"] = now cached["data"] = {} return {} raw = _bingx_rest_get("/openApi/swap/v2/user/balance", {}) bal = (raw.get("data") or {}).get("balance") or {} cached["ts"] = now cached["data"] = bal return bal def _render_ditav2_section() -> str: """Render the DITAv2 PINK kernel + BingX account section.""" snap = _read_ditav2_kernel_snapshot() bal = _fetch_ditav2_bingx_account() L = [] L.append(f"{BOLD}{CYAN}━━ DITAv2 PINK [kernel + BingX VST]{RST}") if not snap: L.append(f" {DIM}kernel snapshot not found at {_KERNEL_SNAP_PATH}{RST}") L.append(f" {DIM}(PINK not started, or no clean shutdown after first trade){RST}") else: snap_mtime = snap.get("_snap_mtime", 0) snap_age = time.time() - snap_mtime if snap_mtime else None snap_age_s = _age_seconds(snap_age) if snap_age is not None else "?" snap_ts_ms = snap.get("snapshot_ts_ms", 0) snap_dt = ( datetime.fromtimestamp(snap_ts_ms / 1000.0, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") if snap_ts_ms else "?" 
) frozen = snap.get("capital_frozen", False) frozen_col = RED if frozen else GREEN frozen_s = f"{RED}FROZEN{RST}" if frozen else f"{GREEN}OK{RST}" acc = snap.get("account") or {} capital = float(acc.get("capital") or acc.get("equity") or 0.0) equity = float(acc.get("equity") or capital) avail = float(acc.get("available_capital") or capital) open_pos = int(acc.get("open_positions") or 0) trade_seq = int(acc.get("trade_seq") or 0) realized = float(acc.get("realized_pnl_total") or 0.0) fees = float(acc.get("fee_total") or 0.0) fee_cal = snap.get("fee_calibration") or {} fc_status = str(fee_cal.get("calibration_status") or "–") fc_col = GREEN if fc_status == "OK" else (YELLOW if fc_status == "WARN" else RED) fc_ratio = float(fee_cal.get("calibration_ratio") or 1.0) fc_taker = float(fee_cal.get("taker_rate") or 0.0005) fc_maker = float(fee_cal.get("maker_rate") or 0.0002) slots = snap.get("slots") or [] L.append( f" {BOLD}KERNEL{RST} snap:{snap_dt} age:{snap_age_s} frozen:{frozen_s}" ) L.append( f" capital:${capital:,.2f} equity:${equity:,.2f} avail:${avail:,.2f}" f" open:{open_pos} trades:{trade_seq}" ) rpnl_col = GREEN if realized >= 0 else RED L.append( f" realized:{rpnl_col}{realized:+.4f}{RST} fees:{fees:+.4f}" f" fee_cal: taker:{fc_taker*100:.3f}% maker:{fc_maker*100:.3f}%" f" ratio:{fc_ratio:.4f} {fc_col}[{fc_status}]{RST}" ) for slot in slots: fsm = str(slot.get("fsm_state") or "IDLE") asset = str(slot.get("asset") or "–") side = str(slot.get("side") or "–") size = float(slot.get("size") or 0.0) ep = float(slot.get("entry_price") or 0.0) rpnl = float(slot.get("realized_pnl") or 0.0) upnl = float(slot.get("unrealized_pnl") or 0.0) sid = int(slot.get("slot_id") or 0) fsm_col = ( GREEN if fsm == "POSITION_OPEN" else YELLOW if fsm in ("EXIT_REQUESTED", "EXIT_SENT", "EXIT_WORKING", "ENTRY_REQUESTED", "ENTRY_SENT", "ENTRY_WORKING") else RED if fsm in ("ERROR", "CLOSED") else DIM ) pnl_col = GREEN if (rpnl + upnl) >= 0 else RED L.append( f" slot[{sid}] 
{fsm_col}{fsm:<20}{RST} {asset} {side}" f" size:{size:.4f} ep:{ep:.4g}" f" pnl:{pnl_col}{(rpnl+upnl):+.4f}{RST}" ) # ── Live BingX account ──────────────────────────────────────────────── env_s = os.environ.get("DOLPHIN_BINGX_ENV", "VST") if not bal: creds_ok = bool(_BINGX_API_KEY and _BINGX_SECRET_KEY) reason = "no credentials" if not creds_ok else "query failed / no data" L.append(f" {BOLD}BINGX [{env_s}]{RST} {DIM}{reason}{RST}") else: b_wallet = float(bal.get("balance") or bal.get("walletBalance") or 0.0) b_equity = float(bal.get("equity") or b_wallet) b_avail = float(bal.get("availableMargin") or b_wallet) b_used = float(bal.get("usedMargin") or 0.0) b_upnl = float(bal.get("unrealizedProfit") or 0.0) b_real = float(bal.get("realisedProfit") or bal.get("realizedProfit") or 0.0) upnl_col = GREEN if b_upnl >= 0 else RED real_col = GREEN if b_real >= 0 else RED L.append( f" {BOLD}BINGX [{env_s}]{RST}" f" wallet:${b_wallet:,.2f} equity:${b_equity:,.2f}" f" avail:${b_avail:,.2f} used:${b_used:,.2f}" f" upnl:{upnl_col}{b_upnl:+.2f}{RST}" f" realized:{real_col}{b_real:+.2f}{RST}" ) L.append(f"{BOLD}{CYAN}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━{RST}") return "\n".join(L) # ── Hotkey: CH audit ────────────────────────────────────────────────────────── def _hotkey_audit(action: str, request: dict, result: str, effect: dict | None = None): ch_put({ "ts": int(time.time() * 1000), "hotkey": action, "request_json": json.dumps(request, default=str), "result": result, "effect_json": json.dumps(effect or {}, default=str), }, table="hotkey_audit") # ── Hotkey actions ──────────────────────────────────────────────────────────── def _hotkey_reduce_position(fraction: float, hz) -> str: """Submit internal PINK retraction command (fractional exit) via HZ control plane.""" if fraction <= 0 or fraction > 1: return f"invalid fraction {fraction}" eng = _get(hz, PINK_STATE_MAP, "engine_snapshot") positions = eng.get("open_positions") or [] if not positions: 
_hotkey_audit(f"reduce_{fraction:.0%}", {"fraction": fraction}, "NO_POSITION") return "no open position" pos = positions[0] asset = pos.get("asset", "?") side = str(pos.get("side", "?")).upper() trade_id = str(pos.get("trade_id", "") or "").strip() qty = float(pos.get("quantity", 0) or 0) entry = float(pos.get("entry_price", 0) or 0) chain_root_trade_id = str(pos.get("chain_root_trade_id", trade_id) or trade_id).strip() chain_head_leg_id = str(pos.get("chain_head_leg_id", f"{trade_id}:open") or f"{trade_id}:open").strip() chain_prev_leg_id = str(pos.get("chain_prev_leg_id", "") or "").strip() chain_seq = int(pos.get("chain_seq", pos.get("retraction_legs", 0)) or 0) chain_token = str(pos.get("chain_token", "") or "").strip() if not trade_id: _hotkey_audit(f"reduce_{fraction:.0%}", {"fraction": fraction}, "NO_TRADE_ID") return "no trade_id" if not chain_root_trade_id or not chain_head_leg_id or not chain_token: _hotkey_audit( f"reduce_{fraction:.0%}", {"fraction": fraction, "trade_id": trade_id, "chain_root_trade_id": chain_root_trade_id, "chain_head_leg_id": chain_head_leg_id, "chain_prev_leg_id": chain_prev_leg_id, "chain_seq": chain_seq}, "NO_CHAIN_LINK", ) return "no chain link" if qty <= 0: _hotkey_audit(f"reduce_{fraction:.0%}", {"fraction": fraction}, "ZERO_QTY") return "zero quantity" command_id = f"hk-retract-{uuid.uuid4().hex[:16]}" request = { "action": f"reduce_{fraction:.0%}", "asset": asset, "side": side, "requested_fraction": float(fraction), "entry_price": entry, "command_id": command_id, "trade_id": trade_id, "chain_root_trade_id": chain_root_trade_id, "chain_head_leg_id": chain_head_leg_id, "chain_prev_leg_id": chain_prev_leg_id, "chain_seq": chain_seq, } try: control_map = hz.get_map("DOLPHIN_CONTROL_PLANE").blocking() key = PINK_RUNTIME_COMMAND_KEY raw = control_map.get(key) queue = [] if raw: try: queue = json.loads(raw) if isinstance(raw, str) else list(raw) except Exception: queue = [] if not isinstance(queue, list): queue = [] queue.append({ 
"command_id": command_id, "trade_id": trade_id, "action": "RETRACT", "fraction": float(fraction), "reason": "HOTKEY_RETRACT", "source": "tui_hotkey", "ts": float(time.time()), "asset": asset, "side": side, "chain_root_trade_id": chain_root_trade_id, "chain_head_leg_id": chain_head_leg_id, "chain_prev_leg_id": chain_prev_leg_id, "chain_seq": chain_seq, "chain_token": chain_token, }) # Keep queue bounded in-case trader is down. queue = queue[-200:] control_map.put(key, json.dumps(queue)) msg = f"{side} {asset} retract {fraction:.0%}: cmd={command_id}" _hotkey_audit(f"reduce_{fraction:.0%}", request, "OK", { "command_id": command_id, "fraction": float(fraction), "trade_id": trade_id, "chain_root_trade_id": chain_root_trade_id, "chain_head_leg_id": chain_head_leg_id, "chain_seq": chain_seq, }) return msg except Exception as e: _hotkey_audit(f"reduce_{fraction:.0%}", request, f"ERROR: {e}") return f"ERROR: {e}" def _hotkey_close_all(hz) -> str: return _hotkey_reduce_position(1.0, hz) def _hotkey_set_posture(posture: str, hz) -> str: """Switch posture via DOLPHIN_SAFETY when explicitly enabled.""" if posture not in ("APEX", "STALKER", "TURTLE", "HIBERNATE"): return f"invalid posture {posture}" if not PINK_ALLOW_GLOBAL_POSTURE_HOTKEYS: msg = "posture hotkeys disabled (set DOLPHIN_PINK_TUI_ENABLE_GLOBAL_POSTURE=1 to enable)" _hotkey_audit(f"set_posture_{posture}", {"posture": posture}, "DISABLED") return msg safe_map = hz.get_map("DOLPHIN_SAFETY").blocking() raw = safe_map.get("latest") current = {} if raw: try: current = json.loads(raw) if isinstance(raw, str) else raw except Exception: current = {} prev_posture = current.get("posture", "?") current["posture"] = posture current["posture_ts"] = time.time() current["posture_source"] = "hotkey_tui" safe_map.put("latest", json.dumps(current)) msg = f"posture {prev_posture} → {posture}" _hotkey_audit(f"set_posture_{posture}", { "posture": posture, "prev_posture": prev_posture, }, "OK", {"new_posture": posture}) return msg # ── 
Non-blocking stdin reader ───────────────────────────────────────────────── def _read_hotkey(timeout_s: float = 0.05) -> str | None: """Read a single keypress from stdin without blocking. Returns key string or None.""" if not sys.stdin.isatty(): return None dr, _, _ = select.select([sys.stdin], [], [], timeout_s) if not dr: return None ch = sys.stdin.read(1) if ch == "\x1b": if select.select([sys.stdin], [], [], 0.02)[0]: seq = ch + sys.stdin.read(2) if len(seq) == 3 and seq[1] == "[": fn = {"A": None, "B": None, "C": None, "D": None, "P": "F1", "Q": "F2", "R": "F3", "S": "F4"} return fn.get(seq[2]) return None return None if ch == "\x03": raise KeyboardInterrupt if ch == "\x04": raise KeyboardInterrupt return ch if ch.isprintable() or ch in ("\n", "\r", "\t") else None HOTKEY_MAP = { "1": ("REDUCE 50%", lambda hz: _hotkey_reduce_position(0.50, hz)), "2": ("REDUCE 75%", lambda hz: _hotkey_reduce_position(0.75, hz)), "3": ("CLOSE ALL", lambda hz: _hotkey_close_all(hz)), "s": ("→ STALKER", lambda hz: _hotkey_set_posture("STALKER", hz)), "a": ("→ APEX", lambda hz: _hotkey_set_posture("APEX", hz)), "t": ("→ TURTLE", lambda hz: _hotkey_set_posture("TURTLE", hz)), "h": ("HELP", None), } def _age(ts): if not ts: return "?" s = time.time() - ts if s < 0: return "0s" if s < 60: return f"{s:.0f}s" if s < 3600: return f"{s/60:.0f}m" return f"{s/3600:.1f}h" def _age_seconds(s): if s is None: return "?" try: s = float(s) except Exception: return "?" 
if s < 0: return "0s" if s < 60: return f"{s:.0f}s" if s < 3600: return f"{s/60:.0f}m" return f"{s/3600:.1f}h" def _iso_age_seconds(raw): if not raw: return None if isinstance(raw, (int, float)): try: return max(0.0, time.time() - float(raw)) except Exception: return None if isinstance(raw, str): try: dt = datetime.fromisoformat(raw.replace("Z", "+00:00")) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return max(0.0, time.time() - dt.timestamp()) except Exception: return None return None def _engine_snapshot_age_seconds(eng): if not isinstance(eng, dict): return None for key in ("timestamp", "updated_at", "last_update", "ts"): age = _iso_age_seconds(eng.get(key)) if age is not None: return age return None def _is_trader_live(hb, eng): hb_age = _iso_age_seconds(hb.get("ts")) if isinstance(hb, dict) else None hb_ok = hb_age is not None and hb_age < TRADER_HB_STALE_S eng_age = _engine_snapshot_age_seconds(eng) eng_ok = eng_age is not None and eng_age < TRADER_ENGINE_STALE_S live = hb_ok or eng_ok src = [] if hb_ok: src.append("hb") if eng_ok: src.append("eng") if not src: src.append("stale") return live, "+".join(src), hb_age, eng_age def _bar(v, w=20): v = max(0.0, min(1.0, v)) return "█" * round(v * w) + "░" * (w - round(v * w)) def _get(hz, map_name, key): try: raw = hz.get_map(map_name).blocking().get(key) if raw is None: return {} if isinstance(raw, (dict, list)): return raw if isinstance(raw, (bytes, bytearray)): try: raw = raw.decode("utf-8", errors="replace") except Exception: return {} if isinstance(raw, str): text = raw.strip() if not text: return {} try: return json.loads(text) except Exception: return {} return {} except Exception: return {} def _int_or_default(value, default: int = 0) -> int: """Coerce a possibly-missing snapshot value to a stable integer.""" try: if value is None or value == "": return int(default) return int(value) except Exception: try: return int(float(value)) except Exception: return int(default) # ── Gear items 
# Each gear helper below returns one ready-to-print, colour-coded string.
def _snapshot_float(value, default=0.0):
    """Coerce a possibly-missing snapshot value to float.

    Returns ``default`` for None, "" or anything float() rejects.  Unlike
    the ``float(x or default)`` idiom, a legitimate 0 is kept as 0.0.
    """
    try:
        if value is None or value == "":
            return float(default)
        return float(value)
    except (TypeError, ValueError):
        return float(default)


def _item(label, color, val=""):
    """Format one gear item: a coloured dot, a dim label and optional ":value"."""
    dot = f"{color}●{RST}"
    v = f":{val}" if val else ""
    return f"{dot}{DIM}{label}{v}{RST}"


def _vel_item(vel_div):
    """vel_div coloured by its distance to the entry threshold."""
    v = f"{vel_div:+.4f}"
    if vel_div <= VEL_DIV_EXTREME or vel_div <= VEL_DIV_THRESHOLD:
        return _item("vel_div", GREEN, v)   # at or past threshold — signal green
    if vel_div <= VEL_DIV_CLOSE:
        return _item("vel_div", YELLOW, v)  # close to the threshold
    if vel_div <= VEL_DIV_WARN:
        return _item("vel_div", ORANGE, v)  # approaching
    return _item("vel_div", RED, v)         # far from threshold, or positive


def signal_fired(vel_div, vol_ok, posture, acb_ready, exf_ok, halt):
    """True if ALL signal preconditions are green."""
    return (
        vel_div <= VEL_DIV_THRESHOLD
        and vol_ok
        and posture not in ("HIBERNATE", "TURTLE")
        and acb_ready
        and exf_ok
        and not halt
    )


def trade_can_execute(open_count, lev, abs_cap, daily_loss_ok, boost):
    """True if the trade-level gates would let a fired signal become an order."""
    return (
        open_count == 0       # no open position already
        and lev < abs_cap     # leverage headroom
        and daily_loss_ok
        and boost > 0
    )


OB_IMBALANCE_BIAS = -0.09  # from engine config: ob_imbalance_bias


def _best_fill_candidate(obf_universe):
    """Pick best SHORT candidate from OBF universe.

    Criteria: negative imbalance (bearish pressure) + high fill_probability
    + good depth quality + low spread.
    Returns (symbol, asset_dict) or (None, {}).
    """
    candidates = [
        (sym, row)
        for sym, row in obf_universe.items()
        if isinstance(row, dict) and "fill_probability" in row
    ]
    if not candidates:
        return None, {}

    # Score: fill_prob * (1 + bearish_imbalance_bonus) * depth_quality
    #        / max(0.1, spread_bps)
    def score(item):
        _, a = item
        imb = float(a.get("imbalance", 0))
        fp = float(a.get("fill_probability", 0))
        sp = float(a.get("spread_bps", 99))
        dq = float(a.get("depth_quality", 0))
        # Bearish bias: reward negative imbalance, never reward positive.
        imb_bonus = max(0.0, -imb)
        return fp * (1 + imb_bonus) * dq / max(0.1, sp)

    # max() keeps the first of equally-scored entries, matching a stable
    # descending sort followed by [0].
    return max(candidates, key=score)


def _update_exit_tracker(positions, eng, live_price, ob_imbalance):
    """Accumulate MAE/MFE state for V7 comparison from live OBF prices.

    Tracks only the first open position, keyed by (asset, entry_price); a
    new key resets the tracker.  Returns the tracker dict, or None when
    there is no position, no usable live price or no usable entry price.
    The tracker is cleared when there is no position or no live price.
    """
    if not positions or live_price <= 0:
        _EXIT_TRACKER.clear()
        return None
    pos = positions[0]
    asset = pos.get("asset", "?")
    ep = float(pos.get("entry_price", 0) or 0)
    side = pos.get("side", "SHORT")
    if ep <= 0:
        return None

    key = (asset, ep)
    bar_idx = int(eng.get("bar_idx", 0) or 0)
    if key not in _EXIT_TRACKER:
        _EXIT_TRACKER.clear()
        _EXIT_TRACKER[key] = {
            "entry_bar": bar_idx,
            "first_seen": time.time(),
            "peak_adverse": 0.0,
            "peak_favorable": 0.0,
            "prev_mae": 0.0,
            "prev_mfe": 0.0,
            "mae_velocity": 0.0,
            "mfe_velocity": 0.0,
            "prices": deque(maxlen=60),
        }
    t = _EXIT_TRACKER[key]
    t["prices"].append(live_price)
    t["notional"] = float(pos.get("notional", 0) or 0)
    t["unrealized_pnl"] = float(pos.get("unrealized_pnl", 0) or 0)

    # Signed return of the position; adverse/favourable excursions are its
    # negative and positive parts respectively.
    if side == "SHORT":
        pnl = (ep - live_price) / ep
    else:
        pnl = (live_price - ep) / ep
    mae = max(0.0, -pnl)
    mfe = max(0.0, pnl)

    t["peak_adverse"] = max(t["peak_adverse"], mae)
    t["peak_favorable"] = max(t["peak_favorable"], mfe)
    t["mae_velocity"] = mae - t["prev_mae"]
    t["mfe_velocity"] = mfe - t["prev_mfe"]
    t["prev_mae"] = mae
    t["prev_mfe"] = mfe
    t["bars_held"] = max(0, bar_idx - t["entry_bar"])
    t["pnl_pct"] = pnl
    t["live_price"] = live_price
    t["ob_imbalance"] = ob_imbalance
    t["entry_price"] = ep
    t["side"] = side
    t["asset"] = asset
    return t


def _v7_preview(t):
    """Simplified V7 decision from tracker state (MAE/MFE/time channels).

    Returns None when ``t`` is empty, else a dict with the suggested
    action ("EXIT", "RETRACT", "EXTEND" or "HOLD"), its reason and the
    intermediate risk figures.
    """
    if not t:
        return None
    bh = t.get("bars_held", 0)
    bf = min(1.0, bh / MAX_HOLD_BARS) if MAX_HOLD_BARS else 0
    pa = t["peak_adverse"]
    pf = t["peak_favorable"]
    mae = t["prev_mae"]
    mfe = t["prev_mfe"]
    pnl = t.get("pnl_pct", 0)

    # MAE risk — V7 floor thresholds (without vol-adaptive since TUI lacks full history)
    mae_risk = 0.0
    if pa > 0.005:
        mae_risk += 0.5
    if pa > 0.012:
        mae_risk += 0.8
    if pa > 0.020:
        mae_risk += 1.2
    # MAE-B: adverse acceleration
    if bh >= 3 and t["mae_velocity"] > 0 and mae > 0.003:
        mae_risk += 0.6
    # MAE-D: late-stage time-weighted
    if mae > 0.003 and bf > 0.60:
        mae_risk += (bf - 0.60) / 0.40 * 0.4

    # MFE risk — convexity decay (share of the peak gain given back)
    mfe_risk = 0.0
    decay = (pf - mfe) / (pf + 1e-9) if pf > 0 else 0.0
    if decay > 0.35 and t["mfe_velocity"] < 0 and pf > 0.01:
        mfe_risk += 1.5
    if decay > 0.20:
        mfe_risk += 0.3

    # Exit pressure (simplified: MAE + MFE channels weighted as V7)
    pressure = 2.0 * mae_risk + 2.5 * mfe_risk
    if bf > 0.80 and pnl < 0:
        pressure += 0.5
    if bf > 0.95:
        pressure += 1.0

    # Decision (mirrors V7 thresholds).
    # NOTE(review): every term added above is >= 0, so the EXTEND branch
    # looks unreachable in this simplified preview — confirm intent.
    if pressure > 2.0:
        action = "EXIT"
        reason = "V7.1_MAE_SL_VOL_NORM" if mae_risk > mfe_risk else "V7_COMPOSITE"
    elif pressure > 1.0:
        action = "RETRACT"
        reason = "V7_RISK_DOM"
    elif pressure < -0.5 and pnl > 0:
        action = "EXTEND"
        reason = "V7_DIR_EDGE"
    else:
        action = "HOLD"
        reason = "\u2014"

    proj_usd = pnl * t.get("notional", 0)
    return {
        "action": action,
        "reason": reason,
        "pressure": pressure,
        "mae": pa,
        "mfe": pf,
        "mae_risk": mae_risk,
        "mfe_risk": mfe_risk,
        "bars_held": bh,
        "bars_frac": bf,
        "pnl_pct": pnl,
        "proj_usd": proj_usd,
    }


def fill_row(obf_universe, acb, eng):
    """Row 3: signal → asset-pick → OBF liquidity → size → ORDER."""
    f_items = []

    # ── Asset picker (IRP/ARS) ─────────────────────────────────────────────
    n_assets = int(obf_universe.get("_n_assets", 0) if obf_universe else 0)
    n_stale = int(obf_universe.get("_n_stale", 0) if obf_universe else 0)
    n_fresh = n_assets - n_stale
    f_items.append(_item(
        "universe",
        GREEN if n_fresh >= 200 else (YELLOW if n_fresh >= 50 else RED),
        f"{n_fresh}/{n_assets}"))

    fill_p = 0.0
    sym, ab = _best_fill_candidate(obf_universe)
    if sym:
        fill_p = float(ab.get("fill_probability", 0))
        spread = float(ab.get("spread_bps", 99))
        dq = float(ab.get("depth_quality", 0))
        imb = float(ab.get("imbalance", 0))
        depth = float(ab.get("depth_1pct_usd", 0))

        # Best candidate asset
        asset_color = GREEN if fill_p >= 0.80 else (YELLOW if fill_p >= 0.50 else RED)
        f_items.append(_item("best", asset_color, sym[:6]))
        # OBF: fill probability
        f_items.append(_item(
            "fill_p",
            GREEN if fill_p >= 0.85 else (YELLOW if fill_p >= 0.60 else RED),
            f"{fill_p:.2f}"))
        # OBF: spread
        f_items.append(_item(
            "spread",
            GREEN if spread <= 3 else (YELLOW if spread <= 8 else RED),
            f"{spread:.1f}bps"))
        # OBF: depth quality
        f_items.append(_item(
            "depth_q",
            GREEN if dq >= 0.5 else (YELLOW if dq >= 0.1 else RED),
            f"{dq:.2f}"))
        # OBF: imbalance direction (SHORT needs bearish = negative)
        imb_ok = imb < OB_IMBALANCE_BIAS  # confirmed bearish pressure
        f_items.append(_item(
            "imb",
            GREEN if imb_ok else YELLOW if imb < 0 else ORANGE if imb < 0.1 else RED,
            f"{imb:+.2f}"))
        # OBF: depth USD
        f_items.append(_item(
            "depth",
            GREEN if depth >= 50_000 else (YELLOW if depth >= 10_000 else RED),
            f"${depth/1000:.0f}k"))
    else:
        f_items.append(_item("OBF", RED, "no data"))

    # ── Sizing — ACB boost × proxy_B prank ─────────────────────────────────
    # proxy_B prank not exposed in HZ snapshot; show ACB boost as sizing proxy
    boost = float(acb.get("boost", 1.0) if acb else 1.0)
    beta = float(acb.get("beta", 0.8) if acb else 0.8)
    f_items.append(_item(
        "acb_boost",
        GREEN if boost >= 1.5 else (YELLOW if boost >= 1.0 else ORANGE),
        f"×{boost:.2f}"))
    f_items.append(_item(
        "beta",
        GREEN if beta >= 0.7 else (YELLOW if beta >= 0.4 else RED),
        f"{beta:.2f}"))

    # ── ORDER indicator ────────────────────────────────────────────────────
    # Would an order fire if signal were green right now?
    open_count = len(eng.get("open_positions") or [])
    lev = float(eng.get("current_leverage", 0) or 0)
    abs_c = float(eng.get("leverage_abs_cap", 9.0) or 9.0)
    order_ready = (
        bool(sym)
        and fill_p >= 0.60
        and open_count == 0
        and lev < abs_c
        and boost > 0
    )
    if order_ready:
        f_items.append(f" {CYAN}{BOLD}◉ ORDER READY{RST}")
    else:
        f_items.append(f" {DIM}(order: waiting){RST}")
    return " ".join(f_items)


def gear_rows(eng, safe, acb, exf, hb, obf_universe=None):
    """Return three formatted rows: SIGNAL, TRADE gates, FILL path."""
    vel_div = float(eng.get("last_vel_div", 0) or 0)
    vol_ok = bool(eng.get("vol_ok", False))
    posture = safe.get("posture") or eng.get("posture") or "?"
    halt = posture in ("HIBERNATE", "TURTLE")
    acb_boost_val = float(acb.get("boost", acb.get("cut", 0)) or 0)
    acb_ready = acb_boost_val > 0  # cut=0 means blocked
    exf_ok_count = int(exf.get("_ok_count", 0) if exf else 0)
    exf_ok = exf_ok_count >= 3
    open_count = len(eng.get("open_positions") or [])
    lev = float(eng.get("current_leverage", 0) or 0)
    abs_cap = float(eng.get("leverage_abs_cap", 9.0) or 9.0)
    trades_ex = int(eng.get("trades_executed") or 0)
    # The heartbeat timestamp may be an epoch number or an ISO string;
    # _iso_age_seconds copes with both, plain subtraction does not.
    hb_age = _iso_age_seconds(hb.get("ts"))
    hb_ok = hb_age is not None and hb_age < 30

    # ── SIGNAL ROW ─────────────────────────────────────────────────────────
    # vol_ok is the MASTER GATE — listed first. When False, _try_entry is never
    # called regardless of vel_div. Threshold is engine-configured runtime value.
    s_items = []
    # BTC vol — try to get live reading from exf for display context
    btc_vol_str = "—"
    if exf:
        dvol_raw = exf.get("dvol_btc") or exf.get("dvol")
        fng_raw = exf.get("fng")
        if dvol_raw:
            btc_vol_str = f"dV:{float(dvol_raw):.0f}"
        # NOTE(review): FnG is appended even when dvol is missing — confirm.
        if fng_raw:
            btc_vol_str += f" FnG:{float(fng_raw):.0f}"
    vol_label = f"vol_ok({btc_vol_str})"
    s_items.append(_item(vol_label, GREEN if vol_ok else RED,
                         "✓" if vol_ok else "✗ BLOCKED"))
    s_items.append(_vel_item(vel_div))
    # posture gate
    s_items.append(_item(
        "posture",
        GREEN if posture == "APEX" else (YELLOW if posture == "STALKER" else RED),
        posture))
    # acb_ready
    s_items.append(_item("acb", GREEN if acb_ready else RED, f"{acb_boost_val:.2f}"))
    # exf_ok — external factors pipeline
    s_items.append(_item(
        "exf",
        GREEN if exf_ok else (YELLOW if exf_ok_count >= 1 else RED),
        f"{exf_ok_count}/5"))
    # halt gate
    s_items.append(_item("no_halt", GREEN if not halt else RED,
                         "✓" if not halt else "HALT"))
    # heartbeat
    s_items.append(_item("hb", GREEN if hb_ok else RED, _age_seconds(hb_age)))
    # ALL GREEN → fire indicator
    all_sig = signal_fired(vel_div, vol_ok, posture, acb_ready, exf_ok, halt)
    if all_sig:
        s_items.append(f" {GREEN}{BOLD}◉ SIGNAL{RST}")

    # ── TRADE ROW ──────────────────────────────────────────────────────────
    # Additional gates that must pass before a matched signal becomes a fill
    t_items = []
    # open positions
    t_items.append(_item("open_pos", GREEN if open_count == 0 else ORANGE,
                         str(open_count)))
    # leverage headroom
    lev_pct = lev / abs_cap if abs_cap else 0
    t_items.append(_item(
        "lev",
        GREEN if lev_pct < 0.3 else (YELLOW if lev_pct < 0.7 else RED),
        f"{lev:.2f}x/{abs_cap:.0f}"))
    # regime_dd_halt
    t_items.append(_item("regime", GREEN if not halt else RED,
                         "free" if not halt else "HALTED"))
    # Rm strength
    rm = float(safe.get("Rm", 0) or 0)
    t_items.append(_item(
        "Rm",
        GREEN if rm >= 0.90 else (YELLOW if rm >= 0.70 else (ORANGE if rm >= 0.50 else RED)),
        f"{rm:.3f}"))
    # Cat5 (intraday drawdown contribution).  Only a *missing* value defaults
    # to 1.0; a real 0 must stay 0 instead of being shown as healthy.
    c5 = _snapshot_float((safe.get("breakdown") or {}).get("Cat5"), 1.0)
    t_items.append(_item(
        "Cat5",
        GREEN if c5 >= 0.95 else (YELLOW if c5 >= 0.85 else (ORANGE if c5 >= 0.70 else RED)),
        f"{c5:.3f}"))
    # trades today
    t_items.append(_item(
        "trades",
        GREEN if trades_ex < 20 else (YELLOW if trades_ex < 35 else ORANGE),
        str(trades_ex)))
    # ALL GREEN trade execute indicator
    daily_loss_ok = c5 > 0.50  # reasonable proxy — Cat5 tracks drawdown
    all_trade = all_sig and trade_can_execute(
        open_count, lev, abs_cap, daily_loss_ok, acb_boost_val)
    if all_trade:
        t_items.append(f" {CYAN}{BOLD}◉ TRADE{RST}")

    sig_row = " ".join(s_items)
    trade_row = " ".join(t_items)
    fill = fill_row(obf_universe or {}, acb, eng)
    return sig_row, trade_row, fill


def _history_row(t, marker, pnl_tag, reason_tag, default_reason):
    """Format one closed-trade / fill line for the history panels.

    ``marker`` is the leading glyph; pass None to get an up/down arrow
    chosen from the sign of the PnL.
    """
    pnl = _closed_trade_display_pnl(t)
    pct = _snapshot_float(t.get("pnl_pct")) * 100
    lev = _snapshot_float(t.get("leverage"))
    ep = _snapshot_float(t.get("entry_price"))
    reason = t.get("reason", default_reason)
    asset = t.get("asset", "?")
    bars = t.get("bars_held", 0)
    ts_raw = str(t.get("ts", ""))[:16].replace("T", " ")
    col = GREEN if pnl >= 0 else RED
    glyph = marker if marker is not None else ("▲" if pnl >= 0 else "▼")
    return (
        f" {col}{glyph}{RST}"
        f" {asset:<12} "
        f"ep:{ep:.4g} "
        f"lev:{lev:.2f}x "
        f"{pnl_tag}:{col}{pnl:+.2f}({pct:+.2f}%){RST} "
        f"{reason_tag}:{reason} bars:{bars} {DIM}{ts_raw}{RST}"
    )


def render(hz):
    """Build one full status frame as a string.

    Reads the current snapshots from Hazelcast, updates the session-level
    START_CAP / CAP_PEAK and the exit tracker, and queues a status row for
    ClickHouse on roughly every other call.
    """
    global START_CAP, CAP_PEAK

    # ── DITAv2 kernel + BingX account (built by a separate helper) ─────────
    ditav2_section = _render_ditav2_section()

    eng = _get(hz, PINK_STATE_MAP, "engine_snapshot")
    cap = _get(hz, PINK_STATE_MAP, "capital_checkpoint")
    safe = _get(hz, PINK_SAFETY_MAP, "latest")
    hb = _get(hz, PINK_HEARTBEAT_MAP, PINK_HEARTBEAT_KEY)
    mh = _get(hz, PINK_META_HEALTH_MAP, "latest")
    ann = _get(hz, PINK_ANNOUNCEMENTS_MAP, "latest")
    acb = _get(hz, PINK_FEATURES_MAP, "acb_boost")
    exf = _get(hz, PINK_FEATURES_MAP, "exf_latest")
    obf = _get(hz, PINK_FEATURES_MAP, "obf_universe_latest")
    esof = _get(hz, PINK_FEATURES_MAP, "esof_advisor_latest")
    if not esof:
        esof = _get(hz, PINK_FEATURES_MAP, "esof_latest")
    maras = _get(hz, PINK_FEATURES_MAP, "maras_latest")
    eng, account_row = _overlay_account_event_state(eng)

    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    # Snapshot fields may be present but null; coerce defensively so one
    # missing number does not blank the whole frame with a render error.
    capital = _snapshot_float(eng.get("capital")) or _snapshot_float(cap.get("capital"))
    posture = safe.get("posture") or eng.get("posture") or "?"
    rm = _snapshot_float(safe.get("Rm"))
    phase = hb.get("phase", "?")
    trader_up, trader_src, hb_age_s, eng_age_s = _is_trader_live(hb, eng)
    trades = _int_or_default(eng.get("trades_executed"), 0)
    fills_today = _account_event_fill_count(account_row)
    scans = _int_or_default(eng.get("scans_processed"),
                            _int_or_default(eng.get("last_scan_number"), 0))
    lev = _snapshot_float(eng.get("current_leverage"))
    notional = _snapshot_float(eng.get("open_notional"))
    mhs_st = mh.get("status", "?")
    rm_meta = _snapshot_float(mh.get("rm_meta"))
    cat5 = _snapshot_float((safe.get("breakdown") or {}).get("Cat5"), 1.0)

    ann_title = ann.get("title") if ann else None
    ann_sev = (ann.get("severity") or "info") if ann else "info"
    ann_col = {"critical": RED, "warning": YELLOW, "info": CYAN}.get(ann_sev, DIM)
    ann_ts = ann.get("ts") if ann else None
    ann_age = _age_seconds(_iso_age_seconds(ann_ts)) if ann_ts else "?"

    # Session baseline and high-water mark, set from the first positive capital.
    if capital > 0:
        if START_CAP is None:
            START_CAP = capital
        if CAP_PEAK is None or capital > CAP_PEAK:
            CAP_PEAK = capital
    roi = ((capital - START_CAP) / START_CAP * 100) if START_CAP and capital else 0
    dd = ((CAP_PEAK - capital) / CAP_PEAK * 100) if CAP_PEAK and capital < CAP_PEAK else 0

    pc = PC.get(posture, DIM)
    health_col = SC.get(mhs_st, DIM)
    trader_col = GREEN if trader_up else RED
    roi_c = GREEN if roi >= 0 else RED
    dd_c = RED if dd > 15 else (YELLOW if dd > 5 else GREEN)

    sig_row, trade_row, fill_row_str = gear_rows(eng, safe, acb, exf, hb, obf)
    svc = mh.get("service_status") or {}
    hz_ks = mh.get("hz_key_status") or {}

    L = []
    L.append(f"{BOLD}{CYAN}🐬 DOLPHIN-PINK{RST} {BOLD}v7{RST} {DIM}{now}{RST}")
    L.append("─" * 65)

    # TRADER
    L.append(f"{BOLD}TRADER{RST} {trader_col}{'● LIVE' if trader_up else '● DOWN'}{RST}"
             f" phase:{phase} hb:{_age_seconds(hb_age_s)} eng:{_age_seconds(eng_age_s)}"
             f" src:{trader_src}"
             f" scan:#{eng.get('last_scan_number','?')}")

    # ── SIGNAL → FILL GEARS ────────────────────────────────────────────────
    vol_ok_live = bool(eng.get("vol_ok", False))
    vol_gate_threshold = float(eng.get("vol_gate_threshold", VOL_P60) or VOL_P60)
    if not vol_ok_live:
        L.append(
            f" {RED}{BOLD}⛔ VOL_OK=FALSE — engine gate closed, "
            f"NO trades until BTC vol > {vol_gate_threshold:.6f}{RST}"
        )
    L.append(f" {DIM}SIG │{RST} {sig_row}")
    L.append(f" {DIM}TRD │{RST} {trade_row}")
    L.append(f" {DIM}FIL │{RST} {fill_row_str}")

    # ── EsoF ADVISORY ──────────────────────────────────────────────────────
    if esof:
        _ec = {
            "FAVORABLE": GREEN,
            "MILD_POSITIVE": "\033[92m",
            "NEUTRAL": YELLOW,
            "MILD_NEGATIVE": "\033[91m",
            "UNFAVORABLE": RED,
        }
        _lbl = esof.get("advisory_label") or "?"
        _col = _ec.get(_lbl, DIM)
        _sc = _snapshot_float(esof.get("advisory_score"))
        _sess = esof.get("session", "?")
        _dow = esof.get("dow_name", "?")
        _slot = esof.get("slot_15m", "?")
        _swr = _snapshot_float(esof.get("session_wr_pct"))
        _dwr = _snapshot_float(esof.get("dow_wr_pct"))
        _moon = str(esof.get("moon_phase") or "?")[:8]
        _retro = " ☿RETRO" if esof.get("mercury_retrograde") else ""
        L.append(f" {DIM}EsoF│{RST} {_col}{_lbl:<13}{RST} sc:{_col}{_sc:+.3f}{RST} "
                 f"sess:{_sess}({_swr:.0f}%) "
                 f"dow:{_dow}({_dwr:.0f}%) "
                 f"slot:{_slot} {DIM}{_moon}{_retro}{RST}")
    else:
        L.append(f" {DIM}EsoF│ advisory unavailable (keys: esof_advisor_latest/esof_latest){RST}")
    L.append("")

    # CAPITAL
    L.append(f"{BOLD}CAPITAL{RST} {CYAN}${capital:,.2f}{RST}"
             + (f" ROI:{roi_c}{roi:+.2f}%{RST} DD:{dd_c}{dd:.2f}%{RST}"
                f" start:${START_CAP:,.0f}" if START_CAP else ""))
    bar_idx = _int_or_default(eng.get("bar_idx"),
                              _int_or_default(eng.get("last_scan_number"), 0))
    L.append(f" trades:{trades} fills:{fills_today} scans:{scans} bar:{bar_idx}"
             f" lev:{lev:.2f}x notional:${notional:,.0f}")

    # Open positions + EXIT COMPARISON
    positions = eng.get("open_positions") or []
    if positions:
        # Mid price / imbalance of the first position's asset drive the tracker.
        _pa = positions[0].get("asset", "")
        _lp = 0.0
        _obi = 0.0
        if _pa and obf:
            _oad = obf.get(_pa, {})
            _ob_bid = float(_oad.get("best_bid", 0) or 0)
            _ob_ask = float(_oad.get("best_ask", 0) or 0)
            _lp = (_ob_bid + _ob_ask) / 2 if _ob_bid > 0 and _ob_ask > 0 else 0
            _obi = float(_oad.get("imbalance", 0) or 0)
        L.append(f" {BOLD}OPEN:{RST}")
        for p in positions:
            side_col = GREEN if p.get("side") == "LONG" else RED
            upnl, cur = _derive_open_position_pnl(p, obf)
            cur_s = f" mark:{cur:.4g}" if cur > 0 else ""
            L.append(f" {side_col}{p.get('asset','?')} {p.get('side','?')}{RST}"
                     f" qty:{p.get('quantity',0):.4f}"
                     f" entry:{p.get('entry_price',0):.2f}"
                     f" upnl:{upnl:+.2f}{cur_s}")

        # ── EXIT COMPARISON: base engine vs V7 ─────────────────────────────
        _etr = _update_exit_tracker(positions, eng, _lp, _obi)
        _v7p = _v7_preview(_etr)
        if _v7p:
            bh = _v7p["bars_held"]
            bf = _v7p["bars_frac"]
            pp = _v7p["pnl_pct"]
            tp_pct = abs(pp) / FIXED_TP_PCT * 100 if FIXED_TP_PCT else 0
            bf_bar = round(bf * 20)
            # Built outside the f-string: backslash escapes inside an f-string
            # replacement field are a SyntaxError before Python 3.12.
            gauge = "\u2588" * bf_bar + "\u2591" * (20 - bf_bar)
            bc = GREEN if bf < 0.6 else (YELLOW if bf < 0.85 else RED)
            tp_col = GREEN if pp >= FIXED_TP_PCT else (YELLOW if tp_pct > 50 else DIM)
            vc = RED if _v7p["action"] == "EXIT" else (
                YELLOW if _v7p["action"] == "RETRACT" else GREEN)
            ps = "+" if _v7p["proj_usd"] >= 0 else ""
            L.append(f" {BOLD}EXIT CMP{RST} {DIM}bar{RST} {bc}{bh}/{MAX_HOLD_BARS} [{gauge}]{RST} {bf*100:.0f}%"
                     f" {DIM}TP{RST} {tp_col}{abs(pp)*100:.3f}%/{FIXED_TP_PCT*100:.2f}% ({tp_pct:.0f}%){RST}")
            L.append(f" {vc}V7:{_v7p['action']:<8}{RST} P={_v7p['pressure']:.2f}"
                     f" mae:{_v7p['mae']:.4f} mfe:{_v7p['mfe']:.4f}"
                     f" {DIM}\u2192{RST} {ps}${_v7p['proj_usd']:.2f} ({pp*100:+.3f}%)"
                     f" {DIM}[{_v7p['reason']}]{RST}")
    else:
        L.append(f" {DIM}no open positions{RST}")
        _EXIT_TRACKER.clear()
    L.append("")

    # POSTURE
    bd = safe.get("breakdown") or {}
    L.append(f"{BOLD}POSTURE{RST} {pc}{posture}{RST} Rm:{pc}{_bar(rm,20)}{RST} {rm:.4f}")
    cat_vals = [_snapshot_float(bd.get(f"Cat{i}")) for i in range(1, 6)]
    cats = " ".join(f"C{i}:{val:.2f}" for i, val in enumerate(cat_vals, start=1))
    f_env = _snapshot_float(bd.get("f_env"))
    f_exe = _snapshot_float(bd.get("f_exe"))
    L.append(f" {cats} f_env:{f_env:.3f} f_exe:{f_exe:.3f}")
    L.append("")

    # SYS HEALTH
    L.append(f"{BOLD}SYS HEALTH{RST} {health_col}{mhs_st}{RST} rm_meta:{rm_meta:.3f}")
    for line in _format_two_column_sensor_lines(_sys_health_sensor_rows(mh)):
        L.append(line)

    def _svc_dot(name, status):
        # Running services get a plain dot, anything else a red one.
        short = name.split(":")[-1]
        if status == "RUNNING":
            return f"●{DIM}{short}{RST}"
        return f"{RED}●{DIM}{short}{RST}"

    def _key_dot(name, info):
        score = _snapshot_float(info.get("score"))
        col = GREEN if score >= 0.9 else (YELLOW if score >= 0.5 else RED)
        return f"{col}●{RST}{DIM}{name}{RST}"

    L.append(f" {DIM}services:{RST} "
             + " ".join(_svc_dot(n, st) for n, st in sorted(svc.items())))
    L.append(f" {DIM}hz_keys:{RST} "
             + " ".join(_key_dot(k, i) for k, i in sorted(hz_ks.items())))
    L.append(f" {DIM}ann:{RST} "
             + (f"{ann_col}{ann_title}{RST} {DIM}{ann_age}{RST}"
                if ann_title else f"{DIM}none{RST}"))

    # ── RECENT TRADES ──────────────────────────────────────────────────────
    _, trades_hist, fill_hist, trades_source = _trade_panel_rows(eng, obf, limit=5)
    if trades_hist or fill_hist:
        L.append("")
        L.append(f"{BOLD}RECENT TRADES{RST} {DIM}(from {trades_source}){RST}")
        if trades_hist:
            for t in trades_hist:
                L.append(_history_row(t, None, "pnl", "exit", "?"))
        elif not fill_hist:
            # NOTE(review): cannot trigger while nested under the check
            # above — confirm whether it was meant to sit outside it.
            L.append(f" {DIM}no recent trades yet{RST}")
        if fill_hist:
            L.append("")
            L.append(f"{BOLD}RECENT FILLS{RST} {DIM}(account events, not terminal closes){RST}")
            for t in fill_hist:
                L.append(_history_row(t, "•", "fill", "status", "FILL"))

    # ── HOTKEY BAR ─────────────────────────────────────────────────────────
    L.append("")
    hk_bar = f"{DIM}HOTKEYS:{RST} " + " ".join(
        f"{CYAN}[{k}]{RST}{DIM}{desc[0]}{RST}" for k, desc in sorted(HOTKEY_MAP.items())
    )
    L.append(hk_bar)
    if _HOTKEY_FEEDBACK:
        for fb_ts, fb_msg in _HOTKEY_FEEDBACK:
            fb_age = time.time() - fb_ts
            if fb_age < 15:  # only show recent feedback
                L.append(f" {GREEN}▶ {fb_msg}{RST} {DIM}({fb_age:.0f}s ago){RST}")
    L.append("")

    # ── MARAS footer ───────────────────────────────────────────────────────
    if maras:
        reg = maras.get("regime", "?")
        m_score = maras.get("final_score", 0)
        cf = maras.get("confidence", 0)
        cl = maras.get("conflict_level", 0)
        ch = maras.get("composite_hash", 0)
        sh = maras.get("scalar_hash", 0)
        te = maras.get("tier_exf", 0)
        tg = maras.get("tier_eigen", 0)
        tb = maras.get("tier_btc", 0)
        t_esof = maras.get("tier_esof", 0)
        tm = maras.get("tier_micro", 0)
        rc = {"VERY_BEARISH": RED, "BEARISH": RED, "CHOPPY_BEARISH": YELLOW,
              "CHOPPY": YELLOW, "SIDEWAYS": DIM, "CHOPPY_BULLISH": CYAN,
              "BULLISH": GREEN, "VERY_BULLISH": GREEN}
        col = rc.get(reg, DIM)
        confl_col = RED if cl > 0.5 else (YELLOW if cl > 0.3 else GREEN)
        L.append(f" {BOLD}{CYAN}MARAS{RST} {col}{reg}{RST}"
                 f" score={m_score:+.2f} conf={cf:.0f}%"
                 f" {confl_col}conflict={cl:.2f}{RST}"
                 f" chash={ch} shash={sh}"
                 f" | ExF={te:+.2f} Eig={tg:+.2f} BTC={tb:+.2f} EsF={t_esof:+.2f} Mic={tm:+.2f}")
    else:
        L.append(f" {DIM}MARAS: waiting for daemon start...{RST}")

    L.append(f"{DIM}v7 • PINK • 0.5s poll • hotkeys • CH→status+audit • Ctrl-C quit{RST}")

    # ── CH persistence ─────────────────────────────────────────────────────
    # Write every other cycle (1s effective rate) to avoid CH noise
    if int(time.time() * 2) % 2 == 0:
        ch_put({
            "ts": int(time.time() * 1000),
            "capital": capital,
            "roi_pct": round(roi, 4),
            "dd_pct": round(dd, 4),
            "trades_executed": trades,
            "posture": posture,
            "rm": round(rm, 6),
            "vel_div": round(float(eng.get("last_vel_div", 0) or 0), 6),
            "vol_ok": 1 if eng.get("vol_ok") else 0,
            "phase": phase,
            "mhs_status": mhs_st,
            "boost": round(float(acb.get("boost", 1.0) if acb else 1.0), 4),
            "cat5": round(cat5, 6),
        })

    # Prepend the DITAv2 section at the top of the display
    return ditav2_section + "\n" + "\n".join(L)


def _dispatch_hotkey(key: str, hz):
    """Run the handler bound to ``key`` and record the outcome as feedback.

    Unknown keys and entries without a handler (e.g. HELP) are ignored.
    Handler exceptions are captured as a "FAILED" feedback line.
    """
    entry = HOTKEY_MAP.get(key)
    if entry is None:
        return
    label, fn = entry
    if fn is None:
        return
    _HOTKEY_FEEDBACK.append((time.time(), f"{label} ..."))
    try:
        result = fn(hz)
        _HOTKEY_FEEDBACK.append((time.time(), f"{label}: {result}"))
    except Exception as e:
        _HOTKEY_FEEDBACK.append((time.time(), f"{label} FAILED: {e}"))


def main():
    """Connect to Hazelcast and run the render / hotkey loop until Ctrl-C."""
    poll_s = 0.5
    print(f"Connecting to HZ ({PINK_STATE_MAP})...")
    hz = hazelcast.HazelcastClient(
        cluster_name="dolphin",
        cluster_members=["localhost:5701"],
        connection_timeout=5.0)
    posture_mode = "enabled" if PINK_ALLOW_GLOBAL_POSTURE_HOTKEYS else "disabled"
    print(f"Connected. PINK hotkeys enabled (global posture hotkeys {posture_mode}).\n")

    interactive = sys.stdin.isatty()
    fd = sys.stdin.fileno() if interactive else None
    old_term = None
    try:
        if interactive:
            # cbreak: deliver keys immediately, without waiting for Enter.
            old_term = termios.tcgetattr(fd)
            tty.setcbreak(fd)
        while True:
            try:
                sys.stdout.write(CLEAR + render(hz) + "\n")
                sys.stdout.flush()
            except Exception as e:
                sys.stdout.write(f"\n{RED}render error: {e}{RST}\n")
                sys.stdout.flush()

            if not interactive:
                # Without a TTY the key reader returns at once, so the pause
                # between frames has to come from here or the loop spins.
                time.sleep(poll_s)
                continue
            try:
                key = _read_hotkey(poll_s)
                if key:
                    _dispatch_hotkey(key, hz)
            except Exception:
                # Best effort: a failed key read must not kill the display,
                # but it must not turn the loop into a busy spin either.
                time.sleep(poll_s)
    except KeyboardInterrupt:
        print(f"\n{DIM}Bye.{RST}")
    finally:
        if old_term is not None:
            termios.tcsetattr(fd, termios.TCSADRAIN, old_term)
        hz.shutdown()


if __name__ == "__main__":
    main()