From 788a31aa7632d6f60edb9a90cf84549b509dc942 Mon Sep 17 00:00:00 2001 From: Dennis-Mircea Ciupitu Date: Mon, 6 Apr 2026 17:18:03 +0300 Subject: [PATCH 1/3] [FLINK-39404][runtime] HardwareDescription reports incorrect CPU cores in containerized environments with fractional CPU limits --- .../entrypoint/ClusterEntrypointUtils.java | 2 +- .../runtime/instance/HardwareDescription.java | 12 +- .../apache/flink/runtime/util/Hardware.java | 170 ++++++++++++++++++ 3 files changed, 177 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointUtils.java index bb1b1cab1ff67..3284144cf0c61 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointUtils.java @@ -115,7 +115,7 @@ public static int getPoolSize(Configuration config) { final int poolSize = config.get( ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE, - 4 * Hardware.getNumberCPUCores()); + (int) Math.ceil(4 * Hardware.getNumberCPUCoresAsDouble())); Preconditions.checkArgument( poolSize > 0, "Illegal pool size (%s) of io-executor, please re-configure '%s'.", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java index ab6ae629ef734..599efe5bfa142 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java @@ -41,7 +41,7 @@ public final class HardwareDescription implements Serializable { /** The number of CPU cores available to the JVM on the compute node. */ @JsonProperty(FIELD_NAME_CPU_CORES) - private final int numberOfCPUCores; + private final double numberOfCPUCores; /** The size of physical memory in bytes available on the compute node. */ @JsonProperty(FIELD_NAME_SIZE_PHYSICAL_MEMORY) @@ -67,7 +67,7 @@ public final class HardwareDescription implements Serializable { */ @JsonCreator public HardwareDescription( - @JsonProperty(FIELD_NAME_CPU_CORES) int numberOfCPUCores, + @JsonProperty(FIELD_NAME_CPU_CORES) double numberOfCPUCores, @JsonProperty(FIELD_NAME_SIZE_PHYSICAL_MEMORY) long sizeOfPhysicalMemory, @JsonProperty(FIELD_NAME_SIZE_JVM_HEAP) long sizeOfJvmHeap, @JsonProperty(FIELD_NAME_SIZE_MANAGED_MEMORY) long sizeOfManagedMemory) { @@ -82,7 +82,7 @@ public HardwareDescription( * * @return the number of CPU cores available to the JVM on the compute node */ - public int getNumberOfCPUCores() { + public double getNumberOfCPUCores() { return this.numberOfCPUCores; } @@ -126,7 +126,7 @@ public boolean equals(Object o) { return false; } HardwareDescription that = (HardwareDescription) o; - return numberOfCPUCores == that.numberOfCPUCores + return Double.compare(numberOfCPUCores, that.numberOfCPUCores) == 0 && sizeOfPhysicalMemory == that.sizeOfPhysicalMemory && sizeOfJvmHeap == that.sizeOfJvmHeap && sizeOfManagedMemory == that.sizeOfManagedMemory; @@ -141,7 +141,7 @@ public int hashCode() { @Override public String toString() { return String.format( - "cores=%d, physMem=%d, heap=%d, managed=%d", + "cores=%s, physMem=%d, heap=%d, managed=%d", numberOfCPUCores, sizeOfPhysicalMemory, sizeOfJvmHeap, sizeOfManagedMemory); } @@ -150,7 +150,7 @@ public String toString() { // -------------------------------------------------------------------------------------------- public static HardwareDescription extractFromSystem(long managedMemory) { - final int numberOfCPUCores = Hardware.getNumberCPUCores(); + final double numberOfCPUCores = Hardware.getNumberCPUCoresAsDouble(); final long sizeOfJvmHeap = Runtime.getRuntime().maxMemory(); final long sizeOfPhysicalMemory = Hardware.getSizeOfPhysicalMemory(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/Hardware.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/Hardware.java index 28e7a4614ad08..4baf55795ee41 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/Hardware.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/Hardware.java @@ -32,6 +32,9 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -45,6 +48,11 @@ public class Hardware { private static final Pattern LINUX_MEMORY_REGEX = Pattern.compile("^MemTotal:\\s*(\\d+)\\s+kB$"); + private static final String CGROUP_V2_CPU_MAX_PATH = "/sys/fs/cgroup/cpu.max"; + + private static final String CGROUP_V1_CPU_QUOTA_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"; + private static final String CGROUP_V1_CPU_PERIOD_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"; + // ------------------------------------------------------------------------ /** @@ -56,6 +64,168 @@ public static int getNumberCPUCores() { return Runtime.getRuntime().availableProcessors(); } + /** + * Gets the number of CPU cores available to the JVM as a fractional value. + * + *

On Linux, this method first attempts to detect a container CPU limit via cgroup files (v2, + * then v1). If a limit is found, it is returned as-is (e.g. 0.5, 1.5). If no container limit is + * detected, it falls back to {@link Runtime#availableProcessors()}. + * + *

Use this method when the fractional value matters, for example when displaying the CPU + * count in the Web UI or when performing arithmetic before rounding (e.g. {@code 4 * cores}). + * For call sites that need an integer (e.g. thread pool sizes), use {@link + * #getNumberCPUCores()} instead. + * + * @return The number of CPU cores as a double. + */ + public static double getNumberCPUCoresAsDouble() { + double containerLimit = getContainerCpuLimit(); + if (containerLimit > 0) { + LOG.debug("Using container CPU limit for core count: limit={}", containerLimit); + return containerLimit; + } + return Runtime.getRuntime().availableProcessors(); + } + + /** + * Returns the CPU limit of the container as a fractional double by reading Linux cgroup CPU + * quota and period values. + * + *

This method attempts to read the CPU limit from cgroup v2 first ({@code + * /sys/fs/cgroup/cpu.max}), then falls back to cgroup v1 ({@code + * /sys/fs/cgroup/cpu/cpu.cfs_quota_us} and {@code cpu.cfs_period_us}). + * + *

Examples of return values: + * + *

+ * + * @return the container CPU limit as a fractional double, or {@code -1} if no limit is detected + */ + public static double getContainerCpuLimit() { + // Try cgroup v2 first + double limit = getCpuLimitFromCgroupV2(); + if (limit > 0) { + return limit; + } + + // Fall back to cgroup v1 + limit = getCpuLimitFromCgroupV1(); + if (limit > 0) { + return limit; + } + + LOG.debug( + "Could not detect container CPU limit from cgroup files. " + + "This is expected when not running inside a container or when no CPU limit is set."); + return -1; + } + + /** + * Reads CPU limit from cgroup v2. + * + *

The file {@code /sys/fs/cgroup/cpu.max} contains two space-separated values: {@code quota + * period}. For example, {@code "50000 100000"} means a limit of 0.5 CPU cores. The string + * {@code "max"} as the quota means no limit is set. + * + * @return the CPU limit as a double, or {@code -1} if unavailable or unlimited + */ + private static double getCpuLimitFromCgroupV2() { + try { + Path path = Paths.get(CGROUP_V2_CPU_MAX_PATH); + if (!Files.exists(path)) { + return -1; + } + + String content = Files.readString(path).trim(); + String[] parts = content.split("\\s+"); + if (parts.length != 2) { + LOG.debug( + "Unexpected format in {}: '{}'. Expected 'quota period'.", + CGROUP_V2_CPU_MAX_PATH, + content); + return -1; + } + + // "max" means no CPU limit is set + if ("max".equals(parts[0])) { + LOG.debug("No CPU limit set (cgroup v2 quota is 'max')."); + return -1; + } + + long quota = Long.parseLong(parts[0]); + long period = Long.parseLong(parts[1]); + if (quota > 0 && period > 0) { + double cpuLimit = (double) quota / period; + LOG.debug( + "Detected cgroup v2 CPU limit: quota={}, period={}, limit={}", + quota, + period, + cpuLimit); + return cpuLimit; + } + } catch (NumberFormatException e) { + LOG.debug("Failed to parse cgroup v2 CPU limit values.", e); + } catch (IOException e) { + LOG.debug("Could not read cgroup v2 CPU limit file: {}", CGROUP_V2_CPU_MAX_PATH, e); + } catch (Throwable t) { + LOG.debug("Unexpected error reading cgroup v2 CPU limit.", t); + } + return -1; + } + + /** + * Reads CPU limit from cgroup v1. + * + *

The quota is read from {@code /sys/fs/cgroup/cpu/cpu.cfs_quota_us} and the period from + * {@code /sys/fs/cgroup/cpu/cpu.cfs_period_us}. Both values are in microseconds. A quota of + * {@code -1} means no limit is set. The CPU limit is computed as {@code quota / period}. + * + * @return the CPU limit as a double, or {@code -1} if unavailable or unlimited + */ + private static double getCpuLimitFromCgroupV1() { + try { + Path quotaPath = Paths.get(CGROUP_V1_CPU_QUOTA_PATH); + Path periodPath = Paths.get(CGROUP_V1_CPU_PERIOD_PATH); + if (!Files.exists(quotaPath) || !Files.exists(periodPath)) { + return -1; + } + + long quota = Long.parseLong(Files.readString(quotaPath).trim()); + long period = Long.parseLong(Files.readString(periodPath).trim()); + + // quota == -1 means no CPU limit is set in cgroup v1 + if (quota <= 0) { + LOG.debug("No CPU limit set (cgroup v1 quota={}).", quota); + return -1; + } + + if (period <= 0) { + LOG.debug("Invalid cgroup v1 CPU period: {}", period); + return -1; + } + + double cpuLimit = (double) quota / period; + LOG.debug( + "Detected cgroup v1 CPU limit: quota={}, period={}, limit={}", + quota, + period, + cpuLimit); + return cpuLimit; + } catch (NumberFormatException e) { + LOG.debug("Failed to parse cgroup v1 CPU limit values.", e); + } catch (IOException e) { + LOG.debug("Could not read cgroup v1 CPU limit files.", e); + } catch (Throwable t) { + LOG.debug("Unexpected error reading cgroup v1 CPU limit.", t); + } + return -1; + } + /** * Returns the size of the physical memory in bytes. * From a647772c57ea9d331db3e14bef50d89d123b5718 Mon Sep 17 00:00:00 2001 From: Dennis-Mircea Ciupitu Date: Mon, 6 Apr 2026 22:39:30 +0300 Subject: [PATCH 2/3] [FLINK-39404][runtime] Update the rest_v1_dispatcher.html & rest_api_v1.snapshot --- docs/layouts/shortcodes/generated/rest_v1_dispatcher.html | 4 ++-- flink-runtime-web/src/test/resources/rest_api_v1.snapshot | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html index 07ebf893e0756..9e4533c8d3b4f 100644 --- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html +++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html @@ -5851,7 +5851,7 @@ "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription", "properties" : { "cpuCores" : { - "type" : "integer" + "type" : "number" }, "freeMemory" : { "type" : "integer" @@ -6078,7 +6078,7 @@ "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription", "properties" : { "cpuCores" : { - "type" : "integer" + "type" : "number" }, "freeMemory" : { "type" : "integer" diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index e065545a0693c..4a7e9d75bf1b8 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -4363,7 +4363,7 @@ "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription", "properties" : { "cpuCores" : { - "type" : "integer" + "type" : "number" }, "physicalMemory" : { "type" : "integer" @@ -4528,7 +4528,7 @@ "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription", "properties" : { "cpuCores" : { - "type" : "integer" + "type" : "number" }, "physicalMemory" : { "type" : "integer" From e6f2343fb17862a43e5eea298f1fdf80b3cb3153 Mon Sep 17 00:00:00 2001 From: Dennis-Mircea Ciupitu Date: Tue, 7 Apr 2026 08:47:00 +0300 Subject: [PATCH 3/3] [FLINK-39404][runtime] Update the rest_v1_dispatcher.yml --- docs/static/generated/rest_v1_dispatcher.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml index 16df36dc7285a..2734e3622c89a 100644 --- a/docs/static/generated/rest_v1_dispatcher.yml +++ b/docs/static/generated/rest_v1_dispatcher.yml @@ -2346,8 +2346,8 @@ components: type: object properties: cpuCores: - type: integer - format: int32 + type: number + format: double freeMemory: type: integer format: int64