Files
siloqy/prod/launch_dolphin_pink.py

421 lines
15 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""PINK live launcher — DITAv2-backed execution.
Wires PINK decision/intent logic through the DITAv2 kernel + BingX venue
adapter. The kernel owns the single-slot FSM, AccountProjection (capital
settled from fills, not balance-poll overwritten), Zinc shared-memory mirror,
and Hazelcast slot projection.
"""
from __future__ import annotations
import asyncio
from copy import deepcopy
import contextlib
import os
import sys
from pathlib import Path
from enum import Enum
from typing import Any
from datetime import datetime
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT / "prod"))
sys.path.insert(0, str(PROJECT_ROOT / "prod" / "clean_arch"))
sys.path.insert(0, str(PROJECT_ROOT))
from dotenv import load_dotenv
load_dotenv(PROJECT_ROOT / ".env")
from prod.bingx.config import BingxExecClientConfig
from prod.bingx.config import BingxInstrumentProviderConfig
from prod.bingx.enums import BingxEnvironment
from prod.clean_arch.adapters.hazelcast_feed import HazelcastDataFeed
from prod.clean_arch.dita import DecisionConfig
from prod.clean_arch.dita import DecisionEngine
from prod.clean_arch.dita import IntentEngine
from prod.clean_arch.dita_v2.launcher import build_launcher_bundle
from prod.clean_arch.persistence import PinkClickHousePersistence
from adaptive_exit.market_state_runtime import MarketStateRuntime
from prod.clean_arch.runtime.pink_direct import PinkDirectRuntime
from prod.clean_arch.runtime.runner_heartbeat import (
build_runner_heartbeat_payload,
write_runner_heartbeat,
)
# Canonical PINK identifiers and tuning defaults. The helpers in this module
# read these values to pin the strategy's Hazelcast map names, journal
# targets, fixed take-profit and volatility threshold.
PINK_DEFAULTS = {
    # Strategy identity and Hazelcast map names.
    "strategy_name": "pink",
    "state_map": "DOLPHIN_STATE_PINK",
    "pnl_map": "DOLPHIN_PNL_PINK",
    "trader_id": "DOLPHIN-PINK-001",
    # Journal destinations.
    "journal_strategy": "pink",
    "journal_db": "dolphin_pink",
    # Fixed take-profit, expressed as a fraction (0.002 == 0.20%).
    "fixed_tp_pct": 0.002,
    # NOTE(review): presumably chosen low enough that the volatility gate
    # never blocks — confirm against the consumer of ``vol_p60_threshold``.
    "vol_p60_threshold": -1_000_000_000.0,
}
class PinkPhase(str, Enum):
    """Rollout phases that gate what the standalone PINK launcher does.

    Members also subclass ``str``, so each one compares equal to its raw
    string value (the form read from ``DOLPHIN_PINK_PHASE``).
    """

    # The run loop skips ``runtime.step`` while in this phase.
    BOOTSTRAP = "bootstrap"
    # Default phase when the environment names no valid phase.
    SINGLE_LEG = "single_leg"
    # Exit leg ratios are read from ``DOLPHIN_PINK_EXIT_LEG_RATIOS``.
    MULTI_EXIT = "multi_exit"
def _env_bool(name: str, default: bool = False) -> bool:
    """Read environment variable *name* as a boolean flag.

    Returns *default* when the variable is unset. Otherwise the value is
    trimmed and lower-cased, and counts as true only when it is one of
    ``1``, ``true``, ``yes`` or ``on``.
    """
    value = os.environ.get(name)
    if value is not None:
        token = str(value).strip().lower()
        return token in ("1", "true", "yes", "on")
    return default
def _env_upper(name: str, default: str = "") -> str:
    """Return environment variable *name* trimmed and upper-cased.

    *default* is normalised the same way when the variable is unset.
    """
    raw = os.environ.get(name, default)
    return str(raw).strip().upper()
def _resolve_bingx_environment() -> BingxEnvironment:
    """Map ``DOLPHIN_BINGX_ENV`` onto a ``BingxEnvironment`` member.

    Only the case-insensitive value ``LIVE`` selects the live environment.
    An unset variable or any other value resolves to ``VST``.
    """
    requested = str(os.environ.get("DOLPHIN_BINGX_ENV", "VST")).strip().upper()
    if requested == "LIVE":
        return BingxEnvironment.LIVE
    return BingxEnvironment.VST
def _resolve_bingx_allow_mainnet() -> bool:
    """Report whether ``DOLPHIN_BINGX_ALLOW_MAINNET`` is switched on (off by default)."""
    mainnet_allowed = _env_bool("DOLPHIN_BINGX_ALLOW_MAINNET", False)
    return mainnet_allowed
def _resolve_bingx_recv_window_ms() -> int:
    """Return the BingX receive window in milliseconds.

    Reads ``DOLPHIN_BINGX_RECV_WINDOW_MS``. An unset, non-integer or
    non-positive value falls back to 5000.
    """
    default_ms = 5000
    raw = str(os.environ.get("DOLPHIN_BINGX_RECV_WINDOW_MS", "5000")).strip()
    try:
        parsed = int(raw)
    except ValueError:
        # ``int`` on a string raises only ValueError; anything else would be
        # a genuine bug and should not be masked.
        return default_ms
    return parsed if parsed > 0 else default_ms
def _resolve_bingx_exchange_leverage_cap() -> int:
    """Return the exchange-side leverage cap.

    Reads ``DOLPHIN_BINGX_EXCHANGE_LEVERAGE_CAP``. An unset, non-integer or
    non-positive value falls back to 3.
    """
    default_cap = 3
    raw = str(os.environ.get("DOLPHIN_BINGX_EXCHANGE_LEVERAGE_CAP", "3")).strip()
    try:
        parsed = int(raw)
    except ValueError:
        # ``int`` on a string raises only ValueError; do not mask other errors.
        return default_cap
    return parsed if parsed > 0 else default_cap
def _resolve_pink_vol_p60_threshold() -> float:
    """Return the PINK volatility threshold.

    Reads ``DOLPHIN_PINK_VOL_P60_THRESHOLD``. An unset, unparsable or NaN
    value falls back to ``PINK_DEFAULTS["vol_p60_threshold"]``.
    """
    fallback = float(PINK_DEFAULTS["vol_p60_threshold"])
    raw = str(os.environ.get("DOLPHIN_PINK_VOL_P60_THRESHOLD", fallback)).strip()
    try:
        parsed = float(raw)
    except ValueError:
        return fallback
    # "nan" parses successfully but compares False against everything, so a
    # NaN threshold would silently corrupt any comparison made with it.
    if parsed != parsed:
        return fallback
    return parsed
def _resolve_pink_phase() -> PinkPhase:
    """Resolve the launcher phase from ``DOLPHIN_PINK_PHASE``.

    The value is trimmed and lower-cased before being matched against the
    ``PinkPhase`` values. An unset or unrecognised value resolves to
    ``PinkPhase.SINGLE_LEG``.
    """
    fallback = PinkPhase.SINGLE_LEG
    requested = str(os.environ.get("DOLPHIN_PINK_PHASE", fallback.value)).strip().lower()
    return next(
        (member for member in PinkPhase if member.value == requested),
        fallback,
    )
def _resolve_pink_account_sync_interval_sec() -> float:
    """Return the periodic account-reconcile interval in seconds.

    Account sync is advisory — the kernel tracks capital via settle() on
    close. The periodic reconcile re-seeds capital from the exchange
    balance, mainly as a safety net for long-running sessions.

    Reads ``DOLPHIN_PINK_ACCOUNT_SYNC_INTERVAL_SEC``. An unset, unparsable
    or non-positive value (NaN included) falls back to 300.0.
    """
    default_interval = 300.0
    raw = str(os.environ.get("DOLPHIN_PINK_ACCOUNT_SYNC_INTERVAL_SEC", "300")).strip()
    try:
        parsed = float(raw)
    except ValueError:
        # ``float`` on a string raises only ValueError; do not mask other errors.
        return default_interval
    # ``nan > 0`` is False, so NaN also lands on the default here.
    return parsed if parsed > 0 else default_interval
def _resolve_pink_exit_leg_ratios(phase: PinkPhase) -> tuple[float, ...]:
    """Return the exit leg ratios for *phase*.

    Every phase other than ``PinkPhase.MULTI_EXIT`` yields ``(1.0,)``.

    For ``MULTI_EXIT`` the comma-separated ``DOLPHIN_PINK_EXIT_LEG_RATIOS``
    is parsed. Entries that are not numbers, or that fall outside
    ``(0.0, 1.0]``, are dropped. If nothing valid remains the default
    ``(0.5, 1.0)`` is returned.
    """
    if phase is not PinkPhase.MULTI_EXIT:
        return (1.0,)

    raw = str(os.environ.get("DOLPHIN_PINK_EXIT_LEG_RATIOS", "0.5,1.0")).strip()
    ratios: list[float] = []
    for chunk in raw.split(","):
        try:
            value = float(chunk.strip())
        except ValueError:
            # Skip malformed entries rather than rejecting the whole list.
            continue
        if 0.0 < value <= 1.0:
            ratios.append(value)
    return tuple(ratios) if ratios else (0.5, 1.0)
def _set_ditav2_env_defaults() -> None:
    """Seed DITAv2 / BingX environment variables that are not already set.

    Uses ``setdefault`` semantics, so values already present in the
    environment are left untouched.
    """
    defaults = (
        ("DITA_V2_VENUE", "BINGX"),
        ("DITA_V2_HAZELCAST", "REAL"),
        ("DITA_V2_MODE", "DEBUG"),
        ("DITA_V2_VERBOSITY", "TRACE"),
        ("DITA_V2_PREFIX", "pink"),
        ("DOLPHIN_BINGX_ENV", "VST"),
        ("DOLPHIN_BINGX_ALLOW_MAINNET", "0"),
    )
    for key, value in defaults:
        os.environ.setdefault(key, value)
def _apply_pink_namespace_env() -> None:
    """Force the PINK namespace into the process environment.

    Every assignment is unconditional, so any value already present is
    replaced. This includes ``DOLPHIN_BINGX_ENV`` (pinned to ``VST``) and
    ``DOLPHIN_BINGX_ALLOW_MAINNET`` (pinned to ``0``).
    """
    pinned = {
        "DOLPHIN_STRATEGY_NAME": PINK_DEFAULTS["strategy_name"],
        "DOLPHIN_STATE_MAP": PINK_DEFAULTS["state_map"],
        "DOLPHIN_PNL_MAP": PINK_DEFAULTS["pnl_map"],
        "DOLPHIN_JOURNAL_STRATEGY": PINK_DEFAULTS["journal_strategy"],
        "DOLPHIN_JOURNAL_DB": PINK_DEFAULTS["journal_db"],
        # Rendered with four decimals, e.g. "0.0020".
        "DOLPHIN_FIXED_TP_PCT": f'{PINK_DEFAULTS["fixed_tp_pct"]:.4f}',
        "DOLPHIN_BINGX_ENV": "VST",
        "DOLPHIN_BINGX_ALLOW_MAINNET": "0",
    }
    for key, value in pinned.items():
        os.environ[key] = value
def _apply_pink_env() -> None:
    """Prepare the process environment for a PINK run.

    Soft DITAv2 defaults are seeded first; the PINK namespace values are
    then written over whatever is present.
    """
    _set_ditav2_env_defaults()
    _apply_pink_namespace_env()
def _apply_pink_actor_overrides(actor_cfg: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *actor_cfg* with the PINK-specific settings forced in.

    The input is deep-copied, so the caller's mapping is never mutated. A
    falsy input yields a fresh config. Nested sections (``hazelcast``,
    ``adaptive_exit``, ``paper_trade``, ``engine``) are created on demand.
    """
    cfg: dict[str, Any] = deepcopy(actor_cfg) if actor_cfg else {}
    journal_db = PINK_DEFAULTS["journal_db"]

    cfg["strategy_name"] = PINK_DEFAULTS["strategy_name"]

    # Hazelcast maps: point at the PINK maps and clear any aliases.
    cfg.setdefault("hazelcast", {}).update(
        state_map=PINK_DEFAULTS["state_map"],
        imap_pnl=PINK_DEFAULTS["pnl_map"],
        state_map_aliases=[],
        imap_pnl_aliases=[],
    )

    # Journalling targets.
    cfg.setdefault("adaptive_exit", {}).update(shadow_db=journal_db)
    cfg["v7_journal_db"] = journal_db
    cfg["sync_bar_idx_from_blue"] = False

    # The volatility threshold is mirrored at top level and under paper_trade.
    threshold = _resolve_pink_vol_p60_threshold()
    cfg["vol_p60_threshold"] = threshold
    cfg.setdefault("paper_trade", {}).update(vol_p60=threshold)

    cfg.setdefault("engine", {}).update(
        fixed_tp_pct=float(PINK_DEFAULTS["fixed_tp_pct"]),
    )
    return cfg
