Skip to content

[fix](streaming-job) bound cdc_client RPCs with per-category timeouts#62870

Open
JNSimba wants to merge 1 commit intoapache:masterfrom
JNSimba:fix-doris-25420-cdc-rpc-timeout
Open

[fix](streaming-job) bound cdc_client RPCs with per-category timeouts#62870
JNSimba wants to merge 1 commit intoapache:masterfrom
JNSimba:fix-doris-25420-cdc-rpc-timeout

Conversation

@JNSimba
Copy link
Copy Markdown
Member

@JNSimba JNSimba commented Apr 27, 2026

What problem does this PR solve?

Issue Number: close #xxx

Related PR: #xxx

Problem Summary:

JdbcSourceOffsetProvider.cleanMeta() and several other cdc_client RPCs called future.get() with no timeout. When the cdc_client (or PG/MySQL behind it) hangs, the call blocks forever. For cleanMeta() this is fatal because it runs inside JobManager.dropJobInternal() while holding JobManager.writeLock() — any subsequent CREATE / DROP / SHOW JOB on streaming jobs is then serialized behind the dead lock, effectively freezing the streaming-job control plane.

Fix

Introduce two configurable timeouts (mirroring the BE brpc_light/heavy_work_pool naming) and apply them to all 8 cdc_client RPC call sites:

  • streaming_cdc_light_rpc_timeout_sec = 30 for /api/close, /api/compareOffset, /api/getTaskOffset, /api/getFailReason (expected sub-second).
  • streaming_cdc_heavy_rpc_timeout_sec = 600 for /api/initReader, /api/fetchSplits, /api/fetchEndOffset, /api/writeRecords (may legitimately take minutes for schema discovery / large snapshot splits).

Both configs are mutable = true so they can be tuned via ADMIN SET FRONTEND CONFIG without restarting FE.

BackendServiceClient.requestCdcClient gains a timeout overload that applies a gRPC withDeadlineAfter; the per-call-site future.get(...) also passes the same timeout so the deadline is enforced on both sides.

On timeout we WARN with a uniform line carrying api / jobId / backend / timeout_sec for easy log aggregation. cleanMeta keeps its existing swallow-on-failure semantics (a cleanup hiccup must not fail DROP JOB); the other seven sites throw JobException consistent with their existing ExecutionException handling.

Release note

Add streaming-job FE configs streaming_cdc_light_rpc_timeout_sec and streaming_cdc_heavy_rpc_timeout_sec to bound cdc_client RPCs.

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason: defensive timeout — main path behavior is unchanged, the new branch only triggers when cdc_client itself is misbehaving (which is hard to reproduce in CI).
  • Behavior changed:

    • No.
    • Yes.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

@Thearas
Copy link
Copy Markdown
Contributor

Thearas commented Apr 27, 2026

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 28, 2026

/review

@JNSimba JNSimba force-pushed the fix-doris-25420-cdc-rpc-timeout branch 2 times, most recently from 3d487cc to d462026 Compare April 28, 2026 02:34
### What problem does this PR solve?

Issue Number: close #xxx

Problem Summary:

`JdbcSourceOffsetProvider.cleanMeta()` and several other cdc_client RPCs
called `future.get()` with no timeout. When the cdc_client (or PG/MySQL
behind it) hangs, the call blocks forever. For `cleanMeta()` this is
fatal because it runs inside `JobManager.dropJobInternal()` while
holding `JobManager.writeLock()` — any subsequent CREATE / DROP / SHOW
JOB on streaming jobs is then serialized behind the dead lock,
effectively freezing the streaming-job control plane.

### Fix

Introduce two configurable timeouts (mirroring the BE
`brpc_light/heavy_work_pool` naming) and apply them to all 8 cdc_client
RPC call sites:

- `streaming_cdc_light_rpc_timeout_sec = 30` for
  `/api/close`, `/api/compareOffset`, `/api/getTaskOffset`,
  `/api/getFailReason` (expected sub-second).
- `streaming_cdc_heavy_rpc_timeout_sec = 600` for
  `/api/initReader`, `/api/fetchSplits`, `/api/fetchEndOffset`,
  `/api/writeRecords` (may take minutes for schema discovery / large
  snapshot splits).

Both configs are `mutable = true` so they can be tuned via
`ADMIN SET FRONTEND CONFIG` without restarting FE.

`BackendServiceClient.requestCdcClient` gains a timeout overload that
applies a gRPC `withDeadlineAfter`; the per-call site `future.get(...)`
also passes the same timeout so the deadline is enforced on both sides.

