Nautilus Trader Integration Roadmap

Purpose: Connect ExtF (External Factors) to the Nautilus Trader execution layer with microsecond-level latency
Current State: Python event-driven service (<10ms)
Target: <100μs for HFT execution fills
Stack: Python (ExtF) → [IPC Bridge] → Nautilus Rust Core


Phase 1: Testing (Current - Python)

Duration: 1-2 weeks
Goal: Validate Nautilus integration with current Python ExtF

Implementation

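# nautilus_exf_adapter.py
import json
import time

import hazelcast
# NOTE: DataAdapter is the base class named in this sketch; confirm the import
# path against the installed nautilus_trader version before relying on it.
from nautilus_trader.adapters import DataAdapter
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.objects import Price, Quantity

BTCUSDT_BINANCE = InstrumentId.from_str("BTCUSDT.BINANCE")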

class ExFDataAdapter(DataAdapter):
    """
    Feed ExtF data directly into Nautilus.
    
    Latency target: <10ms (Python → Nautilus)
    """
    
    def __init__(self):
        self.hz = hazelcast.HazelcastClient(
            cluster_name="dolphin",
            cluster_members=["localhost:5701"]
        )
        self.last_btc_price = 50000.0
        
    def subscribe_external_factors(self, handler):
        """
        Subscribe to ExtF updates.
        
        Called by Nautilus Rust core at strategy init.
        """
        while True:
            data = self._fetch_latest()
            
            # Convert to Nautilus QuoteTick
            quote = self._to_quote_tick(data)
            
            # Push to Nautilus (goes to Rust core)
            handler(quote)
            
            time.sleep(0.001)  # 1ms poll (1000Hz)
    
    def _fetch_latest(self) -> dict:
        """Fetch from Hazelcast."""
        raw = self.hz.get_map("DOLPHIN_FEATURES").blocking().get("exf_latest")
        return json.loads(raw) if raw else {}
    
    def _to_quote_tick(self, data: dict) -> QuoteTick:
        """
        Convert ExtF indicators to Nautilus QuoteTick.
        
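        Uses spread and imbal_btc to construct a synthetic top-of-book quote
        around the last known BTC price.
        """
        # NOTE: last_btc_price is never refreshed in this sketch; wire it to a
        # live price source before use.
        btc_price = self.last_btc_price
        spread_bps = data.get('spread', 5.0)
        imbal = data.get('imbal_btc', 0.0)
        
        # Skew both quotes in the direction of the imbalance:
        # positive imbal raises the bid (tighter) and the ask (wider)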
        spread_pct = spread_bps / 10000.0
        half_spread = spread_pct / 2
        
        bid = btc_price * (1 - half_spread * (1 - imbal * 0.1))
        ask = btc_price * (1 + half_spread * (1 + imbal * 0.1))
        
        return QuoteTick(
            instrument_id=BTCUSDT_BINANCE,
            bid_price=Price(bid, 2),
            ask_price=Price(ask, 2),
            bid_size=Quantity(1.0, 8),
            ask_size=Quantity(1.0, 8),
            ts_event=time.time_ns(),
            ts_init=time.time_ns(),
        )
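
To make the skew arithmetic in _to_quote_tick concrete, here is a minimal standalone sketch of the same formula with no Nautilus dependency. The function name synthetic_quote is hypothetical and exists only for illustration. With a 5 bps spread around 50,000, a balanced book (imbal = 0) gives 49,987.50 / 50,012.50, and imbal = +1.0 shifts both quotes up to 49,988.75 / 50,013.75.

```python
def synthetic_quote(mid: float, spread_bps: float, imbal: float) -> tuple[float, float]:
    """Return (bid, ask) using the same arithmetic as _to_quote_tick.

    `synthetic_quote` is an illustrative helper, not part of the adapter.
    """
    half_spread = (spread_bps / 10000.0) / 2
    # Positive imbalance shrinks the bid offset and grows the ask offset,
    # so both quotes move up together.
    bid = mid * (1 - half_spread * (1 - imbal * 0.1))
    ask = mid * (1 + half_spread * (1 + imbal * 0.1))
    return round(bid, 2), round(ask, 2)


if __name__ == "__main__":
    print(synthetic_quote(50000.0, 5.0, 0.0))
    print(synthetic_quote(50000.0, 5.0, 1.0))
```

Keeping the arithmetic in a pure function like this would also make it easy to unit-test separately from the Hazelcast and Nautilus plumbing.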

Metrics to Measure

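# Measure actual latency (Hazelcast fetch + JSON parse)
import json
import time

import hazelcast
import numpy as np

hz = hazelcast.HazelcastClient(
    cluster_name="dolphin",
    cluster_members=["localhost:5701"],
)

latencies = []
for _ in range(1000):
    t0 = time.perf_counter_ns()
    data = hz.get_map("DOLPHIN_FEATURES").blocking().get("exf_latest")
    parsed = json.loads(data) if data else {}
    t1 = time.perf_counter_ns()
    latencies.append((t1 - t0) / 1e3)  # Convert ns to μs

print(f"Median: {np.median(latencies):.1f}μs")
print(f"P99: {np.percentile(latencies, 99):.1f}μs")
print(f"Max: {max(latencies):.1f}μs")

hz.shutdown()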

Acceptance Criteria:

  • Median latency < 500μs: ✅ Continue to Phase 2
  • Median latency 500μs-2ms: ⚠️ Optimize further
  • Median latency > 2ms: 🚫 Need Java port
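
The acceptance criteria above can be encoded as a small gate so the outcome of a measurement run is unambiguous. This is a sketch only: the function name phase1_verdict and the returned labels are hypothetical, and the thresholds are taken directly from the three bullets.

```python
import statistics


def phase1_verdict(latencies_us):
    """Classify a Phase 1 measurement run by its median latency in μs.

    Thresholds mirror the acceptance criteria: <500μs, 500μs-2ms, >2ms.
    The return labels are illustrative names, not an existing API.
    """
    median = statistics.median(latencies_us)
    if median < 500:
        return "continue_to_phase_2"
    if median <= 2000:
        return "optimize_further"
    return "java_port"


if __name__ == "__main__":
    print(phase1_verdict([320.0, 410.0, 450.0, 900.0]))
```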

Phase 2: Shared Memory Bridge (Python → Nautilus)

Duration: 2-3 weeks
Goal: <100μs Python → Nautilus latency
Tech: mmap / shared memory

Architecture

┌─────────────────────────────────────────────────────────┐
│  Python ExtF Service         │   Nautilus Rust Core     │
│                              │                          │
│  [Poll APIs: 0.5s]           │   [Strategy]             │
│         ↓                    │         ↓                │
│  [Update state]              │   [Decision]             │
│         ↓                    │         ↓                │
│  ┌──────────────────┐        │   ┌──────────────────┐   │
│  │ Shared Memory    │        │   │ Shared Memory    │   │
│  │ /dev/shm/dolphin │◄───────┼──►│ /dev/shm/dolphin │   │
│  │ (mmap)           │        │   │ (mmap)           │   │
│  └──────────────────┘        │   └──────────────────┘   │
│                              │                          │
│  Write: <1μs                 │   Read: <1μs             │
└──────────────────────────────┴──────────────────────────┘

Implementation

Python Writer (exf_shared_memory.py):

import mmap
import os
import struct
import time

class ExFSharedMemory:
    """
    Write ExtF data to shared memory for Nautilus consumption.
    
    Format: Binary structured data (not JSON - faster)
    """
    
    def __init__(self, size=4096):
        self.fd = os.open('/dev/shm/dolphin_exf', os.O_CREAT | os.O_RDWR)
        os.ftruncate(self.fd, size)
        self.mm = mmap.mmap(self.fd, size)
        
    def write(self, indicators: dict):
        """
        Write indicators to shared memory.
        
        Format (little-endian): [timestamp:8][n_indicators:4][indicator_data:...]
        """
        # Keep only the entries that are actually serialized, so the count
        # in the header matches the records that follow
        numeric = [
            (key.encode('utf-8'), float(value))
            for key, value in indicators.items()
            if not key.startswith('_') and isinstance(value, (int, float))
        ]
        
        self.mm.seek(0)
        
        # Timestamp (ns); '<' pins little-endian to match the Rust reader
        self.mm.write(struct.pack('<Q', time.time_ns()))
        
        # Count
        self.mm.write(struct.pack('<I', len(numeric)))
        
        # Indicators (name_len, name, value)
        for name_bytes, value in numeric:
            self.mm.write(struct.pack('<H', len(name_bytes)))  # name_len
            self.mm.write(name_bytes)                          # name
            self.mm.write(struct.pack('<d', value))            # value (double)
    
    def close(self):
        self.mm.close()
        os.close(self.fd)
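
The record layout described in the writer's docstring can be checked with a round trip before any Rust code exists. The sketch below assumes little-endian fields with no padding (which is what separate struct.pack calls produce on x86-64); encode and decode are hypothetical helper names. decode accepts any buffer-like object, including an mmap.

```python
import struct


def encode(ts_ns: int, indicators: dict) -> bytes:
    """Serialize as [timestamp:8][n:4] then (name_len:2, name, value:8) records."""
    numeric = [
        (key.encode("utf-8"), float(value))
        for key, value in indicators.items()
        if not key.startswith("_") and isinstance(value, (int, float))
    ]
    parts = [struct.pack("<QI", ts_ns, len(numeric))]
    for name_bytes, value in numeric:
        parts.append(struct.pack("<H", len(name_bytes)))
        parts.append(name_bytes)
        parts.append(struct.pack("<d", value))
    return b"".join(parts)


def decode(buf):
    """Parse the layout written by encode; returns (timestamp_ns, {name: value})."""
    ts_ns, n = struct.unpack_from("<QI", buf, 0)
    offset = 12  # 8-byte timestamp + 4-byte count
    values = {}
    for _ in range(n):
        (name_len,) = struct.unpack_from("<H", buf, offset)
        offset += 2
        name = bytes(buf[offset:offset + name_len]).decode("utf-8")
        offset += name_len
        (value,) = struct.unpack_from("<d", buf, offset)
        offset += 8
        values[name] = value
    return ts_ns, values


if __name__ == "__main__":
    blob = encode(1, {"spread": 5.0, "imbal_btc": -0.25, "_internal": 9})
    print(decode(blob))
```

A Rust reader would walk the same offsets, so a fixture produced by encode could serve as a shared test vector for both sides.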

Nautilus Reader (Rust FFI):

// dolphin_exf_reader.rs
use std::fs::File;
use memmap2::Mmap;

pub struct ExFReader {
    mmap: Mmap,
}

impl ExFReader {
    pub fn new() -> Self {
        // Open read-only: the Python writer creates and sizes the file.
        // Creating it here would yield an empty file that cannot be mapped.
        let file = File::open("/dev/shm/dolphin_exf").unwrap();
            
        // Safety: the mapping is shared with another process, so its
        // contents may change while the reader holds it
        let mmap = unsafe { Mmap::map(&file).unwrap() };
        Self { mmap }
    }
    
    pub fn read(&self) -> ExFData {
        // Zero-copy read from shared memory
        // <1μs latency
        let timestamp = u64::from_le_bytes([
            self.mmap[0], self.mmap[1], self.mmap[2], self.mmap[3],
            self.mmap[4], self.mmap[5], self.mmap[6], self.mmap[7],
        ]);
        
        // ... parse rest of structure
        
        ExFData {
            timestamp,
            indicators: self.parse_indicators(),
        }
    }
}
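
Neither the writer nor the reader above coordinates access, so a reader could observe a record that is only partly updated. One common remedy is a sequence counter (seqlock): the writer makes the counter odd before writing and even afterwards, and the reader retries if the counter was odd or changed during the read. The sketch below is an assumption layered on top of the roadmap's layout, not part of it: it reserves the first 8 bytes for the counter, and the names seq_write and seq_read are hypothetical. Python gives no cross-process memory-ordering guarantees, so treat this as an illustration of the protocol only.

```python
import struct

SEQ = struct.Struct("<Q")  # 8-byte sequence counter at offset 0 (assumed layout)


def seq_write(buf, payload: bytes) -> None:
    """Write payload after the counter, bracketing it with odd/even markers."""
    (seq,) = SEQ.unpack_from(buf, 0)
    SEQ.pack_into(buf, 0, seq + 1)           # odd: write in progress
    buf[8:8 + len(payload)] = payload
    SEQ.pack_into(buf, 0, seq + 2)           # even: record is stable


def seq_read(buf, length: int, max_retries: int = 100):
    """Return a consistent copy of the payload, or None if retries run out."""
    for _ in range(max_retries):
        (before,) = SEQ.unpack_from(buf, 0)
        if before % 2:                       # writer is mid-update
            continue
        payload = bytes(buf[8:8 + length])
        (after,) = SEQ.unpack_from(buf, 0)
        if before == after:                  # nothing changed during the copy
            return payload
    return None


if __name__ == "__main__":
    shared = bytearray(64)                   # stands in for the mmap region
    seq_write(shared, b"\x01\x02\x03\x04")
    print(seq_read(shared, 4))
```

The same two functions should work on an mmap object in place of the bytearray, since both support the buffer protocol and same-length slice assignment.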

Expected Performance

  • Python write: ~500ns (estimate)
  • Rust read: ~500ns (estimate)
  • Total copy latency: ~1μs (vs up to 10ms with Hazelcast), excluding the reader's polling interval
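
These figures are estimates, so it is worth measuring the write path on the target host. The following is a rough micro-benchmark sketch, assuming an anonymous mmap as a stand-in for /dev/shm/dolphin_exf and a single fixed-size record; bench_mmap_write is a hypothetical helper name. Timer overhead in Python is itself tens of nanoseconds, so results are an upper bound rather than a precise figure.

```python
import mmap
import statistics
import struct
import time


def bench_mmap_write(n_iter: int = 10000, size: int = 4096) -> dict:
    """Time repeated in-place writes of one (timestamp, double) record."""
    mm = mmap.mmap(-1, size)                 # anonymous mapping, read-write
    record = struct.Struct("<Qd")            # 8-byte timestamp + 8-byte value
    samples = []
    for i in range(n_iter):
        t0 = time.perf_counter_ns()
        record.pack_into(mm, 0, t0, float(i))
        samples.append(time.perf_counter_ns() - t0)
    mm.close()
    samples.sort()
    return {
        "median_ns": statistics.median(samples),
        "p99_ns": samples[int(0.99 * (len(samples) - 1))],
        "max_ns": samples[-1],
    }


if __name__ == "__main__":
    print(bench_mmap_write())
```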

Phase 3: Java Port (If Needed)

Duration: 1-2 months
Goal: <50μs end-to-end
Trigger: Measured Phase 2 latency > 100μs

Architecture

[Exchange APIs]
      ↓
[Java ExtF Service]
  - Chronicle Queue (IPC)
  - Agrona (data structures)
  - Disruptor (event processing)
      ↓
[Nautilus Rust Core]
  - Native Aeron/UDP reader

Key Libraries

  • Chronicle Queue: Persistent IPC, <1μs latency
  • Agrona: Lock-free data structures
  • Disruptor: 1M+ events/sec
  • Aeron: UDP multicast, <50μs network

Implementation Sketch

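import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import net.openhft.chronicle.queue.ChronicleQueue;
import org.springframework.stereotype.Service;

@Service
public class ExFExecutionService {
    
    private final ChronicleQueue queue;
    private final Disruptor<IndicatorEvent> disruptor;
    private final RingBuffer<IndicatorEvent> ringBuffer;
    
    public ExFExecutionService() {
        // Queue path and ring size (must be a power of two) are placeholders
        this.queue = ChronicleQueue.singleBuilder("/dev/shm/dolphin_exf_queue").build();
        this.disruptor = new Disruptor<>(IndicatorEvent::new, 1024, DaemonThreadFactory.INSTANCE);
        // Disruptor has no @EventHandler annotation; register the handler explicitly
        this.disruptor.handleEventsWith(this::onEvent);
        this.ringBuffer = this.disruptor.start();
    }
    
    public void onIndicatorUpdate(String name, double value) {
        // Lock-free publish to Disruptor
        long seq = ringBuffer.next();
        try {
            IndicatorEvent event = ringBuffer.get(seq);
            event.setName(name);
            event.setValue(value);
            event.setTimestamp(System.nanoTime());
        } finally {
            // Always publish the claimed slot, even if population fails
            ringBuffer.publish(seq);
        }
    }
    
    public void onEvent(IndicatorEvent event, long seq, boolean endOfBatch) {
        // Process and write to Chronicle Queue
        // Nautilus reads from queue
        queue.acquireAppender().writeDocument(w -> {
            w.getValueOut().object(event);
        });
    }
}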

Decision Matrix

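| Phase               | Latency  | Complexity | When to Use                    |
|---------------------|----------|------------|--------------------------------|
| 1: Python + HZ      | ~5-10ms  | Low        | Testing, low-frequency trading |
| 2: Shared Memory    | ~100μs   | Medium     | HFT, fill optimization         |
| 3: Java + Chronicle | ~50μs    | High       | Ultra-HFT, co-location         |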
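
Read as a rule, the matrix says to pick the least complex phase whose nominal latency fits the budget. The sketch below encodes that reading; it assumes the upper bound (10ms) for Phase 1 and takes the other figures as listed, and choose_phase is a hypothetical helper name.

```python
# (phase label, nominal latency in μs, complexity), ordered by rising complexity.
# Latencies come from the decision matrix; Phase 1 uses its 10ms upper bound.
PHASES = [
    ("1: Python + HZ", 10_000.0, "Low"),
    ("2: Shared Memory", 100.0, "Medium"),
    ("3: Java + Chronicle", 50.0, "High"),
]


def choose_phase(budget_us: float):
    """Return the least complex phase whose nominal latency fits the budget."""
    for label, latency_us, _complexity in PHASES:
        if latency_us <= budget_us:
            return label
    return None  # no phase in the roadmap meets this budget


if __name__ == "__main__":
    for budget in (20_000, 500, 60, 10):
        print(budget, "->", choose_phase(budget))
```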

Immediate Next Steps

  1. Deploy Python event-driven (today): ./start_exf.sh restart
  2. Test Nautilus integration (this week): Measure actual latency
  3. Implement shared memory (if needed): Target <100μs
  4. Java port (if needed): Target <50μs

Document: NAUTILUS_INTEGRATION_ROADMAP.md
Author: Kimi, DESTINATION/DOLPHIN Machine dev/prod-Agent
Date: 2026-03-20