#!/usr/bin/env python3 """DOLPHIN live status — v6 0.5s poll. SIG/TRD/FIL gear rows + last-5-trades + CH persistence + V7 exit cmp. Run: source /home/dolphin/siloqy_env/bin/activate && python dolphin_status.py Quit: Ctrl-C """ # v1–v5 archived as dolphin_status_v{1..5}.py # v6: exit-comparison overlay (V7 preview), net-pnl pct fix import json, re, threading, time, sys, urllib.request, urllib.parse from collections import deque from datetime import datetime, timezone from pathlib import Path import hazelcast # ── ClickHouse fire-and-forget write ───────────────────────────────────────── _CH_URL = "http://localhost:8123" _CH_USER = "dolphin" _CH_PASS = "dolphin_ch_2026" _CH_Q: deque = deque(maxlen=500) def _ch_worker(): while True: time.sleep(2) rows = [] while _CH_Q: try: rows.append(_CH_Q.popleft()) except IndexError: break if not rows: continue body = "\n".join(json.dumps(r) for r in rows).encode() url = f"{_CH_URL}/?database=dolphin&query=INSERT+INTO+status_snapshots+FORMAT+JSONEachRow" req = urllib.request.Request(url, data=body, method="POST") req.add_header("X-ClickHouse-User", _CH_USER) req.add_header("X-ClickHouse-Key", _CH_PASS) req.add_header("Content-Type", "application/octet-stream") try: urllib.request.urlopen(req, timeout=4) except Exception: pass # observability is non-critical threading.Thread(target=_ch_worker, daemon=True, name="ch-status").start() def ch_put(row: dict): _CH_Q.append(row) # ── Trade log parser ────────────────────────────────────────────────────────── _TRADER_LOG = Path("/mnt/dolphinng5_predict/prod/supervisor/logs/nautilus_trader.log") # Capture the JSON dict only — stop at first } closing the payload. # Lines may have a trailing tag like [v2_gold_fix_v50-v750] after the dict. _RE_ENTRY = re.compile(r"\[(.+?)\] ENTRY: (\{.+?\})(?:\s*\[.*\])?$") _RE_EXIT = re.compile(r"\[(.+?)\] EXIT: (\{.+?\})(?:\s*\[.*\])?$") def _parse_log_dict(raw: str) -> dict: """Parse a Python dict repr from a log line. Handles nan and single-quoted strings.""" import ast # Replace nan/inf with JSON-safe equivalents before parsing cleaned = raw.replace(": nan", ": null").replace(": inf", ": null").replace(": -inf", ": null") try: return ast.literal_eval(raw) # handles all Python literal forms incl. nan except Exception: pass try: return json.loads(cleaned.replace("'", '"')) except Exception: raise ValueError(f"unparseable: {raw[:80]}") def _last_n_trades(n=5): """Parse last N completed trades from supervisor log. Returns list of dicts.""" try: lines = _TRADER_LOG.read_text(errors="replace").splitlines()[-4000:] except Exception: return [] entries = {} trades = [] for line in lines: m = _RE_ENTRY.search(line) if m: try: d = _parse_log_dict(m.group(2)) entries[d["trade_id"]] = {"ts": m.group(1), **d} except Exception: pass m = _RE_EXIT.search(line) if m: try: d = _parse_log_dict(m.group(2)) tid = d.get("trade_id") if tid and tid in entries: e = entries.pop(tid) trades.append({**e, "exit_ts": m.group(1), "reason": d.get("reason","?"), "pnl_pct": d.get("pnl_pct", 0), "net_pnl": d.get("net_pnl", 0), "bars_held": d.get("bars_held", 0)}) except Exception: pass return trades[-n:] CLEAR = "\033[2J\033[H" BOLD = "\033[1m"; DIM = "\033[2m"; RST = "\033[0m" GREEN = "\033[32m"; YELLOW = "\033[33m"; RED = "\033[31m"; CYAN = "\033[36m" ORANGE = "\033[38;5;208m" PC = {"APEX": GREEN, "STALKER": YELLOW, "TURTLE": ORANGE, "HIBERNATE": RED} SC = {"GREEN": GREEN, "DEGRADED": YELLOW, "CRITICAL": ORANGE, "DEAD": RED} # Thresholds from nautilus_event_trader.py VEL_DIV_THRESHOLD = -0.020 # signal fires when vel_div < this VEL_DIV_EXTREME = -0.050 # extreme bearish VEL_DIV_WARN = -0.010 # approaching threshold (yellow) VEL_DIV_CLOSE = -0.015 # nearly there (orange→yellow) VOL_P60 = 0.00026414 # BTC 50-bar realised vol p60 — MASTER GATE BTC_VOL_WINDOW = 50 # bars used for vol calc FIXED_TP_PCT = 0.0095 # BLUE TP target (0.95%) MAX_HOLD_BARS = 250 # BLUE max hold bars START_CAP = None CAP_PEAK = None _EXIT_TRACKER: dict = {} # (asset, entry_price) → accumulated V7 comparison state def _age(ts): if not ts: return "?" s = time.time() - ts if s < 0: return "0s" if s < 60: return f"{s:.0f}s" if s < 3600: return f"{s/60:.0f}m" return f"{s/3600:.1f}h" def _bar(v, w=20): v = max(0.0, min(1.0, v)) return "█" * round(v * w) + "░" * (w - round(v * w)) def _get(hz, map_name, key): try: raw = hz.get_map(map_name).blocking().get(key) return json.loads(raw) if raw else {} except Exception: return {} # ── Gear items ──────────────────────────────────────────────────────────────── # Each returns (label, color, value_str) def _item(label, color, val=""): dot = f"{color}●{RST}" v = f":{val}" if val else "" return f"{dot}{DIM}{label}{v}{RST}" def _vel_item(vel_div): """vel_div colored by distance to threshold (-0.02).""" v = f"{vel_div:+.4f}" if vel_div <= VEL_DIV_EXTREME: return _item("vel_div", GREEN, v) # extremely bearish — great elif vel_div <= VEL_DIV_THRESHOLD: return _item("vel_div", GREEN, v) # past threshold — signal green elif vel_div <= VEL_DIV_CLOSE: return _item("vel_div", YELLOW, v) # -0.015 to -0.020 — close elif vel_div <= VEL_DIV_WARN: return _item("vel_div", ORANGE, v) # -0.010 to -0.015 — approaching elif vel_div < 0: return _item("vel_div", RED, v) # negative but far else: return _item("vel_div", RED, v) # positive — not bearish def signal_fired(vel_div, vol_ok, posture, acb_ready, exf_ok, halt): """True if ALL signal preconditions are green.""" return ( vel_div <= VEL_DIV_THRESHOLD and vol_ok and posture not in ("HIBERNATE", "TURTLE") and acb_ready and exf_ok and not halt ) def trade_can_execute(open_count, lev, abs_cap, daily_loss_ok, boost): return ( open_count == 0 # no open position already and lev < abs_cap # leverage headroom and daily_loss_ok and boost > 0 ) OB_IMBALANCE_BIAS = -0.09 # from engine config: ob_imbalance_bias def _best_fill_candidate(obf_universe): """Pick best SHORT candidate from OBF universe. Criteria: negative imbalance (bearish pressure) + high fill_probability + low spread. Returns (symbol, asset_dict) or (None, {}). """ candidates = [] for k, v in obf_universe.items(): if not isinstance(v, dict) or "fill_probability" not in v: continue candidates.append((k, v)) if not candidates: return None, {} # Score: fill_prob * (1 + bearish_imbalance_bonus) / (1 + spread_bps/10) def score(item): sym, a = item imb = float(a.get("imbalance", 0)) fp = float(a.get("fill_probability", 0)) sp = float(a.get("spread_bps", 99)) dq = float(a.get("depth_quality", 0)) # Bearish bias: reward negative imbalance, penalise positive imb_bonus = max(0.0, -imb) # 0..1 for imbalance in [-1,0] return fp * (1 + imb_bonus) * dq / max(0.1, sp) candidates.sort(key=score, reverse=True) return candidates[0] def _update_exit_tracker(positions, eng, live_price, ob_imbalance): """Accumulate MAE/MFE state for V7 comparison from live OBF prices.""" global _EXIT_TRACKER if not positions or live_price <= 0: _EXIT_TRACKER.clear() return None pos = positions[0] asset = pos.get("asset", "?") ep = float(pos.get("entry_price", 0) or 0) side = pos.get("side", "SHORT") if ep <= 0: return None key = (asset, ep) bar_idx = int(eng.get("bar_idx", 0) or 0) if key not in _EXIT_TRACKER: _EXIT_TRACKER.clear() _EXIT_TRACKER[key] = { "entry_bar": bar_idx, "first_seen": time.time(), "peak_adverse": 0.0, "peak_favorable": 0.0, "prev_mae": 0.0, "prev_mfe": 0.0, "mae_velocity": 0.0, "mfe_velocity": 0.0, "prices": deque(maxlen=60), } t = _EXIT_TRACKER[key] t["prices"].append(live_price) t["notional"] = float(pos.get("notional", 0) or 0) t["unrealized_pnl"] = float(pos.get("unrealized_pnl", 0) or 0) if side == "SHORT": pnl = (ep - live_price) / ep mae = max(0.0, (live_price - ep) / ep) mfe = max(0.0, (ep - live_price) / ep) else: pnl = (live_price - ep) / ep mae = max(0.0, (ep - live_price) / ep) mfe = max(0.0, (live_price - ep) / ep) t["peak_adverse"] = max(t["peak_adverse"], mae) t["peak_favorable"] = max(t["peak_favorable"], mfe) t["mae_velocity"] = mae - t["prev_mae"] t["mfe_velocity"] = mfe - t["prev_mfe"] t["prev_mae"] = mae t["prev_mfe"] = mfe t["bars_held"] = max(0, bar_idx - t["entry_bar"]) t["pnl_pct"] = pnl t["live_price"] = live_price t["ob_imbalance"] = ob_imbalance t["entry_price"] = ep t["side"] = side t["asset"] = asset return t def _v7_preview(t): """Simplified V7 decision from tracker state (MAE/MFE/time channels).""" if not t: return None bh = t.get("bars_held", 0) bf = min(1.0, bh / MAX_HOLD_BARS) if MAX_HOLD_BARS else 0 pa = t["peak_adverse"] pf = t["peak_favorable"] mae = t["prev_mae"] mfe = t["prev_mfe"] pnl = t.get("pnl_pct", 0) # MAE risk — V7 floor thresholds (without vol-adaptive since TUI lacks full history) mae_risk = 0.0 if pa > 0.005: mae_risk += 0.5 if pa > 0.012: mae_risk += 0.8 if pa > 0.020: mae_risk += 1.2 # MAE-B: adverse acceleration if bh >= 3 and t["mae_velocity"] > 0 and mae > 0.003: mae_risk += 0.6 # MAE-D: late-stage time-weighted if mae > 0.003 and bf > 0.60: mae_risk += (bf - 0.60) / 0.40 * 0.4 # MFE risk — convexity decay mfe_risk = 0.0 decay = (pf - mfe) / (pf + 1e-9) if pf > 0 else 0.0 if decay > 0.35 and t["mfe_velocity"] < 0 and pf > 0.01: mfe_risk += 1.5 if decay > 0.20: mfe_risk += 0.3 # Exit pressure (simplified: MAE + MFE channels weighted as V7) pressure = 2.0 * mae_risk + 2.5 * mfe_risk if bf > 0.80 and pnl < 0: pressure += 0.5 if bf > 0.95: pressure += 1.0 # Decision (mirrors V7 thresholds) if pressure > 2.0: action = "EXIT" reason = "V7_MAE_SL" if mae_risk > mfe_risk else "V7_COMPOSITE" elif pressure > 1.0: action = "RETRACT" reason = "V7_RISK_DOM" elif pressure < -0.5 and pnl > 0: action = "EXTEND" reason = "V7_DIR_EDGE" else: action = "HOLD" reason = "\u2014" proj_usd = pnl * t.get("notional", 0) return { "action": action, "reason": reason, "pressure": pressure, "mae": pa, "mfe": pf, "mae_risk": mae_risk, "mfe_risk": mfe_risk, "bars_held": bh, "bars_frac": bf, "pnl_pct": pnl, "proj_usd": proj_usd, } def fill_row(obf_universe, acb, eng): """Row 3: signal → asset-pick → OBF liquidity → size → ORDER.""" f_items = [] # ── Asset picker (IRP/ARS) ───────────────────────────────────────────── n_assets = int(obf_universe.get("_n_assets", 0) if obf_universe else 0) n_stale = int(obf_universe.get("_n_stale", 0) if obf_universe else 0) n_fresh = n_assets - n_stale f_items.append(_item("universe", GREEN if n_fresh >= 200 else (YELLOW if n_fresh >= 50 else RED), f"{n_fresh}/{n_assets}")) sym, ab = _best_fill_candidate(obf_universe) if sym: fill_p = float(ab.get("fill_probability", 0)) spread = float(ab.get("spread_bps", 99)) dq = float(ab.get("depth_quality", 0)) imb = float(ab.get("imbalance", 0)) depth = float(ab.get("depth_1pct_usd", 0)) # Best candidate asset asset_color = GREEN if fill_p >= 0.80 else (YELLOW if fill_p >= 0.50 else RED) f_items.append(_item("best", asset_color, sym[:6])) # OBF: fill probability f_items.append(_item("fill_p", GREEN if fill_p >= 0.85 else (YELLOW if fill_p >= 0.60 else RED), f"{fill_p:.2f}")) # OBF: spread f_items.append(_item("spread", GREEN if spread <= 3 else (YELLOW if spread <= 8 else RED), f"{spread:.1f}bps")) # OBF: depth quality f_items.append(_item("depth_q", GREEN if dq >= 0.5 else (YELLOW if dq >= 0.1 else RED), f"{dq:.2f}")) # OBF: imbalance direction (SHORT needs bearish = negative) imb_ok = imb < OB_IMBALANCE_BIAS # confirmed bearish pressure f_items.append(_item("imb", GREEN if imb_ok else YELLOW if imb < 0 else ORANGE if imb < 0.1 else RED, f"{imb:+.2f}")) # OBF: depth USD f_items.append(_item("depth", GREEN if depth >= 50_000 else (YELLOW if depth >= 10_000 else RED), f"${depth/1000:.0f}k")) else: f_items.append(_item("OBF", RED, "no data")) # ── Sizing — ACB boost × proxy_B prank ──────────────────────────────── # proxy_B prank not exposed in HZ snapshot; show ACB boost as sizing proxy boost = float(acb.get("boost", 1.0) if acb else 1.0) beta = float(acb.get("beta", 0.8) if acb else 0.8) f_items.append(_item("acb_boost", GREEN if boost >= 1.5 else (YELLOW if boost >= 1.0 else ORANGE), f"×{boost:.2f}")) f_items.append(_item("beta", GREEN if beta >= 0.7 else (YELLOW if beta >= 0.4 else RED), f"{beta:.2f}")) # ── ORDER indicator ──────────────────────────────────────────────────── # Would an order fire if signal were green right now? open_count = len(eng.get("open_positions") or []) lev = float(eng.get("current_leverage", 0) or 0) abs_c = float(eng.get("leverage_abs_cap", 9.0) or 9.0) order_ready = ( sym is not None and fill_p >= 0.60 and open_count == 0 and lev < abs_c and boost > 0 ) if sym else False if order_ready: f_items.append(f" {CYAN}{BOLD}◉ ORDER READY{RST}") else: f_items.append(f" {DIM}(order: waiting){RST}") return " ".join(f_items) def gear_rows(eng, safe, acb, exf, hb, obf_universe=None): """Return three formatted rows: SIGNAL, TRADE gates, FILL path.""" vel_div = float(eng.get("last_vel_div", 0) or 0) vol_ok = bool(eng.get("vol_ok", False)) posture = safe.get("posture") or eng.get("posture") or "?" halt = posture in ("HIBERNATE", "TURTLE") acb_boost_val = float(acb.get("boost", acb.get("cut", 0)) or 0) acb_ready = acb_boost_val > 0 # cut=0 means blocked exf_ok_count = int(exf.get("_ok_count", 0) if exf else 0) exf_ok = exf_ok_count >= 3 open_count = len(eng.get("open_positions") or []) lev = float(eng.get("current_leverage", 0) or 0) abs_cap = float(eng.get("leverage_abs_cap", 9.0) or 9.0) trades_ex = int(eng.get("trades_executed") or 0) hb_ts = hb.get("ts") hb_ok = bool(hb_ts and (time.time() - hb_ts) < 30) # ── SIGNAL ROW ──────────────────────────────────────────────────────────── # vol_ok is the MASTER GATE — listed first. When False, _try_entry is never # called regardless of vel_div. BTC 50-bar realised vol must exceed p60=0.000264. s_items = [] # BTC vol — try to get live reading from exf or obf for display context btc_vol_str = "—" if exf: dvol_raw = exf.get("dvol_btc") or exf.get("dvol") fng_raw = exf.get("fng") if dvol_raw: btc_vol_str = f"dV:{float(dvol_raw):.0f}" if fng_raw: btc_vol_str += f" FnG:{float(fng_raw):.0f}" vol_label = f"vol_ok({btc_vol_str})" s_items.append(_item(vol_label, GREEN if vol_ok else RED, "✓" if vol_ok else f"✗ BLOCKED")) s_items.append(_vel_item(vel_div)) # posture gate pc = PC.get(posture, DIM) posture_ok = posture in ("APEX", "STALKER") s_items.append(_item("posture", GREEN if posture == "APEX" else (YELLOW if posture == "STALKER" else RED), posture)) # acb_ready s_items.append(_item("acb", GREEN if acb_ready else (ORANGE if acb_boost_val > 0 else RED), f"{acb_boost_val:.2f}")) # exf_ok — external factors pipeline s_items.append(_item("exf", GREEN if exf_ok else (YELLOW if exf_ok_count >= 1 else RED), f"{exf_ok_count}/5")) # halt gate s_items.append(_item("no_halt", GREEN if not halt else RED, "✓" if not halt else "HALT")) # heartbeat s_items.append(_item("hb", GREEN if hb_ok else RED, _age(hb_ts))) # ALL GREEN → fire indicator all_sig = signal_fired(vel_div, vol_ok, posture, acb_ready, exf_ok, halt) if all_sig: s_items.append(f" {GREEN}{BOLD}◉ SIGNAL{RST}") # ── TRADE ROW ───────────────────────────────────────────────────────────── # Additional gates that must pass before a matched signal becomes a fill t_items = [] # open positions t_items.append(_item("open_pos", GREEN if open_count == 0 else ORANGE, str(open_count))) # leverage headroom lev_pct = lev / abs_cap if abs_cap else 0 t_items.append(_item("lev", GREEN if lev_pct < 0.3 else (YELLOW if lev_pct < 0.7 else RED), f"{lev:.2f}x/{abs_cap:.0f}")) # regime_dd_halt t_items.append(_item("regime", GREEN if not halt else RED, "free" if not halt else "HALTED")) # Rm strength rm = float(safe.get("Rm", 0) or 0) t_items.append(_item("Rm", GREEN if rm >= 0.90 else (YELLOW if rm >= 0.70 else (ORANGE if rm >= 0.50 else RED)), f"{rm:.3f}")) # Cat5 (intraday drawdown contribution) c5 = float((safe.get("breakdown") or {}).get("Cat5", 1.0) or 1.0) t_items.append(_item("Cat5", GREEN if c5 >= 0.95 else (YELLOW if c5 >= 0.85 else (ORANGE if c5 >= 0.70 else RED)), f"{c5:.3f}")) # trades today t_items.append(_item("trades", GREEN if trades_ex < 20 else (YELLOW if trades_ex < 35 else ORANGE), str(trades_ex))) # ALL GREEN trade execute indicator daily_loss_ok = c5 > 0.50 # reasonable proxy — Cat5 tracks drawdown all_trade = all_sig and trade_can_execute(open_count, lev, abs_cap, daily_loss_ok, acb_boost_val) if all_trade: t_items.append(f" {CYAN}{BOLD}◉ TRADE{RST}") sig_row = " ".join(s_items) trade_row = " ".join(t_items) fill = fill_row(obf_universe or {}, acb, eng) return sig_row, trade_row, fill def render(hz): global START_CAP, CAP_PEAK eng = _get(hz, "DOLPHIN_STATE_BLUE", "engine_snapshot") cap = _get(hz, "DOLPHIN_STATE_BLUE", "capital_checkpoint") safe = _get(hz, "DOLPHIN_SAFETY", "latest") hb = _get(hz, "DOLPHIN_HEARTBEAT", "nautilus_flow_heartbeat") mh = _get(hz, "DOLPHIN_META_HEALTH", "latest") acb = _get(hz, "DOLPHIN_FEATURES", "acb_boost") exf = _get(hz, "DOLPHIN_FEATURES", "exf_latest") obf = _get(hz, "DOLPHIN_FEATURES", "obf_universe_latest") esof = _get(hz, "DOLPHIN_FEATURES", "esof_advisor_latest") now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") capital = float(eng.get("capital", 0) or cap.get("capital", 0)) posture = safe.get("posture") or eng.get("posture") or "?" rm = float(safe.get("Rm", 0)) hb_ts = hb.get("ts") phase = hb.get("phase", "?") trader_up = hb_ts and (time.time() - hb_ts) < 30 trades = eng.get("trades_executed", "—") scans = eng.get("scans_processed", "—") lev = float(eng.get("current_leverage", 0)) notional= float(eng.get("open_notional", 0)) mhs_st = mh.get("status", "?") rm_meta = float(mh.get("rm_meta", 0)) if capital > 0: if START_CAP is None: START_CAP = capital if CAP_PEAK is None or capital > CAP_PEAK: CAP_PEAK = capital roi = ((capital - START_CAP) / START_CAP * 100) if START_CAP and capital else 0 dd = ((CAP_PEAK - capital) / CAP_PEAK * 100) if CAP_PEAK and capital < CAP_PEAK else 0 pc = PC.get(posture, DIM) sc = SC.get(mhs_st, DIM) tc = GREEN if trader_up else RED roi_c = GREEN if roi >= 0 else RED dd_c = RED if dd > 15 else (YELLOW if dd > 5 else GREEN) sig_row, trade_row, fill_row_str = gear_rows(eng, safe, acb, exf, hb, obf) svc = mh.get("service_status", {}) hz_ks = mh.get("hz_key_status", {}) L = [] L.append(f"{BOLD}{CYAN}🐬 DOLPHIN-NAUTILUS{RST} {BOLD}v6{RST} {DIM}{now}{RST}") L.append("─" * 65) # TRADER L.append(f"{BOLD}TRADER{RST} {tc}{'● LIVE' if trader_up else '● DOWN'}{RST}" f" phase:{phase} hb:{_age(hb_ts)}" f" scan:#{eng.get('last_scan_number','?')}") # ── SIGNAL → FILL GEARS ─────────────────────────────────────────────────── vol_ok_live = bool(eng.get("vol_ok", False)) if not vol_ok_live: L.append(f" {RED}{BOLD}⛔ VOL_OK=FALSE — engine gate closed, NO trades until BTC vol > {VOL_P60:.6f}{RST}") L.append(f" {DIM}SIG │{RST} {sig_row}") L.append(f" {DIM}TRD │{RST} {trade_row}") L.append(f" {DIM}FIL │{RST} {fill_row_str}") # ── EsoF ADVISORY ───────────────────────────────────────────────────────── if esof: _ec = { "FAVORABLE": GREEN, "MILD_POSITIVE": "\033[92m", "NEUTRAL": YELLOW, "MILD_NEGATIVE": "\033[91m", "UNFAVORABLE": RED, } _lbl = esof.get("advisory_label", "?") _col = _ec.get(_lbl, DIM) _sc = float(esof.get("advisory_score", 0)) _sess = esof.get("session", "?") _dow = esof.get("dow_name", "?") _slot = esof.get("slot_15m", "?") _swr = esof.get("session_wr_pct", 0) _dwr = esof.get("dow_wr_pct", 0) _moon = esof.get("moon_phase", "?")[:8] _retro= " ☿RETRO" if esof.get("mercury_retrograde") else "" L.append(f" {DIM}EsoF│{RST} {_col}{_lbl:<13}{RST} sc:{_col}{_sc:+.3f}{RST} " f"sess:{_sess}({_swr:.0f}%) " f"dow:{_dow}({_dwr:.0f}%) " f"slot:{_slot} {DIM}{_moon}{_retro}{RST}") else: L.append(f" {DIM}EsoF│ (start esof_advisor.py for advisory){RST}") L.append("") # CAPITAL L.append(f"{BOLD}CAPITAL{RST} {CYAN}${capital:,.2f}{RST}" + (f" ROI:{roi_c}{roi:+.2f}%{RST} DD:{dd_c}{dd:.2f}%{RST}" f" start:${START_CAP:,.0f}" if START_CAP else "")) L.append(f" trades:{trades} scans:{scans} bar:{eng.get('bar_idx','?')}" f" lev:{lev:.2f}x notional:${notional:,.0f}") # Open positions + EXIT COMPARISON positions = eng.get("open_positions") or [] if positions: _pa = positions[0].get("asset", "") _lp = 0.0; _obi = 0.0 if _pa and obf: _oad = obf.get(_pa, {}) _ob_bid = float(_oad.get("best_bid", 0) or 0) _ob_ask = float(_oad.get("best_ask", 0) or 0) _lp = (_ob_bid + _ob_ask) / 2 if _ob_bid > 0 and _ob_ask > 0 else 0 _obi = float(_oad.get("imbalance", 0) or 0) L.append(f" {BOLD}OPEN:{RST}") for p in positions: sc2 = GREEN if p.get("side") == "LONG" else RED L.append(f" {sc2}{p.get('asset','?')} {p.get('side','?')}{RST}" f" qty:{p.get('quantity',0):.4f}" f" entry:{p.get('entry_price',0):.2f}" f" upnl:{p.get('unrealized_pnl',0):+.2f}") # ── EXIT COMPARISON: base engine vs V7 ────────────────────────────── _etr = _update_exit_tracker(positions, eng, _lp, _obi) _v7p = _v7_preview(_etr) if _v7p: bh = _v7p["bars_held"] bf = _v7p["bars_frac"] pp = _v7p["pnl_pct"] tp_pct = abs(pp) / FIXED_TP_PCT * 100 if FIXED_TP_PCT else 0 bf_bar = round(bf * 20) bc = GREEN if bf < 0.6 else (YELLOW if bf < 0.85 else RED) tc = GREEN if pp >= FIXED_TP_PCT else (YELLOW if tp_pct > 50 else DIM) vc = RED if _v7p["action"] == "EXIT" else (YELLOW if _v7p["action"] == "RETRACT" else GREEN) ps = "+" if _v7p["proj_usd"] >= 0 else "" L.append(f" {BOLD}EXIT CMP{RST} {DIM}bar{RST} {bc}{bh}/{MAX_HOLD_BARS} [{'\u2588'*bf_bar+'\u2591'*(20-bf_bar)}]{RST} {bf*100:.0f}%" f" {DIM}TP{RST} {tc}{abs(pp)*100:.3f}%/{FIXED_TP_PCT*100:.2f}% ({tp_pct:.0f}%){RST}") L.append(f" {vc}V7:{_v7p['action']:<8}{RST} P={_v7p['pressure']:.2f}" f" mae:{_v7p['mae']:.4f} mfe:{_v7p['mfe']:.4f}" f" {DIM}\u2192{RST} {ps}${_v7p['proj_usd']:.2f} ({pp*100:+.3f}%)" f" {DIM}[{_v7p['reason']}]{RST}") else: L.append(f" {DIM}no open positions{RST}") _EXIT_TRACKER.clear() L.append("") # POSTURE bd = safe.get("breakdown") or {} L.append(f"{BOLD}POSTURE{RST} {pc}{posture}{RST} Rm:{pc}{_bar(rm,20)}{RST} {rm:.4f}") cats = " ".join(f"C{i}:{float(bd.get(f'Cat{i}',0)):.2f}" for i in range(1,6)) L.append(f" {cats} f_env:{float(bd.get('f_env',0)):.3f} f_exe:{float(bd.get('f_exe',0)):.3f}") L.append("") # SYS HEALTH L.append(f"{BOLD}SYS HEALTH{RST} {sc}{mhs_st}{RST} rm_meta:{rm_meta:.3f}") for m in ("m1_data_infra","m1_trader","m2_heartbeat", "m3_data_freshness","m4_control_plane","m5_coherence"): v = float(mh.get(m, 0)) c = GREEN if v >= 0.9 else (YELLOW if v >= 0.5 else RED) L.append(f" {c}{m}:{v:.3f}{RST}") L.append(f" {DIM}services:{RST} " + " ".join( f"{'●' if st=='RUNNING' else f'{RED}●{RST}'}{DIM}{n.split(':')[-1]}{RST}" if st == "RUNNING" else f"{RED}●{DIM}{n.split(':')[-1]}{RST}" for n, st in sorted(svc.items()))) L.append(f" {DIM}hz_keys:{RST} " + " ".join( f"{GREEN if float(i.get('score',0))>=0.9 else (YELLOW if float(i.get('score',0))>=0.5 else RED)}●{RST}{DIM}{k}{RST}" for k, i in sorted(hz_ks.items()))) # ── LAST TRADES ────────────────────────────────────────────────────────── trades_hist = _last_n_trades(30) if trades_hist: L.append("") L.append(f"{BOLD}LAST TRADES{RST} {DIM}(from log){RST}") for t in trades_hist: pnl = float(t.get("net_pnl", 0)) _not = float(t.get("notional", 0)) pct = (pnl / _not * 100) if _not else float(t.get("pnl_pct", 0)) * 100 lev = float(t.get("leverage", 0)) ep = float(t.get("entry_price", 0)) reason = t.get("reason", "?") asset = t.get("asset", "?") bars = t.get("bars_held", 0) ts_raw = t.get("ts", "")[:16].replace("T", " ") pc2 = GREEN if pnl >= 0 else RED L.append( f" {pc2}{'▲' if pnl>=0 else '▼'}{RST}" f" {asset:<12} " f"ep:{ep:.4g} " f"lev:{lev:.2f}x " f"pnl:{pc2}{pnl:+.2f}({pct:+.2f}%){RST} " f"exit:{reason} bars:{bars} {DIM}{ts_raw}{RST}" ) else: L.append(f" {DIM}no completed trades in log yet{RST}") L.append("") L.append(f"{DIM}v6 • 0.5s poll • CH→status_snapshots • Ctrl-C quit{RST}") # ── CH persistence ───────────────────────────────────────────────────────── # Write every other cycle (1s effective rate) to avoid CH noise if int(time.time() * 2) % 2 == 0: ch_put({ "ts": int(time.time() * 1000), "capital": capital, "roi_pct": round(roi, 4), "dd_pct": round(dd, 4), "trades_executed": int(eng.get("trades_executed", 0) or 0), "posture": posture, "rm": round(rm, 6), "vel_div": round(float(eng.get("last_vel_div", 0) or 0), 6), "vol_ok": 1 if eng.get("vol_ok") else 0, "phase": phase, "mhs_status": mhs_st, "boost": round(float(acb.get("boost", 1.0) if acb else 1.0), 4), "cat5": round(float((safe.get("breakdown") or {}).get("Cat5", 1.0) or 1.0), 6), }) return "\n".join(L) def main(): print("Connecting to HZ...") hz = hazelcast.HazelcastClient( cluster_name="dolphin", cluster_members=["localhost:5701"], connection_timeout=5.0) print("Connected.\n") try: while True: try: sys.stdout.write(CLEAR + render(hz) + "\n") sys.stdout.flush() except Exception as e: sys.stdout.write(f"\n{RED}render error: {e}{RST}\n") sys.stdout.flush() time.sleep(0.5) except KeyboardInterrupt: print(f"\n{DIM}Bye.{RST}") finally: hz.shutdown() if __name__ == "__main__": main()