[AURON #2217] Support Iceberg _spec_id metadata in native scan #2218
weimingdiit wants to merge 1 commit into apache:master
Signed-off-by: weimingdiit <weimingdiit@gmail.com>
Pull request overview
Adds native Iceberg scan support for projecting the _spec_id metadata column (a file-level constant), avoiding Spark fallback when _spec_id is requested.
Changes:
- Allow `_spec_id` as a supported Iceberg metadata column in native-scan validation.
- Materialize `_spec_id` per file from `FileScanTask.file().specId()` via native partition values.
- Add integration tests for `_spec_id` projection and mixed projections with `_file`.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala | Marks _spec_id as a supported metadata column so native planning doesn’t fall back to Spark. |
| thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala | Builds per-file _spec_id mapping and emits it as a constant partition value alongside _file. |
| thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala | Adds integration coverage for native projection of _spec_id (alone and with _file). |

    case name if name == MetadataColumns.SPEC_ID.name() =>
      NativeConverters
        .convertExpr(Literal.create(fileSpecIds(filePath), IntegerType))
        .getLiteral

fileSpecIds(filePath) uses Map.apply, which will throw a NoSuchElementException with little context if the key isn’t present (e.g., due to path normalization differences between Iceberg DataFile.location() and Spark PartitionedFile.filePath.toString). Consider using getOrElse and throwing an IllegalStateException with a clear message, and/or normalizing the key the same way in both buildFileSpecIds and metadataPartitionValues to guarantee consistent lookups.
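
A minimal sketch of the lookup suggested in this comment, assuming the spec ids are held in a plain `Map[String, Int]` keyed by file path. The `SpecIdLookup` object, the `specIdFor` name, and the error message are hypothetical and not taken from the PR:

```scala
// Hypothetical helper, for illustration only; not the PR's actual code.
object SpecIdLookup {
  // Returns the spec id recorded for filePath, or fails with a message
  // that names the missing key instead of a bare NoSuchElementException.
  def specIdFor(fileSpecIds: Map[String, Int], filePath: String): Int =
    fileSpecIds.getOrElse(
      filePath,
      throw new IllegalStateException(
        s"No Iceberg spec id recorded for file '$filePath' " +
          s"(${fileSpecIds.size} file paths known); " +
          "the path may be normalized differently on the Iceberg and Spark sides"))
}
```

Because `getOrElse` takes its default by name, the exception is only constructed and thrown when the key is actually missing.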

    fileTasks
      .map(task => task.file().location() -> task.file().specId())
      .groupBy(_._1)
      .mapValues(_.head._2)
      .toMap

buildFileSpecIds currently does map(...).groupBy(...).mapValues(_.head...), which allocates a full grouped collection of all tasks. Since duplicates are expected only because a single file can be split into multiple ranges, this can be built in a single pass (and optionally validated for consistency when the same path appears multiple times) to reduce memory and CPU overhead on large scans.
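
One way the single-pass build with a consistency check could look, as a sketch. It assumes the input has already been reduced to `(path, specId)` pairs, which stand in for `task.file().location()` and `task.file().specId()`. The `FileSpecIds` object and its `build` method are hypothetical names:

```scala
// Hypothetical sketch, for illustration only; not the PR's actual code.
object FileSpecIds {
  // Builds path -> specId in one pass over the entries. A path may repeat
  // when a file is split into several ranges; repeats must agree on spec id.
  def build(entries: Iterable[(String, Int)]): Map[String, Int] =
    entries.foldLeft(Map.empty[String, Int]) { case (acc, (path, specId)) =>
      acc.get(path) match {
        case Some(existing) if existing != specId =>
          throw new IllegalStateException(
            s"Conflicting spec ids for file '$path': $existing vs $specId")
        case Some(_) => acc                       // duplicate split, same spec id
        case None    => acc.updated(path, specId) // first time this path is seen
      }
    }
}
```

This avoids materializing the intermediate grouped collection and turns an inconsistent duplicate into an explicit error rather than silently keeping the first value.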
Which issue does this PR close?
Closes #2217
Rationale for this change
Native Iceberg scan currently supports projecting the `_file` metadata column, but falls back when `_spec_id` is requested. `_spec_id` is a file-level Iceberg metadata column and can be materialized as a per-file constant value in the native scan, similar to `_file`.
What changes are included in this PR?
This PR adds native scan support for the Iceberg `_spec_id` metadata column.
- Allow `_spec_id` in Iceberg native scan metadata column validation.
- Materialize `_spec_id` from `FileScanTask.file().specId()` as a per-file partition value.
- Add tests for `_spec_id` projection and mixed data/metadata projection with `_file` and `_spec_id`.
Are there any user-facing changes?
Yes. Queries that project Iceberg `_spec_id` can now use the native Iceberg scan path instead of falling back to Spark.
How was this patch tested?
CI.