Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions paimon-python/pypaimon/read/reader/format_lance_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ class FormatLanceReader(RecordBatchReader):
"""
A Format Reader that reads record batch from a Lance file using PyArrow,
and filters it based on the provided predicate and projection.
Supports native row-index pushdown via LanceFileReader.take_rows().
"""

def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
push_down_predicate: Any, batch_size: int = 1024):
"""Initialize Lance reader."""
push_down_predicate: Any, row_indices: Optional[List[int]] = None,
batch_size: int = 1024):
import lance

file_path_for_lance, storage_options = to_lance_specified(file_io, file_path)
Expand All @@ -44,9 +45,12 @@ def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
file_path_for_lance,
storage_options=storage_options,
columns=columns_for_lance)
reader_results = lance_reader.read_all()

# Convert to PyArrow table
if row_indices is not None:
reader_results = lance_reader.take_rows(row_indices)
else:
reader_results = lance_reader.read_all()

pa_table = reader_results.to_table()

if push_down_predicate is not None:
Expand Down
12 changes: 7 additions & 5 deletions paimon-python/pypaimon/read/split_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,15 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,

batch_size = self.table.options.read_batch_size()

# Compute effective row ranges and Vortex row_indices from row_ranges
# Compute effective row ranges and, for formats that support native row-index pushdown
# (Vortex, Lance), convert to local indices. Other formats fall back to post-read filtering.
row_indices = None
if row_ranges is not None:
effective_row_ranges = Range.and_(row_ranges, [file.row_id_range()])
if len(effective_row_ranges) == 0:
return EmptyRecordBatchReader()
if file_format == CoreOptions.FILE_FORMAT_VORTEX:
# Convert global row ranges to local indices for Vortex pushdown
if file_format in (CoreOptions.FILE_FORMAT_VORTEX, CoreOptions.FILE_FORMAT_LANCE):
# Convert global row IDs to file-local indices for native pushdown
row_indices = []
for r in effective_row_ranges:
start = r.from_ - file.first_row_id
Expand All @@ -164,7 +165,8 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
batch_size=batch_size)
elif file_format == CoreOptions.FILE_FORMAT_LANCE:
format_reader = FormatLanceReader(self.table.file_io, file_path, read_file_fields,
read_arrow_predicate, batch_size=batch_size)
read_arrow_predicate, row_indices,
batch_size=batch_size)
elif file_format == CoreOptions.FILE_FORMAT_VORTEX:
name_to_field = {f.name: f for f in self.read_fields}
ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field]
Expand Down Expand Up @@ -225,7 +227,7 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
blob_descriptor_fields=blob_descriptor_fields,
file_io=self.table.file_io)

# For non-Vortex formats, wrap with RowIdFilterRecordBatchReader
# Formats without native row-index pushdown (Parquet, ORC, Avro, Blob) need post-read filtering
if row_ranges is not None and row_indices is None:
reader = RowIdFilterRecordBatchReader(reader, file.first_row_id, effective_row_ranges)

Expand Down
160 changes: 160 additions & 0 deletions paimon-python/pypaimon/tests/data_evolution_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
import pyarrow as pa
import pyarrow.dataset as ds

try:
import lance # noqa: F401
HAS_LANCE = True
except ImportError:
HAS_LANCE = False

from pypaimon import CatalogFactory, Schema
from pypaimon.common.predicate import Predicate
from pypaimon.manifest.manifest_list_manager import ManifestListManager
Expand Down Expand Up @@ -1594,6 +1600,160 @@ def test_vortex_with_slice(self):
self.assertEqual(sliced.num_rows, 3)
self.assertEqual(sorted(sliced.column('id').to_pylist()), [2, 3, 4])

@unittest.skipUnless(HAS_LANCE, "lance not installed")
def test_lance_basic(self):
    """Basic data-evolution round trip with the Lance file format.

    Writes f0/f1 and f2 through two separate writers (column groups),
    then verifies a full read stitches them back into one table.
    """
    arrow_schema = pa.schema([
        ('f0', pa.int32()),
        ('f1', pa.string()),
        ('f2', pa.string()),
    ])
    table_schema = Schema.from_pyarrow_schema(
        arrow_schema,
        options={
            'row-tracking.enabled': 'true',
            'data-evolution.enabled': 'true',
            'file.format': 'lance',
        }
    )
    self.catalog.create_table('default.test_lance_basic', table_schema, False)
    table = self.catalog.get_table('default.test_lance_basic')

    builder = table.new_batch_write_builder()

    # Commit 1: data evolution — two writers, each owning a disjoint column subset.
    writer_left = builder.new_write().with_write_type(['f0', 'f1'])
    writer_right = builder.new_write().with_write_type(['f2'])
    committer = builder.new_commit()
    left_data = pa.Table.from_pydict(
        {'f0': [1, 2, 3], 'f1': ['a', 'b', 'c']},
        schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())]))
    right_data = pa.Table.from_pydict(
        {'f2': ['x', 'y', 'z']},
        schema=pa.schema([('f2', pa.string())]))
    writer_left.write_arrow(left_data)
    writer_right.write_arrow(right_data)
    messages = writer_left.prepare_commit() + writer_right.prepare_commit()
    # Force every new file to start at row id 0 so the two column groups align.
    for message in messages:
        for new_file in message.new_files:
            new_file.first_row_id = 0
    committer.commit(messages)
    writer_left.close()
    writer_right.close()
    committer.close()

    read_builder = table.new_read_builder()
    actual = read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
    expected = pa.Table.from_pydict({
        'f0': [1, 2, 3],
        'f1': ['a', 'b', 'c'],
        'f2': ['x', 'y', 'z'],
    }, schema=arrow_schema)
    self.assertEqual(actual, expected)

