Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
if (!fastForward && !peer.isRelayPeer()) {
check(peer, blockMessage);
}
peer.setBlockRcvTime(System.currentTimeMillis());

if (peer.getSyncBlockRequested().containsKey(blockId)) {
peer.getSyncBlockRequested().remove(blockId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tron.common.utils.Sha256Hash;
import org.tron.core.capsule.BlockCapsule.BlockId;
import org.tron.core.config.args.Args;
import org.tron.core.net.TronNetDelegate;
import org.tron.core.net.message.TronMessage;
Expand Down Expand Up @@ -40,7 +41,10 @@ public void processMessage(PeerConnection peer, TronMessage msg) {
peer.getAdvInvReceive().put(item, System.currentTimeMillis());
advService.addInv(item);
if (type.equals(InventoryType.BLOCK) && peer.getAdvInvSpread().getIfPresent(item) == null) {
peer.setLastInteractiveTime(System.currentTimeMillis());
long headNum = tronNetDelegate.getHeadBlockId().getNum();
if (new BlockId(id).getNum() > headNum) {
peer.setLastInteractiveTime(System.currentTimeMillis());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ public class PeerConnection {
@Setter
private volatile long lastInteractiveTime;

@Setter
@Getter
private volatile long blockRcvTime;

@Setter
@Getter
private volatile long blockRcvTimeCmp;

@Getter
@Setter
private volatile TronState tronState = TronState.INIT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class ResilienceService {

@Autowired
private ChainBaseManager chainBaseManager;

public void init() {
if (Args.getInstance().isOpenFullTcpDisconnect) {
executor.scheduleWithFixedDelay(() -> {
Expand Down Expand Up @@ -86,6 +86,7 @@ private void disconnectRandom() {
.collect(Collectors.toList());

if (peers.size() >= minBroadcastPeerSize) {
peers = getRandomDisconnectionPeers(peers);
long now = System.currentTimeMillis();
Map<Object, Integer> weights = new HashMap<>();
peers.forEach(peer -> {
Expand Down Expand Up @@ -121,6 +122,12 @@ private void disconnectRandom() {
}


private List<PeerConnection> getRandomDisconnectionPeers(List<PeerConnection> peers) {
peers.forEach(p -> p.setBlockRcvTimeCmp(p.getBlockRcvTime()));
peers.sort(Comparator.comparingLong(PeerConnection::getBlockRcvTimeCmp));
return peers.subList(0, peers.size() / 2);
}

private void disconnectLan() {
if (!isLanNode()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,22 @@ public void testProcessMessage() {
}
}

@Test
public void testBlockRcvTimeIsSet() throws Exception {
BlockCapsule blockCapsule = new BlockCapsule(1, Sha256Hash.ZERO_HASH,
System.currentTimeMillis() + 1000, Sha256Hash.ZERO_HASH.getByteString());
BlockMessage msg = new BlockMessage(blockCapsule);
// put block in syncBlockRequested so check() is bypassed (sync path)
peer.getSyncBlockRequested().put(msg.getBlockId(), System.currentTimeMillis());

long before = System.currentTimeMillis();
handler.processMessage(peer, msg);
long after = System.currentTimeMillis();

Assert.assertTrue("blockRcvTime should be set",
peer.getBlockRcvTime() >= before && peer.getBlockRcvTime() <= after);
}

@Test
public void testProcessBlock() {
TronNetDelegate tronNetDelegate = Mockito.mock(TronNetDelegate.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
package org.tron.core.net.messagehandler;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;

import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.tron.common.TestConstants;
import org.tron.common.utils.Sha256Hash;
import org.tron.core.capsule.BlockCapsule.BlockId;
import org.tron.core.config.args.Args;
import org.tron.core.net.TronNetDelegate;
import org.tron.core.net.message.adv.InventoryMessage;
import org.tron.core.net.peer.PeerConnection;
import org.tron.core.net.service.adv.AdvService;
import org.tron.p2p.connection.Channel;
import org.tron.protos.Protocol.Inventory.InventoryType;

Expand Down Expand Up @@ -51,6 +57,85 @@ public void testProcessMessage() throws Exception {
handler.processMessage(peer, msg);
}

@Test
public void testLastInteractiveTimeNotUpdatedForSolidifiedBlock() throws Exception {
InventoryMsgHandler handler = new InventoryMsgHandler();
Args.setParam(new String[]{}, TestConstants.TEST_CONF);

TronNetDelegate tronNetDelegate = mock(TronNetDelegate.class);
AdvService advService = mock(AdvService.class);
Mockito.when(advService.addInv(any())).thenReturn(true);
// block num 100 is at head boundary — should NOT update
Mockito.when(tronNetDelegate.getHeadBlockId())
.thenReturn(new BlockId(Sha256Hash.ZERO_HASH, 100L));

Field delegateField = handler.getClass().getDeclaredField("tronNetDelegate");
delegateField.setAccessible(true);
delegateField.set(handler, tronNetDelegate);
Field advField = handler.getClass().getDeclaredField("advService");
advField.setAccessible(true);
advField.set(handler, advService);

PeerConnection peer = new PeerConnection();
peer.setChannel(getChannel("1.0.0.4", 1001));
peer.setNeedSyncFromPeer(false);
peer.setNeedSyncFromUs(false);
peer.setLastInteractiveTime(0L);

// Block hash encodes num=100 (at solidified boundary — should NOT update)
Sha256Hash blockHash = new BlockId(Sha256Hash.ZERO_HASH, 100L);
InventoryMessage msg = new InventoryMessage(Arrays.asList(blockHash), InventoryType.BLOCK);
handler.processMessage(peer, msg);

Assert.assertEquals("lastInteractiveTime should NOT be updated for solidified block",
0L, peer.getLastInteractiveTime());
}

@Test
public void testLastInteractiveTimeUpdatedForBothPeersWithSameAboveSolidifiedBlock()
throws Exception {
InventoryMsgHandler handler = new InventoryMsgHandler();
Args.setParam(new String[]{}, TestConstants.TEST_CONF);

TronNetDelegate tronNetDelegate = mock(TronNetDelegate.class);
AdvService advService = mock(AdvService.class);
// First call returns true (peer1), second call returns false (peer2 — already in cache)
Mockito.when(advService.addInv(any())).thenReturn(true).thenReturn(false);
Mockito.when(tronNetDelegate.getHeadBlockId())
.thenReturn(new BlockId(Sha256Hash.ZERO_HASH, 99L));

Field delegateField = handler.getClass().getDeclaredField("tronNetDelegate");
delegateField.setAccessible(true);
delegateField.set(handler, tronNetDelegate);
Field advField = handler.getClass().getDeclaredField("advService");
advField.setAccessible(true);
advField.set(handler, advService);

PeerConnection peer1 = new PeerConnection();
peer1.setChannel(getChannel("1.0.0.5", 1002));
peer1.setNeedSyncFromPeer(false);
peer1.setNeedSyncFromUs(false);
peer1.setLastInteractiveTime(0L);

PeerConnection peer2 = new PeerConnection();
peer2.setChannel(getChannel("1.0.0.6", 1003));
peer2.setNeedSyncFromPeer(false);
peer2.setNeedSyncFromUs(false);
peer2.setLastInteractiveTime(0L);

// block num 100 > solidified 99 — both peers should update
Sha256Hash blockHash = new BlockId(Sha256Hash.ZERO_HASH, 100L);
InventoryMessage msg = new InventoryMessage(Arrays.asList(blockHash), InventoryType.BLOCK);

handler.processMessage(peer1, msg);
handler.processMessage(peer2, msg);

Assert.assertTrue("peer1 lastInteractiveTime should be updated",
peer1.getLastInteractiveTime() > 0L);
Assert.assertTrue("peer2 lastInteractiveTime should be updated even when addInv returns false",
peer2.getLastInteractiveTime() > 0L);
}

private Channel getChannel(String host, int port) throws Exception {
Channel channel = new Channel();
InetSocketAddress inetSocketAddress = new InetSocketAddress(host, port);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Resource;
import org.junit.After;
Expand Down Expand Up @@ -97,6 +98,57 @@ public void testDisconnectRandom() {
Assert.assertEquals(maxConnection - 1, PeerManager.getPeers().size());
}

@Test
public void testDisconnectRandomPreservesRecentBlockRcvTimePeer() {
int maxConnection = 30;
Assert.assertEquals(0, PeerManager.getPeers().size());

ApplicationContext ctx = (ApplicationContext) ReflectUtils.getFieldObject(p2pEventHandler,
"ctx");

// Create maxConnection + 1 peers (triggers disconnectRandom)
for (int i = 0; i < maxConnection + 1; i++) {
InetSocketAddress inetSocketAddress = new InetSocketAddress("202.0.0." + i, 10001);
Channel c1 = spy(Channel.class);
ReflectUtils.setFieldValue(c1, "inetSocketAddress", inetSocketAddress);
ReflectUtils.setFieldValue(c1, "inetAddress", inetSocketAddress.getAddress());
ReflectUtils.setFieldValue(c1, "ctx", spy(ChannelHandlerContext.class));
Mockito.doNothing().when(c1).send((byte[]) any());
PeerManager.add(ctx, c1);
}

// Set first minBroadcastPeerSize peers as broadcast-state
List<PeerConnection> peers = PeerManager.getPeers();
for (PeerConnection peer : peers.subList(0, ResilienceService.minBroadcastPeerSize)) {
peer.setNeedSyncFromPeer(false);
peer.setNeedSyncFromUs(false);
peer.setLastInteractiveTime(System.currentTimeMillis() - 1000);
}
for (PeerConnection peer : peers.subList(ResilienceService.minBroadcastPeerSize,
maxConnection + 1)) {
peer.setNeedSyncFromPeer(false);
peer.setNeedSyncFromUs(true);
}

// Give the LAST broadcast peer a very recent blockRcvTime — it must NOT be disconnected
PeerConnection bestPeer = peers.stream()
.filter(p -> !p.isNeedSyncFromUs() && !p.isNeedSyncFromPeer())
.reduce((a, b) -> b) // last broadcast peer
.orElseThrow(() -> new AssertionError("no broadcast peer"));
bestPeer.setBlockRcvTime(System.currentTimeMillis());

InetSocketAddress bestPeerAddress = bestPeer.getChannel().getInetSocketAddress();

// With minBroadcastPeerSize=3 broadcast peers, getRandomDisconnectionPeers returns
// the 1 peer with oldest blockRcvTime (0). bestPeer has most recent time → exempt.
ReflectUtils.invokeMethod(service, "disconnectRandom");

boolean bestPeerStillConnected = PeerManager.getPeers().stream()
.anyMatch(p -> p.getChannel().getInetSocketAddress().equals(bestPeerAddress));
Assert.assertTrue("Peer with most recent blockRcvTime should not be disconnected",
bestPeerStillConnected);
}

@Test
public void testDisconnectLan() {
int minConnection = 8;
Expand Down