Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ public class ConfigNodeConfig {
/** Acceptable pause duration for Phi accrual failure detector */
private long failureDetectorPhiAcceptablePauseInMs = 10000;

/** Base interval in ms for topology probing. */
private long topologyProbingBaseIntervalInMs = 5000;

/** Ratio of probing timeout to probing interval (must be less than 1.0). */
private double topologyProbingTimeoutRatio = 0.5;

/** The policy of cluster RegionGroups' leader distribution. */
private String leaderDistributionPolicy = AbstractLeaderBalancer.CFD_POLICY;

Expand Down Expand Up @@ -1288,4 +1294,20 @@ public long getFailureDetectorPhiAcceptablePauseInMs() {
/** Sets the acceptable pause duration (ms) for the Phi accrual failure detector. */
public void setFailureDetectorPhiAcceptablePauseInMs(long failureDetectorPhiAcceptablePauseInMs) {
this.failureDetectorPhiAcceptablePauseInMs = failureDetectorPhiAcceptablePauseInMs;
}

/** @return the base interval (ms) between topology probing cycles; defaults to 5000. */
public long getTopologyProbingBaseIntervalInMs() {
return topologyProbingBaseIntervalInMs;
}

/**
 * Sets the base interval (ms) between topology probing cycles.
 *
 * <p>NOTE(review): no validation here — a non-positive interval would make the probing loop
 * spin; confirm the properties loader guards against that.
 */
public void setTopologyProbingBaseIntervalInMs(long topologyProbingBaseIntervalInMs) {
this.topologyProbingBaseIntervalInMs = topologyProbingBaseIntervalInMs;
}

/**
 * @return the ratio of the probing timeout to the probing interval (documented to be less than
 *     1.0); defaults to 0.5
 */
public double getTopologyProbingTimeoutRatio() {
return topologyProbingTimeoutRatio;
}

/**
 * Sets the ratio of the probing timeout to the probing interval.
 *
 * <p>The field is documented as "must be less than 1.0": a ratio of 1.0 or more would let one
 * probing cycle's timeout reach into the next cycle, and a non-positive ratio would produce a
 * zero or negative timeout. Fail fast instead of storing an unusable value silently.
 *
 * @param topologyProbingTimeoutRatio ratio in the open interval (0.0, 1.0)
 * @throws IllegalArgumentException if the ratio is not strictly between 0.0 and 1.0
 */
public void setTopologyProbingTimeoutRatio(double topologyProbingTimeoutRatio) {
  if (topologyProbingTimeoutRatio <= 0.0 || topologyProbingTimeoutRatio >= 1.0) {
    throw new IllegalArgumentException(
        "topology_probing_timeout_ratio must be in (0.0, 1.0), but was "
            + topologyProbingTimeoutRatio);
  }
  this.topologyProbingTimeoutRatio = topologyProbingTimeoutRatio;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,18 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio
"failure_detector_phi_acceptable_pause_in_ms",
String.valueOf(conf.getFailureDetectorPhiAcceptablePauseInMs()))));

conf.setTopologyProbingBaseIntervalInMs(
Long.parseLong(
properties.getProperty(
"topology_probing_base_interval_in_ms",
String.valueOf(conf.getTopologyProbingBaseIntervalInMs()))));

conf.setTopologyProbingTimeoutRatio(
Double.parseDouble(
properties.getProperty(
"topology_probing_timeout_ratio",
String.valueOf(conf.getTopologyProbingTimeoutRatio()))));

