Files
siloqy/prod/bingx/health.py

152 lines
4.8 KiB
Python
Raw Normal View History

from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any
from .journal import load_latest_record
@dataclass(frozen=True)
class BingxHealthSummary:
    """Immutable result of scoring a single BingX health journal record.

    The five component scores (``transport``, ``freshness``, ``coherence``,
    ``rate_limit``, ``circuit``) each lie in ``[0.0, 1.0]``; ``score`` is the
    overall figure derived from them and ``status`` is its coarse label.
    """

    # Overall health score and its label ("GREEN", "DEGRADED", "CRITICAL", "DEAD").
    score: float
    status: str

    # Event type of the scored record and the human-readable reason attached to it.
    event_type: str
    reason: str

    # Age of the record, in seconds, relative to the scoring time.
    age_s: float

    # Component scores.
    transport: float
    freshness: float
    coherence: float
    rate_limit: float
    circuit: float

    # Raw facts copied from the record that fed the transport/coherence scores.
    ws_healthy: bool
    ledger_authority: str
def load_latest_health_summary(
    *,
    strategy: str = "bingx",
    account_id: str | None = None,
    now: datetime | None = None,
) -> BingxHealthSummary | None:
    """Fetch the latest journal record and score it.

    Args:
        strategy: Passed positionally to ``load_latest_record``.
        account_id: Forwarded to ``load_latest_record`` as ``account_id``.
        now: Reference time forwarded to ``score_health_record``.

    Returns:
        The scored summary, or ``None`` when ``load_latest_record`` returns
        ``None``.
    """
    latest = load_latest_record(strategy, account_id=account_id)
    return None if latest is None else score_health_record(latest, now=now)
def _coerce_number(value: Any, cast: type, default: Any) -> Any:
    """Return ``cast(value)``, or *default* when the value cannot be converted."""
    try:
        return cast(value)
    except (TypeError, ValueError, OverflowError):
        return default


def _freshness_score(age_s: float) -> float:
    """Map record age in seconds to 1.0 (<=20s), 0.5 (<=60s) or 0.0 (older)."""
    if age_s > 60.0:
        return 0.0
    if age_s > 20.0:
        return 0.5
    return 1.0


def _circuit_score(payload: dict[str, Any]) -> float:
    """Score the ``circuit_breaker`` section of *payload* (1.0 when absent)."""
    cb = payload.get("circuit_breaker")
    if not isinstance(cb, dict):
        return 1.0
    open_until_ns = _coerce_number(cb.get("open_until_ns") or 0, int, 0)
    failure_count = _coerce_number(cb.get("failure_count") or 0, int, 0)
    last_delay_ms = _coerce_number(cb.get("last_delay_ms") or 0, int, 0)
    # NOTE(review): any positive open_until_ns counts as "open"; it is not
    # compared against a clock, so an already-expired deadline still scores 0.
    # Confirm the writer clears the field once the breaker closes.
    if open_until_ns > 0:
        return 0.0
    if failure_count > 0 or last_delay_ms > 0:
        return 0.5
    return 1.0


def _rate_limit_score(payload: dict[str, Any]) -> float:
    """Score the ``rate_limits`` section of *payload* (1.0 when absent)."""
    rl = payload.get("rate_limits")
    if not isinstance(rl, dict):
        return 1.0
    score = 1.0
    # A missing or non-numeric "rest_remaining" is treated as unknown.
    remaining = _coerce_number(rl.get("rest_remaining"), int, None)
    if remaining is not None:
        if remaining <= 0:
            score = 0.0
        elif remaining <= 5:
            score = 0.25
        elif remaining <= 20:
            score = 0.6
    reset_ms = _coerce_number(rl.get("rest_reset_ms") or 0, int, 0)
    if reset_ms > 0 and score > 0.0:
        score = min(score, 0.8)
    return score


