From 04f11d1fccb94809fca5b66d462099d62bc97787 Mon Sep 17 00:00:00 2001 From: A Vertex SDK engineer Date: Tue, 14 Apr 2026 10:32:21 -0700 Subject: [PATCH] feat: Adds cancel_query_job to SDK for agent engine long running async tasks. PiperOrigin-RevId: 899660067 --- .../unit/vertexai/genai/test_agent_engines.py | 22 ++ vertexai/_genai/agent_engines.py | 192 ++++++++++++++++++ vertexai/_genai/types/__init__.py | 14 ++ vertexai/_genai/types/common.py | 72 +++++++ 4 files changed, 300 insertions(+) diff --git a/tests/unit/vertexai/genai/test_agent_engines.py b/tests/unit/vertexai/genai/test_agent_engines.py index 518d371e5e..55e240533b 100644 --- a/tests/unit/vertexai/genai/test_agent_engines.py +++ b/tests/unit/vertexai/genai/test_agent_engines.py @@ -3183,6 +3183,28 @@ def test_query_agent_engine_async(self): None, ) + def test_cancel_query_job_agent_engine(self): + with mock.patch.object( + self.client.agent_engines._api_client, "request" + ) as request_mock: + request_mock.return_value = genai_types.HttpResponse(body="{}") + + result = self.client.agent_engines.cancel_query_job( + name=_TEST_AGENT_ENGINE_RESOURCE_NAME, + config={"operation_name": _TEST_AGENT_ENGINE_OPERATION_NAME}, + ) + + assert isinstance(result, _genai_types.CancelQueryJobResult) + request_mock.assert_called_with( + "post", + f"{_TEST_AGENT_ENGINE_RESOURCE_NAME}:cancelAsyncQuery", + { + "_url": {"name": _TEST_AGENT_ENGINE_RESOURCE_NAME}, + "operationName": _TEST_AGENT_ENGINE_OPERATION_NAME, + }, + None, + ) + def test_check_query_job_agent_engine(self): with mock.patch.object( self.client.agent_engines._api_client, "request" diff --git a/vertexai/_genai/agent_engines.py b/vertexai/_genai/agent_engines.py index ea0a00b9e9..d75ef16910 100644 --- a/vertexai/_genai/agent_engines.py +++ b/vertexai/_genai/agent_engines.py @@ -76,6 +76,38 @@ def _AgentEngineOperation_from_vertex( return to_object +def _CancelQueryJobAgentEngineConfig_to_vertex( + from_object: Union[dict[str, Any], object], + parent_object: Optional[dict[str, Any]] = None, +) -> dict[str, Any]: + to_object: dict[str, Any] = {} + + if getv(from_object, ["operation_name"]) is not None: + setv(parent_object, ["operationName"], getv(from_object, ["operation_name"])) + + return to_object + + +def _CancelQueryJobAgentEngineRequestParameters_to_vertex( + from_object: Union[dict[str, Any], object], + parent_object: Optional[dict[str, Any]] = None, +) -> dict[str, Any]: + to_object: dict[str, Any] = {} + if getv(from_object, ["name"]) is not None: + setv(to_object, ["_url", "name"], getv(from_object, ["name"])) + + if getv(from_object, ["config"]) is not None: + setv( + to_object, + ["config"], + _CancelQueryJobAgentEngineConfig_to_vertex( + getv(from_object, ["config"]), to_object + ), + ) + + return to_object + + def _CheckQueryJobAgentEngineConfig_to_vertex( from_object: Union[dict[str, Any], object], parent_object: Optional[dict[str, Any]] = None, @@ -720,6 +752,85 @@ def _UpdateAgentEngineRequestParameters_to_vertex( class AgentEngines(_api_module.BaseModule): + def cancel_query_job( + self, + *, + name: str, + config: Optional[types.CancelQueryJobAgentEngineConfigOrDict] = None, + ) -> types.CancelQueryJobResult: + """ + Cancels a long-running query job on an Agent Engine. + + Args: + name (str): + Required. The reasoning engine resource name. + config (CancelQueryJobAgentEngineConfigOrDict): + Optional. The configuration for the cancel_query_job. + + """ + + parameter_model = types._CancelQueryJobAgentEngineRequestParameters( + name=name, + config=config, + ) + + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError("This method is only supported in the Vertex AI client.") + else: + request_dict = _CancelQueryJobAgentEngineRequestParameters_to_vertex( + parameter_model + ) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "{name}:cancelAsyncQuery".format_map(request_url_dict) + else: + path = "{name}:cancelAsyncQuery" + + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. + request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( + parameter_model.config is not None + and parameter_model.config.http_options is not None + ): + http_options = parameter_model.config.http_options + + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) + + response = self._api_client.request("post", path, request_dict, http_options) + + response_dict = {} if not response.body else json.loads(response.body) + + return_value = types.CancelQueryJobResult._from_response( + response=response_dict, + kwargs=( + { + "config": { + "response_schema": getattr( + parameter_model.config, "response_schema", None + ), + "response_json_schema": getattr( + parameter_model.config, "response_json_schema", None + ), + "include_all_fields": getattr( + parameter_model.config, "include_all_fields", None + ), + } + } + if getattr(parameter_model, "config", None) + else {} + ), + ) + + self._api_client._verify_response(return_value) + return return_value + def _check_query_job( self, *, @@ -2971,6 +3082,87 @@ def list_session_events( class AsyncAgentEngines(_api_module.BaseModule): + async def cancel_query_job( + self, + *, + name: str, + config: Optional[types.CancelQueryJobAgentEngineConfigOrDict] = None, + ) -> types.CancelQueryJobResult: + """ + Cancels a long-running query job on an Agent Engine. + + Args: + name (str): + Required. The reasoning engine resource name. + config (CancelQueryJobAgentEngineConfigOrDict): + Optional. The configuration for the cancel_query_job. + + """ + + parameter_model = types._CancelQueryJobAgentEngineRequestParameters( + name=name, + config=config, + ) + + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError("This method is only supported in the Vertex AI client.") + else: + request_dict = _CancelQueryJobAgentEngineRequestParameters_to_vertex( + parameter_model + ) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "{name}:cancelAsyncQuery".format_map(request_url_dict) + else: + path = "{name}:cancelAsyncQuery" + + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. + request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( + parameter_model.config is not None + and parameter_model.config.http_options is not None + ): + http_options = parameter_model.config.http_options + + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) + + response = await self._api_client.async_request( + "post", path, request_dict, http_options + ) + + response_dict = {} if not response.body else json.loads(response.body) + + return_value = types.CancelQueryJobResult._from_response( + response=response_dict, + kwargs=( + { + "config": { + "response_schema": getattr( + parameter_model.config, "response_schema", None + ), + "response_json_schema": getattr( + parameter_model.config, "response_json_schema", None + ), + "include_all_fields": getattr( + parameter_model.config, "include_all_fields", None + ), + } + } + if getattr(parameter_model, "config", None) + else {} + ), + ) + + self._api_client._verify_response(return_value) + return return_value + async def _check_query_job( self, *, diff --git a/vertexai/_genai/types/__init__.py b/vertexai/_genai/types/__init__.py index 07cc75b8c1..768abef4b5 100644 --- a/vertexai/_genai/types/__init__.py +++ b/vertexai/_genai/types/__init__.py @@ -26,6 +26,7 @@ from .common import _AppendAgentEngineTaskEventRequestParameters from .common import _AssembleDatasetParameters from .common import _AssessDatasetParameters +from .common import _CancelQueryJobAgentEngineRequestParameters from .common import _CheckQueryJobAgentEngineRequestParameters from .common import _CreateAgentEngineMemoryRequestParameters from .common import _CreateAgentEngineRequestParameters @@ -193,6 +194,12 @@ from .common import BleuResults from .common import BleuResultsDict from .common import BleuResultsOrDict +from .common import CancelQueryJobAgentEngineConfig +from .common import CancelQueryJobAgentEngineConfigDict +from .common import CancelQueryJobAgentEngineConfigOrDict +from .common import CancelQueryJobResult +from .common import CancelQueryJobResultDict +from .common import CancelQueryJobResultOrDict from .common import CandidateResponse from .common import CandidateResponseDict from .common import CandidateResponseOrDict @@ -1668,6 +1675,12 @@ "VertexBaseConfig", "VertexBaseConfigDict", "VertexBaseConfigOrDict", + "CancelQueryJobAgentEngineConfig", + "CancelQueryJobAgentEngineConfigDict", + "CancelQueryJobAgentEngineConfigOrDict", + "CancelQueryJobResult", + "CancelQueryJobResultDict", + "CancelQueryJobResultOrDict", "CheckQueryJobAgentEngineConfig", "CheckQueryJobAgentEngineConfigDict", "CheckQueryJobAgentEngineConfigOrDict", @@ -2367,6 +2380,7 @@ "_OptimizeRequestParameters", "_CustomJobParameters", "_GetCustomJobParameters", + "_CancelQueryJobAgentEngineRequestParameters", "_CheckQueryJobAgentEngineRequestParameters", "_RunQueryJobAgentEngineRequestParameters", "_CreateAgentEngineRequestParameters", diff --git a/vertexai/_genai/types/common.py b/vertexai/_genai/types/common.py index 04b2415ab0..c3ef22d4a1 100644 --- a/vertexai/_genai/types/common.py +++ b/vertexai/_genai/types/common.py @@ -6278,6 +6278,78 @@ class _GetCustomJobParametersDict(TypedDict, total=False): ] +class CancelQueryJobAgentEngineConfig(_common.BaseModel): + """Config for canceling async querying agent engines.""" + + http_options: Optional[genai_types.HttpOptions] = Field( + default=None, description="""Used to override HTTP request options.""" + ) + operation_name: Optional[str] = Field( + default=None, + description="""Name of the longrunning operation returned from run_query_job.""", + ) + + +class CancelQueryJobAgentEngineConfigDict(TypedDict, total=False): + """Config for canceling async querying agent engines.""" + + http_options: Optional[genai_types.HttpOptionsDict] + """Used to override HTTP request options.""" + + operation_name: Optional[str] + """Name of the longrunning operation returned from run_query_job.""" + + +CancelQueryJobAgentEngineConfigOrDict = Union[ + CancelQueryJobAgentEngineConfig, CancelQueryJobAgentEngineConfigDict +] + + +class _CancelQueryJobAgentEngineRequestParameters(_common.BaseModel): + """Parameters for canceling async querying agent engines.""" + + name: Optional[str] = Field( + default=None, description="""Name of the reasoning engine resource.""" + ) + config: Optional[CancelQueryJobAgentEngineConfig] = Field( + default=None, description="""""" + ) + + +class _CancelQueryJobAgentEngineRequestParametersDict(TypedDict, total=False): + """Parameters for canceling async querying agent engines.""" + + name: Optional[str] + """Name of the reasoning engine resource.""" + + config: Optional[CancelQueryJobAgentEngineConfigDict] + """""" + + +_CancelQueryJobAgentEngineRequestParametersOrDict = Union[ + _CancelQueryJobAgentEngineRequestParameters, + _CancelQueryJobAgentEngineRequestParametersDict, +] + + +class CancelQueryJobResult(_common.BaseModel): + """Result of canceling a query job.""" + + http_options: Optional[genai_types.HttpOptions] = Field( + default=None, description="""Used to override HTTP request options.""" + ) + + +class CancelQueryJobResultDict(TypedDict, total=False): + """Result of canceling a query job.""" + + http_options: Optional[genai_types.HttpOptionsDict] + """Used to override HTTP request options.""" + + +CancelQueryJobResultOrDict = Union[CancelQueryJobResult, CancelQueryJobResultDict] + + class CheckQueryJobAgentEngineConfig(_common.BaseModel): """Config for async querying agent engines."""