VIOLET V3.4b: launcher shadow live-factor wiring

This commit is contained in:
Codex
2026-06-16 15:14:57 +02:00
parent 16add44326
commit fb344318aa
4 changed files with 365 additions and 8 deletions

View File

@@ -0,0 +1,79 @@
"""VIOLET launcher shadow helpers for live BLUE factor sourcing.
These helpers stay separate from the launcher module so they can be unit-tested
without importing the full launcher import chain.
"""
from __future__ import annotations
import logging
import os
LOGGER = logging.getLogger(__name__)
def build_shadow_live_source(
*,
client_factory=None,
selector_factory=None,
source_factory=None,
scan_history_factory=None,
):
"""Create the read-only BLUE live-factor mirror for the shadow path."""
if client_factory is None or selector_factory is None or source_factory is None or scan_history_factory is None:
import hazelcast
from .alpha_wrappers import VioletAssetSelector
from .live_blue_source import LiveBlueScanHistory, source_live_blue_sizing_factors
client_factory = client_factory or (lambda: hazelcast.HazelcastClient(
cluster_name=os.environ.get("HZ_CLUSTER", "dolphin"),
cluster_members=[os.environ.get("HZ_HOST", "localhost:5701")],
))
selector_factory = selector_factory or VioletAssetSelector
source_factory = source_factory or source_live_blue_sizing_factors
scan_history_factory = scan_history_factory or LiveBlueScanHistory
client = client_factory()
return {
"client": client,
"scan_history": scan_history_factory(),
"selector": selector_factory(),
"live_source": source_factory,
}
def shadow_decision_step(
shadow: dict,
payload: dict,
*,
scan_number: int,
now_ns: int,
vel_div: float,
vol_ok: bool,
) -> bool:
"""Run one shadow decision against the live BLUE factor plane."""
shadow["engine"].observe(payload, scan_number)
live_source = shadow.get("live_source")
factors = None
if live_source is not None:
live_result = live_source(
shadow["client"],
scan_history=shadow["scan_history"],
selector=shadow["selector"],
)
shadow["last_live_source"] = live_result
factors = live_result.factors
if factors is None:
return False
decision = shadow["engine"].decide(
now_ns=now_ns,
scan_number=scan_number,
capital=shadow["capital"],
vel_div=vel_div,
vol_ok=vol_ok,
factors=factors,
)
if decision is None:
return False
return shadow["journal"].journal(decision, mono_ns=now_ns)

View File

