Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using EventStore.Core.Data;
using EventStore.Core.Services.Storage.ReaderIndex;
using NUnit.Framework;

namespace EventStore.Core.Tests.Services.Storage.CheckCommit;

[TestFixture(typeof(LogFormat.V2), typeof(string))]
public class when_checking_deleted_stream_with_expected_version_no_stream<TLogFormat, TStreamId> :
ReadIndexTestScenario<TLogFormat, TStreamId>
{
private const string HardDeletedStream = "hard-deleted-stream";
private const string SoftDeletedStream = "soft-deleted-stream";
private TStreamId _hardDeletedStreamId;
private TStreamId _softDeletedStreamId;

protected override async ValueTask WriteTestScenario(CancellationToken token)
{
(_hardDeletedStreamId, _) = await GetOrReserve(HardDeletedStream, token);
await WriteSingleEvent(HardDeletedStream, 0, "event", token: token);
await WriteDelete(HardDeletedStream, token);

(_softDeletedStreamId, _) = await GetOrReserve(SoftDeletedStream, token);
await WriteSingleEvent(SoftDeletedStream, 0, "event", token: token);
await WriteStreamMetadata(
SoftDeletedStream,
0,
new StreamMetadata(truncateBefore: EventNumber.DeletedStream).ToJsonString(),
token: token);
}

[Test]
public async Task hard_deleted_stream_is_valid_for_check_only_write()
{
var result = await ReadIndex.IndexWriter.CheckCommit(
_hardDeletedStreamId,
ExpectedVersion.NoStream,
Array.Empty<Guid>(),
streamMightExist: true,
CancellationToken.None);

Assert.AreEqual(CommitDecision.Ok, result.Decision);
Assert.AreEqual(EventNumber.DeletedStream, result.CurrentVersion);
}

[Test]
public async Task soft_deleted_stream_is_valid_for_check_only_write()
{
var result = await ReadIndex.IndexWriter.CheckCommit(
_softDeletedStreamId,
ExpectedVersion.NoStream,
Array.Empty<Guid>(),
streamMightExist: true,
CancellationToken.None);

Assert.AreEqual(CommitDecision.Ok, result.Decision);
Assert.IsTrue(result.IsSoftDeleted);
}
}
50 changes: 36 additions & 14 deletions src/EventStore.Core/Services/Storage/ReaderIndex/IndexWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

namespace EventStore.Core.Services.Storage.ReaderIndex;