String leaderDistributionPolicy =
properties.getProperty("leader_distribution_policy", conf.getLeaderDistributionPolicy());
if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ public void updateTopology(Map<Integer, Set<Integer>> latestTopology) {
for (int fromId : latestTopology.keySet()) {
for (int toId : latestTopology.keySet()) {
boolean originReachable =
latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId);
topologyGraph.getOrDefault(fromId, Collections.emptySet()).contains(toId);
boolean newReachable =
latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId);
if (originReachable != newReachable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -168,13 +167,6 @@ protected TDataNodeHeartbeatReq genHeartbeatReq() {
heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage());
}

final Map<Integer, Set<Integer>> topologyMap =
configManager.getLoadManager().getLoadCache().getTopology();
if (topologyMap != null) {
heartbeatReq.setTopology(topologyMap);
heartbeatReq.setDataNodes(configManager.getNodeManager().getRegisteredDataNodeLocations());
}

// We broadcast region operations list every 100 heartbeat loops
if (heartbeatCounter.get() % 100 == 0) {
heartbeatReq.setCurrentRegionOperations(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@

import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
Expand All @@ -41,14 +45,19 @@
import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;

import org.apache.ratis.util.AwaitForSignal;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -57,7 +66,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand All @@ -67,8 +75,6 @@

public class TopologyService implements Runnable, IClusterStatusSubscriber {
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyService.class);
private static final long PROBING_INTERVAL_MS = 5_000L;
private static final long PROBING_TIMEOUT_MS = PROBING_INTERVAL_MS;
private static final int SAMPLING_WINDOW_SIZE = 100;

private final ExecutorService topologyThread =
Expand All @@ -85,18 +91,31 @@

/* (fromDataNodeId, toDataNodeId) -> heartbeat history */
private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> heartbeats;
private final List<Integer> startingDataNodes = new CopyOnWriteArrayList<>();

private final IFailureDetector failureDetector;
private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();

/** Rotation index for sqrt(N) prober selection. */
private int proberRotationIndex = 0;

/** Last topology pushed to each DataNode, used to detect changes. */
private final Map<Integer, Set<Integer>> lastPushedTopology = new ConcurrentHashMap<>();

/** Client manager for pushing topology updates to DataNodes via heartbeat RPC. */
private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
topologyPushClientManager;

public TopologyService(
IManager configManager, Consumer<Map<Integer, Set<Integer>>> topologyChangeListener) {
this.configManager = configManager;
this.topologyChangeListener = topologyChangeListener;
this.heartbeats = new ConcurrentHashMap<>();
this.shouldRun = new AtomicBoolean(false);
this.awaitForSignal = new AwaitForSignal(this.getClass().getSimpleName());
this.topologyPushClientManager =
new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
.createClientManager(
new ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory());

// here we use the same failure
switch (CONF.getFailureDetector()) {
Expand Down Expand Up @@ -133,15 +152,14 @@
}

/**
 * Schedule the {@link #topologyProbing} task either: 1. every adaptive probing interval. 2.
 * Manually triggered by outside events (node restart / register, etc.).
 *
 * @return true when the wait completed (timed out or was signaled); false when interrupted
 */
private boolean mayWait() {
  try {
    // Wait at most one base probing interval; an external signal() wakes us up early.
    this.awaitForSignal.await(CONF.getTopologyProbingBaseIntervalInMs(), TimeUnit.MILLISECONDS);
    return true;
  } catch (InterruptedException e) {
    // NOTE(review): the interrupt flag is intentionally not restored because this probing
    // thread may be reused after an interrupt — confirm this matches the executor lifecycle.
    return false;
  }
}
Expand All @@ -153,36 +171,68 @@
}
}

/**
 * Select ceil(sqrt(N)) DataNodes as probers, rotating through all DataNodes across cycles so
 * that every DataNode gets to be a prober over roughly sqrt(N) cycles.
 *
 * @param allDataNodes candidate DataNodes; input order is irrelevant (selection sorts by id)
 * @return the probers for this cycle; the input list itself when it has at most one element
 */
private List<TDataNodeLocation> selectProbers(List<TDataNodeLocation> allDataNodes) {
  int n = allDataNodes.size();
  if (n <= 1) {
    return allDataNodes;
  }
  int sqrtN = (int) Math.ceil(Math.sqrt(n));
  // Sort by node id so the rotation order is stable even if the input order varies per cycle.
  List<TDataNodeLocation> sorted = new ArrayList<>(allDataNodes);
  sorted.sort(Comparator.comparingInt(TDataNodeLocation::getDataNodeId));
  // Use floorMod over long arithmetic: proberRotationIndex * sqrtN eventually overflows int,
  // and a plain % on the negative product would yield a negative start index
  // (ArrayIndexOutOfBoundsException below).
  int startIndex = (int) Math.floorMod((long) proberRotationIndex * sqrtN, (long) n);
  proberRotationIndex++;
  List<TDataNodeLocation> probers = new ArrayList<>(sqrtN);
  for (int i = 0; i < sqrtN && i < n; i++) {
    probers.add(sorted.get((startIndex + i) % n));
  }
  return probers;
}

private synchronized void topologyProbing() {
// 1. get the latest datanode list
// 1. get Running DataNodes only
final List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
final Set<Integer> dataNodeIds = new HashSet<>();
for (final TDataNodeConfiguration dataNodeConf :
configManager.getNodeManager().getRegisteredDataNodes()) {
final TDataNodeLocation location = dataNodeConf.getLocation();
if (startingDataNodes.contains(location.getDataNodeId())) {
continue; // we shall wait for internal endpoint to be ready
if (configManager.getLoadManager().getNodeStatus(location.getDataNodeId())
!= NodeStatus.Running) {
continue;
}
dataNodeLocations.add(location);
dataNodeIds.add(location.getDataNodeId());
}

// 2. send the verify connection RPC to all datanodes
// 2. compute probing timeout
final long baseInterval = CONF.getTopologyProbingBaseIntervalInMs();
final long timeout = (long) (baseInterval * CONF.getTopologyProbingTimeoutRatio());

// 3. select sqrt(N) probers via rotating selection
final List<TDataNodeLocation> probers = selectProbers(dataNodeLocations);

// 4. build TNodeLocations with ALL DataNode locations (so probers test all targets)
final TNodeLocations nodeLocations = new TNodeLocations();
nodeLocations.setDataNodeLocations(dataNodeLocations);
nodeLocations.setConfigNodeLocations(Collections.emptyList());
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
configManager.getNodeManager().getRegisteredDataNodes().stream()
.map(TDataNodeConfiguration::getLocation)

// 5. build proberLocationMap containing only the selected probers
final Map<Integer, TDataNodeLocation> proberLocationMap =
probers.stream()
.collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, location -> location));

// 6. send async requests ONLY to probers (not all DataNodes) with computed timeout
final DataNodeAsyncRequestContext<TNodeLocations, TTestConnectionResp>
dataNodeAsyncRequestContext =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
nodeLocations,
dataNodeLocationMap);
proberLocationMap);
CnToDnInternalServiceAsyncRequestManager.getInstance()
.sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, PROBING_TIMEOUT_MS);
.sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, timeout);
final List<TTestConnectionResult> results = new ArrayList<>();
dataNodeAsyncRequestContext
.getResponseMap()
Expand All @@ -193,7 +243,7 @@
}
});

