From 693f88257c8545316c7496b1f7e15b5b564da3bc Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Mon, 13 Apr 2026 11:47:48 +0900 Subject: [PATCH 1/8] [WIP] state operator metric fix --- .../apache/spark/sql/streaming/progress.scala | 5 +- .../streaming/runtime/ProgressReporter.scala | 4 +- .../streaming/ProgressReporterSuite.scala | 88 +++++++++++++++++++ 3 files changed, 92 insertions(+), 5 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 5652545ea5676..5fba2731cf061 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -67,13 +67,14 @@ class StateOperatorProgress private[spark] ( private[sql] def copy( newNumRowsUpdated: Long, - newNumRowsDroppedByWatermark: Long): StateOperatorProgress = + newNumRowsDroppedByWatermark: Long, + newNumRowsRemoved: Long = numRowsRemoved): StateOperatorProgress = new StateOperatorProgress( operatorName = operatorName, numRowsTotal = numRowsTotal, numRowsUpdated = newNumRowsUpdated, allUpdatesTimeMs = allUpdatesTimeMs, - numRowsRemoved = numRowsRemoved, + numRowsRemoved = newNumRowsRemoved, allRemovalsTimeMs = allRemovalsTimeMs, commitTimeMs = commitTimeMs, memoryUsedBytes = memoryUsedBytes, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala index dd0d91f39f7c0..2735211e436f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala @@ -648,12 +648,10 @@ abstract class ProgressContext( * New execution stats will only retain the values as a snapshot of the query status. * (E.g. for stateful operators, numRowsTotal is a snapshot of the status, whereas * numRowsUpdated is bound to the batch.) - * TODO: We do not seem to clear up all values in StateOperatorProgress which are bound to the - * batch. Fix this. */ private def resetExecStatsForNoExecution(originExecStats: ExecutionStats): ExecutionStats = { val newStatefulOperators = originExecStats.stateOperators.map { so => - so.copy(newNumRowsUpdated = 0, newNumRowsDroppedByWatermark = 0) + so.copy(newNumRowsUpdated = 0, newNumRowsDroppedByWatermark = 0, newNumRowsRemoved = 0) } val newEventTimeStats = if (originExecStats.eventTimeStats.contains("watermark")) { Map("watermark" -> progressReporter.formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala new file mode 100644 index 0000000000000..43f33f1c356dd --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.sql.execution.streaming.runtime.MemoryStream +import org.apache.spark.sql.functions.{count, timestamp_seconds, window} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger} +import org.apache.spark.sql.streaming.util.StreamManualClock + +class ProgressReporterSuite extends StreamTest { + + import testImplicits._ + + test("no-data batch resets numRowsRemoved to zero via resetExecStatsForNoExecution") { + // After a stateful operator evicts state rows via watermark, subsequent no-execution triggers + // must report numRowsRemoved=0. Exercises: finishNoExecutionTrigger -> + // resetExecStatsForNoExecution -> StateOperatorProgress.copy + val clock = new StreamManualClock + val input = MemoryStream[Int] + val agg = input.toDF() + .select(timestamp_seconds($"value") as "ts", $"value") + .withWatermark("ts", "10 seconds") + .groupBy(window($"ts", "10 seconds")) + .agg(count("*") as "cnt") + + withSQLConf(SQLConf.STREAMING_POLLING_DELAY.key -> "0") { + testStream(agg, outputMode = OutputMode.Update)( + StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock), + AddData(input, 1, 2, 3), + AdvanceManualClock(1 * 1000), + AddData(input, 21), + AdvanceManualClock(1 * 1000), + Execute("verify eviction") { q => + val lastProgress = q.recentProgress.filter(_.stateOperators.nonEmpty).last + assert( + lastProgress.stateOperators.head.numRowsRemoved > 0, + s"Expected eviction but numRowsRemoved=${lastProgress.stateOperators.head.numRowsRemoved}") + }, + // Manual clock advance schedules the next trigger; with no runnable batch the engine + // reports progress via finishNoExecutionTrigger -> resetExecStatsForNoExecution. + AdvanceManualClock(1 * 1000), + AssertOnQuery("numRowsRemoved must be 0 in idle trigger after eviction") { q => + val progresses = q.recentProgress.filter(_.stateOperators.nonEmpty) + val evictionIdx = progresses.lastIndexWhere(_.stateOperators.head.numRowsRemoved > 0) + assert(evictionIdx >= 0, "Expected eviction batch in progresses") + + val idleProgresses = progresses.drop(evictionIdx + 1).filter(_.batchDuration <= 1) + assert( + idleProgresses.nonEmpty, + "Expected idle trigger progress after eviction, after eviction: " + + progresses.drop(evictionIdx).map(p => + s"(bid=${p.batchId},dur=${p.batchDuration}," + + s"removed=${p.stateOperators.head.numRowsRemoved})").mkString(", ")) + + val idleProgress = idleProgresses.head + assert( + idleProgress.stateOperators.head.numRowsRemoved === 0, + s"numRowsRemoved should be 0 in idle trigger but got " + + s"${idleProgress.stateOperators.head.numRowsRemoved}") + assert( + idleProgress.stateOperators.head.numRowsUpdated === 0, + s"numRowsUpdated should be 0 in idle trigger") + assert( + idleProgress.stateOperators.head.numRowsDroppedByWatermark === 0, + s"numRowsDroppedByWatermark should be 0 in idle trigger") + true + }, + StopStream + ) + } + } +} From 8e8398e4cd8b24205421c44b838fb28b08b845ef Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Mon, 13 Apr 2026 12:34:04 +0900 Subject: [PATCH 2/8] reflect review comments --- .../org/apache/spark/sql/streaming/progress.scala | 2 +- .../execution/streaming/ProgressReporterSuite.scala | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 5fba2731cf061..0502936e3cc4e 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -68,7 +68,7 @@ class StateOperatorProgress private[spark] ( private[sql] def copy( newNumRowsUpdated: Long, newNumRowsDroppedByWatermark: Long, - newNumRowsRemoved: Long = numRowsRemoved): StateOperatorProgress = + newNumRowsRemoved: Long): StateOperatorProgress = new StateOperatorProgress( operatorName = operatorName, numRowsTotal = numRowsTotal, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala index 43f33f1c356dd..23d637f3e6e80 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala @@ -60,13 +60,13 @@ class ProgressReporterSuite extends StreamTest { val evictionIdx = progresses.lastIndexWhere(_.stateOperators.head.numRowsRemoved > 0) assert(evictionIdx >= 0, "Expected eviction batch in progresses") - val idleProgresses = progresses.drop(evictionIdx + 1).filter(_.batchDuration <= 1) + val idleProgresses = + progresses.drop(evictionIdx + 1).filter(_.batchDuration == 0) assert( idleProgresses.nonEmpty, "Expected idle trigger progress after eviction, after eviction: " + progresses.drop(evictionIdx).map(p => - s"(bid=${p.batchId},dur=${p.batchDuration}," + - s"removed=${p.stateOperators.head.numRowsRemoved})").mkString(", ")) + s"(batchId=${p.batchId},duration=${p.batchDuration}").mkString(", ")) val idleProgress = idleProgresses.head assert( @@ -75,10 +75,12 @@ class ProgressReporterSuite extends StreamTest { s"${idleProgress.stateOperators.head.numRowsRemoved}") assert( idleProgress.stateOperators.head.numRowsUpdated === 0, - s"numRowsUpdated should be 0 in idle trigger") + s"numRowsUpdated should be 0 in idle trigger but got " + + s"${idleProgress.stateOperators.head.numRowsUpdated}") assert( idleProgress.stateOperators.head.numRowsDroppedByWatermark === 0, - s"numRowsDroppedByWatermark should be 0 in idle trigger") + s"numRowsDroppedByWatermark should be 0 in idle trigger but got " + + s"${idleProgress.stateOperators.head.numRowsDroppedByWatermark}") true }, StopStream From 6c3993d4fb192f78c1311a8e29b2c5288290358b Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Mon, 13 Apr 2026 12:50:01 +0900 Subject: [PATCH 3/8] fix compilation --- .../streaming/operators/stateful/statefulOperators.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala index 4736d7ef17089..76b395d225042 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala @@ -1304,7 +1304,8 @@ case class SessionWindowStateStoreSaveExec( stateOpProgress.copy( newNumRowsUpdated = stateOpProgress.numRowsUpdated, - newNumRowsDroppedByWatermark = numRowsDroppedByWatermark) + newNumRowsDroppedByWatermark = numRowsDroppedByWatermark, + newNumRowsRemoved = stateOpProgress.numRowsRemoved) } } From a64d10dd133c9ee1da15c0c0868fe88d50c68347 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Mon, 13 Apr 2026 13:24:51 +0900 Subject: [PATCH 4/8] scalastyle fix --- .../sql/execution/streaming/ProgressReporterSuite.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala index 23d637f3e6e80..cc3332f790994 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala @@ -48,9 +48,8 @@ class ProgressReporterSuite extends StreamTest { AdvanceManualClock(1 * 1000), Execute("verify eviction") { q => val lastProgress = q.recentProgress.filter(_.stateOperators.nonEmpty).last - assert( - lastProgress.stateOperators.head.numRowsRemoved > 0, - s"Expected eviction but numRowsRemoved=${lastProgress.stateOperators.head.numRowsRemoved}") + val removed = lastProgress.stateOperators.head.numRowsRemoved + assert(removed > 0, s"Expected eviction but numRowsRemoved=$removed") }, // Manual clock advance schedules the next trigger; with no runnable batch the engine // reports progress via finishNoExecutionTrigger -> resetExecStatsForNoExecution. From ba43c7cebfa840c3d109ca805a71a8f72365d45c Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Mon, 13 Apr 2026 16:01:13 +0900 Subject: [PATCH 5/8] some more change --- .../streaming/ProgressReporterSuite.scala | 90 +++++++++++-------- 1 file changed, 52 insertions(+), 38 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala index cc3332f790994..ba39e0c260149 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.execution.streaming +import org.scalatest.concurrent.Eventually.eventually +import org.scalatest.concurrent.PatienceConfiguration.Timeout + import org.apache.spark.sql.execution.streaming.runtime.MemoryStream import org.apache.spark.sql.functions.{count, timestamp_seconds, window} import org.apache.spark.sql.internal.SQLConf @@ -24,13 +27,10 @@ import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger} import org.apache.spark.sql.streaming.util.StreamManualClock class ProgressReporterSuite extends StreamTest { - import testImplicits._ - test("no-data batch resets numRowsRemoved to zero via resetExecStatsForNoExecution") { - // After a stateful operator evicts state rows via watermark, subsequent no-execution triggers - // must report numRowsRemoved=0. Exercises: finishNoExecutionTrigger -> - // resetExecStatsForNoExecution -> StateOperatorProgress.copy + test("no-data batch resets numRowsRemoved to zero" + + " via resetExecStatsForNoExecution") { val clock = new StreamManualClock val input = MemoryStream[Int] val agg = input.toDF() @@ -38,48 +38,62 @@ class ProgressReporterSuite extends StreamTest { .withWatermark("ts", "10 seconds") .groupBy(window($"ts", "10 seconds")) .agg(count("*") as "cnt") + .select($"window".getField("start").cast("long"), $"cnt") - withSQLConf(SQLConf.STREAMING_POLLING_DELAY.key -> "0") { + // noDataProgressEventInterval=0 ensures idle-trigger progress + // is always recorded regardless of clock gap. + withSQLConf( + SQLConf.STREAMING_POLLING_DELAY.key -> "0", + SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "0") { testStream(agg, outputMode = OutputMode.Update)( - StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock), + StartStream( + Trigger.ProcessingTime("1 second"), + triggerClock = clock), + // Batch 0: [1,2,3] -> window [0s,10s) cnt=3. + // Watermark after: max(1,2,3)-10 < 0 -> stays 0. AddData(input, 1, 2, 3), AdvanceManualClock(1 * 1000), + CheckNewAnswer((0L, 3L)), + // Batch 1: [21] -> window [20s,30s) cnt=1. + // Watermark used: 0 (no eviction yet). + // Watermark after: 21-10 = 11. AddData(input, 21), AdvanceManualClock(1 * 1000), + CheckNewAnswer((20L, 1L)), + // Batch 2: no-data cleanup with watermark=11. + // Evicts window [0s,10s) (end 10 <= 11). + AdvanceManualClock(1 * 1000), + Execute("wait for cleanup batch") { q => + eventually(Timeout(streamingTimeout)) { + assert(q.lastProgress.batchId >= 2) + } + }, Execute("verify eviction") { q => - val lastProgress = q.recentProgress.filter(_.stateOperators.nonEmpty).last - val removed = lastProgress.stateOperators.head.numRowsRemoved - assert(removed > 0, s"Expected eviction but numRowsRemoved=$removed") + val removed = q.recentProgress + .filter(_.stateOperators.nonEmpty) + .exists(_.stateOperators.head.numRowsRemoved > 0) + assert(removed, "Expected numRowsRemoved > 0") }, - // Manual clock advance schedules the next trigger; with no runnable batch the engine - // reports progress via finishNoExecutionTrigger -> resetExecStatsForNoExecution. + // Idle trigger — finishNoExecutionTrigger calls + // resetExecStatsForNoExecution which must zero out + // per-batch metrics. AdvanceManualClock(1 * 1000), - AssertOnQuery("numRowsRemoved must be 0 in idle trigger after eviction") { q => - val progresses = q.recentProgress.filter(_.stateOperators.nonEmpty) - val evictionIdx = progresses.lastIndexWhere(_.stateOperators.head.numRowsRemoved > 0) - assert(evictionIdx >= 0, "Expected eviction batch in progresses") - - val idleProgresses = - progresses.drop(evictionIdx + 1).filter(_.batchDuration == 0) - assert( - idleProgresses.nonEmpty, - "Expected idle trigger progress after eviction, after eviction: " + - progresses.drop(evictionIdx).map(p => - s"(batchId=${p.batchId},duration=${p.batchDuration}").mkString(", ")) - - val idleProgress = idleProgresses.head - assert( - idleProgress.stateOperators.head.numRowsRemoved === 0, - s"numRowsRemoved should be 0 in idle trigger but got " + - s"${idleProgress.stateOperators.head.numRowsRemoved}") - assert( - idleProgress.stateOperators.head.numRowsUpdated === 0, - s"numRowsUpdated should be 0 in idle trigger but got " + - s"${idleProgress.stateOperators.head.numRowsUpdated}") - assert( - idleProgress.stateOperators.head.numRowsDroppedByWatermark === 0, - s"numRowsDroppedByWatermark should be 0 in idle trigger but got " + - s"${idleProgress.stateOperators.head.numRowsDroppedByWatermark}") + Execute("wait for idle trigger progress") { q => + eventually(Timeout(streamingTimeout)) { + val p = q.recentProgress.filter(_.stateOperators.nonEmpty) + val i = p.lastIndexWhere(_.stateOperators.head.numRowsRemoved > 0) + assert(i >= 0 && p.length > i + 1) + } + }, + AssertOnQuery("idle trigger must reset metrics") { q => + val p = q.recentProgress.filter(_.stateOperators.nonEmpty) + val i = p.lastIndexWhere(_.stateOperators.head.numRowsRemoved > 0) + assert(i >= 0, "no eviction batch found") + val so = p(i + 1).stateOperators.head + assert(so.numRowsRemoved === 0, s"numRowsRemoved=${so.numRowsRemoved}") + assert(so.numRowsUpdated === 0, s"numRowsUpdated=${so.numRowsUpdated}") + assert(so.numRowsDroppedByWatermark === 0, + s"numRowsDroppedByWatermark=${so.numRowsDroppedByWatermark}") true }, StopStream From 0566cd7fb13610bd2d5c17bfb095f1995f3e4883 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Mon, 13 Apr 2026 16:57:41 +0900 Subject: [PATCH 6/8] fix --- .../spark/sql/execution/streaming/ProgressReporterSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala index ba39e0c260149..a7e71cfd7227e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala @@ -85,7 +85,7 @@ class ProgressReporterSuite extends StreamTest { assert(i >= 0 && p.length > i + 1) } }, - AssertOnQuery("idle trigger must reset metrics") { q => + Execute("idle trigger must reset metrics") { q => val p = q.recentProgress.filter(_.stateOperators.nonEmpty) val i = p.lastIndexWhere(_.stateOperators.head.numRowsRemoved > 0) assert(i >= 0, "no eviction batch found") @@ -94,7 +94,6 @@ class ProgressReporterSuite extends StreamTest { assert(so.numRowsUpdated === 0, s"numRowsUpdated=${so.numRowsUpdated}") assert(so.numRowsDroppedByWatermark === 0, s"numRowsDroppedByWatermark=${so.numRowsDroppedByWatermark}") - true }, StopStream ) From 26331abb19261a7b14524469fa72812f091d896a Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Mon, 13 Apr 2026 17:11:38 +0900 Subject: [PATCH 7/8] probably better validation code --- .../streaming/ProgressReporterSuite.scala | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala index a7e71cfd7227e..904397df45700 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala @@ -78,23 +78,25 @@ class ProgressReporterSuite extends StreamTest { // resetExecStatsForNoExecution which must zero out // per-batch metrics. AdvanceManualClock(1 * 1000), - Execute("wait for idle trigger progress") { q => + Execute("idle trigger must reset per-batch metrics") { q => eventually(Timeout(streamingTimeout)) { - val p = q.recentProgress.filter(_.stateOperators.nonEmpty) - val i = p.lastIndexWhere(_.stateOperators.head.numRowsRemoved > 0) - assert(i >= 0 && p.length > i + 1) + val progress = q.recentProgress.filter(_.stateOperators.nonEmpty) + val lastEviction = progress.lastIndexWhere { p => + p.durationMs.containsKey("addBatch") && + p.stateOperators.head.numRowsRemoved > 0 + } + assert(lastEviction >= 0, "no eviction batch found") + val idleIdx = progress.indexWhere( + !_.durationMs.containsKey("addBatch"), lastEviction + 1) + assert(idleIdx > lastEviction, + "no idle trigger found after eviction batch") + val so = progress(idleIdx).stateOperators.head + assert(so.numRowsRemoved === 0, s"numRowsRemoved=${so.numRowsRemoved}") + assert(so.numRowsUpdated === 0, s"numRowsUpdated=${so.numRowsUpdated}") + assert(so.numRowsDroppedByWatermark === 0, + s"numRowsDroppedByWatermark=${so.numRowsDroppedByWatermark}") } }, - Execute("idle trigger must reset metrics") { q => - val p = q.recentProgress.filter(_.stateOperators.nonEmpty) - val i = p.lastIndexWhere(_.stateOperators.head.numRowsRemoved > 0) - assert(i >= 0, "no eviction batch found") - val so = p(i + 1).stateOperators.head - assert(so.numRowsRemoved === 0, s"numRowsRemoved=${so.numRowsRemoved}") - assert(so.numRowsUpdated === 0, s"numRowsUpdated=${so.numRowsUpdated}") - assert(so.numRowsDroppedByWatermark === 0, - s"numRowsDroppedByWatermark=${so.numRowsDroppedByWatermark}") - }, StopStream ) } From 9f8d81dfd98e7d435f79d38f3da44fdc81142827 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Tue, 14 Apr 2026 10:52:06 +0900 Subject: [PATCH 8/8] fix style --- .../spark/sql/execution/streaming/ProgressReporterSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala index 904397df45700..da037936849e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.streaming -import org.scalatest.concurrent.Eventually.eventually import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.apache.spark.sql.execution.streaming.runtime.MemoryStream