Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,13 @@ fn maybe_start_rpc_service(
.map(|path| crate::rpc::FilterList::new_from_file(path))
.transpose()?;
info!("JSON-RPC endpoint will listen at {rpc_address}");
let eth_event_handler = Arc::new(EthEventHandler::from_config(&config.events));
let eth_event_handler = {
let mp = mpool.shallow_clone();
let subscriber = crate::rpc::eth::filter::mempool::MpoolSubscriber::new(move || {
mp.subscribe_to_updates()
});
Arc::new(EthEventHandler::from_config(&config.events, subscriber))
};
if is_env_truthy("FOREST_JWT_DISABLE_EXP_VALIDATION") {
warn!(
"JWT expiration validation is disabled; this significantly weakens security and should only be used in tightly controlled environments"
Expand Down
75 changes: 4 additions & 71 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2608,15 +2608,7 @@ impl EthGetTransactionHashByCid {
if let Ok(smsgs) = smsgs_result
&& let Some(smsg) = smsgs.first()
{
let hash = if smsg.is_delegated() {
let (_, tx) = eth_tx_from_signed_eth_message(smsg, eth_chain_id)?;
tx.eth_hash()?.into()
} else if smsg.is_secp256k1() {
smsg.cid().into()
} else {
smsg.message().cid().into()
};
return Ok(Some(hash));
return Ok(Some(eth_tx_hash_from_signed_message(smsg, eth_chain_id)?));
}

let msg_result = crate::chain::get_chain_message(db, &cid);
Expand Down Expand Up @@ -3151,29 +3143,6 @@ fn eth_filter_logs_from_tipsets(events: &[CollectedEvent]) -> anyhow::Result<Vec
.collect()
}

fn eth_filter_logs_from_messages(
ctx: &Ctx,
events: &[CollectedEvent],
) -> anyhow::Result<Vec<EthHash>> {
events
.iter()
.filter_map(|event| {
match eth_tx_hash_from_message_cid(
ctx.db(),
&event.msg_cid,
ctx.state_manager.chain_config().eth_chain_id,
) {
Ok(Some(hash)) => Some(Ok(hash)),
Ok(None) => {
tracing::warn!("Ignoring event");
None
}
Err(err) => Some(Err(err)),
}
})
.collect()
}

fn eth_filter_logs_from_events(
ctx: &Ctx,
events: &[CollectedEvent],
Expand Down Expand Up @@ -3256,15 +3225,6 @@ fn eth_filter_result_from_tipsets(events: &[CollectedEvent]) -> anyhow::Result<E
)?))
}

fn eth_filter_result_from_messages(
ctx: &Ctx,
events: &[CollectedEvent],
) -> anyhow::Result<EthFilterResult> {
Ok(EthFilterResult::Hashes(eth_filter_logs_from_messages(
ctx, events,
)?))
}

pub enum EthGetLogs {}
impl RpcMethod<1> for EthGetLogs {
const NAME: &'static str = "Filecoin.EthGetLogs";
Expand Down Expand Up @@ -3428,36 +3388,9 @@ impl RpcMethod<1> for EthGetFilterChanges {
return Ok(eth_filter_result_from_tipsets(&events)?);
}
if let Some(mempool_filter) = filter.as_any().downcast_ref::<MempoolFilter>() {
let events = ctx
.eth_event_handler
.get_events_for_parsed_filter(
&ctx,
&Arc::new(ParsedFilter::new_with_tipset(ParsedFilterTipsets::Range(
// heaviest tipset doesn't have events because its messages haven't been executed yet
RangeInclusive::new(
mempool_filter
.collected
.unwrap_or(ctx.chain_store().heaviest_tipset().epoch() - 1),
// Use -1 to indicate that the range extends until the latest available tipset.
-1,
),
))),
SkipEvent::OnUnresolvedAddress,
)
.await?;
let new_collected = events
.iter()
.max_by_key(|event| event.height)
.map(|e| e.height);
if let Some(height) = new_collected {
let filter = Arc::new(MempoolFilter {
id: mempool_filter.id.clone(),
max_results: mempool_filter.max_results,
collected: Some(height),
});
store.update(filter);
}
return Ok(eth_filter_result_from_messages(&ctx, &events)?);
let chain_id = ctx.chain_config().eth_chain_id;
let hashes = mempool_filter.drain(chain_id);
return Ok(EthFilterResult::Hashes(hashes));
}
}
Err(anyhow::anyhow!("method not supported").into())
Expand Down
Loading