Improve Spark Connect compatibility for NDS and NDS-H #254
sayedbilalbari merged 2 commits into NVIDIA:dev from
Conversation
Greptile Summary

This PR adds …

Confidence Score: 5/5 — PR is safe to merge; all remaining concerns are minor quality items. No P0/P1 findings. The two open items from prior review threads (the hardcoded "spark-connect" fallback ID and the utils/ vs nds/ PysparkBenchReport divergence) are pre-existing P2 style/traceability concerns that do not block correct benchmark execution. The implementation is functionally correct, well-documented, and smoke-tested. utils/python_benchmark_reporter/PysparkBenchReport.py has slightly less hardened Spark Connect handling than its nds/ counterpart, but it is functionally safe.

Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant CLI as CLI Caller
    participant Runner as nds_power / nds_h_power
    participant Builder as SparkSession.builder
    participant SC as Spark Connect Server
    participant Classic as Classic SparkContext
    CLI->>Runner: --spark_connect sc://host:port
    Runner->>Builder: builder.remote(spark_connect)
    Builder->>SC: getOrCreate() via Spark Connect
    SC-->>Runner: SparkSession (remote)
    Runner->>Runner: _get_app_id(session)
    Note right of Runner: 1. conf.get(spark.app.id)<br/>2. sparkContext.applicationId<br/>3. fallback spark-connect
    Runner->>SC: run queries / setup tables
    SC-->>Runner: results
    Runner->>SC: spark_session.stop()
    CLI->>Runner: (no --spark_connect)
    Runner->>Builder: builder (classic)
    Builder->>Classic: getOrCreate()
    Classic-->>Runner: SparkSession (classic)
    Runner->>Runner: _get_app_id(session)
    Note right of Runner: conf.get fails → sparkContext.applicationId
    Runner->>Classic: run queries
    Classic-->>Runner: results
    Runner->>Classic: spark_session.stop()
```
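The session-creation branch the diagram describes can be sketched in runnable form. `FakeBuilder` below stands in for `pyspark.sql.SparkSession.builder` so the sketch runs without a Spark installation; `create_session` and `FakeBuilder` are illustrative names, not code taken from the PR.

```python
class FakeBuilder:
    """Illustrative stand-in for SparkSession.builder (no Spark required)."""
    def __init__(self):
        self.remote_url = None

    def remote(self, url):
        # Mirrors SparkSession.builder.remote("sc://host:port")
        self.remote_url = url
        return self

    def getOrCreate(self):
        # Return a dict instead of a real SparkSession so the sketch is testable.
        mode = "connect" if self.remote_url else "classic"
        return {"mode": mode, "url": self.remote_url}


def create_session(builder, spark_connect=None):
    """Route through builder.remote(...) when --spark_connect is given,
    otherwise fall back to a classic getOrCreate()."""
    if spark_connect:
        builder = builder.remote(spark_connect)
    return builder.getOrCreate()


print(create_session(FakeBuilder(), "sc://host:15002")["mode"])  # connect
print(create_session(FakeBuilder())["mode"])                     # classic
```

With a real PySpark installation, the same branch would call `SparkSession.builder.remote(spark_connect).getOrCreate()` in the Connect path and plain `SparkSession.builder.getOrCreate()` otherwise.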
Reviews (2) — last reviewed commit: "Document Spark Connect smoke-test expect..."
```python
def _get_spark_conf(self):
    try:
        return self.spark_session.sparkContext._conf.getAll()
    except Exception:
        get_all = getattr(self.spark_session.conf, 'getAll', None)
        return get_all() if callable(get_all) else (get_all or [])
```
Spark Connect handling diverges from nds/ sibling
nds/PysparkBenchReport.py received a proactive is_remote() guard (via _is_spark_400_or_later) that skips sparkContext entirely in Spark Connect mode, while this copy only adds a try/except fallback. Consequently, in Spark Connect runs this file still attempts sparkContext._conf.getAll(), prints a warning on failure, and does not have a clean skip path for PythonListener registration. The broadened except Exception on the listener (line 88) handles it, but the nds/ version avoids the failed attempt entirely. Aligning this file with the nds/ approach would make Spark Connect behaviour consistent across both report implementations.
```python
def _get_app_id(spark_session):
    # Spark Connect may not expose applicationId through sparkContext.
    try:
        return spark_session.conf.get("spark.app.id")
    except Exception:
        try:
            return spark_session.sparkContext.applicationId
        except Exception:
            return "spark-connect"
```
Hardcoded fallback app ID makes benchmark runs indistinguishable in CSV
When both conf.get("spark.app.id") and sparkContext.applicationId raise, every row in the time-log CSV is tagged application_id = "spark-connect". Multiple runs against the same endpoint will produce CSVs that cannot be distinguished by application_id. Appending a timestamp (e.g. "spark-connect-{}".format(int(time.time() * 1000))) would give each run a unique identifier. The identical pattern appears in nds/nds_power.py.
Suggested change:

```diff
 def _get_app_id(spark_session):
     # Spark Connect may not expose applicationId through sparkContext.
     try:
         return spark_session.conf.get("spark.app.id")
     except Exception:
         try:
             return spark_session.sparkContext.applicationId
         except Exception:
-            return "spark-connect"
+            return "spark-connect-{}".format(int(time.time() * 1000))
```
Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
Force-pushed from b485533 to 2c5ce65.
Summary
This PR tightens generic Spark Connect compatibility for the benchmark runners without adding router-specific behavior.
It makes the runner and reporting paths more tolerant of Spark Connect environments where sparkContext access, listener registration, or config exposure differ from classic PySpark. It also documents the expected warning patterns seen during smoke testing so healthy Spark Connect runs do not look broken.
Changes
- Add --spark_connect support to nds_power.py and nds_h_power.py
- Use SparkSession.builder.remote(...) when requested
- Stop assuming applicationId is always available through sparkContext
- Use spark_session.stop() for session shutdown in the Spark Connect path
- Degrade gracefully when sparkContext or listener registration is unavailable
Why
The previous benchmark code still had a few direct dependencies on classic Spark APIs that are not reliable in Spark Connect
runs, especially around app id lookup, listener registration, and config access.
The goal here is to keep the benchmark-side work generic.
Validation
- python3 -m py_compile nds/nds_power.py nds-h/nds_h_power.py nds/PysparkBenchReport.py utils/python_benchmark_reporter/PysparkBenchReport.py
- Spark Connect smoke test, with the compatibility fallbacks working for that path
Notes
Expected Spark Connect smoke-test warnings are now documented explicitly, including:
- spark.locality.wait