On timeout we WARN with a uniform line carrying api / jobId / backend /
timeout_sec for easy log aggregation. `cleanMeta` keeps its existing
swallow-on-failure semantics (a cleanup hiccup must not fail DROP JOB);
the other seven sites throw `JobException` consistent with their
existing `ExecutionException` handling.

### Release note

Add streaming-job FE configs `streaming_cdc_light_rpc_timeout_sec` and
`streaming_cdc_heavy_rpc_timeout_sec` to bound cdc_client RPCs.
@JNSimba JNSimba force-pushed the fix-doris-25420-cdc-rpc-timeout branch from d462026 to 403fefc Compare April 28, 2026 06:21
@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 28, 2026

run buildall

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR prevents the streaming-job FE control plane from hanging indefinitely by bounding cdc_client RPC waits with configurable per-category timeouts, enforced both via gRPC deadlines and Future.get(...) timeouts.

Changes:

  • Add timeout-aware requestCdcClient overloads in BackendServiceClient/BackendServiceProxy using gRPC withDeadlineAfter.
  • Apply per-RPC timeouts (light vs heavy) and explicit future.get(timeout, ...) across streaming-job cdc_client call sites, with consistent timeout WARN logging.
  • Introduce new mutable FE configs for cdc_client RPC timeouts.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java Adds requestCdcClient(..., timeoutSec) overload to pass timeout to client.
fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java Adds timeout overload applying gRPC deadline for cdc_client RPCs.
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java Bounds /api/getTaskOffset wait with configurable timeout and adds timeout logging.
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java Bounds multiple cdc_client RPCs with configurable timeouts and adds timeout handling/logging.
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java Bounds /api/writeRecords and /api/getFailReason calls and adds timeout logging/handling.
fe/fe-common/src/main/java/org/apache/doris/common/Config.java Introduces new mutable FE configs for light/heavy cdc_client RPC timeouts.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +235 to +237
Future<PRequestCdcClientResult> future = BackendServiceProxy.getInstance()
.requestCdcClient(address, request, Config.streaming_cdc_light_rpc_timeout_sec);
result = future.get(Config.streaming_cdc_light_rpc_timeout_sec, TimeUnit.SECONDS);
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/api/fetchEndOffset is using streaming_cdc_light_rpc_timeout_sec for both the gRPC deadline and future.get(...). Per the PR description this endpoint is categorized as a heavy RPC (can legitimately take minutes), so it should use streaming_cdc_heavy_rpc_timeout_sec to avoid false timeouts on large snapshots/schema discovery.

Copilot uses AI. Check for mistakes.
Comment on lines +1158 to +1160
@ConfField(mutable = true, masterOnly = true)
public static int streaming_cdc_light_rpc_timeout_sec = 90;

Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new default for streaming_cdc_light_rpc_timeout_sec is set to 90s, but the PR description/release note says the light RPC timeout default should be 30s. Please align the default value (or update the PR description/release note) so operators get the intended behavior out of the box.

Copilot uses AI. Check for mistakes.
Comment on lines +373 to +375
} catch (TimeoutException te) {
log.warn("cdc_client RPC timeout api=/api/getFailReason taskId={} timeout_sec={}",
getTaskId(), Config.streaming_cdc_light_rpc_timeout_sec);
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timeout WARN logs are meant to be uniform for aggregation (api/jobId/backend/timeout_sec), but this timeout branch only logs taskId and timeout_sec and omits jobId/backend. Consider including jobId and backend={host:port} here (you can re-fetch the backend from runningBackendId in the catch) to keep the timeout logs consistent across RPCs.

Copilot uses AI. Check for mistakes.
Comment on lines +228 to +229
log.warn("cdc_client RPC timeout api=/api/getTaskOffset taskId={} backend={}:{} timeout_sec={}",
taskId, backend.getHost(), backend.getBrpcPort(),
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This timeout log includes taskId and backend, but it omits jobId even though it’s available in this provider. Adding jobId would make the timeout WARN lines easier to aggregate consistently with the other cdc_client RPC sites.

Suggested change
log.warn("cdc_client RPC timeout api=/api/getTaskOffset taskId={} backend={}:{} timeout_sec={}",
taskId, backend.getHost(), backend.getBrpcPort(),
log.warn("cdc_client RPC timeout api=/api/getTaskOffset jobId={} taskId={} backend={}:{} timeout_sec={}",
jobId, taskId, backend.getHost(), backend.getBrpcPort(),

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants