Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/chain_sync/chain_follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ use crate::{
shim::clock::ChainEpoch,
state_manager::StateManager,
};
use arc_swap::ArcSwap;
use chrono::Utc;
use hashbrown::{HashMap, HashSet};
use libp2p::PeerId;
use parking_lot::{Mutex, RwLock};
use parking_lot::Mutex;
use std::time::{Duration, Instant};
use tokio::{sync::Notify, task::JoinSet};
use tracing::{debug, error, info, trace, warn};
Expand Down Expand Up @@ -136,7 +137,7 @@ impl ChainFollower {
Self {
tasks,
state_machine,
sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())),
state_manager,
network,
genesis,
Expand Down Expand Up @@ -326,14 +327,13 @@ async fn chain_follower(

// Update the sync states
{
let old_status_report = sync_status.read().clone();
let old_status_report = sync_status.load().shallow_clone();
let new_status_report = old_status_report.update(
&state_manager,
current_active_forks,
stateless_mode,
);

sync_status.write().clone_from(&new_status_report);
sync_status.store(new_status_report.into());
}

for task in task_vec {
Expand Down
16 changes: 14 additions & 2 deletions src/chain_sync/sync_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use crate::lotus_json::lotus_json_with_self;
use crate::networks::calculate_expected_epoch;
use crate::shim::clock::ChainEpoch;
use crate::state_manager::StateManager;
use arc_swap::ArcSwap;
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
Expand Down Expand Up @@ -101,7 +101,7 @@ pub struct ForkSyncInfo {
pub(crate) last_updated: Option<DateTime<Utc>>,
}

pub type SyncStatus = Arc<RwLock<SyncStatusReport>>;
pub type SyncStatus = Arc<ArcSwap<SyncStatusReport>>;

/// Contains information about the current status of the node's synchronization process.
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, JsonSchema)]
Expand Down Expand Up @@ -200,4 +200,16 @@ impl SyncStatusReport {
.map(|fork_info| fork_info.target_sync_epoch_start)
.min()
}

#[cfg(test)]
pub fn with_status(mut self, status: NodeSyncStatus) -> Self {
self.status = status;
self
}

