From c8e40628a8a1678b4db9185135bedc9fe471120f Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 18 Jun 2026 16:02:26 +0800 Subject: [PATCH] [SPARK-57525][CONNECT] Declarative Pipelines should not throw NoSuchElementException when a run fails without an attached cause When a pipeline run fails, PipelinesHandler.startRun rethrows the failure to the Spark Connect client via runFailureEvent.foreach { event => throw event.error.get }. But event.error is None for some run termination reasons - UnexpectedRunFailure and FailureStoppingFlow both have cause = None - so event.error.get raised a NoSuchElementException, crashing the handler with a meaningless internal error and hiding the real failure (e.g. "Run failed unexpectedly.") from the client. These reasons reach this code via the asynchronous onCompletion path, where PipelineExecution.runPipeline's own catch never fires. Extract the rethrow into throwRunFailure: when a cause is present it is rethrown unchanged; when absent, throw a SparkException with a new PIPELINE_RUN_FAILED error condition carrying the run's termination message. PIPELINE_RUN_FAILED (rather than INTERNAL_ERROR) is used so operational outcomes such as FailureStoppingFlow are not mislabeled as Spark bugs. Added PipelinesHandlerSuite. The cause-less termination reasons cannot be triggered deterministically through the end-to-end run path, so the rethrow is unit-tested directly using the real UnexpectedRunFailure and FailureStoppingFlow messages. --- .../resources/error/error-conditions.json | 6 ++ .../connect/pipelines/PipelinesHandler.scala | 21 +++++- .../pipelines/PipelinesHandlerSuite.scala | 70 +++++++++++++++++++ 3 files changed, 94 insertions(+), 3 deletions(-) create mode 100644 sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandlerSuite.scala diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 8e7125a07c1e8..8afdcea716477 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -6178,6 +6178,12 @@ ], "sqlState" : "0A000" }, + "PIPELINE_RUN_FAILED" : { + "message" : [ + "" + ], + "sqlState" : "58000" + }, "PIPELINE_SQL_GRAPH_ELEMENT_REGISTRATION_ERROR" : { "message" : [ "", diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala index f8edbc9928000..8eca9afa42675 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala @@ -22,6 +22,7 @@ import scala.util.Using import io.grpc.stub.StreamObserver +import org.apache.spark.SparkException import org.apache.spark.connect.proto import org.apache.spark.connect.proto.{ExecutePlanResponse, PipelineCommandResult, Relation, ResolvedIdentifier} import org.apache.spark.connect.proto.PipelineCommand.DefineFlow.AutoCdcFlowDetails @@ -563,12 +564,26 @@ private[connect] object PipelinesHandler extends Logging { // Rethrow any exceptions that caused the pipeline run to fail so that the exception is // propagated back to the SC client / CLI. - runFailureEvent.foreach { event => - throw event.error.get - } + runFailureEvent.foreach(throwRunFailure) } } + /** + * Rethrows the failure behind a terminal run-failure event so it reaches the Spark Connect + * client. Most failures carry the underlying cause (e.g. a flow's QueryExecutionFailure), but + * some termination reasons (UnexpectedRunFailure, FailureStoppingFlow) have none. When the cause + * is absent, throw a PIPELINE_RUN_FAILED error built from the event message rather than calling + * Option.get, which would throw a NoSuchElementException and hide the real failure. Using + * PIPELINE_RUN_FAILED instead of INTERNAL_ERROR avoids mislabeling operational failures as bugs. + */ + private[connect] def throwRunFailure(failureEvent: PipelineEvent): Nothing = { + throw failureEvent.error.getOrElse( + new SparkException( + errorClass = "PIPELINE_RUN_FAILED", + messageParameters = Map("message" -> failureEvent.message), + cause = null)) + } + /** * Creates the table filters for the full refresh and refresh operations based on the StartRun * command user provided. Also validates the command parameters to ensure that they are diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandlerSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandlerSuite.scala new file mode 100644 index 0000000000000..fd90ce8eacc42 --- /dev/null +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandlerSuite.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.pipelines + +import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.pipelines.common.RunState +import org.apache.spark.sql.pipelines.graph.{FailureStoppingFlow, UnexpectedRunFailure} +import org.apache.spark.sql.pipelines.logging.{ + ConstructPipelineEvent, EventLevel, PipelineEventOrigin, RunProgress} + +class PipelinesHandlerSuite extends SparkFunSuite { + + private def runFailedEvent(message: String, error: Option[Throwable]) = + ConstructPipelineEvent( + origin = PipelineEventOrigin(datasetName = None, flowName = None, sourceCodeLocation = None), + // throwRunFailure only reads message and exception; the remaining fields are filled with + // valid placeholder values to construct the event. + level = EventLevel.INFO, + message = message, + details = RunProgress(RunState.FAILED), + exception = error) + + // Use the real no-cause termination-reason messages so the tests break if their wording drifts. + private val unexpectedRunFailureMessage = UnexpectedRunFailure().message + + private val failureStoppingFlowMessage = + FailureStoppingFlow( + Seq(TableIdentifier("t1", Some("db")), TableIdentifier("t2", Some("db")))).message + + // Regression guard rather than a fix-validation test: the old buggy code (throw error.get) also + // rethrew the cause unchanged, so this case passes against both implementations. The no-cause + // test below is the one that genuinely exercises this PR's fix. + test("throwRunFailure rethrows the underlying cause when present") { + val cause = new RuntimeException("boom") + val thrown = intercept[RuntimeException] { + PipelinesHandler.throwRunFailure(runFailedEvent("Run failed.", Some(cause))) + } + assert(thrown eq cause) + } + + test("throwRunFailure surfaces the message when the failure has no cause") { + // No-cause reasons must fall back to a PIPELINE_RUN_FAILED error built from the event message + // rather than raising NoSuchElementException; the message is forwarded verbatim. + Seq(unexpectedRunFailureMessage, failureStoppingFlowMessage).foreach { message => + val thrown = intercept[SparkException] { + PipelinesHandler.throwRunFailure(runFailedEvent(message, None)) + } + checkError( + thrown, + condition = "PIPELINE_RUN_FAILED", + parameters = Map("message" -> message)) + } + } +}