30 lines
825 B
Python
30 lines
825 B
Python
|
|
|
||
|
|
import pandas as pd
|
||
|
|
from nautilus_trader.core.nautilus_pyo3 import Signal
|
||
|
|
from nautilus_trader.persistence.catalog import ParquetDataCatalog
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
def test_write_signal():
    """Write one ``Signal`` to a Parquet catalog and fail loudly on error.

    The catalog lives in a ``test_catalog`` directory relative to the current
    working directory; the directory is created if missing and is left in
    place afterwards.

    Raises:
        Exception: Whatever ``catalog.write_data`` raises is re-raised after
            the failure message is printed, so a test runner (or the script's
            exit code) reports the failure instead of silently passing.
    """
    # Local import: keeps this block self-contained without touching the
    # file-level import section.
    import json

    catalog_path = Path("test_catalog")
    catalog_path.mkdir(exist_ok=True)
    catalog = ParquetDataCatalog(str(catalog_path))

    # ``Timestamp.value`` is pandas' integer epoch representation; the same
    # value is used for both the event and init timestamps.
    ts = int(pd.Timestamp("2026-02-18 12:00:00").value)
    sig = Signal(
        name="test_signal",
        value=json.dumps({"test": 123}),
        ts_event=ts,
        ts_init=ts,
    )

    print(f"Writing signal: {sig}")
    try:
        catalog.write_data([sig])
    except Exception as e:
        print(f"Failed to write signal: {e}")
        # Re-raise: swallowing the error would make this test always pass.
        raise
    else:
        print("Successfully wrote signal to catalog")
|
||
|
|
|
||
|
|
# Script entry point: run the signal-write check directly, without a test
# runner, when this file is executed rather than imported.
if __name__ == "__main__":
    test_write_signal()
|