Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/StatsdClient/Bufferize/IStatsBufferize.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;
using StatsdClient.Statistic;

namespace StatsdClient.Bufferize
{
/// <summary>
/// IStatsBufferize defines the contract for sending stats to the pipeline.
/// </summary>
internal interface IStatsBufferize : IDisposable
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming of this interface is confusing. It doesn't really capture what it is trying to do. I think IStatsSender would make more sense.

Then perhaps SynchronousSender and AsynchronousBufferizedSender implementing them.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed as suggested: IStatsBufferizeIStatsSender, StatsBufferizeAsynchronousBufferizedSender. Also renamed the factory types for consistency (IStatsBufferizeFactoryIStatsSenderFactory, StatsBufferizeFactoryStatsSenderFactory).

{
bool TryDequeueFromPool(out Stats stats);

void Send(Stats stats);

void Flush();
}
}
5 changes: 3 additions & 2 deletions src/StatsdClient/Bufferize/IStatsBufferizeFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace StatsdClient.Bufferize
/// </summary>
internal interface IStatsBufferizeFactory
{
StatsBufferize CreateStatsBufferize(
IStatsBufferize CreateStatsBufferize(
StatsRouter statsRouter,
int workerMaxItemCount,
TimeSpan? blockingQueueTimeout,
Expand All @@ -38,6 +38,7 @@ Telemetry CreateTelemetry(
TimeSpan flushInterval,
ITransport transport,
string[] globalTags,
Action<Exception> optionalExceptionHandler);
Action<Exception> optionalExceptionHandler,
bool synchronousMode = false);
}
}
2 changes: 1 addition & 1 deletion src/StatsdClient/Bufferize/StatsBufferize.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace StatsdClient.Bufferize
/// <summary>
/// StatsBufferize bufferizes metrics before sending them.
/// </summary>
internal class StatsBufferize : IDisposable
internal class StatsBufferize : IStatsBufferize
{
private readonly AsynchronousWorker<Stats> _worker;

Expand Down
7 changes: 4 additions & 3 deletions src/StatsdClient/Bufferize/StatsBufferizeFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace StatsdClient.Bufferize
{
internal class StatsBufferizeFactory : IStatsBufferizeFactory
{
public StatsBufferize CreateStatsBufferize(
public IStatsBufferize CreateStatsBufferize(
StatsRouter statsRouter,
int workerMaxItemCount,
TimeSpan? blockingQueueTimeout,
Expand Down Expand Up @@ -49,9 +49,10 @@ public Telemetry CreateTelemetry(
TimeSpan flushInterval,
ITransport transport,
string[] globalTags,
Action<Exception> optionalExceptionHandler)
Action<Exception> optionalExceptionHandler,
bool synchronousMode = false)
{
return new Telemetry(metricSerializer, assemblyVersion, flushInterval, transport, globalTags, optionalExceptionHandler);
return new Telemetry(metricSerializer, assemblyVersion, flushInterval, transport, globalTags, optionalExceptionHandler, synchronousMode);
}

public ITransport CreateNamedPipeTransport(string pipeName)
Expand Down
72 changes: 72 additions & 0 deletions src/StatsdClient/Bufferize/SynchronousSender.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
using System;
using StatsdClient.Statistic;

namespace StatsdClient.Bufferize
{
/// <summary>
/// SynchronousSender sends metrics synchronously on the calling thread.
/// This is intended for serverless environments (e.g., AWS Lambda) where
/// background threads may be frozen between invocations.
/// </summary>
internal class SynchronousSender : IStatsBufferize
{
[ThreadStatic]
private static Stats _threadLocalStats;

private readonly StatsRouter _statsRouter;
private readonly Action<Exception> _optionalExceptionHandler;
private readonly object _lock = new object();

public SynchronousSender(StatsRouter statsRouter, Action<Exception> optionalExceptionHandler = null)
{
_statsRouter = statsRouter ?? throw new ArgumentNullException(nameof(statsRouter));
_optionalExceptionHandler = optionalExceptionHandler;
}

public bool TryDequeueFromPool(out Stats stats)
{
if (_threadLocalStats == null)
{
_threadLocalStats = new Stats();
}

stats = _threadLocalStats;
return true;
}

public void Send(Stats stats)
{
try
{
lock (_lock)
{
_statsRouter.Route(stats);
}
}
catch (Exception e)
{
_optionalExceptionHandler?.Invoke(e);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is a good idea to swallow the exceptions if a handler isn't specified.

I understand the purpose, given asynchronous does hide the exceptions, it does mean you can seamlessly swap between synchronous and asynchronous. But I imagine someone using a synchronous sender would expect the exceptions to be surfaced right away.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about it, to keep consistent with the async version, we should have the optionalExceptionHandler here - but if it isn't defined rethrow the exception rather than swallow it.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated: when optionalExceptionHandler is null, exceptions are now rethrown instead of swallowed. When a handler is provided, the existing behavior is preserved (caught and forwarded to the handler). Note that through the normal DogStatsdService.Configure path, the handler is always set — it defaults to Debug.WriteLine when the caller does not provide one — so in practice exceptions are always caught and logged. The rethrow is a safety net for direct construction without a handler. Added a test to verify the rethrow behavior.

}
}

public void Flush()
{
try
{
lock (_lock)
{
_statsRouter.Flush();
}
}
catch (Exception e)
{
_optionalExceptionHandler?.Invoke(e);
}
}

public void Dispose()
{
Flush();
}
}
}
4 changes: 2 additions & 2 deletions src/StatsdClient/MetricsSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ namespace StatsdClient
internal class MetricsSender
{
private readonly Telemetry _optionalTelemetry;
private readonly StatsBufferize _statsBufferize;
private readonly IStatsBufferize _statsBufferize;
private readonly bool _truncateIfTooLong;
private readonly IStopWatchFactory _stopwatchFactory;
private readonly IRandomGenerator _randomGenerator;
private readonly Cardinality? _defaultCardinality;

internal MetricsSender(
StatsBufferize statsBufferize,
IStatsBufferize statsBufferize,
IRandomGenerator randomGenerator,
IStopWatchFactory stopwatchFactory,
Telemetry optionalTelemetry,
Expand Down
20 changes: 13 additions & 7 deletions src/StatsdClient/StatsdBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@ public StatsdData BuildStatsData(StatsdConfig config, Action<Exception> optional

var originDetectionEnabled = IsOriginDetectionEnabled(config);
var serializers = CreateSerializers(config.Prefix, globalTags, config.Advanced.MaxMetricsInAsyncQueue, originDetectionEnabled, config.ContainerID);
var telemetry = CreateTelemetry(serializers.MetricSerializer, config, globalTags, endPoint, transportData.Transport, optionalExceptionHandler);
var telemetry = CreateTelemetry(serializers.MetricSerializer, config, globalTags, endPoint, transportData.Transport, optionalExceptionHandler, config.SynchronousMode);
var statsBufferize = CreateStatsBufferize(
telemetry,
transportData.Transport,
transportData.BufferCapacity,
config.Advanced,
serializers,
config.ClientSideAggregation,
config.SynchronousMode,
optionalExceptionHandler);

var metricsSender = new MetricsSender(
Expand Down Expand Up @@ -179,7 +180,8 @@ private Telemetry CreateTelemetry(
string[] globalTags,
DogStatsdEndPoint dogStatsdEndPoint,
ITransport transport,
Action<Exception> optionalExceptionHandler)
Action<Exception> optionalExceptionHandler,
bool synchronousMode = false)
{
var telemetryFlush = config.Advanced.TelemetryFlushInterval;

Expand All @@ -194,7 +196,7 @@ private Telemetry CreateTelemetry(
telemetryTransport = CreateTransport(optionalTelemetryEndPoint, config);
}

return _factory.CreateTelemetry(metricSerializer, version, telemetryFlush.Value, telemetryTransport, globalTags, optionalExceptionHandler);
return _factory.CreateTelemetry(metricSerializer, version, telemetryFlush.Value, telemetryTransport, globalTags, optionalExceptionHandler, synchronousMode);
}

// Telemetry is not enabled
Expand Down Expand Up @@ -244,13 +246,14 @@ private TransportData CreateTransportData(DogStatsdEndPoint endPoint, StatsdConf
return transportData;
}

private StatsBufferize CreateStatsBufferize(
private IStatsBufferize CreateStatsBufferize(
Telemetry telemetry,
ITransport transport,
int bufferCapacity,
AdvancedStatsConfig config,
Serializers serializers,
ClientSideAggregationConfig optionalClientSideAggregationConfig,
bool synchronousMode,
Action<Exception> optionalExceptionHandler)
{
var bufferHandler = new BufferBuilderHandler(telemetry, transport);
Expand All @@ -276,14 +279,17 @@ private StatsBufferize CreateStatsBufferize(

var statsRouter = _factory.CreateStatsRouter(serializers, bufferBuilder, optionalAggregators);

var statsBufferize = _factory.CreateStatsBufferize(
if (synchronousMode)
{
return new SynchronousSender(statsRouter, optionalExceptionHandler);
}

return _factory.CreateStatsBufferize(
statsRouter,
config.MaxMetricsInAsyncQueue,
config.MaxBlockDuration,
config.DurationBeforeSendingNotFullBuffer,
optionalExceptionHandler);

return statsBufferize;
}

private ITransport CreateUDPTransport(DogStatsdEndPoint endPoint)
Expand Down
10 changes: 10 additions & 0 deletions src/StatsdClient/StatsdConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,5 +160,15 @@ public StatsdConfig()
/// This value is used when no cardinality is explicitly specified.
/// </summary>
public Cardinality? Cardinality { get; set; }

/// <summary>
/// Gets or sets a value indicating whether the client operates in synchronous mode.
/// When enabled, metrics are sent directly on the calling thread without background
/// processing. This is recommended for serverless environments (e.g., AWS Lambda)
/// where background threads may be frozen between invocations.
/// Call Flush() at the end of each invocation to ensure all buffered metrics are sent.
/// Default is false.
/// </summary>
public bool SynchronousMode { get; set; } = false;
}
}
4 changes: 2 additions & 2 deletions src/StatsdClient/StatsdData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ namespace StatsdClient
internal class StatsdData : IDisposable
{
private ITransport _transport;
private StatsBufferize _statsBufferize;
private IStatsBufferize _statsBufferize;

public StatsdData(
MetricsSender metricsSender,
StatsBufferize statsBufferize,
IStatsBufferize statsBufferize,
ITransport transport,
Telemetry telemetry)
{
Expand Down
16 changes: 10 additions & 6 deletions src/StatsdClient/Telemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public Telemetry(
TimeSpan flushInterval,
ITransport transport,
string[] globalTags,
Action<Exception> optionalExceptionHandler)
Action<Exception> optionalExceptionHandler,
bool synchronousMode = false)
{
_optionalMetricSerializer = metricSerializer;
_optionalTransport = transport;
Expand All @@ -53,11 +54,14 @@ public Telemetry(
_aggregatedContexts.Add(MetricType.Count, new ValueWithTags(_optionalTags, "metrics_type:count"));
_aggregatedContexts.Add(MetricType.Set, new ValueWithTags(_optionalTags, "metrics_type:set"));
_optionalExceptionHandler = optionalExceptionHandler;
_optionalTimer = new Timer(
_ => Flush(),
null,
flushInterval,
flushInterval);
if (!synchronousMode)
{
_optionalTimer = new Timer(
_ => Flush(),
null,
flushInterval,
flushInterval);
}
}

public static string MetricsMetricName => _telemetryPrefix + "metrics";
Expand Down
Loading
Loading