Add BingX sandbox status sidecar

This commit is contained in:
Codex
2026-05-13 19:56:58 +02:00
parent 0d70c767e4
commit 34d01fe6a4
5 changed files with 2089 additions and 60 deletions

View File

@@ -0,0 +1,126 @@
from __future__ import annotations
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
DEFAULT_SANDBOX_STATUS_PATH = Path("/tmp/bingx_sandbox_status.json")
@dataclass(frozen=True)
class BingxSandboxStatus:
"""Small sidecar snapshot for BingX demo/testnet state.
The snapshot is intentionally local-only so it can be used by tests and
operators without writing into BLUE state, ClickHouse, or production logs.
"""
ts: str
environment: str
balance: float
equity: float
available_margin: float
unrealized_profit: float
used_margin: float
open_positions: int
open_orders: int
account_currency: str = "VST"
clean: bool = False
notes: dict[str, Any] | None = None
def to_dict(self) -> dict[str, Any]:
return {
"ts": self.ts,
"environment": self.environment,
"account_currency": self.account_currency,
"balance": self.balance,
"equity": self.equity,
"available_margin": self.available_margin,
"unrealized_profit": self.unrealized_profit,
"used_margin": self.used_margin,
"open_positions": self.open_positions,
"open_orders": self.open_orders,
"clean": self.clean,
"notes": self.notes or {},
}
def _safe_float(value: Any, default: float = 0.0) -> float:
try:
out = float(value)
except Exception:
return default
return out if out == out else default
def _count_positions(positions: Any) -> int:
if isinstance(positions, list):
return sum(1 for item in positions if isinstance(item, dict))
return 0
def _count_orders(open_orders: Any) -> int:
if isinstance(open_orders, dict):
orders = open_orders.get("orders")
if isinstance(orders, list):
return sum(1 for item in orders if isinstance(item, dict))
if isinstance(open_orders, list):
return sum(1 for item in open_orders if isinstance(item, dict))
return 0
def build_sandbox_status(
*,
balance_payload: dict[str, Any],
positions_payload: Any,
open_orders_payload: Any,
environment: str = "VST",
account_currency: str = "VST",
notes: dict[str, Any] | None = None,
) -> BingxSandboxStatus:
balance_row = balance_payload.get("balance", balance_payload) if isinstance(balance_payload, dict) else {}
if not isinstance(balance_row, dict):
balance_row = {}
balance = _safe_float(balance_row.get("balance"), 0.0)
equity = _safe_float(balance_row.get("equity"), balance)
available_margin = _safe_float(balance_row.get("availableMargin"), 0.0)
unrealized_profit = _safe_float(balance_row.get("unrealizedProfit"), 0.0)
used_margin = _safe_float(balance_row.get("usedMargin"), 0.0)
open_positions = _count_positions(positions_payload)
open_orders = _count_orders(open_orders_payload)
return BingxSandboxStatus(
ts=datetime.now(timezone.utc).isoformat(),
environment=str(environment),
account_currency=str(account_currency),
balance=balance,
equity=equity,
available_margin=available_margin,
unrealized_profit=unrealized_profit,
used_margin=used_margin,
open_positions=open_positions,
open_orders=open_orders,
clean=(open_positions == 0 and open_orders == 0),
notes=notes or {},
)
def snapshot_path(path: str | Path | None = None) -> Path:
return Path(path) if path is not None else DEFAULT_SANDBOX_STATUS_PATH
def write_sandbox_status(status: BingxSandboxStatus, path: str | Path | None = None) -> Path:
target = snapshot_path(path)
target.write_text(json.dumps(status.to_dict(), indent=2, sort_keys=True))
return target
def load_sandbox_status(path: str | Path | None = None) -> dict[str, Any] | None:
target = snapshot_path(path)
if not target.exists():
return None
try:
return json.loads(target.read_text())
except Exception:
return None

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,559 @@
"""
test_bingx_nautilus_execution.py
================================
End-to-end tests for the Nautilus execution path:
engine.step_bar() -> _exec_submit_entry() -> cache.instrument() -> order_factory -> submit_order
Tests cover:
1. Instrument registration from exec client into Nautilus cache
2. _exec_submit_entry returns early when instrument missing
3. _exec_submit_entry succeeds when instrument is in cache
4. Data-venue fallback when exec-venue instrument not available
5. Full order payload correctness (tags, side, quantity precision)
6. Venue symbol mapping uses raw_symbol from cached instrument
7. _venue_symbol fallback when instrument not in cache
8. Integration: build_actor_config with split venues
"""
from __future__ import annotations
import asyncio
import math
import sys
from decimal import Decimal
from pathlib import Path
from types import SimpleNamespace
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "nautilus_dolphin"))
from nautilus_trader.model.enums import OrderSide, OrderType
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.objects import Currency, Price, Quantity
from prod.bingx.enums import BINGX_VENUE, BingxEnvironment
from prod.bingx.execution import BingxExecutionClient
from prod.bingx.sandbox_status import build_sandbox_status
from prod.bingx.sandbox_status import write_sandbox_status
# -- Helpers -------------------------------------------------------------------
def _make_bingx_instrument(symbol: str = "BTCUSDT"):
return SimpleNamespace(
id=InstrumentId.from_str(f"{symbol}.BINGX"),
instrument_id=InstrumentId.from_str(f"{symbol}.BINGX"),
symbol=SimpleNamespace(value=symbol),
raw_symbol=SimpleNamespace(value=f"{symbol[:-4]}-USDT"),
base_currency=Currency.from_str(symbol[:3]),
quote_currency=Currency.from_str("USDT"),
size_precision=3,
price_precision=2,
maker_fee=Decimal("0.0002"),
taker_fee=Decimal("0.0005"),
)
def _make_binance_instrument(symbol: str = "BTCUSDT"):
return SimpleNamespace(
id=InstrumentId.from_str(f"{symbol}.BINANCE"),
instrument_id=InstrumentId.from_str(f"{symbol}.BINANCE"),
symbol=SimpleNamespace(value=symbol),
raw_symbol=SimpleNamespace(value=symbol),
base_currency=Currency.from_str(symbol[:3]),
quote_currency=Currency.from_str("USDT"),
size_precision=3,
price_precision=2,
maker_fee=Decimal("0.0002"),
taker_fee=Decimal("0.0005"),
)
class FakeCache:
def __init__(self, instruments=None):
self._instruments = dict(instruments or {})
def instrument(self, instrument_id):
return self._instruments.get(instrument_id)
def add_instrument(self, instrument):
self._instruments[instrument.id] = instrument
def instruments(self):
return list(self._instruments.values())
def positions(self, venue=None):
return []
def order(self, client_order_id):
return None
def add_currency(self, currency):
pass
class FakeProvider:
def __init__(self, instruments=None):
self._instruments = list(instruments or [])
def list_all(self):
return self._instruments
def currencies(self):
return {}
async def initialize(self):
pass
async def _noop(*args, **kwargs):
pass
async def _persist_sandbox_status(client, *, environment: str = "VST", notes: dict[str, Any] | None = None):
balance = await client.signed_get("/openApi/swap/v2/user/balance")
positions = await client.signed_get("/openApi/swap/v2/user/positions")
open_orders = await client.signed_get("/openApi/swap/v2/trade/openOrders")
status = build_sandbox_status(
balance_payload=balance,
positions_payload=positions,
open_orders_payload=open_orders,
environment=environment,
notes=notes or {},
)
write_sandbox_status(status)
return status
def _make_connect_stub(cache, provider):
return SimpleNamespace(
_cache=cache,
_provider=provider,
_config=SimpleNamespace(prefer_websocket=False),
_log=SimpleNamespace(info=lambda *a, **kw: None, warning=lambda *a, **kw: None),
_start_pollers=lambda: None,
_refresh_account_state=_noop,
_restore_journal_snapshot=_noop,
_persist_journal_snapshot=_noop,
_await_account_registered=_noop,
)
def _make_actor_stub(cache, exec_venue="BINGX", data_venue="BINANCE"):
log_messages = []
class Log:
def info(self, msg, *a, **kw):
log_messages.append(("info", msg))
def warning(self, msg, *a, **kw):
log_messages.append(("warning", msg))
def error(self, msg, *a, **kw):
log_messages.append(("error", msg))
def debug(self, msg, *a, **kw):
log_messages.append(("debug", msg))
return SimpleNamespace(
cache=cache,
log=Log(),
_log_messages=log_messages,
dolphin_config={
"engine": {"max_account_leverage": 2.0},
"paper_trade": {"initial_capital": 25000.0},
},
engine=SimpleNamespace(capital=100000.0),
_last_portfolio_capital=100000.0,
_exec_venue_name=lambda: exec_venue,
_data_venue_name=lambda: data_venue,
_exec_open_positions={},
order_factory=SimpleNamespace(
market=lambda **kw: SimpleNamespace(
instrument_id=kw.get("instrument_id"),
order_side=kw.get("order_side"),
quantity=kw.get("quantity"),
tags=kw.get("tags", []),
client_order_id=SimpleNamespace(value="test-coid"),
order_type=OrderType.MARKET,
strategy_id=SimpleNamespace(value="test-strat"),
)
),
submit_order=MagicMock(),
clock=SimpleNamespace(timestamp_ns=lambda: 1000),
)
# -- Test: Instrument Registration --------------------------------------------
class TestExecClientRegistersInstrumentsInCache:
def test_instruments_registered_after_connect(self):
inst1 = _make_bingx_instrument("BTCUSDT")
inst2 = _make_bingx_instrument("ETHUSDT")
cache = FakeCache()
provider = FakeProvider([inst1, inst2])
stub = _make_connect_stub(cache, provider)
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(BingxExecutionClient._connect(stub))
finally:
loop.close()
assert cache.instrument(InstrumentId.from_str("BTCUSDT.BINGX")) is inst1
assert cache.instrument(InstrumentId.from_str("ETHUSDT.BINGX")) is inst2
def test_empty_provider_no_crash(self):
cache = FakeCache()
provider = FakeProvider([])
stub = _make_connect_stub(cache, provider)
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(BingxExecutionClient._connect(stub))
finally:
loop.close()
assert len(list(cache.instruments())) == 0
# -- Test: _exec_submit_entry instrument lookup ------------------------------
class TestExecSubmitEntryInstrumentLookup:
def test_returns_early_when_no_exec_instrument_no_data_fallback(self):
cache = FakeCache()
stub = _make_actor_stub(cache)
entry = {"asset": "XLMUSDT", "direction": -1, "notional": 5000.0, "entry_price": 0.10, "trade_id": "t1", "leverage": 1.0}
prices = {"XLMUSDT": 0.10}
result = DolphinActor._exec_submit_entry(stub, entry, prices)
assert result is None
assert not stub.submit_order.called
error_msgs = [m for lvl, m in stub._log_messages if lvl == "error"]
assert any("not in cache" in m for m in error_msgs)
def test_succeeds_with_exec_instrument_in_cache(self):
inst = _make_bingx_instrument("XLMUSDT")
cache = FakeCache({inst.id: inst})
stub = _make_actor_stub(cache)
entry = {"asset": "XLMUSDT", "direction": -1, "notional": 5000.0, "entry_price": 0.10, "trade_id": "t1", "leverage": 1.0}
prices = {"XLMUSDT": 0.10}
DolphinActor._exec_submit_entry(stub, entry, prices)
assert stub.submit_order.called
order = stub.submit_order.call_args[0][0]
assert str(order.instrument_id) == "XLMUSDT.BINGX"
assert order.order_side == OrderSide.SELL
assert order.tags[0] == "type:entry"
assert "direction:SHORT" in order.tags
def test_data_venue_fallback_with_warning(self):
binance_inst = _make_binance_instrument("XLMUSDT")
cache = FakeCache({binance_inst.id: binance_inst})
stub = _make_actor_stub(cache, exec_venue="BINGX", data_venue="BINANCE")
entry = {"asset": "XLMUSDT", "direction": -1, "notional": 5000.0, "entry_price": 0.10, "trade_id": "t1", "leverage": 1.0}
prices = {"XLMUSDT": 0.10}
DolphinActor._exec_submit_entry(stub, entry, prices)
assert stub.submit_order.called
warn_msgs = [m for lvl, m in stub._log_messages if lvl == "warning"]
assert any("borrowing metadata" in m for m in warn_msgs)
def test_info_log_on_successful_entry(self):
inst = _make_bingx_instrument("XLMUSDT")
cache = FakeCache({inst.id: inst})
stub = _make_actor_stub(cache)
entry = {"asset": "XLMUSDT", "direction": -1, "notional": 5000.0, "entry_price": 0.10, "trade_id": "t42", "leverage": 1.5}
prices = {"XLMUSDT": 0.10}
DolphinActor._exec_submit_entry(stub, entry, prices)
info_msgs = [m for lvl, m in stub._log_messages if lvl == "info"]
assert any("[EXEC] ENTRY SHORT" in m and "XLMUSDT" in m for m in info_msgs)
def test_quantity_uses_instrument_size_precision(self):
inst = _make_bingx_instrument("BTCUSDT")
inst.size_precision = 4
cache = FakeCache({inst.id: inst})
stub = _make_actor_stub(cache)
entry = {"asset": "BTCUSDT", "direction": 1, "notional": 50000.0, "entry_price": 100000.0, "trade_id": "t1", "leverage": 1.0}
prices = {"BTCUSDT": 100000.0}
DolphinActor._exec_submit_entry(stub, entry, prices)
order = stub.submit_order.call_args[0][0]
assert order.quantity.precision == 4
# -- Test: _venue_symbol mapping ---------------------------------------------
class TestVenueSymbolMapping:
def test_uses_raw_symbol_when_instrument_in_cache(self):
inst = _make_bingx_instrument("TRXUSDT")
cache = FakeCache({inst.id: inst})
stub = SimpleNamespace(_cache=cache)
result = BingxExecutionClient._venue_symbol(stub, InstrumentId.from_str("TRXUSDT.BINGX"))
assert result == "TRX-USDT"
def test_fallback_converts_usdt_suffix(self):
stub = SimpleNamespace(_cache=FakeCache())
result = BingxExecutionClient._venue_symbol(stub, InstrumentId.from_str("XLMUSDT.BINGX"))
assert result == "XLM-USDT"
def test_fallback_passes_through_hyphenated(self):
stub = SimpleNamespace(_cache=FakeCache())
result = BingxExecutionClient._venue_symbol(stub, InstrumentId.from_str("BTC-USDT.BINGX"))
assert result == "BTC-USDT"
# -- Test: _map_submit_order -------------------------------------------------
class TestMapSubmitOrderForMarketOrder:
def test_market_sell_with_tags(self):
inst = _make_bingx_instrument("ETHUSDT")
cache = FakeCache({inst.id: inst})
order = SimpleNamespace(
instrument_id=InstrumentId.from_str("ETHUSDT.BINGX"),
side=OrderSide.SELL,
order_type=OrderType.MARKET,
quantity=Quantity.from_str("1.500"),
client_order_id=SimpleNamespace(value="test-cid-001"),
is_post_only=False,
is_reduce_only=False,
has_price=False,
has_trigger_price=False,
price=None,
trigger_price=None,
time_in_force=None,
tags=["type:entry", "direction:SHORT", "cm:2.50", "tid:t99"],
)
adapter = SimpleNamespace(
_cache=cache,
_config=SimpleNamespace(use_reduce_only=True, recv_window_ms=5000),
_venue_symbol=lambda iid: BingxExecutionClient._venue_symbol(adapter, iid),
_format_quantity=lambda q: str(q),
_format_price=lambda p: str(p),
_map_order_type=BingxExecutionClient._map_order_type,
_map_time_in_force=BingxExecutionClient._map_time_in_force,
)
# Rebind _venue_symbol after adapter exists so self-referencing works
adapter._venue_symbol = lambda iid: BingxExecutionClient._venue_symbol(adapter, iid)
payload = BingxExecutionClient._map_submit_order(adapter, order)
assert payload["symbol"] == "ETH-USDT"
assert payload["side"] == "SELL"
assert payload["type"] == "MARKET"
assert payload["quantity"] == "1.500"
assert payload["clientOrderId"] == "test-cid-001"
# -- Test: Order tag parsing for leverage ------------------------------------
class TestLeverageTagParsing:
def test_extracts_lev_tag(self):
order = SimpleNamespace(tags=["type:entry", "lev:2.50", "cm:2.50", "tid:t1"])
assert BingxExecutionClient._parse_leverage_from_tags(order) == 2.50
def test_extracts_cm_tag(self):
order = SimpleNamespace(tags=["type:entry", "cm:3.00", "tid:t1"])
assert BingxExecutionClient._parse_leverage_from_tags(order) == 3.00
def test_returns_none_no_tags(self):
order = SimpleNamespace(tags=[])
assert BingxExecutionClient._parse_leverage_from_tags(order) is None
def test_returns_none_no_leverage_tags(self):
order = SimpleNamespace(tags=["type:entry", "direction:SHORT"])
assert BingxExecutionClient._parse_leverage_from_tags(order) is None
# -- Test: Split venue configuration -----------------------------------------
class TestSplitVenueConfig:
def test_split_venues_preserved(self):
from prod.launch_dolphin_live import build_actor_config
cfg = build_actor_config(data_venue="BINANCE", exec_venue="BINGX")
assert cfg["data_venue"] == "BINANCE"
assert cfg["exec_venue"] == "BINGX"
assert cfg["venue"] == "BINGX"
# -- Import DolphinActor -----------------------------------------------------
try:
from nautilus_dolphin.nautilus.dolphin_actor import DolphinActor
HAS_DOLPHIN_ACTOR = True
except ImportError:
HAS_DOLPHIN_ACTOR = False
@pytest.mark.skipif(not HAS_DOLPHIN_ACTOR, reason="DolphinActor not importable")
class TestDolphinActorExecSubmitEntry:
def _actor_stub(self, cache):
return _make_actor_stub(cache)
def test_full_entry_flow_short_order(self):
inst = _make_bingx_instrument("SOLUSDT")
cache = FakeCache({inst.id: inst})
stub = self._actor_stub(cache)
entry = {
"asset": "SOLUSDT", "direction": -1, "notional": 3000.0,
"entry_price": 150.0, "trade_id": "trade-sol-001",
"leverage": 2.0, "vel_div": -0.035,
}
prices = {"SOLUSDT": 150.0}
DolphinActor._exec_submit_entry(stub, entry, prices)
assert stub.submit_order.called
order = stub.submit_order.call_args[0][0]
assert "direction:SHORT" in order.tags
assert "cm:2.00" in order.tags
assert "lev:2.00" in order.tags
assert "tid:trade-sol-001" in order.tags
def test_full_entry_flow_long_order(self):
inst = _make_bingx_instrument("ADAUSDT")
cache = FakeCache({inst.id: inst})
stub = self._actor_stub(cache)
entry = {
"asset": "ADAUSDT", "direction": 1, "notional": 2000.0,
"entry_price": 0.45, "trade_id": "trade-ada-002",
"leverage": 1.0, "vel_div": -0.025,
}
prices = {"ADAUSDT": 0.45}
DolphinActor._exec_submit_entry(stub, entry, prices)
assert stub.submit_order.called
order = stub.submit_order.call_args[0][0]
assert order.order_side == OrderSide.BUY
assert "direction:LONG" in order.tags
def test_caps_notional_when_near_capacity_limit(self):
inst = _make_bingx_instrument("BTCUSDT")
cache = FakeCache({inst.id: inst})
stub = self._actor_stub(cache)
stub.engine = SimpleNamespace(capital=10.0)
stub.dolphin_config["engine"]["max_account_leverage"] = 0.01
entry = {
"asset": "BTCUSDT", "direction": -1, "notional": 5000.0,
"entry_price": 100000.0, "trade_id": "t1", "leverage": 1.0,
}
prices = {"BTCUSDT": 100000.0}
DolphinActor._exec_submit_entry(stub, entry, prices)
assert stub.submit_order.called
warn_msgs = [m for lvl, m in stub._log_messages if lvl == "warning"]
assert any("capped by portfolio exposure" in m for m in warn_msgs)
def test_skips_when_notional_zero(self):
inst = _make_bingx_instrument("BTCUSDT")
cache = FakeCache({inst.id: inst})
stub = self._actor_stub(cache)
entry = {
"asset": "BTCUSDT", "direction": -1, "notional": 0.0,
"entry_price": 100000.0, "trade_id": "t1", "leverage": 1.0,
}
prices = {"BTCUSDT": 100000.0}
result = DolphinActor._exec_submit_entry(stub, entry, prices)
assert result is None
assert not stub.submit_order.called
# -- Live integration (requires BingX VST credentials) -----------------------
@pytest.mark.skipif(
not Path("/mnt/dolphinng5_predict/.env").exists(),
reason="No .env file (no BingX credentials)",
)
class TestLiveInstrumentProvider:
def test_loads_instruments_from_vst(self):
import os
from dotenv import load_dotenv
load_dotenv("/mnt/dolphinng5_predict/.env")
from prod.bingx.config import BingxExecClientConfig
from prod.bingx.http import BingxHttpClient
from prod.bingx.instrument_provider import BingxInstrumentProvider, BingxInstrumentProviderConfig
async def _run():
cfg = BingxExecClientConfig(
api_key=os.environ.get("BINGX_API_KEY", ""),
secret_key=os.environ.get("BINGX_SECRET_KEY", ""),
environment=BingxEnvironment.VST,
)
client = BingxHttpClient(config=cfg)
provider = BingxInstrumentProvider(
client=client,
config=BingxInstrumentProviderConfig(load_all=True),
)
await provider.initialize()
instruments = provider.list_all()
await _persist_sandbox_status(client, notes={"test": "loads_instruments_from_vst"})
await client.close()
return instruments
instruments = asyncio.run(_run())
assert len(instruments) > 0
symbols = {i.symbol.value for i in instruments}
assert "BTCUSDT" in symbols
assert "ETHUSDT" in symbols
def test_trxusdt_instrument_has_correct_precision(self):
import os
from dotenv import load_dotenv
load_dotenv("/mnt/dolphinng5_predict/.env")
from prod.bingx.config import BingxExecClientConfig
from prod.bingx.http import BingxHttpClient
from prod.bingx.instrument_provider import BingxInstrumentProvider, BingxInstrumentProviderConfig
async def _run():
cfg = BingxExecClientConfig(
api_key=os.environ.get("BINGX_API_KEY", ""),
secret_key=os.environ.get("BINGX_SECRET_KEY", ""),
environment=BingxEnvironment.VST,
)
client = BingxHttpClient(config=cfg)
provider = BingxInstrumentProvider(
client=client,
config=BingxInstrumentProviderConfig(load_all=True),
)
await provider.initialize()
inst = provider.find(InstrumentId.from_str("TRXUSDT.BINGX"))
await _persist_sandbox_status(client, notes={"test": "trxusdt_instrument_has_correct_precision"})
await client.close()
return inst
inst = asyncio.run(_run())
assert inst is not None
assert inst.size_precision >= 1
assert inst.price_precision >= 1
assert inst.raw_symbol.value == "TRX-USDT"

