Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace DurableTask.AzureStorage.Tests
using System.Threading.Tasks;
using DurableTask.AzureStorage.Messaging;
using DurableTask.AzureStorage.Monitoring;
using DurableTask.AzureStorage.Storage;
using DurableTask.AzureStorage.Tracking;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
Expand Down Expand Up @@ -223,5 +224,82 @@ public void AbortAllSessions_NoSessions_DoesNotThrow()
manager.GetStats(out _, out _, out int count);
Assert.AreEqual(0, count, "Should still have no active sessions");
}

[TestMethod]
public async Task GetNextSessionAsync_DrainedReadyQueueNode_IsIgnored()
{
var settings = new AzureStorageOrchestrationServiceSettings
{
StorageAccountClientProvider = new StorageAccountClientProvider("UseDevelopmentStorage=true"),
};
var stats = new AzureStorageOrchestrationServiceStats();
Comment on lines +228 to +235
var trackingStore = new Mock<ITrackingStore>();

using var manager = new OrchestrationSessionManager(
"testaccount",
settings,
stats,
trackingStore.Object);

var storageClient = new AzureStorageClient(settings);
var messageManager = new MessageManager(settings, storageClient, settings.TaskHubName);
var controlQueue = new ControlQueue(storageClient, "partition-0", messageManager);

object pendingBatch = CreatePendingBatch(controlQueue);
object node = AddPendingBatchNode(manager, pendingBatch);
RemovePendingBatchNode(manager, node);
EnqueueReadyForProcessingNode(manager, node);

using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
try
{
await manager.GetNextSessionAsync(entitiesOnly: false, cts.Token);
Assert.Fail("Expected cancellation after the drained node was skipped.");
}
catch (OperationCanceledException)
{
}
Comment on lines +259 to +261
Comment on lines +253 to +261
}

static object CreatePendingBatch(ControlQueue controlQueue)
{
Type pendingBatchType = typeof(OrchestrationSessionManager)
.GetNestedType("PendingMessageBatch", BindingFlags.NonPublic);

return Activator.CreateInstance(
pendingBatchType,
BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic,
binder: null,
args: new object[] { controlQueue, "instance1", "execution1" },
culture: null);
}

static object AddPendingBatchNode(OrchestrationSessionManager manager, object pendingBatch)
{
object pendingBatches = GetPrivateField(manager, "pendingOrchestrationMessageBatches");
MethodInfo addLast = pendingBatches.GetType().GetMethod("AddLast", new[] { pendingBatch.GetType() });
return addLast.Invoke(pendingBatches, new[] { pendingBatch });
}

static void RemovePendingBatchNode(OrchestrationSessionManager manager, object node)
{
object pendingBatches = GetPrivateField(manager, "pendingOrchestrationMessageBatches");
MethodInfo remove = pendingBatches.GetType().GetMethod("Remove", new[] { node.GetType() });
remove.Invoke(pendingBatches, new[] { node });
}

static void EnqueueReadyForProcessingNode(OrchestrationSessionManager manager, object node)
{
object readyQueue = GetPrivateField(manager, "orchestrationsReadyForProcessingQueue");
MethodInfo enqueue = readyQueue.GetType().GetMethod("Enqueue");
enqueue.Invoke(readyQueue, new[] { node });
}

static object GetPrivateField(object target, string fieldName)
{
FieldInfo field = target.GetType().GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Instance);
Assert.IsNotNull(field);
return field.GetValue(target);
}
}
}
45 changes: 45 additions & 0 deletions src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace DurableTask.AzureStorage.Messaging
using DurableTask.AzureStorage.Monitoring;
using DurableTask.AzureStorage.Partitioning;
using DurableTask.AzureStorage.Storage;
using DurableTask.Core;

