fix MempoolFilter returning pending transactions#7109
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
Walkthrough
ChangesMempool broadcast-driven pending transaction filter
Sequence Diagram(s)sequenceDiagram
participant Client
participant RPC as Eth RPC
participant EthEventHandler
participant MempoolFilterManager
participant MempoolFilter
participant MessagePool
Note over EthEventHandler,MessagePool: Service startup
MessagePool->>EthEventHandler: subscribe_to_updates() → MpoolSubscriber
EthEventHandler->>MempoolFilterManager: new(max_results, mpool_subscriber)
Note over Client,MempoolFilter: eth_newPendingTransactionFilter
Client->>RPC: eth_newPendingTransactionFilter
RPC->>MempoolFilterManager: install()
MempoolFilterManager->>MempoolFilter: new(max_results, subscriber())
Note over Client,MempoolFilter: eth_getFilterChanges (pending txs)
MessagePool->>MempoolFilter: broadcast MpoolUpdate::Add(msg)
Client->>RPC: eth_getFilterChanges(filter_id)
RPC->>MempoolFilter: drain(chain_id)
MempoolFilter->>MempoolFilter: try_recv() → IndexSet<EthHash>
MempoolFilter-->>RPC: Vec<EthHash>
RPC-->>Client: EthFilterResult::Hashes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
✨ Simplify code
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
src/rpc/methods/eth.rs (1)
1106-1120: ⚡ Quick winConsolidate the signed-message hash derivation into one helper.
eth_tx_hash_from_signed_messagebelow still reimplements the same delegated/secp/BLS branches, so this extraction can drift from the event/log path. Please make one helper delegate to the other and keep the hash rules in one place.♻️ Minimal consolidation
fn eth_tx_hash_from_signed_message( message: &SignedMessage, eth_chain_id: EthChainIdType, ) -> anyhow::Result<EthHash> { - if message.is_delegated() { - let (_, tx) = eth_tx_from_signed_eth_message(message, eth_chain_id)?; - Ok(tx.eth_hash()?.into()) - } else if message.is_secp256k1() { - Ok(message.cid().into()) - } else { - Ok(message.message().cid().into()) - } + eth_hash_from_signed_message(message, eth_chain_id) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/rpc/methods/eth.rs` around lines 1106 - 1120, The logic that derives an Ethereum-style hash from a SignedMessage has been factored into eth_hash_from_signed_message but eth_tx_hash_from_signed_message still duplicates the delegated/secp256k1/BLS branching; consolidate by making eth_tx_hash_from_signed_message call eth_hash_from_signed_message (or vice‑versa) so the hash rules live in one place. Update eth_tx_hash_from_signed_message to remove its own branching and delegate to eth_hash_from_signed_message (passing the same SignedMessage and chain_id), ensuring all callers use the single helper to avoid drift between paths.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/rpc/methods/eth/filter/mempool.rs`:
- Line 67: The current drain uses
pending.into_iter().take(self.max_results).collect() which returns an empty vec
when self.max_results == 0; change the logic so a zero max_results means “no
cap”: check self.max_results and if it is 0 collect the full pending.into_iter()
otherwise collect after take(self.max_results). Use the existing pending
iterator and self.max_results symbol to implement the conditional branch so the
returned collection contains all items when max_results == 0.
In `@src/tool/subcommands/api_cmd/stateful_tests.rs`:
- Around line 300-311: The mempool polling in wait_in_mempool has a very short
timeout (retries = 100 with 10ms sleeps); increase the retry budget or
per-iteration sleep to reduce flakiness in CI by changing the retries or
Duration::from_millis call in the wait_in_mempool function (e.g., bump retries
to a larger value like 1000 or increase the sleep to 50–100ms) so the loop has a
longer total wait before the ensure! triggers; update the retries initialization
and/or the tokio::time::sleep(Duration::from_millis(...)).
---
Nitpick comments:
In `@src/rpc/methods/eth.rs`:
- Around line 1106-1120: The logic that derives an Ethereum-style hash from a
SignedMessage has been factored into eth_hash_from_signed_message but
eth_tx_hash_from_signed_message still duplicates the delegated/secp256k1/BLS
branching; consolidate by making eth_tx_hash_from_signed_message call
eth_hash_from_signed_message (or vice‑versa) so the hash rules live in one
place. Update eth_tx_hash_from_signed_message to remove its own branching and
delegate to eth_hash_from_signed_message (passing the same SignedMessage and
chain_id), ensuring all callers use the single helper to avoid drift between
paths.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 15f882bf-4135-452b-b120-7f6d544dc8b8
📒 Files selected for processing (8)
src/daemon/mod.rssrc/message_pool/msgpool/msg_pool.rssrc/message_pool/msgpool/pending_store.rssrc/rpc/methods/eth.rssrc/rpc/methods/eth/filter/mempool.rssrc/rpc/methods/eth/filter/mod.rssrc/tool/offline_server/server.rssrc/tool/subcommands/api_cmd/stateful_tests.rs
| } | ||
| } | ||
| } | ||
| pending.into_iter().take(self.max_results).collect() |
There was a problem hiding this comment.
Handle max_results == 0 as “no cap” in drain output.
Line 67 currently truncates with take(0), which returns an empty vector when max_results is zero; this conflicts with the filter subsystem’s “0 disables cap” convention.
Proposed fix
- pending.into_iter().take(self.max_results).collect()
+ if self.max_results == 0 {
+ return pending.into_iter().collect();
+ }
+ pending.into_iter().take(self.max_results).collect()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| pending.into_iter().take(self.max_results).collect() | |
| if self.max_results == 0 { | |
| return pending.into_iter().collect(); | |
| } | |
| pending.into_iter().take(self.max_results).collect() |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/rpc/methods/eth/filter/mempool.rs` at line 67, The current drain uses
pending.into_iter().take(self.max_results).collect() which returns an empty vec
when self.max_results == 0; change the logic so a zero max_results means “no
cap”: check self.max_results and if it is 0 collect the full pending.into_iter()
otherwise collect after take(self.max_results). Use the existing pending
iterator and self.max_results symbol to implement the conditional branch so the
returned collection contains all items when max_results == 0.
0f8af7d to
4f45897
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files
... and 19 files with indirect coverage changes Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
|
@Fraccaman CI linter seems to be failing please fix it, you can use Also all the commits needs to signed, so please do that as well. |
53c3804 to
e39aa6f
Compare
|
@akaladarshi I signed the commits and ran |
Hey @Fraccaman, Sorry for delayed response. Also you need to first run the |
e39aa6f to
5464537
Compare
|
I rebased but |
akaladarshi
left a comment
There was a problem hiding this comment.
We also need to add tests for the EthGetFilterChanges where we call it multiple times and see if each call returns the new pending transaction since the last poll.
|
|
||
| /// Clone of the [`MpoolUpdate`] broadcast sender. New independent | ||
| /// receivers can be derived by calling `subscribe()` on the clone. | ||
| pub(in crate::message_pool) fn event_sender(&self) -> broadcast::Sender<MpoolUpdate> { |
There was a problem hiding this comment.
We made the events private for a specific reason, events should only be sent by the message pool module and every other module will read the data through subscribe method receiver.
This breaks the encapsulation.
|
|
||
| /// Clone of the [`MpoolUpdate`] sender. Each call to `subscribe()` on | ||
| /// the returned sender yields an independent receiver. | ||
| pub fn mpool_event_sender(&self) -> broadcast::Sender<MpoolUpdate> { |
There was a problem hiding this comment.
Same here this breaks the encapsulation, event sender is send only should not shared.
| } | ||
|
|
||
| /// Derive the Ethereum-style hash for a `SignedMessage`. | ||
| pub fn eth_hash_from_signed_message( |
There was a problem hiding this comment.
We already have similar fn eth_tx_hash_from_signed_message, please reuse it.
| Ok(MpoolUpdate::Remove(m)) => { | ||
| if let Some(h) = hash_or_log(&m, chain_id) { | ||
| pending.shift_remove(&h); |
There was a problem hiding this comment.
What is the actual purpose doing this ?
There was a problem hiding this comment.
as far as I understand ::Remove is triggered when the mempool drops a pending tx (i.e in this case only when a tx is mined). The cancel in drain removes the hash that would otherwise surface as still-pending
|
hey @akaladarshi im on vacation, ill fix this as soon as im back |
|
Hi @Fraccaman |
…er poll Adds eth_new_pending_transaction_filter_multi_poll: installs a pending-tx filter, submits tx A, polls (asserts hash A returned), submits tx B, polls (asserts hash B returned and hash A absent). Confirms each eth_getFilterChanges call surfaces only the delta since the previous poll, addressing review feedback on PR ChainSafe#7109.
Hey @akaladarshi sorry for the delay, I got back from vacation this week and got busy. Fixed your code review suggestions! thanks |
- Encapsulation: drop the public Sender accessor on PendingStore / MessagePool. MempoolFilterManager now holds an `MpoolSubscriber` closure (`Arc<dyn Fn() -> broadcast::Receiver<MpoolUpdate>>`); each install() calls it to obtain a fresh independent receiver via `MessagePool::subscribe_to_updates()`. Filter layer never touches the send side. Daemon and offline server wrap a shallow-cloned MessagePool in the closure; `EthEventHandler::new()` builds a dummy closure for standalone contexts. - Helper reuse: drop the new `eth_hash_from_signed_message` helper and reuse the existing `eth_tx_hash_from_signed_message`. Both `EthGetTransactionHashByCid::run` and `MempoolFilter::drain` now call the single implementation. - Docs: document why `MpoolUpdate::Remove` is processed inside `drain` (cancels a buffered `Add` for a tx that left the mempool between client polls, e.g. mined into a tipset). Adds two unit tests for the new manager wiring: manager_subscribes_each_filter_to_independent_receiver and manager_with_dummy_subscriber_yields_empty.
…er poll Adds eth_new_pending_transaction_filter_multi_poll: installs a pending-tx filter, submits tx A, polls (asserts hash A returned), submits tx B, polls (asserts hash B returned and hash A absent). Confirms each eth_getFilterChanges call surfaces only the delta since the previous poll, addressing review feedback on PR ChainSafe#7109.
2594346 to
0b66f95
Compare
Summary of changes
MempoolFilter was returning hashes of already-mined transactions instead of pending ones. This PR fixes it by subscribing each mempool filter to the mempool's Add/Remove broadcast channel, so it reports only transactions still in the pool.
Changes introduced in this pull request:
MempoolFilterManager::installnow create a Receiver oninstalldraintoMempoolFilterto add/remove pending tx hashesdraininEthGetFilterChanges::handlemethodeth_new_pending_transaction_filtertest + minor unitReference issue to close (if applicable)
Closes #7091
Other information and links
Change checklist
Outside contributions
Summary by CodeRabbit
Bug Fixes
Tests