feat: 增强版的SubAgent功能#7108
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces an enhanced dynamic subagent management system, allowing the main agent to create, manage, and communicate with specialized subagents. Key additions include a DynamicSubAgentManager for lifecycle and shared context handling, a dedicated SubAgentLogger, and integration into the main agent's toolset and prompt construction. Feedback focuses on improving error handling by avoiding broad exception silences, ensuring safe string formatting for system prompts, and refining the logic for detecting subagent creation failures. A minor typo in the subagent capability prompt was also identified.
There was a problem hiding this comment.
Hey - I've found 6 issues, and left some high level feedback:
- In
DynamicSubAgentManager.update_subagent_history, you mutate thecurrent_messageslist (removing system messages) while iterating it, which can skip elements; consider building a new filtered list instead of modifying in-place during iteration. _do_handoff_backgroundusesDynamicSubAgentManagerwithout importing it in that scope, which will raise aNameErrorat runtime; add the appropriate import (similar to_execute_handoff) before calling its methods.- Several places access
session.subagent_status[agent_name]directly (e.g., inget_subagent_status,create_pending_subagent_task, andWaitForSubagentTool), which can throwKeyErrorif the status was never set; consider using.get()with a sensible default or checking membership before indexing.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `DynamicSubAgentManager.update_subagent_history`, you mutate the `current_messages` list (removing system messages) while iterating it, which can skip elements; consider building a new filtered list instead of modifying in-place during iteration.
- `_do_handoff_background` uses `DynamicSubAgentManager` without importing it in that scope, which will raise a `NameError` at runtime; add the appropriate import (similar to `_execute_handoff`) before calling its methods.
- Several places access `session.subagent_status[agent_name]` directly (e.g., in `get_subagent_status`, `create_pending_subagent_task`, and `WaitForSubagentTool`), which can throw `KeyError` if the status was never set; consider using `.get()` with a sensible default or checking membership before indexing.
## Individual Comments
### Comment 1
<location path="astrbot/core/dynamic_subagent_manager.py" line_range="269-275" />
<code_context>
+ if agent_name not in session.subagent_histories:
+ session.subagent_histories[agent_name] = []
+
+ if isinstance(current_messages, list):
+ _MAX_TOOL_RESULT_LEN = 2000
+ for msg in current_messages:
+ if (
+ isinstance(msg, dict) and msg.get("role") == "system"
+ ): # 移除system消息
+ current_messages.remove(msg)
+ # 对过长的 tool 结果做截断,避免单条消息占用过多空间
+ if (
</code_context>
<issue_to_address>
**issue (bug_risk):** Avoid mutating `current_messages` while iterating over it when cleaning system/tool messages.
In `update_subagent_history`, the loop both iterates over and mutates `current_messages` via `current_messages.remove(msg)`, which can cause items to be skipped and unintentionally alters the caller’s list (e.g. `runner_messages`). Instead, build a separate `cleaned_msgs` list and append only non-system messages, truncating long tool messages as needed, then extend `session.subagent_histories[agent_name]` with `cleaned_msgs` rather than the original list.
</issue_to_address>
### Comment 2
<location path="astrbot/core/dynamic_subagent_manager.py" line_range="1544-1551" />
<code_context>
+
+ if status == "IDLE":
+ return f"Error: SubAgent '{subagent_name}' is running no tasks."
+ elif status == "COMPLETED":
+ result = DynamicSubAgentManager.get_subagent_result(
+ session_id, subagent_name, task_id
+ )
+ if result and (result.result != "" or result.completed_at > 0):
+ return f"SubAgent '{result.agent_name}' execution completed\n Task id: {result.task_id}\n Execution time: {result.execution_time:.1f}s\n--- Result ---\n{result.result}\n"
+ else:
+ return f"SubAgent '{result.agent_name}' execution completed with empty results. \n Task id: {result.task_id}\n Execution time: {result.execution_time:.1f}s\n"
+ elif status == "FAILED":
+ result = DynamicSubAgentManager.get_subagent_result(
</code_context>
<issue_to_address>
**issue (bug_risk):** Guard against `result` being `None` in `WaitForSubagentTool` before dereferencing.
In the `COMPLETED` (and similarly `FAILED`) branch, `result` may be `None`. In that case the `if result and ...` condition is false and the `else` dereferences `result.agent_name`, causing an exception. Handle the `None` case first, e.g.:
```python
result = DynamicSubAgentManager.get_subagent_result(...)
if not result:
return (
f"Error: No result found for SubAgent '{subagent_name}' "
f"task {task_id or '(latest)'} despite status {status}."
)
if result.result != "" or result.completed_at > 0:
...
else:
...
```
</issue_to_address>
### Comment 3
<location path="astrbot/core/astr_agent_tool_exec.py" line_range="503-507" />
<code_context>
async def _run_handoff_in_background() -> None:
try:
await cls._do_handoff_background(
tool=tool,
run_context=run_context,
- task_id=task_id,
+ task_id=original_task_id,
+ subagent_task_id=subagent_task_id,
**tool_args,
)
</code_context>
<issue_to_address>
**question (bug_risk):** Passing `subagent_task_id` through `**tool_args` may leak an unexpected parameter into the subagent tool call.
`subagent_task_id` is added to `tool_args` in `_execute_handoff_background`, then `_do_handoff_background` passes `**tool_args` to `_execute_handoff`, so it becomes an argument to the underlying handoff tool. If that tool’s schema doesn’t declare `subagent_task_id`, this may cause validation failures or be silently dropped. If the subagent doesn’t need this value, pop it from `tool_args` and use it only for bookkeeping; if it does, ensure it’s documented and included in the tool schema.
</issue_to_address>
### Comment 4
<location path="astrbot/core/dynamic_subagent_manager.py" line_range="776" />
<code_context>
+ return list(session.handoff_tools.values())
+
+ @classmethod
+ def create_pending_subagent_task(cls, session_id: str, agent_name: str) -> str:
+ """为 SubAgent 创建一个 pending 任务,返回 task_id
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider introducing small internal helpers to centralize background task result handling and status access, replacing the current scattered direct dict manipulations with reusable abstractions.
A lot of the added complexity comes from repeated, ad‑hoc manipulation of `session.subagent_background_results` and `session.subagent_status` across multiple methods. You can significantly reduce cognitive load with a small, local abstraction without changing behavior.
### 1. Centralize background task result logic
These methods currently reimplement similar logic over `session.subagent_background_results`:
- `create_pending_subagent_task`
- `get_pending_subagent_tasks`
- `get_latest_task_id`
- `store_subagent_result`
- `get_subagent_result`
- `has_subagent_result`
- `clear_subagent_result`
You can introduce a very small internal helper to encapsulate result access and “completed” checks, and then reuse it. For example:
```python
# inside DynamicSubAgentManager
@classmethod
def _ensure_task_store(cls, session: DynamicSubAgentSession, agent_name: str) -> dict[str, SubAgentExecutionResult]:
if agent_name not in session.subagent_background_results:
session.subagent_background_results[agent_name] = {}
return session.subagent_background_results[agent_name]
@staticmethod
def _is_task_completed(result: SubAgentExecutionResult) -> bool:
return (result.result != "" and result.result is not None) or result.completed_at > 0
```
Then, for example, `get_pending_subagent_tasks` and `get_subagent_result` become much simpler and more uniform:
```python
@classmethod
def get_pending_subagent_tasks(cls, session_id: str, agent_name: str) -> list[str]:
session = cls.get_session(session_id)
if not session:
return []
store = session.subagent_background_results.get(agent_name)
if not store:
return []
pending = [tid for tid, res in store.items() if not cls._is_task_completed(res)]
return sorted(pending, key=lambda tid: store[tid].created_at)
```
```python
@classmethod
def get_subagent_result(
cls, session_id: str, agent_name: str, task_id: str | None = None
) -> SubAgentExecutionResult | None:
session = cls.get_session(session_id)
if not session:
return None
store = session.subagent_background_results.get(agent_name)
if not store:
return None
if task_id is None:
completed = [res for res in store.values() if cls._is_task_completed(res)]
if not completed:
return None
return max(completed, key=lambda r: r.created_at)
return store.get(task_id)
```
`store_subagent_result`, `has_subagent_result`, and `clear_subagent_result` can likewise be rewritten to go through `_ensure_task_store` and `_is_task_completed`. This keeps all functionality but removes duplicated filter/sorting logic and makes status/result flows easier to reason about.
### 2. Make subagent status access safe and consistent
Currently `get_subagent_status` assumes the key exists:
```python
@classmethod
def get_subagent_status(cls, session_id: str, agent_name: str) -> str:
session = cls.get_session(session_id)
return session.subagent_status[agent_name]
```
A small helper avoids scattered `KeyError` assumptions and defensive checks elsewhere:
```python
@classmethod
def get_subagent_status(cls, session_id: str, agent_name: str) -> str:
session = cls.get_session(session_id)
if not session:
return "UNKNOWN"
return session.subagent_status.get(agent_name, "UNKNOWN")
```
Then consumers like `ListDynamicSubagentsTool.call` and `WaitForSubagentTool.call` can rely on this instead of touching the dict shape implicitly. This reduces cross‑coupling on internal dict structure and status keys while preserving all existing behavior.
</issue_to_address>
### Comment 5
<location path="astrbot/core/astr_agent_tool_exec.py" line_range="25" />
<code_context>
BACKGROUND_TASK_RESULT_WOKE_SYSTEM_PROMPT,
)
from astrbot.core.cron.events import CronMessageEvent
+from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager
from astrbot.core.message.components import Image
from astrbot.core.message.message_event_result import (
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting DynamicSubAgentManager-related logic and subagent orchestration into small helper functions to reduce duplication and make the main methods easier to read and maintain.
You can keep the new functionality but reduce the complexity and repetition with a few small helpers and a single DynamicSubAgentManager access point.
### 1. Centralize `DynamicSubAgentManager` access
You currently `from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager` multiple times and mix that with other symbols (e.g. `SEND_SHARED_CONTEXT_TOOL`).
Pull this into a module-level helper so the core methods aren’t cluttered with dynamic imports and try/except:
```python
# near the top of the module
try:
from astrbot.core import dynamic_subagent_manager as _dsm_mod
except Exception: # noqa: BLE001
_dsm_mod = None
def _get_dynamic_subagent_manager():
return getattr(_dsm_mod, "DynamicSubAgentManager", None)
def _get_send_shared_context_tool():
return getattr(_dsm_mod, "SEND_SHARED_CONTEXT_TOOL", None)
```
Then in the methods:
```python
toolset = cls._build_handoff_toolset(run_context, tool.agent.tools)
dsm = _get_dynamic_subagent_manager()
send_shared_context_tool = _get_send_shared_context_tool()
session_id = event.unified_msg_origin
if dsm and send_shared_context_tool:
session = dsm.get_session(session_id)
if session and session.shared_context_enabled:
toolset.add_tool(send_shared_context_tool)
```
This removes repetitive imports and exception handling from the hot paths.
---
### 2. Extract subagent history loading logic
The inlined history loading in `_execute_handoff` is long and includes type checks and try/excepts. Extract it:
```python
def _load_subagent_history(umo: str, agent_name: str | None) -> list[Message]:
dsm = _get_dynamic_subagent_manager()
if not (dsm and agent_name):
return []
history: list[Message] = []
try:
stored_history = dsm.get_subagent_history(umo, agent_name)
if not stored_history:
return []
for hist_msg in stored_history:
try:
if isinstance(hist_msg, Message):
history.append(hist_msg)
elif isinstance(hist_msg, dict):
history.append(Message.model_validate(hist_msg))
except Exception:
continue
if history:
logger.debug(
"[SubAgentHistory] Loaded %d history messages for %s",
len(history),
agent_name,
)
except Exception as e: # noqa: BLE001
logger.warning(
"[SubAgentHistory] Failed to load history for %s: %s",
agent_name,
e,
)
return history
```
Usage in `_execute_handoff`:
```python
agent_name = getattr(tool.agent, "name", None)
subagent_history = _load_subagent_history(umo, agent_name)
if subagent_history:
contexts = (subagent_history + (contexts or [])) if contexts else subagent_history
```
This keeps `_execute_handoff` focused on orchestration rather than the details of history storage.
---
### 3. Extract subagent system prompt construction
You build `subagent_system_prompt` inline and re-import `DynamicSubAgentManager` again. Move this into a helper:
```python
def _build_subagent_system_prompt(
tool: HandoffTool,
umo: str,
prov_settings: dict,
) -> str:
agent_name = getattr(tool.agent, "name", None)
base = tool.agent.instructions or ""
system_prompt = f"# Role\nYour name is {agent_name}(used for tool calling)\n{base}\n"
dsm = _get_dynamic_subagent_manager()
if not (dsm and agent_name):
return system_prompt
try:
runtime = prov_settings.get("computer_use_runtime", "local")
static_prompt = dsm.build_static_subagent_prompts(umo, agent_name)
dynamic_prompt = dsm.build_dynamic_subagent_prompts(umo, agent_name, runtime)
system_prompt += static_prompt
system_prompt += dynamic_prompt
except Exception:
pass
return system_prompt
```
Then in `_execute_handoff`:
```python
prov_settings: dict = ctx.get_config(umo=umo).get("provider_settings", {})
subagent_system_prompt = _build_subagent_system_prompt(tool, umo, prov_settings)
runner_messages: list[Message] = []
llm_resp = await ctx.tool_loop_agent(
event=event,
chat_provider_id=prov_id,
prompt=input_,
image_urls=image_urls,
system_prompt=subagent_system_prompt,
tools=toolset,
contexts=contexts,
max_steps=agent_max_step,
tool_call_timeout=run_context.tool_call_timeout,
stream=stream,
runner_messages=runner_messages,
)
```
History persistence can also be extracted:
```python
def _persist_subagent_history(umo: str, agent_name: str | None, runner_messages: list[Message]) -> None:
if not (agent_name and runner_messages):
return
dsm = _get_dynamic_subagent_manager()
if not dsm:
return
try:
dsm.update_subagent_history(umo, agent_name, runner_messages)
except Exception:
pass
```
And called after `tool_loop_agent`.
---
### 4. Extract enhanced background-task registration and user message
The enhanced branch in `_execute_handoff_background` mixes session lookup, task creation, status, and user text. Isolate it:
```python
def _register_enhanced_subagent_task(
umo: str,
agent_name: str | None,
) -> str | None:
dsm = _get_dynamic_subagent_manager()
if not (dsm and agent_name):
return None
try:
session = dsm.get_session(umo)
if not (session and agent_name in session.subagents):
return None
subagent_task_id = dsm.create_pending_subagent_task(
session_id=umo, agent_name=agent_name
)
if subagent_task_id.startswith("__PENDING_TASK_CREATE_FAILED__"):
logger.info(
"[EnhancedSubAgent:BackgroundTask] Failed to create background task %s for %s",
subagent_task_id,
agent_name,
)
return None
dsm.set_subagent_status(
session_id=umo, agent_name=agent_name, status="RUNNING"
)
logger.info(
"[EnhancedSubAgent:BackgroundTask] Created background task %s for %s",
subagent_task_id,
agent_name,
)
return subagent_task_id
except Exception as e: # noqa: BLE001
logger.info(
"[EnhancedSubAgent:BackgroundTask] Failed to create background task for %s: %s",
agent_name,
e,
)
return None
```
```python
def _build_background_submission_message(
agent_name: str | None,
original_task_id: str,
subagent_task_id: str | None,
) -> mcp.types.TextContent:
if subagent_task_id:
text = (
f"Background task submitted. subagent_task_id={subagent_task_id}. "
f"SubAgent '{agent_name}' is working on the task. "
f"Use wait_for_subagent(subagent_name='{agent_name}', task_id='{subagent_task_id}') to get the result."
)
else:
text = (
f"Background task submitted. task_id={original_task_id}. "
f"SubAgent '{agent_name}' is working on the task. "
"You will be notified when it finishes."
)
return mcp.types.TextContent(type="text", text=text)
```
Then `_execute_handoff_background` reduces to:
```python
event = run_context.context.event
umo = event.unified_msg_origin
agent_name = getattr(tool.agent, "name", None)
subagent_task_id = _register_enhanced_subagent_task(umo, agent_name)
original_task_id = uuid.uuid4().hex
async def _run_handoff_in_background() -> None:
try:
await cls._do_handoff_background(
tool=tool,
run_context=run_context,
task_id=original_task_id,
subagent_task_id=subagent_task_id,
**tool_args,
)
except Exception as e: # noqa: BLE001
logger.error(
"Background handoff %s (%s) failed: %s",
original_task_id,
tool.name,
e,
exc_info=True,
)
asyncio.create_task(_run_handoff_in_background())
text_content = _build_background_submission_message(
agent_name, original_task_id, subagent_task_id
)
yield mcp.types.CallToolResult(content=[text_content])
```
---
### 5. Split enhanced result handling in `_do_handoff_background`
The method currently mixes execution, enhanced result storage, status updates, shared context, and main-agent wakeup logic.
You can keep `_do_handoff_background` as an orchestrator and push the detailed branching into helpers:
```python
def _is_enhanced_subagent(umo: str, agent_name: str | None) -> tuple[bool, T.Any]:
dsm = _get_dynamic_subagent_manager()
if not (dsm and agent_name):
return False, None
session = dsm.get_session(umo)
if session and agent_name in session.subagents:
return True, session
return False, session
```
```python
async def _handle_enhanced_subagent_background_result(
cls,
*,
umo: str,
agent_name: str,
task_id: str | None,
result_text: str,
error_text: str | None,
execution_time: float,
run_context: ContextWrapper[AstrAgentContext],
tool: HandoffTool,
tool_args: dict,
) -> None:
dsm = _get_dynamic_subagent_manager()
if not dsm:
return
success = error_text is None
dsm.store_subagent_result(
session_id=umo,
agent_name=agent_name,
task_id=task_id,
success=success,
result=result_text.strip() if result_text else "",
error=error_text,
execution_time=execution_time,
)
dsm.set_subagent_status(
session_id=umo,
agent_name=agent_name,
status="FAILED" if error_text else "COMPLETED",
)
session = dsm.get_session(umo)
if session and session.shared_context_enabled:
status_content = (
f"[EnhancedSubAgent:BackgroundTask] SubAgent '{agent_name}' Task '{task_id}' "
f"{'Failed: ' + error_text if error_text else f'Complete. Execution Time: {execution_time:.1f}s'}"
)
dsm.add_shared_context(
session_id=umo,
sender=agent_name,
context_type="status",
content=status_content,
target="all",
)
logger.info(
"[EnhancedSubAgent:BackgroundTask] Stored result for %s task %s: success=%s, time=%.1fs",
agent_name,
task_id,
success,
execution_time,
)
if not await cls._maybe_wake_main_agent_after_background(
run_context=run_context,
tool=tool,
task_id=task_id,
agent_name=agent_name,
result_text=result_text,
tool_args=tool_args,
):
return
```
```python
@classmethod
async def _maybe_wake_main_agent_after_background(
cls,
*,
run_context: ContextWrapper[AstrAgentContext],
tool: HandoffTool,
task_id: str,
agent_name: str | None,
result_text: str,
tool_args: dict,
) -> bool:
event = run_context.context.event
try:
context_extra = getattr(run_context.context, "extra", None)
if context_extra and isinstance(context_extra, dict):
main_agent_runner = context_extra.get("main_agent_runner")
main_agent_is_running = (
main_agent_runner is not None and not main_agent_runner.done()
)
else:
main_agent_is_running = False
except Exception as e: # noqa: BLE001
logger.error("Failed to check main agent status: %s", e)
main_agent_is_running = True
if main_agent_is_running:
return False
await cls._wake_main_agent_for_background_result(
run_context=run_context,
task_id=task_id,
tool_name=tool.name,
result_text=result_text,
tool_args=tool_args,
note=(
event.get_extra("background_note")
or f"Background task for subagent '{agent_name}' finished."
),
summary_name=f"Dedicated to subagent `{agent_name}`",
extra_result_fields={"subagent_name": agent_name},
)
return True
```
Then `_do_handoff_background` can collapse to:
```python
@classmethod
async def _do_handoff_background(
cls,
tool: HandoffTool,
run_context: ContextWrapper[AstrAgentContext],
task_id: str,
**tool_args,
) -> None:
start_time = time.time()
result_text = ""
error_text: str | None = None
tool_args = dict(tool_args)
tool_args["image_urls"] = await cls._collect_handoff_image_urls(
run_context, tool_args.get("image_urls")
)
event = run_context.context.event
umo = event.unified_msg_origin
agent_name = getattr(tool.agent, "name", None)
try:
async for r in cls._execute_handoff(
tool,
run_context,
image_urls_prepared=True,
**tool_args,
):
if isinstance(r, mcp.types.CallToolResult):
for content in r.content:
if isinstance(content, mcp.types.TextContent):
result_text += content.text + "\n"
except Exception as e: # noqa: BLE001
error_text = str(e)
result_text = (
f"error: Background task execution failed, internal error: {e!s}"
)
execution_time = time.time() - start_time
is_enhanced, _ = _is_enhanced_subagent(umo, agent_name)
if is_enhanced:
await _handle_enhanced_subagent_background_result(
cls=cls,
umo=umo,
agent_name=agent_name,
task_id=tool_args.get("subagent_task_id"),
result_text=result_text,
error_text=error_text,
execution_time=execution_time,
run_context=run_context,
tool=tool,
tool_args=tool_args,
)
else:
await cls._wake_main_agent_for_background_result(
run_context=run_context,
task_id=task_id,
tool_name=tool.name,
result_text=result_text,
tool_args=tool_args,
note=(
event.get_extra("background_note")
or f"Background task for subagent '{agent_name}' finished."
),
summary_name=f"Dedicated to subagent `{agent_name}`",
extra_result_fields={"subagent_name": agent_name},
)
```
This keeps the orchestration logic in the high-level methods and moves the deeply branched behavior into focused helpers, preserving your enhanced behavior while making each method easier to scan and maintain.
</issue_to_address>
### Comment 6
<location path="astrbot/core/agent/runners/tool_loop_agent_runner.py" line_range="920" />
<code_context>
- func_tool = self._skill_like_raw_tool_set.get_tool(func_tool_name)
- else:
- func_tool = req.func_tool.get_tool(func_tool_name)
+ # First check if it's a dynamically created subagent tool
+ func_tool = None
+ run_context_context = getattr(self.run_context, "context", None)
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the dynamic tool resolution and dynamic tool registration logic into dedicated helper methods so `_handle_tool_call` remains linear and easier to read.
You can keep the new behavior but pull the dynamic concerns into helpers to reduce branching and duplication in `_handle_tool_call`.
### 1. Extract dynamic tool resolution into a helper
Move the dynamic lookup logic (including the `DynamicSubAgentManager` import and session handling) into a small helper. That keeps `_handle_tool_call` linear and easier to follow:
```python
def _resolve_dynamic_tool(self, func_tool_name: str):
run_context_context = getattr(self.run_context, "context", None)
if run_context_context is None:
return None
event = getattr(run_context_context, "event", None)
if event is None:
return None
session_id = getattr(event, "unified_msg_origin", None)
if not session_id:
return None
try:
from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager
dynamic_handoffs = DynamicSubAgentManager.get_handoff_tools_for_session(session_id)
except Exception:
return None
for h in dynamic_handoffs:
if h.name == func_tool_name or f"transfer_to_{h.name}" == func_tool_name:
return h
return None
```
Then the main code becomes simpler:
```python
if not req.func_tool:
return
# Prefer dynamic tools when available
func_tool = self._resolve_dynamic_tool(func_tool_name)
if func_tool is None:
if self.tool_schema_mode == "skills_like" and self._skill_like_raw_tool_set:
func_tool = self._skill_like_raw_tool_set.get_tool(func_tool_name)
else:
func_tool = req.func_tool.get_tool(func_tool_name)
```
This keeps the hot path readable while preserving the dynamic behavior.
### 2. Extract dynamic-tool creation handling into a helper
Similarly, encapsulate the `__DYNAMIC_TOOL_CREATED__` protocol and registration into a helper that takes the tool result and updates `self.req.func_tool` as needed:
```python
def _maybe_register_dynamic_tool_from_result(self, result_content: str) -> None:
if not result_content.startswith("__DYNAMIC_TOOL_CREATED__:"):
return
parts = result_content.split(":", 3)
if len(parts) < 4:
return
new_tool_name = parts[1]
new_tool_obj_name = parts[2]
logger.info(f"[EnhancedSubAgent] Tool created: {new_tool_name}")
run_context_context = getattr(self.run_context, "context", None)
event = getattr(run_context_context, "event", None) if run_context_context else None
session_id = getattr(event, "unified_msg_origin", None) if event else None
if not session_id:
return
try:
from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager
handoffs = DynamicSubAgentManager.get_handoff_tools_for_session(session_id)
except Exception as e:
logger.warning(f"[EnhancedSubAgent] Failed to load dynamic handoffs: {e}")
return
for handoff in handoffs:
if (
handoff.name == new_tool_obj_name
or handoff.name == new_tool_name.replace("transfer_to_", "")
):
if self.req.func_tool:
self.req.func_tool.add_tool(handoff)
logger.info(f"[EnhancedSubAgent] Added {handoff.name} to func_tool set")
break
```
Then the loop becomes:
```python
if result_parts:
result_content = "\n\n".join(result_parts)
self._maybe_register_dynamic_tool_from_result(result_content)
inline_result = await self._materialize_large_tool_result(
tool_call_id=func_tool_id,
content=result_content,
)
_append_tool_call_result(
func_tool_id,
inline_result + self._build_repeated_tool_call_guidance(
func_tool_name, tool_call_streak
),
)
```
This removes protocol parsing, logging details, and manager lookups from the main loop and makes `_handle_tool_call` focus on “run tool → process result” while still fully supporting dynamic tools.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
faf411f to
0068960
Compare
|
@sourcery-ai review |
There was a problem hiding this comment.
Hey - I've found 6 issues, and left some high level feedback:
- In SubAgentManager.remove_subagent you directly index session.subagent_status[agent_name] before checking for
sessionexistence or whetheragent_nameis present (and even whenagent_name == 'all'), which can raise KeyError/AttributeError; guard against a missing session and use.get()or early returns before accessing this dict. - In FunctionToolExecutor._execute_handoff the timeout branch calls
_save_subagent_history(agent_name, runner_messages, umo)but the method signature is(umo, runner_messages, agent_name), so history will be written under the wrong key; swap the argument order to match the signature. - WaitForSubagentTool.call assumes
resultis non-None in some error paths (e.g., FAILED branchelsecase usesresult.errorwhenresultmay be None), which will raise; add a separate branch forresult is Noneand avoid dereferencing it there.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In SubAgentManager.remove_subagent you directly index session.subagent_status[agent_name] before checking for `session` existence or whether `agent_name` is present (and even when `agent_name == 'all'`), which can raise KeyError/AttributeError; guard against a missing session and use `.get()` or early returns before accessing this dict.
- In FunctionToolExecutor._execute_handoff the timeout branch calls `_save_subagent_history(agent_name, runner_messages, umo)` but the method signature is `(umo, runner_messages, agent_name)`, so history will be written under the wrong key; swap the argument order to match the signature.
- WaitForSubagentTool.call assumes `result` is non-None in some error paths (e.g., FAILED branch `else` case uses `result.error` when `result` may be None), which will raise; add a separate branch for `result is None` and avoid dereferencing it there.
## Individual Comments
### Comment 1
<location path="astrbot/core/astr_agent_tool_exec.py" line_range="414" />
<code_context>
+ )
+ except asyncio.TimeoutError:
+ # 若超时,保存已产生的部分历史
+ cls._save_subagent_history(agent_name, runner_messages, umo)
+ error_msg = f"SubAgent '{agent_name}' execution timeout after {execution_timeout:.1f} seconds."
+ logger.warning(f"[SubAgent:Timeout] {error_msg}")
</code_context>
<issue_to_address>
**issue (bug_risk):** Arguments to `_save_subagent_history` are passed in the wrong order in the timeout path.
`_save_subagent_history` is defined as `(umo: str, runner_messages: list[Message], agent_name: str)`, but in the timeout branch you call `cls._save_subagent_history(agent_name, runner_messages, umo)`. This swaps `umo` and `agent_name`, so history is written under the wrong key. Match the normal path and call `cls._save_subagent_history(umo, runner_messages, agent_name)` here.
</issue_to_address>
### Comment 2
<location path="astrbot/core/subagent_manager.py" line_range="1078-164" />
<code_context>
+ return session.subagent_background_results[agent_name].get(task_id, None)
+
+ @classmethod
+ def has_subagent_result(
+ cls, session_id: str, agent_name: str, task_id: str | None = None
+ ) -> bool:
+ """检查 SubAgent 是否有结果
+
+ Args:
+ session_id: Session ID
+ agent_name: SubAgent 名称
+ task_id: 任务ID,如果为None则检查是否有任何已完成的任务
+ """
+ session = cls.get_session(session_id)
+ task_store = cls._ensure_task_store(session, agent_name)
+ if not session or not task_store:
</code_context>
<issue_to_address>
**issue (bug_risk):** `has_subagent_result` can raise when session is missing due to calling `_ensure_task_store` on `None`.
`session` is retrieved before `_ensure_task_store` is called, but if `cls.get_session(session_id)` returns `None`, `_ensure_task_store` is invoked with `None` and can raise before the `if not session or not task_store` check runs. Please guard against `session is None` and return early, then call `_ensure_task_store` only when a valid session exists, consistent with other accessors.
</issue_to_address>
### Comment 3
<location path="astrbot/core/subagent_manager.py" line_range="1167-1170" />
<code_context>
+ "type": "string",
+ "description": "Subagent system_prompt",
+ },
+ "tools": {
+ "type": "array",
+ "items": {"type": "string"},
+ "description": "Tools available to subagent, can be empty.",
+ },
+ "skills": {
</code_context>
<issue_to_address>
**nitpick (bug_risk):** The `tools`/`skills` defaults in `CreateSubAgentTool.call` don't align with the JSON schema types.
In `CreateSubAgentTool.call`, `tools = kwargs.get("tools", {})` and `skills = kwargs.get("skills", {})` conflict with the JSON schema, which defines both as arrays. While `set({})` currently yields an empty set, using `{}` as the default is type-misleading and brittle. Prefer `None`/`[]` and normalize before `set`, e.g. `tools = set(kwargs.get("tools") or [])`, to align with the schema and avoid subtle type issues if callers pass unexpected shapes.
</issue_to_address>
### Comment 4
<location path="astrbot/core/subagent_manager.py" line_range="901" />
<code_context>
+ return list(session.handoff_tools.values())
+
+ @classmethod
+ def create_pending_subagent_task(cls, session_id: str, agent_name: str) -> str:
+ """为 SubAgent 创建一个 pending 任务,返回 task_id
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the background-task dict handling into a dedicated BackgroundTaskStore helper and moving path-safety checks into a shared utility to simplify SubAgentManager and CreateSubAgentTool without changing behavior.
You can reduce complexity in a few high‑impact, low‑risk spots without changing behavior. Two concrete refactors:
---
### 1. Encapsulate background task dict juggling in a `BackgroundTaskStore`
All of these methods manually juggle nested dicts on `session.subagent_background_results` / `session.background_task_counters`:
- `create_pending_subagent_task`
- `_ensure_task_store`
- `_is_task_completed`
- `get_pending_subagent_tasks`
- `get_latest_task_id`
- `store_subagent_result`
- `get_subagent_result`
- `has_subagent_result`
- `clear_subagent_result`
You can move the dict logic into a small object per agent and keep `SubAgentManager` methods thin. This keeps functionality but drops a lot of low-level dict manipulation from the manager.
Example (core idea, minimal change):
```python
# new helper, can live in same module initially
@dataclass
class BackgroundTaskStore:
results: dict[str, SubAgentExecutionResult] = field(default_factory=dict)
counter: int = 0
def create_pending(self, agent_name: str) -> str:
self.counter += 1
task_id = str(self.counter)
self.results[task_id] = SubAgentExecutionResult(
task_id=task_id,
agent_name=agent_name,
success=False,
result=None,
created_at=time.time(),
metadata={},
)
return task_id
def pending_ids(self) -> list[str]:
return [
tid for tid, res in self.results.items()
if res.completed_at <= 0 and res.error is None
]
def latest_id(self) -> str | None:
if not self.results:
return None
tid, _ = max(self.results.items(), key=lambda x: x[1].created_at)
return tid
def store_result(
self,
agent_name: str,
success: bool,
result: str,
task_id: str | None = None,
error: str | None = None,
execution_time: float = 0.0,
metadata: dict | None = None,
) -> None:
if task_id is None:
pending = self.pending_ids()
if not pending:
return
task_id = pending[-1]
if task_id not in self.results:
self.results[task_id] = SubAgentExecutionResult(
task_id=task_id,
agent_name=agent_name,
success=False,
result="",
created_at=time.time(),
metadata=metadata or {},
)
r = self.results[task_id]
r.success = success
r.result = result
r.error = error
r.execution_time = execution_time
r.completed_at = time.time()
if metadata:
r.metadata.update(metadata)
def latest_completed(self) -> SubAgentExecutionResult | None:
completed = [
r for r in self.results.values()
if r.result != "" or r.completed_at > 0
]
if not completed:
return None
return max(completed, key=lambda r: r.created_at)
def get(self, task_id: str) -> SubAgentExecutionResult | None:
return self.results.get(task_id)
def has_result(self, task_id: str | None = None) -> bool:
if task_id is None:
return any(r.result != "" or r.completed_at > 0 for r in self.results.values())
r = self.results.get(task_id)
return bool(r and (r.result != "" or r.completed_at > 0))
def clear(self, task_id: str | None = None) -> None:
if task_id is None:
self.results.clear()
self.counter = 0
else:
self.results.pop(task_id, None)
```
Then in `SubAgentSession`:
```python
@dataclass
class SubAgentSession:
...
subagent_background_results: dict[str, BackgroundTaskStore] = field(default_factory=dict)
```
And `SubAgentManager` becomes mostly delegations:
```python
@classmethod
def _get_task_store(cls, session: SubAgentSession, agent_name: str) -> BackgroundTaskStore:
if agent_name not in session.subagent_background_results:
session.subagent_background_results[agent_name] = BackgroundTaskStore()
return session.subagent_background_results[agent_name]
@classmethod
def create_pending_subagent_task(cls, session_id: str, agent_name: str) -> str:
session = cls._get_or_create_session(session_id)
if session.subagent_status[agent_name] == "RUNNING":
return f"__PENDING_TASK_CREATE_FAILED__: Subagent {agent_name} already running"
store = cls._get_task_store(session, agent_name)
return store.create_pending(agent_name)
@classmethod
def get_pending_subagent_tasks(cls, session_id: str, agent_name: str) -> list[str]:
session = cls.get_session(session_id)
if not session:
return []
return cls._get_task_store(session, agent_name).pending_ids()
@classmethod
def store_subagent_result(...):
session = cls._get_or_create_session(session_id)
store = cls._get_task_store(session, agent_name)
store.store_result(agent_name, success, result, task_id, error, execution_time, metadata)
@classmethod
def get_subagent_result(...):
session = cls.get_session(session_id)
if not session or agent_name not in session.subagent_background_results:
return None
store = cls._get_task_store(session, agent_name)
return store.get(task_id) if task_id is not None else store.latest_completed()
@classmethod
def has_subagent_result(...):
session = cls.get_session(session_id)
if not session:
return False
store = cls._get_task_store(session, agent_name)
return store.has_result(task_id)
@classmethod
def clear_subagent_result(...):
session = cls.get_session(session_id)
if not session:
return
store = cls._get_task_store(session, agent_name)
store.clear(task_id)
```
This keeps the same external API and behavior but substantially shrinks the manager’s internal complexity around background tasks.
---
### 2. Extract `_check_path_safety` into a shared utility
`CreateSubAgentTool._check_path_safety` mixes OS‑specific logic into the tool class and duplicates environment concerns in this already-large module. You can move it to a separate helper (same module or `utils/path_safety.py`) and reuse it elsewhere if needed.
Example:
```python
# new file: astrbot/core/utils/path_safety.py
import os
import platform
def is_safe_workdir(path_str: str) -> bool:
if not path_str or not isinstance(path_str, str):
return False
if not os.path.isabs(path_str):
return False
try:
resolved = os.path.realpath(path_str)
except (OSError, ValueError):
return False
path_parts = {part.lower() for part in os.path.normpath(resolved).split(os.sep)}
system = platform.system().lower()
if system == "windows":
dangerous = {
"windows", "system32", "syswow64", "boot", "recovery",
"programdata", "$recycle.bin", "system volume information",
}
if path_parts & dangerous:
return False
elif system == "linux":
for prefix in ["/etc", "/bin", "/sbin", "/lib", "/lib64",
"/boot", "/dev", "/proc", "/sys", "/root"]:
if resolved == prefix or resolved.startswith(prefix + os.sep):
return False
elif system == "darwin":
for prefix in ["/System", "/Library", "/private/var", "/usr"]:
if resolved == prefix or resolved.startswith(prefix + os.sep):
return False
if ".." in path_str:
return False
if not os.path.exists(resolved):
return False
return True
```
Then `CreateSubAgentTool` becomes simpler and more testable:
```python
from astrbot.core.utils.path_safety import is_safe_workdir
@dataclass
class CreateSubAgentTool(FunctionTool):
...
async def call(self, context, **kwargs) -> str:
...
workdir = kwargs.get("workdir")
if not workdir or not is_safe_workdir(workdir):
workdir = get_astrbot_temp_path()
...
```
This removes system/path concerns from the tool class and centralizes safety logic in one place.
</issue_to_address>
### Comment 5
<location path="astrbot/core/astr_agent_tool_exec.py" line_range="370" />
<code_context>
- tool_call_timeout=run_context.tool_call_timeout,
- stream=stream,
+
+ # 获取子代理的历史上下文
+ subagent_history, agent_name = cls._load_subagent_history(umo, tool)
+ # 如果有历史上下文,合并到 contexts 中
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting shared timeout handling and SubAgentManager access into small helpers and making `_save_subagent_history` keyword-only to simplify subagent orchestration logic and avoid subtle call bugs.
You can reduce the new complexity around subagent orchestration without changing behavior by:
1. **Collapsing duplicated timeout logic**
`_execute_handoff` and `_do_handoff_background` both wrap inner coroutines with `asyncio.wait_for`, handle `TimeoutError`, and do subagent-specific reporting. Extract that into a small shared helper so each caller reads linearly.
For example:
```python
@staticmethod
async def _run_subagent_with_timeout(
coro: T.Coroutine[T.Any, T.Any, T.Any],
*,
timeout: float,
on_timeout: T.Callable[[], None],
):
if timeout > 0:
try:
return await asyncio.wait_for(coro, timeout=timeout)
except asyncio.TimeoutError:
on_timeout()
raise
else:
return await coro
```
Then in `_execute_handoff`:
```python
execution_timeout = cls._get_subagent_execution_timeout()
async def _run_subagent():
return await ctx.tool_loop_agent(
event=event,
chat_provider_id=prov_id,
prompt=input_,
image_urls=image_urls,
system_prompt=subagent_system_prompt,
tools=toolset,
contexts=contexts,
max_steps=agent_max_step,
tool_call_timeout=run_context.tool_call_timeout,
stream=stream,
runner_messages=runner_messages,
)
def _on_timeout():
cls._save_subagent_history(umo=umo, runner_messages=runner_messages, agent_name=agent_name)
error_msg = f"SubAgent '{agent_name}' execution timeout after {execution_timeout:.1f} seconds."
logger.warning(f"[SubAgent:Timeout] {error_msg}")
cls._handle_subagent_timeout(umo=umo, agent_name=agent_name)
try:
llm_resp = await cls._run_subagent_with_timeout(
_run_subagent(), timeout=execution_timeout, on_timeout=_on_timeout
)
except asyncio.TimeoutError:
yield mcp.types.CallToolResult(
content=[mcp.types.TextContent(type="text", text=f"error: SubAgent '{agent_name}' execution timeout after {execution_timeout:.1f} seconds.")]
)
return
```
And in `_do_handoff_background`:
```python
execution_timeout = cls._get_subagent_execution_timeout()
async def _run():
nonlocal result_text
async for r in cls._execute_handoff(
tool,
run_context,
image_urls_prepared=True,
**tool_args,
):
if isinstance(r, mcp.types.CallToolResult):
for content in r.content:
if isinstance(content, mcp.types.TextContent):
result_text += content.text + "\n"
def _on_timeout():
nonlocal error_text, result_text
error_text = f"Execution timeout after {execution_timeout:.1f} seconds."
result_text = f"error: Background SubAgent '{agent_name}' {error_text}"
logger.warning(f"[SubAgent:BackgroundTask] {error_text}")
try:
await cls._run_subagent_with_timeout(
_run(), timeout=execution_timeout, on_timeout=_on_timeout
)
except asyncio.TimeoutError:
# _on_timeout already set error_text/result_text
pass
```
This keeps all features (history save, status update, logging) but removes duplicated timeout/control-flow branches and makes the core logic easier to follow.
2. **Tighten the `SubAgentManager` coupling behind small helpers**
Several helpers repetitively reach into `SubAgentManager` (`_register_subagent_task`, `_is_managed_subagent`, `_handle_subagent_timeout`, parts of `_execute_handoff_background`). You can introduce a tiny, *local* façade instead of scattering direct imports:
```python
class _SubagentRuntime:
def __init__(self, umo: str, agent_name: str | None):
from astrbot.core.subagent_manager import SubAgentManager
self._mgr = SubAgentManager
self.umo = umo
self.agent_name = agent_name
def is_managed(self) -> bool:
if not self.agent_name:
return False
session = self._mgr.get_session(self.umo)
return bool(session and self.agent_name in session.subagents)
def set_status(self, status: str) -> None:
if self.agent_name:
self._mgr.set_subagent_status(
session_id=self.umo, agent_name=self.agent_name, status=status
)
def create_pending_task(self) -> str | None:
# inner logic from _register_subagent_task, simplified…
...
```
Then `_execute_handoff_background`, `_do_handoff_background`, `_handle_subagent_timeout`, `_is_managed_subagent`, and parts of `_handle_subagent_background_result` can operate through `_SubagentRuntime` instead of each doing their own session lookups/imports. This reduces the “spiderweb” coupling without changing behavior.
3. **Make `_save_subagent_history` harder to misuse**
In the timeout branch of `_execute_handoff` you currently call:
```python
cls._save_subagent_history(agent_name, runner_messages, umo)
```
while the signature is:
```python
def _save_subagent_history(umo: str, runner_messages: list[Message], agent_name: str) -> None:
```
To both fix argument order and reduce future complexity/bugs, make the helper keyword-only and always call it with names:
```python
@staticmethod
def _save_subagent_history(
*, umo: str, runner_messages: list[Message], agent_name: str
) -> None:
if agent_name and runner_messages:
from astrbot.core.subagent_manager import SubAgentManager
SubAgentManager.update_subagent_history(umo, agent_name, runner_messages)
```
Usage:
```python
cls._save_subagent_history(
umo=umo, runner_messages=runner_messages, agent_name=agent_name
)
```
This keeps functionality identical but removes a subtle source of bugs and cognitive load.
</issue_to_address>
### Comment 6
<location path="astrbot/core/agent/runners/tool_loop_agent_runner.py" line_range="1020" />
<code_context>
if not req.func_tool:
return
+ # Prefer dynamic tools when available
+ func_tool = self._resolve_dynamic_subagent_tool(func_tool_name)
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting small helper methods to centralize tool resolution, session/handoff access, and interruption-message building so the main loop and abort logic stay simpler and more readable.
You can keep all the new behavior but reduce complexity and duplication by extracting a few small helpers and unifying the tool resolution path.
### 1. Unify tool resolution (avoid double-resolution + branching in-loop)
Right now `_execute_tool_calls` effectively computes `func_tool` multiple times:
- You first call `_resolve_dynamic_subagent_tool`.
- Then you re-run the old `skills_like` / `req.func_tool.get_tool` logic, potentially overwriting the dynamic result.
This both increases mental load and risks subtle bugs. A small helper that encapsulates all resolution rules keeps the loop simpler:
```python
def _get_func_tool(self, func_tool_name: str):
# Prefer dynamic tools when available
func_tool = self._resolve_dynamic_subagent_tool(func_tool_name)
if func_tool is not None:
return func_tool
# Fallback to existing tool schema modes
if (
self.tool_schema_mode == "skills_like"
and self._skill_like_raw_tool_set
):
return self._skill_like_raw_tool_set.get_tool(func_tool_name)
if self.req.func_tool:
return self.req.func_tool.get_tool(func_tool_name)
return None
```
Then in the main loop:
```python
if not req.func_tool:
return
func_tool = self._get_func_tool(func_tool_name)
if func_tool is None:
# keep existing error/reporting behavior
...
```
This keeps dynamic routing logic out of the loop body and centralizes the policy.
### 2. Deduplicate session / handoff access for dynamic tools
Both `_resolve_dynamic_subagent_tool` and `_maybe_register_dynamic_tool_from_result` manually walk `run_context.context.event` and import `SubAgentManager`. That coupling can be kept but hidden behind a narrow helper so the runner doesn’t repeat low-level details:
```python
def _get_session_handoffs(self):
run_context_context = getattr(self.run_context, "context", None)
event = getattr(run_context_context, "event", None) if run_context_context else None
session_id = getattr(event, "unified_msg_origin", None) if event else None
if not session_id:
return None, []
try:
from astrbot.core.subagent_manager import SubAgentManager
handoffs = SubAgentManager.get_handoff_tools_for_session(session_id)
except Exception as e:
logger.warning(f"[SubAgent] Failed to load dynamic handoffs: {e}")
return session_id, []
return session_id, handoffs
```
Use it in both places:
```python
def _resolve_dynamic_subagent_tool(self, func_tool_name: str):
session_id, dynamic_handoffs = self._get_session_handoffs()
if not session_id:
return None
for h in dynamic_handoffs:
if h.name == func_tool_name or f"transfer_to_{h.name}" == func_tool_name:
return h
return None
```
```python
def _maybe_register_dynamic_tool_from_result(self, result_content: str) -> None:
parse_result = self._parse_dynamic_tool_creation(result_content)
if not parse_result:
return
new_tool_name, new_tool_obj_name = parse_result
session_id, handoffs = self._get_session_handoffs()
if not session_id:
return
for handoff in handoffs:
if (
handoff.name == new_tool_obj_name
or handoff.name == new_tool_name.replace("transfer_to_", "")
):
if self.req.func_tool:
self.req.func_tool.add_tool(handoff)
logger.info(f"[SubAgent] Added {handoff.name} to func_tool set")
break
```
And the parser stays small and focused:
```python
def _parse_dynamic_tool_creation(self, result_content: str) -> tuple[str, str] | None:
if not result_content.startswith("__DYNAMIC_TOOL_CREATED__:"):
return None
parts = result_content.split(":", 3)
if len(parts) < 4:
return None
return parts[1], parts[2] # new_tool_name, new_tool_obj_name
```
This keeps the magic-string protocol and `SubAgentManager` knowledge in one place, reducing complexity elsewhere.
### 3. Simplify abort-finalization branching
`_finalize_aborted_step` now mixes logging, message selection, and state transitions. The only real variability is the interruption message. You can make that explicit:
```python
def _build_interruption_message(self, manual_stop: bool) -> str:
if manual_stop:
return (
"[SYSTEM: SubAgent was manually stopped by main agent. "
"Partial output before interruption is preserved.]"
)
return self.USER_INTERRUPTION_MESSAGE
```
Then `_finalize_aborted_step` becomes easier to scan:
```python
async def _finalize_aborted_step(
self,
llm_resp: LLMResponse | None = None,
manual_stop: bool = False,
) -> AgentResponse:
logger.info(
"SubAgent execution was manually stopped by main agent."
if manual_stop
else "Agent execution was requested to stop by user."
)
if llm_resp is None:
llm_resp = LLMResponse(role="assistant", completion_text="")
if llm_resp.role != "assistant":
llm_resp = LLMResponse(
role="assistant",
completion_text=self._build_interruption_message(manual_stop),
)
self.final_llm_resp = llm_resp
self._aborted = True
self._transition_state(AgentState.DONE)
self.stats.end_time = time.time()
...
```
This preserves the manual vs user-stop behavior while localizing the branching logic, so the method’s main flow (log → normalize llm_resp → finalize) is clearer.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| return list(session.handoff_tools.values()) | ||
|
|
||
| @classmethod | ||
| def create_pending_subagent_task(cls, session_id: str, agent_name: str) -> str: |
There was a problem hiding this comment.
issue (complexity): Consider extracting the background-task dict handling into a dedicated BackgroundTaskStore helper and moving path-safety checks into a shared utility to simplify SubAgentManager and CreateSubAgentTool without changing behavior.
You can reduce complexity in a few high‑impact, low‑risk spots without changing behavior. Two concrete refactors:
1. Encapsulate background task dict juggling in a BackgroundTaskStore
All of these methods manually juggle nested dicts on session.subagent_background_results / session.background_task_counters:
create_pending_subagent_task_ensure_task_store_is_task_completedget_pending_subagent_tasksget_latest_task_idstore_subagent_resultget_subagent_resulthas_subagent_resultclear_subagent_result
You can move the dict logic into a small object per agent and keep SubAgentManager methods thin. This keeps functionality but drops a lot of low-level dict manipulation from the manager.
Example (core idea, minimal change):
# new helper, can live in same module initially
@dataclass
class BackgroundTaskStore:
results: dict[str, SubAgentExecutionResult] = field(default_factory=dict)
counter: int = 0
def create_pending(self, agent_name: str) -> str:
self.counter += 1
task_id = str(self.counter)
self.results[task_id] = SubAgentExecutionResult(
task_id=task_id,
agent_name=agent_name,
success=False,
result=None,
created_at=time.time(),
metadata={},
)
return task_id
def pending_ids(self) -> list[str]:
return [
tid for tid, res in self.results.items()
if res.completed_at <= 0 and res.error is None
]
def latest_id(self) -> str | None:
if not self.results:
return None
tid, _ = max(self.results.items(), key=lambda x: x[1].created_at)
return tid
def store_result(
self,
agent_name: str,
success: bool,
result: str,
task_id: str | None = None,
error: str | None = None,
execution_time: float = 0.0,
metadata: dict | None = None,
) -> None:
if task_id is None:
pending = self.pending_ids()
if not pending:
return
task_id = pending[-1]
if task_id not in self.results:
self.results[task_id] = SubAgentExecutionResult(
task_id=task_id,
agent_name=agent_name,
success=False,
result="",
created_at=time.time(),
metadata=metadata or {},
)
r = self.results[task_id]
r.success = success
r.result = result
r.error = error
r.execution_time = execution_time
r.completed_at = time.time()
if metadata:
r.metadata.update(metadata)
def latest_completed(self) -> SubAgentExecutionResult | None:
completed = [
r for r in self.results.values()
if r.result != "" or r.completed_at > 0
]
if not completed:
return None
return max(completed, key=lambda r: r.created_at)
def get(self, task_id: str) -> SubAgentExecutionResult | None:
return self.results.get(task_id)
def has_result(self, task_id: str | None = None) -> bool:
if task_id is None:
return any(r.result != "" or r.completed_at > 0 for r in self.results.values())
r = self.results.get(task_id)
return bool(r and (r.result != "" or r.completed_at > 0))
def clear(self, task_id: str | None = None) -> None:
if task_id is None:
self.results.clear()
self.counter = 0
else:
self.results.pop(task_id, None)Then in SubAgentSession:
@dataclass
class SubAgentSession:
...
subagent_background_results: dict[str, BackgroundTaskStore] = field(default_factory=dict)And SubAgentManager becomes mostly delegations:
@classmethod
def _get_task_store(cls, session: SubAgentSession, agent_name: str) -> BackgroundTaskStore:
if agent_name not in session.subagent_background_results:
session.subagent_background_results[agent_name] = BackgroundTaskStore()
return session.subagent_background_results[agent_name]
@classmethod
def create_pending_subagent_task(cls, session_id: str, agent_name: str) -> str:
session = cls._get_or_create_session(session_id)
if session.subagent_status[agent_name] == "RUNNING":
return f"__PENDING_TASK_CREATE_FAILED__: Subagent {agent_name} already running"
store = cls._get_task_store(session, agent_name)
return store.create_pending(agent_name)
@classmethod
def get_pending_subagent_tasks(cls, session_id: str, agent_name: str) -> list[str]:
session = cls.get_session(session_id)
if not session:
return []
return cls._get_task_store(session, agent_name).pending_ids()
@classmethod
def store_subagent_result(...):
session = cls._get_or_create_session(session_id)
store = cls._get_task_store(session, agent_name)
store.store_result(agent_name, success, result, task_id, error, execution_time, metadata)
@classmethod
def get_subagent_result(...):
session = cls.get_session(session_id)
if not session or agent_name not in session.subagent_background_results:
return None
store = cls._get_task_store(session, agent_name)
return store.get(task_id) if task_id is not None else store.latest_completed()
@classmethod
def has_subagent_result(...):
session = cls.get_session(session_id)
if not session:
return False
store = cls._get_task_store(session, agent_name)
return store.has_result(task_id)
@classmethod
def clear_subagent_result(...):
session = cls.get_session(session_id)
if not session:
return
store = cls._get_task_store(session, agent_name)
store.clear(task_id)This keeps the same external API and behavior but substantially shrinks the manager’s internal complexity around background tasks.
2. Extract _check_path_safety into a shared utility
CreateSubAgentTool._check_path_safety mixes OS‑specific logic into the tool class and duplicates environment concerns in this already-large module. You can move it to a separate helper (same module or utils/path_safety.py) and reuse it elsewhere if needed.
Example:
# new file: astrbot/core/utils/path_safety.py
import os
import platform
def is_safe_workdir(path_str: str) -> bool:
if not path_str or not isinstance(path_str, str):
return False
if not os.path.isabs(path_str):
return False
try:
resolved = os.path.realpath(path_str)
except (OSError, ValueError):
return False
path_parts = {part.lower() for part in os.path.normpath(resolved).split(os.sep)}
system = platform.system().lower()
if system == "windows":
dangerous = {
"windows", "system32", "syswow64", "boot", "recovery",
"programdata", "$recycle.bin", "system volume information",
}
if path_parts & dangerous:
return False
elif system == "linux":
for prefix in ["/etc", "/bin", "/sbin", "/lib", "/lib64",
"/boot", "/dev", "/proc", "/sys", "/root"]:
if resolved == prefix or resolved.startswith(prefix + os.sep):
return False
elif system == "darwin":
for prefix in ["/System", "/Library", "/private/var", "/usr"]:
if resolved == prefix or resolved.startswith(prefix + os.sep):
return False
if ".." in path_str:
return False
if not os.path.exists(resolved):
return False
return TrueThen CreateSubAgentTool becomes simpler and more testable:
from astrbot.core.utils.path_safety import is_safe_workdir
@dataclass
class CreateSubAgentTool(FunctionTool):
...
async def call(self, context, **kwargs) -> str:
...
workdir = kwargs.get("workdir")
if not workdir or not is_safe_workdir(workdir):
workdir = get_astrbot_temp_path()
...This removes system/path concerns from the tool class and centralizes safety logic in one place.
| if not req.func_tool: | ||
| return | ||
|
|
||
| # Prefer dynamic tools when available |
There was a problem hiding this comment.
issue (complexity): Consider extracting small helper methods to centralize tool resolution, session/handoff access, and interruption-message building so the main loop and abort logic stay simpler and more readable.
You can keep all the new behavior but reduce complexity and duplication by extracting a few small helpers and unifying the tool resolution path.
1. Unify tool resolution (avoid double-resolution + branching in-loop)
Right now _execute_tool_calls effectively computes func_tool multiple times:
- You first call
_resolve_dynamic_subagent_tool. - Then you re-run the old
skills_like/req.func_tool.get_toollogic, potentially overwriting the dynamic result.
This both increases mental load and risks subtle bugs. A small helper that encapsulates all resolution rules keeps the loop simpler:
def _get_func_tool(self, func_tool_name: str):
# Prefer dynamic tools when available
func_tool = self._resolve_dynamic_subagent_tool(func_tool_name)
if func_tool is not None:
return func_tool
# Fallback to existing tool schema modes
if (
self.tool_schema_mode == "skills_like"
and self._skill_like_raw_tool_set
):
return self._skill_like_raw_tool_set.get_tool(func_tool_name)
if self.req.func_tool:
return self.req.func_tool.get_tool(func_tool_name)
return NoneThen in the main loop:
if not req.func_tool:
return
func_tool = self._get_func_tool(func_tool_name)
if func_tool is None:
# keep existing error/reporting behavior
...This keeps dynamic routing logic out of the loop body and centralizes the policy.
2. Deduplicate session / handoff access for dynamic tools
Both _resolve_dynamic_subagent_tool and _maybe_register_dynamic_tool_from_result manually walk run_context.context.event and import SubAgentManager. That coupling can be kept but hidden behind a narrow helper so the runner doesn’t repeat low-level details:
def _get_session_handoffs(self):
run_context_context = getattr(self.run_context, "context", None)
event = getattr(run_context_context, "event", None) if run_context_context else None
session_id = getattr(event, "unified_msg_origin", None) if event else None
if not session_id:
return None, []
try:
from astrbot.core.subagent_manager import SubAgentManager
handoffs = SubAgentManager.get_handoff_tools_for_session(session_id)
except Exception as e:
logger.warning(f"[SubAgent] Failed to load dynamic handoffs: {e}")
return session_id, []
return session_id, handoffsUse it in both places:
def _resolve_dynamic_subagent_tool(self, func_tool_name: str):
session_id, dynamic_handoffs = self._get_session_handoffs()
if not session_id:
return None
for h in dynamic_handoffs:
if h.name == func_tool_name or f"transfer_to_{h.name}" == func_tool_name:
return h
return Nonedef _maybe_register_dynamic_tool_from_result(self, result_content: str) -> None:
parse_result = self._parse_dynamic_tool_creation(result_content)
if not parse_result:
return
new_tool_name, new_tool_obj_name = parse_result
session_id, handoffs = self._get_session_handoffs()
if not session_id:
return
for handoff in handoffs:
if (
handoff.name == new_tool_obj_name
or handoff.name == new_tool_name.replace("transfer_to_", "")
):
if self.req.func_tool:
self.req.func_tool.add_tool(handoff)
logger.info(f"[SubAgent] Added {handoff.name} to func_tool set")
breakAnd the parser stays small and focused:
def _parse_dynamic_tool_creation(self, result_content: str) -> tuple[str, str] | None:
if not result_content.startswith("__DYNAMIC_TOOL_CREATED__:"):
return None
parts = result_content.split(":", 3)
if len(parts) < 4:
return None
return parts[1], parts[2] # new_tool_name, new_tool_obj_nameThis keeps the magic-string protocol and SubAgentManager knowledge in one place, reducing complexity elsewhere.
3. Simplify abort-finalization branching
_finalize_aborted_step now mixes logging, message selection, and state transitions. The only real variability is the interruption message. You can make that explicit:
def _build_interruption_message(self, manual_stop: bool) -> str:
if manual_stop:
return (
"[SYSTEM: SubAgent was manually stopped by main agent. "
"Partial output before interruption is preserved.]"
)
return self.USER_INTERRUPTION_MESSAGEThen _finalize_aborted_step becomes easier to scan:
async def _finalize_aborted_step(
self,
llm_resp: LLMResponse | None = None,
manual_stop: bool = False,
) -> AgentResponse:
logger.info(
"SubAgent execution was manually stopped by main agent."
if manual_stop
else "Agent execution was requested to stop by user."
)
if llm_resp is None:
llm_resp = LLMResponse(role="assistant", completion_text="")
if llm_resp.role != "assistant":
llm_resp = LLMResponse(
role="assistant",
completion_text=self._build_interruption_message(manual_stop),
)
self.final_llm_resp = llm_resp
self._aborted = True
self._transition_state(AgentState.DONE)
self.stats.end_time = time.time()
...This preserves the manual vs user-stop behavior while localizing the branching logic, so the method’s main flow (log → normalize llm_resp → finalize) is clearer.
Modifications / 改动点
一、概述
fixes #6954
增强版SubAgent对AstrBot的Subagent进行了功能扩充,支持两种使用方式:
subagent_orchestrator.agents在配置文件中预定义子代理(原版实现)所有子代理(无论静态还是动态)都由统一的
SubAgentManager管理,支持以下高级功能:独立上下文记忆、独立工具隔离、工作目录隔离、技能隔离、公共上下文共享、超时限制、后台任务模式等。核心特性
SubAgentManager统一管理create_subagent在运行时动态创建子代理wait_for_subagent阻塞式等待输出二、配置说明
2.1 配置项说明
在
cmd_config.json中通过subagent_orchestrator配置块进行设置:{ "subagent_orchestrator": { "main_enable": false, "remove_main_duplicate_tools": false, "router_system_prompt": "...", "agents": [ { "name": "static_subagent", "persona_id": "default", "public_description": "测试子代理(静态)", "enabled": true, "provider_id": "kimi-code/kimi-for-coding" } ], "enable_dynamic": false, "max_dynamic_subagent_count": 3, "auto_cleanup_per_turn": true, "shared_context_enabled": true, "shared_context_maxlen": 300, "max_subagent_history": 300, "execution_timeout": 1200, "tools_blacklist": [ "send_shared_context_for_main_agent", "create_subagent", "protect_subagent", "unprotect_subagent", "reset_subagent", "remove_subagent", "list_subagents", "wait_for_subagent", "view_shared_context" ], "tools_inherent": [ "astrbot_execute_shell", "astrbot_execute_python" ] } }2.2 配置项详解
main_enablefalseremove_main_duplicate_toolsfalserouter_system_prompt""agents[]enable_dynamicfalsemax_dynamic_subagent_count3auto_cleanup_per_turntruetools_blacklisttools_inherentshared_context_enabledtrueshared_context_maxlen300max_subagent_history300execution_timeout1200.0三、核心文件
3.1 subagent_manager.py
文件路径:
astrbot/core/subagent_manager.py核心数据结构:
核心类:
SubAgentManagerconfigure()get_execution_timeout()is_auto_cleanup_per_turn()is_shared_context_enabled()register_blacklisted_tool(tool_name)register_inherent_tool(tool_name)create_subagent(session_id, config, protected=False)register_static_subagent(session_id, handoff_tool, skills, workdir)remove_subagent(session_id, agent_name)cleanup_session_turn_end(session_id)protect_subagent(session_id, agent_name)is_protected(session_id, agent_name)update_subagent_history(session_id, agent_name, current_messages)get_subagent_history(session_id, agent_name)clear_subagent_history(session_id, agent_name)build_static_subagent_prompts(session_id, agent_name)build_dynamic_subagent_prompts(session_id, agent_name, runtime)build_task_router_prompt(session_id)get_subagent_tools(session_id, agent_name)add_shared_context(session_id, sender, context_type, content, target)get_shared_context(session_id, filter_by_agent)_build_shared_context_prompt(session_id, agent_name)create_pending_subagent_task(session_id, agent_name)get_pending_subagent_tasks(session_id, agent_name)get_latest_task_id(session_id, agent_name)store_subagent_result(session_id, agent_name, success, result, task_id, error, execution_time, metadata)get_subagent_result(session_id, agent_name, task_id)has_subagent_result(session_id, agent_name, task_id)clear_subagent_result(session_id, agent_name, task_id)get_subagent_status(session_id, agent_name)get_all_subagent_status(session_id)set_subagent_status(session_id, agent_name, status)cleanup_shared_context_by_agent(session_id, agent_name)clear_shared_context(session_id)set_shared_context_enabled(session_id, enabled)get_handoff_tools_for_session(session_id)cleanup_session(session_id)四、功能实现
4.1 静态子代理注册
功能描述
原版由配置文件定义的静态子代理在
build_main_agent()时自动注册到SubAgentManager,享受历史记忆、公共上下文、Skills调用等增强功能。实现方式
astrbot/core/subagent_orchestrator.pySubAgentOrchestrator.register_static_subagents_to_manager()astr_main_agent.py的_apply_subagent_tools()中注册流程
build_main_agent()构建主Agent时调用_apply_subagent_tools()_apply_subagent_tools()调用so.register_static_subagents_to_manager(session_id)SubAgentOrchestrator.handoffs中的所有静态 handoff 工具SubAgentManager.register_static_subagent()register_static_subagent()内部:HandoffTool提取 agent 信息SubAgentConfigcreate_subagent(session_id, config, protected=True)4.2 动态创建子代理
功能描述
当动态子代理功能开启时,主Agent可以通过
create_subagent工具动态创建子代理,每个子代理拥有独立的人设、工具、技能和工作目录配置。随后系统会创建对应的transfer_to_xxx工具实现方式
astrbot/core/subagent_manager.pyCreateSubAgentToolname: 子代理名称system_prompt: 子代理的人设和系统提示tools: 可用工具列表(字符串名称)skills: 可用技能列表(字符串名称)workdir: 子代理工作目录(绝对路径,可选)使用示例
名称验证规则
^[a-zA-Z][a-zA-Z0-9_]{0,31}$)创建后流程
astrbot_execute_shell,astrbot_execute_python。若公共上下文启用,还会添加send_shared_context)SubAgentConfig配置对象Agent和HandoffTool对象SubAgentManager__DYNAMIC_TOOL_CREATED__标记的消息,触发工具schema刷新工作目录安全检查
CreateSubAgentTool._check_path_safety()对传入的workdir进行安全验证:..windows,system32,syswow64,boot等/etc,/bin,/sbin,/root等/System,/Library,/private/var,/usr等验证失败时,工作目录回退到
get_astrbot_temp_path()。工具黑名单与固有工具
4.3 子代理委派 (transfer_to_xxx)
功能描述
创建子代理后,主Agent使用
transfer_to_xxx工具将任务委派给对应子代理。实现方式
astrbot/core/astr_agent_tool_exec.pyFunctionToolExecutor._execute_handoff()/_execute_handoff_background()子代理System Prompt构建
将子代理的system_prompt构建分为静态部分和动态部分,尽可能增加缓存命中率
静态提示词
build_static_subagent_prompts():动态提示词
build_dynamic_subagent_prompts():委派执行流程
transfer_to_xxx工具ToolLoopAgentRunner._handle_function_tools()在SubAgentManager中查找handoff工具HandoffTool对象FunctionToolExecutor._execute_handoff()执行委派send_shared_context工具到子代理工具集provider_id使用不同的LLM提供商_get_subagent_execution_timeout()获取超时时间asyncio.wait_for()添加执行超时控制_handle_subagent_timeout()将状态设为 "FAILED"历史上下文注入流程
subagent_histories获取历史消息Message对象begin_dialogs之前runner_messages(Agent运行期间的所有消息)追加到历史max_subagent_history(默认300条),超出时保留最新的。(此处仅用于约束数组长度,实际上下文长度仍由astrbot的truncate管理)执行超时控制
4.4 后台任务等待
功能描述
当主Agent认为某个任务耗时很长时,可以让SubAgent以后台模式运行,主Agent不会被阻塞,可继续执行其他可并行的任务
原版Subagent结束任务时,会通过
_wake_main_agent_for_background_result唤醒主Agent如果Subagent耗时很长,一些聪明的Agent可能会执行
time.sleep()的python代码进行等待,但我们要假设Agent是愚蠢的。事实上,大多数情况下,主Agent会觉得迟迟拿不到结果而试图自己执行,导致同一任务完成两遍。因此引入了一个
wait_for_subagent工具,主Agent需要拿到Subagent结果时,可进行主动的阻塞式等待,避免此类情况发生实现方式
astrbot/core/astr_agent_tool_exec.py_execute_handoff_background()- 执行后台委派_do_handoff_background()- 后台任务执行逻辑_register_subagent_task()- 注册SubAgent任务_handle_subagent_background_result()- 处理SubAgent结果_maybe_wake_main_agent_after_background()- 智能唤醒主Agent。Subagent完成任务时,如果主Agent已经结束运行,才执行_wake_main_agent_for_background_result,否则把结果存到subagent_background_results里执行流程
主Agent状态检测
后台任务完成后,需要决定是否通知用户。关键逻辑:
这依赖
AstrAgentContext.extra字段,在build_main_agent()中注入:主动等待工具
WaitForSubagentTool 轮询逻辑
task_id,获取最新的 pending 任务IDLE→ 返回错误:子代理未在运行任务COMPLETED→ 返回执行结果FAILED→ 返回失败信息RUNNING→ 继续等待后台Subagent任务管理
每个subagent都会储存结果到
subagent_background_results中。同一个subagent可以有多个任务,通过task_id来区分(每次创建任务递增)创建任务
获取结果
4.5 子代理历史记忆
功能描述
受保护的子代理可以保留跨轮对话的历史上下文,实现连续对话能力。若未受保护,且
auto_cleanup_per_turn=true,则历史上下文会在会话结束时连同整个子代理被清理实现方式
SubAgentSession.subagent_historiesmax_subagent_history(默认300条)历史管理机制
runner_messages(Agent运行期间的消息)追加到历史role=system的消息_MAX_TOOL_RESULT_LEN(2000字符)的tool结果截断,附加...[truncated]max_subagent_history时,保留最新的消息Message对象,合并到contexts历史清理
reset_subagent工具主动清除指定子代理的历史4.6 工作目录隔离(软约束)
功能描述
每个子代理可配置独立的工作目录,未指定时默认使用
get_astrbot_temp_path()。实现方式
SubAgentConfig.workdirCreateSubAgentTool._check_path_safety()_build_workdir_prompt()注入的工作目录提示
4.7 行为规范注入
功能描述
子代理自动注入安全模式、输出规范和时间信息等行为约束。
注入内容
角色定义
_build_subagent_system_prompt():# Role Your name is {agent_name}(used for tool calling) {base_instructions}静态行为规范
_build_rule_prompt():动态时间信息
_build_time_prompt():# Current Time 2026-04-14 23:09 (CST)4.8 Skills隔离
功能描述
每个子代理可分配不同的Skills,相互隔离
注入逻辑
SubAgentConfig.skills)SkillManager.list_skills()获取所有可用技能build_skills_prompt()生成提示词build_dynamic_subagent_prompts()注入到子代理的system_prompt4.9 公共上下文
功能描述
当
shared_context_enabled=true时,维护一个所有子代理共享的、实时更新的群聊区域。在SubAgent每次调用LLM之前,公共上下文的内容会被注入到System Prompt中。公共上下文中每条信息的格式
上下文类型
statusmessagesystem公共上下文注入方式
按类型和优先级分组注入到子代理的 System Prompt 中,让Agent可以更清晰地获取共享上下文的信息,并知道哪些需要优先处理。
实现方式
astrbot/core/subagent_manager.pySubAgentManager._build_shared_context_prompt()Prompt结构
容量管理
shared_context_maxlen时,保留最近90%的消息公共上下文特点
4.10 主Agent路由提示
功能描述
当
dynamic_agents.enabled=true时,在主Agent的System Prompt中注入动态SubAgent能力说明,包括动态子代理配额信息、创建指南和生命周期说明。如果dynamic_agents.enabled=false,则使用router_system_prompt实现方式
SubAgentManager.build_task_router_prompt()注入内容
配额耗尽时:仍然显示代理能力说明和生命周期,但告知无法创建新子代理,仍可委派已有的子代理。
4.11 自动清理
功能描述
每轮对话结束后,自动清理非保护的子代理。主Agent也可以根据需要,手动通过
remove_subagent工具清理特定子代理。清理规则
remove_subagent()进行清理(移除配置、handoff工具、历史、后台结果)触发时机
在
internal.py的 agent 结束流程中:4.12 数量限制
功能描述
限制单个会话中子代理的最大数量,防止资源耗尽。
注意:替换已存在的同名子代理不增加计数。
4.13 执行超时控制
功能描述
除了原有的工具调用超时外,为每个子代理设置总的执行超时时间,避免无限等待。
实现方式
SubAgentConfig.execution_timeout(默认1200秒)subagent_orchestrator.execution_timeoutSubAgentManager.get_execution_timeout()超时处理流程
五、工具列表
5.1 主Agent新增可用工具
create_subagentremove_subagentlist_subagentsreset_subagentprotect_subagentunprotect_subagentview_shared_contextsend_shared_context_for_main_agentwait_for_subagent5.2 子代理可用工具
send_shared_contexttools_inherent给出六、修改文件清单
astrbot/core/subagent_manager.pyDynamicSubAgentManager→SubAgentManager,DynamicSubAgentConfig→SubAgentConfig,DynamicSubAgentSession→SubAgentSession;工具名create_dynamic_subagent→create_subagent,remove_dynamic_subagent→remove_subagent,list_dynamic_subagents→list_subagentsastrbot/core/astr_agent_tool_exec.py_execute_handoff()(静态/动态Prompt构建、历史注入、超时控制);_execute_handoff_background()(pending任务创建);新增_do_handoff_background()(结果存储与智能唤醒逻辑);新增多个工具方法(_load_subagent_history、_build_subagent_system_prompt、_save_subagent_history、_register_enhanced_subagent_task等)astrbot/core/astr_main_agent.py_ensure_persona_and_skills()新增_apply_subagent_manager_tools(),仅在enable_dynamic=True时使用。在MainAgentBuildConfig中修改配置字段,在build_main_agent()中注入main_agent_runner到AstrAgentContext.extraastrbot/core/subagent_orchestrator.pyregister_static_subagents_to_manager()方法;静态subagent自动注册到SubAgentManager享受增强功能astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.pyastrbot/core/agent/runners/tool_loop_agent_runner.py_handle_function_tools()中集成动态SubAgent工具查找;新增_resolve_dynamic_subagent_tool()和_maybe_register_dynamic_tool_from_result()方法astrbot/core/config/default.pyastrbot/core/astr_agent_context.pyextra字段的类型改成dict[str, Any],使得可以记录任意类型的extra信息astrbot/core/star/context.pytool_loop_agent()支持runner_messages参数,用于记录SubAgent历史七、使用流程
7.1 静态子代理使用流程
与原版一致,但子代理获得增强
7.2 动态子代理使用流程
7.3 带保护的多轮对话流程
7.4 带公共上下文的协作流程
7.5 后台任务并行处理流程
7.6 后台任务与共享上下文结合
八、API 参考
8.1 SubAgentManager 核心方法
配置与生命周期
configure(max_dynamic_subagent_count, auto_cleanup_per_turn, shared_context_enabled, shared_context_maxlen, max_subagent_history, tools_blacklist, tools_whitelist, execution_timeout)get_execution_timeout() -> floatis_auto_cleanup_per_turn() -> boolis_shared_context_enabled() -> boolregister_blacklisted_tool(tool_name)register_inherent_tool(tool_name)子代理管理
create_subagent(session_id, config, protected=False) -> tupleregister_static_subagent(session_id, handoff_tool, skills, workdir) -> tupleremove_subagent(session_id, agent_name) -> strprotect_subagent(session_id, agent_name)is_protected(session_id, agent_name) -> boolset_subagent_status(session_id, agent_name, status)cleanup_session_turn_end(session_id) -> dictcleanup_session(session_id) -> dictget_handoff_tools_for_session(session_id) -> listget_subagent_tools(session_id, agent_name) -> list | None历史管理
update_subagent_history(session_id, agent_name, current_messages)get_subagent_history(session_id, agent_name) -> listclear_subagent_history(session_id, agent_name) -> strPrompt构建
build_task_router_prompt(session_id) -> strbuild_static_subagent_prompts(session_id, agent_name) -> strbuild_dynamic_subagent_prompts(session_id, agent_name, runtime) -> str后台任务管理
create_pending_subagent_task(session_id, agent_name) -> strget_pending_subagent_tasks(session_id, agent_name) -> list[str]get_latest_task_id(session_id, agent_name) -> str | Nonestore_subagent_result(session_id, agent_name, success, result, task_id, error, execution_time, metadata)get_subagent_result(session_id, agent_name, task_id) -> SubAgentExecutionResult | Nonehas_subagent_result(session_id, agent_name, task_id) -> boolclear_subagent_result(session_id, agent_name, task_id)get_subagent_status(session_id, agent_name) -> strget_all_subagent_status(session_id) -> dict公共上下文管理
add_shared_context(session_id, sender, context_type, content, target)get_shared_context(session_id, filter_by_agent) -> listset_shared_context_enabled(session_id, enabled)cleanup_shared_context_by_agent(session_id, agent_name)clear_shared_context(session_id)8.2 SubAgentExecutionResult 数据结构
8.3 SubAgentConfig 数据结构
8.4 SubAgentSession 数据结构
Screenshots or Test Results / 运行截图或测试结果
测试1:动态Agent创建和保护机制
测试流程:动态建立若干个SubAgent,给其中一些加入保护,另一些不加保护,观察清理情况,以及跨轮对话的记忆
测试1——动态创建子Agent和保护机制.md
测试2:子Agent历史上下文
测试流程:建立一个SubAgent,并使其跨多轮对话,查看其是否有多轮对话的记忆。
测试2——子Agent历史上下文.md
测试3:子Agent共享上下文
测试流程
send_shared_context发送工具向共享上下文中添加内容测试3——子Agent共享上下文.md
测试4:异步与等待
测试流程
设计一个同时包含并行与串行、前台与后台模式的任务,观察工作流程
测试4——异步与等待.md
测试5:超长上下文工作实例
测试流程
让agent阅读一个大型项目的全部代码,并生成细致的介绍文档
测试5——动态处理超长上下文示例.md
Checklist / 检查清单
😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
/ 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。
👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
/ 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”。
🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in
requirements.txtandpyproject.toml./ 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到
requirements.txt和pyproject.toml文件相应位置。😮 My changes do not introduce malicious code.
/ 我的更改没有引入恶意代码。
Summary by Sourcery
Introduce a unified SubAgentManager that manages both static and dynamically created sub-agents, adds enhanced subagent runtime features and background task controls, and exposes corresponding configuration, tooling, and dashboard UI to manage them.
New Features:
Enhancements:
Tests: