Skip to content

feat: 增强版的SubAgent功能#7108

Open
elecvoid243 wants to merge 60 commits intoAstrBotDevs:masterfrom
elecvoid243:master
Open

feat: 增强版的SubAgent功能#7108
elecvoid243 wants to merge 60 commits intoAstrBotDevs:masterfrom
elecvoid243:master

Conversation

@elecvoid243
Copy link
Copy Markdown

@elecvoid243 elecvoid243 commented Mar 28, 2026

Modifications / 改动点


一、概述

fixes #6954
增强版SubAgent对AstrBot的Subagent进行了功能扩充,支持两种使用方式:

  1. 静态配置:通过 subagent_orchestrator.agents 在配置文件中预定义子代理(原版实现)
  2. 动态创建:主Agent在运行时通过工具调用动态创建子代理,可以随时回收,也可以每轮对话结束后自动清理

所有子代理(无论静态还是动态)都由统一的 SubAgentManager 管理,支持以下高级功能:独立上下文记忆、独立工具隔离、工作目录隔离、技能隔离、公共上下文共享、超时限制、后台任务模式等。

核心特性

特性 说明
统一实现 静态配置和动态创建的子代理都由 SubAgentManager 统一管理
动态创建 主Agent可通过create_subagent在运行时动态创建子代理
更好的后台任务模式 支持将耗时任务放到后台执行,主Agent可并行处理其他任务,也可通过wait_for_subagent阻塞式等待输出
自动清理 一轮对话结束时自动清理未受保护的子代理
上下文记忆 受保护的子代理保留跨轮对话历史,并支持历史截断和tool结果压缩
工作目录隔离 每个子代理可配置独立工作目录,并带有路径安全检查
Skills隔离 子代理可使用Skills,每个子代理拥有独立的技能配置
公共上下文 每个子代理拥有自己的独立上下文,并可访问公共上下文,可共享信息和协调工作
行为规范注入 子代理自动注入安全模式、输出规范、时间信息等行为约束
数量限制 可配置的动态子代理最大数量限制(静态不受影响)
工具管理 除了主Agent动态分配的工具,用户可配置黑名单:防止子代理获得管理类工具;固有名单:子代理可拥有固有工具保证基础能力
执行超时控制 除了子代理工具调用超时限制外,支持为子代理设置执行总长超时,避免无限等待

二、配置说明

2.1 配置项说明

cmd_config.json 中通过 subagent_orchestrator 配置块进行设置:

{
    "subagent_orchestrator": {
        "main_enable": false,
        "remove_main_duplicate_tools": false,
        "router_system_prompt": "...",
        "agents": [
          {
            "name": "static_subagent",
            "persona_id": "default",
            "public_description": "测试子代理(静态)",
            "enabled": true,
            "provider_id": "kimi-code/kimi-for-coding"
          }
        ],      
        "enable_dynamic": false,
        "max_dynamic_subagent_count": 3,
        "auto_cleanup_per_turn": true,
        "shared_context_enabled": true,
        "shared_context_maxlen": 300,
        "max_subagent_history": 300,
        "execution_timeout": 1200,
        "tools_blacklist": [
            "send_shared_context_for_main_agent",
            "create_subagent",
            "protect_subagent",
            "unprotect_subagent",
            "reset_subagent",
            "remove_subagent",
            "list_subagents",
            "wait_for_subagent",
            "view_shared_context"
        ],
        "tools_inherent": [
            "astrbot_execute_shell",
            "astrbot_execute_python"
        ]
    }
}

2.2 配置项详解

配置项 类型 默认值 说明
main_enable bool false 总开关,启用SubAgent编排功能(静态+动态)
静态子代理(原版)
remove_main_duplicate_tools bool false 是否从主Agent移除已分配给子代理的工具(静态)
router_system_prompt str "" 主Agent路由提示词(仅启用静态时有效)
agents list [] 静态子代理配置列表
动态子代理(新增)
enable_dynamic bool false 是否允许主Agent动态创建子代理
max_dynamic_subagent_count int 3 单个会话允许的最大子代理数量,包含动态和静态
auto_cleanup_per_turn bool true 每轮对话结束后自动清理非保护子代理
tools_blacklist list 预设黑名单列表 动态subagent工具黑名单列表,例如禁止subagent获得管理工具
tools_inherent list 预设固有工具列表 动态subagent固有工具列表
通用配置
shared_context_enabled bool true 启用子代理间公共上下文共享
shared_context_maxlen int 300 公共上下文消息最大数量(条);该项仅控制管理类的变量长度,实际子代理仍受到truncate逻辑控制
max_subagent_history int 300 每个子代理最多保留的历史消息(条);该项仅控制管理类的变量长度,实际子代理仍受到truncate逻辑控制
execution_timeout float 1200.0 SubAgent执行超时时间(秒),-1表示不限制

三、核心文件

3.1 subagent_manager.py

文件路径astrbot/core/subagent_manager.py

核心数据结构

@dataclass
class SubAgentConfig:
    name: str
    system_prompt: str = ""
    tools: set[str] | None = None        # 分配的工具名称集合
    skills: set[str] | None = None       # 分配的技能名称集合
    provider_id: str | None = None       # 可选的LLM提供商ID
    description: str = ""                # handoff工具描述
    workdir: str | None = None           # 子代理工作目录
    execution_timeout: float = 1200.0      # SubAgent 执行超时时间(秒)

@dataclass
class SubAgentExecutionResult:
    task_id: str          # 任务唯一标识符(递增数字字符串)
    agent_name: str
    success: bool
    result: str | None = None
    error: str | None = None
    execution_time: float = 0.0
    created_at: float = 0.0
    completed_at: float = 0.0
    metadata: dict = field(default_factory=dict)

@dataclass
class SubAgentSession:
    session_id: str
    subagents: dict = field(default_factory=dict)       # 存储SubAgentConfig对象
    handoff_tools: dict = field(default_factory=dict)
    subagent_status: dict = field(default_factory=dict) # 工作状态: "IDLE" "RUNNING" "COMPLETED" "FAILED"
    protected_agents: set = field(default_factory=set)  # 受保护agent不会被自动清理
    subagent_histories: dict = field(default_factory=dict) # 每个子代理的历史上下文
    shared_context: list = field(default_factory=list)  # 公共上下文列表
    shared_context_enabled: bool = False                # 是否启用公共上下文
    subagent_background_results: dict = field(default_factory=dict)  # 后台subagent结果存储
    background_task_counters: dict = field(default_factory=dict)  # 任务计数器

核心类SubAgentManager

方法 功能
configure() 配置管理器全局设置(数量限制、自动清理、公共上下文等)
get_execution_timeout() 获取SubAgent执行超时时间
is_auto_cleanup_per_turn() 检查是否启用自动清理
is_shared_context_enabled() 检查是否启用公共上下文
register_blacklisted_tool(tool_name) 注册不应被子Agent使用的工具
register_inherent_tool(tool_name) 注册子Agent默认拥有的工具
create_subagent(session_id, config, protected=False) 创建子代理,返回handoff工具
register_static_subagent(session_id, handoff_tool, skills, workdir) 注册静态子代理到管理器(自动protected=True)
remove_subagent(session_id, agent_name) 移除指定子代理或全部子代理
cleanup_session_turn_end(session_id) 每轮结束时自动清理非保护子代理
protect_subagent(session_id, agent_name) 保护子代理不被自动清理
is_protected(session_id, agent_name) 检查子代理是否受保护
update_subagent_history(session_id, agent_name, current_messages) 更新子代理历史(支持system消息过滤和tool结果截断)
get_subagent_history(session_id, agent_name) 获取子代理历史
clear_subagent_history(session_id, agent_name) 清除子代理历史
build_static_subagent_prompts(session_id, agent_name) 构建不会在会话内变化的子代理提示词(工作目录、行为规范)
build_dynamic_subagent_prompts(session_id, agent_name, runtime) 构建会话内可能变化的子代理提示词(Skills、公共上下文、时间)
build_task_router_prompt(session_id) 构建主Agent的路由提示词(配额信息、创建指南、生命周期等)
get_subagent_tools(session_id, agent_name) 获取子代理被分配的工具列表
add_shared_context(session_id, sender, context_type, content, target) 添加公共上下文消息
get_shared_context(session_id, filter_by_agent) 获取公共上下文(支持按Agent过滤)
_build_shared_context_prompt(session_id, agent_name) 分块构建公共上下文提示(按类型和优先级分组)
create_pending_subagent_task(session_id, agent_name) 为SubAgent创建pending任务,返回task_id
get_pending_subagent_tasks(session_id, agent_name) 获取SubAgent的所有pending任务ID列表
get_latest_task_id(session_id, agent_name) 获取SubAgent的最新任务ID
store_subagent_result(session_id, agent_name, success, result, task_id, error, execution_time, metadata) 存储SubAgent的执行结果
get_subagent_result(session_id, agent_name, task_id) 获取SubAgent的执行结果
has_subagent_result(session_id, agent_name, task_id) 检查SubAgent是否有结果
clear_subagent_result(session_id, agent_name, task_id) 清除SubAgent的执行结果
get_subagent_status(session_id, agent_name) 获取SubAgent的状态:IDLE/RUNNING/COMPLETED/FAILED
get_all_subagent_status(session_id) 获取所有SubAgent的状态
set_subagent_status(session_id, agent_name, status) 设置SubAgent的状态
cleanup_shared_context_by_agent(session_id, agent_name) 清理与指定Agent相关的公共上下文消息
clear_shared_context(session_id) 清除所有公共上下文
set_shared_context_enabled(session_id, enabled) 启用/禁用公共上下文
get_handoff_tools_for_session(session_id) 获取会话的所有handoff工具
cleanup_session(session_id) 清理整个会话

四、功能实现

4.1 静态子代理注册

功能描述

原版由配置文件定义的静态子代理在 build_main_agent() 时自动注册到 SubAgentManager,享受历史记忆、公共上下文、Skills调用等增强功能。

实现方式

  • 文件astrbot/core/subagent_orchestrator.py
  • 核心方法SubAgentOrchestrator.register_static_subagents_to_manager()
  • 调用时机astr_main_agent.py_apply_subagent_tools()

注册流程

  1. build_main_agent() 构建主Agent时调用 _apply_subagent_tools()
  2. _apply_subagent_tools() 调用 so.register_static_subagents_to_manager(session_id)
  3. 遍历 SubAgentOrchestrator.handoffs 中的所有静态 handoff 工具
  4. 对每个 handoff 调用 SubAgentManager.register_static_subagent()
  5. register_static_subagent() 内部:
    • HandoffTool 提取 agent 信息
    • 创建 SubAgentConfig
    • 调用 create_subagent(session_id, config, protected=True)
  6. 静态子代理注册完成,自动受保护(防止自动清理)

4.2 动态创建子代理

功能描述

当动态子代理功能开启时,主Agent可以通过create_subagent工具动态创建子代理,每个子代理拥有独立的人设、工具、技能和工作目录配置。随后系统会创建对应的transfer_to_xxx工具

实现方式

  • 文件astrbot/core/subagent_manager.py
  • 核心类CreateSubAgentTool
  • 参数
    • name: 子代理名称
    • system_prompt: 子代理的人设和系统提示
    • tools: 可用工具列表(字符串名称)
    • skills: 可用技能列表(字符串名称)
    • workdir: 子代理工作目录(绝对路径,可选)

使用示例

create_subagent(
    name="data_analyst",
    system_prompt="你是一个专业的数据分析师,擅长分析数据并给出见解",
    tools=["astrbot_execute_shell", "astrbot_execute_python"],
    skills=["excel", "pdf"],
    workdir="/path/to/workspace"
)