@@ -0,0 +1,155 @@
"""V3.4b launcher shadow wiring — live BLUE factor plane is mandatory."""
from __future__ import annotations
import sys
from types import SimpleNamespace
sys.path.insert(0, "/mnt/dolphinng5_predict")
from prod.clean_arch.violet.decision_engine import ShadowDecision, SizingFactors
from prod.clean_arch.violet.shadow_journal import VioletDecisionJournal
def test_build_shadow_includes_live_factor_source():
from prod.clean_arch.violet import shadow_live_factors as slf
class FakeClient:
pass
shadow = slf.build_shadow_live_source(
client_factory=lambda: FakeClient(),
selector_factory=lambda: object(),
source_factory=lambda client, scan_history, selector: object(),
scan_history_factory=lambda: object(),
)
assert isinstance(shadow["client"], FakeClient)
assert shadow["live_source"] is not None
assert shadow["scan_history"] is not None
assert shadow["selector"] is not None
def test_build_shadow_propagates_client_factory_failure():
from prod.clean_arch.violet import shadow_live_factors as slf
try:
slf.build_shadow_live_source(
client_factory=lambda: (_ for _ in ()).throw(RuntimeError("hz down")),
selector_factory=lambda: object(),
source_factory=lambda client, scan_history, selector: object(),
scan_history_factory=lambda: object(),
)
except RuntimeError as exc:
assert "hz down" in str(exc)
else:
raise AssertionError("expected live source failure")
def test_shadow_decision_step_uses_live_factors_and_journals():
from prod.clean_arch.violet import shadow_live_factors as slf
observed = []
decided = []
journal_rows = []
class FakeEngine:
def observe(self, payload, scan_number):
observed.append((scan_number, payload["vel_div"]))
def decide(self, **kwargs):
decided.append(kwargs)
factors = kwargs["factors"]
assert isinstance(factors, SizingFactors)
assert factors.posture == "APEX"
return ShadowDecision(
ts_ns=kwargs["now_ns"],
scan_number=kwargs["scan_number"],
asset="BTCUSDT",
side="SHORT",
vel_div=kwargs["vel_div"],
fraction=0.2,
conviction_leverage=3.0,
notional_fraction=0.6,
target_exposure=41400.0,
ars_score=1.23,
bucket_idx=1,
actuated=True,
base_leverage=1.0,
dc_lev_mult=1.0,
regime_size_mult=1.0,
market_ob_mult=1.0,
esof_size_mult=1.0,
)
shadow = {
"engine": FakeEngine(),
"journal": VioletDecisionJournal(
sink=lambda table, row: journal_rows.append((table, row)),
session_id="sess",
),
"capital": 69_000.0,
"mono_ns": lambda: 123,
"client": object(),
"scan_history": object(),
"selector": object(),
"live_source": lambda client, scan_history, selector: SimpleNamespace(
factors=SizingFactors(
boost=1.4,
beta=0.2,
mc_scale=0.5,
esof_score=0.42,
ob_median_imbalance=0.12,
ob_agreement_pct=0.91,
dc_status="CONFIRM",
posture="APEX",
),
selected_asset="BTCUSDT",
),
"live_decisions": 0,
"last_live_source": None,
}
payload = {"vel_div": -0.031, "vol_ok": True}
ok = slf.shadow_decision_step(
shadow,
payload,
scan_number=7,
now_ns=123,
vel_div=-0.031,
vol_ok=True,
)
assert ok is True
assert observed == [(7, -0.031)]
assert decided and decided[0]["factors"].dc_status == "CONFIRM"
assert shadow["last_live_source"].selected_asset == "BTCUSDT"
assert journal_rows and journal_rows[0][0] == "violet_decisions"
def test_shadow_decision_step_skips_without_live_factor_plane():
from prod.clean_arch.violet import shadow_live_factors as slf
class FailEngine:
def observe(self, payload, scan_number):
pass
def decide(self, **kwargs):
raise AssertionError("must not fall back to base-only")
shadow = {
"engine": FailEngine(),
"journal": VioletDecisionJournal(sink=lambda table, row: None, session_id="sess"),
"capital": 69_000.0,
"mono_ns": lambda: 123,
"client": object(),
"scan_history": object(),
"selector": object(),
"live_source": None,
}
ok = slf.shadow_decision_step(
shadow,
{"vel_div": -0.031, "vol_ok": True},
scan_number=7,
now_ns=123,
vel_div=-0.031,
vol_ok=True,
)
assert ok is False

View File

@@ -0,0 +1,102 @@
# RECENT_VIOLET_34D_pending
## What This Work Was
This pass continued the VIOLET plan after `V3.4c` by wiring the launcher-side
shadow path to the live BLUE factor plane.
The goal stayed read-only. BLUE code, BLUE schemas, and BLUE data structures
were not modified. The change was entirely on the VIOLET side.
## Scope
This pass added a thin shadow-side live-factor attachment and a focused test
surface:
- `prod/clean_arch/violet/shadow_live_factors.py`
- `prod/clean_arch/violet/test_violet_launcher_shadow_live_factors.py`
- `prod/launch_dolphin_violet.py`
It reused the existing V3.4c live BLUE source adapter:
- `prod/clean_arch/violet/live_blue_source.py`
- `prod/clean_arch/violet/live_factor_source.py`
- `prod/clean_arch/violet/live_factors.py`
## Why This Was Needed
Before this pass, the Violet shadow launcher still had a base-only decision
path. That meant the `VioletDecisionEngine` could produce a muted decision, but
it did not yet receive the full live factor plane from BLUEs published
surfaces inside the launcher path.
The missing piece was the launcher-side wiring: source live BLUE factors,
thread them into `decide(...)`, and keep the journaled breakdown faithful to the
same factor plane BLUE would have seen at that scan.
## What Was Added
### 1. Shadow live-factor helper
`shadow_live_factors.py` now provides two small helpers:
- `build_shadow_live_source(...)`
- `shadow_decision_step(...)`
`build_shadow_live_source(...)` assembles the read-only BLUE live factor mirror
for the shadow path. The helper keeps imports lazy so it can be unit-tested
without dragging in the full launcher import chain.
`shadow_decision_step(...)` runs one muted shadow decision using the live BLUE
factor plane, then journals the result when the decision is actuated.
### 2. Launcher wiring
`launch_dolphin_violet.py` now:
- builds the live-factor shadow source when shadow mode is enabled
- passes the live `SizingFactors` into `VioletDecisionEngine.decide(...)`
- skips the shadow decision instead of silently falling back to base-only when
the live factor plane is missing
- keeps the existing journal path intact
This preserves the existing muted-shadow architecture while making the shadow
decision reflect the live BLUE factor plane rather than a reduced fallback.
### 3. Focused tests
`test_violet_launcher_shadow_live_factors.py` covers:
- the live-factor helper contract
- failure propagation from the client factory
- the shadow decision step with live factors and journaling
- the no-live-factor skip path
Because the mount was slow under `pytest`, I verified the helper path directly
with a small execution script instead of waiting on long file-system waits.
## Exactness Rules Followed
The pass stayed conservative:
- no BLUE edits
- no schema edits
- no live fallback to a fake factor plane
- no execution path changes outside the muted shadow branch
- no silent loss of the live factor breakdown
## Verification
Direct runtime check:
- the new helper built successfully with injected factories
- the shadow decision step accepted the live factor plane
- the decision was journaled with the expected Violet journal table
Observed direct result:
- `ok`
`pytest` on this mount was slow and repeatedly stalled in netfs waits, so I did
not treat that as a code failure.

View File

@@ -50,6 +50,10 @@ from prod.launch_dolphin_pink import ( # noqa: E402
_resolve_bingx_exchange_leverage_cap,
_resolve_bingx_recv_window_ms,
)
from prod.clean_arch.violet.shadow_live_factors import ( # noqa: E402
build_shadow_live_source,
shadow_decision_step,
)
logging.basicConfig(
level=logging.INFO,
@@ -260,17 +264,18 @@ async def _divergence_driver(divergence, data_feed, poll_s: float, shadow=None)
if shadow is not None and started:
try:
sn = int(payload.get("scan_number") or 0)
shadow["engine"].observe(payload, sn)
vd = payload.get("vel_div")
if vd is not None:
now_ns = shadow["mono_ns"]()
d = shadow["engine"].decide(
now_ns=now_ns, scan_number=sn,
capital=shadow["capital"], vel_div=float(vd),
if shadow_decision_step(
shadow,
payload,
scan_number=sn,
now_ns=now_ns,
vel_div=float(vd),
vol_ok=bool(payload.get("vol_ok", True)),
)
if d is not None:
shadow["journal"].journal(d, mono_ns=now_ns)
):
shadow["live_decisions"] += 1
except Exception as exc: # noqa: BLE001 — shadow must never die
LOGGER.debug("shadow decision failed: %s", exc)
except Exception as exc: # noqa: BLE001 — sampling must never die
@@ -333,13 +338,29 @@ def _build_shadow():
relaxed = abs(thr - (-0.02)) > 1e-9
engine = VioletDecisionEngine(entry_vel_div_threshold=thr)
journal = VioletDecisionJournal(sink=ch_put_violet, session_id=sess)
try:
live_source = build_shadow_live_source()
except Exception as exc:
LOGGER.warning(
"VIOLET shadow live-factor source unavailable (%s) — shadow DISABLED.",
exc,
)
return None
LOGGER.warning(
"VIOLET DECISION SHADOW ON (session=%s ref_capital=%.0f entry_thr=%.4f%s) — "
"journaling muted decisions to dolphin_violet.violet_decisions; NO orders.",
sess, capital, thr,
" RELAXED:not-parity-faithful" if relaxed else "",
)
return {"engine": engine, "journal": journal, "capital": capital, "mono_ns": mono_ns}
return {
"engine": engine,
"journal": journal,
"capital": capital,
"mono_ns": mono_ns,
**live_source,
"live_decisions": 0,
"last_live_source": None,
}
async def run() -> None: