Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ async def summarize(events: list[ConversationEvent]) -> str:

Compaction is local. Each node compresses its own copy of the shared log, and gossip can't resurrect compacted events. There's also `node.compact_conversation(...)` if you'd rather do it by hand.

For nodes that run indefinitely, set `conversation_retention` (seconds) and whole conversations get pruned after going quiet for that long. Events older than the retention window are refused outright, so a pruned conversation can't leak back in through gossip. With retention, compaction, and a SQLite log, a node can run forever on bounded memory and disk.

## Teams

`synapse_p2p.teams` is an optional task layer built on conversation events. A `Team` offers work, `Worker`s claim tasks that match their capabilities, and the team grants each task to the first claimant. Exactly one worker runs each task.
Expand All @@ -214,6 +216,15 @@ async def implement(assignment: Assignment) -> dict:

Each task is one conversation: `task.offer`, `task.claim`, `task.grant`, `task.progress`, then `task.done` or `task.failed`. Every peer can watch it, late joiners can sync it, and long threads compact like anything else. See [`examples/coding_team`](./examples/coding_team) for an architect on one model reviewing work from coders on another.

The layer is built for long-running work:

- Progress events renew a task's **lease** (`Team(lease=300)`). Workers heartbeat automatically while a handler runs, so a task can take hours without any manual progress calls.
- If an assignee dies or goes quiet past its lease, the team re-offers the task. Unclaimed offers are re-announced too, so a worker that joins late still finds existing work. Cap retries with `max_attempts`.
- A team backed by a SQLite log can rebuild its task table after a restart with `team.restore()` — finished tasks come back with their results, unfinished ones get re-offered.
- Set `task_retention` to drop finished tasks after a while, so a team that offers work forever doesn't grow without bound.

Delivery is at-least-once: a partitioned-but-alive worker can mean a task runs twice. The first `task.done` wins.

## Periodic tasks

```python
Expand Down
13 changes: 11 additions & 2 deletions llms.txt
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ node = Node(
conversation_log=SqliteConversationLog("conversations.db"), # default is in-memory
conversation_max_events=100, # auto-compact past this many events
conversation_keep_recent=25, # keep this many recent events verbatim
conversation_retention=86_400, # prune conversations quiet for this many seconds
)

@node.summarizer
Expand All @@ -172,7 +173,7 @@ added = await node.sync_conversation(peer, conversation_id)
summary_event = await node.compact_conversation(conversation_id)
```

Compaction folds older events into one `summary` kind event, preserving the opening `message` event and the recent tail. Compaction is local per node; compacted event ids stay remembered so gossip cannot resurrect them.
Compaction folds older events into one `summary` kind event, preserving the opening `message` event and the recent tail. Compaction is local per node; compacted event ids stay remembered so gossip cannot resurrect them. With `conversation_retention` set, conversations inactive past the window are pruned entirely and events older than the window are refused, so infinitely long-running nodes stay on bounded memory/disk.

## Teams task layer (`synapse_p2p.teams`)

Expand All @@ -182,7 +183,7 @@ Optional layer built entirely on conversation events. A `Team` offers tasks; `Wo
from synapse_p2p.teams import Assignment, Team, TeamTaskError, Worker

# offering side
team = Team(node)
team = Team(node, lease=300, max_attempts=None, task_retention=None)
task = await team.offer("implement the parser", spec={"file": "parser.py"}, requires=["python"])
result = await team.wait(task, timeout=600) # raises TeamTaskError on failure/timeout

Expand All @@ -197,6 +198,14 @@ async def implement(assignment: Assignment):

Event kinds per task conversation (`conversation_id == task id`): `task.offer` → `task.claim` → `task.grant` → `task.progress` → `task.done` or `task.failed`.

Long-running semantics:

- Progress events renew the task lease; `Worker` heartbeats automatically every `renew_interval` seconds while a handler runs, so handlers can take hours.
- If an assignee goes quiet past `lease` (or an offer sits unclaimed), the team re-offers the task; `max_attempts` caps retries and then fails the task. Late-joining workers pick up re-offered work.
- `team.restore()` rebuilds the task table from a durable conversation log after a restart (match by stable node name or id); unfinished tasks are re-offered by the lease reaper.
- `task_retention` prunes finished tasks from memory after that many seconds.
- Delivery is at-least-once; the first `task.done` wins.

## Artifacts and agent cards

```python
Expand Down
42 changes: 39 additions & 3 deletions synapse_p2p/conversations.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,19 @@ async def default_summarizer(events: list[ConversationEvent]) -> str:
if event.kind == SUMMARY_KIND:
previous = str(event.payload.get(SUMMARY_KIND, ""))
if previous:
lines.append(previous)
lines.append(previous[:1500])
continue
preview = json.dumps(event.payload, default=str)
if len(preview) > 200:
preview = preview[:200] + "…"
lines.append(f"{event.peer.name or event.peer.id} [{event.kind}]: {preview}")
header = ", ".join(f"{count} {kind}" for kind, count in sorted(counts.items()))
return f"({header})\n" + "\n".join(lines[-40:])
# Bound the output so repeated compaction of an endless conversation
# converges instead of growing a summary-of-summaries chain.
body = "\n".join(lines[-40:])
if len(body) > 6000:
body = "…" + body[-6000:]
return f"({header})\n{body}"


