Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ All notable changes to this project will be documented in this file.
- Handle Redis connection errors (e.g. broken pipe during master failover) gracefully instead of panicking.
Previously, `get_queued_query_count` would `.unwrap()` on the Redis result, causing a panic that poisoned
the metrics `RwLock`, cascading into further panics and leaving pods unresponsive ([#111]).

- Recover from a black-holed Redis connection instead of hanging until the liveness probe restarts the pod.
When the node running the Redis master is drained, the TCP connection silently stops delivering packets
without ever being closed, so the only error trino-lb sees is a response timeout. The redis crate's
`ConnectionManager` does not reconnect on timeouts (only on dropped-connection errors), so it kept reusing
the dead connection forever. trino-lb now enables TCP keepalive / `TCP_USER_TIMEOUT` on the Redis socket and
runs a background health check that rebuilds the connection itself when a periodic ping fails ([#109]).

[#109]: https://github.com/stackabletech/trino-lb/issues/109
[#111]: https://github.com/stackabletech/trino-lb/pull/111

## [0.6.0] - 2026-02-17
Expand Down
3 changes: 3 additions & 0 deletions trino-lb-persistence/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ tokio.workspace = true
tracing.workspace = true
trait-variant.workspace = true
url.workspace = true

[dev-dependencies]
tokio = { workspace = true, features = ["macros", "rt"] }
291 changes: 268 additions & 23 deletions trino-lb-persistence/src/redis/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
use std::{
fmt::Debug,
num::TryFromIntError,
sync::{Arc, PoisonError, RwLock},
time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
};

use futures::{TryFutureExt, future::try_join_all};
use futures::{
TryFutureExt,
future::{BoxFuture, try_join_all},
};
use redis::{
AsyncCommands, Client, RedisError, Script,
AsyncCommands, Client, IntoConnectionInfo, RedisError, Script,
aio::{ConnectionManager, ConnectionManagerConfig, MultiplexedConnection},
cluster::{ClusterClientBuilder, ClusterConfig},
cluster_async::ClusterConnection,
io::tcp::{TcpSettings, socket2::TcpKeepalive},
};
use snafu::{OptionExt, ResultExt, Snafu};
use tracing::{Instrument, debug, debug_span, info, instrument};
use tracing::{Instrument, debug, debug_span, info, instrument, warn};
use trino_lb_core::{
TrinoClusterName, TrinoLbQueryId, TrinoQueryId,
config::RedisConfig,
Expand All @@ -26,6 +31,40 @@ use crate::Persistence;
const REDIS_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
const REDIS_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);

// TCP keepalive / user-timeout settings for the Redis socket.
//
// The redis crate's [`ConnectionManager`] only reconnects when a command fails with a
// dropped-connection error (broken pipe, connection reset, ...). When the Kubernetes node running
// the Redis master is drained, the established TCP connection turns into a black hole: packets are
// silently dropped and no FIN/RST ever arrives, so the only error trino-lb ever sees is a response
// *timeout* - which the redis crate does not treat as a reason to reconnect. Without these socket
// options the connection would stay wedged until the liveness probe restarts the pod (see issue
// #109). Enabling TCP keepalive (and TCP_USER_TIMEOUT) lets the kernel tear the dead socket down on
// its own, shrinking the detection window; the [`Reconnectable`] wrapper then rebuilds it.
const REDIS_TCP_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
const REDIS_TCP_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(2);
const REDIS_TCP_KEEPALIVE_RETRIES: u32 = 3;
const REDIS_TCP_USER_TIMEOUT: Duration = Duration::from_secs(12);

/// How often the background health check pings Redis to detect a black-holed connection (see the
/// note on [`REDIS_TCP_KEEPALIVE_TIME`]) and rebuild it via [`Reconnectable`]. Such a connection
/// only ever produces response *timeouts*, which the redis crate does not treat as a reason to
/// reconnect, so without this proactive check it would stay wedged until the liveness probe
/// restarts the pod (see issue #109).
const REDIS_HEALTH_CHECK_INTERVAL: Duration = Duration::from_secs(5);

/// TCP settings applied to every Redis connection (and every reconnect), see the constants above.
fn tcp_settings() -> TcpSettings {
TcpSettings::default()
.set_keepalive(
TcpKeepalive::new()
.with_time(REDIS_TCP_KEEPALIVE_TIME)
.with_interval(REDIS_TCP_KEEPALIVE_INTERVAL)
.with_retries(REDIS_TCP_KEEPALIVE_RETRIES),
)
.set_user_timeout(REDIS_TCP_USER_TIMEOUT)
}

const LAST_QUERY_COUNT_FETCHER_UPDATE_KEY: &str = "lastQueryCountFetcherUpdate";

const BINCODE_CONFIG: bincode::config::Configuration = bincode::config::standard();
Expand Down Expand Up @@ -129,6 +168,73 @@ pub enum Error {
},
}

/// Builds a fresh Redis connection. Used by [`Reconnectable`] to rebuild a connection that went bad.
type ConnectionFactory<T> =
Arc<dyn Fn() -> BoxFuture<'static, Result<T, RedisError>> + Send + Sync>;

/// A Redis connection that can be transparently rebuilt when it goes bad.
///
/// The redis crate's [`ConnectionManager`] only reconnects on dropped-connection errors, not on
/// timeouts, so a black-holed connection (e.g. after a Redis node drain, see issue #109 and the
/// note on [`REDIS_TCP_KEEPALIVE_TIME`]) would otherwise time out forever. This wrapper lets us
/// rebuild the underlying connection ourselves; the background health check in
/// [`RedisPersistence::spawn_health_check`] decides *when* to rebuild.
struct Reconnectable<T> {
current: RwLock<Arc<T>>,
factory: ConnectionFactory<T>,
/// Guarantees only a single rebuild runs at a time.
reconnecting: tokio::sync::Mutex<()>,
}

impl<T> Reconnectable<T>
where
T: Send + Sync + 'static,
{
fn new(initial: T, factory: ConnectionFactory<T>) -> Self {
Self {
current: RwLock::new(Arc::new(initial)),
factory,
reconnecting: tokio::sync::Mutex::new(()),
}
}

/// Returns the connection currently in use.
fn current(&self) -> Arc<T> {
Arc::clone(&self.current.read().unwrap_or_else(PoisonError::into_inner))
}

/// Rebuilds the connection, replacing the one currently in use.
///
/// `used` is the connection that was found to be bad. This is a no-op if another rebuild is
/// already in progress, or if the connection has already been replaced since `used` was
/// obtained (i.e. another rebuild already happened). This keeps overlapping failure reports
/// from rebuilding the connection over and over.
async fn reconnect(&self, used: &Arc<T>) {
let Ok(_guard) = self.reconnecting.try_lock() else {
// Another task is already rebuilding the connection.
return;
};

if !Arc::ptr_eq(&self.current(), used) {
// The connection was already replaced while we waited for the lock.
return;
}

match (self.factory)().await {
Ok(connection) => {
*self.current.write().unwrap_or_else(PoisonError::into_inner) = Arc::new(connection);
info!("Successfully rebuilt the Redis connection");
}
Err(error) => {
warn!(
?error,
"Failed to rebuild the Redis connection, will retry on the next health check"
);
}
}
}
}

/// This Redis implementation works against Redis clusters. It uses a single connection that is shared between all
/// operations for best performance. However, this makes atomic operations hard as their are some pitfalls regarding
/// `WATCH` in combination with `MULTI` and `EXEC` documented
Expand All @@ -140,7 +246,7 @@ pub struct RedisPersistence<R>
where
R: AsyncCommands + Clone,
{
connection: R,
connection: Arc<Reconnectable<R>>,
compare_and_set_script: Script,

/// Sometimes we need to do stuff for all cluster groups, so we need to store them to iterate over them
Expand All @@ -158,17 +264,32 @@ impl RedisPersistence<ConnectionManager> {
.set_connection_timeout(Some(REDIS_CONNECTION_TIMEOUT))
.set_response_timeout(Some(REDIS_RESPONSE_TIMEOUT));

let client = Client::open(config.endpoint.as_str()).context(CreateClientSnafu)?;
let connection = client
.get_connection_manager_with_config(redis_config)
.await
.context(CreateClientSnafu)?;

Ok(Self {
// The TCP settings live on the `ConnectionInfo` (not on `ConnectionManagerConfig`, which
// does not expose them), and are re-applied on every reconnect the manager performs.
let connection_info = config
.endpoint
.as_str()
.into_connection_info()
.context(CreateClientSnafu)?
.set_tcp_settings(tcp_settings());
let client = Client::open(connection_info).context(CreateClientSnafu)?;

let factory: ConnectionFactory<ConnectionManager> = {
let client = client.clone();
Arc::new(move || {
let client = client.clone();
let redis_config = redis_config.clone();
Box::pin(async move { client.get_connection_manager_with_config(redis_config).await })
})
};

let connection = factory().await.context(CreateClientSnafu)?;

Ok(Self::new_with_connection(
connection,
compare_and_set_script: compare_and_set_script(),
factory,
cluster_groups,
})
))
}
}

Expand All @@ -184,24 +305,32 @@ impl RedisPersistence<ClusterConnection<MultiplexedConnection>> {
.set_response_timeout(REDIS_RESPONSE_TIMEOUT);

let client = ClusterClientBuilder::new([config.endpoint.as_str()])
.tcp_settings(tcp_settings())
.build()
.context(CreateClientSnafu)?;
let connection = client
.get_async_connection_with_config(redis_config)
.await
.context(CreateClientSnafu)?;

Ok(Self {
let factory: ConnectionFactory<ClusterConnection<MultiplexedConnection>> = {
let client = client.clone();
Arc::new(move || {
let client = client.clone();
let redis_config = redis_config.clone();
Box::pin(async move { client.get_async_connection_with_config(redis_config).await })
})
};

let connection = factory().await.context(CreateClientSnafu)?;

Ok(Self::new_with_connection(
connection,
compare_and_set_script: compare_and_set_script(),
factory,
cluster_groups,
})
))
}
}

impl<R> Persistence for RedisPersistence<R>
where
R: AsyncCommands + Clone,
R: AsyncCommands + Clone + Send + Sync + 'static,
{
#[instrument(skip(self, queued_query))]
async fn store_queued_query(&self, queued_query: QueuedQuery) -> Result<(), super::Error> {
Expand Down Expand Up @@ -533,10 +662,55 @@ where

impl<R> RedisPersistence<R>
where
R: AsyncCommands + Clone,
R: AsyncCommands + Clone + Send + Sync + 'static,
{
/// Wraps the initial connection in a [`Reconnectable`] and starts the background health check
/// that rebuilds it when it goes bad.
fn new_with_connection(
connection: R,
factory: ConnectionFactory<R>,
cluster_groups: Vec<String>,
) -> Self {
let connection = Arc::new(Reconnectable::new(connection, factory));
Self::spawn_health_check(Arc::clone(&connection));

Self {
connection,
compare_and_set_script: compare_and_set_script(),
cluster_groups,
}
}

fn connection(&self) -> R {
self.connection.clone()
(*self.connection.current()).clone()
}

/// Spawns a background task that periodically pings Redis and rebuilds the connection if the
/// ping fails.
///
/// The redis crate's [`ConnectionManager`] reconnects on dropped-connection errors but not on
/// timeouts, so a black-holed connection (see the note on [`REDIS_TCP_KEEPALIVE_TIME`]) would
/// otherwise time out on every command indefinitely. A periodic ping detects this - and heals
/// it even while there is no other traffic - without having to wrap every individual operation.
/// [`Reconnectable::reconnect`] makes sure we only rebuild once per dead connection.
fn spawn_health_check(connection: Arc<Reconnectable<R>>) {
tokio::spawn(async move {
loop {
tokio::time::sleep(REDIS_HEALTH_CHECK_INTERVAL).await;

let current = connection.current();
let mut ping_connection = (*current).clone();
// The connection carries its own response timeout, so a black-holed socket makes
// this `PING` fail rather than hang forever.
let ping: Result<(), RedisError> =
redis::cmd("PING").query_async(&mut ping_connection).await;

if let Err(error) = ping {
warn!(?error, "Redis health check failed, rebuilding the connection");
connection.reconnect(&current).await;
}
}
});
}

#[instrument(skip(self))]
Expand Down Expand Up @@ -610,3 +784,74 @@ fn compare_and_set_script() -> Script {
",
)
}

#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicU32, Ordering};

use super::*;

/// Returns a [`Reconnectable`] over a `u32` plus a counter of how many times it was rebuilt.
/// Every rebuild yields once (so concurrent rebuild attempts interleave deterministically) and
/// then returns the new rebuild count as the connection value.
fn counting_reconnectable() -> (Arc<AtomicU32>, Reconnectable<u32>) {
let rebuilds = Arc::new(AtomicU32::new(0));

let factory_rebuilds = Arc::clone(&rebuilds);
let factory: ConnectionFactory<u32> = Arc::new(move || {
let rebuilds = Arc::clone(&factory_rebuilds);
Box::pin(async move {
tokio::task::yield_now().await;
Ok(rebuilds.fetch_add(1, Ordering::SeqCst) + 1)
})
});

(rebuilds, Reconnectable::new(0, factory))
}

#[tokio::test]
async fn reconnect_replaces_the_connection() {
let (rebuilds, reconnectable) = counting_reconnectable();

let used = reconnectable.current();
assert_eq!(*used, 0);

reconnectable.reconnect(&used).await;

assert_eq!(rebuilds.load(Ordering::SeqCst), 1);
assert_eq!(*reconnectable.current(), 1);
}

#[tokio::test]
async fn reconnect_is_a_noop_for_an_already_replaced_connection() {
let (rebuilds, reconnectable) = counting_reconnectable();

let stale = reconnectable.current();
reconnectable.reconnect(&stale).await;
assert_eq!(*reconnectable.current(), 1);

// The handle is now stale (the connection was replaced); reconnecting with it must not
// rebuild again, otherwise repeated failure reports would rebuild over and over.
reconnectable.reconnect(&stale).await;
assert_eq!(rebuilds.load(Ordering::SeqCst), 1);
assert_eq!(*reconnectable.current(), 1);
}

#[tokio::test]
async fn concurrent_reconnects_rebuild_only_once() {
let (rebuilds, reconnectable) = counting_reconnectable();
let reconnectable = Arc::new(reconnectable);

let used = reconnectable.current();
let attempts = (0..8).map(|_| {
let reconnectable = Arc::clone(&reconnectable);
let used = Arc::clone(&used);
async move { reconnectable.reconnect(&used).await }
});
futures::future::join_all(attempts).await;

// Single-flight: many concurrent failures on the same connection rebuild it exactly once.
assert_eq!(rebuilds.load(Ordering::SeqCst), 1);
assert_eq!(*reconnectable.current(), 1);
}
}
Loading