From 2aa9540da4e13fda751d5ca459f261bc70482d81 Mon Sep 17 00:00:00 2001
From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com>
Date: Wed, 4 Feb 2026 16:12:37 -0500
Subject: [PATCH 1/9] refactor trace flusher types and methods

We were creating an HTTP client every time we flushed traces; now we create
one client and reuse it. This also removes unnecessary traits, as we are not
creating more tracing agents for other use cases.
---
 bottlecap/src/bin/bottlecap/main.rs   |  12 +-
 bottlecap/src/flushing/service.rs     |  31 +---
 bottlecap/src/traces/hyper_client.rs  | 113 ++++++++++++++
 bottlecap/src/traces/mod.rs           |   1 +
 bottlecap/src/traces/stats_flusher.rs |  70 +++++----
 bottlecap/src/traces/trace_flusher.rs | 204 +++++++-------------------
 6 files changed, 222 insertions(+), 209 deletions(-)
 create mode 100644 bottlecap/src/traces/hyper_client.rs

diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs
index 12ae368f7..e93b95d69 100644
--- a/bottlecap/src/bin/bottlecap/main.rs
+++ b/bottlecap/src/bin/bottlecap/main.rs
@@ -66,14 +66,14 @@ use bottlecap::{
         span_dedup_service,
         stats_aggregator::StatsAggregator,
         stats_concentrator_service::{StatsConcentratorHandle, StatsConcentratorService},
-        stats_flusher::{self, StatsFlusher},
+        stats_flusher,
         stats_generator::StatsGenerator,
         stats_processor, trace_agent,
         trace_aggregator::SendDataBuilderInfo,
         trace_aggregator_service::{
             AggregatorHandle as TraceAggregatorHandle, AggregatorService as TraceAggregatorService,
         },
-        trace_flusher::{self, TraceFlusher},
+        trace_flusher,
         trace_processor::{self, SendingTraceProcessor},
     },
 };
@@ -1081,9 +1081,9 @@ fn start_trace_agent(
     appsec_processor: Option>>,
 ) -> (
     Sender,
-    Arc<trace_flusher::ServerlessTraceFlusher>,
+    Arc<trace_flusher::TraceFlusher>,
     Arc,
-    Arc<stats_flusher::ServerlessStatsFlusher>,
+    Arc<stats_flusher::StatsFlusher>,
     Arc,
     tokio_util::sync::CancellationToken,
     StatsConcentratorHandle,
@@ -1096,7 +1096,7 @@
     let stats_aggregator: Arc<TokioMutex<StatsAggregator>> = Arc::new(TokioMutex::new(
         StatsAggregator::new_with_concentrator(stats_concentrator_handle.clone()),
     ));
-    let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher::new(
+    let stats_flusher = Arc::new(stats_flusher::StatsFlusher::new(
         api_key_factory.clone(),
         stats_aggregator.clone(),
         Arc::clone(config),
@@ -1108,7 +1108,7 @@
     let (trace_aggregator_service, trace_aggregator_handle) = TraceAggregatorService::default();
     tokio::spawn(trace_aggregator_service.run());
 
-    let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher::new(
+    let trace_flusher = Arc::new(trace_flusher::TraceFlusher::new(
         trace_aggregator_handle.clone(),
         config.clone(),
         api_key_factory.clone(),
diff --git a/bottlecap/src/flushing/service.rs b/bottlecap/src/flushing/service.rs
index 152a13140..c632d6e36 100644
--- a/bottlecap/src/flushing/service.rs
+++ b/bottlecap/src/flushing/service.rs
@@ -23,20 +23,11 @@ use crate::traces::{
 /// - Spawning non-blocking flush tasks
 /// - Awaiting pending flush handles with retry logic
 /// - Performing blocking flushes (spawn + await)
-///
-/// # Type Parameters
-///
-/// * `TF` - Trace flusher type implementing `TraceFlusher`
-/// * `SF` - Stats flusher type implementing `StatsFlusher`
-pub struct FlushingService<TF, SF>
-where
-    TF: TraceFlusher + Send + Sync + 'static,
-    SF: StatsFlusher + Send + Sync + 'static,
-{
+pub struct FlushingService {
     // Flushers
     logs_flusher: LogsFlusher,
-    trace_flusher: Arc<TF>,
-    stats_flusher: Arc<SF>,
+    trace_flusher: Arc<TraceFlusher>,
+    stats_flusher: Arc<StatsFlusher>,
     proxy_flusher: Arc,
     metrics_flushers: Arc<TokioMutex<Vec<MetricsFlusher>>>,
@@ -47,17 +38,13 @@
     handles: FlushHandles,
 }
 
-impl<TF, SF> FlushingService<TF, SF>
-where
-    TF: TraceFlusher + Send + Sync + 'static,
-    SF: StatsFlusher + Send + Sync + 'static,
-{
+impl FlushingService {
     /// Creates a new `FlushingService` with the given flushers.
     #[must_use]
     pub fn new(
         logs_flusher: LogsFlusher,
-        trace_flusher: Arc<TF>,
-        stats_flusher: Arc<SF>,
+        trace_flusher: Arc<TraceFlusher>,
+        stats_flusher: Arc<StatsFlusher>,
         proxy_flusher: Arc,
         metrics_flushers: Arc<TokioMutex<Vec<MetricsFlusher>>>,
         metrics_aggr_handle: MetricsAggregatorHandle,
     ) -> Self {
         Self {
@@ -340,11 +327,7 @@
     }
 }
 
-impl<TF, SF> std::fmt::Debug for FlushingService<TF, SF>
-where
-    TF: TraceFlusher + Send + Sync + 'static,
-    SF: StatsFlusher + Send + Sync + 'static,
-{
+impl std::fmt::Debug for FlushingService {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("FlushingService")
             .field("handles", &self.handles)
diff --git a/bottlecap/src/traces/hyper_client.rs b/bottlecap/src/traces/hyper_client.rs
new file mode 100644
index 000000000..99cd60fe5
--- /dev/null
+++ b/bottlecap/src/traces/hyper_client.rs
@@ -0,0 +1,113 @@
+// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
+// SPDX-License-Identifier: Apache-2.0
+
+//! Hyper-based HTTP client for trace and stats flushers.
+//!
+//! This module provides the HTTP client type required by `libdd_trace_utils`
+//! for sending traces and stats to Datadog intake endpoints.
+
+use hyper_http_proxy;
+use hyper_rustls::HttpsConnectorBuilder;
+use libdd_common::{GenericHttpClient, hyper_migration};
+use rustls::RootCertStore;
+use rustls_pki_types::CertificateDer;
+use std::error::Error;
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::LazyLock;
+use tracing::debug;
+
+/// Type alias for the HTTP client used by trace and stats flushers.
+///
+/// This is the client type expected by `libdd_trace_utils::SendData::send()`.
+pub type HyperClient =
+    GenericHttpClient<hyper_http_proxy::ProxyConnector<libdd_common::connector::Connector>>;
+
+/// Initialize the crypto provider needed for setting custom root certificates.
+fn ensure_crypto_provider_initialized() {
+    static INIT_CRYPTO_PROVIDER: LazyLock<()> = LazyLock::new(|| {
+        #[cfg(unix)]
+        rustls::crypto::aws_lc_rs::default_provider()
+            .install_default()
+            .expect("Failed to install default CryptoProvider");
+    });
+
+    let () = &*INIT_CRYPTO_PROVIDER;
+}
+
+/// Creates a new hyper-based HTTP client with the given configuration.
+///
+/// This client is compatible with `libdd_trace_utils` and supports:
+/// - HTTPS proxy configuration
+/// - Custom TLS root certificates
+///
+/// # Arguments
+///
+/// * `proxy_https` - Optional HTTPS proxy URL
+/// * `tls_cert_file` - Optional path to a PEM file containing root certificates
+///
+/// # Errors
+///
+/// Returns an error if:
+/// - The proxy URL cannot be parsed
+/// - The TLS certificate file cannot be read or parsed
+pub fn create_client(
+    proxy_https: Option<&String>,
+    tls_cert_file: Option<&String>,
+) -> Result<HyperClient, Box<dyn Error>> {
+    // Create the base connector with optional custom TLS config
+    let connector = if let Some(ca_cert_path) = tls_cert_file {
+        // Ensure crypto provider is initialized before creating TLS config
+        ensure_crypto_provider_initialized();
+
+        // Load the custom certificate
+        let cert_file = File::open(ca_cert_path)?;
+        let mut reader = BufReader::new(cert_file);
+        let certs: Vec<CertificateDer> =
+            rustls_pemfile::certs(&mut reader).collect::<Result<Vec<_>, _>>()?;
+
+        // Create a root certificate store and add custom certs
+        let mut root_store = RootCertStore::empty();
+        for cert in certs {
+            root_store.add(cert)?;
+        }
+
+        // Build the TLS config with custom root certificates
+        let tls_config = rustls::ClientConfig::builder()
+            .with_root_certificates(root_store)
+            .with_no_client_auth();
+
+        // Build the HTTPS connector with custom config
+        let https_connector = HttpsConnectorBuilder::new()
+            .with_tls_config(tls_config)
+            .https_or_http()
+            .enable_http1()
+            .build();
+
+        debug!(
+            "HYPER_CLIENT | Added root certificate from {}",
+            ca_cert_path
+        );
+
+        // Construct the Connector::Https variant directly
+        libdd_common::connector::Connector::Https(https_connector)
+    } else {
+        // Use default connector
+        libdd_common::connector::Connector::default()
+    };
+
+    if let Some(proxy) = proxy_https {
+        let proxy =
+            hyper_http_proxy::Proxy::new(hyper_http_proxy::Intercept::Https, proxy.parse()?);
+        let proxy_connector = hyper_http_proxy::ProxyConnector::from_proxy(connector, proxy)?;
+        let client = hyper_migration::client_builder().build(proxy_connector);
+        debug!(
+            "HYPER_CLIENT | Proxy connector created with proxy: {:?}",
+            proxy_https
+        );
+        Ok(client)
+    } else {
+        let proxy_connector = hyper_http_proxy::ProxyConnector::new(connector)?;
+        Ok(hyper_migration::client_builder().build(proxy_connector))
+    }
+}
diff --git a/bottlecap/src/traces/mod.rs b/bottlecap/src/traces/mod.rs
index 5a2a515dc..7d1581c2e 100644
--- a/bottlecap/src/traces/mod.rs
+++ b/bottlecap/src/traces/mod.rs
@@ -2,6 +2,7 @@
 // SPDX-License-Identifier: Apache-2.0
 
 pub mod context;
+pub mod hyper_client;
 pub mod propagation;
 pub mod proxy_aggregator;
 pub mod proxy_flusher;
diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs
index 7f2a0d998..d74f8c562 100644
--- a/bottlecap/src/traces/stats_flusher.rs
+++ b/bottlecap/src/traces/stats_flusher.rs
@@ -1,7 +1,6 @@
 // Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
 // SPDX-License-Identifier: Apache-2.0
 
-use async_trait::async_trait;
 use std::str::FromStr;
 use std::sync::Arc;
 use tokio::sync::Mutex;
@@ -9,55 +8,41 @@
 use tokio::sync::OnceCell;
 
 use crate::config;
 use crate::lifecycle::invocation::processor::S_TO_MS;
+use crate::traces::hyper_client::{self, HyperClient};
 use crate::traces::stats_aggregator::StatsAggregator;
-use crate::traces::trace_flusher::ServerlessTraceFlusher;
 use dogstatsd::api_key::ApiKeyFactory;
 use libdd_common::Endpoint;
 use libdd_trace_protobuf::pb;
 use libdd_trace_utils::{config_utils::trace_stats_url, stats_utils};
 use tracing::{debug, error};
 
-#[async_trait]
-pub trait StatsFlusher {
-    fn new(
-        api_key_factory: Arc<ApiKeyFactory>,
-        aggregator: Arc<Mutex<StatsAggregator>>,
-        config: Arc<config::Config>,
-    ) -> Self
-    where
-        Self: Sized;
-    /// Flushes stats to the Datadog trace stats intake.
-    async fn send(&self, traces: Vec<pb::ClientStatsPayload>);
-
-    async fn flush(&self, force_flush: bool);
-}
-
-#[allow(clippy::module_name_repetitions)]
-#[derive(Clone)]
-pub struct ServerlessStatsFlusher {
-    // pub buffer: Arc<Mutex<Vec<pb::ClientStatsPayload>>>,
+pub struct StatsFlusher {
     aggregator: Arc<Mutex<StatsAggregator>>,
     config: Arc<config::Config>,
     api_key_factory: Arc<ApiKeyFactory>,
     endpoint: OnceCell<Endpoint>,
+    /// Cached HTTP client, lazily initialized on first use.
+    http_client: OnceCell<HyperClient>,
 }
 
-#[async_trait]
-impl StatsFlusher for ServerlessStatsFlusher {
-    fn new(
+impl StatsFlusher {
+    #[must_use]
+    pub fn new(
         api_key_factory: Arc<ApiKeyFactory>,
         aggregator: Arc<Mutex<StatsAggregator>>,
         config: Arc<config::Config>,
     ) -> Self {
-        ServerlessStatsFlusher {
+        StatsFlusher {
             aggregator,
             config,
             api_key_factory,
             endpoint: OnceCell::new(),
+            http_client: OnceCell::new(),
         }
     }
 
-    async fn send(&self, stats: Vec<pb::ClientStatsPayload>) {
+    /// Flushes stats to the Datadog trace stats intake.
+    pub async fn send(&self, stats: Vec<pb::ClientStatsPayload>) {
         if stats.is_empty() {
             return;
         }
@@ -102,10 +87,9 @@
 
         let start = std::time::Instant::now();
 
-        let Ok(http_client) = ServerlessTraceFlusher::get_http_client(
-            self.config.proxy_https.as_ref(),
-            self.config.tls_cert_file.as_ref(),
-        ) else {
+        // Get or create the cached HTTP client
+        let http_client = self.get_or_init_http_client().await;
+        let Some(http_client) = http_client else {
             error!("STATS_FLUSHER | Failed to create HTTP client");
             return;
         };
@@ -114,7 +98,7 @@
             serialized_stats_payload,
             endpoint,
             api_key.as_str(),
-            Some(&http_client),
+            Some(http_client),
         )
         .await;
         let elapsed = start.elapsed();
@@ -131,7 +115,7 @@
         };
     }
 
-    async fn flush(&self, force_flush: bool) {
+    pub async fn flush(&self, force_flush: bool) {
         let mut guard = self.aggregator.lock().await;
 
         let mut stats = guard.get_batch(force_flush).await;
         while !stats.is_empty() {
             self.send(stats).await;
 
             stats = guard.get_batch(force_flush).await;
         }
     }
+    /// Returns a reference to the cached HTTP client, initializing it if necessary.
+    ///
+    /// The client is created once and reused for all subsequent flushes,
+    /// providing connection pooling and TLS session reuse.
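+    ///
+    /// Call-site sketch (illustrative only; `send` above is the real caller):
+    ///
+    /// ```ignore
+    /// // First call builds the client from `proxy_https` / `tls_cert_file` config.
+    /// let client = self.get_or_init_http_client().await;
+    /// // Later calls return the same cached instance.
+    /// let same = self.get_or_init_http_client().await;
+    /// ```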
+    async fn get_or_init_http_client(&self) -> Option<&HyperClient> {
+        let client = self
+            .http_client
+            .get_or_init(|| async {
+                match hyper_client::create_client(
+                    self.config.proxy_https.as_ref(),
+                    self.config.tls_cert_file.as_ref(),
+                ) {
+                    Ok(client) => client,
+                    Err(e) => {
+                        error!("STATS_FLUSHER | Failed to create HTTP client: {e}");
+                        panic!("STATS_FLUSHER | Cannot proceed without HTTP client");
+                    }
+                }
+            })
+            .await;
+        Some(client)
+    }
 }
diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs
index 6d88a12d2..3f3af30f8 100644
--- a/bottlecap/src/traces/trace_flusher.rs
+++ b/bottlecap/src/traces/trace_flusher.rs
@@ -1,67 +1,36 @@
 // Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
 // SPDX-License-Identifier: Apache-2.0
 
-use async_trait::async_trait;
 use dogstatsd::api_key::ApiKeyFactory;
-use hyper_http_proxy;
-use hyper_rustls::HttpsConnectorBuilder;
-use libdd_common::{Endpoint, GenericHttpClient, hyper_migration};
+use libdd_common::Endpoint;
 use libdd_trace_utils::{
     config_utils::trace_intake_url_prefixed,
     send_data::SendDataBuilder,
     trace_utils::{self, SendData},
 };
-use rustls::RootCertStore;
-use rustls_pki_types::CertificateDer;
-use std::error::Error;
-use std::fs::File;
-use std::io::BufReader;
 use std::str::FromStr;
 use std::sync::Arc;
-use std::sync::LazyLock;
+use tokio::sync::OnceCell;
 use tokio::task::JoinSet;
 use tracing::{debug, error};
 
 use crate::config::Config;
 use crate::lifecycle::invocation::processor::S_TO_MS;
+use crate::traces::hyper_client::{self, HyperClient};
 use crate::traces::trace_aggregator_service::AggregatorHandle;
 
-#[async_trait]
-pub trait TraceFlusher {
-    fn new(
-        aggregator_handle: AggregatorHandle,
-        config: Arc<Config>,
-        api_key_factory: Arc<ApiKeyFactory>,
-    ) -> Self
-    where
-        Self: Sized;
-    /// Given a `Vec<SendData>`, a tracer payload, send it to the Datadog intake endpoint.
-    /// Returns the traces back if there was an error sending them.
-    async fn send(
-        traces: Vec<SendData>,
-        endpoint: Option<&Endpoint>,
-        proxy_https: &Option<String>,
-        tls_cert_file: &Option<String>,
-    ) -> Option<Vec<SendData>>;
-
-    /// Flushes traces by getting every available batch on the aggregator.
-    /// If `failed_traces` is provided, it will attempt to send those instead of fetching new traces.
-    /// Returns any traces that failed to send and should be retried.
-    async fn flush(&self, failed_traces: Option<Vec<SendData>>) -> Option<Vec<SendData>>;
-}
-
-#[derive(Clone)]
-#[allow(clippy::module_name_repetitions)]
-pub struct ServerlessTraceFlusher {
+pub struct TraceFlusher {
     pub aggregator_handle: AggregatorHandle,
     pub config: Arc<Config>,
     pub api_key_factory: Arc<ApiKeyFactory>,
     pub additional_endpoints: Vec<Endpoint>,
+    /// Cached HTTP client, lazily initialized on first use.
+    http_client: OnceCell<HyperClient>,
 }
 
-#[async_trait]
-impl TraceFlusher for ServerlessTraceFlusher {
-    fn new(
+impl TraceFlusher {
+    #[must_use]
+    pub fn new(
         aggregator_handle: AggregatorHandle,
         config: Arc<Config>,
         api_key_factory: Arc<ApiKeyFactory>,
     ) -> Self {
@@ -83,15 +52,19 @@
         }
     }
 
-        ServerlessTraceFlusher {
+        TraceFlusher {
             aggregator_handle,
             config,
             api_key_factory,
             additional_endpoints,
+            http_client: OnceCell::new(),
         }
     }
 
-    async fn flush(&self, failed_traces: Option<Vec<SendData>>) -> Option<Vec<SendData>> {
+    /// Flushes traces by getting every available batch on the aggregator.
+    /// If `failed_traces` is provided, it will attempt to send those instead of fetching new traces.
+    /// Returns any traces that failed to send and should be retried.
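+    ///
+    /// Intended retry loop, sketched (caller-side names are illustrative):
+    ///
+    /// ```ignore
+    /// let mut failed = flusher.flush(None).await;
+    /// // On the next flush cycle, re-drive the failed batches first.
+    /// failed = flusher.flush(failed).await;
+    /// ```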
+    pub async fn flush(&self, failed_traces: Option<Vec<SendData>>) -> Option<Vec<SendData>> {
         let Some(api_key) = self.api_key_factory.get_api_key().await else {
             error!(
                 "TRACES | Failed to resolve API key, dropping aggregated data and skipping flushing."
             );
@@ -102,6 +75,12 @@
             return None;
         };
 
+        // Get or create the cached HTTP client
+        let Some(http_client) = self.get_or_init_http_client().await else {
+            error!("TRACES | Failed to create HTTP client, skipping flush");
+            return None;
+        };
+
         let mut failed_batch: Vec<SendData> = Vec::new();
 
         if let Some(traces) = failed_traces {
             // If we have traces from a previous failed attempt, try to send those first
             if !traces.is_empty() {
                 debug!(
                     "TRACES | Retrying to send {} previously failed batches",
                     traces.len()
                 );
-                let retry_result = Self::send(
-                    traces,
-                    None,
-                    &self.config.proxy_https,
-                    &self.config.tls_cert_file,
-                )
-                .await;
+                let retry_result = Self::send_traces(traces, None, http_client.clone()).await;
                 if retry_result.is_some() {
                     // Still failed, return to retry later
                     return retry_result;
@@ -143,18 +116,15 @@
                 .collect();
 
             let traces_clone = traces.clone();
-            let proxy_https = self.config.proxy_https.clone();
-            let tls_cert_file = self.config.tls_cert_file.clone();
-            batch_tasks.spawn(async move {
-                Self::send(traces_clone, None, &proxy_https, &tls_cert_file).await
-            });
+            let client_clone = http_client.clone();
+            batch_tasks
+                .spawn(async move { Self::send_traces(traces_clone, None, client_clone).await });
 
             for endpoint in self.additional_endpoints.clone() {
                 let traces_clone = traces.clone();
-                let proxy_https = self.config.proxy_https.clone();
-                let tls_cert_file = self.config.tls_cert_file.clone();
+                let client_clone = http_client.clone();
                 batch_tasks.spawn(async move {
-                    Self::send(traces_clone, Some(&endpoint), &proxy_https, &tls_cert_file).await
+                    Self::send_traces(traces_clone, Some(endpoint), client_clone).await
                 });
             }
         }
@@ -171,11 +141,36 @@
         None
     }
 
-    async fn send(
+    /// Returns a clone of the cached HTTP client, initializing it if necessary.
+    ///
+    /// The client is created once and reused for all subsequent flushes,
+    /// providing connection pooling and TLS session reuse.
+    async fn get_or_init_http_client(&self) -> Option<HyperClient> {
+        let client = self
+            .http_client
+            .get_or_init(|| async {
+                match hyper_client::create_client(
+                    self.config.proxy_https.as_ref(),
+                    self.config.tls_cert_file.as_ref(),
+                ) {
+                    Ok(client) => client,
+                    Err(e) => {
+                        error!("TRACES | Failed to create HTTP client: {e}");
+                        panic!("TRACES | Cannot proceed without HTTP client");
+                    }
+                }
+            })
+            .await;
+        Some(client.clone())
+    }
+
+    /// Sends traces to the Datadog intake endpoint using the provided HTTP client.
+    ///
+    /// Returns the traces back if there was an error sending them.
+    async fn send_traces(
         traces: Vec<SendData>,
-        endpoint: Option<&Endpoint>,
-        proxy_https: &Option<String>,
-        tls_cert_file: &Option<String>,
+        endpoint: Option<Endpoint>,
+        http_client: HyperClient,
     ) -> Option<Vec<SendData>> {
         if traces.is_empty() {
             return None;
@@ -185,15 +180,8 @@
         tokio::task::yield_now().await;
 
         debug!("TRACES | Flushing {} traces", coalesced_traces.len());
 
-        let Ok(http_client) =
-            ServerlessTraceFlusher::get_http_client(proxy_https.as_ref(), tls_cert_file.as_ref())
-        else {
-            error!("TRACES | Failed to create HTTP client");
-            return None;
-        };
-
         for trace in &coalesced_traces {
-            let trace_with_endpoint = match endpoint {
+            let trace_with_endpoint = match &endpoint {
                 Some(additional_endpoint) => trace.with_endpoint(additional_endpoint.clone()),
                 None => trace.clone(),
             };
@@ -211,81 +199,3 @@
         }
         None
     }
 }
-
-// Initialize the crypto provider needed for setting custom root certificates
-fn ensure_crypto_provider_initialized() {
-    static INIT_CRYPTO_PROVIDER: LazyLock<()> = LazyLock::new(|| {
-        #[cfg(unix)]
-        rustls::crypto::aws_lc_rs::default_provider()
-            .install_default()
-            .expect("Failed to install default CryptoProvider");
-    });
-
-    let () = &*INIT_CRYPTO_PROVIDER;
-}
-
-impl ServerlessTraceFlusher {
-    pub fn get_http_client(
-        proxy_https: Option<&String>,
-        tls_cert_file: Option<&String>,
-    ) -> Result<
-        GenericHttpClient<hyper_http_proxy::ProxyConnector<libdd_common::connector::Connector>>,
-        Box<dyn Error>,
-    > {
-        // Create the base connector with optional custom TLS config
-        let connector = if let Some(ca_cert_path) = tls_cert_file {
-            // Ensure crypto provider is initialized before creating TLS config
-            ensure_crypto_provider_initialized();
-
-            // Load the custom certificate
-            let cert_file = File::open(ca_cert_path)?;
-            let mut reader = BufReader::new(cert_file);
-            let certs: Vec<CertificateDer> =
-                rustls_pemfile::certs(&mut reader).collect::<Result<Vec<_>, _>>()?;
-
-            // Create a root certificate store and add custom certs
-            let mut root_store = RootCertStore::empty();
-            for cert in certs {
-                root_store.add(cert)?;
-            }
-
-            // Build the TLS config with custom root certificates
-            let tls_config = rustls::ClientConfig::builder()
-                .with_root_certificates(root_store)
-                .with_no_client_auth();
-
-            // Build the HTTPS connector with custom config
-            let https_connector = HttpsConnectorBuilder::new()
-                .with_tls_config(tls_config)
-                .https_or_http()
-                .enable_http1()
-                .build();
-
-            debug!(
-                "TRACES | GET_HTTP_CLIENT | Added root certificate from {}",
-                ca_cert_path
-            );
-
-            // Construct the Connector::Https variant directly
-            libdd_common::connector::Connector::Https(https_connector)
-        } else {
-            // Use default connector
-            libdd_common::connector::Connector::default()
-        };
-
-        if let Some(proxy) = proxy_https {
-            let proxy =
-                hyper_http_proxy::Proxy::new(hyper_http_proxy::Intercept::Https, proxy.parse()?);
-            let proxy_connector = hyper_http_proxy::ProxyConnector::from_proxy(connector, proxy)?;
-            let client = hyper_migration::client_builder().build(proxy_connector);
-            debug!(
-                "TRACES | GET_HTTP_CLIENT | Proxy connector created with proxy: {:?}",
-                proxy_https
-            );
-            Ok(client)
-        } else {
-            let proxy_connector = hyper_http_proxy::ProxyConnector::new(connector)?;
-            Ok(hyper_migration::client_builder().build(proxy_connector))
-        }
-    }
-}

From cd090c7ec428c2a280d43a39a26a39aa61ff022a Mon Sep 17 00:00:00 2001
From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com>
Date: Wed, 4 Feb 2026 16:29:57 -0500
Subject: [PATCH 2/9] never panic

---
 bottlecap/src/traces/stats_flusher.rs | 27 +++++++++-------
 bottlecap/src/traces/trace_flusher.rs | 46 +++++++++++++++++----------
 2 files changed, 44 insertions(+), 29 deletions(-)

diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs
index d74f8c562..b8061613a 100644
--- a/bottlecap/src/traces/stats_flusher.rs
+++ b/bottlecap/src/traces/stats_flusher.rs
@@ -129,22 +129,25 @@
     ///
     /// The client is created once and reused for all subsequent flushes,
     /// providing connection pooling and TLS session reuse.
+    ///
+    /// Returns `None` if client creation fails. The error is logged but not cached,
+    /// allowing retry on subsequent calls.
     async fn get_or_init_http_client(&self) -> Option<&HyperClient> {
-        let client = self
+        match self
             .http_client
-            .get_or_init(|| async {
-                match hyper_client::create_client(
+            .get_or_try_init(|| async {
+                hyper_client::create_client(
                     self.config.proxy_https.as_ref(),
                     self.config.tls_cert_file.as_ref(),
-                ) {
-                    Ok(client) => client,
-                    Err(e) => {
-                        error!("STATS_FLUSHER | Failed to create HTTP client: {e}");
-                        panic!("STATS_FLUSHER | Cannot proceed without HTTP client");
-                    }
-                }
+                )
             })
-            .await;
-        Some(client)
+            .await
+        {
+            Ok(client) => Some(client),
+            Err(e) => {
+                error!("STATS_FLUSHER | Failed to create HTTP client: {e}");
+                None
+            }
+        }
     }
 }
diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs
index 3f3af30f8..5f30ea691 100644
--- a/bottlecap/src/traces/trace_flusher.rs
+++ b/bottlecap/src/traces/trace_flusher.rs
@@ -145,31 +145,43 @@
     ///
     /// The client is created once and reused for all subsequent flushes,
     /// providing connection pooling and TLS session reuse.
+    ///
+    /// Returns `None` if client creation fails. The error is logged but not cached,
+    /// allowing retry on subsequent calls.
     async fn get_or_init_http_client(&self) -> Option<HyperClient> {
-        let client = self
+        match self
             .http_client
-            .get_or_init(|| async {
-                match hyper_client::create_client(
+            .get_or_try_init(|| async {
+                hyper_client::create_client(
                     self.config.proxy_https.as_ref(),
                     self.config.tls_cert_file.as_ref(),
-                ) {
-                    Ok(client) => client,
-                    Err(e) => {
-                        error!("TRACES | Failed to create HTTP client: {e}");
-                        panic!("TRACES | Cannot proceed without HTTP client");
-                    }
-                }
+                )
             })
-            .await;
-        Some(client.clone())
+            .await
+        {
+            Ok(client) => Some(client.clone()),
+            Err(e) => {
+                error!("TRACES | Failed to create HTTP client: {e}");
+                None
+            }
+        }
     }
 
     /// Sends traces to the Datadog intake endpoint using the provided HTTP client.
     ///
-    /// Returns the traces back if there was an error sending them.
+    /// # Arguments
+    ///
+    /// * `traces` - The traces to send
+    /// * `override_endpoint` - If `Some`, sends to this endpoint instead of the trace's
+    ///   configured endpoint. Used for sending to additional endpoints.
+    /// * `http_client` - The HTTP client to use for sending
+    ///
+    /// # Returns
+    ///
+    /// Returns the traces back if there was an error sending them (for retry).
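+    ///
+    /// # Example
+    ///
+    /// Both call shapes used by `flush`, sketched (values illustrative):
+    ///
+    /// ```ignore
+    /// // Primary endpoint: keep the endpoint already configured on each SendData.
+    /// let failed = Self::send_traces(traces.clone(), None, client.clone()).await;
+    /// // Additional endpoint: override the destination.
+    /// let failed = Self::send_traces(traces, Some(endpoint), client).await;
+    /// ```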
     async fn send_traces(
         traces: Vec<SendData>,
-        endpoint: Option<Endpoint>,
+        override_endpoint: Option<Endpoint>,
         http_client: HyperClient,
     ) -> Option<Vec<SendData>> {
         if traces.is_empty() {
             return None;
@@ -181,12 +193,12 @@
         debug!("TRACES | Flushing {} traces", coalesced_traces.len());
 
         for trace in &coalesced_traces {
-            let trace_with_endpoint = match &endpoint {
-                Some(additional_endpoint) => trace.with_endpoint(additional_endpoint.clone()),
+            let trace_to_send = match &override_endpoint {
+                Some(endpoint) => trace.with_endpoint(endpoint.clone()),
                 None => trace.clone(),
             };
 
-            let send_result = trace_with_endpoint.send(&http_client).await.last_result;
+            let send_result = trace_to_send.send(&http_client).await.last_result;
 
             if let Err(e) = send_result {
                 error!("TRACES | Request failed: {e:?}");

From 068c83550e8e600d401b5545c5a60bf7b48caa57 Mon Sep 17 00:00:00 2001
From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com>
Date: Wed, 4 Feb 2026 16:35:58 -0500
Subject: [PATCH 3/9] add comments on additional endpoints to fix later

---
 bottlecap/src/traces/trace_flusher.rs | 22 +++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs
index 5f30ea691..5fdad478c 100644
--- a/bottlecap/src/traces/trace_flusher.rs
+++ b/bottlecap/src/traces/trace_flusher.rs
@@ -23,6 +23,9 @@ pub struct TraceFlusher {
     pub aggregator_handle: AggregatorHandle,
     pub config: Arc<Config>,
     pub api_key_factory: Arc<ApiKeyFactory>,
+    /// Additional endpoints for dual-shipping traces to multiple Datadog sites.
+    /// Configured via `DD_APM_ADDITIONAL_ENDPOINTS` (e.g., sending to both US and EU).
+    /// Each trace batch is sent to the primary endpoint AND all additional endpoints.
     pub additional_endpoints: Vec<Endpoint>,
     /// Cached HTTP client, lazily initialized on first use.
     http_client: OnceCell<HyperClient>,
@@ -35,8 +38,10 @@ impl TraceFlusher {
         config: Arc<Config>,
         api_key_factory: Arc<ApiKeyFactory>,
     ) -> Self {
+        // Parse additional endpoints for dual-shipping from config.
+        // Format: { "https://trace.agent.datadoghq.eu": ["api-key-1", "api-key-2"], ... }
+        // Each URL + API key combination becomes a separate endpoint.
         let mut additional_endpoints: Vec<Endpoint> = Vec::new();
-
         for (endpoint_url, api_keys) in config.apm_additional_endpoints.clone() {
             for api_key in api_keys {
                 let trace_intake_url = trace_intake_url_prefixed(&endpoint_url);
@@ -47,7 +52,6 @@
                     timeout_ms: config.flush_timeout * S_TO_MS,
                     test_token: None,
                 };
-
                 additional_endpoints.push(endpoint);
             }
         }
@@ -84,7 +88,12 @@
         let mut failed_batch: Vec<SendData> = Vec::new();
 
         if let Some(traces) = failed_traces {
-            // If we have traces from a previous failed attempt, try to send those first
+            // If we have traces from a previous failed attempt, try to send those first.
+            // TODO: Currently retries always go to the primary endpoint (None), even if the
+            // original failure was for an additional endpoint. This means traces that failed
+            // to send to additional endpoints will be retried to the primary endpoint instead.
+            // To fix this, we need to track which endpoint each failed trace was destined for,
+            // possibly by storing (Vec<SendData>, Option<Endpoint>) pairs in failed_batch.
             if !traces.is_empty() {
                 debug!(
                     "TRACES | Retrying to send {} previously failed batches",
@@ -115,11 +124,15 @@
                 .map(SendDataBuilder::build)
                 .collect();
 
+            // Send to PRIMARY endpoint (the default endpoint configured in the trace).
+            // Passing None means "use the endpoint already configured in the SendData".
             let traces_clone = traces.clone();
             let client_clone = http_client.clone();
             batch_tasks
                 .spawn(async move { Self::send_traces(traces_clone, None, client_clone).await });
 
+            // Send to ADDITIONAL endpoints for dual-shipping.
+            // Each additional endpoint gets the same traces, enabling multi-region delivery.
             for endpoint in self.additional_endpoints.clone() {
                 let traces_clone = traces.clone();
                 let client_clone = http_client.clone();
                 batch_tasks.spawn(async move {
                     Self::send_traces(traces_clone, Some(endpoint), client_clone).await
                 });
             }
         }
+        // Collect failed traces from all endpoints (primary + additional).
+        // Note: We lose track of which endpoint each failure came from here.
+        // All failures are mixed together and will be retried to the primary endpoint only.
         while let Some(result) = batch_tasks.join_next().await {
             if let Ok(Some(mut failed)) = result {
                 failed_batch.append(&mut failed);

From 12bcb5f8c86df8d67f0a63b5ded49753163cb9eb Mon Sep 17 00:00:00 2001
From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com>
Date: Wed, 4 Feb 2026 16:49:20 -0500
Subject: [PATCH 4/9] add todo comment

---
 bottlecap/src/traces/stats_flusher.rs | 3 +++
 bottlecap/src/traces/trace_flusher.rs | 3 +++
 2 files changed, 6 insertions(+)

diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs
index b8061613a..1ab96328a 100644
--- a/bottlecap/src/traces/stats_flusher.rs
+++ b/bottlecap/src/traces/stats_flusher.rs
@@ -22,6 +22,9 @@
     api_key_factory: Arc<ApiKeyFactory>,
     endpoint: OnceCell<Endpoint>,
     /// Cached HTTP client, lazily initialized on first use.
+    /// TODO: StatsFlusher and TraceFlusher both hit trace.agent.datadoghq.{site} and could
+    /// share a single HTTP client for better connection pooling. Consider using a
+    /// SharedHyperClient wrapper passed to both flushers from main.rs.
     http_client: OnceCell<HyperClient>,
 }
 
diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs
index 5fdad478c..f08b24751 100644
--- a/bottlecap/src/traces/trace_flusher.rs
+++ b/bottlecap/src/traces/trace_flusher.rs
@@ -28,6 +28,9 @@
     pub additional_endpoints: Vec<Endpoint>,
     /// Cached HTTP client, lazily initialized on first use.
+    /// TODO: TraceFlusher and StatsFlusher both hit trace.agent.datadoghq.{site} and could
+    /// share a single HTTP client for better connection pooling. Consider using a
+    /// SharedHyperClient wrapper passed to both flushers from main.rs.
     http_client: OnceCell<HyperClient>,
 }
 
From b8b40d2226bb84844935e1b6d4b9f356a7d985bb Mon Sep 17 00:00:00 2001
From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com>
Date: Wed, 4 Feb 2026 17:31:02 -0500
Subject: [PATCH 5/9] add stats retry

---
 bottlecap/src/flushing/handles.rs     |  5 +-
 bottlecap/src/flushing/service.rs     | 30 +++++++---
 bottlecap/src/traces/stats_flusher.rs | 85 ++++++++++++++++++++-------
 3 files changed, 89 insertions(+), 31 deletions(-)

diff --git a/bottlecap/src/flushing/handles.rs b/bottlecap/src/flushing/handles.rs
index 851d8e4a9..61376ac4b 100644
--- a/bottlecap/src/flushing/handles.rs
+++ b/bottlecap/src/flushing/handles.rs
@@ -2,6 +2,7 @@
 use datadog_protos::metrics::SketchPayload;
 use dogstatsd::datadog::Series;
+use libdd_trace_protobuf::pb;
 use libdd_trace_utils::send_data::SendData;
 use tokio::task::JoinHandle;
@@ -32,8 +33,8 @@ pub struct FlushHandles {
     pub metric_flush_handles: Vec>,
     /// Handles for proxy flush operations. Returns failed request builders for retry.
     pub proxy_flush_handles: Vec>>,
-    /// Handles for stats flush operations. Stats don't support retry.
-    pub stats_flush_handles: Vec<JoinHandle<()>>,
+    /// Handles for stats flush operations. Returns failed stats payloads for retry.
+    pub stats_flush_handles: Vec<JoinHandle<Vec<pb::ClientStatsPayload>>>,
 }
 
 impl FlushHandles {
diff --git a/bottlecap/src/flushing/service.rs b/bottlecap/src/flushing/service.rs
index c632d6e36..046f65649 100644
--- a/bottlecap/src/flushing/service.rs
+++ b/bottlecap/src/flushing/service.rs
@@ -122,11 +122,13 @@
             self.handles.metric_flush_handles.push(handle);
         }
 
-        // Spawn stats flush (fire-and-forget, no retry)
+        // Spawn stats flush
         let sf = Arc::clone(&self.stats_flusher);
         self.handles
             .stats_flush_handles
-            .push(tokio::spawn(async move { sf.flush(false).await }));
+            .push(tokio::spawn(async move {
+                sf.flush(false, None).await.unwrap_or_default()
+            }));
 
         // Spawn proxy flush
         let pf = self.proxy_flusher.clone();
@@ -153,11 +155,25 @@
         let mut joinset = tokio::task::JoinSet::new();
         let mut flush_error = false;
 
-        // Await stats handles (no retry)
+        // Await stats handles with retry
         for handle in self.handles.stats_flush_handles.drain(..) {
-            if let Err(e) = handle.await {
-                error!("FLUSHING_SERVICE | stats flush error {e:?}");
-                flush_error = true;
+            match handle.await {
+                Ok(retry) => {
+                    let sf = self.stats_flusher.clone();
+                    if !retry.is_empty() {
+                        debug!(
+                            "FLUSHING_SERVICE | redriving {:?} stats payloads",
+                            retry.len()
+                        );
+                        joinset.spawn(async move {
+                            sf.flush(false, Some(retry)).await;
+                        });
+                    }
+                }
+                Err(e) => {
+                    error!("FLUSHING_SERVICE | stats flush error {e:?}");
+                    flush_error = true;
+                }
             }
         }
 
@@ -312,7 +328,7 @@
             self.logs_flusher.flush(None),
             futures::future::join_all(metrics_futures),
             self.trace_flusher.flush(None),
-            self.stats_flusher.flush(force_stats),
+            self.stats_flusher.flush(force_stats, None),
             self.proxy_flusher.flush(None),
         );
     }
diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs
index 1ab96328a..222dddbd7 100644
--- a/bottlecap/src/traces/stats_flusher.rs
+++ b/bottlecap/src/traces/stats_flusher.rs
@@ -22,9 +22,8 @@
     api_key_factory: Arc<ApiKeyFactory>,
     endpoint: OnceCell<Endpoint>,
     /// Cached HTTP client, lazily initialized on first use.
-    /// TODO: StatsFlusher and TraceFlusher both hit trace.agent.datadoghq.{site} and could
-    /// share a single HTTP client for better connection pooling. Consider using a
-    /// SharedHyperClient wrapper passed to both flushers from main.rs.
+    /// TODO: `StatsFlusher` and `TraceFlusher` both hit trace.agent.datadoghq.{site} and could
+    /// share a single HTTP client for better connection pooling.
     http_client: OnceCell<HyperClient>,
 }
 
@@ -45,14 +44,20 @@
     }
 
     /// Flushes stats to the Datadog trace stats intake.
-    pub async fn send(&self, stats: Vec<pb::ClientStatsPayload>) {
+    ///
+    /// Returns `None` on success, or `Some(failed_stats)` if the flush failed and should be retried.
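+    ///
+    /// Retry semantics, sketched (caller-side code is illustrative):
+    ///
+    /// ```ignore
+    /// if let Some(failed) = flusher.send(stats).await {
+    ///     // Transient failure: hand the payloads back on a later cycle.
+    ///     flusher.flush(false, Some(failed)).await;
+    /// }
+    /// ```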
+    pub async fn send(
+        &self,
+        stats: Vec<pb::ClientStatsPayload>,
+    ) -> Option<Vec<pb::ClientStatsPayload>> {
         if stats.is_empty() {
-            return;
+            return None;
         }
 
         let Some(api_key) = self.api_key_factory.get_api_key().await else {
-            error!("Skipping flushing stats: Failed to resolve API key");
-            return;
+            error!("STATS | Skipping flushing stats: Failed to resolve API key");
+            // No API key means we can't send - don't retry as it won't help
+            return None;
         };
 
         let api_key_clone = api_key.to_string();
@@ -72,17 +77,18 @@
             })
             .await;
 
-        debug!("Flushing {} stats", stats.len());
+        debug!("STATS | Flushing {} stats", stats.len());
 
-        let stats_payload = stats_utils::construct_stats_payload(stats);
+        let stats_payload = stats_utils::construct_stats_payload(stats.clone());
 
-        debug!("Stats payload to be sent: {stats_payload:?}");
+        debug!("STATS | Stats payload to be sent: {stats_payload:?}");
 
         let serialized_stats_payload = match stats_utils::serialize_stats_payload(stats_payload) {
             Ok(res) => res,
             Err(err) => {
-                error!("Failed to serialize stats payload, dropping stats: {err}");
-                return;
+                // Serialization errors are permanent - data is malformed, don't retry
+                error!("STATS | Failed to serialize stats payload, dropping stats: {err}");
+                return None;
             }
         };
 
@@ -93,8 +99,8 @@
         // Get or create the cached HTTP client
         let http_client = self.get_or_init_http_client().await;
         let Some(http_client) = http_client else {
-            error!("STATS_FLUSHER | Failed to create HTTP client");
-            return;
+            error!("STATS | Failed to create HTTP client, will retry");
+            return Some(stats);
         };
 
         let resp = stats_utils::send_stats_payload_with_client(
@@ -106,27 +112,62 @@
         .await;
         let elapsed = start.elapsed();
         debug!(
-            "Stats request to {} took {} ms",
+            "STATS | Stats request to {} took {} ms",
             stats_url,
             elapsed.as_millis()
         );
 
         match resp {
-            Ok(()) => debug!("Successfully flushed stats"),
+            Ok(()) => {
+                debug!("STATS | Successfully flushed stats");
+                None
+            }
             Err(e) => {
-                error!("Error sending stats: {e:?}");
+                // Network/server errors are temporary - return stats for retry
+                error!("STATS | Error sending stats: {e:?}");
+                Some(stats)
             }
-        };
+        }
     }
 
-    pub async fn flush(&self, force_flush: bool) {
-        let mut guard = self.aggregator.lock().await;
+    /// Flushes stats from the aggregator.
+    ///
+    /// Returns `None` on success, or `Some(failed_stats)` if any flush failed and should be retried.
+    /// If `failed_stats` is provided, it will attempt to send those first before fetching new stats.
+    pub async fn flush(
+        &self,
+        force_flush: bool,
+        failed_stats: Option<Vec<pb::ClientStatsPayload>>,
+    ) -> Option<Vec<pb::ClientStatsPayload>> {
+        let mut all_failed: Vec<pb::ClientStatsPayload> = Vec::new();
+
+        // First, retry any previously failed stats
+        if let Some(retry_stats) = failed_stats {
+            if !retry_stats.is_empty() {
+                debug!(
+                    "STATS | Retrying {} previously failed stats",
+                    retry_stats.len()
+                );
+                if let Some(still_failed) = self.send(retry_stats).await {
+                    all_failed.extend(still_failed);
+                }
+            }
+        }
 
+        // Then flush new stats from the aggregator
+        let mut guard = self.aggregator.lock().await;
         let mut stats = guard.get_batch(force_flush).await;
         while !stats.is_empty() {
-            self.send(stats).await;
+            if let Some(failed) = self.send(stats).await {
+                all_failed.extend(failed);
+            }
             stats = guard.get_batch(force_flush).await;
         }
+
+        if all_failed.is_empty() {
+            None
+        } else {
+            Some(all_failed)
+        }
     }
 
     /// Returns a reference to the cached HTTP client, initializing it if necessary.
     ///

From 7590bd259ba5ec1713879242f7b509d64097d6b2 Mon Sep 17 00:00:00 2001
From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com>
Date: Wed, 4 Feb 2026 17:31:17 -0500
Subject: [PATCH 6/9] update docs

---
 bottlecap/src/bin/bottlecap/main.rs   | 1 +
 bottlecap/src/traces/trace_flusher.rs | 5 ++---
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs
index e93b95d69..185001f20 100644
--- a/bottlecap/src/bin/bottlecap/main.rs
+++ b/bottlecap/src/bin/bottlecap/main.rs
@@ -1394,6 +1394,7 @@ mod flush_handles_tests {
         let mut handles = FlushHandles::new();
         let handle = tokio::spawn(async {
             sleep(Duration::from_millis(5)).await;
+            Vec::new() // Return empty Vec for stats retry
         });
         handles.stats_flush_handles.push(handle);
 
diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs
index f08b24751..811751d8b 100644
--- a/bottlecap/src/traces/trace_flusher.rs
+++ b/bottlecap/src/traces/trace_flusher.rs
@@ -28,9 +28,8 @@
     /// Each trace batch is sent to the primary endpoint AND all additional endpoints.
     pub additional_endpoints: Vec<Endpoint>,
     /// Cached HTTP client, lazily initialized on first use.
-    /// TODO: TraceFlusher and StatsFlusher both hit trace.agent.datadoghq.{site} and could
-    /// share a single HTTP client for better connection pooling. Consider using a
-    /// SharedHyperClient wrapper passed to both flushers from main.rs.
+    /// TODO: `TraceFlusher` and `StatsFlusher` both hit trace.agent.datadoghq.{site} and could
+    /// share a single HTTP client for better connection pooling.
     http_client: OnceCell<HyperClient>,
 }
 
From d60314e05fd5c46521d29761b96144a452ecb4cd Mon Sep 17 00:00:00 2001
From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com>
Date: Thu, 5 Feb 2026 11:26:57 -0500
Subject: [PATCH 7/9] remove locking on metrics

---
 bottlecap/Cargo.lock                        |  4 +-
 bottlecap/Cargo.toml                        |  4 +-
 bottlecap/src/bin/bottlecap/main.rs         | 40 +++------
 bottlecap/src/flushing/service.rs           | 84 ++++++++++-----------
 bottlecap/tests/metrics_integration_test.rs |  2 +-
 5 files changed, 55 insertions(+), 79 deletions(-)

diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock
index 2e7022912..3dd8ee6e6 100644
--- a/bottlecap/Cargo.lock
+++ b/bottlecap/Cargo.lock
@@ -727,7 +727,7 @@ dependencies = [
 [[package]]
 name = "datadog-fips"
 version = "0.1.0"
-source = "git+https://github.com/DataDog/serverless-components?rev=18b49baba8bfef97060d7edd8b830584d0da3373#18b49baba8bfef97060d7edd8b830584d0da3373"
+source = "git+https://github.com/DataDog/serverless-components?rev=e4f0341c84bf57d7af3784f4bf9e7f33e4c7ecd1#e4f0341c84bf57d7af3784f4bf9e7f33e4c7ecd1"
 dependencies = [
     "reqwest",
     "rustls",
@@ -836,7 +836,7 @@ dependencies = [
 [[package]]
 name = "dogstatsd"
 version = "0.1.0"
-source = "git+https://github.com/DataDog/serverless-components?rev=18b49baba8bfef97060d7edd8b830584d0da3373#18b49baba8bfef97060d7edd8b830584d0da3373"
+source = "git+https://github.com/DataDog/serverless-components?rev=e4f0341c84bf57d7af3784f4bf9e7f33e4c7ecd1#e4f0341c84bf57d7af3784f4bf9e7f33e4c7ecd1"
 dependencies = [
     "datadog-fips",
     "datadog-protos",
diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml
index 5253169ef..13ee806af 100644
--- a/bottlecap/Cargo.toml
+++ b/bottlecap/Cargo.toml
@@ -71,8 +71,8 @@
 libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" }
 libdd-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" }
 libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" }
 libdd-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" }
-dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "18b49baba8bfef97060d7edd8b830584d0da3373", default-features = false }
-datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "18b49baba8bfef97060d7edd8b830584d0da3373", default-features = false }
+dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "e4f0341c84bf57d7af3784f4bf9e7f33e4c7ecd1", default-features = false }
+datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "e4f0341c84bf57d7af3784f4bf9e7f33e4c7ecd1", default-features = false }
 libddwaf = { version = "1.28.1", git = "https://github.com/DataDog/libddwaf-rust", rev = "d1534a158d976bd4f747bf9fcc58e0712d2d17fc", default-features = false, features = ["serde"] }
 
 [dev-dependencies]
diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs
index 185001f20..6920a0af7 100644
--- a/bottlecap/src/bin/bottlecap/main.rs
+++ b/bottlecap/src/bin/bottlecap/main.rs
@@ -450,10 +450,8 @@ async fn extension_loop_active(
                 // Wait for any pending flushes
                 flushing_service.await_handles().await;
                 // Final flush to capture any data that accumulated since the last
-                // spawn_non_blocking(). We pass force_stats=true since this is our
-                // last opportunity to send data before shutdown.
-                let mut locked_metrics = flushing_service.metrics_flushers().lock().await;
-                flushing_service.flush_blocking(true, &mut locked_metrics).await;
+                // spawn_non_blocking(). This is our last opportunity to send data.
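+                // (flush_blocking_final forces the stats flusher past its time-based
+                // bucketing, so nothing is held back for a cycle that will never come.)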
+                flushing_service.flush_blocking_final().await;
                 break;
             }
         }
@@ -635,19 +633,13 @@
                     }
                 }
                 _ = race_flush_interval.tick() => {
-                    let mut locked_metrics = metrics_flushers.lock().await;
-                    flushing_service
-                        .flush_blocking(false, &mut locked_metrics)
-                        .await;
+                    flushing_service.flush_blocking().await;
                     race_flush_interval.reset();
                 }
             }
         }
         // flush
-        let mut locked_metrics = metrics_flushers.lock().await;
-        flushing_service
-            .flush_blocking(false, &mut locked_metrics)
-            .await;
+        flushing_service.flush_blocking().await;
         race_flush_interval.reset();
         let next_response = extension::next_event(client, &aws_config.runtime_api, &r.extension_id).await;
@@ -664,10 +656,7 @@
                 }
             }
             FlushDecision::Periodic => {
-                let mut locked_metrics = metrics_flushers.lock().await;
-                flushing_service
-                    .flush_blocking(false, &mut locked_metrics)
-                    .await;
+                flushing_service.flush_blocking().await;
                 race_flush_interval.reset();
             }
             _ => {
@@ -695,10 +684,7 @@
             }
             _ = race_flush_interval.tick() => {
                 if flush_control.flush_strategy == FlushStrategy::Default {
-                    let mut locked_metrics = metrics_flushers.lock().await;
-                    flushing_service
-                        .flush_blocking(false, &mut locked_metrics)
-                        .await;
+                    flushing_service.flush_blocking().await;
                     race_flush_interval.reset();
                 }
             }
@@ -744,11 +730,8 @@
         &lifecycle_listener_shutdown_token,
     );
 
-    // Final flush with force_stats=true since this is our last opportunity
-    let mut locked_metrics = metrics_flushers.lock().await;
-    flushing_service
-        .flush_blocking(true, &mut locked_metrics)
-        .await;
+    // Final flush - this is our last opportunity to send data before shutdown
+    flushing_service.flush_blocking_final().await;
 
     // Even though we're shutting down, we need to reset the flush interval to prevent any future flushes
     race_flush_interval.reset();
@@ -1178,7 +1161,7 @@ async fn start_dogstatsd(
     api_key_factory: Arc<ApiKeyFactory>,
     config: &Arc<Config>,
 ) -> (
-    Arc<TokioMutex<Vec<MetricsFlusher>>>,
+    Arc<Vec<MetricsFlusher>>,
     MetricsAggregatorHandle,
     CancellationToken,
 ) {
@@ -1200,17 +1183,18 @@
     });
 
     // Get flushers with aggregator handle
-    let flushers = Arc::new(TokioMutex::new(start_metrics_flushers(
+    let flushers = Arc::new(start_metrics_flushers(
         Arc::clone(&api_key_factory),
         &aggregator_handle,
         config,
-    )));
+    ));
 
     // Create Dogstatsd server
     let dogstatsd_config = DogStatsDConfig {
         host: EXTENSION_HOST.to_string(),
         port: DOGSTATSD_PORT,
         metric_namespace: config.statsd_metric_namespace.clone(),
+        windows_pipe_name: None,
     };
     let cancel_token = tokio_util::sync::CancellationToken::new();
     let dogstatsd_agent = DogStatsD::new(
diff --git a/bottlecap/src/flushing/service.rs b/bottlecap/src/flushing/service.rs
index 046f65649..973a72cfc 100644
--- a/bottlecap/src/flushing/service.rs
+++ b/bottlecap/src/flushing/service.rs
@@ -2,7 +2,6 @@
 
 use std::sync::Arc;
 
-use tokio::sync::Mutex as TokioMutex;
 use tracing::{debug, error};
 
 use dogstatsd::{
@@ -29,7 +28,7 @@
     trace_flusher: Arc<TraceFlusher>,
     stats_flusher: Arc<StatsFlusher>,
     proxy_flusher: Arc,
-    metrics_flushers: Arc<TokioMutex<Vec<MetricsFlusher>>>,
+    metrics_flushers: Arc<Vec<MetricsFlusher>>,
 
     // Metrics aggregator handle for getting data to flush
     metrics_aggr_handle: MetricsAggregatorHandle,
@@ -46,7 +45,7 @@
         trace_flusher: Arc<TraceFlusher>,
         stats_flusher: Arc<StatsFlusher>,
         proxy_flusher: Arc,
-        metrics_flushers: Arc<TokioMutex<Vec<MetricsFlusher>>>,
+        metrics_flushers: Arc<Vec<MetricsFlusher>>,
         metrics_aggr_handle: MetricsAggregatorHandle,
     ) -> Self {
         Self {
@@ -90,22 +89,17 @@
         // Spawn metrics flush
         // First get the data from aggregator, then spawn flush tasks for each flusher
-        let (metrics_flushers_copy, series, sketches) = {
-            let locked_metrics = self.metrics_flushers.lock().await;
-            let flush_response = self
-                .metrics_aggr_handle
-                .clone()
-                .flush()
-                .await
-                .expect("can't flush metrics handle");
-            (
-                locked_metrics.clone(),
-                flush_response.series,
-                flush_response.distributions,
-            )
-        };
+        let flush_response = self
+            .metrics_aggr_handle
+            .clone()
+            .flush()
+            .await
+            .expect("can't flush metrics handle");
+        let series = flush_response.series;
+        let sketches = flush_response.distributions;
 
-        for (idx, mut flusher) in metrics_flushers_copy.into_iter().enumerate() {
+        for (idx, flusher) in self.metrics_flushers.iter().enumerate() {
+            let flusher = flusher.clone();
             let series_clone = series.clone();
             let sketches_clone = sketches.clone();
             let handle = tokio::spawn(async move {
@@ -240,8 +234,7 @@
                     retry_batch.sketches.len()
                 );
                 joinset.spawn(async move {
-                    let mut locked_flushers = mf.lock().await;
-                    if let Some(flusher) = locked_flushers.get_mut(retry_batch.flusher_id) {
+                    if let Some(flusher) = mf.get(retry_batch.flusher_id) {
                         flusher
                             .flush_metrics(retry_batch.series, retry_batch.sketches)
                             .await;
@@ -288,34 +281,42 @@
         flush_error
     }
 
-    /// Performs a blocking flush of all data.
+    /// Performs a blocking flush of all telemetry data.
     ///
-    /// This method flushes all data synchronously using `tokio::join!` for parallelism.
-    /// Unlike `spawn_non_blocking`, this waits for all flushes to complete before returning.
+    /// Flushes logs, metrics (series and distributions), traces, stats, and APM proxy
+    /// data in parallel using `tokio::join!`. Unlike `spawn_non_blocking`, this waits
+    /// for all flushes to complete before returning.
     ///
-    /// # Arguments
+    /// The stats flusher respects its normal timing constraints (time-based bucketing),
+    /// which may result in some stats being held back until the next flush cycle.
+    pub async fn flush_blocking(&self) {
+        self.flush_blocking_inner(false).await;
+    }
+
+    /// Performs a final blocking flush of all telemetry data before shutdown.
     ///
-    /// * `force_stats` - If `true`, forces the stats flusher to flush immediately
-    ///   regardless of timing constraints.
-    /// * `metrics_flushers` - Mutable slice of metrics flushers. The caller must acquire
-    ///   the lock before calling this method.
+    /// Flushes logs, metrics (series and distributions), traces, stats, and APM proxy
+    /// data in parallel. Unlike `flush_blocking`, this forces the stats flusher to
+    /// flush immediately regardless of its normal timing constraints.
     ///
-    /// # Note
+    /// Use this during shutdown when this is the last opportunity to send data.
+    pub async fn flush_blocking_final(&self) {
+        self.flush_blocking_inner(true).await;
+    }
+
+    /// Internal implementation for blocking flush operations.
     ///
-    /// TODO: The caller must acquire the lock on `metrics_flushers` and pass a mutable slice
-    /// because `MetricsFlusher::flush_metrics` requires `&mut self`. This creates awkward
-    /// ergonomics. Consider modifying the `dogstatsd` crate to use interior mutability
-    /// (e.g., `Arc<Mutex<...>>` internally) so `flush_metrics` can take `&self`, allowing
-    /// this method to handle locking internally.
-    pub async fn flush_blocking(&self, force_stats: bool, metrics_flushers: &mut [MetricsFlusher]) {
+    /// Fetches metrics from the aggregator and flushes all data types in parallel.
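+    ///
+    /// Relationship between the three methods, sketched (illustrative):
+    ///
+    /// ```ignore
+    /// service.flush_blocking().await;       // periodic: force_stats = false
+    /// service.flush_blocking_final().await; // shutdown: force_stats = true
+    /// ```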
+    async fn flush_blocking_inner(&self, force_stats: bool) {
         let flush_response = self
             .metrics_aggr_handle
             .flush()
             .await
             .expect("can't flush metrics aggr handle");
-        let metrics_futures: Vec<_> = metrics_flushers
-            .iter_mut()
+        let metrics_futures: Vec<_> = self
+            .metrics_flushers
+            .iter()
             .map(|f| {
                 f.flush_metrics(
                     flush_response.series.clone(),
@@ -332,15 +333,6 @@
             self.proxy_flusher.flush(None),
         );
     }
-
-    /// Returns a reference to the metrics flushers mutex for external locking.
-    ///
-    /// This is useful when you need to lock the metrics flushers and pass them
-    /// to `flush_blocking` or `flush_blocking_with_interval`.
-    #[must_use]
-    pub fn metrics_flushers(&self) -> &Arc<TokioMutex<Vec<MetricsFlusher>>> {
-        &self.metrics_flushers
-    }
 }
 
 impl std::fmt::Debug for FlushingService {
diff --git a/bottlecap/tests/metrics_integration_test.rs b/bottlecap/tests/metrics_integration_test.rs
index c7ca6b134..2a6c4ae58 100644
--- a/bottlecap/tests/metrics_integration_test.rs
+++ b/bottlecap/tests/metrics_integration_test.rs
@@ -57,7 +57,7 @@ async fn test_enhanced_metrics() {
         retry_strategy: dogstatsd::datadog::RetryStrategy::Immediate(1),
         compression_level: 6,
     };
-    let mut metrics_flusher = MetricsFlusher::new(flusher_config);
+    let metrics_flusher = MetricsFlusher::new(flusher_config);
 
     let lambda_enhanced_metrics =
         enhanced_metrics::new(metrics_aggr_handle.clone(), Arc::clone(&arc_config));

From baeba5a2d995c58fa668542066a34e7659c2f72b Mon Sep 17 00:00:00 2001
From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com>
Date: Thu, 12 Feb 2026 16:04:42 -0500
Subject: [PATCH 8/9] update hash

---
 bottlecap/Cargo.lock                | 4 ++--
 bottlecap/Cargo.toml                | 4 ++--
 bottlecap/src/bin/bottlecap/main.rs | 1 -
 3 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock
index 3dd8ee6e6..3fab2806d 100644
--- a/bottlecap/Cargo.lock
+++ b/bottlecap/Cargo.lock
@@ -727,7 +727,7 @@ dependencies = [
 [[package]]
 name = "datadog-fips"
 version = "0.1.0"
-source = "git+https://github.com/DataDog/serverless-components?rev=e4f0341c84bf57d7af3784f4bf9e7f33e4c7ecd1#e4f0341c84bf57d7af3784f4bf9e7f33e4c7ecd1"
+source = "git+https://github.com/DataDog/serverless-components?rev=086c9375e0bec1f50e268827d9347a5e3b73b79d#086c9375e0bec1f50e268827d9347a5e3b73b79d"
 dependencies = [
     "reqwest",
     "rustls",
@@ -836,7 +836,7 @@ dependencies = [
 [[package]]
 name = "dogstatsd"
 version = "0.1.0"
-source = "git+https://github.com/DataDog/serverless-components?rev=e4f0341c84bf57d7af3784f4bf9e7f33e4c7ecd1#e4f0341c84bf57d7af3784f4bf9e7f33e4c7ecd1"
+source = "git+https://github.com/DataDog/serverless-components?rev=086c9375e0bec1f50e268827d9347a5e3b73b79d#086c9375e0bec1f50e268827d9347a5e3b73b79d"
 dependencies = [
     "datadog-fips",
     "datadog-protos",
diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml
index 13ee806af..ccc89c80c 100644
--- a/bottlecap/Cargo.toml
+++ b/bottlecap/Cargo.toml
@@ -71,8 +71,8 @@
 libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" }
 libdd-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" }
 libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" }
 libdd-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" }
-dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "e4f0341c84bf57d7af3784f4bf9e7f33e4c7ecd1", default-features = false }
-datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "e4f0341c84bf57d7af3784f4bf9e7f33e4c7ecd1", default-features = false } +dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "086c9375e0bec1f50e268827d9347a5e3b73b79d", default-features = false } +datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "086c9375e0bec1f50e268827d9347a5e3b73b79d", default-features = false } libddwaf = { version = "1.28.1", git = "https://github.com/DataDog/libddwaf-rust", rev = "d1534a158d976bd4f747bf9fcc58e0712d2d17fc", default-features = false, features = ["serde"] } [dev-dependencies] diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 6920a0af7..092477b35 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -1194,7 +1194,6 @@ async fn start_dogstatsd( host: EXTENSION_HOST.to_string(), port: DOGSTATSD_PORT, metric_namespace: config.statsd_metric_namespace.clone(), - windows_pipe_name: None, }; let cancel_token = tokio_util::sync::CancellationToken::new(); let dogstatsd_agent = DogStatsD::new( From ce5253f811ea42c1078199eb1bbe5cccc32352bb Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Thu, 12 Feb 2026 16:21:27 -0500 Subject: [PATCH 9/9] dependencies update --- bottlecap/Cargo.lock | 290 +++++++++++++++++++++++++++++++++---------- 1 file changed, 227 insertions(+), 63 deletions(-) diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index 3fab2806d..5091d2a6f 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -37,9 +37,9 @@ checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" [[package]] name = "anyhow" -version = "1.0.100" +version = "1.0.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" +checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea" [[package]] name = "ascii-canvas" @@ -238,7 +238,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.115", ] [[package]] @@ -289,9 +289,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.37.0" +version = "0.37.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c34dda4df7017c8db52132f0f8a2e0f8161649d15723ed63fc00c82d0f2081a" +checksum = "b092fe214090261288111db7a2b2c2118e5a7f30dc2569f1732c4069a6840549" dependencies = [ "cc", "cmake", @@ -391,7 +391,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.114", + "syn 2.0.115", ] [[package]] @@ -762,9 +762,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.5.5" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ececcb659e7ba858fb4f10388c250a7252eb0a27373f1a72b8748afdd248e587" +checksum = "cc3dc5ad92c2e2d1c193bbbbdf2ea477cb81331de4f3103f267ca18368b988c4" dependencies = [ "powerfmt", ] @@ -786,7 +786,7 @@ checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.115", "unicode-xid", ] @@ -830,7 +830,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.115", ] [[package]] @@ -872,9 +872,9 @@ checksum = 
"48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" [[package]] name = "ena" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d248bdd43ce613d87415282f69b9bb99d947d290b10962dd6c56233312c2ad5" +checksum = "eabffdaee24bd1bf95c5ef7cec31260444317e72ea56c4c91750e8b7ee58d5f1" dependencies = [ "log", ] @@ -1088,7 +1088,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.115", ] [[package]] @@ -1158,6 +1158,19 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "getrandom" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "139ef39800118c7683f2fd3c98c1b23c09ae076556b435f8e9064ae108aaeeec" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasip2", + "wasip3", +] + [[package]] name = "glob" version = "0.3.3" @@ -1463,7 +1476,7 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", - "webpki-roots 1.0.5", + "webpki-roots 1.0.6", ] [[package]] @@ -1583,6 +1596,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "id-arena" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" + [[package]] name = "idna" version = "1.1.0" @@ -1622,6 +1641,8 @@ checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" dependencies = [ "equivalent", "hashbrown 0.16.1", + "serde", + "serde_core", ] [[package]] @@ -1745,6 +1766,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "levenshtein" version = "1.0.5" @@ -1753,9 +1780,9 @@ checksum = "db13adb97ab515a3691f56e4dbab09283d0b86cb45abd991d8634a9d6f501760" [[package]] name = "libc" -version = "0.2.180" +version = "0.2.181" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" +checksum = "459427e2af2b9c839b132acb702a1c654d95e10f8c326bfc2ad11310e458b1c5" [[package]] name = "libdd-common" @@ -1924,7 +1951,7 @@ checksum = "3d0b95e02c851351f877147b7deea7b1afb1df71b63aa5f8270716e0c5720616" dependencies = [ "bitflags 2.10.0", "libc", - "redox_syscall 0.7.0", + "redox_syscall 0.7.1", ] [[package]] @@ -1986,9 +2013,9 @@ checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" [[package]] name = "memchr" -version = "2.7.6" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" [[package]] name = "mime" @@ -2258,7 +2285,7 @@ dependencies = [ "proc-macro2", "proc-macro2-diagnostics", "quote", - "syn 2.0.114", + "syn 2.0.115", ] [[package]] @@ -2319,7 +2346,7 @@ checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.115", ] [[package]] @@ -2402,7 +2429,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" 
dependencies = [ "proc-macro2", - "syn 2.0.114", + "syn 2.0.115", ] [[package]] @@ -2422,16 +2449,16 @@ checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.115", "version_check", "yansi", ] [[package]] name = "proptest" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bee689443a2bd0a16ab0348b52ee43e3b2d1b1f931c8aa5c9f8de4c86fbe8c40" +checksum = "37566cb3fdacef14c0737f9546df7cfeadbfbc9fef10991038bf5015d0c80532" dependencies = [ "bit-set 0.8.0", "bit-vec 0.8.0", @@ -2482,7 +2509,7 @@ dependencies = [ "prost 0.13.5", "prost-types", "regex", - "syn 2.0.114", + "syn 2.0.115", "tempfile", ] @@ -2496,7 +2523,7 @@ dependencies = [ "itertools 0.14.0", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.115", ] [[package]] @@ -2509,7 +2536,7 @@ dependencies = [ "itertools 0.14.0", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.115", ] [[package]] @@ -2728,9 +2755,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.7.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49f3fe0889e69e2ae9e41f4d6c4c0181701d00e4697b356fb1f74173a5e0ee27" +checksum = "35985aa610addc02e24fc232012c86fd11f14111180f902b67e2d5331f8ebf2b" dependencies = [ "bitflags 2.10.0", ] @@ -2814,7 +2841,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots 1.0.5", + "webpki-roots 1.0.6", ] [[package]] @@ -2969,9 +2996,9 @@ dependencies = [ [[package]] name = "ryu" -version = "1.0.22" +version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984" +checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" [[package]] name = "safemem" @@ -3041,6 +3068,12 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "1.0.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" + [[package]] name = "serde" version = "1.0.228" @@ -3099,7 +3132,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.115", ] [[package]] @@ -3196,7 +3229,7 @@ checksum = "6f50427f258fb77356e4cd4aa0e87e2bd2c66dbcee41dc405282cae2bfc26c83" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.115", ] [[package]] @@ -3339,9 +3372,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.114" +version = "2.0.115" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4d107df263a3013ef9b1879b0df87d706ff80f65a86ea879bd9c31f9b307c2a" +checksum = "6e614ed320ac28113fa64972c4262d5dbc89deacdfd00c34a3e4cea073243c12" dependencies = [ "proc-macro2", "quote", @@ -3365,7 +3398,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.115", ] [[package]] @@ -3381,12 +3414,12 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.24.0" +version = "3.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "655da9c7eb6305c55742045d5a8d2037996d61d8de95806335c7c86ce0f82e9c" +checksum = "0136791f7c95b1f6dd99f9cc786b91bb81c3800b639b3478e561ddb7be95e5f1" dependencies = [ "fastrand", - "getrandom 0.3.4", + "getrandom 0.4.1", "once_cell", "rustix 
1.1.3", "windows-sys 0.61.2", @@ -3429,7 +3462,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.115", ] [[package]] @@ -3440,7 +3473,7 @@ checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.115", ] [[package]] @@ -3561,7 +3594,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.115", ] [[package]] @@ -3656,7 +3689,7 @@ dependencies = [ "prost-build", "prost-types", "quote", - "syn 2.0.114", + "syn 2.0.115", ] [[package]] @@ -3757,7 +3790,7 @@ checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.115", ] [[package]] @@ -3841,9 +3874,9 @@ checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" [[package]] name = "unicode-ident" -version = "1.0.22" +version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" +checksum = "537dd038a89878be9b64dd4bd1b260315c1bb94f4d784956b81e27a088d9a09e" [[package]] name = "unicode-xid" @@ -3951,7 +3984,16 @@ version = "1.0.1+wasi-0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" dependencies = [ - "wit-bindgen", + "wit-bindgen 0.46.0", +] + +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen 0.51.0", ] [[package]] @@ -4000,7 +4042,7 @@ dependencies = [ "bumpalo", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.115", "wasm-bindgen-shared", ] @@ -4013,6 +4055,40 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap 2.13.0", + "wasm-encoder", + "wasmparser", +] + +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags 2.10.0", + "hashbrown 0.15.5", + "indexmap 2.13.0", + "semver", +] + [[package]] name = "web-sys" version = "0.3.85" @@ -4039,14 +4115,14 @@ version = "0.26.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" dependencies = [ - "webpki-roots 1.0.5", + "webpki-roots 1.0.6", ] [[package]] name = "webpki-roots" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12bed680863276c63889429bfd6cab3b99943659923822de1c8a39c49e4d722c" +checksum = "22cfaf3c063993ff62e73cb4311efde4db1efb31ab78a3e5c457939ad5cc0bed" dependencies = [ "rustls-pki-types", ] @@ 
@@ -4271,6 +4347,94 @@ version = "0.46.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59"
 
+[[package]]
+name = "wit-bindgen"
+version = "0.51.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5"
+dependencies = [
+ "wit-bindgen-rust-macro",
+]
+
+[[package]]
+name = "wit-bindgen-core"
+version = "0.51.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc"
+dependencies = [
+ "anyhow",
+ "heck",
+ "wit-parser",
+]
+
+[[package]]
+name = "wit-bindgen-rust"
+version = "0.51.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21"
+dependencies = [
+ "anyhow",
+ "heck",
+ "indexmap 2.13.0",
+ "prettyplease",
+ "syn 2.0.115",
+ "wasm-metadata",
+ "wit-bindgen-core",
+ "wit-component",
+]
+
+[[package]]
+name = "wit-bindgen-rust-macro"
+version = "0.51.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a"
+dependencies = [
+ "anyhow",
+ "prettyplease",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.115",
+ "wit-bindgen-core",
+ "wit-bindgen-rust",
+]
+
+[[package]]
+name = "wit-component"
+version = "0.244.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2"
+dependencies = [
+ "anyhow",
+ "bitflags 2.10.0",
+ "indexmap 2.13.0",
+ "log",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "wasm-encoder",
+ "wasm-metadata",
+ "wasmparser",
+ "wit-parser",
+]
+
+[[package]]
+name = "wit-parser"
+version = "0.244.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736"
+dependencies = [
+ "anyhow",
+ "id-arena",
+ "indexmap 2.13.0",
+ "log",
+ "semver",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "unicode-xid",
+ "wasmparser",
+]
+
 [[package]]
 name = "writeable"
 version = "0.6.2"
@@ -4312,28 +4476,28 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.114",
+ "syn 2.0.115",
  "synstructure",
 ]
 
 [[package]]
 name = "zerocopy"
-version = "0.8.38"
+version = "0.8.39"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57cf3aa6855b23711ee9852dfc97dfaa51c45feaba5b645d0c777414d494a961"
+checksum = "db6d35d663eadb6c932438e763b262fe1a70987f9ae936e60158176d710cae4a"
 dependencies = [
  "zerocopy-derive",
 ]
 
 [[package]]
 name = "zerocopy-derive"
-version = "0.8.38"
+version = "0.8.39"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8a616990af1a287837c4fe6596ad77ef57948f787e46ce28e166facc0cc1cb75"
+checksum = "4122cd3169e94605190e77839c9a40d40ed048d305bfdc146e7df40ab0f3e517"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.114",
+ "syn 2.0.115",
 ]
 
 [[package]]
@@ -4353,7 +4517,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.114",
+ "syn 2.0.115",
  "synstructure",
 ]
 
 [[package]]
@@ -4393,14 +4557,14 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.114",
+ "syn 2.0.115",
 ]
 
 [[package]]
 name = "zmij"
-version = "1.0.19" +version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ff05f8caa9038894637571ae6b9e29466c1f4f829d26c9b28f869a29cbe3445" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" [[package]] name = "zstd"