class BaseConversationLog:
Expand Down Expand Up @@ -73,6 +78,13 @@ def compact(
) -> None:
raise NotImplementedError

def last_activity(self, conversation_id: str) -> float:
raise NotImplementedError

def prune(self, conversation_id: str) -> None:
"""Delete a conversation entirely, including remembered event ids."""
raise NotImplementedError

def close(self) -> None:
pass

Expand All @@ -81,11 +93,13 @@ class MemoryConversationLog(BaseConversationLog):
def __init__(self) -> None:
self._events: dict[str, list[ConversationEvent]] = {}
self._seen: set[str] = set()
self._ids: dict[str, set[str]] = {}

def append(self, event: ConversationEvent) -> bool:
if event.event_id in self._seen:
return False
self._seen.add(event.event_id)
self._ids.setdefault(event.conversation_id, set()).add(event.event_id)
self._events.setdefault(event.conversation_id, []).append(event)
return True

Expand Down Expand Up @@ -117,10 +131,19 @@ def compact(
if event.event_id not in removed
]
self._seen.add(summary.event_id)
self._ids.setdefault(conversation_id, set()).add(summary.event_id)
merged = kept + [summary]
merged.sort(key=lambda event: event.created_at)
self._events[conversation_id] = merged

def last_activity(self, conversation_id: str) -> float:
events = self._events.get(conversation_id, [])
return max((event.created_at for event in events), default=0.0)

def prune(self, conversation_id: str) -> None:
self._events.pop(conversation_id, None)
self._seen -= self._ids.pop(conversation_id, set())


class SqliteConversationLog(BaseConversationLog):
"""Conversation log persisted to a single SQLite file.
Expand Down Expand Up @@ -195,8 +218,10 @@ def compact(
removed_event_ids: list[str],
summary: ConversationEvent,
) -> None:
# Blank the payload of compacted rows: they only exist so gossip
# cannot resurrect the events they deduplicate.
self._db.executemany(
"UPDATE events SET compacted = 1 WHERE event_id = ?",
"UPDATE events SET compacted = 1, data = '' WHERE event_id = ?",
[(event_id,) for event_id in removed_event_ids],
)
self._db.execute(
Expand All @@ -211,5 +236,16 @@ def compact(
)
self._db.commit()

def last_activity(self, conversation_id: str) -> float:
row = self._db.execute(
"SELECT MAX(created_at) FROM events WHERE conversation_id = ?",
(conversation_id,),
).fetchone()
return float(row[0]) if row and row[0] is not None else 0.0

def prune(self, conversation_id: str) -> None:
self._db.execute("DELETE FROM events WHERE conversation_id = ?", (conversation_id,))
self._db.commit()

def close(self) -> None:
self._db.close()
26 changes: 26 additions & 0 deletions synapse_p2p/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def __init__(
conversation_log: BaseConversationLog | None = None,
conversation_max_events: int | None = None,
conversation_keep_recent: int = 25,
conversation_retention: float | None = None,
summarizer: Summarizer | None = None,
) -> None:
self.bind = bind
Expand All @@ -118,6 +119,7 @@ def __init__(
self.conversation_log = conversation_log or MemoryConversationLog()
self.conversation_max_events = conversation_max_events
self.conversation_keep_recent = conversation_keep_recent
self.conversation_retention = conversation_retention
self._summarizer: Summarizer = summarizer or default_summarizer
self._compacting: set[str] = set()
self._background: set[asyncio.Task] = set()
Expand Down Expand Up @@ -488,7 +490,24 @@ async def _reap_stale_peers(self) -> None:
self.peers.pop(peer.id, None)
self._emit_lifecycle("peer.offline", peer)

async def _reap_conversations(self) -> None:
assert self.conversation_retention is not None
cutoff = time.time() - self.conversation_retention
for conversation_id in self.conversation_log.conversations():
if self.conversation_log.last_activity(conversation_id) < cutoff:
self.conversation_log.prune(conversation_id)
self.broadcast_replies.pop(conversation_id, None)
self._emit_lifecycle("conversation.pruned", conversation_id)

def _register_lifecycle_tasks(self) -> None:
if self.conversation_retention is not None:
self.periodic_executor.add_task(
PeriodicTask(
name="_synapse.reap_conversations",
callable=self._reap_conversations,
schedule=every(seconds=min(60.0, max(0.05, self.conversation_retention / 4))),
)
)
if self.heartbeat_interval is None:
return
self.periodic_executor.add_task(
Expand Down Expand Up @@ -582,6 +601,13 @@ def conversations(self) -> list[str]:

def _remember_conversation_event(self, event: ConversationEvent) -> bool:
self._validate_peer_membership(event.peer)
if (
self.conversation_retention is not None
and event.created_at < time.time() - self.conversation_retention
):
# Too old to store: also stops gossip from resurrecting a pruned
# conversation, since anything pruned is by definition this stale.
return False
if not self.conversation_log.append(event):
return False
self.add_peer(event.peer)
Expand Down
Loading
Loading