diff --git a/docs/projections.md b/docs/projections.md index 0846a8061..8fdae9bcb 100644 --- a/docs/projections.md +++ b/docs/projections.md @@ -281,6 +281,7 @@ handler. The event provided through the handler contains the following propertie - `partition`: string - `eventType`: string - `streamId`: string +- `created`: string (ISO 8601) | Handler | Description | Notes | |:---------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------| diff --git a/src/EventStore.Core.Tests/Integration/when_cluster_nodes_are_restarted.cs b/src/EventStore.Core.Tests/Integration/when_cluster_nodes_are_restarted.cs index 70fe386a5..966adeed2 100644 --- a/src/EventStore.Core.Tests/Integration/when_cluster_nodes_are_restarted.cs +++ b/src/EventStore.Core.Tests/Integration/when_cluster_nodes_are_restarted.cs @@ -1,5 +1,6 @@ using System; using System.Linq; +using System.Net; using System.Runtime.InteropServices; using System.Threading.Tasks; using EventStore.Core.Data; @@ -12,19 +13,19 @@ namespace EventStore.Core.Tests.Integration; [NonParallelizable] public class when_restarting_one_node_at_a_time : specification_with_cluster { + private const int RestartCount = 3; private static readonly bool IsArm64 = RuntimeInformation.ProcessArchitecture == Architecture.Arm64; - private static readonly TimeSpan InitialStabilizationTimeout = TimeSpan.FromMinutes(IsArm64 ? 8 : 5); - private static readonly TimeSpan RestartTimeout = TimeSpan.FromMinutes(IsArm64 ? 5 : 3); - protected override TimeSpan GivenTimeout { get; } = TimeSpan.FromMinutes(IsArm64 ? 20 : 10); + private static readonly TimeSpan InitialStabilizationTimeout = TimeSpan.FromMinutes(IsArm64 ? 15 : 5); + private static readonly TimeSpan RestartTimeout = TimeSpan.FromMinutes(10); + protected override TimeSpan GivenTimeout { get; } = TimeSpan.FromMinutes(IsArm64 ? 30 : 20); protected override async Task Given() { await base.Given(); - for (int i = 0; i < 9; i++) + var restartedNodes = new bool[RestartCount]; + for (int i = 0; i < RestartCount; i++) { - var restartedNodeIndex = i % 3; - AssertEx.IsOrBecomesTrue( () => { @@ -33,9 +34,12 @@ protected override async Task Given() states.Count(x => x == VNodeState.Follower) == 2; }, i == 0 ? InitialStabilizationTimeout : RestartTimeout, - $"Cluster did not stabilize before restarting node {restartedNodeIndex}", + $"Cluster did not stabilize before restart iteration {i + 1}", MiniNodeLogging.WriteLogs); + var restartedNodeIndex = SelectRestartNode(restartedNodes, restartLeader: i == RestartCount - 1); + restartedNodes[restartedNodeIndex] = true; + await _nodes[restartedNodeIndex].Shutdown(keepDb: true); AssertEx.IsOrBecomesTrue( () => @@ -51,13 +55,10 @@ protected override async Task Given() $"Remaining cluster did not stabilize after shutting down node {restartedNodeIndex}", MiniNodeLogging.WriteLogs); - var node = CreateNode(restartedNodeIndex, _nodeEndpoints[restartedNodeIndex], - new[] { _nodeEndpoints[(i + 1) % 3].HttpEndPoint, _nodeEndpoints[(i + 2) % 3].HttpEndPoint }); + var node = CreateNode(restartedNodeIndex, _nodeEndpoints[restartedNodeIndex], GossipSeedsFor(restartedNodeIndex)); node.Start(); _nodes[restartedNodeIndex] = node; - await Task.WhenAll(_nodes.Select(x => x.Started)) - .WithTimeout(RestartTimeout, MiniNodeLogging.WriteLogs); AssertEx.IsOrBecomesTrue( () => { @@ -71,6 +72,29 @@ await Task.WhenAll(_nodes.Select(x => x.Started)) } } + private int SelectRestartNode(bool[] restartedNodes, bool restartLeader) + { + var targetState = restartLeader ? VNodeState.Leader : VNodeState.Follower; + var candidate = _nodes + .Select((node, index) => (node, index)) + .FirstOrDefault(x => !restartedNodes[x.index] && x.node.NodeState == targetState); + + if (candidate.node is not null) + return candidate.index; + + var fallbackIndex = Array.FindIndex(restartedNodes, restarted => !restarted); + if (fallbackIndex >= 0) + return fallbackIndex; + + throw new InvalidOperationException("All cluster nodes have already been restarted."); + } + + private EndPoint[] GossipSeedsFor(int restartedNodeIndex) => + _nodeEndpoints + .Where((_, index) => index != restartedNodeIndex) + .Select(x => (EndPoint)x.HttpEndPoint) + .ToArray(); + [Test] public void cluster_should_stabilize() { diff --git a/src/EventStore.Core.Tests/Services/Replication/Subscriptions/when_subscribed_to_stream_on_leader_and_event_is_replicated_to_followers.cs b/src/EventStore.Core.Tests/Services/Replication/Subscriptions/when_subscribed_to_stream_on_leader_and_event_is_replicated_to_followers.cs index 6d3d10042..59c0cba11 100644 --- a/src/EventStore.Core.Tests/Services/Replication/Subscriptions/when_subscribed_to_stream_on_leader_and_event_is_replicated_to_followers.cs +++ b/src/EventStore.Core.Tests/Services/Replication/Subscriptions/when_subscribed_to_stream_on_leader_and_event_is_replicated_to_followers.cs @@ -3,9 +3,9 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; -using EventStore.Core.Bus; using EventStore.Core.Data; using EventStore.Core.Messages; +using EventStore.Core.Tests.Helpers; using EventStore.Core.Tests.Integration; using NUnit.Framework; @@ -16,37 +16,24 @@ namespace EventStore.Core.Tests.Replication.ReadStream; public class when_subscribed_to_stream_on_leader_and_event_is_replicated_to_followers : specification_with_cluster { private const string _streamId = "test-stream"; - private CountdownEvent _expectedNumberOfRoleAssignments; + private static readonly TimeSpan _topologyTimeout = TimeSpan.FromSeconds(90); + private static readonly TimeSpan _subscriptionTimeout = TimeSpan.FromSeconds(30); private CountdownEvent _subscriptionsConfirmed; private TestSubscription _leaderSubscription; private List> _followerSubscriptions; - private TimeSpan _timeout = TimeSpan.FromSeconds(5); - - protected override void BeforeNodesStart() - { - _nodes.ToList().ForEach(x => - x.Node.MainBus.Subscribe(new AdHocHandler(Handle))); - _expectedNumberOfRoleAssignments = new CountdownEvent(3); - base.BeforeNodesStart(); - } - - private void Handle(SystemMessage.StateChangeMessage msg) - { - switch (msg.State) - { - case Data.VNodeState.Leader: - _expectedNumberOfRoleAssignments.Signal(); - break; - case Data.VNodeState.Follower: - _expectedNumberOfRoleAssignments.Signal(); - break; - } - } - protected override async Task Given() { - _expectedNumberOfRoleAssignments.Wait(5000); + AssertEx.IsOrBecomesTrue( + () => + { + var states = _nodes.Select(x => x.NodeState).ToArray(); + return states.Count(x => x == VNodeState.Leader) == 1 && + states.Count(x => x == VNodeState.Follower) == 2; + }, + _topologyTimeout, + "Timed out waiting for one leader and two followers", + MiniNodeLogging.WriteLogs); var leader = GetLeader(); Assert.IsNotNull(leader, "Could not get leader node"); @@ -54,12 +41,14 @@ protected override async Task Given() // Set the checkpoint so the check is not skipped leader.Db.Config.ReplicationCheckpoint.Write(0); - _subscriptionsConfirmed = new CountdownEvent(3); + var followers = GetFollowers(); + Assert.AreEqual(2, followers.Length, "Expected two follower nodes"); + + _subscriptionsConfirmed = new CountdownEvent(1 + followers.Length); _leaderSubscription = new TestSubscription(leader, 1, _streamId, _subscriptionsConfirmed); _leaderSubscription.CreateSubscription(); _followerSubscriptions = new List>(); - var followers = GetFollowers(); foreach (var s in followers) { var followerSubscription = new TestSubscription(s, 1, _streamId, _subscriptionsConfirmed); @@ -67,7 +56,7 @@ protected override async Task Given() followerSubscription.CreateSubscription(); } - if (!_subscriptionsConfirmed.Wait(_timeout)) + if (!_subscriptionsConfirmed.Wait(_subscriptionTimeout)) { Assert.Fail($"Timed out waiting for subscriptions to confirm, confirmed {_subscriptionsConfirmed.CurrentCount} need {_subscriptionsConfirmed.InitialCount}."); } diff --git a/src/EventStore.Projections.Core.Tests/Services/Jint/when_accessing_event_created_property.cs b/src/EventStore.Projections.Core.Tests/Services/Jint/when_accessing_event_created_property.cs new file mode 100644 index 000000000..7edab9d8e --- /dev/null +++ b/src/EventStore.Projections.Core.Tests/Services/Jint/when_accessing_event_created_property.cs @@ -0,0 +1,60 @@ +using System; +using System.Text; +using System.Text.Json; +using EventStore.Core.Data; +using NUnit.Framework; +using ResolvedEvent = EventStore.Projections.Core.Services.Processing.ResolvedEvent; + +namespace EventStore.Projections.Core.Tests.Services.Jint; + +[TestFixture(DateTimeKind.Utc)] +[TestFixture(DateTimeKind.Unspecified)] +public class when_accessing_event_created_property : specification_with_event_handled +{ + private readonly DateTimeKind _timestampKind; + + public when_accessing_event_created_property(DateTimeKind timestampKind) + { + _timestampKind = timestampKind; + } + + private DateTime ExpectedTimestamp => new(2023, 4, 5, 12, 34, 56, _timestampKind); + + protected override void Given() + { + _projection = @" + fromAll().when({$any: + function(state, event) { + return { created: event.created }; + } + }); + "; + _state = @"{}"; + _handledEvent = new ResolvedEvent( + positionStreamId: "test-stream", + positionSequenceNumber: 42, + eventStreamId: "test-stream", + eventSequenceNumber: 42, + resolvedLinkTo: false, + position: new TFPos(100, 50), + eventOrLinkTargetPosition: new TFPos(100, 50), + eventId: Guid.NewGuid(), + eventType: "TestEvent", + isJson: true, + data: Encoding.UTF8.GetBytes("{}"), + metadata: Encoding.UTF8.GetBytes("{}"), + positionMetadata: Array.Empty(), + streamMetadata: null, + timestamp: ExpectedTimestamp); + } + + [Test, Category(_projectionType)] + public void exposes_the_event_timestamp_as_an_iso_8601_string() + { + using var state = JsonDocument.Parse(_newState); + + var expectedTimestamp = DateTime.SpecifyKind(ExpectedTimestamp, DateTimeKind.Utc).ToString("o"); + + Assert.That(state.RootElement.GetProperty("created").GetString(), Is.EqualTo(expectedTimestamp)); + } +} diff --git a/src/EventStore.Projections.Core/Services/Interpreted/JintProjectionStateHandler.cs b/src/EventStore.Projections.Core/Services/Interpreted/JintProjectionStateHandler.cs index 155ace4f2..01e3c75d7 100644 --- a/src/EventStore.Projections.Core/Services/Interpreted/JintProjectionStateHandler.cs +++ b/src/EventStore.Projections.Core/Services/Interpreted/JintProjectionStateHandler.cs @@ -341,7 +341,8 @@ private static string EnsureNonNullStringValue(JsValue parameter, string paramet JsString s => formatForRaw ? $"\"{s.AsString()}\"" : s.AsString(), JsNumber n => n.AsNumber().ToString(CultureInfo.InvariantCulture), JsNull => null, - JsUndefined => null, { } v => Serialize(value), + JsUndefined => null, + { } => Serialize(value), _ => null }; } @@ -969,6 +970,7 @@ EventEnvelope CreateEnvelope(string partition, ResolvedEvent @event, string cate { var envelope = new EventEnvelope(_engine, _parser, this); envelope.Partition = partition; + envelope.Created = @event.Timestamp; envelope.BodyRaw = @event.Data; envelope.MetadataRaw = @event.Metadata; envelope.StreamId = @event.EventStreamId; @@ -1119,6 +1121,11 @@ public string Category set => SetOwnProperty("category", new PropertyDescriptor(value, false, true, false)); } + public DateTime Created + { + set => SetOwnProperty("created", new PropertyDescriptor(FormatCreated(value), false, true, false)); + } + public string EventId { set => SetOwnProperty("eventId", new PropertyDescriptor(value, false, true, false)); @@ -1130,6 +1137,15 @@ public EventEnvelope(Engine engine, JsonParser parser, JintProjectionStateHandle _parent = parent; } + private static string FormatCreated(DateTime value) + { + var created = value.Kind == DateTimeKind.Unspecified + ? DateTime.SpecifyKind(value, DateTimeKind.Utc) + : value.ToUniversalTime(); + + return created.ToString("o"); + } + public override JsValue Get(JsValue property, JsValue receiver) { if (property == "body" || property == "data")