From 0339bd5aa1b49a92b2c8d509449baa1dbbed7ff8 Mon Sep 17 00:00:00 2001 From: Daniel van Flymen Date: Thu, 2 Jul 2026 21:57:38 +0200 Subject: [PATCH] Convo pruning + Liveness --- README.md | 11 ++ llms.txt | 13 +- synapse_p2p/conversations.py | 42 ++++- synapse_p2p/node.py | 26 ++++ synapse_p2p/teams.py | 198 ++++++++++++++++++++++-- synapse_p2p/tests/test_conversations.py | 81 ++++++++++ synapse_p2p/tests/test_teams.py | 194 +++++++++++++++++++++++ 7 files changed, 545 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 0f274fc..79ebffc 100644 --- a/README.md +++ b/README.md @@ -190,6 +190,8 @@ async def summarize(events: list[ConversationEvent]) -> str: Compaction is local. Each node compresses its own copy of the shared log, and gossip can't resurrect compacted events. There's also `node.compact_conversation(...)` if you'd rather do it by hand. +For nodes that run indefinitely, set `conversation_retention` (seconds) and whole conversations get pruned after going quiet for that long. Events older than the retention window are refused outright, so a pruned conversation can't leak back in through gossip. With retention, compaction, and a SQLite log, a node can run forever on bounded memory and disk. + ## Teams `synapse_p2p.teams` is an optional task layer built on conversation events. A `Team` offers work, `Worker`s claim tasks that match their capabilities, and the team grants each task to the first claimant. Exactly one worker runs each task. @@ -214,6 +216,15 @@ async def implement(assignment: Assignment) -> dict: Each task is one conversation: `task.offer`, `task.claim`, `task.grant`, `task.progress`, then `task.done` or `task.failed`. Every peer can watch it, late joiners can sync it, and long threads compact like anything else. See [`examples/coding_team`](./examples/coding_team) for an architect on one model reviewing work from coders on another. 
+The layer is built for long-running work: + +- Progress events renew a task's **lease** (`Team(node, lease=300)`). Workers heartbeat automatically while a handler runs (every `renew_interval` seconds, which must be shorter than the lease), so a task can take hours without any manual progress calls. +- If an assignee dies or goes quiet past its lease, the team re-offers the task. Unclaimed offers are re-announced too, so a worker that joins late still finds existing work. Cap the total number of attempts with `max_attempts`. +- A team backed by a SQLite log can rebuild its task table after a restart with `team.restore()` — finished tasks come back with their results, unfinished ones get re-offered once their lease expires. +- Set `task_retention` to drop finished tasks after a while, so a team that offers work forever doesn't grow without bound. + +Delivery is at-least-once rather than exactly-once: if an assignee is alive but partitioned past its lease, the task is re-offered and can run twice. The first `task.done` wins. + ## Periodic tasks ```python diff --git a/llms.txt b/llms.txt index c0b003e..41543bc 100644 --- a/llms.txt +++ b/llms.txt @@ -159,6 +159,7 @@ node = Node( conversation_log=SqliteConversationLog("conversations.db"), # default is in-memory conversation_max_events=100, # auto-compact past this many events conversation_keep_recent=25, # keep this many recent events verbatim + conversation_retention=86_400, # prune conversations quiet for this many seconds ) @node.summarizer @@ -172,7 +173,7 @@ added = await node.sync_conversation(peer, conversation_id) summary_event = await node.compact_conversation(conversation_id) ``` -Compaction folds older events into one `summary` kind event, preserving the opening `message` event and the recent tail. Compaction is local per node; compacted event ids stay remembered so gossip cannot resurrect them. +Compaction folds older events into one `summary` kind event, preserving the opening `message` event and the recent tail. Compaction is local per node; compacted event ids stay remembered so gossip cannot resurrect them. 
With `conversation_retention` set, conversations inactive past the window are pruned entirely and events older than the window are refused, so nodes that run indefinitely stay on bounded memory and disk. ## Teams task layer (`synapse_p2p.teams`) @@ -182,7 +183,7 @@ Optional layer built entirely on conversation events. A `Team` offers tasks; `Wo from synapse_p2p.teams import Assignment, Team, TeamTaskError, Worker # offering side -team = Team(node) +team = Team(node, lease=300, max_attempts=None, task_retention=None) task = await team.offer("implement the parser", spec={"file": "parser.py"}, requires=["python"]) result = await team.wait(task, timeout=600) # raises TeamTaskError on failure/timeout @@ -197,6 +198,14 @@ async def implement(assignment: Assignment): Event kinds per task conversation (`conversation_id == task id`): `task.offer` → `task.claim` → `task.grant` → `task.progress` → `task.done` or `task.failed`. +Long-running semantics: + +- Progress events renew the task lease; `Worker` heartbeats automatically every `renew_interval` seconds (default 60; keep it shorter than `lease`) while a handler runs, so handlers can take hours. +- If an assignee goes quiet past `lease` (or an offer sits unclaimed), the team re-offers the task; `max_attempts` caps the total number of attempts (the first offer included), after which the task fails. Late-joining workers pick up re-offered work. +- `team.restore()` rebuilds the task table from a durable conversation log after a restart (a task is adopted when its original offer came from this node's id or name, so keep the node name stable); unfinished tasks are re-offered by the lease reaper once their lease expires. +- `task_retention` prunes finished tasks from memory after that many seconds. +- Delivery is at-least-once: an assignee that is alive but partitioned past its lease can cause a task to run twice; the first `task.done` wins. 
+ ## Artifacts and agent cards ```python diff --git a/synapse_p2p/conversations.py b/synapse_p2p/conversations.py index 0967636..66e7c8e 100644 --- a/synapse_p2p/conversations.py +++ b/synapse_p2p/conversations.py @@ -33,14 +33,19 @@ async def default_summarizer(events: list[ConversationEvent]) -> str: if event.kind == SUMMARY_KIND: previous = str(event.payload.get(SUMMARY_KIND, "")) if previous: - lines.append(previous) + lines.append(previous[:1500]) continue preview = json.dumps(event.payload, default=str) if len(preview) > 200: preview = preview[:200] + "…" lines.append(f"{event.peer.name or event.peer.id} [{event.kind}]: {preview}") header = ", ".join(f"{count} {kind}" for kind, count in sorted(counts.items())) - return f"({header})\n" + "\n".join(lines[-40:]) + # Bound the output so repeated compaction of an endless conversation + # converges instead of growing a summary-of-summaries chain. + body = "\n".join(lines[-40:]) + if len(body) > 6000: + body = "…" + body[-6000:] + return f"({header})\n{body}" class BaseConversationLog: @@ -73,6 +78,13 @@ def compact( ) -> None: raise NotImplementedError + def last_activity(self, conversation_id: str) -> float: + raise NotImplementedError + + def prune(self, conversation_id: str) -> None: + """Delete a conversation entirely, including remembered event ids.""" + raise NotImplementedError + def close(self) -> None: pass @@ -81,11 +93,13 @@ class MemoryConversationLog(BaseConversationLog): def __init__(self) -> None: self._events: dict[str, list[ConversationEvent]] = {} self._seen: set[str] = set() + self._ids: dict[str, set[str]] = {} def append(self, event: ConversationEvent) -> bool: if event.event_id in self._seen: return False self._seen.add(event.event_id) + self._ids.setdefault(event.conversation_id, set()).add(event.event_id) self._events.setdefault(event.conversation_id, []).append(event) return True @@ -117,10 +131,19 @@ def compact( if event.event_id not in removed ] self._seen.add(summary.event_id) + 
self._ids.setdefault(conversation_id, set()).add(summary.event_id) merged = kept + [summary] merged.sort(key=lambda event: event.created_at) self._events[conversation_id] = merged + def last_activity(self, conversation_id: str) -> float: + events = self._events.get(conversation_id, []) + return max((event.created_at for event in events), default=0.0) + + def prune(self, conversation_id: str) -> None: + self._events.pop(conversation_id, None) + self._seen -= self._ids.pop(conversation_id, set()) + class SqliteConversationLog(BaseConversationLog): """Conversation log persisted to a single SQLite file. @@ -195,8 +218,10 @@ def compact( removed_event_ids: list[str], summary: ConversationEvent, ) -> None: + # Blank the payload of compacted rows: they only exist so gossip + # cannot resurrect the events they deduplicate. self._db.executemany( - "UPDATE events SET compacted = 1 WHERE event_id = ?", + "UPDATE events SET compacted = 1, data = '' WHERE event_id = ?", [(event_id,) for event_id in removed_event_ids], ) self._db.execute( @@ -211,5 +236,16 @@ def compact( ) self._db.commit() + def last_activity(self, conversation_id: str) -> float: + row = self._db.execute( + "SELECT MAX(created_at) FROM events WHERE conversation_id = ?", + (conversation_id,), + ).fetchone() + return float(row[0]) if row and row[0] is not None else 0.0 + + def prune(self, conversation_id: str) -> None: + self._db.execute("DELETE FROM events WHERE conversation_id = ?", (conversation_id,)) + self._db.commit() + def close(self) -> None: self._db.close() diff --git a/synapse_p2p/node.py b/synapse_p2p/node.py index 619f219..16d68a0 100644 --- a/synapse_p2p/node.py +++ b/synapse_p2p/node.py @@ -93,6 +93,7 @@ def __init__( conversation_log: BaseConversationLog | None = None, conversation_max_events: int | None = None, conversation_keep_recent: int = 25, + conversation_retention: float | None = None, summarizer: Summarizer | None = None, ) -> None: self.bind = bind @@ -118,6 +119,7 @@ def __init__( 
self.conversation_log = conversation_log or MemoryConversationLog() self.conversation_max_events = conversation_max_events self.conversation_keep_recent = conversation_keep_recent + self.conversation_retention = conversation_retention self._summarizer: Summarizer = summarizer or default_summarizer self._compacting: set[str] = set() self._background: set[asyncio.Task] = set() @@ -488,7 +490,24 @@ async def _reap_stale_peers(self) -> None: self.peers.pop(peer.id, None) self._emit_lifecycle("peer.offline", peer) + async def _reap_conversations(self) -> None: + assert self.conversation_retention is not None + cutoff = time.time() - self.conversation_retention + for conversation_id in self.conversation_log.conversations(): + if self.conversation_log.last_activity(conversation_id) < cutoff: + self.conversation_log.prune(conversation_id) + self.broadcast_replies.pop(conversation_id, None) + self._emit_lifecycle("conversation.pruned", conversation_id) + def _register_lifecycle_tasks(self) -> None: + if self.conversation_retention is not None: + self.periodic_executor.add_task( + PeriodicTask( + name="_synapse.reap_conversations", + callable=self._reap_conversations, + schedule=every(seconds=min(60.0, max(0.05, self.conversation_retention / 4))), + ) + ) if self.heartbeat_interval is None: return self.periodic_executor.add_task( @@ -582,6 +601,13 @@ def conversations(self) -> list[str]: def _remember_conversation_event(self, event: ConversationEvent) -> bool: self._validate_peer_membership(event.peer) + if ( + self.conversation_retention is not None + and event.created_at < time.time() - self.conversation_retention + ): + # Too old to store: also stops gossip from resurrecting a pruned + # conversation, since anything pruned is by definition this stale. 
+ return False if not self.conversation_log.append(event): return False self.add_peer(event.peer) diff --git a/synapse_p2p/teams.py b/synapse_p2p/teams.py index c71f607..37eb51f 100644 --- a/synapse_p2p/teams.py +++ b/synapse_p2p/teams.py @@ -6,13 +6,27 @@ - ``task.offer`` — a :class:`Team` announces work and its requirements - ``task.claim`` — a :class:`Worker` volunteers for an offered task - ``task.grant`` — the offering team assigns the task to one claimant -- ``task.progress`` — the assignee narrates progress into the shared log +- ``task.progress`` — the assignee narrates progress (and proves liveness) - ``task.done`` — the assignee delivers a result - ``task.failed`` — the assignee reports an error Each task is its own conversation (``conversation_id == task id``), so the full history of a task — offer, claims, progress, result — is one gossiped, durable, compactable thread that any swarm member can watch or sync later. + +Built for long-running work: + +- Progress events renew a task's **lease**. If an assignee goes quiet for + longer than the lease, the team re-offers the task to the swarm. Workers + renew automatically in the background, so a handler can run for hours + without emitting progress by hand. +- Unclaimed offers are re-announced every lease interval, so a worker that + joins the swarm late still finds existing work. +- A team backed by a durable conversation log can rebuild its task table + after a restart with :meth:`Team.restore`. + +Delivery is at-least-once: if an assignee is alive but partitioned past its +lease, the task can run twice. The first ``task.done`` wins. 
""" from __future__ import annotations @@ -26,7 +40,8 @@ from loguru import logger from synapse_p2p.node import Node, new_nonce -from synapse_p2p.types import ConversationEvent, Peer +from synapse_p2p.schedules import every +from synapse_p2p.types import ConversationEvent, Peer, PeriodicTask TASK_OFFER = "task.offer" TASK_CLAIM = "task.claim" @@ -35,6 +50,8 @@ TASK_DONE = "task.done" TASK_FAILED = "task.failed" +FINISHED = {"done", "failed"} + class TeamTaskError(RuntimeError): """Raised when a task fails or times out while waiting for its result.""" @@ -53,6 +70,9 @@ class TeamTask: result: Any = None error: str | None = None progress: list[dict[str, Any]] = field(default_factory=list) + attempts: int = 1 + last_activity: float = field(default_factory=time.time) + finished_at: float | None = None @dataclass(slots=True) @@ -74,17 +94,44 @@ class Team: """Offer tasks to the swarm and collect results. The team grants each task to the first claimant, so exactly one worker - runs it even when many volunteer. + runs it at a time. ``lease`` is how long a task may sit unclaimed, or an + assignee may stay silent, before the task is offered again. 
""" - def __init__(self, node: Node) -> None: + def __init__( + self, + node: Node, + *, + lease: float = 300, + max_attempts: int | None = None, + task_retention: float | None = None, + ) -> None: self.node = node + self.lease = lease + self.max_attempts = max_attempts + self.task_retention = task_retention self.tasks: dict[str, TeamTask] = {} self._finished: dict[str, asyncio.Event] = {} node.on(f"conversation.{TASK_CLAIM}")(self._on_claim) node.on(f"conversation.{TASK_PROGRESS}")(self._on_progress) node.on(f"conversation.{TASK_DONE}")(self._on_done) node.on(f"conversation.{TASK_FAILED}")(self._on_failed) + node.periodic_executor.add_task( + PeriodicTask( + name="_teams.reaper", + callable=self._reap, + schedule=every(seconds=max(0.05, lease / 4)), + ) + ) + + def _offer_payload(self, task: TeamTask) -> dict[str, Any]: + return { + "task_id": task.id, + "title": task.title, + "spec": task.spec, + "requires": task.requires, + "attempt": task.attempts, + } async def offer( self, @@ -96,11 +143,7 @@ async def offer( task = TeamTask(id=new_nonce(), title=title, spec=spec or {}, requires=requires or []) self.tasks[task.id] = task self._finished[task.id] = asyncio.Event() - await self.node.emit_conversation_event( - task.id, - TASK_OFFER, - {"task_id": task.id, "title": title, "spec": task.spec, "requires": task.requires}, - ) + await self.node.emit_conversation_event(task.id, TASK_OFFER, self._offer_payload(task)) return task async def wait(self, task: TeamTask, *, timeout: float | None = None) -> Any: @@ -114,12 +157,104 @@ async def wait(self, task: TeamTask, *, timeout: float | None = None) -> Any: raise TeamTaskError(task.error or f"task {task.id} failed") return task.result + def restore(self) -> int: + """Rebuild the task table from the node's conversation log. + + Use after a restart with a durable log (``SqliteConversationLog``) and + a stable node name: only tasks originally offered by a peer matching + this node's id or name are adopted. 
Unfinished tasks are re-offered by + the reaper once their lease expires. Returns how many tasks were + restored. + """ + restored = 0 + for conversation_id in self.node.conversation_log.conversations(): + if conversation_id in self.tasks: + continue + events = self.node.conversation_log.events(conversation_id) + offers = [event for event in events if event.kind == TASK_OFFER] + if not offers: + continue + origin = offers[0].peer + if origin.id != self.node.node_id and origin.name != self.node.name: + continue + + payload = offers[0].payload + task = TeamTask( + id=conversation_id, + title=str(payload.get("title", "")), + spec=dict(payload.get("spec", {})), + requires=list(payload.get("requires", [])), + attempts=max(len(offers), 1), + last_activity=max(event.created_at for event in events), + ) + self.tasks[task.id] = task + self._finished[task.id] = asyncio.Event() + for event in events: + if event.kind == TASK_GRANT: + task.status = "claimed" + elif event.kind == TASK_PROGRESS: + task.progress.append(dict(event.payload)) + elif event.kind == TASK_DONE and task.status not in FINISHED: + task.status = "done" + task.result = event.payload.get("result") + task.finished_at = event.created_at + self._finished[task.id].set() + elif event.kind == TASK_FAILED and task.status not in FINISHED: + task.status = "failed" + task.error = str(event.payload.get("error", "unknown error")) + task.finished_at = event.created_at + self._finished[task.id].set() + restored += 1 + return restored + + async def _reap(self) -> None: + now = time.time() + for task in list(self.tasks.values()): + if task.status in FINISHED: + if ( + self.task_retention is not None + and task.finished_at is not None + and now - task.finished_at > self.task_retention + ): + self.tasks.pop(task.id, None) + self._finished.pop(task.id, None) + continue + + if now - task.last_activity <= self.lease: + continue + + # Stale: either nobody claimed the offer, or the assignee went + # quiet past its lease. 
Offer it to the swarm again. + if self.max_attempts is not None and task.attempts >= self.max_attempts: + task.status = "failed" + task.error = f"no worker completed the task after {task.attempts} attempts" + task.finished_at = now + self._finished[task.id].set() + continue + previous = task.assignee.name if task.assignee else None + task.assignee = None + task.status = "offered" + task.attempts += 1 + task.last_activity = now + logger.info( + "Re-offering task {} (attempt {}, previous assignee {})", + task.id, + task.attempts, + previous, + ) + await self.node.emit_conversation_event( + task.id, TASK_OFFER, self._offer_payload(task) + ) + async def _on_claim(self, event: ConversationEvent) -> None: task = self.tasks.get(event.conversation_id) if task is None or task.assignee is not None or task.status != "offered": return + if event.payload.get("attempt") not in (None, task.attempts): + return # a stale claim from an earlier attempt task.assignee = event.peer task.status = "claimed" + task.last_activity = time.time() await self.node.emit_conversation_event( task.id, TASK_GRANT, @@ -129,23 +264,28 @@ async def _on_claim(self, event: ConversationEvent) -> None: async def _on_progress(self, event: ConversationEvent) -> None: task = self.tasks.get(event.conversation_id) - if task is not None: + if task is None: + return + task.last_activity = time.time() + if not event.payload.get("heartbeat"): task.progress.append(dict(event.payload)) async def _on_done(self, event: ConversationEvent) -> None: task = self.tasks.get(event.conversation_id) - if task is None or task.status in {"done", "failed"}: + if task is None or task.status in FINISHED: return task.result = event.payload.get("result") task.status = "done" + task.finished_at = time.time() self._finished[task.id].set() async def _on_failed(self, event: ConversationEvent) -> None: task = self.tasks.get(event.conversation_id) - if task is None or task.status in {"done", "failed"}: + if task is None or task.status in 
FINISHED: return task.error = str(event.payload.get("error", "unknown error")) task.status = "failed" + task.finished_at = time.time() self._finished[task.id].set() @@ -153,13 +293,22 @@ async def _on_failed(self, event: ConversationEvent) -> None: class Worker: - """Claim offered tasks the node is capable of, and run them when granted.""" + """Claim offered tasks the node is capable of, and run them when granted. + + While a handler runs, the worker emits heartbeat progress events every + ``renew_interval`` seconds so the team's lease stays fresh — a task can + run for hours without any explicit progress calls. + """ - def __init__(self, node: Node, *, claim_timeout: float = 60) -> None: + def __init__( + self, node: Node, *, claim_timeout: float = 60, renew_interval: float = 60 + ) -> None: self.node = node self.claim_timeout = claim_timeout + self.renew_interval = renew_interval self._handler: TaskHandler | None = None self._pending: dict[str, tuple[ConversationEvent, float]] = {} + self._running: set[str] = set() node.on(f"conversation.{TASK_OFFER}")(self._on_offer) node.on(f"conversation.{TASK_GRANT}")(self._on_grant) @@ -184,15 +333,18 @@ async def _on_offer(self, event: ConversationEvent) -> None: return if event.peer.id == self.node.node_id: return + task_id = str(event.payload["task_id"]) + if task_id in self._running: + return # a re-offer for work this worker is already doing if not self._can_do(list(event.payload.get("requires", []))): return - task_id = str(event.payload["task_id"]) self._pending[task_id] = (event, time.time()) await self.node.emit_conversation_event( task_id, TASK_CLAIM, { "task_id": task_id, + "attempt": event.payload.get("attempt"), "capabilities": [capability.name for capability in self.node.capabilities], }, parent_id=event.event_id, @@ -212,10 +364,23 @@ async def _on_grant(self, event: ConversationEvent) -> None: spec=dict(offer.payload.get("spec", {})), node=self.node, ) + self._running.add(task_id) 
self.node._spawn(self._run(assignment), name=f"task:{task_id}") + async def _renew_lease(self, assignment: Assignment) -> None: + while True: + await asyncio.sleep(self.renew_interval) + await self.node.emit_conversation_event( + assignment.id, + TASK_PROGRESS, + {"task_id": assignment.id, "message": "working", "heartbeat": True}, + ) + async def _run(self, assignment: Assignment) -> None: assert self._handler is not None + renewal = asyncio.create_task( + self._renew_lease(assignment), name=f"task-lease:{assignment.id}" + ) try: result = await self._handler(assignment) except asyncio.CancelledError: @@ -226,6 +391,9 @@ async def _run(self, assignment: Assignment) -> None: assignment.id, TASK_FAILED, {"task_id": assignment.id, "error": str(e)} ) return + finally: + renewal.cancel() + self._running.discard(assignment.id) await self.node.emit_conversation_event( assignment.id, TASK_DONE, {"task_id": assignment.id, "result": result} ) diff --git a/synapse_p2p/tests/test_conversations.py b/synapse_p2p/tests/test_conversations.py index e18a4ea..1e0b40b 100644 --- a/synapse_p2p/tests/test_conversations.py +++ b/synapse_p2p/tests/test_conversations.py @@ -199,3 +199,84 @@ async def test_late_joiner_syncs_conversation_from_peer(): finally: await late.stop() await origin.stop() + + +@pytest.mark.asyncio +async def test_default_summarizer_output_stays_bounded(): + events = [ + ConversationEvent( + conversation_id="conv", + event_id=f"e{index}", + kind="reply", + peer=Peer(id=f"p{index}", address="127.0.0.1", port=1, name=f"peer-{index}"), + payload={"result": "x" * 500}, + ) + for index in range(200) + ] + events.append( + ConversationEvent( + conversation_id="conv", + event_id="old-summary", + kind="summary", + peer=events[0].peer, + payload={"summary": "y" * 50_000}, + ) + ) + text = await default_summarizer(events) + assert len(text) < 10_000 + + +@pytest.mark.parametrize("backend", ["memory", "sqlite"]) +def test_log_prune_removes_events_and_forgets_ids(backend, 
tmp_path): + log = ( + MemoryConversationLog() + if backend == "memory" + else SqliteConversationLog(tmp_path / "log.db") + ) + event = make_event("a", created_at=123.0) + log.append(event) + assert log.last_activity("conv") == 123.0 + + log.prune("conv") + + assert log.count("conv") == 0 + assert log.conversations() == [] + assert log.seen("a") is False + assert log.append(event) is True # a pruned id can be stored again + log.close() + + +@pytest.mark.asyncio +async def test_node_reaps_inactive_conversations_and_rejects_stale_events(): + import time as time_module + + node = Node( + name="reaper", + swarm="foo.electron.network", + bind="127.0.0.1", + heartbeat_interval=None, + conversation_retention=0.3, + ) + await node.start() + + try: + broadcast = await node.broadcast("team.question", "anyone?") + node.broadcast_replies[broadcast.nonce] = [] + assert node.conversation(broadcast) + + for _ in range(100): + if not node.conversation(broadcast): + break + await asyncio.sleep(0.02) + + assert node.conversation(broadcast) == [] + assert broadcast.nonce not in node.broadcast_replies + + # Events older than the retention window are refused outright, so + # gossip cannot resurrect a pruned conversation. 
+ stale = make_event("stale", created_at=time_module.time() - 60) + stale.peer.swarm = node.swarm + stale.peer.team = node.team + assert node._remember_conversation_event(stale) is False + finally: + await node.stop() diff --git a/synapse_p2p/tests/test_teams.py b/synapse_p2p/tests/test_teams.py index c39dae2..4416953 100644 --- a/synapse_p2p/tests/test_teams.py +++ b/synapse_p2p/tests/test_teams.py @@ -143,3 +143,197 @@ async def implement(assignment: Assignment) -> str: finally: await coder_node.stop() await architect_node.stop() + + +@pytest.mark.asyncio +async def test_dead_assignee_lease_expires_and_task_is_reoffered(): + architect_node = make_node("architect") + dead_node = make_node("dead-coder", ["python"]) + live_node = make_node("live-coder", ["python"]) + team = Team(architect_node, lease=0.3) + + dead_worker = Worker(dead_node, renew_interval=999) + live_worker = Worker(live_node) + + @dead_worker.task + async def hang(assignment: Assignment) -> str: + await asyncio.Event().wait() + return "never" + + @live_worker.task + async def implement(assignment: Assignment) -> str: + return "rescued" + + await architect_node.start() + await dead_node.start() + await connect(architect_node, dead_node) + + try: + task = await team.offer("long job", requires=["python"]) + for _ in range(100): + if task.status == "claimed": + break + await asyncio.sleep(0.02) + assert task.assignee is not None and task.assignee.name == "dead-coder" + + # The healthy worker joins while the first assignee hangs silently. 
+ await live_node.start() + await connect(architect_node, live_node) + + result = await team.wait(task, timeout=5) + assert result == "rescued" + assert task.attempts >= 2 + assert task.assignee is not None and task.assignee.name == "live-coder" + finally: + await live_node.stop() + await dead_node.stop() + await architect_node.stop() + + +@pytest.mark.asyncio +async def test_unclaimed_task_reaches_late_joining_worker(): + architect_node = make_node("architect") + coder_node = make_node("late-coder", ["python"]) + team = Team(architect_node, lease=0.2) + worker = Worker(coder_node) + + @worker.task + async def implement(assignment: Assignment) -> str: + return "late but done" + + await architect_node.start() + + try: + task = await team.offer("waiting job", requires=["python"]) + await asyncio.sleep(0.1) + + await coder_node.start() + await connect(architect_node, coder_node) + + result = await team.wait(task, timeout=5) + assert result == "late but done" + assert task.attempts >= 2 + finally: + await coder_node.stop() + await architect_node.stop() + + +@pytest.mark.asyncio +async def test_task_fails_after_max_attempts_without_workers(): + architect_node = make_node("architect") + team = Team(architect_node, lease=0.1, max_attempts=2) + + await architect_node.start() + try: + task = await team.offer("impossible", requires=["cobol"]) + with pytest.raises(TeamTaskError, match="after 2 attempts"): + await team.wait(task, timeout=5) + finally: + await architect_node.stop() + + +@pytest.mark.asyncio +async def test_worker_heartbeat_keeps_lease_alive_during_long_task(): + architect_node = make_node("architect") + coder_node = make_node("slow-coder", ["python"]) + team = Team(architect_node, lease=0.3) + worker = Worker(coder_node, renew_interval=0.05) + + @worker.task + async def implement(assignment: Assignment) -> str: + await asyncio.sleep(1.0) # much longer than the lease, no manual progress + return "slow and steady" + + await architect_node.start() + await 
coder_node.start() + await connect(architect_node, coder_node) + + try: + task = await team.offer("marathon", requires=["python"]) + result = await team.wait(task, timeout=5) + + assert result == "slow and steady" + assert task.attempts == 1 # never re-offered + # Heartbeats renewed the lease without polluting recorded progress. + assert all(not entry.get("heartbeat") for entry in task.progress) + finally: + await coder_node.stop() + await architect_node.stop() + + +@pytest.mark.asyncio +async def test_team_restores_task_table_from_durable_log(tmp_path): + from synapse_p2p import SqliteConversationLog + + path = tmp_path / "architect.db" + first_node = Node( + name="architect", + swarm="foo.electron.network", + bind="127.0.0.1", + heartbeat_interval=None, + conversation_log=SqliteConversationLog(path), + ) + coder_node = make_node("coder", ["python"]) + team = Team(first_node) + worker = Worker(coder_node) + + @worker.task + async def implement(assignment: Assignment) -> str: + return "persisted result" + + await first_node.start() + await coder_node.start() + await connect(first_node, coder_node) + + try: + task = await team.offer("durable job", requires=["python"]) + await team.wait(task, timeout=5) + finally: + await coder_node.stop() + await first_node.stop() + first_node.conversation_log.close() + + # The architect restarts with a fresh process but the same log and name. 
+ second_node = Node( + name="architect", + swarm="foo.electron.network", + bind="127.0.0.1", + heartbeat_interval=None, + conversation_log=SqliteConversationLog(path), + ) + restored_team = Team(second_node) + + assert restored_team.restore() == 1 + restored = restored_team.tasks[task.id] + assert restored.status == "done" + assert restored.result == "persisted result" + assert await restored_team.wait(restored, timeout=1) == "persisted result" + second_node.conversation_log.close() + + +@pytest.mark.asyncio +async def test_finished_tasks_are_pruned_after_retention(): + architect_node = make_node("architect") + coder_node = make_node("coder", ["python"]) + team = Team(architect_node, lease=0.1, task_retention=0.2) + worker = Worker(coder_node) + + @worker.task + async def implement(assignment: Assignment) -> str: + return "quick" + + await architect_node.start() + await coder_node.start() + await connect(architect_node, coder_node) + + try: + task = await team.offer("small job", requires=["python"]) + await team.wait(task, timeout=5) + for _ in range(100): + if task.id not in team.tasks: + break + await asyncio.sleep(0.02) + assert task.id not in team.tasks + finally: + await coder_node.stop() + await architect_node.stop()