From 8267d85404d01df5d2ea319f15d8301764d22ada Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Fri, 26 Jun 2026 13:39:45 -0700 Subject: [PATCH] Spark: Stop streaming queries before dropping table in streaming read test teardown TestStructuredStreamingRead3 sets STREAMING_SNAPSHOT_POLLING_INTERVAL_MS=1 for the async parameter, so AsyncSparkMicroBatchPlanner's background thread refreshes the table from the catalog ~1000x/second. The class has two @AfterEach methods, stopStreams() and removeTables(), whose relative order is not guaranteed. When DROP TABLE runs while the planner thread is still alive, the flood of catalog refreshes contends with the drop and stalls teardown for ~20s per async test execution. Stop active streams before dropping the table in removeTables() so the background refresh thread is gone before the drop. stopStreams() stays annotated with @AfterEach, so it now runs twice per test; whichever invocation comes second should find no active streams left and return immediately. The streaming reads themselves were never the bottleneck (each completes in <1s). Full-class TestStructuredStreamingRead3 on Spark v3.5 drops from ~305s to ~188s (66 tests, still green). Applied to v3.5, v4.0 and v4.1. --- .../iceberg/spark/source/TestStructuredStreamingRead3.java | 6 +++++- .../iceberg/spark/source/TestStructuredStreamingRead3.java | 6 +++++- .../iceberg/spark/source/TestStructuredStreamingRead3.java | 6 +++++- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 4efb883b5d6c..609de8afec21 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -175,7 +175,11 @@ public void stopStreams() throws TimeoutException { } @AfterEach - public void removeTables() { + public void removeTables() throws TimeoutException { + // Stop active streams before dropping the table. 
In async mode the planner's background + // thread keeps refreshing the table from the catalog; dropping the table while that thread + // is alive lets those refreshes contend with DROP TABLE and stall teardown for seconds. + stopStreams(); sql("DROP TABLE IF EXISTS %s", tableName); } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 06189b304299..1f2a87f17218 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -175,7 +175,11 @@ public void stopStreams() throws TimeoutException { } @AfterEach - public void removeTables() { + public void removeTables() throws TimeoutException { + // Stop active streams before dropping the table. In async mode the planner's background + // thread keeps refreshing the table from the catalog; dropping the table while that thread + // is alive lets those refreshes contend with DROP TABLE and stall teardown for seconds. + stopStreams(); sql("DROP TABLE IF EXISTS %s", tableName); } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 89947f73ea38..00238cfcc341 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -176,7 +176,11 @@ public void stopStreams() throws TimeoutException { } @AfterEach - public void removeTables() { + public void removeTables() throws TimeoutException { + // Stop active streams before dropping the table. 
In async mode the planner's background + // thread keeps refreshing the table from the catalog; dropping the table while that thread + // is alive lets those refreshes contend with DROP TABLE and stall teardown for seconds. + stopStreams(); sql("DROP TABLE IF EXISTS %s", tableName); }