Files
DOLPHIN/Observability/TUI/dolphin_tui.py
hjnormey 01c19662cb initial: import DOLPHIN baseline 2026-04-21 from dolphinng5_predict working tree
Includes core prod + GREEN/BLUE subsystems:
- prod/ (BLUE harness, configs, scripts, docs)
- nautilus_dolphin/ (GREEN Nautilus-native impl + dvae/ preserved)
- adaptive_exit/ (AEM engine + models/bucket_assignments.pkl)
- Observability/ (EsoF advisor, TUI, dashboards)
- external_factors/ (EsoF producer)
- mc_forewarning_qlabs_fork/ (MC regime/envelope)

Excludes runtime caches, logs, backups, and reproducible artifacts per .gitignore.
2026-04-21 16:58:38 +02:00

2655 lines
99 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
DOLPHIN-NAUTILUS Real-Time TUI Monitor
Observability/TUI/dolphin_tui.py
Usage:
python Observability/TUI/dolphin_tui.py [--hz-host HOST] [--hz-port PORT] [--log-path PATH] [--mock]
"""
# ---------------------------------------------------------------------------
# Standard library
# ---------------------------------------------------------------------------
from __future__ import annotations
import argparse
import asyncio
import json
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
# ---------------------------------------------------------------------------
# Third-party
# ---------------------------------------------------------------------------
import httpx
from textual.app import App, ComposeResult
from textual.containers import Horizontal
from textual.widgets import Static
try:
from textual.widgets import VerticalScroll
except ImportError:
from textual.containers import VerticalScroll # textual >= 0.47
try:
import hazelcast
HAZELCAST_AVAILABLE = True
except ImportError:
HAZELCAST_AVAILABLE = False
# ---------------------------------------------------------------------------
# Color / theme constants
# ---------------------------------------------------------------------------
# Posture label -> color name. Looked up by posture_color(), which returns
# "dim" for any label that is not a key of this table.
POSTURE_COLORS: dict[str, str] = {
    "APEX": "green",
    "STALKER": "yellow",
    "TURTLE": "dark_orange",
    "HIBERNATE": "red",
}
# Meta-health status -> color name. Looked up by status_color(), which returns
# "dim" for any status that is not a key of this table.
STATUS_COLORS: dict[str, str] = {
    "GREEN": "green",
    "DEGRADED": "yellow",
    "CRITICAL": "dark_orange",
    "DEAD": "red",
}
# Staleness thresholds (seconds), applied by color_age():
# age < STALE_WARN is green, age < STALE_DEAD is yellow, anything older is red.
STALE_WARN: int = 15 # ages at or above this render yellow
STALE_DEAD: int = 60 # ages at or above this render red
# ---------------------------------------------------------------------------
# Hazelcast map names
# ---------------------------------------------------------------------------
HZ_MAP_FEATURES = "DOLPHIN_FEATURES"
HZ_MAP_SAFETY = "DOLPHIN_SAFETY"
HZ_MAP_STATE_BLUE = "DOLPHIN_STATE_BLUE"
HZ_MAP_HEARTBEAT = "DOLPHIN_HEARTBEAT"
HZ_MAP_META_HEALTH = "DOLPHIN_META_HEALTH"
# Keyed by UTC date string (YYYY-MM-DD); see _parse_daily_pnl().
HZ_MAP_PNL_BLUE = "DOLPHIN_PNL_BLUE"
# Shard map name template — format with shard index 00-09
# NOTE: the _sample_obf_shards() docstring states that these shard maps are
# empty on the live system and that OBF data is read from DOLPHIN_FEATURES.
HZ_MAP_SHARD_TMPL = "DOLPHIN_FEATURES_SHARD_{:02d}"
HZ_SHARD_COUNT = 10 # shards 00-09
# ---------------------------------------------------------------------------
# Hazelcast key names
# ---------------------------------------------------------------------------
# Keys inside DOLPHIN_FEATURES
HZ_KEY_EIGEN_SCAN = "latest_eigen_scan"
HZ_KEY_ACB_BOOST = "acb_boost"
HZ_KEY_EXF_LATEST = "exf_latest"
HZ_KEY_ESOF_LATEST = "esof_latest"
# Key inside DOLPHIN_SAFETY
HZ_KEY_SAFETY_LATEST = "latest"
# Keys inside DOLPHIN_STATE_BLUE
HZ_KEY_STATE_LATEST = "latest"
HZ_KEY_STATE_NAUT = "latest_nautilus"
# Key inside DOLPHIN_HEARTBEAT
HZ_KEY_HEARTBEAT = "nautilus_flow_heartbeat"
# Key inside DOLPHIN_META_HEALTH
HZ_KEY_META_LATEST = "latest"
# ---------------------------------------------------------------------------
# Prefect API
# ---------------------------------------------------------------------------
PREFECT_BASE_URL = "http://dolphin.taile8ad92.ts.net:4200"
PREFECT_TIMEOUT_S = 2.0 # seconds
# ---------------------------------------------------------------------------
# Poll / reconnect settings
# ---------------------------------------------------------------------------
POLL_INTERVAL_S = 2.0 # seconds between poll cycles
RECONNECT_INIT_S = 5.0 # first reconnect delay (seconds)
RECONNECT_MULT = 1.5 # backoff growth factor applied after each failed attempt
RECONNECT_MAX_S = 60.0 # upper bound on the reconnect delay (seconds)
HZ_CONNECT_TIMEOUT_S = 2.0 # passed as connection_timeout to HazelcastClient
# ---------------------------------------------------------------------------
# Log tail defaults
# ---------------------------------------------------------------------------
LOG_DEFAULT_PATH = "run_logs/meta_health.log" # default for --log-path
LOG_TAIL_LINES = 50
LOG_TAIL_CHUNK_BYTES = 32_768 # 32 KB — enough for 50 lines in most cases
# ---------------------------------------------------------------------------
# Terminal size requirements
# ---------------------------------------------------------------------------
MIN_TERM_WIDTH = 120
MIN_TERM_HEIGHT = 30
# ---------------------------------------------------------------------------
# Helper Utilities
# ---------------------------------------------------------------------------
def color_age(age_s: float | None) -> tuple[str, str]:
    """Classify a data age (seconds) into a ``(color_class, display_text)`` pair.

    Args:
        age_s: Age of the data in seconds, or None when no age is known.

    Returns:
        ``("dim", "N/A")`` when *age_s* is None. Otherwise the text is the age
        rendered with one decimal and an ``s`` suffix, and the color is
        ``"green"`` below STALE_WARN, ``"yellow"`` below STALE_DEAD, and
        ``"red"`` for anything else.
    """
    if age_s is None:
        return ("dim", "N/A")
    # Pick the color first so comparison errors surface before formatting.
    if age_s < STALE_WARN:
        shade = "green"
    elif age_s < STALE_DEAD:
        shade = "yellow"
    else:
        shade = "red"
    return (shade, f"{age_s:.1f}s")
def rm_bar(rm: float | None, width: int = 20) -> str:
    """Return a text progress bar representing a risk-management ratio.

    Args:
        rm: Ratio, nominally in [0.0, 1.0], or None.
        width: Number of bar cells between the brackets (default 20).

    Returns:
        ``"--"`` when *rm* is None, otherwise a string of the form::

            [████████░░░░░░░░░░░░] 0.40

        The first ``int(rm * width)`` cells are FULL BLOCK (U+2588) and the
        rest are LIGHT SHADE (U+2591). The numeric suffix always shows the
        raw *rm* value with two decimals. For *rm* in [0.0, 1.0] the result
        is ``width + 7`` characters long.

    Values outside [0.0, 1.0] are clamped for the fill computation only, so
    the bar never grows past *width* cells. A NaN ratio renders an empty bar.
    """
    if rm is None:
        return "--"
    # Escapes rather than literal glyphs, so the bar survives tools that
    # strip or mangle non-ASCII characters in the source file.
    full_cell = "\u2588"
    empty_cell = "\u2591"
    # NaN is the only value that is not equal to itself; int(nan) would raise.
    ratio = min(max(rm, 0.0), 1.0) if rm == rm else 0.0
    filled = int(ratio * width)
    return "[" + full_cell * filled + empty_cell * (width - filled) + f"] {rm:.2f}"
def fmt_float(v: float | None, decimals: int = 4) -> str:
    """Render *v* with a fixed number of decimals, or ``"--"`` when absent.

    Args:
        v: The number to render, or None.
        decimals: How many digits to show after the decimal point (default 4).

    Returns:
        ``"--"`` for None, otherwise *v* in fixed-point notation.
    """
    return "--" if v is None else f"{v:.{decimals}f}"
def fmt_pnl(v: float | None) -> tuple[str, str]:
    """Render a PnL amount as a ``(color, text)`` pair.

    Args:
        v: PnL in dollars, or None when unknown.

    Returns:
        * None            -> ``("white", "--")``
        * strictly > 0    -> ``("green", "+$X,XXX.XX")``
        * strictly < 0    -> ``("red", "-$X,XXX.XX")``
        * anything else   -> ``("white", "$X.XX")`` (this is the zero case)
    """
    if v is None:
        shade, text = "white", "--"
    elif v > 0:
        shade, text = "green", f"+${v:,.2f}"
    elif v < 0:
        # Format the magnitude so the sign sits in front of the dollar symbol.
        shade, text = "red", f"-${abs(v):,.2f}"
    else:
        shade, text = "white", f"${v:,.2f}"
    return (shade, text)
def posture_color(posture: str | None) -> str:
    """Map a posture label to its display color class.

    Args:
        posture: A posture label such as ``"APEX"``, ``"STALKER"``,
            ``"TURTLE"`` or ``"HIBERNATE"``; may be None or unrecognised.

    Returns:
        The matching entry of :data:`POSTURE_COLORS`, or ``"dim"`` when
        *posture* is None or has no entry.
    """
    return "dim" if posture is None else POSTURE_COLORS.get(posture, "dim")
def status_color(status: str | None) -> str:
    """Map a meta-health status label to its display color class.

    Args:
        status: A status label such as ``"GREEN"``, ``"DEGRADED"``,
            ``"CRITICAL"`` or ``"DEAD"``; may be None or unrecognised.

    Returns:
        The matching entry of :data:`STATUS_COLORS`, or ``"dim"`` when
        *status* is None or has no entry.
    """
    return "dim" if status is None else STATUS_COLORS.get(status, "dim")
# ---------------------------------------------------------------------------
# CLI argument parsing
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
    """Build the CLI parser for the TUI monitor and parse the process arguments.

    Returns:
        The parsed namespace with ``hz_host``, ``hz_port``, ``log_path`` and
        ``mock`` attributes.
    """
    cli = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description="DOLPHIN-NAUTILUS Real-Time TUI Monitor",
    )
    # (flag, add_argument keyword arguments), registered in this order so the
    # generated --help lists the options in the same sequence.
    option_table = (
        (
            "--hz-host",
            dict(
                default="dolphin.taile8ad92.ts.net",
                metavar="HOST",
                help="Hazelcast cluster host",
            ),
        ),
        (
            "--hz-port",
            dict(
                type=int,
                default=5701,
                metavar="PORT",
                help="Hazelcast cluster port",
            ),
        ),
        (
            "--log-path",
            dict(
                default=LOG_DEFAULT_PATH,
                metavar="PATH",
                help="Path to log file for the log tail panel",
            ),
        ),
        (
            "--mock",
            dict(
                action="store_true",
                default=False,
                help="Populate all panels with mock data (no HZ needed — for UI testing)",
            ),
        ),
    )
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    return cli.parse_args()
# ---------------------------------------------------------------------------
# DataSnapshot — immutable snapshot of all polled data for one render cycle
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class DataSnapshot:
    """Immutable snapshot of all data sources for a single TUI render cycle.

    Every field has a default (None, False, or an empty container), so a
    snapshot can be built from any subset of sources and the UI layer can read
    any attribute without hitting a KeyError or AttributeError.

    ``frozen=True`` blocks attribute reassignment only; the dict and list
    fields are ordinary mutable objects.
    """
    # Wall-clock time (epoch seconds) this snapshot was assembled
    ts: float = field(default_factory=time.time)
    # Connectivity flags
    hz_connected: bool = False
    prefect_connected: bool = False
    # ------------------------------------------------------------------
    # DOLPHIN_FEATURES["latest_eigen_scan"]
    # ------------------------------------------------------------------
    scan_number: int | None = None
    vel_div: float | None = None
    w50_velocity: float | None = None
    w750_velocity: float | None = None
    instability_50: float | None = None
    asset_prices: dict[str, float] = field(default_factory=dict)
    scan_bridge_ts: str | None = None # ISO-8601 timestamp string
    scan_age_s: float | None = None # seconds since bridge_ts
    # ------------------------------------------------------------------
    # DOLPHIN_FEATURES["acb_boost"]
    # ------------------------------------------------------------------
    acb_boost: float | None = None
    acb_beta: float | None = None
    # ------------------------------------------------------------------
    # DOLPHIN_FEATURES["exf_latest"] (ExtF)
    # ------------------------------------------------------------------
    funding_btc: float | None = None
    dvol_btc: float | None = None
    fng: float | None = None
    taker: float | None = None
    vix: float | None = None
    ls_btc: float | None = None
    acb_ready: bool | None = None # payload key "_acb_ready"
    acb_present: str | None = None # payload key "_acb_present", e.g. "9/9"
    exf_age_s: float | None = None # seconds since the payload's "_pushed_at"
    # ------------------------------------------------------------------
    # DOLPHIN_FEATURES["esof_latest"] (EsoF)
    # ------------------------------------------------------------------
    moon_phase: str | None = None
    mercury_retro: bool | None = None
    liquidity_session: str | None = None
    market_cycle_pos: float | None = None
    esof_age_s: float | None = None # seconds since the payload's "_pushed_at"
    # ------------------------------------------------------------------
    # DOLPHIN_SAFETY["latest"]
    # ------------------------------------------------------------------
    posture: str | None = None # APEX / STALKER / TURTLE / HIBERNATE
    rm: float | None = None # payload key "Rm"; expected range 0.0 to 1.0
    cat1: float | None = None
    cat2: float | None = None
    cat3: float | None = None
    cat4: float | None = None
    cat5: float | None = None
    # ------------------------------------------------------------------
    # DOLPHIN_STATE_BLUE["latest"] (Blue strategy)
    # ------------------------------------------------------------------
    capital: float | None = None
    drawdown: float | None = None
    peak_capital: float | None = None
    pnl: float | None = None
    trades: int | None = None
    # ------------------------------------------------------------------
    # DOLPHIN_STATE_BLUE["latest_nautilus"] (Nautilus engine)
    # ------------------------------------------------------------------
    nautilus_capital: float | None = None
    nautilus_pnl: float | None = None
    nautilus_trades: int | None = None
    nautilus_posture: str | None = None
    nautilus_param_hash: str | None = None
    # ------------------------------------------------------------------
    # DOLPHIN_HEARTBEAT["nautilus_flow_heartbeat"]
    # ------------------------------------------------------------------
    heartbeat_ts: float | None = None # payload "ts"; subtracted from time.time() to get the age
    heartbeat_phase: str | None = None
    heartbeat_flow: str | None = None
    heartbeat_age_s: float | None = None # time.time() - heartbeat_ts at parse time
    # ------------------------------------------------------------------
    # DOLPHIN_META_HEALTH["latest"]
    # ------------------------------------------------------------------
    meta_rm: float | None = None # payload key "rm_meta"
    meta_status: str | None = None # GREEN / DEGRADED / CRITICAL / DEAD
    m1_proc: float | None = None
    m2_heartbeat: float | None = None
    m3_data: float | None = None # payload key "m3_data_freshness"
    m4_cp: float | None = None # payload key "m4_control_plane"
    m5_coh: float | None = None # payload key "m5_coherence"
    # ------------------------------------------------------------------
    # OBF shards (top-5 assets by absolute imbalance)
    # Each entry: {asset, imbalance, fill_prob, depth_quality}
    # ------------------------------------------------------------------
    obf_top: list[dict[str, Any]] = field(default_factory=list)
    # ------------------------------------------------------------------
    # Prefect
    # ------------------------------------------------------------------
    prefect_healthy: bool = False
    prefect_flows: list[dict[str, Any]] = field(default_factory=list)
    # ------------------------------------------------------------------
    # Log tail (last N lines from meta_health.log)
    # ------------------------------------------------------------------
    log_lines: list[str] = field(default_factory=list)
# ---------------------------------------------------------------------------
# DolphinDataFetcher — async data retrieval from Hazelcast and Prefect
# ---------------------------------------------------------------------------
class DolphinDataFetcher:
"""Encapsulates all data retrieval from Hazelcast and Prefect.
Runs in a background async worker. Returns a DataSnapshot on each poll cycle.
The __init__ only sets up state — actual connection happens in connect_hz().
"""
def __init__(
    self,
    hz_host: str = "dolphin.taile8ad92.ts.net",
    hz_port: int = 5701,
    log_path: str = "run_logs/meta_health.log",
) -> None:
    """Record connection settings and initialise reconnect bookkeeping.

    No network I/O happens here; the Hazelcast connection is opened later by
    ``connect_hz()``.

    Args:
        hz_host: Hazelcast cluster host name.
        hz_port: Hazelcast cluster port.
        log_path: Path of the log file shown in the log tail panel.
    """
    # Hazelcast connection config
    self.hz_host = hz_host
    self.hz_port = hz_port
    # Hazelcast client state
    self.hz_client = None  # HazelcastClient instance after connect
    self.hz_connected: bool = False
    # Reconnect state tracking. The backoff parameters come from the
    # module-level RECONNECT_* settings so there is one place to tune them.
    self._running: bool = True  # set False on shutdown to stop reconnect loop
    self._reconnect_task = None  # asyncio Task handle
    self._reconnect_backoff: float = RECONNECT_INIT_S  # current backoff delay (s)
    self._reconnect_backoff_initial: float = RECONNECT_INIT_S  # reset value on success
    self._reconnect_backoff_max: float = RECONNECT_MAX_S  # cap
    self._reconnect_backoff_multiplier: float = RECONNECT_MULT
    # Log path
    self.log_path = log_path
async def connect_hz(self) -> bool:
    """Try once to open the Hazelcast client connection.

    The blocking client constructor runs in the default executor, with
    ``connection_timeout`` set to HZ_CONNECT_TIMEOUT_S.

    Returns:
        True when the client was created; ``hz_client`` and ``hz_connected``
        are updated accordingly. False on any failure, in which case the
        background reconnect task is requested. Never raises.
    """
    def _build_client():
        return hazelcast.HazelcastClient(
            cluster_name="dolphin",
            cluster_members=[f"{self.hz_host}:{self.hz_port}"],
            connection_timeout=HZ_CONNECT_TIMEOUT_S,
            async_start=False,
        )

    try:
        self.hz_client = await asyncio.get_running_loop().run_in_executor(
            None, _build_client
        )
    except Exception:
        self.hz_connected = False
        self._start_reconnect()
        return False
    self.hz_connected = True
    return True
async def _get_hz_map(self, map_name: str):
"""Safe async map getter — returns a blocking IMap proxy or None on any exception."""
try:
loop = asyncio.get_event_loop()
hz_map = await loop.run_in_executor(
None,
lambda: self.hz_client.get_map(map_name).blocking(),
)
return hz_map
except Exception:
return None
def _parse_scan(self, raw_json: str | None) -> dict:
"""Extract all latest_eigen_scan fields from raw JSON string.
Returns a dict with scan fields, all None on failure.
"""
_none = dict(
scan_number=None,
vel_div=None,
w50_velocity=None,
w750_velocity=None,
instability_50=None,
asset_prices={},
scan_bridge_ts=None,
scan_age_s=None,
)
if raw_json is None:
return _none
try:
data = json.loads(raw_json)
except (json.JSONDecodeError, TypeError) as exc:
print(f"[WARN] _parse_scan: malformed JSON — {exc}", file=__import__("sys").stderr)
return _none
if not isinstance(data, dict):
print(f"[WARN] _parse_scan: expected JSON object, got {type(data).__name__}", file=__import__("sys").stderr)
return _none
bridge_ts: str | None = data.get("bridge_ts")
scan_age_s: float | None = None
if bridge_ts is not None:
try:
scan_age_s = time.time() - datetime.fromisoformat(bridge_ts).timestamp()
except (ValueError, OSError):
scan_age_s = None
# Support both flat schema (NG8 direct write) and nested NG7 schema
# NG7: fields are under data["result"], NG8: fields are at top level
result = data.get("result", {}) if isinstance(data.get("result"), dict) else {}
flat = data # NG8 direct write puts vel_div etc. at top level
def _get(key: str, default=None):
"""Try flat first, then nested result."""
v = flat.get(key)
if v is not None:
return v
return result.get(key, default)
# vel_div: flat in NG8, or compute from multi_window_results in NG7
vel_div = _get("vel_div")
if vel_div is None:
mwr = result.get("multi_window_results", {})
w50 = mwr.get("50", {}).get("tracking_data", {})
w150 = mwr.get("150", {}).get("tracking_data", {})
v50 = w50.get("lambda_max_velocity")
v150 = w150.get("lambda_max_velocity")
if v50 is not None and v150 is not None:
vel_div = float(v50) - float(v150)
# w50/w750 velocity
w50_velocity = _get("w50_velocity")
if w50_velocity is None:
mwr = result.get("multi_window_results", {})
w50_velocity = mwr.get("50", {}).get("tracking_data", {}).get("lambda_max_velocity")
w750_velocity = _get("w750_velocity")
if w750_velocity is None:
mwr = result.get("multi_window_results", {})
w750_velocity = mwr.get("750", {}).get("tracking_data", {}).get("lambda_max_velocity")
# instability_50 — from regime_prediction or multi_window_results
instability_50 = _get("instability_50")
if instability_50 is None:
rp = result.get("regime_prediction", {})
mwi = rp.get("multi_window_instabilities", {})
instability_50 = mwi.get("50") or mwi.get(50)
# bridge_ts — try top level then result
if bridge_ts is None:
bridge_ts = result.get("bridge_ts") or result.get("timestamp")
return dict(
scan_number=_get("scan_number"),
vel_div=vel_div,
w50_velocity=w50_velocity,
w750_velocity=w750_velocity,
instability_50=instability_50,
asset_prices=_get("asset_prices") or {},
scan_bridge_ts=bridge_ts,
scan_age_s=scan_age_s,
)
def _parse_safety(self, raw_json: str | None) -> dict:
"""Extract posture, Rm, and Cat1-Cat5 from DOLPHIN_SAFETY["latest"].
Returns a dict with safety fields, all None on failure.
"""
_none = dict(
posture=None,
rm=None,
cat1=None,
cat2=None,
cat3=None,
cat4=None,
cat5=None,
)
if raw_json is None:
return _none
try:
data = json.loads(raw_json)
except (json.JSONDecodeError, TypeError) as exc:
print(f"[WARN] _parse_safety: malformed JSON — {exc}", file=__import__("sys").stderr)
return _none
if not isinstance(data, dict):
print(f"[WARN] _parse_safety: expected JSON object, got {type(data).__name__}", file=__import__("sys").stderr)
return _none
return dict(
posture=data.get("posture"),
rm=data.get("Rm"),
cat1=data.get("Cat1"),
cat2=data.get("Cat2"),
cat3=data.get("Cat3"),
cat4=data.get("Cat4"),
cat5=data.get("Cat5"),
)
async def _parse_state(self) -> dict:
    """Read Blue-strategy and Nautilus-engine state from DOLPHIN_STATE_BLUE.

    For each group the candidate keys are tried in order, and the first one
    that holds a decodable JSON object supplies the values:

    * Blue strategy: ``"latest"``, then ``"capital_checkpoint"``.
    * Nautilus engine: ``"latest_nautilus"``, then ``"engine_snapshot"``.

    NOTE(review): the original comments say the live system writes
    ``"capital_checkpoint"`` / ``"engine_snapshot"`` and the other keys are
    legacy, yet the legacy keys are tried first. If both exist, the legacy
    value wins — confirm this is intended.

    Returns:
        A flat dict with keys ``capital``, ``drawdown``, ``peak_capital``,
        ``pnl``, ``trades``, ``nautilus_capital``, ``nautilus_pnl``,
        ``nautilus_trades``, ``nautilus_posture`` and
        ``nautilus_param_hash``. A field is None when its source is missing
        or unreadable. Read and parse errors are logged to stderr and never
        propagate.
    """
    import sys

    result = dict(
        capital=None,
        drawdown=None,
        peak_capital=None,
        pnl=None,
        trades=None,
        nautilus_capital=None,
        nautilus_pnl=None,
        nautilus_trades=None,
        nautilus_posture=None,
        nautilus_param_hash=None,
    )
    hz_map = await self._get_hz_map(HZ_MAP_STATE_BLUE)
    if hz_map is None:
        return result
    loop = asyncio.get_running_loop()

    async def _first_payload(keys):
        """Return the first decodable JSON object stored under *keys*, else None."""
        for key in keys:
            try:
                raw = await loop.run_in_executor(None, hz_map.get, key)
                if raw is None:
                    continue
                payload = json.loads(raw)
                if not isinstance(payload, dict):
                    raise TypeError(f"expected JSON object, got {type(payload).__name__}")
                return payload
            except (json.JSONDecodeError, TypeError) as exc:
                print(f"[WARN] _parse_state {key}: malformed JSON — {exc}", file=sys.stderr)
            except Exception as exc:
                print(f"[WARN] _parse_state {key}: {exc}", file=sys.stderr)
        return None

    blue = await _first_payload(("latest", "capital_checkpoint"))
    if blue is not None:
        for name in ("capital", "drawdown", "peak_capital", "pnl", "trades"):
            result[name] = blue.get(name)

    naut = await _first_payload(("latest_nautilus", "engine_snapshot"))
    if naut is not None:
        # Explicit None check: a trade count of 0 is a real value and must
        # not fall through to the alternate key.
        trades = naut.get("trades")
        if trades is None:
            trades = naut.get("trades_executed")
        result["nautilus_capital"] = naut.get("capital")
        result["nautilus_pnl"] = naut.get("pnl")
        result["nautilus_trades"] = trades
        result["nautilus_posture"] = naut.get("posture")
        result["nautilus_param_hash"] = naut.get("param_hash")
    return result
async def _parse_extf(self) -> dict:
    """Read the ExtF payload from DOLPHIN_FEATURES["exf_latest"].

    Returns:
        A dict with keys ``funding_btc``, ``dvol_btc``, ``fng``, ``taker``,
        ``vix``, ``ls_btc``, ``acb_ready``, ``acb_present`` and
        ``exf_age_s``. ``exf_age_s`` is the number of seconds since the
        payload's ``_pushed_at`` timestamp, or None if that is absent or
        unparseable. Every value is None on any failure; failures are logged
        to stderr and never propagate.
    """
    import sys

    # (output field, payload key) in output order; exf_age_s is appended last.
    field_to_key = (
        ("funding_btc", "funding_btc"),
        ("dvol_btc", "dvol_btc"),
        ("fng", "fng"),
        ("taker", "taker"),
        ("vix", "vix"),
        ("ls_btc", "ls_btc"),
        ("acb_ready", "_acb_ready"),
        ("acb_present", "_acb_present"),
    )
    blank = {name: None for name, _ in field_to_key}
    blank["exf_age_s"] = None
    try:
        features = await self._get_hz_map(HZ_MAP_FEATURES)
        if features is None:
            return blank
        raw = await asyncio.get_running_loop().run_in_executor(
            None, features.get, HZ_KEY_EXF_LATEST
        )
        if raw is None:
            return blank
        payload = json.loads(raw)
        age_s: float | None = None
        stamp = payload.get("_pushed_at")
        if stamp is not None:
            try:
                # Normalise a "Z" suffix into an explicit UTC offset.
                pushed = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
                age_s = time.time() - pushed.timestamp()
            except (ValueError, OSError):
                age_s = None
        fields = {name: payload.get(key) for name, key in field_to_key}
        fields["exf_age_s"] = age_s
        return fields
    except Exception as exc:
        print(f"[WARN] _parse_extf: {exc}", file=sys.stderr)
        return blank
async def _parse_esof(self) -> dict:
    """Read the EsoF payload from DOLPHIN_FEATURES["esof_latest"].

    Returns:
        A dict with keys ``moon_phase``, ``mercury_retro``,
        ``liquidity_session``, ``market_cycle_pos`` and ``esof_age_s``.
        ``esof_age_s`` is the number of seconds since the payload's
        ``_pushed_at`` timestamp, or None if that is absent or unparseable.
        Every value is None on any failure; failures are logged to stderr and
        never propagate.
    """
    import sys

    # (output field, payload key) in output order; esof_age_s is appended last.
    field_to_key = (
        ("moon_phase", "moon_phase_name"),
        ("mercury_retro", "mercury_retrograde"),
        ("liquidity_session", "liquidity_session"),
        ("market_cycle_pos", "market_cycle_position"),
    )
    blank = {name: None for name, _ in field_to_key}
    blank["esof_age_s"] = None
    try:
        features = await self._get_hz_map(HZ_MAP_FEATURES)
        if features is None:
            return blank
        raw = await asyncio.get_running_loop().run_in_executor(
            None, features.get, HZ_KEY_ESOF_LATEST
        )
        if raw is None:
            return blank
        try:
            payload = json.loads(raw)
        except (json.JSONDecodeError, TypeError) as exc:
            print(f"[WARN] _parse_esof: malformed JSON — {exc}", file=sys.stderr)
            return blank
        age_s: float | None = None
        stamp = payload.get("_pushed_at")
        if stamp is not None:
            try:
                # Normalise a "Z" suffix into an explicit UTC offset.
                pushed = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
                age_s = time.time() - pushed.timestamp()
            except (ValueError, OSError):
                age_s = None
        fields = {name: payload.get(key) for name, key in field_to_key}
        fields["esof_age_s"] = age_s
        return fields
    except Exception as exc:
        print(f"[WARN] _parse_esof: {exc}", file=sys.stderr)
        return blank
async def _parse_meta_health(self) -> dict:
    """Read rm_meta, status and the M1-M5 scores from DOLPHIN_META_HEALTH["latest"].

    Returns:
        A dict with keys ``meta_rm``, ``meta_status``, ``m1_proc``,
        ``m2_heartbeat``, ``m3_data``, ``m4_cp`` and ``m5_coh``. Every value
        is None on any failure; failures are logged to stderr and never
        propagate.
    """
    import sys

    # (output field, payload key) in output order.
    field_to_key = (
        ("meta_rm", "rm_meta"),
        ("meta_status", "status"),
        ("m1_proc", "m1_proc"),
        ("m2_heartbeat", "m2_heartbeat"),
        ("m3_data", "m3_data_freshness"),
        ("m4_cp", "m4_control_plane"),
        ("m5_coh", "m5_coherence"),
    )
    blank = {name: None for name, _ in field_to_key}
    try:
        health_map = await self._get_hz_map(HZ_MAP_META_HEALTH)
        if health_map is None:
            return blank
        raw = await asyncio.get_running_loop().run_in_executor(
            None, health_map.get, HZ_KEY_META_LATEST
        )
        if raw is None:
            return blank
        try:
            payload = json.loads(raw)
        except (json.JSONDecodeError, TypeError) as exc:
            print(f"[WARN] _parse_meta_health: malformed JSON — {exc}", file=sys.stderr)
            return blank
        return {name: payload.get(key) for name, key in field_to_key}
    except Exception as exc:
        print(f"[WARN] _parse_meta_health: {exc}", file=sys.stderr)
        return blank
async def _parse_daily_pnl(self) -> dict | None:
    """Read today's entry from DOLPHIN_PNL_BLUE, keyed by the UTC date (YYYY-MM-DD).

    Returns:
        A dict with keys ``pnl``, ``capital``, ``trades``, ``boost`` and
        ``mc_status``. None when the map or today's key is unavailable, the
        value is not valid JSON, or any other error occurs; errors are logged
        to stderr and never propagate.
    """
    import sys

    wanted = ("pnl", "capital", "trades", "boost", "mc_status")
    try:
        pnl_map = await self._get_hz_map(HZ_MAP_PNL_BLUE)
        if pnl_map is None:
            return None
        today_key = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        raw = await asyncio.get_running_loop().run_in_executor(
            None, pnl_map.get, today_key
        )
        if raw is None:
            return None
        try:
            payload = json.loads(raw)
        except (json.JSONDecodeError, TypeError) as exc:
            print(f"[WARN] _parse_daily_pnl: malformed JSON — {exc}", file=sys.stderr)
            return None
        return {name: payload.get(name) for name in wanted}
    except Exception as exc:
        print(f"[WARN] _parse_daily_pnl: {exc}", file=sys.stderr)
        return None
def _parse_heartbeat(self, raw: str | None) -> dict:
"""Extract ts, phase, flow and compute heartbeat_age_s from DOLPHIN_HEARTBEAT["nautilus_flow_heartbeat"].
Returns a dict with heartbeat fields, all None on failure.
"""
_none = dict(
heartbeat_ts=None,
heartbeat_phase=None,
heartbeat_flow=None,
heartbeat_age_s=None,
)
if raw is None:
return _none
try:
data = json.loads(raw)
ts: float = data["ts"]
return dict(
heartbeat_ts=ts,
heartbeat_phase=data.get("phase"),
heartbeat_flow=data.get("flow"),
heartbeat_age_s=time.time() - ts,
)
except Exception as exc:
print(f"[WARN] _parse_heartbeat: {exc}", file=__import__("sys").stderr)
return _none
async def _sample_obf_shards(self) -> list[dict]:
    """Read OBF data from the ``asset_*_ob`` keys of DOLPHIN_FEATURES.

    Live system stores OBF as asset_BTCUSDT_ob etc. in DOLPHIN_FEATURES,
    not in DOLPHIN_FEATURES_SHARD_* maps (which are empty).

    At most 50 matching keys are read per call to bound the scan time. An
    entry is skipped when it is missing, undecodable, or has no finite
    numeric ``imbalance``.

    Returns:
        Up to five dicts ``{asset, imbalance, fill_prob, depth_quality}``,
        ordered by descending ``abs(imbalance)``. ``imbalance`` is returned
        as a float. Returns ``[]`` when Hazelcast is not connected or nothing
        usable was found. Errors are logged to stderr and never propagate.
    """
    import sys

    if not self.hz_connected:
        return []
    assets: list[dict] = []
    try:
        hz_map = await self._get_hz_map(HZ_MAP_FEATURES)
        if hz_map is None:
            return []
        loop = asyncio.get_running_loop()
        all_keys = await loop.run_in_executor(None, hz_map.key_set)
        # Ignore non-string keys instead of letting one abort the whole scan.
        ob_keys = [
            k for k in all_keys
            if isinstance(k, str) and k.startswith("asset_") and k.endswith("_ob")
        ]
        for key in ob_keys[:50]:  # cap at 50 to avoid slow scans
            try:
                raw_json = await loop.run_in_executor(None, hz_map.get, key)
                if raw_json is None:
                    continue
                data = json.loads(raw_json)
                imbalance = data.get("imbalance")
                if imbalance is None:
                    continue
                # Validate here, inside the per-key handler, so a bad value
                # skips one asset instead of breaking the sort below.
                imbalance = float(imbalance)
                if imbalance != imbalance:  # NaN has no meaningful rank
                    continue
                # Explicit None check: a fill probability of 0.0 is valid.
                fill_prob = data.get("fill_prob")
                if fill_prob is None:
                    fill_prob = data.get("fill_probability")
                assets.append(dict(
                    asset=key[len("asset_"):-len("_ob")],
                    imbalance=imbalance,
                    fill_prob=fill_prob,
                    depth_quality=data.get("depth_quality"),
                ))
            except Exception as exc:
                print(f"[WARN] _sample_obf key {key}: {exc}", file=sys.stderr)
                continue
    except Exception as exc:
        print(f"[WARN] _sample_obf_shards: {exc}", file=sys.stderr)
    if not assets:
        return []
    assets.sort(key=lambda entry: abs(entry["imbalance"]), reverse=True)
    return assets[:5]
async def _reconnect_loop(self) -> None:
    """Background task: keep retrying the Hazelcast connection with backoff.

    The delay starts at ``_reconnect_backoff_initial``, is multiplied by
    ``_reconnect_backoff_multiplier`` after each failed attempt, and is capped
    at ``_reconnect_backoff_max``. It is reset to the initial value on
    success. The loop ends once ``hz_client`` is set or ``_running`` becomes
    False.

    NOTE(review): the loop condition checks only ``hz_client is None``. If
    other code clears ``hz_connected`` while leaving ``hz_client`` set, this
    task returns immediately without reconnecting — confirm against the rest
    of the file.
    """
    import sys

    def _build_client():
        return hazelcast.HazelcastClient(
            cluster_name="dolphin",
            cluster_members=[f"{self.hz_host}:{self.hz_port}"],
            connection_timeout=HZ_CONNECT_TIMEOUT_S,
            async_start=False,
        )

    self._reconnect_backoff = self._reconnect_backoff_initial
    while self.hz_client is None and self._running:
        await asyncio.sleep(self._reconnect_backoff)
        if not self._running:
            break
        try:
            # The blocking constructor runs in the default executor.
            fresh_client = await asyncio.get_running_loop().run_in_executor(
                None, _build_client
            )
            self.hz_client = fresh_client
            self.hz_connected = True
            self._reconnect_backoff = self._reconnect_backoff_initial  # reset on success
            print("[INFO] _reconnect_loop: reconnected to Hazelcast", file=sys.stderr)
        except Exception:
            grown = self._reconnect_backoff * self._reconnect_backoff_multiplier
            self._reconnect_backoff = min(grown, self._reconnect_backoff_max)
def _start_reconnect(self) -> None:
    """Schedule ``_reconnect_loop()`` as a background task if none is active.

    No-op when the Hazelcast package is unavailable, when a reconnect task is
    already pending, or when no event loop is currently running. In the last
    case a later call made from inside the loop will start it.
    """
    if not HAZELCAST_AVAILABLE:
        return
    current = self._reconnect_task
    if current is not None and not current.done():
        return  # a reconnect attempt is already in flight
    try:
        # get_running_loop() raises RuntimeError when called outside a running
        # loop. get_event_loop() could instead hand back (or create) an idle
        # loop; a task parked there never runs yet never reports done(), which
        # would block every later reconnect attempt.
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return  # No running event loop — will be started later
    self._reconnect_task = loop.create_task(self._reconnect_loop())
def start_reconnect_task(self) -> None:
    """Public entry point for launching the background reconnect task.

    Delegates to ``_start_reconnect()``, which stores the task handle in
    ``self._reconnect_task`` (cancelled later by ``disconnect_hz()``).
    No-op if Hazelcast is not available or a task is already running.
    """
    self._start_reconnect()
async def disconnect_hz(self) -> None:
"""Shut down the Hazelcast client cleanly. Never raises."""
self._running = False
# Cancel the reconnect background task if it's still running
if self._reconnect_task is not None and not self._reconnect_task.done():
self._reconnect_task.cancel()
try:
await self._reconnect_task
except (asyncio.CancelledError, Exception):
pass
try:
if self.hz_client is not None:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self.hz_client.shutdown)
except Exception:
pass
finally:
self.hz_client = None
self.hz_connected = False
async def fetch(self) -> DataSnapshot:
    """Poll every data source once and assemble a DataSnapshot.

    When there is no connected Hazelcast client, the reconnect task is
    (re)started and a reduced snapshot is returned that carries only the
    Prefect status and the log tail, with ``hz_connected=False``.

    Otherwise the raw scan / safety / heartbeat values are read from their
    Hazelcast maps, the async parsers and the Prefect probe run concurrently,
    and the sync parsers are applied to the raw values. A failure in any
    single source is reported on stderr and replaced by an empty result, so
    the corresponding snapshot fields end up as None / empty rather than the
    whole poll failing.

    Returns:
        A DataSnapshot stamped with the time at which the poll started.
    """
    import sys  # function-scope import, as in the original block

    ts = time.time()

    # ------------------------------------------------------------------
    # Short-circuit: no HZ client available
    # ------------------------------------------------------------------
    if self.hz_client is None or not self.hz_connected:
        # Ensure reconnect loop is running
        self._start_reconnect()
        prefect_healthy, prefect_flows = await self.fetch_prefect()
        log_lines = self.tail_log(self.log_path, LOG_TAIL_LINES)
        return DataSnapshot(
            ts=ts,
            hz_connected=False,
            prefect_connected=prefect_healthy,
            prefect_healthy=prefect_healthy,
            prefect_flows=prefect_flows,
            log_lines=log_lines,
        )

    loop = asyncio.get_running_loop()

    async def _raw_value(map_name, key, label):
        """Return the raw value stored under *key* in *map_name*, or None on any failure."""
        try:
            hz_map = await self._get_hz_map(map_name)
            if hz_map is None:
                return None
            # The map read is pushed to the default executor so it cannot
            # block the event loop.
            return await loop.run_in_executor(None, hz_map.get, key)
        except Exception as exc:
            print(f"[WARN] fetch: get {label}: {exc}", file=sys.stderr)
            return None

    def _parsed(parser, raw, label):
        """Apply a sync parser to *raw*; any failure or non-dict result becomes {}."""
        try:
            result = parser(raw)
        except Exception as exc:
            print(f"[WARN] fetch: {label}: {exc}", file=sys.stderr)
            return {}
        return result if isinstance(result, dict) else {}

    # ------------------------------------------------------------------
    # Fetch raw values from HZ maps that need raw JSON passed to sync parsers
    # ------------------------------------------------------------------
    raw_scan = await _raw_value(HZ_MAP_FEATURES, HZ_KEY_EIGEN_SCAN, "latest_eigen_scan")
    raw_safety = await _raw_value(HZ_MAP_SAFETY, HZ_KEY_SAFETY_LATEST, "safety latest")
    raw_heartbeat = await _raw_value(HZ_MAP_HEARTBEAT, HZ_KEY_HEARTBEAT, "heartbeat")

    # ------------------------------------------------------------------
    # Run all async parsers + Prefect concurrently
    # ------------------------------------------------------------------
    # return_exceptions=True keeps one failing source from aborting the whole
    # poll; failures are logged and replaced by empty defaults below.
    source_names = (
        "_parse_state",
        "_parse_extf",
        "_parse_esof",
        "_parse_meta_health",
        "_sample_obf_shards",
        "fetch_prefect",
    )
    results = await asyncio.gather(
        self._parse_state(),
        self._parse_extf(),
        self._parse_esof(),
        self._parse_meta_health(),
        self._sample_obf_shards(),
        self.fetch_prefect(),
        return_exceptions=True,
    )
    for source_name, outcome in zip(source_names, results):
        if isinstance(outcome, BaseException):
            print(f"[WARN] fetch: {source_name}: {outcome!r}", file=sys.stderr)

    state_d, extf_d, esof_d, meta_d = (
        outcome if isinstance(outcome, dict) else {} for outcome in results[:4]
    )
    obf_top = results[4] if isinstance(results[4], list) else []
    prefect_result = results[5]
    if isinstance(prefect_result, tuple) and len(prefect_result) == 2:
        prefect_healthy, prefect_flows = prefect_result
    else:
        prefect_healthy, prefect_flows = False, []

    # ------------------------------------------------------------------
    # Run sync parsers (they only parse already-fetched raw strings)
    # ------------------------------------------------------------------
    # Every field is read back with .get(), so {} is equivalent to a dict of
    # explicit None values.
    scan_d = _parsed(self._parse_scan, raw_scan, "_parse_scan")
    safety_d = _parsed(self._parse_safety, raw_safety, "_parse_safety")
    heartbeat_d = _parsed(self._parse_heartbeat, raw_heartbeat, "_parse_heartbeat")

    # ------------------------------------------------------------------
    # Log tail (sync, fast)
    # ------------------------------------------------------------------
    try:
        log_lines = self.tail_log(self.log_path, LOG_TAIL_LINES)
    except Exception as exc:
        print(f"[WARN] fetch: tail_log: {exc}", file=sys.stderr)
        log_lines = [f"Log error: {exc}"]

    # ------------------------------------------------------------------
    # Assemble and return DataSnapshot
    # ------------------------------------------------------------------
    return DataSnapshot(
        ts=ts,
        hz_connected=True,
        prefect_connected=prefect_healthy,
        # scan
        scan_number=scan_d.get("scan_number"),
        vel_div=scan_d.get("vel_div"),
        w50_velocity=scan_d.get("w50_velocity"),
        w750_velocity=scan_d.get("w750_velocity"),
        instability_50=scan_d.get("instability_50"),
        asset_prices=scan_d.get("asset_prices") or {},
        scan_bridge_ts=scan_d.get("scan_bridge_ts"),
        scan_age_s=scan_d.get("scan_age_s"),
        # safety
        posture=safety_d.get("posture"),
        rm=safety_d.get("rm"),
        cat1=safety_d.get("cat1"),
        cat2=safety_d.get("cat2"),
        cat3=safety_d.get("cat3"),
        cat4=safety_d.get("cat4"),
        cat5=safety_d.get("cat5"),
        # state (blue + nautilus)
        capital=state_d.get("capital"),
        drawdown=state_d.get("drawdown"),
        peak_capital=state_d.get("peak_capital"),
        pnl=state_d.get("pnl"),
        trades=state_d.get("trades"),
        nautilus_capital=state_d.get("nautilus_capital"),
        nautilus_pnl=state_d.get("nautilus_pnl"),
        nautilus_trades=state_d.get("nautilus_trades"),
        nautilus_posture=state_d.get("nautilus_posture"),
        nautilus_param_hash=state_d.get("nautilus_param_hash"),
        # extf
        funding_btc=extf_d.get("funding_btc"),
        dvol_btc=extf_d.get("dvol_btc"),
        fng=extf_d.get("fng"),
        taker=extf_d.get("taker"),
        vix=extf_d.get("vix"),
        ls_btc=extf_d.get("ls_btc"),
        acb_ready=extf_d.get("acb_ready"),
        acb_present=extf_d.get("acb_present"),
        exf_age_s=extf_d.get("exf_age_s"),
        # esof
        moon_phase=esof_d.get("moon_phase"),
        mercury_retro=esof_d.get("mercury_retro"),
        liquidity_session=esof_d.get("liquidity_session"),
        market_cycle_pos=esof_d.get("market_cycle_pos"),
        esof_age_s=esof_d.get("esof_age_s"),
        # heartbeat
        heartbeat_ts=heartbeat_d.get("heartbeat_ts"),
        heartbeat_phase=heartbeat_d.get("heartbeat_phase"),
        heartbeat_flow=heartbeat_d.get("heartbeat_flow"),
        heartbeat_age_s=heartbeat_d.get("heartbeat_age_s"),
        # meta health
        meta_rm=meta_d.get("meta_rm"),
        meta_status=meta_d.get("meta_status"),
        m1_proc=meta_d.get("m1_proc"),
        m2_heartbeat=meta_d.get("m2_heartbeat"),
        m3_data=meta_d.get("m3_data"),
        m4_cp=meta_d.get("m4_cp"),
        m5_coh=meta_d.get("m5_coh"),
        # obf
        obf_top=obf_top,
        # prefect
        prefect_healthy=prefect_healthy,
        prefect_flows=prefect_flows if isinstance(prefect_flows, list) else [],
        # log
        log_lines=log_lines,
    )
async def fetch_prefect(self) -> tuple[bool, list[dict]]:
    """Probe the Prefect API and collect its most recent flow runs.

    Issues two GET requests through one ``httpx.AsyncClient`` configured with
    ``PREFECT_TIMEOUT_S``: ``/api/health`` and then ``/api/flow-runs``
    (asking for 5 runs, newest start first).

    Returns:
        ``(healthy, flows)``. ``healthy`` is True only when the health probe
        answered HTTP 200. ``flows`` holds one dict per run with the keys
        ``name``, ``status``, ``start_time`` and ``duration``; it keeps
        whatever was collected before a failure in the flow-run request.
        Any failure of the health probe or of the client itself yields
        ``(False, [])``. Never raises.
    """
    # NOTE(review): Prefect's documented REST API lists flow runs through
    # POST /api/flow_runs/filter - confirm this GET route exists on the
    # server this monitor targets.
    try:
        async with httpx.AsyncClient(timeout=PREFECT_TIMEOUT_S) as http:
            # Step 1: health probe - unreachable means "offline, no flows"
            try:
                probe = await http.get(f"{PREFECT_BASE_URL}/api/health")
            except Exception:
                return (False, [])
            healthy = probe.status_code == 200

            # Step 2: most recent flow runs (best effort)
            recent: list[dict] = []
            try:
                listing = await http.get(
                    f"{PREFECT_BASE_URL}/api/flow-runs",
                    params={"limit": 5, "sort": "START_TIME_DESC"},
                )
                if listing.status_code == 200:
                    for entry in listing.json():
                        run_state = entry.get("state") or {}
                        recent.append({
                            "name": entry.get("name"),
                            "status": run_state.get("type"),
                            "start_time": entry.get("start_time"),
                            "duration": entry.get("total_run_time"),
                        })
            except Exception:
                pass  # keep what was collected; the health result is still valid
            return (healthy, recent)
    except Exception:
        return (False, [])
def tail_log(self, path: str, n: int = 50) -> list[str]:
    """Return the last *n* non-empty lines of a log file.

    Only the end of the file is read (REQ 13.3): a trailing window of
    ``max(8192, n * 200)`` bytes is decoded first and doubled until it holds
    *n* complete lines or reaches the start of the file.

    Args:
        path: Path to the log file.
        n: Number of lines to return (default 50, per REQ 9.1). Values <= 0
            yield an empty list.

    Returns:
        Up to *n* non-empty lines, oldest first. A one-element list holding
        ``"Log not found: <path>"`` or ``"Log error: <exc>"`` is returned when
        the file is missing or cannot be read. Never raises.
    """
    if n <= 0:
        # lines[-0:] would be the whole list, not an empty one.
        return []
    try:
        window = max(8192, n * 200)
        with open(path, "rb") as f:
            f.seek(0, 2)
            size = f.tell()
            while True:
                start = max(0, size - window)
                f.seek(start)
                text = f.read().decode("utf-8", errors="replace")
                lines = text.splitlines()
                if start > 0:
                    # The window began mid-file, so its first line may be
                    # truncated (or start inside a multi-byte character).
                    lines = lines[1:]
                lines = [ln for ln in lines if ln.strip()]
                if len(lines) >= n or start == 0:
                    return lines[-n:]
                window *= 2
    except FileNotFoundError:
        return [f"Log not found: {path}"]
    except Exception as exc:
        return [f"Log error: {exc}"]
# ---------------------------------------------------------------------------
# HeaderBar — top bar widget (Req 11.1, 11.2, 11.3)
# ---------------------------------------------------------------------------
# Version string rendered on the first line of the header bar.
TUI_VERSION = "1.1.0"
# Build date rendered in parentheses right after the version.
TUI_BUILD_DATE = "2026-04-03"
class HeaderBar(Static):
    """Top-bar widget: app name, UTC clock (1s updates), HZ badge, meta status badge.

    Updated by:
    - App's 1s timer → calls update_clock()
    - _apply_snapshot() → calls update_status(hz_connected, meta_status)
    """

    def __init__(self, hz_host: str = "?", **kwargs) -> None:
        """Create the header.

        Args:
            hz_host: Text shown after "HZ:" on the second header line.
            **kwargs: Forwarded unchanged to ``Static``.
        """
        super().__init__(**kwargs)
        self._hz_connected: bool = False
        self._meta_status: str | None = None
        self._hz_host: str = hz_host
        # Reference point for the "up HH:MM:SS" counter.
        self._start_time: datetime = datetime.now(timezone.utc)

    # ------------------------------------------------------------------
    # Internal rendering
    # ------------------------------------------------------------------
    def _render_markup(self) -> str:
        """Build the two-line header markup string from current state."""
        # Sample the clock once so the timestamp and the uptime agree.
        now = datetime.now(timezone.utc)
        clock_str = now.strftime("%Y-%m-%d %H:%M:%S UTC")
        uptime_s = int((now - self._start_time).total_seconds())
        h, rem = divmod(uptime_s, 3600)
        m, s = divmod(rem, 60)
        uptime_str = f"{h:02d}:{m:02d}:{s:02d}"

        hz_badge = "[green][HZ ✓][/green]" if self._hz_connected else "[red][HZ ✗][/red]"

        if self._meta_status is not None:
            color = STATUS_COLORS.get(self._meta_status, "dim")
            status_badge = f"[{color}]● {self._meta_status}[/{color}]"
        else:
            status_badge = "[dim]● --[/dim]"

        # Line 1: identity + clock. Each section is set off with " │ " so the
        # build date, clock, uptime and badges do not run into each other.
        line1 = (
            f"[bold cyan]🐬 DOLPHIN-NAUTILUS MONITOR[/bold cyan]"
            f" v{TUI_VERSION} ({TUI_BUILD_DATE})"
            f" │ {clock_str}"
            f" │ up {uptime_str}"
            f" │ {status_badge} {hz_badge}"
        )
        # Line 2: static startup info — always visible, no HZ needed
        line2 = (
            f"[dim] HZ: {self._hz_host}"
            f" │ Python TUI │ q=quit r=refresh l=log ↑↓=scroll[/dim]"
        )
        return f"{line1}\n{line2}"

    # ------------------------------------------------------------------
    # Public update methods
    # ------------------------------------------------------------------
    def update_clock(self) -> None:
        """Refresh the clock display. Called by the app's 1s timer (Req 11.1)."""
        self.update(self._render_markup())

    def update_status(self, hz_connected: bool, meta_status: str | None) -> None:
        """Update HZ badge and meta status badge. Called from _apply_snapshot() (Req 11.2, 11.3).

        Args:
            hz_connected: Selects the green "HZ ✓" or red "HZ ✗" badge.
            meta_status: Status text for the badge; None renders a dim "--".
        """
        self._hz_connected = hz_connected
        self._meta_status = meta_status
        self.update(self._render_markup())
