Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2368,6 +2368,18 @@ class PlanGenerationTestSuite extends ConnectFunSuite with Logging {
fn.make_date(fn.lit(2018), fn.lit(5), fn.lit(14))
}

temporalFunctionTest("make_time") {
fn.make_time(fn.lit(12), fn.lit(13), fn.lit(14))
}

temporalFunctionTest("current_time") {
fn.current_time()
}

temporalFunctionTest("current_time with precision") {
fn.current_time(3)
}

temporalFunctionTest("months_between") {
fn.months_between(fn.current_date(), fn.col("d"))
}
Expand Down Expand Up @@ -3440,6 +3452,8 @@ class PlanGenerationTestSuite extends ConnectFunSuite with Logging {
fn.lit(Array(java.sql.Date.valueOf("2023-02-23"), java.sql.Date.valueOf("2023-03-01"))),
fn.lit(Array(java.time.Duration.ofSeconds(100L), java.time.Duration.ofSeconds(200L))),
fn.lit(Array(java.time.Period.ofDays(100), java.time.Period.ofDays(200))),
fn.lit(
Array(java.time.LocalTime.of(23, 59, 59, 999999999), java.time.LocalTime.of(12, 0, 0))),
fn.lit(Array(new CalendarInterval(2, 20, 100L), new CalendarInterval(2, 21, 200L))))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1745,6 +1745,52 @@ class ClientE2ETestSuite
checkAnswer(df, Row(LocalTime.of(12, 13, 14)))
}

test("SPARK-57566: make_time builtin returns a TIME value over Connect") {
val df = spark.range(1).select(make_time(lit(12), lit(13), lit(14)).as("t"))
assert(df.schema.fields.head.dataType.isInstanceOf[TimeType])
checkAnswer(df, Row(LocalTime.of(12, 13, 14)))
}

test("SPARK-57566: hour, minute and second extract fields from a TIME value over Connect") {
val df = spark
.range(1)
.select(make_time(lit(12), lit(13), lit(14)).as("t"))
.select(hour(col("t")), minute(col("t")), second(col("t")))
checkAnswer(df, Row(12, 13, 14))
}

test("SPARK-57566: current_time returns a TIME-typed column over Connect") {
val df = spark.sql("SELECT current_time()")
assert(df.schema.fields.head.dataType.isInstanceOf[TimeType])
}

test("SPARK-57566: TIME column round-trips via createDataFrame over Connect") {
val schema = StructType(Array(StructField("t", TimeType())))
val rows = Seq(
Row(LocalTime.of(1, 2, 3)),
Row(LocalTime.of(23, 59, 59)),
Row(LocalTime.of(12, 30, 45, 123456000)))
val df = spark.createDataFrame(rows.asJava, schema)
assert(df.schema.fields.head.dataType === TimeType())
checkAnswer(df, rows)
}

test("SPARK-57566: TIME column round-trips through a parquet datasource over Connect") {
val schema = StructType(Array(StructField("t", TimeType())))
val rows = Seq(
Row(LocalTime.of(1, 2, 3)),
Row(LocalTime.of(23, 59, 59)),
Row(LocalTime.of(12, 30, 45, 123456000)))
withTempPath { file =>
val path = file.toPath.toAbsolutePath.toString
spark.createDataFrame(rows.asJava, schema).write.parquet(path)

val df = spark.read.parquet(path)
assert(df.schema.fields.head.dataType === TimeType())
checkAnswer(df, rows)
}
}