def score_health_record(
    record: dict[str, Any],
    *,
    now: datetime | None = None,
) -> BingxHealthSummary:
    """Score one journal record and return a ``BingxHealthSummary``.

    Five component scores in ``[0.0, 1.0]`` are computed (freshness, circuit,
    rate limit, transport, coherence); the overall score is their minimum and
    is mapped to ``GREEN`` (>= 0.85), ``DEGRADED`` (>= 0.6), ``CRITICAL``
    (>= 0.3) or ``DEAD``.

    The function is tolerant of malformed input: a non-dict ``record`` is
    treated as empty, non-dict ``payload`` / ``notes`` / ``alarm`` sections
    are ignored, and numeric fields that cannot be converted fall back to the
    same default used when the field is missing.

    Args:
        record: Journal record. The keys read are ``"payload"``, ``"notes"``,
            ``"event_type"`` and ``"ts"``.
        now: Reference time used to compute the record's age. Defaults to the
            current UTC time. A naive datetime is interpreted as UTC.

    Returns:
        The scored summary, with every float rounded (age to 1 decimal,
        scores to 3).
    """
    # Every section is optional; normalise anything unexpected to an empty dict
    # so the lookups below never fail on the container type.
    if not isinstance(record, dict):
        record = {}
    payload = record.get("payload")
    if not isinstance(payload, dict):
        payload = {}
    notes = record.get("notes")
    if not isinstance(notes, dict):
        notes = {}
    alarm = notes.get("alarm")
    if not isinstance(alarm, dict):
        alarm = {}

    event_type = str(record.get("event_type") or alarm.get("reason") or "")
    reason = str(alarm.get("reason") or event_type or "")
    ledger_authority = str(
        payload.get("ledger_authority") or notes.get("ledger_authority") or "exchange"
    )
    ws_healthy = bool(payload.get("ws_healthy", True))

    parsed_ts = _parse_ts(record.get("ts"))
    now_dt = datetime.now(timezone.utc) if now is None else now
    if now_dt.utcoffset() is None:
        # The parsed timestamp is timezone-aware; subtracting a naive datetime
        # from it raises TypeError, so read a naive `now` as UTC.
        now_dt = now_dt.replace(tzinfo=timezone.utc)
    # NOTE(review): a missing or unparseable timestamp yields age 0 and is
    # therefore scored as fully fresh; confirm that fail-open is intended.
    age_s = (
        max(0.0, (now_dt - parsed_ts).total_seconds()) if parsed_ts is not None else 0.0
    )

    freshness = _freshness_score(age_s)
    circuit = _circuit_score(payload)
    rate_limit = _rate_limit_score(payload)

    transport = 1.0 if ws_healthy else 0.3
    if event_type in {"BINGX_WS_DOWN", "BINGX_REST_FAIL"}:
        transport = 0.0

    coherence = 1.0 if ledger_authority == "exchange" else 0.2
    if event_type == "BINGX_DRIFT":
        coherence = 0.0
    elif event_type in {"BINGX_ORDER_REJECTED", "BINGX_ORDER_CANCEL_REJECTED"}:
        coherence = min(coherence, 0.8)

    if alarm:
        severity = _coerce_number(alarm.get("severity") or 0.0, float, 0.0)
        category = str(alarm.get("category") or "").lower()
        if severity >= 0.85:
            if category in {"transport", "auth", "ws"}:
                transport = 0.0
            elif category in {"coherence", "drift"}:
                coherence = 0.0
            else:
                # A severe alarm in an unrecognised category must never score
                # better than a medium one, so it gets at least the medium cap.
                transport = min(transport, 0.5)
                coherence = min(coherence, 0.5)
        elif severity >= 0.5:
            transport = min(transport, 0.5)
            coherence = min(coherence, 0.5)

    # The weakest component determines overall health.
    score = min(freshness, circuit, rate_limit, transport, coherence)
    if score >= 0.85:
        status = "GREEN"
    elif score >= 0.6:
        status = "DEGRADED"
    elif score >= 0.3:
        status = "CRITICAL"
    else:
        status = "DEAD"

    return BingxHealthSummary(
        score=round(score, 3),
        status=status,
        event_type=event_type,
        reason=reason,
        age_s=round(age_s, 1),
        transport=round(transport, 3),
        freshness=round(freshness, 3),
        coherence=round(coherence, 3),
        rate_limit=round(rate_limit, 3),
        circuit=round(circuit, 3),
        ws_healthy=ws_healthy,
        ledger_authority=ledger_authority,
    )
def _parse_ts(raw: Any) -> datetime | None:
    """Convert *raw* to a timezone-aware UTC datetime, or ``None``.

    ``datetime`` inputs are normalised directly. Anything else is stringified,
    has ``"Z"`` replaced by ``"+00:00"`` and is parsed as ISO 8601. Naive
    values are taken to be UTC. ``None`` and unparseable input give ``None``.
    """

    def _to_utc(moment: datetime) -> datetime:
        # Naive values are labelled as UTC; aware ones are converted to UTC.
        if moment.tzinfo is None:
            return moment.replace(tzinfo=timezone.utc)
        return moment.astimezone(timezone.utc)

    if isinstance(raw, datetime):
        return _to_utc(raw)
    if raw is None:
        return None
    try:
        # Normalisation stays inside the try so any failure on the text path
        # (parsing or conversion) results in None rather than an exception.
        return _to_utc(datetime.fromisoformat(str(raw).replace("Z", "+00:00")))
    except Exception:
        return None