# ---------------------------------------------------------------------------
# SystemHealthPanel — system health widget (Req 6.1, 6.2, 6.3)
# ---------------------------------------------------------------------------
class SystemHealthPanel(Static):
    """Widget showing rm_meta, the five M1-M5 scores and the meta status dot.

    ``_apply_snapshot()`` feeds it through ``update_data(snap)``. Any value
    that is None (or a missing snapshot) is shown as "--".
    """

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self._snap: DataSnapshot | None = None

    # ------------------------------------------------------------------
    # Internal rendering
    # ------------------------------------------------------------------
    def _render_markup(self) -> str:
        """Return the panel markup for the snapshot currently held."""
        snap = self._snap

        def _score(value: float | None) -> str:
            # One decimal place; "--" when the score is absent
            return "--" if value is None else fmt_float(value, decimals=1)

        # rm_meta is rendered with three decimals
        if snap is None or snap.meta_rm is None:
            rm_text = "--"
        else:
            rm_text = fmt_float(snap.meta_rm, decimals=3)

        score_fields = ("m1_proc", "m2_heartbeat", "m3_data", "m4_cp", "m5_coh")
        m1, m2, m3, m4, m5 = (
            _score(getattr(snap, field_name) if snap else None)
            for field_name in score_fields
        )

        # Status text preceded by a dot in the status colour
        status = None if snap is None else snap.meta_status
        if status is None:
            status_markup = "[dim]● --[/dim]"
        else:
            dot_color = STATUS_COLORS.get(status, "dim")
            status_markup = f"[{dot_color}]● {status}[/{dot_color}]"

        return "\n".join([
            "[bold]SYSTEM HEALTH[/bold]",
            f"rm_meta: {rm_text}",
            f"M1: {m1} M2: {m2}",
            f"M3: {m3} M4: {m4}",
            f"M5: {m5}",
            f"Status: {status_markup}",
        ])

    # ------------------------------------------------------------------
    # Public update method
    # ------------------------------------------------------------------
    def update_data(self, snap: DataSnapshot) -> None:
        """Remember *snap* and redraw the panel from it.

        Args:
            snap: Latest DataSnapshot from the poll cycle.
        """
        self._snap = snap
        self.update(self._render_markup())
