Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,19 @@ default int getAllNodeCount() {
return getAllNodes().size();
}

/**
* Returns the number of datanodes whose last known layout version marks them as
* finalized, i.e. metadata layout version >= software layout version.
*/
default int getNumDatanodesFinalized() {
return (int) getAllNodes().stream()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should count only the IN_SERVICE + HEALTHY nodes, rather than all nodes. I guess it might be OK to iterate over all nodes, but there is a chance that some DEAD node was finalized, and then, in the other place where we get the total datanode count, it could make the counts match when they should not. E.g., we have a finalized decommissioned node and an unfinalized IN_SERVICE node, and the decommissioned one counts +1 when it should not.

Really, what we want to ensure is that all nodes that can finalize are finalized. Nodes that are unable to finalize because they are dead should not block finalization.

I think a dead node has to be registered when it comes back alive, at least if the DN process is restarted. If the DN process is just hung for some time, preventing the heartbeats, the node can be moved back to IN_SERVICE.

I wonder if such a node should be forced to finalize before it rejoins - certainly on registration we can make a node finalize. I think this is out of scope for this PR, but we need to think about it as another change.

.filter(DatanodeInfo.class::isInstance)
.map(DatanodeInfo.class::cast)
.filter(datanodeInfo ->
SCMNodeManager.isDatanodeFinalized(datanodeInfo.getLastKnownLayoutVersion()))
.count();
}

/**
* Returns the aggregated node stats.
* @return the aggregated node stats.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Function;
Expand Down Expand Up @@ -140,6 +141,7 @@ public class SCMNodeManager implements NodeManager {
BiConsumer<DatanodeDetails, SCMCommand<?>>> sendCommandNotifyMap;
private final NonWritableNodeFilter nonWritableNodeFilter;
private final int numContainerPerVolume;
private final AtomicInteger datanodesFinalized;

/**
* Lock used to synchronize some operation in Node manager to ensure a
Expand Down Expand Up @@ -202,6 +204,7 @@ public SCMNodeManager(
this.scmContext = scmContext;
this.sendCommandNotifyMap = new HashMap<>();
this.nonWritableNodeFilter = new NonWritableNodeFilter(conf);
this.datanodesFinalized = new AtomicInteger(0);
}

@Override
Expand Down Expand Up @@ -422,6 +425,7 @@ public RegisteredCommand register(
try {
clusterMap.add(datanodeDetails);
nodeStateManager.addNode(datanodeDetails, layoutInfo);
datanodesFinalized.addAndGet(isDatanodeFinalized(layoutInfo) ? 1 : 0);
// Check that datanode in nodeStateManager has topology parent set
DatanodeDetails dn = nodeStateManager.getNode(datanodeDetails);
Preconditions.checkState(dn.getParent() != null);
Expand All @@ -447,7 +451,9 @@ public RegisteredCommand register(
hostName, ipAddress, dnId)) {
LOG.info("Updating datanode from {} to {}", oldNode, datanodeDetails);
clusterMap.update(oldNode, datanodeDetails);
LayoutVersionProto oldLayoutVersion = oldNode.getLastKnownLayoutVersion();
nodeStateManager.updateNode(datanodeDetails, layoutInfo);
updateDatanodesFinalizedCount(oldLayoutVersion, layoutInfo);
DatanodeDetails dn = nodeStateManager.getNode(datanodeDetails);
Preconditions.checkState(dn.getParent() != null);
processNodeReport(datanodeDetails, nodeReport);
Expand All @@ -457,7 +463,9 @@ public RegisteredCommand register(
LOG.info("Update the version for registered datanode {}, " +
"oldVersion = {}, newVersion = {}.",
datanodeDetails, oldNode.getVersion(), datanodeDetails.getVersion());
LayoutVersionProto oldLayoutVersion = oldNode.getLastKnownLayoutVersion();
nodeStateManager.updateNode(datanodeDetails, layoutInfo);
updateDatanodesFinalizedCount(oldLayoutVersion, layoutInfo);
}
} catch (NodeNotFoundException e) {
LOG.error("Cannot find datanode {} from nodeStateManager",
Expand Down Expand Up @@ -732,8 +740,11 @@ public void processLayoutVersionReport(DatanodeDetails datanodeDetails,
}

try {
DatanodeInfo datanodeInfo = nodeStateManager.getNode(datanodeDetails);
LayoutVersionProto oldLayoutVersion = datanodeInfo.getLastKnownLayoutVersion();
nodeStateManager.updateLastKnownLayoutVersion(datanodeDetails,
layoutVersionReport);
updateDatanodesFinalizedCount(oldLayoutVersion, layoutVersionReport);
} catch (NodeNotFoundException e) {
LOG.error("SCM trying to process Layout Version from an " +
"unregistered node {}.", datanodeDetails);
Expand Down Expand Up @@ -1053,6 +1064,28 @@ public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) {
}
}

/**
 * Reports how many datanodes are currently counted as finalized.
 *
 * <p>The value is read from the {@code datanodesFinalized} counter rather
 * than being recomputed from the node list.
 *
 * @return the current value of the finalized-datanode counter.
 */
