From ff1882e028067b91b852df5db100f7d54d76ea93 Mon Sep 17 00:00:00 2001 From: echobt Date: Tue, 3 Feb 2026 14:55:29 +0000 Subject: [PATCH] fix(bittensor): improve reconnection by properly stopping old BlockSync When Bittensor RPC connection errors occur (e.g., 'restart required'), the reconnection logic now properly stops the old BlockSync before creating a new one. This prevents the old internal task from continuing to emit errors. Key changes: - Track BlockSync instance instead of just the spawned task handle - Call sync.stop() before creating new connection to clean up internal tasks - Remove unnecessary wrapper task around sync.start() - Maintain exponential backoff for reconnection attempts This resolves the issue where connection errors would spam logs every 5 seconds due to the old BlockSync task not being properly terminated. --- bins/validator-node/src/main.rs | 395 +++++++++++++++++++++++--------- 1 file changed, 293 insertions(+), 102 deletions(-) diff --git a/bins/validator-node/src/main.rs b/bins/validator-node/src/main.rs index d8be92c..9ab139e 100644 --- a/bins/validator-node/src/main.rs +++ b/bins/validator-node/src/main.rs @@ -23,9 +23,166 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use sysinfo::System; + use tokio_tungstenite::{connect_async, tungstenite::Message}; use tracing::{debug, error, info, warn}; +// ==================== Connection Error Detection ==================== + +/// Checks if a Bittensor error message indicates a critical connection failure +/// requiring immediate reconnection (e.g., RPC connection closed) +fn is_critical_bittensor_error(msg: &str) -> bool { + // Detect critical errors that require full reconnection + // These errors indicate the underlying RPC connection is dead + let critical_patterns = [ + "restart required", + "connection closed", + "background task closed", + "transport error", + ]; + + let msg_lower = msg.to_lowercase(); + critical_patterns + .iter() + .any(|pattern| 
/// Tracks the state of Bittensor reconnection attempts with exponential backoff.
///
/// The wait between attempts starts at 10 seconds, doubles after every failed
/// attempt and is capped at 120 seconds.
struct ReconnectionState {
    /// `true` while the connection is considered down.
    disconnected: bool,
    /// Instant at which the most recent reconnection attempt was started.
    last_attempt: std::time::Instant,
    /// Number of consecutive failed reconnection attempts.
    failures: u32,
}

impl Default for ReconnectionState {
    /// Starts in the connected state with no recorded failures; the attempt
    /// clock is initialised to the moment of construction.
    fn default() -> Self {
        Self {
            disconnected: false,
            last_attempt: std::time::Instant::now(),
            failures: 0,
        }
    }
}

impl ReconnectionState {
    /// Delay before the first retry, in seconds.
    const BASE_BACKOFF_SECS: u64 = 10;
    /// Upper bound for the delay between retries, in seconds.
    const MAX_BACKOFF_SECS: u64 = 120;

    /// Check if reconnection should be attempted based on exponential backoff.
    ///
    /// Returns `true` only while disconnected and once strictly more than
    /// [`Self::current_backoff_secs`] seconds have passed since the last attempt.
    fn should_attempt(&self) -> bool {
        self.disconnected
            && self.last_attempt.elapsed() > Duration::from_secs(self.current_backoff_secs())
    }

    /// Get the current backoff interval in seconds.
    ///
    /// Backoff formula: `min(10 * 2^failures, 120)`.
    fn current_backoff_secs(&self) -> u64 {
        // `failures` is unbounded (it only saturates at `u32::MAX`), so the plain
        // `10 * 2u64.pow(failures)` overflows during a long outage: it panics in
        // debug builds and wraps to 0 in release builds, which would disable the
        // backoff entirely. Any overflow therefore maps to the cap.
        2u64.checked_pow(self.failures)
            .and_then(|factor| Self::BASE_BACKOFF_SECS.checked_mul(factor))
            .map_or(Self::MAX_BACKOFF_SECS, |secs| {
                secs.min(Self::MAX_BACKOFF_SECS)
            })
    }

    /// Mark a reconnection attempt started (restarts the backoff clock).
    fn mark_attempt(&mut self) {
        self.last_attempt = std::time::Instant::now();
    }