# ---------------------------------------------------------------------------
# AlphaEnginePanel — alpha engine widget (Req 3.1, 3.2, 3.3, 3.4)
# ---------------------------------------------------------------------------
class AlphaEnginePanel(Static):
    """Alpha engine panel: posture (colored), Rm bar, ACB boost/beta, Cat1-Cat5.

    Updated by _apply_snapshot() via update_data(snap).
    None fields render as "--" — never crashes on missing data (Req 12.3).
    """

    # Column width shared by Cat1-Cat5 numbers and their "--" placeholder.
    _CAT_WIDTH = 6

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self._snap: DataSnapshot | None = None

    # ------------------------------------------------------------------
    # Internal rendering
    # ------------------------------------------------------------------
    def _render_markup(self) -> str:
        """Build the full panel markup string from current snapshot."""
        snap = self._snap

        # --- Posture with color (Req 3.1) ---
        posture = snap.posture if snap is not None else None
        if posture is not None:
            color = posture_color(posture)
            posture_str = f"[{color}]{posture}[/{color}]"
        else:
            posture_str = "[dim]--[/dim]"

        # --- Rm bar (Req 3.2) — None is passed straight through to rm_bar ---
        rm = snap.rm if snap is not None else None
        rm_str = rm_bar(rm)

        # --- ACB boost / beta (Req 3.3) ---
        boost = snap.acb_boost if snap is not None else None
        beta = snap.acb_beta if snap is not None else None
        boost_str = fmt_float(boost, decimals=2) if boost is not None else "--"
        beta_str = fmt_float(beta, decimals=2) if beta is not None else "--"

        # --- Cat1-Cat5 (Req 3.4) — 2 decimal places, right-aligned (Req 15.3) ---
        width = self._CAT_WIDTH

        def _cat(v: float | None) -> str:
            # The placeholder is padded to the same width as a number so the
            # second column stays aligned when a value is missing.
            return f"{v:>{width}.2f}" if v is not None else f"{'--':>{width}}"

        c1 = _cat(snap.cat1 if snap else None)
        c2 = _cat(snap.cat2 if snap else None)
        c3 = _cat(snap.cat3 if snap else None)
        c4 = _cat(snap.cat4 if snap else None)
        c5 = _cat(snap.cat5 if snap else None)

        return (
            f"[bold]ALPHA ENGINE[/bold]\n"
            f"Posture: {posture_str}\n"
            f"Rm: {rm_str}\n"
            f"ACB: {boost_str}x β={beta_str}\n"
            f"Cat1:{c1} Cat2:{c2}\n"
            f"Cat3:{c3} Cat4:{c4}\n"
            f"Cat5:{c5}"
        )

    # ------------------------------------------------------------------
    # Public update method
    # ------------------------------------------------------------------
    def update_data(self, snap: DataSnapshot) -> None:
        """Store snapshot and refresh the widget display.

        Args:
            snap: Latest DataSnapshot from the poll cycle.
        """
        self._snap = snap
        self.update(self._render_markup())
