Skip to content
96 changes: 96 additions & 0 deletions detectmatelibrary_tests/test_detectors/test_persist_integration.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import pytest
import fsspec

from detectmatelibrary.detectors.new_value_detector import NewValueDetector, NewValueDetectorConfig
from detectmatelibrary.detectors.new_value_combo_detector import (
NewValueComboDetector,
NewValueComboDetectorConfig,
)
from detectmatelibrary.detectors.new_event_detector import NewEventDetector, NewEventDetectorConfig
from detectmatelibrary.detectors.rule_detector import RuleDetector
from detectmatelibrary.common.detector import PersistConfig
from detectmatelibrary.utils.persistency.persistency_saver import PersistencySaver

Expand Down Expand Up @@ -130,3 +134,95 @@ def test_saver_created_when_persist_configured(self):
det = NewEventDetector(name="NED1", config=config)
assert det.saver is not None
det.saver.stop()


class TestDetectorExportImportState:
def test_export_state_creates_metadata(self):
det = NewValueDetector(
name="ExportTest",
config=NewValueDetectorConfig(auto_config=False),
)
det.persistency.ingest_event(
event_id=1, event_template="login <*>", named_variables={"user": "alice"}
)
det.export_state("memory://export_state_test/state")
assert fsspec.filesystem("memory").exists("export_state_test/state/metadata.json")

def test_export_state_creates_event_files(self):
det = NewValueDetector(
name="ExportFiles",
config=NewValueDetectorConfig(auto_config=False),
)
det.persistency.ingest_event(
event_id=2, event_template="logout <*>", named_variables={"user": "bob"}
)
det.export_state("memory://export_files_test/state")
# NewValueDetector uses EventStabilityTracker → msgpack extension
assert fsspec.filesystem("memory").exists("export_files_test/state/events/2.msgpack")

def test_import_state_restores_events_seen(self):
det1 = NewValueDetector(name="ImportSrc", config=NewValueDetectorConfig(auto_config=False))
det1.persistency.ingest_event(
event_id=1, event_template="login <*>", named_variables={"user": "alice"}
)
det1.export_state("memory://import_state_test/state")

det2 = NewValueDetector(name="ImportDst", config=NewValueDetectorConfig(auto_config=False))
det2.import_state("memory://import_state_test/state")
assert 1 in det2.persistency.get_events_seen()

def test_import_state_with_running_saver_does_not_raise(self):
det1 = NewValueDetector(name="ImportSaverSrc", config=NewValueDetectorConfig(auto_config=False))
det1.persistency.ingest_event(
event_id=3, event_template="connect <*>", named_variables={"host": "srv1"}
)
det1.export_state("memory://import_saver_test/state")

det2 = NewValueDetector(
name="ImportSaverDst",
config=NewValueDetectorConfig(
auto_config=False,
persist=PersistConfig(path="memory://import_saver_dst/state"),
),
)
det2.import_state("memory://import_saver_test/state")
det2.saver.stop()
assert 3 in det2.persistency.get_events_seen()

def test_export_raises_without_persistency(self):
det = RuleDetector()
with pytest.raises(RuntimeError, match="no persistency configured"):
det.export_state("memory://any/path")

def test_import_raises_without_persistency(self):
det = RuleDetector()
with pytest.raises(RuntimeError, match="no persistency configured"):
det.import_state("memory://any/path")

def test_export_state_returns_bytes_when_no_path(self):
det = NewValueDetector(name="ExportBytes", config=NewValueDetectorConfig(auto_config=False))
det.persistency.ingest_event(
event_id=1, event_template="login <*>", named_variables={"user": "alice"}
)
result = det.export_state()
assert isinstance(result, bytes)
assert len(result) > 0

def test_export_state_path_returns_none(self):
det = NewValueDetector(name="ExportNone", config=NewValueDetectorConfig(auto_config=False))
det.persistency.ingest_event(
event_id=1, event_template="login <*>", named_variables={"user": "alice"}
)
result = det.export_state("memory://export_none_test/state")
assert result is None

def test_import_state_accepts_bytes(self):
det1 = NewValueDetector(name="BytesSrc", config=NewValueDetectorConfig(auto_config=False))
det1.persistency.ingest_event(
event_id=5, event_template="login <*>", named_variables={"user": "alice"}
)
data = det1.export_state()

det2 = NewValueDetector(name="BytesDst", config=NewValueDetectorConfig(auto_config=False))
det2.import_state(data)
assert 5 in det2.persistency.get_events_seen()
157 changes: 157 additions & 0 deletions detectmatelibrary_tests/test_utils/test_persistency_saver.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
PersistencyLoadError,
PersistencySaver,
_SaveTimer,
save as standalone_save,
load as standalone_load,
)


