-
Notifications
You must be signed in to change notification settings - Fork 12k
[ISSUE #10512] Eliminate per-RPC allocation in RemotingCommand (Guava Stopwatch, Constructor copy) and downgrade Netty writability log #10514
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -581,13 +581,17 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio | |
| if (channel.isWritable()) { | ||
| if (!channel.config().isAutoRead()) { | ||
| channel.config().setAutoRead(true); | ||
| log.info("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}", | ||
| RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable()); | ||
| if (log.isDebugEnabled()) { | ||
| log.debug("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}", | ||
| RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable()); | ||
| } | ||
| } | ||
| } else { | ||
| channel.config().setAutoRead(false); | ||
| log.warn("Channel[{}] auto-read is disabled, bytes to drain before it turns writable: {}", | ||
| RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeWritable()); | ||
| if (log.isDebugEnabled()) { | ||
| log.debug("Channel[{}] auto-read is disabled, bytes to drain before it turns writable: {}", | ||
| RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeWritable()); | ||
| } | ||
|
Comment on lines
+591
to
+594
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above — intentional log level downgrade to eliminate hot-path allocation. See reply on line 587. |
||
| } | ||
| super.channelWritabilityChanged(ctx); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,7 +17,8 @@ | |
| package org.apache.rocketmq.remoting.protocol; | ||
|
|
||
| import com.alibaba.fastjson2.annotation.JSONField; | ||
| import com.google.common.base.Stopwatch; | ||
|
|
||
|
|
||
| import io.netty.buffer.ByteBuf; | ||
| import io.netty.buffer.Unpooled; | ||
| import org.apache.commons.lang3.StringUtils; | ||
|
|
@@ -31,6 +32,7 @@ | |
| import org.apache.rocketmq.remoting.exception.RemotingCommandException; | ||
|
|
||
| import java.lang.annotation.Annotation; | ||
| import java.lang.reflect.Constructor; | ||
| import java.lang.reflect.Field; | ||
| import java.lang.reflect.InvocationTargetException; | ||
| import java.lang.reflect.Modifier; | ||
|
|
@@ -42,6 +44,7 @@ | |
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
|
|
||
| public class RemotingCommand { | ||
|
|
@@ -54,6 +57,12 @@ public class RemotingCommand { | |
| private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP = | ||
| new HashMap<>(); | ||
| private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<>(); | ||
| // Caches the no-arg constructor of each CommandCustomHeader class. | ||
| // Why: Class.getDeclaredConstructor() copies the Constructor object on every call | ||
| // (sample showed ~70MB of Constructor allocations during a 60s benchmark). | ||
| // The set of header classes is fixed at startup, so ConcurrentHashMap.computeIfAbsent | ||
| // pays the reflective lookup once per class and reuses the cached Constructor thereafter. | ||
| private static final Map<Class<?>, Constructor<?>> HEADER_CTOR_CACHE = new ConcurrentHashMap<>(); | ||
|
Comment on lines
+60
to
+65
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch — the comment is misleading. Will update it to accurately describe the |
||
| // 1, Oneway | ||
| // 1, RESPONSE_COMMAND | ||
| private static final Map<Field, Boolean> NULLABLE_FIELD_CACHE = new HashMap<>(); | ||
|
|
@@ -97,7 +106,7 @@ public class RemotingCommand { | |
|
|
||
| private transient byte[] body; | ||
| private boolean suspended; | ||
| private transient Stopwatch processTimer; | ||
| private transient long processTimerNanos; | ||
| private transient List<CommandCallback> callbackList; | ||
|
|
||
| protected RemotingCommand() { | ||
|
|
@@ -159,8 +168,7 @@ public static RemotingCommand createResponseCommand(int code, String remark, | |
|
|
||
| if (classHeader != null) { | ||
| try { | ||
| CommandCustomHeader objectHeader = classHeader.getDeclaredConstructor().newInstance(); | ||
| cmd.customHeader = objectHeader; | ||
| cmd.customHeader = newHeaderInstance(classHeader); | ||
| } catch (InstantiationException e) { | ||
| return null; | ||
| } catch (IllegalAccessException e) { | ||
|
|
@@ -175,6 +183,18 @@ public static RemotingCommand createResponseCommand(int code, String remark, | |
| return cmd; | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| private static <T> T newHeaderInstance(Class<T> cls) | ||
| throws InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException { | ||
| Constructor<?> ctor = HEADER_CTOR_CACHE.get(cls); | ||
| if (ctor == null) { | ||
| ctor = cls.getDeclaredConstructor(); | ||
| ctor.setAccessible(true); | ||
| HEADER_CTOR_CACHE.putIfAbsent(cls, ctor); | ||
| } | ||
| return (T) ctor.newInstance(); | ||
|
Comment on lines
+191
to
+195
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The cached constructors are for |
||
| } | ||
|
|
||
| public static RemotingCommand createResponseCommand(int code, String remark) { | ||
| return createResponseCommand(code, remark, null); | ||
| } | ||
|
|
@@ -283,7 +303,7 @@ public <T extends CommandCustomHeader> T decodeCommandCustomHeaderDirectly(Class | |
| boolean useFastEncode) throws RemotingCommandException { | ||
| T objectHeader; | ||
| try { | ||
| objectHeader = classHeader.getDeclaredConstructor().newInstance(); | ||
| objectHeader = newHeaderInstance(classHeader); | ||
| } catch (Exception e) { | ||
| return null; | ||
| } | ||
|
|
@@ -509,6 +529,29 @@ public ByteBuffer encodeHeader(final int bodyLength) { | |
| return result; | ||
| } | ||
|
|
||
| public ByteBuffer fastEncodeHeaderAsBuffer(final int bodyLength) { | ||
| ByteBuf buf = Unpooled.buffer(192); | ||
| int beginIndex = buf.writerIndex(); | ||
| buf.writeLong(0); | ||
| int headerSize; | ||
| if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) { | ||
| if (customHeader != null && !(customHeader instanceof FastCodesHeader)) { | ||
| this.makeCustomHeaderToNet(); | ||
| } | ||
| headerSize = RocketMQSerializable.rocketMQProtocolEncode(this, buf); | ||
| } else { | ||
| this.makeCustomHeaderToNet(); | ||
| byte[] header = RemotingSerializable.encode(this); | ||
| headerSize = header.length; | ||
| buf.writeBytes(header); | ||
| } | ||
| buf.setInt(beginIndex, 4 + headerSize + bodyLength); | ||
| buf.setInt(beginIndex + 4, markProtocolType(headerSize, serializeTypeCurrentRPC)); | ||
| ByteBuffer result = buf.nioBuffer(); | ||
| buf.release(); | ||
| return result; | ||
| } | ||
|
Comment on lines
+532
to
+553
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| public void markOnewayRPC() { | ||
| int bits = 1 << RPC_ONEWAY; | ||
| this.flag |= bits; | ||
|
|
@@ -611,7 +654,10 @@ public void setExtFields(HashMap<String, String> extFields) { | |
|
|
||
| public void addExtField(String key, String value) { | ||
| if (null == extFields) { | ||
| extFields = new HashMap<>(256); | ||
| // Default capacity (16) is plenty for the typical few extFields plus | ||
| // a CustomHeader's reflected fields. Capacity 256 was 16x oversized | ||
| // and allocated a Node[256] per command on the send/response path. | ||
| extFields = new HashMap<>(); | ||
| } | ||
| extFields.put(key, value); | ||
| } | ||
|
|
@@ -635,12 +681,12 @@ public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) { | |
| this.serializeTypeCurrentRPC = serializeTypeCurrentRPC; | ||
| } | ||
|
|
||
| public Stopwatch getProcessTimer() { | ||
| return processTimer; | ||
| public long processTimerElapsedMs() { | ||
| return (System.nanoTime() - processTimerNanos) / 1_000_000; | ||
| } | ||
|
|
||
| public void setProcessTimer(Stopwatch processTimer) { | ||
| this.processTimer = processTimer; | ||
| public void setProcessTimerNanos(long nanos) { | ||
| this.processTimerNanos = nanos; | ||
| } | ||
|
Comment on lines
+684
to
690
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This API change is intentional — the old |
||
|
|
||
| public List<CommandCallback> getCallbackList() { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,8 @@ | |
| import java.util.List; | ||
|
|
||
| public class TopicQueueMappingContext { | ||
| public static final TopicQueueMappingContext EMPTY = new TopicQueueMappingContext(null, null, null, null, null); | ||
|
|
||
| private String topic; | ||
| private Integer globalId; | ||
| private TopicQueueMappingDetail mappingDetail; | ||
|
Comment on lines
+23
to
27
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an intentional tradeoff. Under high-throughput load,
channelWritabilityChangedfires ~900 times/sec, each triggeringparseChannelRemoteAddr()+ String concatenation + AsyncAppender enqueue. This was measured as a significant per-RPC allocation source. DEBUG level withisDebugEnabled()guard preserves the information for troubleshooting while eliminating the hot-path cost. Same change was made in PR #10491.