294 changes: 229 additions & 65 deletions bottlecap/Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions bottlecap/Cargo.toml
@@ -71,8 +71,8 @@ libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" }
libdd-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" }
libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" }
libdd-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" }
dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "18b49baba8bfef97060d7edd8b830584d0da3373", default-features = false }
datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "18b49baba8bfef97060d7edd8b830584d0da3373", default-features = false }
dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "086c9375e0bec1f50e268827d9347a5e3b73b79d", default-features = false }
datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "086c9375e0bec1f50e268827d9347a5e3b73b79d", default-features = false }
libddwaf = { version = "1.28.1", git = "https://github.com/DataDog/libddwaf-rust", rev = "d1534a158d976bd4f747bf9fcc58e0712d2d17fc", default-features = false, features = ["serde"] }

[dev-dependencies]
52 changes: 18 additions & 34 deletions bottlecap/src/bin/bottlecap/main.rs
@@ -66,14 +66,14 @@ use bottlecap::{
span_dedup_service,
stats_aggregator::StatsAggregator,
stats_concentrator_service::{StatsConcentratorHandle, StatsConcentratorService},
stats_flusher::{self, StatsFlusher},
stats_flusher,
stats_generator::StatsGenerator,
stats_processor, trace_agent,
trace_aggregator::SendDataBuilderInfo,
trace_aggregator_service::{
AggregatorHandle as TraceAggregatorHandle, AggregatorService as TraceAggregatorService,
},
trace_flusher::{self, TraceFlusher},
trace_flusher,
trace_processor::{self, SendingTraceProcessor},
},
};
@@ -450,10 +450,8 @@ async fn extension_loop_active(
// Wait for any pending flushes
flushing_service.await_handles().await;
// Final flush to capture any data that accumulated since the last
// spawn_non_blocking(). We pass force_stats=true since this is our
// last opportunity to send data before shutdown.
let mut locked_metrics = flushing_service.metrics_flushers().lock().await;
flushing_service.flush_blocking(true, &mut locked_metrics).await;
// spawn_non_blocking(). This is our last opportunity to send data.
flushing_service.flush_blocking_final().await;
break;
}
}
@@ -635,19 +633,13 @@ async fn extension_loop_active(
}
}
_ = race_flush_interval.tick() => {
let mut locked_metrics = metrics_flushers.lock().await;
flushing_service
.flush_blocking(false, &mut locked_metrics)
.await;
flushing_service.flush_blocking().await;
race_flush_interval.reset();
}
}
}
// flush
let mut locked_metrics = metrics_flushers.lock().await;
flushing_service
.flush_blocking(false, &mut locked_metrics)
.await;
flushing_service.flush_blocking().await;
race_flush_interval.reset();
let next_response =
extension::next_event(client, &aws_config.runtime_api, &r.extension_id).await;
@@ -664,10 +656,7 @@
}
}
FlushDecision::Periodic => {
let mut locked_metrics = metrics_flushers.lock().await;
flushing_service
.flush_blocking(false, &mut locked_metrics)
.await;
flushing_service.flush_blocking().await;
race_flush_interval.reset();
}
_ => {
@@ -695,10 +684,7 @@
}
_ = race_flush_interval.tick() => {
if flush_control.flush_strategy == FlushStrategy::Default {
let mut locked_metrics = metrics_flushers.lock().await;
flushing_service
.flush_blocking(false, &mut locked_metrics)
.await;
flushing_service.flush_blocking().await;
race_flush_interval.reset();
}
}
@@ -744,11 +730,8 @@
&lifecycle_listener_shutdown_token,
);

// Final flush with force_stats=true since this is our last opportunity
let mut locked_metrics = metrics_flushers.lock().await;
flushing_service
.flush_blocking(true, &mut locked_metrics)
.await;
// Final flush - this is our last opportunity to send data before shutdown
flushing_service.flush_blocking_final().await;

// Even though we're shutting down, we need to reset the flush interval to prevent any future flushes
race_flush_interval.reset();
@@ -1081,9 +1064,9 @@ fn start_trace_agent(
appsec_processor: Option<Arc<TokioMutex<AppSecProcessor>>>,
) -> (
Sender<SendDataBuilderInfo>,
Arc<trace_flusher::ServerlessTraceFlusher>,
Arc<trace_flusher::TraceFlusher>,
Arc<trace_processor::ServerlessTraceProcessor>,
Arc<stats_flusher::ServerlessStatsFlusher>,
Arc<stats_flusher::StatsFlusher>,
Arc<ProxyFlusher>,
tokio_util::sync::CancellationToken,
StatsConcentratorHandle,
@@ -1096,7 +1079,7 @@ fn start_trace_agent(
let stats_aggregator: Arc<TokioMutex<StatsAggregator>> = Arc::new(TokioMutex::new(
StatsAggregator::new_with_concentrator(stats_concentrator_handle.clone()),
));
let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher::new(
let stats_flusher = Arc::new(stats_flusher::StatsFlusher::new(
api_key_factory.clone(),
stats_aggregator.clone(),
Arc::clone(config),
@@ -1108,7 +1091,7 @@
let (trace_aggregator_service, trace_aggregator_handle) = TraceAggregatorService::default();
tokio::spawn(trace_aggregator_service.run());

let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher::new(
let trace_flusher = Arc::new(trace_flusher::TraceFlusher::new(
trace_aggregator_handle.clone(),
config.clone(),
api_key_factory.clone(),
@@ -1178,7 +1161,7 @@ async fn start_dogstatsd(
api_key_factory: Arc<ApiKeyFactory>,
config: &Arc<Config>,
) -> (
Arc<TokioMutex<Vec<MetricsFlusher>>>,
Arc<Vec<MetricsFlusher>>,
MetricsAggregatorHandle,
CancellationToken,
) {
@@ -1200,11 +1183,11 @@
});

// Get flushers with aggregator handle
let flushers = Arc::new(TokioMutex::new(start_metrics_flushers(
let flushers = Arc::new(start_metrics_flushers(
Arc::clone(&api_key_factory),
&aggregator_handle,
config,
)));
));

// Create Dogstatsd server
let dogstatsd_config = DogStatsDConfig {
@@ -1394,6 +1377,7 @@ mod flush_handles_tests {
let mut handles = FlushHandles::new();
let handle = tokio::spawn(async {
sleep(Duration::from_millis(5)).await;
Vec::new() // Return empty Vec for stats retry
});
handles.stats_flush_handles.push(handle);

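The main.rs hunks above replace the old `flush_blocking(force_stats, &mut locked_metrics)` calls with plain `flush_blocking()` for periodic flushes and `flush_blocking_final()` for the shutdown path, and drop the `TokioMutex` around the metrics flushers. The following is a minimal sketch of the API shape this implies; `FlushingService`, `MetricsFlusher`, and the method bodies are illustrative stand-ins, not the actual bottlecap implementation.

```rust
use std::sync::Arc;

// Stand-in for bottlecap's real metrics flusher type.
struct MetricsFlusher;

impl MetricsFlusher {
    async fn flush(&self) {
        // send buffered metric series/sketches; the real flusher also returns retry batches
    }
}

// Hypothetical shape of the consolidated flushing service after this refactor.
struct FlushingService {
    // Owned immutably, so call sites no longer lock a TokioMutex before flushing.
    metrics_flushers: Arc<Vec<MetricsFlusher>>,
}

impl FlushingService {
    /// Periodic flush, driven by platform events or the race-flush interval.
    async fn flush_blocking(&self) {
        for flusher in self.metrics_flushers.iter() {
            flusher.flush().await;
        }
        // traces, due stats buckets, and proxy data would also be flushed here
    }

    /// Final flush before shutdown, replacing the old
    /// `flush_blocking(true, &mut locked_metrics)` call: same work as
    /// `flush_blocking`, but stats are forced out since there is no next cycle.
    async fn flush_blocking_final(&self) {
        self.flush_blocking().await;
        // force the stats concentrator to emit its current bucket
    }
}

#[tokio::main]
async fn main() {
    let service = FlushingService {
        metrics_flushers: Arc::new(vec![MetricsFlusher]),
    };
    service.flush_blocking().await; // periodic path
    service.flush_blocking_final().await; // shutdown path
}
```

Moving the flushers behind the service removes the repeated lock-then-flush boilerplate at every call site, which is why each hunk above shrinks from four lines to one.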
5 changes: 3 additions & 2 deletions bottlecap/src/flushing/handles.rs
@@ -2,6 +2,7 @@

use datadog_protos::metrics::SketchPayload;
use dogstatsd::datadog::Series;
use libdd_trace_protobuf::pb;
use libdd_trace_utils::send_data::SendData;
use tokio::task::JoinHandle;

@@ -32,8 +33,8 @@ pub struct FlushHandles {
pub metric_flush_handles: Vec<JoinHandle<MetricsRetryBatch>>,
/// Handles for proxy flush operations. Returns failed request builders for retry.
pub proxy_flush_handles: Vec<JoinHandle<Vec<reqwest::RequestBuilder>>>,
/// Handles for stats flush operations. Stats don't support retry.
pub stats_flush_handles: Vec<JoinHandle<()>>,
/// Handles for stats flush operations. Returns failed stats payloads for retry.
pub stats_flush_handles: Vec<JoinHandle<Vec<pb::ClientStatsPayload>>>,
}

impl FlushHandles {
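The handles.rs change switches `stats_flush_handles` from `JoinHandle<()>` to `JoinHandle<Vec<pb::ClientStatsPayload>>`, matching the updated test in main.rs that now returns `Vec::new()` from the spawned task. Below is a hedged sketch of how such handles could be drained, assuming the caller keeps a retry queue; the function name and `retry_queue` are illustrative, not actual bottlecap code.

```rust
use libdd_trace_protobuf::pb;
use tokio::task::JoinHandle;

// Hypothetical helper: await each stats flush task and keep whatever it failed
// to send so the payloads can be re-enqueued on the next flush cycle.
async fn drain_stats_handles(
    handles: Vec<JoinHandle<Vec<pb::ClientStatsPayload>>>,
    retry_queue: &mut Vec<pb::ClientStatsPayload>,
) {
    for handle in handles {
        match handle.await {
            // Each task now returns the payloads it could not deliver instead of `()`.
            Ok(failed) => retry_queue.extend(failed),
            Err(e) => eprintln!("stats flush task panicked: {e}"),
        }
    }
}
```

This brings stats in line with the metric and proxy handles above, which already return their failed work for retry.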