#[cfg(test)]
pub fn with_current_head_epoch(mut self, current_head_epoch: ChainEpoch) -> Self {
self.current_head_epoch = current_head_epoch;
self
}
}
2 changes: 1 addition & 1 deletion src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ fn maybe_prefill_rpc_caches(
services.spawn(async move {
loop {
match validated_tipset_rx.recv().await {
Ok(_) if !sync_status.read().is_synced() => {
Ok(_) if !sync_status.load().is_synced() => {
// Skip if the node is catching up to avoid unnecessary work, as the head may be changing rapidly.
continue;
}
Expand Down
2 changes: 1 addition & 1 deletion src/db/gc/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl SnapshotGarbageCollector {
&& let Some(car_db_head_epoch) =
self.db().heaviest_car_tipset().ok().map(|ts| ts.epoch())
{
let sync_status = &*self.sync_status().read();
let sync_status = (*self.sync_status().load()).shallow_clone();
let network_head_epoch = sync_status.network_head_epoch;
let head_epoch = sync_status.current_head_epoch;
if head_epoch > 0 // sync_status has been initialized
Expand Down
8 changes: 4 additions & 4 deletions src/health/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub(crate) async fn healthz(

fn check_sync_status_synced(state: &ForestState, acc: &mut MessageAccumulator) -> bool {
// Forest must be in sync with the network
if state.sync_status.read().status == NodeSyncStatus::Synced {
if state.sync_status.load().status == NodeSyncStatus::Synced {
acc.push_ok("sync complete");
true
} else {
Expand All @@ -106,7 +106,7 @@ fn check_sync_status_synced(state: &ForestState, acc: &mut MessageAccumulator) -

fn check_sync_status_not_error(state: &ForestState, acc: &mut MessageAccumulator) -> bool {
// Forest must be in sync with the network
if state.sync_status.read().status != NodeSyncStatus::Error {
if state.sync_status.load().status != NodeSyncStatus::Error {
acc.push_ok("sync ok");
true
} else {
Expand All @@ -128,7 +128,7 @@ fn check_epoch_up_to_date(state: &ForestState, acc: &mut MessageAccumulator) ->
);

// The current epoch of the node must be not too far behind the network
if state.sync_status.read().current_head_epoch >= now_epoch - MAX_EPOCH_DIFF {
if state.sync_status.load().current_head_epoch >= now_epoch - MAX_EPOCH_DIFF {
acc.push_ok("epoch up to date");
true
} else {
Expand Down Expand Up @@ -181,7 +181,7 @@ async fn check_f3_running(state: &ForestState, acc: &mut MessageAccumulator) ->
acc.push_ok("f3 running");
true
} else if crate::f3::get_f3_sidecar_params(&state.chain_config).bootstrap_epoch
> state.sync_status.read().network_head_epoch
> state.sync_status.load().network_head_epoch
{
acc.push_ok("f3 pending activation");
true
Expand Down
76 changes: 59 additions & 17 deletions src/health/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ mod test {
use crate::Client;
use crate::chain_sync::{NodeSyncStatus, SyncStatusReport};
use crate::cli_shared::cli::ChainIndexerConfig;
use parking_lot::RwLock;
use arc_swap::ArcSwap;
use reqwest::StatusCode;

#[tokio::test]
async fn test_check_readyz() {
let healthcheck_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
let rpc_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();

let sync_status = Arc::new(RwLock::new(SyncStatusReport::init()));
let sync_status = Arc::new(ArcSwap::from_pointee(SyncStatusReport::init()));

let forest_state = ForestState {
config: Config {
Expand All @@ -85,10 +85,10 @@ mod test {
},
..Default::default()
},
chain_config: Arc::new(ChainConfig::default()),
chain_config: Default::default(),
genesis_timestamp: 0,
sync_status: sync_status.clone(),
peer_manager: Arc::new(PeerManager::default()),
peer_manager: Default::default(),
};

let listener =
Expand All @@ -112,8 +112,15 @@ mod test {
};

// instrument the state so that the ready requirements are met
sync_status.write().status = NodeSyncStatus::Synced;
sync_status.write().current_head_epoch = i64::MAX;
sync_status.store(
Comment thread
hanabi1224 marked this conversation as resolved.
sync_status
.load()
.as_ref()
.clone()
.with_status(NodeSyncStatus::Synced)
.with_current_head_epoch(i64::MAX)
.into(),
);

assert_eq!(
call_healthcheck(false).await.unwrap().status(),
Expand All @@ -128,8 +135,15 @@ mod test {

// instrument the state so that the ready requirements are not met
drop(rpc_listener);
sync_status.write().status = NodeSyncStatus::Error;
sync_status.write().current_head_epoch = 0;
sync_status.store(
sync_status
.load()
.as_ref()
.clone()
.with_status(NodeSyncStatus::Error)
.with_current_head_epoch(0)
.into(),
);

assert_eq!(
call_healthcheck(false).await.unwrap().status(),
Expand All @@ -149,7 +163,7 @@ mod test {
let healthcheck_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
let rpc_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();

let sync_status = Arc::new(RwLock::new(SyncStatusReport::default()));
let sync_status = Arc::new(ArcSwap::from_pointee(SyncStatusReport::default()));
let peer_manager = Arc::new(PeerManager::default());
let forest_state = ForestState {
config: Config {
Expand Down Expand Up @@ -187,7 +201,14 @@ mod test {
};

// instrument the state so that the live requirements are met
sync_status.write().status = NodeSyncStatus::Syncing;
sync_status.store(
sync_status
.load()
.as_ref()
.clone()
.with_status(NodeSyncStatus::Syncing)
.into(),
);
let peer = libp2p::PeerId::random();
peer_manager.touch_peer(&peer);

Expand All @@ -203,7 +224,14 @@ mod test {
assert!(text.contains("[+] peers connected"));

// instrument the state so that the live requirements are not met
sync_status.write().status = NodeSyncStatus::Error;
sync_status.store(
sync_status
.load()
.as_ref()
.clone()
.with_status(NodeSyncStatus::Error)
.into(),
);
peer_manager.remove_peer(&peer);

assert_eq!(
Expand All @@ -224,7 +252,7 @@ mod test {
let rpc_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let peer_manager = Arc::new(PeerManager::default());

let sync_status = Arc::new(RwLock::new(SyncStatusReport::default()));
let sync_status = Arc::new(ArcSwap::from_pointee(SyncStatusReport::default()));
let forest_state = ForestState {
config: Config {
client: Client {
Expand Down Expand Up @@ -261,8 +289,15 @@ mod test {
};

// instrument the state so that the health requirements are met
sync_status.write().current_head_epoch = i64::MAX;
sync_status.write().status = NodeSyncStatus::Syncing;
sync_status.store(
sync_status
.load()
.as_ref()
.clone()
.with_status(NodeSyncStatus::Syncing)
.with_current_head_epoch(i64::MAX)
.into(),
);
let peer = libp2p::PeerId::random();
peer_manager.touch_peer(&peer);

Expand All @@ -280,8 +315,15 @@ mod test {

// instrument the state so that the health requirements are not met
drop(rpc_listener);
sync_status.write().status = NodeSyncStatus::Error;
sync_status.write().current_head_epoch = 0;
sync_status.store(
sync_status
.load()
.as_ref()
.clone()
.with_status(NodeSyncStatus::Error)
.with_current_head_epoch(0)
.into(),
);
peer_manager.remove_peer(&peer);

assert_eq!(
Expand Down Expand Up @@ -311,7 +353,7 @@ mod test {
},
chain_config: Arc::default(),
genesis_timestamp: 0,
sync_status: Arc::new(RwLock::new(SyncStatusReport::default())),
sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::default())),
peer_manager: Arc::default(),
};
let listener =
Expand Down
3 changes: 1 addition & 2 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1787,8 +1787,7 @@ impl RpcMethod<0> for EthSyncing {
(): Self::Params,
ext: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
let sync_status: crate::chain_sync::SyncStatusReport =
crate::rpc::sync::SyncStatus::handle(ctx, (), ext).await?;
let sync_status = crate::rpc::sync::SyncStatus::handle(ctx, (), ext).await?;
match sync_status.status {
NodeSyncStatus::Synced => Ok(EthSyncingResult {
done_sync: true,
Expand Down
23 changes: 15 additions & 8 deletions src/rpc/methods/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ impl RpcMethod<0> for SyncStatus {
const DESCRIPTION: Option<&'static str> = Some("Returns the current sync status of the node.");

type Params = ();
type Ok = SyncStatusReport;
type Ok = Arc<SyncStatusReport>;

async fn handle(
ctx: Ctx,
(): Self::Params,
_: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
let sync_status = ctx.sync_status.as_ref().read().clone();
let sync_status = ctx.sync_status.load().shallow_clone();
Ok(sync_status)
}
}
Expand All @@ -122,7 +122,7 @@ impl RpcMethod<1> for SyncSubmitBlock {
(block_msg,): Self::Params,
_: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
if !matches!(ctx.sync_status.read().status, NodeSyncStatus::Synced) {
if !matches!(ctx.sync_status.load().status, NodeSyncStatus::Synced) {
Err(anyhow!("the node isn't in 'follow' mode"))?
}
let genesis_network_name = ctx.chain_config().network.genesis_name();
Expand Down Expand Up @@ -232,7 +232,7 @@ mod tests {
keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory).unwrap())),
mpool,
bad_blocks: Some(Default::default()),
sync_status: Arc::new(RwLock::new(SyncStatusReport::default())),
sync_status: Default::default(),
eth_event_handler: Arc::new(EthEventHandler::new()),
sync_network_context,
start_time,
Expand Down Expand Up @@ -279,16 +279,23 @@ mod tests {
let sync_status = SyncStatus::handle(ctx.clone(), (), &Default::default())
.await
.unwrap();
assert_eq!(sync_status, st_copy.as_ref().read().clone());
assert_eq!(sync_status, st_copy.load().clone());

// update cloned state
st_copy.write().status = NodeSyncStatus::Syncing;
st_copy.write().current_head_epoch = 4;
st_copy.store(
st_copy
.load()
.as_ref()
.clone()
.with_status(NodeSyncStatus::Syncing)
.with_current_head_epoch(4)
.into(),
);

let sync_status = SyncStatus::handle(ctx.clone(), (), &Default::default())
.await
.unwrap();

assert_eq!(sync_status, st_copy.as_ref().read().clone());
assert_eq!(sync_status, st_copy.load().clone());
}
}
3 changes: 2 additions & 1 deletion src/tool/offline_server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::state_manager::StateManager;
use crate::utils::net::{DownloadFileOption, download_to};
use crate::utils::proofs_api::{self, ensure_proof_params_downloaded};
use crate::{Config, JWT_IDENTIFIER};
use arc_swap::ArcSwap;
use jsonrpsee::server::stop_channel;
use parking_lot::RwLock;
use std::{
Expand Down Expand Up @@ -102,7 +103,7 @@ pub async fn offline_rpc_state(
keystore: Arc::new(RwLock::new(keystore)),
mpool: message_pool,
bad_blocks: Default::default(),
sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())),
eth_event_handler: Arc::new(EthEventHandler::from_config(&events_config)),
sync_network_context,
start_time: chrono::Utc::now(),
Expand Down
3 changes: 2 additions & 1 deletion src/tool/subcommands/api_cmd/generate_test_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
state_manager::StateManager,
};
use api_compare_tests::TestDump;
use arc_swap::ArcSwap;
use fvm_shared4::address::Network;
use openrpc_types::ParamStructure;
use parking_lot::RwLock;
Expand Down Expand Up @@ -141,7 +142,7 @@ async fn ctx(
keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)),
mpool: message_pool,
bad_blocks: Default::default(),
sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
sync_status: Arc::new(ArcSwap::from_pointee(SyncStatusReport::init())),
eth_event_handler: Arc::new(EthEventHandler::new()),
sync_network_context,
start_time: chrono::Utc::now(),
Expand Down
Loading