Files
DOLPHIN/Observability/dolphin_status.py
hjnormey 01c19662cb initial: import DOLPHIN baseline 2026-04-21 from dolphinng5_predict working tree
Includes core prod + GREEN/BLUE subsystems:
- prod/ (BLUE harness, configs, scripts, docs)
- nautilus_dolphin/ (GREEN Nautilus-native impl + dvae/ preserved)
- adaptive_exit/ (AEM engine + models/bucket_assignments.pkl)
- Observability/ (EsoF advisor, TUI, dashboards)
- external_factors/ (EsoF producer)
- mc_forewarning_qlabs_fork/ (MC regime/envelope)

Excludes runtime caches, logs, backups, and reproducible artifacts per .gitignore.
2026-04-21 16:58:38 +02:00

774 lines
31 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""DOLPHIN live status — v6
0.5s poll. SIG/TRD/FIL gear rows + last-5-trades + CH persistence + V7 exit cmp.
Run: source /home/dolphin/siloqy_env/bin/activate && python dolphin_status.py
Quit: Ctrl-C
"""
# v1-v5 archived as dolphin_status_v{1..5}.py
# v6: exit-comparison overlay (V7 preview), net-pnl pct fix
import json, re, threading, time, sys, urllib.request, urllib.parse
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
import hazelcast
# ── ClickHouse fire-and-forget write ─────────────────────────────────────────
# Base URL of the ClickHouse HTTP interface that _ch_worker() posts snapshots to.
_CH_URL = "http://localhost:8123"
_CH_USER = "dolphin"
# NOTE(review): the password is committed in plain text — consider loading it
# from the environment or a secrets file instead.
_CH_PASS = "dolphin_ch_2026"
# Bounded hand-off queue between ch_put() and the background writer. With
# maxlen=500, appending to a full deque silently discards the oldest row.
_CH_Q: deque = deque(maxlen=500)
def _ch_worker():
    """Background loop that flushes queued status rows to ClickHouse.

    Every 2 seconds the queue is drained; if anything was pending, the rows are
    sent as one ``INSERT INTO status_snapshots FORMAT JSONEachRow`` HTTP POST.
    Any failure (serialisation, network, HTTP error) is swallowed on purpose:
    that batch is dropped and the loop keeps running, because persistence of
    the status display is non-critical.  This function never returns.
    """
    while True:
        time.sleep(2)
        rows = []
        while _CH_Q:
            try:
                rows.append(_CH_Q.popleft())
            except IndexError:  # queue emptied between the check and the pop
                break
        if not rows:
            continue
        try:
            # Serialisation lives inside the try so that one unserialisable row
            # cannot raise out of the loop and silently kill the writer thread.
            body = "\n".join(json.dumps(r) for r in rows).encode()
            url = f"{_CH_URL}/?database=dolphin&query=INSERT+INTO+status_snapshots+FORMAT+JSONEachRow"
            req = urllib.request.Request(url, data=body, method="POST")
            req.add_header("X-ClickHouse-User", _CH_USER)
            req.add_header("X-ClickHouse-Key", _CH_PASS)
            req.add_header("Content-Type", "application/octet-stream")
            # Close the response explicitly; otherwise every flush leaks a socket
            # until the garbage collector happens to reclaim it.
            with urllib.request.urlopen(req, timeout=4):
                pass
        except Exception:
            pass  # observability is non-critical
# Started at import time. daemon=True so the writer never keeps the interpreter
# alive once the main thread exits.
threading.Thread(target=_ch_worker, daemon=True, name="ch-status").start()
def ch_put(row: dict):
    """Queue one snapshot row for the background ClickHouse writer.

    Only appends to the in-memory queue, so it never blocks on the network.
    The row is sent by ``_ch_worker`` on its next flush.
    """
    _CH_Q.append(row)
# ── Trade log parser ──────────────────────────────────────────────────────────
# Supervisor log file that _last_n_trades() scans for ENTRY/EXIT lines.
_TRADER_LOG = Path("/mnt/dolphinng5_predict/prod/supervisor/logs/nautilus_trader.log")
# Group 1 is the bracketed prefix before ENTRY/EXIT (stored as the trade's
# timestamp by _last_n_trades), group 2 is the dict payload.
# The lazy `.+?` stops at the first `}` that is followed only by an optional
# trailing tag and end of line.
# Lines may have a trailing tag like [v2_gold_fix_v50-v750] after the dict.
_RE_ENTRY = re.compile(r"\[(.+?)\] ENTRY: (\{.+?\})(?:\s*\[.*\])?$")
_RE_EXIT = re.compile(r"\[(.+?)\] EXIT: (\{.+?\})(?:\s*\[.*\])?$")
def _parse_log_dict(raw: str) -> dict:
"""Parse a Python dict repr from a log line. Handles nan and single-quoted strings."""
import ast
# Replace nan/inf with JSON-safe equivalents before parsing
cleaned = raw.replace(": nan", ": null").replace(": inf", ": null").replace(": -inf", ": null")
try:
return ast.literal_eval(raw) # handles all Python literal forms incl. nan
except Exception:
pass
try:
return json.loads(cleaned.replace("'", '"'))
except Exception:
raise ValueError(f"unparseable: {raw[:80]}")
def _last_n_trades(n=5):
    """Return the last ``n`` completed trades found in the supervisor log.

    Only the final 4000 lines of ``_TRADER_LOG`` are scanned.  An ENTRY line is
    remembered under its ``trade_id``; when an EXIT line with the same id
    appears, the two are merged into one dict holding the entry fields plus
    ``ts`` (entry line prefix), ``exit_ts``, ``reason``, ``pnl_pct``,
    ``net_pnl`` and ``bars_held``.  Entries without a matching exit and lines
    that fail to parse are skipped.

    Returns:
        A list of trade dicts, oldest first.  Empty when ``n <= 0``, when the
        log cannot be read, or when no completed trade is found.
    """
    if n <= 0:
        # Guard: ``trades[-0:]`` would return *every* trade rather than none.
        return []
    try:
        lines = _TRADER_LOG.read_text(errors="replace").splitlines()[-4000:]
    except Exception:
        return []

    open_entries = {}
    completed = []
    for line in lines:
        entry_m = _RE_ENTRY.search(line)
        if entry_m:
            try:
                payload = _parse_log_dict(entry_m.group(2))
                open_entries[payload["trade_id"]] = {"ts": entry_m.group(1), **payload}
            except Exception:
                pass  # malformed ENTRY line: ignore it
        exit_m = _RE_EXIT.search(line)
        if exit_m:
            try:
                payload = _parse_log_dict(exit_m.group(2))
                tid = payload.get("trade_id")
                if tid and tid in open_entries:
                    entry = open_entries.pop(tid)
                    completed.append({
                        **entry,
                        "exit_ts": exit_m.group(1),
                        "reason": payload.get("reason", "?"),
                        "pnl_pct": payload.get("pnl_pct", 0),
                        "net_pnl": payload.get("net_pnl", 0),
                        "bars_held": payload.get("bars_held", 0),
                    })
            except Exception:
                pass  # malformed EXIT line: ignore it
    return completed[-n:]
# ANSI escape sequences: clear screen + cursor home, text attributes, colours.
CLEAR = "\033[2J\033[H"
BOLD = "\033[1m"; DIM = "\033[2m"; RST = "\033[0m"
GREEN = "\033[32m"; YELLOW = "\033[33m"; RED = "\033[31m"; CYAN = "\033[36m"
ORANGE = "\033[38;5;208m"
# Posture name → display colour.
PC = {"APEX": GREEN, "STALKER": YELLOW, "TURTLE": ORANGE, "HIBERNATE": RED}
# Meta-health status → display colour.
SC = {"GREEN": GREEN, "DEGRADED": YELLOW, "CRITICAL": ORANGE, "DEAD": RED}
# Thresholds from nautilus_event_trader.py
VEL_DIV_THRESHOLD = -0.020 # signal fires when vel_div < this
VEL_DIV_EXTREME = -0.050 # extreme bearish
VEL_DIV_WARN = -0.010 # approaching threshold (yellow)
VEL_DIV_CLOSE = -0.015 # nearly there (orange→yellow)
VOL_P60 = 0.00026414 # BTC 50-bar realised vol p60 — MASTER GATE
BTC_VOL_WINDOW = 50 # bars used for vol calc
FIXED_TP_PCT = 0.0095 # BLUE TP target (0.95%)
MAX_HOLD_BARS = 250 # BLUE max hold bars
# Session baselines maintained by render(): START_CAP is the first positive
# capital reading seen, CAP_PEAK the running maximum (used for ROI / drawdown).
START_CAP = None
CAP_PEAK = None
_EXIT_TRACKER: dict = {} # (asset, entry_price) → accumulated V7 comparison state
def _age(ts):
if not ts: return "?"
s = time.time() - ts
if s < 0: return "0s"
if s < 60: return f"{s:.0f}s"
if s < 3600: return f"{s/60:.0f}m"
return f"{s/3600:.1f}h"
def _bar(v, w=20):
v = max(0.0, min(1.0, v))
return "" * round(v * w) + "" * (w - round(v * w))
def _get(hz, map_name, key):
try:
raw = hz.get_map(map_name).blocking().get(key)
return json.loads(raw) if raw else {}
except Exception:
return {}
# ── Gear items ────────────────────────────────────────────────────────────────
# Each helper below returns one pre-formatted, ANSI-coloured string.
def _item(label, color, val=""):
    """Format one gear-row item: colour marker, dimmed label, optional ``:val``.

    NOTE(review): nothing is emitted between the colour code and the reset, so
    ``color`` has no visible effect as written — presumably a marker glyph
    belongs there; confirm against the original file.
    """
    suffix = f":{val}" if val else ""
    return f"{color}{RST}{DIM}{label}{suffix}{RST}"
def _vel_item(vel_div):
    """Format the ``vel_div`` item, coloured by its distance to the signal threshold.

    Green at or below ``VEL_DIV_THRESHOLD`` (the signal zone, including the
    extreme band), yellow down to ``VEL_DIV_CLOSE``, orange down to
    ``VEL_DIV_WARN``, red for everything else (mildly negative or positive).
    """
    text = f"{vel_div:+.4f}"
    # Checked from most to least bearish; the first matching band wins.
    bands = (
        (VEL_DIV_EXTREME, GREEN),    # extremely bearish
        (VEL_DIV_THRESHOLD, GREEN),  # past threshold
        (VEL_DIV_CLOSE, YELLOW),     # -0.015 to -0.020: close
        (VEL_DIV_WARN, ORANGE),      # -0.010 to -0.015: approaching
    )
    for upper_bound, color in bands:
        if vel_div <= upper_bound:
            return _item("vel_div", color, text)
    return _item("vel_div", RED, text)  # negative but far, or not bearish at all
def signal_fired(vel_div, vol_ok, posture, acb_ready, exf_ok, halt):
    """Return a truthy value only when every signal precondition holds.

    Requires ``vel_div`` at or below ``VEL_DIV_THRESHOLD``, the volatility gate
    open, a posture other than HIBERNATE/TURTLE, ACB ready, external factors
    OK, and no halt.
    """
    if not (vel_div <= VEL_DIV_THRESHOLD):
        return False
    posture_allows = posture not in ("HIBERNATE", "TURTLE")
    return vol_ok and posture_allows and acb_ready and exf_ok and not halt
def trade_can_execute(open_count, lev, abs_cap, daily_loss_ok, boost):
    """Return a truthy value when the trade-side gates allow a new position.

    All of these must hold: no position is open, current leverage is strictly
    below the absolute cap, the daily-loss check passed, and ``boost`` is
    positive.
    """
    if open_count != 0:
        return False  # a position is already open
    if not (lev < abs_cap):
        return False  # no leverage headroom
    return daily_loss_ok and boost > 0
# fill_row() colours the imbalance item green only when the best candidate's
# imbalance is below this value.
OB_IMBALANCE_BIAS = -0.09 # from engine config: ob_imbalance_bias
def _best_fill_candidate(obf_universe):
"""Pick best SHORT candidate from OBF universe.
Criteria: negative imbalance (bearish pressure) + high fill_probability + low spread.
Returns (symbol, asset_dict) or (None, {}).
"""
candidates = []
for k, v in obf_universe.items():
if not isinstance(v, dict) or "fill_probability" not in v:
continue
candidates.append((k, v))
if not candidates:
return None, {}
# Score: fill_prob * (1 + bearish_imbalance_bonus) / (1 + spread_bps/10)
def score(item):
sym, a = item
imb = float(a.get("imbalance", 0))
fp = float(a.get("fill_probability", 0))
sp = float(a.get("spread_bps", 99))
dq = float(a.get("depth_quality", 0))
# Bearish bias: reward negative imbalance, penalise positive
imb_bonus = max(0.0, -imb) # 0..1 for imbalance in [-1,0]
return fp * (1 + imb_bonus) * dq / max(0.1, sp)
candidates.sort(key=score, reverse=True)
return candidates[0]
def _update_exit_tracker(positions, eng, live_price, ob_imbalance):
    """Accumulate MAE/MFE state for the V7 comparison from live OBF prices.

    Only the first open position is tracked, keyed by ``(asset, entry_price)``.
    A new key resets the tracker and records the current ``bar_idx`` as the
    entry bar.  Each call then updates the current and peak adverse/favourable
    excursions (as fractions of the entry price), their per-call change
    (``mae_velocity`` / ``mfe_velocity``), bars held and the live PnL fraction.

    Args:
        positions: list of open-position dicts from the engine snapshot.
        eng: engine snapshot dict; only ``bar_idx`` is read.
        live_price: current price of the tracked asset, ``<= 0`` if unknown.
        ob_imbalance: current order-book imbalance, stored for display.

    Returns:
        The tracker dict for the position, or ``None`` when there is no
        position, no usable price, or no valid entry price.
    """
    if not positions:
        _EXIT_TRACKER.clear()
        return None
    if live_price <= 0:
        # No usable quote this cycle. Keep the accumulated state: clearing here
        # would reset peak MAE/MFE and the entry bar for a still-open position.
        return None

    pos = positions[0]
    asset = pos.get("asset", "?")
    ep = float(pos.get("entry_price", 0) or 0)
    side = pos.get("side", "SHORT")
    if ep <= 0:
        return None

    key = (asset, ep)
    bar_idx = int(eng.get("bar_idx", 0) or 0)
    if key not in _EXIT_TRACKER:
        # A different position is open now: drop state of the previous one.
        _EXIT_TRACKER.clear()
        _EXIT_TRACKER[key] = {
            "entry_bar": bar_idx, "first_seen": time.time(),
            "peak_adverse": 0.0, "peak_favorable": 0.0,
            "prev_mae": 0.0, "prev_mfe": 0.0,
            "mae_velocity": 0.0, "mfe_velocity": 0.0,
            "prices": deque(maxlen=60),
        }
    t = _EXIT_TRACKER[key]
    t["prices"].append(live_price)
    t["notional"] = float(pos.get("notional", 0) or 0)
    t["unrealized_pnl"] = float(pos.get("unrealized_pnl", 0) or 0)

    # For a SHORT a rising price is adverse; for any other side a falling one is.
    if side == "SHORT":
        pnl = (ep - live_price) / ep
        mae = max(0.0, (live_price - ep) / ep)
        mfe = max(0.0, (ep - live_price) / ep)
    else:
        pnl = (live_price - ep) / ep
        mae = max(0.0, (ep - live_price) / ep)
        mfe = max(0.0, (live_price - ep) / ep)

    t["peak_adverse"] = max(t["peak_adverse"], mae)
    t["peak_favorable"] = max(t["peak_favorable"], mfe)
    # Velocity = change since the previous call (one poll cycle, not one bar).
    t["mae_velocity"] = mae - t["prev_mae"]
    t["mfe_velocity"] = mfe - t["prev_mfe"]
    t["prev_mae"] = mae
    t["prev_mfe"] = mfe
    t["bars_held"] = max(0, bar_idx - t["entry_bar"])
    t["pnl_pct"] = pnl
    t["live_price"] = live_price
    t["ob_imbalance"] = ob_imbalance
    t["entry_price"] = ep
    t["side"] = side
    t["asset"] = asset
    return t
def _v7_preview(t):
    """Simplified V7 exit decision computed from the exit-tracker state.

    Combines an MAE channel (peak adverse excursion floors, adverse
    acceleration, late-stage time weighting) and an MFE channel (give-back from
    the favourable peak) into a single pressure figure, then maps it to an
    action.

    Args:
        t: tracker dict as returned by ``_update_exit_tracker`` (or falsy).

    Returns:
        ``None`` when ``t`` is falsy, otherwise a dict with ``action``
        (EXIT / RETRACT / EXTEND / HOLD), ``reason``, ``pressure``, peak
        ``mae`` / ``mfe``, the two risk scores, ``bars_held``, ``bars_frac``,
        ``pnl_pct`` and ``proj_usd`` (PnL fraction times notional).
    """
    if not t:
        return None

    bars_held = t.get("bars_held", 0)
    bars_frac = min(1.0, bars_held / MAX_HOLD_BARS) if MAX_HOLD_BARS else 0
    peak_adv = t["peak_adverse"]
    peak_fav = t["peak_favorable"]
    cur_mae = t["prev_mae"]
    cur_mfe = t["prev_mfe"]
    pnl = t.get("pnl_pct", 0)

    # MAE risk — V7 floor thresholds (without vol-adaptive since TUI lacks full history)
    mae_risk = 0.0
    for floor, weight in ((0.005, 0.5), (0.012, 0.8), (0.020, 1.2)):
        if peak_adv > floor:
            mae_risk += weight
    # MAE-B: adverse acceleration
    if bars_held >= 3 and t["mae_velocity"] > 0 and cur_mae > 0.003:
        mae_risk += 0.6
    # MAE-D: late-stage time-weighted
    if cur_mae > 0.003 and bars_frac > 0.60:
        mae_risk += (bars_frac - 0.60) / 0.40 * 0.4

    # MFE risk — convexity decay: share of the favourable peak already given back
    mfe_risk = 0.0
    if peak_fav > 0:
        decay = (peak_fav - cur_mfe) / (peak_fav + 1e-9)
    else:
        decay = 0.0
    if decay > 0.35 and t["mfe_velocity"] < 0 and peak_fav > 0.01:
        mfe_risk += 1.5
    if decay > 0.20:
        mfe_risk += 0.3

    # Exit pressure (simplified: MAE + MFE channels weighted as V7)
    pressure = 2.0 * mae_risk + 2.5 * mfe_risk
    if bars_frac > 0.80 and pnl < 0:
        pressure += 0.5
    if bars_frac > 0.95:
        pressure += 1.0

    # Decision (mirrors V7 thresholds)
    if pressure > 2.0:
        action = "EXIT"
        reason = "V7_MAE_SL" if mae_risk > mfe_risk else "V7_COMPOSITE"
    elif pressure > 1.0:
        action, reason = "RETRACT", "V7_RISK_DOM"
    elif pressure < -0.5 and pnl > 0:
        # NOTE(review): every term added to pressure above is non-negative, so
        # this branch cannot fire in this simplified preview.
        action, reason = "EXTEND", "V7_DIR_EDGE"
    else:
        action, reason = "HOLD", "\u2014"

    return {
        "action": action, "reason": reason, "pressure": pressure,
        "mae": peak_adv, "mfe": peak_fav, "mae_risk": mae_risk, "mfe_risk": mfe_risk,
        "bars_held": bars_held, "bars_frac": bars_frac, "pnl_pct": pnl,
        "proj_usd": pnl * t.get("notional", 0),
    }
def fill_row(obf_universe, acb, eng):
    """Row 3 (FIL): signal → asset-pick → OBF liquidity → size → ORDER.

    Args:
        obf_universe: OBF universe dict (per-asset dicts plus ``_n_assets`` /
            ``_n_stale`` counters); may be empty.
        acb: ACB dict; ``boost`` and ``beta`` are read.
        eng: engine snapshot dict; ``open_positions``, ``current_leverage`` and
            ``leverage_abs_cap`` are read.

    Returns:
        The formatted row as a single string.
    """
    def tier(value, good, fair, low_color=RED):
        # Three-step colour scale for "higher is better" metrics.
        if value >= good:
            return GREEN
        return YELLOW if value >= fair else low_color

    parts = []

    # ── Asset picker (IRP/ARS) ─────────────────────────────────────────────
    n_assets = int(obf_universe.get("_n_assets", 0) if obf_universe else 0)
    n_stale = int(obf_universe.get("_n_stale", 0) if obf_universe else 0)
    n_fresh = n_assets - n_stale
    parts.append(_item("universe", tier(n_fresh, 200, 50), f"{n_fresh}/{n_assets}"))

    sym, book = _best_fill_candidate(obf_universe)
    if sym:
        fill_p = float(book.get("fill_probability", 0))
        spread = float(book.get("spread_bps", 99))
        dq = float(book.get("depth_quality", 0))
        imb = float(book.get("imbalance", 0))
        depth = float(book.get("depth_1pct_usd", 0))

        # Best candidate asset
        parts.append(_item("best", tier(fill_p, 0.80, 0.50), sym[:6]))
        # OBF: fill probability
        parts.append(_item("fill_p", tier(fill_p, 0.85, 0.60), f"{fill_p:.2f}"))
        # OBF: spread (lower is better, so the scale is inverted)
        if spread <= 3:
            spread_color = GREEN
        elif spread <= 8:
            spread_color = YELLOW
        else:
            spread_color = RED
        parts.append(_item("spread", spread_color, f"{spread:.1f}bps"))
        # OBF: depth quality
        parts.append(_item("depth_q", tier(dq, 0.5, 0.1), f"{dq:.2f}"))
        # OBF: imbalance direction (SHORT needs bearish = negative)
        if imb < OB_IMBALANCE_BIAS:  # confirmed bearish pressure
            imb_color = GREEN
        elif imb < 0:
            imb_color = YELLOW
        elif imb < 0.1:
            imb_color = ORANGE
        else:
            imb_color = RED
        parts.append(_item("imb", imb_color, f"{imb:+.2f}"))
        # OBF: depth USD
        parts.append(_item("depth", tier(depth, 50_000, 10_000), f"${depth/1000:.0f}k"))
    else:
        parts.append(_item("OBF", RED, "no data"))

    # ── Sizing — ACB boost × proxy_B prank ────────────────────────────────
    # proxy_B prank not exposed in HZ snapshot; show ACB boost as sizing proxy
    boost = float(acb.get("boost", 1.0) if acb else 1.0)
    beta = float(acb.get("beta", 0.8) if acb else 0.8)
    parts.append(_item("acb_boost", tier(boost, 1.5, 1.0, ORANGE), f"×{boost:.2f}"))
    parts.append(_item("beta", tier(beta, 0.7, 0.4), f"{beta:.2f}"))

    # ── ORDER indicator ────────────────────────────────────────────────────
    # Would an order fire if signal were green right now?
    open_count = len(eng.get("open_positions") or [])
    lev = float(eng.get("current_leverage", 0) or 0)
    abs_c = float(eng.get("leverage_abs_cap", 9.0) or 9.0)
    # bool(sym) short-circuits first: fill_p only exists when a candidate was found.
    order_ready = (
        bool(sym)
        and fill_p >= 0.60
        and open_count == 0
        and lev < abs_c
        and boost > 0
    )
    if order_ready:
        parts.append(f" {CYAN}{BOLD}◉ ORDER READY{RST}")
    else:
        parts.append(f" {DIM}(order: waiting){RST}")
    return " ".join(parts)
def gear_rows(eng, safe, acb, exf, hb, obf_universe=None):
    """Return three formatted rows: SIGNAL, TRADE gates, FILL path.

    Args:
        eng: engine snapshot dict (``last_vel_div``, ``vol_ok``, ``posture``,
            ``open_positions``, ``current_leverage``, ``leverage_abs_cap``,
            ``trades_executed`` are read).
        safe: safety dict (``posture``, ``Rm``, ``breakdown.Cat5``).
        acb: ACB dict (``boost``, falling back to ``cut``).
        exf: external-factors dict (``_ok_count``, ``dvol_btc``/``dvol``, ``fng``).
        hb: heartbeat dict (``ts``).
        obf_universe: OBF universe dict passed through to ``fill_row``.

    Returns:
        ``(sig_row, trade_row, fill_row)`` — three display strings.
    """
    vel_div = float(eng.get("last_vel_div", 0) or 0)
    vol_ok = bool(eng.get("vol_ok", False))
    posture = safe.get("posture") or eng.get("posture") or "?"
    halt = posture in ("HIBERNATE", "TURTLE")
    acb_boost_val = float(acb.get("boost", acb.get("cut", 0)) or 0)
    acb_ready = acb_boost_val > 0  # cut=0 means blocked
    exf_ok_count = int(exf.get("_ok_count", 0) if exf else 0)
    exf_ok = exf_ok_count >= 3
    open_count = len(eng.get("open_positions") or [])
    lev = float(eng.get("current_leverage", 0) or 0)
    abs_cap = float(eng.get("leverage_abs_cap", 9.0) or 9.0)
    trades_ex = int(eng.get("trades_executed") or 0)
    hb_ts = hb.get("ts")
    hb_ok = bool(hb_ts and (time.time() - hb_ts) < 30)

    # ── SIGNAL ROW ────────────────────────────────────────────────────────────
    # vol_ok is the MASTER GATE — listed first. When False, _try_entry is never
    # called regardless of vel_div. BTC 50-bar realised vol must exceed p60=0.000264.
    s_items = []
    # BTC vol — try to get live reading from exf or obf for display context
    btc_vol_str = ""
    if exf:
        dvol_raw = exf.get("dvol_btc") or exf.get("dvol")
        fng_raw = exf.get("fng")
        if dvol_raw:
            btc_vol_str = f"dV:{float(dvol_raw):.0f}"
        if fng_raw:
            btc_vol_str += f" FnG:{float(fng_raw):.0f}"
    vol_label = f"vol_ok({btc_vol_str})"
    s_items.append(_item(vol_label,
                         GREEN if vol_ok else RED,
                         "" if vol_ok else "✗ BLOCKED"))
    s_items.append(_vel_item(vel_div))
    # posture gate
    s_items.append(_item("posture",
                         GREEN if posture == "APEX" else (YELLOW if posture == "STALKER" else RED),
                         posture))
    # acb_ready — acb_ready is exactly (acb_boost_val > 0), so only two colours apply
    s_items.append(_item("acb",
                         GREEN if acb_ready else RED,
                         f"{acb_boost_val:.2f}"))
    # exf_ok — external factors pipeline
    s_items.append(_item("exf",
                         GREEN if exf_ok else (YELLOW if exf_ok_count >= 1 else RED),
                         f"{exf_ok_count}/5"))
    # halt gate
    s_items.append(_item("no_halt",
                         GREEN if not halt else RED,
                         "" if not halt else "HALT"))
    # heartbeat
    s_items.append(_item("hb",
                         GREEN if hb_ok else RED,
                         _age(hb_ts)))
    # ALL GREEN → fire indicator
    all_sig = signal_fired(vel_div, vol_ok, posture, acb_ready, exf_ok, halt)
    if all_sig:
        s_items.append(f" {GREEN}{BOLD}◉ SIGNAL{RST}")

    # ── TRADE ROW ─────────────────────────────────────────────────────────────
    # Additional gates that must pass before a matched signal becomes a fill
    t_items = []
    # open positions
    t_items.append(_item("open_pos",
                         GREEN if open_count == 0 else ORANGE,
                         str(open_count)))
    # leverage headroom
    lev_pct = lev / abs_cap if abs_cap else 0
    t_items.append(_item("lev",
                         GREEN if lev_pct < 0.3 else (YELLOW if lev_pct < 0.7 else RED),
                         f"{lev:.2f}x/{abs_cap:.0f}"))
    # regime_dd_halt
    t_items.append(_item("regime",
                         GREEN if not halt else RED,
                         "free" if not halt else "HALTED"))
    # Rm strength
    rm = float(safe.get("Rm", 0) or 0)
    t_items.append(_item("Rm",
                         GREEN if rm >= 0.90 else (YELLOW if rm >= 0.70 else (ORANGE if rm >= 0.50 else RED)),
                         f"{rm:.3f}"))
    # Cat5 (intraday drawdown contribution)
    c5 = float((safe.get("breakdown") or {}).get("Cat5", 1.0) or 1.0)
    t_items.append(_item("Cat5",
                         GREEN if c5 >= 0.95 else (YELLOW if c5 >= 0.85 else (ORANGE if c5 >= 0.70 else RED)),
                         f"{c5:.3f}"))
    # trades today
    t_items.append(_item("trades",
                         GREEN if trades_ex < 20 else (YELLOW if trades_ex < 35 else ORANGE),
                         str(trades_ex)))
    # ALL GREEN trade execute indicator
    daily_loss_ok = c5 > 0.50  # reasonable proxy — Cat5 tracks drawdown
    all_trade = all_sig and trade_can_execute(open_count, lev, abs_cap, daily_loss_ok, acb_boost_val)
    if all_trade:
        t_items.append(f" {CYAN}{BOLD}◉ TRADE{RST}")

    sig_row = " ".join(s_items)
    trade_row = " ".join(t_items)
    fill = fill_row(obf_universe or {}, acb, eng)
    return sig_row, trade_row, fill
def render(hz):
    """Build one complete dashboard frame from the current Hazelcast state.

    Reads the engine, capital, safety, heartbeat, meta-health and feature maps
    via ``_get``, formats the TRADER / gear rows / EsoF / CAPITAL / open
    position + exit comparison / POSTURE / SYS HEALTH / LAST TRADES sections,
    and queues a compact snapshot row for ClickHouse.

    Side effects: updates the module-level ``START_CAP`` / ``CAP_PEAK``
    baselines, updates or clears ``_EXIT_TRACKER``, and calls ``ch_put``.

    Numeric fields are read as ``value or 0`` so that a key holding null shows
    as zero instead of raising and blanking the whole frame.

    Args:
        hz: connected Hazelcast client.

    Returns:
        The frame as one newline-joined string (without the clear-screen code).
    """
    global START_CAP, CAP_PEAK
    eng = _get(hz, "DOLPHIN_STATE_BLUE", "engine_snapshot")
    cap = _get(hz, "DOLPHIN_STATE_BLUE", "capital_checkpoint")
    safe = _get(hz, "DOLPHIN_SAFETY", "latest")
    hb = _get(hz, "DOLPHIN_HEARTBEAT", "nautilus_flow_heartbeat")
    mh = _get(hz, "DOLPHIN_META_HEALTH", "latest")
    acb = _get(hz, "DOLPHIN_FEATURES", "acb_boost")
    exf = _get(hz, "DOLPHIN_FEATURES", "exf_latest")
    obf = _get(hz, "DOLPHIN_FEATURES", "obf_universe_latest")
    esof = _get(hz, "DOLPHIN_FEATURES", "esof_advisor_latest")

    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    capital = float(eng.get("capital", 0) or cap.get("capital", 0) or 0)
    posture = safe.get("posture") or eng.get("posture") or "?"
    rm = float(safe.get("Rm", 0) or 0)
    hb_ts = hb.get("ts")
    phase = hb.get("phase", "?")
    trader_up = hb_ts and (time.time() - hb_ts) < 30
    trades = eng.get("trades_executed", "")
    scans = eng.get("scans_processed", "")
    lev = float(eng.get("current_leverage", 0) or 0)
    notional = float(eng.get("open_notional", 0) or 0)
    mhs_st = mh.get("status", "?")
    rm_meta = float(mh.get("rm_meta", 0) or 0)

    # Session baselines: first positive capital seen, and the running peak.
    if capital > 0:
        if START_CAP is None:
            START_CAP = capital
        if CAP_PEAK is None or capital > CAP_PEAK:
            CAP_PEAK = capital
    roi = ((capital - START_CAP) / START_CAP * 100) if START_CAP and capital else 0
    dd = ((CAP_PEAK - capital) / CAP_PEAK * 100) if CAP_PEAK and capital < CAP_PEAK else 0

    pc = PC.get(posture, DIM)
    sc = SC.get(mhs_st, DIM)
    tc = GREEN if trader_up else RED
    roi_c = GREEN if roi >= 0 else RED
    dd_c = RED if dd > 15 else (YELLOW if dd > 5 else GREEN)

    sig_row, trade_row, fill_row_str = gear_rows(eng, safe, acb, exf, hb, obf)
    svc = mh.get("service_status", {})
    hz_ks = mh.get("hz_key_status", {})

    out = []
    out.append(f"{BOLD}{CYAN}🐬 DOLPHIN-NAUTILUS{RST} {BOLD}v6{RST} {DIM}{now}{RST}")
    # NOTE(review): the repeated string is empty in the reviewed source, so this
    # adds a blank line — presumably a rule glyph belongs here; confirm.
    out.append("" * 65)

    # TRADER
    out.append(f"{BOLD}TRADER{RST} {tc}{'● LIVE' if trader_up else '● DOWN'}{RST}"
               f" phase:{phase} hb:{_age(hb_ts)}"
               f" scan:#{eng.get('last_scan_number','?')}")

    # ── SIGNAL → FILL GEARS ───────────────────────────────────────────────────
    vol_ok_live = bool(eng.get("vol_ok", False))
    if not vol_ok_live:
        out.append(f" {RED}{BOLD}⛔ VOL_OK=FALSE — engine gate closed, NO trades until BTC vol > {VOL_P60:.6f}{RST}")
    out.append(f" {DIM}SIG │{RST} {sig_row}")
    out.append(f" {DIM}TRD │{RST} {trade_row}")
    out.append(f" {DIM}FIL │{RST} {fill_row_str}")

    # ── EsoF ADVISORY ─────────────────────────────────────────────────────────
    if esof:
        _ec = {
            "FAVORABLE": GREEN, "MILD_POSITIVE": "\033[92m",
            "NEUTRAL": YELLOW, "MILD_NEGATIVE": "\033[91m",
            "UNFAVORABLE": RED,
        }
        _lbl = esof.get("advisory_label", "?")
        _col = _ec.get(_lbl, DIM)
        _sc = float(esof.get("advisory_score", 0) or 0)
        _sess = esof.get("session", "?")
        _dow = esof.get("dow_name", "?")
        _slot = esof.get("slot_15m", "?")
        _swr = float(esof.get("session_wr_pct", 0) or 0)
        _dwr = float(esof.get("dow_wr_pct", 0) or 0)
        _moon = str(esof.get("moon_phase") or "?")[:8]
        _retro = " ☿RETRO" if esof.get("mercury_retrograde") else ""
        out.append(f" {DIM}EsoF│{RST} {_col}{_lbl:<13}{RST} sc:{_col}{_sc:+.3f}{RST} "
                   f"sess:{_sess}({_swr:.0f}%) "
                   f"dow:{_dow}({_dwr:.0f}%) "
                   f"slot:{_slot} {DIM}{_moon}{_retro}{RST}")
    else:
        out.append(f" {DIM}EsoF│ (start esof_advisor.py for advisory){RST}")
    out.append("")

    # CAPITAL
    out.append(f"{BOLD}CAPITAL{RST} {CYAN}${capital:,.2f}{RST}"
               + (f" ROI:{roi_c}{roi:+.2f}%{RST} DD:{dd_c}{dd:.2f}%{RST}"
                  f" start:${START_CAP:,.0f}" if START_CAP else ""))
    out.append(f" trades:{trades} scans:{scans} bar:{eng.get('bar_idx','?')}"
               f" lev:{lev:.2f}x notional:${notional:,.0f}")

    # Open positions + EXIT COMPARISON
    positions = eng.get("open_positions") or []
    if positions:
        # Live mid price and imbalance of the first position's asset, from OBF.
        _pa = positions[0].get("asset", "")
        _lp = 0.0
        _obi = 0.0
        if _pa and obf:
            _oad = obf.get(_pa, {})
            _ob_bid = float(_oad.get("best_bid", 0) or 0)
            _ob_ask = float(_oad.get("best_ask", 0) or 0)
            _lp = (_ob_bid + _ob_ask) / 2 if _ob_bid > 0 and _ob_ask > 0 else 0
            _obi = float(_oad.get("imbalance", 0) or 0)
        out.append(f" {BOLD}OPEN:{RST}")
        for p in positions:
            sc2 = GREEN if p.get("side") == "LONG" else RED
            out.append(f" {sc2}{p.get('asset','?')} {p.get('side','?')}{RST}"
                       f" qty:{float(p.get('quantity') or 0):.4f}"
                       f" entry:{float(p.get('entry_price') or 0):.2f}"
                       f" upnl:{float(p.get('unrealized_pnl') or 0):+.2f}")
        # ── EXIT COMPARISON: base engine vs V7 ──────────────────────────────
        _etr = _update_exit_tracker(positions, eng, _lp, _obi)
        _v7p = _v7_preview(_etr)
        if _v7p:
            bh = _v7p["bars_held"]
            bf = _v7p["bars_frac"]
            pp = _v7p["pnl_pct"]
            tp_pct = abs(pp) / FIXED_TP_PCT * 100 if FIXED_TP_PCT else 0
            bf_bar = round(bf * 20)
            bc = GREEN if bf < 0.6 else (YELLOW if bf < 0.85 else RED)
            tp_c = GREEN if pp >= FIXED_TP_PCT else (YELLOW if tp_pct > 50 else DIM)
            vc = RED if _v7p["action"] == "EXIT" else (YELLOW if _v7p["action"] == "RETRACT" else GREEN)
            ps = "+" if _v7p["proj_usd"] >= 0 else ""
            # Built outside the f-string: a backslash inside a replacement
            # field is a SyntaxError before Python 3.12.
            gauge = "\u2588" * bf_bar + "\u2591" * (20 - bf_bar)
            out.append(f" {BOLD}EXIT CMP{RST} {DIM}bar{RST} {bc}{bh}/{MAX_HOLD_BARS} [{gauge}]{RST} {bf*100:.0f}%"
                       f" {DIM}TP{RST} {tp_c}{abs(pp)*100:.3f}%/{FIXED_TP_PCT*100:.2f}% ({tp_pct:.0f}%){RST}")
            out.append(f" {vc}V7:{_v7p['action']:<8}{RST} P={_v7p['pressure']:.2f}"
                       f" mae:{_v7p['mae']:.4f} mfe:{_v7p['mfe']:.4f}"
                       f" {DIM}\u2192{RST} {ps}${_v7p['proj_usd']:.2f} ({pp*100:+.3f}%)"
                       f" {DIM}[{_v7p['reason']}]{RST}")
    else:
        out.append(f" {DIM}no open positions{RST}")
        _EXIT_TRACKER.clear()
    out.append("")

    # POSTURE
    bd = safe.get("breakdown") or {}
    out.append(f"{BOLD}POSTURE{RST} {pc}{posture}{RST} Rm:{pc}{_bar(rm,20)}{RST} {rm:.4f}")
    cats = " ".join(f"C{i}:{float(bd.get(f'Cat{i}', 0) or 0):.2f}" for i in range(1, 6))
    out.append(f" {cats} f_env:{float(bd.get('f_env', 0) or 0):.3f} f_exe:{float(bd.get('f_exe', 0) or 0):.3f}")
    out.append("")

    # SYS HEALTH
    out.append(f"{BOLD}SYS HEALTH{RST} {sc}{mhs_st}{RST} rm_meta:{rm_meta:.3f}")
    for m in ("m1_data_infra", "m1_trader", "m2_heartbeat",
              "m3_data_freshness", "m4_control_plane", "m5_coherence"):
        v = float(mh.get(m, 0) or 0)
        c = GREEN if v >= 0.9 else (YELLOW if v >= 0.5 else RED)
        out.append(f" {c}{m}:{v:.3f}{RST}")
    # NOTE(review): the per-service and per-key marker glyphs are empty in the
    # reviewed source; the emitted text below is reproduced exactly as found.
    out.append(f" {DIM}services:{RST} "
               + " ".join(
                   f"{'' if st == 'RUNNING' else RED}{DIM}{n.split(':')[-1]}{RST}"
                   for n, st in sorted(svc.items())))
    out.append(f" {DIM}hz_keys:{RST} "
               + " ".join(
                   f"{GREEN if float(i.get('score',0))>=0.9 else (YELLOW if float(i.get('score',0))>=0.5 else RED)}{RST}{DIM}{k}{RST}"
                   for k, i in sorted(hz_ks.items())))

    # ── LAST TRADES ──────────────────────────────────────────────────────────
    trades_hist = _last_n_trades(30)
    if trades_hist:
        out.append("")
        out.append(f"{BOLD}LAST TRADES{RST} {DIM}(from log){RST}")
        for t in trades_hist:
            # Parsed log values may be None (nan in the log), hence the `or 0`.
            pnl = float(t.get("net_pnl", 0) or 0)
            _not = float(t.get("notional", 0) or 0)
            pct = (pnl / _not * 100) if _not else float(t.get("pnl_pct", 0) or 0) * 100
            t_lev = float(t.get("leverage", 0) or 0)
            ep = float(t.get("entry_price", 0) or 0)
            reason = t.get("reason", "?")
            asset = t.get("asset", "?")
            bars = t.get("bars_held", 0)
            ts_raw = str(t.get("ts") or "")[:16].replace("T", " ")
            pc2 = GREEN if pnl >= 0 else RED
            out.append(
                # NOTE(review): win/loss marker glyphs are empty in the reviewed source.
                f" {pc2}{'' if pnl>=0 else ''}{RST}"
                f" {asset:<12} "
                f"ep:{ep:.4g} "
                f"lev:{t_lev:.2f}x "
                f"pnl:{pc2}{pnl:+.2f}({pct:+.2f}%){RST} "
                f"exit:{reason} bars:{bars} {DIM}{ts_raw}{RST}"
            )
    else:
        out.append(f" {DIM}no completed trades in log yet{RST}")
    out.append("")
    out.append(f"{DIM}v6 • 0.5s poll • CH→status_snapshots • Ctrl-C quit{RST}")

    # ── CH persistence ─────────────────────────────────────────────────────────
    # Write every other cycle (1s effective rate) to avoid CH noise:
    # the condition holds only during the first half of each wall-clock second.
    if int(time.time() * 2) % 2 == 0:
        ch_put({
            "ts": int(time.time() * 1000),
            "capital": capital,
            "roi_pct": round(roi, 4),
            "dd_pct": round(dd, 4),
            "trades_executed": int(eng.get("trades_executed", 0) or 0),
            "posture": posture,
            "rm": round(rm, 6),
            "vel_div": round(float(eng.get("last_vel_div", 0) or 0), 6),
            "vol_ok": 1 if eng.get("vol_ok") else 0,
            "phase": phase,
            "mhs_status": mhs_st,
            "boost": round(float(acb.get("boost", 1.0) if acb else 1.0), 4),
            "cat5": round(float((safe.get("breakdown") or {}).get("Cat5", 1.0) or 1.0), 6),
        })
    return "\n".join(out)
def main():
    """Connect to the local Hazelcast cluster and redraw the dashboard until Ctrl-C.

    A frame is rendered every 0.5 s.  A failure while rendering or writing a
    frame is reported on screen and the loop continues.  The Hazelcast client
    is shut down on the way out, whatever the reason for leaving the loop.
    """
    print("Connecting to HZ...")
    client = hazelcast.HazelcastClient(
        cluster_name="dolphin",
        cluster_members=["localhost:5701"],
        connection_timeout=5.0,
    )
    print("Connected.\n")
    try:
        while True:
            try:
                print(CLEAR + render(client) + "\n", end="", flush=True)
            except Exception as e:
                print(f"\n{RED}render error: {e}{RST}\n", end="", flush=True)
            time.sleep(0.5)
    except KeyboardInterrupt:
        print(f"\n{DIM}Bye.{RST}")
    finally:
        client.shutdown()
# Start the dashboard only when executed as a script, not when imported.
if __name__ == "__main__":
    main()