diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fd1328..220aa9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,14 @@ All notable changes to this project will be documented in this file. - Handle Redis connection errors (e.g. broken pipe during master failover) gracefully instead of panicking. Previously, `get_queued_query_count` would `.unwrap()` on the Redis result, causing a panic that poisoned the metrics `RwLock`, cascading into further panics and leaving pods unresponsive ([#111]). - +- Recover from a black-holed Redis connection instead of hanging until the liveness probe restarts the pod. + When the node running the Redis master is drained, the TCP connection silently stops delivering packets + without ever being closed, so the only error trino-lb sees is a response timeout. The redis crate's + `ConnectionManager` does not reconnect on timeouts (only on dropped-connection errors), so it kept reusing + the dead connection forever. trino-lb now enables TCP keepalive / `TCP_USER_TIMEOUT` on the Redis socket and + runs a background health check that rebuilds the connection itself when a periodic ping fails ([#109]). 
+ +[#109]: https://github.com/stackabletech/trino-lb/issues/109 [#111]: https://github.com/stackabletech/trino-lb/pull/111 ## [0.6.0] - 2026-02-17 diff --git a/trino-lb-persistence/Cargo.toml b/trino-lb-persistence/Cargo.toml index 1c8bd61..287a444 100644 --- a/trino-lb-persistence/Cargo.toml +++ b/trino-lb-persistence/Cargo.toml @@ -25,3 +25,6 @@ tokio.workspace = true tracing.workspace = true trait-variant.workspace = true url.workspace = true + +[dev-dependencies] +tokio = { workspace = true, features = ["macros", "rt"] } diff --git a/trino-lb-persistence/src/redis/mod.rs b/trino-lb-persistence/src/redis/mod.rs index cdf36a2..c9cd1c9 100644 --- a/trino-lb-persistence/src/redis/mod.rs +++ b/trino-lb-persistence/src/redis/mod.rs @@ -1,18 +1,23 @@ use std::{ fmt::Debug, num::TryFromIntError, + sync::{Arc, PoisonError, RwLock}, time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH}, }; -use futures::{TryFutureExt, future::try_join_all}; +use futures::{ + TryFutureExt, + future::{BoxFuture, try_join_all}, +}; use redis::{ - AsyncCommands, Client, RedisError, Script, + AsyncCommands, Client, IntoConnectionInfo, RedisError, Script, aio::{ConnectionManager, ConnectionManagerConfig, MultiplexedConnection}, cluster::{ClusterClientBuilder, ClusterConfig}, cluster_async::ClusterConnection, + io::tcp::{TcpSettings, socket2::TcpKeepalive}, }; use snafu::{OptionExt, ResultExt, Snafu}; -use tracing::{Instrument, debug, debug_span, info, instrument}; +use tracing::{Instrument, debug, debug_span, info, instrument, warn}; use trino_lb_core::{ TrinoClusterName, TrinoLbQueryId, TrinoQueryId, config::RedisConfig, @@ -26,6 +31,40 @@ use crate::Persistence; const REDIS_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); const REDIS_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10); +// TCP keepalive / user-timeout settings for the Redis socket. 
+// +// The redis crate's [`ConnectionManager`] only reconnects when a command fails with a +// dropped-connection error (broken pipe, connection reset, ...). When the Kubernetes node running +// the Redis master is drained, the established TCP connection turns into a black hole: packets are +// silently dropped and no FIN/RST ever arrives, so the only error trino-lb ever sees is a response +// *timeout* - which the redis crate does not treat as a reason to reconnect. Without these socket +// options the connection would stay wedged until the liveness probe restarts the pod (see issue +// #109). Enabling TCP keepalive (and TCP_USER_TIMEOUT) lets the kernel tear the dead socket down on +// its own, shrinking the detection window; the [`Reconnectable`] wrapper then rebuilds it. +const REDIS_TCP_KEEPALIVE_TIME: Duration = Duration::from_secs(5); +const REDIS_TCP_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(2); +const REDIS_TCP_KEEPALIVE_RETRIES: u32 = 3; +const REDIS_TCP_USER_TIMEOUT: Duration = Duration::from_secs(12); + +/// How often the background health check pings Redis to detect a black-holed connection (see the +/// note on [`REDIS_TCP_KEEPALIVE_TIME`]) and rebuild it via [`Reconnectable`]. Such a connection +/// only ever produces response *timeouts*, which the redis crate does not treat as a reason to +/// reconnect, so without this proactive check it would stay wedged until the liveness probe +/// restarts the pod (see issue #109). +const REDIS_HEALTH_CHECK_INTERVAL: Duration = Duration::from_secs(5); + +/// TCP settings applied to every Redis connection (and every reconnect), see the constants above. 
+fn tcp_settings() -> TcpSettings { + TcpSettings::default() + .set_keepalive( + TcpKeepalive::new() + .with_time(REDIS_TCP_KEEPALIVE_TIME) + .with_interval(REDIS_TCP_KEEPALIVE_INTERVAL) + .with_retries(REDIS_TCP_KEEPALIVE_RETRIES), + ) + .set_user_timeout(REDIS_TCP_USER_TIMEOUT) +} + const LAST_QUERY_COUNT_FETCHER_UPDATE_KEY: &str = "lastQueryCountFetcherUpdate"; const BINCODE_CONFIG: bincode::config::Configuration = bincode::config::standard(); @@ -129,6 +168,73 @@ pub enum Error { }, } +/// Builds a fresh Redis connection. Used by [`Reconnectable`] to rebuild a connection that went bad. +type ConnectionFactory<T> = + Arc<dyn Fn() -> BoxFuture<'static, Result<T, RedisError>> + Send + Sync>; + +/// A Redis connection that can be transparently rebuilt when it goes bad. +/// +/// The redis crate's [`ConnectionManager`] only reconnects on dropped-connection errors, not on +/// timeouts, so a black-holed connection (e.g. after a Redis node drain, see issue #109 and the +/// note on [`REDIS_TCP_KEEPALIVE_TIME`]) would otherwise time out forever. This wrapper lets us +/// rebuild the underlying connection ourselves; the background health check in +/// [`RedisPersistence::spawn_health_check`] decides *when* to rebuild. +struct Reconnectable<T> { + current: RwLock<Arc<T>>, + factory: ConnectionFactory<T>, + /// Guarantees only a single rebuild runs at a time. + reconnecting: tokio::sync::Mutex<()>, +} + +impl<T> Reconnectable<T> +where + T: Send + Sync + 'static, +{ + fn new(initial: T, factory: ConnectionFactory<T>) -> Self { + Self { + current: RwLock::new(Arc::new(initial)), + factory, + reconnecting: tokio::sync::Mutex::new(()), + } + } + + /// Returns the connection currently in use. + fn current(&self) -> Arc<T> { + Arc::clone(&self.current.read().unwrap_or_else(PoisonError::into_inner)) + } + + /// Rebuilds the connection, replacing the one currently in use. + /// + /// `used` is the connection that was found to be bad. 
This is a no-op if another rebuild is + /// already in progress, or if the connection has already been replaced since `used` was + /// obtained (i.e. another rebuild already happened). This keeps overlapping failure reports + /// from rebuilding the connection over and over. + async fn reconnect(&self, used: &Arc<T>) { + let Ok(_guard) = self.reconnecting.try_lock() else { + // Another task is already rebuilding the connection. + return; + }; + + if !Arc::ptr_eq(&self.current(), used) { + // The connection was already replaced since `used` was obtained. + return; + } + + match (self.factory)().await { + Ok(connection) => { + *self.current.write().unwrap_or_else(PoisonError::into_inner) = Arc::new(connection); + info!("Successfully rebuilt the Redis connection"); + } + Err(error) => { + warn!( + ?error, + "Failed to rebuild the Redis connection, will retry on the next health check" + ); + } + } + } +} + /// This Redis implementation works against Redis clusters. It uses a single connection that is shared between all /// operations for best performance. 
However, this makes atomic operations hard as their are some pitfalls regarding /// `WATCH` in combination with `MULTI` and `EXEC` documented @@ -140,7 +246,7 @@ pub struct RedisPersistence<R> where R: AsyncCommands + Clone, { - connection: R, + connection: Arc<Reconnectable<R>>, compare_and_set_script: Script, /// Sometimes we need to do stuff for all cluster groups, so we need to store them to iterate over them @@ -158,17 +264,32 @@ impl RedisPersistence<ConnectionManager> { .set_connection_timeout(Some(REDIS_CONNECTION_TIMEOUT)) .set_response_timeout(Some(REDIS_RESPONSE_TIMEOUT)); - let client = Client::open(config.endpoint.as_str()).context(CreateClientSnafu)?; - let connection = client - .get_connection_manager_with_config(redis_config) - .await - .context(CreateClientSnafu)?; - - Ok(Self { + // The TCP settings live on the `ConnectionInfo` (not on `ConnectionManagerConfig`, which + // does not expose them), and are re-applied on every reconnect the manager performs. + let connection_info = config + .endpoint + .as_str() + .into_connection_info() + .context(CreateClientSnafu)? 
+ .set_tcp_settings(tcp_settings()); + let client = Client::open(connection_info).context(CreateClientSnafu)?; + + let factory: ConnectionFactory<ConnectionManager> = { + let client = client.clone(); + Arc::new(move || { + let client = client.clone(); + let redis_config = redis_config.clone(); + Box::pin(async move { client.get_connection_manager_with_config(redis_config).await }) + }) + }; + + let connection = factory().await.context(CreateClientSnafu)?; + + Ok(Self::new_with_connection( connection, - compare_and_set_script: compare_and_set_script(), + factory, cluster_groups, - }) + )) } } @@ -184,24 +305,32 @@ impl RedisPersistence<ClusterConnection<MultiplexedConnection>> { .set_response_timeout(REDIS_RESPONSE_TIMEOUT); let client = ClusterClientBuilder::new([config.endpoint.as_str()]) + .tcp_settings(tcp_settings()) .build() .context(CreateClientSnafu)?; - let connection = client - .get_async_connection_with_config(redis_config) - .await - .context(CreateClientSnafu)?; - Ok(Self { + let factory: ConnectionFactory<ClusterConnection<MultiplexedConnection>> = { + let client = client.clone(); + Arc::new(move || { + let client = client.clone(); + let redis_config = redis_config.clone(); + Box::pin(async move { client.get_async_connection_with_config(redis_config).await }) + }) + }; + + let connection = factory().await.context(CreateClientSnafu)?; + + Ok(Self::new_with_connection( connection, - compare_and_set_script: compare_and_set_script(), + factory, cluster_groups, - }) + )) } } impl<R> Persistence for RedisPersistence<R> where - R: AsyncCommands + Clone, + R: AsyncCommands + Clone + Send + Sync + 'static, { #[instrument(skip(self, queued_query))] async fn store_queued_query(&self, queued_query: QueuedQuery) -> Result<(), super::Error> { @@ -533,10 +662,55 @@ where impl<R> RedisPersistence<R> where - R: AsyncCommands + Clone, + R: AsyncCommands + Clone + Send + Sync + 'static, { + /// Wraps the initial connection in a [`Reconnectable`] and starts the background health check + /// that rebuilds it when it goes bad. 
+ fn new_with_connection( + connection: R, + factory: ConnectionFactory<R>, + cluster_groups: Vec, + ) -> Self { + let connection = Arc::new(Reconnectable::new(connection, factory)); + Self::spawn_health_check(Arc::clone(&connection)); + + Self { + connection, + compare_and_set_script: compare_and_set_script(), + cluster_groups, + } + } + fn connection(&self) -> R { - self.connection.clone() + (*self.connection.current()).clone() + } + + /// Spawns a background task that periodically pings Redis and rebuilds the connection if the + /// ping fails. + /// + /// The redis crate's [`ConnectionManager`] reconnects on dropped-connection errors but not on + /// timeouts, so a black-holed connection (see the note on [`REDIS_TCP_KEEPALIVE_TIME`]) would + /// otherwise time out on every command indefinitely. A periodic ping detects this - and heals + /// it even while there is no other traffic - without having to wrap every individual operation. + /// [`Reconnectable::reconnect`] makes sure we only rebuild once per dead connection. + fn spawn_health_check(connection: Arc<Reconnectable<R>>) { + tokio::spawn(async move { + loop { + tokio::time::sleep(REDIS_HEALTH_CHECK_INTERVAL).await; + + let current = connection.current(); + let mut ping_connection = (*current).clone(); + // The connection carries its own response timeout, so a black-holed socket makes + // this `PING` fail rather than hang forever. + let ping: Result<(), RedisError> = + redis::cmd("PING").query_async(&mut ping_connection).await; + + if let Err(error) = ping { + warn!(?error, "Redis health check failed, rebuilding the connection"); + connection.reconnect(&current).await; + } + } + }); } #[instrument(skip(self))] @@ -610,3 +784,74 @@ fn compare_and_set_script() -> Script { ", ) } + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicU32, Ordering}; + + use super::*; + + /// Returns a [`Reconnectable`] over a `u32` plus a counter of how many times it was rebuilt. 
+ /// Every rebuild yields once (so concurrent rebuild attempts interleave deterministically) and + /// then returns the new rebuild count as the connection value. + fn counting_reconnectable() -> (Arc<AtomicU32>, Reconnectable<u32>) { + let rebuilds = Arc::new(AtomicU32::new(0)); + + let factory_rebuilds = Arc::clone(&rebuilds); + let factory: ConnectionFactory<u32> = Arc::new(move || { + let rebuilds = Arc::clone(&factory_rebuilds); + Box::pin(async move { + tokio::task::yield_now().await; + Ok(rebuilds.fetch_add(1, Ordering::SeqCst) + 1) + }) + }); + + (rebuilds, Reconnectable::new(0, factory)) + } + + #[tokio::test] + async fn reconnect_replaces_the_connection() { + let (rebuilds, reconnectable) = counting_reconnectable(); + + let used = reconnectable.current(); + assert_eq!(*used, 0); + + reconnectable.reconnect(&used).await; + + assert_eq!(rebuilds.load(Ordering::SeqCst), 1); + assert_eq!(*reconnectable.current(), 1); + } + + #[tokio::test] + async fn reconnect_is_a_noop_for_an_already_replaced_connection() { + let (rebuilds, reconnectable) = counting_reconnectable(); + + let stale = reconnectable.current(); + reconnectable.reconnect(&stale).await; + assert_eq!(*reconnectable.current(), 1); + + // The handle is now stale (the connection was replaced); reconnecting with it must not + // rebuild again, otherwise repeated failure reports would rebuild over and over. 
+ reconnectable.reconnect(&stale).await; + assert_eq!(rebuilds.load(Ordering::SeqCst), 1); + assert_eq!(*reconnectable.current(), 1); + } + + #[tokio::test] + async fn concurrent_reconnects_rebuild_only_once() { + let (rebuilds, reconnectable) = counting_reconnectable(); + let reconnectable = Arc::new(reconnectable); + + let used = reconnectable.current(); + let attempts = (0..8).map(|_| { + let reconnectable = Arc::clone(&reconnectable); + let used = Arc::clone(&used); + async move { reconnectable.reconnect(&used).await } + }); + futures::future::join_all(attempts).await; + + // Single-flight: many concurrent failures on the same connection rebuild it exactly once. + assert_eq!(rebuilds.load(Ordering::SeqCst), 1); + assert_eq!(*reconnectable.current(), 1); + } +}