From fba86fb4eed708af802182546a57b9e563b9acfd Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Mon, 20 Apr 2026 02:58:38 +0800 Subject: [PATCH 1/3] Push down IndexedSplit row ranges to Lance native --- .../read/reader/format_lance_reader.py | 12 +- paimon-python/pypaimon/read/split_read.py | 12 +- .../pypaimon/tests/data_evolution_test.py | 160 ++++++++++++++++++ 3 files changed, 175 insertions(+), 9 deletions(-) diff --git a/paimon-python/pypaimon/read/reader/format_lance_reader.py b/paimon-python/pypaimon/read/reader/format_lance_reader.py index 4be30a6f5d7e..401a3a7a1c77 100644 --- a/paimon-python/pypaimon/read/reader/format_lance_reader.py +++ b/paimon-python/pypaimon/read/reader/format_lance_reader.py @@ -30,11 +30,12 @@ class FormatLanceReader(RecordBatchReader): """ A Format Reader that reads record batch from a Lance file using PyArrow, and filters it based on the provided predicate and projection. + Supports native row-index pushdown via LanceFileReader.take_rows(). """ def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], - push_down_predicate: Any, batch_size: int = 1024): - """Initialize Lance reader.""" + push_down_predicate: Any, batch_size: int = 1024, + row_indices: Optional[List[int]] = None): import lance file_path_for_lance, storage_options = to_lance_specified(file_io, file_path) @@ -44,9 +45,12 @@ def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], file_path_for_lance, storage_options=storage_options, columns=columns_for_lance) - reader_results = lance_reader.read_all() - # Convert to PyArrow table + if row_indices is not None: + reader_results = lance_reader.take_rows(row_indices) + else: + reader_results = lance_reader.read_all() + pa_table = reader_results.to_table() if push_down_predicate is not None: diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 920ff423172d..efd88ea8ad7e 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -139,14 +139,15 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, batch_size = self.table.options.read_batch_size() - # Compute effective row ranges and Vortex row_indices from row_ranges + # Compute effective row ranges and, for formats that support native row-index pushdown + # (Vortex, Lance), convert to local indices. Other formats fall back to post-read filtering. row_indices = None if row_ranges is not None: effective_row_ranges = Range.and_(row_ranges, [file.row_id_range()]) if len(effective_row_ranges) == 0: return EmptyRecordBatchReader() - if file_format == CoreOptions.FILE_FORMAT_VORTEX: - # Convert global row ranges to local indices for Vortex pushdown + if file_format in (CoreOptions.FILE_FORMAT_VORTEX, CoreOptions.FILE_FORMAT_LANCE): + # Convert global row IDs to file-local indices for native pushdown row_indices = [] for r in effective_row_ranges: start = r.from_ - file.first_row_id @@ -164,7 +165,8 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, batch_size=batch_size) elif file_format == CoreOptions.FILE_FORMAT_LANCE: format_reader = FormatLanceReader(self.table.file_io, file_path, read_file_fields, - read_arrow_predicate, batch_size=batch_size) + read_arrow_predicate, batch_size=batch_size, + row_indices=row_indices) elif file_format == CoreOptions.FILE_FORMAT_VORTEX: name_to_field = {f.name: f for f in self.read_fields} ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field] @@ -225,7 +227,7 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, blob_descriptor_fields=blob_descriptor_fields, file_io=self.table.file_io) - # For non-Vortex formats, wrap with RowIdFilterRecordBatchReader + # Formats without native row-index pushdown (Parquet, ORC, Avro, Blob) need post-read filtering if row_ranges is not None and row_indices is None: reader = RowIdFilterRecordBatchReader(reader, file.first_row_id, effective_row_ranges) diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py index b9ca4c7accb5..26b70f68088b 100644 --- a/paimon-python/pypaimon/tests/data_evolution_test.py +++ b/paimon-python/pypaimon/tests/data_evolution_test.py @@ -26,6 +26,12 @@ import pyarrow as pa import pyarrow.dataset as ds +try: + import lance # noqa: F401 + HAS_LANCE = True +except ImportError: + HAS_LANCE = False + from pypaimon import CatalogFactory, Schema from pypaimon.common.predicate import Predicate from pypaimon.manifest.manifest_list_manager import ManifestListManager @@ -1594,6 +1600,160 @@ def test_vortex_with_slice(self): self.assertEqual(sliced.num_rows, 3) self.assertEqual(sorted(sliced.column('id').to_pylist()), [2, 3, 4]) + @unittest.skipUnless(HAS_LANCE, "lance not installed") + def test_lance_basic(self): + """Test basic data evolution read/write with Lance format.""" + pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()), + ('f2', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'file.format': 'lance', + } + ) + self.catalog.create_table('default.test_lance_basic', schema, False) + table = self.catalog.get_table('default.test_lance_basic') + + write_builder = table.new_batch_write_builder() + + # Commit 1: write f0, f1 and f2 in separate writers (data evolution) + w0 = write_builder.new_write().with_write_type(['f0', 'f1']) + w1 = write_builder.new_write().with_write_type(['f2']) + c = write_builder.new_commit() + d0 = pa.Table.from_pydict( + {'f0': [1, 2, 3], 'f1': ['a', 'b', 'c']}, + schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())])) + d1 = pa.Table.from_pydict( + {'f2': ['x', 'y', 'z']}, + schema=pa.schema([('f2', pa.string())])) + w0.write_arrow(d0) + w1.write_arrow(d1) + cmts = w0.prepare_commit() + w1.prepare_commit() + for msg in cmts: + for nf in msg.new_files: + nf.first_row_id = 0 + c.commit(cmts) + w0.close() + w1.close() + c.close() + + read_builder = table.new_read_builder() + actual = read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits()) + expected = pa.Table.from_pydict({ + 'f0': [1, 2, 3], + 'f1': ['a', 'b', 'c'], + 'f2': ['x', 'y', 'z'], + }, schema=pa_schema) + self.assertEqual(actual, expected) + + @unittest.skipUnless(HAS_LANCE, "lance not installed") + def test_lance_row_id_filter(self): + """Test that Lance row_indices pushdown works via file_reader_supplier row_ranges.""" + pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'file.format': 'lance', + } + ) + self.catalog.create_table('default.test_lance_row_id_filter', schema, False) + table = self.catalog.get_table('default.test_lance_row_id_filter') + + write_builder = table.new_batch_write_builder() + + # Commit 1: rows 0-4 + w = write_builder.new_write() + c = write_builder.new_commit() + w.write_arrow(pa.Table.from_pydict( + {'f0': list(range(5)), 'f1': [f'v{i}' for i in range(5)]}, + schema=pa_schema)) + c.commit(w.prepare_commit()) + w.close() + c.close() + + # Commit 2: rows 5-9 + w = write_builder.new_write() + c = write_builder.new_commit() + w.write_arrow(pa.Table.from_pydict( + {'f0': list(range(5, 10)), 'f1': [f'v{i}' for i in range(5, 10)]}, + schema=pa_schema)) + c.commit(w.prepare_commit()) + w.close() + c.close() + + # Full read baseline + rb = table.new_read_builder() + full = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(full.num_rows, 10) + + # Filter by _ROW_ID — triggers row_ranges pushdown in Lance via take_rows() + pb = table.new_read_builder().with_projection(['f0', 'f1', '_ROW_ID']).new_predicate_builder() + rb_filtered = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 3)) + filtered = rb_filtered.new_read().to_arrow(rb_filtered.new_scan().plan().splits()) + self.assertEqual(filtered.num_rows, 1) + self.assertEqual(filtered.column('f0')[0].as_py(), 3) + self.assertEqual(filtered.column('f1')[0].as_py(), 'v3') + + # Filter by _ROW_ID range spanning two files + rb_range = table.new_read_builder().with_filter(pb.between('_ROW_ID', 3, 6)) + range_result = rb_range.new_read().to_arrow(rb_range.new_scan().plan().splits()) + self.assertEqual(range_result.num_rows, 4) + self.assertEqual(sorted(range_result.column('f0').to_pylist()), [3, 4, 5, 6]) + + @unittest.skipUnless(HAS_LANCE, "lance not installed") + def test_lance_with_slice(self): + """Test with_slice on Lance data evolution table.""" + pa_schema = pa.schema([ + ('id', pa.int64()), + ('val', pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'file.format': 'lance', + 'source.split.target-size': '512m', + } + ) + self.catalog.create_table('default.test_lance_with_slice', schema, False) + table = self.catalog.get_table('default.test_lance_with_slice') + + for batch in [ + {'id': [1, 2], 'val': [10, 20]}, + {'id': [3, 4], 'val': [30, 40]}, + {'id': [5, 6], 'val': [50, 60]}, + ]: + wb = table.new_batch_write_builder() + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.Table.from_pydict(batch, schema=pa_schema)) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + rb = table.new_read_builder() + + # Full read + full = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(full.num_rows, 6) + + # with_slice(1, 4) -> rows at index 1,2,3 -> id in (2,3,4) + scan = rb.new_scan().with_slice(1, 4) + sliced = rb.new_read().to_arrow(scan.plan().splits()) + self.assertEqual(sliced.num_rows, 3) + self.assertEqual(sorted(sliced.column('id').to_pylist()), [2, 3, 4]) + def test_large_file_read(self): pa_schema = pa.schema([ ('id', pa.int32()), From 6142a6a7d08d1d25675f37a66036f5a93a2d0017 Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Mon, 20 Apr 2026 03:49:20 +0800 Subject: [PATCH 2/3] Push down IndexedSplit row ranges to Lance native --- paimon-python/pypaimon/read/reader/format_lance_reader.py | 4 ++-- paimon-python/pypaimon/read/split_read.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/paimon-python/pypaimon/read/reader/format_lance_reader.py b/paimon-python/pypaimon/read/reader/format_lance_reader.py index 401a3a7a1c77..18df80bc6d73 100644 --- a/paimon-python/pypaimon/read/reader/format_lance_reader.py +++ b/paimon-python/pypaimon/read/reader/format_lance_reader.py @@ -34,8 +34,8 @@ class FormatLanceReader(RecordBatchReader): """ def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], - push_down_predicate: Any, batch_size: int = 1024, - row_indices: Optional[List[int]] = None): + row_indices: Optional[List[int]], push_down_predicate: Any, + batch_size: int = 1024): import lance file_path_for_lance, storage_options = to_lance_specified(file_io, file_path) diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index efd88ea8ad7e..cdd2b5066261 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -165,8 +165,8 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, batch_size=batch_size) elif file_format == CoreOptions.FILE_FORMAT_LANCE: format_reader = FormatLanceReader(self.table.file_io, file_path, read_file_fields, - read_arrow_predicate, batch_size=batch_size, - row_indices=row_indices) + row_indices, read_arrow_predicate, + batch_size=batch_size) elif file_format == CoreOptions.FILE_FORMAT_VORTEX: name_to_field = {f.name: f for f in self.read_fields} ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field] From 9caf7e2cc3e068a26547548b6e428231d5efc385 Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Mon, 20 Apr 2026 03:59:46 +0800 Subject: [PATCH 3/3] Push down IndexedSplit row ranges to Lance native --- paimon-python/pypaimon/read/reader/format_lance_reader.py | 2 +- paimon-python/pypaimon/read/split_read.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/read/reader/format_lance_reader.py b/paimon-python/pypaimon/read/reader/format_lance_reader.py index 18df80bc6d73..a17e8c7d833e 100644 --- a/paimon-python/pypaimon/read/reader/format_lance_reader.py +++ b/paimon-python/pypaimon/read/reader/format_lance_reader.py @@ -34,7 +34,7 @@ class FormatLanceReader(RecordBatchReader): """ def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], - row_indices: Optional[List[int]], push_down_predicate: Any, + push_down_predicate: Any, row_indices: Optional[List[int]] = None, batch_size: int = 1024): import lance diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index cdd2b5066261..588358a8e2ea 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -165,7 +165,7 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, batch_size=batch_size) elif file_format == CoreOptions.FILE_FORMAT_LANCE: format_reader = FormatLanceReader(self.table.file_io, file_path, read_file_fields, - row_indices, read_arrow_predicate, + read_arrow_predicate, row_indices, batch_size=batch_size) elif file_format == CoreOptions.FILE_FORMAT_VORTEX: name_to_field = {f.name: f for f in self.read_fields}