-
Notifications
You must be signed in to change notification settings - Fork 29.3k
[SPARK-57705][SQL] Read CSV, JSON, text, and XML files from zip archives #56784
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.execution.datasources | ||
|
|
||
| /** | ||
| * Reads of Avro files packed in zip archives (`.zip`): the shared archive tests from | ||
| * [[ArchiveReadSuiteBase]] plus the Avro-specific ones from [[AvroArchiveReadBase]], run over zip | ||
| * containers via [[ZipArchiveReadBase]]. | ||
| */ | ||
| class AvroZipArchiveReadSuite | ||
| extends ArchiveReadSuiteBase | ||
| with AvroArchiveReadBase | ||
| with ZipArchiveReadBase |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,7 +24,9 @@ import java.util.zip.GZIPInputStream | |
|
|
||
| import scala.util.control.NonFatal | ||
|
|
||
| import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream} | ||
| import org.apache.commons.compress.archivers.{ArchiveEntry, ArchiveInputStream} | ||
| import org.apache.commons.compress.archivers.tar.TarArchiveInputStream | ||
| import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream | ||
| import org.apache.commons.io.ByteOrderMark | ||
| import org.apache.commons.io.input.{BOMInputStream, CloseShieldInputStream} | ||
| import org.apache.hadoop.conf.Configuration | ||
|
|
@@ -48,12 +50,49 @@ import org.apache.spark.util.HadoopFSUtils | |
| * `parseEntry` that turns one entry stream into rows (or tokens). Formats that need random access | ||
| * within a file (e.g. Parquet/ORC footers) cannot use this streaming path. | ||
| * | ||
| * A concrete subclass implements [[readEntries]] for a specific archive format. Obtain the reader | ||
| * for a path via `ArchiveReader(path)`, which selects the implementation by file extension; new | ||
| * archive formats are added by writing another subclass rather than modifying existing ones. | ||
| * The entry-streaming engine ([[readEntries]]) is shared across archive formats -- tar and zip | ||
| * differ only in how the container stream is opened, and both use commons-compress | ||
| * `ArchiveInputStream`. A concrete subclass implements only [[openArchiveStream]]. Obtain the | ||
| * reader for a path via `ArchiveReader(path)`, which selects the implementation by file | ||
| * extension; new archive formats are added by writing another subclass rather than modifying the | ||
| * shared engine. | ||
| */ | ||
| abstract class ArchiveReader(path: Path) { | ||
|
|
||
| /** | ||
| * Opens the archive at `path` as a commons-compress stream, transparently handling its | ||
| * compression. The shared [[readEntries]] engine steps through entries via `getNextEntry`; a | ||
| * subclass only chooses the container type (e.g. tar vs zip). | ||
| */ | ||
| protected def openArchiveStream(conf: Configuration): ArchiveInputStream[_ <: ArchiveEntry] | ||
|
|
||
| /** | ||
| * Whether an entry is not a real data file and must be skipped: a directory, or a name Spark's | ||
| * own file listing would filter out. Applying [[HadoopFSUtils.shouldFilterOutPathName]] (the | ||
| * `InMemoryFileIndex` filter) with the same effective `ignoredPathSegmentRegex` to every path | ||
| * component keeps archive reads in parity with reading the same entries as loose files, | ||
| * including when the user supplies a custom `ignoredPathSegmentRegex` option: under the default | ||
| * filter, `.`-prefixed sidecars (macOS `._x`, `.DS_Store`), `_`-prefixed markers (`_SUCCESS`, | ||
| * `_committed_*`), and anything under a `.`/`_`-prefixed directory (e.g. a leftover | ||
| * `_temporary/` from a failed write) are skipped, while data files are kept. The per-component | ||
| * check matters because `InMemoryFileIndex` never recurses into such directories, so a | ||
| * basename-only filter would read `_temporary/part-0.csv` that a loose-file scan drops. | ||
| */ | ||
| private def shouldSkipEntry(entry: ArchiveEntry, ignoredPathSegmentRegex: Pattern): Boolean = { | ||
| if (entry.isDirectory) return true | ||
| entry.getName.split("/").exists(c => | ||
| c.nonEmpty && HadoopFSUtils.shouldFilterOutPathName(c, ignoredPathSegmentRegex)) | ||
| } | ||
|
|
||
| /** | ||
| * Wraps the shared archive stream as a view over exactly the current entry's bytes | ||
| * (`ArchiveInputStream.read` returns -1 at the entry boundary). [[CloseShieldInputStream]] | ||
| * ignores `close()`, so a parser closing its input does not close the underlying archive; any | ||
| * unread remainder of an entry is skipped by `getNextEntry()` when advancing. | ||
| */ | ||
| private def entryStream(archive: ArchiveInputStream[_ <: ArchiveEntry]): InputStream = | ||
| CloseShieldInputStream.wrap(archive) | ||
|
|
||
| /** | ||
| * Streams the archive entry by entry, applying `parseEntry` to each non-skipped entry's | ||
| * `(name, stream)` and concatenating the results into a single iterator. The next entry is opened | ||
|
|
@@ -67,25 +106,109 @@ abstract class ArchiveReader(path: Path) { | |
| def readEntries[T]( | ||
| conf: Configuration, | ||
| ignoredPathSegmentRegex: Pattern = HadoopFSUtils.defaultIgnoredPathSegmentRegexPattern)( | ||
| parseEntry: (String, InputStream) => Iterator[T]): Iterator[T] | ||
| parseEntry: (String, InputStream) => Iterator[T]): Iterator[T] = { | ||
| val archive = openArchiveStream(conf) | ||
| var closed = false | ||
|
|
||
| def cleanup(): Unit = { | ||
| if (!closed) { | ||
| closed = true | ||
| try archive.close() catch { case NonFatal(_) => } | ||
| } | ||
| } | ||
|
|
||
| Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => cleanup())) | ||
|
|
||
| val entries = new Iterator[T] with Closeable { | ||
| private var currentIter: Iterator[T] = Iterator.empty | ||
| private var done = false | ||
|
|
||
| // Move to the next entry whose iterator has elements (releasing each exhausted entry's | ||
| // reader and skipping any unread bytes), or mark the stream done once entries run out. | ||
| // Advancing here -- driven by `hasNext` -- rather than eagerly after producing a row in | ||
| // `next` is essential for parsers that reuse a single mutable row and look ahead on | ||
| // `hasNext`: probing the current entry right after returning a row would overwrite that row's | ||
| // contents before the caller has copied it. | ||
| private def advance(): Unit = { | ||
| while (!done && !currentIter.hasNext) { | ||
| currentIter match { | ||
| case c: Closeable => try c.close() catch { case NonFatal(_) => } | ||
| case _ => | ||
| } | ||
| var entry = archive.getNextEntry | ||
| while (entry != null && shouldSkipEntry(entry, ignoredPathSegmentRegex)) { | ||
| entry = archive.getNextEntry | ||
| } | ||
| if (entry == null) { | ||
| done = true | ||
| cleanup() | ||
| } else { | ||
| currentIter = parseEntry(entry.getName, entryStream(archive)) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| override def hasNext: Boolean = { | ||
| advance() | ||
| !done && currentIter.hasNext | ||
| } | ||
|
|
||
| override def next(): T = { | ||
| if (!hasNext) throw new NoSuchElementException | ||
| currentIter.next() | ||
| } | ||
|
|
||
| override def close(): Unit = { | ||
| done = true | ||
| currentIter = Iterator.empty | ||
| cleanup() | ||
| } | ||
| } | ||
|
|
||
| // Open the first entry eagerly so the construction cost (and any failure) surfaces here rather | ||
| // than at the first `hasNext`. A corrupt archive throws before the caller ever holds the | ||
| // iterator, leaving it nothing to close: executors release the stream through the task- | ||
| // completion listener, but driver-side callers (e.g. Avro's header-only schema inference) have | ||
| // no task, so close it here before propagating. | ||
| try { | ||
| entries.hasNext | ||
| } catch { | ||
| case NonFatal(e) => | ||
| cleanup() | ||
| throw e | ||
| } | ||
| entries | ||
| } | ||
| } | ||
|
|
||
| object ArchiveReader { | ||
|
|
||
| /** | ||
| * Whether `path` names an archive this reader can stream. Dispatched purely on the file | ||
| * extension -- `.tar`, `.tar.gz`, or `.tgz` -- since the bytes are not inspected here. | ||
| * extension -- `.tar`, `.tar.gz`, `.tgz`, or `.zip` -- since the bytes are not inspected here. | ||
| */ | ||
| def isArchivePath(path: Path): Boolean = { | ||
| val name = path.getName.toLowerCase(Locale.ROOT) | ||
| name.endsWith(".tar") || name.endsWith(".tar.gz") || name.endsWith(".tgz") | ||
| name.endsWith(".tar") || name.endsWith(".tar.gz") || name.endsWith(".tgz") || | ||
| name.endsWith(".zip") | ||
| } | ||
|
|
||
| /** | ||
| * Returns the [[ArchiveReader]] implementation for `path`, selected by its file extension. Only | ||
| * paths for which [[isArchivePath]] is true are supported; new archive formats add a case here. | ||
| */ | ||
| def apply(path: Path): ArchiveReader = new TarArchiveReader(path) | ||
| def apply(path: Path): ArchiveReader = { | ||
| val name = path.getName.toLowerCase(Locale.ROOT) | ||
| name match { | ||
| case n if n.endsWith(".tar") || n.endsWith(".tar.gz") || n.endsWith(".tgz") => | ||
| new TarArchiveReader(path) | ||
| case n if n.endsWith(".zip") => | ||
| new ZipArchiveReader(path) | ||
| case _ => | ||
| throw new IllegalArgumentException( | ||
| s"$path is not a supported archive (expected .tar, .tar.gz, .tgz, or .zip)") | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Splits one already-decompressed archive entry's bytes into lines. The reusable, format-agnostic | ||
|
|
@@ -149,26 +272,8 @@ class TarArchiveReader(path: Path) extends ArchiveReader(path) { | |
| private def needsExplicitGunzip: Boolean = | ||
| path.getName.toLowerCase(Locale.ROOT).endsWith(".tgz") | ||
|
|
||
| /** | ||
| * Whether an entry is not a real data file and must be skipped: a directory, or a name Spark's | ||
| * own file listing would filter out. Applying [[HadoopFSUtils.shouldFilterOutPathName]] (the | ||
| * `InMemoryFileIndex` filter) with the same effective `ignoredPathSegmentRegex` to every path | ||
| * component keeps archive reads in parity with reading the same entries as loose files, | ||
| * including when the user supplies a custom `ignoredPathSegmentRegex` option: under the default | ||
| * filter, `.`-prefixed sidecars (macOS `._x`, `.DS_Store`), `_`-prefixed markers (`_SUCCESS`, | ||
| * `_committed_*`), and anything under a `.`/`_`-prefixed directory (e.g. a leftover | ||
| * `_temporary/` from a failed write) are skipped, while data files are kept. The per-component | ||
| * check matters because `InMemoryFileIndex` never recurses into such directories, so a | ||
| * basename-only filter would read `_temporary/part-0.csv` that a loose-file scan drops. | ||
| */ | ||
| private def shouldSkipEntry(entry: TarArchiveEntry, ignoredPathSegmentRegex: Pattern): Boolean = { | ||
| if (entry.isDirectory) return true | ||
| entry.getName.split("/").exists(c => | ||
| c.nonEmpty && HadoopFSUtils.shouldFilterOutPathName(c, ignoredPathSegmentRegex)) | ||
| } | ||
|
|
||
| /** Opens the archive as a tar stream, transparently decompressing `.tar.gz` / `.tgz`. */ | ||
| private def openTarStream(conf: Configuration): TarArchiveInputStream = { | ||
| override protected def openArchiveStream( | ||
| conf: Configuration): ArchiveInputStream[_ <: ArchiveEntry] = { | ||
| val base = CodecStreams.createInputStreamWithCloseResource(conf, path) | ||
| try { | ||
| // GZIPInputStream reads the gzip header in its constructor, so a corrupt archive can throw | ||
|
|
@@ -181,90 +286,19 @@ class TarArchiveReader(path: Path) extends ArchiveReader(path) { | |
| throw e | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Wraps the shared tar stream as a view over exactly the current entry's bytes | ||
| * (`TarArchiveInputStream.read` returns -1 at the entry boundary). [[CloseShieldInputStream]] | ||
| * ignores `close()`, so a parser closing its input does not close the underlying archive; any | ||
| * unread remainder of an entry is skipped by `getNextEntry()` when advancing. | ||
| */ | ||
| private def entryStream(tar: TarArchiveInputStream): InputStream = | ||
| CloseShieldInputStream.wrap(tar) | ||
|
|
||
| override def readEntries[T]( | ||
| conf: Configuration, | ||
| ignoredPathSegmentRegex: Pattern)( | ||
| parseEntry: (String, InputStream) => Iterator[T]): Iterator[T] = { | ||
| val tar = openTarStream(conf) | ||
| var closed = false | ||
|
|
||
| def cleanup(): Unit = { | ||
| if (!closed) { | ||
| closed = true | ||
| try tar.close() catch { case NonFatal(_) => } | ||
| } | ||
| } | ||
|
|
||
| Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => cleanup())) | ||
|
|
||
| val entries = new Iterator[T] with Closeable { | ||
| private var currentIter: Iterator[T] = Iterator.empty | ||
| private var done = false | ||
|
|
||
| // Move to the next entry whose iterator has elements (releasing each exhausted entry's | ||
| // reader and skipping any unread bytes), or mark the stream done once entries run out. | ||
| // Advancing here -- driven by `hasNext` -- rather than eagerly after producing a row in | ||
| // `next` is essential for parsers that reuse a single mutable row and look ahead on | ||
| // `hasNext`: probing the current entry right after returning a row would overwrite that row's | ||
| // contents before the caller has copied it. | ||
| private def advance(): Unit = { | ||
| while (!done && !currentIter.hasNext) { | ||
| currentIter match { | ||
| case c: Closeable => try c.close() catch { case NonFatal(_) => } | ||
| case _ => | ||
| } | ||
| var entry = tar.getNextEntry | ||
| while (entry != null && shouldSkipEntry(entry, ignoredPathSegmentRegex)) { | ||
| entry = tar.getNextEntry | ||
| } | ||
| if (entry == null) { | ||
| done = true | ||
| cleanup() | ||
| } else { | ||
| currentIter = parseEntry(entry.getName, entryStream(tar)) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| override def hasNext: Boolean = { | ||
| advance() | ||
| !done && currentIter.hasNext | ||
| } | ||
|
|
||
| override def next(): T = { | ||
| if (!hasNext) throw new NoSuchElementException | ||
| currentIter.next() | ||
| } | ||
|
|
||
| override def close(): Unit = { | ||
| done = true | ||
| currentIter = Iterator.empty | ||
| cleanup() | ||
| } | ||
| } | ||
| /** | ||
| * [[ArchiveReader]] for zip archives (`.zip`). Each entry is individually compressed inside the | ||
| * container (the container itself is not gzip-wrapped), so `ZipArchiveInputStream` decompresses | ||
| * entries as they are streamed and no Hadoop codec layer is applied. The stream reads local file | ||
| * headers sequentially rather than the central directory, matching the tar reader's pure-streaming | ||
| * model: a few unusual zips (e.g. a stored entry whose size is recorded only in a trailing data | ||
| * descriptor) are not streamable this way. | ||
| */ | ||
| class ZipArchiveReader(path: Path) extends ArchiveReader(path) { | ||
|
|
||
| // Open the first entry eagerly so the construction cost (and any failure) surfaces here rather | ||
| // than at the first `hasNext`. A corrupt archive throws before the caller ever holds the | ||
| // iterator, leaving it nothing to close: executors release the stream through the task- | ||
| // completion listener, but driver-side callers (e.g. Avro's header-only schema inference) have | ||
| // no task, so close it here before propagating. | ||
| try { | ||
| entries.hasNext | ||
| } catch { | ||
| case NonFatal(e) => | ||
| cleanup() | ||
| throw e | ||
| } | ||
| entries | ||
| } | ||
| override protected def openArchiveStream( | ||
| conf: Configuration): ArchiveInputStream[_ <: ArchiveEntry] = | ||
| new ZipArchiveInputStream(CodecStreams.createInputStreamWithCloseResource(conf, path)) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The "not streamable" limitation this |
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The advance loop calls
parseEntryon every non-skipped entry without consultingArchiveInputStream.canReadEntryData(entry). ForZipArchiveInputStreamthat returns false for entries it can't stream — a STORED entry with a trailing data descriptor, an unsupported compression method, or encryption — i.e. exactly the "a few unusual zips … are not streamable this way" case the new Scaladoc documents.Question: on such an entry, does the current path fail loudly with a clear error, or can it silently yield a truncated/garbled entry? If the latter is possible, consider guarding here:
so the documented limitation surfaces deterministically rather than depending on
ZipArchiveInputStream's default read behavior.