Includes core prod + GREEN/BLUE subsystems: - prod/ (BLUE harness, configs, scripts, docs) - nautilus_dolphin/ (GREEN Nautilus-native impl + dvae/ preserved) - adaptive_exit/ (AEM engine + models/bucket_assignments.pkl) - Observability/ (EsoF advisor, TUI, dashboards) - external_factors/ (EsoF producer) - mc_forewarning_qlabs_fork/ (MC regime/envelope) Excludes runtime caches, logs, backups, and reproducible artifacts per .gitignore.
319 lines
9.5 KiB
Markdown
Executable File
319 lines
9.5 KiB
Markdown
Executable File
# Nautilus Trader Integration Roadmap
|
||
|
||
**Purpose**: Connect ExtF (External Factors) to Nautilus Trader execution layer with microsecond-level latency
|
||
**Current State**: Python event-driven (<10ms)
|
||
**Target**: <100μs for HFT execution fills
|
||
**Stack**: Python (ExtF) → [IPC Bridge] → Nautilus Rust Core
|
||
|
||
---
|
||
|
||
## Phase 1: Testing (Current - Python)
|
||
|
||
**Duration**: 1-2 weeks
|
||
**Goal**: Validate Nautilus integration with current Python ExtF
|
||
|
||
### Implementation
|
||
```python
|
||
# nautilus_exf_adapter.py
import json
import threading
import time

import hazelcast
from nautilus_trader.adapters import DataAdapter
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.objects import Price, Quantity

# Instrument the synthetic quotes are published under.
BTCUSDT_BINANCE = InstrumentId.from_str("BTCUSDT.BINANCE")


class ExFDataAdapter(DataAdapter):
    """
    Feed ExtF data directly into Nautilus.

    Polls the ``exf_latest`` entry of the ``DOLPHIN_FEATURES`` Hazelcast map,
    converts each new payload into a synthetic ``QuoteTick`` and hands it to
    a caller-supplied handler.

    Latency target: <10ms (Python → Nautilus)
    """

    def __init__(self):
        self.hz = hazelcast.HazelcastClient(
            cluster_name="dolphin",
            cluster_members=["localhost:5701"],
        )
        # Resolve the map proxy once instead of on every 1ms poll.
        self._features = self.hz.get_map("DOLPHIN_FEATURES").blocking()
        # Set by stop() to end the polling loop.
        self._stop = threading.Event()
        # NOTE(review): nothing in this class ever updates this value, so
        # every synthetic quote is centred on 50000.0 — wire in a real
        # reference price before relying on the generated ticks.
        self.last_btc_price = 50000.0

    def subscribe_external_factors(self, handler, poll_interval_s=0.001):
        """
        Poll ExtF and push each new payload to ``handler`` as a QuoteTick.

        Blocks the calling thread until ``stop()`` is called, so run it on a
        dedicated thread rather than inline in strategy initialisation.

        Args:
            handler: Callable invoked with one ``QuoteTick`` per new payload.
            poll_interval_s: Seconds to wait between polls
                (default 0.001 = 1ms, i.e. 1000Hz).
        """
        previous = None
        while not self._stop.is_set():
            data = self._fetch_latest()

            # Skip missing payloads (otherwise a quote would be fabricated
            # purely from defaults) and payloads identical to the last one
            # published (otherwise the handler is flooded with duplicates).
            if data and data != previous:
                handler(self._to_quote_tick(data))
                previous = data

            # Event.wait doubles as an interruptible sleep.
            self._stop.wait(poll_interval_s)

    def stop(self):
        """Ask a running ``subscribe_external_factors`` loop to exit."""
        self._stop.set()

    def close(self):
        """Stop polling and shut down the Hazelcast client."""
        self.stop()
        self.hz.shutdown()

    def _fetch_latest(self) -> dict:
        """Fetch and JSON-decode ``exf_latest``; return ``{}`` when absent."""
        raw = self._features.get("exf_latest")
        return json.loads(raw) if raw else {}

    def _to_quote_tick(self, data: dict) -> QuoteTick:
        """
        Convert ExtF indicators to a synthetic Nautilus QuoteTick.

        Reads ``spread`` (basis points, default 5.0) and ``imbal_btc``
        (default 0.0) from ``data`` and places bid/ask around
        ``self.last_btc_price``.
        """
        mid = self.last_btc_price
        spread_bps = data.get('spread', 5.0)
        imbal = data.get('imbal_btc', 0.0)

        spread_pct = spread_bps / 10000.0
        half_spread = spread_pct / 2

        # Positive imbalance skews BOTH quotes upward: the bid moves closer
        # to mid and the ask moves further away (and vice versa when
        # negative). The 0.1 factor caps the skew at 10% of the half-spread
        # per unit of imbalance.
        skew = imbal * 0.1
        bid = mid * (1 - half_spread * (1 - skew))
        ask = mid * (1 + half_spread * (1 + skew))

        # One clock read so ts_event and ts_init are identical.
        now_ns = time.time_ns()

        return QuoteTick(
            instrument_id=BTCUSDT_BINANCE,
            bid_price=Price(bid, 2),
            ask_price=Price(ask, 2),
            bid_size=Quantity(1.0, 8),
            ask_size=Quantity(1.0, 8),
            ts_event=now_ns,
            ts_init=now_ns,
        )
|
||
```
|
||
|
||
### Metrics to Measure
|
||
```python
|
||
# Measure actual latency of one Hazelcast fetch + JSON decode, in microseconds.
import json
import time

import hazelcast
import numpy as np

hz = hazelcast.HazelcastClient(
    cluster_name="dolphin",
    cluster_members=["localhost:5701"],
)

latencies = []  # one sample per iteration, in μs
try:
    for _ in range(1000):
        # perf_counter_ns is monotonic; time_ns() can jump if the wall clock
        # is adjusted mid-measurement.
        t0 = time.perf_counter_ns()
        data = hz.get_map("DOLPHIN_FEATURES").blocking().get("exf_latest")
        # Guard against a missing key: json.loads(None) raises TypeError.
        parsed = json.loads(data) if data else {}
        t1 = time.perf_counter_ns()
        # ns → μs is a division by 1e3 (dividing by 1e6 would yield ms and
        # make every sample look 1000x faster against the μs thresholds).
        latencies.append((t1 - t0) / 1e3)
finally:
    hz.shutdown()

print(f"Median: {np.median(latencies):.1f}μs")
print(f"P99: {np.percentile(latencies, 99):.1f}μs")
print(f"Max: {max(latencies):.1f}μs")
|
||
```
|
||
|
||
**Acceptance Criteria**:
|
||
- Median latency < 500μs: ✅ Continue to Phase 2
|
||
- Median latency 500μs-2ms: ⚠️ Optimize further
|
||
- Median latency > 2ms: 🚫 Need Java port
|
||
|
||
---
|
||
|
||
## Phase 2: Shared Memory Bridge (Python → Nautilus)
|
||
|
||
**Duration**: 2-3 weeks
|
||
**Goal**: <100μs Python → Nautilus latency
|
||
**Tech**: mmap / shared memory
|
||
|
||
### Architecture
|
||
```
|
||
┌─────────────────────────────────────────────────────────┐
|
||
│ Python ExtF Service │ Nautilus Rust Core │
|
||
│ │ │
|
||
│ [Poll APIs: 0.5s] │ [Strategy] │
|
||
│ ↓ │ ↓ │
|
||
│ [Update state] │ [Decision] │
|
||
│ ↓ │ ↓ │
|
||
│ ┌──────────────────┐ │ ┌──────────────────┐ │
|
||
│ │ Shared Memory │ │ │ Shared Memory │ │
|
||
│ │ /dev/shm/dolphin │◄───────┼──►│ /dev/shm/dolphin │ │
|
||
│ │ (mmap) │ │ │ (mmap) │ │
|
||
│ └──────────────────┘ │ └──────────────────┘ │
|
||
│ │ │
|
||
│ Write: <1μs │ Read: <1μs │
|
||
└──────────────────────────────┴──────────────────────────┘
|
||
```
|
||
|
||
### Implementation
|
||
|
||
**Python Writer** (`exf_shared_memory.py`):
|
||
```python
|
||
import json
import mmap
import os
import struct
import time


class ExFSharedMemory:
    """
    Write ExtF data to shared memory for Nautilus consumption.

    Format: Binary structured data (not JSON - faster), little-endian:

        [timestamp_ns: u64][n_indicators: u32]
        n_indicators x ([name_len: u16][name: utf-8 bytes][value: f64])

    NOTE(review): a write is a plain memory copy with no sequence counter or
    lock, so a reader polling concurrently can observe a partially updated
    record. Add a seqlock/version field if readers must detect torn reads.
    """

    # Explicit little-endian header so the layout does not depend on the
    # writer's native byte order (the reader decodes little-endian).
    _HEADER = struct.Struct('<QI')

    def __init__(self, size=4096, path='/dev/shm/dolphin_exf'):
        """
        Create (or open) the backing file and map ``size`` bytes of it.

        Args:
            size: Size of the mapping in bytes.
            path: Backing file; defaults to the shared /dev/shm location.
        """
        self.fd = os.open(path, os.O_CREAT | os.O_RDWR, 0o644)
        try:
            os.ftruncate(self.fd, size)
            self.mm = mmap.mmap(self.fd, size)
        except Exception:
            # Do not leak the descriptor if sizing or mapping fails.
            os.close(self.fd)
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def write(self, indicators: dict):
        """
        Write indicators to shared memory.

        Keys starting with ``_`` and values that are not ``int``/``float``
        are skipped. The header count reflects only the records actually
        written.

        Raises:
            ValueError: If the encoded payload does not fit in the mapping;
                the mapping is left untouched in that case.
        """
        # Filter first so the count in the header matches the records that
        # follow it.
        records = [
            (key.encode('utf-8'), float(value))
            for key, value in indicators.items()
            if not key.startswith('_') and isinstance(value, (int, float))
        ]

        parts = [self._HEADER.pack(time.time_ns(), len(records))]
        for name_bytes, value in records:
            parts.append(struct.pack('<H', len(name_bytes)))  # name_len
            parts.append(name_bytes)                          # name
            parts.append(struct.pack('<d', value))            # value (double)
        payload = b''.join(parts)

        if len(payload) > len(self.mm):
            raise ValueError(
                f"payload of {len(payload)} bytes exceeds shared memory "
                f"size of {len(self.mm)} bytes"
            )

        # Single copy into the mapping instead of many small writes.
        self.mm[:len(payload)] = payload

    def close(self):
        """Unmap the shared memory and close the backing file descriptor."""
        self.mm.close()
        os.close(self.fd)
|
||
```
|
||
|
||
**Nautilus Reader** (Rust FFI):
|
||
```rust
|
||
// dolphin_exf_reader.rs
|
||
use std::fs::OpenOptions;
|
||
use std::os::unix::fs::OpenOptionsExt;
|
||
use memmap2::MmapMut;
|
||
|
||
pub struct ExFReader {
|
||
mmap: MmapMut,
|
||
}
|
||
|
||
impl ExFReader {
|
||
pub fn new() -> Self {
|
||
let file = OpenOptions::new()
|
||
.read(true)
|
||
.write(true)
|
||
.custom_flags(libc::O_CREAT)
|
||
.open("/dev/shm/dolphin_exf")
|
||
.unwrap();
|
||
|
||
let mmap = unsafe { MmapMut::map_mut(&file).unwrap() };
|
||
Self { mmap }
|
||
}
|
||
|
||
pub fn read(&self) -> ExFData {
|
||
// Zero-copy read from shared memory
|
||
// <1μs latency
|
||
let timestamp = u64::from_le_bytes([
|
||
self.mmap[0], self.mmap[1], self.mmap[2], self.mmap[3],
|
||
self.mmap[4], self.mmap[5], self.mmap[6], self.mmap[7],
|
||
]);
|
||
|
||
// ... parse rest of structure
|
||
|
||
ExFData {
|
||
timestamp,
|
||
indicators: self.parse_indicators(),
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
### Expected Performance
|
||
- Python write: ~500ns
|
||
- Rust read: ~500ns
|
||
- Total latency: ~1μs for the write + read themselves (vs ~10ms with Hazelcast); end-to-end latency also includes the reader's polling interval
|
||
|
||
---
|
||
|
||
## Phase 3: Java Port (If Needed)
|
||
|
||
**Duration**: 1-2 months
|
||
**Goal**: <50μs end-to-end
|
||
**Trigger**: Only if measured Phase 2 latency exceeds 100μs
|
||
|
||
### Architecture
|
||
```
|
||
[Exchange APIs]
|
||
↓
|
||
[Java ExtF Service]
|
||
- Chronicle Queue (IPC)
|
||
- Agrona (data structures)
|
||
- Disruptor (event processing)
|
||
↓
|
||
[Nautilus Rust Core]
|
||
- Native Aeron/UDP reader
|
||
```
|
||
|
||
### Key Libraries
|
||
- **Chronicle Queue**: Persistent IPC, <1μs latency
|
||
- **Agrona**: Lock-free data structures
|
||
- **Disruptor**: 1M+ events/sec
|
||
- **Aeron**: UDP multicast, <50μs network
|
||
|
||
### Implementation Sketch
|
||
```java
|
||
@Service
|
||
public class ExFExecutionService {
|
||
|
||
private final ChronicleQueue queue;
|
||
private final Disruptor<IndicatorEvent> disruptor;
|
||
private final RingBuffer<IndicatorEvent> ringBuffer;
|
||
|
||
public void onIndicatorUpdate(String name, double value) {
|
||
// Lock-free publish to Disruptor
|
||
long seq = ringBuffer.next();
|
||
try {
|
||
IndicatorEvent event = ringBuffer.get(seq);
|
||
event.setName(name);
|
||
event.setValue(value);
|
||
event.setTimestamp(System.nanoTime());
|
||
} finally {
|
||
ringBuffer.publish(seq);
|
||
}
|
||
}
|
||
|
||
@EventHandler
|
||
public void onEvent(IndicatorEvent event, long seq, boolean endOfBatch) {
|
||
// Process and write to Chronicle Queue
|
||
// Nautilus reads from queue
|
||
queue.acquireAppender().writeDocument(w -> {
|
||
w.getValueOut().object(event);
|
||
});
|
||
}
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## Decision Matrix
|
||
|
||
| Phase | Latency | Complexity | When to Use |
|
||
|-------|---------|------------|-------------|
|
||
| 1: Python + HZ | ~5-10ms | Low | Testing, low-frequency trading |
|
||
| 2: Shared Memory | <100μs | Medium | HFT, fill optimization |
|
||
| 3: Java + Chronicle | <50μs | High | Ultra-HFT, co-location |
|
||
|
||
## Immediate Next Steps
|
||
|
||
1. **Deploy Python event-driven** (today): `./start_exf.sh restart`
|
||
2. **Test Nautilus integration** (this week): Measure actual latency
|
||
3. **Implement shared memory** (if needed): Target <100μs
|
||
4. **Java port** (if needed): Target <50μs
|
||
|
||
---
|
||
|
||
**Document**: NAUTILUS_INTEGRATION_ROADMAP.md
|
||
**Author**: Kimi, DESTINATION/DOLPHIN Machine dev/prod-Agent
|
||
**Date**: 2026-03-20
|