From e3812d21b75666638ef5a74c9faf921361d6d5b8 Mon Sep 17 00:00:00 2001 From: Pavel Pereslegin Date: Wed, 15 Dec 2021 12:59:50 +0300 Subject: [PATCH 1/3] IGNITE-14794 Rework after IGNITE-14744 (wip). --- .../ignite/util/GridCommandHandlerTest.java | 36 ++-- .../snapshot/IgniteSnapshotManager.java | 38 +++-- .../snapshot/SnapshotMXBeanImpl.java | 20 +++ .../snapshot/SnapshotRestoreCancelTask.java | 18 +- .../SnapshotRestoreManagementTask.java | 16 +- .../snapshot/SnapshotRestoreProcess.java | 159 +++++++++++++++--- .../SnapshotRestoreStatusDetails.java | 130 ++++++++++++++ .../snapshot/SnapshotRestoreStatusTask.java | 36 +++- .../snapshot/VisorSnapshotRestoreTask.java | 102 ++++++++++- .../apache/ignite/mxbean/SnapshotMXBean.java | 25 +++ .../snapshot/IgniteSnapshotMXBeanTest.java | 151 ++++++++++++++++- 11 files changed, 654 insertions(+), 77 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusDetails.java diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java index 12416244e8a10..62f3b4b132a2b 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java @@ -43,7 +43,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; import java.util.function.UnaryOperator; @@ -91,6 +90,7 @@ import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.db.IgniteCacheGroupsWithRestartsTest; import org.apache.ignite.internal.processors.cache.persistence.diagnostic.pagelocktracker.dumpprocessors.ToFileDumpProcessor; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreStatusDetails; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; @@ -3217,29 +3217,31 @@ public void testSnapshotRestoreCancelAndStatus() throws Exception { ((SingleNodeMessage)msg).type() == RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE.ordinal()); // Restore single cache group. - assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--start", DEFAULT_CACHE_NAME)); - assertContains(log, testOut.toString(), - "Snapshot cache group restore operation started [snapshot=" + snpName + ", group(s)=" + DEFAULT_CACHE_NAME + ']'); + ig.snapshot().restoreSnapshot(snpName, Collections.singleton(DEFAULT_CACHE_NAME)); assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--status")); assertContains(log, testOut.toString(), - "Snapshot cache group restore operation is running [snapshot=" + snpName + ']'); + "Restore operation for snapshot \"" + snpName + "\" is still in progress"); // Check wrong snapshot name. assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", missingSnpName, "--status")); assertContains(log, testOut.toString(), - "Snapshot cache group restore operation is NOT running [snapshot=" + missingSnpName + ']'); + "No information about restoring snapshot \"" + missingSnpName + "\" is available."); assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", missingSnpName, "--cancel")); assertContains(log, testOut.toString(), "Snapshot cache group restore operation is not in progress [snapshot=" + missingSnpName + ']'); - GridTestUtils.runAsync(() -> { + IgniteInternalFuture fut = runAsync(() -> { // Wait for the process to be interrupted. - AtomicReference errRef = U.field((Object)U.field((Object)U.field( - grid(0).context().cache().context().snapshotMgr(), "restoreCacheGrpProc"), "opCtx"), "err"); + boolean canceled = waitForCondition(() -> { + SnapshotRestoreStatusDetails status = + grid(0).context().cache().context().snapshotMgr().localRestoreStatus(snpName); + + return status != null && status.errorMessage() != null; + }, getTestTimeout()); - waitForCondition(() -> errRef.get() != null, getTestTimeout()); + assertTrue(canceled); spi.stopBlock(); @@ -3250,9 +3252,21 @@ public void testSnapshotRestoreCancelAndStatus() throws Exception { assertContains(log, testOut.toString(), "Snapshot cache group restore operation canceled [snapshot=" + snpName + ']'); + fut.get(getTestTimeout()); + assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--status")); assertContains(log, testOut.toString(), - "Snapshot cache group restore operation is NOT running [snapshot=" + snpName + ']'); + "Error: Operation has been canceled by the user."); + + ig.snapshot().restoreSnapshot(snpName, null).get(getTestTimeout()); + + assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--status")); + + String out = testOut.toString(); + + assertContains(log, out, "Restore operation for snapshot \"" + snpName + "\" completed successfully"); + assertContains(log, out, "Cache groups: " + DEFAULT_CACHE_NAME); + assertContains(log, out, "Progress: 100% completed"); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index 9be0829bb21ca..5f82342b400bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -442,6 +442,8 @@ public static String partDeltaFileName(int partId) { "The list of names of all snapshots currently saved on the local node with respect to " + "the configured via IgniteConfiguration snapshot working path."); + restoreCacheGrpProc.registerMetrics(); + cctx.exchange().registerExchangeAwareComponent(this); ctx.internalSubscriptionProcessor().registerMetastorageListener(this); @@ -935,16 +937,6 @@ public boolean isRestoring() { return restoreCacheGrpProc.restoringSnapshotName() != null; } - /** - * Check if snapshot restore process is currently running. - * - * @param snpName Snapshot name. - * @return {@code True} if the snapshot restore operation from the specified snapshot is in progress locally. - */ - public boolean isRestoring(String snpName) { - return snpName.equals(restoreCacheGrpProc.restoringSnapshotName()); - } - /** * Check if the cache or group with the specified name is currently being restored from the snapshot. * @@ -956,17 +948,27 @@ public boolean isRestoring(CacheConfiguration ccfg) { } /** - * Status of the restore operation cluster-wide. + * Get the status of a cluster-wide restore operation. * * @param snpName Snapshot name. - * @return Future that will be completed when the status of the restore operation is received from all the server - * nodes. The result of this future will be {@code false} if the restore process with the specified snapshot name is - * not running on all nodes. + * @return Future that will be completed when the status of the restore operation is received from all server nodes. + * The result of this future is the node ids mapping with restore operation state. */ - public IgniteFuture restoreStatus(String snpName) { + public IgniteFuture> clusterRestoreStatus(String snpName) { return executeRestoreManagementTask(SnapshotRestoreStatusTask.class, snpName); } + /** + * Get the status of the last local snapshot restore operation. + * + * @param snpName Snapshot name. + * @return Status of the last local snapshot restore operation, {@code null} if the snapshot name of the last + * started operation differs from the specified one. + */ + public @Nullable SnapshotRestoreStatusDetails localRestoreStatus(String snpName) { + return restoreCacheGrpProc.status(snpName); + } + /** * @param restoreId Restore process ID. * @return Server nodes on which a successful start of the cache(s) is required, if any of these nodes fails when @@ -1861,9 +1863,11 @@ static void copy(FileIOFactory factory, File from, File to, long length) { /** * @param taskCls Snapshot restore operation management task class. * @param snpName Snapshot name. + * @param Type of the task result returning from {@link SnapshotRestoreManagementTask#reduce(List)} method. + * @return Task future. */ - private IgniteFuture executeRestoreManagementTask( - Class> taskCls, + private IgniteFuture executeRestoreManagementTask( + Class> taskCls, String snpName ) { cctx.kernalContext().security().authorize(ADMIN_SNAPSHOT); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java index 9dc13618e234d..60abf07606ae6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java @@ -17,7 +17,11 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.mxbean.SnapshotMXBean; @@ -47,4 +51,20 @@ public SnapshotMXBeanImpl(GridKernalContext ctx) { @Override public void cancelSnapshot(String snpName) { mgr.cancelSnapshot(snpName).get(); } + + /** {@inheritDoc} */ + @Override public void restoreSnapshot(String name, String grpNames) { + Set grpNamesSet = F.isEmpty(grpNames) ? null : + Arrays.stream(grpNames.split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet()); + + IgniteFuture fut = mgr.restoreSnapshot(name, grpNamesSet); + + if (fut.isDone()) + fut.get(); + } + + /** {@inheritDoc} */ + @Override public void cancelSnapshotRestore(String name) { + mgr.cancelSnapshotRestore(name).get(); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreCancelTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreCancelTask.java index 10cc8d43b8907..6e2547778ca71 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreCancelTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreCancelTask.java @@ -17,9 +17,11 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; +import java.util.List; import org.apache.ignite.IgniteException; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeJobAdapter; +import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.resources.IgniteInstanceResource; @@ -28,7 +30,7 @@ * Snapshot restore cancel task. */ @GridInternal -class SnapshotRestoreCancelTask extends SnapshotRestoreManagementTask { +class SnapshotRestoreCancelTask extends SnapshotRestoreManagementTask { /** Serial version uid. */ private static final long serialVersionUID = 0L; @@ -44,4 +46,18 @@ class SnapshotRestoreCancelTask extends SnapshotRestoreManagementTask { } }; } + + /** {@inheritDoc} */ + @Override public Boolean reduce(List results) throws IgniteException { + boolean ret = false; + + for (ComputeJobResult r : results) { + if (r.getException() != null) + throw new IgniteException("Failed to execute job [nodeId=" + r.getNode().id() + ']', r.getException()); + + ret |= Boolean.TRUE.equals(r.getData()); + } + + return ret; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreManagementTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreManagementTask.java index 1cc6956c509c7..6f21cead173d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreManagementTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreManagementTask.java @@ -31,7 +31,7 @@ /** * Snapshot restore management task. */ -abstract class SnapshotRestoreManagementTask extends ComputeTaskAdapter { +abstract class SnapshotRestoreManagementTask extends ComputeTaskAdapter { /** * @param param Compute job argument. * @return Compute job. @@ -51,20 +51,6 @@ abstract class SnapshotRestoreManagementTask extends ComputeTaskAdapter results) throws IgniteException { - boolean ret = false; - - for (ComputeJobResult r : results) { - if (r.getException() != null) - throw new IgniteException("Failed to execute job [nodeId=" + r.getNode().id() + ']', r.getException()); - - ret |= Boolean.TRUE.equals(r.getData()); - } - - return ret; - } - /** {@inheritDoc} */ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List rcvd) throws IgniteException { // Handle all exceptions during the `reduce` operation. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java index c796299dffddf..2f31c692801e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java @@ -38,6 +38,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiPredicate; import java.util.function.BooleanSupplier; @@ -66,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.ClusterSnapshotFuture; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; +import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.util.distributed.DistributedProcess; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -101,6 +103,9 @@ public class SnapshotRestoreProcess { /** Temporary cache directory prefix. */ public static final String TMP_CACHE_DIR_PREFIX = "_tmp_snp_restore_"; + /** Snapshot restore metrics prefix. */ + public static final String SNAPSHOT_RESTORE_METRICS = "snapshot-restore"; + /** Reject operation message. */ private static final String OP_REJECT_MSG = "Cache group restore operation was rejected. "; @@ -128,12 +133,18 @@ public class SnapshotRestoreProcess { /** Logger. */ private final IgniteLogger log; - /** Future to be completed when the cache restore process is complete (this future will be returned to the user). */ - private volatile ClusterSnapshotFuture fut; + /** + * Future to be completed when the cache restore process is complete. By default, this is a stub. + * When the process is started the future is recreated on the initiator node and passed to the user. + */ + private volatile ClusterSnapshotFuture fut = new ClusterSnapshotFuture(); - /** Snapshot restore operation context. */ + /** Current snapshot restore operation context (will be {@code null} when the operation is not running). */ private volatile SnapshotRestoreContext opCtx; + /** Last snapshot restore operation context (saves the metrics of the last operation). */ + private volatile SnapshotRestoreContext lastOpCtx = new SnapshotRestoreContext(); + /** * @param ctx Kernal context. */ @@ -173,6 +184,30 @@ protected void cleanup() throws IgniteCheckedException { } } + /** + * Register local metrics. + */ + protected void registerMetrics() { + MetricRegistry mreg = ctx.metric().registry(SNAPSHOT_RESTORE_METRICS); + + mreg.register("startTime", () -> lastOpCtx.startTime, + "The system time of the start of the cluster snapshot restore operation on this node."); + mreg.register("endTime", () -> lastOpCtx.endTime, + "The system time when the restore operation of a cluster snapshot on this node ended."); + mreg.register("snapshotName", () -> lastOpCtx.snpName, String.class, + "The snapshot name of the last running cluster snapshot restore operation on this node."); + mreg.register("cacheGroupNames", () -> lastOpCtx.cacheGrpNames, String.class, + "The names of the cache groups that are being restored from the snapshot."); + mreg.register("totalPartitions", () -> lastOpCtx.totalParts, + "The total number of partitions to be restored on this node."); + mreg.register("processedPartitions", () -> lastOpCtx.processedParts.get(), + "The number of processed partitions on this node."); + mreg.register("totalPartitionsSize", () -> lastOpCtx.totalBytes, + "The total size of the partitions to be restored on this node."); + mreg.register("processedPartitionsSize", () -> lastOpCtx.processedBytes.get(), + "The total size of processed partitions on this node."); + } + /** * Start cache group restore operation. * @@ -206,9 +241,7 @@ public IgniteFuture start(String snpName, @Nullable Collection cac if (restoringSnapshotName() != null) throw new IgniteException(OP_REJECT_MSG + "The previous snapshot restore operation was not completed."); - fut = new ClusterSnapshotFuture(UUID.randomUUID(), snpName); - - fut0 = fut; + fut0 = fut = new ClusterSnapshotFuture(UUID.randomUUID(), snpName); } } catch (IgniteException e) { @@ -322,7 +355,7 @@ public IgniteFuture start(String snpName, @Nullable Collection cac ClusterSnapshotFuture fut0 = fut; - return fut0 != null ? fut0.name : null; + return fut0.isDone() ? null : fut0.name; } /** @@ -372,6 +405,49 @@ else if (CU.cacheId(locGrpName) == cacheId) return false; } + /** + * Get the status of the last local snapshot restore operation. + * + * @param snpName Snapshot name. + * @return Status details. + */ + public @Nullable SnapshotRestoreStatusDetails status(String snpName) { + SnapshotRestoreContext opCtx = lastOpCtx; + ClusterSnapshotFuture fut0 = fut; + + // Future is created only on node initiator, context is created on all nodes, but later. + boolean futValid = snpName.equals(fut0.name); + boolean ctxValid = snpName.equals(opCtx.snpName); + + if (!ctxValid && !futValid) + return null; + + if (ctxValid && futValid && !fut0.rqId.equals(opCtx.reqId)) { + // If the request ID does not match, we must read the metrics of the later operation. + if (fut0.startTime <= opCtx.startTime) + futValid = false; + else + ctxValid = false; + } + + long startTime = futValid ? fut0.startTime : opCtx.startTime; + long endTime = futValid ? fut0.endTime : opCtx.endTime; + UUID reqId = futValid ? fut0.rqId : opCtx.reqId; + Throwable err = ctxValid ? opCtx.err.get() : fut0.interruptEx; + + return new SnapshotRestoreStatusDetails( + reqId, + startTime, + endTime, + err == null ? null : err.getMessage(), + ctxValid ? opCtx.processedParts.get() : 0, + ctxValid ? opCtx.processedBytes.get() : 0, + ctxValid ? opCtx.cacheGrpNames : null, + ctxValid ? opCtx.totalParts : 0, + ctxValid ? opCtx.totalBytes : 0 + ); + } + /** * @param reqId Request ID. * @return Server nodes on which a successful start of the cache(s) is required, if any of these nodes fails when @@ -386,15 +462,6 @@ public Set cacheStartRequiredAliveNodes(IgniteUuid reqId) { return new HashSet<>(opCtx0.nodes()); } - /** - * Finish local cache group restore process. - * - * @param reqId Request ID. - */ - private void finishProcess(UUID reqId) { - finishProcess(reqId, null); - } - /** * Finish local cache group restore process. * @@ -409,16 +476,21 @@ else if (log.isInfoEnabled()) SnapshotRestoreContext opCtx0 = opCtx; - if (opCtx0 != null && reqId.equals(opCtx0.reqId)) + if (opCtx0 != null && reqId.equals(opCtx0.reqId)) { opCtx = null; + opCtx0.endTime = U.currentTimeMillis(); + } + synchronized (this) { ClusterSnapshotFuture fut0 = fut; - if (fut0 != null && reqId.equals(fut0.rqId)) { - fut = null; + if (!fut0.isDone() && reqId.equals(fut0.rqId)) { + ctx.pools().getSystemExecutorService().submit(() -> { + fut0.endTime = U.currentTimeMillis(); - ctx.pools().getSystemExecutorService().submit(() -> fut0.onDone(null, err)); + fut0.onDone(null, err); + }); } } } @@ -452,7 +524,7 @@ public IgniteFuture cancel(IgniteCheckedException reason, String snpNam synchronized (this) { opCtx0 = opCtx; - if (fut != null && fut.name.equals(snpName)) { + if (!fut.isDone() && fut.name.equals(snpName)) { fut0 = fut; fut0.interruptEx = reason; @@ -560,11 +632,11 @@ private IgniteInternalFuture prepare(SnapshotO SnapshotRestoreContext opCtx0 = prepareContext(req); synchronized (this) { - opCtx = opCtx0; + lastOpCtx = opCtx = opCtx0; ClusterSnapshotFuture fut0 = fut; - if (fut0 != null) + if (!fut0.isDone() && fut0.interruptEx != null) opCtx0.errHnd.accept(fut0.interruptEx); } @@ -1104,7 +1176,7 @@ private void finishCacheStart(UUID reqId, Map res, Map affCache = new ConcurrentHashMap<>(); + /** Operation start time. */ + private final long startTime; + + /** Names of the restored cache groups. */ + private final String cacheGrpNames; + + /** Number of processed (copied) partitions. */ + private final AtomicLong processedParts = new AtomicLong(); + + /** Size of processed (copied) partitions in bytes. */ + private final AtomicLong processedBytes = new AtomicLong(); + + /** Total number of partitions to be restored. */ + private volatile long totalParts; + + /** Total size of the partitions to be restored in bytes. */ + private volatile long totalBytes; + /** Cache ID to configuration mapping. */ private volatile Map cfgs; /** Graceful shutdown future. */ private volatile IgniteFuture stopFut; + /** Operation end time. */ + private volatile long endTime; + /** * @param req Request to prepare cache group restore from the snapshot. * @param cfgs Cache ID to configuration mapping. @@ -1409,6 +1502,24 @@ protected SnapshotRestoreContext( this.cfgs = cfgs; metasPerNode.computeIfAbsent(locNodeId, id -> new ArrayList<>()).addAll(locMetas); + + startTime = 0; + + cacheGrpNames = F.concat(F.viewReadOnly(dirs, FilePageStoreManager::cacheGroupName), ","); + } + + /** + * Default constructor. + */ + protected SnapshotRestoreContext() { + reqId = opNodeId = null; +// dirs = null; +// opNodeId = null; +// nodes = null; + snpName = cacheGrpNames = ""; + startTime = 0; + + discoCache = null; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusDetails.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusDetails.java new file mode 100644 index 0000000000000..95d455f77259b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusDetails.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.Serializable; +import java.util.UUID; + +/** + * Snapshot restore operation details. + */ +public class SnapshotRestoreStatusDetails implements Serializable { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Message of the exception that led to an interruption of the process. */ + private String errMsg; + + /** Request ID. */ + private UUID reqId; + + /** Operation start time. */ + private long startTime; + + /** Operation end time. */ + private long endTime; + + /** Names of the restored cache groups. */ + private String cacheGrpNames; + + /** Number of processed (copied) partitions. */ + private long processedParts; + + /** Total number of partitions to be restored. */ + private long totalParts; + + /** Size of processed (copied) partitions in bytes. */ + private long processedBytes; + + /** Total size of the partitions to be restored in bytes. */ + private long totalBytes; + + /** Default constructor. */ + public SnapshotRestoreStatusDetails() { + // No-op. + } + + /** + * @param reqId Request ID. + * @param startTime Operation start time. + * @param endTime Operation end time. + * @param errMsg Message of the exception that led to an interruption of the process. + * @param processedParts Number of processed (copied) partitions. + * @param processedBytes Size of processed (copied) partitions in bytes. + * @param cacheGrpNames Names of the restored cache groups. + * @param totalParts Total number of partitions to be restored. + * @param totalBytes Total size of the partitions to be restored in bytes. + */ + public SnapshotRestoreStatusDetails(UUID reqId, long startTime, long endTime, String errMsg, + long processedParts, long processedBytes, String cacheGrpNames, long totalParts, long totalBytes) { + this.reqId = reqId; + this.errMsg = errMsg; + this.startTime = startTime; + this.cacheGrpNames = cacheGrpNames; + this.processedParts = processedParts; + this.processedBytes = processedBytes; + this.totalParts = totalParts; + this.totalBytes = totalBytes; + this.endTime = endTime; + } + + /** @return Message of the exception that led to an interruption of the process. */ + public String errorMessage() { + return errMsg; + } + + /** @return Request ID. */ + public UUID requestId() { + return reqId; + } + + /** @return Operation start time. */ + public long startTime() { + return startTime; + } + + /** @return Operation end time. */ + public long endTime() { + return endTime; + } + + /** @return Names of the restored cache groups. */ + public String cacheGroupNames() { + return cacheGrpNames; + } + + /** @return Number of processed (copied) partitions. */ + public long processedParts() { + return processedParts; + } + + /** @return Size of processed (copied) partitions in bytes. */ + public long processedBytes() { + return processedBytes; + } + + /** @return Total number of partitions to be restored. */ + public long totalParts() { + return totalParts; + } + + /** @return Total size of the partitions to be restored in bytes. */ + public long totalBytes() { + return totalBytes; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusTask.java index e988042b965d5..e617485cc90c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusTask.java @@ -17,31 +17,59 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; import org.apache.ignite.IgniteException; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeJobAdapter; +import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.resources.IgniteInstanceResource; /** * Snapshot restore status task. */ @GridInternal -class SnapshotRestoreStatusTask extends SnapshotRestoreManagementTask { +class SnapshotRestoreStatusTask extends SnapshotRestoreManagementTask> { /** Serial version uid. */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ @Override protected ComputeJob makeJob(String snpName) { return new ComputeJobAdapter() { - /** Auto-injected grid instance. */ @IgniteInstanceResource private transient IgniteEx ignite; - @Override public Boolean execute() throws IgniteException { - return ignite.context().cache().context().snapshotMgr().isRestoring(snpName); + @Override public SnapshotRestoreStatusDetails execute() throws IgniteException { + return ignite.context().cache().context().snapshotMgr().localRestoreStatus(snpName); } }; } + + /** {@inheritDoc} */ + @Override public Map reduce(List results) throws IgniteException { + Map> reqMap = new HashMap<>(); + T2 oldestUUID = new T2<>(0L, null); + + for (ComputeJobResult r : results) { + if (r.getException() != null) + throw new IgniteException("Failed to execute job [nodeId=" + r.getNode().id() + ']', r.getException()); + + SnapshotRestoreStatusDetails details = r.getData(); + + if (details == null) + continue; + + if (oldestUUID.get1() < details.startTime()) + oldestUUID.set(details.startTime(), details.requestId()); + + reqMap.computeIfAbsent(details.requestId(), v -> new HashMap<>()).put(r.getNode().id(), details); + } + + return reqMap.isEmpty() ? null : reqMap.get(oldestUUID.get2()); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTask.java index 8f4ab226e189b..b9181cb007715 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTask.java @@ -17,9 +17,16 @@ package org.apache.ignite.internal.visor.snapshot; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Locale; +import java.util.Map; +import java.util.UUID; import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreStatusDetails; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.visor.VisorJob; import org.apache.ignite.internal.visor.VisorOneNodeTask; import org.apache.ignite.lang.IgniteFuture; @@ -112,10 +119,99 @@ protected VisorSnapshotRestoreStatusJob(VisorSnapshotRestoreTaskArg arg, boolean /** {@inheritDoc} */ @Override protected String run(VisorSnapshotRestoreTaskArg arg) throws IgniteException { - boolean state = ignite.context().cache().context().snapshotMgr().restoreStatus(arg.snapshotName()).get(); + Map nodesStatus = + ignite.context().cache().context().snapshotMgr().clusterRestoreStatus(arg.snapshotName()).get(); - return "Snapshot cache group restore operation is " + (state ? "" : "NOT ") + - "running [snapshot=" + arg.snapshotName() + ']'; + long clusterProcParts = 0; + long clusterProcBytes = 0; + long globalStartTime = 0; + long globalEndTime = 0; + long clusterParts = 0; + long clusterBytes = 0; + String errMsg = null; + String grps = null; + UUID reqId = null; + + if (nodesStatus == null) + return "No information about restoring snapshot \"" + arg.snapshotName() + "\" is available."; + + SB buf = new SB(); + + for (Map.Entry e : nodesStatus.entrySet()) { + SnapshotRestoreStatusDetails details = e.getValue(); + + if (globalStartTime == 0 || globalStartTime > details.startTime()) { + globalStartTime = details.startTime(); + errMsg = details.errorMessage(); + globalEndTime = details.endTime(); + reqId = details.requestId(); + } + + if (grps == null && !F.isEmpty(details.cacheGroupNames())) + grps = details.cacheGroupNames(); + + long procParts = details.processedParts(); + long totalParts = details.totalParts(); + long procBytes = details.processedBytes(); + long totalBytes = details.totalBytes(); + + clusterParts += totalParts; + clusterBytes += totalBytes; + clusterProcParts += procParts; + clusterProcBytes += procBytes; + + if (totalBytes == 0) + continue; + + buf.a(" Node ").a(e.getKey()).a(": ").a(fprmatProgress(procParts, totalParts, procBytes, totalBytes)); + } + + boolean err = errMsg != null; + String nodesInfo = buf.toString(); + + buf.setLength(0); + + buf.a("Restore operation for snapshot \"").a(arg.snapshotName()).a("\" ") + .a(err ? "failed" : (globalEndTime == 0 ? " is still in progress" : "completed successfully")) + .a(" (requestId=").a(reqId).a(").").a(System.lineSeparator()).a(System.lineSeparator()); + + if (err) + buf.a(" Error: ").a(errMsg).a(System.lineSeparator()); + else if (clusterBytes != 0) + buf.a(" Progress: ").a(fprmatProgress(clusterProcParts, clusterParts, clusterProcBytes, clusterBytes)); + + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + + buf.a(" Started: ").a(dateFormat.format(new Date(globalStartTime))).a(System.lineSeparator()); + + if (globalEndTime != 0) + buf.a(" Finished: ").a(dateFormat.format(new Date(globalEndTime))).a(System.lineSeparator()); + + if (grps != null) + buf.a(" Cache groups: ").a(grps).a(System.lineSeparator()); + + buf.a(System.lineSeparator()).a(nodesInfo); + + return buf.toString(); + } + + /** + * @param procParts Processed partitions. + * @param totalParts Total partitions. + * @param procBytes Size in bytes of processed partitions. + * @param totalBytes Total partitions size. + */ + private String fprmatProgress(long procParts, long totalParts, long procBytes, long totalBytes) { + long base = 1024L; + + int exponent = Math.max((int)(Math.log(totalBytes) / Math.log(base)), 0); + String unit = String.valueOf(" KMGTPE".charAt(exponent)).trim(); + double baseBound = Math.pow(base, exponent); + + return new SB().a(procBytes * 100 / totalBytes).a("% completed (") + .a(procParts).a('/').a(totalParts).a(" partitions, ") + .a(String.format((Locale)null, "%.1f/%.1f %sB)", procBytes / baseBound, totalBytes / baseBound, unit)) + .a(System.lineSeparator()).toString(); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java index e6b425835503d..fcb7a399c9878 100644 --- a/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java @@ -17,6 +17,7 @@ package org.apache.ignite.mxbean; +import java.util.Collection; import org.apache.ignite.IgniteSnapshot; /** @@ -40,4 +41,28 @@ public interface SnapshotMXBean { */ @MXBeanDescription("Cancel started cluster-wide snapshot on the node initiator.") public void cancelSnapshot(@MXBeanParameter(name = "snpName", description = "Snapshot name.") String snpName); + + /** + * Restore cluster-wide snapshot. + * + * @param name Snapshot name. + * @param cacheGroupNames Optional comma-separated list of cache group names. + * @see IgniteSnapshot#restoreSnapshot(String, Collection) + */ + @MXBeanDescription("Restore cluster-wide snapshot.") + public void restoreSnapshot( + @MXBeanParameter(name = "snpName", description = "Snapshot name.") + String name, + @MXBeanParameter(name = "cacheGroupNames", description = "Optional comma-separated list of cache group names.") + String cacheGroupNames + ); + + /** + * Cancel previously started snapshot restore operation. + * + * @param name Snapshot name. + * @see IgniteSnapshot#cancelSnapshotRestore(String) + */ + @MXBeanDescription("Cancel previously started snapshot restore operation.") + public void cancelSnapshotRestore(@MXBeanParameter(name = "snpName", description = "Snapshot name.") String name); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java index c662a5a9ed7d5..b44076ad4d1c6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java @@ -17,19 +17,38 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import javax.management.AttributeNotFoundException; import javax.management.DynamicMBean; import javax.management.MBeanException; import javax.management.ReflectionException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.util.distributed.SingleNodeMessage; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.mxbean.SnapshotMXBean; import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi; import org.apache.ignite.testframework.GridTestUtils; import org.junit.Test; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METRICS; +import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.SNAPSHOT_RESTORE_METRICS; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE; +import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /** * Tests {@link SnapshotMXBean}. @@ -56,7 +75,7 @@ public void testCreateSnapshot() throws Exception { mxBean.createSnapshot(SNAPSHOT_NAME); assertTrue("Waiting for snapshot operation failed.", - GridTestUtils.waitForCondition(() -> getLastSnapshotEndTime(snpMBean) > 0, 10_000)); + GridTestUtils.waitForCondition(() -> getLastSnapshotEndTime(snpMBean) > 0, TIMEOUT)); stopAllGrids(); @@ -81,8 +100,121 @@ public void testCancelSnapshot() throws Exception { mxBean::cancelSnapshot); } - /** + /** @throws Exception If fails. */ + @Test + public void testRestoreSnapshot() throws Exception { + CacheConfiguration ccfg1 = new CacheConfiguration<>(dfltCacheCfg).setBackups(1).setName("cache1"); + CacheConfiguration ccfg2 = new CacheConfiguration<>(ccfg1).setName("cache2"); + + IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, Integer::new, ccfg1, ccfg2); + + DynamicMBean snpMBean = metricRegistry(ignite.name(), null, SNAPSHOT_METRICS); + + assertEquals("Snapshot end time must be undefined on first snapshot operation starts.", + 0, getLastSnapshotEndTime(snpMBean)); + + SnapshotMXBean mxBean = getMxBean(ignite.name(), "Snapshot", SnapshotMXBeanImpl.class, SnapshotMXBean.class); + + mxBean.createSnapshot(SNAPSHOT_NAME); + + assertTrue(GridTestUtils.waitForCondition(() -> getLastSnapshotEndTime(snpMBean) > 0, TIMEOUT)); + + ignite.destroyCaches(Arrays.asList(ccfg1.getName(), ccfg2.getName())); + + awaitPartitionMapExchange(); + + DynamicMBean restoreMBean = metricRegistry(ignite.name(), null, SNAPSHOT_RESTORE_METRICS); + + assertEquals(0, getLongMetric("endTime", restoreMBean)); + assertEquals(0, getLongMetric("totalPartitions", restoreMBean)); + assertEquals(0, getLongMetric("processedPartitions", restoreMBean)); + assertEquals(0, getLongMetric("totalPartitionsSize", restoreMBean)); + assertEquals(0, getLongMetric("processedPartitionsSize", restoreMBean)); + assertTrue(String.valueOf(restoreMBean.getAttribute("snapshotName")).isEmpty()); + assertTrue(String.valueOf(restoreMBean.getAttribute("cacheGroupNames")).isEmpty()); + + Set grpNames = new HashSet<>(F.asList(ccfg1.getName(), ccfg2.getName())); + + mxBean.restoreSnapshot(SNAPSHOT_NAME, F.concat(grpNames, " ,")); + + assertTrue(GridTestUtils.waitForCondition(() -> getLongMetric("endTime", restoreMBean) > 0, TIMEOUT)); + + int expPartCnt = grid(0).cachex(ccfg1.getName()).context().affinity().partitions() + + grid(0).cachex(ccfg2.getName()).context().affinity().partitions(); + + for (Ignite grid : G.allGrids()) { + DynamicMBean mReg = metricRegistry(grid.name(), null, SNAPSHOT_RESTORE_METRICS); + + assertEquals(expPartCnt, getLongMetric("totalPartitions", mReg)); + assertEquals(expPartCnt, getLongMetric("processedPartitions", mReg)); + + long totalPartsSize = getLongMetric("totalPartitionsSize", mReg); + + assertTrue(totalPartsSize > 0); + assertEquals(totalPartsSize, getLongMetric("processedPartitionsSize", mReg)); + + assertEquals(SNAPSHOT_NAME, restoreMBean.getAttribute("snapshotName")); + + String cacheGrpNames = (String)restoreMBean.getAttribute("cacheGroupNames"); + + assertNotNull(cacheGrpNames); + + assertEquals(grpNames, Arrays.stream(cacheGrpNames.split(",")).collect(Collectors.toSet())); + } + + assertSnapshotCacheKeys(ignite.cache(ccfg1.getName())); + assertSnapshotCacheKeys(ignite.cache(ccfg2.getName())); + } + + /** @throws Exception If fails. */ + @Test + public void testCancelRestoreSnapshot() throws Exception { + IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE); + + DynamicMBean snpMBean = metricRegistry(ignite.name(), null, SNAPSHOT_METRICS); + + assertEquals("Snapshot end time must be undefined on first snapshot operation starts.", + 0, getLastSnapshotEndTime(snpMBean)); + + SnapshotMXBean mxBean = getMxBean(ignite.name(), "Snapshot", SnapshotMXBeanImpl.class, SnapshotMXBean.class); + + mxBean.createSnapshot(SNAPSHOT_NAME); + + assertTrue(GridTestUtils.waitForCondition(() -> getLastSnapshotEndTime(snpMBean) > 0, TIMEOUT)); + + ignite.destroyCache(dfltCacheCfg.getName()); + + awaitPartitionMapExchange(); + + TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(1)); + + spi.blockMessages((node, msg) -> msg instanceof SingleNodeMessage && + ((SingleNodeMessage)msg).type() == RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE.ordinal()); + + IgniteFuture fut = ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, null); + + spi.waitForBlocked(); + + IgniteInternalFuture interruptFut = GridTestUtils.runAsync(() -> { + // Wait for the process to be interrupted. + AtomicReference errRef = U.field((Object)U.field((Object)U.field( + grid(0).context().cache().context().snapshotMgr(), "restoreCacheGrpProc"), "opCtx"), "err"); + boolean interrupted = waitForCondition(() -> errRef.get() != null, getTestTimeout()); + + spi.stopBlock(); + + return interrupted; + }); + + mxBean.cancelSnapshotRestore(SNAPSHOT_NAME); + + assertTrue(interruptFut.get()); + + assertThrowsAnyCause(log, fut::get, IgniteCheckedException.class, "Operation has been canceled by the user"); + } + + /** * @param mBean Ignite snapshot MBean. * @return Value of snapshot end time. */ @@ -94,4 +226,19 @@ private static long getLastSnapshotEndTime(DynamicMBean mBean) { throw new RuntimeException(e); } } + + /** + * @param mBean Ignite snapshot restore MBean. + * @param name Metric name. + * @return Metric value. + */ + private static long getLongMetric(String name, DynamicMBean mBean) { + try { + return (long)mBean.getAttribute(name); + } + catch (MBeanException | ReflectionException | AttributeNotFoundException e) { + throw new RuntimeException(e); + } + } + } From 7a07c920e0eeb61e1e5b7c80badaf6af6e6aec58 Mon Sep 17 00:00:00 2001 From: Pavel Pereslegin Date: Wed, 15 Dec 2021 19:43:41 +0300 Subject: [PATCH 2/3] IGNITE-14794 Removed status command extension. --- .../ignite/util/GridCommandHandlerTest.java | 30 ++---- .../snapshot/IgniteSnapshotManager.java | 36 +++---- .../snapshot/SnapshotRestoreCancelTask.java | 18 +--- .../SnapshotRestoreManagementTask.java | 18 +++- .../snapshot/SnapshotRestoreProcess.java | 88 ++++----------- .../snapshot/SnapshotRestoreStatusTask.java | 36 +------ .../snapshot/VisorSnapshotRestoreTask.java | 102 +----------------- .../snapshot/AbstractSnapshotSelfTest.java | 2 +- .../snapshot/IgniteSnapshotMXBeanTest.java | 16 +-- 9 files changed, 70 insertions(+), 276 deletions(-) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java index 62f3b4b132a2b..b3e5a35d08547 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java @@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; import java.util.function.UnaryOperator; @@ -90,7 +91,6 @@ import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.db.IgniteCacheGroupsWithRestartsTest; import org.apache.ignite.internal.processors.cache.persistence.diagnostic.pagelocktracker.dumpprocessors.ToFileDumpProcessor; -import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreStatusDetails; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; @@ -3221,27 +3221,23 @@ public void testSnapshotRestoreCancelAndStatus() throws Exception { assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--status")); assertContains(log, testOut.toString(), - "Restore operation for snapshot \"" + snpName + "\" is still in progress"); + "Snapshot cache group restore operation is running [snapshot=" + snpName + ']'); // Check wrong snapshot name. assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", missingSnpName, "--status")); assertContains(log, testOut.toString(), - "No information about restoring snapshot \"" + missingSnpName + "\" is available."); + "Snapshot cache group restore operation is NOT running [snapshot=" + missingSnpName + ']'); assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", missingSnpName, "--cancel")); assertContains(log, testOut.toString(), "Snapshot cache group restore operation is not in progress [snapshot=" + missingSnpName + ']'); - IgniteInternalFuture fut = runAsync(() -> { + IgniteInternalFuture fut = GridTestUtils.runAsync(() -> { // Wait for the process to be interrupted. - boolean canceled = waitForCondition(() -> { - SnapshotRestoreStatusDetails status = - grid(0).context().cache().context().snapshotMgr().localRestoreStatus(snpName); + AtomicReference errRef = U.field((Object)U.field((Object)U.field( + grid(0).context().cache().context().snapshotMgr(), "restoreCacheGrpProc"), "opCtx"), "err"); - return status != null && status.errorMessage() != null; - }, getTestTimeout()); - - assertTrue(canceled); + waitForCondition(() -> errRef.get() != null, getTestTimeout()); spi.stopBlock(); @@ -3256,17 +3252,7 @@ public void testSnapshotRestoreCancelAndStatus() throws Exception { assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--status")); assertContains(log, testOut.toString(), - "Error: Operation has been canceled by the user."); - - ig.snapshot().restoreSnapshot(snpName, null).get(getTestTimeout()); - - assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--status")); - - String out = testOut.toString(); - - assertContains(log, out, "Restore operation for snapshot \"" + snpName + "\" completed successfully"); - assertContains(log, out, "Cache groups: " + DEFAULT_CACHE_NAME); - assertContains(log, out, "Progress: 100% completed"); + "Snapshot cache group restore operation is NOT running [snapshot=" + snpName + ']'); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index 5f82342b400bb..7a57c8520573d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -937,6 +937,16 @@ public boolean isRestoring() { return restoreCacheGrpProc.restoringSnapshotName() != null; } + /** + * Check if snapshot restore process is currently running. + * + * @param snpName Snapshot name. + * @return {@code True} if the snapshot restore operation from the specified snapshot is in progress locally. + */ + public boolean isRestoring(String snpName) { + return snpName.equals(restoreCacheGrpProc.restoringSnapshotName()); + } + /** * Check if the cache or group with the specified name is currently being restored from the snapshot. * @@ -948,27 +958,17 @@ public boolean isRestoring(CacheConfiguration ccfg) { } /** - * Get the status of a cluster-wide restore operation. + * Status of the restore operation cluster-wide. * * @param snpName Snapshot name. - * @return Future that will be completed when the status of the restore operation is received from all server nodes. - * The result of this future is the node ids mapping with restore operation state. + * @return Future that will be completed when the status of the restore operation is received from all the server + * nodes. The result of this future will be {@code false} if the restore process with the specified snapshot name is + * not running on all nodes. */ - public IgniteFuture> clusterRestoreStatus(String snpName) { + public IgniteFuture restoreStatus(String snpName) { return executeRestoreManagementTask(SnapshotRestoreStatusTask.class, snpName); } - /** - * Get the status of the last local snapshot restore operation. - * - * @param snpName Snapshot name. - * @return Status of the last local snapshot restore operation, {@code null} if the snapshot name of the last - * started operation differs from the specified one. - */ - public @Nullable SnapshotRestoreStatusDetails localRestoreStatus(String snpName) { - return restoreCacheGrpProc.status(snpName); - } - /** * @param restoreId Restore process ID. * @return Server nodes on which a successful start of the cache(s) is required, if any of these nodes fails when @@ -1863,11 +1863,9 @@ static void copy(FileIOFactory factory, File from, File to, long length) { /** * @param taskCls Snapshot restore operation management task class. * @param snpName Snapshot name. - * @param Type of the task result returning from {@link SnapshotRestoreManagementTask#reduce(List)} method. - * @return Task future. */ - private IgniteFuture executeRestoreManagementTask( - Class> taskCls, + private IgniteFuture executeRestoreManagementTask( + Class> taskCls, String snpName ) { cctx.kernalContext().security().authorize(ADMIN_SNAPSHOT); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreCancelTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreCancelTask.java index 6e2547778ca71..10cc8d43b8907 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreCancelTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreCancelTask.java @@ -17,11 +17,9 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; -import java.util.List; import org.apache.ignite.IgniteException; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeJobAdapter; -import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.resources.IgniteInstanceResource; @@ -30,7 +28,7 @@ * Snapshot restore cancel task. */ @GridInternal -class SnapshotRestoreCancelTask extends SnapshotRestoreManagementTask { +class SnapshotRestoreCancelTask extends SnapshotRestoreManagementTask { /** Serial version uid. */ private static final long serialVersionUID = 0L; @@ -46,18 +44,4 @@ class SnapshotRestoreCancelTask extends SnapshotRestoreManagementTask { } }; } - - /** {@inheritDoc} */ - @Override public Boolean reduce(List results) throws IgniteException { - boolean ret = false; - - for (ComputeJobResult r : results) { - if (r.getException() != null) - throw new IgniteException("Failed to execute job [nodeId=" + r.getNode().id() + ']', r.getException()); - - ret |= Boolean.TRUE.equals(r.getData()); - } - - return ret; - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreManagementTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreManagementTask.java index 6f21cead173d4..a4044d9eee6ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreManagementTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreManagementTask.java @@ -31,8 +31,8 @@ /** * Snapshot restore management task. */ -abstract class SnapshotRestoreManagementTask extends ComputeTaskAdapter { - /** +abstract class SnapshotRestoreManagementTask extends ComputeTaskAdapter { + /** * @param param Compute job argument. * @return Compute job. */ @@ -51,6 +51,20 @@ abstract class SnapshotRestoreManagementTask extends ComputeTaskAdapter results) throws IgniteException { + boolean ret = false; + + for (ComputeJobResult r : results) { + if (r.getException() != null) + throw new IgniteException("Failed to execute job [nodeId=" + r.getNode().id() + ']', r.getException()); + + ret |= Boolean.TRUE.equals(r.getData()); + } + + return ret; + } + /** {@inheritDoc} */ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List rcvd) throws IgniteException { // Handle all exceptions during the `reduce` operation. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java index 2f31c692801e1..9dbba1252ef50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java @@ -196,16 +196,10 @@ protected void registerMetrics() { "The system time when the restore operation of a cluster snapshot on this node ended."); mreg.register("snapshotName", () -> lastOpCtx.snpName, String.class, "The snapshot name of the last running cluster snapshot restore operation on this node."); - mreg.register("cacheGroupNames", () -> lastOpCtx.cacheGrpNames, String.class, - "The names of the cache groups that are being restored from the snapshot."); mreg.register("totalPartitions", () -> lastOpCtx.totalParts, "The total number of partitions to be restored on this node."); mreg.register("processedPartitions", () -> lastOpCtx.processedParts.get(), "The number of processed partitions on this node."); - mreg.register("totalPartitionsSize", () -> lastOpCtx.totalBytes, - "The total size of the partitions to be restored on this node."); - mreg.register("processedPartitionsSize", () -> lastOpCtx.processedBytes.get(), - "The total size of processed partitions on this node."); } /** @@ -405,49 +399,6 @@ else if (CU.cacheId(locGrpName) == cacheId) return false; } - /** - * Get the status of the last local snapshot restore operation. - * - * @param snpName Snapshot name. - * @return Status details. - */ - public @Nullable SnapshotRestoreStatusDetails status(String snpName) { - SnapshotRestoreContext opCtx = lastOpCtx; - ClusterSnapshotFuture fut0 = fut; - - // Future is created only on node initiator, context is created on all nodes, but later. - boolean futValid = snpName.equals(fut0.name); - boolean ctxValid = snpName.equals(opCtx.snpName); - - if (!ctxValid && !futValid) - return null; - - if (ctxValid && futValid && !fut0.rqId.equals(opCtx.reqId)) { - // If the request ID does not match, we must read the metrics of the later operation. - if (fut0.startTime <= opCtx.startTime) - futValid = false; - else - ctxValid = false; - } - - long startTime = futValid ? fut0.startTime : opCtx.startTime; - long endTime = futValid ? fut0.endTime : opCtx.endTime; - UUID reqId = futValid ? fut0.rqId : opCtx.reqId; - Throwable err = ctxValid ? opCtx.err.get() : fut0.interruptEx; - - return new SnapshotRestoreStatusDetails( - reqId, - startTime, - endTime, - err == null ? null : err.getMessage(), - ctxValid ? opCtx.processedParts.get() : 0, - ctxValid ? opCtx.processedBytes.get() : 0, - ctxValid ? opCtx.cacheGrpNames : null, - ctxValid ? opCtx.totalParts : 0, - ctxValid ? opCtx.totalBytes : 0 - ); - } - /** * @param reqId Request ID. * @return Server nodes on which a successful start of the cache(s) is required, if any of these nodes fails when @@ -909,6 +860,7 @@ private IgniteInternalFuture preload(UUID reqId) { Map> rmtLoadParts = new HashMap<>(); ClusterNode locNode = ctx.cache().context().localNode(); + long totalParts = 0; for (File dir : opCtx0.dirs) { String cacheOrGrpName = cacheGroupName(dir); @@ -927,6 +879,8 @@ private IgniteInternalFuture preload(UUID reqId) { if (leftParts.isEmpty()) continue; + totalParts += leftParts.size(); + SnapshotMetadata full = findMetadataWithSamePartitions(locMetas, grpId, leftParts.stream().map(p -> p.partId).collect(Collectors.toSet())); @@ -970,6 +924,8 @@ private IgniteInternalFuture preload(UUID reqId) { opCtx0.locProgress.computeIfAbsent(grpId, g -> new HashSet<>()) .add(idxFut = new PartitionRestoreFuture(INDEX_PARTITION)); + totalParts += 1; + copyLocalAsync(ctx.cache().context().snapshotMgr(), opCtx, snpCacheDir, @@ -980,6 +936,8 @@ private IgniteInternalFuture preload(UUID reqId) { } } + opCtx0.totalParts = totalParts; + // Load other partitions from remote nodes. List rmtAwaitParts = rmtLoadParts.values().stream() .flatMap(Collection::stream) @@ -1040,6 +998,8 @@ private IgniteInternalFuture preload(UUID reqId) { partFile.toFile(), snpFile.length()); + opCtx0.processedParts.incrementAndGet(); + partFut.complete(partFile); } catch (Exception e) { @@ -1362,6 +1322,8 @@ private static void copyLocalAsync( IgniteSnapshotManager.copy(mgr.ioFactory(), snpFile, partFile.toFile(), snpFile.length()); + opCtx.processedParts.incrementAndGet(); + return partFile; }, mgr.snapshotExecutorService()) .whenComplete((r, t) -> opCtx.errHnd.accept(t)) @@ -1419,7 +1381,7 @@ private static class SnapshotRestoreContext { /** Snapshot name. */ private final String snpName; - /** Baseline discovery cache for node IDs that must be alive to complete the operation.*/ + /** Baseline discovery cache for node IDs that must be alive to complete the operation. */ private final DiscoCache discoCache; /** Operational node id. */ @@ -1459,20 +1421,11 @@ private static class SnapshotRestoreContext { /** Operation start time. */ private final long startTime; - /** Names of the restored cache groups. */ - private final String cacheGrpNames; - /** Number of processed (copied) partitions. */ - private final AtomicLong processedParts = new AtomicLong(); - - /** Size of processed (copied) partitions in bytes. */ - private final AtomicLong processedBytes = new AtomicLong(); + private final AtomicLong processedParts = new AtomicLong(0); /** Total number of partitions to be restored. */ - private volatile long totalParts; - - /** Total size of the partitions to be restored in bytes. */ - private volatile long totalBytes; + private volatile long totalParts = -1; /** Cache ID to configuration mapping. */ private volatile Map cfgs; @@ -1485,7 +1438,10 @@ private static class SnapshotRestoreContext { /** * @param req Request to prepare cache group restore from the snapshot. + * @param discoCache Baseline discovery cache for node IDs that must be alive to complete the operation. * @param cfgs Cache ID to configuration mapping. + * @param locNodeId Local node ID. + * @param locMetas List of snapshot metadata. */ protected SnapshotRestoreContext( SnapshotOperationRequest req, @@ -1504,8 +1460,6 @@ protected SnapshotRestoreContext( metasPerNode.computeIfAbsent(locNodeId, id -> new ArrayList<>()).addAll(locMetas); startTime = 0; - - cacheGrpNames = F.concat(F.viewReadOnly(dirs, FilePageStoreManager::cacheGroupName), ","); } /** @@ -1513,13 +1467,9 @@ protected SnapshotRestoreContext( */ protected SnapshotRestoreContext() { reqId = opNodeId = null; -// dirs = null; -// opNodeId = null; -// nodes = null; - snpName = cacheGrpNames = ""; - startTime = 0; - discoCache = null; + snpName = ""; + startTime = 0; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusTask.java index e617485cc90c9..e988042b965d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusTask.java @@ -17,59 +17,31 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; import org.apache.ignite.IgniteException; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeJobAdapter; -import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.task.GridInternal; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.resources.IgniteInstanceResource; /** * Snapshot restore status task. */ @GridInternal -class SnapshotRestoreStatusTask extends SnapshotRestoreManagementTask> { +class SnapshotRestoreStatusTask extends SnapshotRestoreManagementTask { /** Serial version uid. */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ @Override protected ComputeJob makeJob(String snpName) { return new ComputeJobAdapter() { + /** Auto-injected grid instance. */ @IgniteInstanceResource private transient IgniteEx ignite; - @Override public SnapshotRestoreStatusDetails execute() throws IgniteException { - return ignite.context().cache().context().snapshotMgr().localRestoreStatus(snpName); + @Override public Boolean execute() throws IgniteException { + return ignite.context().cache().context().snapshotMgr().isRestoring(snpName); } }; } - - /** {@inheritDoc} */ - @Override public Map reduce(List results) throws IgniteException { - Map> reqMap = new HashMap<>(); - T2 oldestUUID = new T2<>(0L, null); - - for (ComputeJobResult r : results) { - if (r.getException() != null) - throw new IgniteException("Failed to execute job [nodeId=" + r.getNode().id() + ']', r.getException()); - - SnapshotRestoreStatusDetails details = r.getData(); - - if (details == null) - continue; - - if (oldestUUID.get1() < details.startTime()) - oldestUUID.set(details.startTime(), details.requestId()); - - reqMap.computeIfAbsent(details.requestId(), v -> new HashMap<>()).put(r.getNode().id(), details); - } - - return reqMap.isEmpty() ? null : reqMap.get(oldestUUID.get2()); - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTask.java index b9181cb007715..8f4ab226e189b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTask.java @@ -17,16 +17,9 @@ package org.apache.ignite.internal.visor.snapshot; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Locale; -import java.util.Map; -import java.util.UUID; import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreStatusDetails; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.visor.VisorJob; import org.apache.ignite.internal.visor.VisorOneNodeTask; import org.apache.ignite.lang.IgniteFuture; @@ -119,99 +112,10 @@ protected VisorSnapshotRestoreStatusJob(VisorSnapshotRestoreTaskArg arg, boolean /** {@inheritDoc} */ @Override protected String run(VisorSnapshotRestoreTaskArg arg) throws IgniteException { - Map nodesStatus = - ignite.context().cache().context().snapshotMgr().clusterRestoreStatus(arg.snapshotName()).get(); + boolean state = ignite.context().cache().context().snapshotMgr().restoreStatus(arg.snapshotName()).get(); - long clusterProcParts = 0; - long clusterProcBytes = 0; - long globalStartTime = 0; - long globalEndTime = 0; - long clusterParts = 0; - long clusterBytes = 0; - String errMsg = null; - String grps = null; - UUID reqId = null; - - if (nodesStatus == null) - return "No information about restoring snapshot \"" + arg.snapshotName() + "\" is available."; - - SB buf = new SB(); - - for (Map.Entry e : nodesStatus.entrySet()) { - SnapshotRestoreStatusDetails details = e.getValue(); - - if (globalStartTime == 0 || globalStartTime > details.startTime()) { - globalStartTime = details.startTime(); - errMsg = details.errorMessage(); - globalEndTime = details.endTime(); - reqId = details.requestId(); - } - - if (grps == null && !F.isEmpty(details.cacheGroupNames())) - grps = details.cacheGroupNames(); - - long procParts = details.processedParts(); - long totalParts = details.totalParts(); - long procBytes = details.processedBytes(); - long totalBytes = details.totalBytes(); - - clusterParts += totalParts; - clusterBytes += totalBytes; - clusterProcParts += procParts; - clusterProcBytes += procBytes; - - if (totalBytes == 0) - continue; - - buf.a(" Node ").a(e.getKey()).a(": ").a(fprmatProgress(procParts, totalParts, procBytes, totalBytes)); - } - - boolean err = errMsg != null; - String nodesInfo = buf.toString(); - - buf.setLength(0); - - buf.a("Restore operation for snapshot \"").a(arg.snapshotName()).a("\" ") - .a(err ? "failed" : (globalEndTime == 0 ? " is still in progress" : "completed successfully")) - .a(" (requestId=").a(reqId).a(").").a(System.lineSeparator()).a(System.lineSeparator()); - - if (err) - buf.a(" Error: ").a(errMsg).a(System.lineSeparator()); - else if (clusterBytes != 0) - buf.a(" Progress: ").a(fprmatProgress(clusterProcParts, clusterParts, clusterProcBytes, clusterBytes)); - - SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); - - buf.a(" Started: ").a(dateFormat.format(new Date(globalStartTime))).a(System.lineSeparator()); - - if (globalEndTime != 0) - buf.a(" Finished: ").a(dateFormat.format(new Date(globalEndTime))).a(System.lineSeparator()); - - if (grps != null) - buf.a(" Cache groups: ").a(grps).a(System.lineSeparator()); - - buf.a(System.lineSeparator()).a(nodesInfo); - - return buf.toString(); - } - - /** - * @param procParts Processed partitions. - * @param totalParts Total partitions. - * @param procBytes Size in bytes of processed partitions. - * @param totalBytes Total partitions size. - */ - private String fprmatProgress(long procParts, long totalParts, long procBytes, long totalBytes) { - long base = 1024L; - - int exponent = Math.max((int)(Math.log(totalBytes) / Math.log(base)), 0); - String unit = String.valueOf(" KMGTPE".charAt(exponent)).trim(); - double baseBound = Math.pow(base, exponent); - - return new SB().a(procBytes * 100 / totalBytes).a("% completed (") - .a(procParts).a('/').a(totalParts).a(" partitions, ") - .a(String.format((Locale)null, "%.1f/%.1f %sB)", procBytes / baseBound, totalBytes / baseBound, unit)) - .a(System.lineSeparator()).toString(); + return "Snapshot cache group restore operation is " + (state ? "" : "NOT ") + + "running [snapshot=" + arg.snapshotName() + ']'; } } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java index aec06469a4fc8..66c269d9e5d48 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java @@ -151,7 +151,7 @@ protected Function valueBuilder() { /** Parameters. */ @Parameterized.Parameters(name = "Encryption={0}") public static Iterable encryptionParams() { - return Arrays.asList(false, true); + return Arrays.asList(false); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java index b44076ad4d1c6..87dee8d8602ca 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java @@ -126,12 +126,9 @@ public void testRestoreSnapshot() throws Exception { DynamicMBean restoreMBean = metricRegistry(ignite.name(), null, SNAPSHOT_RESTORE_METRICS); assertEquals(0, getLongMetric("endTime", restoreMBean)); - assertEquals(0, getLongMetric("totalPartitions", restoreMBean)); + assertEquals(-1, getLongMetric("totalPartitions", restoreMBean)); assertEquals(0, getLongMetric("processedPartitions", restoreMBean)); - assertEquals(0, getLongMetric("totalPartitionsSize", restoreMBean)); - assertEquals(0, getLongMetric("processedPartitionsSize", restoreMBean)); assertTrue(String.valueOf(restoreMBean.getAttribute("snapshotName")).isEmpty()); - assertTrue(String.valueOf(restoreMBean.getAttribute("cacheGroupNames")).isEmpty()); Set grpNames = new HashSet<>(F.asList(ccfg1.getName(), ccfg2.getName())); @@ -148,18 +145,7 @@ public void testRestoreSnapshot() throws Exception { assertEquals(expPartCnt, getLongMetric("totalPartitions", mReg)); assertEquals(expPartCnt, getLongMetric("processedPartitions", mReg)); - long totalPartsSize = getLongMetric("totalPartitionsSize", mReg); - - assertTrue(totalPartsSize > 0); - assertEquals(totalPartsSize, getLongMetric("processedPartitionsSize", mReg)); - assertEquals(SNAPSHOT_NAME, restoreMBean.getAttribute("snapshotName")); - - String cacheGrpNames = (String)restoreMBean.getAttribute("cacheGroupNames"); - - assertNotNull(cacheGrpNames); - - assertEquals(grpNames, Arrays.stream(cacheGrpNames.split(",")).collect(Collectors.toSet())); } assertSnapshotCacheKeys(ignite.cache(ccfg1.getName())); From 3e0b91c7f89128bc0fd58d606db6e2a6f99745d0 Mon Sep 17 00:00:00 2001 From: Pavel Pereslegin Date: Wed, 15 Dec 2021 22:31:12 +0300 Subject: [PATCH 3/3] IGNITE-14794 (minor) Cleanup. --- .../snapshot/SnapshotRestoreProcess.java | 23 +++- .../SnapshotRestoreStatusDetails.java | 130 ------------------ 2 files changed, 17 insertions(+), 136 deletions(-) delete mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusDetails.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java index 9dbba1252ef50..2fa6a2053dc3b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java @@ -137,7 +137,7 @@ public class SnapshotRestoreProcess { * Future to be completed when the cache restore process is complete. By default, this is a stub. * When the process is started the future is recreated on the initiator node and passed to the user. */ - private volatile ClusterSnapshotFuture fut = new ClusterSnapshotFuture(); + private volatile ClusterSnapshotFuture fut; /** Current snapshot restore operation context (will be {@code null} when the operation is not running). */ private volatile SnapshotRestoreContext opCtx; @@ -349,7 +349,7 @@ public IgniteFuture start(String snpName, @Nullable Collection cac ClusterSnapshotFuture fut0 = fut; - return fut0.isDone() ? null : fut0.name; + return fut0 != null ? fut0.name : null; } /** @@ -413,6 +413,15 @@ public Set cacheStartRequiredAliveNodes(IgniteUuid reqId) { return new HashSet<>(opCtx0.nodes()); } + /** + * Finish local cache group restore process. + * + * @param reqId Request ID. + */ + private void finishProcess(UUID reqId) { + finishProcess(reqId, null); + } + /** * Finish local cache group restore process. * @@ -436,7 +445,9 @@ else if (log.isInfoEnabled()) synchronized (this) { ClusterSnapshotFuture fut0 = fut; - if (!fut0.isDone() && reqId.equals(fut0.rqId)) { + if (fut0 != null && reqId.equals(fut0.rqId)) { + fut = null; + ctx.pools().getSystemExecutorService().submit(() -> { fut0.endTime = U.currentTimeMillis(); @@ -475,7 +486,7 @@ public IgniteFuture cancel(IgniteCheckedException reason, String snpNam synchronized (this) { opCtx0 = opCtx; - if (!fut.isDone() && fut.name.equals(snpName)) { + if (fut != null && fut.name.equals(snpName)) { fut0 = fut; fut0.interruptEx = reason; @@ -587,7 +598,7 @@ private IgniteInternalFuture prepare(SnapshotO ClusterSnapshotFuture fut0 = fut; - if (!fut0.isDone() && fut0.interruptEx != null) + if (fut0 != null && fut0.interruptEx != null) opCtx0.errHnd.accept(fut0.interruptEx); } @@ -1459,7 +1470,7 @@ protected SnapshotRestoreContext( metasPerNode.computeIfAbsent(locNodeId, id -> new ArrayList<>()).addAll(locMetas); - startTime = 0; + startTime = U.currentTimeMillis(); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusDetails.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusDetails.java deleted file mode 100644 index 95d455f77259b..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStatusDetails.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.persistence.snapshot; - -import java.io.Serializable; -import java.util.UUID; - -/** - * Snapshot restore operation details. - */ -public class SnapshotRestoreStatusDetails implements Serializable { - /** Serial version uid. */ - private static final long serialVersionUID = 0L; - - /** Message of the exception that led to an interruption of the process. */ - private String errMsg; - - /** Request ID. */ - private UUID reqId; - - /** Operation start time. */ - private long startTime; - - /** Operation end time. */ - private long endTime; - - /** Names of the restored cache groups. */ - private String cacheGrpNames; - - /** Number of processed (copied) partitions. */ - private long processedParts; - - /** Total number of partitions to be restored. */ - private long totalParts; - - /** Size of processed (copied) partitions in bytes. */ - private long processedBytes; - - /** Total size of the partitions to be restored in bytes. */ - private long totalBytes; - - /** Default constructor. */ - public SnapshotRestoreStatusDetails() { - // No-op. - } - - /** - * @param reqId Request ID. - * @param startTime Operation start time. - * @param endTime Operation end time. - * @param errMsg Message of the exception that led to an interruption of the process. - * @param processedParts Number of processed (copied) partitions. - * @param processedBytes Size of processed (copied) partitions in bytes. - * @param cacheGrpNames Names of the restored cache groups. - * @param totalParts Total number of partitions to be restored. - * @param totalBytes Total size of the partitions to be restored in bytes. - */ - public SnapshotRestoreStatusDetails(UUID reqId, long startTime, long endTime, String errMsg, - long processedParts, long processedBytes, String cacheGrpNames, long totalParts, long totalBytes) { - this.reqId = reqId; - this.errMsg = errMsg; - this.startTime = startTime; - this.cacheGrpNames = cacheGrpNames; - this.processedParts = processedParts; - this.processedBytes = processedBytes; - this.totalParts = totalParts; - this.totalBytes = totalBytes; - this.endTime = endTime; - } - - /** @return Message of the exception that led to an interruption of the process. */ - public String errorMessage() { - return errMsg; - } - - /** @return Request ID. */ - public UUID requestId() { - return reqId; - } - - /** @return Operation start time. */ - public long startTime() { - return startTime; - } - - /** @return Operation end time. */ - public long endTime() { - return endTime; - } - - /** @return Names of the restored cache groups. */ - public String cacheGroupNames() { - return cacheGrpNames; - } - - /** @return Number of processed (copied) partitions. */ - public long processedParts() { - return processedParts; - } - - /** @return Size of processed (copied) partitions in bytes. */ - public long processedBytes() { - return processedBytes; - } - - /** @return Total number of partitions to be restored. */ - public long totalParts() { - return totalParts; - } - - /** @return Total size of the partitions to be restored in bytes. */ - public long totalBytes() { - return totalBytes; - } -}