Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

### Fixed

- [#7096](https://github.com/ChainSafe/forest/issues/7096): `eth_subscribe` `logs` now re-emits the logs of reorg-reverted tipsets with `removed: true`, ahead of the logs of the replacing tipsets.

- [#7129](https://github.com/ChainSafe/forest/pull/7129): Fixed a few inaccurate cache size metrics.

- [#6974](https://github.com/ChainSafe/forest/issues/6974): Fixed the message pool reporting a still-pending nonce as the next nonce after an applied message was removed.
Expand Down
1 change: 1 addition & 0 deletions src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ fn maybe_start_rpc_service(
bad_blocks,
sync_status,
eth_event_handler,
eth_logs_feed: Default::default(),
sync_network_context,
start_time,
shutdown,
Expand Down
8 changes: 8 additions & 0 deletions src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1551,6 +1551,14 @@ impl<T: Clone> Clone for PathChange<T> {
}
}

impl<T> PathChange<T> {
pub fn tipset(&self) -> &T {
match self {
Self::Revert(ts) | Self::Apply(ts) => ts,
}
}
}

impl HasLotusJson for PathChange {
type LotusJson = PathChange<<Tipset as HasLotusJson>::LotusJson>;

Expand Down
136 changes: 84 additions & 52 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ use crate::rpc::{
error::ServerError,
eth::{
errors::EthErrors,
filter::{SkipEvent, event::EventFilter, mempool::MempoolFilter, tipset::TipSetFilter},
filter::{
EventRevertStatus, SkipEvent, event::EventFilter, mempool::MempoolFilter,
tipset::TipSetFilter,
},
utils::decode_revert_reason,
},
methods::chain::ChainGetTipSetV2,
methods::chain::{ChainGetTipSetV2, PathChange},
state::ApiInvocResult,
types::{ApiTipsetKey, EventEntry, MessageLookup},
};
Expand Down Expand Up @@ -1410,17 +1413,35 @@ pub async fn eth_logs_for_block_and_transaction(
eth_filter_logs_from_events(ctx, &events)
}

pub async fn eth_logs_with_filter(
/// Collects the logs produced by a single chain head change, for the logs
/// subscription.
pub(in crate::rpc) async fn eth_logs_for_head_change(
ctx: &Ctx,
ts: &Tipset,
spec: Option<EthFilterSpec>,
change: &PathChange<Tipset>,
) -> anyhow::Result<Vec<EthLog>> {
let (receipt_ts, revert_status) = match change {
PathChange::Revert(ts) => (ts, EventRevertStatus::Reverted),
PathChange::Apply(ts) => (ts, EventRevertStatus::Applied),
};
// Genesis carries no events and has no parent message tipset to load.
if receipt_ts.epoch() == 0 {
return Ok(vec![]);
}
let msg_ts = ctx
.chain_index()
.load_required_tipset(receipt_ts.parents())?;
let executed_ts = ctx
.state_manager
.load_executed_tipset_with_receipt(&msg_ts, receipt_ts)
.await?;
let mut events = vec![];
EthEventHandler::collect_events(
EthEventHandler::collect_events_from_messages(
&ctx.state_manager,
ts,
spec.as_ref(),
&msg_ts,
&executed_ts.executed_messages,
None::<&ParsedFilter>,
SkipEvent::OnUnresolvedAddress,
revert_status,
&mut events,
)
.await?;
Expand Down Expand Up @@ -3036,6 +3057,11 @@ pub struct CollectedEvent {
pub(crate) msg_cid: Cid,
}

/// Positions `(message index, event index)` of collected events, grouped by tipset.
/// Identifies events without retaining their entry payloads; grouping by tipset lets
/// membership checks borrow the tipset key and stores each distinct key only once.
pub type SeenEventPositions = HashMap<TipsetKey, HashSet<(u64, u64)>>;

fn match_key(key: &str) -> Option<usize> {
match key.get(0..2) {
Some("t1") => Some(0),
Expand Down Expand Up @@ -3297,6 +3323,54 @@ impl RpcMethod<1> for EthGetLogs {
}
}

/// Shared implementation of `eth_getFilterLogs` / `eth_getFilterChanges` for installed event
/// filters: collects the filter's full result set from the canonical chain, returns only the
/// events that were not present in the previous poll.
Comment thread
coderabbitai[bot] marked this conversation as resolved.
async fn poll_event_filter(
ctx: &Ctx,
event_filter: &EventFilter,
) -> anyhow::Result<Vec<CollectedEvent>> {
let events = ctx
.eth_event_handler
.get_events_for_parsed_filter(
ctx,
&Arc::new(event_filter.into()),
SkipEvent::OnUnresolvedAddress,
)
.await?;
let mut seen_positions = SeenEventPositions::default();
let mut recent_events = Vec::new();
for event in events {
let position = (event.msg_idx, event.event_idx);
let already_seen = event_filter
.seen_positions
.get(&event.tipset_key)
.is_some_and(|positions| positions.contains(&position));
match seen_positions.get_mut(&event.tipset_key) {
Some(positions) => {
positions.insert(position);
}
None => {
seen_positions.insert(event.tipset_key.clone(), HashSet::from_iter([position]));
}
}
if !already_seen {
recent_events.push(event);
}
}
if let Some(store) = &ctx.eth_event_handler.filter_store {
store.update(Arc::new(EventFilter {
id: event_filter.id.clone(),
tipsets: event_filter.tipsets.clone(),
addresses: event_filter.addresses.clone(),
keys_with_codec: event_filter.keys_with_codec.clone(),
max_results: event_filter.max_results,
seen_positions,
}));
}
Ok(recent_events)
}

pub enum EthGetFilterLogs {}
impl RpcMethod<1> for EthGetFilterLogs {
const NAME: &'static str = "Filecoin.EthGetFilterLogs";
Expand All @@ -3316,28 +3390,7 @@ impl RpcMethod<1> for EthGetFilterLogs {
if let Some(store) = &eth_event_handler.filter_store {
let filter = store.get(&filter_id)?;
if let Some(event_filter) = filter.as_any().downcast_ref::<EventFilter>() {
let events = ctx
.eth_event_handler
.get_events_for_parsed_filter(
&ctx,
&Arc::new(event_filter.into()),
SkipEvent::OnUnresolvedAddress,
)
.await?;
let recent_events: Vec<CollectedEvent> = events
.clone()
.into_iter()
.filter(|event| !event_filter.collected.contains(event))
.collect();
let filter = Arc::new(EventFilter {
id: event_filter.id.clone(),
tipsets: event_filter.tipsets.clone(),
addresses: event_filter.addresses.clone(),
keys_with_codec: event_filter.keys_with_codec.clone(),
max_results: event_filter.max_results,
collected: events.clone(),
});
store.update(filter);
let recent_events = poll_event_filter(&ctx, event_filter).await?;
return Ok(eth_filter_result_from_events(&ctx, &recent_events)?);
}
}
Expand Down Expand Up @@ -3367,28 +3420,7 @@ impl RpcMethod<1> for EthGetFilterChanges {
if let Some(store) = &eth_event_handler.filter_store {
let filter = store.get(&filter_id)?;
if let Some(event_filter) = filter.as_any().downcast_ref::<EventFilter>() {
let events = ctx
.eth_event_handler
.get_events_for_parsed_filter(
&ctx,
&Arc::new(event_filter.into()),
SkipEvent::OnUnresolvedAddress,
)
.await?;
let recent_events: Vec<CollectedEvent> = events
.clone()
.into_iter()
.filter(|event| !event_filter.collected.contains(event))
.collect();
let filter = Arc::new(EventFilter {
id: event_filter.id.clone(),
tipsets: event_filter.tipsets.clone(),
addresses: event_filter.addresses.clone(),
keys_with_codec: event_filter.keys_with_codec.clone(),
max_results: event_filter.max_results,
collected: events.clone(),
});
store.update(filter);
let recent_events = poll_event_filter(&ctx, event_filter).await?;
return Ok(eth_filter_result_from_events(&ctx, &recent_events)?);
}
if let Some(tipset_filter) = filter.as_any().downcast_ref::<TipSetFilter>() {
Expand Down
8 changes: 4 additions & 4 deletions src/rpc/methods/eth/filter/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::prelude::*;
use crate::rpc::eth::filter::{ActorEventBlock, ParsedFilter, ParsedFilterTipsets};
use crate::rpc::eth::{CollectedEvent, FilterID, filter::Filter};
use crate::rpc::eth::{FilterID, SeenEventPositions, filter::Filter};
use crate::shim::address::Address;
use ahash::HashMap;
use anyhow::Result;
Expand All @@ -22,8 +22,8 @@ pub struct EventFilter {
pub keys_with_codec: HashMap<String, Vec<ActorEventBlock>>,
// Maximum number of results to collect
pub max_results: usize,
// Collected events
pub collected: Vec<CollectedEvent>,
// Positions of the events returned by the last poll, used to compute the next poll's delta
pub seen_positions: SeenEventPositions,
}

impl From<&EventFilter> for ParsedFilter {
Expand Down Expand Up @@ -72,7 +72,7 @@ impl EventFilterManager {
addresses: pf.addresses,
keys_with_codec: pf.keys,
max_results: self.max_filter_results,
collected: vec![],
seen_positions: Default::default(),
});

self.filters.write().insert(id, filter.clone());
Expand Down
Loading
Loading