From 9e367c2afebbf6410d771aa92980a4550122fa25 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Thu, 11 Jun 2026 21:17:14 +0530 Subject: [PATCH 1/7] set the reverted field in the logs --- src/daemon/mod.rs | 1 + src/rpc/methods/eth.rs | 134 +++--- src/rpc/methods/eth/filter/event.rs | 8 +- src/rpc/methods/eth/filter/mod.rs | 416 +++++------------- src/rpc/methods/eth/pubsub.rs | 304 ++++++++++++- src/rpc/methods/sync.rs | 1 + src/rpc/mod.rs | 5 + src/state_manager/state_computation.rs | 47 +- src/tool/offline_server/server.rs | 1 + .../api_cmd/generate_test_snapshot.rs | 1 + src/tool/subcommands/api_cmd/test_snapshot.rs | 1 + 11 files changed, 528 insertions(+), 391 deletions(-) diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index f19b0a532292..e9e4477b4cf2 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -600,6 +600,7 @@ fn maybe_start_rpc_service( bad_blocks, sync_status, eth_event_handler, + eth_logs_feed: Default::default(), sync_network_context, start_time, shutdown, diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index c8cac3138e1a..d04a108c0ab7 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -35,7 +35,10 @@ use crate::rpc::{ error::ServerError, eth::{ errors::EthErrors, - filter::{SkipEvent, event::EventFilter, mempool::MempoolFilter, tipset::TipSetFilter}, + filter::{ + EventRevertStatus, SkipEvent, event::EventFilter, mempool::MempoolFilter, + tipset::TipSetFilter, + }, utils::decode_revert_reason, }, methods::chain::ChainGetTipSetV2, @@ -1410,17 +1413,30 @@ pub async fn eth_logs_for_block_and_transaction( eth_filter_logs_from_events(ctx, &events) } -pub async fn eth_logs_with_filter( +/// Collects the Ethereum logs of the message tipset whose receipts live in `receipt_ts` +/// (its child), marking them as removed when the head change that surfaced them was a reorg +/// revert. The receipt tipset is passed explicitly because reverted tipsets can no longer be +/// resolved through the canonical chain. +pub(in crate::rpc) async fn eth_logs_for_head_change( ctx: &Ctx, - ts: &Tipset, - spec: Option, + receipt_ts: &Tipset, + revert_status: EventRevertStatus, ) -> anyhow::Result> { + let msg_ts = ctx + .chain_index() + .load_required_tipset(receipt_ts.parents())?; + let executed_ts = ctx + .state_manager + .load_executed_tipset_with_receipt(&msg_ts, receipt_ts) + .await?; let mut events = vec![]; - EthEventHandler::collect_events( + EthEventHandler::collect_events_from_messages( &ctx.state_manager, - ts, - spec.as_ref(), + &msg_ts, + &executed_ts.executed_messages, + None::<&ParsedFilter>, SkipEvent::OnUnresolvedAddress, + revert_status, &mut events, ) .await?; @@ -3036,6 +3052,11 @@ pub struct CollectedEvent { pub(crate) msg_cid: Cid, } +/// Positions `(message index, event index)` of collected events, grouped by tipset. +/// Identifies events without retaining their entry payloads; grouping by tipset lets +/// membership checks borrow the tipset key and stores each distinct key only once. +pub type SeenEventPositions = HashMap>; + fn match_key(key: &str) -> Option { match key.get(0..2) { Some("t1") => Some(0), @@ -3297,6 +3318,59 @@ impl RpcMethod<1> for EthGetLogs { } } +/// Shared implementation of `eth_getFilterLogs` / `eth_getFilterChanges` for installed event +/// filters: collects the filter's full result set from the canonical chain, returns only the +/// events that were not present in the previous poll, and stores the latest set as the new +/// baseline. +async fn poll_event_filter( + ctx: &Ctx, + event_filter: &EventFilter, +) -> anyhow::Result> { + let events = ctx + .eth_event_handler + .get_events_for_parsed_filter( + ctx, + &Arc::new(event_filter.into()), + SkipEvent::OnUnresolvedAddress, + ) + .await?; + // An event's position identifies it uniquely, so the poll baseline stores positions + // instead of whole `CollectedEvent`s and the filter does not pin entry payloads between + // polls. A re-orged duplicate lands under a different tipset key and is correctly + // reported again. + let mut seen_positions = SeenEventPositions::default(); + let mut recent_events = Vec::new(); + for event in events { + let position = (event.msg_idx, event.event_idx); + let already_seen = event_filter + .seen_positions + .get(&event.tipset_key) + .is_some_and(|positions| positions.contains(&position)); + match seen_positions.get_mut(&event.tipset_key) { + Some(positions) => { + positions.insert(position); + } + None => { + seen_positions.insert(event.tipset_key.clone(), HashSet::from_iter([position])); + } + } + if !already_seen { + recent_events.push(event); + } + } + if let Some(store) = &ctx.eth_event_handler.filter_store { + store.update(Arc::new(EventFilter { + id: event_filter.id.clone(), + tipsets: event_filter.tipsets.clone(), + addresses: event_filter.addresses.clone(), + keys_with_codec: event_filter.keys_with_codec.clone(), + max_results: event_filter.max_results, + seen_positions, + })); + } + Ok(recent_events) +} + pub enum EthGetFilterLogs {} impl RpcMethod<1> for EthGetFilterLogs { const NAME: &'static str = "Filecoin.EthGetFilterLogs"; @@ -3316,28 +3390,7 @@ impl RpcMethod<1> for EthGetFilterLogs { if let Some(store) = ð_event_handler.filter_store { let filter = store.get(&filter_id)?; if let Some(event_filter) = filter.as_any().downcast_ref::() { - let events = ctx - .eth_event_handler - .get_events_for_parsed_filter( - &ctx, - &Arc::new(event_filter.into()), - SkipEvent::OnUnresolvedAddress, - ) - .await?; - let recent_events: Vec = events - .clone() - .into_iter() - .filter(|event| !event_filter.collected.contains(event)) - .collect(); - let filter = Arc::new(EventFilter { - id: event_filter.id.clone(), - tipsets: event_filter.tipsets.clone(), - addresses: event_filter.addresses.clone(), - keys_with_codec: event_filter.keys_with_codec.clone(), - max_results: event_filter.max_results, - collected: events.clone(), - }); - store.update(filter); + let recent_events = poll_event_filter(&ctx, event_filter).await?; return Ok(eth_filter_result_from_events(&ctx, &recent_events)?); } } @@ -3367,28 +3420,7 @@ impl RpcMethod<1> for EthGetFilterChanges { if let Some(store) = ð_event_handler.filter_store { let filter = store.get(&filter_id)?; if let Some(event_filter) = filter.as_any().downcast_ref::() { - let events = ctx - .eth_event_handler - .get_events_for_parsed_filter( - &ctx, - &Arc::new(event_filter.into()), - SkipEvent::OnUnresolvedAddress, - ) - .await?; - let recent_events: Vec = events - .clone() - .into_iter() - .filter(|event| !event_filter.collected.contains(event)) - .collect(); - let filter = Arc::new(EventFilter { - id: event_filter.id.clone(), - tipsets: event_filter.tipsets.clone(), - addresses: event_filter.addresses.clone(), - keys_with_codec: event_filter.keys_with_codec.clone(), - max_results: event_filter.max_results, - collected: events.clone(), - }); - store.update(filter); + let recent_events = poll_event_filter(&ctx, event_filter).await?; return Ok(eth_filter_result_from_events(&ctx, &recent_events)?); } if let Some(tipset_filter) = filter.as_any().downcast_ref::() { diff --git a/src/rpc/methods/eth/filter/event.rs b/src/rpc/methods/eth/filter/event.rs index 605ba069c949..14b871bd3586 100644 --- a/src/rpc/methods/eth/filter/event.rs +++ b/src/rpc/methods/eth/filter/event.rs @@ -3,7 +3,7 @@ use crate::prelude::*; use crate::rpc::eth::filter::{ActorEventBlock, ParsedFilter, ParsedFilterTipsets}; -use crate::rpc::eth::{CollectedEvent, FilterID, filter::Filter}; +use crate::rpc::eth::{FilterID, SeenEventPositions, filter::Filter}; use crate::shim::address::Address; use ahash::HashMap; use anyhow::Result; @@ -22,8 +22,8 @@ pub struct EventFilter { pub keys_with_codec: HashMap>, // Maximum number of results to collect pub max_results: usize, - // Collected events - pub collected: Vec, + // Positions of the events returned by the last poll, used to compute the next poll's delta + pub seen_positions: SeenEventPositions, } impl From<&EventFilter> for ParsedFilter { @@ -72,7 +72,7 @@ impl EventFilterManager { addresses: pf.addresses, keys_with_codec: pf.keys, max_results: self.max_filter_results, - collected: vec![], + seen_positions: Default::default(), }); self.filters.write().insert(id, filter.clone()); diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index b97a14bf7a36..721ea52b8ac9 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -29,7 +29,6 @@ use crate::blocks::TipsetKey; use crate::chain::index::ResolveNullTipset; use crate::cli_shared::cli::EventsConfig; use crate::prelude::*; -use crate::rpc::eth::EVM_WORD_LENGTH; use crate::rpc::eth::errors::EthErrors; use crate::rpc::eth::filter::event::*; use crate::rpc::eth::filter::mempool::*; @@ -130,6 +129,15 @@ pub enum SkipEvent { Never, } +/// Whether the events being collected belong to a tipset that was applied to the canonical +/// chain, or to one that was reverted from it by a reorg. Reverted events surface as +/// `removed: true` in the Ethereum log APIs. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum EventRevertStatus { + Applied, + Reverted, +} + impl EthEventHandler { pub fn new() -> Self { let config = EventsConfig::default(); @@ -341,12 +349,33 @@ impl EthEventHandler { skip_event: SkipEvent, collected_events: &mut Vec, ) -> anyhow::Result<()> { - let msg_cid_filter = spec.and_then(|s| s.msg_cid_filter()).copied(); - let height = tipset.epoch(); - let tipset_key = tipset.key(); let ExecutedTipset { executed_messages, .. } = state_manager.load_executed_tipset_for_rpc(tipset).await?; + Self::collect_events_from_messages( + state_manager, + tipset, + &executed_messages, + spec, + skip_event, + EventRevertStatus::Applied, + collected_events, + ) + .await + } + + pub async fn collect_events_from_messages( + state_manager: &StateManager, + tipset: &Tipset, + executed_messages: &[ExecutedMessage], + spec: Option<&impl Matcher>, + skip_event: SkipEvent, + revert_status: EventRevertStatus, + collected_events: &mut Vec, + ) -> anyhow::Result<()> { + let msg_cid_filter = spec.and_then(|s| s.msg_cid_filter()).copied(); + let height = tipset.epoch(); + let tipset_key = tipset.key(); let mut resolved_id_addrs = HashMap::default(); let mut event_count = 0; for ( @@ -417,7 +446,7 @@ impl EthEventHandler { entries, emitter_addr: resolved, event_idx, - reverted: false, + reverted: matches!(revert_status, EventRevertStatus::Reverted), height, tipset_key: tipset_key.clone(), msg_idx: msg_idx as u64, @@ -565,50 +594,6 @@ impl EthFilterSpec { } } -impl Matcher for EthFilterSpec { - fn matches( - &self, - emitter_addr: &crate::shim::address::Address, - entries: &[Entry], - ) -> anyhow::Result { - fn get_word(value: &[u8]) -> Option<&[u8; EVM_WORD_LENGTH]> { - value.get(..EVM_WORD_LENGTH)?.try_into().ok() - } - - let eth_emitter_addr = EthAddress::from_filecoin_address(emitter_addr)?; - - let match_addr = match self.address { - Some(ref address_list) => { - if address_list.is_empty() { - true - } else { - address_list.iter().any(|other| other == ð_emitter_addr) - } - } - None => true, - }; - - let match_topics = if let Some(spec) = self.topics.as_ref() { - entries.iter().enumerate().all(|(i, entry)| { - if let Some(slice) = get_word(entry.value()) { - let hash: EthHash = (*slice).into(); - match spec.0.get(i) { - Some(EthHashList::List(vec)) => vec.contains(&hash), - Some(EthHashList::Single(Some(h))) => h == &hash, - _ => true, /* wildcard */ - } - } else { - // Drop events with mis-sized topics - false - } - }) - } else { - true - }; - Ok(match_addr && match_topics) - } -} - // TODO(forest): https://github.com/ChainSafe/forest/issues/6411 fn parse_block_range( heaviest: ChainEpoch, @@ -883,19 +868,6 @@ mod tests { ); } - #[test] - fn test_empty_address_list() { - let empty_list_spec = EthFilterSpec { - address: Some(vec![].into()), // Empty list, not None - ..Default::default() - }; - - let addr = Address::from_str("t410f744ma4xsq3r3eczzktfj7goal67myzfkusna2hy").unwrap(); - - // Updated to match Lotus behavior: empty list = wildcard (matches all) - assert!(empty_list_spec.matches(&addr, &[]).unwrap()); - } - #[test] fn test_parse_eth_filter_spec_with_none_address() { let eth_filter_spec = EthFilterSpec { @@ -935,51 +907,6 @@ mod tests { ); } - #[test] - fn test_lotus_compatible_address_behavior() { - // Test the Lotus-compatible behavior: empty list = wildcard - let addr = Address::from_str("t410f744ma4xsq3r3eczzktfj7goal67myzfkusna2hy").unwrap(); - - // Case 1: None (omitted) = wildcard - let none_spec = EthFilterSpec { - address: None, - ..Default::default() - }; - assert!( - none_spec.matches(&addr, &[]).unwrap(), - "None should match all addresses" - ); - - // Case 2: Empty list = wildcard (Lotus behavior) - let empty_spec = EthFilterSpec { - address: Some(vec![].into()), - ..Default::default() - }; - assert!( - empty_spec.matches(&addr, &[]).unwrap(), - "Empty list should match all addresses (Lotus compatible)" - ); - - // Case 3: Specific address = only that address - let eth_addr = EthAddress::from_filecoin_address(&addr).unwrap(); - let specific_spec = EthFilterSpec { - address: Some(vec![eth_addr].into()), - ..Default::default() - }; - assert!( - specific_spec.matches(&addr, &[]).unwrap(), - "Specific address should match itself" - ); - - // Case 4: Different address = no match - let different_addr = - Address::from_str("t410fe2jx2wo3irrsktetbvptcnj7csvitihxyehuaeq").unwrap(); - assert!( - !specific_spec.matches(&different_addr, &[]).unwrap(), - "Specific address should not match different address" - ); - } - #[test] fn test_eth_filter_spec_default_has_none_values() { let default_spec = EthFilterSpec::default(); @@ -1005,14 +932,6 @@ mod tests { default_spec.block_hash.is_none(), "Default EthFilterSpec should have None block_hash" ); - - // Verify that the default spec matches any address (wildcard behavior) - let addr0 = Address::from_str("t410f744ma4xsq3r3eczzktfj7goal67myzfkusna2hy").unwrap(); - let addr1 = Address::from_str("t410fe2jx2wo3irrsktetbvptcnj7csvitihxyehuaeq").unwrap(); - - // Test with no entries - assert!(default_spec.matches(&addr0, &[]).unwrap()); - assert!(default_spec.matches(&addr1, &[]).unwrap()); } #[test] @@ -1345,198 +1264,6 @@ mod tests { } } - #[test] - fn test_do_match_address() { - let empty_spec = EthFilterSpec::default(); - - let addr0 = Address::from_str("t410f744ma4xsq3r3eczzktfj7goal67myzfkusna2hy").unwrap(); - let eth_addr0 = EthAddress::from_str("0xff38c072f286e3b20b3954ca9f99c05fbecc64aa").unwrap(); - - let addr1 = Address::from_str("t410fe2jx2wo3irrsktetbvptcnj7csvitihxyehuaeq").unwrap(); - let eth_addr1 = EthAddress::from_str("0x26937d59db4463254c930d5f31353f14aa89a0f7").unwrap(); - - let entries0 = vec![ - Entry::new( - Flags::FLAG_INDEXED_ALL, - "t1".into(), - IPLD_RAW, - vec![ - 226, 71, 32, 244, 92, 183, 79, 45, 85, 241, 222, 235, 182, 9, 143, 80, 241, 11, - 81, 29, 171, 138, 125, 71, 196, 129, 154, 8, 220, 208, 184, 149, - ], - ), - Entry::new( - Flags::FLAG_INDEXED_ALL, - "t2".into(), - IPLD_RAW, - vec![ - 116, 4, 227, 209, 4, 234, 120, 65, 195, 217, 230, 253, 32, 173, 254, 153, 180, - 173, 88, 107, 192, 141, 143, 59, 211, 175, 239, 137, 76, 241, 132, 222, - ], - ), - Entry::new( - Flags::FLAG_INDEXED_ALL, - "d".into(), - IPLD_RAW, - vec![ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 23, - 254, 169, 229, 74, 6, 24, 52, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 13, 232, 134, 151, 206, 121, 139, 231, 226, 192, - ], - ), - ]; - - // Matching an empty spec - assert!(empty_spec.matches(&addr0, &[]).unwrap()); - - assert!(empty_spec.matches(&addr0, &entries0).unwrap()); - - // Matching the given address 0 - let spec0 = EthFilterSpec { - address: Some(vec![eth_addr0].into()), - ..Default::default() - }; - - assert!(spec0.matches(&addr0, &[]).unwrap()); - - assert!(!spec0.matches(&addr1, &[]).unwrap()); - - // Matching the given address 0 or 1 - let spec1 = EthFilterSpec { - address: Some(vec![eth_addr0, eth_addr1].into()), - ..Default::default() - }; - - assert!(spec1.matches(&addr0, &[]).unwrap()); - - assert!(spec1.matches(&addr1, &[]).unwrap()); - } - - #[test] - fn test_do_match_topic() { - let addr0 = Address::from_str("t410f744ma4xsq3r3eczzktfj7goal67myzfkusna2hy").unwrap(); - - let entries0 = vec![ - Entry::new( - Flags::FLAG_INDEXED_ALL, - "t1".into(), - IPLD_RAW, - vec![ - 226, 71, 32, 244, 92, 183, 79, 45, 85, 241, 222, 235, 182, 9, 143, 80, 241, 11, - 81, 29, 171, 138, 125, 71, 196, 129, 154, 8, 220, 208, 184, 149, - ], - ), - Entry::new( - Flags::FLAG_INDEXED_ALL, - "t2".into(), - IPLD_RAW, - vec![ - 116, 4, 227, 209, 4, 234, 120, 65, 195, 217, 230, 253, 32, 173, 254, 153, 180, - 173, 88, 107, 192, 141, 143, 59, 211, 175, 239, 137, 76, 241, 132, 222, - ], - ), - Entry::new( - Flags::FLAG_INDEXED_ALL, - "d".into(), - IPLD_RAW, - vec![ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 23, - 254, 169, 229, 74, 6, 24, 52, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 13, 232, 134, 151, 206, 121, 139, 231, 226, 192, - ], - ), - ]; - - let topic0 = - EthHash::from_str("0xe24720f45cb74f2d55f1deebb6098f50f10b511dab8a7d47c4819a08dcd0b895") - .unwrap(); - - let topic1 = - EthHash::from_str("0x7404e3d104ea7841c3d9e6fd20adfe99b4ad586bc08d8f3bd3afef894cf184de") - .unwrap(); - - let topic2 = - EthHash::from_str("0x000000000000000000000000d0fb381fc644cdd5d694d35e1afb445527b9244b") - .unwrap(); - - let topic3 = - EthHash::from_str("0x00000000000000000000000092c3b379c217fdf8603884770e83fded7b7410f8") - .unwrap(); - - let spec1 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![EthHashList::Single(None)])), - ..Default::default() - }; - - assert!(spec1.matches(&addr0, &entries0).unwrap()); - - let spec2 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![ - EthHashList::Single(None), - EthHashList::Single(None), - ])), - ..Default::default() - }; - - assert!(spec2.matches(&addr0, &entries0).unwrap()); - - let spec2 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![EthHashList::Single(Some(topic0))])), - ..Default::default() - }; - - assert!(spec2.matches(&addr0, &entries0).unwrap()); - - let spec3 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![EthHashList::List(vec![topic0])])), - ..Default::default() - }; - - assert!(spec3.matches(&addr0, &entries0).unwrap()); - - let spec4 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![EthHashList::List(vec![topic1, topic0])])), - ..Default::default() - }; - - assert!(spec4.matches(&addr0, &entries0).unwrap()); - - let spec5 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![EthHashList::Single(Some(topic1))])), - ..Default::default() - }; - - assert!(!spec5.matches(&addr0, &entries0).unwrap()); - - let spec6 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![EthHashList::List(vec![topic2, topic3])])), - ..Default::default() - }; - - assert!(!spec6.matches(&addr0, &entries0).unwrap()); - - let spec7 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![ - EthHashList::Single(Some(topic1)), - EthHashList::Single(Some(topic1)), - ])), - ..Default::default() - }; - - assert!(!spec7.matches(&addr0, &entries0).unwrap()); - - let spec8 = EthFilterSpec { - topics: Some(EthTopicSpec(vec![ - EthHashList::Single(Some(topic0)), - EthHashList::Single(Some(topic1)), - EthHashList::Single(Some(topic3)), - ])), - ..Default::default() - }; - - assert!(!spec8.matches(&addr0, &entries0).unwrap()); - } - #[test] fn test_parsed_filter_match_address() { // Note that all the following addresses and topics (base64-encoded strings) come from real data on Calibnet, @@ -1743,12 +1470,6 @@ mod tests { assert!(!filter3.matches(&addr0, &entries0).unwrap()); } - #[test] - fn test_eth_filter_spec_msg_cid_filter_default_none() { - let spec = EthFilterSpec::default(); - assert!(spec.msg_cid_filter().is_none()); - } - #[test] fn test_parsed_filter_msg_cid_filter_returns_field() { let pf_none = ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range(0..=0)); @@ -1794,4 +1515,73 @@ mod tests { assert!(ensure_filter_cap(100, 3, 101).is_err()); } + + #[tokio::test] + async fn test_collect_events_from_messages_sets_revert_status() { + use crate::blocks::{CachingBlockHeader, RawBlockHeader}; + use crate::chain::ChainStore; + use crate::db::MemoryDB; + use crate::message::ChainMessage; + use crate::networks::ChainConfig; + use crate::shim::executor::Receipt; + use crate::shim::message::Message; + + let db = Arc::new(MemoryDB::default()); + let genesis_header = CachingBlockHeader::new(RawBlockHeader { + miner_address: Address::new_id(0), + // A zero genesis timestamp is rejected by the beacon schedule. + timestamp: 7777, + ..Default::default() + }); + let chain_store = + ChainStore::new(db, Arc::new(ChainConfig::default()), genesis_header).unwrap(); + let tipset = chain_store.heaviest_tipset(); + let state_manager = StateManager::new(chain_store).unwrap(); + + let event = StampedEvent::V4(fvm_shared4::event::StampedEvent::new( + 1234, + fvm_shared4::event::ActorEvent { + entries: vec![fvm_shared4::event::Entry { + flags: Flags::FLAG_INDEXED_ALL, + key: "t1".into(), + codec: IPLD_RAW, + value: vec![0xab; 32], + }], + }, + )); + let executed_messages = vec![ExecutedMessage { + message: ChainMessage::Unsigned(Message::default().into()), + receipt: Receipt::V4(fvm_shared4::receipt::Receipt { + exit_code: fvm_shared4::error::ExitCode::OK, + return_data: Default::default(), + gas_used: 0, + events_root: None, + }), + events: Some(vec![event]), + }]; + + for (revert_status, expected_reverted) in [ + (EventRevertStatus::Applied, false), + (EventRevertStatus::Reverted, true), + ] { + let mut events = vec![]; + EthEventHandler::collect_events_from_messages( + &state_manager, + &tipset, + &executed_messages, + None::<&ParsedFilter>, + // The test genesis has no state tree, so the emitter cannot be resolved; + // fall back to its ID address instead of skipping the event. + SkipEvent::Never, + revert_status, + &mut events, + ) + .await + .unwrap(); + assert_eq!(events.len(), 1); + let event = events.first().unwrap(); + assert_eq!(event.reverted, expected_reverted); + assert_eq!(event.emitter_addr, Address::new_id(1234)); + } + } } diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index a1e47b64fe7a..7149a8e9d228 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -60,19 +60,29 @@ //! use crate::blocks::Tipset; +use crate::chain::HeadChanges; use crate::message_pool::MpoolUpdate; use crate::prelude::ShallowClone; use crate::rpc::RPCState; +use crate::rpc::chain::PathChange; +use crate::rpc::eth::filter::EventRevertStatus; use crate::rpc::eth::pubsub_trait::{EthPubSubApiServer, SubscriptionKind, SubscriptionParams}; -use crate::rpc::eth::types::{ApiHeaders, EthFilterSpec}; +use crate::rpc::eth::types::{ApiHeaders, EthFilterSpec, EthHashList, EthTopicSpec}; use crate::rpc::eth::{ - Block as EthBlock, TxInfo, eth_logs_with_filter, eth_tx_hash_from_signed_message, + Block as EthBlock, EthLog, TxInfo, eth_logs_for_head_change, eth_tx_hash_from_signed_message, }; use crate::utils::broadcast::subscription_stream; use futures::{Stream, StreamExt as _}; use jsonrpsee::core::SubscriptionResult; use jsonrpsee::{PendingSubscriptionSink, SubscriptionSink}; use std::sync::Arc; +use tokio::sync::broadcast; + +/// A cap on the number of in-flight per-tipset log batches in the shared logs feed. +const LOGS_FEED_CAP: usize = 256; + +/// Sender half of the shared logs feed; see [`RPCState::eth_logs_feed`]. +pub type LogsFeed = broadcast::Sender>>; #[derive(derive_more::Constructor)] pub struct EthPubSub { @@ -102,8 +112,9 @@ impl EthPubSubApiServer for EthPubSub { } } -/// Stream of "message tipsets", the parent of each newly applied tipset. -/// Reverts are ignored; lagged events are dropped (and logged) by [`subscription_stream`]. +/// Stream of "message tipsets", the parent of each newly applied tipset; only used by the +/// `newHeads` subscription. Reverts are ignored; lagged events are dropped (and logged) by +/// [`subscription_stream`]. fn head_message_tipsets(ctx: &Arc) -> impl Stream + Send + use<> { let rx = ctx.chain_store().subscribe_head_changes(); let ctx = ctx.shallow_clone(); @@ -147,25 +158,95 @@ fn spawn_new_heads(sink: SubscriptionSink, ctx: Arc) { tokio::spawn(pipe_stream_to_sink(stream, sink)); } -fn spawn_logs(sink: SubscriptionSink, ctx: Arc, filter: Option) { - let stream = head_message_tipsets(&ctx) - .filter_map(move |ts| { - let ctx = ctx.shallow_clone(); - let filter = filter.clone(); - async move { - eth_logs_with_filter(&ctx, &ts, filter) - .await - .inspect_err(|e| { - tracing::error!("Failed to fetch logs for tipset {}: {e:#}", ts.key()) - }) - .ok() +fn flatten_head_changes(changes: HeadChanges) -> impl Iterator { + changes + .into_change_vec() + .into_iter() + .map(|change| match change { + PathChange::Revert(tipset) => (tipset, EventRevertStatus::Reverted), + PathChange::Apply(tipset) => (tipset, EventRevertStatus::Applied), + }) +} + +/// Drives the shared logs feed: for every chain head change, collects the Ethereum logs of +/// the affected tipsets — reorg-reverted ones (marked `removed: true`) before applied ones — +/// and broadcasts each tipset's logs to all live `eth_subscribe("logs")` subscriptions. +async fn run_logs_feed(ctx: Arc, feed: broadcast::Sender>>) { + let mut head_changes = subscription_stream(ctx.chain_store().subscribe_head_changes()); + while let Some(changes) = head_changes.next().await { + // Collecting events is not free; skip the work entirely while no subscription is live. + if feed.receiver_count() == 0 { + continue; + } + for (tipset, revert_status) in flatten_head_changes(changes) { + if tipset.epoch() == 0 { + continue; + } + match eth_logs_for_head_change(&ctx, &tipset, revert_status).await { + Ok(logs) if !logs.is_empty() => { + // An error only means every receiver vanished since the check above. + let _ = feed.send(Arc::new(logs)); + } + Ok(_) => {} + Err(e) => { + tracing::error!( + "Failed to collect logs for tipset {} ({revert_status:?}): {e:#}", + tipset.key() + ); + } } + } + } +} + +fn subscribe_logs_feed(ctx: &Arc) -> broadcast::Receiver>> { + ctx.eth_logs_feed + .get_or_init(|| { + let (tx, _) = broadcast::channel(LOGS_FEED_CAP); + tokio::spawn(run_logs_feed(ctx.clone(), tx.clone())); + tx + }) + .subscribe() +} + +fn spawn_logs(sink: SubscriptionSink, ctx: Arc, filter: Option) { + let rx = subscribe_logs_feed(&ctx); + let stream = subscription_stream(rx) + .flat_map(move |logs| { + let matched: Vec = logs + .iter() + .filter(|log| filter.as_ref().is_none_or(|spec| log_matches(spec, log))) + .cloned() + .collect(); + futures::stream::iter(matched) }) - .flat_map(futures::stream::iter) .boxed(); tokio::spawn(pipe_stream_to_sink(stream, sink)); } +/// Standard Ethereum log filtering (go-ethereum's `filterLogs`) over an already-converted +/// log: any address in the list may match, with an absent or empty list acting as a +/// wildcard; topic positions are ANDed across positions and ORed within one, with absent or +/// null positions acting as wildcards. A log with fewer topics than the filter has positions +/// never matches. The filter's block range does not apply to subscriptions. +fn log_matches(spec: &EthFilterSpec, log: &EthLog) -> bool { + let address_matches = spec + .address + .as_ref() + .is_none_or(|addresses| addresses.is_empty() || addresses.contains(&log.address)); + let topics_match = spec.topics.as_ref().is_none_or(|EthTopicSpec(positions)| { + positions.len() <= log.topics.len() + && positions + .iter() + .zip(&log.topics) + .all(|(position, topic)| match position { + EthHashList::List(hashes) => hashes.is_empty() || hashes.contains(topic), + EthHashList::Single(hash) => hash.as_ref().is_none_or(|h| h == topic), + }) + }); + address_matches && topics_match +} + fn spawn_pending_transactions(sink: SubscriptionSink, ctx: Arc) { let mpool_rx = ctx.mpool.subscribe_to_updates(); let eth_chain_id = ctx.chain_config().eth_chain_id; @@ -218,3 +299,192 @@ where } tracing::debug!("Subscription task ended (id: {:?})", sink.subscription_id()); } + +#[cfg(test)] +mod tests { + use super::*; + use crate::blocks::{CachingBlockHeader, RawBlockHeader}; + use crate::rpc::eth::{EthAddress, EthHash}; + use crate::shim::clock::ChainEpoch; + use std::str::FromStr as _; + + fn tipset(epoch: ChainEpoch) -> Tipset { + Tipset::from(&CachingBlockHeader::new(RawBlockHeader { + epoch, + ..Default::default() + })) + } + + #[test] + fn flatten_head_changes_emits_reverts_before_applies() { + // `chain_get_path` produces reverts newest-first and applies oldest-first; the + // flattened order must preserve that and put every revert before any apply. + let changes = HeadChanges { + reverts: vec![tipset(5), tipset(4)], + applies: vec![tipset(14), tipset(15)], + }; + let flattened: Vec<(ChainEpoch, EventRevertStatus)> = flatten_head_changes(changes) + .map(|(ts, status)| (ts.epoch(), status)) + .collect(); + assert_eq!( + flattened, + vec![ + (5, EventRevertStatus::Reverted), + (4, EventRevertStatus::Reverted), + (14, EventRevertStatus::Applied), + (15, EventRevertStatus::Applied), + ] + ); + } + + #[test] + fn flatten_head_changes_plain_apply() { + let changes = HeadChanges { + reverts: vec![], + applies: vec![tipset(7)], + }; + let flattened: Vec<(ChainEpoch, EventRevertStatus)> = flatten_head_changes(changes) + .map(|(ts, status)| (ts.epoch(), status)) + .collect(); + assert_eq!(flattened, vec![(7, EventRevertStatus::Applied)]); + } + + fn eth_log(address: &EthAddress, topics: Vec) -> EthLog { + EthLog { + address: *address, + topics, + ..Default::default() + } + } + + fn address_0() -> EthAddress { + EthAddress::from_str("0xff38c072f286e3b20b3954ca9f99c05fbecc64aa").unwrap() + } + + fn address_1() -> EthAddress { + EthAddress::from_str("0x26937d59db4463254c930d5f31353f14aa89a0f7").unwrap() + } + + fn topic(byte: u8) -> EthHash { + EthHash(ethereum_types::H256::from_slice(&[byte; 32])) + } + + #[test] + fn log_matches_address() { + let log = eth_log(&address_0(), vec![]); + + // Absent and empty address lists are wildcards (Lotus/go-ethereum behavior). + assert!(log_matches(&EthFilterSpec::default(), &log)); + let empty = EthFilterSpec { + address: Some(vec![].into()), + ..Default::default() + }; + assert!(log_matches(&empty, &log)); + + let specific = EthFilterSpec { + address: Some(vec![address_0()].into()), + ..Default::default() + }; + assert!(log_matches(&specific, &log)); + assert!(!log_matches(&specific, ð_log(&address_1(), vec![]))); + + // Any address in the list may match. + let either = EthFilterSpec { + address: Some(vec![address_0(), address_1()].into()), + ..Default::default() + }; + assert!(log_matches(&either, &log)); + assert!(log_matches(&either, ð_log(&address_1(), vec![]))); + } + + #[test] + fn log_matches_topics() { + let log = eth_log(&address_0(), vec![topic(1), topic(2)]); + + let with_topics = |positions: Vec| EthFilterSpec { + topics: Some(EthTopicSpec(positions)), + ..Default::default() + }; + + // Wildcards: null position, empty list position, fewer positions than topics. + assert!(log_matches(&with_topics(vec![]), &log)); + assert!(log_matches( + &with_topics(vec![EthHashList::Single(None)]), + &log + )); + assert!(log_matches( + &with_topics(vec![EthHashList::List(vec![])]), + &log + )); + + // Value in the first position. + assert!(log_matches( + &with_topics(vec![EthHashList::Single(Some(topic(1)))]), + &log + )); + assert!(!log_matches( + &with_topics(vec![EthHashList::Single(Some(topic(2)))]), + &log + )); + + // OR within a position. + assert!(log_matches( + &with_topics(vec![EthHashList::List(vec![topic(9), topic(1)])]), + &log + )); + assert!(!log_matches( + &with_topics(vec![EthHashList::List(vec![topic(8), topic(9)])]), + &log + )); + + // AND across positions. + assert!(log_matches( + &with_topics(vec![ + EthHashList::Single(Some(topic(1))), + EthHashList::Single(Some(topic(2))), + ]), + &log + )); + assert!(!log_matches( + &with_topics(vec![ + EthHashList::Single(Some(topic(1))), + EthHashList::Single(Some(topic(9))), + ]), + &log + )); + + // More filter positions than log topics never match, even with wildcards + // (go-ethereum's `filterLogs` semantics). + assert!(!log_matches( + &with_topics(vec![ + EthHashList::Single(Some(topic(1))), + EthHashList::Single(Some(topic(2))), + EthHashList::Single(None), + ]), + &log + )); + } + + #[test] + fn log_matches_address_and_topics_combined() { + let log = eth_log(&address_0(), vec![topic(1)]); + let spec = EthFilterSpec { + address: Some(vec![address_0()].into()), + topics: Some(EthTopicSpec(vec![EthHashList::Single(Some(topic(1)))])), + ..Default::default() + }; + assert!(log_matches(&spec, &log)); + + let wrong_address = EthFilterSpec { + address: Some(vec![address_1()].into()), + ..spec.clone() + }; + assert!(!log_matches(&wrong_address, &log)); + + let wrong_topic = EthFilterSpec { + topics: Some(EthTopicSpec(vec![EthHashList::Single(Some(topic(9)))])), + ..spec + }; + assert!(!log_matches(&wrong_topic, &log)); + } +} diff --git a/src/rpc/methods/sync.rs b/src/rpc/methods/sync.rs index 4d6075d69cad..253dc236ac35 100644 --- a/src/rpc/methods/sync.rs +++ b/src/rpc/methods/sync.rs @@ -234,6 +234,7 @@ mod tests { bad_blocks: Some(Default::default()), sync_status: Arc::new(RwLock::new(SyncStatusReport::default())), eth_event_handler: Arc::new(EthEventHandler::new()), + eth_logs_feed: Default::default(), sync_network_context, start_time, shutdown: mpsc::channel(1).0, // dummy for tests diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 44539798a8aa..90d608d7e18c 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -501,6 +501,11 @@ pub struct RPCState { pub bad_blocks: Option, pub sync_status: crate::chain_sync::SyncStatus, pub eth_event_handler: Arc, + /// Broadcast of per-tipset Ethereum logs derived from chain head changes, covering both + /// applied tipsets and reorg-reverted ones (whose logs carry `removed: true`). Started + /// lazily by the first `eth_subscribe("logs")` subscription and shared by all of them, so + /// events are collected and converted once per tipset regardless of the subscriber count. + pub eth_logs_feed: std::sync::OnceLock, pub sync_network_context: SyncNetworkContext, pub tipset_send: flume::Sender, pub start_time: chrono::DateTime, diff --git a/src/state_manager/state_computation.rs b/src/state_manager/state_computation.rs index e05ae4c1aa76..3d9b21ef24cd 100644 --- a/src/state_manager/state_computation.rs +++ b/src/state_manager/state_computation.rs @@ -32,11 +32,9 @@ impl StateManager { } } - /// Load an executed tipset for RPC methods, with state computation unless explicitly enabled. - pub async fn load_executed_tipset_for_rpc( - &self, - ts: &Tipset, - ) -> anyhow::Result { + /// State recomputation policy for RPC methods: recomputation is disabled unless explicitly + /// enabled via the environment. + fn rpc_state_recompute_policy() -> StateRecomputePolicy { crate::def_is_env_truthy!( enable_state_computation, "FOREST_ETH_RPC_COMPUTE_STATE_ON_INDEX_MISS" @@ -47,7 +45,44 @@ impl StateManager { StateRecomputePolicy::Disallowed }; - self.load_executed_tipset_with_cache(ts, policy).await + // https://github.com/ChainSafe/forest/issues/7118 + #[cfg(test)] + let policy = { + _ = policy; + StateRecomputePolicy::Allowed + }; + + policy + } + + /// Load an executed tipset for RPC methods, with state computation unless explicitly enabled. + pub async fn load_executed_tipset_for_rpc( + &self, + ts: &Tipset, + ) -> anyhow::Result { + self.load_executed_tipset_with_cache(ts, Self::rpc_state_recompute_policy()) + .await + } + + /// Load an executed tipset using an explicitly provided receipt (child) tipset instead of + /// resolving the child on the current heaviest chain. This is required when serving events + /// for tipsets that are no longer canonical — e.g. the divergent segment of a reorg — where + /// a canonical-chain lookup would find no child, or a different one. + pub async fn load_executed_tipset_with_receipt( + &self, + msg_ts: &Tipset, + receipt_ts: &Tipset, + ) -> anyhow::Result { + self.cache + .get_or_insert_async(msg_ts.key(), async move { + self.load_executed_tipset_inner( + msg_ts, + Some(receipt_ts), + Self::rpc_state_recompute_policy(), + ) + .await + }) + .await } /// Load an executed tipset, including state root, message receipts and events with caching. diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index 11c5c9262e74..fdbaf1fbbe18 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -104,6 +104,7 @@ pub async fn offline_rpc_state( bad_blocks: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), eth_event_handler: Arc::new(EthEventHandler::from_config(&events_config)), + eth_logs_feed: Default::default(), sync_network_context, start_time: chrono::Utc::now(), shutdown, diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs index 0e439b7fd5ec..0278d7a44b06 100644 --- a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs @@ -143,6 +143,7 @@ async fn ctx( bad_blocks: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), eth_event_handler: Arc::new(EthEventHandler::new()), + eth_logs_feed: Default::default(), sync_network_context, start_time: chrono::Utc::now(), shutdown, diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index 4963fce3f8f1..840a8bb8b094 100644 --- a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -172,6 +172,7 @@ async fn ctx( bad_blocks: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), eth_event_handler: Arc::new(EthEventHandler::new()), + eth_logs_feed: Default::default(), sync_network_context, start_time: chrono::Utc::now(), shutdown, From 659d5398f6752f44b9100450f3fe37f2c3e3939b Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Thu, 11 Jun 2026 21:17:43 +0530 Subject: [PATCH 2/7] update the changelog entry --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 89a0b17906d8..6909a2ed61e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,10 +37,14 @@ - [#7164](https://github.com/ChainSafe/forest/issues/7164): JSON-RPC authentication is now performed once per connection (e.g. at the WebSocket upgrade) instead of on every request, matching Lotus. Note that token expiry is no longer re-checked for the lifetime of an established connection. +- [#7096](https://github.com/ChainSafe/forest/issues/7096): `eth_subscribe` `logs` filters now match with go-ethereum's semantics: the event data payload no longer participates in topic matching, empty topic positions act as wildcards, and logs with fewer topics than the filter has positions never match. + ### Removed ### Fixed +- [#7096](https://github.com/ChainSafe/forest/issues/7096): `eth_subscribe` `logs` now re-emits the logs of reorg-reverted tipsets with `removed: true`, ahead of the logs of the replacing tipsets. + - [#7129](https://github.com/ChainSafe/forest/pull/7129): Fixed a few inaccurate cache size metrics. - [#6974](https://github.com/ChainSafe/forest/issues/6974): Fixed the message pool reporting a still-pending nonce as the next nonce after an applied message was removed. From 16ef4beb29ca9c16d065da8709c34859281e9abe Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Wed, 17 Jun 2026 21:51:03 +0530 Subject: [PATCH 3/7] cleanup --- src/rpc/methods/chain.rs | 8 ++++ src/rpc/methods/eth.rs | 22 ++++++---- src/rpc/methods/eth/pubsub.rs | 77 ++++------------------------------- src/rpc/mod.rs | 4 -- 4 files changed, 31 insertions(+), 80 deletions(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 151885df1a00..d1d3d492c425 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -1551,6 +1551,14 @@ impl Clone for PathChange { } } +impl PathChange { + pub fn tipset(&self) -> &T { + match self { + Self::Revert(ts) | Self::Apply(ts) => ts, + } + } +} + impl HasLotusJson for PathChange { type LotusJson = PathChange<::LotusJson>; diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index d04a108c0ab7..a01e66f5cee2 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -41,7 +41,7 @@ use crate::rpc::{ }, utils::decode_revert_reason, }, - methods::chain::ChainGetTipSetV2, + methods::chain::{ChainGetTipSetV2, PathChange}, state::ApiInvocResult, types::{ApiTipsetKey, EventEntry, MessageLookup}, }; @@ -1413,15 +1413,23 @@ pub async fn eth_logs_for_block_and_transaction( eth_filter_logs_from_events(ctx, &events) } -/// Collects the Ethereum logs of the message tipset whose receipts live in `receipt_ts` -/// (its child), marking them as removed when the head change that surfaced them was a reorg -/// revert. The receipt tipset is passed explicitly because reverted tipsets can no longer be -/// resolved through the canonical chain. +/// Collects the Ethereum logs produced by a single chain head change, for the logs +/// subscription. The change's tipset is the *receipt* tipset (the applied or reverted tipset +/// itself); its parent is the message tipset whose events those receipts carry. A reverting +/// change marks the logs `removed: true`. The receipt tipset is taken from the change rather +/// than resolved through the canonical chain, because a reverted tipset is no longer on it. pub(in crate::rpc) async fn eth_logs_for_head_change( ctx: &Ctx, - receipt_ts: &Tipset, - revert_status: EventRevertStatus, + change: &PathChange, ) -> anyhow::Result> { + let (receipt_ts, revert_status) = match change { + PathChange::Revert(ts) => (ts, EventRevertStatus::Reverted), + PathChange::Apply(ts) => (ts, EventRevertStatus::Applied), + }; + // Genesis carries no events and has no parent message tipset to load. + if receipt_ts.epoch() == 0 { + return Ok(vec![]); + } let msg_ts = ctx .chain_index() .load_required_tipset(receipt_ts.parents())?; diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index 7149a8e9d228..baedb42ffb07 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -60,12 +60,9 @@ //! use crate::blocks::Tipset; -use crate::chain::HeadChanges; use crate::message_pool::MpoolUpdate; use crate::prelude::ShallowClone; use crate::rpc::RPCState; -use crate::rpc::chain::PathChange; -use crate::rpc::eth::filter::EventRevertStatus; use crate::rpc::eth::pubsub_trait::{EthPubSubApiServer, SubscriptionKind, SubscriptionParams}; use crate::rpc::eth::types::{ApiHeaders, EthFilterSpec, EthHashList, EthTopicSpec}; use crate::rpc::eth::{ @@ -112,9 +109,8 @@ impl EthPubSubApiServer for EthPubSub { } } -/// Stream of "message tipsets", the parent of each newly applied tipset; only used by the -/// `newHeads` subscription. Reverts are ignored; lagged events are dropped (and logged) by -/// [`subscription_stream`]. +/// Stream of "message tipsets", the parent of each newly applied tipset. +/// Reverts are ignored; lagged events are dropped (and logged) by [`subscription_stream`]. fn head_message_tipsets(ctx: &Arc) -> impl Stream + Send + use<> { let rx = ctx.chain_store().subscribe_head_changes(); let ctx = ctx.shallow_clone(); @@ -158,19 +154,7 @@ fn spawn_new_heads(sink: SubscriptionSink, ctx: Arc) { tokio::spawn(pipe_stream_to_sink(stream, sink)); } -fn flatten_head_changes(changes: HeadChanges) -> impl Iterator { - changes - .into_change_vec() - .into_iter() - .map(|change| match change { - PathChange::Revert(tipset) => (tipset, EventRevertStatus::Reverted), - PathChange::Apply(tipset) => (tipset, EventRevertStatus::Applied), - }) -} - -/// Drives the shared logs feed: for every chain head change, collects the Ethereum logs of -/// the affected tipsets — reorg-reverted ones (marked `removed: true`) before applied ones — -/// and broadcasts each tipset's logs to all live `eth_subscribe("logs")` subscriptions. +/// Drives the shared logs feed for every chain head change, collects the Ethereum logs of the affected tipsets async fn run_logs_feed(ctx: Arc, feed: broadcast::Sender>>) { let mut head_changes = subscription_stream(ctx.chain_store().subscribe_head_changes()); while let Some(changes) = head_changes.next().await { @@ -178,11 +162,8 @@ async fn run_logs_feed(ctx: Arc, feed: broadcast::Sender { // An error only means every receiver vanished since the check above. let _ = feed.send(Arc::new(logs)); @@ -190,8 +171,8 @@ async fn run_logs_feed(ctx: Arc, feed: broadcast::Sender {} Err(e) => { tracing::error!( - "Failed to collect logs for tipset {} ({revert_status:?}): {e:#}", - tipset.key() + "Failed to collect logs for head change {}: {e:#}", + change.tipset().key() ); } } @@ -199,6 +180,7 @@ async fn run_logs_feed(ctx: Arc, feed: broadcast::Sender) -> broadcast::Receiver>> { ctx.eth_logs_feed .get_or_init(|| { @@ -303,52 +285,9 @@ where #[cfg(test)] mod tests { use super::*; - use crate::blocks::{CachingBlockHeader, RawBlockHeader}; use crate::rpc::eth::{EthAddress, EthHash}; - use crate::shim::clock::ChainEpoch; use std::str::FromStr as _; - fn tipset(epoch: ChainEpoch) -> Tipset { - Tipset::from(&CachingBlockHeader::new(RawBlockHeader { - epoch, - ..Default::default() - })) - } - - #[test] - fn flatten_head_changes_emits_reverts_before_applies() { - // `chain_get_path` produces reverts newest-first and applies oldest-first; the - // flattened order must preserve that and put every revert before any apply. - let changes = HeadChanges { - reverts: vec![tipset(5), tipset(4)], - applies: vec![tipset(14), tipset(15)], - }; - let flattened: Vec<(ChainEpoch, EventRevertStatus)> = flatten_head_changes(changes) - .map(|(ts, status)| (ts.epoch(), status)) - .collect(); - assert_eq!( - flattened, - vec![ - (5, EventRevertStatus::Reverted), - (4, EventRevertStatus::Reverted), - (14, EventRevertStatus::Applied), - (15, EventRevertStatus::Applied), - ] - ); - } - - #[test] - fn flatten_head_changes_plain_apply() { - let changes = HeadChanges { - reverts: vec![], - applies: vec![tipset(7)], - }; - let flattened: Vec<(ChainEpoch, EventRevertStatus)> = flatten_head_changes(changes) - .map(|(ts, status)| (ts.epoch(), status)) - .collect(); - assert_eq!(flattened, vec![(7, EventRevertStatus::Applied)]); - } - fn eth_log(address: &EthAddress, topics: Vec) -> EthLog { EthLog { address: *address, diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 90d608d7e18c..f64abfd8a71f 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -501,10 +501,6 @@ pub struct RPCState { pub bad_blocks: Option, pub sync_status: crate::chain_sync::SyncStatus, pub eth_event_handler: Arc, - /// Broadcast of per-tipset Ethereum logs derived from chain head changes, covering both - /// applied tipsets and reorg-reverted ones (whose logs carry `removed: true`). Started - /// lazily by the first `eth_subscribe("logs")` subscription and shared by all of them, so - /// events are collected and converted once per tipset regardless of the subscriber count. pub eth_logs_feed: std::sync::OnceLock, pub sync_network_context: SyncNetworkContext, pub tipset_send: flume::Sender, From 92c885e87c9263f2f9f1382c9d9458c255d8af65 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Thu, 18 Jun 2026 17:29:37 +0530 Subject: [PATCH 4/7] refactor and add more tests --- CHANGELOG.md | 2 - src/rpc/methods/eth.rs | 14 +- src/rpc/methods/eth/filter/mod.rs | 3 - src/rpc/methods/eth/pubsub.rs | 106 ++++-- src/state_manager/state_computation.rs | 3 +- .../subcommands/api_cmd/stateful_tests.rs | 314 +++++++++++++++--- 6 files changed, 356 insertions(+), 86 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6909a2ed61e6..5f44c0022a70 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,8 +37,6 @@ - [#7164](https://github.com/ChainSafe/forest/issues/7164): JSON-RPC authentication is now performed once per connection (e.g. at the WebSocket upgrade) instead of on every request, matching Lotus. Note that token expiry is no longer re-checked for the lifetime of an established connection. -- [#7096](https://github.com/ChainSafe/forest/issues/7096): `eth_subscribe` `logs` filters now match with go-ethereum's semantics: the event data payload no longer participates in topic matching, empty topic positions act as wildcards, and logs with fewer topics than the filter has positions never match. - ### Removed ### Fixed diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index a01e66f5cee2..3f53677a1276 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -1413,11 +1413,8 @@ pub async fn eth_logs_for_block_and_transaction( eth_filter_logs_from_events(ctx, &events) } -/// Collects the Ethereum logs produced by a single chain head change, for the logs -/// subscription. The change's tipset is the *receipt* tipset (the applied or reverted tipset -/// itself); its parent is the message tipset whose events those receipts carry. A reverting -/// change marks the logs `removed: true`. The receipt tipset is taken from the change rather -/// than resolved through the canonical chain, because a reverted tipset is no longer on it. +/// Collects the logs produced by a single chain head change, for the logs +/// subscription. pub(in crate::rpc) async fn eth_logs_for_head_change( ctx: &Ctx, change: &PathChange, @@ -3328,8 +3325,7 @@ impl RpcMethod<1> for EthGetLogs { /// Shared implementation of `eth_getFilterLogs` / `eth_getFilterChanges` for installed event /// filters: collects the filter's full result set from the canonical chain, returns only the -/// events that were not present in the previous poll, and stores the latest set as the new -/// baseline. +/// events that were not present in the previous poll. async fn poll_event_filter( ctx: &Ctx, event_filter: &EventFilter, @@ -3342,10 +3338,6 @@ async fn poll_event_filter( SkipEvent::OnUnresolvedAddress, ) .await?; - // An event's position identifies it uniquely, so the poll baseline stores positions - // instead of whole `CollectedEvent`s and the filter does not pin entry payloads between - // polls. A re-orged duplicate lands under a different tipset key and is correctly - // reported again. let mut seen_positions = SeenEventPositions::default(); let mut recent_events = Vec::new(); for event in events { diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index 721ea52b8ac9..da2dd8e23f12 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -129,9 +129,6 @@ pub enum SkipEvent { Never, } -/// Whether the events being collected belong to a tipset that was applied to the canonical -/// chain, or to one that was reverted from it by a reorg. Reverted events surface as -/// `removed: true` in the Ethereum log APIs. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum EventRevertStatus { Applied, diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index baedb42ffb07..37d505da6726 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -66,7 +66,8 @@ use crate::rpc::RPCState; use crate::rpc::eth::pubsub_trait::{EthPubSubApiServer, SubscriptionKind, SubscriptionParams}; use crate::rpc::eth::types::{ApiHeaders, EthFilterSpec, EthHashList, EthTopicSpec}; use crate::rpc::eth::{ - Block as EthBlock, EthLog, TxInfo, eth_logs_for_head_change, eth_tx_hash_from_signed_message, + Block as EthBlock, EthHash, EthLog, TxInfo, eth_logs_for_head_change, + eth_tx_hash_from_signed_message, }; use crate::utils::broadcast::subscription_stream; use futures::{Stream, StreamExt as _}; @@ -155,7 +156,7 @@ fn spawn_new_heads(sink: SubscriptionSink, ctx: Arc) { } /// Drives the shared logs feed for every chain head change, collects the Ethereum logs of the affected tipsets -async fn run_logs_feed(ctx: Arc, feed: broadcast::Sender>>) { +async fn run_logs_feed(ctx: Arc, feed: LogsFeed) { let mut head_changes = subscription_stream(ctx.chain_store().subscribe_head_changes()); while let Some(changes) = head_changes.next().await { // Collecting events is not free; skip the work entirely while no subscription is live. @@ -164,11 +165,11 @@ async fn run_logs_feed(ctx: Arc, feed: broadcast::Sender { - // An error only means every receiver vanished since the check above. - let _ = feed.send(Arc::new(logs)); + Ok(logs) => { + if !logs.is_empty() { + let _ = feed.send(Arc::new(logs)); + } } - Ok(_) => {} Err(e) => { tracing::error!( "Failed to collect logs for head change {}: {e:#}", @@ -206,27 +207,37 @@ fn spawn_logs(sink: SubscriptionSink, ctx: Arc, filter: Option bool { - let address_matches = spec + let address_ok = spec .address .as_ref() - .is_none_or(|addresses| addresses.is_empty() || addresses.contains(&log.address)); - let topics_match = spec.topics.as_ref().is_none_or(|EthTopicSpec(positions)| { - positions.len() <= log.topics.len() - && positions - .iter() - .zip(&log.topics) - .all(|(position, topic)| match position { - EthHashList::List(hashes) => hashes.is_empty() || hashes.contains(topic), - EthHashList::Single(hash) => hash.as_ref().is_none_or(|h| h == topic), - }) + .is_none_or(|list| list.is_empty() || list.contains(&log.address)); + + let topics_ok = spec.topics.as_ref().is_none_or(|EthTopicSpec(positions)| { + positions.iter().enumerate().all(|(i, position)| { + // A position is a set of accepted hashes; an empty set is a wildcard. + let accepted: &[EthHash] = match position { + EthHashList::List(hashes) => hashes, + EthHashList::Single(Some(hash)) => std::slice::from_ref(hash), + EthHashList::Single(None) => &[], + }; + accepted.is_empty() + || log + .topics + .get(i) + .is_some_and(|topic| accepted.contains(topic)) + }) }); - address_matches && topics_match + + address_ok && topics_ok } fn spawn_pending_transactions(sink: SubscriptionSink, ctx: Arc) { @@ -392,13 +403,58 @@ mod tests { &log )); - // More filter positions than log topics never match, even with wildcards - // (go-ethereum's `filterLogs` semantics). + // A trailing wildcard position imposes no constraint, even past the log's topics — + // matching Anvil (reth), Lotus, and Forest's eth_getLogs. + assert!(log_matches( + &with_topics(vec![ + EthHashList::Single(Some(topic(1))), + EthHashList::Single(Some(topic(2))), + EthHashList::Single(None), + ]), + &log + )); + } + + #[test] + fn log_matches_trailing_wildcard_past_topics() { + // A log with a single topic, e.g. a no-indexed-arg event: topics = [signature]. + // These assertions mirror the empirically-confirmed Anvil (reth) behaviour. + let log = eth_log(&address_0(), vec![topic(1)]); + let with_topics = |positions: Vec| EthFilterSpec { + topics: Some(EthTopicSpec(positions)), + ..Default::default() + }; + + // [sig, null]: trailing wildcard does not require a second topic -> matches. + assert!(log_matches( + &with_topics(vec![ + EthHashList::Single(Some(topic(1))), + EthHashList::Single(None), + ]), + &log + )); + // [sig, []]: an empty list is also a wildcard -> matches. + assert!(log_matches( + &with_topics(vec![ + EthHashList::Single(Some(topic(1))), + EthHashList::List(vec![]), + ]), + &log + )); + // [sig, value]: a constrained second position with no topic to match -> no match. assert!(!log_matches( &with_topics(vec![ EthHashList::Single(Some(topic(1))), EthHashList::Single(Some(topic(2))), + ]), + &log + )); + // Many trailing wildcards are still ignored. + assert!(log_matches( + &with_topics(vec![ + EthHashList::Single(Some(topic(1))), EthHashList::Single(None), + EthHashList::List(vec![]), ]), &log )); diff --git a/src/state_manager/state_computation.rs b/src/state_manager/state_computation.rs index 3d9b21ef24cd..c587798875a4 100644 --- a/src/state_manager/state_computation.rs +++ b/src/state_manager/state_computation.rs @@ -66,8 +66,7 @@ impl StateManager { /// Load an executed tipset using an explicitly provided receipt (child) tipset instead of /// resolving the child on the current heaviest chain. This is required when serving events - /// for tipsets that are no longer canonical — e.g. the divergent segment of a reorg — where - /// a canonical-chain lookup would find no child, or a different one. + /// for tipsets that are no longer canonical. pub async fn load_executed_tipset_with_receipt( &self, msg_ts: &Tipset, diff --git a/src/tool/subcommands/api_cmd/stateful_tests.rs b/src/tool/subcommands/api_cmd/stateful_tests.rs index 8df22c542646..648d946b758d 100644 --- a/src/tool/subcommands/api_cmd/stateful_tests.rs +++ b/src/tool/subcommands/api_cmd/stateful_tests.rs @@ -18,6 +18,7 @@ use serde_json::json; use tokio::time::Duration; use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage}; +use std::collections::BTreeSet; use std::io::{self, Write}; use std::pin::Pin; use std::sync::Arc; @@ -567,67 +568,293 @@ fn eth_subscribe_pending_transactions(tx: TestTransaction) -> RpcTestScenario { }) } -/// Minimal typed view of an `EthLog` for the fields the log test asserts on. #[derive(serde::Deserialize)] #[serde(rename_all = "camelCase")] -struct LogView { +struct EthLogView { + address: EthAddress, topics: Vec, + removed: bool, + log_index: EthUint64, transaction_hash: EthHash, } +fn logs_from_result(result: EthFilterResult) -> anyhow::Result> { + let logs = match as_logs(result) { + EthFilterResult::Logs(logs) => logs, + EthFilterResult::Hashes(_) => anyhow::bail!("expected logs but got block/tx hashes"), + }; + logs.into_iter() + .map(|log| { + serde_json::from_value(serde_json::to_value(log)?) + .context("failed to decode EthLog into a view") + }) + .collect() +} + +async fn get_logs_for_tx( + client: &rpc::Client, + spec: EthFilterSpec, + tx_hash: &EthHash, +) -> anyhow::Result> { + let result = client.call(EthGetLogs::request((spec,))?).await?; + let mut logs: Vec = logs_from_result(result)? + .into_iter() + .filter(|log| &log.transaction_hash == tx_hash) + .collect(); + logs.sort_by_key(|log| log.log_index.0); + Ok(logs) +} + +/// Drain a `logs` subscription, collecting the logs produced by `tx`. +async fn collect_subscription_logs( + ws: &mut EthSubStream, + subscription_id: &serde_json::Value, + tx_hash: &EthHash, + want: usize, + budget: Duration, +) -> anyhow::Result> { + let deadline = std::time::Instant::now() + budget; + let mut logs = Vec::new(); + let mut seen = BTreeSet::new(); + while seen.len() < want { + let remaining = deadline.saturating_duration_since(std::time::Instant::now()); + if remaining.is_zero() { + break; + } + match next_subscription_payload(ws, subscription_id, remaining).await { + Ok(payload) => { + ensure!( + payload.is_object(), + "a logs notification must be a single log object, got: {payload}" + ); + let log: EthLogView = + serde_json::from_value(payload).context("subscription log payload")?; + if &log.transaction_hash == tx_hash && seen.insert(log.log_index.0) { + logs.push(log); + } + } + // Timed out waiting for the next notification (or the stream ended): done draining. + Err(_) => break, + } + } + Ok(logs) +} + fn eth_subscribe_logs(tx: TestTransaction) -> RpcTestScenario { RpcTestScenario::basic(move |client| { let tx = tx.clone(); async move { - let filter = json!({ - "address": [], - "topics": [tx.topic.to_string()], - }); - let (mut ws_stream, subscription_id) = - open_eth_subscription(&client, SubscriptionKind::Logs, Some(filter)).await?; + let contract = EthAddress::from_filecoin_address(&tx.to)?; + let wrong_addr = EthAddress(ethereum_types::Address::from_slice(&[0xde; 20])); + let absent = EthHash(ethereum_types::H256::from_slice(&[0xcd; 32])); + ensure!( + wrong_addr != contract && absent != tx.topic, + "sentinels must differ from the contract/topic under test" + ); - // Emit the event on-chain and remember the exact tx that produced it, - // so we can confirm the log we receive is `ours` - let cid = invoke_contract(&client, &tx).await?; - let tx_hash = client - .call(EthGetTransactionHashByCid::request((cid,))?) - .await? - .context("no Eth transaction hash for CID")?; + // Filters are fixed at subscription time (subscriptions only see future events). + let spec = |topics: Option, address: EthAddress| EthFilterSpec { + address: Some(vec![address].into()), + topics, + ..Default::default() + }; + let json_of = |s: &EthFilterSpec| -> anyhow::Result { + Ok(serde_json::to_value(s)?) + }; + let topic_at = |positions: Vec| Some(EthTopicSpec(positions)); + + // (contract, no topics) -> all our logs; doubles as the delivery barrier. + let filter_all = spec(None, contract); + // (wrong address) -> nothing. + let filter_wrong = spec(None, wrong_addr); + // (contract, topic) -> the logs whose signature is topic. + let filter_topic = spec( + topic_at(vec![EthHashList::Single(Some(tx.topic))]), + contract, + ); + // (contract, [topic, null]) -> a trailing wildcard must change nothing. + let filter_wild = spec( + topic_at(vec![ + EthHashList::Single(Some(tx.topic)), + EthHashList::Single(None), + ]), + contract, + ); + // (contract, [topic, absent]) -> a constrained trailing position must exclude. + let filter_constrained = spec( + topic_at(vec![ + EthHashList::Single(Some(tx.topic)), + EthHashList::Single(Some(absent)), + ]), + contract, + ); - // Logs are delivered when the tipset holding the event is applied, - // which can take a few epochs (~30s each) on calibnet - let watch = async { - loop { - let payload = next_subscription_payload( - &mut ws_stream, - &subscription_id, - Duration::from_secs(300), + // Open every subscription BEFORE emitting. + let logs = SubscriptionKind::Logs; + let (mut ws_all, id_all) = + open_eth_subscription(&client, logs.clone(), Some(json_of(&filter_all)?)).await?; + let (mut ws_wrong, id_wrong) = + open_eth_subscription(&client, logs.clone(), Some(json_of(&filter_wrong)?)).await?; + let (mut ws_topic, id_topic) = + open_eth_subscription(&client, logs.clone(), Some(json_of(&filter_topic)?)).await?; + let (mut ws_wild, id_wild) = + open_eth_subscription(&client, logs.clone(), Some(json_of(&filter_wild)?)).await?; + let (mut ws_constrained, id_constrained) = + open_eth_subscription(&client, logs, Some(json_of(&filter_constrained)?)).await?; + + let outcome = async { + // Emit the events. + let head = client.call(ChainHead::request(())?).await?; + let cid = invoke_contract(&client, &tx).await?; + let lookup = client + .call( + StateWaitMsg::request((cid, 1, head.epoch(), true))? + .with_timeout(Duration::from_secs(300)), ) .await?; - // A logs notification is a single log object (one per log, - // matching geth/reth/Lotus) — not an array. - anyhow::ensure!( - payload.is_object(), - "logs must yield a single log object, got: {payload}" - ); - let log: LogView = serde_json::from_value(payload) - .context("logs payload is not an Eth log")?; - // Identity: the log must carry our event topic and `our` tx hash. - if log.transaction_hash == tx_hash && log.topics.contains(&tx.topic) { + let tx_hash = client + .call(EthGetTransactionHashByCid::request((cid,))?) + .await? + .context("no Eth transaction hash for CID")?; + let height = (lookup.height as u64).max(1); + + let probe = EthFilterSpec { + from_block: Some(EthUint64(height.saturating_sub(2)).to_hex_string()), + to_block: Some(EthUint64(height + 2).to_hex_string()), + address: Some(vec![contract].into()), + ..Default::default() + }; + let mut ours = Vec::new(); + for _ in 0..30 { + if let Ok(logs) = get_logs_for_tx(&client, probe.clone(), &tx_hash).await + && !logs.is_empty() + { + ours = logs; break; } + tokio::time::sleep(Duration::from_secs(2)).await; } + ensure!(!ours.is_empty(), "no logs discovered for tx {tx_hash:?}"); + + let all_idx: BTreeSet = ours.iter().map(|l| l.log_index.0).collect(); + let topic_idx: BTreeSet = ours + .iter() + .filter(|l| l.topics.first() == Some(&tx.topic)) + .map(|l| l.log_index.0) + .collect(); + ensure!( + !topic_idx.is_empty(), + "--topic must be the signature (topic[0]) of an emitted event" + ); + + let idx_of = |logs: &[EthLogView]| -> BTreeSet { + logs.iter().map(|l| l.log_index.0).collect() + }; + let shape_ok = |logs: &[EthLogView]| -> anyhow::Result<()> { + for log in logs { + ensure!( + !log.removed, + "an applied subscription log must have removed=false" + ); + ensure!( + log.address == contract, + "subscription log address must be the contract" + ); + } + Ok(()) + }; + + // Barrier: the contract-scoped subscription must receive every log we emitted. + let got_all = collect_subscription_logs( + &mut ws_all, + &id_all, + &tx_hash, + ours.len(), + Duration::from_secs(300), + ) + .await?; + shape_ok(&got_all)?; + ensure!( + idx_of(&got_all) == all_idx, + "address=contract: expected {all_idx:?}, got {:?}", + idx_of(&got_all) + ); + + // Topic match, and the trailing-wildcard form which must behave identically. + let got_topic = collect_subscription_logs( + &mut ws_topic, + &id_topic, + &tx_hash, + topic_idx.len(), + Duration::from_secs(60), + ) + .await?; + shape_ok(&got_topic)?; + ensure!( + idx_of(&got_topic) == topic_idx, + "topic match: expected {topic_idx:?}, got {:?}", + idx_of(&got_topic) + ); + + let got_wild = collect_subscription_logs( + &mut ws_wild, + &id_wild, + &tx_hash, + topic_idx.len(), + Duration::from_secs(60), + ) + .await?; + shape_ok(&got_wild)?; + ensure!( + idx_of(&got_wild) == topic_idx, + "trailing wildcard [topic, null]: expected {topic_idx:?}, got {:?}", + idx_of(&got_wild) + ); + + let (got_wrong, got_constrained) = tokio::join!( + collect_subscription_logs( + &mut ws_wrong, + &id_wrong, + &tx_hash, + usize::MAX, + Duration::from_secs(20), + ), + collect_subscription_logs( + &mut ws_constrained, + &id_constrained, + &tx_hash, + usize::MAX, + Duration::from_secs(20), + ), + ); + let got_wrong = got_wrong?; + let got_constrained = got_constrained?; + ensure!( + got_wrong.is_empty(), + "address non-matching should receive no logs, got {:?}", + idx_of(&got_wrong) + ); + ensure!( + got_constrained.is_empty(), + "constrained trailing position should receive no logs, got {:?}", + idx_of(&got_constrained) + ); + anyhow::Ok(()) - }; - let outcome = tokio::time::timeout(Duration::from_secs(300), watch) - .await - .unwrap_or_else(|_| { - Err(anyhow::anyhow!( - "timed out waiting for our logs notification" - )) - }); + } + .await; + + // Clean up every subscription. + for (ws, id) in [ + (&mut ws_all, &id_all), + (&mut ws_wrong, &id_wrong), + (&mut ws_topic, &id_topic), + (&mut ws_wild, &id_wild), + (&mut ws_constrained, &id_constrained), + ] { + let _ = close_eth_subscription(ws, id).await; + } - let _ = close_eth_subscription(&mut ws_stream, &subscription_id).await; outcome } }) @@ -959,9 +1186,10 @@ pub(super) async fn create_tests(tx: TestTransaction) -> Vec { EthGetTransactionHashByCid ), with_methods!( - eth_subscribe_logs(tx.clone()).name("eth_subscribe logs works"), + eth_subscribe_logs(tx.clone()).name("eth_subscribe logs filter matrix"), EthSubscribe, EthUnsubscribe, + EthGetLogs, EthGetTransactionHashByCid ), ] From c6fa1896a78af6b3239bc8585a081724112002d1 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Thu, 18 Jun 2026 21:53:46 +0530 Subject: [PATCH 5/7] fix ci test failure --- .../subcommands/api_cmd/stateful_tests.rs | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/tool/subcommands/api_cmd/stateful_tests.rs b/src/tool/subcommands/api_cmd/stateful_tests.rs index 648d946b758d..0ac7fe17cdb1 100644 --- a/src/tool/subcommands/api_cmd/stateful_tests.rs +++ b/src/tool/subcommands/api_cmd/stateful_tests.rs @@ -717,24 +717,29 @@ fn eth_subscribe_logs(tx: TestTransaction) -> RpcTestScenario { .await? .context("no Eth transaction hash for CID")?; let height = (lookup.height as u64).max(1); - let probe = EthFilterSpec { from_block: Some(EthUint64(height.saturating_sub(2)).to_hex_string()), - to_block: Some(EthUint64(height + 2).to_hex_string()), + to_block: None, address: Some(vec![contract].into()), ..Default::default() }; let mut ours = Vec::new(); + let mut last_err = None; for _ in 0..30 { - if let Ok(logs) = get_logs_for_tx(&client, probe.clone(), &tx_hash).await - && !logs.is_empty() - { - ours = logs; - break; + match get_logs_for_tx(&client, probe.clone(), &tx_hash).await { + Ok(logs) if !logs.is_empty() => { + ours = logs; + break; + } + Ok(_) => {} + Err(e) => last_err = Some(e), } tokio::time::sleep(Duration::from_secs(2)).await; } - ensure!(!ours.is_empty(), "no logs discovered for tx {tx_hash:?}"); + ensure!( + !ours.is_empty(), + "no logs discovered for tx {tx_hash:?} near height {height} (last error: {last_err:?})" + ); let all_idx: BTreeSet = ours.iter().map(|l| l.log_index.0).collect(); let topic_idx: BTreeSet = ours From 9a4b5134aa9be0ef38807d731ceef3e3ae3051a6 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Fri, 19 Jun 2026 15:25:17 +0530 Subject: [PATCH 6/7] merge cleanup --- src/state_manager/state_computation.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/state_manager/state_computation.rs b/src/state_manager/state_computation.rs index c587798875a4..caf128b24f5d 100644 --- a/src/state_manager/state_computation.rs +++ b/src/state_manager/state_computation.rs @@ -45,13 +45,6 @@ impl StateManager { StateRecomputePolicy::Disallowed }; - // https://github.com/ChainSafe/forest/issues/7118 - #[cfg(test)] - let policy = { - _ = policy; - StateRecomputePolicy::Allowed - }; - policy } From 627bda79258209aa5812c27169f92a8769a25056 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Fri, 19 Jun 2026 15:51:16 +0530 Subject: [PATCH 7/7] fix linter issue --- src/state_manager/state_computation.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/state_manager/state_computation.rs b/src/state_manager/state_computation.rs index caf128b24f5d..7c9f8fee9054 100644 --- a/src/state_manager/state_computation.rs +++ b/src/state_manager/state_computation.rs @@ -39,13 +39,11 @@ impl StateManager { enable_state_computation, "FOREST_ETH_RPC_COMPUTE_STATE_ON_INDEX_MISS" ); - let policy = if enable_state_computation() { + if enable_state_computation() { StateRecomputePolicy::Allowed } else { StateRecomputePolicy::Disallowed - }; - - policy + } } /// Load an executed tipset for RPC methods, with state computation unless explicitly enabled.