test("SPARK-53054: DataFrameReader defaults to spark.sql.sources.default") {
withTempPath { file =>
val path = file.getAbsoluteFile.toURI.toString
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Project [current_time(6, Some(America/Los_Angeles)) AS current_time(6)#0]
+- LocalRelation <empty>, [d#0, t#0, s#0, x#0L, wt#0]
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Project [current_time(3, Some(America/Los_Angeles)) AS current_time(3)#0]
+- LocalRelation <empty>, [d#0, t#0, s#0, x#0L, wt#0]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Project [[] AS ARRAY()#0, [[1],[2],[3]] AS ARRAY(ARRAY(1), ARRAY(2), ARRAY(3))#0, [[[1]],[[2]],[[3]]] AS ARRAY(ARRAY(ARRAY(1)), ARRAY(ARRAY(2)), ARRAY(ARRAY(3)))#0, [true,false] AS ARRAY(true, false)#0, 0x434445 AS X'434445'#0, [9872,9873,9874] AS ARRAY(9872S, 9873S, 9874S)#0, [-8726532,8726532,-8726533] AS ARRAY(-8726532, 8726532, -8726533)#0, [7834609328726531,7834609328726532,7834609328726533] AS ARRAY(7834609328726531L, 7834609328726532L, 7834609328726533L)#0, [2.718281828459045,1.0,2.0] AS ARRAY(2.718281828459045D, 1.0D, 2.0D)#0, [-0.8,-0.7,-0.9] AS ARRAY(CAST('-0.8' AS FLOAT), CAST('-0.7' AS FLOAT), CAST('-0.9' AS FLOAT))#0, [89.976200000000000000,89.976210000000000000] AS ARRAY(89.976200000000000000BD, 89.976210000000000000BD)#0, [89889.766723100000000000,89889.766723100000000000] AS ARRAY(89889.766723100000000000BD, 89889.766723100000000000BD)#0, [connect!,disconnect!] AS ARRAY('connect!', 'disconnect!')#0, TF AS TF#0, [ABCDEFGHIJ,BCDEFGHIJK] AS ARRAY('ABCDEFGHIJ', 'BCDEFGHIJK')#0, [18545,18546] AS ARRAY(DATE '2020-10-10', DATE '2020-10-11')#0, [1677155519808000,1677155519809000] AS ARRAY(TIMESTAMP '2023-02-23 04:31:59.808', TIMESTAMP '2023-02-23 04:31:59.809')#0, [12345000,23456000] AS ARRAY(TIMESTAMP '1969-12-31 16:00:12.345', TIMESTAMP '1969-12-31 16:00:23.456')#0, [1677184560000000,1677188160000000] AS ARRAY(TIMESTAMP_NTZ '2023-02-23 20:36:00', TIMESTAMP_NTZ '2023-02-23 21:36:00')#0, [19411,19417] AS ARRAY(DATE '2023-02-23', DATE '2023-03-01')#0, [100000000,200000000] AS ARRAY(INTERVAL '0 00:01:40' DAY TO SECOND, INTERVAL '0 00:03:20' DAY TO SECOND)#0, [0,0] AS ARRAY(INTERVAL '0-0' YEAR TO MONTH, INTERVAL '0-0' YEAR TO MONTH)#0, [2 months 20 days 0.0001 seconds,2 months 21 days 0.0002 seconds] AS ARRAY(INTERVAL '2 months 20 days 0.0001 seconds', INTERVAL '2 months 21 days 0.0002 seconds')#0]
Project [[] AS ARRAY()#0, [[1],[2],[3]] AS ARRAY(ARRAY(1), ARRAY(2), ARRAY(3))#0, [[[1]],[[2]],[[3]]] AS ARRAY(ARRAY(ARRAY(1)), ARRAY(ARRAY(2)), ARRAY(ARRAY(3)))#0, [true,false] AS ARRAY(true, false)#0, 0x434445 AS X'434445'#0, [9872,9873,9874] AS ARRAY(9872S, 9873S, 9874S)#0, [-8726532,8726532,-8726533] AS ARRAY(-8726532, 8726532, -8726533)#0, [7834609328726531,7834609328726532,7834609328726533] AS ARRAY(7834609328726531L, 7834609328726532L, 7834609328726533L)#0, [2.718281828459045,1.0,2.0] AS ARRAY(2.718281828459045D, 1.0D, 2.0D)#0, [-0.8,-0.7,-0.9] AS ARRAY(CAST('-0.8' AS FLOAT), CAST('-0.7' AS FLOAT), CAST('-0.9' AS FLOAT))#0, [89.976200000000000000,89.976210000000000000] AS ARRAY(89.976200000000000000BD, 89.976210000000000000BD)#0, [89889.766723100000000000,89889.766723100000000000] AS ARRAY(89889.766723100000000000BD, 89889.766723100000000000BD)#0, [connect!,disconnect!] AS ARRAY('connect!', 'disconnect!')#0, TF AS TF#0, [ABCDEFGHIJ,BCDEFGHIJK] AS ARRAY('ABCDEFGHIJ', 'BCDEFGHIJK')#0, [18545,18546] AS ARRAY(DATE '2020-10-10', DATE '2020-10-11')#0, [1677155519808000,1677155519809000] AS ARRAY(TIMESTAMP '2023-02-23 04:31:59.808', TIMESTAMP '2023-02-23 04:31:59.809')#0, [12345000,23456000] AS ARRAY(TIMESTAMP '1969-12-31 16:00:12.345', TIMESTAMP '1969-12-31 16:00:23.456')#0, [1677184560000000,1677188160000000] AS ARRAY(TIMESTAMP_NTZ '2023-02-23 20:36:00', TIMESTAMP_NTZ '2023-02-23 21:36:00')#0, [19411,19417] AS ARRAY(DATE '2023-02-23', DATE '2023-03-01')#0, [100000000,200000000] AS ARRAY(INTERVAL '0 00:01:40' DAY TO SECOND, INTERVAL '0 00:03:20' DAY TO SECOND)#0, [0,0] AS ARRAY(INTERVAL '0-0' YEAR TO MONTH, INTERVAL '0-0' YEAR TO MONTH)#0, [86399999999999,43200000000000] AS ARRAY(TIME '23:59:59.999999999', TIME '12:00:00')#0, [2 months 20 days 0.0001 seconds,2 months 21 days 0.0002 seconds] AS ARRAY(INTERVAL '2 months 20 days 0.0001 seconds', INTERVAL '2 months 21 days 0.0002 seconds')#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0]
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Project [static_invoke(DateTimeUtils.makeTime(12, 13, cast(14 as decimal(16,6)))) AS make_time(12, 13, 14)#0]
+- LocalRelation <empty>, [d#0, t#0, s#0, x#0L, wt#0]
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"common": {
"planId": "1"
},
"project": {
"input": {
"common": {
"planId": "0"
},
"localRelation": {
"schema": "struct\u003cd:date,t:timestamp,s:string,x:bigint,wt:struct\u003cstart:timestamp,end:timestamp\u003e\u003e"
}
},
"expressions": [{
"unresolvedFunction": {
"functionName": "current_time",
"isInternal": false
},
"common": {
"origin": {
"jvmOrigin": {
"stackTrace": [{
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.functions$",
"methodName": "current_time",
"fileName": "functions.scala"
}, {
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
"methodName": "~~trimmed~anonfun~~",
"fileName": "PlanGenerationTestSuite.scala"
}]
}
}
}
}]
}
}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
{
"common": {
"planId": "1"
},
"project": {
"input": {
"common": {
"planId": "0"
},
"localRelation": {
"schema": "struct\u003cd:date,t:timestamp,s:string,x:bigint,wt:struct\u003cstart:timestamp,end:timestamp\u003e\u003e"
}
},
"expressions": [{
"unresolvedFunction": {
"functionName": "current_time",
"arguments": [{
"literal": {
"integer": 3
},
"common": {
"origin": {
"jvmOrigin": {
"stackTrace": [{
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.functions$",
"methodName": "current_time",
"fileName": "functions.scala"
}, {
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
"methodName": "~~trimmed~anonfun~~",
"fileName": "PlanGenerationTestSuite.scala"
}]
}
}
}
}],
"isInternal": false
},
"common": {
"origin": {
"jvmOrigin": {
"stackTrace": [{
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.functions$",
"methodName": "current_time",
"fileName": "functions.scala"
}, {
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
"methodName": "~~trimmed~anonfun~~",
"fileName": "PlanGenerationTestSuite.scala"
}]
}
}
}
}]
}
}
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,49 @@
}
}
}
}, {
"literal": {
"array": {
"elements": [{
"time": {
"nano": "86399999999999",
"precision": 6
}
}, {
"time": {
"nano": "43200000000000",
"precision": 6
}
}]
},
"dataType": {
"array": {
"elementType": {
"time": {
"precision": 6
}
},
"containsNull": true
}
}
},
"common": {
"origin": {
"jvmOrigin": {
"stackTrace": [{
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.functions$",
"methodName": "lit",
"fileName": "functions.scala"
}, {
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
"methodName": "~~trimmed~anonfun~~",
"fileName": "PlanGenerationTestSuite.scala"
}]
}
}
}
}, {
"literal": {
"array": {
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
{
"common": {
"planId": "1"
},
"project": {
"input": {
"common": {
"planId": "0"
},
"localRelation": {
"schema": "struct\u003cd:date,t:timestamp,s:string,x:bigint,wt:struct\u003cstart:timestamp,end:timestamp\u003e\u003e"
}
},
"expressions": [{
"unresolvedFunction": {
"functionName": "make_time",
"arguments": [{
"literal": {
"integer": 12
},
"common": {
"origin": {
"jvmOrigin": {
"stackTrace": [{
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.functions$",
"methodName": "lit",
"fileName": "functions.scala"
}, {
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
"methodName": "~~trimmed~anonfun~~",
"fileName": "PlanGenerationTestSuite.scala"
}]
}
}
}
}, {
"literal": {
"integer": 13
},
"common": {
"origin": {
"jvmOrigin": {
"stackTrace": [{
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.functions$",
"methodName": "lit",
"fileName": "functions.scala"
}, {
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
"methodName": "~~trimmed~anonfun~~",
"fileName": "PlanGenerationTestSuite.scala"
}]
}
}
}
}, {
"literal": {
"integer": 14
},
"common": {
"origin": {
"jvmOrigin": {
"stackTrace": [{
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.functions$",
"methodName": "lit",
"fileName": "functions.scala"
}, {
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
"methodName": "~~trimmed~anonfun~~",
"fileName": "PlanGenerationTestSuite.scala"
}]
}
}
}
}],
"isInternal": false
},
"common": {
"origin": {
"jvmOrigin": {
"stackTrace": [{
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.functions$",
"methodName": "make_time",
"fileName": "functions.scala"
}, {
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
"methodName": "~~trimmed~anonfun~~",
"fileName": "PlanGenerationTestSuite.scala"
}]
}
}
}
}]
}
}
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.connect.planner

import java.time.LocalTime

import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite

import org.apache.spark.connect.proto
Expand Down Expand Up @@ -53,6 +55,28 @@ class LiteralExpressionProtoConverterSuite extends AnyFunSuite { // scalastyle:i
}
}

test("SPARK-57566: TIME literal proto and catalyst value round-trip") {
val times =
Seq(LocalTime.of(0, 0, 0), LocalTime.of(12, 13, 14), LocalTime.of(23, 59, 59, 999999999))
for (t <- times) {
val literalProto = toLiteralProto(t, TimeType())
// The literal carries the TIME proto type with the expected precision.
assert(literalProto.getTime.getPrecision == TimeType.DEFAULT_PRECISION)
// Proto -> Scala value round-trips back to the original LocalTime.
assertResult(t)(LiteralValueProtoConverter.toScalaValue(literalProto))
// Proto -> Catalyst expression matches a directly-built catalyst literal.
val convert = CatalystTypeConverters.createToCatalystConverter(TimeType())
val expected = expressions.Literal(convert(t), TimeType())
assertResult(expected)(LiteralExpressionProtoConverter.toCatalystExpression(literalProto))
}
}

test("SPARK-57566: TIME literal proto propagates a non-default precision") {
val literalProto = toLiteralProto(LocalTime.of(1, 2, 3), TimeType(3))
assert(literalProto.getTime.getPrecision == 3)
assertResult(LocalTime.of(1, 2, 3))(LiteralValueProtoConverter.toScalaValue(literalProto))
}

// The goal of this test is to check that converting a Scala value -> Proto -> Catalyst value
// is equivalent to converting a Scala value directly to a Catalyst value.
Seq[(Any, DataType)](
Expand Down