// 3. collect results and update the heartbeat timestamps
// 7. collect results and update the heartbeat timestamps
for (final TTestConnectionResult result : results) {
final int fromDataNodeId =
Optional.ofNullable(result.getSender().getDataNodeLocation())
Expand All @@ -203,8 +253,6 @@
if (result.isSuccess()
&& dataNodeIds.contains(fromDataNodeId)
&& dataNodeIds.contains(toDataNodeId)) {
// testAllDataNodeConnectionWithTimeout ensures the heartbeats are Dn-Dn internally. Here we
// just double-check.
final List<AbstractHeartbeatSample> heartbeatHistory =
heartbeats.computeIfAbsent(
new Pair<>(fromDataNodeId, toDataNodeId), p -> new LinkedList<>());
Expand All @@ -215,7 +263,7 @@
}
}

// 4. use failure detector to identify potential network partitions
// 8. use failure detector to identify potential network partitions (on ALL heartbeat pairs)
final Map<Integer, Set<Integer>> latestTopology =
dataNodeLocations.stream()
.collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, k -> new HashSet<>()));
Expand All @@ -234,10 +282,62 @@

logAsymmetricPartition(latestTopology);

// 5. notify the listeners on topology change
// 9. notify the listeners on topology change
if (shouldRun.get()) {
topologyChangeListener.accept(latestTopology);
}