@Override
public int getNumDatanodesFinalized() {
  return datanodesFinalized.intValue();
}

/**
 * Adjusts the finalized-datanode counter after one datanode's layout version
 * changed from {@code previous} to {@code current}.
 *
 * <p>The counter moves by one only when the finalized state actually flips;
 * if both versions evaluate to the same state the counter is left untouched.
 *
 * @param previous layout version known before the change; {@code null} is
 *                 treated as "not finalized" by {@link #isDatanodeFinalized}.
 * @param current  layout version after the change; {@code null} is treated
 *                 as "not finalized" as well.
 */
private void updateDatanodesFinalizedCount(
    LayoutVersionProto previous, LayoutVersionProto current) {
  final boolean wasFinalized = isDatanodeFinalized(previous);
  final boolean nowFinalized = isDatanodeFinalized(current);
  if (wasFinalized == nowFinalized) {
    // No transition, nothing to account for.
    return;
  }
  if (nowFinalized) {
    datanodesFinalized.incrementAndGet();
  } else {
    datanodesFinalized.decrementAndGet();
  }
}

/**
 * Tells whether a reported layout version describes a finalized datanode.
 *
 * @param layoutVersion the layout version to inspect; may be {@code null}.
 * @return {@code true} when {@code layoutVersion} is non-null and its
 *         metadata layout version is greater than or equal to its software
 *         layout version; {@code false} otherwise.
 */
public static boolean isDatanodeFinalized(LayoutVersionProto layoutVersion) {
  return layoutVersion != null
      && layoutVersion.getMetadataLayoutVersion()
          >= layoutVersion.getSoftwareLayoutVersion();
}

/**
* Return the node stat of the specified datanode.
*
Expand Down Expand Up @@ -1979,6 +2012,11 @@ public void removeNode(DatanodeDetails datanodeDetails) throws NodeNotFoundExcep
if (clusterMap.contains(datanodeDetails)) {
clusterMap.remove(datanodeDetails);
}
DatanodeInfo datanodeInfo = nodeStateManager.getNode(datanodeDetails);
if (datanodeInfo != null) {
updateDatanodesFinalizedCount(
datanodeInfo.getLastKnownLayoutVersion(), null);
}
nodeStateManager.removeNode(datanodeDetails.getID());
removeFromDnsToDnIdMap(datanodeDetails.getID(), datanodeDetails.getIpAddress());
final List<SCMCommand<?>> cmdList = getCommandQueue(datanodeDetails.getID());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import org.apache.hadoop.ozone.audit.AuditMessage;
import org.apache.hadoop.ozone.audit.Auditor;
import org.apache.hadoop.ozone.audit.SCMAction;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalization;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalization.StatusAndMessages;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
Expand Down Expand Up @@ -1153,13 +1154,12 @@ public HddsProtos.UpgradeStatus queryUpgradeStatus() throws IOException {
try {
getScm().checkAdminAccess(getRemoteUser(), true);

// Returning a placeholder for now.
HddsProtos.UpgradeStatus result = HddsProtos.UpgradeStatus.newBuilder()
.setScmFinalized(true)
.setNumDatanodesFinalized(10)
.setNumDatanodesTotal(10)
.setShouldFinalize(true)
.build();
UpgradeFinalization.Status scmUpgradeStatus =
scm.getLayoutVersionManager().getUpgradeState();
int totalDatanodes = scm.getScmNodeManager().getAllNodeCount();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realised that nodeManager probably remembers any dead or decommissioned nodes and they will never finalize. We actually want the count of all IN_SERVICE and HEALTHY nodes as those are the ones that should finalize or end up going dead as they are not responding. So I think we need to use this API:

public int getNodeCount(NodeStatus status) {
    return countNodes(matching(status));
  }

Where the NodeStatus is IN_SERVICE + HEALTHY.

int finalizedDatanodes = scm.getScmNodeManager().getNumDatanodesFinalized();
HddsProtos.UpgradeStatus result = buildUpgradeStatus(
scmUpgradeStatus, finalizedDatanodes, totalDatanodes);
AUDIT.logReadSuccess(buildAuditMessageForSuccess(SCMAction.QUERY_UPGRADE_STATUS, null));
return result;
} catch (IOException ex) {
Expand All @@ -1168,6 +1168,29 @@ public HddsProtos.UpgradeStatus queryUpgradeStatus() throws IOException {
}
}

/**
 * Assembles the {@code UpgradeStatus} message returned to clients.
 *
 * @param scmUpgradeStatus   upgrade state of SCM itself.
 * @param finalizedDatanodes number of datanodes counted as finalized;
 *                           copied into the message as-is.
 * @param totalDatanodes     number of datanodes considered in total;
 *                           copied into the message as-is.
 * @return a message carrying both counts plus the flags derived by
 *         {@link #isScmFinalized} and {@link #shouldFinalize}.
 */
