Skip to content

Commit 1e636d0

Browse files
BrendanWalsh authored and mhamilton723 committed
chore: Adding Spark35 support
1 parent 33b5a39 commit 1e636d0

File tree

49 files changed

+166
-127
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+166
-127
lines changed

README.md

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -112,6 +112,22 @@ In Microsoft Fabric notebooks SynapseML is already installed. To change the vers
112112

113113
In Azure Synapse notebooks please place the following in the first cell of your notebook.
114114

115+
- For Spark 3.5 Pools:
116+
117+
```bash
118+
%%configure -f
119+
{
120+
"name": "synapseml",
121+
"conf": {
122+
"spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.3",
123+
"spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
124+
"spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind",
125+
"spark.yarn.user.classpath.first": "true",
126+
"spark.sql.parquet.enableVectorizedReader": "false"
127+
}
128+
}
129+
```
130+
115131
- For Spark 3.4 Pools:
116132

117133
```bash

build.sbt

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,7 @@ import scala.xml.transform.{RewriteRule, RuleTransformer}
77
import scala.xml.{Node => XmlNode, NodeSeq => XmlNodeSeq, _}
88

99
val condaEnvName = "synapseml"
10-
val sparkVersion = "3.4.1"
10+
val sparkVersion = "3.5.0"
1111
name := "synapseml"
1212
ThisBuild / organization := "com.microsoft.azure"
1313
ThisBuild / scalaVersion := "2.12.17"
@@ -34,7 +34,7 @@ val extraDependencies = Seq(
3434
"com.jcraft" % "jsch" % "0.1.54",
3535
"org.apache.httpcomponents.client5" % "httpclient5" % "5.1.3",
3636
"org.apache.httpcomponents" % "httpmime" % "4.5.13",
37-
"com.linkedin.isolation-forest" %% "isolation-forest_3.4.2" % "3.0.4"
37+
"com.linkedin.isolation-forest" %% "isolation-forest_3.5.0" % "3.0.5"
3838
exclude("com.google.protobuf", "protobuf-java") exclude("org.apache.spark", "spark-mllib_2.12")
3939
exclude("org.apache.spark", "spark-core_2.12") exclude("org.apache.spark", "spark-avro_2.12")
4040
exclude("org.apache.spark", "spark-sql_2.12"),

cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/bing/BingImageSearch.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,7 @@ import org.apache.spark.injections.UDFUtils
1515
import org.apache.spark.ml.ComplexParamsReadable
1616
import org.apache.spark.ml.util._
1717
import org.apache.spark.sql.Row
18-
import org.apache.spark.sql.catalyst.encoders.RowEncoder
18+
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
1919
import org.apache.spark.sql.functions.{col, explode}
2020
import org.apache.spark.sql.types._
2121
import spray.json.DefaultJsonProtocol._
@@ -44,7 +44,7 @@ object BingImageSearch extends ComplexParamsReadable[BingImageSearch] with Seria
4444
): Lambda = {
4545
Lambda({ df =>
4646
val outputSchema = df.schema.add(bytesCol, BinaryType, nullable = true)
47-
val encoder = RowEncoder(outputSchema)
47+
val encoder = ExpressionEncoder(outputSchema)
4848
df.toDF().mapPartitions { rows =>
4949
val futures = rows.map { row: Row =>
5050
(Future {

cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/openai/OpenAIPrompt.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -13,8 +13,8 @@ import org.apache.http.entity.AbstractHttpEntity
1313
import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer}
1414
import org.apache.spark.ml.param.{BooleanParam, Param, ParamMap, ParamValidators}
1515
import org.apache.spark.ml.util.Identifiable
16+
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
1617
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, functions => F, types => T}
17-
import org.apache.spark.sql.catalyst.encoders.RowEncoder
1818
import org.apache.spark.sql.functions.{col, udf}
1919
import org.apache.spark.sql.types.{DataType, StructField, StructType}
2020

@@ -142,7 +142,7 @@ class OpenAIPrompt(override val uid: String) extends Transformer
142142
} else {
143143
row
144144
}
145-
})(RowEncoder(df.schema))
145+
})(ExpressionEncoder(df.schema))
146146
}
147147

148148
override def transform(dataset: Dataset[_]): DataFrame = {

cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeakerEmotionInference.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,7 @@ import org.apache.http.entity.{AbstractHttpEntity, StringEntity}
1212
import org.apache.spark.ml.util.Identifiable
1313
import org.apache.spark.ml.{ComplexParamsReadable, NamespaceInjections, PipelineModel, Transformer}
1414
import org.apache.spark.sql.Row
15-
import org.apache.spark.sql.catalyst.encoders.RowEncoder
15+
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
1616
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
1717
import org.apache.spark.sql.types.{DataType, StringType, StructType}
1818
import spray.json.DefaultJsonProtocol.StringJsonFormat
@@ -93,7 +93,7 @@ class SpeakerEmotionInference(override val uid: String)
9393
converter(row.getAs[Row](row.fieldIndex(getOutputCol)))
9494
)
9595
new GenericRowWithSchema((row.toSeq.dropRight(1) ++ Seq(ssml)).toArray, newSchema): Row
96-
})(RowEncoder({
96+
})(ExpressionEncoder({
9797
newSchema
9898
}))
9999
})

cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,7 @@ import org.apache.spark.injections.SConf
2424
import org.apache.spark.ml.param._
2525
import org.apache.spark.ml.util._
2626
import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer}
27-
import org.apache.spark.sql.catalyst.encoders.RowEncoder
27+
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
2828
import org.apache.spark.sql.functions._
2929
import org.apache.spark.sql.types._
3030
import org.apache.spark.sql.{DataFrame, Dataset, Row}
@@ -400,7 +400,7 @@ abstract class SpeechSDKBase extends Transformer
400400
ArrayType(responseTypeBinding.schema)
401401
}
402402

403-
val enc = RowEncoder(enrichedDf.schema.add(getOutputCol, addedSchema))
403+
val enc = ExpressionEncoder(enrichedDf.schema.add(getOutputCol, addedSchema))
404404
val sc = df.sparkSession.sparkContext
405405
val bConf = sc.broadcast(new SConf(sc.hadoopConfiguration))
406406
val isUriAudio = df.schema(getAudioDataCol).dataType match {

cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/TextToSpeech.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,7 @@ import org.apache.hadoop.io.{IOUtils => HUtils}
1515
import org.apache.spark.ml.param.{Param, ParamMap}
1616
import org.apache.spark.ml.util._
1717
import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer}
18-
import org.apache.spark.sql.catalyst.encoders.RowEncoder
18+
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
1919
import org.apache.spark.sql.types.StructType
2020
import org.apache.spark.sql.{DataFrame, Dataset, Row}
2121
import org.apache.spark.util.SerializableConfiguration
@@ -152,7 +152,7 @@ class TextToSpeech(override val uid: String)
152152
}
153153
Row.fromSeq(row.toSeq ++ Seq(errorRow))
154154
}.get
155-
}(RowEncoder(dataset.schema.add(getErrorCol, SpeechSynthesisError.schema)))
155+
}(ExpressionEncoder(dataset.schema.add(getErrorCol, SpeechSynthesisError.schema)))
156156
}
157157

158158
override def copy(extra: ParamMap): Transformer = defaultCopy(extra)

core/src/main/scala/com/microsoft/azure/synapse/ml/core/env/PackageUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,7 @@ object PackageUtils {
2121
// Use a fixed version for local testing
2222
// val PackageMavenCoordinate = s"$PackageGroup:$PackageName:1.0.9"
2323

24-
private val AvroCoordinate = "org.apache.spark:spark-avro_2.12:3.4.1"
24+
private val AvroCoordinate = "org.apache.spark:spark-avro_2.12:3.5.0"
2525
val PackageRepository: String = SparkMLRepository
2626

2727
// If testing onnx package with snapshots repo, make sure to switch to using

core/src/main/scala/com/microsoft/azure/synapse/ml/core/schema/SparkBindings.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@ package com.microsoft.azure.synapse.ml.core.schema
55

66
import org.apache.spark.sql.Row
77
import org.apache.spark.sql.catalyst.InternalRow
8-
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
8+
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
99
import org.apache.spark.sql.types.StructType
1010

1111
import scala.reflect.runtime.universe.TypeTag
@@ -14,7 +14,7 @@ abstract class SparkBindings[T: TypeTag] extends Serializable {
1414

1515
lazy val schema: StructType = enc.schema
1616
private lazy val enc: ExpressionEncoder[T] = ExpressionEncoder[T]().resolveAndBind()
17-
private lazy val rowEnc: ExpressionEncoder[Row] = RowEncoder(enc.schema).resolveAndBind()
17+
private lazy val rowEnc: ExpressionEncoder[Row] = ExpressionEncoder(enc.schema).resolveAndBind()
1818

1919
// WARNING: each time you use this function on a dataframe, you should make a new converter.
2020
// Spark does some magic that makes this leak memory if re-used on a

core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/LIMEBase.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,7 @@ import org.apache.spark.ml.Transformer
1414
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
1515
import org.apache.spark.ml.linalg.Vector
1616
import org.apache.spark.ml.param._
17-
import org.apache.spark.sql.catalyst.encoders.RowEncoder
17+
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
1818
import org.apache.spark.sql.expressions.UserDefinedFunction
1919
import org.apache.spark.sql.functions._
2020
import org.apache.spark.sql.types._
@@ -44,7 +44,7 @@ object LIMEUtils extends SLogging {
4444
case field if colsToSquish.contains(field.name) => StructField(field.name, ArrayType(field.dataType))
4545
case f => f
4646
})
47-
val encoder = RowEncoder(schema)
47+
val encoder = ExpressionEncoder(schema)
4848
val indiciesToSquish = colsToSquish.map(df.schema.fieldIndex)
4949
df.mapPartitions { it =>
5050
val isEmpty = it.isEmpty

0 commit comments

Comments (0)