Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev/ensure-jars-have-correct-contents.sh
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ allowed_expr+="|^org/apache/spark/sql/$"
# Allowlist of classes Comet is permitted to place under org/apache/spark in the shaded jar.
allowed_expr+="|^org/apache/spark/sql/ExtendedExplainGenerator.*$"
allowed_expr+="|^org/apache/spark/CometPlugin.class$"
allowed_expr+="|^org/apache/spark/CometDriverPlugin.*$"
allowed_expr+="|^org/apache/spark/CometSource.*$"
# NOTE(review): the CometTaskMemoryManager.class entry appears subsumed by the
# CometTaskMemoryManager.* entry on the next line — likely redundant; confirm before removing.
allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.class$"
allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.*$"
allowed_expr+="|^scala-collection-compat.properties$"
Expand Down
34 changes: 34 additions & 0 deletions spark/src/main/scala/org/apache/comet/CometMetricsListener.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet

import org.apache.spark.CometSource
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

/**
 * Query-execution listener that publishes Comet plan coverage statistics to the
 * [[CometSource]] metrics source after every successful query.
 */
class CometMetricsListener extends QueryExecutionListener {

  /** Computes coverage stats for the executed plan and folds them into the global counters. */
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    CometSource.recordStats(CometCoverageStats.forPlan(qe.executedPlan))

  /** Failed queries are deliberately ignored: no metrics are recorded for them. */
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}
19 changes: 19 additions & 0 deletions spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,25 @@ class CometCoverageStats {
}
}

object CometCoverageStats {

  /**
   * Computes coverage statistics for `plan` without producing an explain string.
   *
   * Walks the plan tree via [[ExtendedExplainInfo.generateTreeString]] purely for its
   * side effect of populating the stats accumulator; the rendered text is discarded.
   */
  def forPlan(plan: SparkPlan): CometCoverageStats = {
    val accumulated = new CometCoverageStats()
    // The StringBuilder output is thrown away; the traversal only fills `accumulated`.
    new ExtendedExplainInfo().generateTreeString(
      CometExplainInfo.getActualPlan(plan),
      0,
      Seq.empty,
      0,
      new StringBuilder(),
      accumulated)
    accumulated
  }
}

object CometExplainInfo {
val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo")

Expand Down
62 changes: 62 additions & 0 deletions spark/src/main/scala/org/apache/spark/CometSource.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark

import org.apache.spark.metrics.source.Source

import com.codahale.metrics.{Counter, Gauge, MetricRegistry}

import org.apache.comet.CometCoverageStats

/**
* Exposes following metrics (hooked from CometCoverageStats)
* - operators.native: Total operators executed natively
* - operators.spark: Total operators that fell back to Spark
* - queries.planned: Total queries processed
* - transitions: Total Spark-to-Comet transitions
* - acceleration.ratio: native / (native + spark)
*/
/**
 * Spark metrics [[Source]] exposing Comet plan-coverage counters:
 *   - operators.native: operators executed natively by Comet
 *   - operators.spark: operators that fell back to Spark
 *   - queries.planned: total queries processed
 *   - transitions: Spark-to-Comet transitions
 *   - acceleration.ratio: native / (native + spark)
 *
 * NOTE(review): these counters are global to the Spark application. Stats are
 * accumulated across all plans and all sessions, so per-plan interpretation of
 * the totals needs care — consider finer granularity in a follow-up.
 */
object CometSource extends Source {
  override val sourceName = "comet"
  override val metricRegistry = new MetricRegistry()

  val NATIVE_OPERATORS: Counter =
    metricRegistry.counter(MetricRegistry.name("operators", "native"))
  val SPARK_OPERATORS: Counter = metricRegistry.counter(MetricRegistry.name("operators", "spark"))
  val QUERIES_PLANNED: Counter = metricRegistry.counter(MetricRegistry.name("queries", "planned"))
  val TRANSITIONS: Counter = metricRegistry.counter(MetricRegistry.name("transitions"))

  // Derived gauge: fraction of all operators that ran natively; 0.0 before any query runs.
  metricRegistry.register(
    MetricRegistry.name("acceleration", "ratio"),
    new Gauge[Double] {
      override def getValue: Double = {
        val nativeCount = NATIVE_OPERATORS.getCount
        val overall = nativeCount + SPARK_OPERATORS.getCount
        if (overall > 0) nativeCount.toDouble / overall else 0.0
      }
    })

