From e6c4f99127e791013cd5eb0ad5713e29abd2179f Mon Sep 17 00:00:00 2001
From: Kaibo Cai
Date: Fri, 15 May 2026 16:42:56 -0500
Subject: [PATCH] reset pending orchestrations when worker restart

When a partition is drained, message batches that were dequeued but not yet
dispatched are now removed from the pending list and abandoned with a zero
visibility timeout, so they become visible in the queue again immediately
instead of after the visibility timeout elapses. A failure to abandon an
individual message is logged as a warning and does not fail the drain.

The dispatch loop skips ready-queue nodes whose batch was already removed
from the pending list by a drain.
---
 .../OrchestrationSessionTests.cs              | 78 +++++++++++++++++++
 .../Messaging/ControlQueue.cs                 | 45 +++++++++++
 .../OrchestrationSessionManager.cs            | 57 +++++++++++++-
 3 files changed, 179 insertions(+), 1 deletion(-)

diff --git a/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs b/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs
index 126c4b9bc..23976d888 100644
--- a/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs
+++ b/Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs
@@ -22,6 +22,7 @@ namespace DurableTask.AzureStorage.Tests
     using System.Threading.Tasks;
     using DurableTask.AzureStorage.Messaging;
     using DurableTask.AzureStorage.Monitoring;
+    using DurableTask.AzureStorage.Storage;
     using DurableTask.AzureStorage.Tracking;
     using Microsoft.VisualStudio.TestTools.UnitTesting;
     using Moq;
@@ -223,5 +224,82 @@ public void AbortAllSessions_NoSessions_DoesNotThrow()
             manager.GetStats(out _, out _, out int count);
             Assert.AreEqual(0, count, "Should still have no active sessions");
         }
+
+        [TestMethod]
+        public async Task GetNextSessionAsync_DrainedReadyQueueNode_IsIgnored()
+        {
+            var settings = new AzureStorageOrchestrationServiceSettings
+            {
+                StorageAccountClientProvider = new StorageAccountClientProvider("UseDevelopmentStorage=true"),
+            };
+            var stats = new AzureStorageOrchestrationServiceStats();
+            var trackingStore = new Mock<ITrackingStore>();
+
+            using var manager = new OrchestrationSessionManager(
+                "testaccount",
+                settings,
+                stats,
+                trackingStore.Object);
+
+            var storageClient = new AzureStorageClient(settings);
+            var messageManager = new MessageManager(settings, storageClient, settings.TaskHubName);
+            var controlQueue = new ControlQueue(storageClient, "partition-0", messageManager);
+
+            object pendingBatch = CreatePendingBatch(controlQueue);
+            object node = AddPendingBatchNode(manager, pendingBatch);
+            RemovePendingBatchNode(manager, node);
+            EnqueueReadyForProcessingNode(manager, node);
+
+            using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
+            try
+            {
+                await manager.GetNextSessionAsync(entitiesOnly: false, cts.Token);
+                Assert.Fail("Expected cancellation after the drained node was skipped.");
+            }
+            catch (OperationCanceledException)
+            {
+            }
+        }
+
+        static object CreatePendingBatch(ControlQueue controlQueue)
+        {
+            Type pendingBatchType = typeof(OrchestrationSessionManager)
+                .GetNestedType("PendingMessageBatch", BindingFlags.NonPublic);
+
+            return Activator.CreateInstance(
+                pendingBatchType,
+                BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic,
+                binder: null,
+                args: new object[] { controlQueue, "instance1", "execution1" },
+                culture: null);
+        }
+
+        static object AddPendingBatchNode(OrchestrationSessionManager manager, object pendingBatch)
+        {
+            object pendingBatches = GetPrivateField(manager, "pendingOrchestrationMessageBatches");
+            MethodInfo addLast = pendingBatches.GetType().GetMethod("AddLast", new[] { pendingBatch.GetType() });
+            return addLast.Invoke(pendingBatches, new[] { pendingBatch });
+        }
+
+        static void RemovePendingBatchNode(OrchestrationSessionManager manager, object node)
+        {
+            object pendingBatches = GetPrivateField(manager, "pendingOrchestrationMessageBatches");
+            MethodInfo remove = pendingBatches.GetType().GetMethod("Remove", new[] { node.GetType() });
+            remove.Invoke(pendingBatches, new[] { node });
+        }
+
+        static void EnqueueReadyForProcessingNode(OrchestrationSessionManager manager, object node)
+        {
+            object readyQueue = GetPrivateField(manager, "orchestrationsReadyForProcessingQueue");
+            MethodInfo enqueue = readyQueue.GetType().GetMethod("Enqueue");
+            enqueue.Invoke(readyQueue, new[] { node });
+        }
+
+        static object GetPrivateField(object target, string fieldName)
+        {
+            FieldInfo field = target.GetType().GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Instance);
+            Assert.IsNotNull(field);
+            return field.GetValue(target);
+        }
     }
 }
diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
index 9f1c0d2ba..6b9a01549 100644
--- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
+++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
@@ -23,6 +23,7 @@ namespace DurableTask.AzureStorage.Messaging
     using DurableTask.AzureStorage.Monitoring;
     using DurableTask.AzureStorage.Partitioning;
     using DurableTask.AzureStorage.Storage;
