Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.remoting.netty;

import com.google.common.base.Stopwatch;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
Expand All @@ -39,14 +38,14 @@ public NettyDecoder() {
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
Stopwatch timer = Stopwatch.createStarted();
long timerNanos = System.nanoTime();
try {
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
RemotingCommand cmd = RemotingCommand.decode(frame);
cmd.setProcessTimer(timer);
cmd.setProcessTimerNanos(timerNanos);
return cmd;
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public static void writeResponse(Channel channel, RemotingCommand request, @Null
if (request.isOnewayRPC()) {
if (attributesBuilder != null) {
attributesBuilder.put(LABEL_RESULT, RESULT_ONEWAY);
remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributesBuilder.build());
}
return;
}
Expand All @@ -264,7 +264,7 @@ public static void writeResponse(Channel channel, RemotingCommand request, @Null
}
if (remotingMetricsManager != null) {
attributesBuilder.put(LABEL_RESULT, remotingMetricsManager.getWriteAndFlushResult(future));
remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributesBuilder.build());
}
if (callback != null) {
callback.accept(future);
Expand All @@ -276,7 +276,7 @@ public static void writeResponse(Channel channel, RemotingCommand request, @Null
log.error(response.toString());
if (remotingMetricsManager != null) {
attributesBuilder.put(LABEL_RESULT, RESULT_WRITE_CHANNEL_FAILED);
remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributesBuilder.build());
}
}

Expand All @@ -299,7 +299,7 @@ public void writeResponse(Channel channel, RemotingCommand request, @Nullable Re
if (request.isOnewayRPC()) {
if (attributesBuilder != null) {
attributesBuilder.put(LABEL_RESULT, RESULT_ONEWAY);
this.remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
this.remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributesBuilder.build());
}
return;
}
Expand All @@ -316,7 +316,7 @@ public void writeResponse(Channel channel, RemotingCommand request, @Nullable Re
}
if (this.remotingMetricsManager != null && attributesBuilder != null) {
attributesBuilder.put(LABEL_RESULT, this.remotingMetricsManager.getWriteAndFlushResult(future));
this.remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
this.remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributesBuilder.build());
}
if (callback != null) {
callback.accept(future);
Expand All @@ -328,7 +328,7 @@ public void writeResponse(Channel channel, RemotingCommand request, @Nullable Re
log.error(response.toString());
if (this.remotingMetricsManager != null && attributesBuilder != null) {
attributesBuilder.put(LABEL_RESULT, RESULT_WRITE_CHANNEL_FAILED);
this.remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
this.remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributesBuilder.build());
}
}
}
Expand Down Expand Up @@ -396,7 +396,7 @@ public void processRequestCommand(final ChannelHandlerContext ctx, final Remotin
AttributesBuilder attributesBuilder = remotingMetricsManager.newAttributesBuilder()
.put(LABEL_REQUEST_CODE, RemotingHelper.getRequestCodeDesc(cmd.getCode()))
.put(LABEL_RESULT, RESULT_PROCESS_REQUEST_FAILED);
remotingMetricsManager.getRpcLatency().record(cmd.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
remotingMetricsManager.getRpcLatency().record(cmd.processTimerElapsedMs(), attributesBuilder.build());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,13 +581,17 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
if (channel.isWritable()) {
if (!channel.config().isAutoRead()) {
channel.config().setAutoRead(true);
log.info("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}",
RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable());
if (log.isDebugEnabled()) {
log.debug("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}",
RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable());
}
Comment on lines +584 to +587

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an intentional tradeoff. Under high-throughput load, channelWritabilityChanged fires ~900 times/sec, each triggering parseChannelRemoteAddr() + String concatenation + AsyncAppender enqueue. This was measured as a significant per-RPC allocation source. DEBUG level with isDebugEnabled() guard preserves the information for troubleshooting while eliminating the hot-path cost. Same change was made in PR #10491.

}
} else {
channel.config().setAutoRead(false);
log.warn("Channel[{}] auto-read is disabled, bytes to drain before it turns writable: {}",
RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeWritable());
if (log.isDebugEnabled()) {
log.debug("Channel[{}] auto-read is disabled, bytes to drain before it turns writable: {}",
RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeWritable());
}
Comment on lines +591 to +594

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above — intentional log level downgrade to eliminate hot-path allocation. See reply on line 587.

}
super.channelWritabilityChanged(ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package org.apache.rocketmq.remoting.protocol;

import com.alibaba.fastjson2.annotation.JSONField;
import com.google.common.base.Stopwatch;


import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -31,6 +32,7 @@
import org.apache.rocketmq.remoting.exception.RemotingCommandException;

import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Modifier;
Expand All @@ -42,6 +44,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class RemotingCommand {
Expand All @@ -54,6 +57,12 @@ public class RemotingCommand {
private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
new HashMap<>();
private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<>();
// Caches the no-arg constructor of each CommandCustomHeader class.
// Why: Class.getDeclaredConstructor() copies the Constructor object on every call
// (sample showed ~70MB of Constructor allocations during a 60s benchmark).
// The set of header classes is fixed at startup, so ConcurrentHashMap.computeIfAbsent
// pays the reflective lookup once per class and reuses the cached Constructor thereafter.
private static final Map<Class<?>, Constructor<?>> HEADER_CTOR_CACHE = new ConcurrentHashMap<>();
Comment on lines +60 to +65

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — the comment is misleading. Will update it to accurately describe the get + putIfAbsent pattern used.

// 1, Oneway
// 1, RESPONSE_COMMAND
private static final Map<Field, Boolean> NULLABLE_FIELD_CACHE = new HashMap<>();
Expand Down Expand Up @@ -97,7 +106,7 @@ public class RemotingCommand {

private transient byte[] body;
private boolean suspended;
private transient Stopwatch processTimer;
private transient long processTimerNanos;
private transient List<CommandCallback> callbackList;

protected RemotingCommand() {
Expand Down Expand Up @@ -159,8 +168,7 @@ public static RemotingCommand createResponseCommand(int code, String remark,

if (classHeader != null) {
try {
CommandCustomHeader objectHeader = classHeader.getDeclaredConstructor().newInstance();
cmd.customHeader = objectHeader;
cmd.customHeader = newHeaderInstance(classHeader);
} catch (InstantiationException e) {
return null;
} catch (IllegalAccessException e) {
Expand All @@ -175,6 +183,18 @@ public static RemotingCommand createResponseCommand(int code, String remark,
return cmd;
}

@SuppressWarnings("unchecked")
private static <T> T newHeaderInstance(Class<T> cls)
throws InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException {
Constructor<?> ctor = HEADER_CTOR_CACHE.get(cls);
if (ctor == null) {
ctor = cls.getDeclaredConstructor();
ctor.setAccessible(true);
HEADER_CTOR_CACHE.putIfAbsent(cls, ctor);
}
return (T) ctor.newInstance();
Comment on lines +191 to +195

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cached constructors are for CommandCustomHeader subclasses — these are user-defined classes in org.apache.rocketmq.remoting.protocol.header.*, not JDK internal modules. setAccessible(true) on user-code classes does not trigger JPMS restrictions. RocketMQ already uses this pattern extensively in decodeCommandCustomHeader() (the existing code calls getDeclaredConstructor().newInstance() without JPMS issues).

}

public static RemotingCommand createResponseCommand(int code, String remark) {
return createResponseCommand(code, remark, null);
}
Expand Down Expand Up @@ -283,7 +303,7 @@ public <T extends CommandCustomHeader> T decodeCommandCustomHeaderDirectly(Class
boolean useFastEncode) throws RemotingCommandException {
T objectHeader;
try {
objectHeader = classHeader.getDeclaredConstructor().newInstance();
objectHeader = newHeaderInstance(classHeader);
} catch (Exception e) {
return null;
}
Expand Down Expand Up @@ -509,6 +529,29 @@ public ByteBuffer encodeHeader(final int bodyLength) {
return result;
}

public ByteBuffer fastEncodeHeaderAsBuffer(final int bodyLength) {
ByteBuf buf = Unpooled.buffer(192);
int beginIndex = buf.writerIndex();
buf.writeLong(0);
int headerSize;
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
if (customHeader != null && !(customHeader instanceof FastCodesHeader)) {
this.makeCustomHeaderToNet();
}
headerSize = RocketMQSerializable.rocketMQProtocolEncode(this, buf);
} else {
this.makeCustomHeaderToNet();
byte[] header = RemotingSerializable.encode(this);
headerSize = header.length;
buf.writeBytes(header);
}
buf.setInt(beginIndex, 4 + headerSize + bodyLength);
buf.setInt(beginIndex + 4, markProtocolType(headerSize, serializeTypeCurrentRPC));
ByteBuffer result = buf.nioBuffer();
buf.release();
return result;
}
Comment on lines +532 to +553

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unpooled.buffer(192) creates a UnpooledHeapByteBuf (heap-allocated). Its nioBuffer() returns ByteBuffer.wrap(byte[]) — a view backed by the heap array, not by Netty reference-counted memory. After buf.release(), the heap byte[] remains valid (GC-managed). This would be a concern only for DirectByteBuf where release frees native memory, but heap ByteBuf is safe here.


public void markOnewayRPC() {
int bits = 1 << RPC_ONEWAY;
this.flag |= bits;
Expand Down Expand Up @@ -611,7 +654,10 @@ public void setExtFields(HashMap<String, String> extFields) {

public void addExtField(String key, String value) {
if (null == extFields) {
extFields = new HashMap<>(256);
// Default capacity (16) is plenty for the typical few extFields plus
// a CustomHeader's reflected fields. Capacity 256 was 16x oversized
// and allocated a Node[256] per command on the send/response path.
extFields = new HashMap<>();
}
extFields.put(key, value);
}
Expand All @@ -635,12 +681,12 @@ public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) {
this.serializeTypeCurrentRPC = serializeTypeCurrentRPC;
}

public Stopwatch getProcessTimer() {
return processTimer;
public long processTimerElapsedMs() {
return (System.nanoTime() - processTimerNanos) / 1_000_000;
}

public void setProcessTimer(Stopwatch processTimer) {
this.processTimer = processTimer;
public void setProcessTimerNanos(long nanos) {
this.processTimerNanos = nanos;
}
Comment on lines +684 to 690

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API change is intentional — the old getProcessTimer()/setProcessTimer(Stopwatch) methods allocated a Guava Stopwatch per RPC. The new setProcessTimerNanos(long)/processTimerElapsedMs() uses primitive long with zero allocation. The only caller is NettyDecoder (updated in this same PR) and NettyRemotingAbstract (which calls processTimerElapsedMs()). No external callers depend on the old Stopwatch API.


public List<CommandCallback> getCallbackList() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.List;

public class TopicQueueMappingContext {
public static final TopicQueueMappingContext EMPTY = new TopicQueueMappingContext(null, null, null, null, null);

private String topic;
private Integer globalId;
private TopicQueueMappingDetail mappingDetail;
Comment on lines +23 to 27

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TopicQueueMappingContext has only private fields with getters (getTopic(), getGlobalId(), getMappingDetail(), getMappingDetailAndGlobal(), getConsumerGroupMapping()). No public setters exist. The constructor is the only way to set fields, and EMPTY is created with all-null values. This pattern is consistent with Collections.emptyList() — an immutable sentinel.

Expand Down
Loading