Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;

import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.stream.Collectors;

import org.apache.ignite.internal.util.typedef.F;

import static java.lang.Boolean.TRUE;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerType.CREATE;

/**
Expand All @@ -47,9 +49,9 @@ public class DataStreamerUpdatesHandler implements SnapshotHandler<Boolean> {
}

/** {@inheritDoc} */
@Override public void complete(String name, Collection<SnapshotHandlerResult<Boolean>> results)
@Override public void complete(String name, Map<UUID, SnapshotHandlerResult<Boolean>> results)
throws SnapshotWarningException {
Collection<UUID> nodes = F.viewReadOnly(results, r -> r.node().id(), SnapshotHandlerResult::data);
Collection<UUID> nodes = F.viewReadOnly(results.entrySet(), Entry::getKey, e -> TRUE.equals(e.getValue().data()));

if (!F.isEmpty(nodes)) {
throw new SnapshotWarningException(WRN_MSG + " Updates from DataStreamer detected on the nodes: " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1152,7 +1152,7 @@ else if (!F.isEmpty(err)) {
"Uncompleted snapshot will be deleted [err=" + err + ']'));
}

completeHandlersAsyncIfNeeded(snpOp, res.values())
completeHandlersAsyncIfNeeded(snpOp, res)
.listen(f -> {
if (f.error() != null)
snpOp.error(f.error());
Expand Down Expand Up @@ -1194,30 +1194,31 @@ public <M extends Serializable> void storeSnapshotMeta(M meta, File smf) {
}

/**
* Execute the {@link SnapshotHandler#complete(String, Collection)} method of the snapshot handlers asynchronously.
* Execute the {@link SnapshotHandler#complete(String, Map)} method of the snapshot handlers asynchronously.
*
* @param snpOp Snapshot creation operation.
* @param res Results.
* @return Future that will be completed when the handlers are finished executing.
*/
private IgniteInternalFuture<Void> completeHandlersAsyncIfNeeded(
SnapshotOperation snpOp,
Collection<SnapshotOperationResponse> res
Map<UUID, SnapshotOperationResponse> res
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

) {
if (snpOp.error() != null)
return new GridFinishedFuture<>();

SnapshotOperationRequest req = snpOp.request();

Map<String, List<SnapshotHandlerResult<?>>> clusterHndResults = new HashMap<>();
Map<String, Map<UUID, SnapshotHandlerResult<?>>> clusterHndResults = new HashMap<>();

for (SnapshotOperationResponse snpRes : res) {
res.forEach((nodeId, snpRes) -> {
if (snpRes == null || snpRes.handlerResults() == null)
continue;
return;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, iteration was performed via a loop; now it is done using a lambda


for (Map.Entry<String, SnapshotHandlerResult<Object>> entry : snpRes.handlerResults().entrySet())
clusterHndResults.computeIfAbsent(entry.getKey(), v -> new ArrayList<>()).add(entry.getValue());
}
snpRes.handlerResults().forEach((hndName, hndRes) ->
clusterHndResults.computeIfAbsent(hndName, v -> new HashMap<>())
.put(nodeId, hndRes));
});

if (clusterHndResults.isEmpty())
return new GridFinishedFuture<>();
Expand Down Expand Up @@ -2944,7 +2945,7 @@ private void initialize(GridKernalContext ctx, ExecutorService execSvc) {
protected void completeAll(
SnapshotHandlerType type,
String snpName,
Map<String, List<SnapshotHandlerResult<?>>> res,
Map<String, Map<UUID, SnapshotHandlerResult<?>>> res,
Collection<UUID> reqNodes,
Consumer<List<String>> wrnsHnd
) throws Exception {
Expand All @@ -2963,13 +2964,13 @@ protected void completeAll(
List<String> wrns = new ArrayList<>();

for (SnapshotHandler hnd : hnds) {
List<SnapshotHandlerResult<?>> nodesRes = res.get(hnd.getClass().getName());
Map<UUID, SnapshotHandlerResult<?>> nodesRes = res.get(hnd.getClass().getName());

if (nodesRes == null || nodesRes.size() < reqNodes.size()) {
Set<UUID> missing = new HashSet<>(reqNodes);

if (nodesRes != null)
missing.removeAll(F.viewReadOnly(nodesRes, r -> r.node().id()));
missing.removeAll(nodesRes.keySet());

throw new IgniteCheckedException("Snapshot handlers configuration mismatch, " +
"\"" + hnd.getClass().getName() + "\" handler is missing on the remote node(s). " +
Expand All @@ -2996,12 +2997,12 @@ protected void completeAll(
*/
private SnapshotHandlerResult<Object> invoke(SnapshotHandler<Object> hnd, SnapshotHandlerContext ctx) {
try {
return new SnapshotHandlerResult<>(hnd.invoke(ctx), null, ctx.localNode());
return new SnapshotHandlerResult<>(hnd.invoke(ctx), null);
}
catch (Exception e) {
U.error(null, "Error invoking snapshot handler", e);

return new SnapshotHandlerResult<>(null, e, ctx.localNode());
return new SnapshotHandlerResult<>(null, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ private void reduceCustomHandlersResults(
throw F.firstValue(errors);

// Check responses: checking node -> snapshot part's consistent id -> handler name -> handler result.
Map<ClusterNode, Map<Object, Map<String, SnapshotHandlerResult<?>>>> reduced = new HashMap<>(results.size(), 1.0f);
Map<UUID, Map<Object, Map<String, SnapshotHandlerResult<?>>>> reduced = new HashMap<>(results.size(), 1.0f);

for (Map.Entry<UUID, SnapshotCheckResponse> respEntry : results.entrySet()) {
SnapshotCheckResponse nodeResp = respEntry.getValue();
Expand All @@ -225,37 +225,34 @@ private void reduceCustomHandlersResults(
cstHndRes.forEach((consId, respPerConsIdMap) -> {
// Reduced map of the handlers results per snapshot part's consistent id for certain node.
Map<Object, Map<String, SnapshotHandlerResult<?>>> nodePerConsIdResultMap
= reduced.computeIfAbsent(kctx.cluster().get().node(nodeId), n -> new HashMap<>());
= reduced.computeIfAbsent(nodeId, n -> new HashMap<>());

respPerConsIdMap.forEach((hndId, hndRes) ->
nodePerConsIdResultMap.computeIfAbsent(consId, cstId -> new HashMap<>()).put(hndId, hndRes));
});
}

Map<String, List<SnapshotHandlerResult<?>>> clusterResults = new HashMap<>();
Collection<UUID> execNodes = new ArrayList<>(reduced.size());
Map<String, Map<UUID, SnapshotHandlerResult<?>>> clusterResults = new HashMap<>();

// Checking node -> Map by consistend id.
for (Map.Entry<ClusterNode, Map<Object, Map<String, SnapshotHandlerResult<?>>>> nodeRes : reduced.entrySet()) {
for (Map.Entry<UUID, Map<Object, Map<String, SnapshotHandlerResult<?>>>> nodeRes : reduced.entrySet()) {
// Consistent id -> Map by handler name.
for (Map.Entry<Object, Map<String, SnapshotHandlerResult<?>>> res : nodeRes.getValue().entrySet()) {
// Depending on the job mapping, we can get several different results from one node.
execNodes.add(nodeRes.getKey().id());

Map<String, SnapshotHandlerResult<?>> nodeDataMap = res.getValue();

assert nodeDataMap != null : "At least the default snapshot restore handler should have been executed ";

for (Map.Entry<String, SnapshotHandlerResult<?>> entry : nodeDataMap.entrySet()) {
String hndName = entry.getKey();

clusterResults.computeIfAbsent(hndName, v -> new ArrayList<>()).add(entry.getValue());
clusterResults.computeIfAbsent(hndName, v -> new HashMap<>())
.put(nodeRes.getKey(), entry.getValue());
}
}
}

kctx.cache().context().snapshotMgr().handlers().completeAll(
SnapshotHandlerType.RESTORE, ctx.req.snapshotName(), clusterResults, execNodes, wrns -> {});
SnapshotHandlerType.RESTORE, ctx.req.snapshotName(), clusterResults, reduced.keySet(), wrns -> {});

fut.onDone(new SnapshotPartitionsVerifyResult(ctx.clusterMetas, null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

package org.apache.ignite.internal.processors.cache.persistence.snapshot;

import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.plugin.Extension;
import org.jetbrains.annotations.Nullable;
Expand All @@ -28,11 +29,11 @@
* The execution of the handler consists of two steps:
* <ol>
* <li>Local call of {@link #invoke(SnapshotHandlerContext)} method on all nodes containing the snapshot data.</li>
* <li>Processing the results of local invocations in the {@link #complete(String, Collection)} method on one of the
* <li>Processing the results of local invocations in the {@link #complete(String, Map)} method on one of the
* nodes containing the snapshot data.</li>
* </ol>
* Note: If during the execution of a snapshot operation some node exits, then whole operation is rolled back, in which
* case the {@link #complete(String, Collection)} method may not be called.
* case the {@link #complete(String, Map)} method may not be called.
*
* @param <T> Type of the local processing result.
*/
Expand All @@ -45,9 +46,9 @@ public interface SnapshotHandler<T> extends Extension {
*
* @param ctx Snapshot handler context.
* @return Result of local processing. This result will be returned in {@link SnapshotHandlerResult#data()} method
* passed into {@link #complete(String, Collection)} handler method.
* passed into {@link #complete(String, Map)} handler method.
* @throws Exception If invocation caused an exception. This exception will be returned in {@link
* SnapshotHandlerResult#error()}} method passed into {@link #complete(String, Collection)} handler method.
* SnapshotHandlerResult#error()}} method passed into {@link #complete(String, Map)} handler method.
*/
public @Nullable T invoke(SnapshotHandlerContext ctx) throws Exception;

Expand All @@ -65,16 +66,18 @@ public interface SnapshotHandler<T> extends Extension {
* @throws Exception If the snapshot operation needs to be aborted.
* @see SnapshotHandlerResult
*/
public default void complete(String name, Collection<SnapshotHandlerResult<T>> results)
public default void complete(String name, Map<UUID, SnapshotHandlerResult<T>> results)
throws SnapshotWarningException, Exception {
for (SnapshotHandlerResult<T> res : results) {
if (res.error() == null)
for (Map.Entry<UUID, SnapshotHandlerResult<T>> e : results.entrySet()) {
SnapshotHandlerResult<T> hndRes = e.getValue();

if (hndRes.error() == null)
continue;

throw new IgniteCheckedException("Snapshot handler has failed. " + res.error().getMessage() +
throw new IgniteCheckedException("Snapshot handler has failed. " + hndRes.error().getMessage() +
" [snapshot=" + name +
", handler=" + getClass().getName() +
", nodeId=" + res.node().id() + "].", res.error());
", nodeId=" + e.getKey() + "].", hndRes.error());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;

import java.io.Serializable;
import org.apache.ignite.cluster.ClusterNode;
import org.jetbrains.annotations.Nullable;

/**
Expand All @@ -37,18 +36,13 @@ public class SnapshotHandlerResult<T> implements Serializable {
/** Processing error. */
private final Exception err;

/** Processing node. */
private final ClusterNode node;

/**
* @param data Result of local processing.
* @param err Processing error.
* @param node Processing node.
*/
public SnapshotHandlerResult(@Nullable T data, @Nullable Exception err, ClusterNode node) {
public SnapshotHandlerResult(@Nullable T data, @Nullable Exception err) {
this.data = data;
this.err = err;
this.node = node;
}

/** @return Result of local processing. */
Expand All @@ -60,9 +54,4 @@ public SnapshotHandlerResult(@Nullable T data, @Nullable Exception err, ClusterN
public @Nullable Exception error() {
return err;
}

/** @return Processing node. */
public ClusterNode node() {
return node;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.ignite.internal.processors.cache.persistence.snapshot;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.management.cache.PartitionKey;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
Expand Down Expand Up @@ -69,21 +69,21 @@ public SnapshotPartitionsQuickVerifyHandler(GridCacheSharedContext<?, ?> cctx) {
/** {@inheritDoc} */
@Override public void complete(
String name,
Collection<SnapshotHandlerResult<Map<PartitionKey, PartitionHashRecord>>> results
Map<UUID, SnapshotHandlerResult<Map<PartitionKey, PartitionHashRecord>>> results
) throws IgniteCheckedException {
Exception err = results.stream().map(SnapshotHandlerResult::error).filter(Objects::nonNull).findAny().orElse(null);
Exception err = results.values().stream().map(SnapshotHandlerResult::error).filter(Objects::nonNull).findAny().orElse(null);

if (err != null)
throw U.cast(err);

// Null means that the streamer was already detected (See #invoke).
if (results.stream().anyMatch(res -> res.data() == null))
if (results.values().stream().anyMatch(res -> res.data() == null))
return;

Set<Integer> wrnGrps = new HashSet<>();
Map<PartitionKey, PartitionHashRecord> total = new HashMap<>();

for (SnapshotHandlerResult<Map<PartitionKey, PartitionHashRecord>> result : results) {
for (SnapshotHandlerResult<Map<PartitionKey, PartitionHashRecord>> result : results.values()) {
result.data().forEach((part, val) -> {
PartitionHashRecord other = total.putIfAbsent(part, val);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -439,13 +440,18 @@ private PartitionHashRecord calculateDumpedPartitionHash(Dump dump, String folde
}

/** {@inheritDoc} */
@Override public void complete(String name,
Collection<SnapshotHandlerResult<Map<PartitionKey, PartitionHashRecord>>> results) throws IgniteCheckedException {
@Override public void complete(
String name,
Map<UUID, SnapshotHandlerResult<Map<PartitionKey, PartitionHashRecord>>> results
) throws IgniteCheckedException {
IdleVerifyResult.Builder bldr = IdleVerifyResult.builder();

for (SnapshotHandlerResult<Map<PartitionKey, PartitionHashRecord>> res : results) {
for (Map.Entry<UUID, SnapshotHandlerResult<Map<PartitionKey, PartitionHashRecord>>> e : results.entrySet()) {
UUID nodeId = e.getKey();
SnapshotHandlerResult<Map<PartitionKey, PartitionHashRecord>> res = e.getValue();

if (res.error() != null) {
bldr.addException(res.node(), res.error());
bldr.addException(cctx.discovery().historicalNode(nodeId), res.error());

continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -100,8 +100,8 @@ public void testClusterSnapshotHandlers() throws Exception {
}

@Override public void complete(String name,
Collection<SnapshotHandlerResult<UUID>> results) throws IgniteCheckedException {
for (SnapshotHandlerResult<UUID> res : results) {
Map<UUID, SnapshotHandlerResult<UUID>> results) throws IgniteCheckedException {
for (SnapshotHandlerResult<UUID> res : results.values()) {
if (!reqIdRef.compareAndSet(null, res.data()) && !reqIdRef.get().equals(res.data()))
throw new IgniteCheckedException("The request ID must be the same on all nodes.");
}
Expand All @@ -118,8 +118,8 @@ public void testClusterSnapshotHandlers() throws Exception {
}

@Override public void complete(String name,
Collection<SnapshotHandlerResult<UUID>> results) throws IgniteCheckedException {
for (SnapshotHandlerResult<UUID> res : results) {
Map<UUID, SnapshotHandlerResult<UUID>> results) throws IgniteCheckedException {
for (SnapshotHandlerResult<UUID> res : results.values()) {
if (!reqIdRef.get().equals(res.data()))
throw new IgniteCheckedException(expMsg);
}
Expand Down Expand Up @@ -305,7 +305,7 @@ public void testClusterSnapshotHandlerConfigurationMismatch() throws Exception {

/**
* Test ensures that the snapshot creation is aborted if node exits while the {@link
* SnapshotHandler#complete(String, Collection)} method is executed.
* SnapshotHandler#complete(String, Map)} method is executed.
*
* @throws Exception If fails.
*/
Expand All @@ -322,7 +322,7 @@ public void testCrdChangeDuringHandlerCompleteOnSnapshotCreate() throws Exceptio
return null;
}

@Override public void complete(String name, Collection<SnapshotHandlerResult<Void>> results)
@Override public void complete(String name, Map<UUID, SnapshotHandlerResult<Void>> results)
throws Exception {
if (latch.getCount() == 1) {
latch.countDown();
Expand Down
Loading