// 10. push topology changes to DataNodes
pushTopologyToDataNodes(latestTopology, dataNodeLocations);
}

/**
* Push topology changes to DataNodes via heartbeat RPC. Each DataNode only receives a push when
* its own reachable set has changed since the last push.
*/
private void pushTopologyToDataNodes(
Map<Integer, Set<Integer>> latestTopology, List<TDataNodeLocation> dataNodeLocations) {
final Map<Integer, TDataNodeLocation> dataNodesMap =
dataNodeLocations.stream()
.collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, loc -> loc));

for (final TDataNodeLocation location : dataNodeLocations) {
final int nodeId = location.getDataNodeId();
final Set<Integer> reachableSet = latestTopology.getOrDefault(nodeId, Collections.emptySet());
final Set<Integer> lastPushed = lastPushedTopology.get(nodeId);

if (lastPushed != null && lastPushed.equals(reachableSet)) {
continue;
}

// Per-node topology: only this DataNode's own reachable set
final Map<Integer, Set<Integer>> perNodeTopology = new HashMap<>();
perNodeTopology.put(nodeId, new HashSet<>(reachableSet));

final TDataNodeHeartbeatReq req =
new TDataNodeHeartbeatReq(System.nanoTime(), false, false, 0L, 0L);
req.setTopology(perNodeTopology);
req.setDataNodes(dataNodesMap);

final TEndPoint endPoint = location.getInternalEndPoint();
try {
topologyPushClientManager
.borrowClient(endPoint)
.getDataNodeHeartBeat(
req,
new AsyncMethodCallback<TDataNodeHeartbeatResp>() {
@Override
public void onComplete(TDataNodeHeartbeatResp response) {}

Check failure on line 330 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this method is empty, throw an UnsupportedOperationException or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ33SDXZSe0xLbnEAnIh&open=AZ33SDXZSe0xLbnEAnIh&pullRequest=17595

@Override
public void onError(Exception exception) {}

Check failure on line 333 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this method is empty, throw an UnsupportedOperationException or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ33SDXZSe0xLbnEAnIi&open=AZ33SDXZSe0xLbnEAnIi&pullRequest=17595
});
lastPushedTopology.put(nodeId, new HashSet<>(reachableSet));
} catch (Exception e) {
LOGGER.debug(
"Failed to push topology to DataNode {} at {}: {}", nodeId, endPoint, e.getMessage());
}
}
}

/**
Expand Down Expand Up @@ -287,11 +387,7 @@
continue;
}
if (changeEvent.getLeft() == null) {
// if a new datanode registered, DO NOT trigger probing immediately
startingDataNodes.add(nodeId);
continue;
} else {
startingDataNodes.remove(nodeId);
}

final Set<Pair<Integer, Integer>> affectedPairs =
Expand All @@ -303,13 +399,10 @@
.collect(Collectors.toSet());

if (changeEvent.getRight() == null) {
// datanode removed from cluster, clean up probing history
affectedPairs.forEach(heartbeats::remove);
} else {
// we only trigger probing immediately if node comes around from UNKNOWN to RUNNING
if (NodeStatus.Unknown.equals(changeEvent.getLeft().getStatus())
&& NodeStatus.Running.equals(changeEvent.getRight().getStatus())) {
// let's clear the history when a new node comes around
affectedPairs.forEach(pair -> heartbeats.put(pair, new ArrayList<>()));
awaitForSignal.signal();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
Expand Down Expand Up @@ -377,8 +376,7 @@ public void stopAllPipesWithCriticalExceptionAndTrackException(
///////////////////////// Heartbeat /////////////////////////

public void collectPipeMetaList(final TDataNodeHeartbeatResp resp) throws TException {
if (!tryReadLockWithTimeOut(
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) {
if (!tryReadLockWithTimeOut(2)) {
return;
}
try {
Expand Down
Loading
Loading