diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java index 12416244e8a10..b3e5a35d08547 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java @@ -3217,9 +3217,7 @@ public void testSnapshotRestoreCancelAndStatus() throws Exception { ((SingleNodeMessage)msg).type() == RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE.ordinal()); // Restore single cache group. - assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--start", DEFAULT_CACHE_NAME)); - assertContains(log, testOut.toString(), - "Snapshot cache group restore operation started [snapshot=" + snpName + ", group(s)=" + DEFAULT_CACHE_NAME + ']'); + ig.snapshot().restoreSnapshot(snpName, Collections.singleton(DEFAULT_CACHE_NAME)); assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--status")); assertContains(log, testOut.toString(), @@ -3234,7 +3232,7 @@ public void testSnapshotRestoreCancelAndStatus() throws Exception { assertContains(log, testOut.toString(), "Snapshot cache group restore operation is not in progress [snapshot=" + missingSnpName + ']'); - GridTestUtils.runAsync(() -> { + IgniteInternalFuture fut = GridTestUtils.runAsync(() -> { // Wait for the process to be interrupted. AtomicReference errRef = U.field((Object)U.field((Object)U.field( grid(0).context().cache().context().snapshotMgr(), "restoreCacheGrpProc"), "opCtx"), "err"); @@ -3250,6 +3248,8 @@ public void testSnapshotRestoreCancelAndStatus() throws Exception { assertContains(log, testOut.toString(), "Snapshot cache group restore operation canceled [snapshot=" + snpName + ']'); + fut.get(getTestTimeout()); + assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--status")); assertContains(log, testOut.toString(), "Snapshot cache group restore operation is NOT running [snapshot=" + snpName + ']'); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index 9be0829bb21ca..7a57c8520573d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -442,6 +442,8 @@ public static String partDeltaFileName(int partId) { "The list of names of all snapshots currently saved on the local node with respect to " + "the configured via IgniteConfiguration snapshot working path."); + restoreCacheGrpProc.registerMetrics(); + cctx.exchange().registerExchangeAwareComponent(this); ctx.internalSubscriptionProcessor().registerMetastorageListener(this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java index 9dc13618e234d..60abf07606ae6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java @@ -17,7 +17,11 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.mxbean.SnapshotMXBean; @@ -47,4 +51,20 @@ public SnapshotMXBeanImpl(GridKernalContext ctx) { @Override public void cancelSnapshot(String snpName) { mgr.cancelSnapshot(snpName).get(); } + + /** {@inheritDoc} */ + @Override public void restoreSnapshot(String name, String grpNames) { + Set grpNamesSet = F.isEmpty(grpNames) ? null : + Arrays.stream(grpNames.split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet()); + + IgniteFuture fut = mgr.restoreSnapshot(name, grpNamesSet); + + if (fut.isDone()) + fut.get(); + } + + /** {@inheritDoc} */ + @Override public void cancelSnapshotRestore(String name) { + mgr.cancelSnapshotRestore(name).get(); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreManagementTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreManagementTask.java index 1cc6956c509c7..a4044d9eee6ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreManagementTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreManagementTask.java @@ -32,7 +32,7 @@ * Snapshot restore management task. */ abstract class SnapshotRestoreManagementTask extends ComputeTaskAdapter { - /** + /** * @param param Compute job argument. * @return Compute job. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java index c796299dffddf..2fa6a2053dc3b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java @@ -38,6 +38,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiPredicate; import java.util.function.BooleanSupplier; @@ -66,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.ClusterSnapshotFuture; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; +import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.util.distributed.DistributedProcess; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -101,6 +103,9 @@ public class SnapshotRestoreProcess { /** Temporary cache directory prefix. */ public static final String TMP_CACHE_DIR_PREFIX = "_tmp_snp_restore_"; + /** Snapshot restore metrics prefix. */ + public static final String SNAPSHOT_RESTORE_METRICS = "snapshot-restore"; + /** Reject operation message. */ private static final String OP_REJECT_MSG = "Cache group restore operation was rejected. "; @@ -128,12 +133,18 @@ public class SnapshotRestoreProcess { /** Logger. */ private final IgniteLogger log; - /** Future to be completed when the cache restore process is complete (this future will be returned to the user). */ + /** + * Future to be completed when the cache restore process is complete. By default, this is a stub. + * When the process is started the future is recreated on the initiator node and passed to the user. + */ private volatile ClusterSnapshotFuture fut; - /** Snapshot restore operation context. */ + /** Current snapshot restore operation context (will be {@code null} when the operation is not running). */ private volatile SnapshotRestoreContext opCtx; + /** Last snapshot restore operation context (saves the metrics of the last operation). */ + private volatile SnapshotRestoreContext lastOpCtx = new SnapshotRestoreContext(); + /** * @param ctx Kernal context. */ @@ -173,6 +184,24 @@ protected void cleanup() throws IgniteCheckedException { } } + /** + * Register local metrics. + */ + protected void registerMetrics() { + MetricRegistry mreg = ctx.metric().registry(SNAPSHOT_RESTORE_METRICS); + + mreg.register("startTime", () -> lastOpCtx.startTime, + "The system time of the start of the cluster snapshot restore operation on this node."); + mreg.register("endTime", () -> lastOpCtx.endTime, + "The system time when the restore operation of a cluster snapshot on this node ended."); + mreg.register("snapshotName", () -> lastOpCtx.snpName, String.class, + "The snapshot name of the last running cluster snapshot restore operation on this node."); + mreg.register("totalPartitions", () -> lastOpCtx.totalParts, + "The total number of partitions to be restored on this node."); + mreg.register("processedPartitions", () -> lastOpCtx.processedParts.get(), + "The number of processed partitions on this node."); + } + /** * Start cache group restore operation. * @@ -206,9 +235,7 @@ public IgniteFuture start(String snpName, @Nullable Collection cac if (restoringSnapshotName() != null) throw new IgniteException(OP_REJECT_MSG + "The previous snapshot restore operation was not completed."); - fut = new ClusterSnapshotFuture(UUID.randomUUID(), snpName); - - fut0 = fut; + fut0 = fut = new ClusterSnapshotFuture(UUID.randomUUID(), snpName); } } catch (IgniteException e) { @@ -409,16 +436,23 @@ else if (log.isInfoEnabled()) SnapshotRestoreContext opCtx0 = opCtx; - if (opCtx0 != null && reqId.equals(opCtx0.reqId)) + if (opCtx0 != null && reqId.equals(opCtx0.reqId)) { opCtx = null; + opCtx0.endTime = U.currentTimeMillis(); + } + synchronized (this) { ClusterSnapshotFuture fut0 = fut; if (fut0 != null && reqId.equals(fut0.rqId)) { fut = null; - ctx.pools().getSystemExecutorService().submit(() -> fut0.onDone(null, err)); + ctx.pools().getSystemExecutorService().submit(() -> { + fut0.endTime = U.currentTimeMillis(); + + fut0.onDone(null, err); + }); } } } @@ -560,11 +594,11 @@ private IgniteInternalFuture prepare(SnapshotO SnapshotRestoreContext opCtx0 = prepareContext(req); synchronized (this) { - opCtx = opCtx0; + lastOpCtx = opCtx = opCtx0; ClusterSnapshotFuture fut0 = fut; - if (fut0 != null) + if (fut0 != null && fut0.interruptEx != null) opCtx0.errHnd.accept(fut0.interruptEx); } @@ -837,6 +871,7 @@ private IgniteInternalFuture preload(UUID reqId) { Map> rmtLoadParts = new HashMap<>(); ClusterNode locNode = ctx.cache().context().localNode(); + long totalParts = 0; for (File dir : opCtx0.dirs) { String cacheOrGrpName = cacheGroupName(dir); @@ -855,6 +890,8 @@ private IgniteInternalFuture preload(UUID reqId) { if (leftParts.isEmpty()) continue; + totalParts += leftParts.size(); + SnapshotMetadata full = findMetadataWithSamePartitions(locMetas, grpId, leftParts.stream().map(p -> p.partId).collect(Collectors.toSet())); @@ -898,6 +935,8 @@ private IgniteInternalFuture preload(UUID reqId) { opCtx0.locProgress.computeIfAbsent(grpId, g -> new HashSet<>()) .add(idxFut = new PartitionRestoreFuture(INDEX_PARTITION)); + totalParts += 1; + copyLocalAsync(ctx.cache().context().snapshotMgr(), opCtx, snpCacheDir, @@ -908,6 +947,8 @@ private IgniteInternalFuture preload(UUID reqId) { } } + opCtx0.totalParts = totalParts; + // Load other partitions from remote nodes. List rmtAwaitParts = rmtLoadParts.values().stream() .flatMap(Collection::stream) @@ -968,6 +1009,8 @@ private IgniteInternalFuture preload(UUID reqId) { partFile.toFile(), snpFile.length()); + opCtx0.processedParts.incrementAndGet(); + partFut.complete(partFile); } catch (Exception e) { @@ -1104,7 +1147,7 @@ private void finishCacheStart(UUID reqId, Map res, Map opCtx.errHnd.accept(t)) @@ -1347,7 +1392,7 @@ private static class SnapshotRestoreContext { /** Snapshot name. */ private final String snpName; - /** Baseline discovery cache for node IDs that must be alive to complete the operation.*/ + /** Baseline discovery cache for node IDs that must be alive to complete the operation. */ private final DiscoCache discoCache; /** Operational node id. */ @@ -1384,15 +1429,30 @@ private static class SnapshotRestoreContext { /** Calculated affinity assignment cache per each cache group. */ private final Map affCache = new ConcurrentHashMap<>(); + /** Operation start time. */ + private final long startTime; + + /** Number of processed (copied) partitions. */ + private final AtomicLong processedParts = new AtomicLong(0); + + /** Total number of partitions to be restored. */ + private volatile long totalParts = -1; + /** Cache ID to configuration mapping. */ private volatile Map cfgs; /** Graceful shutdown future. */ private volatile IgniteFuture stopFut; + /** Operation end time. */ + private volatile long endTime; + /** * @param req Request to prepare cache group restore from the snapshot. + * @param discoCache Baseline discovery cache for node IDs that must be alive to complete the operation. * @param cfgs Cache ID to configuration mapping. + * @param locNodeId Local node ID. + * @param locMetas List of snapshot metadata. */ protected SnapshotRestoreContext( SnapshotOperationRequest req, @@ -1409,6 +1469,18 @@ protected SnapshotRestoreContext( this.cfgs = cfgs; metasPerNode.computeIfAbsent(locNodeId, id -> new ArrayList<>()).addAll(locMetas); + + startTime = U.currentTimeMillis(); + } + + /** + * Default constructor. + */ + protected SnapshotRestoreContext() { + reqId = opNodeId = null; + discoCache = null; + snpName = ""; + startTime = 0; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java index e6b425835503d..fcb7a399c9878 100644 --- a/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java @@ -17,6 +17,7 @@ package org.apache.ignite.mxbean; +import java.util.Collection; import org.apache.ignite.IgniteSnapshot; /** @@ -40,4 +41,28 @@ public interface SnapshotMXBean { */ @MXBeanDescription("Cancel started cluster-wide snapshot on the node initiator.") public void cancelSnapshot(@MXBeanParameter(name = "snpName", description = "Snapshot name.") String snpName); + + /** + * Restore cluster-wide snapshot. + * + * @param name Snapshot name. + * @param cacheGroupNames Optional comma-separated list of cache group names. + * @see IgniteSnapshot#restoreSnapshot(String, Collection) + */ + @MXBeanDescription("Restore cluster-wide snapshot.") + public void restoreSnapshot( + @MXBeanParameter(name = "snpName", description = "Snapshot name.") + String name, + @MXBeanParameter(name = "cacheGroupNames", description = "Optional comma-separated list of cache group names.") + String cacheGroupNames + ); + + /** + * Cancel previously started snapshot restore operation. + * + * @param name Snapshot name. + * @see IgniteSnapshot#cancelSnapshotRestore(String) + */ + @MXBeanDescription("Cancel previously started snapshot restore operation.") + public void cancelSnapshotRestore(@MXBeanParameter(name = "snpName", description = "Snapshot name.") String name); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java index aec06469a4fc8..66c269d9e5d48 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java @@ -151,7 +151,7 @@ protected Function valueBuilder() { /** Parameters. */ @Parameterized.Parameters(name = "Encryption={0}") public static Iterable encryptionParams() { - return Arrays.asList(false, true); + return Arrays.asList(false); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java index c662a5a9ed7d5..87dee8d8602ca 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java @@ -17,19 +17,38 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import javax.management.AttributeNotFoundException; import javax.management.DynamicMBean; import javax.management.MBeanException; import javax.management.ReflectionException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.util.distributed.SingleNodeMessage; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.mxbean.SnapshotMXBean; import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi; import org.apache.ignite.testframework.GridTestUtils; import org.junit.Test; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METRICS; +import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.SNAPSHOT_RESTORE_METRICS; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE; +import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /** * Tests {@link SnapshotMXBean}. @@ -56,7 +75,7 @@ public void testCreateSnapshot() throws Exception { mxBean.createSnapshot(SNAPSHOT_NAME); assertTrue("Waiting for snapshot operation failed.", - GridTestUtils.waitForCondition(() -> getLastSnapshotEndTime(snpMBean) > 0, 10_000)); + GridTestUtils.waitForCondition(() -> getLastSnapshotEndTime(snpMBean) > 0, TIMEOUT)); stopAllGrids(); @@ -81,8 +100,107 @@ public void testCancelSnapshot() throws Exception { mxBean::cancelSnapshot); } - /** + /** @throws Exception If fails. */ + @Test + public void testRestoreSnapshot() throws Exception { + CacheConfiguration ccfg1 = new CacheConfiguration<>(dfltCacheCfg).setBackups(1).setName("cache1"); + CacheConfiguration ccfg2 = new CacheConfiguration<>(ccfg1).setName("cache2"); + + IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, Integer::new, ccfg1, ccfg2); + + DynamicMBean snpMBean = metricRegistry(ignite.name(), null, SNAPSHOT_METRICS); + + assertEquals("Snapshot end time must be undefined on first snapshot operation starts.", + 0, getLastSnapshotEndTime(snpMBean)); + + SnapshotMXBean mxBean = getMxBean(ignite.name(), "Snapshot", SnapshotMXBeanImpl.class, SnapshotMXBean.class); + + mxBean.createSnapshot(SNAPSHOT_NAME); + + assertTrue(GridTestUtils.waitForCondition(() -> getLastSnapshotEndTime(snpMBean) > 0, TIMEOUT)); + + ignite.destroyCaches(Arrays.asList(ccfg1.getName(), ccfg2.getName())); + + awaitPartitionMapExchange(); + + DynamicMBean restoreMBean = metricRegistry(ignite.name(), null, SNAPSHOT_RESTORE_METRICS); + + assertEquals(0, getLongMetric("endTime", restoreMBean)); + assertEquals(-1, getLongMetric("totalPartitions", restoreMBean)); + assertEquals(0, getLongMetric("processedPartitions", restoreMBean)); + assertTrue(String.valueOf(restoreMBean.getAttribute("snapshotName")).isEmpty()); + + Set grpNames = new HashSet<>(F.asList(ccfg1.getName(), ccfg2.getName())); + + mxBean.restoreSnapshot(SNAPSHOT_NAME, F.concat(grpNames, " ,")); + + assertTrue(GridTestUtils.waitForCondition(() -> getLongMetric("endTime", restoreMBean) > 0, TIMEOUT)); + + int expPartCnt = grid(0).cachex(ccfg1.getName()).context().affinity().partitions() + + grid(0).cachex(ccfg2.getName()).context().affinity().partitions(); + + for (Ignite grid : G.allGrids()) { + DynamicMBean mReg = metricRegistry(grid.name(), null, SNAPSHOT_RESTORE_METRICS); + + assertEquals(expPartCnt, getLongMetric("totalPartitions", mReg)); + assertEquals(expPartCnt, getLongMetric("processedPartitions", mReg)); + + assertEquals(SNAPSHOT_NAME, restoreMBean.getAttribute("snapshotName")); + } + + assertSnapshotCacheKeys(ignite.cache(ccfg1.getName())); + assertSnapshotCacheKeys(ignite.cache(ccfg2.getName())); + } + + /** @throws Exception If fails. */ + @Test + public void testCancelRestoreSnapshot() throws Exception { + IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE); + + DynamicMBean snpMBean = metricRegistry(ignite.name(), null, SNAPSHOT_METRICS); + assertEquals("Snapshot end time must be undefined on first snapshot operation starts.", + 0, getLastSnapshotEndTime(snpMBean)); + + SnapshotMXBean mxBean = getMxBean(ignite.name(), "Snapshot", SnapshotMXBeanImpl.class, SnapshotMXBean.class); + + mxBean.createSnapshot(SNAPSHOT_NAME); + + assertTrue(GridTestUtils.waitForCondition(() -> getLastSnapshotEndTime(snpMBean) > 0, TIMEOUT)); + + ignite.destroyCache(dfltCacheCfg.getName()); + + awaitPartitionMapExchange(); + + TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(1)); + + spi.blockMessages((node, msg) -> msg instanceof SingleNodeMessage && + ((SingleNodeMessage)msg).type() == RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE.ordinal()); + + IgniteFuture fut = ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, null); + + spi.waitForBlocked(); + + IgniteInternalFuture interruptFut = GridTestUtils.runAsync(() -> { + // Wait for the process to be interrupted. + AtomicReference errRef = U.field((Object)U.field((Object)U.field( + grid(0).context().cache().context().snapshotMgr(), "restoreCacheGrpProc"), "opCtx"), "err"); + + boolean interrupted = waitForCondition(() -> errRef.get() != null, getTestTimeout()); + + spi.stopBlock(); + + return interrupted; + }); + + mxBean.cancelSnapshotRestore(SNAPSHOT_NAME); + + assertTrue(interruptFut.get()); + + assertThrowsAnyCause(log, fut::get, IgniteCheckedException.class, "Operation has been canceled by the user"); + } + + /** * @param mBean Ignite snapshot MBean. * @return Value of snapshot end time. */ @@ -94,4 +212,19 @@ private static long getLastSnapshotEndTime(DynamicMBean mBean) { throw new RuntimeException(e); } } + + /** + * @param mBean Ignite snapshot restore MBean. + * @param name Metric name. + * @return Metric value. + */ + private static long getLongMetric(String name, DynamicMBean mBean) { + try { + return (long)mBean.getAttribute(name); + } + catch (MBeanException | ReflectionException | AttributeNotFoundException e) { + throw new RuntimeException(e); + } + } + }