diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/DataStreamerUpdatesHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/DataStreamerUpdatesHandler.java index ee608565ef045..8bbb9ff642d81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/DataStreamerUpdatesHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/DataStreamerUpdatesHandler.java @@ -18,11 +18,13 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; import java.util.Collection; +import java.util.Map; +import java.util.Map.Entry; import java.util.UUID; import java.util.stream.Collectors; - import org.apache.ignite.internal.util.typedef.F; +import static java.lang.Boolean.TRUE; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerType.CREATE; /** @@ -47,9 +49,9 @@ public class DataStreamerUpdatesHandler implements SnapshotHandler { } /** {@inheritDoc} */ - @Override public void complete(String name, Collection> results) + @Override public void complete(String name, Map> results) throws SnapshotWarningException { - Collection nodes = F.viewReadOnly(results, r -> r.node().id(), SnapshotHandlerResult::data); + Collection nodes = F.viewReadOnly(results.entrySet(), Entry::getKey, e -> TRUE.equals(e.getValue().data())); if (!F.isEmpty(nodes)) { throw new SnapshotWarningException(WRN_MSG + " Updates from DataStreamer detected on the nodes: " + diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index 2a24ffcca9690..d18aa26634376 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -1152,7 +1152,7 @@ else if (!F.isEmpty(err)) { "Uncompleted snapshot will be deleted [err=" + err + ']')); } - completeHandlersAsyncIfNeeded(snpOp, res.values()) + completeHandlersAsyncIfNeeded(snpOp, res) .listen(f -> { if (f.error() != null) snpOp.error(f.error()); @@ -1194,7 +1194,7 @@ public void storeSnapshotMeta(M meta, File smf) { } /** - * Execute the {@link SnapshotHandler#complete(String, Collection)} method of the snapshot handlers asynchronously. + * Execute the {@link SnapshotHandler#complete(String, Map)} method of the snapshot handlers asynchronously. * * @param snpOp Snapshot creation operation. * @param res Results. @@ -1202,22 +1202,23 @@ public void storeSnapshotMeta(M meta, File smf) { */ private IgniteInternalFuture completeHandlersAsyncIfNeeded( SnapshotOperation snpOp, - Collection res + Map res ) { if (snpOp.error() != null) return new GridFinishedFuture<>(); SnapshotOperationRequest req = snpOp.request(); - Map>> clusterHndResults = new HashMap<>(); + Map>> clusterHndResults = new HashMap<>(); - for (SnapshotOperationResponse snpRes : res) { + res.forEach((nodeId, snpRes) -> { if (snpRes == null || snpRes.handlerResults() == null) - continue; + return; - for (Map.Entry> entry : snpRes.handlerResults().entrySet()) - clusterHndResults.computeIfAbsent(entry.getKey(), v -> new ArrayList<>()).add(entry.getValue()); - } + snpRes.handlerResults().forEach((hndName, hndRes) -> + clusterHndResults.computeIfAbsent(hndName, v -> new HashMap<>()) + .put(nodeId, hndRes)); + }); if (clusterHndResults.isEmpty()) return new GridFinishedFuture<>(); @@ -2944,7 +2945,7 @@ private void initialize(GridKernalContext ctx, ExecutorService execSvc) { protected void completeAll( SnapshotHandlerType type, String snpName, - Map>> res, + Map>> res, Collection reqNodes, Consumer> wrnsHnd ) throws Exception { @@ -2963,13 +2964,13 @@ protected void completeAll( List wrns = new ArrayList<>(); for (SnapshotHandler hnd : hnds) { - List> nodesRes = res.get(hnd.getClass().getName()); + Map> nodesRes = res.get(hnd.getClass().getName()); if (nodesRes == null || nodesRes.size() < reqNodes.size()) { Set missing = new HashSet<>(reqNodes); if (nodesRes != null) - missing.removeAll(F.viewReadOnly(nodesRes, r -> r.node().id())); + missing.removeAll(nodesRes.keySet()); throw new IgniteCheckedException("Snapshot handlers configuration mismatch, " + "\"" + hnd.getClass().getName() + "\" handler is missing on the remote node(s). " + @@ -2996,12 +2997,12 @@ protected void completeAll( */ private SnapshotHandlerResult invoke(SnapshotHandler hnd, SnapshotHandlerContext ctx) { try { - return new SnapshotHandlerResult<>(hnd.invoke(ctx), null, ctx.localNode()); + return new SnapshotHandlerResult<>(hnd.invoke(ctx), null); } catch (Exception e) { U.error(null, "Error invoking snapshot handler", e); - return new SnapshotHandlerResult<>(null, e, ctx.localNode()); + return new SnapshotHandlerResult<>(null, e); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java index f62fc22ff8225..29bd175cb9ca9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java @@ -207,7 +207,7 @@ private void reduceCustomHandlersResults( throw F.firstValue(errors); // Check responses: checking node -> snapshot part's consistent id -> handler name -> handler result. - Map>>> reduced = new HashMap<>(results.size(), 1.0f); + Map>>> reduced = new HashMap<>(results.size(), 1.0f); for (Map.Entry respEntry : results.entrySet()) { SnapshotCheckResponse nodeResp = respEntry.getValue(); @@ -225,23 +225,19 @@ private void reduceCustomHandlersResults( cstHndRes.forEach((consId, respPerConsIdMap) -> { // Reduced map of the handlers results per snapshot part's consistent id for certain node. Map>> nodePerConsIdResultMap - = reduced.computeIfAbsent(kctx.cluster().get().node(nodeId), n -> new HashMap<>()); + = reduced.computeIfAbsent(nodeId, n -> new HashMap<>()); respPerConsIdMap.forEach((hndId, hndRes) -> nodePerConsIdResultMap.computeIfAbsent(consId, cstId -> new HashMap<>()).put(hndId, hndRes)); }); } - Map>> clusterResults = new HashMap<>(); - Collection execNodes = new ArrayList<>(reduced.size()); + Map>> clusterResults = new HashMap<>(); // Checking node -> Map by consistend id. - for (Map.Entry>>> nodeRes : reduced.entrySet()) { + for (Map.Entry>>> nodeRes : reduced.entrySet()) { // Consistent id -> Map by handler name. for (Map.Entry>> res : nodeRes.getValue().entrySet()) { - // Depending on the job mapping, we can get several different results from one node. - execNodes.add(nodeRes.getKey().id()); - Map> nodeDataMap = res.getValue(); assert nodeDataMap != null : "At least the default snapshot restore handler should have been executed "; @@ -249,13 +245,14 @@ private void reduceCustomHandlersResults( for (Map.Entry> entry : nodeDataMap.entrySet()) { String hndName = entry.getKey(); - clusterResults.computeIfAbsent(hndName, v -> new ArrayList<>()).add(entry.getValue()); + clusterResults.computeIfAbsent(hndName, v -> new HashMap<>()) + .put(nodeRes.getKey(), entry.getValue()); } } } kctx.cache().context().snapshotMgr().handlers().completeAll( - SnapshotHandlerType.RESTORE, ctx.req.snapshotName(), clusterResults, execNodes, wrns -> {}); + SnapshotHandlerType.RESTORE, ctx.req.snapshotName(), clusterResults, reduced.keySet(), wrns -> {}); fut.onDone(new SnapshotPartitionsVerifyResult(ctx.clusterMetas, null)); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandler.java index abd97834c3c66..4147404de6adf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandler.java @@ -17,7 +17,8 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; -import java.util.Collection; +import java.util.Map; +import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.plugin.Extension; import org.jetbrains.annotations.Nullable; @@ -28,11 +29,11 @@ * The execution of the handler consists of two steps: *
    *
  1. Local call of {@link #invoke(SnapshotHandlerContext)} method on all nodes containing the snapshot data.
  2. - *
  3. Processing the results of local invocations in the {@link #complete(String, Collection)} method on one of the + *
  4. Processing the results of local invocations in the {@link #complete(String, Map)} method on one of the * nodes containing the snapshot data.
  5. *
* Note: If during the execution of a snapshot operation some node exits, then whole operation is rolled back, in which - * case the {@link #complete(String, Collection)} method may not be called. + * case the {@link #complete(String, Map)} method may not be called. * * @param Type of the local processing result. */ @@ -45,9 +46,9 @@ public interface SnapshotHandler extends Extension { * * @param ctx Snapshot handler context. * @return Result of local processing. This result will be returned in {@link SnapshotHandlerResult#data()} method - * passed into {@link #complete(String, Collection)} handler method. + * passed into {@link #complete(String, Map)} handler method. * @throws Exception If invocation caused an exception. This exception will be returned in {@link - * SnapshotHandlerResult#error()}} method passed into {@link #complete(String, Collection)} handler method. + * SnapshotHandlerResult#error()}} method passed into {@link #complete(String, Map)} handler method. */ public @Nullable T invoke(SnapshotHandlerContext ctx) throws Exception; @@ -65,16 +66,18 @@ public interface SnapshotHandler extends Extension { * @throws Exception If the snapshot operation needs to be aborted. * @see SnapshotHandlerResult */ - public default void complete(String name, Collection> results) + public default void complete(String name, Map> results) throws SnapshotWarningException, Exception { - for (SnapshotHandlerResult res : results) { - if (res.error() == null) + for (Map.Entry> e : results.entrySet()) { + SnapshotHandlerResult hndRes = e.getValue(); + + if (hndRes.error() == null) continue; - throw new IgniteCheckedException("Snapshot handler has failed. " + res.error().getMessage() + + throw new IgniteCheckedException("Snapshot handler has failed. " + hndRes.error().getMessage() + " [snapshot=" + name + ", handler=" + getClass().getName() + - ", nodeId=" + res.node().id() + "].", res.error()); + ", nodeId=" + e.getKey() + "].", hndRes.error()); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerResult.java index b9243287ee947..06d38877aa541 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerResult.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; import java.io.Serializable; -import org.apache.ignite.cluster.ClusterNode; import org.jetbrains.annotations.Nullable; /** @@ -37,18 +36,13 @@ public class SnapshotHandlerResult implements Serializable { /** Processing error. */ private final Exception err; - /** Processing node. */ - private final ClusterNode node; - /** * @param data Result of local processing. * @param err Processing error. - * @param node Processing node. */ - public SnapshotHandlerResult(@Nullable T data, @Nullable Exception err, ClusterNode node) { + public SnapshotHandlerResult(@Nullable T data, @Nullable Exception err) { this.data = data; this.err = err; - this.node = node; } /** @return Result of local processing. */ @@ -60,9 +54,4 @@ public SnapshotHandlerResult(@Nullable T data, @Nullable Exception err, ClusterN public @Nullable Exception error() { return err; } - - /** @return Processing node. */ - public ClusterNode node() { - return node; - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java index 84f8b90311f5c..2a276afec2669 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java @@ -17,12 +17,12 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.management.cache.PartitionKey; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -69,21 +69,21 @@ public SnapshotPartitionsQuickVerifyHandler(GridCacheSharedContext cctx) { /** {@inheritDoc} */ @Override public void complete( String name, - Collection>> results + Map>> results ) throws IgniteCheckedException { - Exception err = results.stream().map(SnapshotHandlerResult::error).filter(Objects::nonNull).findAny().orElse(null); + Exception err = results.values().stream().map(SnapshotHandlerResult::error).filter(Objects::nonNull).findAny().orElse(null); if (err != null) throw U.cast(err); // Null means that the streamer was already detected (See #invoke). - if (results.stream().anyMatch(res -> res.data() == null)) + if (results.values().stream().anyMatch(res -> res.data() == null)) return; Set wrnGrps = new HashSet<>(); Map total = new HashMap<>(); - for (SnapshotHandlerResult> result : results) { + for (SnapshotHandlerResult> result : results.values()) { result.data().forEach((part, val) -> { PartitionHashRecord other = total.putIfAbsent(part, val); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java index c12f86e15efd7..8619b53fc02f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.stream.Collectors; @@ -439,13 +440,18 @@ private PartitionHashRecord calculateDumpedPartitionHash(Dump dump, String folde } /** {@inheritDoc} */ - @Override public void complete(String name, - Collection>> results) throws IgniteCheckedException { + @Override public void complete( + String name, + Map>> results + ) throws IgniteCheckedException { IdleVerifyResult.Builder bldr = IdleVerifyResult.builder(); - for (SnapshotHandlerResult> res : results) { + for (Map.Entry>> e : results.entrySet()) { + UUID nodeId = e.getKey(); + SnapshotHandlerResult> res = e.getValue(); + if (res.error() != null) { - bldr.addException(res.node(), res.error()); + bldr.addException(cctx.discovery().historicalNode(nodeId), res.error()); continue; } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java index 243a86e784b9d..167c267f79093 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java @@ -22,8 +22,8 @@ import java.io.FileOutputStream; import java.io.OutputStream; import java.util.ArrayList; -import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -100,8 +100,8 @@ public void testClusterSnapshotHandlers() throws Exception { } @Override public void complete(String name, - Collection> results) throws IgniteCheckedException { - for (SnapshotHandlerResult res : results) { + Map> results) throws IgniteCheckedException { + for (SnapshotHandlerResult res : results.values()) { if (!reqIdRef.compareAndSet(null, res.data()) && !reqIdRef.get().equals(res.data())) throw new IgniteCheckedException("The request ID must be the same on all nodes."); } @@ -118,8 +118,8 @@ public void testClusterSnapshotHandlers() throws Exception { } @Override public void complete(String name, - Collection> results) throws IgniteCheckedException { - for (SnapshotHandlerResult res : results) { + Map> results) throws IgniteCheckedException { + for (SnapshotHandlerResult res : results.values()) { if (!reqIdRef.get().equals(res.data())) throw new IgniteCheckedException(expMsg); } @@ -305,7 +305,7 @@ public void testClusterSnapshotHandlerConfigurationMismatch() throws Exception { /** * Test ensures that the snapshot creation is aborted if node exits while the {@link - * SnapshotHandler#complete(String, Collection)} method is executed. + * SnapshotHandler#complete(String, Map)} method is executed. * * @throws Exception If fails. */ @@ -322,7 +322,7 @@ public void testCrdChangeDuringHandlerCompleteOnSnapshotCreate() throws Exceptio return null; } - @Override public void complete(String name, Collection> results) + @Override public void complete(String name, Map> results) throws Exception { if (latch.getCount() == 1) { latch.countDown();