Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,31 @@ public class ScmConfig extends ReconfigurableConfig {
)
private int transactionToDNsCommitMapLimit = 5000000;

@Config(key = "hdds.scm.container.pending.allocation.roll.interval",
defaultValue = "5m",
type = ConfigType.TIME,
tags = { ConfigTag.SCM, ConfigTag.CONTAINER },
description =
"Time interval for rolling the pending container allocation window. " +
"Pending container allocations are tracked in a two-window tumbling bucket " +
"pattern. Each window has this duration. " +
"After 2x this interval, allocations that haven't been confirmed via " +
"container reports will automatically age out. Default is 5 minutes."
)
private Duration pendingContainerAllocationRollInterval = Duration.ofMinutes(5);

/**
 * Returns the maximum number of entries kept in the transaction-to-datanodes
 * commit map (default 5,000,000; see the {@code @Config} field above).
 */
public int getTransactionToDNsCommitMapLimit() {
return transactionToDNsCommitMapLimit;
}

/**
 * Returns the tumbling-window roll interval for pending container
 * allocations ({@code hdds.scm.container.pending.allocation.roll.interval},
 * default 5 minutes). Allocations age out after two of these windows.
 */
public Duration getPendingContainerAllocationRollInterval() {
return pendingContainerAllocationRollInterval;
}

/**
 * Sets the pending container allocation roll interval.
 *
 * @param duration the new window duration; must not be null, because the
 *     value is later converted via {@code Duration#toMillis()} when the
 *     tracker is constructed
 * @throws NullPointerException if {@code duration} is null (fail fast at the
 *     setter instead of during later conversion)
 */
public void setPendingContainerAllocationRollInterval(Duration duration) {
if (duration == null) {
throw new NullPointerException("duration == null");
}
this.pendingContainerAllocationRollInterval = duration;
}

/**
 * Returns the block deletion interval.
 * NOTE(review): the backing field and its config key are declared outside
 * this diff chunk — confirm semantics against the field's description.
 */
