Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,9 @@ public static class Replicator {

/** Replica is absent on the node and the node is not in assignments for this replica. */
public static final int REPLICA_ABSENT_ERR = REPLICATOR_ERR_GROUP.registerErrorCode((short) 11);

/** Node is overloaded: in-flight partition operation byte limit reached. */
public static final int REPLICA_OVERLOADED_ERR = REPLICATOR_ERR_GROUP.registerErrorCode((short) 12);
}

/** Storage error group. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,6 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;

// Each inbound handler in a pipeline has to release the received messages.
var unpacker = new ClientMessageUnpacker(byteBuf);

Expand Down Expand Up @@ -882,7 +881,6 @@ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker i
if (ClientOp.isPartitionOperation(opCode)) {
long requestId0 = requestId;
int opCode0 = opCode;

partitionOperationsExecutor.execute(() -> {
try {
processOperationInternal(ctx, in, requestId0, opCode0, guard);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.lang;

import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_OVERLOADED_ERR;

/**
 * Signals that the node cannot accept new requests because the in-flight partition operation byte limit
 * ({@code replication.partitionOperationHeapUsagePercent}) has been reached.
 */
public class ReplicaOverloadedException extends IgniteInternalException {
    private static final long serialVersionUID = -6023736883539658779L;

    /** Fixed message attached to every instance of this exception. */
    private static final String OVERLOADED_MESSAGE = "Node is overloaded: in-flight partition operation byte limit reached.";

