diff --git a/src/EventStore.Core.Tests/Services/Storage/CheckCommit/when_checking_deleted_stream_with_expected_version_no_stream.cs b/src/EventStore.Core.Tests/Services/Storage/CheckCommit/when_checking_deleted_stream_with_expected_version_no_stream.cs new file mode 100644 index 000000000..3d151f294 --- /dev/null +++ b/src/EventStore.Core.Tests/Services/Storage/CheckCommit/when_checking_deleted_stream_with_expected_version_no_stream.cs @@ -0,0 +1,61 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using EventStore.Core.Data; +using EventStore.Core.Services.Storage.ReaderIndex; +using NUnit.Framework; + +namespace EventStore.Core.Tests.Services.Storage.CheckCommit; + +[TestFixture(typeof(LogFormat.V2), typeof(string))] +public class when_checking_deleted_stream_with_expected_version_no_stream : + ReadIndexTestScenario +{ + private const string HardDeletedStream = "hard-deleted-stream"; + private const string SoftDeletedStream = "soft-deleted-stream"; + private TStreamId _hardDeletedStreamId; + private TStreamId _softDeletedStreamId; + + protected override async ValueTask WriteTestScenario(CancellationToken token) + { + (_hardDeletedStreamId, _) = await GetOrReserve(HardDeletedStream, token); + await WriteSingleEvent(HardDeletedStream, 0, "event", token: token); + await WriteDelete(HardDeletedStream, token); + + (_softDeletedStreamId, _) = await GetOrReserve(SoftDeletedStream, token); + await WriteSingleEvent(SoftDeletedStream, 0, "event", token: token); + await WriteStreamMetadata( + SoftDeletedStream, + 0, + new StreamMetadata(truncateBefore: EventNumber.DeletedStream).ToJsonString(), + token: token); + } + + [Test] + public async Task hard_deleted_stream_is_valid_for_check_only_write() + { + var result = await ReadIndex.IndexWriter.CheckCommit( + _hardDeletedStreamId, + ExpectedVersion.NoStream, + Array.Empty(), + streamMightExist: true, + CancellationToken.None); + + Assert.AreEqual(CommitDecision.Ok, result.Decision); + Assert.AreEqual(EventNumber.DeletedStream, result.CurrentVersion); + } + + [Test] + public async Task soft_deleted_stream_is_valid_for_check_only_write() + { + var result = await ReadIndex.IndexWriter.CheckCommit( + _softDeletedStreamId, + ExpectedVersion.NoStream, + Array.Empty(), + streamMightExist: true, + CancellationToken.None); + + Assert.AreEqual(CommitDecision.Ok, result.Decision); + Assert.IsTrue(result.IsSoftDeleted); + } +} diff --git a/src/EventStore.Core/Services/Storage/ReaderIndex/IndexWriter.cs b/src/EventStore.Core/Services/Storage/ReaderIndex/IndexWriter.cs index 9a84e4dbf..a1a46832b 100644 --- a/src/EventStore.Core/Services/Storage/ReaderIndex/IndexWriter.cs +++ b/src/EventStore.Core/Services/Storage/ReaderIndex/IndexWriter.cs @@ -17,7 +17,8 @@ namespace EventStore.Core.Services.Storage.ReaderIndex; -public interface IIndexWriter { +public interface IIndexWriter +{ long CachedTransInfo { get; } long NotCachedTransInfo { get; } @@ -40,17 +41,20 @@ public interface IIndexWriter { ValueTask GetStreamName(TStreamId streamId, CancellationToken token); } -public struct RawMetaInfo { +public struct RawMetaInfo +{ public readonly long MetaLastEventNumber; public readonly ReadOnlyMemory RawMeta; - public RawMetaInfo(long metaLastEventNumber, ReadOnlyMemory rawMeta) { + public RawMetaInfo(long metaLastEventNumber, ReadOnlyMemory rawMeta) + { MetaLastEventNumber = metaLastEventNumber; RawMeta = rawMeta; } } -public abstract class IndexWriter { +public abstract class IndexWriter +{ protected static readonly ILogger Log = Serilog.Log.ForContext(); } @@ -141,10 +145,10 @@ public async ValueTask> CheckCommitStartingAt(long if (await GetPrepare(reader, transactionPosition, token) is not { } prepare) { Log.Error("Could not read first prepare of to-be-committed transaction. " - + "Transaction pos: {transactionPosition}, commit pos: {commitPosition}.", + + "Transaction pos: {transactionPosition}, commit pos: {commitPosition}.", transactionPosition, commitPosition); var message = String.Format("Could not read first prepare of to-be-committed transaction. " - + "Transaction pos: {0}, commit pos: {1}.", + + "Transaction pos: {0}, commit pos: {1}.", transactionPosition, commitPosition); throw new InvalidOperationException(message); } @@ -202,7 +206,16 @@ public async ValueTask> CheckCommit(TStreamId strea var curVersion = await GetStreamLastEventNumber(streamId, token); if (curVersion is EventNumber.DeletedStream) + { + using var enumerator = eventIds.GetEnumerator(); + if (!enumerator.MoveNext() && + expectedVersion is ExpectedVersion.Any or ExpectedVersion.NoStream or EventNumber.DeletedStream) + { + return new CommitCheckResult(CommitDecision.Ok, streamId, curVersion, -1, -1, false); + } + return new CommitCheckResult(CommitDecision.Deleted, streamId, curVersion, -1, -1, false); + } if (curVersion is EventNumber.Invalid) return new CommitCheckResult(CommitDecision.WrongExpectedVersion, streamId, curVersion, -1, -1, false); @@ -231,7 +244,7 @@ public async ValueTask> CheckCommit(TStreamId strea foreach (var eventId in eventIds) { if (!_committedEvents.TryGetRecord(eventId, out var prepInfo) || - !StreamIdComparer.Equals(prepInfo.StreamId, streamId)) + !StreamIdComparer.Equals(prepInfo.StreamId, streamId)) return new CommitCheckResult( first ? CommitDecision.Ok : CommitDecision.CorruptedIdempotency, streamId, curVersion, -1, -1, first && await IsSoftDeleted(streamId, token)); @@ -272,8 +285,8 @@ public async ValueTask> CheckCommit(TStreamId strea eventNumber += 1; if (_committedEvents.TryGetRecord(eventId, out var prepInfo) - && StreamIdComparer.Equals(prepInfo.StreamId, streamId) - && prepInfo.EventNumber == eventNumber) + && StreamIdComparer.Equals(prepInfo.StreamId, streamId) + && prepInfo.EventNumber == eventNumber) continue; if (await _indexReader.ReadPrepare(streamId, eventNumber, token) is { } res && res.EventId == eventId) @@ -293,9 +306,14 @@ public async ValueTask> CheckCommit(TStreamId strea false); } - if (eventNumber == expectedVersion) /* no data in transaction */ + if (eventNumber == expectedVersion) + { + if (expectedVersion is ExpectedVersion.NoStream && await IsSoftDeleted(streamId, token)) + return new(CommitDecision.Ok, streamId, curVersion, -1, -1, true); + return new(CommitDecision.WrongExpectedVersion, streamId, curVersion, -1, -1, false); + } var isReplicated = await _indexReader.GetStreamLastEventNumber(streamId, token) >= eventNumber; //TODO(clc): the new index should hold the log positions removing this read @@ -453,8 +471,10 @@ public void PurgeNotProcessedCommitsTill(long checkpoint) commitInfo.StreamId, x => { - if (!Debugger.IsAttached) Debugger.Launch(); - else Debugger.Break(); + if (!Debugger.IsAttached) + Debugger.Launch(); + else + Debugger.Break(); throw new Exception(string.Format("CommitInfo for stream '{0}' is not present!", x)); }, (streamId, oldVersion) => oldVersion, @@ -465,8 +485,10 @@ public void PurgeNotProcessedCommitsTill(long checkpoint) _systemStreams.OriginalStreamOf(commitInfo.StreamId), x => { - if (!Debugger.IsAttached) Debugger.Launch(); - else Debugger.Break(); + if (!Debugger.IsAttached) + Debugger.Launch(); + else + Debugger.Break(); throw new Exception(string.Format( "Original stream CommitInfo for meta-stream '{0}' is not present!", _systemStreams.MetaStreamOf(x)));