# ---------------------------------------------------------------------------
# ScanBridgePanel — scan bridge / NG7 widget (Req 2.1, 2.2, 2.3, 2.4)
# ---------------------------------------------------------------------------
class ScanBridgePanel(Static):
    """Scan bridge / NG7 panel: scan#, vel_div, w50/w750 velocity, instability_50,
    bridge_ts, and scan age (color-coded by staleness).

    Updated by _apply_snapshot() via update_data(snap).
    None fields render as "--" — never crashes on missing data (Req 12.3).
    """

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self._snap: DataSnapshot | None = None

    # ------------------------------------------------------------------
    # Internal rendering
    # ------------------------------------------------------------------
    @staticmethod
    def _format_bridge_ts(raw_ts) -> str:
        """Render the bridge timestamp as ``HH:MM:SS UTC``.

        Offset-aware ISO-8601 values are converted to UTC first so the "UTC"
        label is true; naive values are shown unchanged. Anything that cannot
        be parsed is shown as its plain text, and None as "--".
        """
        if raw_ts is None:
            return "--"
        try:
            dt = datetime.fromisoformat(raw_ts.replace("Z", "+00:00"))
            if dt.tzinfo is not None:
                dt = dt.astimezone(timezone.utc)
            return dt.strftime("%H:%M:%S UTC")
        except (ValueError, AttributeError, TypeError, OverflowError):
            return str(raw_ts)

    def _render_markup(self) -> str:
        """Build the full panel markup string from current snapshot."""
        snap = self._snap

        # --- Scan number (Req 2.1) ---
        scan_num = snap.scan_number if snap is not None else None
        scan_str = str(scan_num) if scan_num is not None else "--"

        # --- Scan age with color coding (Req 2.2, 2.4) ---
        age_s = snap.scan_age_s if snap is not None else None
        age_color, age_text = color_age(age_s)
        age_str = f"[{age_color}]{age_text} ●[/{age_color}]"

        # --- Numeric fields (Req 2.3) — 4 decimal places ---
        vel_div_str = fmt_float(snap.vel_div if snap else None, decimals=4)
        w50_vel_str = fmt_float(snap.w50_velocity if snap else None, decimals=4)
        w750_vel_str = fmt_float(snap.w750_velocity if snap else None, decimals=4)
        instab50_str = fmt_float(snap.instability_50 if snap else None, decimals=4)

        # --- bridge_ts — format as HH:MM:SS UTC when present ---
        bridge_ts_str = self._format_bridge_ts(
            snap.scan_bridge_ts if snap is not None else None
        )

        return (
            f"[bold]SCAN BRIDGE / NG7[/bold]\n"
            f"Scan #{scan_str} Age: {age_str}\n"
            f"vel_div: {vel_div_str}\n"
            f"w50_vel: {w50_vel_str}\n"
            f"w750_vel: {w750_vel_str}\n"
            f"instab50: {instab50_str}\n"
            f"bridge_ts: {bridge_ts_str}"
        )

    # ------------------------------------------------------------------
    # Public update method
    # ------------------------------------------------------------------
    def update_data(self, snap: DataSnapshot) -> None:
        """Store snapshot and refresh the widget display.

        Args:
            snap: Latest DataSnapshot from the poll cycle.
        """
        self._snap = snap
        self.update(self._render_markup())
