Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -5851,7 +5851,7 @@
"id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
"properties" : {
"cpuCores" : {
"type" : "integer"
"type" : "number"
},
"freeMemory" : {
"type" : "integer"
Expand Down Expand Up @@ -6078,7 +6078,7 @@
"id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
"properties" : {
"cpuCores" : {
"type" : "integer"
"type" : "number"
},
"freeMemory" : {
"type" : "integer"
Expand Down
4 changes: 2 additions & 2 deletions docs/static/generated/rest_v1_dispatcher.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2346,8 +2346,8 @@ components:
type: object
properties:
cpuCores:
type: integer
format: int32
type: number
format: double
freeMemory:
type: integer
format: int64
Expand Down
4 changes: 2 additions & 2 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -4363,7 +4363,7 @@
"id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
"properties" : {
"cpuCores" : {
"type" : "integer"
"type" : "number"
},
"physicalMemory" : {
"type" : "integer"
Expand Down Expand Up @@ -4528,7 +4528,7 @@
"id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
"properties" : {
"cpuCores" : {
"type" : "integer"
"type" : "number"
},
"physicalMemory" : {
"type" : "integer"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public static int getPoolSize(Configuration config) {
final int poolSize =
config.get(
ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE,
4 * Hardware.getNumberCPUCores());
(int) Math.ceil(4 * Hardware.getNumberCPUCoresAsDouble()));
Preconditions.checkArgument(
poolSize > 0,
"Illegal pool size (%s) of io-executor, please re-configure '%s'.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public final class HardwareDescription implements Serializable {

/** The number of CPU cores available to the JVM on the compute node. */
@JsonProperty(FIELD_NAME_CPU_CORES)
private final int numberOfCPUCores;
private final double numberOfCPUCores;

/** The size of physical memory in bytes available on the compute node. */
@JsonProperty(FIELD_NAME_SIZE_PHYSICAL_MEMORY)
Expand All @@ -67,7 +67,7 @@ public final class HardwareDescription implements Serializable {
*/
@JsonCreator
public HardwareDescription(
@JsonProperty(FIELD_NAME_CPU_CORES) int numberOfCPUCores,
@JsonProperty(FIELD_NAME_CPU_CORES) double numberOfCPUCores,
@JsonProperty(FIELD_NAME_SIZE_PHYSICAL_MEMORY) long sizeOfPhysicalMemory,
@JsonProperty(FIELD_NAME_SIZE_JVM_HEAP) long sizeOfJvmHeap,
@JsonProperty(FIELD_NAME_SIZE_MANAGED_MEMORY) long sizeOfManagedMemory) {
Expand All @@ -82,7 +82,7 @@ public HardwareDescription(
*
* @return the number of CPU cores available to the JVM on the compute node
*/
public int getNumberOfCPUCores() {
public double getNumberOfCPUCores() {
return this.numberOfCPUCores;
}

Expand Down Expand Up @@ -126,7 +126,7 @@ public boolean equals(Object o) {
return false;
}
HardwareDescription that = (HardwareDescription) o;
return numberOfCPUCores == that.numberOfCPUCores
return Double.compare(numberOfCPUCores, that.numberOfCPUCores) == 0
&& sizeOfPhysicalMemory == that.sizeOfPhysicalMemory
&& sizeOfJvmHeap == that.sizeOfJvmHeap
&& sizeOfManagedMemory == that.sizeOfManagedMemory;
Expand All @@ -141,7 +141,7 @@ public int hashCode() {
@Override
public String toString() {
return String.format(
"cores=%d, physMem=%d, heap=%d, managed=%d",
"cores=%s, physMem=%d, heap=%d, managed=%d",
numberOfCPUCores, sizeOfPhysicalMemory, sizeOfJvmHeap, sizeOfManagedMemory);
}

Expand All @@ -150,7 +150,7 @@ public String toString() {
// --------------------------------------------------------------------------------------------

public static HardwareDescription extractFromSystem(long managedMemory) {
final int numberOfCPUCores = Hardware.getNumberCPUCores();
final double numberOfCPUCores = Hardware.getNumberCPUCoresAsDouble();
final long sizeOfJvmHeap = Runtime.getRuntime().maxMemory();
final long sizeOfPhysicalMemory = Hardware.getSizeOfPhysicalMemory();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -45,6 +48,11 @@ public class Hardware {
private static final Pattern LINUX_MEMORY_REGEX =
Pattern.compile("^MemTotal:\\s*(\\d+)\\s+kB$");

private static final String CGROUP_V2_CPU_MAX_PATH = "/sys/fs/cgroup/cpu.max";

private static final String CGROUP_V1_CPU_QUOTA_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us";
private static final String CGROUP_V1_CPU_PERIOD_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_period_us";

// ------------------------------------------------------------------------

/**
Expand All @@ -56,6 +64,168 @@ public static int getNumberCPUCores() {
return Runtime.getRuntime().availableProcessors();
}

/**
* Gets the number of CPU cores available to the JVM as a fractional value.
*
* <p>On Linux, this method first attempts to detect a container CPU limit via cgroup files (v2,
* then v1). If a limit is found, it is returned as-is (e.g. 0.5, 1.5). If no container limit is
* detected, it falls back to {@link Runtime#availableProcessors()}.
*
* <p>Use this method when the fractional value matters, for example when displaying the CPU
* count in the Web UI or when performing arithmetic before rounding (e.g. {@code 4 * cores}).
* For call sites that need an integer (e.g. thread pool sizes), use {@link
* #getNumberCPUCores()} instead.
*
* @return The number of CPU cores as a double.
*/
public static double getNumberCPUCoresAsDouble() {
double containerLimit = getContainerCpuLimit();
if (containerLimit > 0) {
LOG.debug("Using container CPU limit for core count: limit={}", containerLimit);
return containerLimit;
}
return Runtime.getRuntime().availableProcessors();
}

/**
* Returns the CPU limit of the container as a fractional double by reading Linux cgroup CPU
* quota and period values.
*
* <p>This method attempts to read the CPU limit from cgroup v2 first ({@code
* /sys/fs/cgroup/cpu.max}), then falls back to cgroup v1 ({@code
* /sys/fs/cgroup/cpu/cpu.cfs_quota_us} and {@code cpu.cfs_period_us}).
*
* <p>Examples of return values:
*
* <ul>
* <li>{@code 0.5} - container limited to half a CPU core
* <li>{@code 2.0} - container limited to 2 CPU cores
* <li>{@code -1} - not running in a container, no CPU limit set, or unable to read cgroup
* files (e.g. non-Linux OS)
* </ul>
*
* @return the container CPU limit as a fractional double, or {@code -1} if no limit is detected
*/
public static double getContainerCpuLimit() {
// Try cgroup v2 first
double limit = getCpuLimitFromCgroupV2();
if (limit > 0) {
return limit;
}

// Fall back to cgroup v1
limit = getCpuLimitFromCgroupV1();
if (limit > 0) {
return limit;
}

LOG.debug(
"Could not detect container CPU limit from cgroup files. "
+ "This is expected when not running inside a container or when no CPU limit is set.");
return -1;
}

/**
* Reads CPU limit from cgroup v2.
*
* <p>The file {@code /sys/fs/cgroup/cpu.max} contains two space-separated values: {@code quota
* period}. For example, {@code "50000 100000"} means a limit of 0.5 CPU cores. The string
* {@code "max"} as the quota means no limit is set.
*
* @return the CPU limit as a double, or {@code -1} if unavailable or unlimited
*/
private static double getCpuLimitFromCgroupV2() {
try {
Path path = Paths.get(CGROUP_V2_CPU_MAX_PATH);
if (!Files.exists(path)) {
return -1;
}

String content = Files.readString(path).trim();
String[] parts = content.split("\\s+");
if (parts.length != 2) {
LOG.debug(
"Unexpected format in {}: '{}'. Expected 'quota period'.",
CGROUP_V2_CPU_MAX_PATH,
content);
return -1;
}

// "max" means no CPU limit is set
if ("max".equals(parts[0])) {
LOG.debug("No CPU limit set (cgroup v2 quota is 'max').");
return -1;
}

long quota = Long.parseLong(parts[0]);
long period = Long.parseLong(parts[1]);
if (quota > 0 && period > 0) {
double cpuLimit = (double) quota / period;
LOG.debug(
"Detected cgroup v2 CPU limit: quota={}, period={}, limit={}",
quota,
period,
cpuLimit);
return cpuLimit;
}
} catch (NumberFormatException e) {
LOG.debug("Failed to parse cgroup v2 CPU limit values.", e);
} catch (IOException e) {
LOG.debug("Could not read cgroup v2 CPU limit file: {}", CGROUP_V2_CPU_MAX_PATH, e);
} catch (Throwable t) {
LOG.debug("Unexpected error reading cgroup v2 CPU limit.", t);
}
return -1;
}

/**
* Reads CPU limit from cgroup v1.
*
* <p>The quota is read from {@code /sys/fs/cgroup/cpu/cpu.cfs_quota_us} and the period from
* {@code /sys/fs/cgroup/cpu/cpu.cfs_period_us}. Both values are in microseconds. A quota of
* {@code -1} means no limit is set. The CPU limit is computed as {@code quota / period}.
*
* @return the CPU limit as a double, or {@code -1} if unavailable or unlimited
*/
private static double getCpuLimitFromCgroupV1() {
try {
Path quotaPath = Paths.get(CGROUP_V1_CPU_QUOTA_PATH);
Path periodPath = Paths.get(CGROUP_V1_CPU_PERIOD_PATH);
if (!Files.exists(quotaPath) || !Files.exists(periodPath)) {
return -1;
}

long quota = Long.parseLong(Files.readString(quotaPath).trim());
long period = Long.parseLong(Files.readString(periodPath).trim());

// quota == -1 means no CPU limit is set in cgroup v1
if (quota <= 0) {
LOG.debug("No CPU limit set (cgroup v1 quota={}).", quota);
return -1;
}

if (period <= 0) {
LOG.debug("Invalid cgroup v1 CPU period: {}", period);
return -1;
}

double cpuLimit = (double) quota / period;
LOG.debug(
"Detected cgroup v1 CPU limit: quota={}, period={}, limit={}",
quota,
period,
cpuLimit);
return cpuLimit;
} catch (NumberFormatException e) {
LOG.debug("Failed to parse cgroup v1 CPU limit values.", e);
} catch (IOException e) {
LOG.debug("Could not read cgroup v1 CPU limit files.", e);
} catch (Throwable t) {
LOG.debug("Unexpected error reading cgroup v1 CPU limit.", t);
}
return -1;
}

/**
* Returns the size of the physical memory in bytes.
*
Expand Down