Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,22 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
connectionManager = new SCMConnectionManager(conf);
context = new StateContext(this.conf, DatanodeStates.getInitState(), this,
threadNamePrefix);
ReplicationConfig replicationConfig =
conf.getObject(ReplicationConfig.class);
supervisor = ReplicationSupervisor.newBuilder()
.stateContext(context)
.datanodeConfig(dnConf)
.replicationConfig(replicationConfig)
.clock(clock)
.build();

// OzoneContainer instance is used in a non-thread safe way by the context
passed to its constructor, so we must synchronize its access. See
// HDDS-3116 for more details.
constructionLock.writeLock().lock();
try {
container = new OzoneContainer(this.datanodeDetails,
conf, context, certClient, secretKeyClient);
conf, context, certClient, secretKeyClient, supervisor);
} finally {
constructionLock.writeLock().unlock();
}
Expand All @@ -188,11 +197,13 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
ContainerImporter importer = new ContainerImporter(conf,
container.getContainerSet(),
container.getController(),
container.getVolumeSet());
container.getVolumeSet(),
supervisor);
ContainerReplicator pullReplicator = new DownloadAndImportReplicator(
conf, container.getContainerSet(),
importer,
new SimpleContainerDownloader(conf, certClient));
new SimpleContainerDownloader(conf, certClient),
supervisor);
ContainerReplicator pushReplicator = new PushReplicator(conf,
new OnDemandContainerReplicationSource(container.getController()),
new GrpcContainerUploader(conf, certClient)
Expand All @@ -201,23 +212,14 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
pullReplicatorWithMetrics = new MeasuredReplicator(pullReplicator, "pull");
pushReplicatorWithMetrics = new MeasuredReplicator(pushReplicator, "push");

ReplicationConfig replicationConfig =
conf.getObject(ReplicationConfig.class);
supervisor = ReplicationSupervisor.newBuilder()
.stateContext(context)
.datanodeConfig(dnConf)
.replicationConfig(replicationConfig)
.clock(clock)
.build();

replicationSupervisorMetrics =
ReplicationSupervisorMetrics.create(supervisor);

ecReconstructionMetrics = ECReconstructionMetrics.create();

ecReconstructionCoordinator = new ECReconstructionCoordinator(
conf, certClient, secretKeyClient, context, ecReconstructionMetrics,
threadNamePrefix);
threadNamePrefix, supervisor);

