diff --git a/Cargo.lock b/Cargo.lock index 6de8a352..387eb531 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -121,6 +121,33 @@ version = "1.0.99" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0674a1ddeecb70197781e945de4b3b8ffb61fa939a5597bcf48503737663100" +[[package]] +name = "async-backtrace" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dcb391558246d27a13f195c1e3a53eda422270fdd452bd57a5aa9c1da1bb198" +dependencies = [ + "async-backtrace-attributes", + "dashmap", + "futures", + "loom", + "once_cell", + "pin-project-lite", + "rustc-hash", + "static_assertions", +] + +[[package]] +name = "async-backtrace-attributes" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "affbba0d438add06462a0371997575927bc05052f7ec486e7a4ca405c956c3d7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -587,6 +614,19 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "debugid" version = "0.8.0" @@ -938,6 +978,19 @@ dependencies = [ "slab", ] +[[package]] +name = "generator" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cc16584ff22b460a382b7feec54b23d2908d858152e5739a120b949293bd74e" +dependencies = [ + "cc", + "libc", + "log", + "rustversion", + "windows", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -1014,6 +1067,12 @@ dependencies = [ "crunchy", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.5" @@ -1031,7 +1090,7 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" dependencies = [ - "hashbrown", + "hashbrown 0.15.5", ] [[package]] @@ -1356,7 +1415,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2481980430f9f78649238835720ddccc57e52df14ffce1c6f37391d61b563e9" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.5", ] [[package]] @@ -1517,6 +1576,19 @@ version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +[[package]] +name = "loom" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff50ecb28bb86013e935fb6683ab1f6d3a20016f123c76fd4c27470076ac30f5" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + [[package]] name = "matchers" version = "0.2.0" @@ -2344,6 +2416,12 @@ version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc_version" version = "0.4.1" @@ -2439,6 +2517,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -2810,7 +2894,7 @@ dependencies = [ "futures-intrusive", "futures-io", "futures-util", - "hashbrown", + "hashbrown 0.15.5", "hashlink", "indexmap", "log", @@ -2980,6 +3064,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "stringprep" version = "0.1.5" @@ -3039,6 +3129,7 @@ name = "taskbroker" version = "0.1.0" dependencies = [ "anyhow", + "async-backtrace", "async-trait", "bytes", "chrono", @@ -3818,6 +3909,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-core" version = "0.61.2" diff --git a/Cargo.toml b/Cargo.toml index c350764d..6e618f47 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ debug = 1 [dependencies] anyhow = "1.0.92" +async-backtrace = "0.2" async-trait = "0.1" bytes = "1.11.1" chrono = { version = "0.4.26" } diff --git a/src/config.rs b/src/config.rs index 3d90ed10..700f1a5a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -268,6 +268,12 @@ pub struct Config { /// Enable additional metrics for the sqlite. pub enable_sqlite_status_metrics: bool, + /// When true, the upkeep loop emits the current `async_backtrace::taskdump_tree` + /// snapshot at `debug!` every 30 seconds. Useful for diagnosing hangs in the + /// store / fetch / push pipelines; off by default because the tree can be + /// large and noisy. + pub log_async_backtrace: bool, + /// How to deliver tasks to workers: "push" or "pull". pub delivery_mode: DeliveryMode, @@ -394,6 +400,7 @@ impl Default for Config { full_vacuum_on_upkeep: true, vacuum_interval_ms: 30000, enable_sqlite_status_metrics: true, + log_async_backtrace: false, delivery_mode: DeliveryMode::Pull, fetch_threads: 1, fetch_wait_ms: 100, diff --git a/src/fetch/mod.rs b/src/fetch/mod.rs index 9fe35be6..4ad811ad 100644 --- a/src/fetch/mod.rs +++ b/src/fetch/mod.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use anyhow::Result; +use async_backtrace::framed; use elegant_departure::get_shutdown_guard; use tokio::time::sleep; use tonic::async_trait; @@ -54,6 +55,7 @@ pub trait TaskPusher { #[async_trait] impl TaskPusher for PushPool { + #[framed] async fn submit_task(&self, activation: InflightActivation) -> Result<(), PushError> { self.submit(activation).await } @@ -86,6 +88,7 @@ impl FetchPool { } /// Spawns one task per effective fetch thread ([`normalize_fetch_threads`]), each claiming pending work only in its bucket subrange. + #[framed] pub async fn start(&self) -> Result<()> { let fetch_wait_ms = self.config.fetch_wait_ms; let fetch_threads = normalize_fetch_threads(self.config.fetch_threads); @@ -100,7 +103,7 @@ impl FetchPool { let guard = get_shutdown_guard().shutdown_on_drop(); - async move { + async_backtrace::frame!(async move { loop { tokio::select! { _ = guard.wait() => { @@ -190,7 +193,7 @@ impl FetchPool { } => {} } } - } + }) }); while let Some(res) = fetch_pool.join_next().await { diff --git a/src/push/mod.rs b/src/push/mod.rs index 6407fd98..2643cc27 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use anyhow::{Context, Result}; +use async_backtrace::framed; use elegant_departure::get_shutdown_guard; use flume::{Receiver, SendError, Sender}; use hmac::{Hmac, Mac}; @@ -67,6 +68,7 @@ trait WorkerClient { #[async_trait] impl WorkerClient for WorkerServiceClient { + #[framed] async fn send( &mut self, request: PushTaskRequest, @@ -135,6 +137,7 @@ impl PushPool { } /// Spawn `config.push_threads` asynchronous tasks, each of which repeatedly moves pending activations from the channel to the worker service until the shutdown signal is received. + #[framed] pub async fn start(&self) -> Result<()> { let store = self.store.clone(); let worker_factory = self.worker_factory.clone(); @@ -156,7 +159,7 @@ impl PushPool { let timeout = Duration::from_millis(self.config.push_timeout_ms); let grpc_shared_secret = self.config.grpc_shared_secret.clone(); - async move { + async_backtrace::frame!(async move { metrics::counter!("push.worker.connect.attempt").increment(1); let mut workers: HashMap> = HashMap::new(); @@ -319,7 +322,7 @@ impl PushPool { } Ok(()) - } + }) }, ); @@ -339,6 +342,7 @@ impl PushPool { } /// Send an activation to the internal asynchronous MPMC channel used by all running push threads. Times out after `config.push_queue_timeout_ms` milliseconds. + #[framed] pub async fn submit(&self, activation: InflightActivation) -> Result<(), PushError> { let duration = Duration::from_millis(self.config.push_queue_timeout_ms); let start = Instant::now(); @@ -367,6 +371,7 @@ impl PushPool { } /// Decode task activation and push it to a worker. +#[framed] async fn push_task( worker: &mut (dyn WorkerClient + Send), activation: InflightActivation, diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 81d720fd..8f087b59 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -8,6 +8,7 @@ use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions}; use sqlx::{FromRow, Pool, Postgres, QueryBuilder}; use anyhow::{Error, anyhow}; +use async_backtrace::framed; use async_trait::async_trait; use chrono::{DateTime, Utc}; use sentry_protos::taskbroker::v1::OnAttemptsExceeded; @@ -96,6 +97,7 @@ impl From for InflightActivation { } } +#[framed] pub async fn create_postgres_pool( connection: &PgConnectOptions, database_name: &str, @@ -113,6 +115,7 @@ pub async fn create_postgres_pool( Ok((read_pool, write_pool)) } +#[framed] pub async fn create_default_postgres_pool( connection: &PgConnectOptions, default_database_name: &str, @@ -173,6 +176,7 @@ pub struct PostgresActivationStore { } impl PostgresActivationStore { + #[framed] async fn acquire_write_conn_metric( &self, caller: &'static str, @@ -183,6 +187,7 @@ impl PostgresActivationStore { Ok(conn) } + #[framed] pub async fn new(config: PostgresActivationStoreConfig) -> Result { if config.run_migrations { let default_pool = create_default_postgres_pool( @@ -255,18 +260,21 @@ impl InflightActivationStore for PostgresActivationStore { /// Depending on config data, will either vacuum a set number of /// pages or attempt to reclaim all free pages. #[instrument(skip_all)] + #[framed] async fn vacuum_db(&self) -> Result<(), Error> { // TODO: Remove Ok(()) } /// Perform a full vacuum on the database. + #[framed] async fn full_vacuum_db(&self) -> Result<(), Error> { // TODO: Remove Ok(()) } /// Get the size of the database in bytes based on SQLite metadata queries. + #[framed] async fn db_size(&self) -> Result { let row_result: (i64,) = sqlx::query_as("SELECT pg_database_size($1) as size") .bind(&self.config.pg_database_name) @@ -279,6 +287,7 @@ impl InflightActivationStore for PostgresActivationStore { } /// Get an activation by id. Primarily used for testing + #[framed] async fn get_by_id(&self, id: &str) -> Result, Error> { let row_result: Option = sqlx::query_as( " @@ -324,6 +333,7 @@ impl InflightActivationStore for PostgresActivationStore { } #[instrument(skip_all)] + #[framed] async fn store(&self, batch: Vec) -> Result { if batch.is_empty() { return Ok(0); @@ -400,6 +410,7 @@ impl InflightActivationStore for PostgresActivationStore { } #[instrument(skip_all)] + #[framed] async fn claim_activations( &self, application: Option<&str>, @@ -491,6 +502,7 @@ impl InflightActivationStore for PostgresActivationStore { } #[instrument(skip_all)] + #[framed] async fn mark_activation_processing(&self, id: &str) -> Result<(), Error> { let mut conn = self .acquire_write_conn_metric("mark_activation_processing") @@ -530,6 +542,7 @@ impl InflightActivationStore for PostgresActivationStore { /// as we are interested in latency to the *first* attempt. /// Tasks with delay_until set, will have their age adjusted based on their /// delay time. No tasks = 0 lag + #[framed] async fn pending_activation_max_lag(&self, now: &DateTime) -> f64 { let mut query_builder = QueryBuilder::new( "SELECT received_at, delay_until @@ -564,6 +577,7 @@ impl InflightActivationStore for PostgresActivationStore { } #[instrument(skip_all)] + #[framed] async fn count_by_status(&self, status: InflightActivationStatus) -> Result { let mut query_builder = QueryBuilder::new( "SELECT COUNT(*) as count FROM inflight_taskactivations WHERE status = ", @@ -577,6 +591,7 @@ impl InflightActivationStore for PostgresActivationStore { Ok(result.0 as usize) } + #[framed] async fn count(&self) -> Result { let mut query_builder = QueryBuilder::new("SELECT COUNT(*) as count FROM inflight_taskactivations"); @@ -589,6 +604,7 @@ impl InflightActivationStore for PostgresActivationStore { } #[instrument(skip_all)] + #[framed] async fn count_depths(&self) -> Result { // Notice that statuses are embedded into the query for simplicity - if the enum is every changed, this must change too! let mut query_builder = QueryBuilder::new( @@ -616,6 +632,7 @@ impl InflightActivationStore for PostgresActivationStore { /// Update the status of a specific activation #[instrument(skip_all)] + #[framed] async fn set_status( &self, id: &str, @@ -638,6 +655,7 @@ impl InflightActivationStore for PostgresActivationStore { } #[instrument(skip_all)] + #[framed] async fn set_processing_deadline( &self, id: &str, @@ -655,6 +673,7 @@ impl InflightActivationStore for PostgresActivationStore { } #[instrument(skip_all)] + #[framed] async fn delete_activation(&self, id: &str) -> Result<(), Error> { let mut conn = self.acquire_write_conn_metric("delete_activation").await?; sqlx::query("DELETE FROM inflight_taskactivations WHERE id = $1") @@ -665,6 +684,7 @@ impl InflightActivationStore for PostgresActivationStore { } #[instrument(skip_all)] + #[framed] async fn get_retry_activations(&self) -> Result, Error> { let mut query_builder = QueryBuilder::new( "SELECT id, @@ -702,6 +722,7 @@ impl InflightActivationStore for PostgresActivationStore { } // Used in tests + #[framed] async fn clear(&self) -> Result<(), Error> { let mut conn = self.acquire_write_conn_metric("clear").await?; sqlx::query("TRUNCATE TABLE inflight_taskactivations") @@ -713,6 +734,7 @@ impl InflightActivationStore for PostgresActivationStore { /// Revert expired push claims back to pending status. #[instrument(skip_all)] + #[framed] async fn handle_claim_expiration(&self) -> Result { let now = Utc::now(); let mut conn = self @@ -743,6 +765,7 @@ impl InflightActivationStore for PostgresActivationStore { /// Exceeding a processing deadline does not consume a retry as we don't know /// if a worker took the task and was killed, or failed. #[instrument(skip_all)] + #[framed] async fn handle_processing_deadline(&self) -> Result { let now = Utc::now(); let mut atomic = self.write_pool.begin().await?; @@ -797,6 +820,7 @@ impl InflightActivationStore for PostgresActivationStore { /// Update tasks that have exceeded their max processing attempts. /// These tasks are set to status=failure and will be handled by handle_failed_tasks accordingly. #[instrument(skip_all)] + #[framed] async fn handle_processing_attempts(&self) -> Result { let mut conn = self .acquire_write_conn_metric("handle_processing_attempts") @@ -827,6 +851,7 @@ impl InflightActivationStore for PostgresActivationStore { /// /// The number of impacted records is returned in a Result. #[instrument(skip_all)] + #[framed] async fn handle_expires_at(&self) -> Result { let now = Utc::now(); let mut conn = self.acquire_write_conn_metric("handle_expires_at").await?; @@ -848,6 +873,7 @@ impl InflightActivationStore for PostgresActivationStore { /// /// The number of impacted records is returned in a Result. #[instrument(skip_all)] + #[framed] async fn handle_delay_until(&self) -> Result { let now = Utc::now(); let mut conn = self.acquire_write_conn_metric("handle_delay_until").await?; @@ -874,6 +900,7 @@ impl InflightActivationStore for PostgresActivationStore { /// Once dead-lettered tasks have been added to Kafka those tasks can have their status set to /// complete. #[instrument(skip_all)] + #[framed] async fn handle_failed_tasks(&self) -> Result { let mut atomic = self.write_pool.begin().await?; @@ -930,6 +957,7 @@ impl InflightActivationStore for PostgresActivationStore { /// Mark a collection of tasks as complete by id #[instrument(skip_all)] + #[framed] async fn mark_completed(&self, ids: Vec) -> Result { let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); query_builder @@ -951,6 +979,7 @@ impl InflightActivationStore for PostgresActivationStore { /// Remove completed tasks. /// This method is a garbage collector for the inflight task store. #[instrument(skip_all)] + #[framed] async fn remove_completed(&self) -> Result { let mut conn = self.acquire_write_conn_metric("remove_completed").await?; let mut query_builder = @@ -964,6 +993,7 @@ impl InflightActivationStore for PostgresActivationStore { /// Remove killswitched tasks. #[instrument(skip_all)] + #[framed] async fn remove_killswitched(&self, killswitched_tasks: Vec) -> Result { let mut query_builder = QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE taskname IN ("); @@ -982,6 +1012,7 @@ impl InflightActivationStore for PostgresActivationStore { } // Used in tests + #[framed] async fn remove_db(&self) -> Result<(), Error> { self.read_pool.close().await; self.write_pool.close().await; diff --git a/src/upkeep.rs b/src/upkeep.rs index 6f0202b0..5336f063 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -30,6 +30,8 @@ pub async fn upkeep( runtime_config_manager: Arc, health_reporter: HealthReporter, ) -> Result<(), anyhow::Error> { + const ASYNC_BACKTRACE_LOG_INTERVAL: Duration = Duration::from_secs(30); + let kafka_config = config.kafka_producer_config(); let producer: Arc = Arc::new( kafka_config @@ -42,6 +44,7 @@ pub async fn upkeep( timer.set_missed_tick_behavior(time::MissedTickBehavior::Delay); let mut last_run = Instant::now(); let mut last_vacuum = Instant::now(); + let mut last_backtrace_log = Instant::now(); loop { select! { _ = timer.tick() => { @@ -54,6 +57,14 @@ pub async fn upkeep( &mut last_vacuum, ).await; last_run = check_health(last_run, &config, health_reporter.clone()).await; + + if config.log_async_backtrace + && last_backtrace_log.elapsed() >= ASYNC_BACKTRACE_LOG_INTERVAL + { + let tree = async_backtrace::taskdump_tree(false); + debug!(backtrace = %tree, "async backtrace dump"); + last_backtrace_log = Instant::now(); + } } _ = guard.wait() => { info!("Cancellation token received, shutting down upkeep");