+    using DurableTask.Core;
 
     class ControlQueue : TaskHubQueue, IDisposable
     {
@@ -209,6 +210,50 @@ public override Task AbandonMessageAsync(MessageData message, SessionBase? sessi
             return base.AbandonMessageAsync(message, session);
         }
 
+        /// <summary>
+        /// Abandons a message with zero visibility timeout so it becomes immediately visible
+        /// for another partition owner to pick up. This is used during drain to avoid stranding
+        /// messages that were dequeued but not yet promoted to active sessions.
+        /// </summary>
+        public async Task AbandonMessageForDrainAsync(MessageData message)
+        {
+            this.stats.PendingOrchestratorMessages.TryRemove(message.OriginalQueueMessage.MessageId, out _);
+
+            QueueMessage queueMessage = message.OriginalQueueMessage;
+            TaskMessage taskMessage = message.TaskMessage;
+            OrchestrationInstance instance = taskMessage.OrchestrationInstance;
+
+            this.settings.Logger.AbandoningMessage(
+                this.storageAccountName,
+                this.settings.TaskHubName,
+                taskMessage.Event.EventType.ToString(),
+                Utils.GetTaskEventId(taskMessage.Event),
+                queueMessage.MessageId,
+                instance.InstanceId,
+                instance.ExecutionId,
+                this.storageQueue.Name,
+                message.SequenceNumber,
+                queueMessage.PopReceipt,
+                visibilityTimeoutSeconds: 0);
+
+            try
+            {
+                await this.storageQueue.UpdateMessageAsync(
+                    queueMessage,
+                    TimeSpan.Zero,
+                    clientRequestId: null);
+            }
+            catch (Exception e)
+            {
+                this.settings.Logger.PartitionManagerWarning(
+                    this.storageAccountName,
+                    this.settings.TaskHubName,
+                    this.settings.WorkerId,
+                    this.Name,
+                    $"Failed to abandon message {queueMessage.MessageId} during drain: {e.Message}");
+            }
+        }
+
         public override Task DeleteMessageAsync(MessageData message, SessionBase? session = null)
         {
             this.stats.PendingOrchestratorMessages.TryRemove(message.OriginalQueueMessage.MessageId, out _);
diff --git a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
index abf7a58b2..3a7c2e616 100644
--- a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
+++ b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
@@ -216,11 +216,60 @@ public async Task DrainAsync(string partitionId, CloseReason reason, Cancellatio
             }
             finally
             {
-                // Remove the partition from memory
+                // Make dequeued-but-undispatched messages visible before dropping the partition.
+                await this.AbandonPendingMessagesAsync(partitionId);
+
                 this.RemoveQueue(partitionId, reason, caller);
             }
         }
 
+        /// <summary>
+        /// Abandons all pending (dequeued but not yet dispatched) messages for the specified partition,
+        /// making them immediately visible in the Azure Storage queue for the new partition owner.
+        /// This prevents a throughput gap equal to the visibility timeout duration when a partition
+        /// is released during drain.
+        /// </summary>
+        async Task AbandonPendingMessagesAsync(string partitionId)
+        {
+            var messagesToAbandon = new List<(ControlQueue Queue, MessageData Message)>();
+
+            lock (this.messageAndSessionLock)
+            {
+                var node = this.pendingOrchestrationMessageBatches.First;
+                while (node != null)
+                {
+                    LinkedListNode<PendingMessageBatch>? next = node.Next;
+                    PendingMessageBatch batch = node.Value;
+
+                    if (string.Equals(batch.ControlQueue.Name, partitionId, StringComparison.OrdinalIgnoreCase))
+                    {
+                        foreach (MessageData message in batch.Messages)
+                        {
+                            messagesToAbandon.Add((batch.ControlQueue, message));
+                        }
+
+                        this.pendingOrchestrationMessageBatches.Remove(node);
+                    }
+
+                    node = next;
+                }
+            }
+
+            if (messagesToAbandon.Count > 0)
+            {
+                this.settings.Logger.PartitionManagerInfo(
+                    this.storageAccountName,
+                    this.settings.TaskHubName,
+                    this.settings.WorkerId,
+                    partitionId,
+                    $"Abandoning {messagesToAbandon.Count} pending message(s) during drain to make them immediately visible for the new partition owner.");
+
+                await messagesToAbandon.ParallelForEachAsync(
+                    this.settings.MaxStorageOperationConcurrency,
+                    item => item.Queue.AbandonMessageForDrainAsync(item.Message));
+            }
+        }
+
         /// <summary>
         /// This method enumerates all the provided queue messages looking for ExecutionStarted messages. If any are found, it
         /// queries table storage to ensure that each message has a matching record in the Instances table. If not, this method
@@ -592,6 +641,12 @@ async Task ScheduleOrchestrationStatePrefetch(
 
                 lock (this.messageAndSessionLock)
                 {
+                    // Drain may have removed this batch after it was queued for dispatch.
+                    if (node.List != this.pendingOrchestrationMessageBatches)
+                    {
+                        continue;
+                    }
+
                     PendingMessageBatch nextBatch = node.Value;
                     this.pendingOrchestrationMessageBatches.Remove(node);
 