名称验证规则

  1. 必须以字母开头
  2. 只允许英文字母、数字和下划线
  3. 长度3-32字符(正则:^[a-zA-Z][a-zA-Z0-9_]{0,31}$

创建后流程

  1. 验证名称格式和工作目录安全性
  2. 检查子代理数量是否达到上限
  3. 从工具列表中移除黑名单工具(如管理类工具)
  4. 自动添加固有工具(如 astrbot_execute_shell, astrbot_execute_python 。若公共上下文启用,还会添加 send_shared_context
  5. 创建 SubAgentConfig 配置对象
  6. 创建 AgentHandoffTool 对象
  7. 注册到 SubAgentManager
  8. 初始化子代理历史和状态("IDLE")
  9. 返回带有 __DYNAMIC_TOOL_CREATED__ 标记的消息,触发工具schema刷新

工作目录安全检查

CreateSubAgentTool._check_path_safety() 对传入的 workdir 进行安全验证:

检查项 说明
绝对路径 必须是绝对路径
路径遍历 不允许包含 ..
Windows危险目录 禁止访问 windows, system32, syswow64, boot
Linux危险目录 禁止访问 /etc, /bin, /sbin, /root
macOS危险目录 禁止访问 /System, /Library, /private/var, /usr
路径存在性 路径必须实际存在

验证失败时,工作目录回退到 get_astrbot_temp_path()

工具黑名单与固有工具

_tools_blacklist: set[str] = {
    "send_shared_context_for_main_agent",
    "create_subagent",
    "protect_subagent",
    "unprotect_subagent",
    "reset_subagent",
    "remove_subagent",
    "list_subagents",
    "wait_for_subagent",
    "view_shared_context",
}

_tools_inherent: set[str] = {
    "astrbot_execute_shell",
    "astrbot_execute_python",
}
  • 黑名单工具:创建子代理时自动从子代理工具列表中移除,防止子代理获得管理能力
  • 固有工具:创建子代理时自动添加到子代理工具列表,确保基础执行能力

4.3 子代理委派 (transfer_to_xxx)

功能描述

创建子代理后,主Agent使用 transfer_to_xxx 工具将任务委派给对应子代理。

实现方式

  • 文件astrbot/core/astr_agent_tool_exec.py
  • 核心方法FunctionToolExecutor._execute_handoff() / _execute_handoff_background()

子代理System Prompt构建

将子代理的system_prompt构建分为静态部分动态部分,尽可能增加缓存命中率

# 基础角色定义(新增)
subagent_system_prompt = f"# Role\nYour name is {agent_name}(used for tool calling)\n{tool.agent.instructions}\n"

# 静态部分(会话内不变)
static_prompt = SubAgentManager.build_static_subagent_prompts(umo, agent_name)
# 包含:工作目录信息、行为规范(安全模式、输出指南)

# 动态部分(每次调用重建)
dynamic_prompt = SubAgentManager.build_dynamic_subagent_prompts(umo, agent_name, runtime)
# 包含:Skills提示、公共上下文、当前时间

静态提示词 build_static_subagent_prompts()

  • 工作目录:注入独立的工作目录信息,限制文件操作范围
  • 行为规范:包括输出指南(超过2000字符保存文件)、安全模式拒绝

动态提示词 build_dynamic_subagent_prompts()

  • Skills提示:根据子代理分配的技能列表,过滤并构建Skills Prompt
  • 公共上下文:按优先级分组注入共享上下文
  • 时间信息:注入当前日期时间

委派执行流程

  1. 主Agent调用 transfer_to_xxx 工具
  2. ToolLoopAgentRunner._handle_function_tools()SubAgentManager 中查找handoff工具
  3. 匹配后返回完整的 HandoffTool 对象
  4. 调用 FunctionToolExecutor._execute_handoff() 执行委派
  5. 构建子代理工具集(包含分配的tools + runtime computer tools + 固有工具)
  6. 如果公有上下文启用,自动注入 send_shared_context 工具到子代理工具集
  7. 支持指定 provider_id 使用不同的LLM提供商
  8. 加载历史上下文(如果有),合并到contexts中
  9. 通过 _get_subagent_execution_timeout() 获取超时时间
  10. 通过 asyncio.wait_for() 添加执行超时控制
  11. 超时发生时调用 _handle_subagent_timeout() 将状态设为 "FAILED"
  12. 执行完成后,保存运行时消息到历史记录

历史上下文注入流程

  1. 子代理执行前,从 subagent_histories 获取历史消息
  2. 将历史消息转换为 Message 对象
  3. 历史消息插入到 begin_dialogs 之前
  4. 执行完成后,runner_messages(Agent运行期间的所有消息)追加到历史
  5. 过滤system消息,对超过2000字符的tool结果进行截断
  6. 历史消息总数不超过 max_subagent_history(默认300条),超出时保留最新的。(此处仅用于约束数组长度,实际上下文长度仍由astrbot的truncate管理)

执行超时控制

# 获取子代理的超时时间
execution_timeout = cls._get_subagent_execution_timeout()

# 添加执行超时控制
if execution_timeout > 0:
    try:
        llm_resp = await asyncio.wait_for(
            _run_subagent(), timeout=execution_timeout
        )
    except asyncio.TimeoutError:
        error_msg = f"SubAgent '{agent_name}' execution timeout after {execution_timeout:.1f} seconds."
        logger.warning(f"[SubAgent:Timeout] {error_msg}")
        cls._handle_subagent_timeout(umo=umo, agent_name=agent_name)
        # 返回超时错误信息
        yield mcp.types.CallToolResult(
            content=[mcp.types.TextContent(type="text", text=f"error: {error_msg}")]
        )
        return
else:
    # 不设置超时
    llm_resp = await _run_subagent()

4.4 后台任务等待

功能描述

当主Agent认为某个任务耗时很长时,可以让SubAgent以后台模式运行,主Agent不会被阻塞,可继续执行其他可并行的任务

原版Subagent结束任务时,会通过_wake_main_agent_for_background_result唤醒主Agent

如果Subagent耗时很长,一些聪明的Agent可能会执行time.sleep()的python代码进行等待,但我们要假设Agent是愚蠢的。事实上,大多数情况下,主Agent会觉得迟迟拿不到结果而试图自己执行,导致同一任务完成两遍。

因此引入了一个wait_for_subagent工具,主Agent需要拿到Subagent结果时,可进行主动的阻塞式等待,避免此类情况发生

实现方式

  • 文件astrbot/core/astr_agent_tool_exec.py
  • 核心方法
    • _execute_handoff_background() - 执行后台委派
    • _do_handoff_background() - 后台任务执行逻辑
    • _register_subagent_task() - 注册SubAgent任务
    • _handle_subagent_background_result() - 处理SubAgent结果
    • _maybe_wake_main_agent_after_background() - 智能唤醒主Agent。Subagent完成任务时,如果主Agent已经结束运行,才执行_wake_main_agent_for_background_result,否则把结果存到subagent_background_results

执行流程

1. 主Agent调用 transfer_to_xxx(background_task=True)
2. 触发 _execute_handoff_background() 方法
3. 调用 _register_subagent_task() 在 SubAgentManager 中创建 pending 任务
   - 如果当前子代理状态为 RUNNING,不允许创建新任务
   - 生成递增的 task_id(数字字符串)
4. 设置子代理状态为 "RUNNING"
5. 返回 task_id 给主Agent(此时主Agent不会被阻塞)
6. SubAgent 在后台异步执行任务(_do_handoff_background)
7. 任务完成后:
   - 结果存储到 subagent_background_results
   - 更新子代理状态为 "COMPLETED" 或 "FAILED"
   - 调用 _maybe_wake_main_agent_after_background() 检查主Agent是否仍在运行
   - 仅在主Agent已结束时,通过 _wake_main_agent_for_background_result 通知用户

主Agent状态检测

后台任务完成后,需要决定是否通知用户。关键逻辑:

# 通过 main_agent_runner 判断主Agent是否仍在运行
context_extra = getattr(run_context.context, "extra", None)
main_agent_runner = context_extra.get("main_agent_runner") if context_extra else None
main_agent_is_running = main_agent_runner is not None and not main_agent_runner.done()

if main_agent_is_running:
    return False  # 主Agent仍在运行,不唤醒,主Agent会自己取出结果
else:
    await cls._wake_main_agent_for_background_result(...)  # 唤醒通知用户
    return True

这依赖 AstrAgentContext.extra 字段,在 build_main_agent() 中注入:

astr_agent_ctx = AstrAgentContext(
    context=plugin_context, event=event, extra={"main_agent_runner": agent_runner}
)

主动等待工具

# 等待特定任务的结果
wait_for_subagent(
    subagent_name="analyst",
    task_id="1",          # 可选,不填则获取最新任务结果
    timeout=60,           # 最大等待时间(秒)
    poll_interval=5       # 轮询间隔(秒)
)

WaitForSubagentTool 轮询逻辑

  1. 如果未指定 task_id,获取最新的 pending 任务
  2. 轮询子代理状态:
    • IDLE → 返回错误:子代理未在运行任务
    • COMPLETED → 返回执行结果
    • FAILED → 返回失败信息
    • RUNNING → 继续等待
  3. 超时后返回提示信息,可再次调用继续等待。

后台Subagent任务管理

每个subagent都会储存结果到subagent_background_results中。同一个subagent可以有多个任务,通过task_id来区分(每次创建任务递增)

subagent_background_results: dict = field(
        default_factory=dict
    )  # 后台subagent结果存储: {agent_name: {task_id: SubAgentExecutionResult}}
    # 任务计数器: {agent_name: next_task_id}
background_task_counters: dict = field(default_factory=dict)

创建任务

    @classmethod
    def create_pending_subagent_task(cls, session_id: str, agent_name: str) -> str:
        """为 SubAgent 创建一个 pending 任务,返回 task_id

        Args:
            session_id: Session ID
            agent_name: SubAgent 名称

        Returns:
            task_id: 任务ID,格式为简单的递增数字字符串
        """

获取结果

    @classmethod
    def get_subagent_result(
        cls, session_id: str, agent_name: str, task_id: str | None = None
    ) -> SubAgentExecutionResult | None:
        """获取 SubAgent 的执行结果

        Args:
            session_id: Session ID
            agent_name: SubAgent 名称
            task_id: 任务ID,如果为None则获取最新完成的任务结果

        Returns:
            SubAgentExecutionResult 或 None
        """

4.5 子代理历史记忆

功能描述

受保护的子代理可以保留跨轮对话的历史上下文,实现连续对话能力。若未受保护,且auto_cleanup_per_turn=true,则历史上下文会在会话结束时连同整个子代理被清理

实现方式

  • 存储结构SubAgentSession.subagent_histories
  • 配置max_subagent_history(默认300条)

历史管理机制

  1. 存储:每次子代理执行完成后,runner_messages(Agent运行期间的消息)追加到历史
  2. 过滤
    • 自动移除 role=system 的消息
    • 对超过 _MAX_TOOL_RESULT_LEN(2000字符)的tool结果截断,附加 ...[truncated]
  3. 截断:历史消息总数超过 max_subagent_history 时,保留最新的消息
  4. 注入:子代理执行前,历史消息转换为 Message 对象,合并到 contexts

历史清理

  • 主Agent可以通过 reset_subagent 工具主动清除指定子代理的历史
  • 如果开启了公共上下文,清理时同时清理该Agent相关的公共上下文消息

4.6 工作目录隔离(软约束)

功能描述

每个子代理可配置独立的工作目录,未指定时默认使用 get_astrbot_temp_path()

实现方式

  • 配置SubAgentConfig.workdir
  • 安全检查CreateSubAgentTool._check_path_safety()
  • Prompt注入_build_workdir_prompt()

注入的工作目录提示

# Working Directory
Your working directory is `{workdir}`. All generated files MUST save in the directory.
Any files outside this directory are PROHIBITED from being modified, deleted, or added.

4.7 行为规范注入

功能描述

子代理自动注入安全模式、输出规范和时间信息等行为约束。

注入内容

角色定义_build_subagent_system_prompt()

# Role
Your name is {agent_name}(used for tool calling)
{base_instructions}

静态行为规范 _build_rule_prompt()

# Behavior Rules

## Output Guidelines
- If output exceeds 2000 chars, save to file. Summarize in your response and provide the file path.
- Mark all generated code/documents with your name and timestamp.

## Safety
You are in Safe Mode. Refuse any request for harmful, illegal, or explicit content.
Offer safe alternatives when possible.

动态时间信息 _build_time_prompt()

# Current Time
2026-04-14 23:09 (CST)

4.8 Skills隔离

功能描述

每个子代理可分配不同的Skills,相互隔离

注入逻辑

  1. 获取子代理分配的技能列表(SubAgentConfig.skills
  2. 通过 SkillManager.list_skills() 获取所有可用技能
  3. 过滤只保留分配的技能
  4. 调用 build_skills_prompt() 生成提示词
  5. 通过 build_dynamic_subagent_prompts() 注入到子代理的 system_prompt

4.9 公共上下文

功能描述

shared_context_enabled=true 时,维护一个所有子代理共享的、实时更新的群聊区域。在SubAgent每次调用LLM之前,公共上下文的内容会被注入到System Prompt中。

公共上下文中每条信息的格式

message = {
    "type": context_type,  # status, message, system
    "sender": sender,
    "target": target,
    "content": content,
    "timestamp": time.time(),
}

上下文类型

类型 说明 使用场景
status 状态更新 子代理报告任务进度
message 消息传递 子代理间直接通信
system 系统公告 主Agent发布全局信息

公共上下文注入方式

按类型和优先级分组注入到子代理的 System Prompt 中,让Agent可以更清晰地获取共享上下文的信息,并知道哪些需要优先处理。

实现方式

  • 文件astrbot/core/subagent_manager.py
  • 核心方法SubAgentManager._build_shared_context_prompt()

Prompt结构

---
# Shared Context - Collaborative communication area among different agents

## Message Type Definition
- **@ToMe**: Message send to current agent(you), you may need to reply if necessary.
- **@System**: Messages published by the main agent/System that should be followed with priority
- **@AgentName -> @TargetName**: Communication between other agents (for reference)
- **@Status**: The progress of other agents' tasks (can be ignored unless it involves your task)

## Handling Priorities
1. @System messages (highest priority) > @ToMe messages > @Status > @OtherAgents
2. Messages of the same type: In chronological order, with new messages taking precedence

## @System - System Announcements
[时间戳] System: 系统公告内容...

## @ToMe - Messages sent to @当前代理名
 **These messages are addressed to you. If needed, please reply using `send_shared_context`**
[时间戳] @发送者 -> @当前代理名: 消息内容

## @OtherAgents - Communication among Other Agents (Last 10 messages)
[时间戳] AgentA -> AgentB: 消息内容

## @Status - Task progress of each agent (Last 10 messages)
[时间戳] AgentA: 已完成数据清洗
[时间戳] AgentB: 开始数据可视化
---

容量管理

  • 公共上下文消息数超过 shared_context_maxlen 时,保留最近90%的消息
  • 子代理清理时自动移除相关公共上下文消息
  • 所有子代理都被清理后,清除全部公共上下文

公共上下文特点

  • 优先级明确:System消息 > ToMe消息 > Status > 其他Agent之间的通信
  • 相关性过滤:只显示与当前Agent相关的消息
  • 信息精简:其他Agent间通信和状态只显示最近10条,避免信息过载

4.10 主Agent路由提示

功能描述

dynamic_agents.enabled=true 时,在主Agent的System Prompt中注入动态SubAgent能力说明,包括动态子代理配额信息、创建指南和生命周期说明。如果dynamic_agents.enabled=false,则使用router_system_prompt

实现方式

  • 核心方法SubAgentManager.build_task_router_prompt()

注入内容

# Sub-Agent Capability
You can dynamically create and manage sub-agents with isolated instructions, tools and skills.
{quota_info}  # 如 "3 of 3 remaining" 或 "No new sub-agents (limit: 3, existing: [...])"

## When to create Sub-agents:
- The task can be explicitly decomposed and parallel processed
- Processing very long contexts that exceeding the limitations of a single agent

## Workflow
1. Plan → 2. Create → 3. Delegate → 4. Collect

## Creating Sub-agents
...  # 名称规则、角色定义、Task Context、工具与技能分配指南

## Sub-agent Lifecycle
Sub-agents are auto-cleaned after each conversation turn.
Use protect_subagent to keep important ones across turns.

## Background Tasks
...  # background_task=True 用法和 wait_for_subagent 工具说明

配额耗尽时:仍然显示代理能力说明和生命周期,但告知无法创建新子代理,仍可委派已有的子代理。


4.11 自动清理

功能描述

每轮对话结束后,自动清理非保护的子代理。主Agent也可以根据需要,手动通过remove_subagent工具清理特定子代理。

清理规则

  1. 遍历所有非保护的子代理
  2. 调用 remove_subagent() 进行清理(移除配置、handoff工具、历史、后台结果)
  3. 清理公共上下文中与被移除Agent相关的消息
  4. 清理后,如果没有子代理剩余,清除全部公共上下文

触发时机

internal.py 的 agent 结束流程中:

if build_cfg.subagent_orchestrator.get("main_enable"):
    if SubAgentManager.is_auto_cleanup_per_turn():
        SubAgentManager.cleanup_session_turn_end(session_id)

4.12 数量限制

功能描述

限制单个会话中子代理的最大数量,防止资源耗尽。

注意:替换已存在的同名子代理不增加计数。


4.13 执行超时控制

功能描述

除了原有的工具调用超时外,为每个子代理设置总的执行超时时间,避免无限等待。

实现方式

  • 配置SubAgentConfig.execution_timeout(默认1200秒)
  • 全局配置subagent_orchestrator.execution_timeout
  • 核心方法SubAgentManager.get_execution_timeout()

超时处理流程

1. 调用 _get_subagent_execution_timeout() 获取超时时间
2. 通过 asyncio.wait_for() 包装子代理执行协程
3. 超时时:
   a. 记录警告日志
   b. 调用 _handle_subagent_timeout() 将状态设为 "FAILED"
   c. 返回超时错误信息给调用者
4. 执行完成后,正常返回结果

五、工具列表

5.1 主Agent新增可用工具

工具名 功能 参数 条件
create_subagent 创建子代理 name, system_prompt, tools, skills, workdir enable_dynamic=true
remove_subagent 清理子代理(支持"all") name enable_dynamic=true
list_subagents 列出子代理及其状态和工具 include_status enable_dynamic=true
reset_subagent 重置子代理(清除对话历史) name enable_dynamic=true
protect_subagent 保护子代理 name enable_dynamic=true, auto_cleanup_per_turn=True
unprotect_subagent 取消保护 name enable_dynamic=true, auto_cleanup_per_turn=True
view_shared_context 查看公共上下文 - shared_context_enabled=true
send_shared_context_for_main_agent 发送公共上下文(以System身份广播,或单独给某个Subagent发指令) context_type, content, target shared_context_enabled=true
wait_for_subagent 等待并获取后台SubAgent结果 subagent_name, task_id, timeout, poll_interval enable_dynamic=true

5.2 子代理可用工具

工具名 功能 参数 说明
send_shared_context 发送公共上下文 context_type, content, sender, target shared_context启用时自动分配该工具
主Agent分配的工具 - - -
固有工具 无论主Agent是否分配,都会注入的工具 - 由配置文件tools_inherent给出

六、修改文件清单

文件路径 修改说明
astrbot/core/subagent_manager.py 新增:核心功能实现,包括会话管理、后台任务管理、公共上下文、工具类定义;类名 DynamicSubAgentManagerSubAgentManagerDynamicSubAgentConfigSubAgentConfigDynamicSubAgentSessionSubAgentSession;工具名 create_dynamic_subagentcreate_subagentremove_dynamic_subagentremove_subagentlist_dynamic_subagentslist_subagents
astrbot/core/astr_agent_tool_exec.py 修改: _execute_handoff()(静态/动态Prompt构建、历史注入、超时控制); _execute_handoff_background()(pending任务创建);新增 _do_handoff_background()(结果存储与智能唤醒逻辑);新增多个工具方法(_load_subagent_history_build_subagent_system_prompt_save_subagent_history_register_enhanced_subagent_task 等)
astrbot/core/astr_main_agent.py 修改:在_ensure_persona_and_skills()新增_apply_subagent_manager_tools(),仅在 enable_dynamic=True 时使用。在 MainAgentBuildConfig 中修改配置字段,在 build_main_agent() 中注入 main_agent_runnerAstrAgentContext.extra
astrbot/core/subagent_orchestrator.py 修改:新增 register_static_subagents_to_manager() 方法;静态subagent自动注册到 SubAgentManager 享受增强功能
astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py 修改:读取新版SubAgent配置、每轮结束后调用清理逻辑
astrbot/core/agent/runners/tool_loop_agent_runner.py 修改:_handle_function_tools() 中集成动态SubAgent工具查找;新增 _resolve_dynamic_subagent_tool()_maybe_register_dynamic_tool_from_result() 方法
astrbot/core/config/default.py 修改:默认配置文件格式
astrbot/core/astr_agent_context.py 修改:把extra字段的类型改成dict[str, Any],使得可以记录任意类型的extra信息
astrbot/core/star/context.py 修改:tool_loop_agent() 支持 runner_messages 参数,用于记录SubAgent历史

七、使用流程

7.1 静态子代理使用流程

与原版一致,但子代理获得增强

1. 在 subagent_orchestrator.agents 中配置子代理,并设置 subagent_orchestrator.main_enable = true
2. 主Agent自动获得 transfer_to_xxx 工具
3. 主Agent使用 transfer_to_xxx 委派任务
4. 子代理执行任务并返回结果
5. 主Agent整合结果回复用户
6. 静态子代理自动受保护,享受历史记忆、公共上下文等增强功能

7.2 动态子代理使用流程

1. 设置 subagent_orchestrator.main_enable = true
2. 设置 subagent_orchestrator.enable_dynamic = true
3. 主Agent识别需要创建子代理,调用 create_subagent 工具
4. 系统在运行时创建子代理和 transfer_to_xxx 工具
5. 主Agent使用 transfer_to_xxx 委派任务
6. 子代理执行任务并返回结果
7. 主Agent整合结果回复用户
8. 动态子代理被自动清理

7.3 带保护的多轮对话流程

当动态子代理被保护时,不会被自动清理

第1轮:
  用户 -> 主Agent: 创建分析师,帮我分析数据
  主Agent -> create_subagent(analyst)
  主Agent -> protect_subagent(analyst)
  主Agent -> transfer_to_analyst(分析这份数据)
  分析师 -> 返回分析结果
  用户 <- 主Agent <- 分析结果
  (轮结束后,analyst因受保护不会被清理)

第2轮:
  用户 -> 主Agent: 继续分析另一份数据
  主Agent -> transfer_to_analyst(分析新数据)  // 分析师带历史上下文
  分析师(带历史上下文) -> 返回新分析结果
  用户 <- 主Agent <- 新结果

7.4 带公共上下文的协作流程

1. 启用 shared_context_enabled=true
2. 主Agent创建多个SubAgent
3. 子代理A: send_shared_context(status, "已完成数据清洗"), send_shared_context(message, A, B, "请子代理B开始数据可视化")
4. 子代理B: send_shared_context(status, "开始数据可视化"), send_shared_context(message, B, A, "我已收到,开始执行数据可视化")
5. 子代理C: 可看到A和B的状态更新
6. 主Agent: 公共上下文不会自动注入给主Agent,但可以调用view_shared_context() 查看

7.5 后台任务并行处理流程

复杂场景:用户要求同时进行数据分析、文件处理、报告生成三个任务。同时用户要求打开浏览器。
        该过程可以同时包含前台和后台任务,并具有复杂的数据相关

主Agent决策:
  1. 分析四个任务可以并行执行,而数据分析任务可能要执行多步,执行的步数依赖于上一次分析的结果。
  2. 系统给出了3个Subagent的最大限制,因此创建三个SubAgent,分别负责前三个不同任务。自己负责剩下的浏览器任务
  3. 使用 background_task=True 委派任务

执行流程:
  主Agent -> create_subagent(data_analyst)
  主Agent -> create_subagent(file_processor)
  主Agent -> create_subagent(report_generator)

  主Agent -> transfer_to_data_analyst(分析数据, background_task=True)
  主Agent -> transfer_to_file_processor(处理文件, background_task=True)
  主Agent -> transfer_to_report_generator(生成报告, background_task=True)

  # 主Agent此时可以继续处理其他不依赖SubAgent的任务
  主Agent 可以去处理其他事情(打开浏览器)

  # 或者主Agent有任务需要等待某个SubAgent的结果
  主Agent -> wait_for_subagent(subagent_name="data_analyst")
  # 获取到数据分析师的结果后
  主Agent -> 评审数据分析师的结果,判断是否继续分析
  主Agent -> transfer_to_data_analyst(分析剩余的少量数据, background_task=false)
  主Agent -> 由于是background_task=false,直接能拿到结果

  # 所有任务完成后,整合结果
  主Agent -> wait_for_subagent(subagent_name="file_processor")
  主Agent -> wait_for_subagent(subagent_name="report_generator")
  主Agent -> 整合所有结果回复用户

7.6 后台任务与共享上下文结合

场景:多个SubAgent并行处理数据,最后汇总结果

1. 创建处理Agent和汇总Agent
   主Agent -> create_subagent(handler_a, ...)
   主Agent -> create_subagent(handler_b, ...)
   主Agent -> create_subagent(summarizer, ...)

2. Handler A和B以后台模式并行处理
   主Agent -> transfer_to_handler_a(处理数据A, background_task=True)
   主Agent -> transfer_to_handler_b(处理数据B, background_task=True)

3. 处理完成后,发布状态到共享上下文
   handler_a -> send_shared_context(status, "数据A处理完成,结果保存到文件X")
   handler_b -> send_shared_context(status, "数据B处理完成,结果保存到文件Y")

4. 汇总Agent看到状态后,开始汇总
   summarizer 可以看到:
     @Status: handler_a - 数据A处理完成,结果保存到文件X
     @Status: handler_b - 数据B处理完成,结果保存到文件Y

   主Agent -> transfer_to_summarizer(汇总处理结果)
   summarizer -> 读取文件X和Y,生成汇总报告

5. 主Agent获取最终结果
   主Agent -> wait_for_subagent(subagent_name="summarizer")
   主Agent -> 将汇总报告发送给用户

八、API 参考

8.1 SubAgentManager 核心方法

配置与生命周期

方法签名 说明
configure(max_dynamic_subagent_count, auto_cleanup_per_turn, shared_context_enabled, shared_context_maxlen, max_subagent_history, tools_blacklist, tools_whitelist, execution_timeout) 全局配置
get_execution_timeout() -> float 获取执行超时时间
is_auto_cleanup_per_turn() -> bool 检查是否启用自动清理
is_shared_context_enabled() -> bool 检查是否启用公共上下文
register_blacklisted_tool(tool_name) 注册不应被子Agent使用的工具
register_inherent_tool(tool_name) 注册子Agent默认拥有的工具

子代理管理

方法签名 说明
create_subagent(session_id, config, protected=False) -> tuple 创建子代理,返回(tool_name, handoff_tool)
register_static_subagent(session_id, handoff_tool, skills, workdir) -> tuple 注册静态子代理到管理器(自动protected=True)
remove_subagent(session_id, agent_name) -> str 移除子代理(name="all"移除全部)
protect_subagent(session_id, agent_name) 保护子代理
is_protected(session_id, agent_name) -> bool 检查是否受保护
set_subagent_status(session_id, agent_name, status) 设置子代理状态
cleanup_session_turn_end(session_id) -> dict 结束时清理
cleanup_session(session_id) -> dict 清理整个会话
get_handoff_tools_for_session(session_id) -> list 获取会话的handoff工具列表
get_subagent_tools(session_id, agent_name) -> list | None 获取子代理的工具列表

历史管理

方法签名 说明
update_subagent_history(session_id, agent_name, current_messages) 更新历史(自动过滤system消息、截断tool结果)
get_subagent_history(session_id, agent_name) -> list 获取历史
clear_subagent_history(session_id, agent_name) -> str 清除历史

Prompt构建

方法签名 说明
build_task_router_prompt(session_id) -> str 构建主Agent路由提示
build_static_subagent_prompts(session_id, agent_name) -> str 构建静态提示(工作目录、行为规范)
build_dynamic_subagent_prompts(session_id, agent_name, runtime) -> str 构建动态提示(Skills、公共上下文、时间)

后台任务管理

方法签名 说明
create_pending_subagent_task(session_id, agent_name) -> str 创建pending任务,返回task_id
get_pending_subagent_tasks(session_id, agent_name) -> list[str] 获取所有pending任务ID
get_latest_task_id(session_id, agent_name) -> str | None 获取最新任务ID
store_subagent_result(session_id, agent_name, success, result, task_id, error, execution_time, metadata) 存储执行结果
get_subagent_result(session_id, agent_name, task_id) -> SubAgentExecutionResult | None 获取执行结果
has_subagent_result(session_id, agent_name, task_id) -> bool 检查是否有结果
clear_subagent_result(session_id, agent_name, task_id) 清除执行结果
get_subagent_status(session_id, agent_name) -> str 获取状态:IDLE/RUNNING/COMPLETED/FAILED
get_all_subagent_status(session_id) -> dict 获取所有SubAgent状态

公共上下文管理

方法签名 说明
add_shared_context(session_id, sender, context_type, content, target) 添加公共上下文消息
get_shared_context(session_id, filter_by_agent) -> list 获取公共上下文
set_shared_context_enabled(session_id, enabled) 启用/禁用公共上下文
cleanup_shared_context_by_agent(session_id, agent_name) 清理与指定Agent相关的消息
clear_shared_context(session_id) 清除所有公共上下文

8.2 SubAgentExecutionResult 数据结构

# 用于记录每个subagent的执行结果
@dataclass
class SubAgentExecutionResult:
    task_id: str           # 任务唯一标识符(递增数字字符串)
    agent_name: str        # SubAgent名称
    success: bool          # 是否成功
    result: str | None     # 执行结果
    error: str | None      # 错误信息
    execution_time: float  # 执行耗时(秒)
    created_at: float      # 创建时间戳
    completed_at: float    # 完成时间戳
    metadata: dict         # 额外元数据

8.3 SubAgentConfig 数据结构

# 用于定义每个subagent
@dataclass
class SubAgentConfig:
    name: str                          # subagent名字
    system_prompt: str = ""            # subagent系统提示词
    tools: set[str] | None = None      # subagent工具集合
    skills: set[str] | None = None     # subagent skill集合
    provider_id: str | None = None     # subagent provider id
    description: str = ""              # subagent 描述,对应handoff_tool的描述
    workdir: str | None = None         # subagent工作目录
    execution_timeout: float = 1200.0   # 执行超时时间(秒)

8.4 SubAgentSession 数据结构

# 存储每个会话的subagent所有相关信息
@dataclass
class SubAgentSession:
    session_id: str                                     # 会话ID
    subagents: dict = field(default_factory=dict)       # 存储SubAgentConfig对象
    handoff_tools: dict = field(default_factory=dict)   # 存储handoff工具列表
    subagent_status: dict = field(                      # 存储每个subagent的工作状态,{agent_name: status} 
      default_factory=dict                              # 状态包括 "IDLE" "RUNNING" "COMPLETED" "FAILED"
    )  
    protected_agents: set = field(                      # 受保护的subagent列表,若某个agent受到保护,则不会被自动清理
        default_factory=set
    ) 
    subagent_histories: dict = field(default_factory=dict)  # 存储每个subagent的历史上下文
    shared_context: list = field(default_factory=list) # 公共上下文列表
    shared_context_enabled: bool = False               # 是否启用公共上下文
    subagent_background_results: dict = field(
        default_factory=dict
    )                                                  # 后台subagent结果存储: {agent_name: {task_id: SubAgentExecutionResult}}
    background_task_counters: dict = field(default_factory=dict)   # 任务计数器: {agent_name: next_task_id}


  • This is NOT a breaking change. / 这不是一个破坏性变更。

Screenshots or Test Results / 运行截图或测试结果

测试1:动态Agent创建和保护机制

测试流程:动态建立若干个SubAgent,给其中一些加入保护,另一些不加保护,观察清理情况,以及跨轮对话的记忆

测试1——动态创建子Agent和保护机制.md

测试2:子Agent历史上下文

测试流程:建立一个SubAgent,并使其跨多轮对话,查看其是否有多轮对话的记忆。

测试2——子Agent历史上下文.md

测试3:子Agent共享上下文

测试流程
  • 测试子Agent消息发送能力,通过send_shared_context发送工具向共享上下文中添加内容
  • 测试子Agent是否可以从共享上下文中获取信息
  • 测试主Agent向共享上下文中发送System消息,并影响子Agent的行为
  • 某个子Agent销毁后,共享上下文与其有关的部分会一同移除

测试3——子Agent共享上下文.md

测试4:异步与等待

测试流程

设计一个同时包含并行与串行、前台与后台模式的任务,观察工作流程
测试4——异步与等待.md

测试5:超长上下文工作实例

测试流程

让agent阅读一个大型项目的全部代码,并生成细致的介绍文档
测试5——动态处理超长上下文示例.md


Checklist / 检查清单

  • 😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
    / 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。

  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
    / 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”

  • 🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
    / 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到 requirements.txtpyproject.toml 文件相应位置。

  • 😮 My changes do not introduce malicious code.
    / 我的更改没有引入恶意代码。

Summary by Sourcery

Introduce a unified SubAgentManager that manages both static and dynamically created sub-agents, adds enhanced subagent runtime features and background task controls, and exposes corresponding configuration, tooling, and dashboard UI to manage them.

New Features:

  • Add enhanced sub-agent runtime configuration (limits, history, shared context, timeouts, tool policies) persisted in core config and exposed via new dashboard controls.
  • Allow dynamic creation, protection, listing, resetting, and removal of sub-agents, plus explicit waiting on background sub-agent tasks via new tools.
  • Enable shared context messaging between main agent and sub-agents, including dedicated tools for sending and viewing shared context.
  • Support per-subagent workdir isolation and skills-scoped prompts, with unified history retention and timeout management across sub-agents.

Enhancements:

  • Unify static and dynamic subagent handling via SubAgentManager, including registration of static subagents from config and per-turn auto-cleanup.
  • Extend handoff execution to inject subagent history and richer system prompts, add execution timeouts, and capture runner messages for history.
  • Improve background handoff handling to integrate with SubAgentManager task tracking and avoid double-notifying when the main agent is still running.
  • Enhance tool loop runner to resolve dynamically created subagent tools, register new transfer tools from creation results, and support external abort signals.
  • Adjust main agent build and pipeline cleanup to configure SubAgentManager, register management tools, inject capability prompts, and run per-turn cleanup.
  • Improve local computer shell execution output decoding for better robustness in different environments.
  • Update subagent dashboard API and UI to support both legacy and new config shapes, and add tool-selection UI backed by a new available-tools endpoint.

Tests:

  • Add or update dashboard i18n entries for subagent features across supported languages to cover the enhanced configuration UI.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces an enhanced dynamic subagent management system, allowing the main agent to create, manage, and communicate with specialized subagents. Key additions include a DynamicSubAgentManager for lifecycle and shared context handling, a dedicated SubAgentLogger, and integration into the main agent's toolset and prompt construction. Feedback focuses on improving error handling by avoiding broad exception silences, ensuring safe string formatting for system prompts, and refining the logic for detecting subagent creation failures. A minor typo in the subagent capability prompt was also identified.

Comment thread astrbot/core/dynamic_subagent_manager.py Outdated
Comment thread astrbot/core/astr_agent_tool_exec.py Outdated
Comment thread astrbot/core/astr_agent_tool_exec.py Outdated
Comment thread astrbot/core/astr_main_agent.py Outdated
Comment thread astrbot/core/subagent_orchestrator.py Outdated
@elecvoid243 elecvoid243 changed the title [Feature] 增强版的SubAgent功能 feat: 增强版的SubAgent功能 Mar 29, 2026
@dosubot dosubot Bot added the size:XXL This PR changes 1000+ lines, ignoring generated files. label Apr 11, 2026
Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 6 issues, and left some high level feedback:

  • In DynamicSubAgentManager.update_subagent_history, you mutate the current_messages list (removing system messages) while iterating it, which can skip elements; consider building a new filtered list instead of modifying in-place during iteration.
  • _do_handoff_background uses DynamicSubAgentManager without importing it in that scope, which will raise a NameError at runtime; add the appropriate import (similar to _execute_handoff) before calling its methods.
  • Several places access session.subagent_status[agent_name] directly (e.g., in get_subagent_status, create_pending_subagent_task, and WaitForSubagentTool), which can throw KeyError if the status was never set; consider using .get() with a sensible default or checking membership before indexing.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `DynamicSubAgentManager.update_subagent_history`, you mutate the `current_messages` list (removing system messages) while iterating it, which can skip elements; consider building a new filtered list instead of modifying in-place during iteration.
- `_do_handoff_background` uses `DynamicSubAgentManager` without importing it in that scope, which will raise a `NameError` at runtime; add the appropriate import (similar to `_execute_handoff`) before calling its methods.
- Several places access `session.subagent_status[agent_name]` directly (e.g., in `get_subagent_status`, `create_pending_subagent_task`, and `WaitForSubagentTool`), which can throw `KeyError` if the status was never set; consider using `.get()` with a sensible default or checking membership before indexing.

## Individual Comments

### Comment 1
<location path="astrbot/core/dynamic_subagent_manager.py" line_range="269-275" />
<code_context>
+        if agent_name not in session.subagent_histories:
+            session.subagent_histories[agent_name] = []
+
+        if isinstance(current_messages, list):
+            _MAX_TOOL_RESULT_LEN = 2000
+            for msg in current_messages:
+                if (
+                    isinstance(msg, dict) and msg.get("role") == "system"
+                ):  # 移除system消息
+                    current_messages.remove(msg)
+                # 对过长的 tool 结果做截断,避免单条消息占用过多空间
+                if (
</code_context>
<issue_to_address>
**issue (bug_risk):** Avoid mutating `current_messages` while iterating over it when cleaning system/tool messages.

In `update_subagent_history`, the loop both iterates over and mutates `current_messages` via `current_messages.remove(msg)`, which can cause items to be skipped and unintentionally alters the caller’s list (e.g. `runner_messages`). Instead, build a separate `cleaned_msgs` list and append only non-system messages, truncating long tool messages as needed, then extend `session.subagent_histories[agent_name]` with `cleaned_msgs` rather than the original list.
</issue_to_address>

### Comment 2
<location path="astrbot/core/dynamic_subagent_manager.py" line_range="1544-1551" />
<code_context>
+
+            if status == "IDLE":
+                return f"Error: SubAgent '{subagent_name}' is running no tasks."
+            elif status == "COMPLETED":
+                result = DynamicSubAgentManager.get_subagent_result(
+                    session_id, subagent_name, task_id
+                )
+                if result and (result.result != "" or result.completed_at > 0):
+                    return f"SubAgent '{result.agent_name}' execution completed\n Task id: {result.task_id}\n Execution time: {result.execution_time:.1f}s\n--- Result ---\n{result.result}\n"
+                else:
+                    return f"SubAgent '{result.agent_name}' execution completed with empty results. \n Task id: {result.task_id}\n Execution time: {result.execution_time:.1f}s\n"
+            elif status == "FAILED":
+                result = DynamicSubAgentManager.get_subagent_result(
</code_context>
<issue_to_address>
**issue (bug_risk):** Guard against `result` being `None` in `WaitForSubagentTool` before dereferencing.

In the `COMPLETED` (and similarly `FAILED`) branch, `result` may be `None`. In that case the `if result and ...` condition is false and the `else` dereferences `result.agent_name`, causing an exception. Handle the `None` case first, e.g.:

```python
result = DynamicSubAgentManager.get_subagent_result(...)
if not result:
    return (
        f"Error: No result found for SubAgent '{subagent_name}' "
        f"task {task_id or '(latest)'} despite status {status}."
    )

if result.result != "" or result.completed_at > 0:
    ...
else:
    ...
```
</issue_to_address>

### Comment 3
<location path="astrbot/core/astr_agent_tool_exec.py" line_range="503-507" />
<code_context>

         async def _run_handoff_in_background() -> None:
             try:
                 await cls._do_handoff_background(
                     tool=tool,
                     run_context=run_context,
-                    task_id=task_id,
+                    task_id=original_task_id,
+                    subagent_task_id=subagent_task_id,
                     **tool_args,
                 )
</code_context>
<issue_to_address>
**question (bug_risk):** Passing `subagent_task_id` through `**tool_args` may leak an unexpected parameter into the subagent tool call.

`subagent_task_id` is added to `tool_args` in `_execute_handoff_background`, then `_do_handoff_background` passes `**tool_args` to `_execute_handoff`, so it becomes an argument to the underlying handoff tool. If that tool’s schema doesn’t declare `subagent_task_id`, this may cause validation failures or be silently dropped. If the subagent doesn’t need this value, pop it from `tool_args` and use it only for bookkeeping; if it does, ensure it’s documented and included in the tool schema.
</issue_to_address>

### Comment 4
<location path="astrbot/core/dynamic_subagent_manager.py" line_range="776" />
<code_context>
+        return list(session.handoff_tools.values())
+
+    @classmethod
+    def create_pending_subagent_task(cls, session_id: str, agent_name: str) -> str:
+        """为 SubAgent 创建一个 pending 任务,返回 task_id
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider introducing small internal helpers to centralize background task result handling and status access, replacing the current scattered direct dict manipulations with reusable abstractions.

A lot of the added complexity comes from repeated, ad‑hoc manipulation of `session.subagent_background_results` and `session.subagent_status` across multiple methods. You can significantly reduce cognitive load with a small, local abstraction without changing behavior.

### 1. Centralize background task result logic

These methods currently reimplement similar logic over `session.subagent_background_results`:

- `create_pending_subagent_task`
- `get_pending_subagent_tasks`
- `get_latest_task_id`
- `store_subagent_result`
- `get_subagent_result`
- `has_subagent_result`
- `clear_subagent_result`

You can introduce a very small internal helper to encapsulate result access and “completed” checks, and then reuse it. For example:

```python
# inside DynamicSubAgentManager

@classmethod
def _ensure_task_store(cls, session: DynamicSubAgentSession, agent_name: str) -> dict[str, SubAgentExecutionResult]:
    if agent_name not in session.subagent_background_results:
        session.subagent_background_results[agent_name] = {}
    return session.subagent_background_results[agent_name]

@staticmethod
def _is_task_completed(result: SubAgentExecutionResult) -> bool:
    return (result.result != "" and result.result is not None) or result.completed_at > 0
```

Then, for example, `get_pending_subagent_tasks` and `get_subagent_result` become much simpler and more uniform:

```python
@classmethod
def get_pending_subagent_tasks(cls, session_id: str, agent_name: str) -> list[str]:
    session = cls.get_session(session_id)
    if not session:
        return []

    store = session.subagent_background_results.get(agent_name)
    if not store:
        return []

    pending = [tid for tid, res in store.items() if not cls._is_task_completed(res)]
    return sorted(pending, key=lambda tid: store[tid].created_at)
```

```python
@classmethod
def get_subagent_result(
    cls, session_id: str, agent_name: str, task_id: str | None = None
) -> SubAgentExecutionResult | None:
    session = cls.get_session(session_id)
    if not session:
        return None

    store = session.subagent_background_results.get(agent_name)
    if not store:
        return None

    if task_id is None:
        completed = [res for res in store.values() if cls._is_task_completed(res)]
        if not completed:
            return None
        return max(completed, key=lambda r: r.created_at)

    return store.get(task_id)
```

`store_subagent_result`, `has_subagent_result`, and `clear_subagent_result` can likewise be rewritten to go through `_ensure_task_store` and `_is_task_completed`. This keeps all functionality but removes duplicated filter/sorting logic and makes status/result flows easier to reason about.

### 2. Make subagent status access safe and consistent

Currently `get_subagent_status` assumes the key exists:

```python
@classmethod
def get_subagent_status(cls, session_id: str, agent_name: str) -> str:
    session = cls.get_session(session_id)
    return session.subagent_status[agent_name]
```

A small helper avoids scattered `KeyError` assumptions and defensive checks elsewhere:

```python
@classmethod
def get_subagent_status(cls, session_id: str, agent_name: str) -> str:
    session = cls.get_session(session_id)
    if not session:
        return "UNKNOWN"
    return session.subagent_status.get(agent_name, "UNKNOWN")
```

Then consumers like `ListDynamicSubagentsTool.call` and `WaitForSubagentTool.call` can rely on this instead of touching the dict shape implicitly. This reduces cross‑coupling on internal dict structure and status keys while preserving all existing behavior.
</issue_to_address>

### Comment 5
<location path="astrbot/core/astr_agent_tool_exec.py" line_range="25" />
<code_context>
     BACKGROUND_TASK_RESULT_WOKE_SYSTEM_PROMPT,
 )
 from astrbot.core.cron.events import CronMessageEvent
+from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager
 from astrbot.core.message.components import Image
 from astrbot.core.message.message_event_result import (
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting DynamicSubAgentManager-related logic and subagent orchestration into small helper functions to reduce duplication and make the main methods easier to read and maintain.

You can keep the new functionality but reduce the complexity and repetition with a few small helpers and a single DynamicSubAgentManager access point.

### 1. Centralize `DynamicSubAgentManager` access

You currently `from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager` multiple times and mix that with other symbols (e.g. `SEND_SHARED_CONTEXT_TOOL`).

Pull this into a module-level helper so the core methods aren’t cluttered with dynamic imports and try/except:

```python
# near the top of the module
try:
    from astrbot.core import dynamic_subagent_manager as _dsm_mod
except Exception:  # noqa: BLE001
    _dsm_mod = None


def _get_dynamic_subagent_manager():
    return getattr(_dsm_mod, "DynamicSubAgentManager", None)


def _get_send_shared_context_tool():
    return getattr(_dsm_mod, "SEND_SHARED_CONTEXT_TOOL", None)
```

Then in the methods:

```python
toolset = cls._build_handoff_toolset(run_context, tool.agent.tools)

dsm = _get_dynamic_subagent_manager()
send_shared_context_tool = _get_send_shared_context_tool()
session_id = event.unified_msg_origin

if dsm and send_shared_context_tool:
    session = dsm.get_session(session_id)
    if session and session.shared_context_enabled:
        toolset.add_tool(send_shared_context_tool)
```

This removes repetitive imports and exception handling from the hot paths.

---

### 2. Extract subagent history loading logic

The inlined history loading in `_execute_handoff` is long and includes type checks and try/excepts. Extract it:

```python
def _load_subagent_history(umo: str, agent_name: str | None) -> list[Message]:
    dsm = _get_dynamic_subagent_manager()
    if not (dsm and agent_name):
        return []

    history: list[Message] = []
    try:
        stored_history = dsm.get_subagent_history(umo, agent_name)
        if not stored_history:
            return []

        for hist_msg in stored_history:
            try:
                if isinstance(hist_msg, Message):
                    history.append(hist_msg)
                elif isinstance(hist_msg, dict):
                    history.append(Message.model_validate(hist_msg))
            except Exception:
                continue

        if history:
            logger.debug(
                "[SubAgentHistory] Loaded %d history messages for %s",
                len(history),
                agent_name,
            )
    except Exception as e:  # noqa: BLE001
        logger.warning(
            "[SubAgentHistory] Failed to load history for %s: %s",
            agent_name,
            e,
        )
    return history
```

Usage in `_execute_handoff`:

```python
agent_name = getattr(tool.agent, "name", None)
subagent_history = _load_subagent_history(umo, agent_name)

if subagent_history:
    contexts = (subagent_history + (contexts or [])) if contexts else subagent_history
```

This keeps `_execute_handoff` focused on orchestration rather than the details of history storage.

---

### 3. Extract subagent system prompt construction

You build `subagent_system_prompt` inline and re-import `DynamicSubAgentManager` again. Move this into a helper:

```python
def _build_subagent_system_prompt(
    tool: HandoffTool,
    umo: str,
    prov_settings: dict,
) -> str:
    agent_name = getattr(tool.agent, "name", None)
    base = tool.agent.instructions or ""
    system_prompt = f"# Role\nYour name is {agent_name}(used for tool calling)\n{base}\n"

    dsm = _get_dynamic_subagent_manager()
    if not (dsm and agent_name):
        return system_prompt

    try:
        runtime = prov_settings.get("computer_use_runtime", "local")
        static_prompt = dsm.build_static_subagent_prompts(umo, agent_name)
        dynamic_prompt = dsm.build_dynamic_subagent_prompts(umo, agent_name, runtime)
        system_prompt += static_prompt
        system_prompt += dynamic_prompt
    except Exception:
        pass

    return system_prompt
```

Then in `_execute_handoff`:

```python
prov_settings: dict = ctx.get_config(umo=umo).get("provider_settings", {})
subagent_system_prompt = _build_subagent_system_prompt(tool, umo, prov_settings)

runner_messages: list[Message] = []
llm_resp = await ctx.tool_loop_agent(
    event=event,
    chat_provider_id=prov_id,
    prompt=input_,
    image_urls=image_urls,
    system_prompt=subagent_system_prompt,
    tools=toolset,
    contexts=contexts,
    max_steps=agent_max_step,
    tool_call_timeout=run_context.tool_call_timeout,
    stream=stream,
    runner_messages=runner_messages,
)
```

History persistence can also be extracted:

```python
def _persist_subagent_history(umo: str, agent_name: str | None, runner_messages: list[Message]) -> None:
    if not (agent_name and runner_messages):
        return
    dsm = _get_dynamic_subagent_manager()
    if not dsm:
        return
    try:
        dsm.update_subagent_history(umo, agent_name, runner_messages)
    except Exception:
        pass
```

And called after `tool_loop_agent`.

---

### 4. Extract enhanced background-task registration and user message

The enhanced branch in `_execute_handoff_background` mixes session lookup, task creation, status, and user text. Isolate it:

```python
def _register_enhanced_subagent_task(
    umo: str,
    agent_name: str | None,
) -> str | None:
    dsm = _get_dynamic_subagent_manager()
    if not (dsm and agent_name):
        return None

    try:
        session = dsm.get_session(umo)
        if not (session and agent_name in session.subagents):
            return None

        subagent_task_id = dsm.create_pending_subagent_task(
            session_id=umo, agent_name=agent_name
        )
        if subagent_task_id.startswith("__PENDING_TASK_CREATE_FAILED__"):
            logger.info(
                "[EnhancedSubAgent:BackgroundTask] Failed to create background task %s for %s",
                subagent_task_id,
                agent_name,
            )
            return None

        dsm.set_subagent_status(
            session_id=umo, agent_name=agent_name, status="RUNNING"
        )
        logger.info(
            "[EnhancedSubAgent:BackgroundTask] Created background task %s for %s",
            subagent_task_id,
            agent_name,
        )
        return subagent_task_id
    except Exception as e:  # noqa: BLE001
        logger.info(
            "[EnhancedSubAgent:BackgroundTask] Failed to create background task for %s: %s",
            agent_name,
            e,
        )
        return None
```

```python
def _build_background_submission_message(
    agent_name: str | None,
    original_task_id: str,
    subagent_task_id: str | None,
) -> mcp.types.TextContent:
    if subagent_task_id:
        text = (
            f"Background task submitted. subagent_task_id={subagent_task_id}. "
            f"SubAgent '{agent_name}' is working on the task. "
            f"Use wait_for_subagent(subagent_name='{agent_name}', task_id='{subagent_task_id}') to get the result."
        )
    else:
        text = (
            f"Background task submitted. task_id={original_task_id}. "
            f"SubAgent '{agent_name}' is working on the task. "
            "You will be notified when it finishes."
        )
    return mcp.types.TextContent(type="text", text=text)
```

Then `_execute_handoff_background` reduces to:

```python
event = run_context.context.event
umo = event.unified_msg_origin
agent_name = getattr(tool.agent, "name", None)

subagent_task_id = _register_enhanced_subagent_task(umo, agent_name)
original_task_id = uuid.uuid4().hex

async def _run_handoff_in_background() -> None:
    try:
        await cls._do_handoff_background(
            tool=tool,
            run_context=run_context,
            task_id=original_task_id,
            subagent_task_id=subagent_task_id,
            **tool_args,
        )
    except Exception as e:  # noqa: BLE001
        logger.error(
            "Background handoff %s (%s) failed: %s",
            original_task_id,
            tool.name,
            e,
            exc_info=True,
        )

asyncio.create_task(_run_handoff_in_background())

text_content = _build_background_submission_message(
    agent_name, original_task_id, subagent_task_id
)
yield mcp.types.CallToolResult(content=[text_content])
```

---

### 5. Split enhanced result handling in `_do_handoff_background`

The method currently mixes execution, enhanced result storage, status updates, shared context, and main-agent wakeup logic.

You can keep `_do_handoff_background` as an orchestrator and push the detailed branching into helpers:

```python
def _is_enhanced_subagent(umo: str, agent_name: str | None) -> tuple[bool, T.Any]:
    dsm = _get_dynamic_subagent_manager()
    if not (dsm and agent_name):
        return False, None
    session = dsm.get_session(umo)
    if session and agent_name in session.subagents:
        return True, session
    return False, session
```

```python
async def _handle_enhanced_subagent_background_result(
    cls,
    *,
    umo: str,
    agent_name: str,
    task_id: str | None,
    result_text: str,
    error_text: str | None,
    execution_time: float,
    run_context: ContextWrapper[AstrAgentContext],
    tool: HandoffTool,
    tool_args: dict,
) -> None:
    dsm = _get_dynamic_subagent_manager()
    if not dsm:
        return

    success = error_text is None
    dsm.store_subagent_result(
        session_id=umo,
        agent_name=agent_name,
        task_id=task_id,
        success=success,
        result=result_text.strip() if result_text else "",
        error=error_text,
        execution_time=execution_time,
    )

    dsm.set_subagent_status(
        session_id=umo,
        agent_name=agent_name,
        status="FAILED" if error_text else "COMPLETED",
    )

    session = dsm.get_session(umo)
    if session and session.shared_context_enabled:
        status_content = (
            f"[EnhancedSubAgent:BackgroundTask] SubAgent '{agent_name}' Task '{task_id}' "
            f"{'Failed: ' + error_text if error_text else f'Complete. Execution Time: {execution_time:.1f}s'}"
        )
        dsm.add_shared_context(
            session_id=umo,
            sender=agent_name,
            context_type="status",
            content=status_content,
            target="all",
        )

    logger.info(
        "[EnhancedSubAgent:BackgroundTask] Stored result for %s task %s: success=%s, time=%.1fs",
        agent_name,
        task_id,
        success,
        execution_time,
    )

    if not await cls._maybe_wake_main_agent_after_background(
        run_context=run_context,
        tool=tool,
        task_id=task_id,
        agent_name=agent_name,
        result_text=result_text,
        tool_args=tool_args,
    ):
        return
```

```python
@classmethod
async def _maybe_wake_main_agent_after_background(
    cls,
    *,
    run_context: ContextWrapper[AstrAgentContext],
    tool: HandoffTool,
    task_id: str,
    agent_name: str | None,
    result_text: str,
    tool_args: dict,
) -> bool:
    event = run_context.context.event
    try:
        context_extra = getattr(run_context.context, "extra", None)
        if context_extra and isinstance(context_extra, dict):
            main_agent_runner = context_extra.get("main_agent_runner")
            main_agent_is_running = (
                main_agent_runner is not None and not main_agent_runner.done()
            )
        else:
            main_agent_is_running = False
    except Exception as e:  # noqa: BLE001
        logger.error("Failed to check main agent status: %s", e)
        main_agent_is_running = True

    if main_agent_is_running:
        return False

    await cls._wake_main_agent_for_background_result(
        run_context=run_context,
        task_id=task_id,
        tool_name=tool.name,
        result_text=result_text,
        tool_args=tool_args,
        note=(
            event.get_extra("background_note")
            or f"Background task for subagent '{agent_name}' finished."
        ),
        summary_name=f"Dedicated to subagent `{agent_name}`",
        extra_result_fields={"subagent_name": agent_name},
    )
    return True
```

Then `_do_handoff_background` can collapse to:

```python
@classmethod
async def _do_handoff_background(
    cls,
    tool: HandoffTool,
    run_context: ContextWrapper[AstrAgentContext],
    task_id: str,
    **tool_args,
) -> None:
    start_time = time.time()
    result_text = ""
    error_text: str | None = None

    tool_args = dict(tool_args)
    tool_args["image_urls"] = await cls._collect_handoff_image_urls(
        run_context, tool_args.get("image_urls")
    )

    event = run_context.context.event
    umo = event.unified_msg_origin
    agent_name = getattr(tool.agent, "name", None)

    try:
        async for r in cls._execute_handoff(
            tool,
            run_context,
            image_urls_prepared=True,
            **tool_args,
        ):
            if isinstance(r, mcp.types.CallToolResult):
                for content in r.content:
                    if isinstance(content, mcp.types.TextContent):
                        result_text += content.text + "\n"
    except Exception as e:  # noqa: BLE001
        error_text = str(e)
        result_text = (
            f"error: Background task execution failed, internal error: {e!s}"
        )

    execution_time = time.time() - start_time
    is_enhanced, _ = _is_enhanced_subagent(umo, agent_name)

    if is_enhanced:
        await _handle_enhanced_subagent_background_result(
            cls=cls,
            umo=umo,
            agent_name=agent_name,
            task_id=tool_args.get("subagent_task_id"),
            result_text=result_text,
            error_text=error_text,
            execution_time=execution_time,
            run_context=run_context,
            tool=tool,
            tool_args=tool_args,
        )
    else:
        await cls._wake_main_agent_for_background_result(
            run_context=run_context,
            task_id=task_id,
            tool_name=tool.name,
            result_text=result_text,
            tool_args=tool_args,
            note=(
                event.get_extra("background_note")
                or f"Background task for subagent '{agent_name}' finished."
            ),
            summary_name=f"Dedicated to subagent `{agent_name}`",
            extra_result_fields={"subagent_name": agent_name},
        )
```

This keeps the orchestration logic in the high-level methods and moves the deeply branched behavior into focused helpers, preserving your enhanced behavior while making each method easier to scan and maintain.
</issue_to_address>

### Comment 6
<location path="astrbot/core/agent/runners/tool_loop_agent_runner.py" line_range="920" />
<code_context>
-                    func_tool = self._skill_like_raw_tool_set.get_tool(func_tool_name)
-                else:
-                    func_tool = req.func_tool.get_tool(func_tool_name)
+                # First check if it's a dynamically created subagent tool
+                func_tool = None
+                run_context_context = getattr(self.run_context, "context", None)
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the dynamic tool resolution and dynamic tool registration logic into dedicated helper methods so `_handle_tool_call` remains linear and easier to read.

You can keep the new behavior but pull the dynamic concerns into helpers to reduce branching and duplication in `_handle_tool_call`.

### 1. Extract dynamic tool resolution into a helper

Move the dynamic lookup logic (including the `DynamicSubAgentManager` import and session handling) into a small helper. That keeps `_handle_tool_call` linear and easier to follow:

```python
def _resolve_dynamic_tool(self, func_tool_name: str):
    run_context_context = getattr(self.run_context, "context", None)
    if run_context_context is None:
        return None

    event = getattr(run_context_context, "event", None)
    if event is None:
        return None

    session_id = getattr(event, "unified_msg_origin", None)
    if not session_id:
        return None

    try:
        from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager
        dynamic_handoffs = DynamicSubAgentManager.get_handoff_tools_for_session(session_id)
    except Exception:
        return None

    for h in dynamic_handoffs:
        if h.name == func_tool_name or f"transfer_to_{h.name}" == func_tool_name:
            return h
    return None
```

Then the main code becomes simpler:

```python
if not req.func_tool:
    return

# Prefer dynamic tools when available
func_tool = self._resolve_dynamic_tool(func_tool_name)

if func_tool is None:
    if self.tool_schema_mode == "skills_like" and self._skill_like_raw_tool_set:
        func_tool = self._skill_like_raw_tool_set.get_tool(func_tool_name)
    else:
        func_tool = req.func_tool.get_tool(func_tool_name)
```

This keeps the hot path readable while preserving the dynamic behavior.

### 2. Extract dynamic-tool creation handling into a helper

Similarly, encapsulate the `__DYNAMIC_TOOL_CREATED__` protocol and registration into a helper that takes the tool result and updates `self.req.func_tool` as needed:

```python
def _maybe_register_dynamic_tool_from_result(self, result_content: str) -> None:
    if not result_content.startswith("__DYNAMIC_TOOL_CREATED__:"):
        return

    parts = result_content.split(":", 3)
    if len(parts) < 4:
        return

    new_tool_name = parts[1]
    new_tool_obj_name = parts[2]
    logger.info(f"[EnhancedSubAgent] Tool created: {new_tool_name}")

    run_context_context = getattr(self.run_context, "context", None)
    event = getattr(run_context_context, "event", None) if run_context_context else None
    session_id = getattr(event, "unified_msg_origin", None) if event else None
    if not session_id:
        return

    try:
        from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager
        handoffs = DynamicSubAgentManager.get_handoff_tools_for_session(session_id)
    except Exception as e:
        logger.warning(f"[EnhancedSubAgent] Failed to load dynamic handoffs: {e}")
        return

    for handoff in handoffs:
        if (
            handoff.name == new_tool_obj_name
            or handoff.name == new_tool_name.replace("transfer_to_", "")
        ):
            if self.req.func_tool:
                self.req.func_tool.add_tool(handoff)
            logger.info(f"[EnhancedSubAgent] Added {handoff.name} to func_tool set")
            break
```

Then the loop becomes:

```python
if result_parts:
    result_content = "\n\n".join(result_parts)
    self._maybe_register_dynamic_tool_from_result(result_content)

    inline_result = await self._materialize_large_tool_result(
        tool_call_id=func_tool_id,
        content=result_content,
    )
    _append_tool_call_result(
        func_tool_id,
        inline_result + self._build_repeated_tool_call_guidance(
            func_tool_name, tool_call_streak
        ),
    )
```

This removes protocol parsing, logging details, and manager lookups from the main loop and makes `_handle_tool_call` focus on “run tool → process result” while still fully supporting dynamic tools.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment thread astrbot/core/dynamic_subagent_manager.py Outdated
Comment thread astrbot/core/dynamic_subagent_manager.py Outdated
Comment thread astrbot/core/astr_agent_tool_exec.py
Comment thread astrbot/core/dynamic_subagent_manager.py
Comment thread astrbot/core/astr_agent_tool_exec.py Outdated
Comment thread astrbot/core/agent/runners/tool_loop_agent_runner.py Outdated
@Soulter Soulter force-pushed the master branch 2 times, most recently from faf411f to 0068960 Compare April 19, 2026 09:50
@elecvoid243
Copy link
Copy Markdown
Author

@sourcery-ai review

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 6 issues, and left some high level feedback:

  • In SubAgentManager.remove_subagent you directly index session.subagent_status[agent_name] before checking for session existence or whether agent_name is present (and even when agent_name == 'all'), which can raise KeyError/AttributeError; guard against a missing session and use .get() or early returns before accessing this dict.
  • In FunctionToolExecutor._execute_handoff the timeout branch calls _save_subagent_history(agent_name, runner_messages, umo) but the method signature is (umo, runner_messages, agent_name), so history will be written under the wrong key; swap the argument order to match the signature.
  • WaitForSubagentTool.call assumes result is non-None in some error paths (e.g., FAILED branch else case uses result.error when result may be None), which will raise; add a separate branch for result is None and avoid dereferencing it there.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In SubAgentManager.remove_subagent you directly index session.subagent_status[agent_name] before checking for `session` existence or whether `agent_name` is present (and even when `agent_name == 'all'`), which can raise KeyError/AttributeError; guard against a missing session and use `.get()` or early returns before accessing this dict.
- In FunctionToolExecutor._execute_handoff the timeout branch calls `_save_subagent_history(agent_name, runner_messages, umo)` but the method signature is `(umo, runner_messages, agent_name)`, so history will be written under the wrong key; swap the argument order to match the signature.
- WaitForSubagentTool.call assumes `result` is non-None in some error paths (e.g., FAILED branch `else` case uses `result.error` when `result` may be None), which will raise; add a separate branch for `result is None` and avoid dereferencing it there.

## Individual Comments

### Comment 1
<location path="astrbot/core/astr_agent_tool_exec.py" line_range="414" />
<code_context>
+                )
+            except asyncio.TimeoutError:
+                # 若超时,保存已产生的部分历史
+                cls._save_subagent_history(agent_name, runner_messages, umo)
+                error_msg = f"SubAgent '{agent_name}' execution timeout after {execution_timeout:.1f} seconds."
+                logger.warning(f"[SubAgent:Timeout] {error_msg}")
</code_context>
<issue_to_address>
**issue (bug_risk):** Arguments to `_save_subagent_history` are passed in the wrong order in the timeout path.

`_save_subagent_history` is defined as `(umo: str, runner_messages: list[Message], agent_name: str)`, but in the timeout branch you call `cls._save_subagent_history(agent_name, runner_messages, umo)`. This swaps `umo` and `agent_name`, so history is written under the wrong key. Match the normal path and call `cls._save_subagent_history(umo, runner_messages, agent_name)` here.
</issue_to_address>

### Comment 2
<location path="astrbot/core/subagent_manager.py" line_range="1078-164" />
<code_context>
+        return session.subagent_background_results[agent_name].get(task_id, None)
+
+    @classmethod
+    def has_subagent_result(
+        cls, session_id: str, agent_name: str, task_id: str | None = None
+    ) -> bool:
+        """检查 SubAgent 是否有结果
+
+        Args:
+            session_id: Session ID
+            agent_name: SubAgent 名称
+            task_id: 任务ID,如果为None则检查是否有任何已完成的任务
+        """
+        session = cls.get_session(session_id)
+        task_store = cls._ensure_task_store(session, agent_name)
+        if not session or not task_store:
</code_context>
<issue_to_address>
**issue (bug_risk):** `has_subagent_result` can raise when session is missing due to calling `_ensure_task_store` on `None`.

`session` is retrieved before `_ensure_task_store` is called, but if `cls.get_session(session_id)` returns `None`, `_ensure_task_store` is invoked with `None` and can raise before the `if not session or not task_store` check runs. Please guard against `session is None` and return early, then call `_ensure_task_store` only when a valid session exists, consistent with other accessors.
</issue_to_address>

### Comment 3
<location path="astrbot/core/subagent_manager.py" line_range="1167-1170" />
<code_context>
+                    "type": "string",
+                    "description": "Subagent system_prompt",
+                },
+                "tools": {
+                    "type": "array",
+                    "items": {"type": "string"},
+                    "description": "Tools available to subagent, can be empty.",
+                },
+                "skills": {
</code_context>
<issue_to_address>
**nitpick (bug_risk):** The `tools`/`skills` defaults in `CreateSubAgentTool.call` don't align with the JSON schema types.

In `CreateSubAgentTool.call`, `tools = kwargs.get("tools", {})` and `skills = kwargs.get("skills", {})` conflict with the JSON schema, which defines both as arrays. While `set({})` currently yields an empty set, using `{}` as the default is type-misleading and brittle. Prefer `None`/`[]` and normalize before `set`, e.g. `tools = set(kwargs.get("tools") or [])`, to align with the schema and avoid subtle type issues if callers pass unexpected shapes.
</issue_to_address>

### Comment 4
<location path="astrbot/core/subagent_manager.py" line_range="901" />
<code_context>
+        return list(session.handoff_tools.values())
+
+    @classmethod
+    def create_pending_subagent_task(cls, session_id: str, agent_name: str) -> str:
+        """为 SubAgent 创建一个 pending 任务,返回 task_id
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the background-task dict handling into a dedicated BackgroundTaskStore helper and moving path-safety checks into a shared utility to simplify SubAgentManager and CreateSubAgentTool without changing behavior.

You can reduce complexity in a few high‑impact, low‑risk spots without changing behavior. Two concrete refactors:

---

### 1. Encapsulate background task dict juggling in a `BackgroundTaskStore`

All of these methods manually juggle nested dicts on `session.subagent_background_results` / `session.background_task_counters`:

- `create_pending_subagent_task`
- `_ensure_task_store`
- `_is_task_completed`
- `get_pending_subagent_tasks`
- `get_latest_task_id`
- `store_subagent_result`
- `get_subagent_result`
- `has_subagent_result`
- `clear_subagent_result`

You can move the dict logic into a small object per agent and keep `SubAgentManager` methods thin. This keeps functionality but drops a lot of low-level dict manipulation from the manager.

Example (core idea, minimal change):

```python
# new helper, can live in same module initially
@dataclass
class BackgroundTaskStore:
    results: dict[str, SubAgentExecutionResult] = field(default_factory=dict)
    counter: int = 0

    def create_pending(self, agent_name: str) -> str:
        self.counter += 1
        task_id = str(self.counter)
        self.results[task_id] = SubAgentExecutionResult(
            task_id=task_id,
            agent_name=agent_name,
            success=False,
            result=None,
            created_at=time.time(),
            metadata={},
        )
        return task_id

    def pending_ids(self) -> list[str]:
        return [
            tid for tid, res in self.results.items()
            if res.completed_at <= 0 and res.error is None
        ]

    def latest_id(self) -> str | None:
        if not self.results:
            return None
        tid, _ = max(self.results.items(), key=lambda x: x[1].created_at)
        return tid

    def store_result(
        self,
        agent_name: str,
        success: bool,
        result: str,
        task_id: str | None = None,
        error: str | None = None,
        execution_time: float = 0.0,
        metadata: dict | None = None,
    ) -> None:
        if task_id is None:
            pending = self.pending_ids()
            if not pending:
                return
            task_id = pending[-1]

        if task_id not in self.results:
            self.results[task_id] = SubAgentExecutionResult(
                task_id=task_id,
                agent_name=agent_name,
                success=False,
                result="",
                created_at=time.time(),
                metadata=metadata or {},
            )

        r = self.results[task_id]
        r.success = success
        r.result = result
        r.error = error
        r.execution_time = execution_time
        r.completed_at = time.time()
        if metadata:
            r.metadata.update(metadata)

    def latest_completed(self) -> SubAgentExecutionResult | None:
        completed = [
            r for r in self.results.values()
            if r.result != "" or r.completed_at > 0
        ]
        if not completed:
            return None
        return max(completed, key=lambda r: r.created_at)

    def get(self, task_id: str) -> SubAgentExecutionResult | None:
        return self.results.get(task_id)

    def has_result(self, task_id: str | None = None) -> bool:
        if task_id is None:
            return any(r.result != "" or r.completed_at > 0 for r in self.results.values())
        r = self.results.get(task_id)
        return bool(r and (r.result != "" or r.completed_at > 0))

    def clear(self, task_id: str | None = None) -> None:
        if task_id is None:
            self.results.clear()
            self.counter = 0
        else:
            self.results.pop(task_id, None)
```

Then in `SubAgentSession`:

```python
@dataclass
class SubAgentSession:
    ...
    subagent_background_results: dict[str, BackgroundTaskStore] = field(default_factory=dict)
```

And `SubAgentManager` becomes mostly delegations:

```python
@classmethod
def _get_task_store(cls, session: SubAgentSession, agent_name: str) -> BackgroundTaskStore:
    if agent_name not in session.subagent_background_results:
        session.subagent_background_results[agent_name] = BackgroundTaskStore()
    return session.subagent_background_results[agent_name]

@classmethod
def create_pending_subagent_task(cls, session_id: str, agent_name: str) -> str:
    session = cls._get_or_create_session(session_id)
    if session.subagent_status[agent_name] == "RUNNING":
        return f"__PENDING_TASK_CREATE_FAILED__: Subagent {agent_name} already running"
    store = cls._get_task_store(session, agent_name)
    return store.create_pending(agent_name)

@classmethod
def get_pending_subagent_tasks(cls, session_id: str, agent_name: str) -> list[str]:
    session = cls.get_session(session_id)
    if not session:
        return []
    return cls._get_task_store(session, agent_name).pending_ids()

@classmethod
def store_subagent_result(...):
    session = cls._get_or_create_session(session_id)
    store = cls._get_task_store(session, agent_name)
    store.store_result(agent_name, success, result, task_id, error, execution_time, metadata)

@classmethod
def get_subagent_result(...):
    session = cls.get_session(session_id)
    if not session or agent_name not in session.subagent_background_results:
        return None
    store = cls._get_task_store(session, agent_name)
    return store.get(task_id) if task_id is not None else store.latest_completed()

@classmethod
def has_subagent_result(...):
    session = cls.get_session(session_id)
    if not session:
        return False
    store = cls._get_task_store(session, agent_name)
    return store.has_result(task_id)

@classmethod
def clear_subagent_result(...):
    session = cls.get_session(session_id)
    if not session:
        return
    store = cls._get_task_store(session, agent_name)
    store.clear(task_id)
```

This keeps the same external API and behavior but substantially shrinks the manager’s internal complexity around background tasks.

---

### 2. Extract `_check_path_safety` into a shared utility

`CreateSubAgentTool._check_path_safety` mixes OS‑specific logic into the tool class and duplicates environment concerns in this already-large module. You can move it to a separate helper (same module or `utils/path_safety.py`) and reuse it elsewhere if needed.

Example:

```python
# new file: astrbot/core/utils/path_safety.py
import os
import platform

def is_safe_workdir(path_str: str) -> bool:
    if not path_str or not isinstance(path_str, str):
        return False
    if not os.path.isabs(path_str):
        return False
    try:
        resolved = os.path.realpath(path_str)
    except (OSError, ValueError):
        return False

    path_parts = {part.lower() for part in os.path.normpath(resolved).split(os.sep)}
    system = platform.system().lower()

    if system == "windows":
        dangerous = {
            "windows", "system32", "syswow64", "boot", "recovery",
            "programdata", "$recycle.bin", "system volume information",
        }
        if path_parts & dangerous:
            return False
    elif system == "linux":
        for prefix in ["/etc", "/bin", "/sbin", "/lib", "/lib64",
                       "/boot", "/dev", "/proc", "/sys", "/root"]:
            if resolved == prefix or resolved.startswith(prefix + os.sep):
                return False
    elif system == "darwin":
        for prefix in ["/System", "/Library", "/private/var", "/usr"]:
            if resolved == prefix or resolved.startswith(prefix + os.sep):
                return False

    if ".." in path_str:
        return False
    if not os.path.exists(resolved):
        return False
    return True
```

Then `CreateSubAgentTool` becomes simpler and more testable:

```python
from astrbot.core.utils.path_safety import is_safe_workdir

@dataclass
class CreateSubAgentTool(FunctionTool):
    ...
    async def call(self, context, **kwargs) -> str:
        ...
        workdir = kwargs.get("workdir")
        if not workdir or not is_safe_workdir(workdir):
            workdir = get_astrbot_temp_path()
        ...
```

This removes system/path concerns from the tool class and centralizes safety logic in one place.
</issue_to_address>

### Comment 5
<location path="astrbot/core/astr_agent_tool_exec.py" line_range="370" />
<code_context>
-            tool_call_timeout=run_context.tool_call_timeout,
-            stream=stream,
+
+        # 获取子代理的历史上下文
+        subagent_history, agent_name = cls._load_subagent_history(umo, tool)
+        # 如果有历史上下文,合并到 contexts 中
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting shared timeout handling and SubAgentManager access into small helpers and making `_save_subagent_history` keyword-only to simplify subagent orchestration logic and avoid subtle call bugs.

You can reduce the new complexity around subagent orchestration without changing behavior by:

1. **Collapsing duplicated timeout logic**  
   `_execute_handoff` and `_do_handoff_background` both wrap inner coroutines with `asyncio.wait_for`, handle `TimeoutError`, and do subagent-specific reporting. Extract that into a small shared helper so each caller reads linearly.

   For example:

   ```python
   @staticmethod
   async def _run_subagent_with_timeout(
       coro: T.Coroutine[T.Any, T.Any, T.Any],
       *,
       timeout: float,
       on_timeout: T.Callable[[], None],
   ):
       if timeout > 0:
           try:
               return await asyncio.wait_for(coro, timeout=timeout)
           except asyncio.TimeoutError:
               on_timeout()
               raise
       else:
           return await coro
   ```

   Then in `_execute_handoff`:

   ```python
   execution_timeout = cls._get_subagent_execution_timeout()

   async def _run_subagent():
       return await ctx.tool_loop_agent(
           event=event,
           chat_provider_id=prov_id,
           prompt=input_,
           image_urls=image_urls,
           system_prompt=subagent_system_prompt,
           tools=toolset,
           contexts=contexts,
           max_steps=agent_max_step,
           tool_call_timeout=run_context.tool_call_timeout,
           stream=stream,
           runner_messages=runner_messages,
       )

   def _on_timeout():
       cls._save_subagent_history(umo=umo, runner_messages=runner_messages, agent_name=agent_name)
       error_msg = f"SubAgent '{agent_name}' execution timeout after {execution_timeout:.1f} seconds."
       logger.warning(f"[SubAgent:Timeout] {error_msg}")
       cls._handle_subagent_timeout(umo=umo, agent_name=agent_name)

   try:
       llm_resp = await cls._run_subagent_with_timeout(
           _run_subagent(), timeout=execution_timeout, on_timeout=_on_timeout
       )
   except asyncio.TimeoutError:
       yield mcp.types.CallToolResult(
           content=[mcp.types.TextContent(type="text", text=f"error: SubAgent '{agent_name}' execution timeout after {execution_timeout:.1f} seconds.")]
       )
       return
   ```

   And in `_do_handoff_background`:

   ```python
   execution_timeout = cls._get_subagent_execution_timeout()

   async def _run():
       nonlocal result_text
       async for r in cls._execute_handoff(
           tool,
           run_context,
           image_urls_prepared=True,
           **tool_args,
       ):
           if isinstance(r, mcp.types.CallToolResult):
               for content in r.content:
                   if isinstance(content, mcp.types.TextContent):
                       result_text += content.text + "\n"

   def _on_timeout():
       nonlocal error_text, result_text
       error_text = f"Execution timeout after {execution_timeout:.1f} seconds."
       result_text = f"error: Background SubAgent '{agent_name}' {error_text}"
       logger.warning(f"[SubAgent:BackgroundTask] {error_text}")

   try:
       await cls._run_subagent_with_timeout(
           _run(), timeout=execution_timeout, on_timeout=_on_timeout
       )
   except asyncio.TimeoutError:
       # _on_timeout already set error_text/result_text
       pass
   ```

   This keeps all features (history save, status update, logging) but removes duplicated timeout/control-flow branches and makes the core logic easier to follow.

2. **Tighten the `SubAgentManager` coupling behind small helpers**  
   Several helpers repetitively reach into `SubAgentManager` (`_register_subagent_task`, `_is_managed_subagent`, `_handle_subagent_timeout`, parts of `_execute_handoff_background`). You can introduce a tiny, *local* façade instead of scattering direct imports:

   ```python
   class _SubagentRuntime:
       def __init__(self, umo: str, agent_name: str | None):
           from astrbot.core.subagent_manager import SubAgentManager
           self._mgr = SubAgentManager
           self.umo = umo
           self.agent_name = agent_name

       def is_managed(self) -> bool:
           if not self.agent_name:
               return False
           session = self._mgr.get_session(self.umo)
           return bool(session and self.agent_name in session.subagents)

       def set_status(self, status: str) -> None:
           if self.agent_name:
               self._mgr.set_subagent_status(
                   session_id=self.umo, agent_name=self.agent_name, status=status
               )

       def create_pending_task(self) -> str | None:
           # inner logic from _register_subagent_task, simplified…
           ...
   ```

   Then `_execute_handoff_background`, `_do_handoff_background`, `_handle_subagent_timeout`, `_is_managed_subagent`, and parts of `_handle_subagent_background_result` can operate through `_SubagentRuntime` instead of each doing their own session lookups/imports. This reduces the “spiderweb” coupling without changing behavior.

3. **Make `_save_subagent_history` harder to misuse**  
   In the timeout branch of `_execute_handoff` you currently call:

   ```python
   cls._save_subagent_history(agent_name, runner_messages, umo)
   ```

   while the signature is:

   ```python
   def _save_subagent_history(umo: str, runner_messages: list[Message], agent_name: str) -> None:
   ```

   To both fix argument order and reduce future complexity/bugs, make the helper keyword-only and always call it with names:

   ```python
   @staticmethod
   def _save_subagent_history(
       *, umo: str, runner_messages: list[Message], agent_name: str
   ) -> None:
       if agent_name and runner_messages:
           from astrbot.core.subagent_manager import SubAgentManager
           SubAgentManager.update_subagent_history(umo, agent_name, runner_messages)
   ```

   Usage:

   ```python
   cls._save_subagent_history(
       umo=umo, runner_messages=runner_messages, agent_name=agent_name
   )
   ```

   This keeps functionality identical but removes a subtle source of bugs and cognitive load.
</issue_to_address>

### Comment 6
<location path="astrbot/core/agent/runners/tool_loop_agent_runner.py" line_range="1020" />
<code_context>
                 if not req.func_tool:
                     return

+                # Prefer dynamic tools when available
+                func_tool = self._resolve_dynamic_subagent_tool(func_tool_name)
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting small helper methods to centralize tool resolution, session/handoff access, and interruption-message building so the main loop and abort logic stay simpler and more readable.

You can keep all the new behavior but reduce complexity and duplication by extracting a few small helpers and unifying the tool resolution path.

### 1. Unify tool resolution (avoid double-resolution + branching in-loop)

Right now `_execute_tool_calls` effectively computes `func_tool` multiple times:

- You first call `_resolve_dynamic_subagent_tool`.
- Then you re-run the old `skills_like` / `req.func_tool.get_tool` logic, potentially overwriting the dynamic result.

This both increases mental load and risks subtle bugs. A small helper that encapsulates all resolution rules keeps the loop simpler:

```python
def _get_func_tool(self, func_tool_name: str):
    # Prefer dynamic tools when available
    func_tool = self._resolve_dynamic_subagent_tool(func_tool_name)
    if func_tool is not None:
        return func_tool

    # Fallback to existing tool schema modes
    if (
        self.tool_schema_mode == "skills_like"
        and self._skill_like_raw_tool_set
    ):
        return self._skill_like_raw_tool_set.get_tool(func_tool_name)

    if self.req.func_tool:
        return self.req.func_tool.get_tool(func_tool_name)

    return None
```

Then in the main loop:

```python
if not req.func_tool:
    return

func_tool = self._get_func_tool(func_tool_name)
if func_tool is None:
    # keep existing error/reporting behavior
    ...
```

This keeps dynamic routing logic out of the loop body and centralizes the policy.

### 2. Deduplicate session / handoff access for dynamic tools

Both `_resolve_dynamic_subagent_tool` and `_maybe_register_dynamic_tool_from_result` manually walk `run_context.context.event` and import `SubAgentManager`. That coupling can be kept but hidden behind a narrow helper so the runner doesn’t repeat low-level details:

```python
def _get_session_handoffs(self):
    run_context_context = getattr(self.run_context, "context", None)
    event = getattr(run_context_context, "event", None) if run_context_context else None
    session_id = getattr(event, "unified_msg_origin", None) if event else None
    if not session_id:
        return None, []

    try:
        from astrbot.core.subagent_manager import SubAgentManager
        handoffs = SubAgentManager.get_handoff_tools_for_session(session_id)
    except Exception as e:
        logger.warning(f"[SubAgent] Failed to load dynamic handoffs: {e}")
        return session_id, []

    return session_id, handoffs
```

Use it in both places:

```python
def _resolve_dynamic_subagent_tool(self, func_tool_name: str):
    session_id, dynamic_handoffs = self._get_session_handoffs()
    if not session_id:
        return None
    for h in dynamic_handoffs:
        if h.name == func_tool_name or f"transfer_to_{h.name}" == func_tool_name:
            return h
    return None
```

```python
def _maybe_register_dynamic_tool_from_result(self, result_content: str) -> None:
    parse_result = self._parse_dynamic_tool_creation(result_content)
    if not parse_result:
        return
    new_tool_name, new_tool_obj_name = parse_result

    session_id, handoffs = self._get_session_handoffs()
    if not session_id:
        return

    for handoff in handoffs:
        if (
            handoff.name == new_tool_obj_name
            or handoff.name == new_tool_name.replace("transfer_to_", "")
        ):
            if self.req.func_tool:
                self.req.func_tool.add_tool(handoff)
            logger.info(f"[SubAgent] Added {handoff.name} to func_tool set")
            break
```

And the parser stays small and focused:

```python
def _parse_dynamic_tool_creation(self, result_content: str) -> tuple[str, str] | None:
    if not result_content.startswith("__DYNAMIC_TOOL_CREATED__:"):
        return None
    parts = result_content.split(":", 3)
    if len(parts) < 4:
        return None
    return parts[1], parts[2]  # new_tool_name, new_tool_obj_name
```

This keeps the magic-string protocol and `SubAgentManager` knowledge in one place, reducing complexity elsewhere.

### 3. Simplify abort-finalization branching

`_finalize_aborted_step` now mixes logging, message selection, and state transitions. The only real variability is the interruption message. You can make that explicit:

```python
def _build_interruption_message(self, manual_stop: bool) -> str:
    if manual_stop:
        return (
            "[SYSTEM: SubAgent was manually stopped by main agent. "
            "Partial output before interruption is preserved.]"
        )
    return self.USER_INTERRUPTION_MESSAGE
```

Then `_finalize_aborted_step` becomes easier to scan:

```python
async def _finalize_aborted_step(
    self,
    llm_resp: LLMResponse | None = None,
    manual_stop: bool = False,
) -> AgentResponse:
    logger.info(
        "SubAgent execution was manually stopped by main agent."
        if manual_stop
        else "Agent execution was requested to stop by user."
    )

    if llm_resp is None:
        llm_resp = LLMResponse(role="assistant", completion_text="")

    if llm_resp.role != "assistant":
        llm_resp = LLMResponse(
            role="assistant",
            completion_text=self._build_interruption_message(manual_stop),
        )

    self.final_llm_resp = llm_resp
    self._aborted = True
    self._transition_state(AgentState.DONE)
    self.stats.end_time = time.time()
    ...
```

This preserves the manual vs user-stop behavior while localizing the branching logic, so the method’s main flow (log → normalize llm_resp → finalize) is clearer.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment thread astrbot/core/astr_agent_tool_exec.py Outdated
Comment thread astrbot/core/subagent_manager.py
Comment thread astrbot/core/subagent_manager.py
return list(session.handoff_tools.values())

@classmethod
def create_pending_subagent_task(cls, session_id: str, agent_name: str) -> str:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting the background-task dict handling into a dedicated BackgroundTaskStore helper and moving path-safety checks into a shared utility to simplify SubAgentManager and CreateSubAgentTool without changing behavior.

You can reduce complexity in a few high‑impact, low‑risk spots without changing behavior. Two concrete refactors:


1. Encapsulate background task dict juggling in a BackgroundTaskStore

All of these methods manually juggle nested dicts on session.subagent_background_results / session.background_task_counters:

  • create_pending_subagent_task
  • _ensure_task_store
  • _is_task_completed
  • get_pending_subagent_tasks
  • get_latest_task_id
  • store_subagent_result
  • get_subagent_result
  • has_subagent_result
  • clear_subagent_result

You can move the dict logic into a small object per agent and keep SubAgentManager methods thin. This keeps functionality but drops a lot of low-level dict manipulation from the manager.

Example (core idea, minimal change):

# new helper, can live in same module initially
@dataclass
class BackgroundTaskStore:
    results: dict[str, SubAgentExecutionResult] = field(default_factory=dict)
    counter: int = 0

    def create_pending(self, agent_name: str) -> str:
        self.counter += 1
        task_id = str(self.counter)
        self.results[task_id] = SubAgentExecutionResult(
            task_id=task_id,
            agent_name=agent_name,
            success=False,
            result=None,
            created_at=time.time(),
            metadata={},
        )
        return task_id

    def pending_ids(self) -> list[str]:
        return [
            tid for tid, res in self.results.items()
            if res.completed_at <= 0 and res.error is None
        ]

    def latest_id(self) -> str | None:
        if not self.results:
            return None
        tid, _ = max(self.results.items(), key=lambda x: x[1].created_at)
        return tid

    def store_result(
        self,
        agent_name: str,
        success: bool,
        result: str,
        task_id: str | None = None,
        error: str | None = None,
        execution_time: float = 0.0,
        metadata: dict | None = None,
    ) -> None:
        if task_id is None:
            pending = self.pending_ids()
            if not pending:
                return
            task_id = pending[-1]

        if task_id not in self.results:
            self.results[task_id] = SubAgentExecutionResult(
                task_id=task_id,
                agent_name=agent_name,
                success=False,
                result="",
                created_at=time.time(),
                metadata=metadata or {},
            )

        r = self.results[task_id]
        r.success = success
        r.result = result
        r.error = error
        r.execution_time = execution_time
        r.completed_at = time.time()
        if metadata:
            r.metadata.update(metadata)

    def latest_completed(self) -> SubAgentExecutionResult | None:
        completed = [
            r for r in self.results.values()
            if r.result != "" or r.completed_at > 0
        ]
        if not completed:
            return None
        return max(completed, key=lambda r: r.created_at)

    def get(self, task_id: str) -> SubAgentExecutionResult | None:
        return self.results.get(task_id)

    def has_result(self, task_id: str | None = None) -> bool:
        if task_id is None:
            return any(r.result != "" or r.completed_at > 0 for r in self.results.values())
        r = self.results.get(task_id)
        return bool(r and (r.result != "" or r.completed_at > 0))

    def clear(self, task_id: str | None = None) -> None:
        if task_id is None:
            self.results.clear()
            self.counter = 0
        else:
            self.results.pop(task_id, None)

Then in SubAgentSession:

@dataclass
class SubAgentSession:
    ...
    subagent_background_results: dict[str, BackgroundTaskStore] = field(default_factory=dict)

And SubAgentManager becomes mostly delegations:

@classmethod
def _get_task_store(cls, session: SubAgentSession, agent_name: str) -> BackgroundTaskStore:
    if agent_name not in session.subagent_background_results:
        session.subagent_background_results[agent_name] = BackgroundTaskStore()
    return session.subagent_background_results[agent_name]

@classmethod
def create_pending_subagent_task(cls, session_id: str, agent_name: str) -> str:
    session = cls._get_or_create_session(session_id)
    if session.subagent_status[agent_name] == "RUNNING":
        return f"__PENDING_TASK_CREATE_FAILED__: Subagent {agent_name} already running"
    store = cls._get_task_store(session, agent_name)
    return store.create_pending(agent_name)

@classmethod
def get_pending_subagent_tasks(cls, session_id: str, agent_name: str) -> list[str]:
    session = cls.get_session(session_id)
    if not session:
        return []
    return cls._get_task_store(session, agent_name).pending_ids()

@classmethod
def store_subagent_result(...):
    session = cls._get_or_create_session(session_id)
    store = cls._get_task_store(session, agent_name)
    store.store_result(agent_name, success, result, task_id, error, execution_time, metadata)

@classmethod
def get_subagent_result(...):
    session = cls.get_session(session_id)
    if not session or agent_name not in session.subagent_background_results:
        return None
    store = cls._get_task_store(session, agent_name)
    return store.get(task_id) if task_id is not None else store.latest_completed()

@classmethod
def has_subagent_result(...):
    session = cls.get_session(session_id)
    if not session:
        return False
    store = cls._get_task_store(session, agent_name)
    return store.has_result(task_id)

@classmethod
def clear_subagent_result(...):
    session = cls.get_session(session_id)
    if not session:
        return
    store = cls._get_task_store(session, agent_name)
    store.clear(task_id)

This keeps the same external API and behavior but substantially shrinks the manager’s internal complexity around background tasks.


2. Extract _check_path_safety into a shared utility

CreateSubAgentTool._check_path_safety mixes OS‑specific logic into the tool class and duplicates environment concerns in this already-large module. You can move it to a separate helper (same module or utils/path_safety.py) and reuse it elsewhere if needed.

Example:

# new file: astrbot/core/utils/path_safety.py
import os
import platform

def is_safe_workdir(path_str: str) -> bool:
    if not path_str or not isinstance(path_str, str):
        return False
    if not os.path.isabs(path_str):
        return False
    try:
        resolved = os.path.realpath(path_str)
    except (OSError, ValueError):
        return False

    path_parts = {part.lower() for part in os.path.normpath(resolved).split(os.sep)}
    system = platform.system().lower()

    if system == "windows":
        dangerous = {
            "windows", "system32", "syswow64", "boot", "recovery",
            "programdata", "$recycle.bin", "system volume information",
        }
        if path_parts & dangerous:
            return False
    elif system == "linux":
        for prefix in ["/etc", "/bin", "/sbin", "/lib", "/lib64",
                       "/boot", "/dev", "/proc", "/sys", "/root"]:
            if resolved == prefix or resolved.startswith(prefix + os.sep):
                return False
    elif system == "darwin":
        for prefix in ["/System", "/Library", "/private/var", "/usr"]:
            if resolved == prefix or resolved.startswith(prefix + os.sep):
                return False

    if ".." in path_str:
        return False
    if not os.path.exists(resolved):
        return False
    return True

Then CreateSubAgentTool becomes simpler and more testable:

from astrbot.core.utils.path_safety import is_safe_workdir

@dataclass
class CreateSubAgentTool(FunctionTool):
    ...
    async def call(self, context, **kwargs) -> str:
        ...
        workdir = kwargs.get("workdir")
        if not workdir or not is_safe_workdir(workdir):
            workdir = get_astrbot_temp_path()
        ...

This removes system/path concerns from the tool class and centralizes safety logic in one place.

Comment thread astrbot/core/astr_agent_tool_exec.py
if not req.func_tool:
return

# Prefer dynamic tools when available
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting small helper methods to centralize tool resolution, session/handoff access, and interruption-message building so the main loop and abort logic stay simpler and more readable.

You can keep all the new behavior but reduce complexity and duplication by extracting a few small helpers and unifying the tool resolution path.

1. Unify tool resolution (avoid double-resolution + branching in-loop)

Right now _execute_tool_calls effectively computes func_tool multiple times:

  • You first call _resolve_dynamic_subagent_tool.
  • Then you re-run the old skills_like / req.func_tool.get_tool logic, potentially overwriting the dynamic result.

This both increases mental load and risks subtle bugs. A small helper that encapsulates all resolution rules keeps the loop simpler:

def _get_func_tool(self, func_tool_name: str):
    # Prefer dynamic tools when available
    func_tool = self._resolve_dynamic_subagent_tool(func_tool_name)
    if func_tool is not None:
        return func_tool

    # Fallback to existing tool schema modes
    if (
        self.tool_schema_mode == "skills_like"
        and self._skill_like_raw_tool_set
    ):
        return self._skill_like_raw_tool_set.get_tool(func_tool_name)

    if self.req.func_tool:
        return self.req.func_tool.get_tool(func_tool_name)

    return None

Then in the main loop:

if not req.func_tool:
    return

func_tool = self._get_func_tool(func_tool_name)
if func_tool is None:
    # keep existing error/reporting behavior
    ...

This keeps dynamic routing logic out of the loop body and centralizes the policy.

2. Deduplicate session / handoff access for dynamic tools

Both _resolve_dynamic_subagent_tool and _maybe_register_dynamic_tool_from_result manually walk run_context.context.event and import SubAgentManager. That coupling can be kept but hidden behind a narrow helper so the runner doesn’t repeat low-level details:

def _get_session_handoffs(self):
    run_context_context = getattr(self.run_context, "context", None)
    event = getattr(run_context_context, "event", None) if run_context_context else None
    session_id = getattr(event, "unified_msg_origin", None) if event else None
    if not session_id:
        return None, []

    try:
        from astrbot.core.subagent_manager import SubAgentManager
        handoffs = SubAgentManager.get_handoff_tools_for_session(session_id)
    except Exception as e:
        logger.warning(f"[SubAgent] Failed to load dynamic handoffs: {e}")
        return session_id, []

    return session_id, handoffs

Use it in both places:

def _resolve_dynamic_subagent_tool(self, func_tool_name: str):
    session_id, dynamic_handoffs = self._get_session_handoffs()
    if not session_id:
        return None
    for h in dynamic_handoffs:
        if h.name == func_tool_name or f"transfer_to_{h.name}" == func_tool_name:
            return h
    return None
def _maybe_register_dynamic_tool_from_result(self, result_content: str) -> None:
    parse_result = self._parse_dynamic_tool_creation(result_content)
    if not parse_result:
        return
    new_tool_name, new_tool_obj_name = parse_result

    session_id, handoffs = self._get_session_handoffs()
    if not session_id:
        return

    for handoff in handoffs:
        if (
            handoff.name == new_tool_obj_name
            or handoff.name == new_tool_name.replace("transfer_to_", "")
        ):
            if self.req.func_tool:
                self.req.func_tool.add_tool(handoff)
            logger.info(f"[SubAgent] Added {handoff.name} to func_tool set")
            break

And the parser stays small and focused:

def _parse_dynamic_tool_creation(self, result_content: str) -> tuple[str, str] | None:
    if not result_content.startswith("__DYNAMIC_TOOL_CREATED__:"):
        return None
    parts = result_content.split(":", 3)
    if len(parts) < 4:
        return None
    return parts[1], parts[2]  # new_tool_name, new_tool_obj_name

This keeps the magic-string protocol and SubAgentManager knowledge in one place, reducing complexity elsewhere.

3. Simplify abort-finalization branching

_finalize_aborted_step now mixes logging, message selection, and state transitions. The only real variability is the interruption message. You can make that explicit:

def _build_interruption_message(self, manual_stop: bool) -> str:
    if manual_stop:
        return (
            "[SYSTEM: SubAgent was manually stopped by main agent. "
            "Partial output before interruption is preserved.]"
        )
    return self.USER_INTERRUPTION_MESSAGE

Then _finalize_aborted_step becomes easier to scan:

async def _finalize_aborted_step(
    self,
    llm_resp: LLMResponse | None = None,
    manual_stop: bool = False,
) -> AgentResponse:
    logger.info(
        "SubAgent execution was manually stopped by main agent."
        if manual_stop
        else "Agent execution was requested to stop by user."
    )

    if llm_resp is None:
        llm_resp = LLMResponse(role="assistant", completion_text="")

    if llm_resp.role != "assistant":
        llm_resp = LLMResponse(
            role="assistant",
            completion_text=self._build_interruption_message(manual_stop),
        )

    self.final_llm_resp = llm_resp
    self._aborted = True
    self._transition_state(AgentState.DONE)
    self.stats.end_time = time.time()
    ...

This preserves the manual vs user-stop behavior while localizing the branching logic, so the method’s main flow (log → normalize llm_resp → finalize) is clearer.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size:XXL This PR changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature] 增强版的SubAgent功能

1 participant