Expand Down Expand Up @@ -131,6 +133,51 @@ def test_load_raises_on_missing_path(self):
with pytest.raises(PersistencyLoadError):
saver.load()

def test_save_includes_event_data_class_in_metadata(self):
saver, _ = _memory_saver()
saver.save()
fs = fsspec.filesystem("memory")
with fs.open("test/state/metadata.json", "r") as f:
meta = json.load(f)
assert meta["event_data_class"] == "EventDataFrame"

def test_load_restores_event_data_class(self):
saver, _ = _memory_saver()
saver.save()
# Start with a different class to verify it gets overwritten
p2 = EventPersistency(event_data_class=EventStabilityTracker)
PersistencySaver(p2, PersistencySaverConfig(path="memory://test/state")).load()
assert p2.event_data_class is EventDataFrame

def test_load_clears_stale_events_data(self):
"""Loading into a non-empty EP must replace, not merge, events_data."""
saver, _ = _memory_saver()
saver.save() # saves E1 and E2

p2 = EventPersistency(event_data_class=EventDataFrame)
# Pre-populate with a key NOT in the saved snapshot
p2.ingest_event(event_id="STALE", event_template="stale", variables=["x"], named_variables={})
PersistencySaver(p2, PersistencySaverConfig(path="memory://test/state")).load()

assert "STALE" not in p2.get_events_data()
assert "E1" in p2.get_events_data()

def test_load_restores_event_data_kwargs(self):
"""event_data_kwargs must be written back to ep after load."""
from detectmatelibrary.utils.persistency.event_data_structures.dataframes import ChunkedEventDataFrame

p = EventPersistency(
event_data_class=ChunkedEventDataFrame,
event_data_kwargs={"max_rows": 500},
)
p.ingest_event(event_id="E1", event_template="t", variables=["v"], named_variables={})
saver = PersistencySaver(p, PersistencySaverConfig(path="memory://kwargs_test/state"))
saver.save()

p2 = EventPersistency(event_data_class=ChunkedEventDataFrame) # no kwargs
PersistencySaver(p2, PersistencySaverConfig(path="memory://kwargs_test/state")).load()
assert p2.event_data_kwargs == {"max_rows": 500}


class TestPersistencySaverTriggers:
def test_timer_triggers_save(self):
Expand Down Expand Up @@ -276,3 +323,113 @@ def test_full_cycle_tracker_backend(self):
rest = restored_tracker.get_data()[var_name]
assert list(rest.change_series) == list(orig.change_series)
assert rest.unique_set == orig.unique_set


class TestStandaloneSaveLoad:
def test_save_creates_metadata(self):
p = _make_persistency_with_data()
standalone_save(p, "memory://standalone_save1/state")
fs = fsspec.filesystem("memory")
assert fs.exists("standalone_save1/state/metadata.json")

def test_save_creates_event_files(self):
p = _make_persistency_with_data()
standalone_save(p, "memory://standalone_save2/state")
fs = fsspec.filesystem("memory")
assert fs.exists("standalone_save2/state/events/E1.parquet")
assert fs.exists("standalone_save2/state/events/E2.parquet")

def test_save_resets_events_since_save(self):
p = _make_persistency_with_data()
assert p._events_since_save == 3
standalone_save(p, "memory://standalone_save3/state")
assert p._events_since_save == 0

def test_load_restores_events_seen(self):
p = _make_persistency_with_data()
standalone_save(p, "memory://standalone_load1/state")
p2 = EventPersistency(event_data_class=EventDataFrame)
standalone_load(p2, "memory://standalone_load1/state")
assert "E1" in p2.get_events_seen()
assert "E2" in p2.get_events_seen()

def test_load_restores_event_data(self):
p = _make_persistency_with_data()
standalone_save(p, "memory://standalone_load2/state")
p2 = EventPersistency(event_data_class=EventDataFrame)
standalone_load(p2, "memory://standalone_load2/state")
assert len(p2.get_event_data("E1")) == 2

def test_load_restores_event_data_class(self):
p = _make_persistency_with_data()
standalone_save(p, "memory://standalone_load3/state")
p2 = EventPersistency(event_data_class=EventStabilityTracker)
standalone_load(p2, "memory://standalone_load3/state")
assert p2.event_data_class is EventDataFrame

def test_load_raises_when_missing(self):
p = EventPersistency(event_data_class=EventDataFrame)
with pytest.raises(PersistencyLoadError):
standalone_load(p, "memory://nonexistent_standalone/state")

def test_exported_from_package(self):
from detectmatelibrary.utils import persistency
assert callable(persistency.save)
assert callable(persistency.load)

def test_save_returns_bytes_when_no_path(self):
p = _make_persistency_with_data()
result = standalone_save(p)
assert isinstance(result, bytes)
assert len(result) > 0