    /** Creates the exception with the {@code REPLICA_OVERLOADED_ERR} error code and the fixed overload message. */
    public ReplicaOverloadedException() {
        super(REPLICA_OVERLOADED_ERR, OVERLOADED_MESSAGE);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.util;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntSupplier;
import org.jetbrains.annotations.Nullable;

/**
 * Limits the total in-flight bytes of partition operations (queued or executing) across the replica manager and thin-client connector.
 *
 * <p>The byte limit is computed as a percentage of the JVM heap ({@code Runtime.getRuntime().maxMemory()}).
 * When the heap percentage is zero or less, all operations are permitted unconditionally.
 *
 * <p>{@link #tryAcquire(int)} returns {@code false} once adding {@code messageBytes} would exceed the limit.
 * A permit must be released via {@link #release(int)} when the operation completes, using the same byte count that was acquired.
 *
 * <p>Reservations are tracked in an {@link AtomicLong} updated with a compare-and-set loop, so a rejected reservation never changes
 * the running total.
 */
public class PartitionOperationInflightLimiter {
    /** Byte limit computed from heap percentage; {@code 0} means unlimited. */
    private volatile long byteLimit;

    /** Lazy source of the heap percentage; {@code null} when the percentage was supplied eagerly or no supplier was given. */
    private final @Nullable IntSupplier heapPercentSupplier;

    /** Becomes {@code true} once {@link #byteLimit} holds its final value. */
    private volatile boolean initialized;

    /** Running total of in-flight bytes. */
    private final AtomicLong inFlightBytes = new AtomicLong();

    /**
     * Constructor.
     *
     * @param heapPercent Percentage of max JVM heap to use as the in-flight byte limit. Zero or negative disables the limit.
     */
    public PartitionOperationInflightLimiter(int heapPercent) {
        this.byteLimit = computeByteLimit(heapPercent);
        this.heapPercentSupplier = null;
        this.initialized = true;
    }

    /**
     * Constructor with a lazy supplier of the heap percentage.
     *
     * @param heapPercentSupplier Supplier of heap percentage (0 or less disables the limit). Evaluated on first use; once it has
     *         returned a value it is never called again. A {@code null} supplier disables the limit.
     */
    public PartitionOperationInflightLimiter(@Nullable IntSupplier heapPercentSupplier) {
        this.heapPercentSupplier = heapPercentSupplier;
        this.initialized = false;
    }

    /**
     * Attempts to reserve {@code messageBytes} in-flight bytes.
     *
     * @param messageBytes Number of bytes to reserve; must not be negative.
     * @return {@code true} if the reservation was made or the limit is disabled; {@code false} if adding the bytes would exceed the limit.
     * @throws IllegalArgumentException If {@code messageBytes} is negative.
     */
    public boolean tryAcquire(int messageBytes) {
        requireNonNegative(messageBytes);

        long limit = resolvedByteLimit();

        if (limit <= 0) {
            return true;
        }

        while (true) {
            long current = inFlightBytes.get();

            if (current + messageBytes > limit) {
                return false;
            }

            // Retry on contention: another thread changed the total between the read and the update.
            if (inFlightBytes.compareAndSet(current, current + messageBytes)) {
                return true;
            }
        }
    }

    /**
     * Releases previously reserved in-flight bytes.
     * Must only be called after a successful {@link #tryAcquire(int)}.
     *
     * @param messageBytes Number of bytes to release; must not be negative.
     * @throws IllegalArgumentException If {@code messageBytes} is negative.
     */
    public void release(int messageBytes) {
        requireNonNegative(messageBytes);

        long limit = resolvedByteLimit();

        // Nothing is tracked while the limit is disabled, so there is nothing to give back.
        if (limit > 0) {
            inFlightBytes.addAndGet(-messageBytes);
        }
    }

    /**
     * Rejects negative byte counts: a negative reservation would lower the running total and a negative release would raise it,
     * silently corrupting the accounting.
     */
    private static void requireNonNegative(int messageBytes) {
        if (messageBytes < 0) {
            throw new IllegalArgumentException("messageBytes must not be negative: " + messageBytes);
        }
    }

    /** Returns the byte limit, evaluating the lazy supplier under the monitor the first time it is needed. */
    private long resolvedByteLimit() {
        if (initialized) {
            return byteLimit;
        }

        synchronized (this) {
            if (initialized) {
                return byteLimit;
            }

            if (heapPercentSupplier != null) {
                byteLimit = computeByteLimit(heapPercentSupplier.getAsInt());
            }

            // Published only after the limit is stored; if the supplier throws, the next call evaluates it again.
            initialized = true;
        }

        return byteLimit;
    }

    /** Converts a heap percentage into a byte count; zero or negative percentages yield {@code 0} (unlimited). */
    private static long computeByteLimit(int heapPercent) {
        if (heapPercent <= 0) {
            return 0;
        }

        return (long) (heapPercent / 100.0 * Runtime.getRuntime().maxMemory());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.util;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

/**
 * Unit tests for {@link PartitionOperationInflightLimiter}.
 *
 * <p>Tests that must reach the byte limit reserve the budget in {@code int}-sized chunks instead of relying on a single request,
 * so they behave the same when 10% of the max heap does not fit in an {@code int}.
 */
class PartitionOperationInFlightLimiterTest {
    private static final long MAX_MEMORY = Runtime.getRuntime().maxMemory();

    @Test
    void zeroHeapPercentAlwaysPermits() {
        var limiter = new PartitionOperationInflightLimiter(0);

        for (int i = 0; i < 100; i++) {
            assertTrue(limiter.tryAcquire(1000));
        }
    }

    @Test
    void negativeHeapPercentAlwaysPermits() {
        var limiter = new PartitionOperationInflightLimiter(-1);

        for (int i = 0; i < 100; i++) {
            assertTrue(limiter.tryAcquire(1000));
        }
    }

    @Test
    void acquireFailsWhenByteLimitExceeded() {
        // Use 10% heap limit.
        var limiter = new PartitionOperationInflightLimiter(10);
        long limit = (long) (0.10 * MAX_MEMORY);

        // A single chunk that exceeds the limit should be rejected. This is only expressible when the limit fits in an int.
        if (limit < Integer.MAX_VALUE) {
            assertFalse(limiter.tryAcquire((int) (limit + 1)));
        }

        // Independently of the heap size: once the whole budget is reserved, even one more byte must be rejected.
        acquireExactly(limiter, limit);

        assertFalse(limiter.tryAcquire(1));
    }

    @Test
    void acquireSucceedsUpToLimit() {
        var limiter = new PartitionOperationInflightLimiter(10);
        long limit = (long) (0.10 * MAX_MEMORY);

        // Chunk size that fits within the limit.
        int chunkBytes = (int) Math.min(limit / 2, Integer.MAX_VALUE / 2);

        assertTrue(limiter.tryAcquire(chunkBytes));
        assertTrue(limiter.tryAcquire(chunkBytes));
    }

    @Test
    void releaseRestoresBudget() {
        var limiter = new PartitionOperationInflightLimiter(10);
        long limit = (long) (0.10 * MAX_MEMORY);
        int chunkBytes = (int) Math.min(limit / 2, Integer.MAX_VALUE / 2);

        // Reserve the entire budget so that the limit is reached regardless of the heap size.
        acquireExactly(limiter, limit);

        // Now at the limit; another chunk should fail.
        assertFalse(limiter.tryAcquire(chunkBytes));

        limiter.release(chunkBytes);

        assertTrue(limiter.tryAcquire(chunkBytes));
    }

    @Test
    void releaseOnZeroLimitIsNoOp() {
        var limiter = new PartitionOperationInflightLimiter(0);

        // Should not throw.
        limiter.release(1000);

        assertTrue(limiter.tryAcquire(1000));
    }

    @Test
    void supplierConstructorInitializesLazily() {
        int[] callCount = {0};

        // 100% heap — effectively unlimited for this test.
        var limiter = new PartitionOperationInflightLimiter(() -> {
            callCount[0]++;
            return 100;
        });

        assertTrue(callCount[0] == 0, "supplier should not be called at construction time");

        assertTrue(limiter.tryAcquire(1));
        assertTrue(callCount[0] == 1, "supplier should be called exactly once");

        assertTrue(limiter.tryAcquire(1));
        assertTrue(callCount[0] == 1, "supplier should not be called again");
    }

    @Test
    void supplierConstructorWithZeroPercentAlwaysPermits() {
        var limiter = new PartitionOperationInflightLimiter(() -> 0);

        for (int i = 0; i < 100; i++) {
            assertTrue(limiter.tryAcquire(1000));
        }
    }

    @Test
    void multipleReleasesRestoreBudget() {
        var limiter = new PartitionOperationInflightLimiter(10);
        long limit = (long) (0.10 * MAX_MEMORY);
        int chunkBytes = (int) Math.min(limit / 4, Integer.MAX_VALUE / 4);

        // Acquire 4 chunks.
        for (int i = 0; i < 4; i++) {
            assertTrue(limiter.tryAcquire(chunkBytes), "acquire " + i + " should succeed");
        }

        // Release all.
        for (int i = 0; i < 4; i++) {
            limiter.release(chunkBytes);
        }

        // Should be able to acquire again.
        for (int i = 0; i < 4; i++) {
            assertTrue(limiter.tryAcquire(chunkBytes), "re-acquire " + i + " should succeed after release");
        }
    }

    /**
     * Reserves exactly {@code bytes} from the limiter, splitting the amount into chunks of at most {@link Integer#MAX_VALUE} bytes
     * because {@link PartitionOperationInflightLimiter#tryAcquire(int)} takes an {@code int}.
     */
    private static void acquireExactly(PartitionOperationInflightLimiter limiter, long bytes) {
        long remaining = bytes;

        while (remaining > 0) {
            int chunk = (int) Math.min(remaining, Integer.MAX_VALUE);

            assertTrue(limiter.tryAcquire(chunk), "acquire within the limit should succeed");

            remaining -= chunk;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.util.PartitionOperationInflightLimiter;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
import org.apache.ignite.network.NetworkAddress;
Expand Down Expand Up @@ -1487,6 +1488,7 @@ private class Node {
Set.of(PartitionReplicationMessageGroup.class, TxMessageGroup.class),
placementDriver,
threadPoolsManager.partitionOperationsExecutor(),
new PartitionOperationInflightLimiter(0),
partitionIdleSafeTimePropagationPeriodMsSupplier,
new NoOpFailureManager(),
new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,28 @@ public TypeSpec generateMessageImpl(MessageClass message, TypeSpec builderInterf

messageImpl.addMethod(messageTypeMethod);

// messageSize field with getter/setter (overrides NetworkMessage.getMessageSize()).
// Declared transient so it does not affect serialVersionUID computation and is not included in Java serialization.
FieldSpec messageSizeField = FieldSpec.builder(int.class, "messageSize")
.addModifiers(Modifier.PRIVATE, Modifier.TRANSIENT)
.addAnnotation(IgniteToStringExclude.class)
.build();

messageImpl.addField(messageSizeField);

messageImpl.addMethod(MethodSpec.methodBuilder("getMessageSize")
.addAnnotation(Override.class)
.addModifiers(Modifier.PUBLIC)
.returns(int.class)
.addStatement("return $N", messageSizeField)
.build());

messageImpl.addMethod(MethodSpec.methodBuilder("setMessageSize")
.addModifiers(Modifier.PUBLIC)
.addParameter(int.class, "messageSize")
.addStatement("this.messageSize = messageSize")
.build());

// equals and hashCode
generateEqualsAndHashCode(messageImpl, message);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,19 @@ default boolean needAck() {
default String toStringForLightLogging() {
return getClass().getName();
}

/**
* Returns the wire size of this message in bytes (header + body).
* Returns {@code 0} if the message was not received from the network (e.g. created locally).
*
* <p>This default implementation always returns {@code 0}.
*
* @return Wire size of the message in bytes, or {@code 0} if the message was not received from the network.
*/
default int getMessageSize() {
return 0;
}

/**
* Sets the wire size of this message in bytes. Called by the inbound decoder after decoding.
*
* <p>This default implementation ignores the value.
*
* @param messageSize Wire size of the message in bytes.
*/
default void setMessageSize(int messageSize) {
// No-op for messages not received from the network.
}
}
Loading