diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java index 025a6d17c023..46b38858b8aa 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java @@ -26,6 +26,7 @@ import java.util.UUID; import javax.annotation.Nullable; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.eventtime.TimestampAssigner; import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; @@ -378,7 +379,9 @@ private DataStream changeStream(String tableName, TableLoader loade // Create a monitor source to provide the TableChange stream MonitorSource source = new MonitorSource( - loader, RateLimiterStrategy.perSecond(1.0 / rateLimit.getSeconds()), maxReadBack); + loader, + RateLimiterStrategy.perSecond(monitorRatePerSecond(rateLimit.toMillis())), + maxReadBack); return setSlotSharingGroup( env.fromSource( source, @@ -395,6 +398,16 @@ private static String nameFor(MaintenanceTaskBuilder streamBuilder, int taskI return String.format(Locale.ROOT, "%s [%d]", streamBuilder.maintenanceTaskName(), taskIndex); } + /** + * Monitor poll rate per rate-limit interval, in checks/second. We compute from millis instead + * of seconds, otherwise sub-second intervals could be truncated to 0, yielding an infinite rate + * which busy-loops the source. + */ + @VisibleForTesting + static double monitorRatePerSecond(long rateLimitMillis) { + return 1000.0 / rateLimitMillis; + } + private SingleOutputStreamOperator setSlotSharingGroup( SingleOutputStreamOperator operator) { return slotSharingGroup == null ? operator : operator.slotSharingGroup(slotSharingGroup); diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java index 49219d5b4698..96b2fe04d66e 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java @@ -350,6 +350,14 @@ void testUidAndSlotSharingGroupForMonitorSource() throws IOException { checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP); } + @Test + void testMonitorRatePerSecond() { + // Sub-second rate limits must not yield infinite (unthrottled) monitor rates. + assertThat(TableMaintenance.Builder.monitorRatePerSecond(50)).isEqualTo(20.0); + assertThat(TableMaintenance.Builder.monitorRatePerSecond(100)).isEqualTo(10.0); + assertThat(TableMaintenance.Builder.monitorRatePerSecond(60_000)).isEqualTo(1.0 / 60); + } + /** * Sends the events though the {@link ManualSource} provided, and waits until the given number of * records are processed. diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java index 025a6d17c023..46b38858b8aa 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java @@ -26,6 +26,7 @@ import java.util.UUID; import javax.annotation.Nullable; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.eventtime.TimestampAssigner; import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; @@ -378,7 +379,9 @@ private DataStream changeStream(String tableName, TableLoader loade // Create a monitor source to provide the TableChange stream MonitorSource source = new MonitorSource( - loader, RateLimiterStrategy.perSecond(1.0 / rateLimit.getSeconds()), maxReadBack); + loader, + RateLimiterStrategy.perSecond(monitorRatePerSecond(rateLimit.toMillis())), + maxReadBack); return setSlotSharingGroup( env.fromSource( source, @@ -395,6 +398,16 @@ private static String nameFor(MaintenanceTaskBuilder streamBuilder, int taskI return String.format(Locale.ROOT, "%s [%d]", streamBuilder.maintenanceTaskName(), taskIndex); } + /** + * Monitor poll rate per rate-limit interval, in checks/second. We compute from millis instead + * of seconds, otherwise sub-second intervals could be truncated to 0, yielding an infinite rate + * which busy-loops the source. + */ + @VisibleForTesting + static double monitorRatePerSecond(long rateLimitMillis) { + return 1000.0 / rateLimitMillis; + } + private SingleOutputStreamOperator setSlotSharingGroup( SingleOutputStreamOperator operator) { return slotSharingGroup == null ? operator : operator.slotSharingGroup(slotSharingGroup); diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java index 49219d5b4698..96b2fe04d66e 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java @@ -350,6 +350,14 @@ void testUidAndSlotSharingGroupForMonitorSource() throws IOException { checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP); } + @Test + void testMonitorRatePerSecond() { + // Sub-second rate limits must not yield infinite (unthrottled) monitor rates. + assertThat(TableMaintenance.Builder.monitorRatePerSecond(50)).isEqualTo(20.0); + assertThat(TableMaintenance.Builder.monitorRatePerSecond(100)).isEqualTo(10.0); + assertThat(TableMaintenance.Builder.monitorRatePerSecond(60_000)).isEqualTo(1.0 / 60); + } + /** * Sends the events though the {@link ManualSource} provided, and waits until the given number of * records are processed.