From 1fcaa260e065275fa81f00a27abb4691a660c1cb Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Wed, 4 Feb 2026 13:23:43 -0500 Subject: [PATCH 1/5] create flushing service --- bottlecap/src/bin/bottlecap/main.rs | 450 +++++----------------------- bottlecap/src/flushing/handles.rs | 108 +++++++ bottlecap/src/flushing/mod.rs | 14 + bottlecap/src/flushing/service.rs | 366 ++++++++++++++++++++++ bottlecap/src/lib.rs | 1 + 5 files changed, 560 insertions(+), 379 deletions(-) create mode 100644 bottlecap/src/flushing/handles.rs create mode 100644 bottlecap/src/flushing/mod.rs create mode 100644 bottlecap/src/flushing/service.rs diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 294c92c59..08dfbade0 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -38,6 +38,7 @@ use bottlecap::{ }, }, fips::{log_fips_status, prepare_client_provider}, + flushing::FlushingService, lifecycle::{ flush_control::{DEFAULT_CONTINUOUS_FLUSH_INTERVAL, FlushControl, FlushDecision}, invocation::processor_service::{InvocationProcessorHandle, InvocationProcessorService}, @@ -72,12 +73,11 @@ use bottlecap::{ trace_aggregator_service::{ AggregatorHandle as TraceAggregatorHandle, AggregatorService as TraceAggregatorService, }, - trace_flusher::{self, ServerlessTraceFlusher, TraceFlusher}, + trace_flusher::{self, TraceFlusher}, trace_processor::{self, SendingTraceProcessor}, }, }; use datadog_fips::reqwest_adapter::create_reqwest_client_builder; -use datadog_protos::metrics::SketchPayload; use decrypt::resolve_secrets; use dogstatsd::{ aggregator_service::AggregatorHandle as MetricsAggregatorHandle, @@ -86,250 +86,22 @@ use dogstatsd::{ constants::CONTEXTS, datadog::{ DdDdUrl, DdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride, - RetryStrategy as DsdRetryStrategy, Series, Site as MetricsSite, + RetryStrategy as DsdRetryStrategy, Site as MetricsSite, }, 
dogstatsd::{DogStatsD, DogStatsDConfig}, flusher::{Flusher as MetricsFlusher, FlusherConfig as MetricsFlusherConfig}, metric::{EMPTY_TAGS, SortedTags}, }; use libdd_trace_obfuscation::obfuscation_config; -use libdd_trace_utils::send_data::SendData; use reqwest::Client; use std::{collections::hash_map, env, path::Path, str::FromStr, sync::Arc}; use tokio::time::{Duration, Instant}; -use tokio::{sync::Mutex as TokioMutex, sync::mpsc::Sender, task::JoinHandle}; +use tokio::{sync::Mutex as TokioMutex, sync::mpsc::Sender}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, warn}; use tracing_subscriber::EnvFilter; use ustr::Ustr; -#[allow(clippy::struct_field_names)] -struct PendingFlushHandles { - trace_flush_handles: Vec>>, - log_flush_handles: Vec>>, - metric_flush_handles: Vec>, - proxy_flush_handles: Vec>>, - stats_flush_handles: Vec>, -} - -struct MetricsRetryBatch { - flusher_id: usize, - series: Vec, - sketches: Vec, -} - -impl PendingFlushHandles { - fn new() -> Self { - Self { - trace_flush_handles: Vec::new(), - log_flush_handles: Vec::new(), - metric_flush_handles: Vec::new(), - proxy_flush_handles: Vec::new(), - stats_flush_handles: Vec::new(), - } - } - - fn has_pending_handles(&self) -> bool { - let trace_pending = self.trace_flush_handles.iter().any(|h| !h.is_finished()); - let log_pending = self.log_flush_handles.iter().any(|h| !h.is_finished()); - let metric_pending = self.metric_flush_handles.iter().any(|h| !h.is_finished()); - let proxy_pending = self.proxy_flush_handles.iter().any(|h| !h.is_finished()); - let stats_pending = self.stats_flush_handles.iter().any(|h| !h.is_finished()); - - trace_pending || log_pending || metric_pending || proxy_pending || stats_pending - } - - #[allow(clippy::too_many_lines)] - /// Spawns non-blocking flush tasks for all flushers (logs, traces, metrics, stats, proxy). 
- async fn spawn_non_blocking_flushes( - &mut self, - logs_flusher: &LogsFlusher, - trace_flusher: &Arc, - metrics_flushers: &Arc>>, - stats_flusher: &Arc, - proxy_flusher: &Arc, - metrics_aggr_handle: &MetricsAggregatorHandle, - ) { - // Spawn logs flush - let lf = logs_flusher.clone(); - self.log_flush_handles - .push(tokio::spawn(async move { lf.flush(None).await })); - - // Spawn traces flush - let tf = trace_flusher.clone(); - self.trace_flush_handles.push(tokio::spawn(async move { - tf.flush(None).await.unwrap_or_default() - })); - - // Spawn metrics flush - let (metrics_flushers_copy, series, sketches) = { - let locked_metrics = metrics_flushers.lock().await; - let flush_response = metrics_aggr_handle - .clone() - .flush() - .await - .expect("can't flush metrics handle"); - ( - locked_metrics.clone(), - flush_response.series, - flush_response.distributions, - ) - }; - - for (idx, mut flusher) in metrics_flushers_copy.into_iter().enumerate() { - let series_clone = series.clone(); - let sketches_clone = sketches.clone(); - let handle = tokio::spawn(async move { - let (retry_series, retry_sketches) = flusher - .flush_metrics(series_clone, sketches_clone) - .await - .unwrap_or_default(); - MetricsRetryBatch { - flusher_id: idx, - series: retry_series, - sketches: retry_sketches, - } - }); - self.metric_flush_handles.push(handle); - } - - // Spawn stats flush - let sf = Arc::clone(stats_flusher); - self.stats_flush_handles - .push(tokio::spawn(async move { sf.flush(false).await })); - - // Spawn proxy flush - let pf = proxy_flusher.clone(); - self.proxy_flush_handles.push(tokio::spawn(async move { - pf.flush(None).await.unwrap_or_default() - })); - } - - #[allow(clippy::too_many_lines)] - async fn await_flush_handles( - &mut self, - logs_flusher: &LogsFlusher, - trace_flusher: &ServerlessTraceFlusher, - metrics_flushers: &Arc>>, - proxy_flusher: &Arc, - ) -> bool { - let mut joinset = tokio::task::JoinSet::new(); - let mut flush_error = false; - - for handle in 
self.stats_flush_handles.drain(..) { - if let Err(e) = handle.await { - error!("PENDING_FLUSH_HANDLES | stats flush error {e:?}"); - flush_error = true; - } - } - - for handle in self.trace_flush_handles.drain(..) { - match handle.await { - Ok(retry) => { - let tf = trace_flusher.clone(); - if !retry.is_empty() { - debug!( - "PENDING_FLUSH_HANDLES | redriving {:?} trace payloads", - retry.len() - ); - joinset.spawn(async move { - tf.flush(Some(retry)).await; - }); - } - } - Err(e) => { - error!("PENDING_FLUSH_HANDLES | redrive trace error {e:?}"); - } - } - } - - for handle in self.log_flush_handles.drain(..) { - match handle.await { - Ok(retry) => { - if !retry.is_empty() { - debug!( - "PENDING_FLUSH_HANDLES | redriving {:?} log payloads", - retry.len() - ); - } - for item in retry { - let lf = logs_flusher.clone(); - match item.try_clone() { - Some(item_clone) => { - joinset.spawn(async move { - lf.flush(Some(item_clone)).await; - }); - } - None => { - error!("PENDING_FLUSH_HANDLES | Can't clone redrive log payloads"); - } - } - } - } - Err(e) => { - error!("PENDING_FLUSH_HANDLES | redrive log error {e:?}"); - } - } - } - - for handle in self.metric_flush_handles.drain(..) { - let mf = metrics_flushers.clone(); - match handle.await { - Ok(retry_batch) => { - if !retry_batch.series.is_empty() || !retry_batch.sketches.is_empty() { - debug!( - "PENDING_FLUSH_HANDLES | redriving {:?} series and {:?} sketch payloads", - retry_batch.series.len(), - retry_batch.sketches.len() - ); - joinset.spawn(async move { - let mut locked_flushers = mf.lock().await; - if let Some(flusher) = locked_flushers.get_mut(retry_batch.flusher_id) { - flusher - .flush_metrics(retry_batch.series, retry_batch.sketches) - .await; - } - }); - } - } - Err(e) => { - error!("PENDING_FLUSH_HANDLES | redrive metrics error {e:?}"); - } - } - } - - for handle in self.proxy_flush_handles.drain(..) 
{ - match handle.await { - Ok(batch) => { - if !batch.is_empty() { - debug!( - "PENDING_FLUSH_HANDLES | Redriving {:?} APM proxy payloads", - batch.len() - ); - } - - let pf = proxy_flusher.clone(); - joinset.spawn(async move { - pf.flush(Some(batch)).await; - }); - } - Err(e) => { - error!("PENDING_FLUSH_HANDLES | Redrive error in APM proxy: {e:?}"); - } - } - } - - // Wait for all flush join operations to complete - while let Some(result) = joinset.join_next().await { - if let Err(e) = result { - error!("PENDING_FLUSH_HANDLES | redrive request error {e:?}"); - flush_error = true; - } - } - flush_error - } -} - #[tokio::main] async fn main() -> anyhow::Result<()> { let start_time = Instant::now(); @@ -656,32 +428,27 @@ async fn extension_loop_active( // The flushing happens independently of invocation lifecycle events. // This background task runs until shutdown is signaled via cancel_token_clone. let flush_task_handle = tokio::spawn(async move { - let mut pending_flush_handles = PendingFlushHandles::new(); + let mut flushing_service = FlushingService::new( + logs_flusher_clone, + trace_flusher_clone, + stats_flusher_clone, + proxy_flusher_clone, + metrics_flushers_clone, + metrics_aggr_handle_clone, + ); loop { tokio::select! 
{ _ = managed_instance_mode_flush_interval.tick() => { - if !pending_flush_handles.has_pending_handles() { + if !flushing_service.has_pending_handles() { // Only spawn new flush if no pending flushes to prevent resource buildup - pending_flush_handles.spawn_non_blocking_flushes( - &logs_flusher_clone, - &trace_flusher_clone, - &metrics_flushers_clone, - &stats_flusher_clone, - &proxy_flusher_clone, - &metrics_aggr_handle_clone, - ).await; + flushing_service.spawn_non_blocking().await; } } () = cancel_token_clone.cancelled() => { debug!("Managed Instance mode: periodic flusher task cancelled, waiting for pending flushes"); // Wait for any pending flushes before exiting - pending_flush_handles.await_flush_handles( - &logs_flusher_clone, - &trace_flusher_clone, - &metrics_flushers_clone, - &proxy_flusher_clone, - ).await; + flushing_service.await_handles().await; break; } } @@ -846,18 +613,18 @@ async fn extension_loop_active( // Final flush without interval reset. We pass None for race_flush_interval since // this is the final operation before shutdown and resetting the interval timing // serves no purpose. This avoids creating an unnecessary interval object. 
+ let flushing_service = FlushingService::new( + logs_flusher.clone(), + Arc::clone(&trace_flusher), + Arc::clone(&stats_flusher), + proxy_flusher.clone(), + Arc::clone(&metrics_flushers), + metrics_aggregator_handle.clone(), + ); let mut locked_metrics = metrics_flushers.lock().await; - blocking_flush_all( - &logs_flusher, - &mut locked_metrics, - &*trace_flusher, - &*stats_flusher, - &proxy_flusher, - None, - &metrics_aggregator_handle.clone(), - true, // force_flush_trace_stats - ) - .await; + flushing_service + .flush_blocking_with_interval(true, &mut locked_metrics, None) + .await; return Ok(()); } @@ -869,7 +636,14 @@ async fn extension_loop_active( let next_lambda_response = extension::next_event(client, &aws_config.runtime_api, &r.extension_id).await; // first invoke we must call next - let mut pending_flush_handles = PendingFlushHandles::new(); + let mut flushing_service = FlushingService::new( + logs_flusher.clone(), + Arc::clone(&trace_flusher), + Arc::clone(&stats_flusher), + proxy_flusher.clone(), + Arc::clone(&metrics_flushers), + metrics_aggregator_handle.clone(), + ); handle_next_invocation(next_lambda_response, &invocation_processor_handle).await; loop { let maybe_shutdown_event; @@ -893,33 +667,21 @@ async fn extension_loop_active( } _ = race_flush_interval.tick() => { let mut locked_metrics = metrics_flushers.lock().await; - blocking_flush_all( - &logs_flusher, - &mut locked_metrics, - &*trace_flusher, - &*stats_flusher, - &proxy_flusher, - Some(&mut race_flush_interval), - &metrics_aggregator_handle.clone(), - false, - ) - .await; + flushing_service + .flush_blocking_with_interval(false, &mut locked_metrics, Some(&mut race_flush_interval)) + .await; } } } // flush let mut locked_metrics = metrics_flushers.lock().await; - blocking_flush_all( - &logs_flusher, - &mut locked_metrics, - &*trace_flusher, - &*stats_flusher, - &proxy_flusher, - Some(&mut race_flush_interval), - &metrics_aggregator_handle.clone(), - false, - ) - .await; + 
flushing_service + .flush_blocking_with_interval( + false, + &mut locked_metrics, + Some(&mut race_flush_interval), + ) + .await; let next_response = extension::next_event(client, &aws_config.runtime_api, &r.extension_id).await; maybe_shutdown_event = @@ -929,33 +691,20 @@ async fn extension_loop_active( match current_flush_decision { //Periodic flush scenario, flush at top of invocation FlushDecision::Continuous => { - if !pending_flush_handles.has_pending_handles() { - pending_flush_handles - .spawn_non_blocking_flushes( - &logs_flusher, - &trace_flusher, - &metrics_flushers, - &stats_flusher, - &proxy_flusher, - &metrics_aggregator_handle, - ) - .await; + if !flushing_service.has_pending_handles() { + flushing_service.spawn_non_blocking().await; race_flush_interval.reset(); } } FlushDecision::Periodic => { let mut locked_metrics = metrics_flushers.lock().await; - blocking_flush_all( - &logs_flusher, - &mut locked_metrics, - &*trace_flusher, - &*stats_flusher, - &proxy_flusher, - Some(&mut race_flush_interval), - &metrics_aggregator_handle, - false, // force_flush_trace_stats - ) - .await; + flushing_service + .flush_blocking_with_interval( + false, + &mut locked_metrics, + Some(&mut race_flush_interval), + ) + .await; } _ => { // No specific flush logic for Dont or End (End already handled above) @@ -983,17 +732,9 @@ async fn extension_loop_active( _ = race_flush_interval.tick() => { if flush_control.flush_strategy == FlushStrategy::Default { let mut locked_metrics = metrics_flushers.lock().await; - blocking_flush_all( - &logs_flusher, - &mut locked_metrics, - &*trace_flusher, - &*stats_flusher, - &proxy_flusher, - Some(&mut race_flush_interval), - &metrics_aggregator_handle, - false, // force_flush_trace_stats - ) - .await; + flushing_service + .flush_blocking_with_interval(false, &mut locked_metrics, Some(&mut race_flush_interval)) + .await; } } } @@ -1014,15 +755,7 @@ async fn extension_loop_active( drop(event_bus_tx); // Redrive/block on any failed payloads 
- let tf = trace_flusher.clone(); - pending_flush_handles - .await_flush_handles( - &logs_flusher.clone(), - &tf, - &metrics_flushers, - &proxy_flusher, - ) - .await; + flushing_service.await_handles().await; // Wait for tombstone event from telemetry listener to ensure all events are processed wait_for_tombstone_event( &mut event_bus, @@ -1046,19 +779,15 @@ async fn extension_loop_active( &lifecycle_listener_shutdown_token, ); - // gotta lock here + // Final flush with force_stats=true since this is our last opportunity let mut locked_metrics = metrics_flushers.lock().await; - blocking_flush_all( - &logs_flusher, - &mut locked_metrics, - &*trace_flusher, - &*stats_flusher, - &proxy_flusher, - Some(&mut race_flush_interval), - &metrics_aggregator_handle, - true, // force_flush_trace_stats - ) - .await; + flushing_service + .flush_blocking_with_interval( + true, + &mut locked_metrics, + Some(&mut race_flush_interval), + ) + .await; // Shutdown aggregator services if let Err(e) = logs_aggregator_handle.shutdown() { @@ -1073,43 +802,6 @@ async fn extension_loop_active( } } -#[allow(clippy::too_many_arguments)] -async fn blocking_flush_all( - logs_flusher: &LogsFlusher, - metrics_flushers: &mut [MetricsFlusher], - trace_flusher: &impl TraceFlusher, - stats_flusher: &impl StatsFlusher, - proxy_flusher: &ProxyFlusher, - race_flush_interval: Option<&mut tokio::time::Interval>, - metrics_aggr_handle: &MetricsAggregatorHandle, - force_flush_trace_stats: bool, -) { - let flush_response = metrics_aggr_handle - .flush() - .await - .expect("can't flush metrics aggr handle"); - let metrics_futures: Vec<_> = metrics_flushers - .iter_mut() - .map(|f| { - f.flush_metrics( - flush_response.series.clone(), - flush_response.distributions.clone(), - ) - }) - .collect(); - - tokio::join!( - logs_flusher.flush(None), - futures::future::join_all(metrics_futures), - trace_flusher.flush(None), - stats_flusher.flush(force_flush_trace_stats), - proxy_flusher.flush(None), - ); - if let 
Some(interval) = race_flush_interval { - interval.reset(); - } -} - /// Wait for the `Tombstone` event from telemetry listener to ensure all events are processed. /// This function will timeout after the specified duration to prevent hanging indefinitely. #[allow(clippy::too_many_arguments)] @@ -1729,22 +1421,22 @@ fn start_api_runtime_proxy( } #[cfg(test)] -mod pending_flush_handles_tests { - use super::*; +mod flush_handles_tests { + use bottlecap::flushing::FlushHandles; use tokio::time::{Duration, sleep}; #[tokio::test] async fn stats_handle_is_tracked_until_completion() { - let mut pending = PendingFlushHandles::new(); + let mut handles = FlushHandles::new(); let handle = tokio::spawn(async { sleep(Duration::from_millis(5)).await; }); - pending.stats_flush_handles.push(handle); + handles.stats_flush_handles.push(handle); - assert!(pending.has_pending_handles()); + assert!(handles.has_pending()); sleep(Duration::from_millis(10)).await; - assert!(!pending.has_pending_handles()); + assert!(!handles.has_pending()); } } diff --git a/bottlecap/src/flushing/handles.rs b/bottlecap/src/flushing/handles.rs new file mode 100644 index 000000000..58dba4489 --- /dev/null +++ b/bottlecap/src/flushing/handles.rs @@ -0,0 +1,108 @@ +//! Flush handles for tracking in-flight flush operations. + +use datadog_protos::metrics::SketchPayload; +use dogstatsd::datadog::Series; +use libdd_trace_utils::send_data::SendData; +use tokio::task::JoinHandle; + +/// Batch of metrics that failed to flush and need to be retried. +#[derive(Debug)] +pub struct MetricsRetryBatch { + /// Index of the flusher in the metrics flushers vector. + pub flusher_id: usize, + /// Series that failed to flush. + pub series: Vec, + /// Sketch payloads that failed to flush. + pub sketches: Vec, +} + +/// Handles for pending flush operations. +/// +/// Each field contains a vector of `JoinHandle`s for in-flight flush tasks. 
+/// The return type of each handle contains data that may need to be retried +/// if the flush failed. +#[allow(clippy::struct_field_names)] +pub struct FlushHandles { + /// Handles for trace flush operations. Returns failed traces for retry. + pub trace_flush_handles: Vec>>, + /// Handles for log flush operations. Returns failed request builders for retry. + pub log_flush_handles: Vec>>, + /// Handles for metrics flush operations. Returns batch info for retry. + pub metric_flush_handles: Vec>, + /// Handles for proxy flush operations. Returns failed request builders for retry. + pub proxy_flush_handles: Vec>>, + /// Handles for stats flush operations. Stats don't support retry. + pub stats_flush_handles: Vec>, +} + +impl FlushHandles { + /// Creates a new empty `FlushHandles` instance. + #[must_use] + pub fn new() -> Self { + Self { + trace_flush_handles: Vec::new(), + log_flush_handles: Vec::new(), + metric_flush_handles: Vec::new(), + proxy_flush_handles: Vec::new(), + stats_flush_handles: Vec::new(), + } + } + + /// Returns `true` if any flush operation is still pending (not finished). + #[must_use] + pub fn has_pending(&self) -> bool { + let trace_pending = self.trace_flush_handles.iter().any(|h| !h.is_finished()); + let log_pending = self.log_flush_handles.iter().any(|h| !h.is_finished()); + let metric_pending = self.metric_flush_handles.iter().any(|h| !h.is_finished()); + let proxy_pending = self.proxy_flush_handles.iter().any(|h| !h.is_finished()); + let stats_pending = self.stats_flush_handles.iter().any(|h| !h.is_finished()); + + trace_pending || log_pending || metric_pending || proxy_pending || stats_pending + } + + /// Returns `true` if all handle vectors are empty. 
+ #[must_use] + pub fn is_empty(&self) -> bool { + self.trace_flush_handles.is_empty() + && self.log_flush_handles.is_empty() + && self.metric_flush_handles.is_empty() + && self.proxy_flush_handles.is_empty() + && self.stats_flush_handles.is_empty() + } +} + +impl Default for FlushHandles { + fn default() -> Self { + Self::new() + } +} + +impl std::fmt::Debug for FlushHandles { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FlushHandles") + .field("trace_handles_count", &self.trace_flush_handles.len()) + .field("log_handles_count", &self.log_flush_handles.len()) + .field("metric_handles_count", &self.metric_flush_handles.len()) + .field("proxy_handles_count", &self.proxy_flush_handles.len()) + .field("stats_handles_count", &self.stats_flush_handles.len()) + .finish() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_new_handles_is_empty() { + let handles = FlushHandles::new(); + assert!(handles.is_empty()); + assert!(!handles.has_pending()); + } + + #[test] + fn test_default_handles_is_empty() { + let handles = FlushHandles::default(); + assert!(handles.is_empty()); + } +} diff --git a/bottlecap/src/flushing/mod.rs b/bottlecap/src/flushing/mod.rs new file mode 100644 index 000000000..f2e55def6 --- /dev/null +++ b/bottlecap/src/flushing/mod.rs @@ -0,0 +1,14 @@ +//! Flushing service module for coordinating data flushes across multiple flusher types. +//! +//! This module provides a unified `FlushingService` that orchestrates flushing of: +//! - Logs (via `LogsFlusher`) +//! - Traces (via `TraceFlusher`) +//! - Stats (via `StatsFlusher`) +//! - Metrics (via `MetricsFlusher`) +//! 
- Proxy requests (via `ProxyFlusher`) + +mod handles; +mod service; + +pub use handles::{FlushHandles, MetricsRetryBatch}; +pub use service::FlushingService; diff --git a/bottlecap/src/flushing/service.rs b/bottlecap/src/flushing/service.rs new file mode 100644 index 000000000..3cd5eabbd --- /dev/null +++ b/bottlecap/src/flushing/service.rs @@ -0,0 +1,366 @@ +//! `FlushingService` for coordinating flush operations across multiple flusher types. + +use std::sync::Arc; + +use tokio::sync::Mutex as TokioMutex; +use tracing::{debug, error}; + +use dogstatsd::{ + aggregator_service::AggregatorHandle as MetricsAggregatorHandle, + flusher::Flusher as MetricsFlusher, +}; + +use crate::flushing::handles::{FlushHandles, MetricsRetryBatch}; +use crate::logs::flusher::LogsFlusher; +use crate::traces::{ + proxy_flusher::Flusher as ProxyFlusher, stats_flusher::StatsFlusher, + trace_flusher::TraceFlusher, +}; + +/// Service for coordinating flush operations across all flusher types. +/// +/// This service provides a unified interface for: +/// - Spawning non-blocking flush tasks +/// - Awaiting pending flush handles with retry logic +/// - Performing blocking flushes (spawn + await) +/// +/// # Type Parameters +/// +/// * `TF` - Trace flusher type implementing `TraceFlusher` +/// * `SF` - Stats flusher type implementing `StatsFlusher` +pub struct FlushingService +where + TF: TraceFlusher + Send + Sync + 'static, + SF: StatsFlusher + Send + Sync + 'static, +{ + // Flushers + logs_flusher: LogsFlusher, + trace_flusher: Arc, + stats_flusher: Arc, + proxy_flusher: Arc, + metrics_flushers: Arc>>, + + // Metrics aggregator handle for getting data to flush + metrics_aggr_handle: MetricsAggregatorHandle, + + // Pending flush handles + handles: FlushHandles, +} + +impl FlushingService +where + TF: TraceFlusher + Send + Sync + 'static, + SF: StatsFlusher + Send + Sync + 'static, +{ + /// Creates a new `FlushingService` with the given flushers. 
+ #[must_use] + pub fn new( + logs_flusher: LogsFlusher, + trace_flusher: Arc, + stats_flusher: Arc, + proxy_flusher: Arc, + metrics_flushers: Arc>>, + metrics_aggr_handle: MetricsAggregatorHandle, + ) -> Self { + Self { + logs_flusher, + trace_flusher, + stats_flusher, + proxy_flusher, + metrics_flushers, + metrics_aggr_handle, + handles: FlushHandles::new(), + } + } + + /// Returns `true` if any flush operation is still pending. + #[must_use] + pub fn has_pending_handles(&self) -> bool { + self.handles.has_pending() + } + + /// Spawns non-blocking flush tasks for all flushers. + /// + /// This method spawns async tasks for logs, traces, metrics, stats, and proxy flushers. + /// The tasks run concurrently and their handles are stored for later awaiting. + /// + /// For metrics, this first fetches data from the aggregator, then spawns flush tasks + /// for each metrics flusher (supporting multiple endpoints). + pub async fn spawn_non_blocking(&mut self) { + // Spawn logs flush + let lf = self.logs_flusher.clone(); + self.handles + .log_flush_handles + .push(tokio::spawn(async move { lf.flush(None).await })); + + // Spawn traces flush + let tf = self.trace_flusher.clone(); + self.handles + .trace_flush_handles + .push(tokio::spawn(async move { + tf.flush(None).await.unwrap_or_default() + })); + + // Spawn metrics flush + // First get the data from aggregator, then spawn flush tasks for each flusher + let (metrics_flushers_copy, series, sketches) = { + let locked_metrics = self.metrics_flushers.lock().await; + let flush_response = self + .metrics_aggr_handle + .clone() + .flush() + .await + .expect("can't flush metrics handle"); + ( + locked_metrics.clone(), + flush_response.series, + flush_response.distributions, + ) + }; + + for (idx, mut flusher) in metrics_flushers_copy.into_iter().enumerate() { + let series_clone = series.clone(); + let sketches_clone = sketches.clone(); + let handle = tokio::spawn(async move { + let (retry_series, retry_sketches) = flusher + 
.flush_metrics(series_clone, sketches_clone) + .await + .unwrap_or_default(); + MetricsRetryBatch { + flusher_id: idx, + series: retry_series, + sketches: retry_sketches, + } + }); + self.handles.metric_flush_handles.push(handle); + } + + // Spawn stats flush (fire-and-forget, no retry) + let sf = Arc::clone(&self.stats_flusher); + self.handles + .stats_flush_handles + .push(tokio::spawn(async move { sf.flush(false).await })); + + // Spawn proxy flush + let pf = self.proxy_flusher.clone(); + self.handles + .proxy_flush_handles + .push(tokio::spawn(async move { + pf.flush(None).await.unwrap_or_default() + })); + } + + /// Awaits all pending flush handles and performs retry for failed flushes. + /// + /// This method: + /// 1. Drains all pending handles + /// 2. Awaits each handle's completion + /// 3. For failed flushes that returned retry data, spawns redrive tasks + /// 4. Waits for all redrive tasks to complete + /// + /// # Returns + /// + /// Returns `true` if any flush operation encountered an error. + #[allow(clippy::too_many_lines)] + pub async fn await_handles(&mut self) -> bool { + let mut joinset = tokio::task::JoinSet::new(); + let mut flush_error = false; + + // Await stats handles (no retry) + for handle in self.handles.stats_flush_handles.drain(..) { + if let Err(e) = handle.await { + error!("FLUSHING_SERVICE | stats flush error {e:?}"); + flush_error = true; + } + } + + // Await trace handles with retry + for handle in self.handles.trace_flush_handles.drain(..) { + match handle.await { + Ok(retry) => { + let tf = self.trace_flusher.clone(); + if !retry.is_empty() { + debug!( + "FLUSHING_SERVICE | redriving {:?} trace payloads", + retry.len() + ); + joinset.spawn(async move { + tf.flush(Some(retry)).await; + }); + } + } + Err(e) => { + error!("FLUSHING_SERVICE | redrive trace error {e:?}"); + } + } + } + + // Await log handles with retry + for handle in self.handles.log_flush_handles.drain(..) 
{ + match handle.await { + Ok(retry) => { + if !retry.is_empty() { + debug!( + "FLUSHING_SERVICE | redriving {:?} log payloads", + retry.len() + ); + } + for item in retry { + let lf = self.logs_flusher.clone(); + match item.try_clone() { + Some(item_clone) => { + joinset.spawn(async move { + lf.flush(Some(item_clone)).await; + }); + } + None => { + error!("FLUSHING_SERVICE | Can't clone redrive log payloads"); + } + } + } + } + Err(e) => { + error!("FLUSHING_SERVICE | redrive log error {e:?}"); + } + } + } + + // Await metrics handles with retry + for handle in self.handles.metric_flush_handles.drain(..) { + let mf = self.metrics_flushers.clone(); + match handle.await { + Ok(retry_batch) => { + if !retry_batch.series.is_empty() || !retry_batch.sketches.is_empty() { + debug!( + "FLUSHING_SERVICE | redriving {:?} series and {:?} sketch payloads", + retry_batch.series.len(), + retry_batch.sketches.len() + ); + joinset.spawn(async move { + let mut locked_flushers = mf.lock().await; + if let Some(flusher) = locked_flushers.get_mut(retry_batch.flusher_id) { + flusher + .flush_metrics(retry_batch.series, retry_batch.sketches) + .await; + } + }); + } + } + Err(e) => { + error!("FLUSHING_SERVICE | redrive metrics error {e:?}"); + } + } + } + + // Await proxy handles with retry + for handle in self.handles.proxy_flush_handles.drain(..) { + match handle.await { + Ok(batch) => { + if !batch.is_empty() { + debug!( + "FLUSHING_SERVICE | Redriving {:?} APM proxy payloads", + batch.len() + ); + } + + let pf = self.proxy_flusher.clone(); + joinset.spawn(async move { + pf.flush(Some(batch)).await; + }); + } + Err(e) => { + error!("FLUSHING_SERVICE | Redrive error in APM proxy: {e:?}"); + } + } + } + + // Wait for all redrive operations to complete + while let Some(result) = joinset.join_next().await { + if let Err(e) = result { + error!("FLUSHING_SERVICE | redrive request error {e:?}"); + flush_error = true; + } + } + + flush_error + } + + /// Performs a blocking flush of all data. 
+ /// + /// This method flushes all data synchronously using `tokio::join!` for parallelism. + /// Unlike `spawn_non_blocking`, this waits for all flushes to complete before returning. + /// + /// # Arguments + /// + /// * `force_stats` - If `true`, forces the stats flusher to flush immediately + /// regardless of timing constraints. + /// * `metrics_flushers` - Mutable slice of metrics flushers. The caller must acquire + /// the lock before calling this method. + pub async fn flush_blocking(&self, force_stats: bool, metrics_flushers: &mut [MetricsFlusher]) { + let flush_response = self + .metrics_aggr_handle + .flush() + .await + .expect("can't flush metrics aggr handle"); + + let metrics_futures: Vec<_> = metrics_flushers + .iter_mut() + .map(|f| { + f.flush_metrics( + flush_response.series.clone(), + flush_response.distributions.clone(), + ) + }) + .collect(); + + tokio::join!( + self.logs_flusher.flush(None), + futures::future::join_all(metrics_futures), + self.trace_flusher.flush(None), + self.stats_flusher.flush(force_stats), + self.proxy_flusher.flush(None), + ); + } + + /// Performs a blocking flush and resets the flush interval. + /// + /// This is a convenience method that combines `flush_blocking` with interval reset. + /// + /// # Arguments + /// + /// * `force_stats` - If `true`, forces the stats flusher to flush immediately. + /// * `metrics_flushers` - Mutable slice of metrics flushers. + /// * `interval` - Optional interval to reset after flushing. + pub async fn flush_blocking_with_interval( + &self, + force_stats: bool, + metrics_flushers: &mut [MetricsFlusher], + interval: Option<&mut tokio::time::Interval>, + ) { + self.flush_blocking(force_stats, metrics_flushers).await; + if let Some(interval) = interval { + interval.reset(); + } + } + + /// Returns a reference to the metrics flushers mutex for external locking. 
+ /// + /// This is useful when you need to lock the metrics flushers and pass them + /// to `flush_blocking` or `flush_blocking_with_interval`. + #[must_use] + pub fn metrics_flushers(&self) -> &Arc>> { + &self.metrics_flushers + } +} + +impl std::fmt::Debug for FlushingService +where + TF: TraceFlusher + Send + Sync + 'static, + SF: StatsFlusher + Send + Sync + 'static, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FlushingService") + .field("handles", &self.handles) + .finish_non_exhaustive() + } +} diff --git a/bottlecap/src/lib.rs b/bottlecap/src/lib.rs index b30097c61..df94fd246 100644 --- a/bottlecap/src/lib.rs +++ b/bottlecap/src/lib.rs @@ -24,6 +24,7 @@ pub mod config; pub mod event_bus; pub mod extension; pub mod fips; +pub mod flushing; pub mod http; pub mod lifecycle; pub mod logger; From c0518cdfd5b2a18fd6163c36577726195c8673cb Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Wed, 4 Feb 2026 13:28:55 -0500 Subject: [PATCH 2/5] remove unnecessary default in favor of sugar --- bottlecap/src/flushing/handles.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/bottlecap/src/flushing/handles.rs b/bottlecap/src/flushing/handles.rs index 58dba4489..851d8e4a9 100644 --- a/bottlecap/src/flushing/handles.rs +++ b/bottlecap/src/flushing/handles.rs @@ -21,6 +21,7 @@ pub struct MetricsRetryBatch { /// Each field contains a vector of `JoinHandle`s for in-flight flush tasks. /// The return type of each handle contains data that may need to be retried /// if the flush failed. +#[derive(Default)] #[allow(clippy::struct_field_names)] pub struct FlushHandles { /// Handles for trace flush operations. Returns failed traces for retry. 
@@ -71,12 +72,6 @@ impl FlushHandles { } } -impl Default for FlushHandles { - fn default() -> Self { - Self::new() - } -} - impl std::fmt::Debug for FlushHandles { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("FlushHandles") From 678e1eb7c5c81e61bb491f50a9cdf96d65e66a3a Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Wed, 4 Feb 2026 14:32:41 -0500 Subject: [PATCH 3/5] add comment on locking on caller also remove sending interval to flush_blocking --- bottlecap/src/bin/bottlecap/main.rs | 28 ++++++++++------------------ bottlecap/src/flushing/service.rs | 29 ++++++++--------------------- 2 files changed, 18 insertions(+), 39 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 08dfbade0..569638de4 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -623,7 +623,7 @@ async fn extension_loop_active( ); let mut locked_metrics = metrics_flushers.lock().await; flushing_service - .flush_blocking_with_interval(true, &mut locked_metrics, None) + .flush_blocking(true, &mut locked_metrics) .await; return Ok(()); @@ -668,20 +668,18 @@ async fn extension_loop_active( _ = race_flush_interval.tick() => { let mut locked_metrics = metrics_flushers.lock().await; flushing_service - .flush_blocking_with_interval(false, &mut locked_metrics, Some(&mut race_flush_interval)) + .flush_blocking(false, &mut locked_metrics) .await; + race_flush_interval.reset(); } } } // flush let mut locked_metrics = metrics_flushers.lock().await; flushing_service - .flush_blocking_with_interval( - false, - &mut locked_metrics, - Some(&mut race_flush_interval), - ) + .flush_blocking(false, &mut locked_metrics) .await; + race_flush_interval.reset(); let next_response = extension::next_event(client, &aws_config.runtime_api, &r.extension_id).await; maybe_shutdown_event = @@ -699,12 +697,9 @@ async fn extension_loop_active( 
FlushDecision::Periodic => { let mut locked_metrics = metrics_flushers.lock().await; flushing_service - .flush_blocking_with_interval( - false, - &mut locked_metrics, - Some(&mut race_flush_interval), - ) + .flush_blocking(false, &mut locked_metrics) .await; + race_flush_interval.reset(); } _ => { // No specific flush logic for Dont or End (End already handled above) @@ -733,8 +728,9 @@ async fn extension_loop_active( if flush_control.flush_strategy == FlushStrategy::Default { let mut locked_metrics = metrics_flushers.lock().await; flushing_service - .flush_blocking_with_interval(false, &mut locked_metrics, Some(&mut race_flush_interval)) + .flush_blocking(false, &mut locked_metrics) .await; + race_flush_interval.reset(); } } } @@ -782,11 +778,7 @@ async fn extension_loop_active( // Final flush with force_stats=true since this is our last opportunity let mut locked_metrics = metrics_flushers.lock().await; flushing_service - .flush_blocking_with_interval( - true, - &mut locked_metrics, - Some(&mut race_flush_interval), - ) + .flush_blocking(true, &mut locked_metrics) .await; // Shutdown aggregator services diff --git a/bottlecap/src/flushing/service.rs b/bottlecap/src/flushing/service.rs index 3cd5eabbd..152a13140 100644 --- a/bottlecap/src/flushing/service.rs +++ b/bottlecap/src/flushing/service.rs @@ -296,6 +296,14 @@ where /// regardless of timing constraints. /// * `metrics_flushers` - Mutable slice of metrics flushers. The caller must acquire /// the lock before calling this method. + /// + /// # Note + /// + /// TODO: The caller must acquire the lock on `metrics_flushers` and pass a mutable slice + /// because `MetricsFlusher::flush_metrics` requires `&mut self`. This creates awkward + /// ergonomics. Consider modifying the `dogstatsd` crate to use interior mutability + /// (e.g., `Arc>` internally) so `flush_metrics` can take `&self`, allowing + /// this method to handle locking internally. 
pub async fn flush_blocking(&self, force_stats: bool, metrics_flushers: &mut [MetricsFlusher]) { let flush_response = self .metrics_aggr_handle @@ -322,27 +330,6 @@ where ); } - /// Performs a blocking flush and resets the flush interval. - /// - /// This is a convenience method that combines `flush_blocking` with interval reset. - /// - /// # Arguments - /// - /// * `force_stats` - If `true`, forces the stats flusher to flush immediately. - /// * `metrics_flushers` - Mutable slice of metrics flushers. - /// * `interval` - Optional interval to reset after flushing. - pub async fn flush_blocking_with_interval( - &self, - force_stats: bool, - metrics_flushers: &mut [MetricsFlusher], - interval: Option<&mut tokio::time::Interval>, - ) { - self.flush_blocking(force_stats, metrics_flushers).await; - if let Some(interval) = interval { - interval.reset(); - } - } - /// Returns a reference to the metrics flushers mutex for external locking. /// /// This is useful when you need to lock the metrics flushers and pass them From 13361873d0611b322be9cc228a012a557dd164b7 Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Wed, 4 Feb 2026 14:40:42 -0500 Subject: [PATCH 4/5] remove extra flusher service on managed instances shutdown --- bottlecap/src/bin/bottlecap/main.rs | 47 +++++------------------------ 1 file changed, 8 insertions(+), 39 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 569638de4..d1777d68e 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -447,8 +447,13 @@ async fn extension_loop_active( } () = cancel_token_clone.cancelled() => { debug!("Managed Instance mode: periodic flusher task cancelled, waiting for pending flushes"); - // Wait for any pending flushes before exiting + // Wait for any pending flushes flushing_service.await_handles().await; + // Final flush to capture any data that accumulated since the last + 
// spawn_non_blocking(). We pass force_stats=true since this is our + // last opportunity to send data before shutdown. + let mut locked_metrics = flushing_service.metrics_flushers().lock().await; + flushing_service.flush_blocking(true, &mut locked_metrics).await; break; } } @@ -584,48 +589,12 @@ async fn extension_loop_active( &lifecycle_listener_shutdown_token, ); - // Wait for background flusher to complete gracefully + // Wait for background flusher to complete gracefully. + // The background task performs the final flush before exiting, so we just need to wait. if let Err(e) = flush_task_handle.await { error!("Error waiting for background flush task: {e:?}"); } - // Final flush to send any remaining observability data before shutdown. - // - // Managed Instance Mode vs OnDemand Mode Final Flush: - // - // While both modes perform a final flush during shutdown, the context differs: - // - // - **Managed Instance Mode (this code)**: Throughout the execution environment's lifetime, - // a background task has been continuously flushing data at regular intervals - // (see flush_task_handle above). This final flush captures any data that was - // generated after the last periodic flush and before shutdown was signaled. - // Since concurrent invocations may have completed just before shutdown, this - // ensures we don't lose their metrics, logs, and traces. - // - // - **OnDemand Mode**: Flushing is tied to invocation lifecycle, so data is typically - // flushed at the end of each invocation. The final flush captures any remaining - // data from the last invocation that may not have been sent yet. - // - // In both modes, we pass `force_flush_trace_stats=true` to ensure trace statistics - // are flushed regardless of timing constraints, as this is our last opportunity to - // send data before the Lambda execution environment terminates. - // - // Final flush without interval reset. 
We pass None for race_flush_interval since
- // this is the final operation before shutdown and resetting the interval timing
- // serves no purpose. This avoids creating an unnecessary interval object.
- let flushing_service = FlushingService::new(
- logs_flusher.clone(),
- Arc::clone(&trace_flusher),
- Arc::clone(&stats_flusher),
- proxy_flusher.clone(),
- Arc::clone(&metrics_flushers),
- metrics_aggregator_handle.clone(),
- );
- let mut locked_metrics = metrics_flushers.lock().await;
- flushing_service
- .flush_blocking(true, &mut locked_metrics)
- .await;
-
 return Ok(());
 }

From 021166867b671166497f32321cf26269c53578ef Mon Sep 17 00:00:00 2001
From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com>
Date: Thu, 5 Feb 2026 16:02:09 -0500
Subject: [PATCH 5/5] reset race flush interval one last time

---
 bottlecap/src/bin/bottlecap/main.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs
index d1777d68e..12ae368f7 100644
--- a/bottlecap/src/bin/bottlecap/main.rs
+++ b/bottlecap/src/bin/bottlecap/main.rs
@@ -750,6 +750,9 @@ async fn extension_loop_active(
 .flush_blocking(true, &mut locked_metrics)
 .await;

+ // Reset the race flush interval after the final flush so a pending tick does not trigger another flush during the remaining shutdown steps (reset() restarts the period; it does not stop the interval)
+ race_flush_interval.reset();
+
 // Shutdown aggregator services
 if let Err(e) = logs_aggregator_handle.shutdown() {
 error!("Failed to shutdown logs aggregator: {e}");