View File

@@ -0,0 +1,71 @@
from __future__ import annotations
from pathlib import Path
from prod.bingx.sandbox_status import build_sandbox_status
from prod.bingx.sandbox_status import load_sandbox_status
from prod.bingx.sandbox_status import write_sandbox_status
def test_build_sandbox_status_marks_clean_when_flat():
status = build_sandbox_status(
balance_payload={
"balance": {
"balance": "12000.5",
"equity": "12000.5",
"availableMargin": "12000.5",
"unrealizedProfit": "0",
"usedMargin": "0",
}
},
positions_payload=[],
open_orders_payload={"orders": []},
environment="VST",
)
assert status.clean is True
assert status.balance == 12000.5
assert status.equity == 12000.5
assert status.open_positions == 0
assert status.open_orders == 0
def test_build_sandbox_status_marks_dirty_when_positions_or_orders_exist():
status = build_sandbox_status(
balance_payload={
"balance": {
"balance": "12000.5",
"equity": "12500.5",
"availableMargin": "9000.5",
"unrealizedProfit": "500",
"usedMargin": "3000",
}
},
positions_payload=[{"symbol": "BTC-USDT"}, {"symbol": "ETH-USDT"}],
open_orders_payload={"orders": [{"symbol": "BTC-USDT"}]},
environment="VST",
)
assert status.clean is False
assert status.open_positions == 2
assert status.open_orders == 1
assert status.unrealized_profit == 500.0
def test_write_and_load_sandbox_status_round_trip(tmp_path: Path):
status = build_sandbox_status(
balance_payload={"balance": {"balance": "10", "equity": "11", "availableMargin": "9", "unrealizedProfit": "1", "usedMargin": "2"}},
positions_payload=[],
open_orders_payload=[],
environment="VST",
notes={"source": "unit-test"},
)
path = tmp_path / "bingx_sandbox_status.json"
write_sandbox_status(status, path)
loaded = load_sandbox_status(path)
assert loaded is not None
assert loaded["balance"] == 10.0
assert loaded["equity"] == 11.0
assert loaded["clean"] is True
assert loaded["notes"]["source"] == "unit-test"

