Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.placementdriver;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.sleep;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for the lease updater's inflight futures.
 *
 * <p>The test floods the Meta Storage with writes whose watch processing is artificially slowed down, and checks that the
 * primary replica lease of a partition is still the same lease (same start time) after several lease expiration intervals.
 */
public class ItLeaseUpdaterInflightTest extends ClusterPerTestIntegrationTest {
    private static final String TEST_ZONE = "TEST_ZONE";
    private static final String TEST_TABLE = "TEST_TABLE";

    /** Lease expiration interval configured for the test cluster, in milliseconds. */
    private static final long LEASE_EXPIRATION_INTERVAL_MILLIS = 2000;

    /** Upper bound of Meta Storage puts that were issued by the test but not yet processed by the test watch. */
    private static final int MAX_INFLIGHT_PUTS = 300;

    /** Artificial delay added to the processing of every watch event, in milliseconds. */
    private static final long WATCH_DELAY_MILLIS = 10;

    @Override
    protected int initialNodes() {
        return 1;
    }

    /** Creates a single-partition, single-replica zone and a table in it. */
    @BeforeEach
    public void setup() {
        sql("CREATE ZONE " + TEST_ZONE + " (partitions 1, replicas 1) storage profiles ['" + CatalogService.DEFAULT_STORAGE_PROFILE + "']");
        sql("CREATE TABLE " + TEST_TABLE + " (ID INT PRIMARY KEY, VAL VARCHAR(20)) ZONE " + TEST_ZONE);
    }

    @Override
    protected void customizeInitParameters(InitParametersBuilder builder) {
        super.customizeInitParameters(builder);

        builder.clusterConfiguration("ignite {"
                + "  replication.leaseExpirationIntervalMillis: " + LEASE_EXPIRATION_INTERVAL_MILLIS
                + "}");
    }

    /**
     * Checks that the lease start time does not change while the Meta Storage is kept busy with a stream of puts
     * handled by a slow watch listener.
     */
    @Test
    public void test() {
        AtomicInteger msInflightCount = new AtomicInteger();
        AtomicBoolean stopped = new AtomicBoolean();

        IgniteImpl node = anyNode();

        int zoneId = unwrapTableImpl(node.tables().table(TEST_TABLE)).zoneId();
        ZonePartitionId partId = new ZonePartitionId(zoneId, 0);

        ReplicaMeta replicaMeta = waitAndGetPrimaryReplica(node, partId);

        log.info("Test: zoneId={}, leaseStartTime={}", zoneId, replicaMeta.getStartTime());

        ByteArray testKey = new ByteArray("testKey");
        byte[] testValue = "testValue".getBytes(StandardCharsets.UTF_8);

        // Slow watch: every processed event releases one "inflight" slot, but only after a delay.
        node.metaStorageManager().registerPrefixWatch(testKey, event -> {
            sleep(WATCH_DELAY_MILLIS);

            msInflightCount.decrementAndGet();

            return nullCompletedFuture();
        });

        // Load generator: keeps up to MAX_INFLIGHT_PUTS unprocessed puts in flight until the test asks it to stop.
        CompletableFuture<?> loadFut = runAsync(() -> {
            while (!stopped.get()) {
                if (msInflightCount.get() > MAX_INFLIGHT_PUTS) {
                    // Throttled: hint the CPU that we are spinning instead of burning a core in a tight loop.
                    Thread.onSpinWait();

                    continue;
                }

                msInflightCount.incrementAndGet();
                node.metaStorageManager().put(testKey, testValue);
            }
        });

        try {
            // Give the lease several expiration intervals to (incorrectly) expire under load.
            sleep(LEASE_EXPIRATION_INTERVAL_MILLIS * 5);

            ReplicaMeta newReplicaMeta = waitAndGetPrimaryReplica(node, partId);
            log.info("Test: newLease={}", newReplicaMeta);

            assertEquals(replicaMeta.getStartTime().longValue(), newReplicaMeta.getStartTime().longValue());
        } finally {
            stopped.set(true);
        }

        // The load generator must have finished without an error, otherwise the check above could have passed without any load.
        assertThat(loadFut, willCompleteSuccessfully());
    }

