124 changes: 104 additions & 20 deletions pyiceberg/io/pyarrow.py
@@ -1136,6 +1136,13 @@ def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray]
raise ValueError(f"Delete file format not supported: {data_file.file_format}")


def _read_equality_deletes(io: FileIO, delete_file: DataFile) -> pa.Table:
arrow_format = _get_file_format(delete_file.file_format, pre_buffer=True, buffer_size=ONE_MEGABYTE)
with io.new_input(delete_file.file_path).open() as fi:
fragment = arrow_format.make_fragment(fi)
return ds.Scanner.from_fragment(fragment=fragment).to_table()


def _combine_positional_deletes(positional_deletes: list[pa.ChunkedArray], start_index: int, end_index: int) -> pa.Array:
if len(positional_deletes) == 1:
all_chunks = positional_deletes[0]
@@ -1609,6 +1616,7 @@ def _task_to_record_batches(
table_schema: Schema,
projected_field_ids: set[int],
positional_deletes: list[ChunkedArray] | None,
equality_deletes: list[tuple[set[int], pa.Table]] | None,
case_sensitive: bool,
name_mapping: NameMapping | None = None,
partition_spec: PartitionSpec | None = None,
@@ -1643,14 +1651,20 @@ def _task_to_record_batches(
bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
pyarrow_filter = expression_to_pyarrow(bound_file_filter, file_schema)

file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)
# Ensure equality delete columns are also projected
all_projected_field_ids = projected_field_ids.copy()
if equality_deletes:
for eq_ids, _ in equality_deletes:
all_projected_field_ids.update(eq_ids)

file_project_schema = prune_columns(file_schema, all_projected_field_ids, select_full_types=False)

fragment_scanner = ds.Scanner.from_fragment(
fragment=fragment,
schema=physical_schema,
# This will push down the query to Arrow.
# But in case there are positional deletes, we have to apply them first
filter=pyarrow_filter if not positional_deletes else None,
# But in case there are positional or equality deletes, we have to apply them first
filter=pyarrow_filter if not positional_deletes and not equality_deletes else None,
columns=[col.name for col in file_project_schema.columns],
)

@@ -1666,6 +1680,38 @@ def _task_to_record_batches(
indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch))
current_batch = current_batch.take(indices)

if current_batch.num_rows > 0 and equality_deletes:
for eq_ids, eq_delete_table in equality_deletes:
try:
eq_file_schema = pyarrow_to_schema(
eq_delete_table.schema,
name_mapping=name_mapping,
format_version=format_version,
)

rename_map = {}
for field_id in eq_ids:
file_name = eq_file_schema.find_column_name(field_id)
current_name = table_schema.find_column_name(field_id)
if file_name != current_name:
rename_map[file_name] = current_name

if rename_map:
eq_delete_table = eq_delete_table.rename_columns(
[rename_map.get(name, name) for name in eq_delete_table.column_names]
)

join_keys = [table_schema.find_column_name(field_id) for field_id in eq_ids]
current_table = pa.Table.from_batches([current_batch])
current_table = current_table.join(eq_delete_table, keys=join_keys, join_type="left anti")

if current_table.num_rows == 0:
# Every row in this batch was removed by this equality delete; skip the remaining deletes.
current_batch = current_batch.slice(0, 0)
break
# combine_chunks() keeps the joined result in a single record batch so no rows are dropped.
current_batch = current_table.combine_chunks().to_batches()[0]
except (ValueError, ResolveError):
# Skip equality delete files whose schema cannot be resolved against this table.
continue

# skip empty batches
if current_batch.num_rows == 0:
continue
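
As a minimal standalone sketch of the anti-join step above (column names and values are made up for illustration), a left anti join keeps only the rows whose keys do not appear in the equality delete table:

import pyarrow as pa

data = pa.table({"id": [1, 2, 3, 4], "name": ["a", "b", "c", "d"]})
eq_deletes = pa.table({"id": [2, 4]})

# A left anti join keeps only the rows of `data` whose join keys do NOT occur in `eq_deletes`.
remaining = data.join(eq_deletes, keys=["id"], join_type="left anti")
print(sorted(remaining.column("id").to_pylist()))  # [1, 3]
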
@@ -1691,23 +1737,57 @@ def _task_to_record_batches(
)


def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[str, list[ChunkedArray]]:
deletes_per_file: dict[str, list[ChunkedArray]] = {}
def _read_all_delete_files(
io: FileIO, tasks: Iterable[FileScanTask]
) -> tuple[dict[str, list[ChunkedArray]], dict[str, list[tuple[set[int], pa.Table]]]]:
pos_deletes_per_file: dict[str, list[ChunkedArray]] = {}
eq_deletes_per_file: dict[str, list[tuple[set[int], pa.Table]]] = {}

unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks]))
if len(unique_deletes) > 0:
unique_pos_deletes = {d for d in unique_deletes if d.content == DataFileContent.POSITION_DELETES}
unique_eq_deletes = {d for d in unique_deletes if d.content == DataFileContent.EQUALITY_DELETES}

executor = ExecutorFactory.get_or_create()
deletes_per_files: Iterator[dict[str, ChunkedArray]] = executor.map(
lambda args: _read_deletes(*args),
[(io, delete_file) for delete_file in unique_deletes],
)
for delete in deletes_per_files:
for file, arr in delete.items():
if file in deletes_per_file:
deletes_per_file[file].append(arr)
else:
deletes_per_file[file] = [arr]

return deletes_per_file
if len(unique_pos_deletes) > 0:
pos_deletes: Iterator[dict[str, ChunkedArray]] = executor.map(
lambda args: _read_deletes(*args),
[(io, delete_file) for delete_file in unique_pos_deletes],
)
for delete in pos_deletes:
for file, arr in delete.items():
if file in pos_deletes_per_file:
pos_deletes_per_file[file].append(arr)
else:
pos_deletes_per_file[file] = [arr]

if len(unique_eq_deletes) > 0:
# We map each unique eq delete file location to its loaded table and its equality IDs
eq_deletes_tables: dict[str, tuple[set[int], pa.Table]] = dict(
zip(
[d.file_path for d in unique_eq_deletes],
zip(
[set(d.equality_ids) if d.equality_ids else set() for d in unique_eq_deletes],
executor.map(
lambda args: _read_equality_deletes(*args),
[(io, d) for d in unique_eq_deletes],
),
strict=True,
),
strict=True,
)
)

# Map eq deletes to each task's data file path
for task in tasks:
eq_deletes_for_task = [
eq_deletes_tables[d.file_path] for d in task.delete_files if d.content == DataFileContent.EQUALITY_DELETES
]
if eq_deletes_for_task:
eq_deletes_per_file[task.file.file_path] = eq_deletes_for_task

return pos_deletes_per_file, eq_deletes_per_file
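
For reference, a sketch of the shapes these two mappings take (the path, field IDs, and values below are illustrative only):

import pyarrow as pa

pos_deletes_per_file = {
    "s3://bucket/data/file-a.parquet": [pa.chunked_array([[0, 7, 12]])],  # deleted row positions
}
eq_deletes_per_file = {
    "s3://bucket/data/file-a.parquet": [
        ({1}, pa.table({"id": [2, 4]})),  # (equality field IDs, rows to delete by value)
    ],
}
print(len(pos_deletes_per_file), len(eq_deletes_per_file))
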


class ArrowScan:
@@ -1807,7 +1887,7 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.Record
ResolveError: When a required field cannot be found in the file
ValueError: When a field type in the file cannot be projected to the schema type
"""
deletes_per_file = _read_all_delete_files(self._io, tasks)
pos_deletes_per_file, eq_deletes_per_file = _read_all_delete_files(self._io, tasks)

total_row_count = 0
executor = ExecutorFactory.get_or_create()
@@ -1816,7 +1896,7 @@ def batches_for_task(task: FileScanTask) -> list[pa.RecordBatch]:
# Materialize the iterator here to ensure execution happens within the executor.
# Otherwise, the iterator would be lazily consumed later (in the main thread),
# defeating the purpose of using executor.map.
return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
return list(self._record_batches_from_scan_tasks_and_deletes([task], pos_deletes_per_file, eq_deletes_per_file))

limit_reached = False
for batches in executor.map(batches_for_task, tasks):
@@ -1836,7 +1916,10 @@ def batches_for_task(task: FileScanTask) -> list[pa.RecordBatch]:
break

def _record_batches_from_scan_tasks_and_deletes(
self, tasks: Iterable[FileScanTask], deletes_per_file: dict[str, list[ChunkedArray]]
self,
tasks: Iterable[FileScanTask],
pos_deletes_per_file: dict[str, list[ChunkedArray]],
eq_deletes_per_file: dict[str, list[tuple[set[int], pa.Table]]],
) -> Iterator[pa.RecordBatch]:
total_row_count = 0
for task in tasks:
@@ -1849,7 +1932,8 @@ def _record_batches_from_scan_tasks_and_deletes(
self._projected_schema,
self._table_metadata.schema(),
self._projected_field_ids,
deletes_per_file.get(task.file.file_path),
pos_deletes_per_file.get(task.file.file_path),
eq_deletes_per_file.get(task.file.file_path),
self._case_sensitive,
self._table_metadata.name_mapping(),
self._table_metadata.specs().get(task.file.spec_id),
37 changes: 21 additions & 16 deletions pyiceberg/table/__init__.py
@@ -1829,20 +1829,13 @@ def from_rest_response(

Returns:
A FileScanTask with the converted data and delete files.

Raises:
NotImplementedError: If equality delete files are encountered.
"""
from pyiceberg.catalog.rest.scan_planning import RESTEqualityDeleteFile

data_file = _rest_file_to_data_file(rest_task.data_file)

resolved_deletes: set[DataFile] = set()
if rest_task.delete_file_references:
for idx in rest_task.delete_file_references:
delete_file = delete_files[idx]
if isinstance(delete_file, RESTEqualityDeleteFile):
raise NotImplementedError(f"PyIceberg does not yet support equality deletes: {delete_file.file_path}")
resolved_deletes.add(_rest_file_to_data_file(delete_file))

return FileScanTask(
@@ -1854,18 +1847,28 @@ def from_rest_response(

def _rest_file_to_data_file(rest_file: RESTContentFile) -> DataFile:
"""Convert a REST content file to a manifest DataFile."""
from pyiceberg.catalog.rest.scan_planning import RESTDataFile
from pyiceberg.catalog.rest.scan_planning import RESTDataFile, RESTEqualityDeleteFile, RESTPositionDeleteFile

column_sizes = None
value_counts = None
null_value_counts = None
nan_value_counts = None
equality_ids = None
referenced_data_file = None
content_offset = None
content_size_in_bytes = None

if isinstance(rest_file, RESTDataFile):
column_sizes = rest_file.column_sizes.to_dict() if rest_file.column_sizes else None
value_counts = rest_file.value_counts.to_dict() if rest_file.value_counts else None
null_value_counts = rest_file.null_value_counts.to_dict() if rest_file.null_value_counts else None
nan_value_counts = rest_file.nan_value_counts.to_dict() if rest_file.nan_value_counts else None
else:
column_sizes = None
value_counts = None
null_value_counts = None
nan_value_counts = None
elif isinstance(rest_file, RESTEqualityDeleteFile):
equality_ids = rest_file.equality_ids
elif isinstance(rest_file, RESTPositionDeleteFile):
referenced_data_file = rest_file.referenced_data_file
content_offset = rest_file.content_offset
content_size_in_bytes = rest_file.content_size_in_bytes

data_file = DataFile.from_args(
content=DataFileContent.from_rest_type(rest_file.content),
@@ -1879,6 +1882,10 @@ def _rest_file_to_data_file(rest_file: RESTContentFile) -> DataFile:
null_value_counts=null_value_counts,
nan_value_counts=nan_value_counts,
split_offsets=rest_file.split_offsets,
equality_ids=equality_ids,
referenced_data_file=referenced_data_file,
content_offset=content_offset,
content_size_in_bytes=content_size_in_bytes,
sort_order_id=rest_file.sort_order_id,
)
data_file.spec_id = rest_file.spec_id
@@ -2067,10 +2074,8 @@ def _plan_files_local(self) -> Iterable[FileScanTask]:
data_file = manifest_entry.data_file
if data_file.content == DataFileContent.DATA:
data_entries.append(manifest_entry)
elif data_file.content == DataFileContent.POSITION_DELETES:
elif data_file.content in {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}:
delete_index.add_delete_file(manifest_entry, partition_key=data_file.partition)
elif data_file.content == DataFileContent.EQUALITY_DELETES:
raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
else:
raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")
return [
64 changes: 44 additions & 20 deletions pyiceberg/table/delete_file_index.py
@@ -20,7 +20,7 @@

from pyiceberg.expressions import EqualTo
from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator
from pyiceberg.manifest import INITIAL_SEQUENCE_NUMBER, POSITIONAL_DELETE_SCHEMA, DataFile, ManifestEntry
from pyiceberg.manifest import INITIAL_SEQUENCE_NUMBER, POSITIONAL_DELETE_SCHEMA, DataFile, DataFileContent, ManifestEntry
from pyiceberg.typedef import Record

PATH_FIELD_ID = 2147483546
@@ -59,6 +59,15 @@ def referenced_delete_files(self) -> list[DataFile]:
return [data_file for data_file, _ in self._files]


class EqualityDeletes(PositionDeletes):
"""Collects equality delete files and indexes them by sequence number."""

def add(self, delete_file: DataFile, seq_num: int) -> None:
# Equality deletes are indexed by sequence number - 1 to ensure they only
# apply to data files added in strictly earlier snapshots.
super().add(delete_file, seq_num - 1)
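
A small sketch of why the `seq_num - 1` indexing gives "strictly earlier" semantics, assuming `filter_by_seq(n)` returns the deletes indexed at a sequence number greater than or equal to `n` (the same convention the position-delete index relies on):

# Assumption: filter_by_seq(n) matches deletes indexed at a sequence number >= n.
data_seq = 5      # sequence number of a data file
delete_seq = 5    # sequence number of a delete file written in the same snapshot

# Position delete: indexed at delete_seq, so it applies to this data file (5 >= 5).
applies_pos = delete_seq >= data_seq
# Equality delete: indexed at delete_seq - 1 = 4, so it does NOT apply (4 < 5);
# it only applies to data files from strictly earlier snapshots.
applies_eq = (delete_seq - 1) >= data_seq
print(applies_pos, applies_eq)  # True False
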


def _has_path_bounds(delete_file: DataFile) -> bool:
lower = delete_file.lower_bounds
upper = delete_file.upper_bounds
@@ -103,26 +112,32 @@ def _partition_key(spec_id: int, partition: Record | None) -> tuple[int, Record]


class DeleteFileIndex:
"""Indexes position delete files by partition and by exact data file path."""
"""Indexes position and equality delete files by partition and by exact data file path."""

def __init__(self) -> None:
self._by_partition: dict[tuple[int, Record], PositionDeletes] = {}
self._by_path: dict[str, PositionDeletes] = {}
self._pos_by_partition: dict[tuple[int, Record], PositionDeletes] = {}
self._pos_by_path: dict[str, PositionDeletes] = {}
self._eq_by_partition: dict[tuple[int, Record], EqualityDeletes] = {}

def is_empty(self) -> bool:
return not self._by_partition and not self._by_path
return not self._pos_by_partition and not self._pos_by_path and not self._eq_by_partition

def add_delete_file(self, manifest_entry: ManifestEntry, partition_key: Record | None = None) -> None:
delete_file = manifest_entry.data_file
seq = manifest_entry.sequence_number or INITIAL_SEQUENCE_NUMBER
target_path = _referenced_data_file_path(delete_file)

if target_path:
deletes = self._by_path.setdefault(target_path, PositionDeletes())
deletes.add(delete_file, seq)
else:
if delete_file.content == DataFileContent.POSITION_DELETES:
target_path = _referenced_data_file_path(delete_file)
if target_path:
deletes = self._pos_by_path.setdefault(target_path, PositionDeletes())
deletes.add(delete_file, seq)
else:
key = _partition_key(delete_file.spec_id or 0, partition_key)
deletes = self._pos_by_partition.setdefault(key, PositionDeletes())
deletes.add(delete_file, seq)
elif delete_file.content == DataFileContent.EQUALITY_DELETES:
key = _partition_key(delete_file.spec_id or 0, partition_key)
deletes = self._by_partition.setdefault(key, PositionDeletes())
deletes = self._eq_by_partition.setdefault(key, EqualityDeletes())
deletes.add(delete_file, seq)

def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record | None = None) -> set[DataFile]:
@@ -131,27 +146,36 @@ def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record

deletes: set[DataFile] = set()
spec_id = data_file.spec_id or 0

key = _partition_key(spec_id, partition_key)
partition_deletes = self._by_partition.get(key)
if partition_deletes:
for delete_file in partition_deletes.filter_by_seq(seq_num):

# Add position deletes
partition_pos_deletes = self._pos_by_partition.get(key)
if partition_pos_deletes:
for delete_file in partition_pos_deletes.filter_by_seq(seq_num):
if _applies_to_data_file(delete_file, data_file):
deletes.add(delete_file)

path_deletes = self._by_path.get(data_file.file_path)
if path_deletes:
deletes.update(path_deletes.filter_by_seq(seq_num))
path_pos_deletes = self._pos_by_path.get(data_file.file_path)
if path_pos_deletes:
deletes.update(path_pos_deletes.filter_by_seq(seq_num))

# Add equality deletes
partition_eq_deletes = self._eq_by_partition.get(key)
if partition_eq_deletes:
deletes.update(partition_eq_deletes.filter_by_seq(seq_num))

return deletes

def referenced_delete_files(self) -> list[DataFile]:
data_files: list[DataFile] = []

for deletes in self._by_partition.values():
for deletes in self._pos_by_partition.values():
data_files.extend(deletes.referenced_delete_files())

for deletes in self._pos_by_path.values():
data_files.extend(deletes.referenced_delete_files())

for deletes in self._by_path.values():
for deletes in self._eq_by_partition.values():
data_files.extend(deletes.referenced_delete_files())

return data_files
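
A usage sketch of the index during planning; the manifest-entry lists below are hypothetical placeholders (left empty so the snippet stays self-contained), standing in for the entries _plan_files_local collects from the snapshot's manifests:

from pyiceberg.table.delete_file_index import DeleteFileIndex

delete_manifest_entries = []  # ManifestEntry objects for position/equality delete files
data_manifest_entries = []    # ManifestEntry objects for data files

index = DeleteFileIndex()
for entry in delete_manifest_entries:
    index.add_delete_file(entry, partition_key=entry.data_file.partition)

for entry in data_manifest_entries:
    deletes = index.for_data_file(
        entry.sequence_number or 0,
        entry.data_file,
        partition_key=entry.data_file.partition,
    )
    # `deletes` holds every position and equality delete file that may apply to this data file.
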
2 changes: 2 additions & 0 deletions pyiceberg/table/update/snapshot.py
@@ -145,6 +145,8 @@ def _validate_target_branch(self, branch: str | None) -> str | None:
return branch

def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
if data_file.content == DataFileContent.EQUALITY_DELETES:
raise NotImplementedError(f"PyIceberg does not support writing {data_file.content.name}")
self._added_data_files.append(data_file)
return self