class ControlQueue : TaskHubQueue, IDisposable
{
Expand Down Expand Up @@ -209,6 +210,50 @@ public override Task AbandonMessageAsync(MessageData message, SessionBase? sessi
return base.AbandonMessageAsync(message, session);
}

/// <summary>
/// Abandons a message with zero visibility timeout so it becomes immediately visible
/// for another partition owner to pick up. This is used during drain to avoid stranding
/// messages that were dequeued but not yet promoted to active sessions.
/// </summary>
public async Task AbandonMessageForDrainAsync(MessageData message)
{
this.stats.PendingOrchestratorMessages.TryRemove(message.OriginalQueueMessage.MessageId, out _);

QueueMessage queueMessage = message.OriginalQueueMessage;
TaskMessage taskMessage = message.TaskMessage;
OrchestrationInstance instance = taskMessage.OrchestrationInstance;

this.settings.Logger.AbandoningMessage(
this.storageAccountName,
this.settings.TaskHubName,
taskMessage.Event.EventType.ToString(),
Utils.GetTaskEventId(taskMessage.Event),
queueMessage.MessageId,
instance.InstanceId,
instance.ExecutionId,
this.storageQueue.Name,
message.SequenceNumber,
queueMessage.PopReceipt,
visibilityTimeoutSeconds: 0);

try
{
await this.storageQueue.UpdateMessageAsync(
queueMessage,
TimeSpan.Zero,
clientRequestId: null);
}
catch (Exception e)
{
this.settings.Logger.PartitionManagerWarning(
this.storageAccountName,
this.settings.TaskHubName,
this.settings.WorkerId,
this.Name,
$"Failed to abandon message {queueMessage.MessageId} during drain: {e.Message}");
}
Comment on lines +246 to +254
}

public override Task DeleteMessageAsync(MessageData message, SessionBase? session = null)
{
this.stats.PendingOrchestratorMessages.TryRemove(message.OriginalQueueMessage.MessageId, out _);
Expand Down
57 changes: 56 additions & 1 deletion src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,60 @@ public async Task DrainAsync(string partitionId, CloseReason reason, Cancellatio
}
finally
{
// Remove the partition from memory
// Make dequeued-but-undispatched messages visible before dropping the partition.
await this.AbandonPendingMessagesAsync(partitionId);

this.RemoveQueue(partitionId, reason, caller);
}
}

/// <summary>
/// Abandons all pending (dequeued but not yet dispatched) messages for the specified partition,
/// making them immediately visible in the Azure Storage queue for the new partition owner.
/// This prevents a throughput gap equal to the visibility timeout duration when a partition
/// is released during drain.
/// </summary>
async Task AbandonPendingMessagesAsync(string partitionId)
{
var messagesToAbandon = new List<(ControlQueue Queue, MessageData Message)>();

lock (this.messageAndSessionLock)
{
var node = this.pendingOrchestrationMessageBatches.First;
while (node != null)
{
LinkedListNode<PendingMessageBatch>? next = node.Next;
PendingMessageBatch batch = node.Value;

if (string.Equals(batch.ControlQueue.Name, partitionId, StringComparison.OrdinalIgnoreCase))
{
foreach (MessageData message in batch.Messages)
{
messagesToAbandon.Add((batch.ControlQueue, message));
}

this.pendingOrchestrationMessageBatches.Remove(node);
}

node = next;
}
}

if (messagesToAbandon.Count > 0)
{
this.settings.Logger.PartitionManagerInfo(
this.storageAccountName,
this.settings.TaskHubName,
this.settings.WorkerId,
partitionId,
$"Abandoning {messagesToAbandon.Count} pending message(s) during drain to make them immediately visible for the new partition owner.");

await messagesToAbandon.ParallelForEachAsync(
this.settings.MaxStorageOperationConcurrency,
item => item.Queue.AbandonMessageForDrainAsync(item.Message));
}
}

/// <summary>
/// This method enumerates all the provided queue messages looking for ExecutionStarted messages. If any are found, it
/// queries table storage to ensure that each message has a matching record in the Instances table. If not, this method
Expand Down Expand Up @@ -592,6 +641,12 @@ async Task ScheduleOrchestrationStatePrefetch(

lock (this.messageAndSessionLock)
{
// Drain may have removed this batch after it was queued for dispatch.
if (node.List != this.pendingOrchestrationMessageBatches)
{
continue;
}

PendingMessageBatch nextBatch = node.Value;
this.pendingOrchestrationMessageBatches.Remove(node);

Expand Down
Loading