# ---------------------------------------------------------------------------
# ExtFPanel — external features widget (Req 4.1, 4.2, 4.3)
# ---------------------------------------------------------------------------
class ExtFPanel(Static):
    """External-features widget.

    Lists funding_btc, dvol_btc, fng, taker, vix and ls_btc, the ACB present
    text with a ✓/✗ readiness mark, and the colour-coded age of the data.
    ``_apply_snapshot()`` feeds it through ``update_data(snap)``; missing
    values are shown as "--" (Req 12.3).
    """

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self._snap: DataSnapshot | None = None

    # ------------------------------------------------------------------
    # Internal rendering
    # ------------------------------------------------------------------
    def _render_markup(self) -> str:
        """Return the panel markup for the snapshot currently held."""
        snap = self._snap

        # Numeric readings (Req 4.1): all formatted with four decimals
        numeric_fields = ("funding_btc", "dvol_btc", "fng", "taker", "vix", "ls_btc")
        funding, dvol, fng, taker, vix, ls_btc = (
            fmt_float(getattr(snap, field_name) if snap else None, decimals=4)
            for field_name in numeric_fields
        )

        # ACB readiness (Req 4.2): only the exact booleans get a mark
        ready = None if snap is None else snap.acb_ready
        ready_markup = "[dim]--[/dim]"
        if ready is True:
            ready_markup = "[green]✓[/green]"
        elif ready is False:
            ready_markup = "[red]✗[/red]"

        # ACB present (Req 4.2): shown verbatim, e.g. "9/9"
        present = None if snap is None else snap.acb_present
        present_text = "--" if present is None else present

        # Data age (Req 4.3): colour and text both come from color_age
        tint, age_label = color_age(None if snap is None else snap.exf_age_s)
        age_markup = f"[{tint}]{age_label} ●[/{tint}]"

        return "\n".join([
            "[bold]ExtF[/bold]",
            f"funding: {funding}",
            f"dvol: {dvol}",
            f"fng: {fng}",
            f"taker: {taker}",
            f"vix: {vix}",
            f"ls_btc: {ls_btc}",
            f"ACB: {present_text} {ready_markup}",
            f"Age: {age_markup}",
        ])

    # ------------------------------------------------------------------
    # Public update method
    # ------------------------------------------------------------------
    def update_data(self, snap: DataSnapshot) -> None:
        """Remember *snap* and redraw the panel from it.

        Args:
            snap: Latest DataSnapshot from the poll cycle.
        """
        self._snap = snap
        self.update(self._render_markup())