class BinanceDataClientConfig:  # pragma: no cover - compatibility shim
    """Empty stand-in kept so legacy tests can patch this name.

    Defining it locally avoids importing Nautilus just to expose the symbol.
    """
class TradingNode:  # pragma: no cover - compatibility shim
    """Empty stand-in kept so legacy tests can patch this name.

    Defining it locally avoids importing Nautilus just to expose the symbol.
    """
def build_actor_config(
    *,
    data_venue: str | None = None,
    exec_venue: str | None = None,
) -> dict[str, Any]:
    """Build the minimal actor config used by the direct PINK launcher.

    Args:
        data_venue: Market-data venue name; defaults to ``BINANCE``.
        exec_venue: Execution venue name; defaults to ``BINGX``.

    Returns:
        A fresh config dict with the PINK overrides applied. Venue names are
        upper-cased.
    """
    journal_db = PINK_DEFAULTS["journal_db"]
    hazelcast_section = {
        "state_map": PINK_DEFAULTS["state_map"],
        "imap_pnl": PINK_DEFAULTS["pnl_map"],
        "state_map_aliases": [],
        "imap_pnl_aliases": [],
    }
    base_cfg = {
        "strategy_name": PINK_DEFAULTS["strategy_name"],
        "hazelcast": hazelcast_section,
        "adaptive_exit": {"shadow_db": journal_db},
        "paper_trade": {"vol_p60": _resolve_pink_vol_p60_threshold()},
        "engine": {"fixed_tp_pct": PINK_DEFAULTS["fixed_tp_pct"]},
        "data_venue": (data_venue or "BINANCE").upper(),
        "exec_venue": (exec_venue or "BINGX").upper(),
        "v7_journal_db": journal_db,
        "sync_bar_idx_from_blue": False,
    }
    return _apply_pink_actor_overrides(base_cfg)