public Duration getBlockDeletionInterval() {
return blockDeletionInterval;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private ContainerInfo createContainer(Pipeline pipeline, String owner)
private ContainerInfo allocateContainer(final Pipeline pipeline,
final String owner)
throws IOException {
if (!pipelineManager.hasEnoughSpace(pipeline, maxContainerSize)) {
if (!pipelineManager.hasEnoughSpace(pipeline)) {
LOG.debug("Cannot allocate a new container because pipeline {} does not have the required space {}.",
pipeline, maxContainerSize);
return null;
Expand Down Expand Up @@ -278,6 +278,8 @@ private ContainerInfo allocateContainer(final Pipeline pipeline,

containerStateManager.addContainer(containerInfoBuilder.build());
scmContainerManagerMetrics.incNumSuccessfulCreateContainers();
pipelineManager.recordPendingAllocation(pipeline, containerID);

return containerStateManager.getContainer(containerID);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.PendingContainerTracker;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer;
Expand Down Expand Up @@ -175,6 +176,17 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
if (!alreadyInDn) {
// This is a new Container not in the nodeManager -> dn map yet
getNodeManager().addContainer(datanodeDetails, cid);

// Remove from pending tracker when container is added to DN
// This container was just confirmed for the first time on this DN
// No need to remove on subsequent reports (it's already been removed)
if (container != null) {
PendingContainerTracker tracker =
getNodeManager().getPendingContainerTracker();
if (tracker != null) {
tracker.removePendingAllocation(datanodeDetails, cid);
}
}
}
if (container == null || ContainerReportValidator
.validate(container, datanodeDetails, replica)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.PendingContainerTracker;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
Expand Down Expand Up @@ -103,6 +104,12 @@ protected void processICR(IncrementalContainerReportFromDatanode report,
}
if (ContainerReportValidator.validate(container, dd, replicaProto)) {
processContainerReplica(dd, container, replicaProto, publisher, detailsForLogging);

PendingContainerTracker tracker =
getNodeManager().getPendingContainerTracker();
if (tracker != null) {
tracker.removePendingAllocation(dd, id);
}
}
success = true;
} catch (ContainerNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,4 +422,28 @@ default void removeNode(DatanodeDetails datanodeDetails) throws NodeNotFoundExce
}

int openContainerLimit(List<DatanodeDetails> datanodes);

/**
* SCM-side tracker for container allocations not yet reported by datanodes.
*/
PendingContainerTracker getPendingContainerTracker();

/**
* True if the node can accept another container of the given size, accounting for
* {@link #getPendingContainerTracker()}.
*/
boolean hasSpaceForNewContainerAllocation(DatanodeDetails node);

/**
* Records a pending container allocation for {@code node} so that subsequent
* space checks via {@link #hasSpaceForNewContainerAllocation} account for the
* in-flight allocation before the datanode sends an ICR.
*
* @param node the datanode that will receive the new container replica
* @param containerID the container being allocated
*/
default void recordPendingAllocationForDatanode(DatanodeDetails node,
ContainerID containerID) {
// Default implementation delegates straight to the tracker returned by
// getPendingContainerTracker(); implementations may override.
getPendingContainerTracker().recordPendingAllocationForDatanode(node, containerID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.VersionInfo;
import org.apache.hadoop.hdds.scm.container.ContainerID;
Expand Down Expand Up @@ -143,6 +144,13 @@ public class SCMNodeManager implements NodeManager {
private final NonWritableNodeFilter nonWritableNodeFilter;
private final int numContainerPerVolume;

/**
* SCM-side pending container allocations per datanode (not yet in container reports).
*/
private final PendingContainerTracker pendingContainerTracker;

private final long maxContainerSizeBytes;

/**
* Lock used to synchronize some operation in Node manager to ensure a
* consistent view of the node state.
Expand Down Expand Up @@ -205,6 +213,14 @@ public SCMNodeManager(
this.scmContext = scmContext;
this.sendCommandNotifyMap = new HashMap<>();
this.nonWritableNodeFilter = new NonWritableNodeFilter(conf);

this.maxContainerSizeBytes = (long) conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
ScmConfig scmConfig = conf.getObject(ScmConfig.class);
long rollIntervalMs = scmConfig.getPendingContainerAllocationRollInterval().toMillis();
this.pendingContainerTracker = new PendingContainerTracker(
maxContainerSizeBytes, rollIntervalMs, this.metrics);
}

@Override
Expand All @@ -225,6 +241,40 @@ private void unregisterMXBean() {
}
}

/** Returns the SCM-side tracker for allocations not yet confirmed via reports. */
@Override
public PendingContainerTracker getPendingContainerTracker() {
return pendingContainerTracker;
}

/**
 * Space check used for container allocation: applies the per-disk slot model
 * and subtracts SCM-side pending allocations tracked for this node.
 * Returns false for unknown nodes or on any lookup error (best-effort).
 */
@Override
public boolean hasSpaceForNewContainerAllocation(DatanodeDetails node) {
Objects.requireNonNull(node, "node==null");
try {
final DatanodeInfo info = getDatanodeInfo(node);
if (info != null) {
return pendingContainerTracker
.hasEffectiveAllocatableSpaceForNewContainer(node, info);
}
LOG.warn("DatanodeInfo not found for node {}", node.getUuidString());
} catch (Exception e) {
LOG.warn("Error checking allocatable space for node {}", node.getUuidString(), e);
}
return false;
}

@Override
public void recordPendingAllocationForDatanode(DatanodeDetails node,
ContainerID containerID) {
// Validate eagerly so callers get the NPE at the call site rather than
// from inside the tracker.
final DatanodeDetails dn = Objects.requireNonNull(node, "node==null");
final ContainerID id = Objects.requireNonNull(containerID, "containerID==null");
pendingContainerTracker.recordPendingAllocationForDatanode(dn, id);
}

/** Exposes the node state manager to subclasses. */
protected NodeStateManager getNodeStateManager() {
return nodeStateManager;
}
Expand Down Expand Up @@ -706,6 +756,7 @@ public void processNodeReport(DatanodeDetails datanodeDetails,
datanodeInfo.updateStorageReports(nodeReport.getStorageReportList());
datanodeInfo.updateMetaDataStorageReports(nodeReport.
getMetadataStorageReportList());
pendingContainerTracker.rollWindowsIfNeeded(datanodeDetails);
metrics.incNumNodeReportProcessed();
}
} catch (NodeNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ void incNumPendingContainersRemoved() {
numPendingContainersRemoved.incr();
}

/** Returns the current value of the pending-containers-added counter. */
public long getNumPendingContainersAdded() {
return numPendingContainersAdded.value();
}

/** Returns the current value of the pending-containers-removed counter. */
public long getNumPendingContainersRemoved() {
return numPendingContainersRemoved.value();
}

/** Increments the counter for container allocations skipped on full nodes. */
void incNumSkippedFullNodeContainerAllocation() {
numSkippedFullNodeContainerAllocation.incr();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,14 @@ void reinitialize(Table<PipelineID, Pipeline> pipelineStore)
void releaseWriteLock();

/**
 * Checks whether all Datanodes in the specified pipeline have enough space
 * for one more container of the configured container size.
 * @param pipeline pipeline to check
 * @return false if any Datanode in the pipeline lacks space for another
 * container, otherwise true
 */
boolean hasEnoughSpace(Pipeline pipeline, long containerSize);
boolean hasEnoughSpace(Pipeline pipeline);

void recordPendingAllocation(Pipeline pipeline, ContainerID containerID);

int openContainerLimit(List<DatanodeDetails> datanodes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
Expand All @@ -54,7 +53,6 @@
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
Expand Down Expand Up @@ -636,19 +634,22 @@ private boolean isOpenWithUnregisteredNodes(Pipeline pipeline) {
}

@Override
public boolean hasEnoughSpace(Pipeline pipeline, long containerSize) {
public boolean hasEnoughSpace(Pipeline pipeline) {
for (DatanodeDetails node : pipeline.getNodes()) {
if (!(node instanceof DatanodeInfo)) {
node = nodeManager.getDatanodeInfo(node);
}
if (!SCMCommonPlacementPolicy.hasEnoughSpace(node, 0, containerSize)) {
if (!nodeManager.hasSpaceForNewContainerAllocation(node)) {
return false;
}
}

return true;
}

@Override
public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID) {
// A replica of the container is placed on every member of the pipeline,
// so the pending allocation is tracked once per datanode.
pipeline.getNodes().forEach(
node -> nodeManager.recordPendingAllocationForDatanode(node, containerID));
}

/**
* Schedules a fixed interval job to create pipelines.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,13 @@ public ContainerWithPipeline allocateContainer(ReplicationConfig replicationConf
getScm().checkAdminAccess(getRemoteUser(), false);
final ContainerInfo container = scm.getContainerManager()
.allocateContainer(replicationConfig, owner);
if (container == null) {
throw new SCMException(
"Could not allocate container for replication " + replicationConfig
+ ", owner=" + owner
+ ": no suitable open pipeline with enough space",
ResultCodes.FAILED_TO_ALLOCATE_CONTAINER);
}
final Pipeline pipeline = scm.getPipelineManager()
.getPipeline(container.getPipelineID());
ContainerWithPipeline cp = new ContainerWithPipeline(container, pipeline);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.PendingContainerTracker;
import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
Expand Down Expand Up @@ -113,6 +114,7 @@ public class MockNodeManager implements NodeManager {
private ConcurrentMap<String, Set<String>> dnsToUuidMap;
private int numHealthyDisksPerDatanode;
private int numPipelinePerDatanode;
private PendingContainerTracker pendingContainerTracker;

{
this.healthyNodes = new LinkedList<>();
Expand Down Expand Up @@ -941,6 +943,25 @@ public void setNumHealthyVolumes(int value) {
numHealthyDisksPerDatanode = value;
}

@Override
public PendingContainerTracker getPendingContainerTracker() {
// Lazily build a tracker with test defaults: 5 GB container size, a
// 5-minute roll interval, and no metrics.
if (pendingContainerTracker == null) {
final long containerSizeBytes = 5L * 1024 * 1024 * 1024;
final int rollIntervalMs = 5 * 60 * 1000;
pendingContainerTracker =
new PendingContainerTracker(containerSizeBytes, rollIntervalMs, null);
}
return pendingContainerTracker;
}

@Override
public boolean hasSpaceForNewContainerAllocation(DatanodeDetails node) {
// Unknown nodes are treated as having no space.
final DatanodeInfo info = getDatanodeInfo(node);
return info != null
&& getPendingContainerTracker()
.hasEffectiveAllocatableSpaceForNewContainer(node, info);
}

/**
* A class to declare some values for the nodes so that our tests
* won't fail.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.PendingContainerTracker;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
Expand All @@ -62,6 +63,7 @@ public class SimpleMockNodeManager implements NodeManager {
private Map<DatanodeID, DatanodeInfo> nodeMap = new ConcurrentHashMap<>();
private Map<DatanodeID, Set<PipelineID>> pipelineMap = new ConcurrentHashMap<>();
private Map<DatanodeID, Set<ContainerID>> containerMap = new ConcurrentHashMap<>();
private PendingContainerTracker pendingContainerTracker;

public void register(DatanodeDetails dd, NodeStatus status) {
dd.setPersistedOpState(status.getOperationalState());
Expand Down Expand Up @@ -435,4 +437,18 @@ public Boolean isNodeRegistered(DatanodeDetails datanodeDetails) {
return false;
}

@Override
public PendingContainerTracker getPendingContainerTracker() {
// Lazily build a tracker with test defaults: 5 GB container size, a
// 5-minute roll interval, and no metrics.
if (pendingContainerTracker == null) {
final long containerSizeBytes = 5L * 1024 * 1024 * 1024;
final int rollIntervalMs = 5 * 60 * 1000;
pendingContainerTracker =
new PendingContainerTracker(containerSizeBytes, rollIntervalMs, null);
}
return pendingContainerTracker;
}

@Override
public boolean hasSpaceForNewContainerAllocation(DatanodeDetails node) {
// Test stub: always report space available so allocation paths proceed.
return true;
}

}
Loading