Before Creating the Enhancement Request
Summary
In the Proxy receive-message path, GrpcConverter.buildMessage() uses ByteString.copyFrom(messageExt.getBody()) to set the message body in the gRPC response. ByteString.copyFrom() allocates a new byte[] and copies the entire body content via System.arraycopy, even though the source byte[] is a standalone allocation that is never mutated after this point. This can be replaced with UnsafeByteOperations.unsafeWrap() to wrap the existing byte[] by reference (zero-copy).
Motivation
On the Proxy receive path (pop message from Broker → gRPC response to client), the message body undergoes the following copy chain:
| Step |
Location |
Operation |
Copy? |
| 1 |
RemotingCommand.decode() |
new byte[bodyLength] + byteBuffer.readBytes(bodyData) — Netty ByteBuf → heap byte[] |
Necessary (ByteBuf released after decode) |
| 2 |
MessageDecoder.decode() |
new byte[bodyLen] + byteBuffer.get(body) — extract per-message body from batch |
Necessary (split individual messages) |
| 3 |
GrpcConverter.buildMessage() |
ByteString.copyFrom(messageExt.getBody()) — body byte[] → new byte[] inside ByteString |
Unnecessary |
| 4 |
gRPC/protobuf serialization |
ByteString → network buffer |
Necessary |
Step 3 is redundant because:
- The
body byte[] was freshly allocated in MessageDecoder.decode() (line 508), sized exactly to bodyLen. It is not a slice of a larger buffer.
- After
GrpcConverter.buildMessage(), no code path mutates messageExt.getBody(). The MessageExt object is only used for reading receipt handles (in auto-renew) and is then eligible for GC.
ByteString is immutable by contract; UnsafeByteOperations.unsafeWrap() simply wraps the byte[] by reference into a LiteralByteString without copying, which is safe as long as the caller guarantees the byte[] won't be mutated — which holds here.
At high throughput (e.g., 100k msgs/s with 4KB body), this eliminates ~400MB/s of unnecessary heap allocation and System.arraycopy on the receive path.
Describe the Solution You'd Like
In proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java, change line 105:
// Before:
.setBody(ByteString.copyFrom(messageExt.getBody()))
// After:
.setBody(UnsafeByteOperations.unsafeWrap(messageExt.getBody()))
Add the import:
import com.google.protobuf.UnsafeByteOperations;
UnsafeByteOperations is a public API in protobuf-java (available since 3.x), already on the classpath. The change is a single line with no behavioral difference — the ByteString content is identical, only the internal copy is skipped.
Describe Alternatives You've Considered
ByteString.wrap(byte[]): This is a package-private method in protobuf-java and cannot be called from outside com.google.protobuf. UnsafeByteOperations.unsafeWrap() is the public equivalent that delegates to it.
- Keep
copyFrom and accept the overhead: At scale the unnecessary copy is measurable in both CPU (arraycopy) and GC pressure (short-lived byte[] allocations).
Additional Context
- Protobuf version: 3.20.1 (as used by RocketMQ).
UnsafeByteOperations.unsafeWrap(byte[]) internally calls ByteString.wrap(byte[]), which constructs a LiteralByteString with a direct reference to the passed byte[] — confirmed by bytecode inspection.
- This is the receive (consumer) path only. The send (producer) path has a symmetric issue (
ByteString.toByteArray() in SendMessageActivity.buildMessage()), which could be addressed in a separate issue.
Before Creating the Enhancement Request
Summary
In the Proxy receive-message path,
GrpcConverter.buildMessage()usesByteString.copyFrom(messageExt.getBody())to set the message body in the gRPC response.ByteString.copyFrom()allocates a newbyte[]and copies the entire body content viaSystem.arraycopy, even though the sourcebyte[]is a standalone allocation that is never mutated after this point. This can be replaced withUnsafeByteOperations.unsafeWrap()to wrap the existingbyte[]by reference (zero-copy).Motivation
On the Proxy receive path (pop message from Broker → gRPC response to client), the message body undergoes the following copy chain:
RemotingCommand.decode()new byte[bodyLength]+byteBuffer.readBytes(bodyData)— Netty ByteBuf → heap byte[]MessageDecoder.decode()new byte[bodyLen]+byteBuffer.get(body)— extract per-message body from batchGrpcConverter.buildMessage()ByteString.copyFrom(messageExt.getBody())— body byte[] → new byte[] inside ByteStringStep 3 is redundant because:
bodybyte[] was freshly allocated inMessageDecoder.decode()(line 508), sized exactly tobodyLen. It is not a slice of a larger buffer.GrpcConverter.buildMessage(), no code path mutatesmessageExt.getBody(). TheMessageExtobject is only used for reading receipt handles (in auto-renew) and is then eligible for GC.ByteStringis immutable by contract;UnsafeByteOperations.unsafeWrap()simply wraps the byte[] by reference into aLiteralByteStringwithout copying, which is safe as long as the caller guarantees the byte[] won't be mutated — which holds here.At high throughput (e.g., 100k msgs/s with 4KB body), this eliminates ~400MB/s of unnecessary heap allocation and
System.arraycopyon the receive path.Describe the Solution You'd Like
In
proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java, change line 105:Add the import:
UnsafeByteOperationsis a public API in protobuf-java (available since 3.x), already on the classpath. The change is a single line with no behavioral difference — the ByteString content is identical, only the internal copy is skipped.Describe Alternatives You've Considered
ByteString.wrap(byte[]): This is a package-private method in protobuf-java and cannot be called from outsidecom.google.protobuf.UnsafeByteOperations.unsafeWrap()is the public equivalent that delegates to it.copyFromand accept the overhead: At scale the unnecessary copy is measurable in both CPU (arraycopy) and GC pressure (short-lived byte[] allocations).Additional Context
UnsafeByteOperations.unsafeWrap(byte[])internally callsByteString.wrap(byte[]), which constructs aLiteralByteStringwith a direct reference to the passed byte[] — confirmed by bytecode inspection.ByteString.toByteArray()inSendMessageActivity.buildMessage()), which could be addressed in a separate issue.