From 3b72455a1b1dbe08d0dc758be0d8ef33c6c83fd3 Mon Sep 17 00:00:00 2001 From: Prajwal Banakar Date: Tue, 14 Apr 2026 04:10:39 +0000 Subject: [PATCH] [client] Add validation for client.scanner.log.max-poll-records and client.connect-timeout --- .../apache/fluss/client/FlussConnection.java | 2 ++ .../apache/fluss/config/FlussConfigUtils.java | 18 +++++++++++++++ .../fluss/config/FlussConfigUtilsTest.java | 23 +++++++++++++++++++ 3 files changed, 43 insertions(+) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java index d98e45869e..836bafe912 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java @@ -32,6 +32,7 @@ import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.FlussConfigUtils; import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.fs.FileSystem; import org.apache.fluss.metadata.TablePath; @@ -76,6 +77,7 @@ public final class FlussConnection implements Connection { Configuration.fromMap( extractPrefix(new HashMap<>(conf.toMap()), CLIENT_PREFIX + "fs.")), null); + FlussConfigUtils.validateClientConfigs(conf); // for client metrics. setupClientMetricsConfiguration(); String clientId = conf.getString(ConfigOptions.CLIENT_ID); diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java index adbbce0af4..b3544bf440 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java @@ -225,4 +225,22 @@ private static void validMinValue(ConfigOption option, int value, int m option.key(), minValue)); } } + + public static void validateClientConfigs(Configuration conf) { + int maxPollRecords = conf.getInt(ConfigOptions.CLIENT_SCANNER_LOG_MAX_POLL_RECORDS); + if (maxPollRecords <= 0) { + throw new IllegalConfigurationException( + String.format( + "Invalid configuration for %s, it must be greater than 0.", + ConfigOptions.CLIENT_SCANNER_LOG_MAX_POLL_RECORDS.key())); + } + + long connectTimeoutMs = conf.get(ConfigOptions.CLIENT_CONNECT_TIMEOUT).toMillis(); + if (connectTimeoutMs <= 0) { + throw new IllegalConfigurationException( + String.format( + "Invalid configuration for %s, it must be greater than 0.", + ConfigOptions.CLIENT_CONNECT_TIMEOUT.key())); + } + } } diff --git a/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java b/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java index e2690c54ba..b9a5ea6063 100644 --- a/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java @@ -175,4 +175,27 @@ void testValidateTabletConfigs() { .hasMessageContaining(ConfigOptions.TABLET_SERVER_ID.key()) .hasMessageContaining("it must be greater than or equal 0"); } + + @Test + void testValidateClientConfigs() { + // valid defaults should pass + Configuration validConf = new Configuration(); + FlussConfigUtils.validateClientConfigs(validConf); + + // max-poll-records = 0 should fail + Configuration zeroPollConf = new Configuration(); + zeroPollConf.set(ConfigOptions.CLIENT_SCANNER_LOG_MAX_POLL_RECORDS, 0); + assertThatThrownBy(() -> FlussConfigUtils.validateClientConfigs(zeroPollConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.CLIENT_SCANNER_LOG_MAX_POLL_RECORDS.key()) + .hasMessageContaining("must be greater than 0"); + + // connect-timeout = 0 should fail + Configuration zeroTimeoutConf = new Configuration(); + zeroTimeoutConf.set(ConfigOptions.CLIENT_CONNECT_TIMEOUT, java.time.Duration.ZERO); + assertThatThrownBy(() -> FlussConfigUtils.validateClientConfigs(zeroTimeoutConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.CLIENT_CONNECT_TIMEOUT.key()) + .hasMessageContaining("must be greater than 0"); + } }