
[improvement](maxcompute) Simplify FE block ID requests for MaxCompute writes #62880

Open
hubgeter wants to merge 2 commits into apache:master from hubgeter:change_mc_write_block_id

Conversation

@hubgeter
Contributor

What problem does this PR solve?

Related PR: #62578

  1. PR #62578 ([opt](maxcompute) Allocate write block ids from FE and add catalog-level write_max_block_bytes prop.) moved MaxCompute write block ID allocation from BE-local counters to the FE.
     Instead of calling the FE through the BE JNI C++ bridge:

     MaxCompute connector Java -> BE JNI C++ -> FE

     the MaxCompute connector now requests block IDs from the FE directly through thrift:

     MaxCompute connector Java -> FE

A new MaxComputeFeClient is added under the MaxCompute connector to handle these FE RPC calls.

  2. Removes the hardcoded MAX_BLOCK_COUNT variable from
     fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
     and moves it to the FE config max_compute_write_max_block_count.
     The default value is still 20000, so the existing behavior is preserved.
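The FE-side allocation model described above can be sketched in plain Java. This is an illustrative sketch only, not the actual MCTransaction code: the names BlockIdAllocator and allocateRange are hypothetical, but the shape (an AtomicLong counter capped by the max_compute_write_max_block_count value) follows the description in this PR.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only: class and method names are hypothetical.
// An atomic counter hands out contiguous block-id ranges and refuses
// to exceed the per-session cap (default 20000, now the
// max_compute_write_max_block_count config).
class BlockIdAllocator {
    private final AtomicLong next = new AtomicLong(0);
    private final long maxBlockCount;

    BlockIdAllocator(long maxBlockCount) {
        this.maxBlockCount = maxBlockCount;
    }

    // Returns the first id of a `count`-sized range, or -1 when the
    // cap would be exceeded.
    long allocateRange(long count) {
        while (true) {
            long start = next.get();
            if (start + count > maxBlockCount) {
                return -1;
            }
            if (next.compareAndSet(start, start + count)) {
                return start;
            }
        }
    }
}
```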

Release note

None

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

@hello-stephen
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@hubgeter
Contributor Author

/review

Contributor

@github-actions Bot left a comment


I found two issues that should be addressed before this PR merges.

  1. The new max_compute_write_max_block_count setting turns the previously hard FE-side block-id guard into an unconstrained runtime knob. If this is set above MaxCompute's actual per-session block cap, FE will now allocate out-of-contract block IDs and push the failure to the writer/runtime path.
  2. MaxComputeFeClient now opens a fresh thrift socket for every block-id request. Because block IDs are requested on every writer rotation, this regresses the hot write path and increases FE connection churn compared with the old cached-client path.

Critical checkpoint conclusions:

  • Goal of the task: move MaxCompute block-id allocation from the BE JNI native bridge into the Java connector and keep the FE allocation semantics. The refactor mostly achieves that, and MaxComputeFeClientTest covers basic success/redirect/error retry behavior, but the two issues above mean the new path is not yet equivalent/safe enough.
  • Change scope: the JNI cleanup and direct FE client are reasonably focused, but the config change also widens a previously hard-enforced guard.
  • Concurrency: no new cross-thread locking issue was found in the touched code. MCTransaction.allocateBlockIdRange() still relies on AtomicLong, and the new FE client is used per writer instance.
  • Lifecycle/static init: no special lifecycle or static-initialization issue found in the reviewed paths.
  • Configuration: a new FE config was added. Its current shape is problematic because it makes the external block-id cap operator-configurable without preserving the hard upper bound.
  • Compatibility/incompatible change: no new storage-format or FE/BE thrift compatibility issue was found in the touched fields.
  • Parallel code paths: I did not find another live MaxCompute block-id allocation path after the JNI method removal.
  • Special conditions: NOT_MASTER redirect handling is still present, but the transport implementation changed enough to introduce the connection-churn regression above.
  • Test coverage: good unit coverage for the new redirect/retry logic; missing coverage for invalid block-count configuration and repeated block-allocation hot-path behavior.
  • Test result updates: none in this PR.
  • Observability: existing logs are sufficient for debugging this path.
  • Transaction/persistence/data-write correctness: block IDs are still allocated from FE transaction state, but allowing the limit to exceed the external cap can break write correctness.
  • Performance: per-request socket creation on block allocation is a hot-path regression.
  • Other issues: no additional distinct issues found.
  • User focus: /tmp/review_focus.txt contained no additional review focus, so there were no extra focus-specific findings.

*/
@ConfField(mutable = true, masterOnly = true, description = {
"Maximum number of MaxCompute Storage API write block IDs that can be allocated in one write session."})
public static long max_compute_write_max_block_count = 20000L;
Contributor


PR #62578 made 20000 the FE-side guard for MaxCompute's per-session block-id cap. After turning that into an unconstrained config here, setting max_compute_write_max_block_count > 20000 lets allocateBlockIdRange() hand out block IDs that the previous FE-side guard explicitly refused. That moves the failure from deterministic FE validation to a later write-time error after data has already been produced. Can we keep 20000 as a hard upper bound and only make a lower soft cap configurable (or validate/clamp the config value before it is used)?
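The clamping this comment asks for could look like the following minimal sketch. BlockCountGuard and effectiveMaxBlockCount are hypothetical names, not actual Doris code: the point is only that the config can lower the effective cap but never raise it past the historic 20000 bound.

```java
// Hypothetical sketch of the suggested fix: keep 20000 as a hard
// upper bound and let the config only lower the effective cap.
class BlockCountGuard {
    static final long HARD_MAX_BLOCK_COUNT = 20000L;

    // Clamp the configured value instead of trusting it blindly;
    // non-positive values fall back to the hard cap.
    static long effectiveMaxBlockCount(long configured) {
        if (configured <= 0) {
            return HARD_MAX_BLOCK_COUNT;
        }
        return Math.min(configured, HARD_MAX_BLOCK_COUNT);
    }
}
```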


private static <T> T callFe(TNetworkAddress address, int timeoutMs,
boolean useFramedTransport, FeCall<T> call) throws Exception {
TSocket socket = new TSocket(address.getHostname(), address.getPort(), timeoutMs);
Contributor


Before this refactor, block-id allocation went through ThriftRpcHelper::rpc<FrontendServiceClient>(), so repeated requests reused Doris' FE client cache. callFe() now creates and closes a brand-new TSocket/TTransport for every requestBlockId(). Since requestBlockId() runs on every writer rotation (rotateCurrentBatchWriter()), small block sizes or many partitions now turn into one TCP connection per block. That is a hot-path regression and increases FE socket churn. Please keep a reusable client/transport per MaxComputeFeClient (or another pooled helper) instead of reconnecting for every allocation.
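One way to express the reuse pattern this comment asks for, sketched with a generic holder rather than real thrift types. ReusableChannel and its methods are hypothetical; in the real client the supplier would open a thrift TSocket/TTransport, and invalidate() would be called on RPC failure so the next request reconnects.

```java
import java.util.function.Supplier;

// Hypothetical sketch: open one connection per client instance
// lazily, and reconnect only after an explicit invalidation
// (e.g. on an RPC failure), instead of per request.
class ReusableChannel<C> {
    private final Supplier<C> opener;
    private C cached;

    ReusableChannel(Supplier<C> opener) {
        this.opener = opener;
    }

    // Connect on first use, then keep returning the same channel.
    synchronized C get() {
        if (cached == null) {
            cached = opener.get();
        }
        return cached;
    }

    // Drop the cached channel so the next get() reconnects.
    synchronized void invalidate() {
        cached = null;
    }
}
```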

@hubgeter
Contributor Author

/review

@hubgeter
Contributor Author

run buildall

Contributor

@github-actions Bot left a comment


Besides the two already-open review threads, I found one additional blocking issue.

Critical checkpoints

  • Goal / correctness: The refactor mostly achieves the FE-direct block-id path, but it does not preserve availability across FE master failover. The added tests only prove steady-state success / NOT_MASTER redirect.
  • Scope: The change is reasonably focused around MaxCompute block-id allocation.
  • Concurrency: MaxComputeFeClient is synchronized and appears single-writer scoped; I did not find a deadlock or lock-order issue in the touched code.
  • Lifecycle: The new client now lives for the whole writer lifetime, and that lifetime-long FE address pinning is the source of the failure below.
  • Config: max_compute_write_max_block_count is added; I am not duplicating the already-open thread about enforcing the historic 20000 upper bound.
  • Parallel code paths: I did not find another MaxCompute writer path in this PR that also needed the same fix.
  • Special conditions: NOT_MASTER is handled, but the old-master-down / no-redirect case is not.
  • Tests: Added FE unit and Java unit tests cover happy path, redirect, FE error, and retry. They do not cover master crash / heartbeat refresh behavior.
  • Test result files: none changed.
  • Observability: adequate for most FE-side failures; Java-side RPC failure logs could include txn_id / write_session_id, but that is secondary.
  • Transaction / persistence: no EditLog changes; external transaction bookkeeping remains in-memory as before.
  • Data writes: FE failover can now break further block-id allocation for an in-flight write.
  • FE/BE variables: the new FE address / thrift-server-type params are wired through the BE-to-Java path.
  • Performance: the latest revision fixes the already-open connection-churn regression by reusing a transport per writer.
  • Other: I did not find further distinct issues beyond the already-open threads and the inline comment below.


Exception lastException = null;
for (int retryTimes = 0; retryTimes < FETCH_BLOCK_ID_MAX_RETRY_TIMES; retryTimes++) {
TNetworkAddress requestAddress = copyAddress(masterAddress);
Contributor


Before this refactor, each block-id allocation started from ExecEnv::GetInstance()->cluster_info()->master_fe_addr, so the next allocation could follow BE heartbeat updates after a master crash. Here the request address always comes from the client's cached masterAddress, which only changes after a successful NOT_MASTER response. If the old master dies before it can return NOT_MASTER, BE can learn the new leader, but this writer never re-reads it and keeps retrying the dead endpoint for the rest of the query. Can we refresh the baseline FE address between independent requestBlockId() calls (or on RPC failures) instead of pinning the whole writer lifetime to the first address?
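The refresh this comment suggests could be sketched as follows. MasterAddressSource and its methods are hypothetical names, not connector code: the idea is to re-read the heartbeat view of the master address before each independent request instead of pinning the address captured at construction time.

```java
import java.util.function.Supplier;

// Hypothetical sketch: consult a fresh view of the master address
// (e.g. the BE heartbeat's master_fe_addr) per request, falling back
// to the last address that actually answered.
class MasterAddressSource {
    private final Supplier<String> heartbeatView;
    private volatile String lastKnown;

    MasterAddressSource(Supplier<String> heartbeatView, String initial) {
        this.heartbeatView = heartbeatView;
        this.lastKnown = initial;
    }

    // Baseline address for a new request: prefer the fresh heartbeat
    // view so a master failover is picked up between requests.
    String baseline() {
        String fresh = heartbeatView.get();
        return fresh != null ? fresh : lastKnown;
    }

    // Record the address learned from a success or NOT_MASTER redirect.
    void remember(String address) {
        lastKnown = address;
    }
}
```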

@hello-stephen
Contributor

BE UT Coverage Report

Increment line coverage 0.00% (0/5) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 53.38% (20430/38270)
Line Coverage 36.93% (192588/521457)
Region Coverage 33.21% (149637/450585)
Branch Coverage 34.36% (65541/190750)

@hello-stephen
Contributor

FE UT Coverage Report

Increment line coverage 0.00% (0/1) 🎉
Increment coverage report
Complete coverage report

@hello-stephen
Contributor

BE Regression && UT Coverage Report

Increment line coverage 0.00% (0/5) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 73.86% (27692/37495)
Line Coverage 57.65% (299855/520162)
Region Coverage 54.81% (249316/454911)
Branch Coverage 56.40% (108000/191480)

@hello-stephen
Contributor

FE Regression Coverage Report

Increment line coverage 0.00% (0/5) 🎉
Increment coverage report
Complete coverage report

}

MaxComputeFeClient(TNetworkAddress masterAddress, int rpcTimeoutMs, String thriftServerType) {
this(masterAddress, rpcTimeoutMs, thriftServerType, new ReusableRpcExecutor(),
Contributor


This is a client; why does it need to know the server type?

Contributor Author


be/src/common/config.cpp:
// Frontend mainly use two thrift sever type: THREAD_POOL, THREADED_SELECTOR. if fe use THREADED_SELECTOR model for thrift server,
// the thrift_server_type_of_fe should be set THREADED_SELECTOR to make be thrift client to fe constructed with TFramedTransport
DEFINE_String(thrift_server_type_of_fe, "THREAD_POOL");
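The quoted BE config explains the dependency: the FE's thrift server model dictates which client transport works. The mapping can be sketched as below; TransportChoice is a hypothetical illustration, not actual Doris code.

```java
// Hypothetical sketch of the mapping the quoted BE config describes:
// a THREADED_SELECTOR FE thrift server requires a framed transport on
// the client side, while THREAD_POOL works with a plain buffered one.
class TransportChoice {
    static boolean useFramedTransport(String thriftServerType) {
        return "THREADED_SELECTOR".equalsIgnoreCase(thriftServerType);
    }
}
```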

throw new IOException("failed to allocate MaxCompute block_id from FE, missing status code, "
+ "txn_id=" + txnId + ", write_session_id=" + writeSessionId);
}
if (code == TStatusCode.NOT_MASTER) {
Contributor


Do we care whether it is the master here? We only need the address of the coordinator FE; it doesn't seem necessary to check for the master?

Contributor Author


The master maintains the MCTransaction. Allocating a block ID requires the txn ID: the master looks up the MCTransaction by txn ID and only then can it hand out block IDs. So the request has to go to the master.

+ result.getLength() + ", txn_id=" + txnId + ", write_session_id=" + writeSessionId);
}

LOG.info("Allocated MaxCompute block_id from FE@" + formatAddress(requestAddress)
Contributor


This should be a debug-level log; otherwise it produces too many messages.

Contributor Author


OK.

try {
Thread.sleep(retrySleepMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Contributor


Why do we need to call interrupt() again after catching the exception?

Contributor Author


When Thread.sleep() throws InterruptedException, it clears the thread's interrupted flag. Calling Thread.currentThread().interrupt() in the catch block is the recommended Java practice: it restores the interrupt status so that upper layers can still observe the interruption.
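A minimal illustration of this idiom; sleepQuietly is a hypothetical helper, not connector code.

```java
// Thread.sleep() clears the interrupt flag when it throws, so the
// catch block restores it for callers up the stack to observe.
class InterruptIdiom {
    static boolean sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the cleared flag
            return false;
        }
    }
}
```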

return new MaxComputeFeClient(new TNetworkAddress(host, port), timeoutMs, serverType);
}

MaxComputeFeClient(TNetworkAddress masterAddress, int rpcTimeoutMs, String thriftServerType) {
Contributor


Why are there so many constructors? Are they really useful?

Contributor Author


Will remove them.
