#!/usr/bin/env python3
"""Operator-facing entrypoint for the DITAv2 kernel.

The launcher is env-driven and intentionally conservative by default:
- mock venue
- in-memory Zinc plane
- callback projection
- control-plane values may be overridden via DITA_V2_* env vars

Environment variables read directly by this script:
- DITA_V2_LAUNCHER_MODE: "once" prints a kernel snapshot and exits; "serve"
  (the default, and the fallback for unknown values) keeps the process alive.
- DITA_V2_PRINT_SNAPSHOT: truthy value forces the snapshot-and-exit path.
- DITA_V2_LAUNCHER_HEARTBEAT_SEC: positive, finite number of seconds that is
  reported as the heartbeat interval in serve mode (default 30).
"""

from __future__ import annotations

import json
import math
import os
import signal
import sys
import time
from pathlib import Path

from dotenv import load_dotenv

PROJECT_ROOT = Path(__file__).parent.parent
load_dotenv(PROJECT_ROOT / ".env")
sys.path.insert(0, str(PROJECT_ROOT / "prod"))
sys.path.insert(0, str(PROJECT_ROOT / "prod" / "clean_arch"))
sys.path.insert(0, str(PROJECT_ROOT))

# This import must come after the sys.path set-up above.
from prod.clean_arch.dita_v2.launcher import build_launcher_bundle  # noqa: E402

# Upper bound on a single sleep in serve mode. ``time.sleep`` resumes after a
# signal handler that does not raise (PEP 475), so the stop flag is only seen
# between sleeps; short slices keep SIGTERM/SIGINT latency bounded.
_STOP_POLL_SEC = 0.5

_TRUTHY = frozenset({"1", "true", "yes", "on"})
_MODES = frozenset({"once", "serve"})


def _env_bool(name: str, default: bool = False) -> bool:
    """Return the boolean value of environment variable *name*.

    An unset variable yields *default*. A set variable is true only when its
    stripped, lower-cased value is one of ``1``, ``true``, ``yes`` or ``on``.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    return str(raw).strip().lower() in _TRUTHY


def _env_float(name: str, default: float) -> float:
    """Return environment variable *name* as a positive, finite float.

    *default* is returned when the variable is unset, is not parseable as a
    float, or parses to a value that is non-positive, NaN or infinite.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        value = float(str(raw).strip())
    except ValueError:
        return default
    # ``inf`` would pass a bare ``> 0`` test but cannot be slept on and does
    # not serialise to valid JSON; NaN fails every comparison.
    if not math.isfinite(value) or value <= 0:
        return default
    return value


def _env_mode() -> str:
    """Return the launcher mode from ``DITA_V2_LAUNCHER_MODE``.

    Only ``once`` and ``serve`` are recognised (case-insensitive, surrounding
    whitespace ignored); anything else falls back to ``serve``.
    """
    mode = str(os.environ.get("DITA_V2_LAUNCHER_MODE", "serve")).strip().lower()
    return mode if mode in _MODES else "serve"


def _restore_signal(signum: int, previous) -> None:
    """Reinstall *previous* as the handler for *signum*, if it can be restored.

    ``signal.signal`` returns ``None`` when the earlier handler was not
    installed from Python; passing ``None`` back would raise ``TypeError``,
    so that case is left untouched.
    """
    if previous is not None:
        signal.signal(signum, previous)


def _serve(bundle) -> int:
    """Announce the running configuration and block until SIGTERM or SIGINT.

    Prints one JSON status document describing *bundle*, then idles until a
    termination signal sets the stop flag. The handlers that were active on
    entry are restored before returning.

    Returns:
        ``0`` once a stop signal has been observed.
    """
    interval = _env_float("DITA_V2_LAUNCHER_HEARTBEAT_SEC", 30.0)
    stop = False

    def _handle_signal(signum, _frame) -> None:
        nonlocal stop
        stop = True

    previous_term = signal.signal(signal.SIGTERM, _handle_signal)
    previous_int = signal.signal(signal.SIGINT, _handle_signal)
    try:
        status = {
            "status": "serving",
            "control": bundle.kernel.control.as_dict(),
            "venue": type(bundle.venue).__name__,
            "zinc_plane": type(bundle.zinc_plane).__name__,
            "projection": type(bundle.projection).__name__,
            "heartbeat_sec": interval,
        }
        # Flush explicitly: the process is about to sleep indefinitely, and a
        # piped stdout would otherwise hold the status in its buffer.
        print(
            json.dumps(status, indent=2, sort_keys=True, default=str),
            flush=True,
        )
        while not stop:
            # No work is done per heartbeat, so sleeping in short slices only
            # changes how quickly a stop signal is noticed.
            time.sleep(min(interval, _STOP_POLL_SEC))
        return 0
    finally:
        _restore_signal(signal.SIGTERM, previous_term)
        _restore_signal(signal.SIGINT, previous_int)


def main() -> int:
    """Build the launcher bundle and run it in the configured mode.

    In ``once`` mode, or whenever ``DITA_V2_PRINT_SNAPSHOT`` is truthy, the
    kernel snapshot is printed as JSON and the process exits. Otherwise the
    launcher serves until signalled. The bundle is closed on every exit path.

    Returns:
        Process exit code (``0`` on normal completion).
    """
    bundle = build_launcher_bundle()
    try:
        mode = _env_mode()
        if mode == "once" or _env_bool("DITA_V2_PRINT_SNAPSHOT", False):
            print(
                json.dumps(
                    bundle.kernel.snapshot(),
                    indent=2,
                    sort_keys=True,
                    default=str,
                )
            )
            return 0
        return _serve(bundle)
    finally:
        bundle.close()


if __name__ == "__main__":
    raise SystemExit(main())