From 2ddad6d48ebe1c580491a8249c2a96102e901052 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Tue, 5 May 2026 13:58:28 +0800 Subject: [PATCH 1/3] ready 4 review --- .../confignode/conf/ConfigNodeConfig.java | 33 ++++ .../confignode/conf/ConfigNodeDescriptor.java | 18 +++ .../manager/load/cache/LoadCache.java | 2 +- .../load/service/HeartbeatService.java | 8 +- .../manager/load/service/TopologyService.java | 143 ++++++++++++++++-- .../agent/task/PipeDataNodeTaskAgent.java | 4 +- .../impl/DataNodeInternalRPCServiceImpl.java | 42 ++++- .../db/queryengine/plan/ClusterTopology.java | 2 +- .../commons/client/ClientPoolFactory.java | 6 +- .../iotdb/commons/concurrent/ThreadName.java | 1 + .../iotdb/commons/conf/CommonConfig.java | 10 ++ .../iotdb/commons/conf/CommonDescriptor.java | 6 + 12 files changed, 237 insertions(+), 38 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index f305b06398db9..306e6cd895f8f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -204,6 +204,15 @@ public class ConfigNodeConfig { /** Acceptable pause duration for Phi accrual failure detector */ private long failureDetectorPhiAcceptablePauseInMs = 10000; + /** Base interval in ms for topology probing. Actual interval scales with DataNode count. */ + private long topologyProbingBaseIntervalInMs = 5000; + + /** Reference DataNode count for adaptive probing interval scaling. */ + private int topologyProbingReferenceNodeCount = 10; + + /** Ratio of probing timeout to probing interval (must be less than 1.0). */ + private double topologyProbingTimeoutRatio = 0.5; + /** The policy of cluster RegionGroups' leader distribution. */ private String leaderDistributionPolicy = AbstractLeaderBalancer.CFD_POLICY; @@ -1288,4 +1297,28 @@ public long getFailureDetectorPhiAcceptablePauseInMs() { public void setFailureDetectorPhiAcceptablePauseInMs(long failureDetectorPhiAcceptablePauseInMs) { this.failureDetectorPhiAcceptablePauseInMs = failureDetectorPhiAcceptablePauseInMs; } + + public long getTopologyProbingBaseIntervalInMs() { + return topologyProbingBaseIntervalInMs; + } + + public void setTopologyProbingBaseIntervalInMs(long topologyProbingBaseIntervalInMs) { + this.topologyProbingBaseIntervalInMs = topologyProbingBaseIntervalInMs; + } + + public int getTopologyProbingReferenceNodeCount() { + return topologyProbingReferenceNodeCount; + } + + public void setTopologyProbingReferenceNodeCount(int topologyProbingReferenceNodeCount) { + this.topologyProbingReferenceNodeCount = topologyProbingReferenceNodeCount; + } + + public double getTopologyProbingTimeoutRatio() { + return topologyProbingTimeoutRatio; + } + + public void setTopologyProbingTimeoutRatio(double topologyProbingTimeoutRatio) { + this.topologyProbingTimeoutRatio = topologyProbingTimeoutRatio; + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 77790dae1a903..506131a583a7e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -322,6 +322,24 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio "failure_detector_phi_acceptable_pause_in_ms", String.valueOf(conf.getFailureDetectorPhiAcceptablePauseInMs())))); + conf.setTopologyProbingBaseIntervalInMs( + Long.parseLong( + properties.getProperty( + "topology_probing_base_interval_in_ms", + String.valueOf(conf.getTopologyProbingBaseIntervalInMs())))); + + conf.setTopologyProbingReferenceNodeCount( + Integer.parseInt( + properties.getProperty( + "topology_probing_reference_node_count", + String.valueOf(conf.getTopologyProbingReferenceNodeCount())))); + + conf.setTopologyProbingTimeoutRatio( + Double.parseDouble( + properties.getProperty( + "topology_probing_timeout_ratio", + String.valueOf(conf.getTopologyProbingTimeoutRatio())))); + String leaderDistributionPolicy = properties.getProperty("leader_distribution_policy", conf.getLeaderDistributionPolicy()); if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index 07105d96bbd1d..41a0dbc5c4671 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -787,7 +787,7 @@ public void updateTopology(Map> latestTopology) { for (int fromId : latestTopology.keySet()) { for (int toId : latestTopology.keySet()) { boolean originReachable = - latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId); + topologyGraph.getOrDefault(fromId, Collections.emptySet()).contains(toId); boolean newReachable = latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId); if (originReachable != newReachable) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java index 64322da5bbb20..e5f65152c875c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java @@ -49,7 +49,6 @@ import org.slf4j.LoggerFactory; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.Future; @@ -168,12 +167,7 @@ protected TDataNodeHeartbeatReq genHeartbeatReq() { heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage()); } - final Map> topologyMap = - configManager.getLoadManager().getLoadCache().getTopology(); - if (topologyMap != null) { - heartbeatReq.setTopology(topologyMap); - heartbeatReq.setDataNodes(configManager.getNodeManager().getRegisteredDataNodeLocations()); - } + // Topology is now pushed independently by TopologyService, no longer piggybacked on heartbeat // We broadcast region operations list every 100 heartbeat loops if (heartbeatCounter.get() % 100 == 0) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index 9e4f6bd6121ab..1fd716fc7fc92 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -21,9 +21,13 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TNodeLocations; import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp; import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult; +import org.apache.iotdb.commons.client.ClientPoolFactory; +import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; @@ -41,14 +45,18 @@ import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber; import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent; +import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq; +import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp; import org.apache.ratis.util.AwaitForSignal; +import org.apache.thrift.async.AsyncMethodCallback; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -67,8 +75,6 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { private static final Logger LOGGER = LoggerFactory.getLogger(TopologyService.class); - private static final long PROBING_INTERVAL_MS = 5_000L; - private static final long PROBING_TIMEOUT_MS = PROBING_INTERVAL_MS; private static final int SAMPLING_WINDOW_SIZE = 100; private final ExecutorService topologyThread = @@ -90,6 +96,16 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { private final IFailureDetector failureDetector; private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); + /** Rotation index for sqrt(N) prober selection. */ + private int proberRotationIndex = 0; + + /** Last topology pushed to each DataNode, used to detect changes. */ + private final Map> lastPushedTopology = new ConcurrentHashMap<>(); + + /** Client manager for pushing topology updates to DataNodes via heartbeat RPC. */ + private final IClientManager + topologyPushClientManager; + public TopologyService( IManager configManager, Consumer>> topologyChangeListener) { this.configManager = configManager; @@ -97,6 +113,10 @@ public TopologyService( this.heartbeats = new ConcurrentHashMap<>(); this.shouldRun = new AtomicBoolean(false); this.awaitForSignal = new AwaitForSignal(this.getClass().getSimpleName()); + this.topologyPushClientManager = + new IClientManager.Factory() + .createClientManager( + new ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory()); // here we use the same failure switch (CONF.getFailureDetector()) { @@ -133,12 +153,17 @@ public synchronized void stopTopologyService() { } /** - * Schedule the {@link #topologyProbing} task either: 1. every PROBING_INTERVAL_MS interval. 2. + * Schedule the {@link #topologyProbing} task either: 1. every adaptive probing interval. 2. * Manually triggered by outside events (node restart / register, etc.). */ private boolean mayWait() { try { - this.awaitForSignal.await(PROBING_INTERVAL_MS, TimeUnit.MILLISECONDS); + long baseInterval = CONF.getTopologyProbingBaseIntervalInMs(); + int dataNodeCount = + configManager.getNodeManager().getRegisteredDataNodes().size() - startingDataNodes.size(); + int referenceNodeCount = CONF.getTopologyProbingReferenceNodeCount(); + long interval = Math.max(baseInterval, baseInterval * dataNodeCount / referenceNodeCount); + this.awaitForSignal.await(interval, TimeUnit.MILLISECONDS); return true; } catch (InterruptedException e) { // we don't reset the interrupt flag here since we may reuse this thread again. @@ -153,8 +178,29 @@ public void run() { } } + /** + * Select sqrt(N) DataNodes as probers, rotating through all DataNodes across cycles so that every + * DataNode gets to be a prober over sqrt(N) cycles. + */ + private List selectProbers(List allDataNodes) { + int n = allDataNodes.size(); + if (n <= 1) { + return allDataNodes; + } + int sqrtN = (int) Math.ceil(Math.sqrt(n)); + List sorted = new ArrayList<>(allDataNodes); + sorted.sort(Comparator.comparingInt(TDataNodeLocation::getDataNodeId)); + int startIndex = (proberRotationIndex * sqrtN) % n; + proberRotationIndex++; + List probers = new ArrayList<>(sqrtN); + for (int i = 0; i < sqrtN && i < n; i++) { + probers.add(sorted.get((startIndex + i) % n)); + } + return probers; + } + private synchronized void topologyProbing() { - // 1. get the latest datanode list + // 1. get the latest datanode list, filter out starting ones final List dataNodeLocations = new ArrayList<>(); final Set dataNodeIds = new HashSet<>(); for (final TDataNodeConfiguration dataNodeConf : @@ -167,22 +213,36 @@ private synchronized void topologyProbing() { dataNodeIds.add(location.getDataNodeId()); } - // 2. send the verify connection RPC to all datanodes + // 2. compute adaptive interval and timeout from N = dataNodeLocations.size() + final long baseInterval = CONF.getTopologyProbingBaseIntervalInMs(); + final int referenceNodeCount = CONF.getTopologyProbingReferenceNodeCount(); + final double timeoutRatio = CONF.getTopologyProbingTimeoutRatio(); + final int dataNodeCount = dataNodeLocations.size(); + final long interval = Math.max(baseInterval, baseInterval * dataNodeCount / referenceNodeCount); + final long timeout = (long) (interval * timeoutRatio); + + // 3. select sqrt(N) probers via rotating selection + final List probers = selectProbers(dataNodeLocations); + + // 4. build TNodeLocations with ALL DataNode locations (so probers test all targets) final TNodeLocations nodeLocations = new TNodeLocations(); nodeLocations.setDataNodeLocations(dataNodeLocations); nodeLocations.setConfigNodeLocations(Collections.emptyList()); - final Map dataNodeLocationMap = - configManager.getNodeManager().getRegisteredDataNodes().stream() - .map(TDataNodeConfiguration::getLocation) + + // 5. build proberLocationMap containing only the selected probers + final Map proberLocationMap = + probers.stream() .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, location -> location)); + + // 6. send async requests ONLY to probers (not all DataNodes) with computed timeout final DataNodeAsyncRequestContext dataNodeAsyncRequestContext = new DataNodeAsyncRequestContext<>( CnToDnAsyncRequestType.SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK, nodeLocations, - dataNodeLocationMap); + proberLocationMap); CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, PROBING_TIMEOUT_MS); + .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, timeout); final List results = new ArrayList<>(); dataNodeAsyncRequestContext .getResponseMap() @@ -193,7 +253,7 @@ private synchronized void topologyProbing() { } }); - // 3. collect results and update the heartbeat timestamps + // 7. collect results and update the heartbeat timestamps for (final TTestConnectionResult result : results) { final int fromDataNodeId = Optional.ofNullable(result.getSender().getDataNodeLocation()) @@ -203,8 +263,6 @@ private synchronized void topologyProbing() { if (result.isSuccess() && dataNodeIds.contains(fromDataNodeId) && dataNodeIds.contains(toDataNodeId)) { - // testAllDataNodeConnectionWithTimeout ensures the heartbeats are Dn-Dn internally. Here we - // just double-check. final List heartbeatHistory = heartbeats.computeIfAbsent( new Pair<>(fromDataNodeId, toDataNodeId), p -> new LinkedList<>()); @@ -215,7 +273,7 @@ private synchronized void topologyProbing() { } } - // 4. use failure detector to identify potential network partitions + // 8. use failure detector to identify potential network partitions (on ALL heartbeat pairs) final Map> latestTopology = dataNodeLocations.stream() .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, k -> new HashSet<>())); @@ -234,10 +292,63 @@ private synchronized void topologyProbing() { logAsymmetricPartition(latestTopology); - // 5. notify the listeners on topology change + // 9. notify the listeners on topology change if (shouldRun.get()) { topologyChangeListener.accept(latestTopology); } + + // 10. push topology changes to DataNodes + pushTopologyToDataNodes(latestTopology, dataNodeLocations); + } + + /** + * Push topology changes to DataNodes via heartbeat RPC. Each DataNode only receives a push when + * its own reachable set has changed since the last push. + */ + private void pushTopologyToDataNodes( + Map> latestTopology, List dataNodeLocations) { + // Build dataNodes map once for all pushes + final Map dataNodesMap = + dataNodeLocations.stream() + .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, loc -> loc)); + + for (final TDataNodeLocation location : dataNodeLocations) { + final int nodeId = location.getDataNodeId(); + final Set reachableSet = latestTopology.getOrDefault(nodeId, Collections.emptySet()); + final Set lastPushed = lastPushedTopology.get(nodeId); + + if (lastPushed != null && lastPushed.equals(reachableSet)) { + continue; + } + + final TDataNodeHeartbeatReq req = + new TDataNodeHeartbeatReq(System.nanoTime(), false, false, 0L, 0L); + req.setTopology(latestTopology); + req.setDataNodes(dataNodesMap); + + final TEndPoint endPoint = location.getInternalEndPoint(); + try { + topologyPushClientManager + .borrowClient(endPoint) + .getDataNodeHeartBeat( + req, + new AsyncMethodCallback() { + @Override + public void onComplete(TDataNodeHeartbeatResp response) { + // No-op: topology push is fire-and-forget + } + + @Override + public void onError(Exception exception) { + // No-op: topology push failures are silently ignored + } + }); + lastPushedTopology.put(nodeId, new HashSet<>(reachableSet)); + } catch (Exception e) { + LOGGER.debug( + "Failed to push topology to DataNode {} at {}: {}", nodeId, endPoint, e.getMessage()); + } + } } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 031cfd3a62e0e..bc0a619afd892 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -24,7 +24,6 @@ import org.apache.iotdb.commons.concurrent.IoTThreadFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; -import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.consensus.SchemaRegionId; import org.apache.iotdb.commons.consensus.index.ProgressIndex; @@ -377,8 +376,7 @@ public void stopAllPipesWithCriticalExceptionAndTrackException( ///////////////////////// Heartbeat ///////////////////////// public void collectPipeMetaList(final TDataNodeHeartbeatResp resp) throws TException { - if (!tryReadLockWithTimeOut( - CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) { + if (!tryReadLockWithTimeOut(2)) { return; } try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 84479c9dcd22a..e889d936ace5f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -52,6 +52,7 @@ import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.Await; import org.apache.iotdb.commons.concurrent.AwaitTimeoutException; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.IoTThreadFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; @@ -417,6 +418,12 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface private final ClusterTopology clusterTopology = ClusterTopology.getInstance(); + private static final long TEST_CONNECTION_TIMEOUT_MS = + CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS(); + + private static final ExecutorService TOPOLOGY_PROBING_EXECUTOR = + IoTDBThreadPoolFactory.newFixedThreadPool(2, ThreadName.DATANODE_TOPOLOGY_PROBING.getName()); + private final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); private final DataNodeContext dataNodeContext; @@ -2076,9 +2083,23 @@ public TTestConnectionResp submitTestConnectionTask(final TNodeLocations nodeLoc @Override public TTestConnectionResp submitInternalTestConnectionTask(TNodeLocations nodeLocations) throws TException { - return new TTestConnectionResp( - new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), - testAllDataNodeConnectionInHeartbeatChannel(nodeLocations.getDataNodeLocations())); + try { + Future future = + TOPOLOGY_PROBING_EXECUTOR.submit( + () -> + new TTestConnectionResp( + new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), + testAllDataNodeConnectionInHeartbeatChannel( + nodeLocations.getDataNodeLocations()))); + return future.get(TEST_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + return new TTestConnectionResp( + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage("Topology probing timed out after " + TEST_CONNECTION_TIMEOUT_MS + "ms"), + Collections.emptyList()); + } catch (Exception e) { + throw new TException(e); + } } private static List testConnections( @@ -2105,7 +2126,8 @@ private List testAllConfigNodeConnection( TServiceType.ConfigNodeInternalService, DnToCnRequestType.TEST_CONNECTION, (AsyncRequestContext handler) -> - DnToCnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler)); + DnToCnInternalServiceAsyncRequestManager.getInstance() + .sendAsyncRequestWithTimeoutInMs(handler, TEST_CONNECTION_TIMEOUT_MS)); } private List testAllDataNodeConnectionInHeartbeatChannel( @@ -2117,7 +2139,8 @@ private List testAllDataNodeConnectionInHeartbeatChannel( TServiceType.DataNodeInternalService, DnToDnRequestType.TEST_CONNECTION, (AsyncRequestContext handler) -> - DataNodeIntraHeartbeatManager.getInstance().sendAsyncRequest(handler, 1, null, true)); + DataNodeIntraHeartbeatManager.getInstance() + .sendAsyncRequest(handler, 1, TEST_CONNECTION_TIMEOUT_MS, true)); } private List testAllDataNodeInternalServiceConnection( @@ -2129,7 +2152,8 @@ private List testAllDataNodeInternalServiceConnection( TServiceType.DataNodeInternalService, DnToDnRequestType.TEST_CONNECTION, (AsyncRequestContext handler) -> - DnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler)); + DnToDnInternalServiceAsyncRequestManager.getInstance() + .sendAsyncRequestWithTimeoutInMs(handler, TEST_CONNECTION_TIMEOUT_MS)); } private List testAllDataNodeMPPServiceConnection( @@ -2141,7 +2165,8 @@ private List testAllDataNodeMPPServiceConnection( TServiceType.DataNodeMPPService, DnToDnRequestType.TEST_CONNECTION, (AsyncRequestContext handler) -> - DataNodeMPPServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler)); + DataNodeMPPServiceAsyncRequestManager.getInstance() + .sendAsyncRequestWithTimeoutInMs(handler, TEST_CONNECTION_TIMEOUT_MS)); } private List testAllDataNodeExternalServiceConnection( @@ -2153,7 +2178,8 @@ private List testAllDataNodeExternalServiceConnection( TServiceType.DataNodeExternalService, DnToDnRequestType.TEST_CONNECTION, (AsyncRequestContext handler) -> - DataNodeExternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler)); + DataNodeExternalServiceAsyncRequestManager.getInstance() + .sendAsyncRequestWithTimeoutInMs(handler, TEST_CONNECTION_TIMEOUT_MS)); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java index 64ea8b8a2ea11..194d9f8c6cdf0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java @@ -152,7 +152,7 @@ public void updateTopology( for (int fromId : dataNodes.keySet()) { for (int toId : dataNodes.keySet()) { boolean originReachable = - latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId); + this.topologyMap.get().getOrDefault(fromId, Collections.emptySet()).contains(toId); boolean newReachable = latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId); if (originReachable != newReachable) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java index ea20f9c76ccd8..f9ffca123b5c7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java @@ -182,7 +182,8 @@ public GenericKeyedObjectPool c new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager( + conf.getHeartbeatSelectorNumOfClientManager()) .setPrintLogWhenEncounterException(false) .build(), ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()), @@ -207,7 +208,8 @@ public GenericKeyedObjectPool cre new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager( + conf.getHeartbeatSelectorNumOfClientManager()) .setPrintLogWhenEncounterException(false) .build(), ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index 20fa9d78d5927..ad22c8b20ae71 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -196,6 +196,7 @@ public enum ThreadName { INFLUXDB_RPC_PROCESSOR("InfluxdbRPC-Processor"), STORAGE_ENGINE_CACHED_POOL("StorageEngine"), DATANODE_SHUTDOWN_HOOK("DataNode-Shutdown-Hook"), + DATANODE_TOPOLOGY_PROBING("DataNode-Topology-Probing"), UPGRADE_TASK("UpgradeThread"), REGION_MIGRATE("Region-Migrate-Pool"), STORAGE_ENGINE_RECOVER_TRIGGER("StorageEngine-RecoverTrigger"), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 19313723e55f5..598c9d5f02134 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -155,6 +155,8 @@ public class CommonConfig { */ private int selectorNumOfClientManager = 1; + private int heartbeatSelectorNumOfClientManager = 4; + /** Whether to use thrift compression. */ private boolean isRpcThriftCompressionEnabled = false; @@ -697,6 +699,14 @@ public void setSelectorNumOfClientManager(int selectorNumOfClientManager) { this.selectorNumOfClientManager = selectorNumOfClientManager; } + public int getHeartbeatSelectorNumOfClientManager() { + return heartbeatSelectorNumOfClientManager; + } + + public void setHeartbeatSelectorNumOfClientManager(int heartbeatSelectorNumOfClientManager) { + this.heartbeatSelectorNumOfClientManager = heartbeatSelectorNumOfClientManager; + } + public boolean isRpcThriftCompressionEnabled() { return isRpcThriftCompressionEnabled; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index d392a60bbbd76..11db54be34f81 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -178,6 +178,12 @@ public void loadCommonProps(TrimProperties properties) throws IOException { String.valueOf(config.getSelectorNumOfClientManager())) .trim())); + config.setHeartbeatSelectorNumOfClientManager( + Integer.parseInt( + properties.getProperty( + "heartbeat_selector_num_of_client_manager", + String.valueOf(config.getHeartbeatSelectorNumOfClientManager())))); + config.setMaxClientNumForEachNode( Integer.parseInt( properties From 3e5f0d733fc741898160b445b269c9ea69630a8a Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Tue, 5 May 2026 15:18:42 +0800 Subject: [PATCH 2/3] update --- .../iotdb/confignode/conf/ConfigNodeConfig.java | 13 +------------ .../confignode/conf/ConfigNodeDescriptor.java | 6 ------ .../manager/load/service/HeartbeatService.java | 2 -- .../manager/load/service/TopologyService.java | 16 +++------------- .../impl/DataNodeInternalRPCServiceImpl.java | 1 + .../conf/iotdb-system.properties.template | 16 ++++++++++++++++ .../apache/iotdb/commons/conf/CommonConfig.java | 5 ++++- 7 files changed, 25 insertions(+), 34 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 306e6cd895f8f..4dd5e78372f46 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -204,12 +204,9 @@ public class ConfigNodeConfig { /** Acceptable pause duration for Phi accrual failure detector */ private long failureDetectorPhiAcceptablePauseInMs = 10000; - /** Base interval in ms for topology probing. Actual interval scales with DataNode count. */ + /** Base interval in ms for topology probing. */ private long topologyProbingBaseIntervalInMs = 5000; - /** Reference DataNode count for adaptive probing interval scaling. */ - private int topologyProbingReferenceNodeCount = 10; - /** Ratio of probing timeout to probing interval (must be less than 1.0). */ private double topologyProbingTimeoutRatio = 0.5; @@ -1306,14 +1303,6 @@ public void setTopologyProbingBaseIntervalInMs(long topologyProbingBaseIntervalI this.topologyProbingBaseIntervalInMs = topologyProbingBaseIntervalInMs; } - public int getTopologyProbingReferenceNodeCount() { - return topologyProbingReferenceNodeCount; - } - - public void setTopologyProbingReferenceNodeCount(int topologyProbingReferenceNodeCount) { - this.topologyProbingReferenceNodeCount = topologyProbingReferenceNodeCount; - } - public double getTopologyProbingTimeoutRatio() { return topologyProbingTimeoutRatio; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 506131a583a7e..591a4cb817ec2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -328,12 +328,6 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio "topology_probing_base_interval_in_ms", String.valueOf(conf.getTopologyProbingBaseIntervalInMs())))); - conf.setTopologyProbingReferenceNodeCount( - Integer.parseInt( - properties.getProperty( - "topology_probing_reference_node_count", - String.valueOf(conf.getTopologyProbingReferenceNodeCount())))); - conf.setTopologyProbingTimeoutRatio( Double.parseDouble( properties.getProperty( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java index e5f65152c875c..100b0cbf384e0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java @@ -167,8 +167,6 @@ protected TDataNodeHeartbeatReq genHeartbeatReq() { heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage()); } - // Topology is now pushed independently by TopologyService, no longer piggybacked on heartbeat - // We broadcast region operations list every 100 heartbeat loops if (heartbeatCounter.get() % 100 == 0) { heartbeatReq.setCurrentRegionOperations( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index 1fd716fc7fc92..76d943e24c224 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -158,15 +158,9 @@ public synchronized void stopTopologyService() { */ private boolean mayWait() { try { - long baseInterval = CONF.getTopologyProbingBaseIntervalInMs(); - int dataNodeCount = - configManager.getNodeManager().getRegisteredDataNodes().size() - startingDataNodes.size(); - int referenceNodeCount = CONF.getTopologyProbingReferenceNodeCount(); - long interval = Math.max(baseInterval, baseInterval * dataNodeCount / referenceNodeCount); - this.awaitForSignal.await(interval, TimeUnit.MILLISECONDS); + this.awaitForSignal.await(CONF.getTopologyProbingBaseIntervalInMs(), TimeUnit.MILLISECONDS); return true; } catch (InterruptedException e) { - // we don't reset the interrupt flag here since we may reuse this thread again. return false; } } @@ -213,13 +207,9 @@ private synchronized void topologyProbing() { dataNodeIds.add(location.getDataNodeId()); } - // 2. compute adaptive interval and timeout from N = dataNodeLocations.size() + // 2. compute probing timeout final long baseInterval = CONF.getTopologyProbingBaseIntervalInMs(); - final int referenceNodeCount = CONF.getTopologyProbingReferenceNodeCount(); - final double timeoutRatio = CONF.getTopologyProbingTimeoutRatio(); - final int dataNodeCount = dataNodeLocations.size(); - final long interval = Math.max(baseInterval, baseInterval * dataNodeCount / referenceNodeCount); - final long timeout = (long) (interval * timeoutRatio); + final long timeout = (long) (baseInterval * CONF.getTopologyProbingTimeoutRatio()); // 3. select sqrt(N) probers via rotating selection final List probers = selectProbers(dataNodeLocations); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index e889d936ace5f..2e0f6fd84b5d4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -374,6 +374,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 87be30f4520e0..ae940226ab89c 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -722,6 +722,22 @@ failure_detector_phi_threshold=30 # Datatype: long failure_detector_phi_acceptable_pause_in_ms=10000 +# Base interval in ms for topology probing between DataNodes +# effectiveMode: restart +# Datatype: long +# topology_probing_base_interval_in_ms=5000 + +# Ratio of probing timeout to probing interval (must be less than 1.0) +# effectiveMode: restart +# Datatype: double +# topology_probing_timeout_ratio=0.5 + +# Selector thread (TAsyncClientManager) count for heartbeat async client pools. +# 0 means auto: max(1, availableProcessors / 4) +# effectiveMode: restart +# Datatype: int +# heartbeat_selector_num_of_client_manager=0 + # Disk remaining threshold at which DataNode is set to ReadOnly status # effectiveMode: restart # Datatype: double(percentage) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 598c9d5f02134..f91f6156d753c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -155,7 +155,7 @@ public class CommonConfig { */ private int selectorNumOfClientManager = 1; - private int heartbeatSelectorNumOfClientManager = 4; + private int heartbeatSelectorNumOfClientManager = 0; /** Whether to use thrift compression. */ private boolean isRpcThriftCompressionEnabled = false; @@ -700,6 +700,9 @@ public void setSelectorNumOfClientManager(int selectorNumOfClientManager) { } public int getHeartbeatSelectorNumOfClientManager() { + if (heartbeatSelectorNumOfClientManager <= 0) { + return Math.max(1, Runtime.getRuntime().availableProcessors() / 4); + } return heartbeatSelectorNumOfClientManager; } From b0e2df75ae8d26e515f0a4253180b93bdb39ffaf Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Tue, 5 May 2026 16:29:14 +0800 Subject: [PATCH 3/3] better impl 4 topology service --- .../manager/load/service/TopologyService.java | 32 +++---- .../impl/DataNodeInternalRPCServiceImpl.java | 4 +- .../db/queryengine/plan/ClusterTopology.java | 89 +++++++------------ 3 files changed, 45 insertions(+), 80 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index 76d943e24c224..8203f83402e6c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -57,6 +57,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -65,7 +66,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -91,7 +91,6 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { /* (fromDataNodeId, toDataNodeId) -> heartbeat history */ private final Map, List> heartbeats; - private final List startingDataNodes = new CopyOnWriteArrayList<>(); private final IFailureDetector failureDetector; private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); @@ -194,14 +193,15 @@ private List selectProbers(List allDataNod } private synchronized void topologyProbing() { - // 1. get the latest datanode list, filter out starting ones + // 1. get Running DataNodes only final List dataNodeLocations = new ArrayList<>(); final Set dataNodeIds = new HashSet<>(); for (final TDataNodeConfiguration dataNodeConf : configManager.getNodeManager().getRegisteredDataNodes()) { final TDataNodeLocation location = dataNodeConf.getLocation(); - if (startingDataNodes.contains(location.getDataNodeId())) { - continue; // we shall wait for internal endpoint to be ready + if (configManager.getLoadManager().getNodeStatus(location.getDataNodeId()) + != NodeStatus.Running) { + continue; } dataNodeLocations.add(location); dataNodeIds.add(location.getDataNodeId()); @@ -297,7 +297,6 @@ private synchronized void topologyProbing() { */ private void pushTopologyToDataNodes( Map> latestTopology, List dataNodeLocations) { - // Build dataNodes map once for all pushes final Map dataNodesMap = dataNodeLocations.stream() .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, loc -> loc)); @@ -311,9 +310,13 @@ private void pushTopologyToDataNodes( continue; } + // Per-node topology: only this DataNode's own reachable set + final Map> perNodeTopology = new HashMap<>(); + perNodeTopology.put(nodeId, new HashSet<>(reachableSet)); + final TDataNodeHeartbeatReq req = new TDataNodeHeartbeatReq(System.nanoTime(), false, false, 0L, 0L); - req.setTopology(latestTopology); + req.setTopology(perNodeTopology); req.setDataNodes(dataNodesMap); final TEndPoint endPoint = location.getInternalEndPoint(); @@ -324,14 +327,10 @@ private void pushTopologyToDataNodes( req, new AsyncMethodCallback() { @Override - public void onComplete(TDataNodeHeartbeatResp response) { - // No-op: topology push is fire-and-forget - } + public void onComplete(TDataNodeHeartbeatResp response) {} @Override - public void onError(Exception exception) { - // No-op: topology push failures are silently ignored - } + public void onError(Exception exception) {} }); lastPushedTopology.put(nodeId, new HashSet<>(reachableSet)); } catch (Exception e) { @@ -388,11 +387,7 @@ public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) { continue; } if (changeEvent.getLeft() == null) { - // if a new datanode registered, DO NOT trigger probing immediately - startingDataNodes.add(nodeId); continue; - } else { - startingDataNodes.remove(nodeId); } final Set> affectedPairs = @@ -404,13 +399,10 @@ public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) { .collect(Collectors.toSet()); if (changeEvent.getRight() == null) { - // datanode removed from cluster, clean up probing history affectedPairs.forEach(heartbeats::remove); } else { - // we only trigger probing immediately if node comes around from UNKNOWN to RUNNING if (NodeStatus.Unknown.equals(changeEvent.getLeft().getStatus()) && NodeStatus.Running.equals(changeEvent.getRight().getStatus())) { - // let's clear the history when a new node comes around affectedPairs.forEach(pair -> heartbeats.put(pair, new ArrayList<>())); awaitForSignal.signal(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 2e0f6fd84b5d4..ec691880a43d8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -423,7 +423,9 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS(); private static final ExecutorService TOPOLOGY_PROBING_EXECUTOR = - IoTDBThreadPoolFactory.newFixedThreadPool(2, ThreadName.DATANODE_TOPOLOGY_PROBING.getName()); + IoTDBThreadPoolFactory.newFixedThreadPool( + Math.max(1, Runtime.getRuntime().availableProcessors() / 4), + ThreadName.DATANODE_TOPOLOGY_PROBING.getName()); private final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java index 194d9f8c6cdf0..f7807d4a90941 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java @@ -43,7 +43,7 @@ public class ClusterTopology { private static final Logger LOGGER = LoggerFactory.getLogger(ClusterTopology.class); private final Integer myself; private final AtomicReference> dataNodes; - private final AtomicReference>> topologyMap; + private final AtomicReference> myReachableNodes; private final AtomicBoolean isPartitioned = new AtomicBoolean(); public static ClusterTopology getInstance() { @@ -54,11 +54,10 @@ public TRegionReplicaSet getValidatedReplicaSet(TRegionReplicaSet origin) { if (!isPartitioned.get() || origin == null) { return origin; } - final Set reachableToMyself = - Collections.unmodifiableSet(topologyMap.get().get(myself)); + final Set reachable = myReachableNodes.get(); final List locations = new ArrayList<>(); for (final TDataNodeLocation location : origin.getDataNodeLocations()) { - if (reachableToMyself.contains(location.getDataNodeId())) { + if (reachable.contains(location.getDataNodeId())) { locations.add(location); } } @@ -97,82 +96,54 @@ private List getReachableCandidates(List a return all; } for (TRegionReplicaSet replicaSet : all) { - // some TRegionReplicaSet is unreachable since all DataNodes are down if (replicaSet.getDataNodeLocationsSize() == 0) { throw new ReplicaSetUnreachableException(replicaSet); } } - final Map> topologyMapCurrent = - Collections.unmodifiableMap(this.topologyMap.get()); - - // brute-force search to select DataNode candidates that can communicate to all - // TRegionReplicaSets - final List dataNodeCandidates = new ArrayList<>(); - for (final Integer datanode : topologyMapCurrent.keySet()) { - boolean reachableToAllSets = true; - final Set datanodeReachableToThis = topologyMapCurrent.get(datanode); - for (final TRegionReplicaSet replicaSet : all) { - final List replicaNodeLocations = - replicaSet.getDataNodeLocations().stream() - .map(TDataNodeLocation::getDataNodeId) - .collect(Collectors.toList()); - replicaNodeLocations.retainAll(datanodeReachableToThis); - reachableToAllSets = !replicaNodeLocations.isEmpty(); - } - if (reachableToAllSets) { - dataNodeCandidates.add(datanode); - } - } - // select TRegionReplicaSet candidates whose DataNode Locations contain at least one - // allReachableDataNodes + final Set reachable = this.myReachableNodes.get(); + final Map dataNodesCurrent = this.dataNodes.get(); + final List reachableSetCandidates = new ArrayList<>(); for (final TRegionReplicaSet replicaSet : all) { - final List commonLocations = + final List validLocations = replicaSet.getDataNodeLocations().stream() - .map(TDataNodeLocation::getDataNodeId) + .filter(loc -> reachable.contains(loc.getDataNodeId())) + .map(loc -> dataNodesCurrent.getOrDefault(loc.getDataNodeId(), loc)) .collect(Collectors.toList()); - commonLocations.retainAll(dataNodeCandidates); - if (!commonLocations.isEmpty()) { - final List validLocations = - commonLocations.stream().map(dataNodes.get()::get).collect(Collectors.toList()); - final TRegionReplicaSet validCandidate = - new TRegionReplicaSet(replicaSet.getRegionId(), validLocations); - reachableSetCandidates.add(validCandidate); + if (!validLocations.isEmpty()) { + reachableSetCandidates.add(new TRegionReplicaSet(replicaSet.getRegionId(), validLocations)); } } - return reachableSetCandidates; } public void updateTopology( final Map dataNodes, Map> latestTopology) { - if (!latestTopology.equals(topologyMap.get())) { - LOGGER.info("[Topology] latest view from config-node: {}", latestTopology); - for (int fromId : dataNodes.keySet()) { - for (int toId : dataNodes.keySet()) { - boolean originReachable = - this.topologyMap.get().getOrDefault(fromId, Collections.emptySet()).contains(toId); - boolean newReachable = - latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId); - if (originReachable != newReachable) { - LOGGER.info( - "[Topology] Topology of DataNode {} is now {} to DataNode {}", - fromId, - newReachable ? "reachable" : "unreachable", - toId); - } + final Set newReachable = latestTopology.getOrDefault(myself, Collections.emptySet()); + final Set oldReachable = this.myReachableNodes.get(); + + if (!newReachable.equals(oldReachable)) { + LOGGER.info( + "[Topology] latest view from config-node for myself({}): {}", myself, newReachable); + for (int toId : dataNodes.keySet()) { + boolean wasReachable = oldReachable.contains(toId); + boolean nowReachable = newReachable.contains(toId); + if (wasReachable != nowReachable) { + LOGGER.info( + "[Topology] DataNode {} is now {} to myself({})", + toId, + nowReachable ? "reachable" : "unreachable", + myself); } } - this.topologyMap.set(latestTopology); + this.myReachableNodes.set(newReachable); } this.dataNodes.set(dataNodes); - if (latestTopology.get(myself) == null || latestTopology.get(myself).isEmpty()) { - // latest topology doesn't include this node information. - // This mostly happens when this node just starts and haven't report connection details. + if (newReachable.isEmpty()) { this.isPartitioned.set(false); } else { - this.isPartitioned.set(latestTopology.get(myself).size() != latestTopology.size()); + this.isPartitioned.set(newReachable.size() != dataNodes.size()); } } @@ -180,7 +151,7 @@ private ClusterTopology() { this.myself = IoTDBDescriptor.getInstance().getConfig().generateLocalDataNodeLocation().getDataNodeId(); this.isPartitioned.set(false); - this.topologyMap = new AtomicReference<>(Collections.emptyMap()); + this.myReachableNodes = new AtomicReference<>(Collections.emptySet()); this.dataNodes = new AtomicReference<>(Collections.emptyMap()); }