    /// Mark successful reconnection: clears the disconnected flag and resets
    /// the failure counter so the next outage starts from the base delay.
    fn mark_success(&mut self) {
        self.disconnected = false;
        self.failures = 0;
    }

    /// Mark failed reconnection; the counter saturates instead of wrapping.
    fn mark_failure(&mut self) {
        self.failures = self.failures.saturating_add(1);
    }

    /// Mark as disconnected so that `should_attempt` can start returning `true`.
    fn mark_disconnected(&mut self) {
        self.disconnected = true;
    }
}
+async fn attempt_bittensor_reconnect( + endpoint: &str, + netuid: u16, + old_sync: Option, +) -> Result { + // Properly stop the old BlockSync to clean up internal tasks + // This sets the running flag to false and stops the listener + if let Some(old) = old_sync { + old.stop().await; + debug!("Stopped previous BlockSync"); + } + + // Create new client + let new_client = BittensorClient::new(endpoint).await?; + let new_client = Arc::new(new_client); + + // Create new BlockSync + let mut sync = BlockSync::new(BlockSyncConfig { + netuid, + ..Default::default() + }); + + // Get event receiver + let new_rx = sync.take_event_receiver().ok_or_else(|| { + anyhow::anyhow!("Failed to get event receiver from BlockSync - receiver already taken") + })?; + + // Connect + sync.connect(new_client.clone()).await?; + + // Start the sync (spawns internal task) + sync.start().await?; + + Ok(ReconnectionResult { + event_rx: new_rx, + sync, + client: new_client, + }) +} + // ==================== Platform Server Client ==================== #[derive(Clone)] @@ -605,6 +762,7 @@ async fn main() -> Result<()> { let subtensor_signer: Option>; let subtensor_client: Option>>; let mut block_rx: Option> = None; + let mut block_sync: Option = None; let bittensor_client_for_metagraph: Option>; if !args.no_bittensor { @@ -670,13 +828,20 @@ async fn main() -> Result<()> { if let Err(e) = sync.connect(bittensor_client_for_sync).await { warn!("Block sync connect failed: {}", e); } else { - tokio::spawn(async move { - if let Err(e) = sync.start().await { - error!("Block sync error: {}", e); + match rx { + Some(event_rx) => { + if let Err(e) = sync.start().await { + error!("Block sync start failed: {}", e); + } else { + block_rx = Some(event_rx); + block_sync = Some(sync); + info!("Block sync: started"); + } + } + None => { + warn!("Block sync take_event_receiver() returned None - receiver already taken"); } - }); - block_rx = rx; - info!("Block sync: started"); + } } } Err(e) => { @@ -705,10 +870,8 @@ 
async fn main() -> Result<()> { let mut challenge_refresh_interval = tokio::time::interval(Duration::from_secs(60)); let mut metagraph_refresh_interval = tokio::time::interval(Duration::from_secs(300)); // 5 minutes - // Track Bittensor connection state for reconnection - let mut bittensor_disconnected = false; - let mut last_reconnect_attempt = std::time::Instant::now(); - let mut reconnect_failures: u32 = 0; + // Track Bittensor connection state for reconnection with exponential backoff + let mut reconnect_state = ReconnectionState::default(); // Store challenges in Arc for periodic refresh let cached_challenges: Arc>> = Arc::new(RwLock::new( @@ -752,17 +915,16 @@ async fn main() -> Result<()> { .as_secs() / 12; - submit_weights_for_epoch( - epoch, - &platform_client_clone, - &st_clone, - &sig_clone, - &client_clone, - &cached_challenges_clone, + let ctx = WeightSubmissionContext { + platform_client: &platform_client_clone, + subtensor: &st_clone, + signer: &sig_clone, + subtensor_client: &client_clone, + cached_challenges: &cached_challenges_clone, netuid, version_key, - ) - .await; + }; + submit_weights_for_epoch(epoch, &ctx).await; } else { info!("Pending commits found - weights already submitted for this epoch"); } @@ -780,84 +942,118 @@ async fn main() -> Result<()> { // Track disconnection state for reconnection logic match &event { BlockSyncEvent::Disconnected(msg) => { - bittensor_disconnected = true; - // Force immediate reconnect if client needs full restart - if msg.contains("restart required") || msg.contains("connection closed") { - last_reconnect_attempt = std::time::Instant::now() - Duration::from_secs(121); + reconnect_state.mark_disconnected(); + + // Detect critical errors that require immediate reconnection + if is_critical_bittensor_error(msg) { + // Attempt immediate reconnection for critical errors + // Only if backoff allows (prevents infinite rapid retries) + if reconnect_state.should_attempt() { + warn!("Critical Bittensor error detected: 
{}. Attempting immediate reconnection...", msg); + reconnect_state.mark_attempt(); + + let next_backoff = std::cmp::min(reconnect_state.current_backoff_secs() * 2, 120); + info!("Immediate Bittensor reconnection (attempt {}, next backoff {}s)...", + reconnect_state.failures + 1, next_backoff); + + // Try to reconnect using helper function + match attempt_bittensor_reconnect( + &subtensor_endpoint, + netuid, + block_sync.take() + ).await { + Ok(result) => { + info!("Bittensor reconnected successfully (immediate)"); + block_rx = Some(result.event_rx); + block_sync = Some(result.sync); + reconnect_state.mark_success(); + + // Also refresh metagraph with new client + if let Some(ref st_client) = subtensor_client { + match sync_metagraph(&result.client, netuid).await { + Ok(mg) => { + info!("Metagraph refreshed after reconnect: {} neurons", mg.n); + let mut client = st_client.write(); + client.set_metagraph(mg); + } + Err(e) => { + warn!("Metagraph refresh after reconnect failed: {}", e); + } + } + } + } + Err(e) => { + reconnect_state.mark_failure(); + let next_backoff = reconnect_state.current_backoff_secs(); + warn!("Immediate Bittensor reconnection failed: {} (will retry in {}s)", e, next_backoff); + } + } + } else { + let backoff_secs = reconnect_state.current_backoff_secs(); + let elapsed = reconnect_state.last_attempt.elapsed().as_secs(); + let remaining = backoff_secs.saturating_sub(elapsed); + warn!("Critical Bittensor error: {}. Backoff active, reconnection in ~{}s", msg, remaining); + } } } BlockSyncEvent::Reconnected | BlockSyncEvent::NewBlock { .. 
} => { - bittensor_disconnected = false; - reconnect_failures = 0; + reconnect_state.mark_success(); } _ => {} } - handle_block_event( - event, - &platform_client, - &subtensor, - &subtensor_signer, - &subtensor_client, - &cached_challenges, + let block_ctx = BlockEventContext { + platform_client: &platform_client, + subtensor: &subtensor, + signer: &subtensor_signer, + subtensor_client: &subtensor_client, + cached_challenges: &cached_challenges, netuid, version_key, - ).await; + }; + handle_block_event(event, &block_ctx).await; } _ = interval.tick() => { debug!("Heartbeat"); // Check if we need to attempt Bittensor reconnection with exponential backoff - // Base delay: 10s, doubles on each failure, max 120s - let backoff_secs = std::cmp::min(10 * 2u64.pow(reconnect_failures), 120); - if bittensor_disconnected && last_reconnect_attempt.elapsed() > Duration::from_secs(backoff_secs) { - last_reconnect_attempt = std::time::Instant::now(); - info!("Attempting Bittensor reconnection (attempt {}, next backoff {}s)...", reconnect_failures + 1, backoff_secs * 2); - - // Try to reconnect by creating a new BlockSync - match BittensorClient::new(&subtensor_endpoint).await { - Ok(new_client) => { - let mut sync = BlockSync::new(BlockSyncConfig { - netuid, - ..Default::default() - }); - - if let Some(new_rx) = sync.take_event_receiver() { - let new_client = Arc::new(new_client); - match sync.connect(new_client.clone()).await { - Ok(()) => { - // Spawn the sync task to keep it running - tokio::spawn(async move { - if let Err(e) = sync.start().await { - error!("Block sync error after reconnect: {}", e); - } - }); - - info!("Bittensor reconnected successfully"); - block_rx = Some(new_rx); - bittensor_disconnected = false; - reconnect_failures = 0; - - // Also refresh metagraph with new client - if let Some(ref st_client) = subtensor_client { - if let Ok(mg) = sync_metagraph(&new_client, netuid).await { - info!("Metagraph refreshed after reconnect: {} neurons", mg.n); - let mut 
client = st_client.write(); - client.set_metagraph(mg); - } - } + // This handles non-critical disconnections and retries after immediate reconnection fails + if reconnect_state.should_attempt() { + reconnect_state.mark_attempt(); + let next_backoff = std::cmp::min(reconnect_state.current_backoff_secs() * 2, 120); + info!("Attempting Bittensor reconnection (attempt {}, next backoff {}s)...", + reconnect_state.failures + 1, next_backoff); + + // Try to reconnect using helper function + match attempt_bittensor_reconnect( + &subtensor_endpoint, + netuid, + block_sync.take() + ).await { + Ok(result) => { + info!("Bittensor reconnected successfully"); + block_rx = Some(result.event_rx); + block_sync = Some(result.sync); + reconnect_state.mark_success(); + + // Also refresh metagraph with new client + if let Some(ref st_client) = subtensor_client { + match sync_metagraph(&result.client, netuid).await { + Ok(mg) => { + info!("Metagraph refreshed after reconnect: {} neurons", mg.n); + let mut client = st_client.write(); + client.set_metagraph(mg); } Err(e) => { - warn!("Failed to connect block sync: {}", e); - reconnect_failures = reconnect_failures.saturating_add(1); + warn!("Metagraph refresh after reconnect failed: {}", e); } } } } Err(e) => { - reconnect_failures = reconnect_failures.saturating_add(1); - let next_backoff = std::cmp::min(10 * 2u64.pow(reconnect_failures), 120); + reconnect_state.mark_failure(); + let next_backoff = reconnect_state.current_backoff_secs(); warn!("Bittensor reconnection failed: {} (will retry in {}s)", e, next_backoff); } } @@ -938,16 +1134,14 @@ fn load_keypair(args: &Args) -> Result { /// Submit weights for a given epoch /// This is the core weight submission logic, extracted to be reusable -async fn submit_weights_for_epoch( - epoch: u64, - platform_client: &Arc, - st: &Arc, - sig: &Arc, - client: &Arc>, - cached_challenges: &Arc>>, - netuid: u16, - version_key: u64, -) { +async fn submit_weights_for_epoch(epoch: u64, ctx: 
&WeightSubmissionContext<'_>) { + let platform_client = ctx.platform_client; + let st = ctx.subtensor; + let sig = ctx.signer; + let client = ctx.subtensor_client; + let cached_challenges = ctx.cached_challenges; + let netuid = ctx.netuid; + let version_key = ctx.version_key; info!("=== SUBMITTING WEIGHTS for epoch {} ===", epoch); // Get weights from platform-server using cached challenges @@ -1175,16 +1369,14 @@ async fn submit_weights_for_epoch( } } -async fn handle_block_event( - event: BlockSyncEvent, - platform_client: &Arc, - subtensor: &Option>, - signer: &Option>, - subtensor_client: &Option>>, - cached_challenges: &Arc>>, - netuid: u16, - version_key: u64, -) { +async fn handle_block_event(event: BlockSyncEvent, ctx: &BlockEventContext<'_>) { + let platform_client = ctx.platform_client; + let subtensor = ctx.subtensor; + let signer = ctx.signer; + let subtensor_client = ctx.subtensor_client; + let cached_challenges = ctx.cached_challenges; + let netuid = ctx.netuid; + let version_key = ctx.version_key; match event { BlockSyncEvent::NewBlock { block_number, .. } => { debug!("Block {}", block_number); @@ -1207,17 +1399,16 @@ async fn handle_block_event( signer.as_ref(), subtensor_client.as_ref(), ) { - submit_weights_for_epoch( - epoch, + let weight_ctx = WeightSubmissionContext { platform_client, - st, - sig, - client, + subtensor: st, + signer: sig, + subtensor_client: client, cached_challenges, netuid, version_key, - ) - .await; + }; + submit_weights_for_epoch(epoch, &weight_ctx).await; } else { warn!("No Subtensor/signer - cannot submit weights"); }