def test_save_bytes_is_zip(self):
import zipfile
import io
p = _make_persistency_with_data()
data = standalone_save(p)
assert zipfile.is_zipfile(io.BytesIO(data))

def test_save_path_returns_none(self):
p = _make_persistency_with_data()
result = standalone_save(p, "memory://save_returns_none/state")
assert result is None

def test_load_from_bytes_restores_events_seen(self):
p = _make_persistency_with_data()
data = standalone_save(p)
p2 = EventPersistency(event_data_class=EventDataFrame)
standalone_load(p2, data)
assert "E1" in p2.get_events_seen()
assert "E2" in p2.get_events_seen()

def test_load_from_bytes_restores_event_data(self):
p = _make_persistency_with_data()
data = standalone_save(p)
p2 = EventPersistency(event_data_class=EventDataFrame)
standalone_load(p2, data)
assert len(p2.get_event_data("E1")) == 2

def test_bytes_roundtrip_restores_event_data_class(self):
p = _make_persistency_with_data()
data = standalone_save(p)
p2 = EventPersistency(event_data_class=EventStabilityTracker)
standalone_load(p2, data)
assert p2.event_data_class is EventDataFrame


class TestPersistencySaverThreadSafety:
def test_load_with_running_timer_does_not_raise(self):
p = _make_persistency_with_data()
path = "memory://threadsafe_test/state"
# Save initial state
PersistencySaver(p, PersistencySaverConfig(path=path)).save()
# Start a saver with a fast timer
saver = PersistencySaver(p, PersistencySaverConfig(path=path, save_interval_seconds=0))
saver.start()
time.sleep(0.05)
# Load into a second persistency while first saver's timer is firing
p2 = EventPersistency(event_data_class=EventDataFrame)
PersistencySaver(p2, PersistencySaverConfig(path=path)).load()
saver.stop()
assert "E1" in p2.get_events_seen()
62 changes: 59 additions & 3 deletions docs/auxiliar/persistency.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ kept.
Two families ship today:

- **DataFrame backends** (`EventDataFrame`, `ChunkedEventDataFrame`) keeps the
raw rows. Very storage heavy and not recommended for production-ready detectors.
raw rows. Very storage heavy and *not recommended* for production-ready detectors.
- **Tracker backends** (`EventStabilityTracker`) keep only derived features
(e.g. "this variable has been constant for the last 10k events"). Use these
when you only need a summary, not the raw history — they cost a fraction of
(e.g. "this variable has been constant for the last 10k events") that are relevant for the detector. Use these
when you only need a summary or a subset of the log's information, not the raw history — they cost a fraction of
the memory.

All backends implement the same four-method contract: `add_data`, `get_data`,
Expand Down Expand Up @@ -150,6 +150,62 @@ If `auto_load=True` and no saved state exists, the constructor raises
`persistency.PersistencyLoadError` immediately — fail-fast rather than
silently starting empty.

#### Exporting and importing state on demand

For one-shot transfers — e.g. moving trained state to a new environment, or
taking a manual snapshot — use the standalone functions directly:

```python
from detectmatelibrary.utils import persistency

# Export to a file URI
persistency.save(ep, "./snapshots/trained-state")

# Export to bytes (no disk I/O — useful when sending state over a network API)
data: bytes = persistency.save(ep)

# Import from a file URI
persistency.load(ep, "./snapshots/trained-state")

# Import from bytes
persistency.load(ep, data)
```

`save` writes the same format as `PersistencySaver` and resets
`ep._events_since_save`. When called without a path it returns the state as a
zip archive in memory. `load` accepts either a path string or the bytes
returned by `save`; it raises `PersistencyLoadError` if no state exists at the
path, restores `event_data_class` and `event_data_kwargs` from metadata, and
clears any existing state in `ep` before loading.

Both functions are **not thread-safe** when called concurrently with a running
`PersistencySaver` on the same `ep`. Use the detector-level wrappers below
when a saver is active.

#### Detector-level export and import

When working through a detector (the typical path for DetectMateService), use
the methods on the detector object directly — no need to access
`EventPersistency` internals:

```python
# Export to a file URI
detector.export_state("./snapshots/my-detector")

# Export to bytes (e.g. for an API response)
data: bytes = detector.export_state()

# Import from a file URI
detector.import_state("./snapshots/my-detector")

# Import from bytes (e.g. from an API request body)
detector.import_state(data)
```

`import_state` is thread-safe: it acquires the saver lock before loading when
a `PersistencySaver` is running. Both methods raise `RuntimeError` if the
detector has no persistency configured.

### Storage backends (fsspec)

`PersistencySaverConfig.path` accepts any URI fsspec understands: a local path
Expand Down
Loading
Loading