  /** Folds one plan's coverage stats into the global counters and bumps the query count. */
  def recordStats(stats: CometCoverageStats): Unit = {
    NATIVE_OPERATORS.inc(stats.cometOperators)
    SPARK_OPERATORS.inc(stats.sparkOperators)
    TRANSITIONS.inc(stats.transitions)
    QUERIES_PLANNED.inc()
  }
}
22 changes: 22 additions & 0 deletions spark/src/main/scala/org/apache/spark/Plugins.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
// register CometSparkSessionExtensions if it isn't already registered
CometDriverPlugin.registerCometSessionExtension(sc.conf)

// Register Comet metrics
CometDriverPlugin.registerCometMetrics(sc)

if (CometSparkSessionExtensions.shouldOverrideMemoryConf(sc.getConf)) {
val execMemOverhead = if (sc.getConf.contains(EXECUTOR_MEMORY_OVERHEAD.key)) {
sc.getConf.getSizeAsMb(EXECUTOR_MEMORY_OVERHEAD.key)
Expand Down Expand Up @@ -101,6 +104,25 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
}

object CometDriverPlugin extends Logging {
/**
 * Registers the Comet metrics source with the context's metrics system and appends
 * `org.apache.comet.CometMetricsListener` to `spark.sql.queryExecutionListeners`
 * unless it is already configured.
 *
 * NOTE(review): this mutates SparkConf after the context has started; confirm the
 * listener config is still read by sessions created after plugin initialization.
 */
def registerCometMetrics(sc: SparkContext): Unit = {
  sc.env.metricsSystem.registerSource(CometSource)

  val listenerKey = "spark.sql.queryExecutionListeners"
  val listenerClass = "org.apache.comet.CometMetricsListener"
  val existing = sc.conf.get(listenerKey, "")
  // Compute the updated config value, or None when the listener is already present.
  val updated =
    if (existing.isEmpty) Some(listenerClass)
    else if (existing.split(",").map(_.trim).contains(listenerClass)) None
    else Some(s"$existing,$listenerClass")
  updated.foreach { value =>
    logInfo(s"Setting $listenerKey=$value")
    sc.conf.set(listenerKey, value)
  }
}

def registerCometSessionExtension(conf: SparkConf): Unit = {
val extensionKey = StaticSQLConf.SPARK_SESSION_EXTENSIONS.key
val extensionClass = classOf[CometSparkSessionExtensions].getName
Expand Down
41 changes: 40 additions & 1 deletion spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

package org.apache.spark

import org.apache.spark.sql.CometTestBase
import java.io.File

import org.apache.spark.sql.{CometTestBase, SaveMode}
import org.apache.spark.sql.internal.StaticSQLConf

class CometPluginsSuite extends CometTestBase {
Expand Down Expand Up @@ -77,6 +79,43 @@ class CometPluginsSuite extends CometTestBase {
}
}

// Verifies that executing a query bumps the global Comet coverage counters.
test("CometSource metrics are recorded") {
  val initialNative = CometSource.NATIVE_OPERATORS.getCount
  val initialQueries = CometSource.QUERIES_PLANNED.getCount

  withTempPath { dir =>
    val parquetPath = new File(dir, "test.parquet").toString
    spark.range(1000).toDF("id").write.mode(SaveMode.Overwrite).parquet(parquetPath)
    spark.read.parquet(parquetPath).filter("id > 500").collect()
  }
  // QueryExecutionListener callbacks are delivered asynchronously; drain the bus first.
  spark.sparkContext.listenerBus.waitUntilEmpty()
  assert(
    CometSource.QUERIES_PLANNED.getCount > initialQueries,
    "queries.planned should increment after query")
  assert(
    CometSource.NATIVE_OPERATORS.getCount > initialNative,
    "operators.native should increment for native execution")
}

// With AQE enabled a single query can be re-planned several times; each
// collect() must still be counted exactly once in queries.planned.
test("metrics not double counted with AQE") {
  withSQLConf("spark.sql.adaptive.enabled" -> "true") {
    withTempPath { dir =>
      val dataPath = new File(dir, "test.parquet").toString
      spark.range(10000).toDF("id").write.mode(SaveMode.Overwrite).parquet(dataPath)

      // Drain events produced by the write before taking the baseline count.
      spark.sparkContext.listenerBus.waitUntilEmpty()
      val baseline = CometSource.QUERIES_PLANNED.getCount
      spark.read.parquet(dataPath).filter("id > 100").collect()
      spark.read.parquet(dataPath).filter("id > 200").collect()
      spark.sparkContext.listenerBus.waitUntilEmpty()
      val observed = CometSource.QUERIES_PLANNED.getCount
      assert(
        observed == baseline + 2,
        s"Expected 2 queries, got ${observed - baseline}")
    }
  }
}

test("Default Comet memory overhead") {
val execMemOverhead1 = spark.conf.get("spark.executor.memoryOverhead")
val execMemOverhead2 = spark.sessionState.conf.getConfString("spark.executor.memoryOverhead")
Expand Down
Loading