View File

@@ -0,0 +1,87 @@
#!/usr/bin/env python3
"""Tests for capital restore source selection on startup."""
import json
import os
from datetime import datetime, timezone
from unittest.mock import patch
import pytest
from prod.nautilus_event_trader import DolphinLiveTrader
class _MapStub:
def __init__(self, payloads):
self._payloads = payloads
def blocking(self):
return self
def get(self, key):
return self._payloads.get(key)
def _build_trader() -> DolphinLiveTrader:
trader = DolphinLiveTrader()
trader._build_engine()
trader.eng.begin_day(datetime.now(timezone.utc).strftime("%Y-%m-%d"), posture="APEX")
return trader
def test_restore_prefers_fresher_engine_snapshot_over_stale_latest_nautilus():
trader = _build_trader()
trader.eng.capital = 25_000.0
trader.state_map = _MapStub(
{
"latest_nautilus": json.dumps(
{
"capital": 31_049.44,
"updated_at": "2026-05-13T10:52:40+00:00",
}
),
"engine_snapshot": json.dumps(
{
"capital": 33_150.07,
"timestamp": "2026-05-13T16:20:38+00:00",
}
),
}
)
trader.pnl_map = _MapStub({})
with patch.dict(os.environ, {"DOLPHIN_CAPITAL_SEED_STALE_LAG_SEC": "180"}, clear=False):
trader._restore_capital()
assert trader.eng.capital == pytest.approx(33_150.07, abs=0.01)
assert trader._restore_source == "HZ engine_snapshot"
def test_restore_can_force_latest_nautilus_override():
trader = _build_trader()
trader.eng.capital = 25_000.0
trader.state_map = _MapStub(
{
"latest_nautilus": json.dumps(
{
"capital": 31_049.44,
"updated_at": "2026-05-13T10:52:40+00:00",
}
),
"engine_snapshot": json.dumps(
{
"capital": 33_150.07,
"timestamp": "2026-05-13T16:20:38+00:00",
}
),
}
)
trader.pnl_map = _MapStub({})
with patch.dict(os.environ, {"DOLPHIN_FORCE_LATEST_NAUTILUS_RESTORE": "1"}, clear=False):
trader._restore_capital()
assert trader.eng.capital == pytest.approx(31_049.44, abs=0.01)
assert trader._restore_source == "HZ latest_nautilus"