[fix](streaming-job) bound cdc_client RPCs with per-category timeouts#62870
JNSimba wants to merge 1 commit into apache:master
3d487cc to
d462026
Compare
### What problem does this PR solve?

Issue Number: close #xxx

Problem Summary:

`JdbcSourceOffsetProvider.cleanMeta()` and several other cdc_client RPCs called `future.get()` with no timeout. When the cdc_client (or PG/MySQL behind it) hangs, the call blocks forever. For `cleanMeta()` this is fatal because it runs inside `JobManager.dropJobInternal()` while holding `JobManager.writeLock()`: any subsequent `CREATE / DROP / SHOW JOB` on streaming jobs is then serialized behind the dead lock, effectively freezing the streaming-job control plane.

### Fix

Introduce two configurable timeouts (mirroring the BE `brpc_light/heavy_work_pool` naming) and apply them to all 8 cdc_client RPC call sites:

- `streaming_cdc_light_rpc_timeout_sec = 30` for `/api/close`, `/api/compareOffset`, `/api/getTaskOffset`, `/api/getFailReason` (expected sub-second).
- `streaming_cdc_heavy_rpc_timeout_sec = 600` for `/api/initReader`, `/api/fetchSplits`, `/api/fetchEndOffset`, `/api/writeRecords` (may legitimately take minutes for schema discovery / large snapshot splits).

Both configs are `mutable = true` so they can be tuned via `ADMIN SET FRONTEND CONFIG` without restarting FE.

`BackendServiceClient.requestCdcClient` gains a timeout overload that applies a gRPC `withDeadlineAfter`; the per-call-site `future.get(...)` also passes the same timeout so the deadline is enforced on both sides.

On timeout we WARN with a uniform line carrying `api / jobId / backend / timeout_sec` for easy log aggregation. `cleanMeta` keeps its existing swallow-on-failure semantics (a cleanup hiccup must not fail `DROP JOB`); the other seven sites throw `JobException` consistent with their existing `ExecutionException` handling.

### Release note

Add streaming-job FE configs `streaming_cdc_light_rpc_timeout_sec` and `streaming_cdc_heavy_rpc_timeout_sec` to bound cdc_client RPCs.
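The bounded wait described in the Fix section can be sketched in plain JDK terms. This is a minimal illustration, not the code in the PR: `BoundedRpcWait`, `await`, and `awaitQuietly` are hypothetical names, and cancelling the future on timeout is an assumption of this sketch rather than something the description states. The gRPC `withDeadlineAfter` side is left out so the example stays dependency-free.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not from the PR): shows a caller-side bounded wait.
final class BoundedRpcWait {
    private BoundedRpcWait() {}

    // Waits at most timeoutSec seconds. On timeout the future is cancelled
    // (an assumption of this sketch) and the TimeoutException is rethrown,
    // so the caller can translate it, e.g. into a job-level exception.
    static <T> T await(Future<T> future, long timeoutSec)
            throws TimeoutException, ExecutionException, InterruptedException {
        try {
            return future.get(timeoutSec, TimeUnit.SECONDS);
        } catch (TimeoutException te) {
            future.cancel(true);
            throw te;
        }
    }

    // cleanMeta-style variant: any failure is swallowed and reported as an
    // empty Optional, so a cleanup hiccup cannot fail the enclosing operation.
    static <T> Optional<T> awaitQuietly(Future<T> future, long timeoutSec) {
        try {
            return Optional.ofNullable(await(future, timeoutSec));
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return Optional.empty();
        } catch (TimeoutException | ExecutionException e) {
            return Optional.empty();
        }
    }
}
```

With a wait of this shape, a thread that holds a lock while calling a hung peer is released after at most `timeoutSec` seconds instead of blocking indefinitely.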
d462026 to
403fefc
Compare
Pull request overview
This PR prevents the streaming-job FE control plane from hanging indefinitely by bounding cdc_client RPC waits with configurable per-category timeouts, enforced both via gRPC deadlines and Future.get(...) timeouts.
Changes:
- Add timeout-aware `requestCdcClient` overloads in `BackendServiceClient` / `BackendServiceProxy` using gRPC `withDeadlineAfter`.
- Apply per-RPC timeouts (light vs heavy) and explicit `future.get(timeout, ...)` across streaming-job cdc_client call sites, with consistent timeout WARN logging.
- Introduce new mutable FE configs for cdc_client RPC timeouts.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java | Adds requestCdcClient(..., timeoutSec) overload to pass timeout to client. |
| fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java | Adds timeout overload applying gRPC deadline for cdc_client RPCs. |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java | Bounds /api/getTaskOffset wait with configurable timeout and adds timeout logging. |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java | Bounds multiple cdc_client RPCs with configurable timeouts and adds timeout handling/logging. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java | Bounds /api/writeRecords and /api/getFailReason calls and adds timeout logging/handling. |
| fe/fe-common/src/main/java/org/apache/doris/common/Config.java | Introduces new mutable FE configs for light/heavy cdc_client RPC timeouts. |

    Future<PRequestCdcClientResult> future = BackendServiceProxy.getInstance()
            .requestCdcClient(address, request, Config.streaming_cdc_light_rpc_timeout_sec);
    result = future.get(Config.streaming_cdc_light_rpc_timeout_sec, TimeUnit.SECONDS);

`/api/fetchEndOffset` uses `streaming_cdc_light_rpc_timeout_sec` for both the gRPC deadline and `future.get(...)`. The PR description categorizes this endpoint as a heavy RPC (it can legitimately take minutes), so it should use `streaming_cdc_heavy_rpc_timeout_sec` to avoid false timeouts on large snapshots or schema discovery.
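One way to keep an endpoint from drifting into the wrong category is to resolve the timeout from the API path in a single place. The sketch below is hypothetical: `CdcRpcTimeouts`, `Category`, and `timeoutSecFor` are illustrative names that do not appear in the PR, the two static fields stand in for the FE `Config` values, and the path-to-category table is copied from the PR description.

```java
import java.util.Map;

// Hypothetical lookup (not from the PR): one table decides the category,
// so a call site cannot pair a heavy endpoint with the light timeout.
final class CdcRpcTimeouts {
    enum Category { LIGHT, HEAVY }

    // Stand-ins for Config.streaming_cdc_light_rpc_timeout_sec and
    // Config.streaming_cdc_heavy_rpc_timeout_sec; the defaults are the
    // values given in the PR description.
    static volatile int lightTimeoutSec = 30;
    static volatile int heavyTimeoutSec = 600;

    private static final Map<String, Category> CATEGORY_BY_API = Map.of(
            "/api/close", Category.LIGHT,
            "/api/compareOffset", Category.LIGHT,
            "/api/getTaskOffset", Category.LIGHT,
            "/api/getFailReason", Category.LIGHT,
            "/api/initReader", Category.HEAVY,
            "/api/fetchSplits", Category.HEAVY,
            "/api/fetchEndOffset", Category.HEAVY,
            "/api/writeRecords", Category.HEAVY);

    private CdcRpcTimeouts() {}

    // Reads the current field value on every call, so a runtime config
    // change takes effect on the next RPC.
    static int timeoutSecFor(String api) {
        Category category = CATEGORY_BY_API.get(api);
        if (category == null) {
            throw new IllegalArgumentException("unknown cdc_client api: " + api);
        }
        return category == Category.HEAVY ? heavyTimeoutSec : lightTimeoutSec;
    }
}
```

A call site would then pass `CdcRpcTimeouts.timeoutSecFor("/api/fetchEndOffset")` to both the RPC overload and `future.get(...)`, which also keeps the two values from diverging.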

    @ConfField(mutable = true, masterOnly = true)
    public static int streaming_cdc_light_rpc_timeout_sec = 90;

The default for `streaming_cdc_light_rpc_timeout_sec` is set to 90s here, but the PR description and release note give the light RPC timeout default as 30s. Please align the default value (or update the PR description and release note) so operators get the intended behavior out of the box.

    } catch (TimeoutException te) {
        log.warn("cdc_client RPC timeout api=/api/getFailReason taskId={} timeout_sec={}",
                getTaskId(), Config.streaming_cdc_light_rpc_timeout_sec);

Timeout WARN logs are meant to be uniform for aggregation (`api` / `jobId` / `backend` / `timeout_sec`), but this timeout branch logs only `taskId` and `timeout_sec` and omits `jobId` and `backend`. Consider including `jobId` and `backend={host:port}` here (you can re-fetch the backend from `runningBackendId` in the catch) to keep the timeout logs consistent across RPCs.
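A shared formatter is one way to keep every timeout line in the same shape. The sketch below is hypothetical (`CdcTimeoutLog` and `format` are illustrative names, not part of the PR) and assumes the field order `api`, `jobId`, `backend`, `timeout_sec` named in the PR description; the real call sites use the logger's `{}` placeholders rather than `String.format`.

```java
import java.util.Locale;

// Hypothetical formatter (not from the PR): every timeout line gets the
// same fields in the same order, which keeps log aggregation simple.
final class CdcTimeoutLog {
    private CdcTimeoutLog() {}

    static String format(String api, long jobId, String host, int port, int timeoutSec) {
        // Locale.ROOT keeps the digits ASCII regardless of the JVM default locale.
        return String.format(Locale.ROOT,
                "cdc_client RPC timeout api=%s jobId=%d backend=%s:%d timeout_sec=%d",
                api, jobId, host, port, timeoutSec);
    }
}
```

Each catch block would then emit `log.warn(CdcTimeoutLog.format(...))`, and a site that lacks one of the fields fails to compile instead of silently logging a shorter line.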

    log.warn("cdc_client RPC timeout api=/api/getTaskOffset taskId={} backend={}:{} timeout_sec={}",
            taskId, backend.getHost(), backend.getBrpcPort(),

This timeout log includes `taskId` and `backend` but omits `jobId`, even though it is available in this provider. Adding `jobId` would make the timeout WARN lines easier to aggregate consistently with the other cdc_client RPC sites.

    - log.warn("cdc_client RPC timeout api=/api/getTaskOffset taskId={} backend={}:{} timeout_sec={}",
    -         taskId, backend.getHost(), backend.getBrpcPort(),
    + log.warn("cdc_client RPC timeout api=/api/getTaskOffset jobId={} taskId={} backend={}:{} timeout_sec={}",
    +         jobId, taskId, backend.getHost(), backend.getBrpcPort(),
