[SPARK-57102][SQL] Support nanosecond-precision timestamps in the Parquet data source#56407
[SPARK-57102][SQL] Support nanosecond-precision timestamps in the Parquet data source#56407stevomitric wants to merge 7 commits into
Conversation
…and fix affected tests Extends Parquet nanosecond-timestamp support to the V2 file source (relax the ParquetTable type guard and propagate spark.sql.timestampNanosTypes.enabled in ParquetScan), and updates two existing tests the feature changes: SPARK-40819 pins the feature off to keep asserting the legacy TIMESTAMP(NANOS) reject path (the flag defaults on under tests); SPARK-57166 drops Parquet from the unsupported-datasource list. Adds a V2 round-trip test to ParquetTimestampNanosSuite. Co-authored-by: Isaac
|
cc @MaxGekk PTAL. |
MaxGekk
left a comment
There was a problem hiding this comment.
1 blocking, 3 non-blocking, 1 nit.
Design / architecture (1): write/read rebase asymmetry for TimestampLTZNanosType — see inline comment on ParquetWriteSupport.scala:278.
Correctness (2): weak disabled-feature assertion; overflow test covers NTZ only — see inline comments.
Suggestions (1): add schema-unit tests to ParquetSchemaSuite — see inline comment.
Nits (1): displaced inShredded comment — see inline comment.
… on read, strengthen tests Co-authored-by: Isaac
MaxGekk
left a comment
There was a problem hiding this comment.
5 addressed, 0 remaining, 2 new. (2 late catches — my own round-1 misses, not regressions.)
All round-1 findings resolved cleanly. The rebase exemption is now symmetric on read and write, documented inline, and pinned by the 3-mode invariance test — I independently verified the 1800-01-01 test value and the rebase-config coverage claim against SQLConf.
Suggestions (1)
ParquetWriteSupport.scala:192: out-of-range write fails as rawArithmeticException: long overflow— the existingDATETIME_OVERFLOWerror condition would fit; see inline
Nits: 1 minor item (see inline comment).
|
cc @uros-b PTAL when you get a chance. |
| schema.forall(f => isBatchReadSupported(sqlConf, f.dataType)) | ||
|
|
||
| def isBatchReadSupported(sqlConf: SQLConf, dt: DataType): Boolean = dt match { | ||
| case _: TimestampNTZNanosType | _: TimestampLTZNanosType => |
There was a problem hiding this comment.
Just leaving a performance note here: because isBatchReadSupported returns false for these types and isBatchReadSupportedForSchema uses forall, a single nanos column disables vectorized reads for the entire file/row group. That's fine and the vectorized follow-up is acknowledged.
| case _: TimestampLTZNanosType => | ||
| Types.primitive(INT64, repetition) | ||
| .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.NANOS)).named(field.name) | ||
|
|
||
| case _: TimestampNTZNanosType => | ||
| Types.primitive(INT64, repetition) | ||
| .as(LogicalTypeAnnotation.timestampType(false, TimeUnit.NANOS)).named(field.name) |
There was a problem hiding this comment.
It seems that the TimestampLTZNanosType and TimestampNTZNanosType cases have byte-for-byte identical bodies and guards. These two could collapse into one case, e.g.
case (_: TimestampLTZNanosType | _: TimestampNTZNanosType) if isNanosTimestamp(parquetType) =>
...
with the verbose annotation check extracted into a small predicate, mirroring the existing canReadAsTimestampNTZ helper.
There was a problem hiding this comment.
they differ in isAdjustedToUTC: LTZ writes timestampType(true, NANOS) and NTZ writes timestampType(false, NANOS)
| (row: SpecializedGetters, ordinal: Int) => recordConsumer.addLong(row.getLong(ordinal)) | ||
|
|
||
| // TIMESTAMP(NANOS) values are always proleptic Gregorian and are exempt from datetime | ||
| // rebasing; see the TIMESTAMP(NANOS) converters in `ParquetRowConverter` for details. |
There was a problem hiding this comment.
A bit of a nit: both converter cases are guarded on the Parquet annotation being TIMESTAMP(NANOS). If a user supplies an explicit read schema with a nanos type over a column whose Parquet annotation is not NANOS, both guards fail and the match falls through to the generic handling.
Let's just confirm that this produces a clear error rather than a confusing one. Schema clipping should normally prevent the situation, but a quick check (or an explicit unguarded case _: TimestampLTZNanosType => that throws a descriptive error) would make the contract explicit.
Please see how other similar types work, and let's consider whether we need to take care of this or not.
There was a problem hiding this comment.
Confirmed it produces a clear error, so I don't think a dedicated case is needed. When the annotation isn't NANOS the nanos guards fail and makeConverter falls through to its default case t => throw cannotCreateParquetConverterForDataTypeError(t, parquetType), which raises PARQUET_CONVERSION_FAILURE.UNSUPPORTED naming both the requested Spark type and the actual Parquet type. This is the same fall-through the existing types use - TimestampType/TimestampNTZTyp
| } | ||
|
|
||
| def parquetTimestampNanosOverflowError( | ||
| value: TimestampNanosVal, isNtz: Boolean): ArithmeticException = { |
There was a problem hiding this comment.
Why don't we declare SparkArithmeticException here? IIUC, that is what we throw below.
There was a problem hiding this comment.
Done - changed the return type to SparkArithmeticException.
uros-b
left a comment
There was a problem hiding this comment.
Thank you @stevomitric and @MaxGekk! I left a few more comments, but otherwise LGTM.
Resolve conflicts in the Parquet data source type-support checks.
While this branch was open, master added a temporary guard
`case _: AnyTimestampNanoType => false` ("not yet supported by this
datasource") to ParquetFileFormat.supportDataType and
ParquetTable.supportsDataType. This PR adds exactly that support, so the
guard is dropped on both, letting nanosecond-capable timestamp types fall
through to the AtomicType cases. This matches the PR's FileBasedDataSourceSuite
change, which moves Parquet out of the "unsupported nanos" list.
Also fix pre-existing CI failures on the branch (independent of the merge):
- ParquetTimestampNanosSuite referenced SQLConf.TYPES_FRAMEWORK_ENABLED,
which does not exist on master; gate solely on TIMESTAMP_NANOS_TYPES_ENABLED
(consistent with the rest of the suite).
- Reformat SparkDateTimeUtils.scala per scalafmt; the signature exceeded the
98-column limit after `private` became `private[sql]`.
Verified locally: sql/core compiles; ParquetTimestampNanosSuite and
ParquetSchemaSuite (147 tests) and FileBasedDataSourceSuite SPARK-57166 pass;
scalafmt and scalastyle are clean.
Co-authored-by: Isaac
What changes were proposed in this pull request?
This PR adds read and write support for the nanosecond-capable timestamp types
TimestampNTZNanosType(p)/TimestampLTZNanosType(p)(precisionpin[7, 9], from the SPIP [SPARK-56822]) in the built-in Parquet data source, gated behind the existing preview flagspark.sql.timestampNanosTypes.enabled.ParquetSchemaConverter, both directions):TimestampLTZNanosType/TimestampNTZNanosType->INT64annotatedTIMESTAMP(NANOS, isAdjustedToUTC)(isAdjustedToUTC = truefor LTZ,falsefor NTZ).INT64+TIMESTAMP(NANOS, ...)->TimestampLTZNanosType(9)/TimestampNTZNanosType(9). Parquet'sNANOSunit carries no precision parameter, so reads mint the canonical precision 9. The legacyspark.sql.legacy.parquet.nanosAsLongpath keeps precedence and is unchanged.ParquetRowConverter): anINT64epoch-nanoseconds value is split intoepochMicros = floorDiv(v, 1000)andnanosWithinMicro = floorMod(v, 1000)and stored asTimestampNanosVal.TIMESTAMP(NANOS)values are exempt from datetime rebasing on both read and write: theNANOSunit postdates the legacy hybrid-calendar writers, so such files are always proleptic Gregorian (thespark.sql.parquet.datetimeRebaseModeIn{Read,Write}configs only cover DATE / TIMESTAMP_MILLIS / TIMESTAMP_MICROS).ParquetWriteSupport): aTimestampNanosValis written asINT64epoch-nanoseconds using exact arithmetic (Math.addExact(Math.multiplyExact(epochMicros, 1000), nanosWithinMicro)); values outside the representableINT64epoch-nanosecond range (~1677-09-21 .. 2262-04-11) fail instead of silently wrapping.supportDataTypeguards (V1ParquetFileFormatand V2ParquetTable) are relaxed to accept the nanos types, and the feature flag is propagated to the read Hadoop configuration in both the V1 and V2 paths.ParquetUtils.isBatchReadSupported, so columnar reads transparently fall back to the row-based reader. Vectorized-reader support is a follow-up.Spark-written files round-trip the exact type (including precision) via the Spark schema stored in the Parquet key-value metadata; "foreign" files with no Spark metadata (e.g. produced by Trino/DuckDB/pandas) derive the nanos type from the Parquet annotation.
Why are the changes needed?
Nanosecond-precision timestamps are common in data produced by pandas/PyArrow, Trino, ClickHouse, DuckDB, and similar systems. Spark currently rejects Parquet
INT64 TIMESTAMP(NANOS)(PARQUET_TYPE_ILLEGAL), or, withspark.sql.legacy.parquet.nanosAsLong=true, reads it as a rawLongTypethat drops all timestamp and time-zone semantics. This PR lets Spark read and write such data as first-class nanosecond timestamp types, as part of the SPIP [SPARK-56822] "Timestamps with nanosecond precision".Does this PR introduce any user-facing change?
Yes, behind the preview flag
spark.sql.timestampNanosTypes.enabled(default off in production). When the flag is enabled:INT64 TIMESTAMP(NANOS, isAdjustedToUTC=true/false)are read asTimestampLTZNanosType(9)/TimestampNTZNanosType(9)instead of being rejected.INT64 TIMESTAMP(NANOS)).When the flag is off, behavior is unchanged, including the legacy
spark.sql.legacy.parquet.nanosAsLongescape hatch.How was this patch tested?
New
ParquetTimestampNanosSuitecovering: Spark write/read round-trip preserving value and precision atp= 7, 8, 9 (vectorized reader on and off); reading "foreign"TIMESTAMP(NANOS)files written directly via parquet-mr for both NTZ and LTZ, including a pre-epoch (negative) instant that exercises floor semantics and nulls;nanosAsLongprecedence; the disabled-feature error pinned viacheckError(PARQUET_TYPE_ILLEGAL, bothisAdjustedToUTCvalues); an out-of-INT64-range write failing for both NTZ and LTZ; rebase-mode invariance (a foreign pre-1883TIMESTAMP(NANOS)file reads identically underEXCEPTION/CORRECTED/LEGACY); a nested (array) column round-trip; and a V2 file-source round-trip. Schema-level conversion cases added toParquetSchemaSuite(round-trip for both nanos types atp= 9, write direction atp= 7, andnanosAsLongtaking precedence over the nanos types; thetestParquetToCatalyst/testSchemahelpers gained atimestampNanosTypesEnabledflag). Existing tests updated:SPARK-40819(pin the feature off to keep asserting the legacy reject path) andSPARK-57166(drop Parquet, which is now supported). scalastyle passes.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Anthropic Claude Opus 4.8)