diff --git a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py index 8bf59cafc..e5f2982f9 100644 --- a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py +++ b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py @@ -111,6 +111,9 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): # Track tool_response messages (get_weather result) if message.content and message.content.type == "tool_response": seen_tool_response = True + # If we already saw DONE but were waiting for tool_response, exit now + if final_message and getattr(final_message, "streaming_status", None) == "DONE": + break # Track agent text messages and their streaming updates if message.content and message.content.type == "text" and message.content.author == "agent": @@ -118,9 +121,12 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): content_length = len(str(agent_text)) final_message = message - # Stop when we get DONE status + # Stop when we get DONE with content, but only if tool_response + # is already visible. The DONE text can be persisted before the + # lifecycle activity persists tool_response to the message list. if message.streaming_status == "DONE" and content_length > 0: - break + if not seen_tool_request or seen_tool_response: + break # Verify we got all the expected pieces assert seen_tool_request, "Expected to see tool_request message (agent calling get_weather)" diff --git a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/tests/test_agent.py index 3377c1ea8..3a0386ffb 100644 --- a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/tests/test_agent.py +++ b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/tests/test_agent.py @@ -143,15 +143,21 @@ async def test_send_event_and_poll_with_human_approval(self, client: AsyncAgente # Track tool_response messages (child workflow completion) if message.content and message.content.type == "tool_response": seen_tool_response = True + # If we already saw DONE but were waiting for tool_response, exit now + if found_final_response: + break # Track agent text messages and their streaming updates if message.content and message.content.type == "text" and message.content.author == "agent": content_length = len(message.content.content) if message.content.content else 0 - # Stop when we get DONE status with actual content + # Stop when we get DONE with content, but only if tool_response + # is already visible. The DONE text can be persisted before the + # lifecycle activity persists tool_response to the message list. if message.streaming_status == "DONE" and content_length > 0: found_final_response = True - break + if not seen_tool_request or seen_tool_response: + break # Verify that we saw the complete flow: tool_request -> human approval -> tool_response -> final answer assert seen_tool_request, "Expected to see tool_request message (agent calling wait_for_confirmation)" diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/activities.py b/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/activities.py index bcd82385a..c65ae3c8b 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/activities.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/activities.py @@ -4,7 +4,7 @@ to the AgentEx UI, designed to work with TemporalStreamingHooks. """ -from typing import Union +from typing import Any, Dict from temporalio import activity @@ -12,16 +12,32 @@ from agentex.types.text_content import TextContent from agentex.types.task_message_update import StreamTaskMessageFull from agentex.types.task_message_content import ( - TaskMessageContent, ToolRequestContent, ToolResponseContent, ) +def _deserialize_content(data: Dict[str, Any]): + """Reconstruct the correct content type from a dict using the 'type' discriminator. + + Temporal's payload converter deserializes Union types by trying each variant + in order, which causes ToolResponseContent to be misdeserialized as TextContent + (both have 'author' and 'content' fields). This function uses the 'type' field + to pick the correct Pydantic model. + """ + content_type = data.get("type") + if content_type == "tool_request": + return ToolRequestContent.model_validate(data) + elif content_type == "tool_response": + return ToolResponseContent.model_validate(data) + else: + return TextContent.model_validate(data) + + @activity.defn(name="stream_lifecycle_content") async def stream_lifecycle_content( task_id: str, - content: Union[TextContent, ToolRequestContent, ToolResponseContent, TaskMessageContent], + content: Dict[str, Any], ) -> None: """Stream agent lifecycle content to the AgentEx UI. @@ -32,28 +48,16 @@ async def stream_lifecycle_content( Designed to work seamlessly with TemporalStreamingHooks. The hooks class will call this activity automatically when lifecycle events occur. + Note: The content parameter is a dict (not a typed Union) because Temporal's + payload converter misdeserializes Union types with overlapping fields. + The correct Pydantic model is reconstructed using the 'type' discriminator. + Args: task_id: The AgentEx task ID for routing the content to the correct UI session - content: The content to stream - can be any of: - - TextContent: Plain text messages (e.g., handoff notifications) - - ToolRequestContent: Tool invocation requests with call_id and name - - ToolResponseContent: Tool execution results with call_id and output - - TaskMessageContent: Generic task message content - - Example: - Register this activity with your Temporal worker:: - - from agentex.lib.core.temporal.plugins.openai_agents import ( - TemporalStreamingHooks, - stream_lifecycle_content, - ) - - # In your workflow - hooks = TemporalStreamingHooks( - task_id=params.task.id, - stream_activity=stream_lifecycle_content - ) - result = await Runner.run(agent, input, hooks=hooks) + content: Dict with a 'type' field that determines the content model: + - type="text": TextContent (plain text messages, handoff notifications) + - type="tool_request": ToolRequestContent (tool invocation with call_id) + - type="tool_response": ToolResponseContent (tool result with call_id) Note: This activity is non-blocking and will not throw exceptions to the workflow. @@ -61,18 +65,17 @@ async def stream_lifecycle_content( that streaming failures don't break the agent execution. """ try: + typed_content = _deserialize_content(content) async with adk.streaming.streaming_task_message_context( task_id=task_id, - initial_content=content, + initial_content=typed_content, ) as streaming_context: - # Send the content as a full message update await streaming_context.stream_update( StreamTaskMessageFull( parent_task_message=streaming_context.task_message, - content=content, + content=typed_content, type="full", ) ) except Exception as e: - # Log error but don't fail the activity - streaming failures shouldn't break execution activity.logger.warning(f"Failed to stream content to task {task_id}: {e}") diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/hooks.py b/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/hooks.py index 795d44a0a..0b3e03852 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/hooks.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/hooks.py @@ -139,8 +139,8 @@ async def on_tool_start(self, context: RunContextWrapper, agent: Agent, tool: To author="agent", tool_call_id=tool_call_id, name=tool.name, - arguments=tool_arguments, # Now properly extracted from context - ), + arguments=tool_arguments, + ).model_dump(), ], start_to_close_timeout=self.timeout, ) @@ -176,7 +176,7 @@ async def on_tool_end( tool_call_id=tool_call_id, name=tool.name, content=result, - ), + ).model_dump(), ], start_to_close_timeout=self.timeout, ) @@ -203,7 +203,7 @@ async def on_handoff( author="agent", content=f"Handoff from {from_agent.name} to {to_agent.name}", type="text", - ), + ).model_dump(), ], start_to_close_timeout=self.timeout, )