static HddsProtos.UpgradeStatus buildUpgradeStatus(
    UpgradeFinalization.Status scmUpgradeStatus,
    int finalizedDatanodes,
    int totalDatanodes) {
  final boolean scmFinalized = isScmFinalized(scmUpgradeStatus);
  final boolean finalizeNow =
      shouldFinalize(scmUpgradeStatus, finalizedDatanodes, totalDatanodes);
  return HddsProtos.UpgradeStatus.newBuilder()
      .setScmFinalized(scmFinalized)
      .setNumDatanodesFinalized(finalizedDatanodes)
      .setNumDatanodesTotal(totalDatanodes)
      .setShouldFinalize(finalizeNow)
      .build();
}

/**
 * Decides whether SCM should be reported as finalized.
 *
 * @param scmUpgradeStatus upgrade state of SCM.
 * @return {@code true} if either {@code UpgradeFinalization.isFinalized} or
 *         {@code UpgradeFinalization.isDone} accepts the given status.
 */
static boolean isScmFinalized(UpgradeFinalization.Status scmUpgradeStatus) {
  if (UpgradeFinalization.isFinalized(scmUpgradeStatus)) {
    return true;
  }
  return UpgradeFinalization.isDone(scmUpgradeStatus);
}

/**
 * Decides whether finalization should be triggered.
 *
 * @param scmUpgradeStatus   upgrade state of SCM.
 * @param finalizedDatanodes number of datanodes counted as finalized.
 * @param totalDatanodes     number of datanodes considered in total.
 * @return {@code true} only when the status equals
 *         {@code FINALIZATION_REQUIRED} and the two counts are equal.
 */
static boolean shouldFinalize(UpgradeFinalization.Status scmUpgradeStatus,
    int finalizedDatanodes, int totalDatanodes) {
  // NOTE(review): with zero datanodes both counts are 0 and therefore equal,
  // so the result then depends on the SCM status alone.
  if (finalizedDatanodes != totalDatanodes) {
    return false;
  }
  return UpgradeFinalization.Status.FINALIZATION_REQUIRED.equals(scmUpgradeStatus);
}

