diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/execution/datasources/AvroZipArchiveReadSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/execution/datasources/AvroZipArchiveReadSuite.scala new file mode 100644 index 000000000000..24281fb6637f --- /dev/null +++ b/connector/avro/src/test/scala/org/apache/spark/sql/execution/datasources/AvroZipArchiveReadSuite.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +/** + * Reads of Avro files packed in zip archives (`.zip`): the shared archive tests from + * [[ArchiveReadSuiteBase]] plus the Avro-specific ones from [[AvroArchiveReadBase]], run over zip + * containers via [[ZipArchiveReadBase]]. + */ +class AvroZipArchiveReadSuite + extends ArchiveReadSuiteBase + with AvroArchiveReadBase + with ZipArchiveReadBase diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ff2dd2dbd483..179ec5dda37d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2727,11 +2727,8 @@ object SQLConf { .createWithDefaultString("128MB") // parquet.block.size val ARCHIVE_FORMAT_READER_ENABLED = buildConf("spark.sql.files.archive.reader.enabled") - .doc("When true, a supported data source can read tar archives (.tar, .tar.gz, .tgz): " + - "each archive is read as a single split and its entries are streamed through that data " + - "source's parser (never unpacked to disk), as if the entries were separate files, both " + - "during scan and schema inference. The CSV, JSON, and text data sources support " + - "reading archives.") + .doc("When true, supported file-based data sources can read archive files, reading each " + + "archive's entries as if they were separate files, during both scan and schema inference.") .version("5.0.0") .withBindingPolicy(ConfigBindingPolicy.SESSION) .booleanConf diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ArchiveReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ArchiveReader.scala index 75f04f8f38c6..1c928fd19da8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ArchiveReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ArchiveReader.scala @@ -24,7 +24,9 @@ import java.util.zip.GZIPInputStream import scala.util.control.NonFatal -import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream} +import org.apache.commons.compress.archivers.{ArchiveEntry, ArchiveInputStream} +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream +import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream import org.apache.commons.io.ByteOrderMark import org.apache.commons.io.input.{BOMInputStream, CloseShieldInputStream} import org.apache.hadoop.conf.Configuration @@ -48,12 +50,49 @@ import org.apache.spark.util.HadoopFSUtils * `parseEntry` that turns one entry stream into rows (or tokens). Formats that need random access * within a file (e.g. Parquet/ORC footers) cannot use this streaming path. * - * A concrete subclass implements [[readEntries]] for a specific archive format. Obtain the reader - * for a path via `ArchiveReader(path)`, which selects the implementation by file extension; new - * archive formats are added by writing another subclass rather than modifying existing ones. + * The entry-streaming engine ([[readEntries]]) is shared across archive formats -- tar and zip + * differ only in how the container stream is opened, and both use commons-compress + * `ArchiveInputStream`. A concrete subclass implements only [[openArchiveStream]]. Obtain the + * reader for a path via `ArchiveReader(path)`, which selects the implementation by file + * extension; new archive formats are added by writing another subclass rather than modifying the + * shared engine. */ abstract class ArchiveReader(path: Path) { + /** + * Opens the archive at `path` as a commons-compress stream, transparently handling its + * compression. The shared [[readEntries]] engine steps through entries via `getNextEntry`; a + * subclass only chooses the container type (e.g. tar vs zip). + */ + protected def openArchiveStream(conf: Configuration): ArchiveInputStream[_ <: ArchiveEntry] + + /** + * Whether an entry is not a real data file and must be skipped: a directory, or a name Spark's + * own file listing would filter out. Applying [[HadoopFSUtils.shouldFilterOutPathName]] (the + * `InMemoryFileIndex` filter) with the same effective `ignoredPathSegmentRegex` to every path + * component keeps archive reads in parity with reading the same entries as loose files, + * including when the user supplies a custom `ignoredPathSegmentRegex` option: under the default + * filter, `.`-prefixed sidecars (macOS `._x`, `.DS_Store`), `_`-prefixed markers (`_SUCCESS`, + * `_committed_*`), and anything under a `.`/`_`-prefixed directory (e.g. a leftover + * `_temporary/` from a failed write) are skipped, while data files are kept. The per-component + * check matters because `InMemoryFileIndex` never recurses into such directories, so a + * basename-only filter would read `_temporary/part-0.csv` that a loose-file scan drops. + */ + private def shouldSkipEntry(entry: ArchiveEntry, ignoredPathSegmentRegex: Pattern): Boolean = { + if (entry.isDirectory) return true + entry.getName.split("/").exists(c => + c.nonEmpty && HadoopFSUtils.shouldFilterOutPathName(c, ignoredPathSegmentRegex)) + } + + /** + * Wraps the shared archive stream as a view over exactly the current entry's bytes + * (`ArchiveInputStream.read` returns -1 at the entry boundary). [[CloseShieldInputStream]] + * ignores `close()`, so a parser closing its input does not close the underlying archive; any + * unread remainder of an entry is skipped by `getNextEntry()` when advancing. + */ + private def entryStream(archive: ArchiveInputStream[_ <: ArchiveEntry]): InputStream = + CloseShieldInputStream.wrap(archive) + /** * Streams the archive entry by entry, applying `parseEntry` to each non-skipped entry's * `(name, stream)` and concatenating the results into a single iterator. The next entry is opened @@ -67,25 +106,109 @@ abstract class ArchiveReader(path: Path) { def readEntries[T]( conf: Configuration, ignoredPathSegmentRegex: Pattern = HadoopFSUtils.defaultIgnoredPathSegmentRegexPattern)( - parseEntry: (String, InputStream) => Iterator[T]): Iterator[T] + parseEntry: (String, InputStream) => Iterator[T]): Iterator[T] = { + val archive = openArchiveStream(conf) + var closed = false + + def cleanup(): Unit = { + if (!closed) { + closed = true + try archive.close() catch { case NonFatal(_) => } + } + } + + Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => cleanup())) + + val entries = new Iterator[T] with Closeable { + private var currentIter: Iterator[T] = Iterator.empty + private var done = false + + // Move to the next entry whose iterator has elements (releasing each exhausted entry's + // reader and skipping any unread bytes), or mark the stream done once entries run out. + // Advancing here -- driven by `hasNext` -- rather than eagerly after producing a row in + // `next` is essential for parsers that reuse a single mutable row and look ahead on + // `hasNext`: probing the current entry right after returning a row would overwrite that row's + // contents before the caller has copied it. + private def advance(): Unit = { + while (!done && !currentIter.hasNext) { + currentIter match { + case c: Closeable => try c.close() catch { case NonFatal(_) => } + case _ => + } + var entry = archive.getNextEntry + while (entry != null && shouldSkipEntry(entry, ignoredPathSegmentRegex)) { + entry = archive.getNextEntry + } + if (entry == null) { + done = true + cleanup() + } else { + currentIter = parseEntry(entry.getName, entryStream(archive)) + } + } + } + + override def hasNext: Boolean = { + advance() + !done && currentIter.hasNext + } + + override def next(): T = { + if (!hasNext) throw new NoSuchElementException + currentIter.next() + } + + override def close(): Unit = { + done = true + currentIter = Iterator.empty + cleanup() + } + } + + // Open the first entry eagerly so the construction cost (and any failure) surfaces here rather + // than at the first `hasNext`. A corrupt archive throws before the caller ever holds the + // iterator, leaving it nothing to close: executors release the stream through the task- + // completion listener, but driver-side callers (e.g. Avro's header-only schema inference) have + // no task, so close it here before propagating. + try { + entries.hasNext + } catch { + case NonFatal(e) => + cleanup() + throw e + } + entries + } } object ArchiveReader { /** * Whether `path` names an archive this reader can stream. Dispatched purely on the file - * extension -- `.tar`, `.tar.gz`, or `.tgz` -- since the bytes are not inspected here. + * extension -- `.tar`, `.tar.gz`, `.tgz`, or `.zip` -- since the bytes are not inspected here. */ def isArchivePath(path: Path): Boolean = { val name = path.getName.toLowerCase(Locale.ROOT) - name.endsWith(".tar") || name.endsWith(".tar.gz") || name.endsWith(".tgz") + name.endsWith(".tar") || name.endsWith(".tar.gz") || name.endsWith(".tgz") || + name.endsWith(".zip") } /** * Returns the [[ArchiveReader]] implementation for `path`, selected by its file extension. Only * paths for which [[isArchivePath]] is true are supported; new archive formats add a case here. */ - def apply(path: Path): ArchiveReader = new TarArchiveReader(path) + def apply(path: Path): ArchiveReader = { + val name = path.getName.toLowerCase(Locale.ROOT) + name match { + case n if n.endsWith(".tar") || n.endsWith(".tar.gz") || n.endsWith(".tgz") => + new TarArchiveReader(path) + case n if n.endsWith(".zip") => + new ZipArchiveReader(path) + case _ => + throw new IllegalArgumentException( + s"$path is not a supported archive (expected .tar, .tar.gz, .tgz, or .zip)") + } + } /** * Splits one already-decompressed archive entry's bytes into lines. The reusable, format-agnostic @@ -149,26 +272,8 @@ class TarArchiveReader(path: Path) extends ArchiveReader(path) { private def needsExplicitGunzip: Boolean = path.getName.toLowerCase(Locale.ROOT).endsWith(".tgz") - /** - * Whether an entry is not a real data file and must be skipped: a directory, or a name Spark's - * own file listing would filter out. Applying [[HadoopFSUtils.shouldFilterOutPathName]] (the - * `InMemoryFileIndex` filter) with the same effective `ignoredPathSegmentRegex` to every path - * component keeps archive reads in parity with reading the same entries as loose files, - * including when the user supplies a custom `ignoredPathSegmentRegex` option: under the default - * filter, `.`-prefixed sidecars (macOS `._x`, `.DS_Store`), `_`-prefixed markers (`_SUCCESS`, - * `_committed_*`), and anything under a `.`/`_`-prefixed directory (e.g. a leftover - * `_temporary/` from a failed write) are skipped, while data files are kept. The per-component - * check matters because `InMemoryFileIndex` never recurses into such directories, so a - * basename-only filter would read `_temporary/part-0.csv` that a loose-file scan drops. - */ - private def shouldSkipEntry(entry: TarArchiveEntry, ignoredPathSegmentRegex: Pattern): Boolean = { - if (entry.isDirectory) return true - entry.getName.split("/").exists(c => - c.nonEmpty && HadoopFSUtils.shouldFilterOutPathName(c, ignoredPathSegmentRegex)) - } - - /** Opens the archive as a tar stream, transparently decompressing `.tar.gz` / `.tgz`. */ - private def openTarStream(conf: Configuration): TarArchiveInputStream = { + override protected def openArchiveStream( + conf: Configuration): ArchiveInputStream[_ <: ArchiveEntry] = { val base = CodecStreams.createInputStreamWithCloseResource(conf, path) try { // GZIPInputStream reads the gzip header in its constructor, so a corrupt archive can throw @@ -181,90 +286,19 @@ class TarArchiveReader(path: Path) extends ArchiveReader(path) { throw e } } +} - /** - * Wraps the shared tar stream as a view over exactly the current entry's bytes - * (`TarArchiveInputStream.read` returns -1 at the entry boundary). [[CloseShieldInputStream]] - * ignores `close()`, so a parser closing its input does not close the underlying archive; any - * unread remainder of an entry is skipped by `getNextEntry()` when advancing. - */ - private def entryStream(tar: TarArchiveInputStream): InputStream = - CloseShieldInputStream.wrap(tar) - - override def readEntries[T]( - conf: Configuration, - ignoredPathSegmentRegex: Pattern)( - parseEntry: (String, InputStream) => Iterator[T]): Iterator[T] = { - val tar = openTarStream(conf) - var closed = false - - def cleanup(): Unit = { - if (!closed) { - closed = true - try tar.close() catch { case NonFatal(_) => } - } - } - - Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => cleanup())) - - val entries = new Iterator[T] with Closeable { - private var currentIter: Iterator[T] = Iterator.empty - private var done = false - - // Move to the next entry whose iterator has elements (releasing each exhausted entry's - // reader and skipping any unread bytes), or mark the stream done once entries run out. - // Advancing here -- driven by `hasNext` -- rather than eagerly after producing a row in - // `next` is essential for parsers that reuse a single mutable row and look ahead on - // `hasNext`: probing the current entry right after returning a row would overwrite that row's - // contents before the caller has copied it. - private def advance(): Unit = { - while (!done && !currentIter.hasNext) { - currentIter match { - case c: Closeable => try c.close() catch { case NonFatal(_) => } - case _ => - } - var entry = tar.getNextEntry - while (entry != null && shouldSkipEntry(entry, ignoredPathSegmentRegex)) { - entry = tar.getNextEntry - } - if (entry == null) { - done = true - cleanup() - } else { - currentIter = parseEntry(entry.getName, entryStream(tar)) - } - } - } - - override def hasNext: Boolean = { - advance() - !done && currentIter.hasNext - } - - override def next(): T = { - if (!hasNext) throw new NoSuchElementException - currentIter.next() - } - - override def close(): Unit = { - done = true - currentIter = Iterator.empty - cleanup() - } - } +/** + * [[ArchiveReader]] for zip archives (`.zip`). Each entry is individually compressed inside the + * container (the container itself is not gzip-wrapped), so `ZipArchiveInputStream` decompresses + * entries as they are streamed and no Hadoop codec layer is applied. The stream reads local file + * headers sequentially rather than the central directory, matching the tar reader's pure-streaming + * model: a few unusual zips (e.g. a stored entry whose size is recorded only in a trailing data + * descriptor) are not streamable this way. + */ +class ZipArchiveReader(path: Path) extends ArchiveReader(path) { - // Open the first entry eagerly so the construction cost (and any failure) surfaces here rather - // than at the first `hasNext`. A corrupt archive throws before the caller ever holds the - // iterator, leaving it nothing to close: executors release the stream through the task- - // completion listener, but driver-side callers (e.g. Avro's header-only schema inference) have - // no task, so close it here before propagating. - try { - entries.hasNext - } catch { - case NonFatal(e) => - cleanup() - throw e - } - entries - } + override protected def openArchiveStream( + conf: Configuration): ArchiveInputStream[_ <: ArchiveEntry] = + new ZipArchiveInputStream(CodecStreams.createInputStreamWithCloseResource(conf, path)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReaderSuite.scala index 2cbd1ddd9fd3..e49800ec0640 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReaderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReaderSuite.scala @@ -26,6 +26,7 @@ import java.util.zip.GZIPOutputStream import scala.collection.mutable.ArrayBuffer import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveOutputStream} +import org.apache.commons.compress.archivers.zip.{ZipArchiveEntry, ZipArchiveOutputStream} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -63,6 +64,64 @@ class ArchiveReaderSuite extends SparkFunSuite { } finally out.close() } + /** Write a zip archive, used to verify the `.zip` archive path. */ + private def writeZip(file: File, entries: Seq[Entry]): Unit = { + val out = new ZipArchiveOutputStream(new FileOutputStream(file)) + try { + entries.foreach { e => + // A trailing slash marks a directory entry. + val rawName = if (e.isDir && !e.name.endsWith("/")) e.name + "/" else e.name + val zipEntry = new ZipArchiveEntry(rawName) + if (!e.isDir) zipEntry.setSize(e.data.length.toLong) + out.putArchiveEntry(zipEntry) + if (!e.isDir) out.write(e.data) + out.closeArchiveEntry() + } + out.finish() + } finally out.close() + } + + /** + * Writes a zip with one STORED (uncompressed) entry that uses a data descriptor: general-purpose + * bit 3 is set and the local header's crc/size fields are zeroed, so the real values live only in + * the trailing data descriptor. `ZipArchiveInputStream` cannot stream such an entry -- it has no + * size to bound the read -- so `read` throws rather than yielding truncated bytes. This is the + * non-streamable case `ZipArchiveReader` documents; `ZipArchiveOutputStream` cannot produce it + * (it rejects an unsized STORED entry, or rewrites the header when the sink is seekable), so the + * bytes are assembled by hand. + */ + private def writeStoredEntryWithDataDescriptor(file: File, name: String, body: String): Unit = { + val nameBytes = name.getBytes(StandardCharsets.UTF_8) + val data = body.getBytes(StandardCharsets.UTF_8) + val crc = { val c = new java.util.zip.CRC32(); c.update(data); c.getValue } + val out = new ByteArrayOutputStream() + def u16(v: Int): Unit = { out.write(v & 0xFF); out.write((v >>> 8) & 0xFF) } + def u32(v: Long): Unit = { + out.write((v & 0xFF).toInt); out.write(((v >>> 8) & 0xFF).toInt) + out.write(((v >>> 16) & 0xFF).toInt); out.write(((v >>> 24) & 0xFF).toInt) + } + // Local file header: GP bit 3 set (data descriptor), STORED method, sizes zeroed here. + val localHeaderOffset = out.size() + u32(0x04034b50L); u16(10); u16(0x0008); u16(0); u16(0); u16(0) + u32(0); u32(0); u32(0) + u16(nameBytes.length); u16(0) + out.write(nameBytes); out.write(data) + // Data descriptor (with optional signature): the real crc and sizes. + u32(0x08074b50L); u32(crc); u32(data.length.toLong); u32(data.length.toLong) + // Central directory. + val cdOffset = out.size() + u32(0x02014b50L); u16(20); u16(10); u16(0x0008); u16(0); u16(0); u16(0) + u32(crc); u32(data.length.toLong); u32(data.length.toLong) + u16(nameBytes.length); u16(0); u16(0); u16(0); u16(0); u32(0); u32(localHeaderOffset.toLong) + out.write(nameBytes) + val cdSize = out.size() - cdOffset + // End of central directory. + u32(0x06054b50L); u16(0); u16(0); u16(1); u16(1) + u32(cdSize.toLong); u32(cdOffset.toLong); u16(0) + val fos = new FileOutputStream(file) + try fos.write(out.toByteArray) finally fos.close() + } + private def textEntry(name: String, body: String): Entry = Entry(name, body.getBytes(StandardCharsets.UTF_8)) @@ -89,15 +148,16 @@ class ArchiveReaderSuite extends SparkFunSuite { Seq( "foo.tar", "FOO.TAR", "/a/b/c/x.tar", "weird.TaR", "foo.tar.gz", "FOO.TAR.GZ", "mixed.Tar.Gz", "/a/b/c/x.tar.gz", - "foo.tgz", "FOO.TGZ", "/a/b/c/x.tgz" + "foo.tgz", "FOO.TGZ", "/a/b/c/x.tgz", + "data.zip", "FOO.ZIP", "weird.ZiP", "/a/b/c/x.zip" ).foreach { p => assert(ArchiveReader.isArchivePath(new Path(p)), s"expected archive match for $p") } } test("isArchivePath: negative cases") { - Seq("foo.csv", "foo.gz", "foo", "dir/", "foo.tarball", "data.zip", - "foo.tar.bz2", "foo.targz").foreach { p => + Seq("foo.csv", "foo.gz", "foo", "dir/", "foo.tarball", + "foo.tar.bz2", "foo.targz", "foo.zipx", "foo.gzip").foreach { p => assert(!ArchiveReader.isArchivePath(new Path(p)), s"expected non-match for $p") } } @@ -274,4 +334,109 @@ class ArchiveReaderSuite extends SparkFunSuite { } } } + + // ----- zip ---------------------------------------------------------------- + // The streaming engine is shared with tar (only stream-opening differs), so these cases focus on + // the `.zip` dispatch and the `ZipArchiveInputStream` container behaving like the tar path. + + test("readEntries: empty zip yields empty iterator") { + withTempDir { dir => + val zip = new File(dir, "empty.zip") + writeZip(zip, Seq.empty) + assert(collect(zip).isEmpty) + } + } + + test("readEntries: zip single entry exposes its name and bytes") { + withTempDir { dir => + val zip = new File(dir, "single.zip") + writeZip(zip, Seq(textEntry("only.csv", "hello\n"))) + assert(collect(zip) == Seq("only.csv" -> "hello\n")) + } + } + + test("readEntries: zip multiple entries chained in archive order") { + withTempDir { dir => + val zip = new File(dir, "multi.zip") + writeZip(zip, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b"), textEntry("c.csv", "c"))) + assert(collect(zip) == Seq("a.csv" -> "a", "b.csv" -> "b", "c.csv" -> "c")) + } + } + + test("readEntries: zip directory entries are skipped") { + withTempDir { dir => + val zip = new File(dir, "dirs.zip") + writeZip(zip, Seq( + Entry("subdir", Array.emptyByteArray, isDir = true), + textEntry("subdir/data.csv", "x"))) + assert(collect(zip) == Seq("subdir/data.csv" -> "x")) + } + } + + test("readEntries: zip dotfile, underscore-marker, and prefixed-dir entries are skipped") { + withTempDir { dir => + val zip = new File(dir, "skipped.zip") + writeZip(zip, Seq( + textEntry("._real.csv", "junk"), // macOS AppleDouble sidecar + textEntry(".hidden", "ignored"), // bare dotfile + textEntry("_SUCCESS", "marker"), // _-prefixed marker (InMemoryFileIndex skips it) + textEntry("_temporary/part-0.csv", "tmp"), // entry under a _-prefixed dir (skipped whole) + textEntry("real.csv", "kept"), + textEntry("nested/._sidecar", "junk2"))) // dotfile in a subdir + assert(collect(zip) == Seq("real.csv" -> "kept")) + } + } + + test("readEntries: zip advances lazily, one entry at a time") { + withTempDir { dir => + val zip = new File(dir, "lazy.zip") + writeZip(zip, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b"), textEntry("c.csv", "c"))) + + val opened = ArrayBuffer[String]() + val it = ArchiveReader(new Path(zip.toURI)).readEntries(new Configuration()) { (name, _) => + opened += name + Iterator.single(name) + } + // Construction opens only the first entry; advancing past each boundary opens the next. + assert(opened.toList == List("a.csv")) + assert(it.hasNext) + assert(it.next() == "a.csv") + assert(opened.toList == List("a.csv")) + assert(it.next() == "b.csv") + assert(opened.toList == List("a.csv", "b.csv")) + assert(it.next() == "c.csv") + assert(opened.toList == List("a.csv", "b.csv", "c.csv")) + assert(!it.hasNext) + assert(opened.size == 3) + } + } + + test("readEntries: a zip parseEntry that closes its stream still advances to the next entry") { + withTempDir { dir => + val zip = new File(dir, "close.zip") + writeZip(zip, Seq(textEntry("a.csv", "a"), textEntry("b.csv", "b"))) + + val seen = ArrayBuffer[String]() + val it = ArchiveReader(new Path(zip.toURI)).readEntries(new Configuration()) { (name, in) => + val body = new String(readAll(in), StandardCharsets.UTF_8) + in.close() // must NOT close the underlying archive + seen += body + Iterator.single(name) + } + assert(it.toList == List("a.csv", "b.csv")) + assert(seen.toList == List("a", "b")) + } + } + + test("readEntries: a non-streamable zip entry fails loudly rather than yielding garbled bytes") { + withTempDir { dir => + val zip = new File(dir, "stored-dd.zip") + writeStoredEntryWithDataDescriptor(zip, "a.csv", "hello") + // A stored entry sized only by a trailing data descriptor is the documented non-streamable + // case: ZipArchiveInputStream throws on read instead of returning truncated/garbled bytes. + val ex = intercept[java.io.IOException](collect(zip)) + assert(ex.getMessage != null && ex.getMessage.contains("data descriptor"), + s"expected a clear unsupported-feature error, got $ex") + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CSVHeaderZipArchiveReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CSVHeaderZipArchiveReadSuite.scala new file mode 100644 index 000000000000..eb3bf1d78596 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CSVHeaderZipArchiveReadSuite.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +/** + * Reads of header-carrying CSV files packed in zip archives (`.zip`): the shared archive tests from + * [[ArchiveReadSuiteBase]] plus the header-mode CSV tests from [[CSVHeaderArchiveReadBase]], run + * over zip containers via [[ZipArchiveReadBase]]. + */ +class CSVHeaderZipArchiveReadSuite + extends ArchiveReadSuiteBase + with CSVHeaderArchiveReadBase + with ZipArchiveReadBase diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CSVHeaderlessZipArchiveReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CSVHeaderlessZipArchiveReadSuite.scala new file mode 100644 index 000000000000..b4bddb603725 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CSVHeaderlessZipArchiveReadSuite.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +/** + * Reads of headerless CSV files packed in zip archives (`.zip`): the shared archive tests from + * [[ArchiveReadSuiteBase]] plus the headerless CSV tests from [[CSVHeaderlessArchiveReadBase]], run + * over zip containers via [[ZipArchiveReadBase]]. + */ +class CSVHeaderlessZipArchiveReadSuite + extends ArchiveReadSuiteBase + with CSVHeaderlessArchiveReadBase + with ZipArchiveReadBase diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/JSONZipArchiveReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/JSONZipArchiveReadSuite.scala new file mode 100644 index 000000000000..76cee32238b3 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/JSONZipArchiveReadSuite.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +/** + * Reads of JSON files packed in zip archives (`.zip`): the shared archive tests from + * [[ArchiveReadSuiteBase]] plus the JSON-specific ones from [[JSONArchiveReadBase]], run over zip + * containers via [[ZipArchiveReadBase]]. + */ +class JSONZipArchiveReadSuite + extends ArchiveReadSuiteBase + with JSONArchiveReadBase + with ZipArchiveReadBase diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/TextArchiveReadBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/TextArchiveReadBase.scala new file mode 100644 index 000000000000..e2506afa6b34 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/TextArchiveReadBase.scala @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.nio.charset.StandardCharsets +import java.nio.file.Files + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.sql.{DataFrame, QueryTest, Row} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.util.Utils + +/** + * Reads of text files packed in archives, streamed through the [[ArchiveReader]] path. Entries are + * streamed (never unpacked to disk), and the central contract verified throughout is parity with + * reading the same files from a directory. + * + * Unlike CSV/JSON this does not extend [[ArchiveReadSuiteBase]]: the text data source has a single + * fixed `value` column (one row per line, or per entry with `wholetext`) and no schema inference, + * so the structured, two-column tests there do not apply. The container-writing hooks below are + * supplied by a container trait the concrete suite mixes in ([[TarArchiveTestUtils]] / + * [[ZipArchiveTestUtils]]); `corruptArchiveExtension` is supplied by the concrete suite. + */ +trait TextArchiveReadBase extends QueryTest with SharedSparkSession { + + /** Archive extensions to exercise; the head is the default. Supplied by the container trait. */ + protected def archiveExtensions: Seq[String] + + /** Writes `entries` (name -> bytes) into the archive at `dest`. From the container trait. */ + protected def writeArchive(dest: File, entries: Seq[(String, Array[Byte])]): Unit + + /** Writes bytes that are not a readable archive at `dest`. From the container trait. */ + protected def writeCorruptArchive(dest: File): Unit + + /** Extension of the archive [[writeCorruptArchive]] produces (corruption is format-specific). */ + protected def corruptArchiveExtension: String + + override def sparkConf: SparkConf = + super.sparkConf.set(SQLConf.ARCHIVE_FORMAT_READER_ENABLED.key, "true") + + private def textBytes(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8) + + /** Provides an archive-extensioned path inside a fresh temp dir to `f`. */ + private def withArchiveFile( + extension: String = archiveExtensions.head)(f: File => Unit): Unit = { + val dir = Utils.createTempDir(namePrefix = "archive-test") + try f(new File(dir, s"archive.$extension")) finally Utils.deleteRecursively(dir) + } + + private def read(path: String, options: Map[String, String] = Map.empty): DataFrame = + spark.read.options(options).text(path) + + test("read an archive of multiple text entries matches the union of the lines") { + archiveExtensions.foreach { ext => + withArchiveFile(ext) { archive => + writeArchive(archive, Seq( + "a.txt" -> textBytes("line1\nline2\n"), + "b.txt" -> textBytes("line3\n"), + "c.txt" -> textBytes("line4\nline5\n"))) + checkAnswer( + read(archive.getCanonicalPath), + Seq("line1", "line2", "line3", "line4", "line5").map(Row(_))) + } + } + } + + test("archive entries read like a directory of the same files") { + val entries = Seq("a.txt" -> textBytes("a1\na2\n"), "b.txt" -> textBytes("b1\n")) + withArchiveFile() { archive => + writeArchive(archive, entries) + val fromArchive = read(archive.getCanonicalPath) + withTempDir { dir => + entries.foreach { case (n, b) => Files.write(new File(dir, n).toPath, b) } + checkAnswer(fromArchive, read(dir.getCanonicalPath).collect().toSeq) + } + } + } + + test("an empty archive yields no rows") { + withArchiveFile() { archive => + writeArchive(archive, Seq.empty) + checkAnswer(read(archive.getCanonicalPath), Seq.empty[Row]) + } + } + + test("an archive and loose text files in the same directory are all read") { + withTempDir { dir => + val ext = archiveExtensions.head + writeArchive( + new File(dir, s"data.$ext"), + Seq("a.txt" -> textBytes("in-archive-1\nin-archive-2\n"))) + Files.write(new File(dir, "loose.txt").toPath, textBytes("loose-1\n")) + checkAnswer( + read(dir.getCanonicalPath), + Seq("in-archive-1", "in-archive-2", "loose-1").map(Row(_))) + } + } + + test("wholetext reads each entry as a single row") { + withArchiveFile() { archive => + writeArchive(archive, Seq( + "a.txt" -> textBytes("l1\nl2"), + "b.txt" -> textBytes("only"))) + checkAnswer( + read(archive.getCanonicalPath, Map("wholetext" -> "true")), + Seq(Row("l1\nl2"), Row("only"))) + } + } + + test("a custom line separator splits entries into rows") { + withArchiveFile() { archive => + writeArchive(archive, Seq("a.txt" -> textBytes("x;y;z"))) + checkAnswer( + read(archive.getCanonicalPath, Map("lineSep" -> ";")), + Seq(Row("x"), Row("y"), Row("z"))) + } + } + + test("count over an archive reads the right number of rows with an empty required schema") { + withArchiveFile() { archive => + writeArchive(archive, Seq( + "a.txt" -> textBytes("1\n2\n3\n"), + "b.txt" -> textBytes("4\n"))) + assert(read(archive.getCanonicalPath).count() == 4L) + } + } + + test("an archive always yields a single partition regardless of size") { + withArchiveFile() { archive => + val big = (1 to 1000).map(i => s"value-$i").mkString("\n") + writeArchive(archive, (0 until 4).map(i => s"part-$i.txt" -> textBytes(big + "\n"))) + withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "1024") { + val readDf = read(archive.getCanonicalPath) + assert(readDf.rdd.getNumPartitions == 1, + s"archive should be a single partition; got ${readDf.rdd.getNumPartitions}") + assert(readDf.count() == 4000L) + } + } + } + + Seq(true, false).foreach { ignoreCorrupt => + test(s"ignoreCorruptFiles=$ignoreCorrupt controls whether a corrupt archive is skipped") { + withArchiveFile(corruptArchiveExtension) { archive => + writeCorruptArchive(archive) + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> ignoreCorrupt.toString) { + if (ignoreCorrupt) { + checkAnswer(read(archive.getCanonicalPath), Seq.empty[Row]) + } else { + intercept[SparkException](read(archive.getCanonicalPath).collect()) + } + } + } + } + } + + Seq(true, false).foreach { ignoreMissing => + test(s"ignoreMissingFiles=$ignoreMissing controls whether a missing archive is skipped") { + withArchiveFile() { archive => + writeArchive(archive, Seq("a.txt" -> textBytes("line1\nline2\n"))) + withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> ignoreMissing.toString) { + // The archive is listed when the DataFrame is built, then deleted before the scan opens + // it, so the reader hits a missing file -- handled by `FileScanRDD`, like any file. + val df = read(archive.getCanonicalPath) + assert(archive.delete(), s"failed to delete $archive") + if (ignoreMissing) { + checkAnswer(df, Seq.empty[Row]) + } else { + intercept[SparkException](df.collect()) + } + } + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/TextTarArchiveReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/TextTarArchiveReadSuite.scala index 0163d03a168a..1764bfdefcfb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/TextTarArchiveReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/TextTarArchiveReadSuite.scala @@ -17,162 +17,11 @@ package org.apache.spark.sql.execution.datasources -import java.io.File -import java.nio.charset.StandardCharsets -import java.nio.file.Files - -import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.sql.{DataFrame, QueryTest, Row} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.util.Utils - /** - * Reads of text files packed in tar archives (`.tar`/`.tar.gz`/`.tgz`), streamed through the - * [[ArchiveReader]] path. Entries are streamed (never unpacked to disk), and the central contract - * verified throughout is parity with reading the same files from a directory. - * - * Unlike CSV/JSON this does not reuse [[ArchiveReadSuiteBase]]: the text data source has a single - * fixed `value` column (one row per line, or per entry with `wholetext`) and no schema inference, - * so the structured, two-column tests there do not apply. The tar container-writing helpers come - * from [[TarArchiveTestUtils]] (shared with [[TarArchiveReadBase]]). + * Reads of text files packed in tar archives (`.tar`/`.tar.gz`/`.tgz`): the shared text archive + * tests from [[TextArchiveReadBase]], run over tar containers via [[TarArchiveTestUtils]]. */ -class TextTarArchiveReadSuite extends QueryTest with SharedSparkSession with TarArchiveTestUtils { - - override def sparkConf: SparkConf = - super.sparkConf.set(SQLConf.ARCHIVE_FORMAT_READER_ENABLED.key, "true") - - private def textBytes(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8) - - /** Provides an archive-extensioned path inside a fresh temp dir to `f`. */ - private def withArchiveFile( - extension: String = archiveExtensions.head)(f: File => Unit): Unit = { - val dir = Utils.createTempDir(namePrefix = "archive-test") - try f(new File(dir, s"archive.$extension")) finally Utils.deleteRecursively(dir) - } - - private def read(path: String, options: Map[String, String] = Map.empty): DataFrame = - spark.read.options(options).text(path) - - test("read a tar archive of multiple text entries matches the union of the lines") { - archiveExtensions.foreach { ext => - withArchiveFile(ext) { archive => - writeArchive(archive, Seq( - "a.txt" -> textBytes("line1\nline2\n"), - "b.txt" -> textBytes("line3\n"), - "c.txt" -> textBytes("line4\nline5\n"))) - checkAnswer( - read(archive.getCanonicalPath), - Seq("line1", "line2", "line3", "line4", "line5").map(Row(_))) - } - } - } - - test("archive entries read like a directory of the same files") { - val entries = Seq("a.txt" -> textBytes("a1\na2\n"), "b.txt" -> textBytes("b1\n")) - withArchiveFile() { archive => - writeArchive(archive, entries) - val fromArchive = read(archive.getCanonicalPath) - withTempDir { dir => - entries.foreach { case (n, b) => Files.write(new File(dir, n).toPath, b) } - checkAnswer(fromArchive, read(dir.getCanonicalPath).collect().toSeq) - } - } - } - - test("an empty archive yields no rows") { - withArchiveFile() { archive => - writeArchive(archive, Seq.empty) - checkAnswer(read(archive.getCanonicalPath), Seq.empty[Row]) - } - } - - test("an archive and loose text files in the same directory are all read") { - withTempDir { dir => - val ext = archiveExtensions.head - writeArchive( - new File(dir, s"data.$ext"), - Seq("a.txt" -> textBytes("in-archive-1\nin-archive-2\n"))) - Files.write(new File(dir, "loose.txt").toPath, textBytes("loose-1\n")) - checkAnswer( - read(dir.getCanonicalPath), - Seq("in-archive-1", "in-archive-2", "loose-1").map(Row(_))) - } - } - - test("wholetext reads each entry as a single row") { - withArchiveFile() { archive => - writeArchive(archive, Seq( - "a.txt" -> textBytes("l1\nl2"), - "b.txt" -> textBytes("only"))) - checkAnswer( - read(archive.getCanonicalPath, Map("wholetext" -> "true")), - Seq(Row("l1\nl2"), Row("only"))) - } - } - - test("a custom line separator splits entries into rows") { - withArchiveFile() { archive => - writeArchive(archive, Seq("a.txt" -> textBytes("x;y;z"))) - checkAnswer( - read(archive.getCanonicalPath, Map("lineSep" -> ";")), - Seq(Row("x"), Row("y"), Row("z"))) - } - } - - test("count over an archive reads the right number of rows with an empty required schema") { - withArchiveFile() { archive => - writeArchive(archive, Seq( - "a.txt" -> textBytes("1\n2\n3\n"), - "b.txt" -> textBytes("4\n"))) - assert(read(archive.getCanonicalPath).count() == 4L) - } - } - - test("an archive always yields a single partition regardless of size") { - withArchiveFile() { archive => - val big = (1 to 1000).map(i => s"value-$i").mkString("\n") - writeArchive(archive, (0 until 4).map(i => s"part-$i.txt" -> textBytes(big + "\n"))) - withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "1024") { - val readDf = read(archive.getCanonicalPath) - assert(readDf.rdd.getNumPartitions == 1, - s"archive should be a single partition; got ${readDf.rdd.getNumPartitions}") - assert(readDf.count() == 4000L) - } - } - } - - Seq(true, false).foreach { ignoreCorrupt => - test(s"ignoreCorruptFiles=$ignoreCorrupt controls whether a corrupt archive is skipped") { - withArchiveFile("tar.gz") { archive => - writeCorruptArchive(archive) - withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> ignoreCorrupt.toString) { - if (ignoreCorrupt) { - checkAnswer(read(archive.getCanonicalPath), Seq.empty[Row]) - } else { - intercept[SparkException](read(archive.getCanonicalPath).collect()) - } - } - } - } - } +class TextTarArchiveReadSuite extends TextArchiveReadBase with TarArchiveTestUtils { - Seq(true, false).foreach { ignoreMissing => - test(s"ignoreMissingFiles=$ignoreMissing controls whether a missing archive is skipped") { - withArchiveFile() { archive => - writeArchive(archive, Seq("a.txt" -> textBytes("line1\nline2\n"))) - withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> ignoreMissing.toString) { - // The archive is listed when the DataFrame is built, then deleted before the scan opens - // it, so the reader hits a missing file -- handled by `FileScanRDD`, like any file. - val df = read(archive.getCanonicalPath) - assert(archive.delete(), s"failed to delete $archive") - if (ignoreMissing) { - checkAnswer(df, Seq.empty[Row]) - } else { - intercept[SparkException](df.collect()) - } - } - } - } - } + override protected def corruptArchiveExtension: String = "tar.gz" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/TextZipArchiveReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/TextZipArchiveReadSuite.scala new file mode 100644 index 000000000000..a22951e8d5df --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/TextZipArchiveReadSuite.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +/** + * Reads of text files packed in zip archives (`.zip`): the shared text archive tests from + * [[TextArchiveReadBase]], run over zip containers via [[ZipArchiveTestUtils]]. + */ +class TextZipArchiveReadSuite extends TextArchiveReadBase with ZipArchiveTestUtils { + + override protected def corruptArchiveExtension: String = "zip" +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/XMLZipArchiveReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/XMLZipArchiveReadSuite.scala new file mode 100644 index 000000000000..9721561b756f --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/XMLZipArchiveReadSuite.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +/** + * Reads of XML files packed in zip archives (`.zip`): the shared archive tests from + * [[ArchiveReadSuiteBase]] plus the XML-specific ones from [[XMLArchiveReadBase]], run over zip + * containers via [[ZipArchiveReadBase]]. + */ +class XMLZipArchiveReadSuite + extends ArchiveReadSuiteBase + with XMLArchiveReadBase + with ZipArchiveReadBase diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ZipArchiveReadBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ZipArchiveReadBase.scala new file mode 100644 index 000000000000..516468a30584 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ZipArchiveReadBase.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +/** + * Binds [[ArchiveReadSuiteBase]]'s archive-format hooks to zip containers (`.zip`). The + * container-writing helpers live in [[ZipArchiveTestUtils]] (shared with standalone suites that + * cannot extend `ArchiveReadSuiteBase`). Reusable across file formats -- a + * `ZipArchiveReadSuite` mixes this in alongside the file-format trait. + */ +trait ZipArchiveReadBase extends ArchiveReadSuiteBase with ZipArchiveTestUtils { + + override protected def corruptArchiveExtension: String = "zip" +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ZipArchiveTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ZipArchiveTestUtils.scala new file mode 100644 index 000000000000..cf9e42262a7b --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ZipArchiveTestUtils.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.{File, FileOutputStream} +import java.nio.charset.StandardCharsets +import java.nio.file.Files + +import org.apache.commons.compress.archivers.zip.{ZipArchiveEntry, ZipArchiveOutputStream} + +/** + * Zip container-writing helpers (`.zip`), independent of any read-test harness -- the zip peer of + * [[TarArchiveTestUtils]]. [[ZipArchiveReadBase]] mixes this into [[ArchiveReadSuiteBase]] for the + * format-agnostic suites, and standalone suites that cannot extend `ArchiveReadSuiteBase` (e.g. + * `TextZipArchiveReadSuite`) mix it in directly, so the container logic lives in one place. + */ +trait ZipArchiveTestUtils { + + /** Zip extensions to exercise; the head is the default. */ + protected def archiveExtensions: Seq[String] = Seq("zip") + + /** Writes `entries` (name -> bytes) into the zip archive at `dest`. */ + protected def writeArchive(dest: File, entries: Seq[(String, Array[Byte])]): Unit = { + val out = new ZipArchiveOutputStream(new FileOutputStream(dest)) + try { + entries.foreach { case (entryName, bytes) => + val entry = new ZipArchiveEntry(entryName) + entry.setSize(bytes.length.toLong) + out.putArchiveEntry(entry) + out.write(bytes) + out.closeArchiveEntry() + } + out.finish() + } finally out.close() + } + + /** + * Writes bytes that are not a readable zip archive to `dest`. The leading bytes are not a local + * file header signature, so `ZipArchiveInputStream` fails when the first entry is read (rather + * than silently reporting an empty archive). + */ + protected def writeCorruptArchive(dest: File): Unit = + Files.write(dest.toPath, "this is not a valid zip archive, just some random bytes" + .getBytes(StandardCharsets.UTF_8)) +}