[ISSUE #10515] Fix TransactionMetricsFlushService busy spin bug #10517
wang-jiahua wants to merge 1 commit into
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR reduces idle CPU usage in the transaction metrics flush loop and introduces key-string caching to cut down repeated string construction for consumer offsets and long-poll pull request keys.
Changes:
- Prevents a busy-spin loop in `TransactionMetricsFlushService` by always blocking between flush checks.
- Adds caching of `topic@group` keys in `ConsumerOffsetManager`.
- Adds caching of `topic@queueId` keys in `PullRequestHoldService`.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetricsFlushService.java | Blocks the service thread between flush attempts to avoid CPU spinning. |
| broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java | Introduces a cache for topic@group composite keys used across offset operations. |
| broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java | Introduces a cache for topic@queueId composite keys used in pull request holding. |
Code excerpts attached to the review comments:

`TransactionMetricsFlushService.java`

    long interval = brokerController.getBrokerConfig().getTransactionMetricFlushInterval();
    this.waitForRunning(interval);
    if (System.currentTimeMillis() - start > interval) {

`ConsumerOffsetManager.java`

    private String buildTopicGroupKey(String topic, String group) {
        ConcurrentHashMap<String, String> groupMap = topicGroupKeyCache.get(topic);

    protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    public static final String TOPIC_GROUP_SEPARATOR = "@";

    private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> topicGroupKeyCache = new ConcurrentHashMap<>();

    String key = topic + TOPIC_GROUP_SEPARATOR + group;
    topicGroupKeyCache.computeIfAbsent(topic, t -> new ConcurrentHashMap<>()).put(group, key);

`PullRequestHoldService.java`

    sb.append(TOPIC_QUEUEID_SEPARATOR);
    sb.append(queueId);
    return sb.toString();
    String[] keys = buildKeyCache.get(topic);

    int len = Math.max(queueId + 1, 16);
    keys = buildKeyCache.computeIfAbsent(topic, t -> new String[len]);
    if (queueId >= keys.length) {
        String[] grown = new String[queueId + 16];

    String key = topic + TOPIC_QUEUEID_SEPARATOR + queueId;
    if (topic != null && queueId >= 0) {
        int len = Math.max(queueId + 1, 16);
        keys = buildKeyCache.computeIfAbsent(topic, t -> new String[len]);

        buildKeyCache.put(topic, grown);
        keys = grown;
    }
    keys[queueId] = key;
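The growth path in the excerpts above sizes the new array as `queueId + 16`. The following is a minimal sketch of why that expression can go negative for very large `queueId` values, together with one possible guard. The class name, the method names, and the cap `MAX_CACHED_QUEUES` are hypothetical and are not part of the PR.

```java
final class ArraySizing {
    // Hypothetical upper bound on cached queue IDs; not taken from the PR.
    static final int MAX_CACHED_QUEUES = 1024;

    /** Unchecked growth as in the excerpt: wraps around near Integer.MAX_VALUE. */
    static int uncheckedSize(int queueId) {
        return queueId + 16;
    }

    /**
     * Returns a safe array length for queueId, or -1 when the ID should
     * bypass the array cache (negative, or at/above the cap).
     */
    static int safeSize(int queueId) {
        if (queueId < 0 || queueId >= MAX_CACHED_QUEUES) {
            return -1; // caller falls back to plain string concatenation
        }
        // queueId is below the cap here, so queueId + 16 cannot overflow.
        return Math.min(queueId + 16, MAX_CACHED_QUEUES);
    }

    public static void main(String[] args) {
        System.out.println(uncheckedSize(Integer.MAX_VALUE - 10)); // a negative number
        System.out.println(safeSize(5));
        System.out.println(safeSize(Integer.MAX_VALUE - 10));
    }
}
```

The range check runs before the addition, so the sum is only computed for values that cannot wrap. Clamping the result of `queueId + 16` after the fact would not help, because the overflow has already happened by then.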
Codecov Report

✅ All modified and coverable lines are covered by tests.

Coverage Diff

| Metric | develop | #10517 | +/- |
|---|---|---|---|
| Coverage | 48.06% | 48.03% | -0.04% |
| Complexity | 13313 | 13326 | +13 |
| Files | 1377 | 1377 | |
| Lines | 100632 | 100721 | +89 |
| Branches | 12995 | 13012 | +17 |
| Hits | 48369 | 48381 | +12 |
| Misses | 46344 | 46389 | +45 |
| Partials | 5919 | 5951 | +32 |
RockteMQ-AI left a comment
Review by github-manager-bot
Summary
This PR introduces key-string caching in ConsumerOffsetManager and PullRequestHoldService to reduce repeated string concatenation, and fixes a busy-spin bug in TransactionMetricsFlushService. The busy-spin fix is a clear correctness improvement. The caching optimizations have valid intent but introduce thread-safety and NPE regression risks that need to be addressed before merge.
Findings
- [Critical] `PullRequestHoldService.java:63-83`: Thread-safety race in `String[]` cache. The `buildKeyCache` stores mutable `String[]` arrays. Two concurrent threads can both see `keys == null` from `computeIfAbsent`, each create a different array, and one overwrites the other via `put()`. Worse, the array growth path (`new String[queueId + 16]` + `System.arraycopy` + `put`) is not atomic: a reader thread can observe a partially-grown array. Since `PullRequestHoldService` is accessed by multiple pull-request handler threads concurrently, this is a real data race. Recommendation: Replace `String[]` with `ConcurrentHashMap<Integer, String>` per topic, or use `AtomicReferenceArray` with a safe publication pattern.
- [Critical] `ConsumerOffsetManager.java:49` and `PullRequestHoldService.java:63`: NPE regression with null keys. Both `ConcurrentHashMap.get()` and `ConcurrentMap.get()` throw `NullPointerException` on null keys. The original code (`topic + "@" + group` and `StringBuilder.append(topic)`) silently produced strings like `"null@group"`. If any caller passes a null topic/group (e.g., during error paths or uninitialized state), this becomes a runtime crash. Recommendation: Add explicit null guards that bypass the cache and fall back to concatenation, or validate at the call boundary.
- [Warning] `PullRequestHoldService.java:75`: Integer overflow in array sizing. `queueId + 16` can overflow for large `queueId` values, producing a negative array size. While queue IDs are typically small, a malformed request or misconfiguration could trigger `NegativeArraySizeException`. Recommendation: Cap the array growth at a reasonable maximum (e.g., `Math.min(queueId + 16, 1024)`) and fall back to a map-based cache beyond that.
- [Warning] `ConsumerOffsetManager.java:46`: Unbounded cache growth. `topicGroupKeyCache` retains every `(topic, group)` pair forever. On brokers serving many ephemeral topics or short-lived consumer groups, this becomes a slow memory leak. Each entry is ~100 bytes (two String objects + CHM node), so 1M unique pairs ≈ 100MB. Recommendation: Consider a bounded cache (Caffeine with max size) or periodic cleanup aligned with topic/group lifecycle.
- [Info] `TransactionMetricsFlushService.java:47-52`: Busy-spin fix is correct and well-targeted. Moving `waitForRunning(interval)` before the `if` check ensures the thread always yields between iterations, eliminating the CPU waste described in the issue. The existing `>` vs `>=` timing edge case (noted by Copilot) is minor: flush cadence drift of one interval is acceptable for a metrics flush.
- [Info] `ConsumerOffsetManager.java`: The 2-level `ConcurrentHashMap<String, ConcurrentHashMap<String, String>>` design is sound for the common case where topic cardinality is moderate and group cardinality per topic is stable. The `computeIfAbsent` atomicity guarantee on the outer map is correct. The main concern is the unbounded growth noted above.
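The two Critical findings point toward a per-topic concurrent map with a null bypass. The following is a minimal sketch of that direction, not the code in the PR: the class name `QueueKeyCache` and its method are hypothetical, and the separator value `"@"` is taken from the `topic@queueId` key format described in the review.

```java
import java.util.concurrent.ConcurrentHashMap;

final class QueueKeyCache {
    static final String TOPIC_QUEUEID_SEPARATOR = "@";

    // topic -> (queueId -> "topic@queueId"). Both levels are concurrent maps,
    // so no array is ever resized or republished to other threads.
    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, String>> cache =
        new ConcurrentHashMap<>();

    String buildKey(String topic, int queueId) {
        if (topic == null) {
            // ConcurrentHashMap rejects null keys, so bypass the cache and keep
            // the pre-cache behaviour of producing "null@<queueId>".
            return topic + TOPIC_QUEUEID_SEPARATOR + queueId;
        }
        return cache
            .computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
            .computeIfAbsent(queueId, q -> topic + TOPIC_QUEUEID_SEPARATOR + q);
    }
}
```

Repeated calls with the same arguments return the same cached `String` instance. The trade-off against the array version is boxing of `queueId` on lookup; whether that matters on the hot path would need measuring.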
Suggestions
- Priority fix: Address the NPE regression; this is a behavioral change that can crash brokers on null inputs.
- Priority fix: Replace `String[]` in `PullRequestHoldService` with a thread-safe alternative.
- Consider: Adding a cache size cap or using Caffeine for both caches to prevent unbounded growth.
- Nit: The PR title mentions three changes; consider splitting the `TransactionMetricsFlushService` busy-spin fix into a separate PR since it's a standalone correctness fix that doesn't need to wait for the caching discussion.
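For the size-cap suggestion, one dependency-free option is to stop inserting once a limit is reached and fall back to plain concatenation. The sketch below assumes that approach; `CappedKeyCache`, `maxEntries`, and `size()` are hypothetical names, and unlike Caffeine it performs no eviction, so stale entries stay until the broker restarts.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class CappedKeyCache {
    private static final String SEPARATOR = "@";

    private final int maxEntries; // hypothetical cap, not a value from the PR
    private final AtomicInteger size = new AtomicInteger();
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> cache =
        new ConcurrentHashMap<>();

    CappedKeyCache(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    String key(String topic, String group) {
        if (topic == null || group == null) {
            return topic + SEPARATOR + group; // null bypass, same as uncached code
        }
        ConcurrentHashMap<String, String> groups = cache.get(topic);
        if (groups != null) {
            String hit = groups.get(group);
            if (hit != null) {
                return hit;
            }
        }
        String built = topic + SEPARATOR + group;
        if (size.get() >= maxEntries) {
            return built; // cache full: behave like the uncached code
        }
        String previous = cache
            .computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
            .putIfAbsent(group, built);
        if (previous != null) {
            return previous; // another thread cached this pair first
        }
        // The cap is approximate: concurrent inserts can overshoot it slightly.
        size.incrementAndGet();
        return built;
    }

    int size() {
        return size.get();
    }
}
```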
Cross-repo Note
No protocol or API changes — this is purely internal broker optimization. No coordination needed with rocketmq-clients.
Automated review by github-manager-bot
I don't recommend fixing 1 and 2; there's no point. Just fix 3.
0cec077 to 3b695dc
Summary
Fix a busy-spin bug in `TransactionMetricsFlushService.run()`. The `waitForRunning()` call was only inside the `if` branch, so when the flush interval had not elapsed the `while` loop spun without yielding, wasting ~170 CPU samples on an idle broker in JFR.
Fix
Move `waitForRunning(interval)` before the `if` check so the thread always sleeps between iterations.
Files Changed
`broker/.../transaction/TransactionMetricsFlushService.java`: `waitForRunning()` moved before `if` branch
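The difference between the two placements can be sketched with a small standalone simulation. This is not the broker code: `FlushLoopSketch`, its `run` method, and the local `waitForRunning` stand-in (built on `Object.wait`) are assumptions made for illustration, and the "old" branch is only a model of the behaviour described in the summary.

```java
final class FlushLoopSketch {
    private final Object lock = new Object();

    // Stand-in for the service thread's waitForRunning(interval): blocks for up
    // to intervalMs. intervalMs must be > 0, since wait(0) would block forever.
    private void waitForRunning(long intervalMs) {
        synchronized (lock) {
            try {
                lock.wait(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Runs the loop for roughly durationMs and returns the iteration count. */
    long run(boolean fixed, long intervalMs, long durationMs) {
        long iterations = 0;
        long start = System.currentTimeMillis();
        long deadline = start + durationMs;
        while (System.currentTimeMillis() < deadline
                && !Thread.currentThread().isInterrupted()) {
            iterations++;
            if (fixed) {
                waitForRunning(intervalMs); // new placement: always block first
            }
            if (System.currentTimeMillis() - start > intervalMs) {
                start = System.currentTimeMillis(); // the flush would happen here
                if (!fixed) {
                    waitForRunning(intervalMs); // old placement: only after a flush
                }
            }
        }
        return iterations;
    }
}
```

Calling `run(true, 50, 200)` should complete in a handful of iterations, while `run(false, 50, 200)` spins through the loop until the first interval elapses and so reports a far larger count.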