def build_bingx_exec_client_config(**_: Any) -> BingxExecClientConfig:
    """Return the direct BingX client config shared by the DITAv2 bundle.

    Credentials, environment, receive window, leverage and sizing mode come
    from environment variables. Keyword arguments are accepted and ignored.

    ``DOLPHIN_BINGX_DEFAULT_LEVERAGE`` falls back to 1 when it is not a
    positive integer, matching the fallback style of the other resolvers in
    this module instead of raising at startup.
    """
    raw_leverage = str(os.environ.get("DOLPHIN_BINGX_DEFAULT_LEVERAGE", "1")).strip()
    try:
        default_leverage = int(raw_leverage)
    except ValueError:
        default_leverage = 1
    if default_leverage <= 0:
        default_leverage = 1

    return BingxExecClientConfig(
        api_key=os.environ.get("BINGX_API_KEY"),
        secret_key=os.environ.get("BINGX_SECRET_KEY"),
        environment=_resolve_bingx_environment(),
        allow_mainnet=_resolve_bingx_allow_mainnet(),
        recv_window_ms=_resolve_bingx_recv_window_ms(),
        default_leverage=default_leverage,
        exchange_leverage_cap=_resolve_bingx_exchange_leverage_cap(),
        prefer_websocket=False,
        sizing_mode=os.environ.get("DOLPHIN_BINGX_SIZING_MODE", "testnet"),
        # Journal names come from the shared defaults so they cannot drift
        # from the values written into the environment and actor config.
        journal_strategy=PINK_DEFAULTS["journal_strategy"],
        journal_db=PINK_DEFAULTS["journal_db"],
        instrument_provider=BingxInstrumentProviderConfig(load_all=True),
    )
