[improvement](maxcompute) Simplify FE block ID requests for MaxCompute writes#62880
hubgeter wants to merge 2 commits into apache:master
Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
/review
I found two issues that should be addressed before this PR merges.
- The new max_compute_write_max_block_count setting turns the previously hard FE-side block-id guard into an unconstrained runtime knob. If it is set above MaxCompute's actual per-session block cap, FE will now allocate out-of-contract block IDs and push the failure to the writer/runtime path.
- MaxComputeFeClient now opens a fresh thrift socket for every block-id request. Because block IDs are requested on every writer rotation, this regresses the hot write path and increases FE connection churn compared with the old cached-client path.
Critical checkpoint conclusions:
- Goal of the task: move MaxCompute block-id allocation from the BE JNI native bridge into the Java connector and keep the FE allocation semantics. The refactor mostly achieves that, and MaxComputeFeClientTest covers basic success/redirect/error retry behavior, but the two issues above mean the new path is not yet equivalent/safe enough.
- Change scope: the JNI cleanup and direct FE client are reasonably focused, but the config change also widens a previously hard-enforced guard.
- Concurrency: no new cross-thread locking issue was found in the touched code. MCTransaction.allocateBlockIdRange() still relies on AtomicLong, and the new FE client is used per writer instance.
- Lifecycle/static init: no special lifecycle or static-initialization issue found in the reviewed paths.
- Configuration: a new FE config was added. Its current shape is problematic because it makes the external block-id cap operator-configurable without preserving the hard upper bound.
- Compatibility/incompatible change: no new storage-format or FE/BE thrift compatibility issue was found in the touched fields.
- Parallel code paths: I did not find another live MaxCompute block-id allocation path after the JNI method removal.
- Special conditions: NOT_MASTER redirect handling is still present, but the transport implementation changed enough to introduce the connection-churn regression above.
- Test coverage: good unit coverage for the new redirect/retry logic; missing coverage for invalid block-count configuration and repeated block-allocation hot-path behavior.
- Test result updates: none in this PR.
- Observability: existing logs are sufficient for debugging this path.
- Transaction/persistence/data-write correctness: block IDs are still allocated from FE transaction state, but allowing the limit to exceed the external cap can break write correctness.
- Performance: per-request socket creation on block allocation is a hot-path regression.
- Other issues: no additional distinct issues found.
- User focus: /tmp/review_focus.txt contained no additional review focus, so there were no extra focus-specific findings.
 */
@ConfField(mutable = true, masterOnly = true, description = {
        "Maximum number of MaxCompute Storage API write block IDs that can be allocated in one write session."})
public static long max_compute_write_max_block_count = 20000L;
PR #62578 made 20000 the FE-side guard for MaxCompute's per-session block-id cap. After turning that into an unconstrained config here, setting max_compute_write_max_block_count > 20000 lets allocateBlockIdRange() hand out block IDs that the previous FE-side guard explicitly refused. That moves the failure from deterministic FE validation to a later write-time error after data has already been produced. Can we keep 20000 as a hard upper bound and only make a lower soft cap configurable (or validate/clamp the config value before it is used)?
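If the hard bound is kept, the validation can be as small as a clamp applied before the value is consumed. A minimal sketch, assuming a hypothetical HARD_MAX_BLOCK_COUNT constant and helper method (neither is in the PR):

```java
// Hypothetical sketch: clamp the configurable cap to the historic hard bound
// before it is used, so FE never allocates out-of-contract block IDs.
private static final long HARD_MAX_BLOCK_COUNT = 20000L; // MaxCompute per-session cap

private static long effectiveMaxBlockCount() {
    long configured = Config.max_compute_write_max_block_count;
    if (configured <= 0 || configured > HARD_MAX_BLOCK_COUNT) {
        // An out-of-range value is clamped instead of deferring the failure
        // to write time, after data has already been produced.
        return HARD_MAX_BLOCK_COUNT;
    }
    return configured; // a lower soft cap stays operator-configurable
}
```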
private static <T> T callFe(TNetworkAddress address, int timeoutMs,
        boolean useFramedTransport, FeCall<T> call) throws Exception {
    TSocket socket = new TSocket(address.getHostname(), address.getPort(), timeoutMs);
Before this refactor, block-id allocation went through ThriftRpcHelper::rpc<FrontendServiceClient>(), so repeated requests reused Doris' FE client cache. callFe() now creates and closes a brand-new TSocket/TTransport for every requestBlockId(). Since requestBlockId() runs on every writer rotation (rotateCurrentBatchWriter()), small block sizes or many partitions now turn into one TCP connection per block. That is a hot-path regression and increases FE socket churn. Please keep a reusable client/transport per MaxComputeFeClient (or another pooled helper) instead of reconnecting for every allocation.
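A per-writer cached connection could look roughly like the sketch below; the class and field names are my assumptions, not the PR's code, though a later revision of this PR reportedly adopted a similar reuse-per-writer shape:

```java
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

// Sketch: reconnect only when the cached transport is missing or broken,
// not on every requestBlockId() call.
class CachedFeConnection {
    private final boolean useFramedTransport;
    private final int timeoutMs;
    private TTransport transport;
    private FrontendService.Client client;

    CachedFeConnection(boolean useFramedTransport, int timeoutMs) {
        this.useFramedTransport = useFramedTransport;
        this.timeoutMs = timeoutMs;
    }

    synchronized FrontendService.Client getOrOpen(TNetworkAddress addr) throws Exception {
        if (transport != null && transport.isOpen()) {
            return client; // reuse the live connection across block-id requests
        }
        TSocket socket = new TSocket(addr.getHostname(), addr.getPort(), timeoutMs);
        transport = useFramedTransport ? new TFramedTransport(socket) : socket;
        transport.open();
        client = new FrontendService.Client(new TBinaryProtocol(transport));
        return client;
    }

    synchronized void invalidate() { // call after an RPC failure or NOT_MASTER redirect
        if (transport != null) {
            transport.close();
            transport = null;
            client = null;
        }
    }
}
```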
/review
run buildall |
Besides the two already-open review threads, I found one additional blocking issue.
Critical checkpoints
- Goal / correctness: The refactor mostly achieves the FE-direct block-id path, but it does not preserve availability across FE master failover. The added tests only prove steady-state success / NOT_MASTER redirect.
- Scope: The change is reasonably focused around MaxCompute block-id allocation.
- Concurrency: MaxComputeFeClient is synchronized and appears single-writer scoped; I did not find a deadlock or lock-order issue in the touched code.
- Lifecycle: The new client now lives for the whole writer lifetime, and that lifetime-long FE address pinning is the source of the failure below.
- Config: max_compute_write_max_block_count is added; I am not duplicating the already-open thread about enforcing the historic 20000 upper bound.
- Parallel code paths: I did not find another MaxCompute writer path in this PR that also needed the same fix.
- Special conditions: NOT_MASTER is handled, but the old-master-down / no-redirect case is not.
- Tests: Added FE unit and Java unit tests cover happy path, redirect, FE error, and retry. They do not cover master crash / heartbeat refresh behavior.
- Test result files: none changed.
- Observability: adequate for most FE-side failures; Java-side RPC failure logs could include txn_id/write_session_id, but that is secondary.
- Transaction / persistence: no EditLog changes; external transaction bookkeeping remains in-memory as before.
- Data writes: FE failover can now break further block-id allocation for an in-flight write.
- FE/BE variables: the new FE address / thrift-server-type params are wired through the BE-to-Java path.
- Performance: the latest revision fixes the already-open connection-churn regression by reusing a transport per writer.
- Other: I did not find further distinct issues beyond the already-open threads and the inline comment below.
Exception lastException = null;
for (int retryTimes = 0; retryTimes < FETCH_BLOCK_ID_MAX_RETRY_TIMES; retryTimes++) {
    TNetworkAddress requestAddress = copyAddress(masterAddress);
Before this refactor, each block-id allocation started from ExecEnv::GetInstance()->cluster_info()->master_fe_addr, so the next allocation could follow BE heartbeat updates after a master crash. Here the request address always comes from the client's cached masterAddress, which only changes after a successful NOT_MASTER response. If the old master dies before it can return NOT_MASTER, BE can learn the new leader, but this writer never re-reads it and keeps retrying the dead endpoint for the rest of the query. Can we refresh the baseline FE address between independent requestBlockId() calls (or on RPC failures) instead of pinning the whole writer lifetime to the first address?
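One possible shape for that fix, as a hedged sketch: assume a supplier-style hook into the heartbeat-refreshed cluster info (the supplier and class names below are hypothetical; real wiring would come from the BE-to-Java parameters).

```java
import java.util.function.Supplier;
import org.apache.doris.thrift.TNetworkAddress;

// Sketch: re-resolve the master FE address before each independent request
// instead of pinning the first leader for the writer's whole lifetime.
class RefreshingMasterAddress {
    private final Supplier<TNetworkAddress> masterAddressSupplier;
    private volatile TNetworkAddress masterAddress;

    RefreshingMasterAddress(Supplier<TNetworkAddress> supplier, TNetworkAddress initial) {
        this.masterAddressSupplier = supplier;
        this.masterAddress = initial;
    }

    // Called at the start of each independent requestBlockId() (or after an RPC failure).
    TNetworkAddress baseline() {
        TNetworkAddress refreshed = masterAddressSupplier.get();
        if (refreshed != null && !refreshed.equals(masterAddress)) {
            masterAddress = refreshed; // pick up the new leader after a master crash
        }
        return masterAddress;
    }
}
```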
BE UT Coverage Report: Increment line coverage, Increment coverage report
FE UT Coverage Report: Increment line coverage
BE Regression && UT Coverage Report: Increment line coverage, Increment coverage report
FE Regression Coverage Report: Increment line coverage
MaxComputeFeClient(TNetworkAddress masterAddress, int rpcTimeoutMs, String thriftServerType) {
    this(masterAddress, rpcTimeoutMs, thriftServerType, new ReusableRpcExecutor(),
This is a client, so why does it need to be aware of the server type?
be/src/common/config.cpp:
// Frontend mainly uses two thrift server types: THREAD_POOL, THREADED_SELECTOR. If FE uses the
// THREADED_SELECTOR model for its thrift server, thrift_server_type_of_fe should be set to
// THREADED_SELECTOR so that the BE thrift client to FE is constructed with TFramedTransport.
DEFINE_String(thrift_server_type_of_fe, "THREAD_POOL");
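In other words, the server type decides the transport framing on the client side. A minimal sketch of that decision, assuming the Java connector mirrors the BE config above (the method name is illustrative):

```java
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

// A THREADED_SELECTOR FE thrift server only speaks framed transport,
// so the client must wrap its socket accordingly.
static TTransport wrapForServerType(TSocket socket, String thriftServerType) {
    if ("THREADED_SELECTOR".equalsIgnoreCase(thriftServerType)) {
        return new TFramedTransport(socket); // framed transport for selector servers
    }
    return socket; // THREAD_POOL servers accept the raw socket transport
}
```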
    throw new IOException("failed to allocate MaxCompute block_id from FE, missing status code, "
            + "txn_id=" + txnId + ", write_session_id=" + writeSessionId);
}
if (code == TStatusCode.NOT_MASTER) {
Do we actually care whether it is the master here? We only need the address of the coordinator FE, so it seems we should not have to care whether it is the master.
The master maintains the MCTransaction. Requesting a block ID requires a txn_id, and only the master can look up the MCTransaction by that txn_id to hand out block IDs, so the request must go to the master.
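For readers following along, a simplified sketch of that relationship (class and field names assumed, not the PR's code): the master holds the in-memory transaction state, so only it can map a txn_id to the next block-id range, consistent with the earlier note that MCTransaction.allocateBlockIdRange() relies on AtomicLong.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: master-side per-transaction block-id allocation.
class MCTransactionSketch {
    private final AtomicLong nextBlockId = new AtomicLong(0);
    private final long maxBlockCount;

    MCTransactionSketch(long maxBlockCount) {
        this.maxBlockCount = maxBlockCount;
    }

    // Returns the first id of a contiguous range of `count` block IDs,
    // or fails once the per-session cap is exhausted.
    long allocateBlockIdRange(long count) {
        long start = nextBlockId.getAndAdd(count);
        if (start + count > maxBlockCount) {
            throw new IllegalStateException(
                    "block id range exceeds per-session cap " + maxBlockCount);
        }
        return start;
    }
}
```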
        + result.getLength() + ", txn_id=" + txnId + ", write_session_id=" + writeSessionId);
}

LOG.info("Allocated MaxCompute block_id from FE@" + formatAddress(requestAddress)

try {
    Thread.sleep(retrySleepMs);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
Why do we still need to call interrupt() after catching the exception?
When Thread.sleep() throws InterruptedException, it clears the thread's interrupted flag. Calling Thread.currentThread().interrupt() in the catch block is the recommended Java practice: it restores the interrupt status so that callers further up the stack can still observe the interruption.
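A self-contained illustration of the idiom (not code from this PR): without the re-interrupt, the isInterrupted() check below would stay false and the loop would never exit.

```java
// Standalone demonstration of why restoring the flag matters.
while (!Thread.currentThread().isInterrupted()) {
    try {
        Thread.sleep(1000L); // throws InterruptedException and CLEARS the flag
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the flag for the loop condition
    }
}
```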
    return new MaxComputeFeClient(new TNetworkAddress(host, port), timeoutMs, serverType);
}

MaxComputeFeClient(TNetworkAddress masterAddress, int rpcTimeoutMs, String thriftServerType) {
What problem does this PR solve?
Related PR: #62578
Instead of calling FE through the BE JNI C++ bridge:
MaxCompute connector Java -> BE JNI C++ -> FE
the MaxCompute connector now requests FE directly through thrift:
MaxCompute connector Java -> FE
A new MaxComputeFeClient is added under the MaxCompute connector to handle these FE RPCs.
This PR also removes the MAX_BLOCK_COUNT variable from fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java and moves it to the FE config max_compute_write_max_block_count. The default value is still 20000, so the existing behavior is preserved.
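For orientation, a hedged usage sketch of the new direct path: the constructor parameters match the quoted diff, but requestBlockId()'s exact signature is my assumption.

```java
// Hypothetical call shape: the connector writer asks FE for a block id over
// thrift on each writer rotation, with no BE JNI bridge in between.
MaxComputeFeClient feClient = new MaxComputeFeClient(
        new TNetworkAddress(feHost, fePort), /* rpcTimeoutMs */ 5000, "THREAD_POOL");
long blockId = feClient.requestBlockId(txnId, writeSessionId); // signature assumed
```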
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merges this PR)