[SPARK-57705][SQL] Read CSV, JSON, text, and XML files from zip archives#56784
[SPARK-57705][SQL] Read CSV, JSON, text, and XML files from zip archives#56784akshatshenoi-db wants to merge 1 commit into
Conversation
HyukjinKwon
left a comment
There was a problem hiding this comment.
0 blocking, 2 non-blocking, 0 nits.
A clean, well-documented generalization of the tar reader — the entry-streaming engine is hoisted verbatim into the base, tar behavior is unchanged, and zip is wired in with no per-data-source changes. Two non-blocking follow-ups, both on the one genuinely new surface: zip entries commons-compress can't stream.
Correctness (1)
- ArchiveReader.scala:138: advance loop doesn't check
canReadEntryData(entry)for non-streamable zip entries (data descriptor / unsupported method / encryption) — see inline
Suggestions (1)
- No test pins the documented "not streamable" zip behavior — add a STORED-with-data-descriptor fixture — see inline
| case c: Closeable => try c.close() catch { case NonFatal(_) => } | ||
| case _ => | ||
| } | ||
| var entry = archive.getNextEntry |
There was a problem hiding this comment.
The advance loop calls parseEntry on every non-skipped entry without consulting ArchiveInputStream.canReadEntryData(entry). For ZipArchiveInputStream that returns false for entries it can't stream — a STORED entry with a trailing data descriptor, an unsupported compression method, or encryption — i.e. exactly the "a few unusual zips … are not streamable this way" case the new Scaladoc documents.
Question: on such an entry, does the current path fail loudly with a clear error, or can it silently yield a truncated/garbled entry? If the latter is possible, consider guarding here:
if (entry != null && !archive.canReadEntryData(entry)) {
throw <clear error naming the unsupported zip feature>
}so the documented limitation surfaces deterministically rather than depending on ZipArchiveInputStream's default read behavior.
| } | ||
| override protected def openArchiveStream( | ||
| conf: Configuration): ArchiveInputStream[_ <: ArchiveEntry] = | ||
| new ZipArchiveInputStream(CodecStreams.createInputStreamWithCloseResource(conf, path)) |
There was a problem hiding this comment.
The "not streamable" limitation this ZipArchiveReader documents has no test pinning its behavior. ZipArchiveTestUtils.writeArchive uses ZipArchiveOutputStream (good — an independent producer, distinct from the reader) and there's a writeCorruptArchive negative case, but nothing exercises a valid-but-non-streamable zip. Consider adding a fixture (e.g. a STORED entry written with a data descriptor) that asserts the chosen behavior, so a future change can't silently turn the canReadEntryData gap above into corrupt rows.
|
Self-review (AI-assisted) — 0 blocking, 0 non-blocking, 0 nits. Reviewed the engine hoist + Test coverage mirrors the merged tar suites: |
cloud-fan
left a comment
There was a problem hiding this comment.
0 blocking, 1 non-blocking, 0 nits.
A clean generalization of the tar reader — the readEntries engine is hoisted verbatim into the base, tar behavior is preserved, and zip is a thin openArchiveStream subclass with full tar/zip test parity (independent-producer fixture writer).
Correctness (1)
- ArchiveReader.scala:296: Scaladoc names the wrong non-streamable zip case (deflated vs STORED) — see inline
Verification
Traced the ZipArchiveInputStream state space against commons-compress 1.28.0 source: empty / normal / directory / name-filtered / partial-read-then-advance entries are all handled, and every non-streamable case (STORED + data descriptor, encryption, unsupported compression) makes read() throw UnsupportedZipFeatureException (read L939-944) — i.e. it fails loudly, not as silent corruption. This confirms the documented limitation is safe at runtime and is what motivates the doc correction below.
Note: @HyukjinKwon's existing thread already covers a canReadEntryData guard (L138) and a missing non-streamable-zip test (L303); not duplicating those. On the guard question specifically — since read() already throws deterministically for every unstreamable entry, the guard would only produce an earlier/cleaner error, not fix a correctness gap; non-blocking is right.
| * container (the container itself is not gzip-wrapped), so `ZipArchiveInputStream` decompresses | ||
| * entries as they are streamed and no Hadoop codec layer is applied. The stream reads local file | ||
| * headers sequentially rather than the central directory, matching the tar reader's pure-streaming | ||
| * model: a few unusual zips (e.g. a deflated entry whose size is recorded only in a trailing data |
There was a problem hiding this comment.
The Scaladoc cites "a deflated entry whose size is recorded only in a trailing data descriptor" as not streamable, but per commons-compress 1.28.0 a DEFLATED entry with a data descriptor is streamable: supportsDataDescriptorFor and supportsCompressedSizeFor both return true when method == DEFLATED, and readDeflated detects end-of-stream via inf.finished() independent of the declared size, so canReadEntryData is true and read() doesn't throw.
The actually non-streamable case is a STORED entry with a data descriptor — supportsDataDescriptorFor returns false (STORED + data descriptor, with allowStoredEntriesWithDataDescriptor false by default), so read() throws UnsupportedZipFeatureException. Your own self-review says "STORED", and ZipArchiveTestUtils.writeArchive (writing through a non-seekable FileOutputStream, which forces data descriptors on DEFLATED entries) produces DEFLATED+data-descriptor entries that stream fine — which confirms the doc's example is the streamable case. Same wording is worth fixing in the PR description too.
| * model: a few unusual zips (e.g. a deflated entry whose size is recorded only in a trailing data | |
| * model: a few unusual zips (e.g. a stored entry whose size is recorded only in a trailing data |
What changes were proposed in this pull request?
Add zip (
.zip) support to the streaming archive reader (ArchiveReader), extending the existing tar support (.tar/.tar.gz/.tgz) and continuing the archive-reader series: SPARK-57135 (CSV read), SPARK-57321 (CSV inference), SPARK-57419 (JSON), SPARK-57478 (text), SPARK-57479 (XML), SPARK-57481 (Avro).The archive read/inference integration is format-agnostic -- every data source dispatches through
ArchiveReader.isArchivePathandArchiveReader(path).readEntries(...)-- so zip works for every data source already wired up (CSV, JSON, text, XML, Avro) with no per-data-source changes.The change is concentrated in
ArchiveReader. The entry-streaming engine (lazy one-entry-at-a-time advance, directory/dotfile/marker skipping with the sameignoredPathSegmentRegexfilter as a loose-file listing, close-shielded entry streams, eager-first-entry error surfacing, task-completion cleanup) is hoisted fromTarArchiveReaderinto the abstract base. SinceTarArchiveInputStreamandZipArchiveInputStreamboth extend commons-compressArchiveInputStream, the base steps entries viagetNextEntrydirectly and a subclass implements onlyopenArchiveStream(conf): ArchiveInputStream[_ <: ArchiveEntry]:TarArchiveReaderis reduced to opening aTarArchiveInputStream(keeping the.tgzexplicit gunzip, with a defensive close so a gzip-header failure can't leak the base stream); behavior is unchanged.ZipArchiveReaderopens aZipArchiveInputStream(zip entries are individually deflated, so no Hadoop codec layer is applied). It streams local file headers sequentially, matching the tar reader's pure-streaming model; a few unusual zips (e.g. a deflated entry whose size is recorded only in a trailing data descriptor) are not streamable this way.isArchivePath/applynow recognize and dispatch.zip(applymatches the tar and zip extensions explicitly and throws on anything else).spark.sql.files.archive.reader.enableddoc is simplified to describe the user-facing behavior without enumerating formats or implementation details. No new flag -- zip rides the existing (default-off) gate.Why are the changes needed?
The archive reader already supports CSV, JSON, text, XML, and Avro over tar. Zip is one of the most common archive containers for shipped data, and extending the same opt-in archive path to zip lets users read and infer schema from files packed in a
.zipwithout unpacking them first, with the same directory-read parity the rest of the series guarantees -- and, because the integration is format-agnostic, across every data source already wired up.Does this PR introduce any user-facing change?
Yes. When
spark.sql.files.archive.reader.enabledis set (defaultfalse), the CSV, JSON, text, XML, and Avro data sources can now read.ziparchives in addition to tar archives -- each archive is read as a single split and its entries are streamed through the data source's parser (never unpacked to disk), as if the entries were separate files, during both scan and schema inference. Previously only.tar/.tar.gz/.tgzwere recognized;.zipfiles were treated as ordinary (non-archive) files. With the flag at its default, behavior is unchanged.How was this patch tested?
ArchiveReaderSuite-- added.zipcases forisArchivePathdispatch andreadEntries(empty/single/multiple entries, directory and dotfile/marker skipping, lazy advance, non-closing entry stream), alongside the existing tar cases that guard the refactor.CSVHeaderZipArchiveReadSuite,CSVHeaderlessZipArchiveReadSuite,JSONZipArchiveReadSuite,XMLZipArchiveReadSuite,TextZipArchiveReadSuite, andAvroZipArchiveReadSuite. The text read tests were extracted into a container-agnosticTextArchiveReadBase(with the tar suite slimmed to match the CSV/JSON/XML base+container pattern), andZipArchiveReadBase/ZipArchiveTestUtilsare the zip peers ofTarArchiveReadBase/TarArchiveTestUtils.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code