Skip to content

[SPARK-57705][SQL] Read CSV, JSON, text, and XML files from zip archives#56784

Open
akshatshenoi-db wants to merge 1 commit into
apache:masterfrom
akshatshenoi-db:archive-zip
Open

[SPARK-57705][SQL] Read CSV, JSON, text, and XML files from zip archives#56784
akshatshenoi-db wants to merge 1 commit into
apache:masterfrom
akshatshenoi-db:archive-zip

Conversation

@akshatshenoi-db

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Add zip (.zip) support to the streaming archive reader (ArchiveReader), extending the existing tar support (.tar/.tar.gz/.tgz) and continuing the archive-reader series: SPARK-57135 (CSV read), SPARK-57321 (CSV inference), SPARK-57419 (JSON), SPARK-57478 (text), SPARK-57479 (XML), SPARK-57481 (Avro).

The archive read/inference integration is format-agnostic -- every data source dispatches through ArchiveReader.isArchivePath and ArchiveReader(path).readEntries(...) -- so zip works for every data source already wired up (CSV, JSON, text, XML, Avro) with no per-data-source changes.

The change is concentrated in ArchiveReader. The entry-streaming engine (lazy one-entry-at-a-time advance, directory/dotfile/marker skipping with the same ignoredPathSegmentRegex filter as a loose-file listing, close-shielded entry streams, eager-first-entry error surfacing, task-completion cleanup) is hoisted from TarArchiveReader into the abstract base. Since TarArchiveInputStream and ZipArchiveInputStream both extend commons-compress ArchiveInputStream, the base steps entries via getNextEntry directly and a subclass implements only openArchiveStream(conf): ArchiveInputStream[_ <: ArchiveEntry]:

ArchiveReader (abstract)            -- shared readEntries engine + entry filtering
  +- TarArchiveReader               -- opens a TarArchiveInputStream (explicit .tgz gunzip)
  +- ZipArchiveReader  (new)        -- opens a ZipArchiveInputStream
  • TarArchiveReader is reduced to opening a TarArchiveInputStream (keeping the .tgz explicit gunzip, with a defensive close so a gzip-header failure can't leak the base stream); behavior is unchanged.
  • New ZipArchiveReader opens a ZipArchiveInputStream (zip entries are individually deflated, so no Hadoop codec layer is applied). It streams local file headers sequentially, matching the tar reader's pure-streaming model; a few unusual zips (e.g. a deflated entry whose size is recorded only in a trailing data descriptor) are not streamable this way.
  • isArchivePath / apply now recognize and dispatch .zip (apply matches the tar and zip extensions explicitly and throws on anything else).
  • The spark.sql.files.archive.reader.enabled doc is simplified to describe the user-facing behavior without enumerating formats or implementation details. No new flag -- zip rides the existing (default-off) gate.

Why are the changes needed?

The archive reader already supports CSV, JSON, text, XML, and Avro over tar. Zip is one of the most common archive containers for shipped data, and extending the same opt-in archive path to zip lets users read and infer schema from files packed in a .zip without unpacking them first, with the same directory-read parity the rest of the series guarantees -- and, because the integration is format-agnostic, across every data source already wired up.

Does this PR introduce any user-facing change?

Yes. When spark.sql.files.archive.reader.enabled is set (default false), the CSV, JSON, text, XML, and Avro data sources can now read .zip archives in addition to tar archives -- each archive is read as a single split and its entries are streamed through the data source's parser (never unpacked to disk), as if the entries were separate files, during both scan and schema inference. Previously only .tar/.tar.gz/.tgz were recognized; .zip files were treated as ordinary (non-archive) files. With the flag at its default, behavior is unchanged.

How was this patch tested?

  • ArchiveReaderSuite -- added .zip cases for isArchivePath dispatch and readEntries (empty/single/multiple entries, directory and dotfile/marker skipping, lazy advance, non-closing entry stream), alongside the existing tar cases that guard the refactor.
  • New zip format suites mounted on the existing shared scaffolding: CSVHeaderZipArchiveReadSuite, CSVHeaderlessZipArchiveReadSuite, JSONZipArchiveReadSuite, XMLZipArchiveReadSuite, TextZipArchiveReadSuite, and AvroZipArchiveReadSuite. The text read tests were extracted into a container-agnostic TextArchiveReadBase (with the tar suite slimmed to match the CSV/JSON/XML base+container pattern), and ZipArchiveReadBase/ZipArchiveTestUtils are the zip peers of TarArchiveReadBase/TarArchiveTestUtils.
  • Re-ran all tar archive suites (including Avro) to confirm the shared-engine refactor is behavior-preserving.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

@HyukjinKwon HyukjinKwon left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 blocking, 2 non-blocking, 0 nits.
A clean, well-documented generalization of the tar reader — the entry-streaming engine is hoisted verbatim into the base, tar behavior is unchanged, and zip is wired in with no per-data-source changes. Two non-blocking follow-ups, both on the one genuinely new surface: zip entries commons-compress can't stream.

Correctness (1)

  • ArchiveReader.scala:138: advance loop doesn't check canReadEntryData(entry) for non-streamable zip entries (data descriptor / unsupported method / encryption) — see inline

Suggestions (1)

  • No test pins the documented "not streamable" zip behavior — add a STORED-with-data-descriptor fixture — see inline

case c: Closeable => try c.close() catch { case NonFatal(_) => }
case _ =>
}
var entry = archive.getNextEntry

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The advance loop calls parseEntry on every non-skipped entry without consulting ArchiveInputStream.canReadEntryData(entry). For ZipArchiveInputStream that returns false for entries it can't stream — a STORED entry with a trailing data descriptor, an unsupported compression method, or encryption — i.e. exactly the "a few unusual zips … are not streamable this way" case the new Scaladoc documents.

