diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index ef3b91ecb5c3..3fc070a92383 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -564,7 +564,13 @@ fn maybe_start_rpc_service( .map(|path| crate::rpc::FilterList::new_from_file(path)) .transpose()?; info!("JSON-RPC endpoint will listen at {rpc_address}"); - let eth_event_handler = Arc::new(EthEventHandler::from_config(&config.events)); + let eth_event_handler = { + let mp = mpool.shallow_clone(); + let subscriber = crate::rpc::eth::filter::mempool::MpoolSubscriber::new(move || { + mp.subscribe_to_updates() + }); + Arc::new(EthEventHandler::from_config(&config.events, subscriber)) + }; if is_env_truthy("FOREST_JWT_DISABLE_EXP_VALIDATION") { warn!( "JWT expiration validation is disabled; this significantly weakens security and should only be used in tightly controlled environments" diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 9de7441c4ca7..67434f460e0d 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -2608,15 +2608,7 @@ impl EthGetTransactionHashByCid { if let Ok(smsgs) = smsgs_result && let Some(smsg) = smsgs.first() { - let hash = if smsg.is_delegated() { - let (_, tx) = eth_tx_from_signed_eth_message(smsg, eth_chain_id)?; - tx.eth_hash()?.into() - } else if smsg.is_secp256k1() { - smsg.cid().into() - } else { - smsg.message().cid().into() - }; - return Ok(Some(hash)); + return Ok(Some(eth_tx_hash_from_signed_message(smsg, eth_chain_id)?)); } let msg_result = crate::chain::get_chain_message(db, &cid); @@ -3151,29 +3143,6 @@ fn eth_filter_logs_from_tipsets(events: &[CollectedEvent]) -> anyhow::Result anyhow::Result> { - events - .iter() - .filter_map(|event| { - match eth_tx_hash_from_message_cid( - ctx.db(), - &event.msg_cid, - ctx.state_manager.chain_config().eth_chain_id, - ) { - Ok(Some(hash)) => Some(Ok(hash)), - Ok(None) => { - tracing::warn!("Ignoring event"); - None - } - Err(err) => Some(Err(err)), - } - }) - .collect() -} - fn eth_filter_logs_from_events( ctx: &Ctx, events: &[CollectedEvent], @@ -3256,15 +3225,6 @@ fn eth_filter_result_from_tipsets(events: &[CollectedEvent]) -> anyhow::Result anyhow::Result { - Ok(EthFilterResult::Hashes(eth_filter_logs_from_messages( - ctx, events, - )?)) -} - pub enum EthGetLogs {} impl RpcMethod<1> for EthGetLogs { const NAME: &'static str = "Filecoin.EthGetLogs"; @@ -3428,36 +3388,9 @@ impl RpcMethod<1> for EthGetFilterChanges { return Ok(eth_filter_result_from_tipsets(&events)?); } if let Some(mempool_filter) = filter.as_any().downcast_ref::() { - let events = ctx - .eth_event_handler - .get_events_for_parsed_filter( - &ctx, - &Arc::new(ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range( - // heaviest tipset doesn't have events because its messages haven't been executed yet - RangeInclusive::new( - mempool_filter - .collected - .unwrap_or(ctx.chain_store().heaviest_tipset().epoch() - 1), - // Use -1 to indicate that the range extends until the latest available tipset. - -1, - ), - ))), - SkipEvent::OnUnresolvedAddress, - ) - .await?; - let new_collected = events - .iter() - .max_by_key(|event| event.height) - .map(|e| e.height); - if let Some(height) = new_collected { - let filter = Arc::new(MempoolFilter { - id: mempool_filter.id.clone(), - max_results: mempool_filter.max_results, - collected: Some(height), - }); - store.update(filter); - } - return Ok(eth_filter_result_from_messages(&ctx, &events)?); + let chain_id = ctx.chain_config().eth_chain_id; + let hashes = mempool_filter.drain(chain_id); + return Ok(EthFilterResult::Hashes(hashes)); } } Err(anyhow::anyhow!("method not supported").into()) diff --git a/src/rpc/methods/eth/filter/mempool.rs b/src/rpc/methods/eth/filter/mempool.rs index 1c2e11c75d28..1c4c84ffa3f9 100644 --- a/src/rpc/methods/eth/filter/mempool.rs +++ b/src/rpc/methods/eth/filter/mempool.rs @@ -1,36 +1,127 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use crate::eth::EthChainId as EthChainIdType; +use crate::message::SignedMessage; +use crate::message_pool::MpoolUpdate; use crate::rpc::Arc; +use crate::rpc::eth::eth_tx_hash_from_signed_message; +use crate::rpc::eth::types::EthHash; use crate::rpc::eth::{FilterID, filter::Filter, filter::FilterManager}; -use crate::shim::fvm_shared_latest::clock::ChainEpoch; use ahash::AHashMap as HashMap; use anyhow::{Context, Result}; -use parking_lot::RwLock; +use indexmap::IndexSet; +use parking_lot::{Mutex, RwLock}; use std::any::Any; +use tokio::sync::broadcast; -/// Data structure for filtering and collecting pending transactions -/// from the mempool before they are confirmed in a block. -#[allow(dead_code)] -#[derive(Debug, PartialEq)] +/// Factory that yields a fresh independent `broadcast::Receiver` +/// on each call. Wraps the `MessagePool` so the filter layer never sees the +/// pool's broadcast `Sender` directly — preserves the send-only encapsulation +/// owned by the message pool module. +#[derive(Clone)] +pub struct MpoolSubscriber { + inner: Arc broadcast::Receiver + Send + Sync>, +} + +impl MpoolSubscriber { + /// Build a subscriber from a factory closure that yields a fresh + /// receiver on each call (typically `move || mp.subscribe_to_updates()`). + pub fn new(factory: F) -> Self + where + F: Fn() -> broadcast::Receiver + Send + Sync + 'static, + { + Self { + inner: Arc::new(factory), + } + } + + /// Subscriber whose receivers never receive any events. Used by + /// standalone contexts (tests, snapshot tools, offline server when no + /// real mempool is attached). + pub fn dummy() -> Self { + let (tx, _) = broadcast::channel::(1); + Self::new(move || tx.subscribe()) + } + + fn subscribe(&self) -> broadcast::Receiver { + (self.inner)() + } +} + +impl std::fmt::Debug for MpoolSubscriber { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MpoolSubscriber").finish_non_exhaustive() + } +} + +/// Filter backing `eth_newPendingTransactionFilter`. Each instance owns its +/// own `broadcast::Receiver`. +#[derive(Debug)] pub struct MempoolFilter { // Unique id used to identify the filter pub id: FilterID, // Maximum number of results to collect pub max_results: usize, - // Epoch at which the results were collected - pub collected: Option, + // Receiver for mempool updates + rx: Mutex>, } impl MempoolFilter { - pub fn new(max_results: usize) -> Result, uuid::Error> { - let id = FilterID::new()?; + pub fn new( + max_results: usize, + rx: broadcast::Receiver, + ) -> Result, uuid::Error> { Ok(Arc::new(Self { - id, + id: FilterID::new()?, max_results, - collected: None, + rx: Mutex::new(rx), })) } + + /// Drain queued mempool updates and return the resulting set of pending + /// tx hashes, capped at `max_results`. + /// + /// Semantics within a single drain window: + /// - `Add` inserts the tx hash. + /// - `Remove` cancels a prior `Add` from the *same* window. A `Remove` + /// for a hash that was already returned by an earlier `drain` call is + /// a no-op on the set — that hash was already reported as pending, + /// so the client has seen it and the cancellation does not need to + /// propagate. + /// + /// Why process `Remove` at all: a tx can leave the mempool between two + /// client polls (mined into a tipset, replaced via RBF, or evicted). If + /// we ignored `Remove` we would surface a hash whose tx is no longer + /// pending, which is misleading for `eth_newPendingTransactionFilter` + /// consumers. + pub fn drain(&self, chain_id: EthChainIdType) -> Vec { + use broadcast::error::TryRecvError; + + let mut rx = self.rx.lock(); + let mut pending: IndexSet = IndexSet::new(); + loop { + match rx.try_recv() { + Ok(MpoolUpdate::Add(m)) => { + if let Some(h) = hash_or_log(&m, chain_id) { + pending.insert(h); + } + } + Ok(MpoolUpdate::Remove(m)) => { + if let Some(h) = hash_or_log(&m, chain_id) { + // Cancels a matching Add buffered earlier in the + // same window. No-op if the hash is not in the set. + pending.shift_remove(&h); + } + } + Err(TryRecvError::Empty) | Err(TryRecvError::Closed) => break, + Err(TryRecvError::Lagged(n)) => { + tracing::warn!("mempool filter lagged, dropped {n} events"); + } + } + } + pending.into_iter().take(self.max_results).collect() + } } impl Filter for MempoolFilter { @@ -43,76 +134,165 @@ impl Filter for MempoolFilter { } } -/// `MempoolFilterManager` uses a `RwLock` to handle concurrent access to a collection of `MempoolFilter` -/// instances, each identified by a `FilterID`. The number of results returned by the filters is capped by `max_filter_results`. -#[derive(Debug)] +fn hash_or_log(msg: &SignedMessage, chain_id: EthChainIdType) -> Option { + match eth_tx_hash_from_signed_message(msg, chain_id) { + Ok(h) => Some(h), + Err(e) => { + tracing::debug!("mempool filter: dropping message, hash error: {e}"); + None + } + } +} + +/// Manages installed `MempoolFilter`s. Each `install` calls the configured +/// [`MpoolSubscriber`] to obtain a fresh independent +/// `broadcast::Receiver`. Contexts without a real `MessagePool` +/// (tests, snapshot tools, offline server) pass a subscriber whose receivers +/// always yield `Empty`. pub struct MempoolFilterManager { filters: RwLock>>, max_filter_results: usize, + subscriber: MpoolSubscriber, +} + +impl std::fmt::Debug for MempoolFilterManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MempoolFilterManager") + .field("max_filter_results", &self.max_filter_results) + .finish_non_exhaustive() + } } impl MempoolFilterManager { - pub fn new(max_filter_results: usize) -> Arc { + pub fn new(max_filter_results: usize, subscriber: MpoolSubscriber) -> Arc { Arc::new(Self { filters: RwLock::new(HashMap::new()), max_filter_results, + subscriber, }) } } impl FilterManager for MempoolFilterManager { fn install(&self) -> Result> { - let filter = MempoolFilter::new(self.max_filter_results) + let filter = MempoolFilter::new(self.max_filter_results, self.subscriber.subscribe()) .context("Failed to create a new mempool filter")?; - let id = filter.id().clone(); - - self.filters.write().insert(id, filter.clone()); - + self.filters + .write() + .insert(filter.id().clone(), filter.clone()); Ok(filter) } fn remove(&self, id: &FilterID) -> Option> { - let mut filters = self.filters.write(); - filters.remove(id) + self.filters.write().remove(id) } } #[cfg(test)] mod tests { use super::*; + use crate::shim::address::Address; + use crate::shim::econ::TokenAmount; + use crate::shim::message::Message as ShimMessage; + + const TEST_CHAIN_ID: EthChainIdType = 314; + + fn make_smsg(seq: u64) -> SignedMessage { + SignedMessage::mock_bls_signed_message(ShimMessage { + from: Address::new_id(1), + to: Address::new_id(2), + sequence: seq, + gas_premium: TokenAmount::from_atto(100u64), + gas_limit: 1_000_000, + ..ShimMessage::default() + }) + } + + fn hash_of(seq: u64) -> EthHash { + eth_tx_hash_from_signed_message(&make_smsg(seq), TEST_CHAIN_ID).unwrap() + } + + /// Build a subscriber backed by `tx` so tests can drive + /// `MpoolUpdate` events through the manager. + fn subscriber_from(tx: broadcast::Sender) -> MpoolSubscriber { + MpoolSubscriber::new(move || tx.subscribe()) + } + + #[test] + fn drain_returns_empty_when_no_events() { + let (tx, _) = broadcast::channel::(1); + let filter = MempoolFilter::new(10, tx.subscribe()).unwrap(); + assert!(filter.drain(TEST_CHAIN_ID).is_empty()); + } + + #[test] + fn drain_add_remove_cancel_within_window() { + let (tx, _) = broadcast::channel::(16); + let filter = MempoolFilter::new(10, tx.subscribe()).unwrap(); + + tx.send(MpoolUpdate::Add(make_smsg(0))).unwrap(); + tx.send(MpoolUpdate::Add(make_smsg(1))).unwrap(); + tx.send(MpoolUpdate::Remove(make_smsg(0))).unwrap(); + tx.send(MpoolUpdate::Add(make_smsg(2))).unwrap(); + + let hashes = filter.drain(TEST_CHAIN_ID); + assert!(!hashes.contains(&hash_of(0)), "Add+Remove should cancel"); + assert!(hashes.contains(&hash_of(1))); + assert!(hashes.contains(&hash_of(2))); + assert!(filter.drain(TEST_CHAIN_ID).is_empty(), "second drain empty"); + } #[test] - fn test_mempool_filter() { - // Test case 1: Create a mempool filter - let max_results = 10; - let filter = MempoolFilter::new(max_results).expect("Failed to create mempool filter"); - assert_eq!(filter.max_results, max_results); - - // Test case 2: Create a mempool filter manager and install the mempool filter - let mempool_manager = MempoolFilterManager::new(max_results); - let installed_filter = mempool_manager - .install() - .expect("Failed to install mempool filter"); - - // Verify that the filter has been added to the mempool manager - { - let filters = mempool_manager.filters.read(); - assert!(filters.contains_key(installed_filter.id())); + fn drain_truncates_to_max_results() { + let (tx, _) = broadcast::channel::(64); + let filter = MempoolFilter::new(2, tx.subscribe()).unwrap(); + for seq in 0..5u64 { + tx.send(MpoolUpdate::Add(make_smsg(seq))).unwrap(); } + assert_eq!(filter.drain(TEST_CHAIN_ID).len(), 2); + } - // Test case 3: Remove the installed mempool filter - let filter_id = installed_filter.id().clone(); - let removed = mempool_manager.remove(&filter_id); - assert_eq!( - removed.map(|f| f.id().clone()), - Some(installed_filter.id().clone()), - "Filter should be successfully removed" - ); - - // Verify that the filter is no longer in the mempool manager - { - let filters = mempool_manager.filters.read(); - assert!(!filters.contains_key(&filter_id)); + #[test] + fn drain_handles_lag_and_returns_remaining() { + let (tx, _) = broadcast::channel::(4); + let filter = MempoolFilter::new(100, tx.subscribe()).unwrap(); + for seq in 0..10u64 { + tx.send(MpoolUpdate::Add(make_smsg(seq))).unwrap(); } + // Buffer was 4; receiver lagged. Drain returns the remaining buffered + // events without panicking. + assert!(!filter.drain(TEST_CHAIN_ID).is_empty()); + } + + #[test] + fn manager_subscribes_each_filter_to_independent_receiver() { + let (tx, _) = broadcast::channel::(16); + let manager = MempoolFilterManager::new(100, subscriber_from(tx.clone())); + + let f1 = manager.install().expect("install f1"); + let f2 = manager.install().expect("install f2"); + + tx.send(MpoolUpdate::Add(make_smsg(0))).unwrap(); + tx.send(MpoolUpdate::Add(make_smsg(1))).unwrap(); + + let f1 = f1.as_any().downcast_ref::().unwrap(); + let f2 = f2.as_any().downcast_ref::().unwrap(); + + // Each receiver sees the full broadcast, independently. + let h1 = f1.drain(TEST_CHAIN_ID); + let h2 = f2.drain(TEST_CHAIN_ID); + assert_eq!(h1.len(), 2); + assert_eq!(h2.len(), 2); + + // Draining once empties only that receiver. + assert!(f1.drain(TEST_CHAIN_ID).is_empty()); + } + + #[test] + fn manager_with_dummy_subscriber_yields_empty() { + let manager = MempoolFilterManager::new(100, MpoolSubscriber::dummy()); + let f = manager.install().expect("install"); + let f = f.as_any().downcast_ref::().unwrap(); + assert!(f.drain(TEST_CHAIN_ID).is_empty()); } } diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index ce7fed6e4dbd..4f601955d67f 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -132,11 +132,16 @@ pub enum SkipEvent { impl EthEventHandler { pub fn new() -> Self { - let config = EventsConfig::default(); - Self::from_config(&config) + // Standalone handler with no live mempool: subscribers see an empty + // stream forever. Used in tests, snapshot tools, and other contexts + // where no `MessagePool` is available. + Self::from_config(&EventsConfig::default(), MpoolSubscriber::dummy()) } - pub fn from_config(config: &EventsConfig) -> Self { + /// Build a handler from `config`. Each `MempoolFilter` installed via the + /// returned handler invokes `mpool_subscriber` to obtain its own + /// independent broadcast receiver for pending-tx updates. + pub fn from_config(config: &EventsConfig, mpool_subscriber: MpoolSubscriber) -> Self { let max_filters: usize = env_or_default("FOREST_MAX_FILTERS", 100); let max_filter_results = std::env::var("FOREST_MAX_FILTER_RESULTS") .ok() @@ -162,7 +167,10 @@ impl EthEventHandler { Some(MemFilterStore::new(max_filters) as Arc); let event_filter_manager = Some(EventFilterManager::new(max_filter_results)); let tipset_filter_manager = Some(TipSetFilterManager::new(max_filter_results)); - let mempool_filter_manager = Some(MempoolFilterManager::new(max_filter_results)); + let mempool_filter_manager = Some(MempoolFilterManager::new( + max_filter_results, + mpool_subscriber, + )); Self { filter_store, diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index b315025dc00e..5222b272ddbf 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -96,6 +96,13 @@ pub async fn offline_rpc_state( let sync_network_context = SyncNetworkContext::new(network_send, peer_manager, state_manager.db_owned()); let nonce_tracker = NonceTracker::new(); + let eth_event_handler = { + let mp = message_pool.shallow_clone(); + let subscriber = crate::rpc::eth::filter::mempool::MpoolSubscriber::new(move || { + mp.subscribe_to_updates() + }); + Arc::new(EthEventHandler::from_config(&events_config, subscriber)) + }; Ok(( RPCState { state_manager, @@ -103,7 +110,7 @@ pub async fn offline_rpc_state( mpool: message_pool, bad_blocks: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), - eth_event_handler: Arc::new(EthEventHandler::from_config(&events_config)), + eth_event_handler, sync_network_context, start_time: chrono::Utc::now(), shutdown, diff --git a/src/tool/subcommands/api_cmd/stateful_tests.rs b/src/tool/subcommands/api_cmd/stateful_tests.rs index 8df22c542646..b7a1e1060e5a 100644 --- a/src/tool/subcommands/api_cmd/stateful_tests.rs +++ b/src/tool/subcommands/api_cmd/stateful_tests.rs @@ -308,26 +308,19 @@ async fn next_tipset(client: &rpc::Client) -> anyhow::Result<()> { unreachable!("loop always returns within the branches above") } -async fn wait_pending_message(client: &rpc::Client, message_cid: Cid) -> anyhow::Result<()> { - let tipset = client.call(ChainHead::request(())?).await?; +/// Poll `MpoolPending` until `message_cid` is visible. Does not wait for +/// execution. +async fn wait_in_mempool(client: &rpc::Client, message_cid: Cid) -> anyhow::Result<()> { let mut retries = 100; loop { let pending = client .call(MpoolPending::request((ApiTipsetKey(None),))?) .await?; - if pending.0.iter().any(|msg| msg.cid() == message_cid) { - client - .call( - StateWaitMsg::request((message_cid, 1, tipset.epoch(), true))? - .with_timeout(Duration::from_secs(300)), - ) - .await?; break Ok(()); } ensure!(retries != 0, "Message not found in mpool"); retries -= 1; - tokio::time::sleep(Duration::from_millis(10)).await; } } @@ -796,7 +789,8 @@ fn eth_new_pending_transaction_filter(tx: TestTransaction) -> RpcTestScenario { .await? .context("no Eth transaction hash for CID")?; - wait_pending_message(&client, cid).await?; + // Observe mempool state *before* the message is mined. + wait_in_mempool(&client, cid).await?; let filter_result = client .call(EthGetFilterChanges::request((filter_id.clone(),))?) @@ -831,6 +825,83 @@ fn eth_new_pending_transaction_filter(tx: TestTransaction) -> RpcTestScenario { }) } +/// Verify that successive `eth_getFilterChanges` polls return only the +/// pending transactions added since the previous poll. +/// +/// 1. Install a pending-tx filter. +/// 2. Drain any baseline state with an initial poll. +/// 3. Submit tx A, wait for it in the mempool, poll — assert hash A present. +/// 4. Submit tx B, wait for it in the mempool, poll — assert hash B present +/// and hash A absent (it was already consumed by the previous poll). +fn eth_new_pending_transaction_filter_multi_poll(tx: TestTransaction) -> RpcTestScenario { + RpcTestScenario::basic(move |client| { + let tx = tx.clone(); + async move { + let filter_id = client + .call(EthNewPendingTransactionFilter::request(())?) + .await?; + + let result = async { + // Baseline: clear any pre-existing pending state. + let _ = client + .call(EthGetFilterChanges::request((filter_id.clone(),))?) + .await?; + + // First tx. + let cid_a = invoke_contract(&client, &tx).await?; + let hash_a = client + .call(EthGetTransactionHashByCid::request((cid_a,))?) + .await? + .context("no Eth transaction hash for cid_a")?; + wait_in_mempool(&client, cid_a).await?; + let poll_a = client + .call(EthGetFilterChanges::request((filter_id.clone(),))?) + .await?; + let EthFilterResult::Hashes(hashes_a) = poll_a else { + anyhow::bail!("expected hashes, got {poll_a:?}"); + }; + anyhow::ensure!( + hashes_a.contains(&hash_a), + "first poll missing tx_a: hash_a={hash_a:?} hashes={hashes_a:?}" + ); + + // Second tx. + let cid_b = invoke_contract(&client, &tx).await?; + let hash_b = client + .call(EthGetTransactionHashByCid::request((cid_b,))?) + .await? + .context("no Eth transaction hash for cid_b")?; + wait_in_mempool(&client, cid_b).await?; + let poll_b = client + .call(EthGetFilterChanges::request((filter_id.clone(),))?) + .await?; + let EthFilterResult::Hashes(hashes_b) = poll_b else { + anyhow::bail!("expected hashes, got {poll_b:?}"); + }; + anyhow::ensure!( + hashes_b.contains(&hash_b), + "second poll missing tx_b: hash_b={hash_b:?} hashes={hashes_b:?}" + ); + anyhow::ensure!( + !hashes_b.contains(&hash_a), + "second poll should not return previously-consumed tx_a: \ + hash_a={hash_a:?} hashes={hashes_b:?}" + ); + + anyhow::Ok(()) + } + .await; + + let removed = client + .call(EthUninstallFilter::request((filter_id,))?) + .await?; + anyhow::ensure!(removed); + + result + } + }) +} + fn as_logs(input: EthFilterResult) -> EthFilterResult { match input { EthFilterResult::Hashes(vec) if vec.is_empty() => EthFilterResult::Logs(Vec::new()), @@ -940,6 +1011,14 @@ pub(super) async fn create_tests(tx: TestTransaction) -> Vec { EthGetTransactionHashByCid, EthUninstallFilter ), + with_methods!( + eth_new_pending_transaction_filter_multi_poll(tx.clone()) + .name("eth_getFilterChanges returns only new pending txs per poll"), + EthNewPendingTransactionFilter, + EthGetFilterChanges, + EthGetTransactionHashByCid, + EthUninstallFilter + ), with_methods!( eth_get_filter_logs(tx.clone()).name("eth_getFilterLogs works"), EthNewFilter,