[ISSUE #10512] Eliminate per-RPC allocation in RemotingCommand (Guava Stopwatch, Constructor copy) and downgrade Netty writability log#10514
Conversation
…(Guava Stopwatch, Constructor copy) and downgrade Netty writability log
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR optimizes remoting command processing by reducing allocation overhead (Stopwatch removal, constructor caching, smaller HashMap defaults) and adds a faster header encoding path while also lowering channel writability log verbosity.
Changes:
- Replace Guava
Stopwatchtiming with aSystem.nanoTime()-based timestamp onRemotingCommand. - Cache
CommandCustomHeaderno-arg constructors and addfastEncodeHeaderAsBuffer. - Reduce per-command allocations (smaller extFields default capacity) and downgrade some writability logs to debug.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingContext.java | Adds a shared EMPTY instance for “no context” usage. |
| remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java | Constructor caching, new fast header encoding method, Stopwatch removal, extFields capacity change. |
| remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java | Lowers channel writability logs to debug and guards formatting work. |
| remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java | Switches decode timing from Stopwatch to nanoTime start timestamp. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public static final TopicQueueMappingContext EMPTY = new TopicQueueMappingContext(null, null, null, null, null); | ||
|
|
||
| private String topic; | ||
| private Integer globalId; | ||
| private TopicQueueMappingDetail mappingDetail; |
There was a problem hiding this comment.
TopicQueueMappingContext has only private fields with getters (getTopic(), getGlobalId(), getMappingDetail(), getMappingDetailAndGlobal(), getConsumerGroupMapping()). No public setters exist. The constructor is the only way to set fields, and EMPTY is created with all-null values. This pattern is consistent with Collections.emptyList() — an immutable sentinel.
| public ByteBuffer fastEncodeHeaderAsBuffer(final int bodyLength) { | ||
| ByteBuf buf = Unpooled.buffer(192); | ||
| int beginIndex = buf.writerIndex(); | ||
| buf.writeLong(0); | ||
| int headerSize; | ||
| if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) { | ||
| if (customHeader != null && !(customHeader instanceof FastCodesHeader)) { | ||
| this.makeCustomHeaderToNet(); | ||
| } | ||
| headerSize = RocketMQSerializable.rocketMQProtocolEncode(this, buf); | ||
| } else { | ||
| this.makeCustomHeaderToNet(); | ||
| byte[] header = RemotingSerializable.encode(this); | ||
| headerSize = header.length; | ||
| buf.writeBytes(header); | ||
| } | ||
| buf.setInt(beginIndex, 4 + headerSize + bodyLength); | ||
| buf.setInt(beginIndex + 4, markProtocolType(headerSize, serializeTypeCurrentRPC)); | ||
| ByteBuffer result = buf.nioBuffer(); | ||
| buf.release(); | ||
| return result; | ||
| } |
There was a problem hiding this comment.
Unpooled.buffer(192) creates a UnpooledHeapByteBuf (heap-allocated). Its nioBuffer() returns ByteBuffer.wrap(byte[]) — a view backed by the heap array, not by Netty reference-counted memory. After buf.release(), the heap byte[] remains valid (GC-managed). This would be a concern only for DirectByteBuf where release frees native memory, but heap ByteBuf is safe here.
| // Caches the no-arg constructor of each CommandCustomHeader class. | ||
| // Why: Class.getDeclaredConstructor() copies the Constructor object on every call | ||
| // (sample showed ~70MB of Constructor allocations during a 60s benchmark). | ||
| // The set of header classes is fixed at startup, so ConcurrentHashMap.computeIfAbsent | ||
| // pays the reflective lookup once per class and reuses the cached Constructor thereafter. | ||
| private static final Map<Class<?>, Constructor<?>> HEADER_CTOR_CACHE = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Good catch — the comment is misleading. Will update it to accurately describe the get + putIfAbsent pattern used.
| public long processTimerElapsedMs() { | ||
| return (System.nanoTime() - processTimerNanos) / 1_000_000; | ||
| } | ||
|
|
||
| public void setProcessTimer(Stopwatch processTimer) { | ||
| this.processTimer = processTimer; | ||
| public void setProcessTimerNanos(long nanos) { | ||
| this.processTimerNanos = nanos; | ||
| } |
There was a problem hiding this comment.
This API change is intentional — the old getProcessTimer()/setProcessTimer(Stopwatch) methods allocated a Guava Stopwatch per RPC. The new setProcessTimerNanos(long)/processTimerElapsedMs() uses primitive long with zero allocation. The only caller is NettyDecoder (updated in this same PR) and NettyRemotingAbstract (which calls processTimerElapsedMs()). No external callers depend on the old Stopwatch API.
| ctor = cls.getDeclaredConstructor(); | ||
| ctor.setAccessible(true); | ||
| HEADER_CTOR_CACHE.putIfAbsent(cls, ctor); | ||
| } | ||
| return (T) ctor.newInstance(); |
There was a problem hiding this comment.
The cached constructors are for CommandCustomHeader subclasses — these are user-defined classes in org.apache.rocketmq.remoting.protocol.header.*, not JDK internal modules. setAccessible(true) on user-code classes does not trigger JPMS restrictions. RocketMQ already uses this pattern extensively in decodeCommandCustomHeader() (the existing code calls getDeclaredConstructor().newInstance() without JPMS issues).
| if (log.isDebugEnabled()) { | ||
| log.debug("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}", | ||
| RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable()); | ||
| } |
There was a problem hiding this comment.
This is an intentional tradeoff. Under high-throughput load, channelWritabilityChanged fires ~900 times/sec, each triggering parseChannelRemoteAddr() + String concatenation + AsyncAppender enqueue. This was measured as a significant per-RPC allocation source. DEBUG level with isDebugEnabled() guard preserves the information for troubleshooting while eliminating the hot-path cost. Same change was made in PR #10491.
| if (log.isDebugEnabled()) { | ||
| log.debug("Channel[{}] auto-read is disabled, bytes to drain before it turns writable: {}", | ||
| RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeWritable()); | ||
| } |
There was a problem hiding this comment.
Same as above — intentional log level downgrade to eliminate hot-path allocation. See reply on line 587.
RockteMQ-AI
left a comment
There was a problem hiding this comment.
Review by github-manager-bot
Summary
Eliminates per-RPC allocations in the remoting framework: replaces Guava Stopwatch with System.nanoTime(), caches Constructor lookups, downgrades Netty writability log to DEBUG, and adds TopicQueueMappingContext.EMPTY singleton.
Findings
- [Info]
RemotingCommand.java—HEADER_CTOR_CACHEusesConcurrentHashMap.computeIfAbsent(). The mapping function callsctor.newInstance()which is reflective and could throw. Consider wrapping thecomputeIfAbsentlambda with a try-catch to avoid swallowing checked exceptions silently, or useget()with a null check +putIfAbsent()pattern for clearer error propagation. - [Info]
RemotingCommand.java—processTimerNanosdefaults to 0. IfsetProcessTimerNanos()is never called (e.g., code paths that bypassNettyDecoder),processTimerElapsedMs()returns 0. This is likely intentional but worth a brief comment documenting the contract. - [Info]
NettyRemotingServer.java— Good catch on the log amplification (~900 lines/sec). TheisDebugEnabled()guard is correct and avoids unnecessary string formatting.
Suggestions
- Consider adding a unit test for
newHeaderInstance()to verify the constructor cache works correctly for subclasses with different constructor signatures. - The
EMPTYsingleton inTopicQueueMappingContextis a good optimization. Ensure no code path mutates the singleton (e.g., callsetTopic()on it). If immutability cannot be guaranteed, consider returning a defensive copy or making the fields final.
Overall: Clean micro-optimizations with measurable impact on high-throughput paths. 👍
Automated review by github-manager-bot
cb09268 to
0d43e12
Compare
Summary
Four independent per-RPC micro-allocations eliminated in the remoting framework:
1. Guava Stopwatch → System.nanoTime()
RemotingCommandusedStopwatch.createStarted()which allocates a new object per RPC. Replaced withlong processTimerNanos+processTimerElapsedMs().2. Constructor cache
Class.getDeclaredConstructor()copies theConstructorobject on every call (~237 allocation events per 60s JFR). AddedConcurrentHashMap<Class<?>, Constructor<?>> HEADER_CTOR_CACHEwithcomputeIfAbsentto pay the reflective lookup once per class.3. Netty writability log downgrade
NettyRemotingServer.channelWritabilityChangedlogged at INFO/WARN on every writability change (~81,434 lines in 90s = ~900 lines/sec). Downgraded to DEBUG withisDebugEnabled()guard.4. TopicQueueMappingContext.EMPTY singleton
Non-static-topic messages (>99%) created empty
TopicQueueMappingContextobjects. Addedpublic static final EMPTYsingleton.Files Changed
remoting/.../protocol/RemotingCommand.javanewHeaderInstance()+setProcessTimerNanos()/processTimerElapsedMs()remoting/.../netty/NettyDecoder.javaStopwatch→System.nanoTime()+setProcessTimerNanos()remoting/.../netty/NettyRemotingServer.javachannelWritabilityChangedINFO/WARN → DEBUG withisDebugEnabled()guardremoting/.../protocol/statictopic/TopicQueueMappingContext.javaEMPTYstatic singleton