# ---------------------------------------------------------------------------
# EsoFPanel — esoteric features widget (Req 5.1, 5.2)
# ---------------------------------------------------------------------------
class EsoFPanel(Static):
    """Esoteric-features widget.

    Shows moon_phase, a Mercury retrograde indicator, liquidity_session,
    market_cycle_pos and the colour-coded age of the data.
    ``_apply_snapshot()`` feeds it through ``update_data(snap)``; missing
    values are shown as "--" (Req 12.3).
    """

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self._snap: DataSnapshot | None = None

    # ------------------------------------------------------------------
    # Internal rendering
    # ------------------------------------------------------------------
    def _render_markup(self) -> str:
        """Return the panel markup for the snapshot currently held."""
        snap = self._snap
        have_snap = snap is not None

        # Moon phase (Req 5.1): shown verbatim
        moon = snap.moon_phase if have_snap else None
        moon_text = "--" if moon is None else moon

        # Mercury retrograde (Req 5.1): only the exact booleans get a label
        retro = snap.mercury_retro if have_snap else None
        retro_markup = "[dim]--[/dim]"
        if retro is True:
            retro_markup = "[yellow]Retro ⚠[/yellow]"
        elif retro is False:
            retro_markup = "[green]Normal[/green]"

        # Liquidity session (Req 5.1): shown verbatim
        session = snap.liquidity_session if have_snap else None
        session_text = "--" if session is None else session

        # Market cycle position (Req 5.1): two decimals
        cycle_pos = snap.market_cycle_pos if have_snap else None
        cycle_text = "--" if cycle_pos is None else fmt_float(cycle_pos, decimals=2)

        # Data age (Req 5.2): colour and text both come from color_age
        tint, age_label = color_age(snap.esof_age_s if have_snap else None)
        age_markup = f"[{tint}]{age_label} ●[/{tint}]"

        return "\n".join([
            "[bold]EsoF[/bold]",
            f"Moon: {moon_text}",
            f"Mercury: {retro_markup}",
            f"Session: {session_text}",
            f"MC pos: {cycle_text}",
            f"Age: {age_markup}",
        ])

    # ------------------------------------------------------------------
    # Public update method
    # ------------------------------------------------------------------
    def update_data(self, snap: DataSnapshot) -> None:
        """Remember *snap* and redraw the panel from it.

        Args:
            snap: Latest DataSnapshot from the poll cycle.
        """
        self._snap = snap
        self.update(self._render_markup())
# ---------------------------------------------------------------------------
# CapitalPanel — capital / PnL widget (Req 7.1, 7.2, 7.3)
# ---------------------------------------------------------------------------
class CapitalPanel(Static):
    """Capital / PnL widget.

    Blue rows: capital, drawdown, peak, PnL and trade count. Nautilus rows:
    capital, PnL, trade count and posture. ``_apply_snapshot()`` feeds it
    through ``update_data(snap)``; missing values are shown as "--"
    (Req 12.3). PnL colour and text come from ``fmt_pnl`` (Req 7.3).
    """

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self._snap: DataSnapshot | None = None

    # ------------------------------------------------------------------
    # Internal rendering
    # ------------------------------------------------------------------
    def _render_markup(self) -> str:
        """Return the panel markup for the snapshot currently held."""
        snap = self._snap

        def _value(field_name: str):
            # Attribute of the snapshot, or None while no snapshot exists
            return getattr(snap, field_name) if snap is not None else None

        def _money(amount) -> str:
            # $X,XXX.XX, or "--" when absent
            return "--" if amount is None else f"${amount:,.2f}"

        def _count(number) -> str:
            return "--" if number is None else str(number)

        def _pnl_markup(amount) -> str:
            tint, text = fmt_pnl(amount)
            return f"[{tint}]{text}[/{tint}]"

        # --- Blue strategy (Req 7.1) ---
        blue_capital = _money(_value("capital"))
        dd = _value("drawdown")
        # The stored value is multiplied by 100 and shown with a % suffix
        blue_drawdown = "--" if dd is None else f"{dd * 100:.2f}%"
        blue_peak = _money(_value("peak_capital"))
        blue_pnl = _pnl_markup(_value("pnl"))
        blue_trades = _count(_value("trades"))

        # --- Nautilus (Req 7.2) ---
        naut_capital = _money(_value("nautilus_capital"))
        naut_pnl = _pnl_markup(_value("nautilus_pnl"))
        naut_trades = _count(_value("nautilus_trades"))
        posture = _value("nautilus_posture")
        if posture is None:
            naut_posture = "[dim]--[/dim]"
        else:
            posture_tint = posture_color(posture)
            naut_posture = f"[{posture_tint}]{posture}[/{posture_tint}]"

        return "\n".join([
            "[bold]CAPITAL / PnL[/bold]",
            f"Blue capital: {blue_capital}",
            f"Drawdown: {blue_drawdown}",
            f"Peak: {blue_peak}",
            f"PnL today: {blue_pnl}",
            f"Trades: {blue_trades}",
            f"Nautilus cap: {naut_capital}",
            f"Nautilus PnL: {naut_pnl}",
            f"Naut trades: {naut_trades}",
            f"Naut posture: {naut_posture}",
        ])

    # ------------------------------------------------------------------
    # Public update method
    # ------------------------------------------------------------------
    def update_data(self, snap: DataSnapshot) -> None:
        """Remember *snap* and redraw the panel from it.

        Args:
            snap: Latest DataSnapshot from the poll cycle.
        """
        self._snap = snap
        self.update(self._render_markup())
# ---------------------------------------------------------------------------
# PrefectPanel — Prefect flow orchestrator status (Req 8.1-8.4)
# ---------------------------------------------------------------------------
# Colour used for each flow-run status when it is rendered (Req 8.3) — public constant
PREFECT_FLOW_STATUS_COLORS: dict[str, str] = dict(
    COMPLETED="green",
    RUNNING="cyan",
    FAILED="red",
    CRASHED="red",
    PENDING="yellow",
    LATE="yellow",
)
# Display lookup table: the public mapping extended with the cancel statuses
_FLOW_STATUS_COLORS: dict[str, str] = dict(
    PREFECT_FLOW_STATUS_COLORS,
    CANCELLED="dim",
    CANCELLING="dim",
)
def _fmt_flow_duration(seconds: float | None) -> str:
    """Render a run duration given in seconds as ``"<s>s"`` or ``"<m>m <s>s"``.

    The value is truncated to whole seconds. Anything below one minute is
    printed as seconds only; longer durations are split into minutes and
    seconds with no hour component (45 → "45s", 135 → "2m 15s",
    3600 → "60m 0s"). None gives "--".
    """
    if seconds is None:
        return "--"
    whole = int(seconds)
    if whole < 60:
        return f"{whole}s"
    minutes, leftover = divmod(whole, 60)
    return f"{minutes}m {leftover}s"
