Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 27 additions & 23 deletions src/LoadPerformanceTest/Core/InstrumentDataPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using LoadPerformanceTest.Logging;
using LoadPerformanceTest.Logging;
using LoadPerformanceTest.Models;
using LoadPerformanceTest.Services.EventHub;
using LoadPerformanceTest.Utilities;
Expand Down Expand Up @@ -71,7 +71,7 @@ public async Task PublishContinuouslyAsync(List<(string fileName, InstrumentData
break;
}
}
Thread.Sleep(100);
//Thread.Sleep(100);
}
Comment on lines +74 to 75

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can keep this

Comment on lines 71 to 75
});

Expand Down Expand Up @@ -147,6 +147,7 @@ private async Task PublishDataTypeContinuouslyAsync(

var publishCount = 0;
var startTime = DateTime.UtcNow;
var nextFireTime = DateTime.UtcNow;
var intervalSeconds = intervalMs / 1000.0;

Logger.LogInfo($"Started continuous publishing for {dataType} with {intervalSeconds} second(s) interval.");
Expand All @@ -155,32 +156,35 @@ private async Task PublishDataTypeContinuouslyAsync(
{
while (!cancellationToken.IsCancellationRequested)
{
var updatedDataList = InstrumentDataBuilder.GenerateInstrumentDataFromInventory(jsonTemplate, _context.Tenants, dataType, _context.InstrumentManifests);

if (updatedDataList.Count > 0)
{
int successCount = 0, failCount = 0;
// Wait until next scheduled fire time
var delay = nextFireTime - DateTime.UtcNow;
if (delay > TimeSpan.Zero)
await Task.Delay(delay, cancellationToken);

// Skip missed cycles if publishing took longer than the interval
var now = DateTime.UtcNow;
while (nextFireTime <= now)
nextFireTime = nextFireTime.AddMilliseconds(intervalMs);

var updatedDataList = InstrumentDataBuilder.GenerateInstrumentDataFromInventory(jsonTemplate, _context.Tenants, dataType, _context.InstrumentManifests);

foreach (var (updatedJson, tenantId) in updatedDataList)
if (updatedDataList.Count > 0)
{
try
{
await eventHubPublisher.PublishAsync(updatedJson, Path.GetFileNameWithoutExtension(fileName), tenantId, cancellationToken);
successCount++;
}
catch (Exception ex) when (!(ex is OperationCanceledException))
{
failCount++;
Logger.LogError($"Failed to publish {dataType} for instrument: {ex.Message}", ex);
}
var eventTuples = updatedDataList.Select(d => (d.json, d.tenantId));
var (successCount, failCount) = await eventHubPublisher.PublishBulkAsync(
eventTuples,
eventType: Path.GetFileNameWithoutExtension(fileName),
chunkSize: 50,
maxParallelism: 9,
cancellationToken: cancellationToken);

Comment on lines +175 to +181
publishCount++;
var elapsed = DateTime.UtcNow - startTime;
var lag = (DateTime.UtcNow - nextFireTime.AddMilliseconds(-intervalMs)).TotalSeconds;
Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] {dataType}: Published batch #{publishCount} - {successCount} succeeded, {failCount} failed (Running: {elapsed:hh\\:mm\\:ss}, Lag: {lag:F1}s)");
}

publishCount++;
var elapsed = DateTime.UtcNow - startTime;
Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] {dataType}: Published batch #{publishCount} - {successCount} succeeded, {failCount} failed (Running: {elapsed:hh\\:mm\\:ss})");
}

await Task.Delay(intervalMs, cancellationToken);
}
}
catch (OperationCanceledException)
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

49 changes: 49 additions & 0 deletions src/LoadPerformanceTest/Services/EventHub/EventHubPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,55 @@ public async Task PublishAsync(string jsonData, string eventType, string tenantI
throw new InvalidOperationException("Event is too large for the batch.");

await _client.SendAsync(batch, cancellationToken);
}

/// <summary>
/// Publishes multiple events in parallel batches to Event Hub.
/// </summary>
public async Task<(int success, int failed)> PublishBulkAsync(
IEnumerable<(string JsonData, string TenantId)> events,
string eventType,
int chunkSize = 50,
int maxParallelism = 9,
CancellationToken cancellationToken = default)
{
var eventDataList = events.Select(e =>
{
var data = new EventData(Encoding.UTF8.GetBytes(e.JsonData));
data.Properties.Add("tenantId", e.TenantId);
data.Properties.Add("eventType", eventType);
return data;
Comment on lines +49 to +52
}).ToList();

var chunks = eventDataList.Chunk(chunkSize);
int success = 0, failed = 0;
var lockObj = new object();

await Parallel.ForEachAsync(chunks, new ParallelOptions
{
MaxDegreeOfParallelism = maxParallelism,
CancellationToken = cancellationToken
},
async (chunk, ct) =>
{
try
{
using EventDataBatch batch = await _client.CreateBatchAsync(ct);
foreach (var eventData in chunk)
{
if (!batch.TryAdd(eventData))
throw new InvalidOperationException("Event is too large for the batch.");
}
await _client.SendAsync(batch, ct);
Comment on lines +68 to +74
lock (lockObj) { success += chunk.Length; }
Comment on lines +68 to +75
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
lock (lockObj) { failed += chunk.Length; }
}
Comment on lines +66 to +80

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If chunk size is not increased then it should work fine

Comment on lines +66 to +80
Comment on lines +66 to +80
Comment on lines +77 to +80
Comment on lines +77 to +80
});

return (success, failed);
}


Expand Down
6 changes: 3 additions & 3 deletions src/LoadPerformanceTest/Utilities/InstrumentDataBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private static void SetDataTypeSpecificFields(InstrumentData instrumentData, Ins
Value = 2, // Set a dummy value or use a default/test value
DecimalPrecision = def.Attributes?.DisplayDecimalPoints ?? 2,
UnitId = def.UnitTypeId ?? "30d9f576-a6d2-4439-9907-7e147af64508", // Provide default if null
TimestampUtc = DateTime.UtcNow.ToClarosDateTime(),
TimestampUtc = timestamp,
};
instrumentData.InstrumentMeasurementDatas.Items.Add(
new InstrumentMeasurementData
Expand Down Expand Up @@ -138,7 +138,7 @@ private static void SetDataTypeSpecificFields(InstrumentData instrumentData, Ins
Value = 2, // Set a dummy value or use a default/test value
DecimalPrecision = def.Attributes?.DisplayDecimalPoints ?? 2,
UnitId = "30d9f576-a6d2-4439-9907-7e147af64508",
TimestampUtc = DateTime.UtcNow.ToClarosDateTime(),
TimestampUtc = timestamp,
};
instrumentData.InstrumentMeasurementDatas.Items.Add(
new InstrumentMeasurementData
Expand Down Expand Up @@ -206,7 +206,7 @@ public static ClarosJsonTicksDateTime ToClarosDateTime(this DateTime dateTime)
{
return new ClarosJsonTicksDateTime
{
JsonDateTime = dateTime.ToUtc().ToString("O")
Ticks = (ulong)dateTime.ToUtc().Ticks
};
Comment on lines 207 to 210
}

Expand Down