diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/ScalarIndexedFieldsVisitor.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/ScalarIndexedFieldsVisitor.java new file mode 100644 index 000000000000..a1c4c266f5ab --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/ScalarIndexedFieldsVisitor.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.globalindex; + +import org.apache.paimon.index.GlobalIndexMeta; +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.predicate.And; +import org.apache.paimon.predicate.Between; +import org.apache.paimon.predicate.CompoundFunction; +import org.apache.paimon.predicate.CompoundPredicate; +import org.apache.paimon.predicate.Equal; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.In; +import org.apache.paimon.predicate.IsNull; +import org.apache.paimon.predicate.LeafFunction; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.Or; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.predicate.PredicateVisitor; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.utils.Pair; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates; +import static org.apache.paimon.table.source.snapshot.TimeTravelUtil.tryTravelOrLatest; + +/** A visitor to test whether a predicate is fully covered by scalar index. */ +public class ScalarIndexedFieldsVisitor implements PredicateVisitor { + + private static final String BTREE_INDEX_TYPE = "btree"; + + private final Set scalarIndexedFields; + + public ScalarIndexedFieldsVisitor(Set scalarIndexedFields) { + this.scalarIndexedFields = scalarIndexedFields; + } + + public static boolean allFieldsIndexed( + Table table, + @Nullable Predicate predicate, + @Nullable PartitionPredicate partitionPredicate) { + if (!(table instanceof FileStoreTable)) { + return false; + } + + FileStoreTable storeTable = (FileStoreTable) table; + + if (!storeTable.coreOptions().dataEvolutionEnabled()) { + return false; + } + + if (predicate == null || !storeTable.coreOptions().globalIndexEnabled()) { + return false; + } + + // We should split the PartitionPredicate to filter index entries, or the result may be + // wrong. For example, if we have two partitions `dt=1`(indexed) and `dt=2`(unindexed), + // the where condition `id=10 AND dt=2` should not be consumed. Because the index evaluation + // during the plan phase will decide not to use the index. + Pair, List> splitPredicates = + splitPartitionPredicatesAndDataPredicates( + predicate, table.rowType(), table.partitionKeys()); + PartitionPredicate effectivePartPredicate = + partitionPredicate != null + ? partitionPredicate + : splitPredicates.getLeft().orElse(null); + + Set indexedFields = + storeTable.store().newIndexFileHandler() + .scan(tryTravelOrLatest(storeTable), entryFilter(effectivePartPredicate)) + .stream() + .map(IndexManifestEntry::indexFile) + .map(indexFile -> indexFile.globalIndexMeta()) + .filter(Objects::nonNull) + .map(GlobalIndexMeta::indexFieldId) + .filter(storeTable.rowType()::containsField) + .map(fieldId -> storeTable.rowType().getField(fieldId).name()) + .collect(Collectors.toSet()); + + if (indexedFields.isEmpty()) { + return false; + } + + return PredicateBuilder.and(splitPredicates.getRight()) + .visit(new ScalarIndexedFieldsVisitor(indexedFields)); + } + + private static org.apache.paimon.utils.Filter entryFilter( + PartitionPredicate partitionPredicate) { + return entry -> { + if (partitionPredicate != null && !partitionPredicate.test(entry.partition())) { + return false; + } + GlobalIndexMeta globalIndexMeta = entry.indexFile().globalIndexMeta(); + return globalIndexMeta != null + && BTREE_INDEX_TYPE.equals(entry.indexFile().indexType()); + }; + } + + @Override + public Boolean visit(LeafPredicate predicate) { + Optional fieldRefOptional = predicate.fieldRefOptional(); + if (!fieldRefOptional.isPresent()) { + return false; + } + + FieldRef fieldRef = fieldRefOptional.get(); + if (!isScalarIndexed(fieldRef)) { + return false; + } + + LeafFunction function = predicate.function(); + return function instanceof Equal + || function instanceof In + || function instanceof Between + || function instanceof IsNull; + } + + @Override + public Boolean visit(CompoundPredicate predicate) { + CompoundFunction function = predicate.function(); + if (!(function instanceof And) && !(function instanceof Or)) { + return false; + } + + for (Predicate child : predicate.children()) { + if (!child.visit(this)) { + return false; + } + } + return true; + } + + private boolean isScalarIndexed(FieldRef fieldRef) { + return scalarIndexedFields.contains(fieldRef.name()); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/globalindex/ScalarIndexedFieldsVisitorTest.java b/paimon-core/src/test/java/org/apache/paimon/globalindex/ScalarIndexedFieldsVisitorTest.java new file mode 100644 index 000000000000..1d73779f3c71 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/globalindex/ScalarIndexedFieldsVisitorTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.globalindex; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.predicate.Equal; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.predicate.UpperTransform; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link ScalarIndexedFieldsVisitor}. */ +public class ScalarIndexedFieldsVisitorTest { + + private static final RowType ROW_TYPE = + new RowType( + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField(1, "name", DataTypes.STRING()), + new DataField(2, "score", DataTypes.INT()))); + + private static final PredicateBuilder BUILDER = new PredicateBuilder(ROW_TYPE); + + private static final ScalarIndexedFieldsVisitor VISITOR = + new ScalarIndexedFieldsVisitor(new HashSet<>(Arrays.asList("id", "score"))); + + @Test + public void testSupportedPredicates() { + assertThat(BUILDER.equal(0, 10).visit(VISITOR)).isTrue(); + assertThat(BUILDER.in(0, Arrays.asList(1, 2, 3)).visit(VISITOR)).isTrue(); + assertThat(BUILDER.between(2, 90, 100).visit(VISITOR)).isTrue(); + assertThat(BUILDER.isNull(0).visit(VISITOR)).isTrue(); + } + + @Test + public void testCompoundPredicates() { + Predicate andPredicate = + PredicateBuilder.and(BUILDER.equal(0, 10), BUILDER.between(2, 90, 100)); + Predicate orPredicate = + PredicateBuilder.or(BUILDER.in(0, Arrays.asList(1, 2, 3)), BUILDER.isNull(2)); + + assertThat(andPredicate.visit(VISITOR)).isTrue(); + assertThat(orPredicate.visit(VISITOR)).isTrue(); + } + + @Test + public void testUnsupportedPredicates() { + assertThat(BUILDER.greaterThan(0, 10).visit(VISITOR)).isFalse(); + assertThat(BUILDER.equal(1, BinaryString.fromString("name_10")).visit(VISITOR)).isFalse(); + } + + @Test + public void testNonFieldPredicate() { + LeafPredicate upperNameEquals = + LeafPredicate.of( + new UpperTransform( + Collections.singletonList( + new FieldRef(1, "name", DataTypes.STRING()))), + Equal.INSTANCE, + Collections.singletonList(BinaryString.fromString("NAME_10"))); + + assertThat(upperNameEquals.visit(VISITOR)).isFalse(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java index 66cb49798aa0..285973781be8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java @@ -26,6 +26,7 @@ import org.apache.paimon.flink.lookup.DynamicPartitionLoader; import org.apache.paimon.flink.lookup.PartitionLoader; import org.apache.paimon.flink.lookup.StaticPartitionLoader; +import org.apache.paimon.globalindex.ScalarIndexedFieldsVisitor; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; @@ -115,6 +116,7 @@ public Result applyFilters(List filters) { List unConsumedFilters = new ArrayList<>(); List consumedFilters = new ArrayList<>(); List converted = new ArrayList<>(); + boolean hasUnconvertedFilter = false; PredicateVisitor onlyPartFieldsVisitor = new PartitionPredicateVisitor(partitionKeys); @@ -123,6 +125,7 @@ public Result applyFilters(List filters) { if (!predicateOptional.isPresent()) { unConsumedFilters.add(filter); + hasUnconvertedFilter = true; } else { Predicate p = predicateOptional.get(); if (isUnbounded() || !p.visit(onlyPartFieldsVisitor)) { @@ -134,6 +137,16 @@ public Result applyFilters(List filters) { } } predicate = converted.isEmpty() ? null : PredicateBuilder.and(converted); + + // If all predicates are covered by global index, they can be consumed. + if (!isUnbounded() + && !hasUnconvertedFilter + && ScalarIndexedFieldsVisitor.allFieldsIndexed( + table, predicate, partitionPredicate)) { + unConsumedFilters.clear(); + consumedFilters = new ArrayList<>(filters); + } + LOG.info("Consumed filters: {} of {}", consumedFilters, filters); return Result.of(filters, unConsumedFilters); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 48fcbbda710e..615ee8283f10 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -51,6 +51,7 @@ import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static java.util.Collections.singletonList; import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; @@ -810,6 +811,98 @@ public void testCountStarPKDv() { validateCount1PushDown(sql); } + @Test + public void testCountStarBTreeIndexNonPartitionedTable() { + sql( + "CREATE TABLE count_btree_index_non_partitioned (id INT, name STRING, category INT) WITH (" + + "'global-index.enabled' = 'true', " + + "'row-tracking.enabled' = 'true', " + + "'data-evolution.enabled' = 'true'" + + ")"); + + String values = + IntStream.range(0, 20) + .mapToObj(i -> String.format("(%s, '%s', %s)", i, "name_" + i, i % 2)) + .collect(Collectors.joining(",")); + sql("INSERT INTO count_btree_index_non_partitioned VALUES " + values); + sql( + "CALL sys.create_global_index(`table` => 'default.count_btree_index_non_partitioned', index_column => 'id', index_type => 'btree')"); + + assertCountStarQuery( + "SELECT COUNT(*) FROM count_btree_index_non_partitioned WHERE id IN (1, 2, 3)", + 3L, + true); + assertCountStarQuery( + "SELECT COUNT(*) FROM count_btree_index_non_partitioned WHERE id BETWEEN 10 AND 14", + 5L, + true); + assertCountStarQuery( + "SELECT COUNT(*) FROM count_btree_index_non_partitioned WHERE id = 6 AND category = 0", + 1L, + false); + assertCountStarQuery( + "SELECT COUNT(*) FROM count_btree_index_non_partitioned WHERE id > 15", 4L, false); + } + + @Test + public void testCountStarBTreeIndexPartitionedTable() { + sql( + "CREATE TABLE count_btree_index_partitioned (id INT, name STRING, category INT, dt STRING) " + + "PARTITIONED BY (dt) WITH (" + + "'global-index.enabled' = 'true', " + + "'row-tracking.enabled' = 'true', " + + "'data-evolution.enabled' = 'true'" + + ")"); + + String values = + IntStream.range(0, 10) + .mapToObj( + i -> + String.format( + "(%s, '%s', %s, 'dt1'), (%s, '%s', %s, 'dt2')", + i, "name_" + i, i % 2, i, "name_" + i, (i + 1) % 2)) + .collect(Collectors.joining(",")); + sql("INSERT INTO count_btree_index_partitioned VALUES " + values); + sql( + "CALL sys.create_global_index(" + + "`table` => 'default.count_btree_index_partitioned', " + + "index_column => 'id', " + + "index_type => 'btree', " + + "partitions => 'dt=dt1')"); + + assertCountStarQuery( + "SELECT COUNT(*) FROM count_btree_index_partitioned WHERE dt = 'dt1' AND id = 1", + 1L, + true); + assertCountStarQuery( + "SELECT COUNT(*) FROM count_btree_index_partitioned WHERE dt = 'dt1' AND id IN (1, 2, 3)", + 3L, + true); + assertCountStarQuery( + "SELECT COUNT(*) FROM count_btree_index_partitioned WHERE dt = 'dt2' AND id = 1", + 1L, + false); + assertCountStarQuery( + "SELECT COUNT(*) FROM count_btree_index_partitioned WHERE id = 1", 1L, true); + assertCountStarQuery( + "SELECT COUNT(*) FROM count_btree_index_partitioned WHERE dt = 'dt1' AND id = 1 AND category = 1", + 1L, + false); + assertCountStarQuery( + "SELECT COUNT(*) FROM count_btree_index_partitioned WHERE dt = 'dt1' AND id > 5", + 4L, + false); + } + + private void assertCountStarQuery(String sql, long expectedCount, boolean pushDown) { + if (pushDown) { + validateCount1PushDown(sql); + } else { + validateCount1NotPushDown(sql); + } + assertThat(sql(sql)).containsOnly(Row.of(expectedCount)); + } + private void validateCount1PushDown(String sql) { Transformation transformation = AbstractTestBase.translate(tEnv, sql); while (!transformation.getInputs().isEmpty()) {