def build_pink_node(
    *,
    data_venue: str | None = None,
    exec_venue: str | None = None,
    trader_id: str | None = None,
) -> dict[str, Any]:
    """Compatibility shim for legacy tests/tools expecting a node-style builder.

    Args:
        data_venue: Market-data venue name; defaults to ``BINANCE``.
        exec_venue: Execution venue name; defaults to ``BINGX``.
        trader_id: Trader identifier; defaults to ``PINK_DEFAULTS["trader_id"]``.

    Returns:
        ``{"actor_cfg": cfg}``, where ``cfg`` is the PINK actor config
        extended with ``trader_id`` and ``bingx_environment``.

    Raises:
        RuntimeError: If the LIVE BingX environment is requested while
            mainnet access is not enabled.
    """
    bingx_env = _resolve_bingx_environment()
    mainnet_allowed = _resolve_bingx_allow_mainnet()
    live_requested = bingx_env is BingxEnvironment.LIVE
    if live_requested and not mainnet_allowed:
        raise RuntimeError("BingX LIVE requested but DOLPHIN_BINGX_ALLOW_MAINNET is not enabled")

    base_cfg = build_actor_config(
        data_venue=(data_venue or "BINANCE"),
        exec_venue=(exec_venue or "BINGX"),
    )
    # The overrides are re-applied here as well, so the result is pinned to
    # PINK whatever ``build_actor_config`` returned.
    node_cfg = _apply_pink_actor_overrides(base_cfg)
    node_cfg["trader_id"] = trader_id or PINK_DEFAULTS["trader_id"]
    node_cfg["bingx_environment"] = str(bingx_env.value)
    return {"actor_cfg": node_cfg}
