Files
siloqy/Observability/dolphin_status_pink.py
Codex ef473ba372 PINK: E2E trace analysis — Pass 23 closure review/unfinished fixes/ops gaps (Z1-Z14)
Twenty-third (final) pass: _safe_enum fix applied to rust_backend.py but NOT
real_zinc_plane.py other copy crashes (Z1 High), no health check endpoint
silent failures invisible to orchestration (Z5 High), process_intent calls
venue.submit without exception handler venue error bypasses Rust FSM (Z6 High),
snapshot mixes Rust and Python accounting capital can diverge (Z7 Medium),
BingxVenueAdapter.close executor null-to-shutdown TOCTOU race (Z8 Medium),
generated test f-string chr(34) template SyntaxError risk on old Python (Z9
Medium), launcher uses Python 3.10+ | union syntax no min version documented
(Z10 Medium), concurrent process_intent on same slot no lock no queue (Z12
Medium). 403 total flaws across 23 passes.

Co-authored-by: CommandCodeBot <noreply@commandcode.ai>
2026-06-02 19:44:15 +02:00

1966 lines
76 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""DOLPHIN PINK live status — v7
0.5s poll. SIG/TRD/FIL gear rows + last-5-trades + CH persistence + V7 exit cmp.
HOTKEYS: [h] help [1] reduce 50% [2] reduce 75% [3] close all [s] STALKER [a] APEX
Note: posture hotkeys are disabled by default in PINK isolation mode.
Run: source /home/dolphin/siloqy_env/bin/activate && python dolphin_status_pink.py
Quit: Ctrl-C
"""
# v1v5 archived as dolphin_status_v{1..5}.py
# v6: exit-comparison overlay (V7 preview), net-pnl pct fix
# v7: hotkey system — position reduce, posture switch, CH audit trail
import hashlib, hmac, json, os, re, select, threading, time, sys, uuid
import termios, tty, urllib.request, urllib.parse
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
import hazelcast
def _float_env(name: str, default: float) -> float:
raw = os.environ.get(name, "")
try:
val = float(raw)
if val > 0:
return val
except Exception:
pass
return default
# ── PINK runtime bindings ──────────────────────────────────────────────────────
PINK_CH_DB = os.environ.get("DOLPHIN_TUI_CH_DB", "dolphin_pink")
PINK_STRATEGY = os.environ.get("DOLPHIN_TUI_STRATEGY", "pink")
PINK_STATE_MAP = os.environ.get("DOLPHIN_TUI_STATE_MAP", "DOLPHIN_STATE_PINK")
PINK_RUNTIME_COMMAND_KEY = os.environ.get("DOLPHIN_TUI_RUNTIME_COMMAND_KEY", "pink_runtime_commands")
PINK_SAFETY_MAP = os.environ.get("DOLPHIN_TUI_SAFETY_MAP", "DOLPHIN_SAFETY")
PINK_HEARTBEAT_MAP = os.environ.get("DOLPHIN_TUI_HEARTBEAT_MAP", "DOLPHIN_HEARTBEAT")
PINK_HEARTBEAT_KEY = os.environ.get("DOLPHIN_TUI_HEARTBEAT_KEY", "nautilus_flow_heartbeat")
PINK_META_HEALTH_MAP = os.environ.get("DOLPHIN_TUI_META_HEALTH_MAP", "DOLPHIN_META_HEALTH")
PINK_ANNOUNCEMENTS_MAP = os.environ.get("DOLPHIN_TUI_ANNOUNCEMENTS_MAP", "DOLPHIN_ANNOUNCEMENTS")
PINK_FEATURES_MAP = os.environ.get("DOLPHIN_TUI_FEATURES_MAP", "DOLPHIN_FEATURES")
PINK_ALLOW_GLOBAL_POSTURE_HOTKEYS = str(
os.environ.get("DOLPHIN_PINK_TUI_ENABLE_GLOBAL_POSTURE", "0")
).strip().lower() in {"1", "true", "yes", "on"}
# ── ClickHouse fire-and-forget write ─────────────────────────────────────────
_CH_URL = "http://localhost:8123"
_CH_USER = "dolphin"
_CH_PASS = "dolphin_ch_2026"
_CH_Q: deque = deque(maxlen=500)
_CH_TABLE_Q: dict[str, deque] = {} # table_name → deque of rows
_CH_TABLE_LOCK = threading.Lock()
def _ch_flush_table(table: str, rows: list):
if not rows:
return
body = "\n".join(json.dumps(r) for r in rows).encode()
url = f"{_CH_URL}/?database={PINK_CH_DB}&query=INSERT+INTO+{table}+FORMAT+JSONEachRow"
req = urllib.request.Request(url, data=body, method="POST")
req.add_header("X-ClickHouse-User", _CH_USER)
req.add_header("X-ClickHouse-Key", _CH_PASS)
req.add_header("Content-Type", "application/octet-stream")
try:
urllib.request.urlopen(req, timeout=4)
except Exception:
pass
def _ch_worker():
while True:
time.sleep(2)
rows = []
while _CH_Q:
try: rows.append(_CH_Q.popleft())
except IndexError: break
if rows:
_ch_flush_table("status_snapshots", rows)
with _CH_TABLE_LOCK:
tables = {t: list(q) for t, q in _CH_TABLE_Q.items() if q}
for q in _CH_TABLE_Q.values():
q.clear()
for table, trows in tables.items():
_ch_flush_table(table, trows)
threading.Thread(target=_ch_worker, daemon=True, name="ch-status-pink").start()
def ch_put(row: dict, table: str | None = None):
if table:
with _CH_TABLE_LOCK:
if table not in _CH_TABLE_Q:
_CH_TABLE_Q[table] = deque(maxlen=500)
_CH_TABLE_Q[table].append(row)
else:
_CH_Q.append(row)
# ── Trade panel sources ───────────────────────────────────────────────────────
def _parse_path_env(name: str) -> tuple[Path, ...]:
"""Parse a path list env var using common separators."""
raw = os.environ.get(name, "").strip()
if not raw:
return ()
paths = []
for chunk in re.split(r"[:;,]", raw):
item = chunk.strip()
if item:
paths.append(Path(item))
return tuple(paths)
_TRADER_LOG_CANDIDATES = tuple(
dict.fromkeys(
[
*_parse_path_env("DOLPHIN_TUI_TRADER_LOG_PATHS"),
Path("/tmp/dolphin_logs/supervisor/dolphin_live_pink.log"),
Path("/root/dolphin_logs/supervisor/dolphin_live_pink.log"),
Path("/mnt/dolphinng5_predict/prod/supervisor/logs/dolphin_live_pink.log"),
]
)
)
# Capture the JSON dict only — stop at first } closing the payload.
# Lines may have a trailing tag like [v2_gold_fix_v50-v750] after the dict.
_RE_ENTRY = re.compile(r"\[(.+?)\] ENTRY: (\{.+?\})(?:\s*\[.*\])?$")
_RE_EXIT = re.compile(r"\[(.+?)\] (?:SUBDAY_)?EXIT: (\{.+?\})(?:\s*\[.*\])?$")
_TRADE_PANEL_CACHE = {"ts": 0.0, "open": [], "closed": [], "fills": []}
_ACCOUNT_EVENT_CACHE = {"ts": 0.0, "row": None}
def _normalize_exit_reason(reason: str) -> str:
text = str(reason or "").strip()
if text == "V7_MAE_SL_VOL_NORM":
return "V7.1_MAE_SL_VOL_NORM"
return text
def _resolve_trader_log() -> Path | None:
"""Pick the freshest available supervisor log across known mount layouts."""
best_path = None
best_key = None
for path in _TRADER_LOG_CANDIDATES:
try:
stat = path.stat()
except OSError:
continue
if stat.st_size <= 0:
continue
key = (stat.st_mtime, stat.st_size)
if best_key is None or key > best_key:
best_key = key
best_path = path
return best_path
def _parse_log_dict(raw: str) -> dict:
"""Parse a Python dict repr from a log line. Handles nan and single-quoted strings."""
import ast
# Replace nan/inf with JSON-safe equivalents before parsing
cleaned = raw.replace(": nan", ": null").replace(": inf", ": null").replace(": -inf", ": null")
try:
return ast.literal_eval(raw) # handles all Python literal forms incl. nan
except Exception:
pass
try:
return json.loads(cleaned.replace("'", '"'))
except Exception:
raise ValueError(f"unparseable: {raw[:80]}")
def _ch_query(sql: str, *, database: str = PINK_CH_DB) -> list[dict]:
"""Execute a ClickHouse query and return JSONEachRow dicts."""
try:
query = urllib.parse.quote(f"{sql} FORMAT JSONEachRow")
url = f"{_CH_URL}/?database={database}&query={query}"
req = urllib.request.Request(url)
req.add_header("X-ClickHouse-User", _CH_USER)
req.add_header("X-ClickHouse-Key", _CH_PASS)
with urllib.request.urlopen(req, timeout=5) as resp:
body = resp.read().decode("utf-8").strip()
if not body:
return []
return [json.loads(line) for line in body.splitlines() if line.strip()]
except Exception:
return []
def _position_key(row: dict) -> tuple:
trade_id = str(row.get("trade_id") or "").strip()
if trade_id:
return ("trade_id", trade_id)
asset = str(row.get("asset") or "").strip()
entry = row.get("entry_price")
try:
if asset and entry is not None:
return ("asset_entry", asset, round(float(entry), 8))
except Exception:
pass
if asset:
return ("asset", asset)
return ("unknown", id(row))
def _fetch_open_positions_from_ch(limit: int = 5) -> list[dict]:
rows = _ch_query(
f"""
SELECT
trade_id,
asset,
direction,
entry_price,
quantity,
notional,
leverage,
bucket_id,
bars_held,
last_ts
FROM (
SELECT
trade_id,
argMax(asset, ts) AS asset,
argMax(direction, ts) AS direction,
argMax(entry_price, ts) AS entry_price,
argMax(quantity, ts) AS quantity,
argMax(notional, ts) AS notional,
argMax(leverage, ts) AS leverage,
argMax(bucket_id, ts) AS bucket_id,
argMax(bars_held, ts) AS bars_held,
argMax(status, ts) AS status,
max(ts) AS last_ts
FROM {PINK_CH_DB}.position_state
GROUP BY trade_id
)
WHERE status = 'OPEN'
ORDER BY last_ts DESC
LIMIT {int(limit)}
"""
)
return rows
def _fetch_latest_account_event_from_ch() -> dict | None:
now = time.time()
cached = _ACCOUNT_EVENT_CACHE
if now - float(cached.get("ts", 0.0) or 0.0) < 0.75:
row = cached.get("row")
return dict(row) if isinstance(row, dict) else None
rows = _ch_query(
f"""
SELECT
ts,
event_type,
strategy,
capital,
trades_today,
open_positions,
current_open_notional,
current_account_leverage,
notes
FROM {PINK_CH_DB}.account_events
ORDER BY ts DESC
LIMIT 5
"""
)
row = None
if rows:
row = rows[0]
for candidate in rows:
try:
if float(candidate.get("current_open_notional", 0) or 0) > 0:
row = candidate
break
except Exception:
continue
cached["ts"] = now
cached["row"] = dict(row) if isinstance(row, dict) else None
return dict(row) if isinstance(row, dict) else None
def _account_event_payload(row: dict | None) -> dict:
if not isinstance(row, dict):
return {}
raw = str(row.get("notes") or "").strip()
if not raw:
return {}
try:
payload = json.loads(raw)
if isinstance(payload, dict):
inner = payload.get("payload")
if isinstance(inner, dict):
return inner
return payload
except Exception:
return {}
return {}
def _account_event_positions(row: dict | None) -> list[dict]:
payload = _account_event_payload(row)
positions = payload.get("positions") if isinstance(payload, dict) else {}
if not isinstance(positions, dict):
return []
out: list[dict] = []
for symbol, pos in positions.items():
if not isinstance(pos, dict):
continue
asset = str(pos.get("symbol") or symbol or "").replace("-", "")
if not asset:
continue
side = str(pos.get("positionSide") or pos.get("side") or "SHORT").upper()
qty = float(pos.get("positionAmt", 0) or 0)
entry = float(pos.get("avgPrice", 0) or 0)
mark = float(pos.get("markPrice", 0) or 0)
upnl = float(pos.get("unrealizedProfit", 0) or 0)
notional = float(pos.get("positionValue", 0) or 0)
lev = float(pos.get("leverage", 0) or 0)
out.append({
"asset": asset,
"side": side,
"quantity": abs(qty),
"entry_price": entry,
"current_price": mark,
"unrealized_pnl": upnl,
"notional": notional,
"leverage": lev,
"direction": 1 if side == "LONG" else -1,
"trade_id": str(pos.get("positionId") or symbol or asset),
"source": "bingx_account_events",
})
return out
def _account_event_fills(row: dict | None, limit: int = 30) -> list[dict]:
payload = _account_event_payload(row)
fills = payload.get("fills") if isinstance(payload, dict) else []
if not isinstance(fills, list):
return []
out: list[dict] = []
for item in fills[-limit:]:
if not isinstance(item, dict):
continue
fill_row = item.get("row") if isinstance(item.get("row"), dict) else item
friction = item.get("friction") if isinstance(item.get("friction"), dict) else fill_row.get("friction", {})
symbol = str(fill_row.get("symbol") or fill_row.get("asset") or item.get("asset") or "?")
asset = symbol.replace("-", "")
side = str(fill_row.get("side") or item.get("side") or "?").upper()
px = float(fill_row.get("avgPrice", fill_row.get("price", 0)) or 0)
qty = float(fill_row.get("executedQty", fill_row.get("quantity", 0)) or 0)
trade_key = str(item.get("_trade_key") or fill_row.get("orderId") or fill_row.get("clientOrderId") or asset)
pnl = float(item.get("pnl", fill_row.get("pnl", 0)) or 0)
out.append({
"trade_id": trade_key,
"ts": str(row.get("ts") or ""),
"asset": asset,
"side": side,
"entry_price": px,
"exit_price": px,
"quantity": qty,
"net_pnl": pnl,
"pnl_pct": float(fill_row.get("pnlPct", item.get("pnl_pct", 0)) or item.get("pnl_pct", 0) or 0),
"reason": str(fill_row.get("status") or item.get("status") or "FILL"),
"bars_held": item.get("bars_held", 0),
"source": "bingx_account_events",
"liquidity_side": friction.get("liquidity_side"),
"fill_quality_score": friction.get("fill_quality_score"),
})
return out
def _account_event_fill_count(row: dict | None) -> int:
payload = _account_event_payload(row)
fills = payload.get("fills") if isinstance(payload, dict) else []
if not isinstance(fills, list):
return 0
return sum(1 for item in fills if isinstance(item, dict))
def _fetch_recent_account_fills_from_ch(limit: int = 30) -> list[dict]:
rows = _ch_query(
f"""
SELECT
ts,
event_type,
strategy,
notes
FROM {PINK_CH_DB}.account_events
WHERE strategy = '{PINK_STRATEGY}'
ORDER BY ts DESC
LIMIT {int(max(limit, 1))}
"""
)
if not rows:
return []
fills: list[dict] = []
for row in rows:
fills.extend(_account_event_fills(row, limit=limit))
if len(fills) >= limit:
break
return fills[:limit]
def _overlay_account_event_state(eng: dict) -> tuple[dict, dict | None]:
"""Merge the live BingX account snapshot into the HZ engine snapshot.
PINK's HZ `engine_snapshot` can lag the exchange ledger. When BingX
account events have a fresher view of open positions, use them for display
and gating while preserving the rest of the engine snapshot.
"""
merged = dict(eng or {})
account_row = _fetch_latest_account_event_from_ch()
live_positions = _account_event_positions(account_row)
if account_row:
capital = float(account_row.get("capital", merged.get("capital", 0)) or merged.get("capital", 0) or 0)
merged["capital"] = capital
merged["account_capital"] = capital
merged["portfolio_capital"] = capital
merged["ledger_authority"] = "exchange"
merged["current_open_notional"] = float(
account_row.get("current_open_notional", merged.get("current_open_notional", 0)) or 0
)
merged["current_account_leverage"] = float(
account_row.get("current_account_leverage", merged.get("current_account_leverage", 0)) or 0
)
return merged, account_row
def _fetch_closed_trades_from_ch(limit: int = 30) -> list[dict]:
rows = _ch_query(
f"""
SELECT
ts,
trade_id,
asset,
side,
entry_price,
exit_price,
quantity,
pnl,
pnl_pct,
exit_reason,
leverage,
bars_held,
scan_uuid,
capital_before,
capital_after
FROM {PINK_CH_DB}.trade_events
WHERE strategy = '{PINK_STRATEGY}'
ORDER BY ts DESC
LIMIT {int(limit)}
"""
)
return rows
def _fetch_closed_trades_from_log(limit: int = 30) -> list[dict]:
trades = _last_n_trades(limit)
for row in trades:
row.setdefault("source", "log")
return trades
def _augment_open_positions(open_rows: list[dict], eng_open: list[dict]) -> list[dict]:
if not open_rows:
return []
supplemental = {}
for row in eng_open or []:
supplemental[_position_key(row)] = row
asset = str(row.get("asset") or "").strip()
if asset:
supplemental.setdefault(("asset", asset), row)
merged = []
for row in open_rows:
out = dict(row)
extra = supplemental.get(_position_key(row))
if extra is None:
asset = str(row.get("asset") or "").strip()
if asset:
extra = supplemental.get(("asset", asset))
if extra:
for key in ("side", "unrealized_pnl", "current_price", "entry_price", "quantity", "notional", "leverage"):
if key in extra and (key not in out or out.get(key) in (None, "", 0, 0.0)):
out[key] = extra.get(key)
if "trade_id" not in out or not out.get("trade_id"):
if extra.get("trade_id"):
out["trade_id"] = extra.get("trade_id")
merged.append(out)
return merged
def _mark_price_from_obf(asset: str, obf: dict | None) -> float:
if not asset or not isinstance(obf, dict):
return 0.0
row = obf.get(asset) or (obf.get("assets") or {}).get(asset) or {}
if not row and "-" not in asset:
row = obf.get(_bingx_venue_symbol(asset)) or (obf.get("assets") or {}).get(_bingx_venue_symbol(asset)) or {}
try:
bid = float(row.get("best_bid", 0) or 0)
ask = float(row.get("best_ask", 0) or 0)
if bid > 0 and ask > 0:
return (bid + ask) / 2
if bid > 0:
return bid
if ask > 0:
return ask
except Exception:
return 0.0
return 0.0
def _derive_open_position_pnl(row: dict, obf: dict | None) -> tuple[float, float]:
entry = float(row.get("entry_price", 0) or 0)
current = float(row.get("current_price", 0) or 0)
if current <= 0:
current = _mark_price_from_obf(str(row.get("asset") or ""), obf)
qty = float(row.get("quantity", 0) or 0)
notional = float(row.get("notional", 0) or 0)
if qty <= 0 and entry > 0 and notional > 0:
qty = notional / entry
side = str(row.get("side") or "").upper()
direction = -1 if side == "SHORT" or int(row.get("direction", -1) or -1) == -1 else 1
upnl = float(row.get("unrealized_pnl", 0) or 0)
if upnl == 0 and entry > 0 and current > 0 and qty > 0:
upnl = ((entry - current) * qty) if direction == -1 else ((current - entry) * qty)
return upnl, current
def _trade_panel_rows(eng, obf: dict | None, limit: int = 30) -> tuple[list[dict], list[dict], list[dict], str]:
now = time.time()
cached = _TRADE_PANEL_CACHE
if now - float(cached.get("ts", 0.0) or 0.0) < 0.75:
return (
list(cached.get("open", [])),
list(cached.get("closed", [])),
list(cached.get("fills", [])),
str(cached.get("source", "cache")),
)
account_row = _fetch_latest_account_event_from_ch()
account_open_rows = _account_event_positions(account_row)
open_rows = _fetch_open_positions_from_ch(limit=5)
closed_rows = _fetch_closed_trades_from_ch(limit=limit)
source = "clickhouse"
eng_open = []
try:
eng_open = eng.get("open_positions") or []
except Exception:
eng_open = []
if account_open_rows:
# BingX account events reflect the live venue state even if HZ is stale.
open_rows = _augment_open_positions(account_open_rows, eng_open)
source = "bingx_account_events"
elif open_rows:
open_rows = _augment_open_positions(open_rows, eng_open)
for row in open_rows:
upnl, current = _derive_open_position_pnl(row, obf)
if current > 0 and not row.get("current_price"):
row["current_price"] = current
if upnl != 0 or not row.get("unrealized_pnl"):
row["unrealized_pnl"] = upnl
else:
# Preserve the live HZ snapshot if CH is briefly unavailable.
open_rows = list(eng_open or [])
source = "hazelcast"
if not closed_rows:
account_fills = _fetch_recent_account_fills_from_ch(limit)
if account_fills:
source = "bingx_account_events"
else:
log_rows = _fetch_closed_trades_from_log(limit)
if log_rows:
closed_rows = log_rows
source = "clickhouse+log" if source == "clickhouse" else "hazelcast+log"
else:
account_fills = []
cached["ts"] = now
cached["open"] = list(open_rows)
cached["closed"] = list(closed_rows)
cached["fills"] = list(account_fills)
cached["source"] = source
return open_rows, closed_rows, list(account_fills), source
def _summarize_closed_trades(rows: list[dict]) -> dict:
pnl_vals = []
notional_vals = []
wins = 0
losses = 0
for row in rows or []:
pnl = float(row.get("net_pnl", row.get("pnl", 0)) or 0)
notional = float(row.get("notional", 0) or 0)
pnl_vals.append(pnl)
notional_vals.append(notional)
if pnl >= 0:
wins += 1
else:
losses += 1
total_pnl = sum(pnl_vals)
total_notional = sum(notional_vals)
total_pct = (total_pnl / total_notional * 100) if total_notional else 0.0
return {
"n": len(rows or []),
"wins": wins,
"losses": losses,
"pnl": total_pnl,
"pct": total_pct,
}
def _closed_trade_display_pnl(row: dict) -> float:
"""Choose a sane USD PnL for closed-trade display.
Prefer a signed mark-to-mark/close calculation from entry/exit/quantity.
Some log rows carry a near-zero or inconsistent `net_pnl`, and `pnl_pct`
may be magnitude-only, so the direct price delta is the most reliable
display source.
"""
net_pnl = float(row.get("net_pnl", row.get("pnl", 0)) or 0)
entry = float(row.get("entry_price", 0) or 0)
exit_ = float(row.get("exit_price", 0) or 0)
qty = float(row.get("quantity", 0) or 0)
side = str(row.get("side") or "").upper()
direction = int(row.get("direction", -1) or -1)
if entry > 0 and exit_ > 0 and qty > 0:
if side == "LONG" or direction == 1:
return (exit_ - entry) * qty
return (entry - exit_) * qty
notional = float(row.get("notional", 0) or 0)
pnl_pct = float(row.get("pnl_pct", 0) or 0)
expected = pnl_pct * notional if notional > 0 else 0.0
if expected != 0 and abs(net_pnl) < max(1e-6, abs(expected) * 0.05):
return expected
return net_pnl
def _sys_health_sensor_rows(mh: dict) -> list[tuple[str, float]]:
sensors = []
for key, value in (mh or {}).items():
if not str(key).startswith("m"):
continue
try:
idx = int(str(key)[1:].split("_", 1)[0])
except Exception:
continue
try:
sensors.append((idx, str(key), float(value)))
except Exception:
continue
sensors.sort(key=lambda item: (item[0], item[1]))
return [(name, val) for _, name, val in sensors]
def _format_two_column_sensor_lines(rows: list[tuple[str, float]]) -> list[str]:
if not rows:
return []
out: list[str] = []
pairs = [rows[i:i + 2] for i in range(0, len(rows), 2)]
for pair in pairs:
formatted = []
for name, value in pair:
color = GREEN if value >= 0.9 else (YELLOW if value >= 0.5 else RED)
formatted.append(f"{color}{name}:{value:.3f}{RST}")
if len(formatted) == 2:
out.append(f" {formatted[0]} {formatted[1]}")
else:
out.append(f" {formatted[0]}")
return out
def _last_n_trades(n=5):
"""Parse last N completed trades from supervisor log. Returns list of dicts."""
trader_log = _resolve_trader_log()
if trader_log is None:
return []
try:
lines = trader_log.read_text(errors="replace").splitlines()[-4000:]
except Exception:
return []
entries = {}
trades = []
for line in lines:
m = _RE_ENTRY.search(line)
if m:
try:
d = _parse_log_dict(m.group(2))
entries[d["trade_id"]] = {"ts": m.group(1), **d}
except Exception:
pass
m = _RE_EXIT.search(line)
if m:
try:
d = _parse_log_dict(m.group(2))
tid = d.get("trade_id")
if not tid:
continue
e = entries.pop(tid, {})
trades.append({
"trade_id": tid,
"ts": e.get("ts", m.group(1)),
"asset": e.get("asset", d.get("asset", "?")),
"entry_price": e.get("entry_price", d.get("entry_price", 0)),
"leverage": e.get("leverage", d.get("leverage", 0)),
"notional": e.get("notional", d.get("notional", 0)),
"direction": e.get("direction", d.get("direction", 0)),
"exit_ts": m.group(1),
"reason": _normalize_exit_reason(d.get("reason", "?")),
"pnl_pct": d.get("pnl_pct", 0),
"net_pnl": d.get("net_pnl", 0),
"bars_held": d.get("bars_held", 0),
})
except Exception:
pass
return trades[-n:]
CLEAR = "\033[2J\033[H"
BOLD = "\033[1m"; DIM = "\033[2m"; RST = "\033[0m"
GREEN = "\033[32m"; YELLOW = "\033[33m"; RED = "\033[31m"; CYAN = "\033[36m"
ORANGE = "\033[38;5;208m"
PC = {"APEX": GREEN, "STALKER": YELLOW, "TURTLE": ORANGE, "HIBERNATE": RED}
SC = {"GREEN": GREEN, "DEGRADED": YELLOW, "CRITICAL": ORANGE, "DEAD": RED}
# Thresholds from nautilus_event_trader.py
VEL_DIV_THRESHOLD = -0.020 # signal fires when vel_div < this
VEL_DIV_EXTREME = -0.050 # extreme bearish
VEL_DIV_WARN = -0.010 # approaching threshold (yellow)
VEL_DIV_CLOSE = -0.015 # nearly there (orange→yellow)
VOL_P60 = _float_env("DOLPHIN_PINK_VOL_P60_THRESHOLD", 0.00008000)
BTC_VOL_WINDOW = 50 # bars used for vol calc
FIXED_TP_PCT = _float_env("DOLPHIN_FIXED_TP_PCT", 0.0020) # PINK TP target (0.20%)
MAX_HOLD_BARS = 250 # PINK max hold bars
TRADER_HB_STALE_S = 30.0
TRADER_ENGINE_STALE_S = 90.0
START_CAP = None
CAP_PEAK = None
_EXIT_TRACKER: dict = {} # (asset, entry_price) → accumulated V7 comparison state
_HOTKEY_FEEDBACK: deque = deque(maxlen=5) # recent hotkey results for display
# ── BingX signing (sync, for hotkey-driven REST calls) ────────────────────────
def _bingx_sign(params: dict, secret_key: str) -> dict:
ordered = {k: v for k, v in sorted(params.items()) if v is not None and v != ""}
query = urllib.parse.urlencode(ordered)
sig = hmac.new(secret_key.encode(), query.encode(), hashlib.sha256).hexdigest()
return {**ordered, "signature": sig}
_BINGX_API_KEY = os.environ.get("BINGX_API_KEY", "")
_BINGX_SECRET_KEY = os.environ.get("BINGX_SECRET_KEY", "")
_BINGX_RECV_WINDOW = 5000
def _bingx_env_base_url() -> str:
env = os.environ.get("DOLPHIN_BINGX_ENV", "VST").strip().upper()
if env == "LIVE":
return "https://open-api.bingx.com"
return "https://open-api-vst.bingx.com"
def _bingx_rest_post(path: str, params: dict) -> dict:
"""Synchronous signed POST to BingX. Returns parsed JSON or raises."""
base = _bingx_env_base_url()
signed = _bingx_sign({
**params,
"timestamp": int(time.time() * 1000),
"recvWindow": _BINGX_RECV_WINDOW,
}, _BINGX_SECRET_KEY)
url = f"{base}{path}?{urllib.parse.urlencode(signed)}"
req = urllib.request.Request(url, method="POST")
req.add_header("X-BX-APIKEY", _BINGX_API_KEY)
req.add_header("Content-Type", "application/x-www-form-urlencoded")
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read())
def _bingx_venue_symbol(asset: str) -> str:
if "-" in asset:
return asset
if asset.endswith("USDT"):
return f"{asset[:-4]}-USDT"
return asset
# ── DITAv2 kernel state (does NOT use Hazelcast) ──────────────────────────────
_KERNEL_SNAP_PATH = Path("/tmp/.pink_kernel_state.json")
_DITAV2_ACCOUNT_CACHE: dict = {"ts": 0.0, "data": None}
def _read_ditav2_kernel_snapshot() -> dict:
"""Read the last saved DITAv2 kernel state from disk (non-blocking)."""
try:
if not _KERNEL_SNAP_PATH.exists():
return {}
raw = _KERNEL_SNAP_PATH.read_text(encoding="utf-8")
data = json.loads(raw)
data["_snap_mtime"] = _KERNEL_SNAP_PATH.stat().st_mtime
return data
except Exception:
return {}
def _bingx_rest_get(path: str, params: dict) -> dict:
"""Synchronous signed GET to BingX. Returns parsed JSON or {}."""
try:
base = _bingx_env_base_url()
signed = _bingx_sign(
{**params, "timestamp": int(time.time() * 1000), "recvWindow": _BINGX_RECV_WINDOW},
_BINGX_SECRET_KEY,
)
url = f"{base}{path}?{urllib.parse.urlencode(signed)}"
req = urllib.request.Request(url, method="GET")
req.add_header("X-BX-APIKEY", _BINGX_API_KEY)
with urllib.request.urlopen(req, timeout=5) as resp:
return json.loads(resp.read())
except Exception:
return {}
def _fetch_ditav2_bingx_account() -> dict:
"""Cache-throttled live BingX VST balance query (max 1 call / 5 s)."""
now = time.time()
cached = _DITAV2_ACCOUNT_CACHE
if now - float(cached.get("ts", 0.0)) < 5.0:
return cached.get("data") or {}
if not _BINGX_API_KEY or not _BINGX_SECRET_KEY:
cached["ts"] = now
cached["data"] = {}
return {}
raw = _bingx_rest_get("/openApi/swap/v2/user/balance", {})
bal = (raw.get("data") or {}).get("balance") or {}
cached["ts"] = now
cached["data"] = bal
return bal
def _render_ditav2_section() -> str:
"""Render the DITAv2 PINK kernel + BingX account section."""
snap = _read_ditav2_kernel_snapshot()
bal = _fetch_ditav2_bingx_account()
L = []
L.append(f"{BOLD}{CYAN}━━ DITAv2 PINK [kernel + BingX VST]{RST}")
if not snap:
L.append(f" {DIM}kernel snapshot not found at {_KERNEL_SNAP_PATH}{RST}")
L.append(f" {DIM}(PINK not started, or no clean shutdown after first trade){RST}")
else:
snap_mtime = snap.get("_snap_mtime", 0)
snap_age = time.time() - snap_mtime if snap_mtime else None
snap_age_s = _age_seconds(snap_age) if snap_age is not None else "?"
snap_ts_ms = snap.get("snapshot_ts_ms", 0)
snap_dt = (
datetime.fromtimestamp(snap_ts_ms / 1000.0, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
if snap_ts_ms
else "?"
)
frozen = snap.get("capital_frozen", False)
frozen_col = RED if frozen else GREEN
frozen_s = f"{RED}FROZEN{RST}" if frozen else f"{GREEN}OK{RST}"
acc = snap.get("account") or {}
capital = float(acc.get("capital") or acc.get("equity") or 0.0)
equity = float(acc.get("equity") or capital)
avail = float(acc.get("available_capital") or capital)
open_pos = int(acc.get("open_positions") or 0)
trade_seq = int(acc.get("trade_seq") or 0)
realized = float(acc.get("realized_pnl_total") or 0.0)
fees = float(acc.get("fee_total") or 0.0)
fee_cal = snap.get("fee_calibration") or {}
fc_status = str(fee_cal.get("calibration_status") or "")
fc_col = GREEN if fc_status == "OK" else (YELLOW if fc_status == "WARN" else RED)
fc_ratio = float(fee_cal.get("calibration_ratio") or 1.0)
fc_taker = float(fee_cal.get("taker_rate") or 0.0005)
fc_maker = float(fee_cal.get("maker_rate") or 0.0002)
slots = snap.get("slots") or []
L.append(
f" {BOLD}KERNEL{RST} snap:{snap_dt} age:{snap_age_s} frozen:{frozen_s}"
)
L.append(
f" capital:${capital:,.2f} equity:${equity:,.2f} avail:${avail:,.2f}"
f" open:{open_pos} trades:{trade_seq}"
)
rpnl_col = GREEN if realized >= 0 else RED
L.append(
f" realized:{rpnl_col}{realized:+.4f}{RST} fees:{fees:+.4f}"
f" fee_cal: taker:{fc_taker*100:.3f}% maker:{fc_maker*100:.3f}%"
f" ratio:{fc_ratio:.4f} {fc_col}[{fc_status}]{RST}"
)
for slot in slots:
fsm = str(slot.get("fsm_state") or "IDLE")
asset = str(slot.get("asset") or "")
side = str(slot.get("side") or "")
size = float(slot.get("size") or 0.0)
ep = float(slot.get("entry_price") or 0.0)
rpnl = float(slot.get("realized_pnl") or 0.0)
upnl = float(slot.get("unrealized_pnl") or 0.0)
sid = int(slot.get("slot_id") or 0)
fsm_col = (
GREEN if fsm == "POSITION_OPEN"
else YELLOW if fsm in ("EXIT_REQUESTED", "EXIT_SENT", "EXIT_WORKING", "ENTRY_REQUESTED", "ENTRY_SENT", "ENTRY_WORKING")
else RED if fsm in ("ERROR", "CLOSED")
else DIM
)
pnl_col = GREEN if (rpnl + upnl) >= 0 else RED
L.append(
f" slot[{sid}] {fsm_col}{fsm:<20}{RST} {asset} {side}"
f" size:{size:.4f} ep:{ep:.4g}"
f" pnl:{pnl_col}{(rpnl+upnl):+.4f}{RST}"
)
# ── Live BingX account ────────────────────────────────────────────────
env_s = os.environ.get("DOLPHIN_BINGX_ENV", "VST")
if not bal:
creds_ok = bool(_BINGX_API_KEY and _BINGX_SECRET_KEY)
reason = "no credentials" if not creds_ok else "query failed / no data"
L.append(f" {BOLD}BINGX [{env_s}]{RST} {DIM}{reason}{RST}")
else:
b_wallet = float(bal.get("balance") or bal.get("walletBalance") or 0.0)
b_equity = float(bal.get("equity") or b_wallet)
b_avail = float(bal.get("availableMargin") or b_wallet)
b_used = float(bal.get("usedMargin") or 0.0)
b_upnl = float(bal.get("unrealizedProfit") or 0.0)
b_real = float(bal.get("realisedProfit") or bal.get("realizedProfit") or 0.0)
upnl_col = GREEN if b_upnl >= 0 else RED
real_col = GREEN if b_real >= 0 else RED
L.append(
f" {BOLD}BINGX [{env_s}]{RST}"
f" wallet:${b_wallet:,.2f} equity:${b_equity:,.2f}"
f" avail:${b_avail:,.2f} used:${b_used:,.2f}"
f" upnl:{upnl_col}{b_upnl:+.2f}{RST}"
f" realized:{real_col}{b_real:+.2f}{RST}"
)
L.append(f"{BOLD}{CYAN}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━{RST}")
return "\n".join(L)
# ── Hotkey: CH audit ──────────────────────────────────────────────────────────
def _hotkey_audit(action: str, request: dict, result: str, effect: dict | None = None):
ch_put({
"ts": int(time.time() * 1000),
"hotkey": action,
"request_json": json.dumps(request, default=str),
"result": result,
"effect_json": json.dumps(effect or {}, default=str),
}, table="hotkey_audit")
# ── Hotkey actions ────────────────────────────────────────────────────────────
def _hotkey_reduce_position(fraction: float, hz) -> str:
"""Submit internal PINK retraction command (fractional exit) via HZ control plane."""
if fraction <= 0 or fraction > 1:
return f"invalid fraction {fraction}"
eng = _get(hz, PINK_STATE_MAP, "engine_snapshot")
positions = eng.get("open_positions") or []
if not positions:
_hotkey_audit(f"reduce_{fraction:.0%}", {"fraction": fraction}, "NO_POSITION")
return "no open position"
pos = positions[0]
asset = pos.get("asset", "?")
side = str(pos.get("side", "?")).upper()
trade_id = str(pos.get("trade_id", "") or "").strip()
qty = float(pos.get("quantity", 0) or 0)
entry = float(pos.get("entry_price", 0) or 0)
chain_root_trade_id = str(pos.get("chain_root_trade_id", trade_id) or trade_id).strip()
chain_head_leg_id = str(pos.get("chain_head_leg_id", f"{trade_id}:open") or f"{trade_id}:open").strip()
chain_prev_leg_id = str(pos.get("chain_prev_leg_id", "") or "").strip()
chain_seq = int(pos.get("chain_seq", pos.get("retraction_legs", 0)) or 0)
chain_token = str(pos.get("chain_token", "") or "").strip()
if not trade_id:
_hotkey_audit(f"reduce_{fraction:.0%}", {"fraction": fraction}, "NO_TRADE_ID")
return "no trade_id"
if not chain_root_trade_id or not chain_head_leg_id or not chain_token:
_hotkey_audit(
f"reduce_{fraction:.0%}",
{"fraction": fraction, "trade_id": trade_id, "chain_root_trade_id": chain_root_trade_id, "chain_head_leg_id": chain_head_leg_id, "chain_prev_leg_id": chain_prev_leg_id, "chain_seq": chain_seq},
"NO_CHAIN_LINK",
)
return "no chain link"
if qty <= 0:
_hotkey_audit(f"reduce_{fraction:.0%}", {"fraction": fraction}, "ZERO_QTY")
return "zero quantity"
command_id = f"hk-retract-{uuid.uuid4().hex[:16]}"
request = {
"action": f"reduce_{fraction:.0%}",
"asset": asset,
"side": side,
"requested_fraction": float(fraction),
"entry_price": entry,
"command_id": command_id,
"trade_id": trade_id,
"chain_root_trade_id": chain_root_trade_id,
"chain_head_leg_id": chain_head_leg_id,
"chain_prev_leg_id": chain_prev_leg_id,
"chain_seq": chain_seq,
}
try:
control_map = hz.get_map("DOLPHIN_CONTROL_PLANE").blocking()
key = PINK_RUNTIME_COMMAND_KEY
raw = control_map.get(key)
queue = []
if raw:
try:
queue = json.loads(raw) if isinstance(raw, str) else list(raw)
except Exception:
queue = []
if not isinstance(queue, list):
queue = []
queue.append({
"command_id": command_id,
"trade_id": trade_id,
"action": "RETRACT",
"fraction": float(fraction),
"reason": "HOTKEY_RETRACT",
"source": "tui_hotkey",
"ts": float(time.time()),
"asset": asset,
"side": side,
"chain_root_trade_id": chain_root_trade_id,
"chain_head_leg_id": chain_head_leg_id,
"chain_prev_leg_id": chain_prev_leg_id,
"chain_seq": chain_seq,
"chain_token": chain_token,
})
# Keep queue bounded in-case trader is down.
queue = queue[-200:]
control_map.put(key, json.dumps(queue))
msg = f"{side} {asset} retract {fraction:.0%}: cmd={command_id}"
_hotkey_audit(f"reduce_{fraction:.0%}", request, "OK", {
"command_id": command_id, "fraction": float(fraction),
"trade_id": trade_id,
"chain_root_trade_id": chain_root_trade_id,
"chain_head_leg_id": chain_head_leg_id,
"chain_seq": chain_seq,
})
return msg
except Exception as e:
_hotkey_audit(f"reduce_{fraction:.0%}", request, f"ERROR: {e}")
return f"ERROR: {e}"
def _hotkey_close_all(hz) -> str:
return _hotkey_reduce_position(1.0, hz)
def _hotkey_set_posture(posture: str, hz) -> str:
"""Switch posture via DOLPHIN_SAFETY when explicitly enabled."""
if posture not in ("APEX", "STALKER", "TURTLE", "HIBERNATE"):
return f"invalid posture {posture}"
if not PINK_ALLOW_GLOBAL_POSTURE_HOTKEYS:
msg = "posture hotkeys disabled (set DOLPHIN_PINK_TUI_ENABLE_GLOBAL_POSTURE=1 to enable)"
_hotkey_audit(f"set_posture_{posture}", {"posture": posture}, "DISABLED")
return msg
safe_map = hz.get_map("DOLPHIN_SAFETY").blocking()
raw = safe_map.get("latest")
current = {}
if raw:
try:
current = json.loads(raw) if isinstance(raw, str) else raw
except Exception:
current = {}
prev_posture = current.get("posture", "?")
current["posture"] = posture
current["posture_ts"] = time.time()
current["posture_source"] = "hotkey_tui"
safe_map.put("latest", json.dumps(current))
msg = f"posture {prev_posture}{posture}"
_hotkey_audit(f"set_posture_{posture}", {
"posture": posture, "prev_posture": prev_posture,
}, "OK", {"new_posture": posture})
return msg
# ── Non-blocking stdin reader ─────────────────────────────────────────────────
def _read_hotkey(timeout_s: float = 0.05) -> str | None:
"""Read a single keypress from stdin without blocking. Returns key string or None."""
if not sys.stdin.isatty():
return None
dr, _, _ = select.select([sys.stdin], [], [], timeout_s)
if not dr:
return None
ch = sys.stdin.read(1)
if ch == "\x1b":
if select.select([sys.stdin], [], [], 0.02)[0]:
seq = ch + sys.stdin.read(2)
if len(seq) == 3 and seq[1] == "[":
fn = {"A": None, "B": None, "C": None, "D": None,
"P": "F1", "Q": "F2", "R": "F3", "S": "F4"}
return fn.get(seq[2])
return None
return None
if ch == "\x03":
raise KeyboardInterrupt
if ch == "\x04":
raise KeyboardInterrupt
return ch if ch.isprintable() or ch in ("\n", "\r", "\t") else None
HOTKEY_MAP = {
"1": ("REDUCE 50%", lambda hz: _hotkey_reduce_position(0.50, hz)),
"2": ("REDUCE 75%", lambda hz: _hotkey_reduce_position(0.75, hz)),
"3": ("CLOSE ALL", lambda hz: _hotkey_close_all(hz)),
"s": ("→ STALKER", lambda hz: _hotkey_set_posture("STALKER", hz)),
"a": ("→ APEX", lambda hz: _hotkey_set_posture("APEX", hz)),
"t": ("→ TURTLE", lambda hz: _hotkey_set_posture("TURTLE", hz)),
"h": ("HELP", None),
}
def _age(ts):
if not ts: return "?"
s = time.time() - ts
if s < 0: return "0s"
if s < 60: return f"{s:.0f}s"
if s < 3600: return f"{s/60:.0f}m"
return f"{s/3600:.1f}h"
def _age_seconds(s):
if s is None:
return "?"
try:
s = float(s)
except Exception:
return "?"
if s < 0:
return "0s"
if s < 60:
return f"{s:.0f}s"
if s < 3600:
return f"{s/60:.0f}m"
return f"{s/3600:.1f}h"
def _iso_age_seconds(raw):
if not raw:
return None
if isinstance(raw, (int, float)):
try:
return max(0.0, time.time() - float(raw))
except Exception:
return None
if isinstance(raw, str):
try:
dt = datetime.fromisoformat(raw.replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return max(0.0, time.time() - dt.timestamp())
except Exception:
return None
return None
def _engine_snapshot_age_seconds(eng):
if not isinstance(eng, dict):
return None
for key in ("timestamp", "updated_at", "last_update", "ts"):
age = _iso_age_seconds(eng.get(key))
if age is not None:
return age
return None
def _is_trader_live(hb, eng):
hb_age = _iso_age_seconds(hb.get("ts")) if isinstance(hb, dict) else None
hb_ok = hb_age is not None and hb_age < TRADER_HB_STALE_S
eng_age = _engine_snapshot_age_seconds(eng)
eng_ok = eng_age is not None and eng_age < TRADER_ENGINE_STALE_S
live = hb_ok or eng_ok
src = []
if hb_ok:
src.append("hb")
if eng_ok:
src.append("eng")
if not src:
src.append("stale")
return live, "+".join(src), hb_age, eng_age
def _bar(v, w=20):
v = max(0.0, min(1.0, v))
return "" * round(v * w) + "" * (w - round(v * w))
def _get(hz, map_name, key):
try:
raw = hz.get_map(map_name).blocking().get(key)
if raw is None:
return {}
if isinstance(raw, (dict, list)):
return raw
if isinstance(raw, (bytes, bytearray)):
try:
raw = raw.decode("utf-8", errors="replace")
except Exception:
return {}
if isinstance(raw, str):
text = raw.strip()
if not text:
return {}
try:
return json.loads(text)
except Exception:
return {}
return {}
except Exception:
return {}
def _int_or_default(value, default: int = 0) -> int:
"""Coerce a possibly-missing snapshot value to a stable integer."""
try:
if value is None or value == "":
return int(default)
return int(value)
except Exception:
try:
return int(float(value))
except Exception:
return int(default)
# ── Gear items ────────────────────────────────────────────────────────────────
# Each returns (label, color, value_str)
def _item(label, color, val=""):
dot = f"{color}{RST}"
v = f":{val}" if val else ""
return f"{dot}{DIM}{label}{v}{RST}"
def _vel_item(vel_div):
"""vel_div colored by distance to threshold (-0.02)."""
v = f"{vel_div:+.4f}"
if vel_div <= VEL_DIV_EXTREME:
return _item("vel_div", GREEN, v) # extremely bearish — great
elif vel_div <= VEL_DIV_THRESHOLD:
return _item("vel_div", GREEN, v) # past threshold — signal green
elif vel_div <= VEL_DIV_CLOSE:
return _item("vel_div", YELLOW, v) # -0.015 to -0.020 — close
elif vel_div <= VEL_DIV_WARN:
return _item("vel_div", ORANGE, v) # -0.010 to -0.015 — approaching
elif vel_div < 0:
return _item("vel_div", RED, v) # negative but far
else:
return _item("vel_div", RED, v) # positive — not bearish
def signal_fired(vel_div, vol_ok, posture, acb_ready, exf_ok, halt):
"""True if ALL signal preconditions are green."""
return (
vel_div <= VEL_DIV_THRESHOLD
and vol_ok
and posture not in ("HIBERNATE", "TURTLE")
and acb_ready
and exf_ok
and not halt
)
def trade_can_execute(open_count, lev, abs_cap, daily_loss_ok, boost):
return (
open_count == 0 # no open position already
and lev < abs_cap # leverage headroom
and daily_loss_ok
and boost > 0
)
OB_IMBALANCE_BIAS = -0.09 # from engine config: ob_imbalance_bias
def _best_fill_candidate(obf_universe):
"""Pick best SHORT candidate from OBF universe.
Criteria: negative imbalance (bearish pressure) + high fill_probability + low spread.
Returns (symbol, asset_dict) or (None, {}).
"""
candidates = []
for k, v in obf_universe.items():
if not isinstance(v, dict) or "fill_probability" not in v:
continue
candidates.append((k, v))
if not candidates:
return None, {}
# Score: fill_prob * (1 + bearish_imbalance_bonus) / (1 + spread_bps/10)
def score(item):
sym, a = item
imb = float(a.get("imbalance", 0))
fp = float(a.get("fill_probability", 0))
sp = float(a.get("spread_bps", 99))
dq = float(a.get("depth_quality", 0))
# Bearish bias: reward negative imbalance, penalise positive
imb_bonus = max(0.0, -imb) # 0..1 for imbalance in [-1,0]
return fp * (1 + imb_bonus) * dq / max(0.1, sp)
candidates.sort(key=score, reverse=True)
return candidates[0]
def _update_exit_tracker(positions, eng, live_price, ob_imbalance):
"""Accumulate MAE/MFE state for V7 comparison from live OBF prices."""
global _EXIT_TRACKER
if not positions or live_price <= 0:
_EXIT_TRACKER.clear()
return None
pos = positions[0]
asset = pos.get("asset", "?")
ep = float(pos.get("entry_price", 0) or 0)
side = pos.get("side", "SHORT")
if ep <= 0:
return None
key = (asset, ep)
bar_idx = int(eng.get("bar_idx", 0) or 0)
if key not in _EXIT_TRACKER:
_EXIT_TRACKER.clear()
_EXIT_TRACKER[key] = {
"entry_bar": bar_idx, "first_seen": time.time(),
"peak_adverse": 0.0, "peak_favorable": 0.0,
"prev_mae": 0.0, "prev_mfe": 0.0,
"mae_velocity": 0.0, "mfe_velocity": 0.0,
"prices": deque(maxlen=60),
}
t = _EXIT_TRACKER[key]
t["prices"].append(live_price)
t["notional"] = float(pos.get("notional", 0) or 0)
t["unrealized_pnl"] = float(pos.get("unrealized_pnl", 0) or 0)
if side == "SHORT":
pnl = (ep - live_price) / ep
mae = max(0.0, (live_price - ep) / ep)
mfe = max(0.0, (ep - live_price) / ep)
else:
pnl = (live_price - ep) / ep
mae = max(0.0, (ep - live_price) / ep)
mfe = max(0.0, (live_price - ep) / ep)
t["peak_adverse"] = max(t["peak_adverse"], mae)
t["peak_favorable"] = max(t["peak_favorable"], mfe)
t["mae_velocity"] = mae - t["prev_mae"]
t["mfe_velocity"] = mfe - t["prev_mfe"]
t["prev_mae"] = mae
t["prev_mfe"] = mfe
t["bars_held"] = max(0, bar_idx - t["entry_bar"])
t["pnl_pct"] = pnl
t["live_price"] = live_price
t["ob_imbalance"] = ob_imbalance
t["entry_price"] = ep
t["side"] = side
t["asset"] = asset
return t
def _v7_preview(t):
"""Simplified V7 decision from tracker state (MAE/MFE/time channels)."""
if not t:
return None
bh = t.get("bars_held", 0)
bf = min(1.0, bh / MAX_HOLD_BARS) if MAX_HOLD_BARS else 0
pa = t["peak_adverse"]
pf = t["peak_favorable"]
mae = t["prev_mae"]
mfe = t["prev_mfe"]
pnl = t.get("pnl_pct", 0)
# MAE risk — V7 floor thresholds (without vol-adaptive since TUI lacks full history)
mae_risk = 0.0
if pa > 0.005: mae_risk += 0.5
if pa > 0.012: mae_risk += 0.8
if pa > 0.020: mae_risk += 1.2
# MAE-B: adverse acceleration
if bh >= 3 and t["mae_velocity"] > 0 and mae > 0.003:
mae_risk += 0.6
# MAE-D: late-stage time-weighted
if mae > 0.003 and bf > 0.60:
mae_risk += (bf - 0.60) / 0.40 * 0.4
# MFE risk — convexity decay
mfe_risk = 0.0
decay = (pf - mfe) / (pf + 1e-9) if pf > 0 else 0.0
if decay > 0.35 and t["mfe_velocity"] < 0 and pf > 0.01:
mfe_risk += 1.5
if decay > 0.20:
mfe_risk += 0.3
# Exit pressure (simplified: MAE + MFE channels weighted as V7)
pressure = 2.0 * mae_risk + 2.5 * mfe_risk
if bf > 0.80 and pnl < 0:
pressure += 0.5
if bf > 0.95:
pressure += 1.0
# Decision (mirrors V7 thresholds)
if pressure > 2.0:
action = "EXIT"
reason = "V7.1_MAE_SL_VOL_NORM" if mae_risk > mfe_risk else "V7_COMPOSITE"
elif pressure > 1.0:
action = "RETRACT"
reason = "V7_RISK_DOM"
elif pressure < -0.5 and pnl > 0:
action = "EXTEND"
reason = "V7_DIR_EDGE"
else:
action = "HOLD"
reason = "\u2014"
proj_usd = pnl * t.get("notional", 0)
return {
"action": action, "reason": reason, "pressure": pressure,
"mae": pa, "mfe": pf, "mae_risk": mae_risk, "mfe_risk": mfe_risk,
"bars_held": bh, "bars_frac": bf, "pnl_pct": pnl,
"proj_usd": proj_usd,
}
def fill_row(obf_universe, acb, eng):
"""Row 3: signal → asset-pick → OBF liquidity → size → ORDER."""
f_items = []
# ── Asset picker (IRP/ARS) ─────────────────────────────────────────────
n_assets = int(obf_universe.get("_n_assets", 0) if obf_universe else 0)
n_stale = int(obf_universe.get("_n_stale", 0) if obf_universe else 0)
n_fresh = n_assets - n_stale
f_items.append(_item("universe",
GREEN if n_fresh >= 200 else (YELLOW if n_fresh >= 50 else RED),
f"{n_fresh}/{n_assets}"))
sym, ab = _best_fill_candidate(obf_universe)
if sym:
fill_p = float(ab.get("fill_probability", 0))
spread = float(ab.get("spread_bps", 99))
dq = float(ab.get("depth_quality", 0))
imb = float(ab.get("imbalance", 0))
depth = float(ab.get("depth_1pct_usd", 0))
# Best candidate asset
asset_color = GREEN if fill_p >= 0.80 else (YELLOW if fill_p >= 0.50 else RED)
f_items.append(_item("best", asset_color, sym[:6]))
# OBF: fill probability
f_items.append(_item("fill_p",
GREEN if fill_p >= 0.85 else (YELLOW if fill_p >= 0.60 else RED),
f"{fill_p:.2f}"))
# OBF: spread
f_items.append(_item("spread",
GREEN if spread <= 3 else (YELLOW if spread <= 8 else RED),
f"{spread:.1f}bps"))
# OBF: depth quality
f_items.append(_item("depth_q",
GREEN if dq >= 0.5 else (YELLOW if dq >= 0.1 else RED),
f"{dq:.2f}"))
# OBF: imbalance direction (SHORT needs bearish = negative)
imb_ok = imb < OB_IMBALANCE_BIAS # confirmed bearish pressure
f_items.append(_item("imb",
GREEN if imb_ok else
YELLOW if imb < 0 else
ORANGE if imb < 0.1 else RED,
f"{imb:+.2f}"))
# OBF: depth USD
f_items.append(_item("depth",
GREEN if depth >= 50_000 else (YELLOW if depth >= 10_000 else RED),
f"${depth/1000:.0f}k"))
else:
f_items.append(_item("OBF", RED, "no data"))
# ── Sizing — ACB boost × proxy_B prank ────────────────────────────────
# proxy_B prank not exposed in HZ snapshot; show ACB boost as sizing proxy
boost = float(acb.get("boost", 1.0) if acb else 1.0)
beta = float(acb.get("beta", 0.8) if acb else 0.8)
f_items.append(_item("acb_boost",
GREEN if boost >= 1.5 else (YELLOW if boost >= 1.0 else ORANGE),
f"×{boost:.2f}"))
f_items.append(_item("beta",
GREEN if beta >= 0.7 else (YELLOW if beta >= 0.4 else RED),
f"{beta:.2f}"))
# ── ORDER indicator ────────────────────────────────────────────────────
# Would an order fire if signal were green right now?
open_count = len(eng.get("open_positions") or [])
lev = float(eng.get("current_leverage", 0) or 0)
abs_c = float(eng.get("leverage_abs_cap", 9.0) or 9.0)
order_ready = (
sym is not None
and fill_p >= 0.60
and open_count == 0
and lev < abs_c
and boost > 0
) if sym else False
if order_ready:
f_items.append(f" {CYAN}{BOLD}◉ ORDER READY{RST}")
else:
f_items.append(f" {DIM}(order: waiting){RST}")
return " ".join(f_items)
def gear_rows(eng, safe, acb, exf, hb, obf_universe=None):
"""Return three formatted rows: SIGNAL, TRADE gates, FILL path."""
vel_div = float(eng.get("last_vel_div", 0) or 0)
vol_ok = bool(eng.get("vol_ok", False))
posture = safe.get("posture") or eng.get("posture") or "?"
halt = posture in ("HIBERNATE", "TURTLE")
acb_boost_val = float(acb.get("boost", acb.get("cut", 0)) or 0)
acb_ready = acb_boost_val > 0 # cut=0 means blocked
exf_ok_count = int(exf.get("_ok_count", 0) if exf else 0)
exf_ok = exf_ok_count >= 3
open_count = len(eng.get("open_positions") or [])
lev = float(eng.get("current_leverage", 0) or 0)
abs_cap = float(eng.get("leverage_abs_cap", 9.0) or 9.0)
trades_ex = int(eng.get("trades_executed") or 0)
hb_ts = hb.get("ts")
hb_ok = bool(hb_ts and (time.time() - hb_ts) < 30)
# ── SIGNAL ROW ────────────────────────────────────────────────────────────
# vol_ok is the MASTER GATE — listed first. When False, _try_entry is never
# called regardless of vel_div. Threshold is engine-configured runtime value.
s_items = []
# BTC vol — try to get live reading from exf or obf for display context
btc_vol_str = ""
if exf:
dvol_raw = exf.get("dvol_btc") or exf.get("dvol")
fng_raw = exf.get("fng")
if dvol_raw:
btc_vol_str = f"dV:{float(dvol_raw):.0f}"
if fng_raw:
btc_vol_str += f" FnG:{float(fng_raw):.0f}"
vol_label = f"vol_ok({btc_vol_str})"
s_items.append(_item(vol_label,
GREEN if vol_ok else RED,
"" if vol_ok else f"✗ BLOCKED"))
s_items.append(_vel_item(vel_div))
# posture gate
pc = PC.get(posture, DIM)
posture_ok = posture in ("APEX", "STALKER")
s_items.append(_item("posture",
GREEN if posture == "APEX" else (YELLOW if posture == "STALKER" else RED),
posture))
# acb_ready
s_items.append(_item("acb",
GREEN if acb_ready else (ORANGE if acb_boost_val > 0 else RED),
f"{acb_boost_val:.2f}"))
# exf_ok — external factors pipeline
s_items.append(_item("exf",
GREEN if exf_ok else (YELLOW if exf_ok_count >= 1 else RED),
f"{exf_ok_count}/5"))
# halt gate
s_items.append(_item("no_halt",
GREEN if not halt else RED,
"" if not halt else "HALT"))
# heartbeat
s_items.append(_item("hb",
GREEN if hb_ok else RED,
_age(hb_ts)))
# ALL GREEN → fire indicator
all_sig = signal_fired(vel_div, vol_ok, posture, acb_ready, exf_ok, halt)
if all_sig:
s_items.append(f" {GREEN}{BOLD}◉ SIGNAL{RST}")
# ── TRADE ROW ─────────────────────────────────────────────────────────────
# Additional gates that must pass before a matched signal becomes a fill
t_items = []
# open positions
t_items.append(_item("open_pos",
GREEN if open_count == 0 else ORANGE,
str(open_count)))
# leverage headroom
lev_pct = lev / abs_cap if abs_cap else 0
t_items.append(_item("lev",
GREEN if lev_pct < 0.3 else (YELLOW if lev_pct < 0.7 else RED),
f"{lev:.2f}x/{abs_cap:.0f}"))
# regime_dd_halt
t_items.append(_item("regime",
GREEN if not halt else RED,
"free" if not halt else "HALTED"))
# Rm strength
rm = float(safe.get("Rm", 0) or 0)
t_items.append(_item("Rm",
GREEN if rm >= 0.90 else (YELLOW if rm >= 0.70 else (ORANGE if rm >= 0.50 else RED)),
f"{rm:.3f}"))
# Cat5 (intraday drawdown contribution)
c5 = float((safe.get("breakdown") or {}).get("Cat5", 1.0) or 1.0)
t_items.append(_item("Cat5",
GREEN if c5 >= 0.95 else (YELLOW if c5 >= 0.85 else (ORANGE if c5 >= 0.70 else RED)),
f"{c5:.3f}"))
# trades today
t_items.append(_item("trades",
GREEN if trades_ex < 20 else (YELLOW if trades_ex < 35 else ORANGE),
str(trades_ex)))
# ALL GREEN trade execute indicator
daily_loss_ok = c5 > 0.50 # reasonable proxy — Cat5 tracks drawdown
all_trade = all_sig and trade_can_execute(open_count, lev, abs_cap, daily_loss_ok, acb_boost_val)
if all_trade:
t_items.append(f" {CYAN}{BOLD}◉ TRADE{RST}")
sig_row = " ".join(s_items)
trade_row = " ".join(t_items)
fill = fill_row(obf_universe or {}, acb, eng)
return sig_row, trade_row, fill
def render(hz):
global START_CAP, CAP_PEAK
# ── DITAv2 kernel + BingX account (reads kernel snapshot file, not Hz) ──
ditav2_section = _render_ditav2_section()
eng = _get(hz, PINK_STATE_MAP, "engine_snapshot")
cap = _get(hz, PINK_STATE_MAP, "capital_checkpoint")
safe = _get(hz, PINK_SAFETY_MAP, "latest")
hb = _get(hz, PINK_HEARTBEAT_MAP, PINK_HEARTBEAT_KEY)
mh = _get(hz, PINK_META_HEALTH_MAP, "latest")
ann = _get(hz, PINK_ANNOUNCEMENTS_MAP, "latest")
acb = _get(hz, PINK_FEATURES_MAP, "acb_boost")
exf = _get(hz, PINK_FEATURES_MAP, "exf_latest")
obf = _get(hz, PINK_FEATURES_MAP, "obf_universe_latest")
esof = _get(hz, PINK_FEATURES_MAP, "esof_advisor_latest")
if not esof:
esof = _get(hz, PINK_FEATURES_MAP, "esof_latest")
maras = _get(hz, PINK_FEATURES_MAP, "maras_latest")
eng, account_row = _overlay_account_event_state(eng)
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
capital = float(eng.get("capital", 0) or cap.get("capital", 0))
posture = safe.get("posture") or eng.get("posture") or "?"
rm = float(safe.get("Rm", 0))
hb_ts = hb.get("ts")
phase = hb.get("phase", "?")
trader_up, trader_src, hb_age_s, eng_age_s = _is_trader_live(hb, eng)
trades = _int_or_default(eng.get("trades_executed"), 0)
fills_today = _account_event_fill_count(account_row)
scans = _int_or_default(eng.get("scans_processed"), _int_or_default(eng.get("last_scan_number"), 0))
lev = float(eng.get("current_leverage", 0))
notional= float(eng.get("open_notional", 0))
mhs_st = mh.get("status", "?")
rm_meta = float(mh.get("rm_meta", 0))
ann_title = ann.get("title") if ann else None
ann_sev = (ann.get("severity") or "info") if ann else "info"
ann_col = {"critical": RED, "warning": YELLOW, "info": CYAN}.get(ann_sev, DIM)
ann_ts = ann.get("ts") if ann else None
ann_age = _age_seconds(_iso_age_seconds(ann_ts)) if ann_ts else "?"
if capital > 0:
if START_CAP is None: START_CAP = capital
if CAP_PEAK is None or capital > CAP_PEAK: CAP_PEAK = capital
roi = ((capital - START_CAP) / START_CAP * 100) if START_CAP and capital else 0
dd = ((CAP_PEAK - capital) / CAP_PEAK * 100) if CAP_PEAK and capital < CAP_PEAK else 0
pc = PC.get(posture, DIM)
sc = SC.get(mhs_st, DIM)
tc = GREEN if trader_up else RED
roi_c = GREEN if roi >= 0 else RED
dd_c = RED if dd > 15 else (YELLOW if dd > 5 else GREEN)
sig_row, trade_row, fill_row_str = gear_rows(eng, safe, acb, exf, hb, obf)
svc = mh.get("service_status", {})
hz_ks = mh.get("hz_key_status", {})
L = []
L.append(f"{BOLD}{CYAN}🐬 DOLPHIN-PINK{RST} {BOLD}v7{RST} {DIM}{now}{RST}")
L.append("" * 65)
# TRADER
L.append(f"{BOLD}TRADER{RST} {tc}{'● LIVE' if trader_up else '● DOWN'}{RST}"
f" phase:{phase} hb:{_age(hb_ts)} eng:{_age_seconds(eng_age_s)}"
f" src:{trader_src}"
f" scan:#{eng.get('last_scan_number','?')}")
# ── SIGNAL → FILL GEARS ───────────────────────────────────────────────────
vol_ok_live = bool(eng.get("vol_ok", False))
vol_gate_threshold = float(eng.get("vol_gate_threshold", VOL_P60) or VOL_P60)
if not vol_ok_live:
L.append(
f" {RED}{BOLD}⛔ VOL_OK=FALSE — engine gate closed, "
f"NO trades until BTC vol > {vol_gate_threshold:.6f}{RST}"
)
L.append(f" {DIM}SIG │{RST} {sig_row}")
L.append(f" {DIM}TRD │{RST} {trade_row}")
L.append(f" {DIM}FIL │{RST} {fill_row_str}")
# ── EsoF ADVISORY ─────────────────────────────────────────────────────────
if esof:
_ec = {
"FAVORABLE": GREEN, "MILD_POSITIVE": "\033[92m",
"NEUTRAL": YELLOW, "MILD_NEGATIVE": "\033[91m",
"UNFAVORABLE": RED,
}
_lbl = esof.get("advisory_label", "?")
_col = _ec.get(_lbl, DIM)
_sc = float(esof.get("advisory_score", 0))
_sess = esof.get("session", "?")
_dow = esof.get("dow_name", "?")
_slot = esof.get("slot_15m", "?")
_swr = esof.get("session_wr_pct", 0)
_dwr = esof.get("dow_wr_pct", 0)
_moon = esof.get("moon_phase", "?")[:8]
_retro= " ☿RETRO" if esof.get("mercury_retrograde") else ""
L.append(f" {DIM}EsoF│{RST} {_col}{_lbl:<13}{RST} sc:{_col}{_sc:+.3f}{RST} "
f"sess:{_sess}({_swr:.0f}%) "
f"dow:{_dow}({_dwr:.0f}%) "
f"slot:{_slot} {DIM}{_moon}{_retro}{RST}")
else:
L.append(f" {DIM}EsoF│ advisory unavailable (keys: esof_advisor_latest/esof_latest){RST}")
L.append("")
# ── CH persistence ─────────────────────────────────────────────────────────
# CAPITAL
L.append(f"{BOLD}CAPITAL{RST} {CYAN}${capital:,.2f}{RST}"
+ (f" ROI:{roi_c}{roi:+.2f}%{RST} DD:{dd_c}{dd:.2f}%{RST}"
f" start:${START_CAP:,.0f}" if START_CAP else ""))
bar_idx = _int_or_default(eng.get("bar_idx"), _int_or_default(eng.get("last_scan_number"), 0))
L.append(f" trades:{trades} fills:{fills_today} scans:{scans} bar:{bar_idx}"
f" lev:{lev:.2f}x notional:${notional:,.0f}")
# Open positions + EXIT COMPARISON
positions = eng.get("open_positions") or []
if positions:
_pa = positions[0].get("asset", "")
_lp = 0.0; _obi = 0.0
if _pa and obf:
_oad = obf.get(_pa, {})
_ob_bid = float(_oad.get("best_bid", 0) or 0)
_ob_ask = float(_oad.get("best_ask", 0) or 0)
_lp = (_ob_bid + _ob_ask) / 2 if _ob_bid > 0 and _ob_ask > 0 else 0
_obi = float(_oad.get("imbalance", 0) or 0)
L.append(f" {BOLD}OPEN:{RST}")
for p in positions:
sc2 = GREEN if p.get("side") == "LONG" else RED
upnl, cur = _derive_open_position_pnl(p, obf)
cur_s = f" mark:{cur:.4g}" if cur > 0 else ""
L.append(f" {sc2}{p.get('asset','?')} {p.get('side','?')}{RST}"
f" qty:{p.get('quantity',0):.4f}"
f" entry:{p.get('entry_price',0):.2f}"
f" upnl:{upnl:+.2f}{cur_s}")
# ── EXIT COMPARISON: base engine vs V7 ──────────────────────────────
_etr = _update_exit_tracker(positions, eng, _lp, _obi)
_v7p = _v7_preview(_etr)
if _v7p:
bh = _v7p["bars_held"]
bf = _v7p["bars_frac"]
pp = _v7p["pnl_pct"]
tp_pct = abs(pp) / FIXED_TP_PCT * 100 if FIXED_TP_PCT else 0
bf_bar = round(bf * 20)
bc = GREEN if bf < 0.6 else (YELLOW if bf < 0.85 else RED)
tc = GREEN if pp >= FIXED_TP_PCT else (YELLOW if tp_pct > 50 else DIM)
vc = RED if _v7p["action"] == "EXIT" else (YELLOW if _v7p["action"] == "RETRACT" else GREEN)
ps = "+" if _v7p["proj_usd"] >= 0 else ""
L.append(f" {BOLD}EXIT CMP{RST} {DIM}bar{RST} {bc}{bh}/{MAX_HOLD_BARS} [{'\u2588'*bf_bar+'\u2591'*(20-bf_bar)}]{RST} {bf*100:.0f}%"
f" {DIM}TP{RST} {tc}{abs(pp)*100:.3f}%/{FIXED_TP_PCT*100:.2f}% ({tp_pct:.0f}%){RST}")
L.append(f" {vc}V7:{_v7p['action']:<8}{RST} P={_v7p['pressure']:.2f}"
f" mae:{_v7p['mae']:.4f} mfe:{_v7p['mfe']:.4f}"
f" {DIM}\u2192{RST} {ps}${_v7p['proj_usd']:.2f} ({pp*100:+.3f}%)"
f" {DIM}[{_v7p['reason']}]{RST}")
else:
L.append(f" {DIM}no open positions{RST}")
_EXIT_TRACKER.clear()
L.append("")
# POSTURE
bd = safe.get("breakdown") or {}
L.append(f"{BOLD}POSTURE{RST} {pc}{posture}{RST} Rm:{pc}{_bar(rm,20)}{RST} {rm:.4f}")
cats = " ".join(f"C{i}:{float(bd.get(f'Cat{i}',0)):.2f}" for i in range(1,6))
L.append(f" {cats} f_env:{float(bd.get('f_env',0)):.3f} f_exe:{float(bd.get('f_exe',0)):.3f}")
L.append("")
# SYS HEALTH
L.append(f"{BOLD}SYS HEALTH{RST} {sc}{mhs_st}{RST} rm_meta:{rm_meta:.3f}")
for line in _format_two_column_sensor_lines(_sys_health_sensor_rows(mh)):
L.append(line)
L.append(f" {DIM}services:{RST} "
+ " ".join(
f"{'' if st=='RUNNING' else f'{RED}{RST}'}{DIM}{n.split(':')[-1]}{RST}"
if st == "RUNNING" else
f"{RED}{DIM}{n.split(':')[-1]}{RST}"
for n, st in sorted(svc.items())))
L.append(f" {DIM}hz_keys:{RST} "
+ " ".join(
f"{GREEN if float(i.get('score',0))>=0.9 else (YELLOW if float(i.get('score',0))>=0.5 else RED)}{RST}{DIM}{k}{RST}"
for k, i in sorted(hz_ks.items())))
L.append(f" {DIM}ann:{RST} "
+ (f"{ann_col}{ann_title}{RST} {DIM}{ann_age}{RST}" if ann_title else f"{DIM}none{RST}"))
# ── RECENT TRADES ────────────────────────────────────────────────────────
_, trades_hist, fill_hist, trades_source = _trade_panel_rows(eng, obf, limit=5)
if trades_hist or fill_hist:
L.append("")
L.append(f"{BOLD}RECENT TRADES{RST} {DIM}(from {trades_source}){RST}")
if trades_hist:
for t in trades_hist:
pnl = _closed_trade_display_pnl(t)
pct = float(t.get("pnl_pct", 0)) * 100
lev = float(t.get("leverage", 0))
ep = float(t.get("entry_price", 0))
reason = t.get("reason", "?")
asset = t.get("asset", "?")
bars = t.get("bars_held", 0)
ts_raw = str(t.get("ts", ""))[:16].replace("T", " ")
pc2 = GREEN if pnl >= 0 else RED
L.append(
f" {pc2}{'' if pnl>=0 else ''}{RST}"
f" {asset:<12} "
f"ep:{ep:.4g} "
f"lev:{lev:.2f}x "
f"pnl:{pc2}{pnl:+.2f}({pct:+.2f}%){RST} "
f"exit:{reason} bars:{bars} {DIM}{ts_raw}{RST}"
)
elif not fill_hist:
L.append(f" {DIM}no recent trades yet{RST}")
if fill_hist:
L.append("")
L.append(f"{BOLD}RECENT FILLS{RST} {DIM}(account events, not terminal closes){RST}")
for t in fill_hist:
pnl = _closed_trade_display_pnl(t)
pct = float(t.get("pnl_pct", 0)) * 100
lev = float(t.get("leverage", 0))
ep = float(t.get("entry_price", 0))
reason = t.get("reason", "FILL")
asset = t.get("asset", "?")
bars = t.get("bars_held", 0)
ts_raw = str(t.get("ts", ""))[:16].replace("T", " ")
pc2 = GREEN if pnl >= 0 else RED
L.append(
f" {pc2}{RST}"
f" {asset:<12} "
f"ep:{ep:.4g} "
f"lev:{lev:.2f}x "
f"fill:{pc2}{pnl:+.2f}({pct:+.2f}%){RST} "
f"status:{reason} bars:{bars} {DIM}{ts_raw}{RST}"
)
# ── HOTKEY BAR ──────────────────────────────────────────────────────────
L.append("")
hk_bar = f"{DIM}HOTKEYS:{RST} " + " ".join(
f"{CYAN}[{k}]{RST}{DIM}{desc[0]}{RST}"
for k, desc in sorted(HOTKEY_MAP.items())
)
L.append(hk_bar)
if _HOTKEY_FEEDBACK:
for fb_ts, fb_msg in _HOTKEY_FEEDBACK:
fb_age = time.time() - fb_ts
if fb_age < 15:
L.append(f" {GREEN}{fb_msg}{RST} {DIM}({fb_age:.0f}s ago){RST}")
L.append("")
# ── MARAS footer ───────────────────────────────────────────────────────────
if maras:
reg = maras.get("regime","?")
sc = maras.get("final_score",0)
cf = maras.get("confidence",0)
cl = maras.get("conflict_level",0)
ch = maras.get("composite_hash",0)
sh = maras.get("scalar_hash",0)
te = maras.get("tier_exf",0)
tg = maras.get("tier_eigen",0)
tb = maras.get("tier_btc",0)
ts = maras.get("tier_esof",0)
tm = maras.get("tier_micro",0)
rc = {"VERY_BEARISH":RED,"BEARISH":RED,"CHOPPY_BEARISH":YELLOW,
"CHOPPY":YELLOW,"SIDEWAYS":DIM,"CHOPPY_BULLISH":CYAN,
"BULLISH":GREEN,"VERY_BULLISH":GREEN}
col = rc.get(reg, DIM)
confl_col = RED if cl > 0.5 else (YELLOW if cl > 0.3 else GREEN)
L.append(f" {BOLD}{CYAN}MARAS{RST} {col}{reg}{RST}"
f" score={sc:+.2f} conf={cf:.0f}%"
f" {confl_col}conflict={cl:.2f}{RST}"
f" chash={ch} shash={sh}"
f" | ExF={te:+.2f} Eig={tg:+.2f} BTC={tb:+.2f} EsF={ts:+.2f} Mic={tm:+.2f}")
else:
L.append(f" {DIM}MARAS: waiting for daemon start...{RST}")
L.append(f"{DIM}v7 • PINK • 0.5s poll • hotkeys • CH→status+audit • Ctrl-C quit{RST}")
# ── CH persistence ─────────────────────────────────────────────────────────
# Write every other cycle (1s effective rate) to avoid CH noise
if int(time.time() * 2) % 2 == 0:
ch_put({
"ts": int(time.time() * 1000),
"capital": capital,
"roi_pct": round(roi, 4),
"dd_pct": round(dd, 4),
"trades_executed": int(eng.get("trades_executed", 0) or 0),
"posture": posture,
"rm": round(rm, 6),
"vel_div": round(float(eng.get("last_vel_div", 0) or 0), 6),
"vol_ok": 1 if eng.get("vol_ok") else 0,
"phase": phase,
"mhs_status": mhs_st,
"boost": round(float(acb.get("boost", 1.0) if acb else 1.0), 4),
"cat5": round(float((safe.get("breakdown") or {}).get("Cat5", 1.0) or 1.0), 6),
})
# Prepend the DITAv2 section at the top of the display
return ditav2_section + "\n" + "\n".join(L)
def _dispatch_hotkey(key: str, hz):
entry = HOTKEY_MAP.get(key)
if entry is None:
return
label, fn = entry
if fn is None:
return
_HOTKEY_FEEDBACK.append((time.time(), f"{label} ..."))
try:
result = fn(hz)
_HOTKEY_FEEDBACK.append((time.time(), f"{label}: {result}"))
except Exception as e:
_HOTKEY_FEEDBACK.append((time.time(), f"{label} FAILED: {e}"))
def main():
print(f"Connecting to HZ ({PINK_STATE_MAP})...")
hz = hazelcast.HazelcastClient(
cluster_name="dolphin", cluster_members=["localhost:5701"],
connection_timeout=5.0)
posture_mode = "enabled" if PINK_ALLOW_GLOBAL_POSTURE_HOTKEYS else "disabled"
print(f"Connected. PINK hotkeys enabled (global posture hotkeys {posture_mode}).\n")
fd = sys.stdin.fileno()
old_term = None
if sys.stdin.isatty():
old_term = termios.tcgetattr(fd)
tty.setcbreak(fd)
try:
while True:
try:
sys.stdout.write(CLEAR + render(hz) + "\n")
sys.stdout.flush()
except Exception as e:
sys.stdout.write(f"\n{RED}render error: {e}{RST}\n")
sys.stdout.flush()
try:
key = _read_hotkey(0.5)
if key:
_dispatch_hotkey(key, hz)
except KeyboardInterrupt:
raise
except Exception:
pass
except KeyboardInterrupt:
print(f"\n{DIM}Bye.{RST}")
finally:
if old_term is not None:
termios.tcsetattr(fd, termios.TCSADRAIN, old_term)
hz.shutdown()
if __name__ == "__main__":
main()