    /**
     * Awaits the primary replica of the given replication group for up to 10 seconds and returns its metadata.
     *
     * @param node Node whose placement driver and clock are used.
     * @param replicationGrpId Replication group id.
     * @return Primary replica metadata.
     */
    private static ReplicaMeta waitAndGetPrimaryReplica(IgniteImpl node, ZonePartitionId replicationGrpId) {
        CompletableFuture<ReplicaMeta> primaryReplicaFut = node.placementDriver().awaitPrimaryReplica(
                replicationGrpId,
                node.clock().now(),
                10,
                SECONDS
        );

        assertThat(primaryReplicaFut, willCompleteSuccessfully());

        return primaryReplicaFut.join();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.ignite.internal.placementdriver;

import static java.util.Collections.emptyMap;
import static java.util.Objects.hash;
import static java.util.Objects.requireNonNullElse;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
Expand All @@ -28,9 +30,11 @@
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
import static org.apache.ignite.internal.placementdriver.leases.Lease.emptyLease;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.CollectionUtils.union;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
import static org.apache.ignite.internal.util.IgniteUtils.newHashMap;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -135,6 +139,14 @@ public class LeaseUpdater {

private final Executor throttledLogExecutor;

/**
 * Future of the most recent lease batch update submitted via {@link MetaStorageManager#invoke}. The updater waits on it (with a
 * timeout) before starting the next update. Initialized with an already completed future, so the first wait returns immediately.
 * NOTE(review): the field is not volatile - presumably it is only accessed from the updater thread; confirm.
 */
private CompletableFuture<?> leaseUpdateFuture = nullCompletedFuture();

/**
* Cache of leases used for updating leases via {@link MetaStorageManager#invoke}. It is refreshed right before each lease update,
* because the leases in {@link LeaseTracker} may be slightly stale, while the invoke condition compares against the exact stored value.
*/
private volatile Leases leases = new Leases(emptyMap(), BYTE_EMPTY_ARRAY);

/**
* Constructor.
*
Expand Down Expand Up @@ -388,6 +400,8 @@ public void run() {

try {
if (active()) {
waitForInflight();

updateLeaseBatchInternal();
}
} catch (Throwable e) {
Expand All @@ -408,6 +422,28 @@ public void run() {
}
}

/**
 * Waits for the previous lease update to complete and then refreshes the {@link #leases} cache.
 *
 * <p>The wait is bounded by half of the configured lease expiration interval. If the wait fails, times out or is interrupted,
 * the failure is only logged and the method proceeds to refresh the cache. The cache is rebuilt from the lease batch stored locally
 * in the Meta Storage under {@code PLACEMENTDRIVER_LEASES_KEY}; if there is no such value, the leases from the lease tracker are used.
 */
private void waitForInflight() {
    long timeoutMillis = replicationConfiguration.leaseExpirationIntervalMillis().value() / 2;

    try {
        leaseUpdateFuture.get(timeoutMillis, MILLISECONDS);
    } catch (InterruptedException e) {
        // Restore the interruption status: it was cleared when the exception was thrown, and the code up the stack
        // must still be able to observe that the thread was asked to stop.
        Thread.currentThread().interrupt();

        LOG.info("Could not wait for the previous lease update to complete, proceeding with the next update attempt.", e);
    } catch (Exception e) {
        LOG.info("Could not wait for the previous lease update to complete, proceeding with the next update attempt.", e);
    }

    var entry = msManager.getLocally(PLACEMENTDRIVER_LEASES_KEY);

    if (entry != null && entry.value() != null) {
        LeaseBatch leaseBatch = LeaseBatch.fromBytes(entry.value());
        Map<ReplicationGroupId, Lease> newLeasesMap = newHashMap(leaseBatch.leases().size());

        for (Lease lease : leaseBatch.leases()) {
            newLeasesMap.put(lease.replicationGroupId(), lease);
        }

        // Keep the raw bytes together with the parsed leases: they are used as the expected value in the invoke condition.
        leases = new Leases(newLeasesMap, entry.value());
    } else {
        leases = leaseTracker.leasesLatest();
    }
}

/** Updates leases in Meta storage. This method is supposed to be used in the busy lock. */
private void updateLeaseBatchInternal() {
HybridTimestamp currentTime = clockService.current();
Expand All @@ -418,7 +454,7 @@ private void updateLeaseBatchInternal() {

HybridTimestamp newExpirationTimestamp = new HybridTimestamp(currentTime.getPhysical() + leaseExpirationInterval, 0);

Leases leasesCurrent = leaseTracker.leasesLatest();
Leases leasesCurrent = leases;
Map<ReplicationGroupId, LeaseAgreement> toBeNegotiated = new HashMap<>();
Map<ReplicationGroupId, Lease> renewedLeases = new HashMap<>(leasesCurrent.leaseByGroupId().size());

Expand Down Expand Up @@ -505,11 +541,12 @@ private void updateLeaseBatchInternal() {
// so we must start a negotiation round from the beginning; the same we do for the groups that don't have
// leaseholders at all.
if (isLeaseOutdated) {
LOG.info("Lease is expired, creating a new one [groupId={}, lease={}, candidate={}]", grpId, lease, candidate);

// New lease is granted.
Lease newLease = writeNewLease(grpId, candidate, renewedLeases);

LOG.info("Lease is expired, creating a new one [groupId={}, oldLease={}, newLease={}, candidate={}]",
grpId, lease, newLease, candidate);

boolean force = !lease.isProlongable() && lease.proposedCandidate() != null;

toBeNegotiated.put(grpId, new LeaseAgreement(newLease, force));
Expand Down Expand Up @@ -553,7 +590,7 @@ private void updateLeaseBatchInternal() {

byte[] renewedValue = new LeaseBatch(renewedLeases.values()).bytes();

msManager.invoke(
leaseUpdateFuture = msManager.invoke(
or(notExists(key), value(key).eq(leasesCurrent.leasesBytes())),
put(key, renewedValue),
noop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public class ReplicationConfigurationSchema {
/** Default value for {@link #idleSafeTimePropagationDurationMillis}. */
public static final long DEFAULT_IDLE_SAFE_TIME_PROP_DURATION = TimeUnit.SECONDS.toMillis(1);

/** Default value for {@link #leaseExpirationIntervalMillis}. */
public static final long DEFAULT_LEASE_EXPIRATION_INTERVAL_MILLIS = 5000;

/** Default value for {@link #batchSizeBytes}. */
public static final int DEFAULT_BATCH_SIZE_BYTES = 8192;

Expand All @@ -57,7 +60,7 @@ public class ReplicationConfigurationSchema {
@Value(hasDefault = true)
@Range(min = 2000, max = 120000)
@PublicName(legacyNames = "leaseExpirationInterval")
public long leaseExpirationIntervalMillis = 5_000;
public long leaseExpirationIntervalMillis = DEFAULT_LEASE_EXPIRATION_INTERVAL_MILLIS;

@Value(hasDefault = true)
@Range(max = 10_000)
Expand Down