Files
siloqy/prod/launch_dita_v2.py

104 lines
2.8 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""Operator-facing entrypoint for the DITAv2 kernel.
The launcher is env-driven and intentionally conservative by default:
- mock venue
- in-memory Zinc plane
- callback projection
- control-plane values may be overridden via DITA_V2_* env vars
"""
from __future__ import annotations
import json
import os
import signal
import time
from pathlib import Path
import sys
from dotenv import load_dotenv
# Resolve to an absolute path before walking up the tree: with a relative
# ``__file__`` (e.g. the script started by relative path on Python < 3.9),
# ``Path("launch_dita_v2.py").parent.parent`` collapses to "." rather than
# the directory above ``prod/``.
PROJECT_ROOT = Path(__file__).resolve().parent.parent

# Load PROJECT_ROOT/.env before any helper below reads os.environ.
load_dotenv(PROJECT_ROOT / ".env")

# Each insert goes to the front of sys.path, so the final lookup order is
# PROJECT_ROOT, then prod/clean_arch, then prod.
sys.path.insert(0, str(PROJECT_ROOT / "prod"))
sys.path.insert(0, str(PROJECT_ROOT / "prod" / "clean_arch"))
sys.path.insert(0, str(PROJECT_ROOT))

# Deliberately imported after the sys.path setup above; presumably the
# package is only importable once PROJECT_ROOT is on the path.
from prod.clean_arch.dita_v2.launcher import build_launcher_bundle
def _env_bool(name: str, default: bool = False) -> bool:
    """Interpret the environment variable *name* as a boolean flag.

    Returns *default* when the variable is unset. Otherwise the value is
    trimmed and lower-cased, and the result is True only for "1", "true",
    "yes" or "on"; any other text (including the empty string) is False.
    """
    try:
        text = os.environ[name]
    except KeyError:
        return default
    truthy = ("1", "true", "yes", "on")
    return str(text).strip().lower() in truthy
def _env_float(name: str, default: float) -> float:
    """Read a positive, finite float from the environment variable *name*.

    Args:
        name: Environment variable to read.
        default: Value returned whenever the variable cannot be used.

    Returns:
        The parsed value when it is a finite number greater than zero;
        otherwise *default* (variable unset, not parseable as a float,
        zero, negative, NaN, or infinite).
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        value = float(raw.strip())
    except ValueError:
        # float() on a string only fails with ValueError; anything else
        # would be a genuine bug and should not be hidden.
        return default
    # "inf" parses successfully and would pass a plain ``> 0`` check, but an
    # infinite duration is unusable (time.sleep() raises OverflowError on
    # it). The chained comparison rejects it and is also False for NaN.
    return value if 0 < value < float("inf") else default
def _env_mode() -> str:
    """Return the launcher mode selected by ``DITA_V2_LAUNCHER_MODE``.

    The value is trimmed and lower-cased. Only "once" and "serve" are
    recognised; an unset variable or any other value yields "serve".
    """
    requested = str(os.environ.get("DITA_V2_LAUNCHER_MODE", "serve")).strip().lower()
    return "once" if requested == "once" else "serve"
def _serve(bundle) -> int:
    """Announce the running configuration, then block until SIGTERM/SIGINT.

    Prints a JSON status document describing the bundle (kernel control
    values plus the class names of the venue, Zinc plane and projection),
    then idles until one of the two signals arrives.

    Args:
        bundle: Launcher bundle exposing ``kernel.control.as_dict()``,
            ``venue``, ``zinc_plane`` and ``projection``.

    Returns:
        0 once a stop signal has been received.

    The SIGTERM/SIGINT handlers that were installed on entry are restored
    on the way out, whether the function returns or raises.
    """
    interval = _env_float("DITA_V2_LAUNCHER_HEARTBEAT_SEC", 30.0)
    # time.sleep() resumes after a signal handler returns normally
    # (PEP 475), so one sleep of ``interval`` seconds would postpone
    # shutdown by up to a full heartbeat. Nothing else happens per
    # heartbeat, so sleeping in short slices only makes the stop flag be
    # noticed promptly.
    poll = min(interval, 1.0)
    stop = False

    def _handle_signal(signum, _frame) -> None:
        nonlocal stop
        stop = True

    previous_term = signal.signal(signal.SIGTERM, _handle_signal)
    previous_int = signal.signal(signal.SIGINT, _handle_signal)
    try:
        print(
            json.dumps(
                {
                    "status": "serving",
                    "control": bundle.kernel.control.as_dict(),
                    "venue": type(bundle.venue).__name__,
                    "zinc_plane": type(bundle.zinc_plane).__name__,
                    "projection": type(bundle.projection).__name__,
                    "heartbeat_sec": interval,
                },
                indent=2,
                sort_keys=True,
                default=str,
            ),
            # Without a flush the status sits in the block buffer when
            # stdout is a pipe or file and is not seen until exit.
            flush=True,
        )
        while not stop:
            time.sleep(poll)
        return 0
    finally:
        signal.signal(signal.SIGTERM, previous_term)
        signal.signal(signal.SIGINT, previous_int)
def main() -> int:
    """Build the launcher bundle and run it in the configured mode.

    When the mode is "once", or ``DITA_V2_PRINT_SNAPSHOT`` is truthy, the
    kernel snapshot is printed as JSON and 0 is returned. Otherwise the
    result of ``_serve`` is returned. The bundle is closed in every case.
    """
    bundle = build_launcher_bundle()
    try:
        snapshot_only = _env_mode() == "once" or _env_bool("DITA_V2_PRINT_SNAPSHOT", False)
        if not snapshot_only:
            return _serve(bundle)
        snapshot_json = json.dumps(
            bundle.kernel.snapshot(),
            indent=2,
            sort_keys=True,
            default=str,
        )
        print(snapshot_json)
        return 0
    finally:
        bundle.close()
# Script entry point: the integer returned by main() becomes the process
# exit status.
if __name__ == "__main__":
    sys.exit(main())