From 669cb11ab0689e6ba84776d810d3f16eb71fb30b Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 22 Jun 2026 10:22:51 +0800 Subject: [PATCH 1/3] fix: use ArcSwap for SyncStatus --- src/chain_sync/chain_follower.rs | 10 +-- src/chain_sync/sync_status.rs | 4 +- src/daemon/mod.rs | 2 +- src/db/gc/snapshot.rs | 2 +- src/health/endpoints.rs | 8 +-- src/health/mod.rs | 70 ++++++++++++++----- src/rpc/methods/eth.rs | 3 +- src/rpc/methods/sync.rs | 22 +++--- src/tool/offline_server/server.rs | 3 +- .../api_cmd/generate_test_snapshot.rs | 3 +- src/tool/subcommands/api_cmd/test_snapshot.rs | 3 +- 11 files changed, 87 insertions(+), 43 deletions(-) diff --git a/src/chain_sync/chain_follower.rs b/src/chain_sync/chain_follower.rs index 6842e9afe070..52a71471c1e5 100644 --- a/src/chain_sync/chain_follower.rs +++ b/src/chain_sync/chain_follower.rs @@ -34,10 +34,11 @@ use crate::{ shim::clock::ChainEpoch, state_manager::StateManager, }; +use arc_swap::ArcSwap; use chrono::Utc; use hashbrown::{HashMap, HashSet}; use libp2p::PeerId; -use parking_lot::{Mutex, RwLock}; +use parking_lot::Mutex; use std::time::{Duration, Instant}; use tokio::{sync::Notify, task::JoinSet}; use tracing::{debug, error, info, trace, warn}; @@ -136,7 +137,7 @@ impl ChainFollower { Self { tasks, state_machine, - sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), + sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())), state_manager, network, genesis, @@ -326,14 +327,13 @@ async fn chain_follower( // Update the sync states { - let old_status_report = sync_status.read().clone(); + let old_status_report = sync_status.load().shallow_clone(); let new_status_report = old_status_report.update( &state_manager, current_active_forks, stateless_mode, ); - - sync_status.write().clone_from(&new_status_report); + sync_status.store(new_status_report.into()); } for task in task_vec { diff --git a/src/chain_sync/sync_status.rs b/src/chain_sync/sync_status.rs index 28aa2ceb8468..2659c98052f9 100644 --- a/src/chain_sync/sync_status.rs +++ b/src/chain_sync/sync_status.rs @@ -5,8 +5,8 @@ use crate::lotus_json::lotus_json_with_self; use crate::networks::calculate_expected_epoch; use crate::shim::clock::ChainEpoch; use crate::state_manager::StateManager; +use arc_swap::ArcSwap; use chrono::{DateTime, Utc}; -use parking_lot::RwLock; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -101,7 +101,7 @@ pub struct ForkSyncInfo { pub(crate) last_updated: Option>, } -pub type SyncStatus = Arc>; +pub type SyncStatus = Arc>; /// Contains information about the current status of the node's synchronization process. #[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, JsonSchema)] diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index f19b0a532292..981b3a735129 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -370,7 +370,7 @@ fn maybe_prefill_rpc_caches( services.spawn(async move { loop { match validated_tipset_rx.recv().await { - Ok(_) if !sync_status.read().is_synced() => { + Ok(_) if !sync_status.load().is_synced() => { // Skip if the node is catching up to avoid unnecessary work, as the head may be changing rapidly. continue; } diff --git a/src/db/gc/snapshot.rs b/src/db/gc/snapshot.rs index f366832ae029..9c88dc17211d 100644 --- a/src/db/gc/snapshot.rs +++ b/src/db/gc/snapshot.rs @@ -162,7 +162,7 @@ impl SnapshotGarbageCollector { && let Some(car_db_head_epoch) = self.db().heaviest_car_tipset().ok().map(|ts| ts.epoch()) { - let sync_status = &*self.sync_status().read(); + let sync_status = (*self.sync_status().load()).shallow_clone(); let network_head_epoch = sync_status.network_head_epoch; let head_epoch = sync_status.current_head_epoch; if head_epoch > 0 // sync_status has been initialized diff --git a/src/health/endpoints.rs b/src/health/endpoints.rs index 8b6bcd76aee8..4170778f2239 100644 --- a/src/health/endpoints.rs +++ b/src/health/endpoints.rs @@ -95,7 +95,7 @@ pub(crate) async fn healthz( fn check_sync_status_synced(state: &ForestState, acc: &mut MessageAccumulator) -> bool { // Forest must be in sync with the network - if state.sync_status.read().status == NodeSyncStatus::Synced { + if state.sync_status.load().status == NodeSyncStatus::Synced { acc.push_ok("sync complete"); true } else { @@ -106,7 +106,7 @@ fn check_sync_status_synced(state: &ForestState, acc: &mut MessageAccumulator) - fn check_sync_status_not_error(state: &ForestState, acc: &mut MessageAccumulator) -> bool { // Forest must be in sync with the network - if state.sync_status.read().status != NodeSyncStatus::Error { + if state.sync_status.load().status != NodeSyncStatus::Error { acc.push_ok("sync ok"); true } else { @@ -128,7 +128,7 @@ fn check_epoch_up_to_date(state: &ForestState, acc: &mut MessageAccumulator) -> ); // The current epoch of the node must be not too far behind the network - if state.sync_status.read().current_head_epoch >= now_epoch - MAX_EPOCH_DIFF { + if state.sync_status.load().current_head_epoch >= now_epoch - MAX_EPOCH_DIFF { acc.push_ok("epoch up to date"); true } else { @@ -181,7 +181,7 @@ async fn check_f3_running(state: &ForestState, acc: &mut MessageAccumulator) -> acc.push_ok("f3 running"); true } else if crate::f3::get_f3_sidecar_params(&state.chain_config).bootstrap_epoch - > state.sync_status.read().network_head_epoch + > state.sync_status.load().network_head_epoch { acc.push_ok("f3 pending activation"); true diff --git a/src/health/mod.rs b/src/health/mod.rs index c7b174ea6028..a856120cec24 100644 --- a/src/health/mod.rs +++ b/src/health/mod.rs @@ -62,7 +62,7 @@ mod test { use crate::Client; use crate::chain_sync::{NodeSyncStatus, SyncStatusReport}; use crate::cli_shared::cli::ChainIndexerConfig; - use parking_lot::RwLock; + use arc_swap::ArcSwap; use reqwest::StatusCode; #[tokio::test] @@ -70,7 +70,7 @@ mod test { let healthcheck_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0); let rpc_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); - let sync_status = Arc::new(RwLock::new(SyncStatusReport::init())); + let sync_status = Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())); let forest_state = ForestState { config: Config { @@ -85,10 +85,10 @@ mod test { }, ..Default::default() }, - chain_config: Arc::new(ChainConfig::default()), + chain_config: Default::default(), genesis_timestamp: 0, sync_status: sync_status.clone(), - peer_manager: Arc::new(PeerManager::default()), + peer_manager: Default::default(), }; let listener = @@ -112,8 +112,14 @@ mod test { }; // instrument the state so that the ready requirements are met - sync_status.write().status = NodeSyncStatus::Synced; - sync_status.write().current_head_epoch = i64::MAX; + sync_status.store( + SyncStatusReport { + status: NodeSyncStatus::Synced, + current_head_epoch: i64::MAX, + ..Arc::unwrap_or_clone(sync_status.load().clone()) + } + .into(), + ); assert_eq!( call_healthcheck(false).await.unwrap().status(), @@ -128,8 +134,14 @@ mod test { // instrument the state so that the ready requirements are not met drop(rpc_listener); - sync_status.write().status = NodeSyncStatus::Error; - sync_status.write().current_head_epoch = 0; + sync_status.store( + SyncStatusReport { + status: NodeSyncStatus::Error, + current_head_epoch: 0, + ..Arc::unwrap_or_clone(sync_status.load().clone()) + } + .into(), + ); assert_eq!( call_healthcheck(false).await.unwrap().status(), @@ -149,7 +161,7 @@ mod test { let healthcheck_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0); let rpc_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); - let sync_status = Arc::new(RwLock::new(SyncStatusReport::default())); + let sync_status = Arc::new(ArcSwap::from_pointee(SyncStatusReport::default())); let peer_manager = Arc::new(PeerManager::default()); let forest_state = ForestState { config: Config { @@ -187,7 +199,13 @@ mod test { }; // instrument the state so that the live requirements are met - sync_status.write().status = NodeSyncStatus::Syncing; + sync_status.store( + SyncStatusReport { + status: NodeSyncStatus::Syncing, + ..Arc::unwrap_or_clone(sync_status.load().clone()) + } + .into(), + ); let peer = libp2p::PeerId::random(); peer_manager.touch_peer(&peer); @@ -203,7 +221,13 @@ mod test { assert!(text.contains("[+] peers connected")); // instrument the state so that the live requirements are not met - sync_status.write().status = NodeSyncStatus::Error; + sync_status.store( + SyncStatusReport { + status: NodeSyncStatus::Error, + ..Arc::unwrap_or_clone(sync_status.load().clone()) + } + .into(), + ); peer_manager.remove_peer(&peer); assert_eq!( @@ -224,7 +248,7 @@ mod test { let rpc_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let peer_manager = Arc::new(PeerManager::default()); - let sync_status = Arc::new(RwLock::new(SyncStatusReport::default())); + let sync_status = Arc::new(ArcSwap::from_pointee(SyncStatusReport::default())); let forest_state = ForestState { config: Config { client: Client { @@ -261,8 +285,14 @@ mod test { }; // instrument the state so that the health requirements are met - sync_status.write().current_head_epoch = i64::MAX; - sync_status.write().status = NodeSyncStatus::Syncing; + sync_status.store( + SyncStatusReport { + status: NodeSyncStatus::Syncing, + current_head_epoch: i64::MAX, + ..Arc::unwrap_or_clone(sync_status.load().clone()) + } + .into(), + ); let peer = libp2p::PeerId::random(); peer_manager.touch_peer(&peer); @@ -280,8 +310,14 @@ mod test { // instrument the state so that the health requirements are not met drop(rpc_listener); - sync_status.write().status = NodeSyncStatus::Error; - sync_status.write().current_head_epoch = 0; + sync_status.store( + SyncStatusReport { + status: NodeSyncStatus::Error, + current_head_epoch: 0, + ..Arc::unwrap_or_clone(sync_status.load().clone()) + } + .into(), + ); peer_manager.remove_peer(&peer); assert_eq!( @@ -311,7 +347,7 @@ mod test { }, chain_config: Arc::default(), genesis_timestamp: 0, - sync_status: Arc::new(RwLock::new(SyncStatusReport::default())), + sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::default())), peer_manager: Arc::default(), }; let listener = diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index b8f1c3f99436..33216942ecb7 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -1787,8 +1787,7 @@ impl RpcMethod<0> for EthSyncing { (): Self::Params, ext: &http::Extensions, ) -> Result { - let sync_status: crate::chain_sync::SyncStatusReport = - crate::rpc::sync::SyncStatus::handle(ctx, (), ext).await?; + let sync_status = crate::rpc::sync::SyncStatus::handle(ctx, (), ext).await?; match sync_status.status { NodeSyncStatus::Synced => Ok(EthSyncingResult { done_sync: true, diff --git a/src/rpc/methods/sync.rs b/src/rpc/methods/sync.rs index 4d6075d69cad..52921b49c019 100644 --- a/src/rpc/methods/sync.rs +++ b/src/rpc/methods/sync.rs @@ -92,14 +92,14 @@ impl RpcMethod<0> for SyncStatus { const DESCRIPTION: Option<&'static str> = Some("Returns the current sync status of the node."); type Params = (); - type Ok = SyncStatusReport; + type Ok = Arc; async fn handle( ctx: Ctx, (): Self::Params, _: &http::Extensions, ) -> Result { - let sync_status = ctx.sync_status.as_ref().read().clone(); + let sync_status = ctx.sync_status.load().shallow_clone(); Ok(sync_status) } } @@ -122,7 +122,7 @@ impl RpcMethod<1> for SyncSubmitBlock { (block_msg,): Self::Params, _: &http::Extensions, ) -> Result { - if !matches!(ctx.sync_status.read().status, NodeSyncStatus::Synced) { + if !matches!(ctx.sync_status.load().status, NodeSyncStatus::Synced) { Err(anyhow!("the node isn't in 'follow' mode"))? } let genesis_network_name = ctx.chain_config().network.genesis_name(); @@ -232,7 +232,7 @@ mod tests { keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory).unwrap())), mpool, bad_blocks: Some(Default::default()), - sync_status: Arc::new(RwLock::new(SyncStatusReport::default())), + sync_status: Default::default(), eth_event_handler: Arc::new(EthEventHandler::new()), sync_network_context, start_time, @@ -279,16 +279,22 @@ mod tests { let sync_status = SyncStatus::handle(ctx.clone(), (), &Default::default()) .await .unwrap(); - assert_eq!(sync_status, st_copy.as_ref().read().clone()); + assert_eq!(sync_status, st_copy.load().clone()); // update cloned state - st_copy.write().status = NodeSyncStatus::Syncing; - st_copy.write().current_head_epoch = 4; + st_copy.store( + SyncStatusReport { + status: NodeSyncStatus::Syncing, + current_head_epoch: 4, + ..Arc::unwrap_or_clone(st_copy.load().clone()) + } + .into(), + ); let sync_status = SyncStatus::handle(ctx.clone(), (), &Default::default()) .await .unwrap(); - assert_eq!(sync_status, st_copy.as_ref().read().clone()); + assert_eq!(sync_status, st_copy.load().clone()); } } diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index 11c5c9262e74..bf6de454a1ba 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -22,6 +22,7 @@ use crate::state_manager::StateManager; use crate::utils::net::{DownloadFileOption, download_to}; use crate::utils::proofs_api::{self, ensure_proof_params_downloaded}; use crate::{Config, JWT_IDENTIFIER}; +use arc_swap::ArcSwap; use jsonrpsee::server::stop_channel; use parking_lot::RwLock; use std::{ @@ -102,7 +103,7 @@ pub async fn offline_rpc_state( keystore: Arc::new(RwLock::new(keystore)), mpool: message_pool, bad_blocks: Default::default(), - sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), + sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())), eth_event_handler: Arc::new(EthEventHandler::from_config(&events_config)), sync_network_context, start_time: chrono::Utc::now(), diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs index 0e439b7fd5ec..ae3f64605268 100644 --- a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs @@ -22,6 +22,7 @@ use crate::{ state_manager::StateManager, }; use api_compare_tests::TestDump; +use arc_swap::ArcSwap; use fvm_shared4::address::Network; use openrpc_types::ParamStructure; use parking_lot::RwLock; @@ -141,7 +142,7 @@ async fn ctx( keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)), mpool: message_pool, bad_blocks: Default::default(), - sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), + sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())), eth_event_handler: Arc::new(EthEventHandler::new()), sync_network_context, start_time: chrono::Utc::now(), diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index 4963fce3f8f1..064c4a82432e 100644 --- a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -23,6 +23,7 @@ use crate::{ state_manager::StateManager, }; use anyhow::Context as _; +use arc_swap::ArcSwap; use openrpc_types::ParamStructure; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; @@ -170,7 +171,7 @@ async fn ctx( keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)), mpool: message_pool, bad_blocks: Default::default(), - sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), + sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())), eth_event_handler: Arc::new(EthEventHandler::new()), sync_network_context, start_time: chrono::Utc::now(), From f7785a82cb9562420028906ef369c79d1fe29341 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 22 Jun 2026 16:37:54 +0800 Subject: [PATCH 2/3] resolve comments --- src/chain_sync/sync_status.rs | 12 ++++++ src/health/mod.rs | 74 +++++++++++++++++++---------------- src/rpc/methods/sync.rs | 13 +++--- 3 files changed, 59 insertions(+), 40 deletions(-) diff --git a/src/chain_sync/sync_status.rs b/src/chain_sync/sync_status.rs index 2659c98052f9..a93f9ccea871 100644 --- a/src/chain_sync/sync_status.rs +++ b/src/chain_sync/sync_status.rs @@ -200,4 +200,16 @@ impl SyncStatusReport { .map(|fork_info| fork_info.target_sync_epoch_start) .min() } + + #[cfg(test)] + pub fn with_status(mut self, status: NodeSyncStatus) -> Self { + self.status = status; + self + } + + #[cfg(test)] + pub fn with_current_head_epoch(mut self, current_head_epoch: ChainEpoch) -> Self { + self.current_head_epoch = current_head_epoch; + self + } } diff --git a/src/health/mod.rs b/src/health/mod.rs index a856120cec24..8c42dcd703b1 100644 --- a/src/health/mod.rs +++ b/src/health/mod.rs @@ -113,12 +113,13 @@ mod test { // instrument the state so that the ready requirements are met sync_status.store( - SyncStatusReport { - status: NodeSyncStatus::Synced, - current_head_epoch: i64::MAX, - ..Arc::unwrap_or_clone(sync_status.load().clone()) - } - .into(), + sync_status + .load() + .as_ref() + .clone() + .with_status(NodeSyncStatus::Synced) + .current_head_epoch(i64::MAX) + .into(), ); assert_eq!( @@ -135,12 +136,13 @@ mod test { // instrument the state so that the ready requirements are not met drop(rpc_listener); sync_status.store( - SyncStatusReport { - status: NodeSyncStatus::Error, - current_head_epoch: 0, - ..Arc::unwrap_or_clone(sync_status.load().clone()) - } - .into(), + sync_status + .load() + .as_ref() + .clone() + .with_status(NodeSyncStatus::Error) + .current_head_epoch(0) + .into(), ); assert_eq!( @@ -200,11 +202,12 @@ mod test { // instrument the state so that the live requirements are met sync_status.store( - SyncStatusReport { - status: NodeSyncStatus::Syncing, - ..Arc::unwrap_or_clone(sync_status.load().clone()) - } - .into(), + sync_status + .load() + .as_ref() + .clone() + .with_status(NodeSyncStatus::Syncing) + .into(), ); let peer = libp2p::PeerId::random(); peer_manager.touch_peer(&peer); @@ -222,11 +225,12 @@ mod test { // instrument the state so that the live requirements are not met sync_status.store( - SyncStatusReport { - status: NodeSyncStatus::Error, - ..Arc::unwrap_or_clone(sync_status.load().clone()) - } - .into(), + sync_status + .load() + .as_ref() + .clone() + .with_status(NodeSyncStatus::Error) + .into(), ); peer_manager.remove_peer(&peer); @@ -286,12 +290,13 @@ mod test { // instrument the state so that the health requirements are met sync_status.store( - SyncStatusReport { - status: NodeSyncStatus::Syncing, - current_head_epoch: i64::MAX, - ..Arc::unwrap_or_clone(sync_status.load().clone()) - } - .into(), + sync_status + .load() + .as_ref() + .clone() + .with_status(NodeSyncStatus::Syncing) + .current_head_epoch(i64::MAX) + .into(), ); let peer = libp2p::PeerId::random(); peer_manager.touch_peer(&peer); @@ -311,12 +316,13 @@ mod test { // instrument the state so that the health requirements are not met drop(rpc_listener); sync_status.store( - SyncStatusReport { - status: NodeSyncStatus::Error, - current_head_epoch: 0, - ..Arc::unwrap_or_clone(sync_status.load().clone()) - } - .into(), + sync_status + .load() + .as_ref() + .clone() + .with_status(NodeSyncStatus::Error) + .current_head_epoch(0) + .into(), ); peer_manager.remove_peer(&peer); diff --git a/src/rpc/methods/sync.rs b/src/rpc/methods/sync.rs index 52921b49c019..c803050600b9 100644 --- a/src/rpc/methods/sync.rs +++ b/src/rpc/methods/sync.rs @@ -283,12 +283,13 @@ mod tests { // update cloned state st_copy.store( - SyncStatusReport { - status: NodeSyncStatus::Syncing, - current_head_epoch: 4, - ..Arc::unwrap_or_clone(st_copy.load().clone()) - } - .into(), + st_copy + .load() + .as_ref() + .clone() + .with_status(NodeSyncStatus::Syncing) + .with_current_head_epoch(4) + .into(), ); let sync_status = SyncStatus::handle(ctx.clone(), (), &Default::default()) From 828f0f72b71bbe1101b156970d207629d2e979dc Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 22 Jun 2026 16:41:03 +0800 Subject: [PATCH 3/3] fix --- src/health/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/health/mod.rs b/src/health/mod.rs index 8c42dcd703b1..c2c4500e95a2 100644 --- a/src/health/mod.rs +++ b/src/health/mod.rs @@ -118,7 +118,7 @@ mod test { .as_ref() .clone() .with_status(NodeSyncStatus::Synced) - .current_head_epoch(i64::MAX) + .with_current_head_epoch(i64::MAX) .into(), ); @@ -141,7 +141,7 @@ mod test { .as_ref() .clone() .with_status(NodeSyncStatus::Error) - .current_head_epoch(0) + .with_current_head_epoch(0) .into(), ); @@ -295,7 +295,7 @@ mod test { .as_ref() .clone() .with_status(NodeSyncStatus::Syncing) - .current_head_epoch(i64::MAX) + .with_current_head_epoch(i64::MAX) .into(), ); let peer = libp2p::PeerId::random(); @@ -321,7 +321,7 @@ mod test { .as_ref() .clone() .with_status(NodeSyncStatus::Error) - .current_head_epoch(0) + .with_current_head_epoch(0) .into(), ); peer_manager.remove_peer(&peer);