// This is created as an instance variable as Mockito needs to access it in
// a test. The test mocks it in a running mini-cluster.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
import org.apache.hadoop.security.token.Token;
import org.apache.ratis.util.MemoizedSupplier;
import org.slf4j.Logger;
Expand Down Expand Up @@ -116,13 +117,16 @@ public class ECReconstructionCoordinator implements Closeable {
private final ECReconstructionMetrics metrics;
private final StateContext context;
private final OzoneClientConfig ozoneClientConfig;
private final ReplicationSupervisor replicationSupervisor;

public ECReconstructionCoordinator(
ConfigurationSource conf, CertificateClient certificateClient,
SecretKeySignerClient secretKeyClient, StateContext context,
ECReconstructionMetrics metrics,
String threadNamePrefix) throws IOException {
String threadNamePrefix,
ReplicationSupervisor supervisor) throws IOException {
this.context = context;
this.replicationSupervisor = supervisor;
this.containerOperationClient = new ECContainerOperationClient(conf,
certificateClient);
this.byteBufferPool = new ElasticByteBufferPool();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.container.common.utils.ContainerInspectorUtil;
import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
Expand All @@ -59,6 +60,7 @@
import org.apache.hadoop.ozone.container.replication.ContainerImporter;
import org.apache.hadoop.ozone.container.replication.ReplicationServer;
import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures.SchemaV3;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.Timer;
Expand All @@ -76,6 +78,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
Expand Down Expand Up @@ -138,10 +141,11 @@ enum InitializingStatus {
* @throws DiskOutOfSpaceException
* @throws IOException
*/
public OzoneContainer(
DatanodeDetails datanodeDetails, ConfigurationSource conf,
StateContext context, CertificateClient certClient,
SecretKeyVerifierClient secretKeyClient) throws IOException {
public OzoneContainer(DatanodeDetails datanodeDetails,
ConfigurationSource conf, StateContext context,
CertificateClient certClient, SecretKeyVerifierClient secretKeyClient,
ReplicationSupervisor supervisor) throws IOException {

config = conf;
this.datanodeDetails = datanodeDetails;
this.context = context;
Expand Down Expand Up @@ -211,12 +215,14 @@ public OzoneContainer(

replicationServer = new ReplicationServer(
controller,
conf.getObject(ReplicationConfig.class),
conf.getObject(ReplicationServer.ReplicationConfig.class),
secConf,
certClient,
new ContainerImporter(conf, containerSet, controller,
volumeSet),
datanodeDetails.threadNamePrefix());
volumeSet, supervisor),
datanodeDetails.threadNamePrefix(),
supervisor);


readChannel = new XceiverServerGrpc(
datanodeDetails, config, hddsDispatcher, certClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class ContainerImporter {
private final MutableVolumeSet volumeSet;
private final VolumeChoosingPolicy volumeChoosingPolicy;
private final long containerSize;
private final ReplicationSupervisor replicationSupervisor;

private final Set<Long> importContainerProgress
= Collections.synchronizedSet(new HashSet<>());
Expand All @@ -70,10 +71,12 @@ public class ContainerImporter {
public ContainerImporter(@Nonnull ConfigurationSource conf,
@Nonnull ContainerSet containerSet,
@Nonnull ContainerController controller,
@Nonnull MutableVolumeSet volumeSet) {
@Nonnull MutableVolumeSet volumeSet,
@Nonnull ReplicationSupervisor supervisor) {
this.containerSet = containerSet;
this.controller = controller;
this.volumeSet = volumeSet;
this.replicationSupervisor = supervisor;
try {
volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf);
} catch (Exception e) {
Expand Down Expand Up @@ -147,9 +150,21 @@ private static void deleteFileQuietely(Path tarFilePath) {

/**
 * Picks a volume with enough free space to import one container.
 * Requires {@code containerSize * 2} free space because the container
 * occupies both the tmp staging directory and the final destination
 * during import.
 *
 * @return the chosen volume
 * @throws IOException if no volume has sufficient space
 */
HddsVolume chooseNextVolume() throws IOException {
  List<HddsVolume> hddsVolumes =
      StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());

  // Prefer volumes that currently have no in-flight replication so that
  // concurrent imports are spread across disks instead of piling onto one.
  List<HddsVolume> nonBusyVolumes = hddsVolumes.stream()
      .filter(v -> replicationSupervisor
          .getInFlightReplications(v.getVolumeRootDir()) == 0)
      .collect(java.util.stream.Collectors.toList());

  if (!nonBusyVolumes.isEmpty()) {
    try {
      return volumeChoosingPolicy.chooseVolume(nonBusyVolumes,
          containerSize * 2);
    } catch (IOException e) {
      // No idle volume has enough space; fall back to considering all
      // volumes rather than failing the import outright.
      LOG.debug("Failed to choose from non-busy volumes, "
          + "falling back to all volumes.", e);
    }
  }
  return volumeChoosingPolicy.chooseVolume(hddsVolumes, containerSize * 2);
}

public static Path getUntarDirectory(HddsVolume hddsVolume)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -17,52 +17,52 @@
*/
package org.apache.hadoop.ozone.container.replication;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/**
* Default replication implementation.
* <p>
* This class does the real job. Executes the download and import the container
* to the container set.
* Replicator which downloads the container tarball and imports it.
*/
public class DownloadAndImportReplicator implements ContainerReplicator {

public static final Logger LOG =
private static final Logger LOG =
LoggerFactory.getLogger(DownloadAndImportReplicator.class);

private final ConfigurationSource conf;
private final ContainerDownloader downloader;
private final ContainerImporter containerImporter;
private final ContainerSet containerSet;
private final ReplicationSupervisor replicationSupervisor;

/**
 * Creates a replicator that downloads a container tarball from a source
 * datanode and imports it into the local container set.
 *
 * @param conf configuration source
 * @param containerSet local set of containers, used to detect duplicates
 * @param containerImporter imports the downloaded tarball
 * @param downloader fetches the container tarball from a source datanode
 * @param replicationSupervisor tracks per-volume in-flight replications
 */
public DownloadAndImportReplicator(
    ConfigurationSource conf, ContainerSet containerSet,
    ContainerImporter containerImporter,
    ContainerDownloader downloader,
    ReplicationSupervisor replicationSupervisor) {
  this.conf = conf;
  this.containerSet = containerSet;
  this.downloader = downloader;
  this.containerImporter = containerImporter;
  this.replicationSupervisor = replicationSupervisor;
}

@Override
public void replicate(ReplicationTask task) {
long containerID = task.getContainerId();
if (containerSet.getContainer(containerID) != null) {
LOG.debug("Container {} has already been downloaded.", containerID);
task.setStatus(Status.SKIPPED);
LOG.info("Container {} already exists. Skipping replication.",
containerID);
task.setStatus(Status.DONE);
return;
}

Expand All @@ -73,8 +73,12 @@ public void replicate(ReplicationTask task) {
LOG.info("Starting replication of container {} from {} using {}",
containerID, sourceDatanodes, compression);

HddsVolume targetVolume = null;
try {
HddsVolume targetVolume = containerImporter.chooseNextVolume();
targetVolume = containerImporter.chooseNextVolume();
replicationSupervisor.incrementInFlightReplication(
targetVolume.getVolumeRootDir());

// Wait for the download. This thread pool is limiting the parallel
// downloads, so it's ok to block here and wait for the full download.
Path tarFilePath =
Expand All @@ -97,6 +101,11 @@ public void replicate(ReplicationTask task) {
} catch (IOException e) {
LOG.error("Container {} replication was unsuccessful.", containerID, e);
task.setStatus(Status.FAILED);
} finally {
if (targetVolume != null) {
replicationSupervisor.decrementInFlightReplication(
targetVolume.getVolumeRootDir());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,26 @@ public final class ReplicationSupervisor {
*/
private final Set<AbstractReplicationTask> inFlight;

private final Map<String, AtomicInteger> inFlightReplicationsPerVolume =
new ConcurrentHashMap<>();

/**
 * Records the start of a replication that targets the given volume.
 * Creates the per-volume counter lazily on first use.
 *
 * @param volumeRoot root directory of the target volume
 */
public void incrementInFlightReplication(String volumeRoot) {
  AtomicInteger counter = inFlightReplicationsPerVolume
      .computeIfAbsent(volumeRoot, key -> new AtomicInteger());
  counter.incrementAndGet();
}

/**
 * Records the completion of a replication that targeted the given volume.
 * A no-op if no counter exists for the volume (i.e. no matching increment
 * was recorded). Note the map entry is retained at zero, never removed.
 *
 * @param volumeRoot root directory of the target volume
 */
public void decrementInFlightReplication(String volumeRoot) {
  AtomicInteger counter = inFlightReplicationsPerVolume.get(volumeRoot);
  if (counter == null) {
    return;
  }
  counter.decrementAndGet();
}

/**
 * Returns the number of replications currently in flight for the given
 * volume, or zero if none have ever been recorded for it.
 *
 * @param volumeRoot root directory of the volume
 * @return current in-flight replication count for the volume
 */
public int getInFlightReplications(String volumeRoot) {
  AtomicInteger counter = inFlightReplicationsPerVolume.get(volumeRoot);
  if (counter == null) {
    return 0;
  }
  return counter.get();
}

private final Map<Class<?>, AtomicInteger> taskCounter =
new ConcurrentHashMap<>();
private int maxQueueSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.container.replication;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;

/**
* Class to wrap details used to track pending replications.
Expand All @@ -35,6 +36,9 @@ public enum PendingOpType {
private final DatanodeDetails target;
private final int replicaIndex;
private final long deadlineEpochMillis;
private final long containerSize;
private final SCMCommand<?> command;
private final DatanodeDetails decommissionSource;

public static ContainerReplicaOp create(PendingOpType opType,
DatanodeDetails target, int replicaIndex) {
Expand All @@ -43,11 +47,21 @@ public static ContainerReplicaOp create(PendingOpType opType,
}

/**
 * Full constructor wrapping all details used to track a pending replica
 * operation.
 *
 * @param opType type of the pending operation
 * @param target datanode the operation is targeted at
 * @param replicaIndex replica index for EC containers
 * @param command the SCM command that scheduled this op; may be null
 * @param deadlineEpochMillis epoch millis after which the op is expired
 * @param containerSize size of the container being replicated
 * @param decommissionSource source datanode when replicating off a
 *        decommissioning node; may be null
 */
public ContainerReplicaOp(PendingOpType opType,
    DatanodeDetails target, int replicaIndex, SCMCommand<?> command,
    long deadlineEpochMillis, long containerSize,
    DatanodeDetails decommissionSource) {
  this.opType = opType;
  this.target = target;
  this.replicaIndex = replicaIndex;
  this.command = command;
  this.deadlineEpochMillis = deadlineEpochMillis;
  this.containerSize = containerSize;
  this.decommissionSource = decommissionSource;
}

/**
 * Convenience constructor for ops with no originating command, unknown
 * container size and no decommission source; delegates to the full
 * constructor with null / zero defaults for those fields.
 */
public ContainerReplicaOp(PendingOpType opType,
DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) {
this(opType, target, replicaIndex, null, deadlineEpochMillis, 0, null);
}

public PendingOpType getOpType() {
Expand All @@ -66,4 +80,16 @@ public long getDeadlineEpochMillis() {
return deadlineEpochMillis;
}

/**
 * @return size of the container being replicated; 0 when constructed
 *         without a size
 */
public long getContainerSize() {
return containerSize;
}

/**
 * @return the SCM command that scheduled this op, or null when constructed
 *         without one
 */
public SCMCommand<?> getCommand() {
return command;
}

/**
 * @return the decommissioning source datanode, or null when constructed
 *         without one
 */
public DatanodeDetails getDecommissionSource() {
return decommissionSource;
}

}
Loading