From cae8b5d033210dd5cd477d7c31b22861f9998820 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Sat, 27 Jun 2026 08:43:27 +0200 Subject: [PATCH] Flink: Fix monitor source rate limit for sub-second intervals The monitor source rate was computed as perSecond(1.0 / rateLimit.getSeconds()). Duration.getSeconds() rounds any sub-second rate limit down to 0, so the rate became perSecond(Infinity), which turns off rate limiting. This change computes the rate from millis, so sub-second intervals are limited correctly. The 60s default is unchanged. This made the maintenance E2E tests flaky, since they use sub-second rate limits. With no rate limit the source calls table.refresh() in a loop and uses a full CPU core per job. CI runs the tests with -DtestParallelism=auto, which starts one MiniCluster per fork, so these busy sources use up all the cores. The converter runs on a timer, and under that load the timer fires too slowly, letting the test fail after the timeout hits. --- .../flink/maintenance/api/TableMaintenance.java | 15 ++++++++++++++- .../maintenance/api/TestTableMaintenance.java | 8 ++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java index 025a6d17c023..46b38858b8aa 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java @@ -26,6 +26,7 @@ import java.util.UUID; import javax.annotation.Nullable; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.eventtime.TimestampAssigner; import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; @@ -378,7 +379,9 @@ private DataStream changeStream(String tableName, TableLoader loade // Create a monitor source to provide the TableChange stream MonitorSource source = new MonitorSource( - loader, RateLimiterStrategy.perSecond(1.0 / rateLimit.getSeconds()), maxReadBack); + loader, + RateLimiterStrategy.perSecond(monitorRatePerSecond(rateLimit.toMillis())), + maxReadBack); return setSlotSharingGroup( env.fromSource( source, @@ -395,6 +398,16 @@ private static String nameFor(MaintenanceTaskBuilder streamBuilder, int taskI return String.format(Locale.ROOT, "%s [%d]", streamBuilder.maintenanceTaskName(), taskIndex); } + /** + * Monitor poll rate per rate-limit interval, in checks/second. We compute from millis instead + * of seconds, otherwise sub-second intervals could be truncated to 0, yielding an infinite rate + * which busy-loops the source. + */ + @VisibleForTesting + static double monitorRatePerSecond(long rateLimitMillis) { + return 1000.0 / rateLimitMillis; + } + private SingleOutputStreamOperator setSlotSharingGroup( SingleOutputStreamOperator operator) { return slotSharingGroup == null ? operator : operator.slotSharingGroup(slotSharingGroup); diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java index 49219d5b4698..96b2fe04d66e 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java @@ -350,6 +350,14 @@ void testUidAndSlotSharingGroupForMonitorSource() throws IOException { checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP); } + @Test + void testMonitorRatePerSecond() { + // Sub-second rate limits must not yield infinite (unthrottled) monitor rates. + assertThat(TableMaintenance.Builder.monitorRatePerSecond(50)).isEqualTo(20.0); + assertThat(TableMaintenance.Builder.monitorRatePerSecond(100)).isEqualTo(10.0); + assertThat(TableMaintenance.Builder.monitorRatePerSecond(60_000)).isEqualTo(1.0 / 60); + } + /** * Sends the events though the {@link ManualSource} provided, and waits until the given number of * records are processed.