From 38a356057b7032d4aaef84319e9529674e3fbb71 Mon Sep 17 00:00:00 2001 From: viktorbeck98 Date: Wed, 3 Jun 2026 14:32:46 +0200 Subject: [PATCH 1/6] refactor(persistency): extract _save helper, add event_data_class to metadata Co-Authored-By: Claude Sonnet 4.6 --- .../utils/persistency/persistency_saver.py | 83 ++++++++++--------- tests/test_utils/test_persistency_saver.py | 8 ++ 2 files changed, 50 insertions(+), 41 deletions(-) diff --git a/src/detectmatelibrary/utils/persistency/persistency_saver.py b/src/detectmatelibrary/utils/persistency/persistency_saver.py index 521bdf6..2d888e4 100644 --- a/src/detectmatelibrary/utils/persistency/persistency_saver.py +++ b/src/detectmatelibrary/utils/persistency/persistency_saver.py @@ -42,6 +42,47 @@ def _coerce_event_id(k: str) -> int | str: return k +def _safe_event_data_kwargs(ep: EventPersistency) -> dict[str, Any]: + safe = {} + for k, v in ep.event_data_kwargs.items(): + try: + json.dumps(v) + safe[k] = v + except (TypeError, ValueError): + pass + return safe + + +def _save(ep: EventPersistency, fs: Any, root: str) -> None: + fs.makedirs(f"{root}/events", exist_ok=True) + event_backends: dict[str, str] = {} + event_extensions: dict[str, str] = {} + + for event_id, data_structure in ep.events_data.items(): + backend_name = type(data_structure).__name__ + ext = _EXTENSION_MAP.get(backend_name, "bin") + event_backends[str(event_id)] = backend_name + event_extensions[str(event_id)] = ext + file_path = f"{root}/events/{event_id}.{ext}" + with fs.open(file_path, "wb") as f: + f.write(data_structure.dump()) + + metadata = { + "version": 1, + "saved_at": datetime.now(timezone.utc).isoformat(), + "events_seen": list(ep.events_seen), + "event_templates": {str(k): v for k, v in ep.event_templates.items()}, + "event_backends": event_backends, + "event_extensions": event_extensions, + "event_data_kwargs": _safe_event_data_kwargs(ep), + "event_data_class": ep.event_data_class.__name__, # read back by _load + } + with fs.open(f"{root}/metadata.json", "w") as f: + json.dump(metadata, f, indent=2) + + ep.reset_events_since_save() + + class PersistencyLoadError(Exception): """Raised when restoring persisted state fails.""" @@ -96,35 +137,7 @@ def save(self) -> None: """ with self._lock: try: - self._fs.makedirs(f"{self._root}/events", exist_ok=True) - event_backends: dict[str, str] = {} - event_extensions: dict[str, str] = {} - - for event_id, data_structure in self._persistency.events_data.items(): - backend_name = type(data_structure).__name__ - ext = _EXTENSION_MAP.get(backend_name, "bin") - event_backends[str(event_id)] = backend_name - event_extensions[str(event_id)] = ext - - file_path = f"{self._root}/events/{event_id}.{ext}" - with self._fs.open(file_path, "wb") as f: - f.write(data_structure.dump()) - - metadata = { - "version": 1, - "saved_at": datetime.now(timezone.utc).isoformat(), - "events_seen": list(self._persistency.events_seen), - "event_templates": { - str(k): v for k, v in self._persistency.event_templates.items() - }, - "event_backends": event_backends, - "event_extensions": event_extensions, - "event_data_kwargs": self._safe_event_data_kwargs(), - } - with self._fs.open(f"{self._root}/metadata.json", "w") as f: - json.dump(metadata, f, indent=2) - - self._persistency.reset_events_since_save() + _save(self._persistency, self._fs, self._root) except Exception as e: logger.warning(f"PersistencySaver: save failed — {e}") @@ -189,18 +202,6 @@ def stop(self) -> None: atexit.unregister(self.stop) self.save() - def _safe_event_data_kwargs(self) -> dict[str, Any]: - """Return event_data_kwargs with non-JSON-serializable values - excluded.""" - safe = {} - for k, v in self._persistency.event_data_kwargs.items(): - try: - json.dumps(v) - safe[k] = v - except (TypeError, ValueError): - pass - return safe - def _check_event_count(self) -> None: """Trigger a save when ingested-event count reaches events_until_save.""" diff --git a/tests/test_utils/test_persistency_saver.py b/tests/test_utils/test_persistency_saver.py index f8aad6c..5645774 100644 --- a/tests/test_utils/test_persistency_saver.py +++ b/tests/test_utils/test_persistency_saver.py @@ -131,6 +131,14 @@ def test_load_raises_on_missing_path(self): with pytest.raises(PersistencyLoadError): saver.load() + def test_save_includes_event_data_class_in_metadata(self): + saver, _ = _memory_saver() + saver.save() + fs = fsspec.filesystem("memory") + with fs.open("test/state/metadata.json", "r") as f: + meta = json.load(f) + assert meta["event_data_class"] == "EventDataFrame" + class TestPersistencySaverTriggers: def test_timer_triggers_save(self): From 495769bacba8647663d5afa0c29dd3cc756043f2 Mon Sep 17 00:00:00 2001 From: viktorbeck98 Date: Wed, 3 Jun 2026 14:39:26 +0200 Subject: [PATCH 2/6] refactor(persistency): extract _load helper, restore event_data_class, fix load() thread safety - Add module-level _load helper that mirrors _save - Restore event_data_class from metadata on load - PersistencySaver.load() now acquires self._lock before mutating persistency - Use fs.pipe() for atomic file writes in _save to eliminate read/write race - Add test_load_restores_event_data_class and TestPersistencySaverThreadSafety Co-Authored-By: Claude Sonnet 4.6 --- .../utils/persistency/persistency_saver.py | 81 +++++++++++-------- tests/test_utils/test_persistency_saver.py | 54 +++++++++++++ 2 files changed, 100 insertions(+), 35 deletions(-) diff --git a/src/detectmatelibrary/utils/persistency/persistency_saver.py b/src/detectmatelibrary/utils/persistency/persistency_saver.py index 2d888e4..87d3e76 100644 --- a/src/detectmatelibrary/utils/persistency/persistency_saver.py +++ b/src/detectmatelibrary/utils/persistency/persistency_saver.py @@ -64,8 +64,7 @@ def _save(ep: EventPersistency, fs: Any, root: str) -> None: event_backends[str(event_id)] = backend_name event_extensions[str(event_id)] = ext file_path = f"{root}/events/{event_id}.{ext}" - with fs.open(file_path, "wb") as f: - f.write(data_structure.dump()) + fs.pipe(file_path, data_structure.dump()) metadata = { "version": 1, @@ -77,8 +76,7 @@ def _save(ep: EventPersistency, fs: Any, root: str) -> None: "event_data_kwargs": _safe_event_data_kwargs(ep), "event_data_class": ep.event_data_class.__name__, # read back by _load } - with fs.open(f"{root}/metadata.json", "w") as f: - json.dump(metadata, f, indent=2) + fs.pipe(f"{root}/metadata.json", json.dumps(metadata, indent=2).encode()) ep.reset_events_since_save() @@ -87,6 +85,48 @@ class PersistencyLoadError(Exception): """Raised when restoring persisted state fails.""" +def _load(ep: EventPersistency, fs: Any, root: str) -> None: + meta_path = f"{root}/metadata.json" + if not fs.exists(meta_path): + raise PersistencyLoadError( + f"No saved state found at '{root}' (metadata.json missing)" + ) + try: + with fs.open(meta_path, "r") as f: + metadata = json.load(f) + + ep.events_data = {} + ep.event_templates = {} + + ep.events_seen = set(metadata["events_seen"]) + ep.event_templates = { + _coerce_event_id(k): v for k, v in metadata["event_templates"].items() + } + global_kwargs = metadata.get("event_data_kwargs", {}) + ep.event_data_kwargs = global_kwargs + + for event_id_str, backend_name in metadata["event_backends"].items(): + event_id = _coerce_event_id(event_id_str) + ext = metadata["event_extensions"][event_id_str] + file_path = f"{root}/events/{event_id_str}.{ext}" + with fs.open(file_path, "rb") as f: + data = f.read() + if backend_name not in _BACKEND_REGISTRY: + raise PersistencyLoadError( + f"Unknown backend '{backend_name}' — cannot restore event '{event_id}'" + ) + backend_cls = _BACKEND_REGISTRY[backend_name] + ep.events_data[event_id] = backend_cls.load(data, **global_kwargs) + + class_name = metadata.get("event_data_class") + if class_name and class_name in _BACKEND_REGISTRY: + ep.event_data_class = _BACKEND_REGISTRY[class_name] + except PersistencyLoadError: + raise + except Exception as e: + raise PersistencyLoadError(f"Failed to restore state: {e}") from e + + @dataclass class PersistencySaverConfig: path: str @@ -146,37 +186,8 @@ def load(self) -> None: Raises PersistencyLoadError on failure. """ - meta_path = f"{self._root}/metadata.json" - if not self._fs.exists(meta_path): - raise PersistencyLoadError( - f"No saved state found at '{self._config.path}' (metadata.json missing)" - ) - try: - with self._fs.open(meta_path, "r") as f: - metadata = json.load(f) - - self._persistency.events_seen = set(metadata["events_seen"]) - self._persistency.event_templates = { - _coerce_event_id(k): v for k, v in metadata["event_templates"].items() - } - global_kwargs = metadata.get("event_data_kwargs", {}) - - for event_id_str, backend_name in metadata["event_backends"].items(): - event_id = _coerce_event_id(event_id_str) - ext = metadata["event_extensions"][event_id_str] - file_path = f"{self._root}/events/{event_id_str}.{ext}" - with self._fs.open(file_path, "rb") as f: - data = f.read() - if backend_name not in _BACKEND_REGISTRY: - raise PersistencyLoadError( - f"Unknown backend '{backend_name}' — cannot restore event '{event_id}'" - ) - backend_cls = _BACKEND_REGISTRY[backend_name] - self._persistency.events_data[event_id] = backend_cls.load(data, **global_kwargs) - except PersistencyLoadError: - raise - except Exception as e: - raise PersistencyLoadError(f"Failed to restore state: {e}") from e + with self._lock: + _load(self._persistency, self._fs, self._root) def start(self) -> None: """Start the background save timer and register process-exit hooks.""" diff --git a/tests/test_utils/test_persistency_saver.py b/tests/test_utils/test_persistency_saver.py index 5645774..598c5cc 100644 --- a/tests/test_utils/test_persistency_saver.py +++ b/tests/test_utils/test_persistency_saver.py @@ -139,6 +139,43 @@ def test_save_includes_event_data_class_in_metadata(self): meta = json.load(f) assert meta["event_data_class"] == "EventDataFrame" + def test_load_restores_event_data_class(self): + saver, _ = _memory_saver() + saver.save() + # Start with a different class to verify it gets overwritten + p2 = EventPersistency(event_data_class=EventStabilityTracker) + PersistencySaver(p2, PersistencySaverConfig(path="memory://test/state")).load() + assert p2.event_data_class is EventDataFrame + + def test_load_clears_stale_events_data(self): + """Loading into a non-empty EP must replace, not merge, events_data.""" + saver, _ = _memory_saver() + saver.save() # saves E1 and E2 + + p2 = EventPersistency(event_data_class=EventDataFrame) + # Pre-populate with a key NOT in the saved snapshot + p2.ingest_event(event_id="STALE", event_template="stale", variables=["x"], named_variables={}) + PersistencySaver(p2, PersistencySaverConfig(path="memory://test/state")).load() + + assert "STALE" not in p2.get_events_data() + assert "E1" in p2.get_events_data() + + def test_load_restores_event_data_kwargs(self): + """event_data_kwargs must be written back to ep after load.""" + from detectmatelibrary.utils.persistency.event_data_structures.dataframes import ChunkedEventDataFrame + + p = EventPersistency( + event_data_class=ChunkedEventDataFrame, + event_data_kwargs={"max_rows": 500}, + ) + p.ingest_event(event_id="E1", event_template="t", variables=["v"], named_variables={}) + saver = PersistencySaver(p, PersistencySaverConfig(path="memory://kwargs_test/state")) + saver.save() + + p2 = EventPersistency(event_data_class=ChunkedEventDataFrame) # no kwargs + PersistencySaver(p2, PersistencySaverConfig(path="memory://kwargs_test/state")).load() + assert p2.event_data_kwargs == {"max_rows": 500} + class TestPersistencySaverTriggers: def test_timer_triggers_save(self): @@ -284,3 +321,20 @@ def test_full_cycle_tracker_backend(self): rest = restored_tracker.get_data()[var_name] assert list(rest.change_series) == list(orig.change_series) assert rest.unique_set == orig.unique_set + + +class TestPersistencySaverThreadSafety: + def test_load_with_running_timer_does_not_raise(self): + p = _make_persistency_with_data() + path = "memory://threadsafe_test/state" + # Save initial state + PersistencySaver(p, PersistencySaverConfig(path=path)).save() + # Start a saver with a fast timer + saver = PersistencySaver(p, PersistencySaverConfig(path=path, save_interval_seconds=0)) + saver.start() + time.sleep(0.05) + # Load into a second persistency while first saver's timer is firing + p2 = EventPersistency(event_data_class=EventDataFrame) + PersistencySaver(p2, PersistencySaverConfig(path=path)).load() + saver.stop() + assert "E1" in p2.get_events_seen() From 5060d2bcad6b324e0eba449448ca32ce33852246 Mon Sep 17 00:00:00 2001 From: viktorbeck98 Date: Wed, 3 Jun 2026 14:49:23 +0200 Subject: [PATCH 3/6] feat(persistency): add standalone save/load functions Co-Authored-By: Claude Sonnet 4.6 --- .../utils/persistency/__init__.py | 4 +- .../utils/persistency/persistency_saver.py | 22 ++++++++ tests/test_utils/test_persistency_saver.py | 55 +++++++++++++++++++ 3 files changed, 80 insertions(+), 1 deletion(-) diff --git a/src/detectmatelibrary/utils/persistency/__init__.py b/src/detectmatelibrary/utils/persistency/__init__.py index fff0f39..171ee9a 100644 --- a/src/detectmatelibrary/utils/persistency/__init__.py +++ b/src/detectmatelibrary/utils/persistency/__init__.py @@ -1,5 +1,5 @@ from .event_persistency import EventPersistency -from .persistency_saver import PersistencySaver, PersistencySaverConfig, PersistencyLoadError +from .persistency_saver import PersistencySaver, PersistencySaverConfig, PersistencyLoadError, save, load from .event_data_structures.base import EventDataStructure from .event_data_structures.dataframes.event_dataframe import EventDataFrame from .event_data_structures.dataframes.chunked_event_dataframe import ChunkedEventDataFrame @@ -14,4 +14,6 @@ "EventDataFrame", "ChunkedEventDataFrame", "EventStabilityTracker", + "save", + "load", ] diff --git a/src/detectmatelibrary/utils/persistency/persistency_saver.py b/src/detectmatelibrary/utils/persistency/persistency_saver.py index 87d3e76..9239552 100644 --- a/src/detectmatelibrary/utils/persistency/persistency_saver.py +++ b/src/detectmatelibrary/utils/persistency/persistency_saver.py @@ -225,3 +225,25 @@ def _check_event_count(self) -> None: def _tick(self) -> None: """Called by the timer thread each interval.""" self.save() + + +def save(ep: EventPersistency, path: str, storage_options: dict[str, Any] | None = None) -> None: + """Save EventPersistency state to an fsspec URI. + + Not thread-safe when called concurrently with a running + PersistencySaver on the same ep. Use CoreDetector.export_state() in + that case. + """ + fs, root = fsspec.url_to_fs(path, **(storage_options or {})) + _save(ep, fs, root) + + +def load(ep: EventPersistency, path: str, storage_options: dict[str, Any] | None = None) -> None: + """Restore EventPersistency state from an fsspec URI. + + Raises PersistencyLoadError if no saved state exists at path. Not + thread-safe when called concurrently with a running PersistencySaver + on the same ep. Use CoreDetector.import_state() in that case. + """ + fs, root = fsspec.url_to_fs(path, **(storage_options or {})) + _load(ep, fs, root) diff --git a/tests/test_utils/test_persistency_saver.py b/tests/test_utils/test_persistency_saver.py index 598c5cc..200182d 100644 --- a/tests/test_utils/test_persistency_saver.py +++ b/tests/test_utils/test_persistency_saver.py @@ -13,6 +13,8 @@ PersistencyLoadError, PersistencySaver, _SaveTimer, + save as standalone_save, + load as standalone_load, ) @@ -323,6 +325,59 @@ def test_full_cycle_tracker_backend(self): assert rest.unique_set == orig.unique_set +class TestStandaloneSaveLoad: + def test_save_creates_metadata(self): + p = _make_persistency_with_data() + standalone_save(p, "memory://standalone_save1/state") + fs = fsspec.filesystem("memory") + assert fs.exists("standalone_save1/state/metadata.json") + + def test_save_creates_event_files(self): + p = _make_persistency_with_data() + standalone_save(p, "memory://standalone_save2/state") + fs = fsspec.filesystem("memory") + assert fs.exists("standalone_save2/state/events/E1.parquet") + assert fs.exists("standalone_save2/state/events/E2.parquet") + + def test_save_resets_events_since_save(self): + p = _make_persistency_with_data() + assert p._events_since_save == 3 + standalone_save(p, "memory://standalone_save3/state") + assert p._events_since_save == 0 + + def test_load_restores_events_seen(self): + p = _make_persistency_with_data() + standalone_save(p, "memory://standalone_load1/state") + p2 = EventPersistency(event_data_class=EventDataFrame) + standalone_load(p2, "memory://standalone_load1/state") + assert "E1" in p2.get_events_seen() + assert "E2" in p2.get_events_seen() + + def test_load_restores_event_data(self): + p = _make_persistency_with_data() + standalone_save(p, "memory://standalone_load2/state") + p2 = EventPersistency(event_data_class=EventDataFrame) + standalone_load(p2, "memory://standalone_load2/state") + assert len(p2.get_event_data("E1")) == 2 + + def test_load_restores_event_data_class(self): + p = _make_persistency_with_data() + standalone_save(p, "memory://standalone_load3/state") + p2 = EventPersistency(event_data_class=EventStabilityTracker) + standalone_load(p2, "memory://standalone_load3/state") + assert p2.event_data_class is EventDataFrame + + def test_load_raises_when_missing(self): + p = EventPersistency(event_data_class=EventDataFrame) + with pytest.raises(PersistencyLoadError): + standalone_load(p, "memory://nonexistent_standalone/state") + + def test_exported_from_package(self): + from detectmatelibrary.utils import persistency + assert callable(persistency.save) + assert callable(persistency.load) + + class TestPersistencySaverThreadSafety: def test_load_with_running_timer_does_not_raise(self): p = _make_persistency_with_data() From ce2a6ac243f948b573727afd9ba032d08945e400 Mon Sep 17 00:00:00 2001 From: viktorbeck98 Date: Wed, 3 Jun 2026 14:53:36 +0200 Subject: [PATCH 4/6] feat(detector): add export_state and import_state methods to CoreDetector Co-Authored-By: Claude Sonnet 4.6 --- src/detectmatelibrary/common/detector.py | 27 ++++++++ .../utils/persistency/persistency_saver.py | 7 ++ .../test_persist_integration.py | 68 +++++++++++++++++++ 3 files changed, 102 insertions(+) diff --git a/src/detectmatelibrary/common/detector.py b/src/detectmatelibrary/common/detector.py index aca671a..145a559 100644 --- a/src/detectmatelibrary/common/detector.py +++ b/src/detectmatelibrary/common/detector.py @@ -179,6 +179,33 @@ def _register_persistency(self, event_persistency: persistency.EventPersistency) self.name, cast(CoreDetectorConfig, self.config), event_persistency ) + def export_state(self, path: str, storage_options: dict[str, Any] | None = None) -> None: + """Save this detector's EventPersistency state to an fsspec URI. + + Not thread-safe when a PersistencySaver is running on this + detector. + """ + ep = getattr(self, "persistency", None) + if ep is None: + raise RuntimeError(f"{self.name}: no persistency configured") + persistency.save(ep, path, storage_options) + + def import_state(self, path: str, storage_options: dict[str, Any] | None = None) -> None: + """Restore this detector's EventPersistency state from an fsspec URI. + + Thread-safe when a PersistencySaver is running: acquires the saver lock + before loading. + """ + ep = getattr(self, "persistency", None) + if ep is None: + raise RuntimeError(f"{self.name}: no persistency configured") + saver = getattr(self, "saver", None) + if saver is not None: + with saver.locked(): + persistency.load(ep, path, storage_options) + else: + persistency.load(ep, path, storage_options) + @override def run( self, input_: List[ParserSchema] | ParserSchema, output_: DetectorSchema # type: ignore diff --git a/src/detectmatelibrary/utils/persistency/persistency_saver.py b/src/detectmatelibrary/utils/persistency/persistency_saver.py index 9239552..6ebcefe 100644 --- a/src/detectmatelibrary/utils/persistency/persistency_saver.py +++ b/src/detectmatelibrary/utils/persistency/persistency_saver.py @@ -2,6 +2,7 @@ import json import signal import threading +from contextlib import contextmanager from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any, Callable @@ -213,6 +214,12 @@ def stop(self) -> None: atexit.unregister(self.stop) self.save() + @contextmanager + def locked(self) -> Any: + """Context manager that acquires the internal save lock.""" + with self._lock: + yield + def _check_event_count(self) -> None: """Trigger a save when ingested-event count reaches events_until_save.""" diff --git a/tests/test_detectors/test_persist_integration.py b/tests/test_detectors/test_persist_integration.py index a32767d..7259517 100644 --- a/tests/test_detectors/test_persist_integration.py +++ b/tests/test_detectors/test_persist_integration.py @@ -1,9 +1,13 @@ +import pytest +import fsspec + from detectmatelibrary.detectors.new_value_detector import NewValueDetector, NewValueDetectorConfig from detectmatelibrary.detectors.new_value_combo_detector import ( NewValueComboDetector, NewValueComboDetectorConfig, ) from detectmatelibrary.detectors.new_event_detector import NewEventDetector, NewEventDetectorConfig +from detectmatelibrary.detectors.rule_detector import RuleDetector from detectmatelibrary.common.detector import PersistConfig from detectmatelibrary.utils.persistency.persistency_saver import PersistencySaver @@ -130,3 +134,67 @@ def test_saver_created_when_persist_configured(self): det = NewEventDetector(name="NED1", config=config) assert det.saver is not None det.saver.stop() + + +class TestDetectorExportImportState: + def test_export_state_creates_metadata(self): + det = NewValueDetector( + name="ExportTest", + config=NewValueDetectorConfig(auto_config=False), + ) + det.persistency.ingest_event( + event_id=1, event_template="login <*>", named_variables={"user": "alice"} + ) + det.export_state("memory://export_state_test/state") + assert fsspec.filesystem("memory").exists("export_state_test/state/metadata.json") + + def test_export_state_creates_event_files(self): + det = NewValueDetector( + name="ExportFiles", + config=NewValueDetectorConfig(auto_config=False), + ) + det.persistency.ingest_event( + event_id=2, event_template="logout <*>", named_variables={"user": "bob"} + ) + det.export_state("memory://export_files_test/state") + # NewValueDetector uses EventStabilityTracker → msgpack extension + assert fsspec.filesystem("memory").exists("export_files_test/state/events/2.msgpack") + + def test_import_state_restores_events_seen(self): + det1 = NewValueDetector(name="ImportSrc", config=NewValueDetectorConfig(auto_config=False)) + det1.persistency.ingest_event( + event_id=1, event_template="login <*>", named_variables={"user": "alice"} + ) + det1.export_state("memory://import_state_test/state") + + det2 = NewValueDetector(name="ImportDst", config=NewValueDetectorConfig(auto_config=False)) + det2.import_state("memory://import_state_test/state") + assert 1 in det2.persistency.get_events_seen() + + def test_import_state_with_running_saver_does_not_raise(self): + det1 = NewValueDetector(name="ImportSaverSrc", config=NewValueDetectorConfig(auto_config=False)) + det1.persistency.ingest_event( + event_id=3, event_template="connect <*>", named_variables={"host": "srv1"} + ) + det1.export_state("memory://import_saver_test/state") + + det2 = NewValueDetector( + name="ImportSaverDst", + config=NewValueDetectorConfig( + auto_config=False, + persist=PersistConfig(path="memory://import_saver_dst/state"), + ), + ) + det2.import_state("memory://import_saver_test/state") + det2.saver.stop() + assert 3 in det2.persistency.get_events_seen() + + def test_export_raises_without_persistency(self): + det = RuleDetector() + with pytest.raises(RuntimeError, match="no persistency configured"): + det.export_state("memory://any/path") + + def test_import_raises_without_persistency(self): + det = RuleDetector() + with pytest.raises(RuntimeError, match="no persistency configured"): + det.import_state("memory://any/path") From 4b3cab65e2bd44b0abd32c8075117ea7e66aae15 Mon Sep 17 00:00:00 2001 From: viktorbeck98 Date: Wed, 3 Jun 2026 14:56:52 +0200 Subject: [PATCH 5/6] docs(persistency): document export_state, import_state, and standalone save/load --- docs/auxiliar/persistency.md | 48 +++++++++++++++++++++++++++++++++--- 1 file changed, 45 insertions(+), 3 deletions(-) diff --git a/docs/auxiliar/persistency.md b/docs/auxiliar/persistency.md index 2b8764c..a371e6f 100644 --- a/docs/auxiliar/persistency.md +++ b/docs/auxiliar/persistency.md @@ -28,10 +28,10 @@ kept. Two families ship today: - **DataFrame backends** (`EventDataFrame`, `ChunkedEventDataFrame`) keeps the - raw rows. Very storage heavy and not recommended for production-ready detectors. + raw rows. Very storage heavy and *not recommended* for production-ready detectors. - **Tracker backends** (`EventStabilityTracker`) keep only derived features - (e.g. "this variable has been constant for the last 10k events"). Use these - when you only need a summary, not the raw history — they cost a fraction of + (e.g. "this variable has been constant for the last 10k events") that are relevant for the detector. Use these + when you only need a summary or a subset of the log's information, not the raw history — they cost a fraction of the memory. All backends implement the same four-method contract: `add_data`, `get_data`, @@ -150,6 +150,48 @@ If `auto_load=True` and no saved state exists, the constructor raises `persistency.PersistencyLoadError` immediately — fail-fast rather than silently starting empty. +#### Exporting and importing state on demand + +For one-shot transfers — e.g. moving trained state to a new environment, or +taking a manual snapshot — use the standalone functions directly: + +```python +from detectmatelibrary.utils import persistency + +# Export: write full state to any fsspec URI +persistency.save(ep, "./snapshots/trained-state") + +# Import: restore state into an existing EventPersistency +persistency.load(ep, "./snapshots/trained-state") +``` + +`save` writes the same format as `PersistencySaver` and resets +`ep._events_since_save`. `load` raises `PersistencyLoadError` if no state +exists at the path, restores `event_data_class` and `event_data_kwargs` from +metadata, and clears any existing state in `ep` before loading. + +Both functions are **not thread-safe** when called concurrently with a running +`PersistencySaver` on the same `ep`. Use the detector-level wrappers below +when a saver is active. + +#### Detector-level export and import + +When working through a detector (the typical path for DetectMateService), use +the methods on the detector object directly — no need to access +`EventPersistency` internals: + +```python +# Export the trained detector's state +detector.export_state("./snapshots/my-detector") + +# Import state into a fresh detector +detector.import_state("./snapshots/my-detector") +``` + +`import_state` is thread-safe: it acquires the saver lock before loading when +a `PersistencySaver` is running. Both methods raise `RuntimeError` if the +detector has no persistency configured. + ### Storage backends (fsspec) `PersistencySaverConfig.path` accepts any URI fsspec understands: a local path From 587265439138a0f9db2eb065b07867b55e8e5eeb Mon Sep 17 00:00:00 2001 From: viktorbeck98 Date: Wed, 3 Jun 2026 15:50:23 +0200 Subject: [PATCH 6/6] feat(persistency): add bytes-based export/import (no-path overload) save(ep) returns a zip archive as bytes; load(ep, bytes) restores from it. export_state() and import_state() on CoreDetector gain the same overload, so callers can transfer state over a network API without touching the filesystem. Co-Authored-By: Claude Sonnet 4.6 --- docs/auxiliar/persistency.md | 28 ++++++--- src/detectmatelibrary/common/detector.py | 21 +++++-- .../utils/persistency/persistency_saver.py | 61 +++++++++++++++++-- .../test_persist_integration.py | 28 +++++++++ tests/test_utils/test_persistency_saver.py | 40 ++++++++++++ 5 files changed, 162 insertions(+), 16 deletions(-) diff --git a/docs/auxiliar/persistency.md b/docs/auxiliar/persistency.md index a371e6f..1ecba9b 100644 --- a/docs/auxiliar/persistency.md +++ b/docs/auxiliar/persistency.md @@ -158,17 +158,25 @@ taking a manual snapshot — use the standalone functions directly: ```python from detectmatelibrary.utils import persistency -# Export: write full state to any fsspec URI +# Export to a file URI persistency.save(ep, "./snapshots/trained-state") -# Import: restore state into an existing EventPersistency +# Export to bytes (no disk I/O — useful when sending state over a network API) +data: bytes = persistency.save(ep) + +# Import from a file URI persistency.load(ep, "./snapshots/trained-state") + +# Import from bytes +persistency.load(ep, data) ``` `save` writes the same format as `PersistencySaver` and resets -`ep._events_since_save`. `load` raises `PersistencyLoadError` if no state -exists at the path, restores `event_data_class` and `event_data_kwargs` from -metadata, and clears any existing state in `ep` before loading. +`ep._events_since_save`. When called without a path it returns the state as a +zip archive in memory. `load` accepts either a path string or the bytes +returned by `save`; it raises `PersistencyLoadError` if no state exists at the +path, restores `event_data_class` and `event_data_kwargs` from metadata, and +clears any existing state in `ep` before loading. Both functions are **not thread-safe** when called concurrently with a running `PersistencySaver` on the same `ep`. Use the detector-level wrappers below @@ -181,11 +189,17 @@ the methods on the detector object directly — no need to access `EventPersistency` internals: ```python -# Export the trained detector's state +# Export to a file URI detector.export_state("./snapshots/my-detector") -# Import state into a fresh detector +# Export to bytes (e.g. for an API response) +data: bytes = detector.export_state() + +# Import from a file URI detector.import_state("./snapshots/my-detector") + +# Import from bytes (e.g. from an API request body) +detector.import_state(data) ``` `import_state` is thread-safe: it acquires the saver lock before loading when diff --git a/src/detectmatelibrary/common/detector.py b/src/detectmatelibrary/common/detector.py index 145a559..b525545 100644 --- a/src/detectmatelibrary/common/detector.py +++ b/src/detectmatelibrary/common/detector.py @@ -179,20 +179,31 @@ def _register_persistency(self, event_persistency: persistency.EventPersistency) self.name, cast(CoreDetectorConfig, self.config), event_persistency ) - def export_state(self, path: str, storage_options: dict[str, Any] | None = None) -> None: - """Save this detector's EventPersistency state to an fsspec URI. + def export_state( + self, + path: str | None = None, + storage_options: dict[str, Any] | None = None, + ) -> bytes | None: + """Save this detector's EventPersistency state. + When path is None, returns the state as bytes (zip archive). + When path is given, writes to that fsspec URI and returns None. Not thread-safe when a PersistencySaver is running on this detector. """ ep = getattr(self, "persistency", None) if ep is None: raise RuntimeError(f"{self.name}: no persistency configured") - persistency.save(ep, path, storage_options) + return persistency.save(ep, path, storage_options) - def import_state(self, path: str, storage_options: dict[str, Any] | None = None) -> None: - """Restore this detector's EventPersistency state from an fsspec URI. + def import_state( + self, + path: str | bytes, + storage_options: dict[str, Any] | None = None, + ) -> None: + """Restore this detector's EventPersistency state. + path may be an fsspec URI string or bytes returned by export_state(). Thread-safe when a PersistencySaver is running: acquires the saver lock before loading. """ diff --git a/src/detectmatelibrary/utils/persistency/persistency_saver.py b/src/detectmatelibrary/utils/persistency/persistency_saver.py index 6ebcefe..491d8d6 100644 --- a/src/detectmatelibrary/utils/persistency/persistency_saver.py +++ b/src/detectmatelibrary/utils/persistency/persistency_saver.py @@ -1,7 +1,9 @@ import atexit +import io import json import signal import threading +import zipfile from contextlib import contextmanager from dataclasses import dataclass, field from datetime import datetime, timezone @@ -128,6 +130,35 @@ def _load(ep: EventPersistency, fs: Any, root: str) -> None: raise PersistencyLoadError(f"Failed to restore state: {e}") from e +def _save_to_bytes(ep: EventPersistency) -> bytes: + """Serialize EP state to a zip archive in memory.""" + # reusing the same _save logic with an in-memory filesystem to avoid code duplication + from fsspec.implementations.memory import MemoryFileSystem + mem_fs = MemoryFileSystem() + root = "s" + _save(ep, mem_fs, root) + buf = io.BytesIO() + with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf: + for file_path in mem_fs.find(root): + if mem_fs.isfile(file_path): + rel = file_path[len(root) + 1:] + zf.writestr(rel, mem_fs.cat(file_path)) + return buf.getvalue() + + +def _load_from_bytes(ep: EventPersistency, data: bytes) -> None: + """Restore EP state from a zip archive in memory.""" + # reusing the same _save logic with an in-memory filesystem to avoid code duplication + from fsspec.implementations.memory import MemoryFileSystem + mem_fs = MemoryFileSystem() + root = "s" + mem_fs.makedirs(f"{root}/events", exist_ok=True) + with zipfile.ZipFile(io.BytesIO(data)) as zf: + for name in zf.namelist(): + mem_fs.pipe(f"{root}/{name}", zf.read(name)) + _load(ep, mem_fs, root) + + @dataclass class PersistencySaverConfig: path: str @@ -234,23 +265,45 @@ def _tick(self) -> None: self.save() -def save(ep: EventPersistency, path: str, storage_options: dict[str, Any] | None = None) -> None: - """Save EventPersistency state to an fsspec URI. +def save( + ep: EventPersistency, + path: str | None = None, + storage_options: dict[str, Any] | None = None, +) -> bytes | None: + """Save EventPersistency state to an fsspec URI or return bytes. + + When path is None, serialises state to a zip archive and returns the + bytes — no filesystem I/O occurs. When path is given, writes to + that URI and returns None. Not thread-safe when called concurrently with a running PersistencySaver on the same ep. Use CoreDetector.export_state() in that case. """ + if path is None: + return _save_to_bytes(ep) fs, root = fsspec.url_to_fs(path, **(storage_options or {})) _save(ep, fs, root) + return None + +def load( + ep: EventPersistency, + path: str | bytes, + storage_options: dict[str, Any] | None = None, +) -> None: + """Restore EventPersistency state from an fsspec URI or bytes. -def load(ep: EventPersistency, path: str, storage_options: dict[str, Any] | None = None) -> None: - """Restore EventPersistency state from an fsspec URI. + When path is bytes (a zip archive returned by save()), state is + restored directly from memory — no filesystem I/O occurs. When path + is a string URI, state is read from that location. Raises PersistencyLoadError if no saved state exists at path. Not thread-safe when called concurrently with a running PersistencySaver on the same ep. Use CoreDetector.import_state() in that case. """ + if isinstance(path, bytes): + _load_from_bytes(ep, path) + return fs, root = fsspec.url_to_fs(path, **(storage_options or {})) _load(ep, fs, root) diff --git a/tests/test_detectors/test_persist_integration.py b/tests/test_detectors/test_persist_integration.py index 7259517..593f276 100644 --- a/tests/test_detectors/test_persist_integration.py +++ b/tests/test_detectors/test_persist_integration.py @@ -198,3 +198,31 @@ def test_import_raises_without_persistency(self): det = RuleDetector() with pytest.raises(RuntimeError, match="no persistency configured"): det.import_state("memory://any/path") + + def test_export_state_returns_bytes_when_no_path(self): + det = NewValueDetector(name="ExportBytes", config=NewValueDetectorConfig(auto_config=False)) + det.persistency.ingest_event( + event_id=1, event_template="login <*>", named_variables={"user": "alice"} + ) + result = det.export_state() + assert isinstance(result, bytes) + assert len(result) > 0 + + def test_export_state_path_returns_none(self): + det = NewValueDetector(name="ExportNone", config=NewValueDetectorConfig(auto_config=False)) + det.persistency.ingest_event( + event_id=1, event_template="login <*>", named_variables={"user": "alice"} + ) + result = det.export_state("memory://export_none_test/state") + assert result is None + + def test_import_state_accepts_bytes(self): + det1 = NewValueDetector(name="BytesSrc", config=NewValueDetectorConfig(auto_config=False)) + det1.persistency.ingest_event( + event_id=5, event_template="login <*>", named_variables={"user": "alice"} + ) + data = det1.export_state() + + det2 = NewValueDetector(name="BytesDst", config=NewValueDetectorConfig(auto_config=False)) + det2.import_state(data) + assert 5 in det2.persistency.get_events_seen() diff --git a/tests/test_utils/test_persistency_saver.py b/tests/test_utils/test_persistency_saver.py index 200182d..c379d54 100644 --- a/tests/test_utils/test_persistency_saver.py +++ b/tests/test_utils/test_persistency_saver.py @@ -377,6 +377,46 @@ def test_exported_from_package(self): assert callable(persistency.save) assert callable(persistency.load) + def test_save_returns_bytes_when_no_path(self): + p = _make_persistency_with_data() + result = standalone_save(p) + assert isinstance(result, bytes) + assert len(result) > 0 + + def test_save_bytes_is_zip(self): + import zipfile + import io + p = _make_persistency_with_data() + data = standalone_save(p) + assert zipfile.is_zipfile(io.BytesIO(data)) + + def test_save_path_returns_none(self): + p = _make_persistency_with_data() + result = standalone_save(p, "memory://save_returns_none/state") + assert result is None + + def test_load_from_bytes_restores_events_seen(self): + p = _make_persistency_with_data() + data = standalone_save(p) + p2 = EventPersistency(event_data_class=EventDataFrame) + standalone_load(p2, data) + assert "E1" in p2.get_events_seen() + assert "E2" in p2.get_events_seen() + + def test_load_from_bytes_restores_event_data(self): + p = _make_persistency_with_data() + data = standalone_save(p) + p2 = EventPersistency(event_data_class=EventDataFrame) + standalone_load(p2, data) + assert len(p2.get_event_data("E1")) == 2 + + def test_bytes_roundtrip_restores_event_data_class(self): + p = _make_persistency_with_data() + data = standalone_save(p) + p2 = EventPersistency(event_data_class=EventStabilityTracker) + standalone_load(p2, data) + assert p2.event_data_class is EventDataFrame + class TestPersistencySaverThreadSafety: def test_load_with_running_timer_does_not_raise(self):