Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 235 additions & 16 deletions Nimbus.sln

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions src/Nimbus.Benchmark/BenchmarkCommand.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using Nimbus.MessageContracts;

namespace Nimbus.Benchmark;

public class BenchmarkCommand : IBusCommand
{
public long SentAtTicks { get; set; }
public int SequenceNumber { get; set; }
}
12 changes: 12 additions & 0 deletions src/Nimbus.Benchmark/BenchmarkCommandHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using Nimbus.InfrastructureContracts.Handlers;

namespace Nimbus.Benchmark;

public class BenchmarkCommandHandler(BenchmarkState state) : IHandleCommand<BenchmarkCommand>
{
public Task Handle(BenchmarkCommand busCommand)
{
state.RecordReceived(busCommand.SentAtTicks, busCommand.SequenceNumber);
return Task.CompletedTask;
}
}
28 changes: 28 additions & 0 deletions src/Nimbus.Benchmark/BenchmarkState.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System.Diagnostics;

namespace Nimbus.Benchmark;

public class BenchmarkState
{
private readonly long[] _latencyTicks;
private readonly CountdownEvent _countdown;
private long _lastReceiveTick;

public BenchmarkState(int count)
{
_latencyTicks = new long[count];
_countdown = new CountdownEvent(count);
}

public void RecordReceived(long sentAtTicks, int sequenceNumber)
{
var now = Stopwatch.GetTimestamp();
_latencyTicks[sequenceNumber] = now - sentAtTicks;
Interlocked.Exchange(ref _lastReceiveTick, now);
_countdown.Signal();
}

public bool WaitForCompletion(TimeSpan timeout) => _countdown.Wait(timeout);

public (long lastReceiveTick, long[] latencyTicks) GetData() => (_lastReceiveTick, _latencyTicks);
}
30 changes: 30 additions & 0 deletions src/Nimbus.Benchmark/Nimbus.Benchmark.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net9.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Nimbus\Nimbus.csproj" />
<ProjectReference Include="..\Nimbus.Containers.Autofac\Nimbus.Containers.Autofac.csproj" />
<ProjectReference Include="..\Nimbus.InfrastructureContracts\Nimbus.InfrastructureContracts.csproj" />
<ProjectReference Include="..\Nimbus.Logger.Serilog\Nimbus.Logger.Serilog.csproj" />
<ProjectReference Include="..\Nimbus.Serializers.Json\Nimbus.Serializers.Json.csproj" />
<ProjectReference Include="..\Nimbus.Transports.InProcess\Nimbus.Transports.InProcess.csproj" />
<ProjectReference Include="..\Nimbus.Transports.Redis\Nimbus.Transports.Redis.csproj" />
<ProjectReference Include="..\Nimbus.Transports.Amqp\Nimbus.Transports.AMQP.csproj" />
<ProjectReference Include="..\Nimbus.Transports.SqlServer\Nimbus.Transports.SqlServer.csproj" />
<ProjectReference Include="..\Nimbus.Transports.Postgres\Nimbus.Transports.Postgres.csproj" />
<ProjectReference Include="..\Nimbus.Transports.Nats\Nimbus.Transports.Nats.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Autofac" Version="4.9.4" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
<PackageReference Include="Serilog.Sinks.Seq" Version="4.0.0" />
</ItemGroup>

</Project>
120 changes: 120 additions & 0 deletions src/Nimbus.Benchmark/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
using System.Diagnostics;
using Autofac;
using Nimbus;
using Nimbus.Benchmark;
using Nimbus.Configuration;
using Nimbus.Configuration.Transport;
using Nimbus.Infrastructure;
using Nimbus.InfrastructureContracts;
using Nimbus.Logger.Serilog.Configuration;
using Nimbus.Serializers.Json.Configuration;
using Serilog;

var count = 1000;
var transportName = "InProcess";

for (var i = 0; i < args.Length; i++)
{
if (args[i] == "--count" && i + 1 < args.Length) count = int.Parse(args[++i]);
else if (args[i] == "--transport" && i + 1 < args.Length) transportName = args[++i];
}

