From 6199e1169f18d0a8d0cae72c38c162791775018b Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Thu, 23 Apr 2026 22:41:23 -0700 Subject: [PATCH 1/4] fix: handle credentials lifecycle in BigQuery analytics plugin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes several issues introduced by the credentials parameter addition (34713fb4): - Clear _credentials in __getstate__ so non-picklable credential types (e.g., compute_engine.Credentials with requests.Session) don't break Vertex AI Agent Engine deployment via cloudpickle. Credentials are re-resolved from the user-provided value or ADC on unpickle. - Clear _credentials in _reset_runtime_state so stale HTTP transport state in credential objects doesn't carry across fork boundaries. - Restore lazy storage.Client creation — only construct it when explicit credentials are provided; otherwise let GCSOffloader create a default client on first use. Avoids unnecessary GCP client initialization when GCS offloading is not triggered. - Document the benign race on _credentials resolution when multiple event loops call _create_loop_state concurrently. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../bigquery_agent_analytics_plugin.py | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index 7183fdfc6b..5b71349346 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -1988,6 +1988,7 @@ def __init__( self._startup_error: Optional[Exception] = None self._is_shutting_down = False self._setup_lock = None + self._user_credentials = credentials self._credentials = credentials self.client = None self._loop_state_by_loop: dict[asyncio.AbstractEventLoop, _LoopState] = {} @@ -2106,6 +2107,10 @@ def get_credentials(): ) return creds + # Note: this read-then-write is not locked. If two event loops + # race here both will resolve ADC and write back the same creds. + # This is benign — the result is idempotent — so we accept the + # race rather than adding a lock for a one-time init path. if self._credentials is None: self._credentials = await loop.run_in_executor( self._executor, get_credentials @@ -2196,13 +2201,17 @@ async def _lazy_setup(self, **kwargs) -> None: self.offloader = None if self.config.gcs_bucket_name: + gcs_client = None + if self._credentials is not None: + gcs_client = storage.Client( + project=self.project_id, + credentials=self._credentials, + ) self.offloader = GCSOffloader( self.project_id, self.config.gcs_bucket_name, self._executor, - storage_client=storage.Client( - project=self.project_id, credentials=self._credentials - ), + storage_client=gcs_client, ) self.parser = HybridContentParser( @@ -2536,6 +2545,10 @@ def __getstate__(self): state["_startup_error"] = None state["_is_shutting_down"] = False state["_init_pid"] = 0 + # Credential objects may hold non-picklable transport state + # (e.g., requests.Session in compute_engine.Credentials). + # Clear and re-resolve from _user_credentials on unpickle. + state["_credentials"] = None return state def __setstate__(self, state): @@ -2543,6 +2556,12 @@ def __setstate__(self, state): # Backfill keys that may be absent in pickled state from older # code versions so _ensure_started does not raise AttributeError. state.setdefault("_init_pid", 0) + state.setdefault("_user_credentials", None) + # Restore _credentials from _user_credentials (if user provided + # them); ADC-resolved credentials will be re-resolved on next + # _create_loop_state call. + if state.get("_credentials") is None: + state["_credentials"] = state.get("_user_credentials") self.__dict__.update(state) def _reset_runtime_state(self) -> None: @@ -2597,6 +2616,9 @@ def _reset_runtime_state(self) -> None: self._startup_error = None self._is_shutting_down = False self._init_pid = os.getpid() + # Credentials may hold stale HTTP transport state after fork. + # Re-resolve from _user_credentials on next _create_loop_state. + self._credentials = self._user_credentials async def __aenter__(self) -> BigQueryAgentAnalyticsPlugin: await self._ensure_started() From babf2a7413b40aa5a851d3a099b81a991307bd86 Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Thu, 23 Apr 2026 22:49:55 -0700 Subject: [PATCH 2/4] fix: clear both _credentials and _user_credentials on pickle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses review feedback: - Non-picklable user-provided credentials (e.g., compute_engine with requests.Session) broke pickle.dumps() because _user_credentials was preserved in __getstate__. Now both _credentials and _user_credentials are cleared; credentials re-resolve via ADC after unpickle. - Fork safety: _reset_runtime_state documents that user-provided credentials are kept as-is (we cannot re-create them), while ADC-resolved credentials are cleared for re-resolution. - GCS client: always pass credentials explicitly when available; GCSOffloader.__init__ always creates a client eagerly (the "lazy restoration" claim in the previous commit was incorrect — the pre-credentials code had the same eager behavior). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../bigquery_agent_analytics_plugin.py | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index 5b71349346..55de35c5f3 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -2201,17 +2201,18 @@ async def _lazy_setup(self, **kwargs) -> None: self.offloader = None if self.config.gcs_bucket_name: - gcs_client = None + # GCSOffloader always creates a storage.Client eagerly + # (line 1329: storage_client or storage.Client(...)). + # Pass credentials so it uses the same auth as the other + # clients; omit when None to let it use ADC. + gcs_kwargs = {"project": self.project_id} if self._credentials is not None: - gcs_client = storage.Client( - project=self.project_id, - credentials=self._credentials, - ) + gcs_kwargs["credentials"] = self._credentials self.offloader = GCSOffloader( self.project_id, self.config.gcs_bucket_name, self._executor, - storage_client=gcs_client, + storage_client=storage.Client(**gcs_kwargs), ) self.parser = HybridContentParser( @@ -2547,8 +2548,10 @@ def __getstate__(self): state["_init_pid"] = 0 # Credential objects may hold non-picklable transport state # (e.g., requests.Session in compute_engine.Credentials). - # Clear and re-resolve from _user_credentials on unpickle. + # Clear both so pickle succeeds regardless of credential type. + # After unpickle, credentials are re-resolved via ADC. state["_credentials"] = None + state["_user_credentials"] = None return state def __setstate__(self, state): @@ -2557,11 +2560,10 @@ def __setstate__(self, state): # code versions so _ensure_started does not raise AttributeError. state.setdefault("_init_pid", 0) state.setdefault("_user_credentials", None) - # Restore _credentials from _user_credentials (if user provided - # them); ADC-resolved credentials will be re-resolved on next + state.setdefault("_credentials", None) + # Both _credentials and _user_credentials are cleared during + # pickle. Credentials will be re-resolved via ADC on the next # _create_loop_state call. - if state.get("_credentials") is None: - state["_credentials"] = state.get("_user_credentials") self.__dict__.update(state) def _reset_runtime_state(self) -> None: @@ -2616,8 +2618,10 @@ def _reset_runtime_state(self) -> None: self._startup_error = None self._is_shutting_down = False self._init_pid = os.getpid() - # Credentials may hold stale HTTP transport state after fork. - # Re-resolve from _user_credentials on next _create_loop_state. + # For ADC-resolved credentials, clear so they are re-resolved + # in the child process. For user-provided credentials, keep + # the original object — we cannot re-create it. The user is + # responsible for providing fork-safe credentials if needed. self._credentials = self._user_credentials async def __aenter__(self) -> BigQueryAgentAnalyticsPlugin: From db11dbe25349e4a69081018e7942e5a3b7c7d8c0 Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Thu, 23 Apr 2026 23:02:02 -0700 Subject: [PATCH 3/4] fix: preserve picklable credentials, drop only non-picklable ones MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous commit cleared both _credentials and _user_credentials unconditionally in __getstate__, silently dropping picklable user credentials (service-account, AnonymousCredentials) after unpickle. This caused the plugin to fall back to ADC instead of the user's configured identity. Now __getstate__ tests whether _user_credentials is picklable: - Picklable: preserved — survives pickle and restored on unpickle - Non-picklable: dropped gracefully — falls back to ADC Adds two tests covering both paths. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../bigquery_agent_analytics_plugin.py | 24 ++++++--- .../test_bigquery_agent_analytics_plugin.py | 50 +++++++++++++++++++ 2 files changed, 66 insertions(+), 8 deletions(-) diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index 55de35c5f3..311ec46985 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -2546,12 +2546,18 @@ def __getstate__(self): state["_startup_error"] = None state["_is_shutting_down"] = False state["_init_pid"] = 0 - # Credential objects may hold non-picklable transport state - # (e.g., requests.Session in compute_engine.Credentials). - # Clear both so pickle succeeds regardless of credential type. - # After unpickle, credentials are re-resolved via ADC. + # _credentials is always runtime-resolved; clear unconditionally. state["_credentials"] = None - state["_user_credentials"] = None + # Preserve _user_credentials if they are picklable (e.g., + # service-account, AnonymousCredentials). Drop only when + # pickle would fail (e.g., compute_engine.Credentials holding + # a requests.Session). + import pickle as _pickle + + try: + _pickle.dumps(state.get("_user_credentials")) + except Exception: + state["_user_credentials"] = None return state def __setstate__(self, state): @@ -2561,9 +2567,11 @@ def __setstate__(self, state): state.setdefault("_init_pid", 0) state.setdefault("_user_credentials", None) state.setdefault("_credentials", None) - # Both _credentials and _user_credentials are cleared during - # pickle. Credentials will be re-resolved via ADC on the next - # _create_loop_state call. + # Restore _credentials from _user_credentials if available so + # _create_loop_state uses the user's identity. When both are + # None (non-picklable credentials were dropped), ADC is used. + if state["_credentials"] is None and state["_user_credentials"]: + state["_credentials"] = state["_user_credentials"] self.__dict__.update(state) def _reset_runtime_state(self) -> None: diff --git a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py index 05b8976da2..3b723b4ceb 100644 --- a/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py +++ b/tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py @@ -2093,6 +2093,56 @@ async def test_pickle_safety(self, mock_auth_default, mock_bq_client): finally: await plugin.shutdown() + @pytest.mark.asyncio + async def test_pickle_preserves_picklable_credentials( + self, mock_auth_default, mock_bq_client + ): + """Picklable user credentials survive pickle/unpickle.""" + import pickle + + picklable_creds = FakeCredentials() + plugin = bigquery_agent_analytics_plugin.BigQueryAgentAnalyticsPlugin( + PROJECT_ID, + DATASET_ID, + table_id=TABLE_ID, + credentials=picklable_creds, + ) + pickled = pickle.dumps(plugin) + unpickled = pickle.loads(pickled) + # User-provided picklable credentials are preserved. + assert unpickled._user_credentials is not None + assert unpickled._credentials is not None + await plugin.shutdown() + + @pytest.mark.asyncio + async def test_pickle_drops_non_picklable_credentials( + self, mock_auth_default, mock_bq_client + ): + """Non-picklable user credentials are dropped gracefully.""" + import pickle + + class NonPicklableCreds(google.auth.credentials.Credentials): + + def refresh(self, request): + pass + + def __getstate__(self): + raise TypeError("cannot pickle") + + plugin = bigquery_agent_analytics_plugin.BigQueryAgentAnalyticsPlugin( + PROJECT_ID, + DATASET_ID, + table_id=TABLE_ID, + credentials=NonPicklableCreds(), + ) + # Should not raise — non-picklable credentials are dropped. + pickled = pickle.dumps(plugin) + unpickled = pickle.loads(pickled) + # Credentials fall back to None (ADC on next use). + assert unpickled._user_credentials is None + assert unpickled._credentials is None + await plugin.shutdown() + @pytest.mark.asyncio async def test_span_hierarchy_llm_call( self, From 3b1ad6e27068c10bd2646e1a367dae08cc20f677 Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Thu, 23 Apr 2026 23:08:12 -0700 Subject: [PATCH 4/4] fix: use 'is not None' for credential restore in __setstate__ A valid picklable credential subclass that defines __bool__ as False would not be restored into _credentials, causing silent ADC fallback. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/google/adk/plugins/bigquery_agent_analytics_plugin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index 311ec46985..ea29d1a18c 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -2570,7 +2570,7 @@ def __setstate__(self, state): # Restore _credentials from _user_credentials if available so # _create_loop_state uses the user's identity. When both are # None (non-picklable credentials were dropped), ADC is used. - if state["_credentials"] is None and state["_user_credentials"]: + if state["_credentials"] is None and state["_user_credentials"] is not None: state["_credentials"] = state["_user_credentials"] self.__dict__.update(state)