Description
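If a workflow contains a fan-in edge, the EdgeStateData of each created Checkpoint should reflect the edge's current state: which source executors are still unseen and which messages are pending. However, this state never changes; every checkpoint records the initial fan-in state. This is a serious bug, because a checkpoint taken while the fan-in edge is holding a message cannot be replayed correctly in subsequent supersteps. In the sample below, restoring from Checkpoints[1] invokes qux, but quux is never invoked.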
Code Sample
using Microsoft.Agents.AI.Workflows;

// Create five executors keyed by their ids.
var executors = new string[] { "foo", "bar", "baz", "qux", "quux" }
    .ToDictionary(it => it, it => new SimpleExecutor(it));

// foo -> {bar, baz} (fan-out); bar -> qux; {baz, qux} -> quux (fan-in barrier).
var workflow = new WorkflowBuilder(executors["foo"])
    .AddFanOutEdge(executors["foo"], [executors["bar"], executors["baz"]], label: "fan-out")
    .AddEdge(executors["bar"], executors["qux"])
    .AddFanInBarrierEdge([executors["baz"], executors["qux"]], executors["quux"])
    .Build();

var checkpointManager = CheckpointManager.CreateInMemory();
var run = await InProcessExecution.Default
    .WithCheckpointing(checkpointManager)
    .RunStreamingAsync(workflow, "start");

// index == -1: run the workflow to completion once.
// index 0 and 1: restore from the corresponding checkpoint and run to completion again.
for (var index = -1; index < 2; index++)
{
    if (index == -1)
    {
        Console.WriteLine($"{new string('-', 10)}Direct run{new string('-', 10)}");
        await run.RunToCompletionAsync();
        continue;
    }

    Console.WriteLine($"{new string('-', 10)}Restore from Checkpoints[{index}]{new string('-', 10)}");
    await run.RestoreCheckpointAsync(run.Checkpoints[index]);
    await run.RunToCompletionAsync();
}

internal partial class SimpleExecutor(string id) : Executor(id)
{
    // Logs the invocation, records it in the "tracking" scope, and sends this executor's id downstream.
    [MessageHandler]
    public async ValueTask<string> HandleAsync(string input, IWorkflowContext context)
    {
        await Task.Delay(10);
        Console.WriteLine($"Executor {Id} is invoked");
        await context.QueueStateUpdateAsync(key: $"Is{Id}Invoked", value: true, scopeName: "tracking");
        return Id;
    }
}
Error Messages / Stack Traces
----------Direct run----------
Executor foo is invoked
Executor bar is invoked
Executor baz is invoked
Executor qux is invoked
Executor quux is invoked
Executor quux is invoked
----------Restore from Checkpoints[0]----------
Executor bar is invoked
Executor baz is invoked
Executor qux is invoked
Executor quux is invoked
Executor quux is invoked
----------Restore from Checkpoints[1]----------
Executor qux is invoked
Package Versions
Microsoft.Agents.AI.Workflows 1.9.0
.NET Version
.NET 10.0
Additional Context
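Below are the four checkpoints generated by the sample above, dumped via reflection. The EdgeStateData entry for the fan-in edge (key 3) is identical in every checkpoint: Unseen is always [baz, qux] and PendingMessages is always empty. In Checkpoint[1], baz has already run (IsbazInvoked is True), yet its message appears neither in QueuedMessages nor in PendingMessages, and baz is still listed as Unseen. This is consistent with quux never being invoked after restoring from Checkpoints[1].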
------------------------------Checkpoint[0]------------------------------
IsInitial: False
StepNumber: 0
RunnerData: <RunnerStateData>
    InstantiatedExecutors: [bar, foo, baz]
    QueuedMessages: <Dictionary<String, List<PortableMessageEnvelope>>>
        Key: bar
        Value: <List<PortableMessageEnvelope>>
            MessageType: System.String
            Message: foo
            Source: foo
            TargetId: null
        Key: baz
        Value: <List<PortableMessageEnvelope>>
            MessageType: System.String
            Message: foo
            Source: foo
            TargetId: null
    OutstandingRequests: []
StateData: <Dictionary<ScopeKey, PortableValue>>
    Key: foo/tracking/IsfooInvoked
    Value: True
EdgeStateData: <Dictionary<EdgeId, PortableValue>>
    Key: 3
    Value: <PortableValue>
        SourceIds: [baz, qux]
        Unseen: [baz, qux]
        PendingMessages: []
        TypeId: Microsoft.Agents.AI.Workflows.Execution.FanInEdgeState
Parent: null
------------------------------Checkpoint[1]------------------------------
IsInitial: False
StepNumber: 1
RunnerData: <RunnerStateData>
    InstantiatedExecutors: [bar, foo, baz, qux]
    QueuedMessages: <Dictionary<String, List<PortableMessageEnvelope>>>
        Key: qux
        Value: <List<PortableMessageEnvelope>>
            MessageType: System.String
            Message: bar
            Source: bar
            TargetId: null
    OutstandingRequests: []
StateData: <Dictionary<ScopeKey, PortableValue>>
    Key: foo/tracking/IsfooInvoked
    Value: True
    Key: foo/tracking/IsbarInvoked
    Value: True
    Key: foo/tracking/IsbazInvoked
    Value: True
EdgeStateData: <Dictionary<EdgeId, PortableValue>>
    Key: 3
    Value: <PortableValue>
        SourceIds: [baz, qux]
        Unseen: [baz, qux]
        PendingMessages: []
        TypeId: Microsoft.Agents.AI.Workflows.Execution.FanInEdgeState
Parent: <CheckpointInfo>
    SessionId: e8a3dd4385384e589271758a1435ab58
    CheckpointId: fc30a48fe01e4eea83ed794df37b537a
------------------------------Checkpoint[2]------------------------------
IsInitial: False
StepNumber: 2
RunnerData: <RunnerStateData>
    InstantiatedExecutors: [bar, foo, baz, quux, qux]
    QueuedMessages: <Dictionary<String, List<PortableMessageEnvelope>>>
        Key: quux
        Value: <List<PortableMessageEnvelope>>
            MessageType: System.String
            Message: baz
            Source: baz
            TargetId: null

            MessageType: System.String
            Message: qux
            Source: qux
            TargetId: null
    OutstandingRequests: []
StateData: <Dictionary<ScopeKey, PortableValue>>
    Key: foo/tracking/IsfooInvoked
    Value: True
    Key: foo/tracking/IsbarInvoked
    Value: True
    Key: foo/tracking/IsbazInvoked
    Value: True
    Key: foo/tracking/IsquxInvoked
    Value: True
EdgeStateData: <Dictionary<EdgeId, PortableValue>>
    Key: 3
    Value: <PortableValue>
        SourceIds: [baz, qux]
        Unseen: [baz, qux]
        PendingMessages: []
        TypeId: Microsoft.Agents.AI.Workflows.Execution.FanInEdgeState
Parent: <CheckpointInfo>
    SessionId: e8a3dd4385384e589271758a1435ab58
    CheckpointId: cd63bfae5da74893a4acbbed74e196b9
------------------------------Checkpoint[3]------------------------------
IsInitial: False
StepNumber: 3
RunnerData: <RunnerStateData>
    InstantiatedExecutors: [bar, foo, baz, quux, qux]
    QueuedMessages: []
    OutstandingRequests: []
StateData: <Dictionary<ScopeKey, PortableValue>>
    Key: foo/tracking/IsfooInvoked
    Value: True
    Key: foo/tracking/IsbarInvoked
    Value: True
    Key: foo/tracking/IsbazInvoked
    Value: True
    Key: foo/tracking/IsquxInvoked
    Value: True
    Key: foo/tracking/IsquuxInvoked
    Value: True
EdgeStateData: <Dictionary<EdgeId, PortableValue>>
    Key: 3
    Value: <PortableValue>
        SourceIds: [baz, qux]
        Unseen: [baz, qux]
        PendingMessages: []
        TypeId: Microsoft.Agents.AI.Workflows.Execution.FanInEdgeState
Parent: <CheckpointInfo>
    SessionId: e8a3dd4385384e589271758a1435ab58
    CheckpointId: 9420e8db25764828b0c1fcc900c5cd34
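To make the expected behavior concrete, here is a minimal, self-contained C# model of a fan-in barrier's state. It is a sketch under stated assumptions, not the library's implementation: FanInBarrierModel, Accept, and Snapshot are hypothetical names, and the model assumes the barrier buffers each incoming message, removes its sender from the unseen set, and releases the whole batch (then resets) once every source has sent.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of fan-in barrier state; NOT Microsoft.Agents.AI.Workflows' FanInEdgeState.
sealed class FanInBarrierModel
{
    private readonly string[] _sourceIds;     // all sources feeding the barrier
    private readonly HashSet<string> _unseen; // sources that have not sent yet in this round
    private readonly List<(string Source, string Message)> _pending = new();

    public FanInBarrierModel(params string[] sourceIds)
    {
        _sourceIds = sourceIds;
        _unseen = new HashSet<string>(sourceIds);
    }

    // Buffers a message from one source. Returns an empty list while some source is
    // still unseen; otherwise returns every buffered message and resets the barrier.
    public List<(string Source, string Message)> Accept(string source, string message)
    {
        _pending.Add((source, message));
        _unseen.Remove(source);
        if (_unseen.Count > 0)
        {
            return new List<(string Source, string Message)>();
        }

        var released = new List<(string Source, string Message)>(_pending);
        _pending.Clear();
        _unseen.UnionWith(_sourceIds);
        return released;
    }

    // Assumption: this is the state a checkpoint taken right now would need to persist.
    public (string[] Unseen, string[] PendingSources) Snapshot() =>
        (_sourceIds.Where(id => _unseen.Contains(id)).ToArray(),
         _pending.Select(p => p.Source).ToArray());
}
```

Fed the messages from this sample, the model would snapshot Unseen: [qux] and pending messages from [baz] after superstep 1, which is the information that appears to be missing from Checkpoint[1] above. After qux sends, it releases both messages and resets to Unseen: [baz, qux], matching Checkpoint[2] and Checkpoint[3].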