diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java index 0876bd22ea4f..1e7aece7ed09 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java @@ -138,10 +138,31 @@ public class ScmConfig extends ReconfigurableConfig { ) private int transactionToDNsCommitMapLimit = 5000000; + @Config(key = "hdds.scm.container.pending.allocation.roll.interval", + defaultValue = "5m", + type = ConfigType.TIME, + tags = { ConfigTag.SCM, ConfigTag.CONTAINER }, + description = + "Time interval for rolling the pending container allocation window. " + + "Pending container allocations are tracked in a two-window tumbling bucket " + + "pattern. Each window has this duration. " + + "After 2x this interval, allocations that haven't been confirmed via " + + "container reports will automatically age out. Default is 5 minutes." 
+ ) + private Duration pendingContainerAllocationRollInterval = Duration.ofMinutes(5); + public int getTransactionToDNsCommitMapLimit() { return transactionToDNsCommitMapLimit; } + public Duration getPendingContainerAllocationRollInterval() { + return pendingContainerAllocationRollInterval; + } + + public void setPendingContainerAllocationRollInterval(Duration duration) { + this.pendingContainerAllocationRollInterval = duration; + } + public Duration getBlockDeletionInterval() { return blockDeletionInterval; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java index 7780c0839462..3daf07afa641 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -245,7 +245,7 @@ private ContainerInfo createContainer(Pipeline pipeline, String owner) private ContainerInfo allocateContainer(final Pipeline pipeline, final String owner) throws IOException { - if (!pipelineManager.hasEnoughSpace(pipeline, maxContainerSize)) { + if (!pipelineManager.hasEnoughSpace(pipeline)) { LOG.debug("Cannot allocate a new container because pipeline {} does not have the required space {}.", pipeline, maxContainerSize); return null; @@ -278,6 +278,8 @@ private ContainerInfo allocateContainer(final Pipeline pipeline, containerStateManager.addContainer(containerInfoBuilder.build()); scmContainerManagerMetrics.incNumSuccessfulCreateContainers(); + pipelineManager.recordPendingAllocation(pipeline, containerID); + return containerStateManager.getContainer(containerID); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index 0cebcb10ef2c..e34305491e41 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.PendingContainerTracker; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer; @@ -175,6 +176,17 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode, if (!alreadyInDn) { // This is a new Container not in the nodeManager -> dn map yet getNodeManager().addContainer(datanodeDetails, cid); + + // Remove from pending tracker when container is added to DN + // This container was just confirmed for the first time on this DN + // No need to remove on subsequent reports (it's already been removed) + if (container != null) { + PendingContainerTracker tracker = + getNodeManager().getPendingContainerTracker(); + if (tracker != null) { + tracker.removePendingAllocation(datanodeDetails, cid); + } + } } if (container == null || ContainerReportValidator .validate(container, datanodeDetails, replica)) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java index 247e3667d9ef..123189887820 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java +++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.PendingContainerTracker; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventHandler; @@ -103,6 +104,12 @@ protected void processICR(IncrementalContainerReportFromDatanode report, } if (ContainerReportValidator.validate(container, dd, replicaProto)) { processContainerReplica(dd, container, replicaProto, publisher, detailsForLogging); + + PendingContainerTracker tracker = + getNodeManager().getPendingContainerTracker(); + if (tracker != null) { + tracker.removePendingAllocation(dd, id); + } } success = true; } catch (ContainerNotFoundException e) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index e9a019945c1f..039bb71ae877 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -422,4 +422,28 @@ default void removeNode(DatanodeDetails datanodeDetails) throws NodeNotFoundExce } int openContainerLimit(List datanodes); + + /** + * SCM-side tracker for container allocations not yet reported by datanodes. + */ + PendingContainerTracker getPendingContainerTracker(); + + /** + * True if the node can accept another container, accounting for + * {@link #getPendingContainerTracker()}.
+ */ + boolean hasSpaceForNewContainerAllocation(DatanodeDetails node); + + /** + * Records a pending container allocation for {@code node} so that subsequent + * space checks via {@link #hasSpaceForNewContainerAllocation} account for the + * in-flight allocation before the datanode sends an ICR. + * + * @param node the datanode that will receive the new container replica + * @param containerID the container being allocated + */ + default void recordPendingAllocationForDatanode(DatanodeDetails node, + ContainerID containerID) { + getPendingContainerTracker().recordPendingAllocationForDatanode(node, containerID); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 3289e7b312a8..7c014addb4f9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -69,6 +69,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.VersionInfo; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -143,6 +144,13 @@ public class SCMNodeManager implements NodeManager { private final NonWritableNodeFilter nonWritableNodeFilter; private final int numContainerPerVolume; + /** + * SCM-side pending container allocations per datanode (not yet in container reports). 
+ */ + private final PendingContainerTracker pendingContainerTracker; + + private final long maxContainerSizeBytes; + /** * Lock used to synchronize some operation in Node manager to ensure a * consistent view of the node state. @@ -205,6 +213,14 @@ public SCMNodeManager( this.scmContext = scmContext; this.sendCommandNotifyMap = new HashMap<>(); this.nonWritableNodeFilter = new NonWritableNodeFilter(conf); + + this.maxContainerSizeBytes = (long) conf.getStorageSize( + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); + ScmConfig scmConfig = conf.getObject(ScmConfig.class); + long rollIntervalMs = scmConfig.getPendingContainerAllocationRollInterval().toMillis(); + this.pendingContainerTracker = new PendingContainerTracker( + maxContainerSizeBytes, rollIntervalMs, this.metrics); } @Override @@ -225,6 +241,40 @@ private void unregisterMXBean() { } } + @Override + public PendingContainerTracker getPendingContainerTracker() { + return pendingContainerTracker; + } + + /** + * Effective space check aligned with container allocation: per-disk slot model minus + * SCM pending allocations. 
+ */ + @Override + public boolean hasSpaceForNewContainerAllocation(DatanodeDetails node) { + Objects.requireNonNull(node, "node==null"); + try { + DatanodeInfo datanodeInfo = getDatanodeInfo(node); + if (datanodeInfo == null) { + LOG.warn("DatanodeInfo not found for node {}", node.getUuidString()); + return false; + } + return pendingContainerTracker.hasEffectiveAllocatableSpaceForNewContainer( + node, datanodeInfo); + } catch (Exception e) { + LOG.warn("Error checking allocatable space for node {}", node.getUuidString(), e); + return false; + } + } + + @Override + public void recordPendingAllocationForDatanode(DatanodeDetails node, + ContainerID containerID) { + Objects.requireNonNull(node, "node==null"); + Objects.requireNonNull(containerID, "containerID==null"); + pendingContainerTracker.recordPendingAllocationForDatanode(node, containerID); + } + protected NodeStateManager getNodeStateManager() { return nodeStateManager; } @@ -706,6 +756,7 @@ public void processNodeReport(DatanodeDetails datanodeDetails, datanodeInfo.updateStorageReports(nodeReport.getStorageReportList()); datanodeInfo.updateMetaDataStorageReports(nodeReport. 
getMetadataStorageReportList()); + pendingContainerTracker.rollWindowsIfNeeded(datanodeDetails); metrics.incNumNodeReportProcessed(); } } catch (NodeNotFoundException e) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java index 0014936a80db..0b90247ae8ab 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java @@ -136,6 +136,14 @@ void incNumPendingContainersRemoved() { numPendingContainersRemoved.incr(); } + public long getNumPendingContainersAdded() { + return numPendingContainersAdded.value(); + } + + public long getNumPendingContainersRemoved() { + return numPendingContainersRemoved.value(); + } + void incNumSkippedFullNodeContainerAllocation() { numSkippedFullNodeContainerAllocation.incr(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 6a448d6c88df..ecfe9b1ef5f4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -213,13 +213,14 @@ void reinitialize(Table pipelineStore) void releaseWriteLock(); /** - * Checks whether all Datanodes in the specified pipeline have greater than the specified space, containerSize. + * Checks whether all Datanodes in the specified pipeline have greater than the container size. 
* @param pipeline pipeline to check - * @param containerSize the required amount of space - * @return false if all the volumes on any Datanode in the pipeline have space less than equal to the specified + * @return false if all the volumes on any Datanode in the pipeline have space less than equal to the * containerSize, otherwise true */ - boolean hasEnoughSpace(Pipeline pipeline, long containerSize); + boolean hasEnoughSpace(Pipeline pipeline); + + void recordPendingAllocation(Pipeline pipeline, ContainerID containerID); int openContainerLimit(List datanodes); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java index 9c529e22e7e1..2e3c522677a2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; @@ -54,7 +53,6 @@ import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.ha.SCMHAManager; import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; -import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -636,19 +634,22 @@ private boolean isOpenWithUnregisteredNodes(Pipeline pipeline) { } @Override - public 
boolean hasEnoughSpace(Pipeline pipeline, long containerSize) { + public boolean hasEnoughSpace(Pipeline pipeline) { for (DatanodeDetails node : pipeline.getNodes()) { - if (!(node instanceof DatanodeInfo)) { - node = nodeManager.getDatanodeInfo(node); - } - if (!SCMCommonPlacementPolicy.hasEnoughSpace(node, 0, containerSize)) { + if (!nodeManager.hasSpaceForNewContainerAllocation(node)) { return false; } } - return true; } + @Override + public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID) { + for (DatanodeDetails node : pipeline.getNodes()) { + nodeManager.recordPendingAllocationForDatanode(node, containerID); + } + } + /** * Schedules a fixed interval job to create pipelines. */ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 156eed688d81..38cacb1c6086 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -255,6 +255,13 @@ public ContainerWithPipeline allocateContainer(ReplicationConfig replicationConf getScm().checkAdminAccess(getRemoteUser(), false); final ContainerInfo container = scm.getContainerManager() .allocateContainer(replicationConfig, owner); + if (container == null) { + throw new SCMException( + "Could not allocate container for replication " + replicationConfig + + ", owner=" + owner + + ": no suitable open pipeline with enough space", + ResultCodes.FAILED_TO_ALLOCATE_CONTAINER); + } final Pipeline pipeline = scm.getPipelineManager() .getPipeline(container.getPipelineID()); ContainerWithPipeline cp = new ContainerWithPipeline(container, pipeline); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 013f14b16504..dbf2ae4f7481 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; +import org.apache.hadoop.hdds.scm.node.PendingContainerTracker; import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap; import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; @@ -113,6 +114,7 @@ public class MockNodeManager implements NodeManager { private ConcurrentMap> dnsToUuidMap; private int numHealthyDisksPerDatanode; private int numPipelinePerDatanode; + private PendingContainerTracker pendingContainerTracker; { this.healthyNodes = new LinkedList<>(); @@ -941,6 +943,25 @@ public void setNumHealthyVolumes(int value) { numHealthyDisksPerDatanode = value; } + @Override + public PendingContainerTracker getPendingContainerTracker() { + int rollIntervalMs = 5 * 60 * 1000; + if (pendingContainerTracker == null) { + pendingContainerTracker = new PendingContainerTracker(5L * 1024 * 1024 * 1024, rollIntervalMs, null); + } + return pendingContainerTracker; + } + + @Override + public boolean hasSpaceForNewContainerAllocation(DatanodeDetails node) { + DatanodeInfo info = getDatanodeInfo(node); + if (info == null) { + return false; + } + return getPendingContainerTracker() + .hasEffectiveAllocatableSpaceForNewContainer(node, info); + } + /** * A class to declare some values for the nodes so that our tests * won't fail. 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java index 4f0679470eab..97ce8cbe3c67 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; +import org.apache.hadoop.hdds.scm.node.PendingContainerTracker; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; @@ -62,6 +63,7 @@ public class SimpleMockNodeManager implements NodeManager { private Map nodeMap = new ConcurrentHashMap<>(); private Map> pipelineMap = new ConcurrentHashMap<>(); private Map> containerMap = new ConcurrentHashMap<>(); + private PendingContainerTracker pendingContainerTracker; public void register(DatanodeDetails dd, NodeStatus status) { dd.setPersistedOpState(status.getOperationalState()); @@ -435,4 +437,18 @@ public Boolean isNodeRegistered(DatanodeDetails datanodeDetails) { return false; } + @Override + public PendingContainerTracker getPendingContainerTracker() { + int rollIntervalMs = 5 * 60 * 1000; + if (pendingContainerTracker == null) { + pendingContainerTracker = new PendingContainerTracker(5L * 1024 * 1024 * 1024, rollIntervalMs, null); + } + return pendingContainerTracker; + } + + @Override + public boolean hasSpaceForNewContainerAllocation(DatanodeDetails node) { + return true; + } + } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java index 218a2137e3e6..68dcc634a5e3 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java @@ -26,7 +26,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -103,7 +102,7 @@ void setUp() throws Exception { pipelineManager = spy(base); // Default: allow allocation in tests unless a test overrides it. - doReturn(true).when(pipelineManager).hasEnoughSpace(any(Pipeline.class), anyLong()); + doReturn(true).when(pipelineManager).hasEnoughSpace(any(Pipeline.class)); pipelineManager.createPipeline(RatisReplicationConfig.getInstance( ReplicationFactor.THREE)); @@ -141,7 +140,7 @@ void testAllocateContainer() throws Exception { */ @Test public void testGetMatchingContainerReturnsNullWhenNotEnoughSpaceInDatanodes() throws IOException { - doReturn(false).when(pipelineManager).hasEnoughSpace(any(), anyLong()); + doReturn(false).when(pipelineManager).hasEnoughSpace(any()); long sizeRequired = 256 * 1024 * 1024; // 256 MB Pipeline pipeline = pipelineManager.getPipelines().iterator().next(); @@ -166,7 +165,7 @@ public void testGetMatchingContainerReturnsContainerWhenEnoughSpaceInDatanodes() // create a spy to mock hasEnoughSpace to always return true PipelineManager spyPipelineManager = spy(pipelineManager); doReturn(true).when(spyPipelineManager) - .hasEnoughSpace(any(Pipeline.class), anyLong()); + .hasEnoughSpace(any(Pipeline.class)); // create a new ContainerManager using the spy File tempDir = new File(testDir, 
"tempDir"); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java index 9b7c5c77b2cd..4dbe79fc1351 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java @@ -26,7 +26,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -160,7 +159,7 @@ SCMNodeManager createNodeManager(OzoneConfiguration config) { ContainerManager createContainerManager() throws IOException { pipelineManager = spy(pipelineManager); - doReturn(true).when(pipelineManager).hasEnoughSpace(any(), anyLong()); + doReturn(true).when(pipelineManager).hasEnoughSpace(any()); return new ContainerManagerImpl(conf, scmhaManager, sequenceIdGen, pipelineManager, diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java index d6a3fc546352..2889fdcb6cc2 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java @@ -50,9 +50,11 @@ public class MockPipelineManager implements PipelineManager { private final PipelineStateManager stateManager; + private final NodeManager nodeManager; public MockPipelineManager(DBStore dbStore, SCMHAManager scmhaManager, NodeManager nodeManager) throws RocksDatabaseException, 
CodecException, DuplicatedPipelineIdException { + this.nodeManager = nodeManager; stateManager = PipelineStateManagerImpl .newBuilder().setNodeManager(nodeManager) .setRatisServer(scmhaManager.getRatisServer()) @@ -332,8 +334,17 @@ public boolean isPipelineCreationFrozen() { } @Override - public boolean hasEnoughSpace(Pipeline pipeline, long containerSize) { - return false; + public boolean hasEnoughSpace(Pipeline pipeline) { + for (DatanodeDetails node : pipeline.getNodes()) { + if (!nodeManager.hasSpaceForNewContainerAllocation(node)) { + return false; + } + } + return true; + } + + @Override + public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID) { } @Override diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java index e7fc6f14f9b6..6b0b91eec822 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java @@ -76,6 +76,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; +import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; @@ -937,7 +938,7 @@ public void testCreatePipelineForRead() throws IOException { } /** - * {@link PipelineManager#hasEnoughSpace(Pipeline, long)} should return false if all the + * {@link PipelineManager#hasEnoughSpace(Pipeline)} should return false if all the * volumes on any Datanode in the pipeline have less than equal to the space required for creating a new container. 
*/ @Test @@ -976,7 +977,12 @@ public void testHasEnoughSpace() throws IOException { doReturn(info).when(mockedNodeManager).getDatanodeInfo(dn); datanodeInfoList.add(info); } - assertTrue(pipelineManager.hasEnoughSpace(pipeline, containerSize)); + doAnswer(invocation -> { + DatanodeDetails dn = invocation.getArgument(0); + DatanodeInfo info = mockedNodeManager.getDatanodeInfo(dn); + return SCMCommonPlacementPolicy.hasEnoughSpace(info, 0, containerSize); + }).when(mockedNodeManager).hasSpaceForNewContainerAllocation(any(DatanodeDetails.class)); + assertTrue(pipelineManager.hasEnoughSpace(pipeline)); // Case 2: One node does not have enough space. /* @@ -990,13 +996,13 @@ public void testHasEnoughSpace() throws IOException { DatanodeInfo datanodeInfo = datanodeInfoList.get(0); datanodeInfo.updateStorageReports(HddsTestUtils.createStorageReports(datanodeInfo.getID(), 200L, 120L, 20L)); - assertFalse(pipelineManager.hasEnoughSpace(pipeline, containerSize)); + assertFalse(pipelineManager.hasEnoughSpace(pipeline)); // Case 3: All nodes do not have enough space. for (DatanodeInfo info : datanodeInfoList) { info.updateStorageReports(HddsTestUtils.createStorageReports(info.getID(), 200L, 100L, 20L)); } - assertFalse(pipelineManager.hasEnoughSpace(pipeline, containerSize)); + assertFalse(pipelineManager.hasEnoughSpace(pipeline)); } private Set createContainerReplicasList( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestPendingContainerTrackerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestPendingContainerTrackerIntegration.java new file mode 100644 index 000000000000..83053c5e1f1a --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestPendingContainerTrackerIntegration.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdds.scm.container;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.node.PendingContainerTracker;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.node.SCMNodeMetrics;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.TestDataUtil;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration tests for {@link PendingContainerTracker}.
 *
 * Verifies that container allocations are recorded as "pending" for every
 * datanode in the allocation pipeline, and that the pending entries are
 * cleared once the datanodes confirm the containers via (incremental)
 * container reports, with the corresponding metrics updated.
 */
@Timeout(300)
public class TestPendingContainerTrackerIntegration {

  private static final Logger LOG =
      LoggerFactory.getLogger(TestPendingContainerTrackerIntegration.class);

  private MiniOzoneCluster cluster;
  private StorageContainerManager scm;
  private OzoneClient client;
  private ContainerManager containerManager;
  private PendingContainerTracker pendingTracker;
  private SCMNodeMetrics metrics;
  private OzoneBucket bucket;

  @BeforeEach
  public void setup() throws Exception {
    OzoneConfiguration conf = new OzoneConfiguration();

    // Full container reports only every 60s: removal of pending entries in
    // these tests therefore relies on the faster incremental container
    // reports (ICRs) that datanodes send when a container is created.
    conf.set(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, "60s");

    // Reduce heartbeat interval so ICRs reach SCM quickly.
    conf.set(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, "10s");

    conf.set("ozone.scm.container.size", "100MB");
    conf.set("ozone.scm.pipeline.owner.container.count", "1");
    conf.set("ozone.scm.pipeline.per.metadata.disk", "1");
    conf.set("ozone.scm.datanode.pipeline.limit", "1");

    cluster = MiniOzoneCluster.newBuilder(conf)
        .setNumDatanodes(3)
        .build();
    cluster.waitForClusterToBeReady();
    cluster.waitTobeOutOfSafeMode();

    scm = cluster.getStorageContainerManager();
    containerManager = scm.getContainerManager();
    client = cluster.newClient();

    // Create a bucket so the tests can write keys.
    bucket = TestDataUtil.createVolumeAndBucket(client);

    SCMNodeManager nodeManager = (SCMNodeManager) scm.getScmNodeManager();
    assertNotNull(nodeManager);
    pendingTracker = nodeManager.getPendingContainerTracker();
    assertNotNull(pendingTracker, "PendingContainerTracker should be initialized");
    metrics = pendingTracker.getMetrics();

    // Log the intervals actually configured above (the previous message
    // reported stale 5s/1s values).
    LOG.info("Test setup complete - container report interval: 60s, heartbeat interval: 10s");
  }

  @AfterEach
  public void cleanup() throws Exception {
    if (client != null) {
      client.close();
    }
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  /**
   * Test: Write key → Container allocation → Pending tracked → ICR → Pending removed.
   */
  @Test
  public void testKeyWriteRecordsPendingAndICRRemovesIt() throws Exception {
    long initialAdded = metrics.getNumPendingContainersAdded();
    long initialRemoved = metrics.getNumPendingContainersRemoved();

    // Allocate a container directly.
    ContainerInfo container = containerManager.allocateContainer(
        RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
        "omServiceIdDefault");

    // Resolve the pipeline of the container we just allocated (previously
    // this used getContainers().get(0), which is not guaranteed to be the
    // container allocated above).
    ContainerWithPipeline containerWithPipeline =
        scm.getClientProtocolServer().getContainerWithPipeline(
            container.getContainerID());

    Pipeline pipeline = containerWithPipeline.getPipeline();

    // Verify pending containers are tracked for the nodes in the pipeline.
    List<DatanodeDetails> nodesWithPending = new ArrayList<>();
    for (DatanodeDetails dn : pipeline.getNodes()) {
      long pendingCount = pendingTracker.getPendingContainerCount(dn);
      if (pendingCount > 0) {
        nodesWithPending.add(dn);
        LOG.info("DataNode {} has {} pending containers", dn.getUuidString(), pendingCount);

        // assertThat(boolean) alone performs no check; assert explicitly.
        assertTrue(pendingTracker.containsPendingContainer(dn, container.containerID()));
      }
    }

    assertThat(nodesWithPending).isNotEmpty();

    // Verify the "added" metric increased.
    long afterAdded = metrics.getNumPendingContainersAdded();
    assertThat(afterAdded).isGreaterThan(initialAdded);

    LOG.info("Pending tracked successfully. Waiting for ICR to remove pending...");

    // Write a key so the datanodes create the container and send ICRs.
    String keyName = "testKey1";
    byte[] data = "Testing Pending Container Tracker".getBytes(UTF_8);

    LOG.info("Writing key: {}", keyName);
    try (OzoneOutputStream out = bucket.createKey(keyName, data.length,
        RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
        new HashMap<>())) {
      out.write(data);
    }
    LOG.info("Key written successfully");

    // Wait for ICRs to clear the pending entries on every tracked node.
    GenericTestUtils.waitFor(() -> {
      for (DatanodeDetails dn : nodesWithPending) {
        if (pendingTracker.containsPendingContainer(dn, container.containerID())) {
          LOG.info("Still waiting for ICR from DN {}", dn.getUuidString());
          return false;
        }
      }

      LOG.info("All pending containers removed via ICR!");
      return true;
    }, 100, 5000);

    // Verify all pending containers were removed.
    for (DatanodeDetails dn : nodesWithPending) {
      assertFalse(pendingTracker.containsPendingContainer(dn, container.containerID()));
    }

    // Verify the "removed" metric increased.
    long afterRemoved = metrics.getNumPendingContainersRemoved();
    assertThat(afterRemoved).isGreaterThan(initialRemoved);

    LOG.info("After added = {}", afterAdded);
  }

  /**
   * Test: Verify idempotency - container reported multiple times.
   */
  @Test
  public void testIdempotentPendingTracking() throws Exception {
    // Allocate a container directly.
    ContainerInfo container = containerManager.allocateContainer(
        RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
        "omServiceIdDefault");

    Pipeline pipeline = scm.getPipelineManager().getPipeline(container.getPipelineID());
    DatanodeDetails firstNode = pipeline.getFirstNode();

    // Record initial state.
    long initialCount = pendingTracker.getPendingContainerCount(firstNode);

    LOG.info("Initial pending state: count={}", initialCount);

    // Try adding the same container again (simulates retry or duplicate allocation).
    pendingTracker.recordPendingAllocationForDatanode(firstNode, container.containerID());

    long afterCount = pendingTracker.getPendingContainerCount(firstNode);

    // Count should remain the same (idempotent).
    assertEquals(initialCount, afterCount,
        "Pending count should not change when adding duplicate container");
  }

  /**
   * Test: Verify metrics are updated correctly.
   */
  @Test
  public void testMetricsUpdateThroughLifecycle() throws Exception {
    long initialAdded = metrics.getNumPendingContainersAdded();
    long initialRemoved = metrics.getNumPendingContainersRemoved();

    LOG.info("Initial metrics: added={}, removed={}", initialAdded, initialRemoved);

    // Write multiple keys, each of which may trigger container allocation.
    for (int i = 0; i < 3; i++) {
      String keyName = "metricsTestKey" + i;
      byte[] data = ("Metrics test " + i).getBytes(UTF_8);

      try (OzoneOutputStream out = bucket.createKey(keyName, data.length,
          RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
          new HashMap<>())) {
        out.write(data);
      }
    }

    // "Added" metric should increase as containers are allocated.
    GenericTestUtils.waitFor(() -> {
      long afterAdded = metrics.getNumPendingContainersAdded();
      return afterAdded > initialAdded;
    }, 100, 5000);

    // "Removed" metric should increase after the ICRs are processed.
    GenericTestUtils.waitFor(() -> {
      long afterRemoved = metrics.getNumPendingContainersRemoved();
      return initialRemoved < afterRemoved;
    }, 100, 5000);
  }
}