@unittest.skipUnless(HAS_LANCE, "lance not installed")
def test_lance_row_id_filter(self):
    """Row-index pushdown for Lance via row_ranges in file_reader_supplier."""
    arrow_schema = pa.schema([
        ('f0', pa.int32()),
        ('f1', pa.string()),
    ])
    table_schema = Schema.from_pyarrow_schema(
        arrow_schema,
        options={
            'row-tracking.enabled': 'true',
            'data-evolution.enabled': 'true',
            'file.format': 'lance',
        }
    )
    self.catalog.create_table('default.test_lance_row_id_filter', table_schema, False)
    table = self.catalog.get_table('default.test_lance_row_id_filter')

    builder = table.new_batch_write_builder()

    # Two commits -> two files: rows 0-4, then rows 5-9.
    for lo, hi in ((0, 5), (5, 10)):
        writer = builder.new_write()
        committer = builder.new_commit()
        writer.write_arrow(pa.Table.from_pydict(
            {'f0': list(range(lo, hi)), 'f1': [f'v{i}' for i in range(lo, hi)]},
            schema=arrow_schema))
        committer.commit(writer.prepare_commit())
        writer.close()
        committer.close()

    # Baseline: an unfiltered read sees every row.
    rb = table.new_read_builder()
    full = rb.new_read().to_arrow(rb.new_scan().plan().splits())
    self.assertEqual(full.num_rows, 10)

    # Point lookup on _ROW_ID — exercises the take_rows() pushdown path.
    pb = table.new_read_builder().with_projection(['f0', 'f1', '_ROW_ID']).new_predicate_builder()
    eq_builder = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 3))
    filtered = eq_builder.new_read().to_arrow(eq_builder.new_scan().plan().splits())
    self.assertEqual(filtered.num_rows, 1)
    self.assertEqual(filtered.column('f0')[0].as_py(), 3)
    self.assertEqual(filtered.column('f1')[0].as_py(), 'v3')

    # Range predicate crossing the file boundary (rows 3..6 span both files).
    between_builder = table.new_read_builder().with_filter(pb.between('_ROW_ID', 3, 6))
    range_result = between_builder.new_read().to_arrow(between_builder.new_scan().plan().splits())
    self.assertEqual(range_result.num_rows, 4)
    self.assertEqual(sorted(range_result.column('f0').to_pylist()), [3, 4, 5, 6])

@unittest.skipUnless(HAS_LANCE, "lance not installed")
def test_lance_with_slice(self):
    """with_slice() on a Lance data-evolution table."""
    arrow_schema = pa.schema([
        ('id', pa.int64()),
        ('val', pa.int32()),
    ])
    table_schema = Schema.from_pyarrow_schema(
        arrow_schema,
        options={
            'row-tracking.enabled': 'true',
            'data-evolution.enabled': 'true',
            'file.format': 'lance',
            'source.split.target-size': '512m',
        }
    )
    self.catalog.create_table('default.test_lance_with_slice', table_schema, False)
    table = self.catalog.get_table('default.test_lance_with_slice')

    # Three commits of two rows each -> six rows total.
    payloads = (
        {'id': [1, 2], 'val': [10, 20]},
        {'id': [3, 4], 'val': [30, 40]},
        {'id': [5, 6], 'val': [50, 60]},
    )
    for payload in payloads:
        write_builder = table.new_batch_write_builder()
        writer = write_builder.new_write()
        committer = write_builder.new_commit()
        writer.write_arrow(pa.Table.from_pydict(payload, schema=arrow_schema))
        committer.commit(writer.prepare_commit())
        writer.close()
        committer.close()

    read_builder = table.new_read_builder()

    # Sanity check: a full scan returns all six rows.
    everything = read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
    self.assertEqual(everything.num_rows, 6)

    # Slice [1, 4) keeps positional rows 1..3, i.e. ids 2, 3, 4.
    sliced_scan = read_builder.new_scan().with_slice(1, 4)
    sliced = read_builder.new_read().to_arrow(sliced_scan.plan().splits())
    self.assertEqual(sliced.num_rows, 3)
    self.assertEqual(sorted(sliced.column('id').to_pylist()), [2, 3, 4])

def test_large_file_read(self):
pa_schema = pa.schema([
('id', pa.int32()),
Expand Down
Loading