From 1879e1a36d2cbc39e5365e8c1dc55b2a8d4b0405 Mon Sep 17 00:00:00 2001 From: Gianmarco Fraccaroli Date: Tue, 26 May 2026 23:05:46 +0200 Subject: [PATCH 1/6] refactor(eth-rpc): extract eth_hash_from_signed_message helper --- src/rpc/methods/eth.rs | 91 ++++++++++-------------------------------- 1 file changed, 20 insertions(+), 71 deletions(-) diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 9de7441c4ca..4645108fd52 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -1103,6 +1103,22 @@ pub fn eth_tx_from_signed_eth_message( Ok((from, tx)) } +/// Derive the Ethereum-style hash for a `SignedMessage`. +pub fn eth_hash_from_signed_message( + smsg: &SignedMessage, + chain_id: EthChainIdType, +) -> Result { + let hash = if smsg.is_delegated() { + let (_, tx) = eth_tx_from_signed_eth_message(smsg, chain_id)?; + tx.eth_hash()?.into() + } else if smsg.is_secp256k1() { + smsg.cid().into() + } else { + smsg.message().cid().into() + }; + Ok(hash) +} + /// See /// for ABI specification fn encode_filecoin_params_as_abi( @@ -2608,15 +2624,7 @@ impl EthGetTransactionHashByCid { if let Ok(smsgs) = smsgs_result && let Some(smsg) = smsgs.first() { - let hash = if smsg.is_delegated() { - let (_, tx) = eth_tx_from_signed_eth_message(smsg, eth_chain_id)?; - tx.eth_hash()?.into() - } else if smsg.is_secp256k1() { - smsg.cid().into() - } else { - smsg.message().cid().into() - }; - return Ok(Some(hash)); + return Ok(Some(eth_hash_from_signed_message(smsg, eth_chain_id)?)); } let msg_result = crate::chain::get_chain_message(db, &cid); @@ -3151,29 +3159,6 @@ fn eth_filter_logs_from_tipsets(events: &[CollectedEvent]) -> anyhow::Result anyhow::Result> { - events - .iter() - .filter_map(|event| { - match eth_tx_hash_from_message_cid( - ctx.db(), - &event.msg_cid, - ctx.state_manager.chain_config().eth_chain_id, - ) { - Ok(Some(hash)) => Some(Ok(hash)), - Ok(None) => { - tracing::warn!("Ignoring event"); - None - } - Err(err) => Some(Err(err)), - } - }) - .collect() -} - fn eth_filter_logs_from_events( ctx: &Ctx, events: &[CollectedEvent], @@ -3256,15 +3241,6 @@ fn eth_filter_result_from_tipsets(events: &[CollectedEvent]) -> anyhow::Result anyhow::Result { - Ok(EthFilterResult::Hashes(eth_filter_logs_from_messages( - ctx, events, - )?)) -} - pub enum EthGetLogs {} impl RpcMethod<1> for EthGetLogs { const NAME: &'static str = "Filecoin.EthGetLogs"; @@ -3428,36 +3404,9 @@ impl RpcMethod<1> for EthGetFilterChanges { return Ok(eth_filter_result_from_tipsets(&events)?); } if let Some(mempool_filter) = filter.as_any().downcast_ref::() { - let events = ctx - .eth_event_handler - .get_events_for_parsed_filter( - &ctx, - &Arc::new(ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range( - // heaviest tipset doesn't have events because its messages haven't been executed yet - RangeInclusive::new( - mempool_filter - .collected - .unwrap_or(ctx.chain_store().heaviest_tipset().epoch() - 1), - // Use -1 to indicate that the range extends until the latest available tipset. - -1, - ), - ))), - SkipEvent::OnUnresolvedAddress, - ) - .await?; - let new_collected = events - .iter() - .max_by_key(|event| event.height) - .map(|e| e.height); - if let Some(height) = new_collected { - let filter = Arc::new(MempoolFilter { - id: mempool_filter.id.clone(), - max_results: mempool_filter.max_results, - collected: Some(height), - }); - store.update(filter); - } - return Ok(eth_filter_result_from_messages(&ctx, &events)?); + let chain_id = ctx.chain_config().eth_chain_id; + let hashes = mempool_filter.drain(chain_id); + return Ok(EthFilterResult::Hashes(hashes)); } } Err(anyhow::anyhow!("method not supported").into()) From dee548339d69c8484a8caf50d971590ace7f43b0 Mon Sep 17 00:00:00 2001 From: Gianmarco Fraccaroli Date: Tue, 26 May 2026 23:05:56 +0200 Subject: [PATCH 2/6] feat(mempool): expose MpoolUpdate broadcast sender --- src/message_pool/msgpool/msg_pool.rs | 6 ++++++ src/message_pool/msgpool/pending_store.rs | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 1780b3b8742..3cbf271bd54 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -434,6 +434,12 @@ where self.pending.subscribe() } + /// Clone of the [`MpoolUpdate`] sender. Each call to `subscribe()` on + /// the returned sender yields an independent receiver. + pub fn mpool_event_sender(&self) -> broadcast::Sender { + self.pending.event_sender() + } + /// Return Vector of signed messages given a block header for self. pub fn messages_for_blocks<'a>( &self, diff --git a/src/message_pool/msgpool/pending_store.rs b/src/message_pool/msgpool/pending_store.rs index 0c49f18357a..c7ead55ab1a 100644 --- a/src/message_pool/msgpool/pending_store.rs +++ b/src/message_pool/msgpool/pending_store.rs @@ -124,6 +124,12 @@ impl PendingStore { pub fn subscribe(&self) -> broadcast::Receiver { self.inner.events.subscribe() } + + /// Clone of the [`MpoolUpdate`] broadcast sender. New independent + /// receivers can be derived by calling `subscribe()` on the clone. + pub(in crate::message_pool) fn event_sender(&self) -> broadcast::Sender { + self.inner.events.clone() + } } #[cfg(test)] From 306cdd4512b94a7195177a7e293332f83aa68878 Mon Sep 17 00:00:00 2001 From: Gianmarco Fraccaroli Date: Tue, 26 May 2026 23:06:00 +0200 Subject: [PATCH 3/6] fix(eth-rpc): MempoolFilter returns pending mempool txs, not chain events --- src/daemon/mod.rs | 5 +- src/rpc/methods/eth/filter/mempool.rs | 187 +++++++++++++++++++------- src/rpc/methods/eth/filter/mod.rs | 20 ++- src/tool/offline_server/server.rs | 6 +- 4 files changed, 161 insertions(+), 57 deletions(-) diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index ef3b91ecb5c..a0edd426945 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -564,7 +564,10 @@ fn maybe_start_rpc_service( .map(|path| crate::rpc::FilterList::new_from_file(path)) .transpose()?; info!("JSON-RPC endpoint will listen at {rpc_address}"); - let eth_event_handler = Arc::new(EthEventHandler::from_config(&config.events)); + let eth_event_handler = Arc::new(EthEventHandler::from_config( + &config.events, + mpool.mpool_event_sender(), + )); if is_env_truthy("FOREST_JWT_DISABLE_EXP_VALIDATION") { warn!( "JWT expiration validation is disabled; this significantly weakens security and should only be used in tightly controlled environments" diff --git a/src/rpc/methods/eth/filter/mempool.rs b/src/rpc/methods/eth/filter/mempool.rs index 1c2e11c75d2..ff767dc815d 100644 --- a/src/rpc/methods/eth/filter/mempool.rs +++ b/src/rpc/methods/eth/filter/mempool.rs @@ -1,36 +1,71 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use crate::eth::EthChainId as EthChainIdType; +use crate::message::SignedMessage; +use crate::message_pool::MpoolUpdate; use crate::rpc::Arc; +use crate::rpc::eth::eth_hash_from_signed_message; +use crate::rpc::eth::types::EthHash; use crate::rpc::eth::{FilterID, filter::Filter, filter::FilterManager}; -use crate::shim::fvm_shared_latest::clock::ChainEpoch; use ahash::AHashMap as HashMap; use anyhow::{Context, Result}; -use parking_lot::RwLock; +use indexmap::IndexSet; +use parking_lot::{Mutex, RwLock}; use std::any::Any; +use tokio::sync::broadcast; -/// Data structure for filtering and collecting pending transactions -/// from the mempool before they are confirmed in a block. -#[allow(dead_code)] -#[derive(Debug, PartialEq)] +/// Filter backing `eth_newPendingTransactionFilter`. Each instance owns its +/// own `broadcast::Receiver`. +#[derive(Debug)] pub struct MempoolFilter { // Unique id used to identify the filter pub id: FilterID, // Maximum number of results to collect pub max_results: usize, - // Epoch at which the results were collected - pub collected: Option, + // Receiver for mempool updates + rx: Mutex>, } impl MempoolFilter { - pub fn new(max_results: usize) -> Result, uuid::Error> { - let id = FilterID::new()?; + pub fn new( + max_results: usize, + rx: broadcast::Receiver, + ) -> Result, uuid::Error> { Ok(Arc::new(Self { - id, + id: FilterID::new()?, max_results, - collected: None, + rx: Mutex::new(rx), })) } + + /// Drain queued mempool updates and return the resulting set of pending + /// tx hashes (`Add` minus subsequent `Remove`), capped at `max_results`. + pub fn drain(&self, chain_id: EthChainIdType) -> Vec { + use broadcast::error::TryRecvError; + + let mut rx = self.rx.lock(); + let mut pending: IndexSet = IndexSet::new(); + loop { + match rx.try_recv() { + Ok(MpoolUpdate::Add(m)) => { + if let Some(h) = hash_or_log(&m, chain_id) { + pending.insert(h); + } + } + Ok(MpoolUpdate::Remove(m)) => { + if let Some(h) = hash_or_log(&m, chain_id) { + pending.shift_remove(&h); + } + } + Err(TryRecvError::Empty) | Err(TryRecvError::Closed) => break, + Err(TryRecvError::Lagged(n)) => { + tracing::warn!("mempool filter lagged, dropped {n} events"); + } + } + } + pending.into_iter().take(self.max_results).collect() + } } impl Filter for MempoolFilter { @@ -43,76 +78,126 @@ impl Filter for MempoolFilter { } } -/// `MempoolFilterManager` uses a `RwLock` to handle concurrent access to a collection of `MempoolFilter` -/// instances, each identified by a `FilterID`. The number of results returned by the filters is capped by `max_filter_results`. +fn hash_or_log(msg: &SignedMessage, chain_id: EthChainIdType) -> Option { + match eth_hash_from_signed_message(msg, chain_id) { + Ok(h) => Some(h), + Err(e) => { + tracing::debug!("mempool filter: dropping message, hash error: {e}"); + None + } + } +} + +/// Manages installed `MempoolFilter`s. Each `install` derives a fresh +/// `broadcast::Receiver` from the shared sender. Contexts without a real +/// `MessagePool` (tests, snapshot tools, offline server) pass a dummy sender +/// whose receivers always yield `Empty`. #[derive(Debug)] pub struct MempoolFilterManager { filters: RwLock>>, max_filter_results: usize, + mpool_event_sender: broadcast::Sender, } impl MempoolFilterManager { - pub fn new(max_filter_results: usize) -> Arc { + pub fn new( + max_filter_results: usize, + mpool_event_sender: broadcast::Sender, + ) -> Arc { Arc::new(Self { filters: RwLock::new(HashMap::new()), max_filter_results, + mpool_event_sender, }) } } impl FilterManager for MempoolFilterManager { fn install(&self) -> Result> { - let filter = MempoolFilter::new(self.max_filter_results) + let rx = self.mpool_event_sender.subscribe(); + let filter = MempoolFilter::new(self.max_filter_results, rx) .context("Failed to create a new mempool filter")?; - let id = filter.id().clone(); - - self.filters.write().insert(id, filter.clone()); - + self.filters.write().insert(filter.id().clone(), filter.clone()); Ok(filter) } fn remove(&self, id: &FilterID) -> Option> { - let mut filters = self.filters.write(); - filters.remove(id) + self.filters.write().remove(id) } } #[cfg(test)] mod tests { use super::*; + use crate::shim::address::Address; + use crate::shim::econ::TokenAmount; + use crate::shim::message::Message as ShimMessage; + + const TEST_CHAIN_ID: EthChainIdType = 314; + + fn make_smsg(seq: u64) -> SignedMessage { + SignedMessage::mock_bls_signed_message(ShimMessage { + from: Address::new_id(1), + to: Address::new_id(2), + sequence: seq, + gas_premium: TokenAmount::from_atto(100u64), + gas_limit: 1_000_000, + ..ShimMessage::default() + }) + } + + fn hash_of(seq: u64) -> EthHash { + eth_hash_from_signed_message(&make_smsg(seq), TEST_CHAIN_ID).unwrap() + } + + fn dummy_sender() -> broadcast::Sender { + let (tx, _) = broadcast::channel(1); + tx + } + + #[test] + fn drain_returns_empty_when_no_events() { + let tx = dummy_sender(); + let filter = MempoolFilter::new(10, tx.subscribe()).unwrap(); + assert!(filter.drain(TEST_CHAIN_ID).is_empty()); + } + + #[test] + fn drain_add_remove_cancel_within_window() { + let (tx, _) = broadcast::channel::(16); + let filter = MempoolFilter::new(10, tx.subscribe()).unwrap(); + + tx.send(MpoolUpdate::Add(make_smsg(0))).unwrap(); + tx.send(MpoolUpdate::Add(make_smsg(1))).unwrap(); + tx.send(MpoolUpdate::Remove(make_smsg(0))).unwrap(); + tx.send(MpoolUpdate::Add(make_smsg(2))).unwrap(); + + let hashes = filter.drain(TEST_CHAIN_ID); + assert!(!hashes.contains(&hash_of(0)), "Add+Remove should cancel"); + assert!(hashes.contains(&hash_of(1))); + assert!(hashes.contains(&hash_of(2))); + assert!(filter.drain(TEST_CHAIN_ID).is_empty(), "second drain empty"); + } #[test] - fn test_mempool_filter() { - // Test case 1: Create a mempool filter - let max_results = 10; - let filter = MempoolFilter::new(max_results).expect("Failed to create mempool filter"); - assert_eq!(filter.max_results, max_results); - - // Test case 2: Create a mempool filter manager and install the mempool filter - let mempool_manager = MempoolFilterManager::new(max_results); - let installed_filter = mempool_manager - .install() - .expect("Failed to install mempool filter"); - - // Verify that the filter has been added to the mempool manager - { - let filters = mempool_manager.filters.read(); - assert!(filters.contains_key(installed_filter.id())); + fn drain_truncates_to_max_results() { + let (tx, _) = broadcast::channel::(64); + let filter = MempoolFilter::new(2, tx.subscribe()).unwrap(); + for seq in 0..5u64 { + tx.send(MpoolUpdate::Add(make_smsg(seq))).unwrap(); } + assert_eq!(filter.drain(TEST_CHAIN_ID).len(), 2); + } - // Test case 3: Remove the installed mempool filter - let filter_id = installed_filter.id().clone(); - let removed = mempool_manager.remove(&filter_id); - assert_eq!( - removed.map(|f| f.id().clone()), - Some(installed_filter.id().clone()), - "Filter should be successfully removed" - ); - - // Verify that the filter is no longer in the mempool manager - { - let filters = mempool_manager.filters.read(); - assert!(!filters.contains_key(&filter_id)); + #[test] + fn drain_handles_lag_and_returns_remaining() { + let (tx, _) = broadcast::channel::(4); + let filter = MempoolFilter::new(100, tx.subscribe()).unwrap(); + for seq in 0..10u64 { + tx.send(MpoolUpdate::Add(make_smsg(seq))).unwrap(); } + // Buffer was 4; receiver lagged. Drain returns the remaining buffered + // events without panicking. + assert!(!filter.drain(TEST_CHAIN_ID).is_empty()); } } diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index ce7fed6e4db..a30c27ac738 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -132,11 +132,20 @@ pub enum SkipEvent { impl EthEventHandler { pub fn new() -> Self { - let config = EventsConfig::default(); - Self::from_config(&config) + // Standalone handler with no live mempool: subscribers see an empty + // stream forever. Used in tests, snapshot tools, and other contexts + // where no `MessagePool` is available. + let (dummy, _) = tokio::sync::broadcast::channel(1); + Self::from_config(&EventsConfig::default(), dummy) } - pub fn from_config(config: &EventsConfig) -> Self { + /// Build a handler from `config`. Each `MempoolFilter` installed via the + /// returned handler subscribes to `mpool_event_sender` for pending-tx + /// updates. + pub fn from_config( + config: &EventsConfig, + mpool_event_sender: tokio::sync::broadcast::Sender, + ) -> Self { let max_filters: usize = env_or_default("FOREST_MAX_FILTERS", 100); let max_filter_results = std::env::var("FOREST_MAX_FILTER_RESULTS") .ok() @@ -162,7 +171,10 @@ impl EthEventHandler { Some(MemFilterStore::new(max_filters) as Arc); let event_filter_manager = Some(EventFilterManager::new(max_filter_results)); let tipset_filter_manager = Some(TipSetFilterManager::new(max_filter_results)); - let mempool_filter_manager = Some(MempoolFilterManager::new(max_filter_results)); + let mempool_filter_manager = Some(MempoolFilterManager::new( + max_filter_results, + mpool_event_sender, + )); Self { filter_store, diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index b315025dc00..3ea46bbf878 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -96,6 +96,10 @@ pub async fn offline_rpc_state( let sync_network_context = SyncNetworkContext::new(network_send, peer_manager, state_manager.db_owned()); let nonce_tracker = NonceTracker::new(); + let eth_event_handler = Arc::new(EthEventHandler::from_config( + &events_config, + message_pool.mpool_event_sender(), + )); Ok(( RPCState { state_manager, @@ -103,7 +107,7 @@ pub async fn offline_rpc_state( mpool: message_pool, bad_blocks: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), - eth_event_handler: Arc::new(EthEventHandler::from_config(&events_config)), + eth_event_handler, sync_network_context, start_time: chrono::Utc::now(), shutdown, From 546453716f67489d38a29c333dfd0834801b3d95 Mon Sep 17 00:00:00 2001 From: Gianmarco Fraccaroli Date: Tue, 26 May 2026 23:06:04 +0200 Subject: [PATCH 4/6] test(api): tighten eth_newPendingTransactionFilter to poll pre-execution --- src/rpc/methods/eth/filter/mempool.rs | 4 +++- src/tool/subcommands/api_cmd/stateful_tests.rs | 16 +++++----------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/src/rpc/methods/eth/filter/mempool.rs b/src/rpc/methods/eth/filter/mempool.rs index ff767dc815d..1e9d0b64358 100644 --- a/src/rpc/methods/eth/filter/mempool.rs +++ b/src/rpc/methods/eth/filter/mempool.rs @@ -117,7 +117,9 @@ impl FilterManager for MempoolFilterManager { let rx = self.mpool_event_sender.subscribe(); let filter = MempoolFilter::new(self.max_filter_results, rx) .context("Failed to create a new mempool filter")?; - self.filters.write().insert(filter.id().clone(), filter.clone()); + self.filters + .write() + .insert(filter.id().clone(), filter.clone()); Ok(filter) } diff --git a/src/tool/subcommands/api_cmd/stateful_tests.rs b/src/tool/subcommands/api_cmd/stateful_tests.rs index 8df22c54264..dc959f421a4 100644 --- a/src/tool/subcommands/api_cmd/stateful_tests.rs +++ b/src/tool/subcommands/api_cmd/stateful_tests.rs @@ -308,26 +308,19 @@ async fn next_tipset(client: &rpc::Client) -> anyhow::Result<()> { unreachable!("loop always returns within the branches above") } -async fn wait_pending_message(client: &rpc::Client, message_cid: Cid) -> anyhow::Result<()> { - let tipset = client.call(ChainHead::request(())?).await?; +/// Poll `MpoolPending` until `message_cid` is visible. Does not wait for +/// execution. +async fn wait_in_mempool(client: &rpc::Client, message_cid: Cid) -> anyhow::Result<()> { let mut retries = 100; loop { let pending = client .call(MpoolPending::request((ApiTipsetKey(None),))?) .await?; - if pending.0.iter().any(|msg| msg.cid() == message_cid) { - client - .call( - StateWaitMsg::request((message_cid, 1, tipset.epoch(), true))? - .with_timeout(Duration::from_secs(300)), - ) - .await?; break Ok(()); } ensure!(retries != 0, "Message not found in mpool"); retries -= 1; - tokio::time::sleep(Duration::from_millis(10)).await; } } @@ -796,7 +789,8 @@ fn eth_new_pending_transaction_filter(tx: TestTransaction) -> RpcTestScenario { .await? .context("no Eth transaction hash for CID")?; - wait_pending_message(&client, cid).await?; + // Observe mempool state *before* the message is mined. + wait_in_mempool(&client, cid).await?; let filter_result = client .call(EthGetFilterChanges::request((filter_id.clone(),))?) From ca9e1af6ba5240cdc65d59c0c333432965933f44 Mon Sep 17 00:00:00 2001 From: Gianmarco Fraccaroli Date: Sun, 21 Jun 2026 22:04:20 +0200 Subject: [PATCH 5/6] refactor(eth-rpc): address review feedback on MempoolFilter - Encapsulation: drop the public Sender accessor on PendingStore / MessagePool. MempoolFilterManager now holds an `MpoolSubscriber` closure (`Arc broadcast::Receiver>`); each install() calls it to obtain a fresh independent receiver via `MessagePool::subscribe_to_updates()`. Filter layer never touches the send side. Daemon and offline server wrap a shallow-cloned MessagePool in the closure; `EthEventHandler::new()` builds a dummy closure for standalone contexts. - Helper reuse: drop the new `eth_hash_from_signed_message` helper and reuse the existing `eth_tx_hash_from_signed_message`. Both `EthGetTransactionHashByCid::run` and `MempoolFilter::drain` now call the single implementation. - Docs: document why `MpoolUpdate::Remove` is processed inside `drain` (cancels a buffered `Add` for a tx that left the mempool between client polls, e.g. mined into a tipset). Adds two unit tests for the new manager wiring: manager_subscribes_each_filter_to_independent_receiver and manager_with_dummy_subscriber_yields_empty. --- src/daemon/mod.rs | 11 +- src/message_pool/msgpool/msg_pool.rs | 6 - src/message_pool/msgpool/pending_store.rs | 6 - src/rpc/methods/eth.rs | 18 +-- src/rpc/methods/eth/filter/mempool.rs | 135 ++++++++++++++++++---- src/rpc/methods/eth/filter/mod.rs | 14 +-- src/tool/offline_server/server.rs | 11 +- 7 files changed, 134 insertions(+), 67 deletions(-) diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index a0edd426945..3fc070a9238 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -564,10 +564,13 @@ fn maybe_start_rpc_service( .map(|path| crate::rpc::FilterList::new_from_file(path)) .transpose()?; info!("JSON-RPC endpoint will listen at {rpc_address}"); - let eth_event_handler = Arc::new(EthEventHandler::from_config( - &config.events, - mpool.mpool_event_sender(), - )); + let eth_event_handler = { + let mp = mpool.shallow_clone(); + let subscriber = crate::rpc::eth::filter::mempool::MpoolSubscriber::new(move || { + mp.subscribe_to_updates() + }); + Arc::new(EthEventHandler::from_config(&config.events, subscriber)) + }; if is_env_truthy("FOREST_JWT_DISABLE_EXP_VALIDATION") { warn!( "JWT expiration validation is disabled; this significantly weakens security and should only be used in tightly controlled environments" diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 3cbf271bd54..1780b3b8742 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -434,12 +434,6 @@ where self.pending.subscribe() } - /// Clone of the [`MpoolUpdate`] sender. Each call to `subscribe()` on - /// the returned sender yields an independent receiver. - pub fn mpool_event_sender(&self) -> broadcast::Sender { - self.pending.event_sender() - } - /// Return Vector of signed messages given a block header for self. pub fn messages_for_blocks<'a>( &self, diff --git a/src/message_pool/msgpool/pending_store.rs b/src/message_pool/msgpool/pending_store.rs index c7ead55ab1a..0c49f18357a 100644 --- a/src/message_pool/msgpool/pending_store.rs +++ b/src/message_pool/msgpool/pending_store.rs @@ -124,12 +124,6 @@ impl PendingStore { pub fn subscribe(&self) -> broadcast::Receiver { self.inner.events.subscribe() } - - /// Clone of the [`MpoolUpdate`] broadcast sender. New independent - /// receivers can be derived by calling `subscribe()` on the clone. - pub(in crate::message_pool) fn event_sender(&self) -> broadcast::Sender { - self.inner.events.clone() - } } #[cfg(test)] diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 4645108fd52..67434f460e0 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -1103,22 +1103,6 @@ pub fn eth_tx_from_signed_eth_message( Ok((from, tx)) } -/// Derive the Ethereum-style hash for a `SignedMessage`. -pub fn eth_hash_from_signed_message( - smsg: &SignedMessage, - chain_id: EthChainIdType, -) -> Result { - let hash = if smsg.is_delegated() { - let (_, tx) = eth_tx_from_signed_eth_message(smsg, chain_id)?; - tx.eth_hash()?.into() - } else if smsg.is_secp256k1() { - smsg.cid().into() - } else { - smsg.message().cid().into() - }; - Ok(hash) -} - /// See /// for ABI specification fn encode_filecoin_params_as_abi( @@ -2624,7 +2608,7 @@ impl EthGetTransactionHashByCid { if let Ok(smsgs) = smsgs_result && let Some(smsg) = smsgs.first() { - return Ok(Some(eth_hash_from_signed_message(smsg, eth_chain_id)?)); + return Ok(Some(eth_tx_hash_from_signed_message(smsg, eth_chain_id)?)); } let msg_result = crate::chain::get_chain_message(db, &cid); diff --git a/src/rpc/methods/eth/filter/mempool.rs b/src/rpc/methods/eth/filter/mempool.rs index 1e9d0b64358..1c4c84ffa3f 100644 --- a/src/rpc/methods/eth/filter/mempool.rs +++ b/src/rpc/methods/eth/filter/mempool.rs @@ -5,7 +5,7 @@ use crate::eth::EthChainId as EthChainIdType; use crate::message::SignedMessage; use crate::message_pool::MpoolUpdate; use crate::rpc::Arc; -use crate::rpc::eth::eth_hash_from_signed_message; +use crate::rpc::eth::eth_tx_hash_from_signed_message; use crate::rpc::eth::types::EthHash; use crate::rpc::eth::{FilterID, filter::Filter, filter::FilterManager}; use ahash::AHashMap as HashMap; @@ -15,6 +15,46 @@ use parking_lot::{Mutex, RwLock}; use std::any::Any; use tokio::sync::broadcast; +/// Factory that yields a fresh independent `broadcast::Receiver` +/// on each call. Wraps the `MessagePool` so the filter layer never sees the +/// pool's broadcast `Sender` directly — preserves the send-only encapsulation +/// owned by the message pool module. +#[derive(Clone)] +pub struct MpoolSubscriber { + inner: Arc broadcast::Receiver + Send + Sync>, +} + +impl MpoolSubscriber { + /// Build a subscriber from a factory closure that yields a fresh + /// receiver on each call (typically `move || mp.subscribe_to_updates()`). + pub fn new(factory: F) -> Self + where + F: Fn() -> broadcast::Receiver + Send + Sync + 'static, + { + Self { + inner: Arc::new(factory), + } + } + + /// Subscriber whose receivers never receive any events. Used by + /// standalone contexts (tests, snapshot tools, offline server when no + /// real mempool is attached). + pub fn dummy() -> Self { + let (tx, _) = broadcast::channel::(1); + Self::new(move || tx.subscribe()) + } + + fn subscribe(&self) -> broadcast::Receiver { + (self.inner)() + } +} + +impl std::fmt::Debug for MpoolSubscriber { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MpoolSubscriber").finish_non_exhaustive() + } +} + /// Filter backing `eth_newPendingTransactionFilter`. Each instance owns its /// own `broadcast::Receiver`. #[derive(Debug)] @@ -40,7 +80,21 @@ impl MempoolFilter { } /// Drain queued mempool updates and return the resulting set of pending - /// tx hashes (`Add` minus subsequent `Remove`), capped at `max_results`. + /// tx hashes, capped at `max_results`. + /// + /// Semantics within a single drain window: + /// - `Add` inserts the tx hash. + /// - `Remove` cancels a prior `Add` from the *same* window. A `Remove` + /// for a hash that was already returned by an earlier `drain` call is + /// a no-op on the set — that hash was already reported as pending, + /// so the client has seen it and the cancellation does not need to + /// propagate. + /// + /// Why process `Remove` at all: a tx can leave the mempool between two + /// client polls (mined into a tipset, replaced via RBF, or evicted). If + /// we ignored `Remove` we would surface a hash whose tx is no longer + /// pending, which is misleading for `eth_newPendingTransactionFilter` + /// consumers. pub fn drain(&self, chain_id: EthChainIdType) -> Vec { use broadcast::error::TryRecvError; @@ -55,6 +109,8 @@ impl MempoolFilter { } Ok(MpoolUpdate::Remove(m)) => { if let Some(h) = hash_or_log(&m, chain_id) { + // Cancels a matching Add buffered earlier in the + // same window. No-op if the hash is not in the set. pending.shift_remove(&h); } } @@ -79,7 +135,7 @@ impl Filter for MempoolFilter { } fn hash_or_log(msg: &SignedMessage, chain_id: EthChainIdType) -> Option { - match eth_hash_from_signed_message(msg, chain_id) { + match eth_tx_hash_from_signed_message(msg, chain_id) { Ok(h) => Some(h), Err(e) => { tracing::debug!("mempool filter: dropping message, hash error: {e}"); @@ -88,34 +144,38 @@ fn hash_or_log(msg: &SignedMessage, chain_id: EthChainIdType) -> Option } } -/// Manages installed `MempoolFilter`s. Each `install` derives a fresh -/// `broadcast::Receiver` from the shared sender. Contexts without a real -/// `MessagePool` (tests, snapshot tools, offline server) pass a dummy sender -/// whose receivers always yield `Empty`. -#[derive(Debug)] +/// Manages installed `MempoolFilter`s. Each `install` calls the configured +/// [`MpoolSubscriber`] to obtain a fresh independent +/// `broadcast::Receiver`. Contexts without a real `MessagePool` +/// (tests, snapshot tools, offline server) pass a subscriber whose receivers +/// always yield `Empty`. pub struct MempoolFilterManager { filters: RwLock>>, max_filter_results: usize, - mpool_event_sender: broadcast::Sender, + subscriber: MpoolSubscriber, +} + +impl std::fmt::Debug for MempoolFilterManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MempoolFilterManager") + .field("max_filter_results", &self.max_filter_results) + .finish_non_exhaustive() + } } impl MempoolFilterManager { - pub fn new( - max_filter_results: usize, - mpool_event_sender: broadcast::Sender, - ) -> Arc { + pub fn new(max_filter_results: usize, subscriber: MpoolSubscriber) -> Arc { Arc::new(Self { filters: RwLock::new(HashMap::new()), max_filter_results, - mpool_event_sender, + subscriber, }) } } impl FilterManager for MempoolFilterManager { fn install(&self) -> Result> { - let rx = self.mpool_event_sender.subscribe(); - let filter = MempoolFilter::new(self.max_filter_results, rx) + let filter = MempoolFilter::new(self.max_filter_results, self.subscriber.subscribe()) .context("Failed to create a new mempool filter")?; self.filters .write() @@ -149,17 +209,18 @@ mod tests { } fn hash_of(seq: u64) -> EthHash { - eth_hash_from_signed_message(&make_smsg(seq), TEST_CHAIN_ID).unwrap() + eth_tx_hash_from_signed_message(&make_smsg(seq), TEST_CHAIN_ID).unwrap() } - fn dummy_sender() -> broadcast::Sender { - let (tx, _) = broadcast::channel(1); - tx + /// Build a subscriber backed by `tx` so tests can drive + /// `MpoolUpdate` events through the manager. + fn subscriber_from(tx: broadcast::Sender) -> MpoolSubscriber { + MpoolSubscriber::new(move || tx.subscribe()) } #[test] fn drain_returns_empty_when_no_events() { - let tx = dummy_sender(); + let (tx, _) = broadcast::channel::(1); let filter = MempoolFilter::new(10, tx.subscribe()).unwrap(); assert!(filter.drain(TEST_CHAIN_ID).is_empty()); } @@ -202,4 +263,36 @@ mod tests { // events without panicking. assert!(!filter.drain(TEST_CHAIN_ID).is_empty()); } + + #[test] + fn manager_subscribes_each_filter_to_independent_receiver() { + let (tx, _) = broadcast::channel::(16); + let manager = MempoolFilterManager::new(100, subscriber_from(tx.clone())); + + let f1 = manager.install().expect("install f1"); + let f2 = manager.install().expect("install f2"); + + tx.send(MpoolUpdate::Add(make_smsg(0))).unwrap(); + tx.send(MpoolUpdate::Add(make_smsg(1))).unwrap(); + + let f1 = f1.as_any().downcast_ref::().unwrap(); + let f2 = f2.as_any().downcast_ref::().unwrap(); + + // Each receiver sees the full broadcast, independently. + let h1 = f1.drain(TEST_CHAIN_ID); + let h2 = f2.drain(TEST_CHAIN_ID); + assert_eq!(h1.len(), 2); + assert_eq!(h2.len(), 2); + + // Draining once empties only that receiver. + assert!(f1.drain(TEST_CHAIN_ID).is_empty()); + } + + #[test] + fn manager_with_dummy_subscriber_yields_empty() { + let manager = MempoolFilterManager::new(100, MpoolSubscriber::dummy()); + let f = manager.install().expect("install"); + let f = f.as_any().downcast_ref::().unwrap(); + assert!(f.drain(TEST_CHAIN_ID).is_empty()); + } } diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index a30c27ac738..4f601955d67 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -135,17 +135,13 @@ impl EthEventHandler { // Standalone handler with no live mempool: subscribers see an empty // stream forever. Used in tests, snapshot tools, and other contexts // where no `MessagePool` is available. - let (dummy, _) = tokio::sync::broadcast::channel(1); - Self::from_config(&EventsConfig::default(), dummy) + Self::from_config(&EventsConfig::default(), MpoolSubscriber::dummy()) } /// Build a handler from `config`. Each `MempoolFilter` installed via the - /// returned handler subscribes to `mpool_event_sender` for pending-tx - /// updates. - pub fn from_config( - config: &EventsConfig, - mpool_event_sender: tokio::sync::broadcast::Sender, - ) -> Self { + /// returned handler invokes `mpool_subscriber` to obtain its own + /// independent broadcast receiver for pending-tx updates. + pub fn from_config(config: &EventsConfig, mpool_subscriber: MpoolSubscriber) -> Self { let max_filters: usize = env_or_default("FOREST_MAX_FILTERS", 100); let max_filter_results = std::env::var("FOREST_MAX_FILTER_RESULTS") .ok() @@ -173,7 +169,7 @@ impl EthEventHandler { let tipset_filter_manager = Some(TipSetFilterManager::new(max_filter_results)); let mempool_filter_manager = Some(MempoolFilterManager::new( max_filter_results, - mpool_event_sender, + mpool_subscriber, )); Self { diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index 3ea46bbf878..5222b272ddb 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -96,10 +96,13 @@ pub async fn offline_rpc_state( let sync_network_context = SyncNetworkContext::new(network_send, peer_manager, state_manager.db_owned()); let nonce_tracker = NonceTracker::new(); - let eth_event_handler = Arc::new(EthEventHandler::from_config( - &events_config, - message_pool.mpool_event_sender(), - )); + let eth_event_handler = { + let mp = message_pool.shallow_clone(); + let subscriber = crate::rpc::eth::filter::mempool::MpoolSubscriber::new(move || { + mp.subscribe_to_updates() + }); + Arc::new(EthEventHandler::from_config(&events_config, subscriber)) + }; Ok(( RPCState { state_manager, From 0b66f95673fe9245544f927d530157870dcc7d2a Mon Sep 17 00:00:00 2001 From: Gianmarco Fraccaroli Date: Sun, 21 Jun 2026 22:04:27 +0200 Subject: [PATCH 6/6] test(api): assert eth_getFilterChanges returns only new pending txs per poll Adds eth_new_pending_transaction_filter_multi_poll: installs a pending-tx filter, submits tx A, polls (asserts hash A returned), submits tx B, polls (asserts hash B returned and hash A absent). Confirms each eth_getFilterChanges call surfaces only the delta since the previous poll, addressing review feedback on PR #7109. --- .../subcommands/api_cmd/stateful_tests.rs | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/src/tool/subcommands/api_cmd/stateful_tests.rs b/src/tool/subcommands/api_cmd/stateful_tests.rs index dc959f421a4..b7a1e1060e5 100644 --- a/src/tool/subcommands/api_cmd/stateful_tests.rs +++ b/src/tool/subcommands/api_cmd/stateful_tests.rs @@ -825,6 +825,83 @@ fn eth_new_pending_transaction_filter(tx: TestTransaction) -> RpcTestScenario { }) } +/// Verify that successive `eth_getFilterChanges` polls return only the +/// pending transactions added since the previous poll. +/// +/// 1. Install a pending-tx filter. +/// 2. Drain any baseline state with an initial poll. +/// 3. Submit tx A, wait for it in the mempool, poll — assert hash A present. +/// 4. Submit tx B, wait for it in the mempool, poll — assert hash B present +/// and hash A absent (it was already consumed by the previous poll). +fn eth_new_pending_transaction_filter_multi_poll(tx: TestTransaction) -> RpcTestScenario { + RpcTestScenario::basic(move |client| { + let tx = tx.clone(); + async move { + let filter_id = client + .call(EthNewPendingTransactionFilter::request(())?) + .await?; + + let result = async { + // Baseline: clear any pre-existing pending state. + let _ = client + .call(EthGetFilterChanges::request((filter_id.clone(),))?) + .await?; + + // First tx. + let cid_a = invoke_contract(&client, &tx).await?; + let hash_a = client + .call(EthGetTransactionHashByCid::request((cid_a,))?) + .await? + .context("no Eth transaction hash for cid_a")?; + wait_in_mempool(&client, cid_a).await?; + let poll_a = client + .call(EthGetFilterChanges::request((filter_id.clone(),))?) + .await?; + let EthFilterResult::Hashes(hashes_a) = poll_a else { + anyhow::bail!("expected hashes, got {poll_a:?}"); + }; + anyhow::ensure!( + hashes_a.contains(&hash_a), + "first poll missing tx_a: hash_a={hash_a:?} hashes={hashes_a:?}" + ); + + // Second tx. + let cid_b = invoke_contract(&client, &tx).await?; + let hash_b = client + .call(EthGetTransactionHashByCid::request((cid_b,))?) + .await? + .context("no Eth transaction hash for cid_b")?; + wait_in_mempool(&client, cid_b).await?; + let poll_b = client + .call(EthGetFilterChanges::request((filter_id.clone(),))?) + .await?; + let EthFilterResult::Hashes(hashes_b) = poll_b else { + anyhow::bail!("expected hashes, got {poll_b:?}"); + }; + anyhow::ensure!( + hashes_b.contains(&hash_b), + "second poll missing tx_b: hash_b={hash_b:?} hashes={hashes_b:?}" + ); + anyhow::ensure!( + !hashes_b.contains(&hash_a), + "second poll should not return previously-consumed tx_a: \ + hash_a={hash_a:?} hashes={hashes_b:?}" + ); + + anyhow::Ok(()) + } + .await; + + let removed = client + .call(EthUninstallFilter::request((filter_id,))?) + .await?; + anyhow::ensure!(removed); + + result + } + }) +} + fn as_logs(input: EthFilterResult) -> EthFilterResult { match input { EthFilterResult::Hashes(vec) if vec.is_empty() => EthFilterResult::Logs(Vec::new()), @@ -934,6 +1011,14 @@ pub(super) async fn create_tests(tx: TestTransaction) -> Vec { EthGetTransactionHashByCid, EthUninstallFilter ), + with_methods!( + eth_new_pending_transaction_filter_multi_poll(tx.clone()) + .name("eth_getFilterChanges returns only new pending txs per poll"), + EthNewPendingTransactionFilter, + EthGetFilterChanges, + EthGetTransactionHashByCid, + EthUninstallFilter + ), with_methods!( eth_get_filter_logs(tx.clone()).name("eth_getFilterLogs works"), EthNewFilter,