@Override
public StartContainerBalancerResponseProto startContainerBalancer(
Optional<Double> threshold, Optional<Integer> iterations,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,65 @@ public void testProcessLayoutVersion() throws IOException {
testProcessLayoutVersionReportHigherMlv();
}

@Test
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need tests for registration, both when a DN is already finalized at registration and when it is not, and also for when removeNode is called. This test covers the heartbeat part, I think.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, added tests for this scenario.

public void testDatanodeFinalizedCounterTracksLayoutVersionReports()
    throws IOException, AuthenticationException {
  // Drives the finalized counter purely through layout version reports:
  // registered -> report with lower MLV -> report with restored MLV.
  try (SCMNodeManager scmNodeManager = createNodeManager(getConf())) {
    final DatanodeDetails datanode =
        HddsTestUtils.createRandomDatanodeAndRegister(scmNodeManager);
    assertEquals(1, scmNodeManager.getNumDatanodesFinalized(),
        "Initial datanode should be counted as finalized");

    final int slv =
        scmNodeManager.getLayoutVersionManager().getSoftwareLayoutVersion();
    final int mlv =
        scmNodeManager.getLayoutVersionManager().getMetadataLayoutVersion();

    // Report a metadata layout version one below the layout manager's.
    final LayoutVersionProto laggingReport = LayoutVersionProto.newBuilder()
        .setMetadataLayoutVersion(mlv - 1)
        .setSoftwareLayoutVersion(slv)
        .build();
    scmNodeManager.processLayoutVersionReport(datanode, laggingReport);
    assertEquals(0, scmNodeManager.getNumDatanodesFinalized(),
        "Lower metadata layout version should decrement finalized count");

    // Report the layout manager's own versions again.
    final LayoutVersionProto caughtUpReport = LayoutVersionProto.newBuilder()
        .setMetadataLayoutVersion(mlv)
        .setSoftwareLayoutVersion(slv)
        .build();
    scmNodeManager.processLayoutVersionReport(datanode, caughtUpReport);
    assertEquals(1, scmNodeManager.getNumDatanodesFinalized(),
        "Restored metadata layout version should restore finalized count");
  }
}

/**
 * Checks the finalized counter across registration and removal: one node
 * registered with {@code CORRECT_LAYOUT_PROTO}, one with
 * {@code SMALLER_MLV_LAYOUT_PROTO}, then both removed in turn.
 */
@Test
public void testDatanodeFinalizedCounterTracksRegistrationAndRemoveNode()
    throws IOException, AuthenticationException, NodeNotFoundException {
  final HddsProtos.NodeOperationalState decommissioned =
      HddsProtos.NodeOperationalState.DECOMMISSIONED;

  try (SCMNodeManager scmNodeManager = createNodeManager(getConf())) {
    // Registration with the correct layout is expected to be counted.
    final DatanodeDetails upToDateDn =
        registerWithCapacity(scmNodeManager, CORRECT_LAYOUT_PROTO, success);
    assertEquals(1, scmNodeManager.getNumDatanodesFinalized(),
        "Finalized registration should increment finalized count");

    // Registration with a smaller MLV is expected to leave the count alone.
    final DatanodeDetails laggingDn =
        registerWithCapacity(scmNodeManager, SMALLER_MLV_LAYOUT_PROTO, success);
    assertEquals(1, scmNodeManager.getNumDatanodesFinalized(),
        "Non-finalized registration should not increment finalized count");

    // Each node is marked DECOMMISSIONED before it is removed.
    laggingDn.setPersistedOpState(decommissioned);
    scmNodeManager.removeNode(laggingDn);
    assertEquals(1, scmNodeManager.getNumDatanodesFinalized(),
        "Removing a non-finalized node should not change finalized count");

    upToDateDn.setPersistedOpState(decommissioned);
    scmNodeManager.removeNode(upToDateDn);
    assertEquals(0, scmNodeManager.getNumDatanodesFinalized(),
        "Removing a finalized node should decrement finalized count");
  }
}

// Currently invoked by testProcessLayoutVersion.
public void testProcessLayoutVersionReportHigherMlv()
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_READONLY_ADMINISTRATORS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
Expand All @@ -45,6 +46,7 @@
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalization;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -138,6 +140,78 @@ public void testScmListContainer() throws Exception {
HddsProtos.ReplicationFactor.THREE).getContainerInfoList().size());
}

/**
 * FINALIZATION_REQUIRED with only 1 of 2 datanodes finalized: neither flag
 * is set and both counts are passed through.
 */
@Test
public void testBuildUpgradeStatusMapsFinalizationRequired() {
  final int finalized = 1;
  final int total = 2;

  final HddsProtos.UpgradeStatus upgradeStatus =
      SCMClientProtocolServer.buildUpgradeStatus(
          UpgradeFinalization.Status.FINALIZATION_REQUIRED, finalized, total);

  assertFalse(upgradeStatus.getScmFinalized());
  assertFalse(upgradeStatus.getShouldFinalize());
  assertEquals(finalized, upgradeStatus.getNumDatanodesFinalized());
  assertEquals(total, upgradeStatus.getNumDatanodesTotal());
}

/**
 * FINALIZATION_REQUIRED with 3 of 3 datanodes finalized: SCM is not reported
 * as finalized, but the should-finalize flag is set.
 */
@Test
public void testBuildUpgradeStatusMapsFinalizationRequiredAllNodesFinalized() {
  final int datanodes = 3;

  final HddsProtos.UpgradeStatus upgradeStatus =
      SCMClientProtocolServer.buildUpgradeStatus(
          UpgradeFinalization.Status.FINALIZATION_REQUIRED, datanodes, datanodes);

  assertFalse(upgradeStatus.getScmFinalized());
  assertTrue(upgradeStatus.getShouldFinalize());
  assertEquals(datanodes, upgradeStatus.getNumDatanodesFinalized());
  assertEquals(datanodes, upgradeStatus.getNumDatanodesTotal());
}