Question: on such an entry, does the current path fail loudly with a clear error, or can it silently yield a truncated/garbled entry? If the latter is possible, consider guarding here:

if (entry != null && !archive.canReadEntryData(entry)) {
  throw <clear error naming the unsupported zip feature>
}

so the documented limitation surfaces deterministically rather than depending on ZipArchiveInputStream's default read behavior.

}
override protected def openArchiveStream(
conf: Configuration): ArchiveInputStream[_ <: ArchiveEntry] =
new ZipArchiveInputStream(CodecStreams.createInputStreamWithCloseResource(conf, path))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "not streamable" limitation this ZipArchiveReader documents has no test pinning its behavior. ZipArchiveTestUtils.writeArchive uses ZipArchiveOutputStream (good — an independent producer, distinct from the reader) and there's a writeCorruptArchive negative case, but nothing exercises a valid-but-non-streamable zip. Consider adding a fixture (e.g. a STORED entry written with a data descriptor) that asserts the chosen behavior, so a future change can't silently turn the canReadEntryData gap above into corrupt rows.

@akshatshenoi-db

Copy link
Copy Markdown
Contributor Author

Self-review (AI-assisted) — 0 blocking, 0 non-blocking, 0 nits.

Reviewed the engine hoist + .zip extension. After the refactor TarArchiveReader and ZipArchiveReader differ in exactly one method (openArchiveStream), with the shared readEntries engine — including ignoredPathSegmentRegex filtering, close-shielding, eager-first-entry error surfacing, and task-completion cleanup — inherited from the base. A state-space pass over the wrapped ZipArchiveInputStream found every case handled or documented: the one unstreamable-zip case (a STORED entry sized only by a trailing data descriptor) surfaces as an exception, not silent corruption. The new ZipArchiveReader correctly applies no Hadoop codec layer, since zip entries are individually deflated.

Test coverage mirrors the merged tar suites: ArchiveReaderSuite .zip cases (dispatch, empty/single/multiple entries, dir/dotfile/marker skipping, lazy advance, non-closing entry stream) plus per-format *ZipArchiveReadSuite on the shared scaffolding, with the text tests extracted into a container-agnostic TextArchiveReadBase. Compile/test validation rests on Apache CI (no warm local OSS build).

@cloud-fan cloud-fan left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 blocking, 1 non-blocking, 0 nits.
A clean generalization of the tar reader — the readEntries engine is hoisted verbatim into the base, tar behavior is preserved, and zip is a thin openArchiveStream subclass with full tar/zip test parity (independent-producer fixture writer).

Correctness (1)

  • ArchiveReader.scala:296: Scaladoc names the wrong non-streamable zip case (deflated vs STORED) — see inline

Verification

Traced the ZipArchiveInputStream state space against commons-compress 1.28.0 source: empty / normal / directory / name-filtered / partial-read-then-advance entries are all handled, and every non-streamable case (STORED + data descriptor, encryption, unsupported compression) makes read() throw UnsupportedZipFeatureException (read L939-944) — i.e. it fails loudly, not as silent corruption. This confirms the documented limitation is safe at runtime and is what motivates the doc correction below.

Note: @HyukjinKwon's existing thread already covers a canReadEntryData guard (L138) and a missing non-streamable-zip test (L303); not duplicating those. On the guard question specifically — since read() already throws deterministically for every unstreamable entry, the guard would only produce an earlier/cleaner error, not fix a correctness gap; non-blocking is right.

* container (the container itself is not gzip-wrapped), so `ZipArchiveInputStream` decompresses
* entries as they are streamed and no Hadoop codec layer is applied. The stream reads local file
* headers sequentially rather than the central directory, matching the tar reader's pure-streaming
* model: a few unusual zips (e.g. a deflated entry whose size is recorded only in a trailing data

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Scaladoc cites "a deflated entry whose size is recorded only in a trailing data descriptor" as not streamable, but per commons-compress 1.28.0 a DEFLATED entry with a data descriptor is streamable: supportsDataDescriptorFor and supportsCompressedSizeFor both return true when method == DEFLATED, and readDeflated detects end-of-stream via inf.finished() independent of the declared size, so canReadEntryData is true and read() doesn't throw.

The actually non-streamable case is a STORED entry with a data descriptor — supportsDataDescriptorFor returns false (STORED + data descriptor, with allowStoredEntriesWithDataDescriptor false by default), so read() throws UnsupportedZipFeatureException. Your own self-review says "STORED", and ZipArchiveTestUtils.writeArchive (writing through a non-seekable FileOutputStream, which forces data descriptors on DEFLATED entries) produces DEFLATED+data-descriptor entries that stream fine — which confirms the doc's example is the streamable case. Same wording is worth fixing in the PR description too.

Suggested change
* model: a few unusual zips (e.g. a deflated entry whose size is recorded only in a trailing data
* model: a few unusual zips (e.g. a stored entry whose size is recorded only in a trailing data

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants