95 lines
3.4 KiB
Python
95 lines
3.4 KiB
Python
|
|
from __future__ import annotations

import os
import unittest
from unittest import mock
from uuid import uuid4

from prod.clean_arch.dita_v2 import (
    BackendMode,
    ControlUpdate,
    InMemoryControlPlane,
    KernelControlSnapshot,
    KernelMode,
    KernelVerbosity,
    RealZincControlPlane,
    ZincControlPlane,
    build_control_plane,
)
from prod.clean_arch.dita_v2.real_control_plane import SharedRegion
|
||
|
|
|
||
|
|
|
||
|
|
# True when the real control-plane module exposes a non-None ``SharedRegion``.
# NOTE(review): presumably ``real_control_plane`` sets ``SharedRegion`` to None
# when its optional native dependency cannot be imported -- confirm there.
HAS_REAL_ZINC = SharedRegion is not None
|
||
|
|
|
||
|
|
|
||
|
|
@unittest.skipUnless(HAS_REAL_ZINC, "Real Zinc adapter is unavailable")
class TestDITAv2RealControlPlane(unittest.TestCase):
    """Tests for ``RealZincControlPlane`` and the ``build_control_plane`` factory.

    The whole class is skipped unless ``HAS_REAL_ZINC`` is true.
    """

    # Environment variable that ``build_control_plane`` is driven by in
    # ``test_env_can_select_real_control_plane``.
    _ENV_VAR = "DITA_V2_CONTROL_PLANE"

    @staticmethod
    def _unique_prefix() -> str:
        """Return a fresh prefix so concurrent/previous runs cannot collide."""
        return f"dita_v2_control_{uuid4().hex}"

    def _open_plane(self, prefix: str, *, create: bool) -> RealZincControlPlane:
        """Open a ``RealZincControlPlane`` and guarantee it is closed.

        ``close`` is registered with ``addCleanup`` immediately after
        construction, so the plane is released even if a later constructor
        or assertion in the same test raises. Cleanups run in LIFO order,
        i.e. planes are closed in the reverse of the order they were opened.
        """
        plane = RealZincControlPlane(prefix=prefix, create=create)
        self.addCleanup(plane.close)
        return plane

    def test_build_control_plane_defaults_to_zinc(self) -> None:
        """With no selector variable set, the factory returns a ``ZincControlPlane``."""
        # patch.dict snapshots os.environ and restores it on exit; the selector
        # is removed inside that scope so an ambient value in the developer's
        # or CI environment cannot change which plane the "default" is.
        with mock.patch.dict(os.environ):
            os.environ.pop(self._ENV_VAR, None)
            plane = build_control_plane()
        self.assertIsInstance(plane, ZincControlPlane)

    def test_roundtrip_update_and_read(self) -> None:
        """An update written through one handle is visible through another."""
        prefix = self._unique_prefix()
        writer = self._open_plane(prefix, create=True)
        reader = self._open_plane(prefix, create=False)

        snapshot = writer.update(
            ControlUpdate(
                mode=KernelMode.DEBUG,
                verbosity=KernelVerbosity.TRACE,
                backend_mode=BackendMode.BINGX,
                trace_transitions=True,
                mirror_to_hazelcast=True,
            )
        )
        # NOTE(review): ``mirror_to_hazelcast`` is written but never asserted
        # on; add a check if the snapshot type exposes that field.
        self.assertEqual(snapshot.mode, KernelMode.DEBUG)
        self.assertEqual(snapshot.verbosity, KernelVerbosity.TRACE)
        self.assertEqual(snapshot.backend_mode, BackendMode.BINGX)
        self.assertTrue(snapshot.trace_transitions)

        read_back = reader.read()
        self.assertEqual(read_back.mode, KernelMode.DEBUG)
        self.assertEqual(read_back.verbosity, KernelVerbosity.TRACE)
        self.assertEqual(read_back.backend_mode, BackendMode.BINGX)
        self.assertTrue(read_back.trace_transitions)

    def test_env_can_select_real_control_plane(self) -> None:
        """Setting the selector variable to ``REAL_ZINC`` yields the real plane."""
        prefix = self._unique_prefix()
        # patch.dict restores the previous value (or absence) of the variable
        # on exit, including when build_control_plane raises.
        with mock.patch.dict(os.environ, {self._ENV_VAR: "REAL_ZINC"}):
            plane = build_control_plane(prefix=prefix)
        # Register the close before asserting so a passing run still releases
        # the plane; other plane types are not assumed to have ``close``.
        if isinstance(plane, RealZincControlPlane):
            self.addCleanup(plane.close)
        self.assertIsInstance(plane, RealZincControlPlane)

    def test_initial_snapshot_is_default(self) -> None:
        """A freshly created plane reads back a default ``KernelControlSnapshot``."""
        plane = self._open_plane(self._unique_prefix(), create=True)
        self.assertEqual(plane.read(), KernelControlSnapshot())
|
||
|
|
|
||
|
|
|
||
|
|
class TestDITAv2InMemoryControlPlane(unittest.TestCase):
    """Tests for the in-process ``InMemoryControlPlane``."""

    def test_wait_and_notify(self) -> None:
        """Check wait/notify signalling and that ``update`` echoes the new values."""
        control = InMemoryControlPlane()

        # No notification has been issued yet, so a short wait reports falsy.
        signalled = control.wait(timeout_ms=1)
        self.assertFalse(signalled)

        # After notify() the same short wait reports truthy.
        control.notify()
        signalled = control.wait(timeout_ms=1)
        self.assertTrue(signalled)

        requested = ControlUpdate(
            mode=KernelMode.DEBUG,
            verbosity=KernelVerbosity.TRACE,
        )
        result = control.update(requested)
        self.assertEqual(result.mode, KernelMode.DEBUG)
        self.assertEqual(result.verbosity, KernelVerbosity.TRACE)
|
||
|
|
|
||
|
|
|
||
|
|
# Run the tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|