/**
 * FINALIZATION_IN_PROGRESS with 1 of 2 datanodes finalized: neither flag is
 * set, and the datanode counts are passed through unchanged.
 */
@Test
public void testBuildUpgradeStatusMapsFinalizationInProgress() {
  HddsProtos.UpgradeStatus status = SCMClientProtocolServer.buildUpgradeStatus(
      UpgradeFinalization.Status.FINALIZATION_IN_PROGRESS, 1, 2);

  assertFalse(status.getScmFinalized());
  assertFalse(status.getShouldFinalize());
  // Count checks added to match the sibling tests that use the same 1/2 input.
  assertEquals(1, status.getNumDatanodesFinalized());
  assertEquals(2, status.getNumDatanodesTotal());
}

/**
 * STARTING_FINALIZATION with 1 of 2 datanodes finalized: neither flag is set
 * and both counts are passed through.
 */
@Test
public void testBuildUpgradeStatusMapsStartingFinalization() {
  final int finalized = 1;
  final int total = 2;

  final HddsProtos.UpgradeStatus upgradeStatus =
      SCMClientProtocolServer.buildUpgradeStatus(
          UpgradeFinalization.Status.STARTING_FINALIZATION, finalized, total);

  assertFalse(upgradeStatus.getScmFinalized());
  assertFalse(upgradeStatus.getShouldFinalize());
  assertEquals(finalized, upgradeStatus.getNumDatanodesFinalized());
  assertEquals(total, upgradeStatus.getNumDatanodesTotal());
}

/**
 * FINALIZATION_DONE and ALREADY_FINALIZED (2 of 2 datanodes finalized) must
 * both report SCM as finalized without asking for finalization again.
 */
@Test
public void testBuildUpgradeStatusMapsCompletedStatesToFinalized() {
  final int datanodes = 2;

  final HddsProtos.UpgradeStatus whenDone =
      SCMClientProtocolServer.buildUpgradeStatus(
          UpgradeFinalization.Status.FINALIZATION_DONE, datanodes, datanodes);
  final HddsProtos.UpgradeStatus whenAlreadyFinalized =
      SCMClientProtocolServer.buildUpgradeStatus(
          UpgradeFinalization.Status.ALREADY_FINALIZED, datanodes, datanodes);

  // FINALIZATION_DONE
  assertTrue(whenDone.getScmFinalized());
  assertFalse(whenDone.getShouldFinalize());

  // ALREADY_FINALIZED
  assertTrue(whenAlreadyFinalized.getScmFinalized());
  assertFalse(whenAlreadyFinalized.getShouldFinalize());
}

/**
 * Exercises {@code buildUpgradeStatus} for a state that still needs
 * finalization (1 of 3 datanodes finalized) and for an already finalized
 * state (3 of 3 datanodes finalized).
 */
@Test
public void testBuildUpgradeStatusFromVersionManagerState() {
  final int total = 3;

  // Still requires finalization, and only one datanode has finalized.
  final HddsProtos.UpgradeStatus pending =
      SCMClientProtocolServer.buildUpgradeStatus(
          UpgradeFinalization.Status.FINALIZATION_REQUIRED, 1, total);
  assertFalse(pending.getScmFinalized());
  assertFalse(pending.getShouldFinalize());
  assertEquals(1, pending.getNumDatanodesFinalized());
  assertEquals(total, pending.getNumDatanodesTotal());

  // Already finalized, and every datanode has finalized.
  final HddsProtos.UpgradeStatus completed =
      SCMClientProtocolServer.buildUpgradeStatus(
          UpgradeFinalization.Status.ALREADY_FINALIZED, total, total);
  assertTrue(completed.getScmFinalized());
  assertFalse(completed.getShouldFinalize());
}

private StorageContainerManager mockStorageContainerManager() {
List<ContainerInfo> infos = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ public class StatusSubCommand extends ScmSubcommand {
public void execute(ScmClient client) throws IOException {
HddsProtos.UpgradeStatus status = client.queryUpgradeStatus();

// Temporary output to validate the command is working.
out().println("Update status:");
out().println("Upgrade status:");
out().println(" SCM Finalized: " + status.getScmFinalized());
out().println(" Datanodes finalized: " + status.getNumDatanodesFinalized());
out().println(" Total Datanodes: " + status.getNumDatanodesTotal());
Expand Down
Loading
Loading