diff --git a/src/chain_sync/chain_follower.rs b/src/chain_sync/chain_follower.rs index 6842e9afe070..52a71471c1e5 100644 --- a/src/chain_sync/chain_follower.rs +++ b/src/chain_sync/chain_follower.rs @@ -34,10 +34,11 @@ use crate::{ shim::clock::ChainEpoch, state_manager::StateManager, }; +use arc_swap::ArcSwap; use chrono::Utc; use hashbrown::{HashMap, HashSet}; use libp2p::PeerId; -use parking_lot::{Mutex, RwLock}; +use parking_lot::Mutex; use std::time::{Duration, Instant}; use tokio::{sync::Notify, task::JoinSet}; use tracing::{debug, error, info, trace, warn}; @@ -136,7 +137,7 @@ impl ChainFollower { Self { tasks, state_machine, - sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), + sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())), state_manager, network, genesis, @@ -326,14 +327,13 @@ async fn chain_follower( // Update the sync states { - let old_status_report = sync_status.read().clone(); + let old_status_report = sync_status.load().shallow_clone(); let new_status_report = old_status_report.update( &state_manager, current_active_forks, stateless_mode, ); - - sync_status.write().clone_from(&new_status_report); + sync_status.store(new_status_report.into()); } for task in task_vec { diff --git a/src/chain_sync/sync_status.rs b/src/chain_sync/sync_status.rs index 28aa2ceb8468..a93f9ccea871 100644 --- a/src/chain_sync/sync_status.rs +++ b/src/chain_sync/sync_status.rs @@ -5,8 +5,8 @@ use crate::lotus_json::lotus_json_with_self; use crate::networks::calculate_expected_epoch; use crate::shim::clock::ChainEpoch; use crate::state_manager::StateManager; +use arc_swap::ArcSwap; use chrono::{DateTime, Utc}; -use parking_lot::RwLock; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -101,7 +101,7 @@ pub struct ForkSyncInfo { pub(crate) last_updated: Option>, } -pub type SyncStatus = Arc>; +pub type SyncStatus = Arc>; /// Contains information about the current status of the node's synchronization process. #[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, JsonSchema)] @@ -200,4 +200,16 @@ impl SyncStatusReport { .map(|fork_info| fork_info.target_sync_epoch_start) .min() } + + #[cfg(test)] + pub fn with_status(mut self, status: NodeSyncStatus) -> Self { + self.status = status; + self + } + + #[cfg(test)] + pub fn with_current_head_epoch(mut self, current_head_epoch: ChainEpoch) -> Self { + self.current_head_epoch = current_head_epoch; + self + } } diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index f19b0a532292..981b3a735129 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -370,7 +370,7 @@ fn maybe_prefill_rpc_caches( services.spawn(async move { loop { match validated_tipset_rx.recv().await { - Ok(_) if !sync_status.read().is_synced() => { + Ok(_) if !sync_status.load().is_synced() => { // Skip if the node is catching up to avoid unnecessary work, as the head may be changing rapidly. continue; } diff --git a/src/db/gc/snapshot.rs b/src/db/gc/snapshot.rs index f366832ae029..9c88dc17211d 100644 --- a/src/db/gc/snapshot.rs +++ b/src/db/gc/snapshot.rs @@ -162,7 +162,7 @@ impl SnapshotGarbageCollector { && let Some(car_db_head_epoch) = self.db().heaviest_car_tipset().ok().map(|ts| ts.epoch()) { - let sync_status = &*self.sync_status().read(); + let sync_status = (*self.sync_status().load()).shallow_clone(); let network_head_epoch = sync_status.network_head_epoch; let head_epoch = sync_status.current_head_epoch; if head_epoch > 0 // sync_status has been initialized diff --git a/src/health/endpoints.rs b/src/health/endpoints.rs index 8b6bcd76aee8..4170778f2239 100644 --- a/src/health/endpoints.rs +++ b/src/health/endpoints.rs @@ -95,7 +95,7 @@ pub(crate) async fn healthz( fn check_sync_status_synced(state: &ForestState, acc: &mut MessageAccumulator) -> bool { // Forest must be in sync with the network - if state.sync_status.read().status == NodeSyncStatus::Synced { + if state.sync_status.load().status == NodeSyncStatus::Synced { acc.push_ok("sync complete"); true } else { @@ -106,7 +106,7 @@ fn check_sync_status_synced(state: &ForestState, acc: &mut MessageAccumulator) - fn check_sync_status_not_error(state: &ForestState, acc: &mut MessageAccumulator) -> bool { // Forest must be in sync with the network - if state.sync_status.read().status != NodeSyncStatus::Error { + if state.sync_status.load().status != NodeSyncStatus::Error { acc.push_ok("sync ok"); true } else { @@ -128,7 +128,7 @@ fn check_epoch_up_to_date(state: &ForestState, acc: &mut MessageAccumulator) -> ); // The current epoch of the node must be not too far behind the network - if state.sync_status.read().current_head_epoch >= now_epoch - MAX_EPOCH_DIFF { + if state.sync_status.load().current_head_epoch >= now_epoch - MAX_EPOCH_DIFF { acc.push_ok("epoch up to date"); true } else { @@ -181,7 +181,7 @@ async fn check_f3_running(state: &ForestState, acc: &mut MessageAccumulator) -> acc.push_ok("f3 running"); true } else if crate::f3::get_f3_sidecar_params(&state.chain_config).bootstrap_epoch - > state.sync_status.read().network_head_epoch + > state.sync_status.load().network_head_epoch { acc.push_ok("f3 pending activation"); true diff --git a/src/health/mod.rs b/src/health/mod.rs index c7b174ea6028..c2c4500e95a2 100644 --- a/src/health/mod.rs +++ b/src/health/mod.rs @@ -62,7 +62,7 @@ mod test { use crate::Client; use crate::chain_sync::{NodeSyncStatus, SyncStatusReport}; use crate::cli_shared::cli::ChainIndexerConfig; - use parking_lot::RwLock; + use arc_swap::ArcSwap; use reqwest::StatusCode; #[tokio::test] @@ -70,7 +70,7 @@ mod test { let healthcheck_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0); let rpc_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); - let sync_status = Arc::new(RwLock::new(SyncStatusReport::init())); + let sync_status = Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())); let forest_state = ForestState { config: Config { @@ -85,10 +85,10 @@ mod test { }, ..Default::default() }, - chain_config: Arc::new(ChainConfig::default()), + chain_config: Default::default(), genesis_timestamp: 0, sync_status: sync_status.clone(), - peer_manager: Arc::new(PeerManager::default()), + peer_manager: Default::default(), }; let listener = @@ -112,8 +112,15 @@ mod test { }; // instrument the state so that the ready requirements are met - sync_status.write().status = NodeSyncStatus::Synced; - sync_status.write().current_head_epoch = i64::MAX; + sync_status.store( + sync_status + .load() + .as_ref() + .clone() + .with_status(NodeSyncStatus::Synced) + .with_current_head_epoch(i64::MAX) + .into(), + ); assert_eq!( call_healthcheck(false).await.unwrap().status(), @@ -128,8 +135,15 @@ mod test { // instrument the state so that the ready requirements are not met drop(rpc_listener); - sync_status.write().status = NodeSyncStatus::Error; - sync_status.write().current_head_epoch = 0; + sync_status.store( + sync_status + .load() + .as_ref() + .clone() + .with_status(NodeSyncStatus::Error) + .with_current_head_epoch(0) + .into(), + ); assert_eq!( call_healthcheck(false).await.unwrap().status(), @@ -149,7 +163,7 @@ mod test { let healthcheck_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0); let rpc_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); - let sync_status = Arc::new(RwLock::new(SyncStatusReport::default())); + let sync_status = Arc::new(ArcSwap::from_pointee(SyncStatusReport::default())); let peer_manager = Arc::new(PeerManager::default()); let forest_state = ForestState { config: Config { @@ -187,7 +201,14 @@ mod test { }; // instrument the state so that the live requirements are met - sync_status.write().status = NodeSyncStatus::Syncing; + sync_status.store( + sync_status + .load() + .as_ref() + .clone() + .with_status(NodeSyncStatus::Syncing) + .into(), + ); let peer = libp2p::PeerId::random(); peer_manager.touch_peer(&peer); @@ -203,7 +224,14 @@ mod test { assert!(text.contains("[+] peers connected")); // instrument the state so that the live requirements are not met - sync_status.write().status = NodeSyncStatus::Error; + sync_status.store( + sync_status + .load() + .as_ref() + .clone() + .with_status(NodeSyncStatus::Error) + .into(), + ); peer_manager.remove_peer(&peer); assert_eq!( @@ -224,7 +252,7 @@ mod test { let rpc_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let peer_manager = Arc::new(PeerManager::default()); - let sync_status = Arc::new(RwLock::new(SyncStatusReport::default())); + let sync_status = Arc::new(ArcSwap::from_pointee(SyncStatusReport::default())); let forest_state = ForestState { config: Config { client: Client { @@ -261,8 +289,15 @@ mod test { }; // instrument the state so that the health requirements are met - sync_status.write().current_head_epoch = i64::MAX; - sync_status.write().status = NodeSyncStatus::Syncing; + sync_status.store( + sync_status + .load() + .as_ref() + .clone() + .with_status(NodeSyncStatus::Syncing) + .with_current_head_epoch(i64::MAX) + .into(), + ); let peer = libp2p::PeerId::random(); peer_manager.touch_peer(&peer); @@ -280,8 +315,15 @@ mod test { // instrument the state so that the health requirements are not met drop(rpc_listener); - sync_status.write().status = NodeSyncStatus::Error; - sync_status.write().current_head_epoch = 0; + sync_status.store( + sync_status + .load() + .as_ref() + .clone() + .with_status(NodeSyncStatus::Error) + .with_current_head_epoch(0) + .into(), + ); peer_manager.remove_peer(&peer); assert_eq!( @@ -311,7 +353,7 @@ mod test { }, chain_config: Arc::default(), genesis_timestamp: 0, - sync_status: Arc::new(RwLock::new(SyncStatusReport::default())), + sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::default())), peer_manager: Arc::default(), }; let listener = diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index b8f1c3f99436..33216942ecb7 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -1787,8 +1787,7 @@ impl RpcMethod<0> for EthSyncing { (): Self::Params, ext: &http::Extensions, ) -> Result { - let sync_status: crate::chain_sync::SyncStatusReport = - crate::rpc::sync::SyncStatus::handle(ctx, (), ext).await?; + let sync_status = crate::rpc::sync::SyncStatus::handle(ctx, (), ext).await?; match sync_status.status { NodeSyncStatus::Synced => Ok(EthSyncingResult { done_sync: true, diff --git a/src/rpc/methods/sync.rs b/src/rpc/methods/sync.rs index 4d6075d69cad..c803050600b9 100644 --- a/src/rpc/methods/sync.rs +++ b/src/rpc/methods/sync.rs @@ -92,14 +92,14 @@ impl RpcMethod<0> for SyncStatus { const DESCRIPTION: Option<&'static str> = Some("Returns the current sync status of the node."); type Params = (); - type Ok = SyncStatusReport; + type Ok = Arc; async fn handle( ctx: Ctx, (): Self::Params, _: &http::Extensions, ) -> Result { - let sync_status = ctx.sync_status.as_ref().read().clone(); + let sync_status = ctx.sync_status.load().shallow_clone(); Ok(sync_status) } } @@ -122,7 +122,7 @@ impl RpcMethod<1> for SyncSubmitBlock { (block_msg,): Self::Params, _: &http::Extensions, ) -> Result { - if !matches!(ctx.sync_status.read().status, NodeSyncStatus::Synced) { + if !matches!(ctx.sync_status.load().status, NodeSyncStatus::Synced) { Err(anyhow!("the node isn't in 'follow' mode"))? } let genesis_network_name = ctx.chain_config().network.genesis_name(); @@ -232,7 +232,7 @@ mod tests { keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory).unwrap())), mpool, bad_blocks: Some(Default::default()), - sync_status: Arc::new(RwLock::new(SyncStatusReport::default())), + sync_status: Default::default(), eth_event_handler: Arc::new(EthEventHandler::new()), sync_network_context, start_time, @@ -279,16 +279,23 @@ mod tests { let sync_status = SyncStatus::handle(ctx.clone(), (), &Default::default()) .await .unwrap(); - assert_eq!(sync_status, st_copy.as_ref().read().clone()); + assert_eq!(sync_status, st_copy.load().clone()); // update cloned state - st_copy.write().status = NodeSyncStatus::Syncing; - st_copy.write().current_head_epoch = 4; + st_copy.store( + st_copy + .load() + .as_ref() + .clone() + .with_status(NodeSyncStatus::Syncing) + .with_current_head_epoch(4) + .into(), + ); let sync_status = SyncStatus::handle(ctx.clone(), (), &Default::default()) .await .unwrap(); - assert_eq!(sync_status, st_copy.as_ref().read().clone()); + assert_eq!(sync_status, st_copy.load().clone()); } } diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index 11c5c9262e74..bf6de454a1ba 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -22,6 +22,7 @@ use crate::state_manager::StateManager; use crate::utils::net::{DownloadFileOption, download_to}; use crate::utils::proofs_api::{self, ensure_proof_params_downloaded}; use crate::{Config, JWT_IDENTIFIER}; +use arc_swap::ArcSwap; use jsonrpsee::server::stop_channel; use parking_lot::RwLock; use std::{ @@ -102,7 +103,7 @@ pub async fn offline_rpc_state( keystore: Arc::new(RwLock::new(keystore)), mpool: message_pool, bad_blocks: Default::default(), - sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), + sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())), eth_event_handler: Arc::new(EthEventHandler::from_config(&events_config)), sync_network_context, start_time: chrono::Utc::now(), diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs index 0e439b7fd5ec..ae3f64605268 100644 --- a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs @@ -22,6 +22,7 @@ use crate::{ state_manager::StateManager, }; use api_compare_tests::TestDump; +use arc_swap::ArcSwap; use fvm_shared4::address::Network; use openrpc_types::ParamStructure; use parking_lot::RwLock; @@ -141,7 +142,7 @@ async fn ctx( keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)), mpool: message_pool, bad_blocks: Default::default(), - sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), + sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())), eth_event_handler: Arc::new(EthEventHandler::new()), sync_network_context, start_time: chrono::Utc::now(), diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index 4963fce3f8f1..064c4a82432e 100644 --- a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -23,6 +23,7 @@ use crate::{ state_manager::StateManager, }; use anyhow::Context as _; +use arc_swap::ArcSwap; use openrpc_types::ParamStructure; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; @@ -170,7 +171,7 @@ async fn ctx( keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)), mpool: message_pool, bad_blocks: Default::default(), - sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), + sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())), eth_event_handler: Arc::new(EthEventHandler::new()), sync_network_context, start_time: chrono::Utc::now(),