diff --git a/be/src/exec/scan/scanner.cpp b/be/src/exec/scan/scanner.cpp index fb6507c33ac616..22a4e96f45b9b3 100644 --- a/be/src/exec/scan/scanner.cpp +++ b/be/src/exec/scan/scanner.cpp @@ -253,6 +253,7 @@ Status Scanner::try_append_late_arrival_runtime_filter() { // avoid conjunct destroy in used by storage layer _conjuncts.clear(); RETURN_IF_ERROR(_local_state->clone_conjunct_ctxs(_conjuncts)); + _applied_rf_num = arrived_rf_num; return Status::OK(); } diff --git a/be/test/exec/scan/scanner_late_arrival_rf_test.cpp b/be/test/exec/scan/scanner_late_arrival_rf_test.cpp new file mode 100644 index 00000000000000..f1e21ebc4c34dd --- /dev/null +++ b/be/test/exec/scan/scanner_late_arrival_rf_test.cpp @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include + +#include "core/data_type/data_type_factory.hpp" +#include "core/data_type/data_type_number.h" +#include "exec/operator/mock_scan_operator.h" +#include "exec/runtime_filter/runtime_filter_consumer.h" +#include "exec/runtime_filter/runtime_filter_consumer_helper.h" +#include "exec/runtime_filter/runtime_filter_producer.h" +#include "exec/runtime_filter/runtime_filter_test_utils.h" +#include "exec/scan/scanner.h" +#include "runtime/descriptors.h" +#include "runtime/exec_env.h" + +namespace doris { + +// Minimal concrete Scanner so the abstract base can be instantiated. The +// behavior under test (`Scanner::try_append_late_arrival_runtime_filter`) lives +// entirely in the base class and never calls `_get_block_impl`. +class TestScanner final : public Scanner { +public: + // Forwards every argument unchanged to the Scanner base constructor. + TestScanner(RuntimeState* state, ScanLocalStateBase* local_state, int64_t limit, + RuntimeProfile* profile) + : Scanner(state, local_state, limit, profile) {} + +protected: + // Stub implementation: leaves the block untouched, sets `*eof` to true and + // returns OK. + Status _get_block_impl(RuntimeState* /*state*/, Block* /*block*/, bool* eof) override { + *eof = true; + return Status::OK(); + } +}; + +// Fixture for the late-arrival runtime filter test. It extends +// RuntimeFilterTest and, after the base SetUp(), additionally initialises the +// ExecEnv runtime filter timer queue. +class ScannerLateArrivalRfTest : public RuntimeFilterTest { +public: + void SetUp() override { + RuntimeFilterTest::SetUp(); + // The runtime filter helper schedules a timer on the global queue when + // it initialises, so the queue must exist before init() is called. + ExecEnv::GetInstance()->_init_runtime_filter_timer_queue(); + } +}; + +// Regression: PR #59786 accidentally removed `_applied_rf_num = arrived_rf_num` +// from `Scanner::try_append_late_arrival_runtime_filter`, leaving the counter +// at 0 forever. Subsequent calls then re-clone conjunct ctxs every time, the +// `ApplyAllRuntimeFilters=True` info-string is never emitted, and the +// `_applied_rf_num <= _total_rf_num` DCHECK becomes dead. This test asserts +// the counter advances after RFs arrive and that the second call short-circuits +// via the fast path at the top of the function. 
+TEST_F(ScannerLateArrivalRfTest, applied_rf_num_advances_after_late_arrival) { + std::vector rf_descs = { + TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(), + TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build()}; + + SlotDescriptor slot_desc; + slot_desc._type = DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT, false); + TupleDescriptor tuple_desc; + tuple_desc.add_slot(&slot_desc); + RowDescriptor row_desc; + _tbl._slot_desc_map[0] = &slot_desc; + const_cast&>(row_desc._tuple_desc_map).push_back(&tuple_desc); + + // MockScanOperatorX feeds `_runtime_filter_descs` and `_row_descriptor` + // into the local state's helper. Access to the protected members is + // permitted by the build-wide `-fno-access-control` flag. + auto op = std::make_shared(); + op->_runtime_filter_descs = rf_descs; + op->_row_descriptor = row_desc; + + // Local state bound to the first runtime state and the mock operator above. + auto local_state = std::make_shared(_runtime_states[0].get(), op.get()); + + // The helper must be initialised before the scanner is created; the test + // later indexes `_consumers[0]` and `_consumers[1]`, so init() presumably + // creates one consumer per descriptor. NOTE(review): the meaning of the + // positional arguments (true, 0, 0, "") is not visible here - confirm. + std::vector> rf_dependencies; + ASSERT_TRUE(local_state->_helper.init(_runtime_states[0].get(), true, 0, 0, rf_dependencies, "") + .ok()); + + // Scanner under test. After init() the total is expected to equal the + // number of descriptors (2) and nothing has been applied yet (0). + auto scanner = std::make_unique(_runtime_states[0].get(), local_state.get(), + -1 /*limit*/, &_profile); + ASSERT_TRUE(scanner->init(_runtime_states[0].get(), {}).ok()); + ASSERT_EQ(scanner->_total_rf_num, 2); + ASSERT_EQ(scanner->_applied_rf_num, 0); + + // Simulate late arrival: a single producer is created from the first + // descriptor (`rf_descs.data()`), marked READY, and used to signal both + // consumers. + std::shared_ptr producer; + ASSERT_TRUE(RuntimeFilterProducer::create(_query_ctx.get(), rf_descs.data(), &producer).ok()); + producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); + local_state->_helper._consumers[0]->signal(producer.get()); + local_state->_helper._consumers[1]->signal(producer.get()); + + // First call after both RFs arrived: counter must advance to total. Before + // the fix this stayed at 0 because the assignment was missing. 
+ ASSERT_TRUE(scanner->try_append_late_arrival_runtime_filter().ok()); + ASSERT_EQ(scanner->_applied_rf_num, 2); + + // Second call: must hit the fast-path early return without re-cloning. + // We clear `_conjuncts` and verify the function does NOT repopulate them; + // if `_applied_rf_num` were still 0 the function would call + // `clone_conjunct_ctxs` and overwrite the cleared vector. + scanner->_conjuncts.clear(); + ASSERT_TRUE(scanner->try_append_late_arrival_runtime_filter().ok()); + // Counter unchanged and the cleared vector still empty: no re-clone + // happened on the second call. + ASSERT_EQ(scanner->_applied_rf_num, 2); + ASSERT_TRUE(scanner->_conjuncts.empty()); +} + +} // namespace doris