if (args.Contains("--help") || args.Contains("-h"))
{
Console.WriteLine("Usage: Nimbus.Benchmark [--transport <name>] [--count <n>]");
Console.WriteLine($" Transports: {string.Join(", ", TransportFactory.ValidNames)}");
Console.WriteLine($" Default : --transport InProcess --count 1000");
return 0;
}

Log.Logger = new LoggerConfiguration()
.MinimumLevel.Warning()
.WriteTo.Console()
.CreateLogger();

Console.WriteLine("=== Nimbus Benchmark ===");
Console.WriteLine($"Transport : {transportName}");
Console.WriteLine($"Messages : {count:N0}");
Console.WriteLine();

TransportConfiguration transportConfig;
try
{
transportConfig = TransportFactory.Create(transportName);
}
catch (ArgumentException ex)
{
Console.Error.WriteLine(ex.Message);
return 1;
}

var benchmarkState = new BenchmarkState(count);

var containerBuilder = new ContainerBuilder();
var typeProvider = new AssemblyScanningTypeProvider(typeof(Program).Assembly);
containerBuilder.RegisterNimbus(typeProvider);
containerBuilder.RegisterInstance(benchmarkState).AsSelf().SingleInstance();

containerBuilder.Register(ctx => new BusBuilder()
.Configure()
.WithTransport(transportConfig)
.WithNames("NimbusBenchmark", Environment.MachineName)
.WithAutofacDefaults(ctx)
.WithSerilogLogger()
.WithJsonSerializer()
.Build())
.As<IBus>()
.SingleInstance();

using var container = containerBuilder.Build();
var bus = (Bus)container.Resolve<IBus>();

Console.Write("Starting bus...");
var busReady = new TaskCompletionSource();
bus.Started += (_, _) => busReady.TrySetResult();
await bus.Start();
await busReady.Task;
await Task.Delay(500);
Console.WriteLine(" ready.");
Console.WriteLine();

Console.Write($"Sending {count:N0} messages... ");
var sendStart = Stopwatch.GetTimestamp();
for (var i = 0; i < count; i++)
{
await bus.Send(new BenchmarkCommand
{
SentAtTicks = Stopwatch.GetTimestamp(),
SequenceNumber = i
});
}
var sendEnd = Stopwatch.GetTimestamp();
var sendElapsedSec = (sendEnd - sendStart) / (double)Stopwatch.Frequency;
Console.WriteLine($"done in {sendElapsedSec * 1000:F0}ms ({count / sendElapsedSec:F0} msg/s send rate)");

Console.Write("Waiting for all messages... ");
var completed = benchmarkState.WaitForCompletion(TimeSpan.FromMinutes(10));
Console.WriteLine(completed ? "done." : "TIMED OUT!");
Console.WriteLine();

if (completed)
{
var (lastReceiveTick, latencyTicks) = benchmarkState.GetData();
var totalElapsedSec = (lastReceiveTick - sendStart) / (double)Stopwatch.Frequency;
var throughput = count / totalElapsedSec;

var sortedMs = latencyTicks
.Select(t => t / (double)Stopwatch.Frequency * 1000.0)
.OrderBy(x => x)
.ToArray();

double P(double pct) => sortedMs[(int)(sortedMs.Length * pct / 100.0)];

Console.WriteLine("--- Results ---");
Console.WriteLine($" Total time : {totalElapsedSec * 1000:F0}ms");
Console.WriteLine($" Throughput : {throughput:F0} msg/s");
Console.WriteLine($" Latency (ms) : min={sortedMs[0]:F2} p50={P(50):F2} p95={P(95):F2} p99={P(99):F2} max={sortedMs[^1]:F2}");
}

await bus.Stop();
return 0;
51 changes: 51 additions & 0 deletions src/Nimbus.Benchmark/TransportFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using Nimbus.Configuration.Transport;
using Nimbus.Transports.AMQP;
using Nimbus.Transports.InProcess;
using Nimbus.Transports.Nats;
using Nimbus.Transports.Postgres;
using Nimbus.Transports.Redis;
using Nimbus.Transports.SqlServer;

namespace Nimbus.Benchmark;