def _build_data_feed() -> HazelcastDataFeed:
    """Create the Hazelcast data feed from ``HZ_CLUSTER`` / ``HZ_HOST``.

    Defaults to cluster ``dolphin`` on ``localhost:5701``.
    """
    hazelcast_settings = {
        "cluster": os.environ.get("HZ_CLUSTER", "dolphin"),
        "host": os.environ.get("HZ_HOST", "localhost:5701"),
    }
    return HazelcastDataFeed({"hazelcast": hazelcast_settings})
def _build_runtime(*, phase: PinkPhase) -> PinkDirectRuntime:
    """Assemble the PINK runtime: feed, policy engines, DITAv2 kernel, persistence.

    Args:
        phase: Launcher phase; it selects the exit leg ratios passed to the
            decision config.

    Returns:
        A ``PinkDirectRuntime`` wired to the Hazelcast data feed, the DITAv2
        kernel, the decision/intent engines, ClickHouse persistence and,
        when it could be created, the Hazelcast state writer.
    """
    data_feed = _build_data_feed()
    market_state_runtime = MarketStateRuntime()

    # Decision and intent policy — unchanged from BLUE semantics.
    # DOLPHIN_PINK_VEL_DIV_THRESHOLD: relax for on-exchange debugging (e.g. -0.005).
    # Default -0.02 matches BLUE production. BLUE is unaffected.
    _vel_div_threshold = float(os.environ.get("DOLPHIN_PINK_VEL_DIV_THRESHOLD", "-0.02"))
    cfg = DecisionConfig(
        vel_div_threshold=_vel_div_threshold,
        # 2.5x the threshold, but never above -0.001.
        vel_div_extreme=min(_vel_div_threshold * 2.5, -0.001),
        fixed_tp_pct=float(os.environ.get("DOLPHIN_FIXED_TP_PCT", "0.0020")),
        max_hold_bars=int(os.environ.get("DOLPHIN_MAX_HOLD_BARS", "250")),
        capital_fraction=0.20,
        max_leverage=3.0,
        allow_short=True,
        allow_long=False,
        policy_version="pink_ditav2_v1",
        exit_leg_ratios=_resolve_pink_exit_leg_ratios(phase),
    )
    decision = DecisionEngine(cfg)
    intent = IntentEngine(cfg)

    # DITAv2 execution bundle: kernel + venue + control + Zinc + projection.
    bundle = build_launcher_bundle(
        venue_mode="BINGX",
        max_slots=1,
        bingx_config=build_bingx_exec_client_config(),
    )
    kernel = bundle.kernel

    # Persistence reads from the kernel's AccountProjection (single authority).
    persistence = PinkClickHousePersistence(kernel.account)

    # Non-blocking Hz state writer — writes DOLPHIN_STATE_PINK / DOLPHIN_PNL_PINK.
    # Separate client from data-feed (which is read-only). Best-effort: any
    # failure while importing or instantiating it is tolerated so PINK still
    # trades without Hz, but the failure is logged rather than hidden.
    hz_state_writer = None
    try:
        from prod.clean_arch.dita_v2.hazelcast_projection import PinkHzStateWriter

        hz_state_writer = PinkHzStateWriter(
            cluster=os.environ.get("HZ_CLUSTER", "dolphin"),
            host=os.environ.get("HZ_HOST", "localhost:5701"),
            state_map_name=PINK_DEFAULTS["state_map"],  # "DOLPHIN_STATE_PINK"
            pnl_map_name=PINK_DEFAULTS["pnl_map"],  # "DOLPHIN_PNL_PINK"
        )
    except Exception:
        # Hz down at startup → PINK still trades; TUI shows kernel snapshot fallback.
        import logging

        logging.getLogger(__name__).warning(
            "PinkHzStateWriter unavailable; continuing without Hazelcast state writes",
            exc_info=True,
        )

    return PinkDirectRuntime(
        data_feed=data_feed,
        kernel=kernel,
        decision_engine=decision,
        intent_engine=intent,
        persistence=persistence,
        market_state_runtime=market_state_runtime,
        hz_state_writer=hz_state_writer,
    )
