Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/projections.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ handler. The event provided through the handler contains the following propertie
- `partition`: string
- `eventType`: string
- `streamId`: string
- `created`: string (ISO 8601)

| Handler | Description | Notes |
|:---------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------|
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Linq;
using System.Net;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using EventStore.Core.Data;
Expand All @@ -12,19 +13,19 @@ namespace EventStore.Core.Tests.Integration;
[NonParallelizable]
public class when_restarting_one_node_at_a_time<TLogFormat, TStreamId> : specification_with_cluster<TLogFormat, TStreamId>
{
private const int RestartCount = 3;
private static readonly bool IsArm64 = RuntimeInformation.ProcessArchitecture == Architecture.Arm64;
private static readonly TimeSpan InitialStabilizationTimeout = TimeSpan.FromMinutes(IsArm64 ? 8 : 5);
private static readonly TimeSpan RestartTimeout = TimeSpan.FromMinutes(IsArm64 ? 5 : 3);
protected override TimeSpan GivenTimeout { get; } = TimeSpan.FromMinutes(IsArm64 ? 20 : 10);
private static readonly TimeSpan InitialStabilizationTimeout = TimeSpan.FromMinutes(IsArm64 ? 15 : 5);
private static readonly TimeSpan RestartTimeout = TimeSpan.FromMinutes(10);
protected override TimeSpan GivenTimeout { get; } = TimeSpan.FromMinutes(IsArm64 ? 30 : 20);

protected override async Task Given()
{
await base.Given();

for (int i = 0; i < 9; i++)
var restartedNodes = new bool[RestartCount];
for (int i = 0; i < RestartCount; i++)
{
var restartedNodeIndex = i % 3;

AssertEx.IsOrBecomesTrue(
() =>
{
Expand All @@ -33,9 +34,12 @@ protected override async Task Given()
states.Count(x => x == VNodeState.Follower) == 2;
},
i == 0 ? InitialStabilizationTimeout : RestartTimeout,
$"Cluster did not stabilize before restarting node {restartedNodeIndex}",
$"Cluster did not stabilize before restart iteration {i + 1}",
MiniNodeLogging.WriteLogs);

var restartedNodeIndex = SelectRestartNode(restartedNodes, restartLeader: i == RestartCount - 1);
restartedNodes[restartedNodeIndex] = true;

await _nodes[restartedNodeIndex].Shutdown(keepDb: true);
AssertEx.IsOrBecomesTrue(
() =>
Expand All @@ -51,13 +55,10 @@ protected override async Task Given()
$"Remaining cluster did not stabilize after shutting down node {restartedNodeIndex}",
MiniNodeLogging.WriteLogs);

var node = CreateNode(restartedNodeIndex, _nodeEndpoints[restartedNodeIndex],
new[] { _nodeEndpoints[(i + 1) % 3].HttpEndPoint, _nodeEndpoints[(i + 2) % 3].HttpEndPoint });
var node = CreateNode(restartedNodeIndex, _nodeEndpoints[restartedNodeIndex], GossipSeedsFor(restartedNodeIndex));
node.Start();
_nodes[restartedNodeIndex] = node;

await Task.WhenAll(_nodes.Select(x => x.Started))
.WithTimeout(RestartTimeout, MiniNodeLogging.WriteLogs);
AssertEx.IsOrBecomesTrue(
() =>
{
Expand All @@ -71,6 +72,29 @@ await Task.WhenAll(_nodes.Select(x => x.Started))
}
}

private int SelectRestartNode(bool[] restartedNodes, bool restartLeader)
{
var targetState = restartLeader ? VNodeState.Leader : VNodeState.Follower;
var candidate = _nodes
.Select((node, index) => (node, index))
.FirstOrDefault(x => !restartedNodes[x.index] && x.node.NodeState == targetState);

if (candidate.node is not null)
return candidate.index;

var fallbackIndex = Array.FindIndex(restartedNodes, restarted => !restarted);
if (fallbackIndex >= 0)
return fallbackIndex;

throw new InvalidOperationException("All cluster nodes have already been restarted.");
}

private EndPoint[] GossipSeedsFor(int restartedNodeIndex) =>
_nodeEndpoints
.Where((_, index) => index != restartedNodeIndex)
.Select(x => (EndPoint)x.HttpEndPoint)
.ToArray();

[Test]
public void cluster_should_stabilize()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EventStore.Core.Bus;
using EventStore.Core.Data;
using EventStore.Core.Messages;
using EventStore.Core.Tests.Helpers;
using EventStore.Core.Tests.Integration;
using NUnit.Framework;

Expand All @@ -16,58 +16,47 @@ namespace EventStore.Core.Tests.Replication.ReadStream;
public class when_subscribed_to_stream_on_leader_and_event_is_replicated_to_followers<TLogFormat, TStreamId> : specification_with_cluster<TLogFormat, TStreamId>
{
private const string _streamId = "test-stream";
private CountdownEvent _expectedNumberOfRoleAssignments;
private static readonly TimeSpan _topologyTimeout = TimeSpan.FromSeconds(90);
private static readonly TimeSpan _subscriptionTimeout = TimeSpan.FromSeconds(30);
private CountdownEvent _subscriptionsConfirmed;
private TestSubscription<TLogFormat, TStreamId> _leaderSubscription;
private List<TestSubscription<TLogFormat, TStreamId>> _followerSubscriptions;

private TimeSpan _timeout = TimeSpan.FromSeconds(5);

protected override void BeforeNodesStart()
{
_nodes.ToList().ForEach(x =>
x.Node.MainBus.Subscribe(new AdHocHandler<SystemMessage.StateChangeMessage>(Handle)));
_expectedNumberOfRoleAssignments = new CountdownEvent(3);
base.BeforeNodesStart();
}

private void Handle(SystemMessage.StateChangeMessage msg)
{
switch (msg.State)
{
case Data.VNodeState.Leader:
_expectedNumberOfRoleAssignments.Signal();
break;
case Data.VNodeState.Follower:
_expectedNumberOfRoleAssignments.Signal();
break;
}
}

protected override async Task Given()
{
_expectedNumberOfRoleAssignments.Wait(5000);
AssertEx.IsOrBecomesTrue(
() =>
{
var states = _nodes.Select(x => x.NodeState).ToArray();
return states.Count(x => x == VNodeState.Leader) == 1 &&
states.Count(x => x == VNodeState.Follower) == 2;
},
_topologyTimeout,
"Timed out waiting for one leader and two followers",
MiniNodeLogging.WriteLogs);

var leader = GetLeader();
Assert.IsNotNull(leader, "Could not get leader node");

// Set the checkpoint so the check is not skipped
leader.Db.Config.ReplicationCheckpoint.Write(0);

_subscriptionsConfirmed = new CountdownEvent(3);
var followers = GetFollowers();
Assert.AreEqual(2, followers.Length, "Expected two follower nodes");

_subscriptionsConfirmed = new CountdownEvent(1 + followers.Length);
_leaderSubscription = new TestSubscription<TLogFormat, TStreamId>(leader, 1, _streamId, _subscriptionsConfirmed);
_leaderSubscription.CreateSubscription();

_followerSubscriptions = new List<TestSubscription<TLogFormat, TStreamId>>();
var followers = GetFollowers();
foreach (var s in followers)
{
var followerSubscription = new TestSubscription<TLogFormat, TStreamId>(s, 1, _streamId, _subscriptionsConfirmed);
_followerSubscriptions.Add(followerSubscription);
followerSubscription.CreateSubscription();
}

if (!_subscriptionsConfirmed.Wait(_timeout))
if (!_subscriptionsConfirmed.Wait(_subscriptionTimeout))
{
Assert.Fail($"Timed out waiting for subscriptions to confirm, confirmed {_subscriptionsConfirmed.CurrentCount} need {_subscriptionsConfirmed.InitialCount}.");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using System;
using System.Text;
using System.Text.Json;
using EventStore.Core.Data;
using NUnit.Framework;
using ResolvedEvent = EventStore.Projections.Core.Services.Processing.ResolvedEvent;

namespace EventStore.Projections.Core.Tests.Services.Jint;

[TestFixture(DateTimeKind.Utc)]
[TestFixture(DateTimeKind.Unspecified)]
public class when_accessing_event_created_property : specification_with_event_handled
{
private readonly DateTimeKind _timestampKind;

public when_accessing_event_created_property(DateTimeKind timestampKind)
{
_timestampKind = timestampKind;
}

private DateTime ExpectedTimestamp => new(2023, 4, 5, 12, 34, 56, _timestampKind);

protected override void Given()
{
_projection = @"
fromAll().when({$any:
function(state, event) {
return { created: event.created };
}
});
";
_state = @"{}";
_handledEvent = new ResolvedEvent(
positionStreamId: "test-stream",
positionSequenceNumber: 42,
eventStreamId: "test-stream",
eventSequenceNumber: 42,
resolvedLinkTo: false,
position: new TFPos(100, 50),
eventOrLinkTargetPosition: new TFPos(100, 50),
eventId: Guid.NewGuid(),
eventType: "TestEvent",
isJson: true,
data: Encoding.UTF8.GetBytes("{}"),
metadata: Encoding.UTF8.GetBytes("{}"),
positionMetadata: Array.Empty<byte>(),
streamMetadata: null,
timestamp: ExpectedTimestamp);
}

[Test, Category(_projectionType)]
public void exposes_the_event_timestamp_as_an_iso_8601_string()
{
using var state = JsonDocument.Parse(_newState);

var expectedTimestamp = DateTime.SpecifyKind(ExpectedTimestamp, DateTimeKind.Utc).ToString("o");

Assert.That(state.RootElement.GetProperty("created").GetString(), Is.EqualTo(expectedTimestamp));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,8 @@ private static string EnsureNonNullStringValue(JsValue parameter, string paramet
JsString s => formatForRaw ? $"\"{s.AsString()}\"" : s.AsString(),
JsNumber n => n.AsNumber().ToString(CultureInfo.InvariantCulture),
JsNull => null,
JsUndefined => null, { } v => Serialize(value),
JsUndefined => null,
{ } => Serialize(value),
_ => null
};
}
Expand Down Expand Up @@ -969,6 +970,7 @@ EventEnvelope CreateEnvelope(string partition, ResolvedEvent @event, string cate
{
var envelope = new EventEnvelope(_engine, _parser, this);
envelope.Partition = partition;
envelope.Created = @event.Timestamp;
envelope.BodyRaw = @event.Data;
envelope.MetadataRaw = @event.Metadata;
envelope.StreamId = @event.EventStreamId;
Expand Down Expand Up @@ -1119,6 +1121,11 @@ public string Category
set => SetOwnProperty("category", new PropertyDescriptor(value, false, true, false));
}

public DateTime Created
{
set => SetOwnProperty("created", new PropertyDescriptor(FormatCreated(value), false, true, false));
}
Comment thread
yordis marked this conversation as resolved.

public string EventId
{
set => SetOwnProperty("eventId", new PropertyDescriptor(value, false, true, false));
Expand All @@ -1130,6 +1137,15 @@ public EventEnvelope(Engine engine, JsonParser parser, JintProjectionStateHandle
_parent = parent;
}

private static string FormatCreated(DateTime value)
{
var created = value.Kind == DateTimeKind.Unspecified
? DateTime.SpecifyKind(value, DateTimeKind.Utc)
: value.ToUniversalTime();

return created.ToString("o");
}

public override JsValue Get(JsValue property, JsValue receiver)
{
if (property == "body" || property == "data")
Expand Down
Loading