# Nautilus Trader Integration Roadmap

**Purpose:** Connect ExtF (External Factors) to the Nautilus Trader execution layer with microsecond-level latency.

- **Current state:** Python event-driven (<10 ms)
- **Target:** <100 μs for HFT execution fills
- **Stack:** Python (ExtF) → [IPC Bridge] → Nautilus Rust Core
## Phase 1: Testing (Current – Python)

- **Duration:** 1–2 weeks
- **Goal:** Validate the Nautilus integration with the current Python ExtF

### Implementation
```python
# nautilus_exf_adapter.py
import json
import time

import hazelcast
from nautilus_trader.adapters import DataAdapter
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.objects import Price, Quantity

# Instrument id for the synthetic BTC quote stream
BTCUSDT_BINANCE = InstrumentId.from_str("BTCUSDT.BINANCE")


class ExFDataAdapter(DataAdapter):
    """
    Feed ExtF data directly into Nautilus.
    Latency target: <10ms (Python -> Nautilus)
    """

    def __init__(self):
        self.hz = hazelcast.HazelcastClient(
            cluster_name="dolphin",
            cluster_members=["localhost:5701"],
        )
        self.last_btc_price = 50000.0

    def subscribe_external_factors(self, handler):
        """
        Subscribe to ExtF updates.
        Called by the Nautilus Rust core at strategy init.
        """
        while True:
            data = self._fetch_latest()
            # Convert to a Nautilus QuoteTick
            quote = self._to_quote_tick(data)
            # Push to Nautilus (goes to the Rust core)
            handler(quote)
            time.sleep(0.001)  # 1ms poll (1000Hz)

    def _fetch_latest(self) -> dict:
        """Fetch the latest ExtF snapshot from Hazelcast."""
        raw = self.hz.get_map("DOLPHIN_FEATURES").blocking().get("exf_latest")
        return json.loads(raw) if raw else {}

    def _to_quote_tick(self, data: dict) -> QuoteTick:
        """
        Convert ExtF indicators to a Nautilus QuoteTick.
        Uses basis, spread, imbal to construct a synthetic order book.
        """
        btc_price = self.last_btc_price
        spread_bps = data.get('spread', 5.0)
        imbal = data.get('imbal_btc', 0.0)
        # Skew bid/ask by the imbalance:
        # positive imbal = more bids = both quotes shifted upward
        spread_pct = spread_bps / 10000.0
        half_spread = spread_pct / 2
        bid = btc_price * (1 - half_spread * (1 - imbal * 0.1))
        ask = btc_price * (1 + half_spread * (1 + imbal * 0.1))
        return QuoteTick(
            instrument_id=BTCUSDT_BINANCE,
            bid_price=Price(bid, 2),
            ask_price=Price(ask, 2),
            bid_size=Quantity(1.0, 8),
            ask_size=Quantity(1.0, 8),
            ts_event=time.time_ns(),
            ts_init=time.time_ns(),
        )
```
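The bid/ask skew in `_to_quote_tick` is easiest to sanity-check in isolation. A minimal sketch of the same arithmetic as a pure function (the function name is illustrative, not part of the adapter):

```python
def synthetic_quote(mid: float, spread_bps: float, imbal: float) -> tuple:
    """Reproduce the adapter's synthetic bid/ask math for a given mid price."""
    spread_pct = spread_bps / 10000.0
    half_spread = spread_pct / 2
    # Positive imbal shifts both quotes upward (buy pressure)
    bid = mid * (1 - half_spread * (1 - imbal * 0.1))
    ask = mid * (1 + half_spread * (1 + imbal * 0.1))
    return bid, ask

# With imbal = 0 the quote is symmetric around the mid:
# synthetic_quote(50000.0, 5.0, 0.0) -> (49987.5, 50012.5)
```

With a zero imbalance and a 5 bps spread the bid/ask sit 2.5 bps either side of the mid; a positive imbalance raises both prices.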
### Metrics to Measure
```python
# Measure actual Hazelcast round-trip latency
import json
import time

import hazelcast
import numpy as np

hz = hazelcast.HazelcastClient(
    cluster_name="dolphin",
    cluster_members=["localhost:5701"],
)

latencies = []
for _ in range(1000):
    t0 = time.time_ns()
    data = hz.get_map("DOLPHIN_FEATURES").blocking().get("exf_latest")
    parsed = json.loads(data)
    t1 = time.time_ns()
    latencies.append((t1 - t0) / 1e3)  # ns -> μs

print(f"Median: {np.median(latencies):.1f}μs")
print(f"P99: {np.percentile(latencies, 99):.1f}μs")
print(f"Max: {max(latencies):.1f}μs")
```
**Acceptance criteria:**

- Median latency < 500 μs: ✅ continue to Phase 2
- Median latency 500 μs–2 ms: ⚠️ optimize further
- Median latency > 2 ms: 🚫 Java port needed
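The criteria above can be encoded as a small helper so the benchmark script reports a decision directly. A hypothetical sketch (thresholds taken from the criteria above; the function name is illustrative):

```python
def next_step(median_us: float) -> str:
    """Map a measured median latency (μs) to the roadmap decision."""
    if median_us < 500:
        return "continue to Phase 2"
    if median_us <= 2000:
        return "optimize further"
    return "Java port needed"
```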
## Phase 2: Shared Memory Bridge (Python → Nautilus)

- **Duration:** 2–3 weeks
- **Goal:** <100 μs Python → Nautilus latency
- **Tech:** mmap / shared memory

### Architecture
```
┌──────────────────────────────┬──────────────────────────┐
│ Python ExtF Service          │ Nautilus Rust Core       │
│                              │                          │
│ [Poll APIs: 0.5s]            │ [Strategy]               │
│        ↓                     │     ↓                    │
│ [Update state]               │ [Decision]               │
│        ↓                     │     ↓                    │
│ ┌──────────────────┐         │ ┌──────────────────┐     │
│ │ Shared Memory    │         │ │ Shared Memory    │     │
│ │ /dev/shm/dolphin │◄────────┼►│ /dev/shm/dolphin │     │
│ │ (mmap)           │         │ │ (mmap)           │     │
│ └──────────────────┘         │ └──────────────────┘     │
│                              │                          │
│ Write: <1μs                  │ Read: <1μs               │
└──────────────────────────────┴──────────────────────────┘
```
### Implementation

**Python writer** (`exf_shared_memory.py`):
```python
# exf_shared_memory.py
import mmap
import os
import struct
import time


class ExFSharedMemory:
    """
    Write ExtF data to shared memory for Nautilus consumption.
    Format: binary structured data (not JSON - faster).
    Explicit little-endian ('<') formats match the Rust reader's from_le_bytes.
    """

    def __init__(self, size=4096):
        self.fd = os.open('/dev/shm/dolphin_exf', os.O_CREAT | os.O_RDWR)
        os.ftruncate(self.fd, size)
        self.mm = mmap.mmap(self.fd, size)

    def write(self, indicators: dict):
        """
        Write indicators to shared memory.
        Layout: [timestamp:8][n_indicators:4][indicator_data:...]
        """
        self.mm.seek(0)
        # Timestamp (ns)
        self.mm.write(struct.pack('<Q', time.time_ns()))
        # Count: numeric, non-private keys only (must match the loop below)
        n = len([k for k, v in indicators.items()
                 if not k.startswith('_') and isinstance(v, (int, float))])
        self.mm.write(struct.pack('<I', n))
        # Indicators: (name_len, name, value)
        for key, value in indicators.items():
            if key.startswith('_'):
                continue
            if not isinstance(value, (int, float)):
                continue
            name_bytes = key.encode('utf-8')
            self.mm.write(struct.pack('<H', len(name_bytes)))  # name_len
            self.mm.write(name_bytes)                          # name
            self.mm.write(struct.pack('<d', float(value)))     # value (double)

    def close(self):
        self.mm.close()
        os.close(self.fd)
```
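Before wiring up the Rust side, the binary layout is worth verifying with a pure-Python round trip. A minimal sketch of an encoder/decoder pair for the `[timestamp:8][n:4][name_len:2][name][value:8]` layout (standalone functions, not part of the service; explicit little-endian formats assumed):

```python
import struct


def encode(indicators: dict, ts: int) -> bytes:
    """Serialize numeric, non-private indicators in the shared-memory layout."""
    items = [(k, v) for k, v in indicators.items()
             if not k.startswith('_') and isinstance(v, (int, float))]
    out = [struct.pack('<Q', ts), struct.pack('<I', len(items))]
    for name, value in items:
        nb = name.encode('utf-8')
        out.append(struct.pack('<H', len(nb)))
        out.append(nb)
        out.append(struct.pack('<d', float(value)))
    return b''.join(out)


def decode(buf: bytes):
    """Parse the layout back into (timestamp, {name: value})."""
    ts = struct.unpack_from('<Q', buf, 0)[0]
    n = struct.unpack_from('<I', buf, 8)[0]
    off, out = 12, {}
    for _ in range(n):
        name_len = struct.unpack_from('<H', buf, off)[0]
        off += 2
        name = buf[off:off + name_len].decode('utf-8')
        off += name_len
        out[name] = struct.unpack_from('<d', buf, off)[0]
        off += 8
    return ts, out
```

A round trip drops private and non-numeric keys and preserves the rest, which is exactly what the Rust reader should expect to find in `/dev/shm/dolphin_exf`.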
**Nautilus reader** (Rust FFI):
```rust
// dolphin_exf_reader.rs
use std::fs::OpenOptions;

use memmap2::MmapMut;

pub struct ExFReader {
    mmap: MmapMut,
}

impl ExFReader {
    pub fn new() -> Self {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open("/dev/shm/dolphin_exf")
            .unwrap();
        let mmap = unsafe { MmapMut::map_mut(&file).unwrap() };
        Self { mmap }
    }

    pub fn read(&self) -> ExFData {
        // Zero-copy read from shared memory, <1μs latency
        let timestamp = u64::from_le_bytes(self.mmap[0..8].try_into().unwrap());
        // ... parse rest of structure
        ExFData {
            timestamp,
            indicators: self.parse_indicators(),
        }
    }
}
```
### Expected Performance

- Python write: ~500 ns
- Rust read: ~500 ns
- Total latency: ~1 μs (vs ~10 ms with Hazelcast)
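One caveat the figures above gloss over: with a plain mmap, a reader polling at these speeds can observe a half-written record. A common guard is a seqlock-style sequence counter (odd while a write is in flight, even when stable; the reader retries until it sees the same even value on both sides of its copy). A minimal Python sketch of the idea, with an illustrative layout that is not part of the format above:

```python
import mmap
import struct

SEQ = struct.Struct('<Q')  # sequence counter at offset 0; payload follows at 8


def write_record(mm, payload: bytes, seq: int) -> int:
    """Publish payload with a seqlock: odd = write in progress, even = stable."""
    mm.seek(0)
    mm.write(SEQ.pack(seq + 1))          # mark write in progress
    mm[8:8 + len(payload)] = payload     # body
    mm.seek(0)
    mm.write(SEQ.pack(seq + 2))          # mark stable
    return seq + 2


def read_record(mm, size: int) -> bytes:
    """Retry until the counter is even and unchanged across the copy."""
    while True:
        s1 = SEQ.unpack_from(mm, 0)[0]
        data = bytes(mm[8:8 + size])
        s2 = SEQ.unpack_from(mm, 0)[0]
        if s1 == s2 and s1 % 2 == 0:
            return data
```

In Rust the reads of the counter would additionally need atomic/volatile semantics; the sketch only shows the protocol shape.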
## Phase 3: Java Port (If Needed)

- **Duration:** 1–2 months
- **Goal:** <50 μs end-to-end
- **Trigger:** Phase 2 latency > 100 μs

### Architecture
```
[Exchange APIs]
      ↓
[Java ExtF Service]
  - Chronicle Queue (IPC)
  - Agrona (data structures)
  - Disruptor (event processing)
      ↓
[Nautilus Rust Core]
  - Native Aeron/UDP reader
```
### Key Libraries

- **Chronicle Queue**: persistent IPC, <1 μs latency
- **Agrona**: lock-free data structures
- **Disruptor**: 1M+ events/sec
- **Aeron**: UDP multicast, <50 μs network
### Implementation Sketch

```java
@Service
public class ExFExecutionService implements EventHandler<IndicatorEvent> {

    private final ChronicleQueue queue;
    private final Disruptor<IndicatorEvent> disruptor;
    private final RingBuffer<IndicatorEvent> ringBuffer;

    public void onIndicatorUpdate(String name, double value) {
        // Lock-free publish to the Disruptor ring buffer
        long seq = ringBuffer.next();
        try {
            IndicatorEvent event = ringBuffer.get(seq);
            event.setName(name);
            event.setValue(value);
            event.setTimestamp(System.nanoTime());
        } finally {
            ringBuffer.publish(seq);
        }
    }

    @Override  // Disruptor EventHandler callback
    public void onEvent(IndicatorEvent event, long seq, boolean endOfBatch) {
        // Process and write to Chronicle Queue; Nautilus reads from the queue
        queue.acquireAppender().writeDocument(w ->
            w.getValueOut().object(event));
    }
}
```
## Decision Matrix

| Phase | Latency | Complexity | When to Use |
|---|---|---|---|
| 1: Python + Hazelcast | ~5–10 ms | Low | Testing, low-frequency trading |
| 2: Shared memory | ~100 μs | Medium | HFT, fill optimization |
| 3: Java + Chronicle | ~50 μs | High | Ultra-HFT, co-location |
## Immediate Next Steps

- **Deploy Python event-driven** (today): `./start_exf.sh restart`
- **Test Nautilus integration** (this week): measure actual latency
- **Implement shared memory** (if needed): target <100 μs
- **Java port** (if needed): target <50 μs
---

**Document:** NAUTILUS_INTEGRATION_ROADMAP.md
**Author:** Kimi, DESTINATION/DOLPHIN Machine dev/prod-Agent
**Date:** 2026-03-20