Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3217,9 +3217,7 @@ public void testSnapshotRestoreCancelAndStatus() throws Exception {
((SingleNodeMessage<?>)msg).type() == RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE.ordinal());

// Restore single cache group.
assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--start", DEFAULT_CACHE_NAME));
assertContains(log, testOut.toString(),
"Snapshot cache group restore operation started [snapshot=" + snpName + ", group(s)=" + DEFAULT_CACHE_NAME + ']');
ig.snapshot().restoreSnapshot(snpName, Collections.singleton(DEFAULT_CACHE_NAME));

assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--status"));
assertContains(log, testOut.toString(),
Expand All @@ -3234,7 +3232,7 @@ public void testSnapshotRestoreCancelAndStatus() throws Exception {
assertContains(log, testOut.toString(),
"Snapshot cache group restore operation is not in progress [snapshot=" + missingSnpName + ']');

GridTestUtils.runAsync(() -> {
IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(() -> {
// Wait for the process to be interrupted.
AtomicReference<?> errRef = U.field((Object)U.field((Object)U.field(
grid(0).context().cache().context().snapshotMgr(), "restoreCacheGrpProc"), "opCtx"), "err");
Expand All @@ -3250,6 +3248,8 @@ public void testSnapshotRestoreCancelAndStatus() throws Exception {
assertContains(log, testOut.toString(),
"Snapshot cache group restore operation canceled [snapshot=" + snpName + ']');

fut.get(getTestTimeout());

assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--status"));
assertContains(log, testOut.toString(),
"Snapshot cache group restore operation is NOT running [snapshot=" + snpName + ']');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,8 @@ public static String partDeltaFileName(int partId) {
"The list of names of all snapshots currently saved on the local node with respect to " +
"the configured via IgniteConfiguration snapshot working path.");

restoreCacheGrpProc.registerMetrics();

cctx.exchange().registerExchangeAwareComponent(this);

ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package org.apache.ignite.internal.processors.cache.persistence.snapshot;

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.mxbean.SnapshotMXBean;

Expand Down Expand Up @@ -47,4 +51,20 @@ public SnapshotMXBeanImpl(GridKernalContext ctx) {
@Override public void cancelSnapshot(String snpName) {
mgr.cancelSnapshot(snpName).get();
}

/** {@inheritDoc} */
@Override public void restoreSnapshot(String name, String grpNames) {
Set<String> grpNamesSet = F.isEmpty(grpNames) ? null :
Arrays.stream(grpNames.split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet());

IgniteFuture<Void> fut = mgr.restoreSnapshot(name, grpNamesSet);

if (fut.isDone())
fut.get();
}

/** {@inheritDoc} */
@Override public void cancelSnapshotRestore(String name) {
mgr.cancelSnapshotRestore(name).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* Snapshot restore management task.
*/
abstract class SnapshotRestoreManagementTask extends ComputeTaskAdapter<String, Boolean> {
/**
/**
* @param param Compute job argument.
* @return Compute job.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
Expand Down Expand Up @@ -66,6 +67,7 @@
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.ClusterSnapshotFuture;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
Expand Down Expand Up @@ -101,6 +103,9 @@ public class SnapshotRestoreProcess {
/** Temporary cache directory prefix. */
public static final String TMP_CACHE_DIR_PREFIX = "_tmp_snp_restore_";

/** Snapshot restore metrics prefix. */
public static final String SNAPSHOT_RESTORE_METRICS = "snapshot-restore";

/** Reject operation message. */
private static final String OP_REJECT_MSG = "Cache group restore operation was rejected. ";

Expand Down Expand Up @@ -128,12 +133,18 @@ public class SnapshotRestoreProcess {
/** Logger. */
private final IgniteLogger log;

/** Future to be completed when the cache restore process is complete (this future will be returned to the user). */
/**
* Future to be completed when the cache restore process is complete. By default, this is a stub.
* When the process is started the future is recreated on the initiator node and passed to the user.
*/
private volatile ClusterSnapshotFuture fut;

/** Snapshot restore operation context. */
/** Current snapshot restore operation context (will be {@code null} when the operation is not running). */
private volatile SnapshotRestoreContext opCtx;

/** Last snapshot restore operation context (saves the metrics of the last operation). */
private volatile SnapshotRestoreContext lastOpCtx = new SnapshotRestoreContext();

/**
* @param ctx Kernal context.
*/
Expand Down Expand Up @@ -173,6 +184,24 @@ protected void cleanup() throws IgniteCheckedException {
}
}

/**
* Register local metrics.
*/
protected void registerMetrics() {
MetricRegistry mreg = ctx.metric().registry(SNAPSHOT_RESTORE_METRICS);

mreg.register("startTime", () -> lastOpCtx.startTime,
"The system time of the start of the cluster snapshot restore operation on this node.");
mreg.register("endTime", () -> lastOpCtx.endTime,
"The system time when the restore operation of a cluster snapshot on this node ended.");
mreg.register("snapshotName", () -> lastOpCtx.snpName, String.class,
"The snapshot name of the last running cluster snapshot restore operation on this node.");
mreg.register("totalPartitions", () -> lastOpCtx.totalParts,
"The total number of partitions to be restored on this node.");
mreg.register("processedPartitions", () -> lastOpCtx.processedParts.get(),
"The number of processed partitions on this node.");
}

/**
* Start cache group restore operation.
*
Expand Down Expand Up @@ -206,9 +235,7 @@ public IgniteFuture<Void> start(String snpName, @Nullable Collection<String> cac
if (restoringSnapshotName() != null)
throw new IgniteException(OP_REJECT_MSG + "The previous snapshot restore operation was not completed.");

fut = new ClusterSnapshotFuture(UUID.randomUUID(), snpName);

fut0 = fut;
fut0 = fut = new ClusterSnapshotFuture(UUID.randomUUID(), snpName);
}
}
catch (IgniteException e) {
Expand Down Expand Up @@ -409,16 +436,23 @@ else if (log.isInfoEnabled())

SnapshotRestoreContext opCtx0 = opCtx;

if (opCtx0 != null && reqId.equals(opCtx0.reqId))
if (opCtx0 != null && reqId.equals(opCtx0.reqId)) {
opCtx = null;

opCtx0.endTime = U.currentTimeMillis();
}

synchronized (this) {
ClusterSnapshotFuture fut0 = fut;

if (fut0 != null && reqId.equals(fut0.rqId)) {
fut = null;

ctx.pools().getSystemExecutorService().submit(() -> fut0.onDone(null, err));
ctx.pools().getSystemExecutorService().submit(() -> {
fut0.endTime = U.currentTimeMillis();

fut0.onDone(null, err);
});
}
}
}
Expand Down Expand Up @@ -560,11 +594,11 @@ private IgniteInternalFuture<SnapshotRestoreOperationResponse> prepare(SnapshotO
SnapshotRestoreContext opCtx0 = prepareContext(req);

synchronized (this) {
opCtx = opCtx0;
lastOpCtx = opCtx = opCtx0;

ClusterSnapshotFuture fut0 = fut;

if (fut0 != null)
if (fut0 != null && fut0.interruptEx != null)
opCtx0.errHnd.accept(fut0.interruptEx);
}

Expand Down Expand Up @@ -837,6 +871,7 @@ private IgniteInternalFuture<Boolean> preload(UUID reqId) {

Map<Integer, Set<PartitionRestoreFuture>> rmtLoadParts = new HashMap<>();
ClusterNode locNode = ctx.cache().context().localNode();
long totalParts = 0;

for (File dir : opCtx0.dirs) {
String cacheOrGrpName = cacheGroupName(dir);
Expand All @@ -855,6 +890,8 @@ private IgniteInternalFuture<Boolean> preload(UUID reqId) {
if (leftParts.isEmpty())
continue;

totalParts += leftParts.size();

SnapshotMetadata full = findMetadataWithSamePartitions(locMetas,
grpId,
leftParts.stream().map(p -> p.partId).collect(Collectors.toSet()));
Expand Down Expand Up @@ -898,6 +935,8 @@ private IgniteInternalFuture<Boolean> preload(UUID reqId) {
opCtx0.locProgress.computeIfAbsent(grpId, g -> new HashSet<>())
.add(idxFut = new PartitionRestoreFuture(INDEX_PARTITION));

totalParts += 1;

copyLocalAsync(ctx.cache().context().snapshotMgr(),
opCtx,
snpCacheDir,
Expand All @@ -908,6 +947,8 @@ private IgniteInternalFuture<Boolean> preload(UUID reqId) {
}
}

opCtx0.totalParts = totalParts;

// Load other partitions from remote nodes.
List<PartitionRestoreFuture> rmtAwaitParts = rmtLoadParts.values().stream()
.flatMap(Collection::stream)
Expand Down Expand Up @@ -968,6 +1009,8 @@ private IgniteInternalFuture<Boolean> preload(UUID reqId) {
partFile.toFile(),
snpFile.length());

opCtx0.processedParts.incrementAndGet();

partFut.complete(partFile);
}
catch (Exception e) {
Expand Down Expand Up @@ -1104,7 +1147,7 @@ private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exce
orElse(checkNodeLeft(opCtx0.nodes(), res.keySet()));

if (failure == null) {
finishProcess(reqId);
finishProcess(reqId, null);

return;
}
Expand Down Expand Up @@ -1290,6 +1333,8 @@ private static void copyLocalAsync(

IgniteSnapshotManager.copy(mgr.ioFactory(), snpFile, partFile.toFile(), snpFile.length());

opCtx.processedParts.incrementAndGet();

return partFile;
}, mgr.snapshotExecutorService())
.whenComplete((r, t) -> opCtx.errHnd.accept(t))
Expand Down Expand Up @@ -1347,7 +1392,7 @@ private static class SnapshotRestoreContext {
/** Snapshot name. */
private final String snpName;

/** Baseline discovery cache for node IDs that must be alive to complete the operation.*/
/** Baseline discovery cache for node IDs that must be alive to complete the operation. */
private final DiscoCache discoCache;

/** Operational node id. */
Expand Down Expand Up @@ -1384,15 +1429,30 @@ private static class SnapshotRestoreContext {
/** Calculated affinity assignment cache per each cache group. */
private final Map<String, GridAffinityAssignmentCache> affCache = new ConcurrentHashMap<>();

/** Operation start time. */
private final long startTime;

/** Number of processed (copied) partitions. */
private final AtomicLong processedParts = new AtomicLong(0);

/** Total number of partitions to be restored. */
private volatile long totalParts = -1;

/** Cache ID to configuration mapping. */
private volatile Map<Integer, StoredCacheData> cfgs;

/** Graceful shutdown future. */
private volatile IgniteFuture<?> stopFut;

/** Operation end time. */
private volatile long endTime;

/**
* @param req Request to prepare cache group restore from the snapshot.
* @param discoCache Baseline discovery cache for node IDs that must be alive to complete the operation.
* @param cfgs Cache ID to configuration mapping.
* @param locNodeId Local node ID.
* @param locMetas List of snapshot metadata.
*/
protected SnapshotRestoreContext(
SnapshotOperationRequest req,
Expand All @@ -1409,6 +1469,18 @@ protected SnapshotRestoreContext(
this.cfgs = cfgs;

metasPerNode.computeIfAbsent(locNodeId, id -> new ArrayList<>()).addAll(locMetas);

startTime = U.currentTimeMillis();
}

/**
* Default constructor.
*/
protected SnapshotRestoreContext() {
reqId = opNodeId = null;
discoCache = null;
snpName = "";
startTime = 0;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.mxbean;

import java.util.Collection;
import org.apache.ignite.IgniteSnapshot;

/**
Expand All @@ -40,4 +41,28 @@ public interface SnapshotMXBean {
*/
@MXBeanDescription("Cancel started cluster-wide snapshot on the node initiator.")
public void cancelSnapshot(@MXBeanParameter(name = "snpName", description = "Snapshot name.") String snpName);

/**
* Restore cluster-wide snapshot.
*
* @param name Snapshot name.
* @param cacheGroupNames Optional comma-separated list of cache group names.
* @see IgniteSnapshot#restoreSnapshot(String, Collection)
*/
@MXBeanDescription("Restore cluster-wide snapshot.")
public void restoreSnapshot(
@MXBeanParameter(name = "snpName", description = "Snapshot name.")
String name,
@MXBeanParameter(name = "cacheGroupNames", description = "Optional comma-separated list of cache group names.")
String cacheGroupNames
);

/**
* Cancel previously started snapshot restore operation.
*
* @param name Snapshot name.
* @see IgniteSnapshot#cancelSnapshotRestore(String)
*/
@MXBeanDescription("Cancel previously started snapshot restore operation.")
public void cancelSnapshotRestore(@MXBeanParameter(name = "snpName", description = "Snapshot name.") String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ protected Function<Integer, Object> valueBuilder() {
/** Parameters. */
@Parameterized.Parameters(name = "Encryption={0}")
public static Iterable<Boolean> encryptionParams() {
return Arrays.asList(false, true);
return Arrays.asList(false);
}

/** {@inheritDoc} */
Expand Down
Loading