.NET: [Bug]: Checkpoint's EdgeStateData cannot reflect real state #6625

@jiangjinnan

Description

If a workflow contains a fan-in edge, the EdgeStateData of each created Checkpoint should reflect the edge's current state: the unseen source executors and the pending messages. However, it never changes. This is a serious bug, because subsequent supersteps cannot be replayed correctly after restoring from a checkpoint.
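
To make the expectation concrete, here is a minimal sketch of how a fan-in barrier's state would evolve for the workflow in the sample. `FanInBarrierModel` and its `Deliver` method are hypothetical, written only for illustration; this is not the library's `FanInEdgeState`, and only the names `Unseen` and `PendingMessages` are borrowed from the checkpoint dump under Additional Context. It assumes the barrier buffers messages until every source has delivered, then releases the batch and resets.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static string Show(IEnumerable<string> items) => "[" + string.Join(", ", items) + "]";

var edge = new FanInBarrierModel(new[] { "baz", "qux" });

// baz has run and its output reaches the fan-in edge; qux has not run yet.
var released = edge.Deliver("baz", "baz");
Console.WriteLine("Released: " + Show(released));                    // Released: []
Console.WriteLine("Unseen: " + Show(edge.Unseen));                   // Unseen: [qux]
Console.WriteLine("PendingMessages: " + Show(edge.PendingMessages)); // PendingMessages: [baz]

// qux delivers: the barrier opens, the batch is released, and the state resets.
released = edge.Deliver("qux", "qux");
Console.WriteLine("Released: " + Show(released));                    // Released: [baz, qux]
Console.WriteLine("Unseen: " + Show(edge.Unseen));                   // Unseen: [baz, qux]

// Hypothetical model of a fan-in barrier; NOT the library's FanInEdgeState.
sealed class FanInBarrierModel
{
    private readonly string[] _sourceIds;

    public SortedSet<string> Unseen { get; }
    public List<string> PendingMessages { get; } = new List<string>();

    public FanInBarrierModel(IEnumerable<string> sourceIds)
    {
        _sourceIds = sourceIds.ToArray();
        Unseen = new SortedSet<string>(_sourceIds, StringComparer.Ordinal);
    }

    // Buffers the message. Returns the full batch once every source has delivered,
    // or an empty list while the barrier is still closed.
    public List<string> Deliver(string sourceId, string message)
    {
        PendingMessages.Add(message);
        Unseen.Remove(sourceId);
        if (Unseen.Count > 0)
        {
            return new List<string>();
        }

        var batch = new List<string>(PendingMessages);
        PendingMessages.Clear();
        Unseen.UnionWith(_sourceIds);
        return batch;
    }
}
```

Under this model, a checkpoint taken after baz has run but before qux has run would be expected to record `Unseen: [qux]` and one pending message, rather than the initial `Unseen: [baz, qux]` with no pending messages.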

Code Sample

using Microsoft.Agents.AI.Workflows;

var executors = new string[] { "foo", "bar", "baz", "qux", "quux" }
    .ToDictionary(it => it, it => new SimpleExecutor(it));

// Topology: foo fans out to bar and baz; bar feeds qux; baz and qux fan in to quux.
var workflow = new WorkflowBuilder(executors["foo"])
    .AddFanOutEdge(executors["foo"], [executors["bar"], executors["baz"]], label: "fan-out")
    .AddEdge(executors["bar"], executors["qux"])
    .AddFanInBarrierEdge([executors["baz"], executors["qux"]], executors["quux"])
    .Build();

var checkpointManager = CheckpointManager.CreateInMemory();
var run = await InProcessExecution.Default
    .WithCheckpointing(checkpointManager)
    .RunStreamingAsync(workflow, "start");

// index -1: direct run; index 0 and 1: restore from that checkpoint and replay.
for (var index = -1; index < 2; index++)
{
    if (index == -1)
    {
        Console.WriteLine($"{new string('-', 10)}Direct run{new string('-', 10)}");
        await run.RunToCompletionAsync();
        continue;
    }

    Console.WriteLine($"{new string('-', 10)}Restore from Checkpoints[{index}]{new string('-', 10)}");
    await run.RestoreCheckpointAsync(run.Checkpoints[index]);
    await run.RunToCompletionAsync();
}

internal partial class SimpleExecutor(string id) : Executor(id)
{
    [MessageHandler]
    public async ValueTask<string> HandleAsync(string input, IWorkflowContext context)
    {
        await Task.Delay(10);
        Console.WriteLine($"Executor {Id} is invoked");
        await context.QueueStateUpdateAsync(key: $"Is{Id}Invoked", value: true, scopeName: "tracking");
        return Id;
    }
}

Error Messages / Stack Traces

----------Direct run----------
Executor foo is invoked
Executor bar is invoked
Executor baz is invoked
Executor qux is invoked
Executor quux is invoked
Executor quux is invoked
----------Restore from Checkpoints[0]----------
Executor bar is invoked
Executor baz is invoked
Executor qux is invoked
Executor quux is invoked
Executor quux is invoked
----------Restore from Checkpoints[1]----------
Executor qux is invoked
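
One plausible reading of the last run, assuming the barrier opens only once its `Unseen` set is empty: Checkpoints[1] is taken after baz has already run, yet its EdgeStateData still lists baz as unseen (see the dump under Additional Context). After the restore only qux delivers, so the barrier stays closed and quux is never invoked. The sketch below contrasts the two restored states with a hypothetical helper, `BarrierOpensAfter`; it does not call the library.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: does the barrier open once `deliveringSource` reports in,
// given the Unseen set restored from a checkpoint?
static bool BarrierOpensAfter(IEnumerable<string> restoredUnseen, string deliveringSource)
{
    var unseen = new HashSet<string>(restoredUnseen);
    unseen.Remove(deliveringSource);
    return unseen.Count == 0;
}

// State actually stored in Checkpoint[1]: baz is still listed as unseen.
Console.WriteLine(BarrierOpensAfter(new[] { "baz", "qux" }, "qux")); // False

// State the checkpoint would be expected to hold: only qux is unseen.
Console.WriteLine(BarrierOpensAfter(new[] { "qux" }, "qux"));        // True
```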

Package Versions

Microsoft.Agents.AI.Workflows 1.9.0

.NET Version

.NET 10.0

Additional Context

Using reflection, I dumped the four checkpoints generated by the sample above:

------------------------------Checkpoint[0]------------------------------
 IsInitial: False
 StepNumber: 0
 RunnerData: <RunnerStateData>
   InstantiatedExecutors: [bar, foo, baz]
   QueuedMessages: <Dictionary<String, List<PortableMessageEnvelope>>>
      Key: bar
      Value: <List<PortableMessageEnvelope>>
         MessageType: System.String
         Message: foo
         Source: foo
         TargetId: null
      Key: baz
      Value: <List<PortableMessageEnvelope>>
         MessageType: System.String
         Message: foo
         Source: foo
         TargetId: null
   OutstandingRequests: []
 StateData: <Dictionary<ScopeKey, PortableValue>>
    Key: foo/tracking/IsfooInvoked
    Value: True
 EdgeStateData: <Dictionary<EdgeId, PortableValue>>
    Key: 3
    Value: <PortableValue>
      SourceIds: [baz, qux]
      Unseen: [baz, qux]
      PendingMessages: []
      TypeId: Microsoft.Agents.AI.Workflows.Execution.FanInEdgeState
 Parent: null

------------------------------Checkpoint[1]------------------------------
 IsInitial: False
 StepNumber: 1
 RunnerData: <RunnerStateData>
   InstantiatedExecutors: [bar, foo, baz, qux]
   QueuedMessages: <Dictionary<String, List<PortableMessageEnvelope>>>
      Key: qux
      Value: <List<PortableMessageEnvelope>>
         MessageType: System.String
         Message: bar
         Source: bar
         TargetId: null
   OutstandingRequests: []
 StateData: <Dictionary<ScopeKey, PortableValue>>
    Key: foo/tracking/IsfooInvoked
    Value: True
    Key: foo/tracking/IsbarInvoked
    Value: True
    Key: foo/tracking/IsbazInvoked
    Value: True
 EdgeStateData: <Dictionary<EdgeId, PortableValue>>
    Key: 3
    Value: <PortableValue>
      SourceIds: [baz, qux]
      Unseen: [baz, qux]
      PendingMessages: []
      TypeId: Microsoft.Agents.AI.Workflows.Execution.FanInEdgeState
 Parent: <CheckpointInfo>
   SessionId: e8a3dd4385384e589271758a1435ab58
   CheckpointId: fc30a48fe01e4eea83ed794df37b537a

------------------------------Checkpoint[2]------------------------------
 IsInitial: False
 StepNumber: 2
 RunnerData: <RunnerStateData>
   InstantiatedExecutors: [bar, foo, baz, quux, qux]
   QueuedMessages: <Dictionary<String, List<PortableMessageEnvelope>>>
      Key: quux
      Value: <List<PortableMessageEnvelope>>
         MessageType: System.String
         Message: baz
         Source: baz
         TargetId: null
         MessageType: System.String
         Message: qux
         Source: qux
         TargetId: null
   OutstandingRequests: []
 StateData: <Dictionary<ScopeKey, PortableValue>>
    Key: foo/tracking/IsfooInvoked
    Value: True
    Key: foo/tracking/IsbarInvoked
    Value: True
    Key: foo/tracking/IsbazInvoked
    Value: True
    Key: foo/tracking/IsquxInvoked
    Value: True
 EdgeStateData: <Dictionary<EdgeId, PortableValue>>
    Key: 3
    Value: <PortableValue>
      SourceIds: [baz, qux]
      Unseen: [baz, qux]
      PendingMessages: []
      TypeId: Microsoft.Agents.AI.Workflows.Execution.FanInEdgeState
 Parent: <CheckpointInfo>
   SessionId: e8a3dd4385384e589271758a1435ab58
   CheckpointId: cd63bfae5da74893a4acbbed74e196b9

------------------------------Checkpoint[3]------------------------------
 IsInitial: False
 StepNumber: 3
 RunnerData: <RunnerStateData>
   InstantiatedExecutors: [bar, foo, baz, quux, qux]
   QueuedMessages: []
   OutstandingRequests: []
 StateData: <Dictionary<ScopeKey, PortableValue>>
    Key: foo/tracking/IsfooInvoked
    Value: True
    Key: foo/tracking/IsbarInvoked
    Value: True
    Key: foo/tracking/IsbazInvoked
    Value: True
    Key: foo/tracking/IsquxInvoked
    Value: True
    Key: foo/tracking/IsquuxInvoked
    Value: True
 EdgeStateData: <Dictionary<EdgeId, PortableValue>>
    Key: 3
    Value: <PortableValue>
      SourceIds: [baz, qux]
      Unseen: [baz, qux]
      PendingMessages: []
      TypeId: Microsoft.Agents.AI.Workflows.Execution.FanInEdgeState
 Parent: <CheckpointInfo>
   SessionId: e8a3dd4385384e589271758a1435ab58
   CheckpointId: 9420e8db25764828b0c1fcc900c5cd34

Labels

.NET: Issues related to the .NET codebase
reproduced: Specified when the automated triage workflow successfully reproduces a bug report
triage: Placed on an issue or discussion that requires a maintainer to triage the item
