diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index e1cf9a45bf50..fc2ee8a049ca 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -144,6 +144,19 @@ default int getAllNodeCount() { return getAllNodes().size(); } + /** + * Returns the number of datanodes currently marked finalized by SCM layout version + * metadata/software comparison. + */ + default int getNumDatanodesFinalized() { + return (int) getAllNodes().stream() + .filter(DatanodeInfo.class::isInstance) + .map(DatanodeInfo.class::cast) + .filter(datanodeInfo -> + SCMNodeManager.isDatanodeFinalized(datanodeInfo.getLastKnownLayoutVersion())) + .count(); + } + /** * Returns the aggregated node stats. * @return the aggregated node stats. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index f7432ad47e53..3ef2ddcd3553 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -44,6 +44,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; import java.util.function.Function; @@ -140,6 +141,7 @@ public class SCMNodeManager implements NodeManager { BiConsumer>> sendCommandNotifyMap; private final NonWritableNodeFilter nonWritableNodeFilter; private final int numContainerPerVolume; + private final AtomicInteger datanodesFinalized; /** * Lock used to synchronize some operation in Node manager to ensure a @@ -202,6 +204,7 @@ public SCMNodeManager( this.scmContext = scmContext; this.sendCommandNotifyMap = new HashMap<>(); this.nonWritableNodeFilter = new NonWritableNodeFilter(conf); + this.datanodesFinalized = new AtomicInteger(0); } @Override @@ -422,6 +425,7 @@ public RegisteredCommand register( try { clusterMap.add(datanodeDetails); nodeStateManager.addNode(datanodeDetails, layoutInfo); + datanodesFinalized.addAndGet(isDatanodeFinalized(layoutInfo) ? 1 : 0); // Check that datanode in nodeStateManager has topology parent set DatanodeDetails dn = nodeStateManager.getNode(datanodeDetails); Preconditions.checkState(dn.getParent() != null); @@ -447,7 +451,9 @@ public RegisteredCommand register( hostName, ipAddress, dnId)) { LOG.info("Updating datanode from {} to {}", oldNode, datanodeDetails); clusterMap.update(oldNode, datanodeDetails); + LayoutVersionProto oldLayoutVersion = oldNode.getLastKnownLayoutVersion(); nodeStateManager.updateNode(datanodeDetails, layoutInfo); + updateDatanodesFinalizedCount(oldLayoutVersion, layoutInfo); DatanodeDetails dn = nodeStateManager.getNode(datanodeDetails); Preconditions.checkState(dn.getParent() != null); processNodeReport(datanodeDetails, nodeReport); @@ -457,7 +463,9 @@ public RegisteredCommand register( LOG.info("Update the version for registered datanode {}, " + "oldVersion = {}, newVersion = {}.", datanodeDetails, oldNode.getVersion(), datanodeDetails.getVersion()); + LayoutVersionProto oldLayoutVersion = oldNode.getLastKnownLayoutVersion(); nodeStateManager.updateNode(datanodeDetails, layoutInfo); + updateDatanodesFinalizedCount(oldLayoutVersion, layoutInfo); } } catch (NodeNotFoundException e) { LOG.error("Cannot find datanode {} from nodeStateManager", @@ -732,8 +740,11 @@ public void processLayoutVersionReport(DatanodeDetails datanodeDetails, } try { + DatanodeInfo datanodeInfo = nodeStateManager.getNode(datanodeDetails); + LayoutVersionProto oldLayoutVersion = datanodeInfo.getLastKnownLayoutVersion(); nodeStateManager.updateLastKnownLayoutVersion(datanodeDetails, layoutVersionReport); + updateDatanodesFinalizedCount(oldLayoutVersion, layoutVersionReport); } catch (NodeNotFoundException e) { LOG.error("SCM trying to process Layout Version from an " + "unregistered node {}.", datanodeDetails); @@ -1053,6 +1064,28 @@ public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) { } } + @Override + public int getNumDatanodesFinalized() { + return datanodesFinalized.get(); + } + + private void updateDatanodesFinalizedCount( + LayoutVersionProto previous, LayoutVersionProto current) { + int previousValue = isDatanodeFinalized(previous) ? 1 : 0; + int currentValue = isDatanodeFinalized(current) ? 1 : 0; + if (previousValue != currentValue) { + datanodesFinalized.addAndGet(currentValue - previousValue); + } + } + + public static boolean isDatanodeFinalized(LayoutVersionProto layoutVersion) { + if (layoutVersion == null) { + return false; + } + return layoutVersion.getMetadataLayoutVersion() + >= layoutVersion.getSoftwareLayoutVersion(); + } + /** * Return the node stat of the specified datanode. * @@ -1979,6 +2012,11 @@ public void removeNode(DatanodeDetails datanodeDetails) throws NodeNotFoundExcep if (clusterMap.contains(datanodeDetails)) { clusterMap.remove(datanodeDetails); } + DatanodeInfo datanodeInfo = nodeStateManager.getNode(datanodeDetails); + if (datanodeInfo != null) { + updateDatanodesFinalizedCount( + datanodeInfo.getLastKnownLayoutVersion(), null); + } nodeStateManager.removeNode(datanodeDetails.getID()); removeFromDnsToDnIdMap(datanodeDetails.getID(), datanodeDetails.getIpAddress()); final List> cmdList = getCommandQueue(datanodeDetails.getID()); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index ca29b8c0c350..a9c34cedafb3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -116,6 +116,7 @@ import org.apache.hadoop.ozone.audit.AuditMessage; import org.apache.hadoop.ozone.audit.Auditor; import org.apache.hadoop.ozone.audit.SCMAction; +import org.apache.hadoop.ozone.upgrade.UpgradeFinalization; import org.apache.hadoop.ozone.upgrade.UpgradeFinalization.StatusAndMessages; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -1153,13 +1154,12 @@ public HddsProtos.UpgradeStatus queryUpgradeStatus() throws IOException { try { getScm().checkAdminAccess(getRemoteUser(), true); - // Returning a placeholder for now. - HddsProtos.UpgradeStatus result = HddsProtos.UpgradeStatus.newBuilder() - .setScmFinalized(true) - .setNumDatanodesFinalized(10) - .setNumDatanodesTotal(10) - .setShouldFinalize(true) - .build(); + UpgradeFinalization.Status scmUpgradeStatus = + scm.getLayoutVersionManager().getUpgradeState(); + int totalDatanodes = scm.getScmNodeManager().getAllNodeCount(); + int finalizedDatanodes = scm.getScmNodeManager().getNumDatanodesFinalized(); + HddsProtos.UpgradeStatus result = buildUpgradeStatus( + scmUpgradeStatus, finalizedDatanodes, totalDatanodes); AUDIT.logReadSuccess(buildAuditMessageForSuccess(SCMAction.QUERY_UPGRADE_STATUS, null)); return result; } catch (IOException ex) { @@ -1168,6 +1168,29 @@ public HddsProtos.UpgradeStatus queryUpgradeStatus() throws IOException { } } + static HddsProtos.UpgradeStatus buildUpgradeStatus( + UpgradeFinalization.Status scmUpgradeStatus, + int finalizedDatanodes, + int totalDatanodes) { + return HddsProtos.UpgradeStatus.newBuilder() + .setScmFinalized(isScmFinalized(scmUpgradeStatus)) + .setNumDatanodesFinalized(finalizedDatanodes) + .setNumDatanodesTotal(totalDatanodes) + .setShouldFinalize(shouldFinalize(scmUpgradeStatus, finalizedDatanodes, totalDatanodes)) + .build(); + } + + static boolean isScmFinalized(UpgradeFinalization.Status scmUpgradeStatus) { + return UpgradeFinalization.isFinalized(scmUpgradeStatus) + || UpgradeFinalization.isDone(scmUpgradeStatus); + } + + static boolean shouldFinalize(UpgradeFinalization.Status scmUpgradeStatus, + int finalizedDatanodes, int totalDatanodes) { + return UpgradeFinalization.Status.FINALIZATION_REQUIRED.equals(scmUpgradeStatus) + && finalizedDatanodes == totalDatanodes; + } + @Override public StartContainerBalancerResponseProto startContainerBalancer( Optional threshold, Optional iterations, diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java index a93fbd4aa99b..547094fe2858 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@ -737,6 +737,65 @@ public void testProcessLayoutVersion() throws IOException { testProcessLayoutVersionReportHigherMlv(); } + @Test + public void testDatanodeFinalizedCounterTracksLayoutVersionReports() + throws IOException, AuthenticationException { + try (SCMNodeManager nodeManager = createNodeManager(getConf())) { + DatanodeDetails node = + HddsTestUtils.createRandomDatanodeAndRegister(nodeManager); + assertEquals(1, nodeManager.getNumDatanodesFinalized(), + "Initial datanode should be counted as finalized"); + + int softwareVersion = + nodeManager.getLayoutVersionManager().getSoftwareLayoutVersion(); + int metadataVersion = + nodeManager.getLayoutVersionManager().getMetadataLayoutVersion(); + nodeManager.processLayoutVersionReport(node, + LayoutVersionProto.newBuilder() + .setMetadataLayoutVersion(metadataVersion - 1) + .setSoftwareLayoutVersion(softwareVersion) + .build()); + assertEquals(0, nodeManager.getNumDatanodesFinalized(), + "Lower metadata layout version should decrement finalized count"); + + nodeManager.processLayoutVersionReport(node, + LayoutVersionProto.newBuilder() + .setMetadataLayoutVersion(metadataVersion) + .setSoftwareLayoutVersion(softwareVersion) + .build()); + assertEquals(1, nodeManager.getNumDatanodesFinalized(), + "Restored metadata layout version should restore finalized count"); + } + } + + @Test + public void testDatanodeFinalizedCounterTracksRegistrationAndRemoveNode() + throws IOException, AuthenticationException, NodeNotFoundException { + try (SCMNodeManager nodeManager = createNodeManager(getConf())) { + DatanodeDetails finalizedNode = + registerWithCapacity(nodeManager, CORRECT_LAYOUT_PROTO, success); + assertEquals(1, nodeManager.getNumDatanodesFinalized(), + "Finalized registration should increment finalized count"); + + DatanodeDetails nonFinalizedNode = + registerWithCapacity(nodeManager, SMALLER_MLV_LAYOUT_PROTO, success); + assertEquals(1, nodeManager.getNumDatanodesFinalized(), + "Non-finalized registration should not increment finalized count"); + + nonFinalizedNode.setPersistedOpState( + HddsProtos.NodeOperationalState.DECOMMISSIONED); + nodeManager.removeNode(nonFinalizedNode); + assertEquals(1, nodeManager.getNumDatanodesFinalized(), + "Removing a non-finalized node should not change finalized count"); + + finalizedNode.setPersistedOpState( + HddsProtos.NodeOperationalState.DECOMMISSIONED); + nodeManager.removeNode(finalizedNode); + assertEquals(0, nodeManager.getNumDatanodesFinalized(), + "Removing a finalized node should decrement finalized count"); + } + } + // Currently invoked by testProcessLayoutVersion. public void testProcessLayoutVersionReportHigherMlv() throws IOException { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java index 9402218014ce..a6d24362c6c9 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_READONLY_ADMINISTRATORS; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; @@ -45,6 +46,7 @@ import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocolServerSideTranslatorPB; import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics; import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.ozone.upgrade.UpgradeFinalization; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.junit.jupiter.api.AfterEach; @@ -138,6 +140,78 @@ public void testScmListContainer() throws Exception { HddsProtos.ReplicationFactor.THREE).getContainerInfoList().size()); } + @Test + public void testBuildUpgradeStatusMapsFinalizationRequired() { + HddsProtos.UpgradeStatus status = SCMClientProtocolServer.buildUpgradeStatus( + UpgradeFinalization.Status.FINALIZATION_REQUIRED, 1, 2); + + assertFalse(status.getScmFinalized()); + assertFalse(status.getShouldFinalize()); + assertEquals(1, status.getNumDatanodesFinalized()); + assertEquals(2, status.getNumDatanodesTotal()); + } + + @Test + public void testBuildUpgradeStatusMapsFinalizationRequiredAllNodesFinalized() { + HddsProtos.UpgradeStatus status = SCMClientProtocolServer.buildUpgradeStatus( + UpgradeFinalization.Status.FINALIZATION_REQUIRED, 3, 3); + + assertFalse(status.getScmFinalized()); + assertTrue(status.getShouldFinalize()); + assertEquals(3, status.getNumDatanodesFinalized()); + assertEquals(3, status.getNumDatanodesTotal()); + } + + @Test + public void testBuildUpgradeStatusMapsFinalizationInProgress() { + HddsProtos.UpgradeStatus status = SCMClientProtocolServer.buildUpgradeStatus( + UpgradeFinalization.Status.FINALIZATION_IN_PROGRESS, 1, 2); + + assertFalse(status.getScmFinalized()); + assertFalse(status.getShouldFinalize()); + } + + @Test + public void testBuildUpgradeStatusMapsStartingFinalization() { + HddsProtos.UpgradeStatus status = SCMClientProtocolServer.buildUpgradeStatus( + UpgradeFinalization.Status.STARTING_FINALIZATION, 1, 2); + + assertFalse(status.getScmFinalized()); + assertFalse(status.getShouldFinalize()); + assertEquals(1, status.getNumDatanodesFinalized()); + assertEquals(2, status.getNumDatanodesTotal()); + } + + @Test + public void testBuildUpgradeStatusMapsCompletedStatesToFinalized() { + HddsProtos.UpgradeStatus doneStatus = + SCMClientProtocolServer.buildUpgradeStatus( + UpgradeFinalization.Status.FINALIZATION_DONE, 2, 2); + HddsProtos.UpgradeStatus finalizedStatus = + SCMClientProtocolServer.buildUpgradeStatus( + UpgradeFinalization.Status.ALREADY_FINALIZED, 2, 2); + + assertTrue(doneStatus.getScmFinalized()); + assertFalse(doneStatus.getShouldFinalize()); + assertTrue(finalizedStatus.getScmFinalized()); + assertFalse(finalizedStatus.getShouldFinalize()); + } + + @Test + public void testBuildUpgradeStatusFromVersionManagerState() { + HddsProtos.UpgradeStatus needsFinalization = + SCMClientProtocolServer.buildUpgradeStatus(UpgradeFinalization.Status.FINALIZATION_REQUIRED, 1, 3); + assertFalse(needsFinalization.getScmFinalized()); + assertFalse(needsFinalization.getShouldFinalize()); + assertEquals(1, needsFinalization.getNumDatanodesFinalized()); + assertEquals(3, needsFinalization.getNumDatanodesTotal()); + + HddsProtos.UpgradeStatus alreadyFinalized = + SCMClientProtocolServer.buildUpgradeStatus(UpgradeFinalization.Status.ALREADY_FINALIZED, 3, 3); + assertTrue(alreadyFinalized.getScmFinalized()); + assertFalse(alreadyFinalized.getShouldFinalize()); + } + private StorageContainerManager mockStorageContainerManager() { List infos = new ArrayList<>(); for (int i = 0; i < 10; i++) { diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/upgrade/StatusSubCommand.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/upgrade/StatusSubCommand.java index 246887edde9c..b2654e5203d7 100644 --- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/upgrade/StatusSubCommand.java +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/upgrade/StatusSubCommand.java @@ -39,8 +39,7 @@ public class StatusSubCommand extends ScmSubcommand { public void execute(ScmClient client) throws IOException { HddsProtos.UpgradeStatus status = client.queryUpgradeStatus(); - // Temporary output to validate the command is working. - out().println("Update status:"); + out().println("Upgrade status:"); out().println(" SCM Finalized: " + status.getScmFinalized()); out().println(" Datanodes finalized: " + status.getNumDatanodesFinalized()); out().println(" Total Datanodes: " + status.getNumDatanodesTotal()); diff --git a/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/ozone/admin/upgrade/TestStatusSubCommand.java b/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/ozone/admin/upgrade/TestStatusSubCommand.java new file mode 100644 index 000000000000..d0e0a439d1a0 --- /dev/null +++ b/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/ozone/admin/upgrade/TestStatusSubCommand.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.admin.upgrade; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.client.ScmClient; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import picocli.CommandLine; + +/** + * Unit tests for {@link StatusSubCommand}. + */ +public class TestStatusSubCommand { + + private static final String DEFAULT_ENCODING = StandardCharsets.UTF_8.name(); + + private final ByteArrayOutputStream outContent = new ByteArrayOutputStream(); + private final PrintStream originalOut = System.out; + private StatusSubCommand cmd; + + @BeforeEach + public void setup() throws UnsupportedEncodingException { + cmd = new StatusSubCommand(); + System.setOut(new PrintStream(outContent, false, DEFAULT_ENCODING)); + } + + @AfterEach + public void tearDown() { + System.setOut(originalOut); + } + + @Test + public void testStatusCommandPrintsUpgradeStatus() throws IOException { + ScmClient scmClient = mock(ScmClient.class); + HddsProtos.UpgradeStatus status = HddsProtos.UpgradeStatus.newBuilder() + .setScmFinalized(false) + .setNumDatanodesFinalized(1) + .setNumDatanodesTotal(3) + .setShouldFinalize(true) + .build(); + when(scmClient.queryUpgradeStatus()).thenReturn(status); + + new CommandLine(cmd).parseArgs(); + cmd.execute(scmClient); + + String output = outContent.toString(DEFAULT_ENCODING); + assertTrue(output.contains("Upgrade status:")); + assertTrue(output.contains("SCM Finalized: false")); + assertTrue(output.contains("Datanodes finalized: 1")); + assertTrue(output.contains("Total Datanodes: 3")); + assertTrue(output.contains("Should Finalize: true")); + verify(scmClient).queryUpgradeStatus(); + } +}