diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java index 19624d74028..f652af0778d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.remoting.netty; -import com.google.common.base.Stopwatch; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -39,14 +38,14 @@ public NettyDecoder() { @Override public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf frame = null; - Stopwatch timer = Stopwatch.createStarted(); + long timerNanos = System.nanoTime(); try { frame = (ByteBuf) super.decode(ctx, in); if (null == frame) { return null; } RemotingCommand cmd = RemotingCommand.decode(frame); - cmd.setProcessTimer(timer); + cmd.setProcessTimerNanos(timerNanos); return cmd; } catch (Exception e) { log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index a735f8455d3..fb165eddaf0 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -247,7 +247,7 @@ public static void writeResponse(Channel channel, RemotingCommand request, @Null if (request.isOnewayRPC()) { if (attributesBuilder != null) { attributesBuilder.put(LABEL_RESULT, RESULT_ONEWAY); - remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build()); + remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributesBuilder.build()); } return; } @@ -264,7 +264,7 @@ public static void writeResponse(Channel channel, RemotingCommand request, @Null } if (remotingMetricsManager != null) { attributesBuilder.put(LABEL_RESULT, remotingMetricsManager.getWriteAndFlushResult(future)); - remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build()); + remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributesBuilder.build()); } if (callback != null) { callback.accept(future); @@ -276,7 +276,7 @@ public static void writeResponse(Channel channel, RemotingCommand request, @Null log.error(response.toString()); if (remotingMetricsManager != null) { attributesBuilder.put(LABEL_RESULT, RESULT_WRITE_CHANNEL_FAILED); - remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build()); + remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributesBuilder.build()); } } @@ -299,7 +299,7 @@ public void writeResponse(Channel channel, RemotingCommand request, @Nullable Re if (request.isOnewayRPC()) { if (attributesBuilder != null) { attributesBuilder.put(LABEL_RESULT, RESULT_ONEWAY); - this.remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build()); + this.remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributesBuilder.build()); } return; } @@ -316,7 +316,7 @@ public void writeResponse(Channel channel, RemotingCommand request, @Nullable Re } if (this.remotingMetricsManager != null && attributesBuilder != null) { attributesBuilder.put(LABEL_RESULT, this.remotingMetricsManager.getWriteAndFlushResult(future)); - this.remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build()); + this.remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributesBuilder.build()); } if (callback != null) { callback.accept(future); @@ -328,7 +328,7 @@ public void writeResponse(Channel channel, RemotingCommand request, @Nullable Re log.error(response.toString()); if (this.remotingMetricsManager != null && attributesBuilder != null) { attributesBuilder.put(LABEL_RESULT, RESULT_WRITE_CHANNEL_FAILED); - this.remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build()); + this.remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributesBuilder.build()); } } } @@ -396,7 +396,7 @@ public void processRequestCommand(final ChannelHandlerContext ctx, final Remotin AttributesBuilder attributesBuilder = remotingMetricsManager.newAttributesBuilder() .put(LABEL_REQUEST_CODE, RemotingHelper.getRequestCodeDesc(cmd.getCode())) .put(LABEL_RESULT, RESULT_PROCESS_REQUEST_FAILED); - remotingMetricsManager.getRpcLatency().record(cmd.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build()); + remotingMetricsManager.getRpcLatency().record(cmd.processTimerElapsedMs(), attributesBuilder.build()); } } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index ab2e208f484..596d3393469 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -581,13 +581,17 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio if (channel.isWritable()) { if (!channel.config().isAutoRead()) { channel.config().setAutoRead(true); - log.info("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}", - RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable()); + if (log.isDebugEnabled()) { + log.debug("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}", + RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable()); + } } } else { channel.config().setAutoRead(false); - log.warn("Channel[{}] auto-read is disabled, bytes to drain before it turns writable: {}", - RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeWritable()); + if (log.isDebugEnabled()) { + log.debug("Channel[{}] auto-read is disabled, bytes to drain before it turns writable: {}", + RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeWritable()); + } } super.channelWritabilityChanged(ctx); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index e08a1627d15..75445428b37 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -17,7 +17,8 @@ package org.apache.rocketmq.remoting.protocol; import com.alibaba.fastjson2.annotation.JSONField; -import com.google.common.base.Stopwatch; + + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.apache.commons.lang3.StringUtils; @@ -31,6 +32,7 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; import java.lang.annotation.Annotation; +import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Modifier; @@ -42,6 +44,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; public class RemotingCommand { @@ -54,6 +57,12 @@ public class RemotingCommand { private static final Map, Field[]> CLASS_HASH_MAP = new HashMap<>(); private static final Map CANONICAL_NAME_CACHE = new HashMap<>(); + // Caches the no-arg constructor of each CommandCustomHeader class. + // Why: Class.getDeclaredConstructor() copies the Constructor object on every call + // (sample showed ~70MB of Constructor allocations during a 60s benchmark). + // The set of header classes is fixed at startup, so ConcurrentHashMap.computeIfAbsent + // pays the reflective lookup once per class and reuses the cached Constructor thereafter. + private static final Map, Constructor> HEADER_CTOR_CACHE = new ConcurrentHashMap<>(); // 1, Oneway // 1, RESPONSE_COMMAND private static final Map NULLABLE_FIELD_CACHE = new HashMap<>(); @@ -97,7 +106,7 @@ public class RemotingCommand { private transient byte[] body; private boolean suspended; - private transient Stopwatch processTimer; + private transient long processTimerNanos; private transient List callbackList; protected RemotingCommand() { @@ -159,8 +168,7 @@ public static RemotingCommand createResponseCommand(int code, String remark, if (classHeader != null) { try { - CommandCustomHeader objectHeader = classHeader.getDeclaredConstructor().newInstance(); - cmd.customHeader = objectHeader; + cmd.customHeader = newHeaderInstance(classHeader); } catch (InstantiationException e) { return null; } catch (IllegalAccessException e) { @@ -175,6 +183,18 @@ public static RemotingCommand createResponseCommand(int code, String remark, return cmd; } + @SuppressWarnings("unchecked") + private static T newHeaderInstance(Class cls) + throws InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException { + Constructor ctor = HEADER_CTOR_CACHE.get(cls); + if (ctor == null) { + ctor = cls.getDeclaredConstructor(); + ctor.setAccessible(true); + HEADER_CTOR_CACHE.putIfAbsent(cls, ctor); + } + return (T) ctor.newInstance(); + } + public static RemotingCommand createResponseCommand(int code, String remark) { return createResponseCommand(code, remark, null); } @@ -283,7 +303,7 @@ public T decodeCommandCustomHeaderDirectly(Class boolean useFastEncode) throws RemotingCommandException { T objectHeader; try { - objectHeader = classHeader.getDeclaredConstructor().newInstance(); + objectHeader = newHeaderInstance(classHeader); } catch (Exception e) { return null; } @@ -509,6 +529,29 @@ public ByteBuffer encodeHeader(final int bodyLength) { return result; } + public ByteBuffer fastEncodeHeaderAsBuffer(final int bodyLength) { + ByteBuf buf = Unpooled.buffer(192); + int beginIndex = buf.writerIndex(); + buf.writeLong(0); + int headerSize; + if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) { + if (customHeader != null && !(customHeader instanceof FastCodesHeader)) { + this.makeCustomHeaderToNet(); + } + headerSize = RocketMQSerializable.rocketMQProtocolEncode(this, buf); + } else { + this.makeCustomHeaderToNet(); + byte[] header = RemotingSerializable.encode(this); + headerSize = header.length; + buf.writeBytes(header); + } + buf.setInt(beginIndex, 4 + headerSize + bodyLength); + buf.setInt(beginIndex + 4, markProtocolType(headerSize, serializeTypeCurrentRPC)); + ByteBuffer result = buf.nioBuffer(); + buf.release(); + return result; + } + public void markOnewayRPC() { int bits = 1 << RPC_ONEWAY; this.flag |= bits; @@ -611,7 +654,10 @@ public void setExtFields(HashMap extFields) { public void addExtField(String key, String value) { if (null == extFields) { - extFields = new HashMap<>(256); + // Default capacity (16) is plenty for the typical few extFields plus + // a CustomHeader's reflected fields. Capacity 256 was 16x oversized + // and allocated a Node[256] per command on the send/response path. + extFields = new HashMap<>(); } extFields.put(key, value); } @@ -635,12 +681,12 @@ public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) { this.serializeTypeCurrentRPC = serializeTypeCurrentRPC; } - public Stopwatch getProcessTimer() { - return processTimer; + public long processTimerElapsedMs() { + return (System.nanoTime() - processTimerNanos) / 1_000_000; } - public void setProcessTimer(Stopwatch processTimer) { - this.processTimer = processTimer; + public void setProcessTimerNanos(long nanos) { + this.processTimerNanos = nanos; } public List getCallbackList() { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingContext.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingContext.java index 81718c8bc11..36873de2b4a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingContext.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingContext.java @@ -20,6 +20,8 @@ import java.util.List; public class TopicQueueMappingContext { + public static final TopicQueueMappingContext EMPTY = new TopicQueueMappingContext(null, null, null, null, null); + private String topic; private Integer globalId; private TopicQueueMappingDetail mappingDetail;