From dcd51c10c9ed8f455171c22cb9944a445e051e24 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Fri, 8 May 2026 18:16:20 -0400 Subject: [PATCH 1/6] feat(projections): expose event creation timestamp Signed-off-by: Yordis Prieto --- docs/projections.md | 1 + .../when_accessing_event_created_property.cs | 50 +++++++++++++++++++ .../Interpreted/JintProjectionStateHandler.cs | 9 +++- 3 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 src/EventStore.Projections.Core.Tests/Services/Jint/when_accessing_event_created_property.cs diff --git a/docs/projections.md b/docs/projections.md index 0846a8061..8fdae9bcb 100644 --- a/docs/projections.md +++ b/docs/projections.md @@ -281,6 +281,7 @@ handler. The event provided through the handler contains the following propertie - `partition`: string - `eventType`: string - `streamId`: string +- `created`: string (ISO 8601) | Handler | Description | Notes | |:---------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------| diff --git a/src/EventStore.Projections.Core.Tests/Services/Jint/when_accessing_event_created_property.cs b/src/EventStore.Projections.Core.Tests/Services/Jint/when_accessing_event_created_property.cs new file mode 100644 index 000000000..4f675a94c --- /dev/null +++ b/src/EventStore.Projections.Core.Tests/Services/Jint/when_accessing_event_created_property.cs @@ -0,0 +1,50 @@ +using System; +using System.Text; +using System.Text.Json; +using EventStore.Core.Data; +using NUnit.Framework; +using ResolvedEvent = EventStore.Projections.Core.Services.Processing.ResolvedEvent; + +namespace EventStore.Projections.Core.Tests.Services.Jint; + +[TestFixture] +public class when_accessing_event_created_property : specification_with_event_handled +{ + private static readonly DateTime ExpectedTimestamp = new(2023, 4, 5, 12, 34, 56, DateTimeKind.Utc); + + protected override void Given() + { + _projection = @" + fromAll().when({$any: + function(state, event) { + return { created: event.created }; + } + }); + "; + _state = @"{}"; + _handledEvent = new ResolvedEvent( + positionStreamId: "test-stream", + positionSequenceNumber: 42, + eventStreamId: "test-stream", + eventSequenceNumber: 42, + resolvedLinkTo: false, + position: new TFPos(100, 50), + eventOrLinkTargetPosition: new TFPos(100, 50), + eventId: Guid.NewGuid(), + eventType: "TestEvent", + isJson: true, + data: Encoding.UTF8.GetBytes("{}"), + metadata: Encoding.UTF8.GetBytes("{}"), + positionMetadata: Array.Empty(), + streamMetadata: null, + timestamp: ExpectedTimestamp); + } + + [Test, Category(_projectionType)] + public void exposes_the_event_timestamp_as_an_iso_8601_string() + { + using var state = JsonDocument.Parse(_newState); + + Assert.AreEqual(ExpectedTimestamp.ToString("o"), state.RootElement.GetProperty("created").GetString()); + } +} diff --git a/src/EventStore.Projections.Core/Services/Interpreted/JintProjectionStateHandler.cs b/src/EventStore.Projections.Core/Services/Interpreted/JintProjectionStateHandler.cs index 155ace4f2..e08616e9b 100644 --- a/src/EventStore.Projections.Core/Services/Interpreted/JintProjectionStateHandler.cs +++ b/src/EventStore.Projections.Core/Services/Interpreted/JintProjectionStateHandler.cs @@ -341,7 +341,8 @@ private static string EnsureNonNullStringValue(JsValue parameter, string paramet JsString s => formatForRaw ? $"\"{s.AsString()}\"" : s.AsString(), JsNumber n => n.AsNumber().ToString(CultureInfo.InvariantCulture), JsNull => null, - JsUndefined => null, { } v => Serialize(value), + JsUndefined => null, + { } => Serialize(value), _ => null }; } @@ -969,6 +970,7 @@ EventEnvelope CreateEnvelope(string partition, ResolvedEvent @event, string cate { var envelope = new EventEnvelope(_engine, _parser, this); envelope.Partition = partition; + envelope.Created = @event.Timestamp; envelope.BodyRaw = @event.Data; envelope.MetadataRaw = @event.Metadata; envelope.StreamId = @event.EventStreamId; @@ -1119,6 +1121,11 @@ public string Category set => SetOwnProperty("category", new PropertyDescriptor(value, false, true, false)); } + public DateTime Created + { + set => SetOwnProperty("created", new PropertyDescriptor(value.ToString("o"), false, true, false)); + } + public string EventId { set => SetOwnProperty("eventId", new PropertyDescriptor(value, false, true, false)); From 3e818b1d647552ad926eb02e7a36de1cf88b3a26 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Sat, 9 May 2026 12:12:46 -0400 Subject: [PATCH 2/6] fix(projections): qualify event created timestamps Signed-off-by: Yordis Prieto --- .../when_accessing_event_created_property.cs | 16 +++++++++++++--- .../Interpreted/JintProjectionStateHandler.cs | 11 ++++++++++- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/EventStore.Projections.Core.Tests/Services/Jint/when_accessing_event_created_property.cs b/src/EventStore.Projections.Core.Tests/Services/Jint/when_accessing_event_created_property.cs index 4f675a94c..7edab9d8e 100644 --- a/src/EventStore.Projections.Core.Tests/Services/Jint/when_accessing_event_created_property.cs +++ b/src/EventStore.Projections.Core.Tests/Services/Jint/when_accessing_event_created_property.cs @@ -7,10 +7,18 @@ namespace EventStore.Projections.Core.Tests.Services.Jint; -[TestFixture] +[TestFixture(DateTimeKind.Utc)] +[TestFixture(DateTimeKind.Unspecified)] public class when_accessing_event_created_property : specification_with_event_handled { - private static readonly DateTime ExpectedTimestamp = new(2023, 4, 5, 12, 34, 56, DateTimeKind.Utc); + private readonly DateTimeKind _timestampKind; + + public when_accessing_event_created_property(DateTimeKind timestampKind) + { + _timestampKind = timestampKind; + } + + private DateTime ExpectedTimestamp => new(2023, 4, 5, 12, 34, 56, _timestampKind); protected override void Given() { @@ -45,6 +53,8 @@ public void exposes_the_event_timestamp_as_an_iso_8601_string() { using var state = JsonDocument.Parse(_newState); - Assert.AreEqual(ExpectedTimestamp.ToString("o"), state.RootElement.GetProperty("created").GetString()); + var expectedTimestamp = DateTime.SpecifyKind(ExpectedTimestamp, DateTimeKind.Utc).ToString("o"); + + Assert.That(state.RootElement.GetProperty("created").GetString(), Is.EqualTo(expectedTimestamp)); } } diff --git a/src/EventStore.Projections.Core/Services/Interpreted/JintProjectionStateHandler.cs b/src/EventStore.Projections.Core/Services/Interpreted/JintProjectionStateHandler.cs index e08616e9b..01e3c75d7 100644 --- a/src/EventStore.Projections.Core/Services/Interpreted/JintProjectionStateHandler.cs +++ b/src/EventStore.Projections.Core/Services/Interpreted/JintProjectionStateHandler.cs @@ -1123,7 +1123,7 @@ public string Category public DateTime Created { - set => SetOwnProperty("created", new PropertyDescriptor(value.ToString("o"), false, true, false)); + set => SetOwnProperty("created", new PropertyDescriptor(FormatCreated(value), false, true, false)); } public string EventId @@ -1137,6 +1137,15 @@ public EventEnvelope(Engine engine, JsonParser parser, JintProjectionStateHandle _parent = parent; } + private static string FormatCreated(DateTime value) + { + var created = value.Kind == DateTimeKind.Unspecified + ? DateTime.SpecifyKind(value, DateTimeKind.Utc) + : value.ToUniversalTime(); + + return created.ToString("o"); + } + public override JsValue Get(JsValue property, JsValue receiver) { if (property == "body" || property == "data") From 307396fc6853eeef7bc3eb9ba071c7b6b225b295 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Sat, 9 May 2026 12:57:13 -0400 Subject: [PATCH 3/6] fix(tests): wait for replication subscription topology Signed-off-by: Yordis Prieto --- ...er_and_event_is_replicated_to_followers.cs | 47 +++++++------------ 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/src/EventStore.Core.Tests/Services/Replication/Subscriptions/when_subscribed_to_stream_on_leader_and_event_is_replicated_to_followers.cs b/src/EventStore.Core.Tests/Services/Replication/Subscriptions/when_subscribed_to_stream_on_leader_and_event_is_replicated_to_followers.cs index 6d3d10042..59c0cba11 100644 --- a/src/EventStore.Core.Tests/Services/Replication/Subscriptions/when_subscribed_to_stream_on_leader_and_event_is_replicated_to_followers.cs +++ b/src/EventStore.Core.Tests/Services/Replication/Subscriptions/when_subscribed_to_stream_on_leader_and_event_is_replicated_to_followers.cs @@ -3,9 +3,9 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; -using EventStore.Core.Bus; using EventStore.Core.Data; using EventStore.Core.Messages; +using EventStore.Core.Tests.Helpers; using EventStore.Core.Tests.Integration; using NUnit.Framework; @@ -16,37 +16,24 @@ namespace EventStore.Core.Tests.Replication.ReadStream; public class when_subscribed_to_stream_on_leader_and_event_is_replicated_to_followers : specification_with_cluster { private const string _streamId = "test-stream"; - private CountdownEvent _expectedNumberOfRoleAssignments; + private static readonly TimeSpan _topologyTimeout = TimeSpan.FromSeconds(90); + private static readonly TimeSpan _subscriptionTimeout = TimeSpan.FromSeconds(30); private CountdownEvent _subscriptionsConfirmed; private TestSubscription _leaderSubscription; private List> _followerSubscriptions; - private TimeSpan _timeout = TimeSpan.FromSeconds(5); - - protected override void BeforeNodesStart() - { - _nodes.ToList().ForEach(x => - x.Node.MainBus.Subscribe(new AdHocHandler(Handle))); - _expectedNumberOfRoleAssignments = new CountdownEvent(3); - base.BeforeNodesStart(); - } - - private void Handle(SystemMessage.StateChangeMessage msg) - { - switch (msg.State) - { - case Data.VNodeState.Leader: - _expectedNumberOfRoleAssignments.Signal(); - break; - case Data.VNodeState.Follower: - _expectedNumberOfRoleAssignments.Signal(); - break; - } - } - protected override async Task Given() { - _expectedNumberOfRoleAssignments.Wait(5000); + AssertEx.IsOrBecomesTrue( + () => + { + var states = _nodes.Select(x => x.NodeState).ToArray(); + return states.Count(x => x == VNodeState.Leader) == 1 && + states.Count(x => x == VNodeState.Follower) == 2; + }, + _topologyTimeout, + "Timed out waiting for one leader and two followers", + MiniNodeLogging.WriteLogs); var leader = GetLeader(); Assert.IsNotNull(leader, "Could not get leader node"); @@ -54,12 +41,14 @@ protected override async Task Given() // Set the checkpoint so the check is not skipped leader.Db.Config.ReplicationCheckpoint.Write(0); - _subscriptionsConfirmed = new CountdownEvent(3); + var followers = GetFollowers(); + Assert.AreEqual(2, followers.Length, "Expected two follower nodes"); + + _subscriptionsConfirmed = new CountdownEvent(1 + followers.Length); _leaderSubscription = new TestSubscription(leader, 1, _streamId, _subscriptionsConfirmed); _leaderSubscription.CreateSubscription(); _followerSubscriptions = new List>(); - var followers = GetFollowers(); foreach (var s in followers) { var followerSubscription = new TestSubscription(s, 1, _streamId, _subscriptionsConfirmed); @@ -67,7 +56,7 @@ protected override async Task Given() followerSubscription.CreateSubscription(); } - if (!_subscriptionsConfirmed.Wait(_timeout)) + if (!_subscriptionsConfirmed.Wait(_subscriptionTimeout)) { Assert.Fail($"Timed out waiting for subscriptions to confirm, confirmed {_subscriptionsConfirmed.CurrentCount} need {_subscriptionsConfirmed.InitialCount}."); } From bb4018f04e821deab9840708af1913ffddcb4d25 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Sat, 9 May 2026 13:40:21 -0400 Subject: [PATCH 4/6] fix(tests): stabilize cluster restart convergence Signed-off-by: Yordis Prieto --- .../Integration/when_cluster_nodes_are_restarted.cs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/EventStore.Core.Tests/Integration/when_cluster_nodes_are_restarted.cs b/src/EventStore.Core.Tests/Integration/when_cluster_nodes_are_restarted.cs index 70fe386a5..574798481 100644 --- a/src/EventStore.Core.Tests/Integration/when_cluster_nodes_are_restarted.cs +++ b/src/EventStore.Core.Tests/Integration/when_cluster_nodes_are_restarted.cs @@ -14,8 +14,8 @@ public class when_restarting_one_node_at_a_time : specifi { private static readonly bool IsArm64 = RuntimeInformation.ProcessArchitecture == Architecture.Arm64; private static readonly TimeSpan InitialStabilizationTimeout = TimeSpan.FromMinutes(IsArm64 ? 8 : 5); - private static readonly TimeSpan RestartTimeout = TimeSpan.FromMinutes(IsArm64 ? 5 : 3); - protected override TimeSpan GivenTimeout { get; } = TimeSpan.FromMinutes(IsArm64 ? 20 : 10); + private static readonly TimeSpan RestartTimeout = TimeSpan.FromMinutes(IsArm64 ? 8 : 5); + protected override TimeSpan GivenTimeout { get; } = TimeSpan.FromMinutes(IsArm64 ? 30 : 20); protected override async Task Given() { @@ -56,8 +56,6 @@ protected override async Task Given() node.Start(); _nodes[restartedNodeIndex] = node; - await Task.WhenAll(_nodes.Select(x => x.Started)) - .WithTimeout(RestartTimeout, MiniNodeLogging.WriteLogs); AssertEx.IsOrBecomesTrue( () => { From c405e24cd7a558ed4e6c2dd5238a6d3fedfe8b41 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Sat, 9 May 2026 14:24:12 -0400 Subject: [PATCH 5/6] fix(tests): extend arm cluster restart headroom Signed-off-by: Yordis Prieto --- .../Integration/when_cluster_nodes_are_restarted.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/EventStore.Core.Tests/Integration/when_cluster_nodes_are_restarted.cs b/src/EventStore.Core.Tests/Integration/when_cluster_nodes_are_restarted.cs index 574798481..02dea18d1 100644 --- a/src/EventStore.Core.Tests/Integration/when_cluster_nodes_are_restarted.cs +++ b/src/EventStore.Core.Tests/Integration/when_cluster_nodes_are_restarted.cs @@ -13,8 +13,8 @@ namespace EventStore.Core.Tests.Integration; public class when_restarting_one_node_at_a_time : specification_with_cluster { private static readonly bool IsArm64 = RuntimeInformation.ProcessArchitecture == Architecture.Arm64; - private static readonly TimeSpan InitialStabilizationTimeout = TimeSpan.FromMinutes(IsArm64 ? 8 : 5); - private static readonly TimeSpan RestartTimeout = TimeSpan.FromMinutes(IsArm64 ? 8 : 5); + private static readonly TimeSpan InitialStabilizationTimeout = TimeSpan.FromMinutes(IsArm64 ? 15 : 5); + private static readonly TimeSpan RestartTimeout = TimeSpan.FromMinutes(IsArm64 ? 10 : 5); protected override TimeSpan GivenTimeout { get; } = TimeSpan.FromMinutes(IsArm64 ? 30 : 20); protected override async Task Given() From 13c80fe9e5c6b90e514f7f2f2e867d6b9552d82a Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Sat, 9 May 2026 15:02:37 -0400 Subject: [PATCH 6/6] fix(tests): make cluster restart order deterministic Signed-off-by: Yordis Prieto --- .../when_cluster_nodes_are_restarted.cs | 40 +++++++++++++++---- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/src/EventStore.Core.Tests/Integration/when_cluster_nodes_are_restarted.cs b/src/EventStore.Core.Tests/Integration/when_cluster_nodes_are_restarted.cs index 02dea18d1..966adeed2 100644 --- a/src/EventStore.Core.Tests/Integration/when_cluster_nodes_are_restarted.cs +++ b/src/EventStore.Core.Tests/Integration/when_cluster_nodes_are_restarted.cs @@ -1,5 +1,6 @@ using System; using System.Linq; +using System.Net; using System.Runtime.InteropServices; using System.Threading.Tasks; using EventStore.Core.Data; @@ -12,19 +13,19 @@ namespace EventStore.Core.Tests.Integration; [NonParallelizable] public class when_restarting_one_node_at_a_time : specification_with_cluster { + private const int RestartCount = 3; private static readonly bool IsArm64 = RuntimeInformation.ProcessArchitecture == Architecture.Arm64; private static readonly TimeSpan InitialStabilizationTimeout = TimeSpan.FromMinutes(IsArm64 ? 15 : 5); - private static readonly TimeSpan RestartTimeout = TimeSpan.FromMinutes(IsArm64 ? 10 : 5); + private static readonly TimeSpan RestartTimeout = TimeSpan.FromMinutes(10); protected override TimeSpan GivenTimeout { get; } = TimeSpan.FromMinutes(IsArm64 ? 30 : 20); protected override async Task Given() { await base.Given(); - for (int i = 0; i < 9; i++) + var restartedNodes = new bool[RestartCount]; + for (int i = 0; i < RestartCount; i++) { - var restartedNodeIndex = i % 3; - AssertEx.IsOrBecomesTrue( () => { @@ -33,9 +34,12 @@ protected override async Task Given() states.Count(x => x == VNodeState.Follower) == 2; }, i == 0 ? InitialStabilizationTimeout : RestartTimeout, - $"Cluster did not stabilize before restarting node {restartedNodeIndex}", + $"Cluster did not stabilize before restart iteration {i + 1}", MiniNodeLogging.WriteLogs); + var restartedNodeIndex = SelectRestartNode(restartedNodes, restartLeader: i == RestartCount - 1); + restartedNodes[restartedNodeIndex] = true; + await _nodes[restartedNodeIndex].Shutdown(keepDb: true); AssertEx.IsOrBecomesTrue( () => @@ -51,8 +55,7 @@ protected override async Task Given() $"Remaining cluster did not stabilize after shutting down node {restartedNodeIndex}", MiniNodeLogging.WriteLogs); - var node = CreateNode(restartedNodeIndex, _nodeEndpoints[restartedNodeIndex], - new[] { _nodeEndpoints[(i + 1) % 3].HttpEndPoint, _nodeEndpoints[(i + 2) % 3].HttpEndPoint }); + var node = CreateNode(restartedNodeIndex, _nodeEndpoints[restartedNodeIndex], GossipSeedsFor(restartedNodeIndex)); node.Start(); _nodes[restartedNodeIndex] = node; @@ -69,6 +72,29 @@ protected override async Task Given() } } + private int SelectRestartNode(bool[] restartedNodes, bool restartLeader) + { + var targetState = restartLeader ? VNodeState.Leader : VNodeState.Follower; + var candidate = _nodes + .Select((node, index) => (node, index)) + .FirstOrDefault(x => !restartedNodes[x.index] && x.node.NodeState == targetState); + + if (candidate.node is not null) + return candidate.index; + + var fallbackIndex = Array.FindIndex(restartedNodes, restarted => !restarted); + if (fallbackIndex >= 0) + return fallbackIndex; + + throw new InvalidOperationException("All cluster nodes have already been restarted."); + } + + private EndPoint[] GossipSeedsFor(int restartedNodeIndex) => + _nodeEndpoints + .Where((_, index) => index != restartedNodeIndex) + .Select(x => (EndPoint)x.HttpEndPoint) + .ToArray(); + [Test] public void cluster_should_stabilize() {