From 8931ed03501aceca4d6534dd69d72512e9058771 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Wed, 22 Apr 2026 14:23:05 -0700 Subject: [PATCH 1/4] Equality deletes --- pyiceberg/io/pyarrow.py | 124 +++++++++-- pyiceberg/table/__init__.py | 10 +- pyiceberg/table/delete_file_index.py | 59 +++-- pyiceberg/table/update/snapshot.py | 2 + tests/io/test_equality_deletes.py | 313 +++++++++++++++++++++++++++ tests/io/test_pyarrow.py | 1 + tests/table/test_snapshots.py | 56 ++++- 7 files changed, 515 insertions(+), 50 deletions(-) create mode 100644 tests/io/test_equality_deletes.py diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 8f22261f5d..c0ec1727c2 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1136,6 +1136,13 @@ def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray] raise ValueError(f"Delete file format not supported: {data_file.file_format}") +def _read_equality_deletes(io: FileIO, delete_file: DataFile) -> pa.Table: + arrow_format = _get_file_format(delete_file.file_format, pre_buffer=True, buffer_size=ONE_MEGABYTE) + with io.new_input(delete_file.file_path).open() as fi: + fragment = arrow_format.make_fragment(fi) + return ds.Scanner.from_fragment(fragment=fragment).to_table() + + def _combine_positional_deletes(positional_deletes: list[pa.ChunkedArray], start_index: int, end_index: int) -> pa.Array: if len(positional_deletes) == 1: all_chunks = positional_deletes[0] @@ -1609,6 +1616,7 @@ def _task_to_record_batches( table_schema: Schema, projected_field_ids: set[int], positional_deletes: list[ChunkedArray] | None, + equality_deletes: list[tuple[set[int], pa.Table]] | None, case_sensitive: bool, name_mapping: NameMapping | None = None, partition_spec: PartitionSpec | None = None, @@ -1643,14 +1651,20 @@ def _task_to_record_batches( bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) pyarrow_filter = expression_to_pyarrow(bound_file_filter, file_schema) - file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) + # Ensure equality delete columns are also projected + all_projected_field_ids = projected_field_ids.copy() + if equality_deletes: + for eq_ids, _ in equality_deletes: + all_projected_field_ids.update(eq_ids) + + file_project_schema = prune_columns(file_schema, all_projected_field_ids, select_full_types=False) fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, schema=physical_schema, # This will push down the query to Arrow. 
-            # But in case there are positional deletes, we have to apply them first
-            filter=pyarrow_filter if not positional_deletes else None,
+            # But in case there are positional or equality deletes, we have to apply them first
+            filter=pyarrow_filter if not positional_deletes and not equality_deletes else None,
             columns=[col.name for col in file_project_schema.columns],
         )
 
@@ -1666,6 +1680,40 @@
                 indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch))
                 current_batch = current_batch.take(indices)
 
+            if current_batch.num_rows > 0 and equality_deletes:
+                for eq_ids, eq_delete_table in equality_deletes:
+                    try:
+                        eq_file_schema = pyarrow_to_schema(
+                            eq_delete_table.schema,
+                            name_mapping=name_mapping,
+                            format_version=format_version,
+                        )
+
+                        rename_map = {}
+                        for field_id in eq_ids:
+                            file_name = eq_file_schema.find_column_name(field_id)
+                            current_name = table_schema.find_column_name(field_id)
+                            if file_name != current_name:
+                                rename_map[file_name] = current_name
+
+                        if rename_map:
+                            eq_delete_table = eq_delete_table.rename_columns(
+                                [rename_map.get(name, name) for name in eq_delete_table.column_names]
+                            )
+
+                        join_keys = [table_schema.find_column_name(field_id) for field_id in eq_ids]
+                        current_table = pa.Table.from_batches([current_batch])
+                        current_table = current_table.join(eq_delete_table, keys=join_keys, join_type="left anti")
+
+                        if current_table.num_rows == 0:
+                            # An empty join result yields no batches to index into; emit an empty slice instead
+                            current_batch = current_batch.slice(0, 0)
+                            break
+                        # combine_chunks() collapses the join result into a single batch so no rows are dropped
+                        current_batch = current_table.combine_chunks().to_batches()[0]
+                    except (ValueError, ResolveError):
+                        continue
+
             # skip empty batches
             if current_batch.num_rows == 0:
                 continue
@@ -1691,23 +1737,57 @@
 )
 
 
-def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[str, list[ChunkedArray]]:
-    deletes_per_file: dict[str, list[ChunkedArray]] = {}
+def _read_all_delete_files(
+    io: FileIO, tasks: Iterable[FileScanTask]
+) -> tuple[dict[str, list[ChunkedArray]], dict[str, list[tuple[set[int], pa.Table]]]]:
+    pos_deletes_per_file: dict[str, list[ChunkedArray]] = {}
+    eq_deletes_per_file: dict[str, list[tuple[set[int], pa.Table]]] = {}
+
     unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks]))
     if len(unique_deletes) > 0:
+        unique_pos_deletes = {d for d in unique_deletes if d.content == DataFileContent.POSITION_DELETES}
+        unique_eq_deletes = {d for d in unique_deletes if d.content == DataFileContent.EQUALITY_DELETES}
+
         executor = ExecutorFactory.get_or_create()
-        deletes_per_files: Iterator[dict[str, ChunkedArray]] = executor.map(
-            lambda args: _read_deletes(*args),
-            [(io, delete_file) for delete_file in unique_deletes],
-        )
-        for delete in deletes_per_files:
-            for file, arr in delete.items():
-                if file in deletes_per_file:
-                    deletes_per_file[file].append(arr)
-                else:
-                    deletes_per_file[file] = [arr]
-    return deletes_per_file
+        if len(unique_pos_deletes) > 0:
+            pos_deletes: Iterator[dict[str, ChunkedArray]] = executor.map(
+                lambda args: _read_deletes(*args),
+                [(io, delete_file) for delete_file in unique_pos_deletes],
+            )
+            for delete in pos_deletes:
+                for file, arr in delete.items():
+                    if file in pos_deletes_per_file:
+                        pos_deletes_per_file[file].append(arr)
+                    else:
+                        pos_deletes_per_file[file] = [arr]
+
+        if len(unique_eq_deletes) > 0:
+            # We map each unique eq delete file location to its loaded table and its equality IDs
+            eq_deletes_tables: dict[str, tuple[set[int], pa.Table]] = dict(
+                zip(
+                    [d.file_path for d in unique_eq_deletes],
+                    zip(
+                        
[set(d.equality_ids) if d.equality_ids else set() for d in unique_eq_deletes],
+                        executor.map(
+                            lambda args: _read_equality_deletes(*args),
+                            [(io, d) for d in unique_eq_deletes],
+                        ),
+                        strict=True,
+                    ),
+                    strict=True,
+                )
+            )
+
+    # Map eq deletes to each task's data file path
+    for task in tasks:
+        eq_deletes_for_task = [
+            eq_deletes_tables[d.file_path] for d in task.delete_files if d.content == DataFileContent.EQUALITY_DELETES
+        ]
+        if eq_deletes_for_task:
+            eq_deletes_per_file[task.file.file_path] = eq_deletes_for_task
+
+    return pos_deletes_per_file, eq_deletes_per_file
 
 
 class ArrowScan:
@@ -1807,7 +1887,7 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.Record
             ResolveError: When a required field cannot be found in the file
             ValueError: When a field type in the file cannot be projected to the schema type
         """
-        deletes_per_file = _read_all_delete_files(self._io, tasks)
+        pos_deletes_per_file, eq_deletes_per_file = _read_all_delete_files(self._io, tasks)
         total_row_count = 0
         executor = ExecutorFactory.get_or_create()
 
@@ -1816,7 +1896,7 @@ def batches_for_task(task: FileScanTask) -> list[pa.RecordBatch]:
             # Materialize the iterator here to ensure execution happens within the executor.
             # Otherwise, the iterator would be lazily consumed later (in the main thread),
             # defeating the purpose of using executor.map.
-            return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
+            return list(self._record_batches_from_scan_tasks_and_deletes([task], pos_deletes_per_file, eq_deletes_per_file))
 
         limit_reached = False
         for batches in executor.map(batches_for_task, tasks):
@@ -1836,7 +1916,10 @@ def batches_for_task(task: FileScanTask) -> list[pa.RecordBatch]:
                     break
 
     def _record_batches_from_scan_tasks_and_deletes(
-        self, tasks: Iterable[FileScanTask], deletes_per_file: dict[str, list[ChunkedArray]]
+        self,
+        tasks: Iterable[FileScanTask],
+        pos_deletes_per_file: dict[str, list[ChunkedArray]],
+        eq_deletes_per_file: dict[str, list[tuple[set[int], pa.Table]]],
     ) -> Iterator[pa.RecordBatch]:
         total_row_count = 0
         for task in tasks:
@@ -1849,7 +1932,8 @@ def _record_batches_from_scan_tasks_and_deletes(
                 self._projected_schema,
                 self._table_metadata.schema(),
                 self._projected_field_ids,
-                deletes_per_file.get(task.file.file_path),
+                pos_deletes_per_file.get(task.file.file_path),
+                eq_deletes_per_file.get(task.file.file_path),
                 self._case_sensitive,
                 self._table_metadata.name_mapping(),
                 self._table_metadata.specs().get(task.file.spec_id),
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index bb8765b651..7d8e3133c4 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -641,7 +641,7 @@ def delete(
             self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT)
             == TableProperties.DELETE_MODE_MERGE_ON_READ
         ):
-            warnings.warn("Merge on read is not yet supported, falling back to copy-on-write", stacklevel=2)
+            raise NotImplementedError("Merge on read is not yet supported")
 
         if isinstance(delete_filter, str):
             delete_filter = _parse_row_filter(delete_filter)
@@ -1833,16 +1833,12 @@ def from_rest_response(
 
         Raises:
             NotImplementedError: If equality delete files are encountered.
        
""" - from pyiceberg.catalog.rest.scan_planning import RESTEqualityDeleteFile - data_file = _rest_file_to_data_file(rest_task.data_file) resolved_deletes: set[DataFile] = set() if rest_task.delete_file_references: for idx in rest_task.delete_file_references: delete_file = delete_files[idx] - if isinstance(delete_file, RESTEqualityDeleteFile): - raise NotImplementedError(f"PyIceberg does not yet support equality deletes: {delete_file.file_path}") resolved_deletes.add(_rest_file_to_data_file(delete_file)) return FileScanTask( @@ -2067,10 +2063,8 @@ def _plan_files_local(self) -> Iterable[FileScanTask]: data_file = manifest_entry.data_file if data_file.content == DataFileContent.DATA: data_entries.append(manifest_entry) - elif data_file.content == DataFileContent.POSITION_DELETES: + elif data_file.content in {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}: delete_index.add_delete_file(manifest_entry, partition_key=data_file.partition) - elif data_file.content == DataFileContent.EQUALITY_DELETES: - raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568") else: raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}") return [ diff --git a/pyiceberg/table/delete_file_index.py b/pyiceberg/table/delete_file_index.py index 3f513aabe5..8cd093adc6 100644 --- a/pyiceberg/table/delete_file_index.py +++ b/pyiceberg/table/delete_file_index.py @@ -20,7 +20,7 @@ from pyiceberg.expressions import EqualTo from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator -from pyiceberg.manifest import INITIAL_SEQUENCE_NUMBER, POSITIONAL_DELETE_SCHEMA, DataFile, ManifestEntry +from pyiceberg.manifest import INITIAL_SEQUENCE_NUMBER, POSITIONAL_DELETE_SCHEMA, DataFile, DataFileContent, ManifestEntry from pyiceberg.typedef import Record PATH_FIELD_ID = 2147483546 @@ -59,6 +59,10 @@ def referenced_delete_files(self) -> list[DataFile]: return [data_file for data_file, _ in self._files] +class EqualityDeletes(PositionDeletes): + """Collects equality delete files and indexes them by sequence number.""" + + def _has_path_bounds(delete_file: DataFile) -> bool: lower = delete_file.lower_bounds upper = delete_file.upper_bounds @@ -103,26 +107,32 @@ def _partition_key(spec_id: int, partition: Record | None) -> tuple[int, Record] class DeleteFileIndex: - """Indexes position delete files by partition and by exact data file path.""" + """Indexes position and equality delete files by partition and by exact data file path.""" def __init__(self) -> None: - self._by_partition: dict[tuple[int, Record], PositionDeletes] = {} - self._by_path: dict[str, PositionDeletes] = {} + self._pos_by_partition: dict[tuple[int, Record], PositionDeletes] = {} + self._pos_by_path: dict[str, PositionDeletes] = {} + self._eq_by_partition: dict[tuple[int, Record], EqualityDeletes] = {} def is_empty(self) -> bool: - return not self._by_partition and not self._by_path + return not self._pos_by_partition and not self._pos_by_path and not self._eq_by_partition def add_delete_file(self, manifest_entry: ManifestEntry, partition_key: Record | None = None) -> None: delete_file = manifest_entry.data_file seq = manifest_entry.sequence_number or INITIAL_SEQUENCE_NUMBER - target_path = _referenced_data_file_path(delete_file) - if target_path: - deletes = self._by_path.setdefault(target_path, PositionDeletes()) - deletes.add(delete_file, seq) - else: + if delete_file.content == DataFileContent.POSITION_DELETES: + target_path = 
_referenced_data_file_path(delete_file)
+            if target_path:
+                deletes = self._pos_by_path.setdefault(target_path, PositionDeletes())
+                deletes.add(delete_file, seq)
+            else:
+                key = _partition_key(delete_file.spec_id or 0, partition_key)
+                deletes = self._pos_by_partition.setdefault(key, PositionDeletes())
+                deletes.add(delete_file, seq)
+        elif delete_file.content == DataFileContent.EQUALITY_DELETES:
             key = _partition_key(delete_file.spec_id or 0, partition_key)
-            deletes = self._by_partition.setdefault(key, PositionDeletes())
+            deletes = self._eq_by_partition.setdefault(key, EqualityDeletes())
             deletes.add(delete_file, seq)
 
     def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record | None = None) -> set[DataFile]:
@@ -131,27 +141,36 @@ def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record
         deletes: set[DataFile] = set()
         spec_id = data_file.spec_id or 0
         key = _partition_key(spec_id, partition_key)
-        partition_deletes = self._by_partition.get(key)
-        if partition_deletes:
-            for delete_file in partition_deletes.filter_by_seq(seq_num):
+
+        # Add position deletes
+        partition_pos_deletes = self._pos_by_partition.get(key)
+        if partition_pos_deletes:
+            for delete_file in partition_pos_deletes.filter_by_seq(seq_num):
                 if _applies_to_data_file(delete_file, data_file):
                     deletes.add(delete_file)
 
-        path_deletes = self._by_path.get(data_file.file_path)
-        if path_deletes:
-            deletes.update(path_deletes.filter_by_seq(seq_num))
+        path_pos_deletes = self._pos_by_path.get(data_file.file_path)
+        if path_pos_deletes:
+            deletes.update(path_pos_deletes.filter_by_seq(seq_num))
+
+        # Add equality deletes
+        partition_eq_deletes = self._eq_by_partition.get(key)
+        if partition_eq_deletes:
+            deletes.update(partition_eq_deletes.filter_by_seq(seq_num))
 
         return deletes
 
     def referenced_delete_files(self) -> list[DataFile]:
         data_files: list[DataFile] = []
-        for deletes in self._by_partition.values():
+        for deletes in self._pos_by_partition.values():
+            data_files.extend(deletes.referenced_delete_files())
+
+        for deletes in self._pos_by_path.values():
             data_files.extend(deletes.referenced_delete_files())
-        for deletes in self._by_path.values():
+        for deletes in self._eq_by_partition.values():
             data_files.extend(deletes.referenced_delete_files())
 
         return data_files
diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py
index 37d120969a..3120692efd 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -145,6 +145,8 @@ def _validate_target_branch(self, branch: str | None) -> str | None:
         return branch
 
     def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
+        if data_file.content == DataFileContent.EQUALITY_DELETES:
+            raise NotImplementedError(f"PyIceberg does not support writing {data_file.content}")
         self._added_data_files.append(data_file)
         return self
diff --git a/tests/io/test_equality_deletes.py b/tests/io/test_equality_deletes.py
new file mode 100644
index 0000000000..b8625bc67d
--- /dev/null
+++ b/tests/io/test_equality_deletes.py
@@ -0,0 +1,313 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import os +import tempfile + +import pyarrow as pa +import pyarrow.parquet as pq +import pytest + +from pyiceberg.catalog import Catalog +from pyiceberg.expressions import AlwaysTrue +from pyiceberg.io.pyarrow import ArrowScan, PyArrowFileIO, _task_to_record_batches +from pyiceberg.manifest import DataFile, DataFileContent, FileFormat +from pyiceberg.partitioning import PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.table import FileScanTask, TableProperties +from pyiceberg.table.metadata import TableMetadataV2 +from pyiceberg.typedef import Record +from pyiceberg.types import IntegerType, NestedField, StringType + + +@pytest.fixture +def table_schema() -> Schema: + return Schema( + NestedField(1, "id", IntegerType()), + NestedField(2, "data", StringType()), + ) + + +def test_task_to_record_batches_with_equality_deletes(table_schema: Schema) -> None: + with tempfile.TemporaryDirectory() as temp_dir: + # 1. Create data file + data_file_path = os.path.join(temp_dir, "data.parquet") + data_table = pa.Table.from_arrays([pa.array([1, 2, 3]), pa.array(["a", "b", "c"])], names=["id", "data"]) + # Add field IDs to schema metadata + data_table = data_table.cast( + pa.schema( + [ + pa.field("id", pa.int32(), metadata={b"PARQUET:field_id": b"1"}), + pa.field("data", pa.string(), metadata={b"PARQUET:field_id": b"2"}), + ] + ) + ) + pq.write_table(data_table, data_file_path) + + # 2. Create equality delete file (deleting id=2) + delete_file_path = os.path.join(temp_dir, "delete.parquet") + delete_table = pa.Table.from_arrays([pa.array([2], type=pa.int32())], names=["id"]) + delete_table = delete_table.cast( + pa.schema( + [ + pa.field("id", pa.int32(), metadata={b"PARQUET:field_id": b"1"}), + ] + ) + ) + pq.write_table(delete_table, delete_file_path) + + # 3. Set up Iceberg metadata + io = PyArrowFileIO() + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=data_file_path, + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=3, + file_size_in_bytes=os.path.getsize(data_file_path), + ) + data_file.spec_id = 0 + delete_file = DataFile.from_args( + content=DataFileContent.EQUALITY_DELETES, + file_path=delete_file_path, + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=1, + file_size_in_bytes=os.path.getsize(delete_file_path), + equality_ids=[1], + ) + delete_file.spec_id = 0 + + task = FileScanTask( + data_file=data_file, + delete_files={delete_file}, + ) + + # 4. Run _task_to_record_batches + # We need to pass the equality delete table already loaded + eq_deletes = [({1}, delete_table)] + + batches = list( + _task_to_record_batches( + io=io, + task=task, + bound_row_filter=AlwaysTrue(), + projected_schema=table_schema, + table_schema=table_schema, + projected_field_ids={1, 2}, + positional_deletes=None, + equality_deletes=eq_deletes, + case_sensitive=True, + ) + ) + + # 5. 
Verify results + result_table = pa.Table.from_batches(batches) + assert result_table.to_pydict() == {"id": [1, 3], "data": ["a", "c"]} + + +def test_task_to_record_batches_with_multiple_equality_deletes(table_schema: Schema) -> None: + with tempfile.TemporaryDirectory() as temp_dir: + # 1. Create data file + data_file_path = os.path.join(temp_dir, "data.parquet") + data_table = pa.Table.from_arrays([pa.array([1, 2, 3, 4]), pa.array(["a", "b", "c", "d"])], names=["id", "data"]) + data_table = data_table.cast( + pa.schema( + [ + pa.field("id", pa.int32(), metadata={b"PARQUET:field_id": b"1"}), + pa.field("data", pa.string(), metadata={b"PARQUET:field_id": b"2"}), + ] + ) + ) + pq.write_table(data_table, data_file_path) + + # 2. Create equality delete file 1 (deleting id=2) + delete_file_path1 = os.path.join(temp_dir, "delete1.parquet") + delete_table1 = pa.Table.from_arrays([pa.array([2], type=pa.int32())], names=["id"]) + delete_table1 = delete_table1.cast( + pa.schema( + [ + pa.field("id", pa.int32(), metadata={b"PARQUET:field_id": b"1"}), + ] + ) + ) + pq.write_table(delete_table1, delete_file_path1) + + # 3. Create equality delete file 2 (deleting data='c') + delete_file_path2 = os.path.join(temp_dir, "delete2.parquet") + delete_table2 = pa.Table.from_arrays([pa.array(["c"])], names=["data"]) + delete_table2 = delete_table2.cast( + pa.schema( + [ + pa.field("data", pa.string(), metadata={b"PARQUET:field_id": b"2"}), + ] + ) + ) + pq.write_table(delete_table2, delete_file_path2) + + io = PyArrowFileIO() + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=data_file_path, + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=4, + file_size_in_bytes=os.path.getsize(data_file_path), + ) + data_file.spec_id = 0 + + task = FileScanTask(data_file=data_file, delete_files=set()) + + eq_deletes = [ + ({1}, delete_table1), + ({2}, delete_table2), + ] + + batches = list( + _task_to_record_batches( + io=io, + task=task, + bound_row_filter=AlwaysTrue(), + projected_schema=table_schema, + table_schema=table_schema, + projected_field_ids={1, 2}, + positional_deletes=None, + equality_deletes=eq_deletes, + case_sensitive=True, + ) + ) + + # 5. Verify results + result_table = pa.Table.from_batches(batches) + assert result_table.to_pydict() == {"id": [1, 4], "data": ["a", "d"]} + + +def test_arrow_scan_with_equality_deletes(table_schema: Schema) -> None: + with tempfile.TemporaryDirectory() as temp_dir: + # 1. Create data file + data_file_path = os.path.join(temp_dir, "data.parquet") + data_table = pa.Table.from_arrays([pa.array([1, 2, 3], type=pa.int32()), pa.array(["a", "b", "c"])], names=["id", "data"]) + data_table = data_table.cast( + pa.schema( + [ + pa.field("id", pa.int32(), metadata={b"PARQUET:field_id": b"1"}), + pa.field("data", pa.string(), metadata={b"PARQUET:field_id": b"2"}), + ] + ) + ) + pq.write_table(data_table, data_file_path) + + # 2. Create equality delete file (deleting id=2) + delete_file_path = os.path.join(temp_dir, "delete.parquet") + delete_table = pa.Table.from_arrays([pa.array([2], type=pa.int32())], names=["id"]) + delete_table = delete_table.cast( + pa.schema( + [ + pa.field("id", pa.int32(), metadata={b"PARQUET:field_id": b"1"}), + ] + ) + ) + pq.write_table(delete_table, delete_file_path) + + # 3. 
Set up Iceberg metadata + io = PyArrowFileIO() + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=data_file_path, + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=3, + file_size_in_bytes=os.path.getsize(data_file_path), + ) + data_file.spec_id = 0 + delete_file = DataFile.from_args( + content=DataFileContent.EQUALITY_DELETES, + file_path=delete_file_path, + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=1, + file_size_in_bytes=os.path.getsize(delete_file_path), + equality_ids=[1], + ) + delete_file.spec_id = 0 + + task = FileScanTask( + data_file=data_file, + delete_files={delete_file}, + ) + + metadata = TableMetadataV2( + location=temp_dir, + table_uuid="fb072c92-a02b-11e9-ae9c-1bb7bc9eca94", + last_column_id=2, + format_version=2, + current_schema_id=table_schema.schema_id, + schemas=[table_schema], + partition_specs=[PartitionSpec(spec_id=0)], + default_spec_id=0, + last_partition_id=1000, + properties={}, + snapshots=[], + ) + + scan = ArrowScan( + table_metadata=metadata, + io=io, + projected_schema=table_schema, + row_filter=AlwaysTrue(), + case_sensitive=True, + ) + + # 4. Run to_table + result_table = scan.to_table(tasks=[task]) + + # 5. Verify results + assert result_table.to_pydict() == {"id": [1, 3], "data": ["a", "c"]} + + +def test_block_writing_equality_deletes(table_schema: Schema, catalog: Catalog) -> None: + identifier = "default.test_block_writing_equality_deletes" + + # Create table with MoR delete mode + catalog.create_table( + identifier, + schema=table_schema, + properties={TableProperties.DELETE_MODE: TableProperties.DELETE_MODE_MERGE_ON_READ, "format-version": "2"}, + ) + + tbl = catalog.load_table(identifier) + + df = pa.Table.from_pydict({"id": [1], "data": ["a"]}) + tbl.append(df) + + # Create a Equality Delete file to force writing it. 
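+    # The placeholder path below is never opened: append_data_file() raises on
+    # EQUALITY_DELETES content before any IO happens, so no real file is needed.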
+ delete_file = DataFile.from_args( + content=DataFileContent.EQUALITY_DELETES, + file_path="/tmp/delete.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=1, + file_size_in_bytes=100, + equality_ids=[1], + ) + delete_file.spec_id = 0 + + with pytest.raises(NotImplementedError) as exc: + with tbl.transaction() as tx: + with tx.update_snapshot().fast_append() as append: + append.append_data_file(delete_file) + + assert "PyIceberg does not support writing EQUALITY_DELETES" in str(exc.value) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 2170741bdd..ae70df3fc9 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -3130,6 +3130,7 @@ def test_task_to_record_batches_nanos(format_version: TableVersion, tmpdir: str) table_schema=table_schema, projected_field_ids={1}, positional_deletes=None, + equality_deletes=None, case_sensitive=True, format_version=format_version, ) diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index cfdc516227..363930ed40 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -19,10 +19,11 @@ import pytest -from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile +from pyiceberg.catalog import Catalog +from pyiceberg.manifest import DataFile, DataFileContent, FileFormat, ManifestContent, ManifestFile from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table import Table +from pyiceberg.table import Table, TableProperties from pyiceberg.table.snapshots import ( Operation, Snapshot, @@ -551,3 +552,54 @@ def test_latest_ancestor_before_timestamp() -> None: result = latest_ancestor_before_timestamp(metadata, 1000) assert result is None + + +def test_block_writing_equality_deletes(table_schema_simple: Schema, catalog: Catalog) -> None: + identifier = "default.test_block_writing_equality_deletes" + catalog.create_namespace("default") + + # Create table with MoR delete mode + catalog.create_table( + identifier, + schema=table_schema_simple, + properties={TableProperties.DELETE_MODE: TableProperties.DELETE_MODE_MERGE_ON_READ, "format-version": "2"}, + ) + + tbl = catalog.load_table(identifier) + + # Attempting to manually append an equality delete file should fail + delete_file = DataFile.from_args( + content=DataFileContent.EQUALITY_DELETES, + file_path="/tmp/delete.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=1, + file_size_in_bytes=100, + equality_ids=[1], + ) + delete_file.spec_id = 0 + + with pytest.raises(NotImplementedError) as exc: + with tbl.transaction() as tx: + with tx.update_snapshot().fast_append() as append: + append.append_data_file(delete_file) + + assert "PyIceberg does not support writing DataFileContent.EQUALITY_DELETES" in str(exc.value) + + +def test_delete_mode_mor_raises_error(table_schema_simple: Schema, catalog: Catalog) -> None: + identifier = "default.test_delete_mode_mor_raises_error" + catalog.create_namespace("default") + + catalog.create_table( + identifier, + schema=table_schema_simple, + properties={TableProperties.DELETE_MODE: TableProperties.DELETE_MODE_MERGE_ON_READ, "format-version": "2"}, + ) + + tbl = catalog.load_table(identifier) + + with pytest.raises(NotImplementedError) as exc: + tbl.delete("id = 1") + + assert "Merge on read is not yet supported" in str(exc.value) From b10575c2cb419a3f5dbd147c1a071bf23cd147d7 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Wed, 22 Apr 2026 15:10:43 
-0700 Subject: [PATCH 2/4] undo warnings.warn --- pyiceberg/table/__init__.py | 31 +++++++++++++++------- tests/catalog/test_scan_planning_models.py | 14 +++++++--- tests/table/test_snapshots.py | 10 +++---- 3 files changed, 36 insertions(+), 19 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 7d8e3133c4..5f751fd5e4 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -641,7 +641,7 @@ def delete( self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT) == TableProperties.DELETE_MODE_MERGE_ON_READ ): - raise NotImplementedError("Merge on read is not yet supported") + warnings.warn("Merge on read is not yet supported, falling back to copy-on-write", stacklevel=2) if isinstance(delete_filter, str): delete_filter = _parse_row_filter(delete_filter) @@ -1829,9 +1829,6 @@ def from_rest_response( Returns: A FileScanTask with the converted data and delete files. - - Raises: - NotImplementedError: If equality delete files are encountered. """ data_file = _rest_file_to_data_file(rest_task.data_file) @@ -1850,18 +1847,28 @@ def from_rest_response( def _rest_file_to_data_file(rest_file: RESTContentFile) -> DataFile: """Convert a REST content file to a manifest DataFile.""" - from pyiceberg.catalog.rest.scan_planning import RESTDataFile + from pyiceberg.catalog.rest.scan_planning import RESTDataFile, RESTEqualityDeleteFile, RESTPositionDeleteFile + + column_sizes = None + value_counts = None + null_value_counts = None + nan_value_counts = None + equality_ids = None + referenced_data_file = None + content_offset = None + content_size_in_bytes = None if isinstance(rest_file, RESTDataFile): column_sizes = rest_file.column_sizes.to_dict() if rest_file.column_sizes else None value_counts = rest_file.value_counts.to_dict() if rest_file.value_counts else None null_value_counts = rest_file.null_value_counts.to_dict() if rest_file.null_value_counts else None nan_value_counts = rest_file.nan_value_counts.to_dict() if rest_file.nan_value_counts else None - else: - column_sizes = None - value_counts = None - null_value_counts = None - nan_value_counts = None + elif isinstance(rest_file, RESTEqualityDeleteFile): + equality_ids = rest_file.equality_ids + elif isinstance(rest_file, RESTPositionDeleteFile): + referenced_data_file = rest_file.referenced_data_file + content_offset = rest_file.content_offset + content_size_in_bytes = rest_file.content_size_in_bytes data_file = DataFile.from_args( content=DataFileContent.from_rest_type(rest_file.content), @@ -1875,6 +1882,10 @@ def _rest_file_to_data_file(rest_file: RESTContentFile) -> DataFile: null_value_counts=null_value_counts, nan_value_counts=nan_value_counts, split_offsets=rest_file.split_offsets, + equality_ids=equality_ids, + referenced_data_file=referenced_data_file, + content_offset=content_offset, + content_size_in_bytes=content_size_in_bytes, sort_order_id=rest_file.sort_order_id, ) data_file.spec_id = rest_file.spec_id diff --git a/tests/catalog/test_scan_planning_models.py b/tests/catalog/test_scan_planning_models.py index 567f1444a7..965af997f3 100644 --- a/tests/catalog/test_scan_planning_models.py +++ b/tests/catalog/test_scan_planning_models.py @@ -38,7 +38,7 @@ ValueMap, ) from pyiceberg.expressions import AlwaysTrue, EqualTo, Reference -from pyiceberg.manifest import FileFormat +from pyiceberg.manifest import DataFileContent, FileFormat TEST_URI = "https://iceberg-test-catalog/" @@ -545,7 +545,7 @@ def 
test_plan_scan_cancelled(rest_scan_catalog: RestCatalog, requests_mock: Mock list(rest_scan_catalog.plan_scan(("db", "tbl"), request)) -def test_plan_scan_equality_deletes_not_supported(rest_scan_catalog: RestCatalog, requests_mock: Mocker) -> None: +def test_plan_scan_equality_deletes_supported(rest_scan_catalog: RestCatalog, requests_mock: Mocker) -> None: file_one = _rest_data_file(file_path="s3://bucket/tbl/data/file1.parquet") equality_delete = _rest_equality_delete_file(equality_ids=[1, 2]) requests_mock.post( @@ -566,5 +566,11 @@ def test_plan_scan_equality_deletes_not_supported(rest_scan_catalog: RestCatalog ) request = PlanTableScanRequest() - with pytest.raises(NotImplementedError, match="PyIceberg does not yet support equality deletes"): - list(rest_scan_catalog.plan_scan(("db", "tbl"), request)) + tasks = list(rest_scan_catalog.plan_scan(("db", "tbl"), request)) + assert len(tasks) == 1 + task = tasks[0] + assert task.file.file_path == "s3://bucket/tbl/data/file1.parquet" + assert len(task.delete_files) == 1 + delete_file = list(task.delete_files)[0] + assert delete_file.content == DataFileContent.EQUALITY_DELETES + assert delete_file.equality_ids == [1, 2] diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 363930ed40..ca5090cbf1 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -587,8 +587,8 @@ def test_block_writing_equality_deletes(table_schema_simple: Schema, catalog: Ca assert "PyIceberg does not support writing DataFileContent.EQUALITY_DELETES" in str(exc.value) -def test_delete_mode_mor_raises_error(table_schema_simple: Schema, catalog: Catalog) -> None: - identifier = "default.test_delete_mode_mor_raises_error" +def test_delete_mode_mor_warns(table_schema_simple: Schema, catalog: Catalog) -> None: + identifier = "default.test_delete_mode_mor_warns" catalog.create_namespace("default") catalog.create_table( @@ -599,7 +599,7 @@ def test_delete_mode_mor_raises_error(table_schema_simple: Schema, catalog: Cata tbl = catalog.load_table(identifier) - with pytest.raises(NotImplementedError) as exc: - tbl.delete("id = 1") + with pytest.warns() as record: + tbl.delete("foo = 'a'") - assert "Merge on read is not yet supported" in str(exc.value) + assert any("Merge on read is not yet supported, falling back to copy-on-write" in str(w.message) for w in record) From 4b3a1138f88ca58dd0b2cb04d8bb0a689da25be2 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Wed, 22 Apr 2026 15:42:21 -0700 Subject: [PATCH 3/4] fix unit tests --- pyiceberg/table/update/snapshot.py | 2 +- tests/io/test_equality_deletes.py | 11 ++++++++++- tests/table/test_snapshots.py | 2 +- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 3120692efd..d30cfa7aa1 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -146,7 +146,7 @@ def _validate_target_branch(self, branch: str | None) -> str | None: def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: if data_file.content == DataFileContent.EQUALITY_DELETES: - raise NotImplementedError(f"PyIceberg does not support writing {data_file.content}") + raise NotImplementedError(f"PyIceberg does not support writing {data_file.content.name}") self._added_data_files.append(data_file) return self diff --git a/tests/io/test_equality_deletes.py b/tests/io/test_equality_deletes.py index b8625bc67d..2bb264c695 100644 --- a/tests/io/test_equality_deletes.py +++ 
b/tests/io/test_equality_deletes.py @@ -282,6 +282,7 @@ def test_block_writing_equality_deletes(table_schema: Schema, catalog: Catalog) identifier = "default.test_block_writing_equality_deletes" # Create table with MoR delete mode + catalog.create_namespace("default") catalog.create_table( identifier, schema=table_schema, @@ -290,7 +291,15 @@ def test_block_writing_equality_deletes(table_schema: Schema, catalog: Catalog) tbl = catalog.load_table(identifier) - df = pa.Table.from_pydict({"id": [1], "data": ["a"]}) + df = pa.Table.from_pydict( + {"id": [1], "data": ["a"]}, + schema=pa.schema( + [ + pa.field("id", pa.int32()), + pa.field("data", pa.string()), + ] + ), + ) tbl.append(df) # Create a Equality Delete file to force writing it. diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index ca5090cbf1..3d1275625b 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -584,7 +584,7 @@ def test_block_writing_equality_deletes(table_schema_simple: Schema, catalog: Ca with tx.update_snapshot().fast_append() as append: append.append_data_file(delete_file) - assert "PyIceberg does not support writing DataFileContent.EQUALITY_DELETES" in str(exc.value) + assert "PyIceberg does not support writing EQUALITY_DELETES" in str(exc.value) def test_delete_mode_mor_warns(table_schema_simple: Schema, catalog: Catalog) -> None: From 96f4ff7735adf5d65e16cf59fb74372d7d84cf71 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Thu, 23 Apr 2026 15:45:24 -0700 Subject: [PATCH 4/4] fixing indexes --- pyiceberg/table/delete_file_index.py | 5 ++++ tests/table/test_delete_file_index.py | 33 +++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/pyiceberg/table/delete_file_index.py b/pyiceberg/table/delete_file_index.py index 8cd093adc6..fa54618514 100644 --- a/pyiceberg/table/delete_file_index.py +++ b/pyiceberg/table/delete_file_index.py @@ -62,6 +62,11 @@ def referenced_delete_files(self) -> list[DataFile]: class EqualityDeletes(PositionDeletes): """Collects equality delete files and indexes them by sequence number.""" + def add(self, delete_file: DataFile, seq_num: int) -> None: + # Equality deletes are indexed by sequence number - 1 to ensure they only + # apply to data files added in strictly earlier snapshots. 
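+        # Storing seq_num - 1 turns the index's inclusive sequence-number filter
+        # into the strict "older than the delete" comparison equality deletes need.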
+ super().add(delete_file, seq_num - 1) + def _has_path_bounds(delete_file: DataFile) -> bool: lower = delete_file.lower_bounds diff --git a/tests/table/test_delete_file_index.py b/tests/table/test_delete_file_index.py index 09dd9ac81b..51aed9c114 100644 --- a/tests/table/test_delete_file_index.py +++ b/tests/table/test_delete_file_index.py @@ -81,6 +81,20 @@ def _create_deletion_vector( return ManifestEntry.from_args(status=ManifestEntryStatus.ADDED, sequence_number=sequence_number, data_file=delete_file) +def _create_equality_delete(sequence_number: int = 1, spec_id: int = 0) -> ManifestEntry: + delete_file = DataFile.from_args( + content=DataFileContent.EQUALITY_DELETES, + file_path=f"s3://bucket/eq-delete-{sequence_number}.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=10, + file_size_in_bytes=100, + equality_ids=[1], + ) + delete_file._spec_id = spec_id + return ManifestEntry.from_args(status=ManifestEntryStatus.ADDED, sequence_number=sequence_number, data_file=delete_file) + + def test_empty_index() -> None: index = DeleteFileIndex() data_file = _create_data_file() @@ -187,3 +201,22 @@ def test_record_equality_for_partition_lookup() -> None: assert len(index.for_data_file(1, data_file, partition_b)) == 1 assert len(index.for_data_file(1, data_file, partition_c)) == 0 + + +def test_equality_delete_sequence_number_filtering() -> None: + index = DeleteFileIndex() + + # Equality delete with sequence number 2 + index.add_delete_file(_create_equality_delete(sequence_number=2)) + + data_file = _create_data_file() + + # Data file with sequence number 1 should be affected by equality delete with sequence number 2 + assert len(index.for_data_file(1, data_file)) == 1 + + # Data file with sequence number 2 should NOT be affected by equality delete with sequence number 2 + # Equality deletes apply only to data files added in strictly earlier snapshots (seq - 1) + assert len(index.for_data_file(2, data_file)) == 0 + + # Data file with sequence number 3 should NOT be affected + assert len(index.for_data_file(3, data_file)) == 0
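
The read-path change in patch 1 boils down to one PyArrow left anti join per
(equality IDs, delete table) pair. A minimal standalone sketch of that step,
using illustrative data rather than anything taken from the patch:

    import pyarrow as pa

    # Rows read from one data file, with column names already resolved against
    # the table schema (the patch handles renames via equality field IDs).
    data = pa.table({"id": pa.array([1, 2, 3], pa.int32()), "data": ["a", "b", "c"]})

    # Rows of an equality delete file: any data row that matches on the
    # equality columns (here just "id") must be dropped.
    eq_deletes = pa.table({"id": pa.array([2], pa.int32())})

    # A left anti join keeps exactly the rows with no match in the delete table.
    survivors = data.join(eq_deletes, keys=["id"], join_type="left anti")
    assert survivors.sort_by("id").to_pydict() == {"id": [1, 3], "data": ["a", "c"]}

The join runs once per batch in _task_to_record_batches, so delete tables are
loaded into memory up front by _read_all_delete_files and shared across tasks.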