From eb5c54c46a1209833c5498e7454fe63fec17f171 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 18 Jun 2026 00:24:31 +0800 Subject: [PATCH] [SPARK-57523][SQL] Declarative Pipelines should not retry flows that fail due to streaming source changes When a streaming flow fails because its set of sources changed since the last run (the "There are [N] sources in the checkpoint offsets and now there are [M] sources requested by the query. Cannot continue." assertion), the failure is unrecoverable without a full refresh. The flow's retry decision is made by GraphExecution.determineFlowExecutionActionFromError, which only looked at the retry count, so the flow was retried maxFlowRetryAttempts times against the same checkpoint, re-failing identically each time, and the run ended up reporting the unrelated "has failed more than N times" reason. Centralize the decision in determineFlowExecutionActionFromError - the intended single source of truth for retryability per its own TODO - which now recognizes the source-change error and stops the flow immediately with a dedicated StreamingSourcesChanged reason. The source-change predicate is factored into PipelinesErrors.streamingSourcesChanged, and the now-redundant special-case branch in checkStreamingErrorsAndRetry is removed so the handling flows through the single path. The existing source-change test now also asserts the flow fails exactly once (not maxFlowRetryAttempts + 1 times) and that the run no longer terminates with "has failed more than N times". --- .../sql/pipelines/graph/GraphExecution.scala | 38 +++++++++++++--- .../sql/pipelines/graph/PipelinesErrors.scala | 43 +++++++++---------- .../graph/TriggeredGraphExecutionSuite.scala | 13 ++++++ 3 files changed, 64 insertions(+), 30 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala index c687c7f01ed7a..4f366bcf3ffe8 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala @@ -261,12 +261,32 @@ object GraphExecution extends Logging { } /** - * Analyze the exception thrown by flow execution and figure out if we should retry the execution, - * or we need to reanalyze the flow entirely to resolve issues like schema changes. + * Represents that the `FlowExecution` should be stopped because a streaming flow's set of + * sources changed since the last run. This is unrecoverable without a full refresh, so the flow + * must not be retried regardless of the remaining retry budget. + */ + private case class StreamingSourcesChanged( + cause: Throwable, + flowDisplayName: String + ) extends FlowExecutionStopReason { + override lazy val runTerminationReason: RunTerminationReason = { + QueryExecutionFailure(flowDisplayName, maxRetries = 0, Option(cause)) + } + override lazy val failureMessage: String = { + s"Flow '$flowDisplayName' had streaming sources added or removed. It will not be " + + s"retried. Please perform a full refresh to rebuild it against the current sources." + } + } + + /** + * Analyze the exception thrown by flow execution and decide whether to retry the execution or + * stop it. The result is either RetryFlowExecution or StopFlowExecution; this function does not + * reanalyze the flow itself. * This should be the narrow waist for all exception analysis in flow execution. - * TODO: currently it only handles schema change and max retries, we should aim to extend this to - * include other non-retryable exception as well so we can have a single SoT for all these error - * matching logic. + * Currently it handles max retries and streaming source changes; other non-retryable errors are + * still routed through the retry path. + * TODO: extend this to include other non-retryable exceptions as well so we can have a single + * SoT for all these error matching logic. * @param ex Exception to analyze. * @param flowDisplayName The user facing flow name with the error. * @param currentNumTries Number of times the flow has been tried. @@ -278,8 +298,12 @@ object GraphExecution extends Logging { currentNumTries: => Int, maxAllowedRetries: => Int ): FlowExecutionAction = { - val flowExecutionNonRetryableReasonOpt = if (currentNumTries > maxAllowedRetries) { - Some(MaxRetryExceeded(ex, flowDisplayName, maxAllowedRetries)) + val error = ex + val flowExecutionNonRetryableReasonOpt = if (PipelinesErrors.streamingSourcesChanged(error)) { + // Source-set changes need a full refresh, so they are never retried. + Some(StreamingSourcesChanged(error, flowDisplayName)) + } else if (currentNumTries > maxAllowedRetries) { + Some(MaxRetryExceeded(error, flowDisplayName, maxAllowedRetries)) } else { None } diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelinesErrors.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelinesErrors.scala index b194e9c235fba..aab3e08df49f3 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelinesErrors.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelinesErrors.scala @@ -63,6 +63,23 @@ object PipelinesErrors extends Logging { getExceptionChain(throwable).exists(check) } + /** + * Returns true if `ex` (or any of its causes) indicates that a streaming flow's set of sources + * changed since the last run. This is unrecoverable without a full refresh, so a flow that fails + * with this error must not be retried. + */ + private[graph] def streamingSourcesChanged(ex: Throwable): Boolean = { + checkCauses( + throwable = ex, + check = cause => { + cause.isInstanceOf[AssertionError] && + cause.getMessage != null && + cause.getMessage.contains("sources in the checkpoint offsets and now there are") && + cause.getMessage.contains("sources requested by the query. Cannot continue.") + } + ) + } + /** * Checks an error for streaming specific handling. This is a pretty messy signature as a result * of unifying some divergences between the triggered caller in TriggeredGraphExecution and the @@ -87,28 +104,7 @@ object PipelinesErrors extends Logging { maxRetries: Int, onRetry: => Unit ): Unit = { - if (PipelinesErrors.checkCauses( - throwable = ex, - check = ex => { - ex.isInstanceOf[AssertionError] && - ex.getMessage != null && - ex.getMessage.contains("sources in the checkpoint offsets and now there are") && - ex.getMessage.contains("sources requested by the query. Cannot continue.") - } - )) { - val message = s""" - |Flow '${flow.displayName}' had streaming sources added or removed. Please perform a - |full refresh in order to rebuild '${flow.displayName}' against the current set of - |sources. - |""".stripMargin - - env.flowProgressEventLogger.recordFailed( - flow = flow, - exception = ex, - logAsWarn = false, - messageOpt = Option(message) - ) - } else if (flow.once && ex == null) { + if (flow.once && ex == null) { // No need to do anything if this is a ONCE flow with no exception. That just means it's done. } else { val actionFromError = GraphExecution.determineFlowExecutionActionFromError( @@ -120,7 +116,8 @@ object PipelinesErrors extends Logging { actionFromError match { // Simply retry case GraphExecution.RetryFlowExecution => onRetry - // Schema change exception + // Non-retryable stop reason (max retries exceeded, streaming sources changed, ...). + // When shouldRethrow is true, this rethrows so the run stops eagerly on these reasons. case GraphExecution.StopFlowExecution(reason) => val msg = reason.failureMessage if (reason.warnInsteadOfError) { diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala index 57baf4c2d5b11..5c31c8fb95ff9 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala @@ -462,6 +462,8 @@ class TriggeredGraphExecutionSuite extends ExecutionTest with SharedSparkSession updateContext2.pipelineExecution.runPipeline() updateContext2.pipelineExecution.awaitCompletion() + // A streaming source change is unrecoverable without a full refresh, so the flow must not be + // retried: we should see exactly one failure rather than maxFlowRetryAttempts + 1 of them. assertFlowProgressEvent( eventBuffer = updateContext2.eventBuffer, identifier = fullyQualifiedIdentifier("input_table"), @@ -469,6 +471,17 @@ class TriggeredGraphExecutionSuite extends ExecutionTest with SharedSparkSession expectedEventLevel = EventLevel.ERROR, msgChecker = _.contains( s"Flow '${eventLogName("input_table")}' had streaming sources added or removed." + ), + expectedNumOfEvents = Option(1) + ) + + // The run should fail because of the source change, not because the flow exhausted its retries. + assertRunProgressEvent( + eventBuffer = updateContext2.eventBuffer, + state = RunState.FAILED, + expectedEventLevel = EventLevel.ERROR, + msgChecker = _.contains( + s"flow '${eventLogName("input_table")}' has failed." ) ) }