def _fmt_flow_start(iso: str | None) -> str:
    """Format an ISO-8601 start_time string as ``HH:MM UTC``.

    Only the first 19 characters (date and time down to the second) are
    parsed; fractional seconds and any timezone suffix are ignored, so the
    value is displayed as given.

    Returns:
        ``"HH:MM UTC"`` for a parseable timestamp. For a string that cannot
        be parsed, its first five characters when it has at least five,
        otherwise "--". "--" for None and for any non-string value.
    """
    if not isinstance(iso, str):
        # Covers None and guards the slicing below: a non-string used to raise
        # TypeError from inside the fallback branch.
        return "--"
    try:
        # Truncate sub-second precision and timezone suffix before parsing
        dt = datetime.strptime(iso[:19].replace("T", " "), "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return iso[:5] if len(iso) >= 5 else "--"
    return dt.strftime("%H:%M UTC")
class PrefectPanel(Static):
    """Prefect orchestration panel.

    Displays:
    - Health badge: [PREFECT ✓] (green) or [PREFECT OFFLINE] (red) (Req 8.1, 8.4)
    - Last 5 flow runs: name, status (color-coded), start time, duration (Req 8.2, 8.3)

    Updated by _apply_snapshot() via update_data(snap).
    Never crashes when Prefect is unreachable (Req 12.2).
    """

    # Visible width of the STATUS column; matches the column header below.
    _STATUS_WIDTH = 20

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self._snap: DataSnapshot | None = None

    # ------------------------------------------------------------------
    # Internal rendering
    # ------------------------------------------------------------------
    def _render_markup(self) -> str:
        """Build the full panel markup string from current snapshot."""
        snap = self._snap

        # --- Health badge (Req 8.1, 8.4) ---
        if snap is None or not snap.prefect_healthy:
            health_str = "[red][PREFECT OFFLINE][/red]"
        else:
            health_str = "[green][PREFECT ✓][/green]"

        # A missing snapshot or a None flow list both mean "no runs to show"
        flows = (snap.prefect_flows if snap is not None else None) or []

        # --- Flow run rows (Req 8.2, 8.3) ---
        rows: list[str] = []
        for run in flows[:5]:
            name = str(run.get("name") or "--")
            status = str(run.get("status") or "").upper()
            start = _fmt_flow_start(run.get("start_time"))
            duration = _fmt_flow_duration(run.get("duration"))
            color = _FLOW_STATUS_COLORS.get(status, "dim")
            # Pad the visible text, not the markup: the tag length depends on
            # the colour name, so padding the tagged string misaligned rows.
            status_text = f"{status or '--':<{self._STATUS_WIDTH}}"
            status_str = f"[{color}]{status_text}[/{color}]"
            # Truncate long flow names to keep layout tidy
            rows.append(f" {status_str} {name[:22]:<24} {start} {duration}")

        # Pad to 5 rows so the panel height stays stable
        while len(rows) < 5:
            rows.append(" [dim]--[/dim]")
        flow_block = "\n".join(rows)

        return (
            f"[bold]PREFECT FLOWS[/bold] {health_str}\n"
            f" {'STATUS':<{self._STATUS_WIDTH}} {'NAME':<24} {'START':>5} {'DUR':>4}\n"
            f"{flow_block}"
        )

    # ------------------------------------------------------------------
    # Public update method
    # ------------------------------------------------------------------
    def update_data(self, snap: DataSnapshot) -> None:
        """Store snapshot and refresh the widget display.

        Args:
            snap: Latest DataSnapshot from the poll cycle.
        """
        self._snap = snap
        self.update(self._render_markup())
# ---------------------------------------------------------------------------
# OBFPanel — top-5 assets by absolute imbalance
# ---------------------------------------------------------------------------
class OBFPanel(Static):
    """Order Book Features widget.

    Renders up to five entries of ``snap.obf_top`` as a table with the
    columns asset, imbalance (signed), fill_prob and depth_quality, and
    shows "No OBF data" when the list is empty or None.
    ``_apply_snapshot()`` feeds it through ``update_data(snap)``.
    """

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self._snap: DataSnapshot | None = None

    # ------------------------------------------------------------------
    # Internal rendering
    # ------------------------------------------------------------------
    def _render_markup(self) -> str:
        """Return the panel markup for the snapshot currently held."""
        snap = self._snap
        entries: list[dict] = (None if snap is None else snap.obf_top) or []
        if not entries:
            return "[bold]OBF TOP ASSETS[/bold]\n [dim]No OBF data[/dim]"

        def _cell(value, number_spec: str, width: int) -> str:
            # Formatted number, or a right-aligned "--" of the column width
            if value is None:
                return format("--", f">{width}")
            return format(value, number_spec)

        # NOTE(review): 'IMBALANCE' is 9 characters but the column is 8 wide,
        # so the header is one character wider than the data rows - confirm
        # whether that is intended.
        table = [
            "[bold]OBF TOP ASSETS[/bold]",
            f" {'ASSET':<8} {'IMBALANCE':>8} {'FILL_PROB':>9} {'DEPTH_QUAL':>13}",
        ]

        body: list[str] = []
        for item in entries[:5]:
            asset = str(item.get("asset") or "--")
            # Imbalance: 8 wide, explicit sign, 2 decimals (Req 1.3)
            imbalance = _cell(item.get("imbalance"), ">+8.2f", 8)
            fill_prob = _cell(item.get("fill_prob"), ">9.2f", 9)
            depth = _cell(item.get("depth_quality"), ">13.2f", 13)
            body.append(f" {asset:<8} {imbalance} {fill_prob} {depth}")

        # Fill up to five rows so the panel height stays stable
        filler = f" [dim]{'--':<8} {'--':>8} {'--':>9} {'--':>13}[/dim]"
        body.extend([filler] * (5 - len(body)))

        return "\n".join(table + body)

    # ------------------------------------------------------------------
    # Public update method
    # ------------------------------------------------------------------
    def update_data(self, snap: DataSnapshot) -> None:
        """Remember *snap* and redraw the panel from it.

        Args:
            snap: Latest DataSnapshot from the poll cycle.
        """
        self._snap = snap
        self.update(self._render_markup())
# ---------------------------------------------------------------------------
# LogPanel
# ---------------------------------------------------------------------------
class LogPanel(VerticalScroll):
    """Scrollable panel that shows log lines under a fixed title row.

    The panel does not read any file itself: the lines to display are
    supplied by the caller through ``update_data()`` (taken from
    ``snap.log_lines``) or ``update_lines()``. With no lines, only the
    title row is rendered.

    The text lives in a single inner ``Static`` child with id
    ``log_inner``. DolphinTUIApp hides the panel on mount and toggles its
    ``display`` attribute from the ``l`` key binding.
    """

    DEFAULT_CSS = """
    LogPanel {
        height: 8;
        border: solid $border;
        background: $surface;
        padding: 0 1;
    }
    """

    def __init__(self, log_path: str = LOG_DEFAULT_PATH, **kwargs) -> None:
        """Create an empty log panel.

        Args:
            log_path: Path label of the log file; stored but not read or
                rendered by this class.
            **kwargs: Forwarded unchanged to the parent class constructor.
        """
        super().__init__(**kwargs)
        self._log_path: str = log_path
        self._lines: list[str] = []
        self._snap: DataSnapshot | None = None

    # ------------------------------------------------------------------
    # Compose
    # ------------------------------------------------------------------
    def compose(self) -> ComposeResult:
        """Yield the inner Static child, pre-filled with the current text."""
        yield Static(self._build_content(), id="log_inner")

    # ------------------------------------------------------------------
    # Internal rendering
    # ------------------------------------------------------------------
    def _build_content(self) -> str:
        """Return the title row followed by the current lines, one per row.

        With no lines the result is just the title row plus a newline.
        """
        title = "[bold]LOG TAIL [meta_health.log] (l=toggle, ↑↓=scroll)[/bold]"
        return title + "\n" + "\n".join(self._lines)

    def _push_to_inner(self) -> None:
        """Rebuild the markup and write it into the inner Static, if present."""
        markup = self._build_content()
        try:
            self.query_one("#log_inner", Static).update(markup)
        except Exception:
            # Inner widget not available (e.g. not yet mounted); compose()
            # renders the stored lines when the child is created.
            pass

    # ------------------------------------------------------------------
    # Public update methods
    # ------------------------------------------------------------------
    def update_data(self, snap: DataSnapshot) -> None:
        """Store *snap*, copy its log lines and refresh the display.

        Args:
            snap: Latest DataSnapshot from the poll cycle; None clears the
                displayed lines.
        """
        self._snap = snap
        self._lines = [] if snap is None else list(snap.log_lines)
        self._push_to_inner()

    def update_lines(self, lines: list[str]) -> None:
        """Replace the displayed log lines and refresh the display.

        Args:
            lines: New log lines; an empty or None value leaves only the
                title row.
        """
        self._lines = lines or []
        self._push_to_inner()

    def set_log_path(self, path: str) -> None:
        """Record a new log file path (does not re-read or redraw anything).

        Args:
            path: New log file path string.
        """
        self._log_path = path
# ---------------------------------------------------------------------------
# DolphinTUIApp — main Textual application (Req 11, 12, 13, 14, 15)
# ---------------------------------------------------------------------------
class DolphinTUIApp(App):
    """Top-level Textual App for the DOLPHIN-NAUTILUS real-time TUI monitor.

    Owns the layout, keyboard bindings, the 2s poll timer and the 1s clock
    timer. Unless ``mock_mode`` is set, a DolphinDataFetcher is created in
    on_mount() and every poll cycle pushes a fresh DataSnapshot to all
    panels through _apply_snapshot(). In mock mode a canned snapshot from
    _make_mock_snapshot() is used instead and no fetcher is created.

    Failures while polling or updating an individual panel are reported on
    stderr and never abort the poll loop.
    """

    CSS = """
    Screen {
        background: #0d0d0d;
    }
    /* ------------------------------------------------------------------ */
    /* Color utility classes — used by panel markup via [@class] tags */
    /* ------------------------------------------------------------------ */
    .green {
        color: #00ff00;
    }
    .yellow {
        color: #ffff00;
    }
    .red {
        color: #ff0000;
    }
    .dim {
        color: #666666;
    }
    .dark_orange {
        color: #ff8c00;
    }
    .cyan {
        color: #00ffff;
    }
    /* ------------------------------------------------------------------ */
    /* Header bar */
    /* ------------------------------------------------------------------ */
    HeaderBar {
        height: 2;
        background: #1a1a1a;
        color: #cccccc;
        padding: 0 1;
        border-bottom: solid #333333;
    }
    /* ------------------------------------------------------------------ */
    /* Panel widgets — shared border / padding */
    /* ------------------------------------------------------------------ */
    SystemHealthPanel,
    AlphaEnginePanel,
    ScanBridgePanel,
    ExtFPanel,
    EsoFPanel,
    CapitalPanel,
    PrefectPanel,
    OBFPanel {
        border: solid #333333;
        padding: 0 1;
        background: #111111;
        color: #cccccc;
    }
    /* ------------------------------------------------------------------ */
    /* Layout rows */
    /* ------------------------------------------------------------------ */
    #top_row {
        height: 9;
        layout: horizontal;
    }
    #mid_row {
        height: 11;
        layout: horizontal;
    }
    #bottom_row {
        height: 9;
        layout: horizontal;
    }
    /* Top row — equal thirds */
    #top_row SystemHealthPanel {
        width: 1fr;
    }
    #top_row AlphaEnginePanel {
        width: 1fr;
    }
    #top_row ScanBridgePanel {
        width: 1fr;
    }
    /* Mid row — equal thirds */
    #mid_row ExtFPanel {
        width: 1fr;
    }
    #mid_row EsoFPanel {
        width: 1fr;
    }
    #mid_row CapitalPanel {
        width: 1fr;
    }
    /* Bottom row — two halves */
    #bottom_row PrefectPanel {
        width: 1fr;
    }
    #bottom_row OBFPanel {
        width: 1fr;
    }
    /* ------------------------------------------------------------------ */
    /* Log panel */
    /* ------------------------------------------------------------------ */
    LogPanel {
        height: 8;
        border: solid #333333;
        background: #0a0a0a;
        padding: 0 1;
        color: #888888;
    }
    /* ------------------------------------------------------------------ */
    /* Terminal size warning overlay (Req 14.4) */
    /* ------------------------------------------------------------------ */
    #size_warning {
        layer: overlay;
        width: 100%;
        height: 100%;
        background: #1a0000 80%;
        border: double #ff0000;
        color: #ff4444;
        content-align: center middle;
        text-align: center;
        text-style: bold;
    }
    """

    BINDINGS = [
        ("q", "quit", "Quit"),
        ("r", "force_refresh", "Refresh"),
        ("l", "toggle_log", "Log Panel"),
        ("up", "scroll_up", "Scroll Up"),
        ("down", "scroll_down", "Scroll Down"),
    ]

    # ------------------------------------------------------------------
    # Instance state
    # ------------------------------------------------------------------
    # Whether the LogPanel is currently shown; flipped by action_toggle_log().
    _log_visible: bool = False

    def __init__(self, hz_host: str = "dolphin.taile8ad92.ts.net",
                 hz_port: int = 5701,
                 log_path: str = LOG_DEFAULT_PATH,
                 mock_mode: bool = False,
                 **kwargs) -> None:
        """Store connection settings; no I/O happens until on_mount().

        Args:
            hz_host: Hazelcast host passed to the data fetcher.
            hz_port: Hazelcast port passed to the data fetcher.
            log_path: Log file path passed to the fetcher and the LogPanel.
            mock_mode: When True, no fetcher is created and every poll
                renders the canned snapshot from _make_mock_snapshot().
            **kwargs: Forwarded unchanged to the parent App constructor.
        """
        super().__init__(**kwargs)
        self.hz_host = hz_host
        self.hz_port = hz_port
        self.log_path = log_path
        self.mock_mode = mock_mode
        self.fetcher: DolphinDataFetcher | None = None
        # Handle of the 2s poll timer; created in on_mount().
        self._poll_timer: Any = None
        # Last snapshot applied to the panels; lets the log panel be filled
        # immediately when it is toggled on instead of waiting for a poll.
        self._last_snap: DataSnapshot | None = None

    # ------------------------------------------------------------------
    # Error reporting
    # ------------------------------------------------------------------
    @staticmethod
    def _log_stderr(context: str, exc: BaseException) -> None:
        """Write a one-line ``[context] error`` report to stderr."""
        import sys

        print(f"[{context}] {exc}", file=sys.stderr)

    # ------------------------------------------------------------------
    # Compose — widget tree
    # ------------------------------------------------------------------
    def compose(self) -> ComposeResult:
        """Build the full widget tree.

        Layout:
            - HeaderBar (full width)
            - #top_row: SystemHealthPanel | AlphaEnginePanel | ScanBridgePanel
            - #mid_row: ExtFPanel | EsoFPanel | CapitalPanel
            - #bottom_row: PrefectPanel | OBFPanel
            - LogPanel (full width, toggleable)
        """
        yield HeaderBar(hz_host=f"{self.hz_host}:{self.hz_port}", id="header")
        with Horizontal(id="top_row"):
            yield SystemHealthPanel(id="panel_health")
            yield AlphaEnginePanel(id="panel_alpha")
            yield ScanBridgePanel(id="panel_scan")
        with Horizontal(id="mid_row"):
            yield ExtFPanel(id="panel_extf")
            yield EsoFPanel(id="panel_esof")
            yield CapitalPanel(id="panel_capital")
        with Horizontal(id="bottom_row"):
            yield PrefectPanel(id="panel_prefect")
            yield OBFPanel(id="panel_obf")
        yield LogPanel(log_path=self.log_path, id="panel_log")

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    async def on_mount(self) -> None:
        """Instantiate fetcher, connect to HZ, start poll and clock timers."""
        # 1. Instantiate fetcher (skip in mock mode)
        if not self.mock_mode:
            self.fetcher = DolphinDataFetcher(
                hz_host=self.hz_host,
                hz_port=self.hz_port,
                log_path=self.log_path,
            )
            await self.fetcher.connect_hz()
        # 2. Start timers
        self._poll_timer = self.set_interval(2, self._poll)
        self.set_interval(1, self._update_clock)
        # 3. Terminal size check
        self._check_terminal_size()
        # 4. Hide LogPanel initially
        try:
            self.query_one("#panel_log", LogPanel).display = False
        except Exception:
            pass
        # 5. Fire first poll immediately so panels aren't blank on startup
        await self._poll()

    # ------------------------------------------------------------------
    # Clock update
    # ------------------------------------------------------------------
    def _update_clock(self) -> None:
        """Called by the 1s timer — refreshes the HeaderBar UTC clock."""
        try:
            self.query_one(HeaderBar).update_clock()
        except Exception:
            pass  # Widget not yet mounted or already unmounted

    # ------------------------------------------------------------------
    # Terminal size check (Req 14.4)
    # ------------------------------------------------------------------
    def _check_terminal_size(self) -> None:
        """Show or hide the terminal-too-small warning overlay.

        The overlay is mounted the first time the terminal is found to be
        smaller than MIN_TERM_WIDTH x MIN_TERM_HEIGHT. On later calls (e.g.
        from on_resize) its text is refreshed and its ``display`` toggled.
        Any failure is reported on stderr and is non-fatal.
        """
        size = self.size
        too_small = size.width < MIN_TERM_WIDTH or size.height < MIN_TERM_HEIGHT
        message = (
            f"⚠ Terminal too small — resize to "
            f"{MIN_TERM_WIDTH}×{MIN_TERM_HEIGHT} minimum\n"
            f"(current: {size.width}×{size.height})"
        )
        try:
            warning = self.query_one("#size_warning", Static)
        except Exception:
            warning = None  # Not yet mounted
        try:
            if warning is not None:
                # Already mounted — just toggle visibility and update text
                if too_small:
                    warning.update(message)
                warning.display = too_small
            elif too_small:
                self.mount(Static(message, id="size_warning"))
        except Exception as exc:
            self._log_stderr("SIZE CHECK", exc)  # Non-fatal — app still runs

    def on_resize(self, event) -> None:
        """Re-check terminal size whenever the terminal is resized (Req 14.4)."""
        self._check_terminal_size()

    # ------------------------------------------------------------------
    # Poll / snapshot
    # ------------------------------------------------------------------
    @staticmethod
    def _make_mock_snapshot() -> "DataSnapshot":
        """Return a fully-populated DataSnapshot with realistic mock values."""
        now = time.time()
        return DataSnapshot(
            ts=now,
            hz_connected=True,
            prefect_connected=True,
            prefect_healthy=True,
            # scan
            scan_number=59001,
            vel_div=-0.0312,
            w50_velocity=-0.0421,
            w750_velocity=-0.0109,
            instability_50=0.0234,
            asset_prices={"BTCUSDT": 83420.5, "ETHUSDT": 1612.3},
            scan_bridge_ts="2026-04-04T18:45:00+00:00",
            scan_age_s=2.1,
            # safety
            posture="APEX",
            rm=0.82,
            cat1=0.9, cat2=0.8, cat3=0.7, cat4=1.0, cat5=0.9,
            # state
            capital=124532.10,
            drawdown=-3.21,
            peak_capital=128750.00,
            pnl=1240.50,
            trades=12,
            nautilus_capital=124532.10,
            nautilus_pnl=1240.50,
            nautilus_trades=12,
            nautilus_posture="APEX",
            nautilus_param_hash="abc123",
            # extf
            funding_btc=-0.012,
            dvol_btc=62.4,
            fng=28.0,
            taker=0.81,
            vix=18.2,
            ls_btc=0.48,
            acb_ready=True,
            acb_present="9/9",
            exf_age_s=4.2,
            # esof
            moon_phase="Waxing Gibbous",
            mercury_retro=False,
            liquidity_session="London",
            market_cycle_pos=0.42,
            esof_age_s=3.8,
            # heartbeat
            heartbeat_ts=now - 5,
            heartbeat_phase="trading",
            heartbeat_flow="nautilus_event_trader",
            heartbeat_age_s=5.1,
            # meta health
            meta_rm=0.923,
            meta_status="GREEN",
            m1_proc=1.0,
            m2_heartbeat=1.0,
            m3_data=1.0,
            m4_cp=1.0,
            m5_coh=1.0,
            # obf
            obf_top=[
                {"asset": "BTCUSDT", "imbalance": 0.18, "fill_prob": 0.72, "depth_quality": 1.2},
                {"asset": "ETHUSDT", "imbalance": 0.12, "fill_prob": 0.68, "depth_quality": 1.1},
                {"asset": "SOLUSDT", "imbalance": 0.09, "fill_prob": 0.61, "depth_quality": 1.0},
                {"asset": "BNBUSDT", "imbalance": -0.05, "fill_prob": 0.51, "depth_quality": 0.9},
                {"asset": "XRPUSDT", "imbalance": -0.11, "fill_prob": 0.44, "depth_quality": 0.8},
            ],
            # prefect
            prefect_flows=[
                {"name": "paper_trade_flow", "status": "COMPLETED", "start_time": "10:30:01", "duration": "2m"},
                {"name": "nautilus_prefect", "status": "COMPLETED", "start_time": "10:22:00", "duration": "8m"},
                {"name": "obf_prefect_flow", "status": "RUNNING", "start_time": "10:30:00", "duration": "0m"},
                {"name": "exf_fetcher_flow", "status": "COMPLETED", "start_time": "10:15:00", "duration": "15m"},
                {"name": "mc_forewarner_flow", "status": "COMPLETED", "start_time": "09:30:00", "duration": "1h"},
            ],
            # log
            log_lines=[
                "10:30:01 [INFO] RM_META=0.923 [GREEN] M1=1.0 M2=1.0 M3=1.0 M4=1.0 M5=1.0",
                "10:29:56 [INFO] RM_META=0.921 [GREEN] M1=1.0 M2=1.0 M3=0.98 M4=1.0 M5=1.0",
                "10:29:51 [INFO] HEARTBEAT phase=trading flow=nautilus_event_trader",
                "10:29:46 [INFO] SCAN #59000 vel_div=-0.031 posture=APEX",
                "10:29:41 [INFO] ACB boost=1.55x beta=0.80 cut=0.0",
            ],
        )

    async def _poll(self) -> None:
        """Called by set_interval(2) — fetches data and updates all panels.

        In mock mode the canned snapshot is applied directly. Otherwise the
        main snapshot and the Prefect status are fetched concurrently and
        merged. Any exception is reported on stderr (with traceback) and
        the cycle is skipped, leaving the panels on their previous values.
        """
        if self.mock_mode:
            self._apply_snapshot(self._make_mock_snapshot())
            return
        if self.fetcher is None:
            return  # Nothing to poll before on_mount() has created the fetcher

        import dataclasses
        import sys
        import traceback

        try:
            snap, (prefect_healthy, prefect_flows) = await asyncio.gather(
                self.fetcher.fetch(),
                self.fetcher.fetch_prefect(),
            )
            snap = dataclasses.replace(
                snap,
                prefect_healthy=prefect_healthy,
                prefect_connected=prefect_healthy,
                prefect_flows=prefect_flows if isinstance(prefect_flows, list) else [],
            )
        except Exception as exc:
            self._log_stderr("POLL ERROR", exc)
            traceback.print_exc(file=sys.stderr)
            return
        self._apply_snapshot(snap)

    def _apply_snapshot(self, snap: DataSnapshot) -> None:
        """Update all panel widgets from a DataSnapshot.

        Each widget is updated independently: a failure in one is reported
        on stderr as ``[APPLY <widget id>] <error>`` and the remaining
        widgets are still updated. The log panel is updated only while
        self._log_visible is set (Req 9.2).

        Args:
            snap: Latest DataSnapshot from the poll cycle.
        """
        self._last_snap = snap

        # --- HeaderBar: HZ badge + meta status badge ---
        try:
            self.query_one("#header", HeaderBar).update_status(
                snap.hz_connected, snap.meta_status
            )
        except Exception as exc:
            self._log_stderr("APPLY header", exc)

        # --- Data panels, all sharing the update_data(snap) interface ---
        targets = [
            ("#panel_health", SystemHealthPanel),
            ("#panel_alpha", AlphaEnginePanel),
            ("#panel_scan", ScanBridgePanel),
            ("#panel_extf", ExtFPanel),
            ("#panel_esof", EsoFPanel),
            ("#panel_capital", CapitalPanel),
            ("#panel_prefect", PrefectPanel),
            ("#panel_obf", OBFPanel),
        ]
        # --- LogPanel — only if visible (Req 9.2) ---
        if self._log_visible:
            targets.append(("#panel_log", LogPanel))

        for selector, widget_type in targets:
            try:
                self.query_one(selector, widget_type).update_data(snap)
            except Exception as exc:
                self._log_stderr(f"APPLY {selector.lstrip('#')}", exc)

    # ------------------------------------------------------------------
    # Actions
    # ------------------------------------------------------------------
    async def action_quit(self) -> None:
        """Shut down the HZ client, then exit.

        The app exits even if disconnecting fails, so ``q`` always quits.
        """
        try:
            if self.fetcher is not None:
                await self.fetcher.disconnect_hz()
        except Exception as exc:
            self._log_stderr("QUIT", exc)
        finally:
            self.exit()

    async def action_force_refresh(self) -> None:
        """Stop the poll timer, run a poll cycle now, then restart the timer.

        The timer is restarted even if the poll raises, so a failed manual
        refresh cannot leave the app without periodic polling.
        """
        timer = self._poll_timer
        if timer is not None:
            try:
                timer.stop()
            except Exception as exc:
                self._log_stderr("REFRESH", exc)
        try:
            await self._poll()
        finally:
            self._poll_timer = self.set_interval(2, self._poll)

    def action_toggle_log(self) -> None:
        """Toggle LogPanel visibility and update the _log_visible flag.

        When the panel becomes visible it is filled from the last applied
        snapshot right away rather than staying empty until the next poll.
        """
        self._log_visible = not self._log_visible
        try:
            panel = self.query_one("#panel_log", LogPanel)
            panel.display = self._log_visible
            if self._log_visible and self._last_snap is not None:
                panel.update_data(self._last_snap)
        except Exception as exc:
            self._log_stderr("TOGGLE LOG", exc)

    def _scroll_log(self, up: bool) -> None:
        """Scroll the LogPanel one step; no-op while the panel is hidden."""
        if not self._log_visible:
            return
        try:
            panel = self.query_one("#panel_log", LogPanel)
            if up:
                panel.scroll_up()
            else:
                panel.scroll_down()
        except Exception as exc:
            self._log_stderr("SCROLL LOG", exc)

    def action_scroll_up(self) -> None:
        """Handle the ``up`` binding by scrolling the log panel up.

        NOTE(review): BINDINGS maps ``up``/``down`` to ``scroll_up`` /
        ``scroll_down`` but this class previously defined no such actions;
        confirm the Textual App base class provides none either.
        """
        self._scroll_log(up=True)

    def action_scroll_down(self) -> None:
        """Handle the ``down`` binding by scrolling the log panel down."""
        self._scroll_log(up=False)
if __name__ == "__main__":
    # Script entry point: map the parsed CLI options onto the app's
    # constructor arguments, then hand control to the Textual event loop.
    cli = parse_args()
    app_options = {
        "hz_host": cli.hz_host,
        "hz_port": cli.hz_port,
        "log_path": cli.log_path,
        "mock_mode": cli.mock,
    }
    DolphinTUIApp(**app_options).run()