async def run() -> None:
    """Run the PINK launcher: connect, reconcile, then poll and step.

    Sequence:
      1. Apply the PINK environment and resolve the phase.
      2. Build the runtime and connect it with ``DOLPHIN_INITIAL_CAPITAL``.
      3. Start a background heartbeat writing to ``DOLPHIN_HEARTBEAT``
         roughly every 10 seconds.
      4. Reconcile the account once at startup, then loop: fetch the latest
         snapshot, reconcile again when the sync interval has elapsed, and
         step the runtime unless the phase is ``BOOTSTRAP`` or no snapshot
         is available.
      5. On exit, stop the heartbeat and release every resource.

    ``DOLPHIN_PINK_ONE_SHOT`` makes the loop exit after one iteration.
    """
    # Local import: ``timezone`` is not among this module's top-level imports.
    from datetime import timezone

    _apply_pink_env()
    phase = _resolve_pink_phase()
    os.environ["DOLPHIN_PINK_PHASE"] = phase.value
    runtime = _build_runtime(phase=phase)

    symbol = str(os.environ.get("DOLPHIN_PINK_SNAPSHOT_SYMBOL", "BTCUSDT")).strip().upper()
    poll_interval = float(os.environ.get("DOLPHIN_PINK_POLL_INTERVAL_SEC", "1.0"))
    one_shot = _env_bool("DOLPHIN_PINK_ONE_SHOT", False)
    account_sync_interval = _resolve_pink_account_sync_interval_sec()
    initial_capital = float(os.environ.get("DOLPHIN_INITIAL_CAPITAL", "25000.0"))

    await runtime.connect(initial_capital=initial_capital)

    heartbeat_client = None
    heartbeat_map = None
    heartbeat_stop = asyncio.Event()
    heartbeat_task = None
    try:
        import hazelcast

        heartbeat_client = hazelcast.HazelcastClient(
            cluster_name=os.environ.get("HZ_CLUSTER", "dolphin"),
            cluster_members=[os.environ.get("HZ_HOST", "localhost:5701")],
        )
        heartbeat_map = heartbeat_client.get_map("DOLPHIN_HEARTBEAT").blocking()

        async def _heartbeat_loop() -> None:
            """Write a runner heartbeat, then wait up to 10s for the stop event."""
            while not heartbeat_stop.is_set():
                try:
                    write_runner_heartbeat(
                        heartbeat_map,
                        build_runner_heartbeat_payload(
                            flow="pink_ditav2_runtime",
                            phase=phase.value,
                            # Aware UTC "now"; ``datetime.utcnow()`` is deprecated.
                            run_date=str(datetime.now(timezone.utc).date()),
                            runner="pink",
                        ),
                    )
                except Exception:
                    # Deliberately best-effort: a failed heartbeat write must
                    # never take the trading loop down.
                    pass
                try:
                    await asyncio.wait_for(heartbeat_stop.wait(), timeout=10.0)
                except asyncio.TimeoutError:
                    continue

        heartbeat_task = asyncio.create_task(_heartbeat_loop())

        initial_snapshot = await runtime.data_feed.get_latest_snapshot(symbol)
        await runtime.recover_account(
            snapshot=initial_snapshot,
            phase="startup_reconcile",
            event_type="ACCOUNT_RECONCILE",
        )
        last_account_sync = asyncio.get_running_loop().time()

        while True:
            snapshot = await runtime.data_feed.get_latest_snapshot(symbol)
            loop_now = asyncio.get_running_loop().time()
            if account_sync_interval > 0 and loop_now - last_account_sync >= account_sync_interval:
                await runtime.reconcile_account(snapshot)
                last_account_sync = loop_now
            if phase is not PinkPhase.BOOTSTRAP and snapshot is not None:
                await runtime.step(snapshot)
            if one_shot:
                break
            await asyncio.sleep(poll_interval)
    finally:
        # Each cleanup step sits in its own try/finally so that a failure in
        # one never skips the steps after it — in particular the final
        # ``runtime.disconnect()``.
        heartbeat_stop.set()
        try:
            if heartbeat_task is not None:
                heartbeat_task.cancel()
                # Swallow the task's cancellation (or any late error) but
                # let KeyboardInterrupt / SystemExit propagate.
                with contextlib.suppress(asyncio.CancelledError, Exception):
                    await heartbeat_task
        finally:
            try:
                if heartbeat_client is not None:
                    heartbeat_client.shutdown()
            finally:
                try:
                    if runtime.hz_state_writer is not None:
                        runtime.hz_state_writer.close()
                finally:
                    await runtime.disconnect()
if __name__ == "__main__":
asyncio.run(run())