From 79ef4dd7a0ab1b8bb1151f5e16124ec5a947dfd4 Mon Sep 17 00:00:00 2001 From: Prassanna Ravishankar Date: Tue, 14 Apr 2026 17:20:19 +0100 Subject: [PATCH] fix: Temporal Union deserialization causing tool_response messages to be lost Temporal's payload converter deserializes Union types by trying each variant in order. ToolResponseContent was silently misdeserialized as TextContent (both share 'author' and 'content' fields), creating text messages instead of tool_response messages in the database. Fix: hooks now pass .model_dump() dicts to the activity, and the activity reconstructs the correct Pydantic model using the 'type' discriminator. Also fix test polling to handle the DONE/tool_response ordering race condition. --- .../tests/test_agent.py | 10 +++- .../tests/test_agent.py | 10 +++- .../plugins/openai_agents/hooks/activities.py | 57 ++++++++++--------- .../plugins/openai_agents/hooks/hooks.py | 8 +-- 4 files changed, 50 insertions(+), 35 deletions(-) diff --git a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py index 8bf59cafc..e5f2982f9 100644 --- a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py +++ b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py @@ -111,6 +111,9 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): # Track tool_response messages (get_weather result) if message.content and message.content.type == "tool_response": seen_tool_response = True + # If we already saw DONE but were waiting for tool_response, exit now + if final_message and getattr(final_message, "streaming_status", None) == "DONE": + break # Track agent text messages and their streaming updates if message.content and message.content.type == "text" and message.content.author == "agent": @@ -118,9 +121,12 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): content_length = len(str(agent_text)) final_message = message - # Stop when we get DONE status + # Stop when we get DONE with content, but only if tool_response + # is already visible. The DONE text can be persisted before the + # lifecycle activity persists tool_response to the message list. if message.streaming_status == "DONE" and content_length > 0: - break + if not seen_tool_request or seen_tool_response: + break # Verify we got all the expected pieces assert seen_tool_request, "Expected to see tool_request message (agent calling get_weather)" diff --git a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/tests/test_agent.py index 3377c1ea8..3a0386ffb 100644 --- a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/tests/test_agent.py +++ b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/tests/test_agent.py @@ -143,15 +143,21 @@ async def test_send_event_and_poll_with_human_approval(self, client: AsyncAgente # Track tool_response messages (child workflow completion) if message.content and message.content.type == "tool_response": seen_tool_response = True + # If we already saw DONE but were waiting for tool_response, exit now + if found_final_response: + break # Track agent text messages and their streaming updates if message.content and message.content.type == "text" and message.content.author == "agent": content_length = len(message.content.content) if message.content.content else 0 - # Stop when we get DONE status with actual content + # Stop when we get DONE with content, but only if tool_response + # is already visible. The DONE text can be persisted before the + # lifecycle activity persists tool_response to the message list. if message.streaming_status == "DONE" and content_length > 0: found_final_response = True - break + if not seen_tool_request or seen_tool_response: + break # Verify that we saw the complete flow: tool_request -> human approval -> tool_response -> final answer assert seen_tool_request, "Expected to see tool_request message (agent calling wait_for_confirmation)" diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/activities.py b/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/activities.py index bcd82385a..c65ae3c8b 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/activities.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/activities.py @@ -4,7 +4,7 @@ to the AgentEx UI, designed to work with TemporalStreamingHooks. """ -from typing import Union +from typing import Any, Dict from temporalio import activity @@ -12,16 +12,32 @@ from agentex.types.text_content import TextContent from agentex.types.task_message_update import StreamTaskMessageFull from agentex.types.task_message_content import ( - TaskMessageContent, ToolRequestContent, ToolResponseContent, ) +def _deserialize_content(data: Dict[str, Any]): + """Reconstruct the correct content type from a dict using the 'type' discriminator. + + Temporal's payload converter deserializes Union types by trying each variant + in order, which causes ToolResponseContent to be misdeserialized as TextContent + (both have 'author' and 'content' fields). This function uses the 'type' field + to pick the correct Pydantic model. + """ + content_type = data.get("type") + if content_type == "tool_request": + return ToolRequestContent.model_validate(data) + elif content_type == "tool_response": + return ToolResponseContent.model_validate(data) + else: + return TextContent.model_validate(data) + + @activity.defn(name="stream_lifecycle_content") async def stream_lifecycle_content( task_id: str, - content: Union[TextContent, ToolRequestContent, ToolResponseContent, TaskMessageContent], + content: Dict[str, Any], ) -> None: """Stream agent lifecycle content to the AgentEx UI. @@ -32,28 +48,16 @@ async def stream_lifecycle_content( Designed to work seamlessly with TemporalStreamingHooks. The hooks class will call this activity automatically when lifecycle events occur. + Note: The content parameter is a dict (not a typed Union) because Temporal's + payload converter misdeserializes Union types with overlapping fields. + The correct Pydantic model is reconstructed using the 'type' discriminator. + Args: task_id: The AgentEx task ID for routing the content to the correct UI session - content: The content to stream - can be any of: - - TextContent: Plain text messages (e.g., handoff notifications) - - ToolRequestContent: Tool invocation requests with call_id and name - - ToolResponseContent: Tool execution results with call_id and output - - TaskMessageContent: Generic task message content - - Example: - Register this activity with your Temporal worker:: - - from agentex.lib.core.temporal.plugins.openai_agents import ( - TemporalStreamingHooks, - stream_lifecycle_content, - ) - - # In your workflow - hooks = TemporalStreamingHooks( - task_id=params.task.id, - stream_activity=stream_lifecycle_content - ) - result = await Runner.run(agent, input, hooks=hooks) + content: Dict with a 'type' field that determines the content model: + - type="text": TextContent (plain text messages, handoff notifications) + - type="tool_request": ToolRequestContent (tool invocation with call_id) + - type="tool_response": ToolResponseContent (tool result with call_id) Note: This activity is non-blocking and will not throw exceptions to the workflow. @@ -61,18 +65,17 @@ async def stream_lifecycle_content( that streaming failures don't break the agent execution. """ try: + typed_content = _deserialize_content(content) async with adk.streaming.streaming_task_message_context( task_id=task_id, - initial_content=content, + initial_content=typed_content, ) as streaming_context: - # Send the content as a full message update await streaming_context.stream_update( StreamTaskMessageFull( parent_task_message=streaming_context.task_message, - content=content, + content=typed_content, type="full", ) ) except Exception as e: - # Log error but don't fail the activity - streaming failures shouldn't break execution activity.logger.warning(f"Failed to stream content to task {task_id}: {e}") diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/hooks.py b/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/hooks.py index 795d44a0a..0b3e03852 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/hooks.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/hooks.py @@ -139,8 +139,8 @@ async def on_tool_start(self, context: RunContextWrapper, agent: Agent, tool: To author="agent", tool_call_id=tool_call_id, name=tool.name, - arguments=tool_arguments, # Now properly extracted from context - ), + arguments=tool_arguments, + ).model_dump(), ], start_to_close_timeout=self.timeout, ) @@ -176,7 +176,7 @@ async def on_tool_end( tool_call_id=tool_call_id, name=tool.name, content=result, - ), + ).model_dump(), ], start_to_close_timeout=self.timeout, ) @@ -203,7 +203,7 @@ async def on_handoff( author="agent", content=f"Handoff from {from_agent.name} to {to_agent.name}", type="text", - ), + ).model_dump(), ], start_to_close_timeout=self.timeout, )