public static class TransportFactory
{
public static readonly string[] ValidNames =
[
"InProcess", "Redis", "Nats", "NatsJetStream", "Amqp", "SqlServer", "Postgres"
];

public static TransportConfiguration Create(string name) => name.ToLowerInvariant() switch
{
"inprocess" => new InProcessTransportConfiguration(),

"redis" => new RedisTransportConfiguration()
.WithConnectionString("localhost"),

"nats" => new NatsTransportConfiguration()
.WithUrl("nats://localhost:4222")
.WithCredentials("admin", "password"),

"natsjetstream" => new NatsTransportConfiguration()
.WithUrl("nats://localhost:4222")
.WithCredentials("admin", "password")
.WithJetStream(),

"amqp" => new AMQPTransportConfiguration()
.WithBrokerUri("amqp://localhost:5672")
.WithCredentials("admin", "admin"),

"sqlserver" => new SqlServerTransportConfiguration()
.WithConnectionString("Server=localhost,1433;Database=Nimbus;User Id=sa;Password=Nimbus_Dev_123!;TrustServerCertificate=true;")
.WithPollInterval(TimeSpan.FromMilliseconds(50))
.WithAutoCreateSchema(),

"postgres" => new PostgresTransportConfiguration()
.WithConnectionString("Host=localhost;Port=5432;Database=nimbus;Username=nimbus;Password=Nimbus_Dev_123!")
.WithPollInterval(TimeSpan.FromMilliseconds(50))
.WithAutoCreateSchema(),

_ => throw new ArgumentException(
$"Unknown transport '{name}'. Valid options: {string.Join(", ", ValidNames)}")
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static class TransportSelector

private static TestTransport LoadFromEnvironment()
{
var envVar = Environment.GetEnvironmentVariable("NIMBUS_TEST_TRANSPORT") ?? "InProcess";
var envVar = Environment.GetEnvironmentVariable("NIMBUS_TEST_TRANSPORT") ?? "InMemory";

if (Enum.TryParse<TestTransport>(envVar, ignoreCase: true, out var transport))
return transport;
Expand Down
23 changes: 19 additions & 4 deletions src/Nimbus.Transports.Amqp/AMQPTransport.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Nimbus.Configuration.PoorMansIocContainer;
using Nimbus.Infrastructure;
Expand All @@ -16,6 +17,8 @@ internal class AMQPTransport : INimbusTransport, IDisposable
private readonly PoorMansIoC _container;
private readonly ILogger _logger;
private readonly NmsConnectionManager _connectionManager;
private readonly ConcurrentDictionary<string, AMQPQueueSender> _queueSenders = new();
private readonly ConcurrentDictionary<string, AMQPTopicSender> _topicSenders = new();
private bool _disposed;

public AMQPTransport(PoorMansIoC container, ILogger logger, NmsConnectionManager connectionManager)
Expand All @@ -34,8 +37,11 @@ public async Task TestConnection()

public INimbusMessageSender GetQueueSender(string queuePath)
{
_logger.Debug("Creating queue sender for {QueuePath}", queuePath);
return _container.ResolveWithOverrides<AMQPQueueSender>(queuePath);
return _queueSenders.GetOrAdd(queuePath, path =>
{
_logger.Debug("Creating queue sender for {QueuePath}", path);
return _container.ResolveWithOverrides<AMQPQueueSender>(path);
});
}

public INimbusMessageReceiver GetQueueReceiver(string queuePath)
Expand All @@ -46,8 +52,11 @@ public INimbusMessageReceiver GetQueueReceiver(string queuePath)

public INimbusMessageSender GetTopicSender(string topicPath)
{
_logger.Debug("Creating topic sender for {TopicPath}", topicPath);
return _container.ResolveWithOverrides<AMQPTopicSender>(topicPath);
return _topicSenders.GetOrAdd(topicPath, path =>
{
_logger.Debug("Creating topic sender for {TopicPath}", path);
return _container.ResolveWithOverrides<AMQPTopicSender>(path);
});
}

public INimbusMessageReceiver GetTopicReceiver(string topicPath, string subscriptionName, IFilterCondition filter)
Expand All @@ -65,6 +74,12 @@ public void Dispose()

_logger.Info("Disposing AMQP transport");

foreach (var sender in _queueSenders.Values)
try { sender.Dispose(); } catch { /* ignore */ }

foreach (var sender in _topicSenders.Values)
try { sender.Dispose(); } catch { /* ignore */ }

try
{
_connectionManager?.Dispose();
Expand Down
Loading
Loading