public interface IIndexWriter<TStreamId> {
public interface IIndexWriter<TStreamId>
{
long CachedTransInfo { get; }
long NotCachedTransInfo { get; }

Expand All @@ -40,17 +41,20 @@ public interface IIndexWriter<TStreamId> {
ValueTask<string> GetStreamName(TStreamId streamId, CancellationToken token);
}

public struct RawMetaInfo {
public struct RawMetaInfo
{
public readonly long MetaLastEventNumber;
public readonly ReadOnlyMemory<byte> RawMeta;

public RawMetaInfo(long metaLastEventNumber, ReadOnlyMemory<byte> rawMeta) {
public RawMetaInfo(long metaLastEventNumber, ReadOnlyMemory<byte> rawMeta)
{
MetaLastEventNumber = metaLastEventNumber;
RawMeta = rawMeta;
}
}

public abstract class IndexWriter {
public abstract class IndexWriter
{
protected static readonly ILogger Log = Serilog.Log.ForContext<IndexWriter>();
}

Expand Down Expand Up @@ -141,10 +145,10 @@ public async ValueTask<CommitCheckResult<TStreamId>> CheckCommitStartingAt(long
if (await GetPrepare(reader, transactionPosition, token) is not { } prepare)
{
Log.Error("Could not read first prepare of to-be-committed transaction. "
+ "Transaction pos: {transactionPosition}, commit pos: {commitPosition}.",
+ "Transaction pos: {transactionPosition}, commit pos: {commitPosition}.",
transactionPosition, commitPosition);
var message = String.Format("Could not read first prepare of to-be-committed transaction. "
+ "Transaction pos: {0}, commit pos: {1}.",
+ "Transaction pos: {0}, commit pos: {1}.",
transactionPosition, commitPosition);
throw new InvalidOperationException(message);
}
Expand Down Expand Up @@ -202,7 +206,16 @@ public async ValueTask<CommitCheckResult<TStreamId>> CheckCommit(TStreamId strea

var curVersion = await GetStreamLastEventNumber(streamId, token);
if (curVersion is EventNumber.DeletedStream)
{
using var enumerator = eventIds.GetEnumerator();
if (!enumerator.MoveNext() &&
expectedVersion is ExpectedVersion.Any or ExpectedVersion.NoStream or EventNumber.DeletedStream)
{
return new CommitCheckResult<TStreamId>(CommitDecision.Ok, streamId, curVersion, -1, -1, false);
}

return new CommitCheckResult<TStreamId>(CommitDecision.Deleted, streamId, curVersion, -1, -1, false);
}
if (curVersion is EventNumber.Invalid)
return new CommitCheckResult<TStreamId>(CommitDecision.WrongExpectedVersion, streamId, curVersion, -1, -1,
false);
Expand Down Expand Up @@ -231,7 +244,7 @@ public async ValueTask<CommitCheckResult<TStreamId>> CheckCommit(TStreamId strea
foreach (var eventId in eventIds)
{
if (!_committedEvents.TryGetRecord(eventId, out var prepInfo) ||
!StreamIdComparer.Equals(prepInfo.StreamId, streamId))
!StreamIdComparer.Equals(prepInfo.StreamId, streamId))
return new CommitCheckResult<TStreamId>(
first ? CommitDecision.Ok : CommitDecision.CorruptedIdempotency,
streamId, curVersion, -1, -1, first && await IsSoftDeleted(streamId, token));
Expand Down Expand Up @@ -272,8 +285,8 @@ public async ValueTask<CommitCheckResult<TStreamId>> CheckCommit(TStreamId strea
eventNumber += 1;

if (_committedEvents.TryGetRecord(eventId, out var prepInfo)
&& StreamIdComparer.Equals(prepInfo.StreamId, streamId)
&& prepInfo.EventNumber == eventNumber)
&& StreamIdComparer.Equals(prepInfo.StreamId, streamId)
&& prepInfo.EventNumber == eventNumber)
continue;

if (await _indexReader.ReadPrepare(streamId, eventNumber, token) is { } res && res.EventId == eventId)
Expand All @@ -293,9 +306,14 @@ public async ValueTask<CommitCheckResult<TStreamId>> CheckCommit(TStreamId strea
false);
}

if (eventNumber == expectedVersion) /* no data in transaction */
if (eventNumber == expectedVersion)
{
if (expectedVersion is ExpectedVersion.NoStream && await IsSoftDeleted(streamId, token))
return new(CommitDecision.Ok, streamId, curVersion, -1, -1, true);

return new(CommitDecision.WrongExpectedVersion, streamId, curVersion, -1,
-1, false);
}

var isReplicated = await _indexReader.GetStreamLastEventNumber(streamId, token) >= eventNumber;
//TODO(clc): the new index should hold the log positions removing this read
Expand Down Expand Up @@ -453,8 +471,10 @@ public void PurgeNotProcessedCommitsTill(long checkpoint)
commitInfo.StreamId,
x =>
{
if (!Debugger.IsAttached) Debugger.Launch();
else Debugger.Break();
if (!Debugger.IsAttached)
Debugger.Launch();
else
Debugger.Break();
throw new Exception(string.Format("CommitInfo for stream '{0}' is not present!", x));
},
(streamId, oldVersion) => oldVersion,
Expand All @@ -465,8 +485,10 @@ public void PurgeNotProcessedCommitsTill(long checkpoint)
_systemStreams.OriginalStreamOf(commitInfo.StreamId),
x =>
{
if (!Debugger.IsAttached) Debugger.Launch();
else Debugger.Break();
if (!Debugger.IsAttached)
Debugger.Launch();
else
Debugger.Break();
throw new Exception(string.Format(
"Original stream CommitInfo for meta-stream '{0}' is not present!",
_systemStreams.MetaStreamOf(x)));
Expand Down
Loading