Spiga

Net+AI智能体9:Workflow基础篇

2025-11-29 19:00:55

一、快速开始

1. 什么是 Workflow Orchestration

在企业级 AI 应用开发中,我们经常面临以下挑战:

  • 单个 Agent 难以处理复杂任务:一个 Agent 无法同时擅长需求分析、代码生成、测试等多个领域
  • 业务逻辑与 AI 调用耦合:复杂的条件判断、循环、并发控制分散在代码各处
  • 流程难以可视化和维护:多步骤的 AI 处理流程缺乏清晰的结构
  • 缺乏统一的状态管理:多个步骤之间的数据传递和状态共享容易出错

Workflow Orchestration (工作流编排) 正是为了解决这些问题而生:

  • 模块化设计:将复杂任务拆分为独立的 Executor 和 Agent
  • 清晰的流程定义:使用 Builder API 构建可读、可维护的流程图
  • 灵活的流程控制:支持条件分支、循环迭代、并发执行等复杂模式
  • 统一的状态管理:内置 WorkflowContext 管理跨步骤的状态和数据
  • 实时流式反馈:通过事件机制实时监控工作流的执行进度

MAF Workflow 位于应用层和 Agent 层之间,负责:

  • 编排多个 Agent:决定 Agent 的执行顺序、条件和并发策略
  • 管理数据流:在 Agent 和 Executor 之间传递数据
  • 监控执行状态:实时报告工作流的执行进度和结果
  • 错误处理:统一处理执行过程中的异常和重试逻辑

2. 第一个工作流:文本处理管道

业务场景:将用户输入的文本 → 转为大写 → 反转顺序

  • 步骤 1:定义第一个 Executor - 大写转换。

    • 继承 Executor<TInput, TOutput>:

      • TInput = string:接收字符串输入

      • TOutput = string:返回字符串输出

      • 构造函数传入 "UppercaseExecutor" 作为唯一标识符

    • 实现 HandleAsync 方法:

      • message:接收上一个步骤传递的数据 (对于第一个 Executor,是工作流的输入)

      • context:工作流上下文,用于访问共享状态、发布事件等 (后续课程详解)

      • cancellationToken:用于取消操作

      • 返回值:会自动作为消息沿着 Edge 传递给下一个 Executor

    • 业务逻辑:

      • 使用 ToUpperInvariant() 进行文化无关的大写转换

      • 返回 ValueTask (高性能异步模式)

/// <summary>
/// 大写转换执行器 - 将输入文本转换为大写
/// </summary>
public class UppercaseExecutor : Executor<string, string>
{
    public UppercaseExecutor() : base("UppercaseExecutor") { }

    /// <summary>
    /// 核心处理方法 - 将输入文本转为大写
    /// </summary>
    public override ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // 执行业务逻辑: 转为大写
        string result = message.ToUpperInvariant();
        
        // 返回处理结果 (会自动沿着 Edge 传递给下一个 Executor)
        return ValueTask.FromResult(result);
    }
}
  • 步骤 2:定义第二个 Executor - 文本反转
/// <summary>
/// 文本反转执行器 - 将输入文本反转
/// </summary>
public class ReverseTextExecutor : Executor<string, string>
{
    public ReverseTextExecutor() : base("ReverseTextExecutor") { }

    /// <summary>
    /// 核心处理方法 - 反转输入文本
    /// </summary>
    public override ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // 执行业务逻辑: 反转字符串
        string result = string.Concat(message.Reverse());
        
        // 返回处理结果
        return ValueTask.FromResult(result);
    }
  • 步骤 3: 使用 WorkflowBuilder 构建工作流:将两个 Executor 通过 Edge 连接起来,组成完整的工作流。

    • new WorkflowBuilder(uppercaseExecutor):

      • 创建 WorkflowBuilder 并指定起始 Executor

      • 工作流的输入数据会首先传递给这个 Executor

    • AddEdge(source, target):

      • 添加一条有向边 (Edge),定义数据流动方向

      • source 的输出会自动传递给 target 的输入

      • 类型检查:source 的 TOutput 必须匹配 target 的 TInput

    • WithOutputFrom(executor):

      • 指定工作流的最终输出来自哪个 Executor

      • 如果不调用此方法,默认输出来自最后一个 Executor

    • Build():

      • 完成构建,返回一个不可变的 Workflow 对象

      • 此时会进行拓扑验证:检查是否有循环依赖、孤立节点等

// 创建 Executor 实例
UppercaseExecutor uppercaseExecutor = new UppercaseExecutor();
ReverseTextExecutor reverseExecutor = new ReverseTextExecutor();

// 使用 WorkflowBuilder 构建工作流
WorkflowBuilder builder = new WorkflowBuilder(uppercaseExecutor); // 指定起始 Executor
builder.AddEdge(uppercaseExecutor, reverseExecutor);              // 添加边: uppercase → reverse
builder.WithOutputFrom(reverseExecutor);                          // 指定输出来自 reverse

// 构建最终的工作流对象
Workflow workflow = builder.Build();
Console.WriteLine("工作流构建完成");
new 
{ 
    起始节点 = "UppercaseExecutor",
    边 = "UppercaseExecutor → ReverseTextExecutor",
    输出节点 = "ReverseTextExecutor"
}.Display();
  • 步骤 4: 同步执行工作流

    • InProcessExecution.RunAsync(workflow, input):

      • 在当前进程内执行工作流 (同步等待完成)

      • input 参数会传递给起始 Executor

      • 返回 Run 对象,包含执行结果和事件

    • Run.NewEvents:

      • 获取工作流执行过程中产生的所有事件

      • 主要事件类型:

        • WorkflowStartedEvent:工作流启动

        • ExecutorCompletedEvent:某个 Executor 执行完成

        • WorkflowCompletedEvent:工作流完成

    • ExecutorCompletedEvent:

      • ExecutorId:执行器的唯一标识符

      • Data:执行器的输出数据 (object 类型,需要转换)

      • 事件按执行顺序排列

// 执行工作流 - 同步模式
string input = "Hello, World!";
Console.WriteLine($"输入: {input}");
Console.WriteLine("开始执行工作流...\n");
await using (Run run = await InProcessExecution.RunAsync(workflow, input))
{
    // 处理执行结果
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine("执行结果:\n");
    foreach (WorkflowEvent evt in run.NewEvents)
    {
        if (evt is ExecutorCompletedEvent executorCompleted)
        {
            new
            {
                执行器 = executorCompleted.ExecutorId,
                输出数据 = executorCompleted.Data
            }.Display();
            Console.WriteLine();
        }
    }
    Console.WriteLine("工作流执行完成");
}

3. 流式执行

流式执行允许我们在工作流运行过程中实时接收事件流,而不是等待全部完成。

sequenceDiagram
    participant App as 应用代码
    participant WF as Workflow
    participant E1 as UppercaseExecutor
    participant E2 as ReverseTextExecutor

    App->>WF: StreamAsync(input)
    WF->>E1: 执行
    E1-->>App: ExecutorCompletedEvent (立即返回)
    WF->>E2: 执行
    E2-->>App: ExecutorCompletedEvent (立即返回)
    WF-->>App: WorkflowCompletedEvent

对比:

特性 同步执行 (RunAsync) 流式执行 (StreamAsync)
执行模式 阻塞等待全部完成 返回异步流 (IAsyncEnumerable)
事件获取 一次性获取所有事件 实时逐个接收事件
适用场景 短时间任务、批处理 长时间任务、实时反馈
用户体验 等待后一次性显示结果 逐步显示进度
  • 步骤 5:使用 StreamAsync 实现流式执行

    • InProcessExecution.StreamAsync(workflow, input):

      • 启动流式执行,立即返回 StreamingRun 对象

      • 不会阻塞等待工作流完成

    • streamingRun.WatchStreamAsync():

      • 返回 IAsyncEnumerable 异步流

      • 使用 await foreach 实时迭代事件

      • 每当一个 Executor 完成,就会产生一个新的 ExecutorCompletedEvent

    • 时间戳 DateTime.Now:

      • 演示事件是逐个实时产生的

      • 在实际的长时间任务中,你会看到明显的时间间隔

    • 主要事件类型:

      • WorkflowStartedEvent:工作流开始

      • ExecutorCompletedEvent:单个 Executor 完成

      • WorkflowOutputEvent:整个工作流完成

// 执行工作流 - 流式模式
string streamingInput = "Workflow Streaming!";
Console.WriteLine($"输入: {streamingInput}");
Console.WriteLine("开始流式执行工作流...\n");
await using (StreamingRun streamingRun = await InProcessExecution.StreamAsync(workflow, streamingInput))
{
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine("实时执行进度:\n");
    // 实时监听事件流
    await foreach (WorkflowEvent evt in streamingRun.WatchStreamAsync())
    {
        if (evt is ExecutorCompletedEvent executorCompleted)
        {
            Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] {executorCompleted.ExecutorId} 完成");
            new
            {
                输出数据 = executorCompleted.Data
            }.Display();
            Console.WriteLine();
        }
        else if (evt is WorkflowStartedEvent)
        {
            Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 工作流启动\n");
        }
        else if (evt is WorkflowOutputEvent)
        {
            Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 工作流完成");
            Console.WriteLine($"工作流输出:{evt.Data}");
        }
    }
}

4. 简化写法

使用 Lambda 表达式创建 Executor:对于简单的转换逻辑,每次都定义完整的 Executor 类显得过于繁琐。MAF 提供了 BindAsExecutor 扩展方法,允许我们将普通函数绑定为 Executor。

// 使用 Lambda 表达式定义转换逻辑
Func<string, string> toUpperFunc = input => input.ToUpperInvariant();
Func<string, string> reverseFunc = input => string.Concat(input.Reverse());

// 将函数绑定为 Executor
var upperExecutor = toUpperFunc.BindAsExecutor("UpperExecutor");
var reverseExecutor2 = reverseFunc.BindAsExecutor("ReverseExecutor");

// 构建工作流 (与之前相同的流程)
WorkflowBuilder lambdaBuilder = new WorkflowBuilder(upperExecutor);
lambdaBuilder.AddEdge(upperExecutor, reverseExecutor2);
lambdaBuilder.WithOutputFrom(reverseExecutor2);
Workflow lambdaWorkflow = lambdaBuilder.Build();

代码解析:

  • BindAsExecutor(name):

    • 将 Func<TInput, TOutput> 绑定为 Executor<TInput, TOutput>

    • 参数 name:Executor 的唯一标识符

    • 限制:只适用于同步、无副作用的纯函数转换

  • 适用场景:

    • 数据格式转换 (大写、小写、Trim)

    • 简单计算 (字符串拼接、数学运算)

    • 数据映射 (DTO 转换)

  • 不适用场景

    • 需要访问 IWorkflowContext 的逻辑

    • 需要异步操作 (如数据库查询、API调用)

    • 复杂的状态管理

建议:

  • 简单转换用 Lambda
  • 复杂逻辑用完整的 Executor 类

二、集成 Agent 到工作流

1. 为什么要 Agent in Workflow

在工作流中,我们有两种主要的执行单元:

特性 Executor Agent
定义 同步/异步的业务逻辑处理单元 基于 LLM 的智能对话式处理单元
适用场景 确定性逻辑(格式化、验证、计算) 需要智能判断、生成、理解的任务
输入输出 任意类型 (TInput/TOutput) ChatMessage
执行方式 立即执行 需要 TurnToken 触发
成本 几乎无成本 LLM API 调用成本
可预测性 100% 可预测 基于模型能力,存在不确定性

将 Agent 集成到工作流的优势:

  • 自动化数据流:工作流引擎自动管理 Agent 之间的消息传递
  • 统一监控:通过 WorkflowEvent 统一监听所有 Agent 的执行状态
  • 灵活组合:Agent 可以与 Executor 自由组合,构建复杂业务逻辑
  • 错误处理:工作流级别的异常处理和重试机制
  • 可视化:清晰的流程图,便于理解和维护

2. 将 Agent 添加到工作流

  • 创建一个简单的翻译 Agent
// 创建一个法语翻译 Agent
var frenchAgent = new ChatClientAgent(
    chatClient, 
    "You are a translation assistant that translates the provided text to French."
);
  • 将 Agent 添加到工作流
// 构建包含单个 Agent 的工作流
var simpleWorkflow = new WorkflowBuilder(frenchAgent)
    .Build();
  • 执行工作流 - TurnToken 机制详解

关键概念:当 Agent 被用作工作流步骤时,它不会立即执行。需要通过 TurnToken 来触发 Agent 的处理。

sequenceDiagram
    participant User as 用户
    participant Workflow as 工作流引擎
    participant Agent as Agent (Executor)
    participant LLM as LLM
    
    User->>Workflow: StreamAsync(input)
    Workflow->>Agent: 传递 ChatMessage
    Note over Agent: 消息被缓存,等待触发
    
    User->>Workflow: TrySendMessageAsync(TurnToken)
    Workflow->>Agent: 传递 TurnToken
    Agent->>LLM: 发送请求
    LLM-->>Agent: 返回响应
    Agent-->>Workflow: AgentRunUpdateEvent
    Workflow-->>User: 流式输出事件
// 执行工作流
Console.WriteLine("开始执行工作流...");
Console.WriteLine();
var input = new ChatMessage(ChatRole.User, "Hello World!");
await using (StreamingRun run = await InProcessExecution.StreamAsync(simpleWorkflow, input))
{
    Console.WriteLine("发送 TurnToken 触发 Agent...");

    // 关键步骤:发送 TurnToken 来触发 Agent 执行
    await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

    Console.WriteLine();
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine("监听工作流事件...");
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine();

    await foreach (WorkflowEvent evt in run.WatchStreamAsync())
    {
        if (evt is AgentRunUpdateEvent agentUpdate)
        {
            Console.WriteLine($"Agent 输出: {agentUpdate.ExecutorId}");
            Console.WriteLine($"内容: {agentUpdate.Data}");
            Console.WriteLine();
        }
    }
}

3. 关键概念解析

  • 为什么需要 TurnToken?

    在对话式 AI 系统中,Agent 可能需要等待多条消息(如用户输入、工具调用结果)后再统一处理。TurnToken 作为一个 的信号,告诉 Agent:

    • "所有输入已就绪,可以开始处理了"

    • "这是一个完整的对话回合"

  • TurnToken 的参数

    • emitEvents:是否发出执行事件(如 AgentRunUpdateEvent

    • 设置为 true 可以实时监控 Agent 的输出

new TurnToken(emitEvents: true)
  • AgentRunUpdateEvent

    这是 Agent 在工作流中执行时发出的事件:

public class AgentRunUpdateEvent : WorkflowEvent
{
    public string ExecutorId { get; }    // Agent 的 ID
    public object Data { get; }          // Agent 的输出内容
    public string Status { get; }        // 执行状态
}

4. 实战:翻译链

让我们构建一个更有趣的案例:将英文依次翻译为法语、西班牙语,最后再翻译回英语,看看经过三次翻译后内容会有什么变化。

  • 创建三个翻译 Agent
// 定义一个辅助方法来创建翻译 Agent
AIAgent CreateTranslationAgent(string targetLanguage, IChatClient client)
{
    return new ChatClientAgent(
        client,
        $"You are a translation assistant that translates the provided text to {targetLanguage}."
    );
}
// 创建三个翻译 Agent
var frenchTranslator = CreateTranslationAgent("French", chatClient);
var spanishTranslator = CreateTranslationAgent("Spanish", chatClient);
var englishTranslator = CreateTranslationAgent("English", chatClient);
  • 构建翻译链工作流:使用 AddEdge 将三个 Agent 顺序连接
// 构建工作流:English → French → Spanish → English
var translationChainWorkflow = new WorkflowBuilder(frenchTranslator)
    .AddEdge(frenchTranslator, spanishTranslator)
    .AddEdge(spanishTranslator, englishTranslator)
    .Build();
  • 执行翻译链
Console.WriteLine("开始执行翻译链工作流...");
Console.WriteLine();
var inputMessage = new ChatMessage(ChatRole.User, "Artificial Intelligence is transforming the world!");
Console.WriteLine($"原始输入: {inputMessage.Text}");
Console.WriteLine();

await using (StreamingRun chainRun = await InProcessExecution.StreamAsync(translationChainWorkflow, inputMessage)){
    // 发送 TurnToken 触发所有 Agent
    await chainRun.TrySendMessageAsync(new TurnToken(emitEvents: true));
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine("翻译过程跟踪");
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine();
    int stepNumber = 1;
    await foreach (WorkflowEvent evt in chainRun.WatchStreamAsync())
    {
        if (evt is AgentRunUpdateEvent agentEvent)
        {
            Console.WriteLine($"Step {stepNumber}: {agentEvent.ExecutorId}");
            Console.WriteLine($"   翻译结果: {agentEvent.Data}");
            Console.WriteLine();
            stepNumber++;
        }
    }
    Console.WriteLine("翻译链执行完成");
}

观察与分析

通过上面的输出,会发现:

  • 顺序执行:三个 Agent 按照定义的顺序依次执行
  • 自动传递:每个 Agent 的输出自动成为下一个 Agent 的输入
  • 实时反馈:通过 AgentRunUpdateEvent 可以实时看到每一步的翻译结果
  • 语义变化:经过多次翻译后,原始语义可能会有细微变化(这是 LLM 的特性)

5. 核心概念深入

  • Agent 作为 Executor 的内部机制,当将 AIAgent 添加到工作流时,MAF 会自动将其封装为一个特殊的 Executor。

    关键特性:

    • 消息缓存:Agent Wrapper 会缓存收到的 ChatMessage

    • 等待触发:只有收到 TurnToken 才会调用内部的 Agent.RunAsync()

    • 事件发射:执行过程中会发出 AgentRunUpdateEvent

    • 类型固定:输入输出都是 ChatMessage

  • TurnToken 的作用域:单个 TurnToken 触发所有 Agent,在一个工作流中,只需发送一次 TurnToken,它会触发所有等待中的 Agent。

sequenceDiagram
    participant User
    participant Workflow
    participant Agent1
    participant Agent2
    
    User->>Workflow: TurnToken
    Workflow->>Agent1: 传递 TurnToken
    Agent1->>Agent1: 处理并输出
    Agent1->>Workflow: 输出 (ChatMessage)
    Workflow->>Agent2: 传递输出 + TurnToken
    Agent2->>Agent2: 处理并输出
    Agent2->>Workflow: 输出 (ChatMessage)
  • AgentRunUpdateEvent 详解,这是监控 Agent 执行的核心事件。

    AgentRunUpdateEvent 的用途:

    • 实时进度:显示 Agent 的执行进度给用户
    • 调试:检查每个 Agent 的输入输出
    • 日志记录:记录完整的执行链路
    • 错误诊断:发现哪个 Agent 出现了问题
// 典型的事件处理模式
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentRunUpdateEvent agentEvent)
    {
        // 事件属性
        Console.WriteLine($"Agent ID: {agentEvent.ExecutorId}");
        Console.WriteLine($"数据: {agentEvent.Data}");
        Console.WriteLine($"状态: {agentEvent.Status}");
        Console.WriteLine($"时间戳: {agentEvent.Timestamp}");
    }
}

5. 最佳实践

何时使用 Agent vs Executor

场景 推荐 理由
文本格式化、验证 Executor 确定性逻辑,无需 LLM
内容生成、改写 Agent 需要理解和创造性
数学计算、数据转换 Executor 精确计算,成本低
意图识别、分类 Agent 需要语义理解
API 调用、数据库查询 Executor 确定性操作
多轮对话、推理 Agent 需要上下文理解

Agent 在工作流中的命名规范

  • 推荐的命名模式:

    • 职责导向:名称应反映 Agent 的具体职责
    • 动词+名词:如 translateText, reviewCode
    • 避免泛化:不要使用 agent1, helper 等通用名称
    • 一致性:在同一工作流中保持命名风格一致
// 好的命名 - 清晰描述职责
var frenchTranslator = CreateTranslationAgent("French", client);
var contentReviewer = new ChatClientAgent(client, "Review content for quality");
var codeGenerator = new ChatClientAgent(client, "Generate C# code");

// 不好的命名 - 过于抽象
var agent1 = new ChatClientAgent(...);
var helper = new ChatClientAgent(...);

错误处理策略

在工作流中使用 Agent 时的错误处理:

// 带错误处理的 Agent 工作流执行
async Task<bool> ExecuteWorkflowWithErrorHandlingAsync(Workflow workflow, ChatMessage input, int maxRetries = 3)
{
    for (int attempt = 1; attempt <= maxRetries; attempt++)
    {
        try
        {
            Console.WriteLine($"尝试执行 (第 {attempt} 次)...");            
            await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, input);
            await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
            
            await foreach (WorkflowEvent evt in run.WatchStreamAsync())
            {
                if (evt is AgentRunUpdateEvent agentEvent)
                {
                    // 检查 Agent 输出是否有效
                    if (agentEvent.Data == null || string.IsNullOrWhiteSpace(agentEvent.Data.ToString()))
                    {
                        throw new InvalidOperationException($"Agent {agentEvent.ExecutorId} 返回了空结果");
                    }
                }
            }            
            Console.WriteLine("工作流执行成功");
            return true;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"执行失败: {ex.Message}");
            
            if (attempt < maxRetries)
            {
                var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt)); // 指数退避
                Console.WriteLine($"等待 {delay.TotalSeconds} 秒后重试...");
                await Task.Delay(delay);
            }
            else
            {
                Console.WriteLine($"达到最大重试次数,放弃执行");
                return false;
            }
        }
    }    
    return false;
}

性能优化建议

  • 减少不必要的 Agent 调用
// 不好的做法 - 每次都调用 Agent
var validatorAgent = new ChatClientAgent(client, "Validate input format");

// 好的做法 - 用 Executor 处理格式验证
public class FormatValidatorExecutor : Executor<string, string>
{
    protected override Task<string> HandleAsync(string input, IWorkflowContext context, CancellationToken ct)
    {
        if (string.IsNullOrWhiteSpace(input))
            throw new ArgumentException("输入不能为空");
        return Task.FromResult(input);
    }
}
  • 合理使用 emitEvents
// 生产环境:可以关闭事件以提高性能
await run.TrySendMessageAsync(new TurnToken(emitEvents: false));

// 开发/调试:开启事件以便监控
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
  • Agent 粒度控制

    • 太细:多个简单 Agent 导致过多的 LLM 调用

    • 太粗:单个复杂 Agent 难以复用和维护

    • 适中:每个 Agent 负责一个明确的子任务

三、工作流即 Agent

1. 为什么需要 Workflow as Agent

在实际应用中,我们经常需要将多个 Agent 组成的协作工作流作为一个整体单元来使用:

特性 独立的 Workflow Workflow as Agent
执行方式 StreamAsync() + TurnToken RunAsync() / RunStreamingAsync()
输入类型 ChatMessage + 手动发送 TurnToken ChatMessage / string (自动处理)
输出类型 WorkflowEvent 流 AgentRunResponse
接口 Workflow AIAgent
复用方式 需要手动管理执行流程 作为工具/插件/子Agent

问题:直接使用 Workflow 接口存在以下挑战

  • 调用复杂:需要手动创建 StreamingRun、发送 TurnToken、监听事件
  • 接口不统一:Workflow 和 Agent 使用不同的调用方式
  • 复用困难:无法将 Workflow 直接作为 Agent 的工具
  • 组合受限:无法在其他 Agent 中轻松集成 Workflow

Workflow as Agent 的解决方案

AsAgent() 扩展方法将基于 Agent 的 Workflow 包装为标准的 AIAgent 接口,使其可以像普通 Agent 一样使用

  • 核心优势:
    • 统一接口:使用 AIAgent 接口调用 Workflow
    • 无缝集成:可以作为 Agent 的工具使用
    • 跨协议复用:支持 A2A、MCP 等协议
    • 生态兼容:享受 AIAgent 生态的所有中间件
  • 使用场景
    • 复杂任务封装:将多步骤的业务逻辑封装为可复用的 Agent
    • 跨系统集成:需要在 A2A 协作中使用 Workflow
    • 工具化封装:将 Workflow 作为其他 Agent 的工具
    • 中间件增强:需要对 Workflow 应用 AIAgent 中间件
  • 不建议场景
    • 简单任务:如果只是简单的顺序调用,直接使用 WorkflowBuilder 即可
    • 性能敏感:WorkflowAgent 会增加一层适配开销

2. 创建多 Agent 协助工作流

我们使用前面创建的多Agent 翻译链工作流,现在我们将这个 Workflow 封装为一个 Agent,使其可以像普通 Agent 一样通过 AIAgent 接口调用。

AsAgent() 方法将 Workflow 包装为一个 AIAgent,内部自动处理:

  • 接收 ChatMessage 输入:提取用户消息
  • 自动管理 TurnToken:无需手动发送 TurnToken
  • 返回 AgentRunResponse:将 Workflow 输出包装为标准响应
sequenceDiagram
    participant User as 调用方
    participant WA as Workflow Agent
    participant WF as Workflow
    
    User->>WA: RunAsync(ChatMessage)
    WA->>WA: 内部创建 StreamingRun
    WA->>WF: 自动发送 TurnToken
    WF->>WF: 执行所有 Agent
    WF-->>WA: WorkflowEvent 流
    WA->>WA: 包装为 AgentRunResponse
    WA-->>User: AgentRunResponse

重要限制:ChatProtocol 支持要求

并非所有 Workflow 都能转换为 Agent! AsAgent() 方法有严格的协议要求,只有满足 ChatProtocol 的 Workflow 才能使用 AsAgent(),必须同时支持:

  • 输入类型:List 或 ChatMessage
  • 触发类型:TurnToken(用于触发 Agent 执行)

如果 Workflow 不满足这两个条件,调用 AsAgent() 会抛出异常:

// 错误示例:基于 Executor 的 Workflow
var workflow = new WorkflowBuilder()
    .AddExecutor(new MyCustomExecutor())  // 使用自定义 Executor
    .Build();

// 运行时报错!
var agent = workflow.AsAgent("my-agent", "MyAgent");
// System.InvalidOperationException: 
// Workflow does not support ChatProtocol: 
// At least List<ChatMessage> and TurnToken must be supported as input.

正确做法:使用 AgentWorkflowBuilder,要让 Workflow 支持 AsAgent(),必须使用 AgentWorkflowBuilder 创建基于 Agent 的 Workflow

// 正确:使用 AgentWorkflowBuilder
var workflow = AgentWorkflowBuilder.BuildSequential(
    "my-workflow",
    [agent1, agent2, agent3]  // 使用 AIAgent 实例
);

var workflowAgent = workflow.AsAgent("my-agent", "MyAgent");  // 成功

为什么有这个限制?

核心原因:

  • AIAgent 接口的 RunAsync() 方法接收 ChatMessage 作为输入
  • Agent 通过 TurnToken 触发执行
  • Executor-based Workflow 使用自定义输入类型(如 string),不符合 ChatProtocol 规范
Workflow 类型 输入类型 是否支持 TurnToken 能否转 Agent
Executor-based(WorkflowBuilder) 自定义类型(如 string, MyData) 不支持 不能
Agent-based(AgentWorkflowBuilder) ChatMessage 支持 可以

将 Workflow 转换为 Agent

// 使用 AsAgent() 将 Workflow 转换为 AIAgent
var translationWorkflowAgent = translationWorkflow.AsAgent(
    id: "translation-workflow",
    name: "TranslationChain"
);

3. 测试 Workflow Agent

  • 使用 RunAsync 调用
// 使用 AIAgent 接口调用 Workflow Agent
var userMessage = "The future belongs to those who believe in the beauty of their dreams.";
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("用户输入:");
Console.WriteLine($" '{userMessage}'");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");

// 创建 Thread
var thread = translationWorkflowAgent.GetNewThread();
// 关键:像调用普通 Agent 一样调用 Workflow Agent
var response = await translationWorkflowAgent.RunAsync(userMessage, thread);
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("Workflow Agent 响应:");
foreach (var msg in response.Messages)
{
    if (msg.Role == ChatRole.Assistant)
    {
        Console.WriteLine($" {msg.Text}");
    }
}
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
  • 使用 RunStreamingAsync 流式调用
// 使用流式调用
var streamingMessage = "Technology is best when it brings people together.";
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("用户输入 (流式):");
Console.WriteLine($" '{streamingMessage}'");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
Console.WriteLine("流式输出:");
var streamThread = translationWorkflowAgent.GetNewThread();
await foreach (var update in translationWorkflowAgent.RunStreamingAsync(streamingMessage, streamThread))
{
    if (!string.IsNullOrEmpty(update.Text))
    {
        Console.WriteLine($"   {update.Text}");
    }
}
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("流式调用完成");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");

4. 将 Workflow Agent 作为工具使用

现在让我们演示 Workflow Agent 最强大的功能:将复杂的多 Agent 协作流程作为工具提供给其他 Agent 使用。

场景说明,我们要创建一个 语言专家 Agent,它需要:

  1. 接收用户请求(如"测试这段文字的翻译一致性")
  2. 调用翻译链 Workflow (Workflow Agent) 进行多轮翻译
  3. 分析并报告翻译质量

这是一个典型的 Agent 调用 Workflow Agent"场景。

  • 将 Workflow Agent 转换为 AIFunction
// Workflow Agent 可以直接转换为 AIFunction
var translationChainFunction = translationWorkflowAgent.AsAIFunction(
    new AIFunctionFactoryOptions
    {
        Name = "translation_chain",
        Description = "对文本进行多轮翻译测试:英文→法语→西班牙语→英文,用于测试翻译一致性"
    }
);
  • 创建内容审核 Agent (使用 WorkflowAgent 作为工具)
// 创建语言专家 Agent,配置翻译链工具
var linguistAgent = chatClient.CreateAIAgent(
    instructions: @"你是一位语言学专家。你可以使用 translation_chain 工具来测试文本的翻译一致性。
    
    工作流程:
    1. 使用 translation_chain 工具对用户提供的文本进行多轮翻译
    2. 比较原始文本和最终翻译结果
    3. 分析翻译过程中的语义变化
    4. 提供翻译质量评估和改进建议
    
    返回格式化的分析报告。",
    name: "LinguistExpert",
    tools: [translationChainFunction]
);
Console.WriteLine("语言专家 Agent 创建完成");
Console.WriteLine(" 配置工具: translation_chain (Workflow Agent)");
  • 测试:Agent 自动调用 WorkflowAgent
// 创建线程并提交请求
var linguistThread = linguistAgent.GetNewThread();
var userRequest = "请测试这段文字的翻译一致性: The only way to do great work is to love what you do.";
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("用户请求:");
Console.WriteLine($" {userRequest}");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
Console.WriteLine("语言专家处理中(将自动调用翻译链工具)...\n");
// Agent 会自动调用 translation_chain 工具 (Workflow Agent)
var response = await linguistAgent.RunAsync(userRequest, linguistThread);
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("分析报告:");
foreach (var msg in response.Messages)
{
    if (msg.Role == ChatRole.Assistant)
    {
        Console.WriteLine($"\n{msg.Text}");
    }
}
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");

5. 直接调用 vs WorkflowAgent

对比总结

特性 直接调用 Workflow 使用 WorkflowAgent
调用方式 RunAsync(TInput) RunAsync(ChatMessage)
接口 Workflow AIAgent
输入类型 强类型 (string) ChatMessage
输出类型 强类型 (string) AgentRunResponse
作为工具 不支持 支持
中间件 不支持 支持 AIAgent 中间件
A2A 协作 不支持 支持
MCP 集成 不支持 支持
性能开销 稍高 (适配层)

架构对比图

flowchart TB
    subgraph "直接调用 Workflow"
        A1[应用代码] -->|RunAsync| B1[Workflow]
        B1 -->|string| A1
        style A1 fill:#FF5252,stroke:#C62828,stroke-width:2px,color:#fff
        style B1 fill:#FF5252,stroke:#C62828,stroke-width:2px,color:#fff
    end
    
    subgraph "WorkflowAgent 封装"
        A2[Agent/应用] -->|RunAsync| B2[WorkflowAgent]
        B2 -->|调用| C2[Workflow]
        C2 -->|结果| B2
        B2 -->|AgentRunResponse| A2
        
        B2 -.可应用.-> D2[中间件]
        B2 -.可注册为.-> E2[Agent工具]
        B2 -.可用于.-> F2[A2A/MCP]
        
        style A2 fill:#4CAF50,stroke:#2E7D32,stroke-width:2px,color:#fff
        style B2 fill:#2196F3,stroke:#1565C0,stroke-width:2px,color:#fff
        style C2 fill:#4CAF50,stroke:#2E7D32,stroke-width:2px,color:#fff
    end

6. 最佳实践

何时使用 WorkflowAgent?

  • 推荐场景:

    • 需要将 Workflow 作为工具提供给其他 Agent

    • 需要在 A2A 协作中使用 Workflow

    • 需要对 Workflow 应用 AIAgent 中间件 (如日志、缓存)

    • 需要通过 MCP 协议暴露 Workflow

    • 构建可复用的业务逻辑组件

  • 不推荐场景:

    • 简单的内部调用,直接使用 RunAsync 更高效

    • 性能敏感的场景 (适配层会有额外开销)

    • 不需要对外暴露的内部工作流

使用建议

  • 命名规范:WorkflowAgent 转为工具时,使用清晰的名称和描述
var tool = workflowAgent.AsAIFunction(new AIFunctionFactoryOptions
{
    Name = "clean_data",  // 动词_名词格式
    Description = "清晰描述功能和用途"
});
  • 错误处理:确保 Workflow 内部有完善的错误处理
protected override async Task<string> ExecuteAsync(...)
{
    try {
        // 业务逻辑
    } catch (Exception ex) {
        // 记录日志并返回友好错误信息
        return $"处理失败: {ex.Message}";
    }
}
  • 性能优化:对于频繁调用的 WorkflowAgent,考虑添加缓存中间件
// 示例:使用缓存中间件 (需自行实现或引用相关库)
// var cachedWorkflowAgent = new CachingAgent(workflowAgent);

四、工作流的事件机制

事件类别 作用域 代表事件 使用频率
Executor 事件 单个执行器 ExecutorCompletedEvent ⭐⭐⭐⭐⭐
SuperStep 事件 执行步骤 SuperStepCompletedEvent ⭐⭐
Workflow 事件 整个工作流 WorkflowOutputEvent ⭐⭐⭐⭐⭐
Agent 事件 AI Agent AgentRunUpdateEvent ⭐⭐⭐⭐
交互事件 人机协作 RequestInfoEvent ⭐⭐⭐

1. WorkflowEvent 基类剖析

  • 类定义
public class WorkflowEvent
{
    // 事件携带的数据负载
    public object? Data { get; }
    
    // 构造函数
    public WorkflowEvent(object? data = null)
    {
        Data = data;
    }
    
    // 提供友好的字符串表示
    public override string ToString() =>
        Data is not null ?
            $"{GetType().Name}(Data: {Data.GetType()} = {Data})" :
            $"{GetType().Name}()";
}
  • 核心特性
特性 说明 示例
通用性 所有事件的基类,提供统一接口 可以用 WorkflowEvent 类型接收所有事件
数据承载 Data 属性存储事件的业务数据 执行结果、错误信息、状态对象等
可扩展 子类可以添加特定属性 ExecutorEvent 添加 ExecutorId
调试友好 ToString() 提供清晰的事件描述 便于日志输出和调试
  • ExecutorEvent - Executor 作用域事件,ExecutorEvent 是最常用的事件类别,它表示与特定 Executor 执行相关的事件。

    关键属性

    • ExecutorId:标识哪个 Executor 触发了这个事件
      • 用于追踪执行流程
      • 区分并发执行的不同 Executor
      • 实现基于 Executor 的过滤逻辑

    四大子事件

    • ExecutorInvokedEvent:开始执行
    • ExecutorCompletedEvent:成功完成
    • ExecutorFailedEvent:执行失败
    • AgentRunUpdateEvent:Agent 更新
public class ExecutorEvent : WorkflowEvent
{
    // 触发此事件的 Executor 的唯一标识符
    public string ExecutorId { get; }
    
    public ExecutorEvent(string executorId, object? data) : base(data)
    {
        ExecutorId = executorId;
    }
    
    public override string ToString() =>
        Data is not null ?
            $"{GetType().Name}(Executor = {ExecutorId}, Data: {Data.GetType()} = {Data})" :
            $"{GetType().Name}(Executor = {ExecutorId})";
}
  • SuperStepEvent - 执行步骤事件,SuperStepEvent 用于表示工作流的执行步骤(SuperStep)级别的事件。

    在 MAF Workflow 中,SuperStep 是一个执行单元,可以包含:

    1. 一个或多个 Executor 的并发执行
    2. 一个完整的顺序执行链
    3. 两个子事件
事件类型 触发时机 主要用途
SuperStepStartedEvent SuperStep 开始执行 监控执行进度
SuperStepCompletedEvent SuperStep 完成执行 调试、Checkpoint 保存

使用场景:在调试复杂的并发工作流时,SuperStep 事件可以帮助你理解执行的阶段划分。

public class SuperStepEvent : WorkflowEvent
{
    // SuperStep 的索引(从 0 开始)
    public int StepNumber { get; }
    
    public SuperStepEvent(int stepNumber, object? data = null) : base(data)
    {
        StepNumber = stepNumber;
    }
}

2. 流式执行机制详解

MAF Workflow 提供两种执行模式:

执行模式 方法 特点 适用场景
同步执行 RunAsync() 等待完成后返回结果 简单工作流、不需要中间状态
流式执行 StreamAsync() 实时返回事件流 需要进度监控、Agent 工作流
sequenceDiagram
    participant User as 用户代码
    participant Workflow as 工作流引擎
    participant Executor as Executors
    
    Note over User,Executor: 同步执行 (RunAsync)
    User->>Workflow: RunAsync(input)
    Workflow->>Executor: 执行所有步骤
    Executor-->>Workflow: 返回结果
    Workflow-->>User: 返回 Run (包含结果)
    Note over User: 阻塞等待完成
    
    Note over User,Executor: 流式执行 (StreamAsync)
    User->>Workflow: StreamAsync(input)
    Workflow-->>User: 返回 StreamingRun
    Note over User: 立即返回
    User->>Workflow: WatchStreamAsync()
    loop 执行过程中
        Workflow->>Executor: 执行步骤
        Executor-->>Workflow: 产生事件
        Workflow-->>User: 实时推送事件
        Note over User: 实时处理
    end
  • 核心 API:WatchStreamAsync:是流式执行的核心方法,它返回一个 IAsyncEnumerable,可以使用 await foreach 遍历。
// 1. 启动流式执行
StreamingRun run = await InProcessExecution.StreamAsync(workflow, input);

// 2. 订阅事件流
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    // 3. 处理每个事件
    Console.WriteLine($"收到事件: {evt.GetType().Name}");
}

关键概念

概念 说明
StreamingRun 流式执行的句柄,用于监听事件和发送消息
IAsyncEnumerable 异步流,事件按产生顺序依次返回
await foreach 异步迭代,每收到一个事件就处理一次
实时性 事件一产生就推送,不等待工作流完成

执行流程图

flowchart TB
    A[StreamAsync] --> B[返回 StreamingRun]
    B --> C[WatchStreamAsync]
    C --> D[await foreach 循环]
    
    D --> E{收到事件?}
    E -->|是| F[处理事件]
    F --> E
    E -->|工作流完成| G[退出循环]
    
    H[Workflow Engine] -.产生事件.-> E
    
    style A fill:#2196F3,stroke:#1565C0,stroke-width:2px,color:#fff
    style C fill:#4CAF50,stroke:#2E7D32,stroke-width:2px,color:#fff
    style F fill:#FF9800,stroke:#E65100,stroke-width:2px,color:#fff
  • 实验 1:观察基础事件流

我们使用前面章节创建的文本处理管道

// 构建工作流
var uppercaseExecutor = new UppercaseExecutor();
var reverseExecutor = new ReverseExecutor();

var simpleWorkflow = new WorkflowBuilder(uppercaseExecutor)
    .AddEdge(uppercaseExecutor, reverseExecutor)
    .WithOutputFrom(reverseExecutor)
    .Build();
// 执行工作流并监听所有事件
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("开始执行工作流 (流式模式)");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
var input = "Hello Workflow Events!";
Console.WriteLine($"输入: \"{input}\"\n");

// 启动流式执行
StreamingRun run = await InProcessExecution.StreamAsync(simpleWorkflow, input);
Console.WriteLine("开始监听事件流...\n");
Console.WriteLine(new string('─', 60));

int eventCount = 0;
// 订阅事件流
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    eventCount++;
    
    // 显示事件的完整信息
    Console.WriteLine($"\n事件 #{eventCount}");
    Console.WriteLine($" 类型: {evt.GetType().Name}");
    
    // // 根据事件类型显示详细信息
    // switch (evt)
    // {
    //     case WorkflowStartedEvent:
    //         Console.WriteLine($" 说明: 工作流开始执行");
    //         break;
            
    //     case ExecutorInvokedEvent invokedEvent:
    //         Console.WriteLine($" Executor: {invokedEvent.ExecutorId}");
    //         Console.WriteLine($" 说明: Executor 开始执行");
    //         Console.WriteLine($" 输入: {invokedEvent.Data}");
    //         break;
            
    //     case ExecutorCompletedEvent completedEvent:
    //         Console.WriteLine($" Executor: {completedEvent.ExecutorId}");
    //         Console.WriteLine($" 说明: Executor 执行完成");
    //         Console.WriteLine($" 输出: {completedEvent.Data}");
    //         break;
            
    //     case SuperStepCompletedEvent stepEvent:
    //         Console.WriteLine($" SuperStep: #{stepEvent.StepNumber}");
    //         Console.WriteLine($" 说明: 执行步骤完成");
    //         break;
            
    //     case WorkflowOutputEvent outputEvent:
    //         Console.WriteLine($" 来源: {outputEvent.SourceId}");
    //         Console.WriteLine($" 说明: 工作流产生输出");
    //         Console.WriteLine($" 结果: {outputEvent.Data}");
    //         break;
            
    //     default:
    //         Console.WriteLine($" 数据: {evt.Data}");
    //         break;
    // }
    
    Console.WriteLine(new string('─', 60));
}
Console.WriteLine($"\n工作流执行完成,共产生 {eventCount} 个事件");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");

通过上面的实验,我们可以观察到一个典型的事件序列:

1. SuperStepStartedEvent         ← SuperStep 0 开始
2. ExecutorInvokedEvent          ← UppercaseExecutor 开始
3. ExecutorCompletedEvent        ← UppercaseExecutor 完成
4. SuperStepCompletedEvent       ← SuperStep 0 完成
5. SuperStepStartedEvent         ← SuperStep 1 开始
6. ExecutorInvokedEvent          ← ReverseExecutor 开始
7. ExecutorCompletedEvent        ← ReverseExecutor 完成
8. WorkflowOutputEvent           ← 工作流输出结果
9. SuperStepCompletedEvent       ← SuperStep 1 完成

关键发现

  1. SuperStep 结构:每个 SuperStep 都有明确的开始和结束事件
  2. 执行顺序:SuperStepStarted → ExecutorInvoked → ExecutorCompleted → SuperStepCompleted
  3. 完整性:每个 Executor 都有 Invoked 和 Completed 事件对
  4. 输出时机:WorkflowOutputEvent 在最后一个 Executor 完成后、SuperStep 完成前触发
  5. 可追溯:通过事件序号和类型可以完整追踪执行流程

3. 系统内置事件详解

ExecutorCompletedEvent - 最常用的事件:ExecutorCompletedEvent 是工作流开发中使用频率最高的事件,它表示一个 Executor 成功完成执行。

public sealed class ExecutorCompletedEvent : ExecutorEvent
{
    public ExecutorCompletedEvent(string executorId, object? result) 
        : base(executorId, data: result)
    {
    }
}

核心用途

用途 说明 示例
获取结果 Data 属性包含 Executor 的返回值 提取中间处理结果
进度追踪 标记某个步骤已完成 更新进度条
链路追踪 记录执行顺序 构建执行日志
调试分析 查看每个步骤的输出 定位数据转换问题

实战示例

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is ExecutorCompletedEvent completed)
    {
        Console.WriteLine($"{completed.ExecutorId} 完成");
        Console.WriteLine($" 输出: {completed.Data}");
        
        // 根据 ExecutorId 做不同处理
        if (completed.ExecutorId == "ValidationExecutor")
        {
            bool isValid = (bool)completed.Data;
            if (!isValid)
            {
                Console.WriteLine("验证失败,终止后续流程");
            }
        }
    }
}
  • AgentRunUpdateEvent - Agent 专用事件:当 Agent 在工作流中执行时,会产生 AgentRunUpdateEvent 来报告执行进度和输出内容。
public class AgentRunUpdateEvent : ExecutorEvent
{
    public AgentRunUpdateEvent(string executorId, AgentRunResponseUpdate update) 
        : base(executorId, data: update)
    {
        Update = update;
    }
    
    public AgentRunResponseUpdate Update { get; }
}

关键属性 AgentRunResponseUpdate对象,包含:

  • Text:Agent 输出的文本内容
  • Contents:结构化内容(可能包含 FunctionCall 等)
  • 其他 Agent 运行时信息

触发条件:只有在发送 TurnToken 时设置 emitEvents: true,才会产生此事件!

// 正确:会产生 AgentRunUpdateEvent
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

// 错误:不会产生 AgentRunUpdateEvent
await run.TrySendMessageAsync(new TurnToken(emitEvents: false));

实战示例

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentRunUpdateEvent agentEvent)
    {
        // 实时显示 Agent 输出
        Console.Write(agentEvent.Update.Text);
        
        // 检查是否有函数调用
        var functionCall = agentEvent.Update.Contents
            .OfType<FunctionCallContent>()
            .FirstOrDefault();
            
        if (functionCall != null)
        {
            Console.WriteLine($"\n调用函数: {functionCall.Name}");
        }
    }
}
  • WorkflowOutputEvent - 工作流输出事件:WorkflowOutputEvent 标志着工作流产生了输出,通常意味着工作流即将完成或已经完成。
public sealed class WorkflowOutputEvent : WorkflowEvent
{
    internal WorkflowOutputEvent(object data, string sourceId) : base(data)
    {
        SourceId = sourceId;
    }
    
    public string SourceId { get; }
    
    // 类型安全的数据提取方法
    public bool Is<T>() => Data is T;
    
    public bool Is<T>([NotNullWhen(true)] out T? maybeValue)
    {
        if (Data is T value)
        {
            maybeValue = value;
            return true;
        }
        maybeValue = default;
        return false;
    }
    
    public T? As<T>() => Data is T value ? value : default;
}

核心特性

特性 说明
SourceId 产生输出的 Executor ID
类型安全 提供 Is() 和 As() 方法
终止标志 通常是事件流的最后一个事件

实战示例

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is WorkflowOutputEvent outputEvent)
    {
        Console.WriteLine($"工作流完成,来自: {outputEvent.SourceId}");
        
        // 类型安全的数据提取
        if (outputEvent.Is<string>(out var textResult))
        {
            Console.WriteLine($"文本结果: {textResult}");
        }
        else if (outputEvent.Is<List<ChatMessage>>(out var messages))
        {
            Console.WriteLine($"对话消息: {messages.Count} 条");
        }
        
        // 或者使用 As<T>
        var result = outputEvent.As<MyCustomType>();
        if (result != null)
        {
            // 处理自定义类型
        }
    }
}
  • ExecutorFailedEvent - 错误处理事件:当 Executor 执行过程中抛出异常时,会产生 ExecutorFailedEvent。
public sealed class ExecutorFailedEvent : ExecutorEvent
{
    public ExecutorFailedEvent(string executorId, Exception? err)
        : base(executorId, data: err)
    {
        Data = err;
    }
    
    public new Exception? Data { get; }
}

实战示例

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is ExecutorFailedEvent failedEvent)
    {
        Console.WriteLine($"{failedEvent.ExecutorId} 执行失败");
        Console.WriteLine($" 错误类型: {failedEvent.Data?.GetType().Name}");
        Console.WriteLine($" 错误消息: {failedEvent.Data?.Message}");
        Console.WriteLine($" 堆栈跟踪: {failedEvent.Data?.StackTrace}");
        
        // 根据错误类型做不同处理
        switch (failedEvent.Data)
        {
            case HttpRequestException httpEx:
                Console.WriteLine("网络错误,建议重试");
                break;
            case TimeoutException timeoutEx:
                Console.WriteLine("超时错误,可能需要增加超时时间");
                break;
            default:
                Console.WriteLine("未知错误,请检查日志");
                break;
        }
    }
}
  • RequestInfoEvent - 人机协作事件:RequestInfoEvent 用于 Human-in-the-Loop 场景,表示工作流需要外部输入(通常是人工干预)。
public sealed class RequestInfoEvent : WorkflowEvent
{
    public RequestInfoEvent(ExternalRequest request) : base(request)
    {
        Request = request;
    }
    
    public ExternalRequest Request { get; }
}

使用场景

  • 审批流程:等待管理员批准
  • 内容审核:等待人工审核敏感内容
  • 用户交互:等待用户提供额外信息
  • 配置选择:等待用户选择执行路径

实战示例

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is RequestInfoEvent requestEvent)
    {
        Console.WriteLine("工作流暂停,等待外部输入");
        Console.WriteLine($" 请求ID: {requestEvent.Request.RequestId}");
        
        // 获取用户输入(实际场景可能是 Web API 或 UI)
        Console.Write("请输入响应内容: ");
        var userInput = Console.ReadLine();
        
        // 创建响应
        var response = new ExternalResponse
        {
            RequestId = requestEvent.Request.RequestId,
            Data = userInput
        };
        
        // 将响应发送回工作流
        await run.TrySendMessageAsync(response);
        Console.WriteLine("响应已发送,工作流继续执行");
    }
}

4. 事件监听与过滤

在实际应用中,我们通常不需要处理所有事件,而是只关注特定类型的事件,接下来我们看看多种事件过滤技巧。

  • 基于事件类型的过滤,最基础的过滤方式是使用 is 模式匹配或 switch 表达式。
// 示例:只处理 ExecutorCompletedEvent
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is ExecutorCompletedEvent completed)
    {
        Console.WriteLine($"{completed.ExecutorId}: {completed.Data}");
    }
    // 忽略其他事件
}

// 使用 switch 表达式处理多种事件类型
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    var message = evt switch
    {
        ExecutorCompletedEvent completed => 
            $"{completed.ExecutorId} 完成: {completed.Data}",
        
        ExecutorFailedEvent failed => 
            $"{failed.ExecutorId} 失败: {failed.Data?.Message}",
        
        AgentRunUpdateEvent agentUpdate => 
            $"Agent 更新: {agentUpdate.Update.Text}",
        
        WorkflowOutputEvent output => 
            $"输出结果: {output.Data}",
        
        _ => null // 忽略其他事件
    };
    
    if (message != null)
    {
        Console.WriteLine(message);
    }
}
  • 基于 ExecutorId 的过滤,在复杂的工作流中,我们可能只关注特定 Executor 的事件。

    使用场景

    • 进度监控:只追踪关键步骤的执行

    • 问题调试:定位某个特定 Executor 的问题

    • 日志记录:只记录重要步骤的日志

    • 性能优化:减少不必要的事件处理

// 只监听特定 Executor 的完成事件
var targetExecutorId = "ReverseExecutor";
Console.WriteLine($"只监听 {targetExecutorId} 的事件\n");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    // 方式 1: 直接在 ExecutorEvent 上过滤
    if (evt is ExecutorEvent executorEvent && 
        executorEvent.ExecutorId == targetExecutorId)
    {
        Console.WriteLine($"{evt.GetType().Name}");
        Console.WriteLine($" Executor: {executorEvent.ExecutorId}");
        Console.WriteLine($" 数据: {executorEvent.Data}\n");
    }
}

// 方式 2: 组合多个条件
var interestedExecutors = new[] { "UppercaseExecutor", "ReverseExecutor" };
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is ExecutorCompletedEvent completed && 
        interestedExecutors.Contains(completed.ExecutorId))
    {
        Console.WriteLine($"{completed.ExecutorId}: {completed.Data}");
    }
}
  • LINQ 过滤(最优雅的方式),利用 C# 的 LINQ 能力,我们可以创建更加优雅和可复用的过滤逻辑。
优势 说明
链式调用 可以组合多个过滤条件
类型安全 强类型支持,编译时检查
代码复用 过滤逻辑可以封装为扩展方法
可读性强 语义清晰,接近自然语言
// 定义 LINQ 扩展方法
//public static class WorkflowEventExtensions
//{
    // 过滤特定类型的事件
    public static async IAsyncEnumerable<T> OfEventType<T>(
        this IAsyncEnumerable<WorkflowEvent> events) where T : WorkflowEvent
    {
        await foreach (var evt in events)
        {
            if (evt is T typedEvent)
            {
                yield return typedEvent;
            }
        }
    }
    
    // 过滤特定 Executor 的事件
    public static async IAsyncEnumerable<ExecutorEvent> FromExecutor(
        this IAsyncEnumerable<WorkflowEvent> events, 
        string executorId)
    {
        await foreach (var evt in events)
        {
            if (evt is ExecutorEvent executorEvent && 
                executorEvent.ExecutorId == executorId)
            {
                yield return executorEvent;
            }
        }
    }
    
    // 过滤多个 Executor 的事件
    public static async IAsyncEnumerable<ExecutorEvent> FromExecutors(
        this IAsyncEnumerable<WorkflowEvent> events, 
        params string[] executorIds)
    {
        var executorSet = new HashSet<string>(executorIds);        
        await foreach (var evt in events)
        {
            if (evt is ExecutorEvent executorEvent && 
                executorSet.Contains(executorEvent.ExecutorId))
            {
                yield return executorEvent;
            }
        }
    }
//}

使用 LINQ 扩展方法的优雅示例

Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("LINQ 过滤示例");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");

var simpleWorkflow = new WorkflowBuilder(uppercaseExecutor)
    .AddEdge(uppercaseExecutor, reverseExecutor)
    .WithOutputFrom(reverseExecutor)
    .Build();

// 重新执行工作流
var run2 = await InProcessExecution.StreamAsync(simpleWorkflow, "Hello LINQ");

// 示例 1: 只监听 ExecutorCompletedEvent
Console.WriteLine("示例 1: 只监听完成事件\n");
await foreach (var completed in run2.WatchStreamAsync().OfEventType<ExecutorCompletedEvent>())
{
    Console.WriteLine($"{completed.ExecutorId}: {completed.Data}");
}
await run2.DisposeAsync();
Console.WriteLine("\n" + new string('─', 60) + "\n");

// 示例 2: 只监听特定 Executor
var run3 = await InProcessExecution.StreamAsync(simpleWorkflow, "Hello Filter");

Console.WriteLine("示例 2: 只监听 ReverseExecutor\n");
await foreach (var evt in run3.WatchStreamAsync().FromExecutor("ReverseExecutor"))
{
    Console.WriteLine($"{evt.GetType().Name}: {evt.Data}");
}
await run3.DisposeAsync();
Console.WriteLine("\n" + new string('─', 60) + "\n");

// 示例 3: 链式组合(只监听特定 Executor 的完成事件)
var run4 = await InProcessExecution.StreamAsync(simpleWorkflow, "Hello Chain");
Console.WriteLine("示例 3: 链式过滤 (ReverseExecutor + Completed)\n");
await foreach (var completed in run4.WatchStreamAsync()
    .FromExecutor("ReverseExecutor")
    .OfEventType<ExecutorCompletedEvent>())
{
    Console.WriteLine($"{completed.ExecutorId}: {completed.Data}");
}
await run4.DisposeAsync();

Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("LINQ 过滤示例完成");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");

5. 实战案例 - 构建事件监控系统

现在让我们构建一个完整的事件监控系统,展示如何在实际项目中应用事件系统。

我们将构建一个 WorkflowMonitor 类,它可以:

  • 实时统计:记录每个 Executor 的执行时间
  • 日志记录:记录关键事件
  • 错误监控:捕获并报告错误
  • 进度展示:显示执行进度
  • 性能分析:识别性能瓶颈

监控系统架构

flowchart TB
    subgraph "事件流"
        A[WorkflowEvent Stream]
    end
    
    subgraph "WorkflowMonitor"
        B[事件接收器]
        C[统计收集器]
        D[日志记录器]
        E[错误处理器]
        F[进度追踪器]
    end
    
    subgraph "输出"
        G[控制台日志]
        H[性能报告]
        I[错误报告]
    end
    
    A --> B
    B --> C
    B --> D
    B --> E
    B --> F
    
    C --> H
    D --> G
    E --> I
    F --> G
    
    style A fill:#2196F3,stroke:#1565C0,stroke-width:2px,color:#fff
    style B fill:#4CAF50,stroke:#2E7D32,stroke-width:2px,color:#fff
    style H fill:#FF9800,stroke:#E65100,stroke-width:2px,color:#fff
  • 监控类定义
// 完整的工作流监控系统
public class WorkflowMonitor
{
    // 执行统计信息
    private class ExecutorStats
    {
        public string ExecutorId { get; set; } = "";
        public DateTime? StartTime { get; set; }
        public DateTime? EndTime { get; set; }
        public TimeSpan? Duration => EndTime - StartTime;
        public bool Success { get; set; }
        public object? Result { get; set; }
        public Exception? Error { get; set; }
    }
    
    private readonly Dictionary<string, ExecutorStats> _stats = new();
    private readonly List<string> _logs = new();
    private DateTime _workflowStartTime;
    private DateTime _workflowEndTime;
    private int _totalEvents = 0;
    private int _superStepCount = 0;
    
    // 监控入口方法
    public async Task MonitorAsync(StreamingRun run)
    {
        Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
        Console.WriteLine("工作流监控系统已启动");
        Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
        
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            _totalEvents++;
            await HandleEventAsync(evt);
        }
        
        // 执行完成后生成报告
        GenerateReport();
    }
    
    // 处理单个事件
    private async Task HandleEventAsync(WorkflowEvent evt)
    {
        switch (evt)
        {
            case WorkflowStartedEvent:
                HandleWorkflowStarted();
                break;
                
            case ExecutorInvokedEvent invoked:
                HandleExecutorInvoked(invoked);
                break;
                
            case ExecutorCompletedEvent completed:
                HandleExecutorCompleted(completed);
                break;
                
            case ExecutorFailedEvent failed:
                HandleExecutorFailed(failed);
                break;
                
            case SuperStepCompletedEvent step:
                HandleSuperStepCompleted(step);
                break;
                
            case WorkflowOutputEvent output:
                HandleWorkflowOutput(output);
                break;
                
            case AgentRunUpdateEvent agentUpdate:
                HandleAgentUpdate(agentUpdate);
                break;
        }
        
        await Task.CompletedTask;
    }
    
    // 工作流开始
    private void HandleWorkflowStarted()
    {
        _workflowStartTime = DateTime.Now;
        Log("工作流开始执行");
    }
    
    // Executor 调用
    private void HandleExecutorInvoked(ExecutorInvokedEvent evt)
    {
        _stats[evt.ExecutorId] = new ExecutorStats
        {
            ExecutorId = evt.ExecutorId,
            StartTime = DateTime.Now
        };
        
        Log($" [{evt.ExecutorId}] 开始执行");
    }
    
    // Executor 完成
    private void HandleExecutorCompleted(ExecutorCompletedEvent evt)
    {
        if (_stats.TryGetValue(evt.ExecutorId, out var stats))
        {
            stats.EndTime = DateTime.Now;
            stats.Success = true;
            stats.Result = evt.Data;
            
            Log($"[{evt.ExecutorId}] 执行成功 (耗时: {stats.Duration?.TotalMilliseconds:F2}ms)");
        }
    }
    
    // Executor 失败
    private void HandleExecutorFailed(ExecutorFailedEvent evt)
    {
        if (_stats.TryGetValue(evt.ExecutorId, out var stats))
        {
            stats.EndTime = DateTime.Now;
            stats.Success = false;
            stats.Error = evt.Data;
            
            Log($"[{evt.ExecutorId}] 执行失败: {evt.Data?.Message}");
        }
    }
    
    // SuperStep 完成
    private void HandleSuperStepCompleted(SuperStepCompletedEvent evt)
    {
        _superStepCount++;
        Log($"SuperStep #{evt.StepNumber} 完成");
    }
    
    // 工作流输出
    private void HandleWorkflowOutput(WorkflowOutputEvent evt)
    {
        _workflowEndTime = DateTime.Now;
        Log($"工作流输出结果 (来自: {evt.SourceId})");
    }
    
    // Agent 更新
    private void HandleAgentUpdate(AgentRunUpdateEvent evt)
    {
        if (!string.IsNullOrEmpty(evt.Update.Text))
        {
            Log($"[{evt.ExecutorId}] Agent 输出: {evt.Update.Text}");
        }
    }
    
    // 记录日志
    private void Log(string message)
    {
        var timestamp = DateTime.Now.ToString("HH:mm:ss.fff");
        var logMessage = $"[{timestamp}] {message}";
        _logs.Add(logMessage);
        Console.WriteLine(logMessage);
    }
    
    // 生成性能报告
    private void GenerateReport()
    {
        Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
        Console.WriteLine("执行性能报告");
        Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
        
        // 总体统计
        var totalDuration = _workflowEndTime - _workflowStartTime;
        Console.WriteLine("### 总体统计");
        new
        {
            总事件数 = _totalEvents,
            SuperStep数量 = _superStepCount,
            Executor数量 = _stats.Count,
            总耗时 = $"{totalDuration.TotalMilliseconds:F2}ms",
            成功数 = _stats.Values.Count(s => s.Success),
            失败数 = _stats.Values.Count(s => !s.Success)
        }.Display();
        
        Console.WriteLine("\n### Executor 性能明细\n");
        
        // Executor 性能表格
        foreach (var stats in _stats.Values.OrderBy(s => s.StartTime))
        {
            var status = stats.Success ? "成功" : "失败";
            var duration = stats.Duration?.TotalMilliseconds.ToString("F2") ?? "N/A";
            
            Console.WriteLine($"{stats.ExecutorId}");
            Console.WriteLine($" 状态: {status}");
            Console.WriteLine($" 耗时: {duration}ms");
            
            if (stats.Error != null)
            {
                Console.WriteLine($" 错误: {stats.Error.Message}");
            }
            
            Console.WriteLine();
        }
        
        // 性能分析
        if (_stats.Values.Any(s => s.Duration.HasValue))
        {
            var slowest = _stats.Values
                .Where(s => s.Duration.HasValue)
                .OrderByDescending(s => s.Duration)
                .First();
                
            Console.WriteLine("### 性能瓶颈");
            Console.WriteLine($"最慢的 Executor: {slowest.ExecutorId}");
            Console.WriteLine($" 耗时: {slowest.Duration?.TotalMilliseconds:F2}ms");
        }
        
        Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    }
}
  • 使用监控系统
// 创建一个更复杂的工作流来测试监控系统
public class DataProcessorExecutor : Executor<string, string>
{
    public DataProcessorExecutor() : base("DataProcessor") { }
    
    public override async ValueTask<string> HandleAsync(
        string input, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        // 模拟一些处理时间
        await Task.Delay(100);
        return $"[Processed: {input}]";
    }
}

public class DataValidatorExecutor : Executor<string, bool>
{
    public DataValidatorExecutor() : base("DataValidator") { }
    
    public override async ValueTask<bool> HandleAsync(
        string input, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        // 模拟验证时间
        await Task.Delay(50);
        return input.Length > 0;
    }
}

public class DataFormatterExecutor : Executor<string, string>
{
    public DataFormatterExecutor() : base("DataFormatter") { }
    
    public override async ValueTask<string> HandleAsync(
        string input, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        // 模拟格式化时间
        await Task.Delay(80);
        return $"**{input}**";
    }
}

// 构建测试工作流
var processor = new DataProcessorExecutor();
var validator = new DataValidatorExecutor();
var formatter = new DataFormatterExecutor();
var testWorkflow = new WorkflowBuilder(processor)
    .AddEdge(processor, validator)
    .AddEdge(validator, formatter)
    .Build();

5. 最佳实践建议

  • DO(推荐做法)

    • 使用 LINQ 扩展进行过滤 - 代码更清晰、可复用

    • 监听特定事件类型 - 避免处理不需要的事件

    • 记录关键事件 - 便于调试和审计

    • 异步处理事件 - 避免阻塞事件流

    • 使用 switch 表达式 - 处理多种事件类型时更简洁

  • DON'T(避免做法)

    • 在事件处理中做耗时操作 - 会阻塞事件流
    • 忽略 ExecutorFailedEvent - 可能导致静默失败
    • 过度监听所有事件 - 影响性能
    • 在事件处理中修改工作流状态 - 可能导致意外行为
    • 忘记设置 emitEvents: true - Agent 事件不会产生
  • 事件监听模式速查

场景 代码示例 适用情况
监听所有事件 await foreach (var e in run.WatchStreamAsync()) 调试、完整日志
监听特定类型 run.WatchStreamAsync().OfEventType() 只关注某类事件
监听特定 Executor run.WatchStreamAsync().FromExecutor("ExecutorId") 追踪特定步骤
组合过滤 run.WatchStreamAsync().FromExecutor("X").OfEventType() 精确过滤
多类型处理 evt switch 差异化处理
  • 常用事件属性速查
事件类型 关键属性 数据类型 获取方式
ExecutorCompletedEvent ExecutorId string evt.ExecutorId
Data (结果) object? evt.Data
ExecutorFailedEvent ExecutorId string evt.ExecutorId
Data (异常) Exception? evt.Data
AgentRunUpdateEvent ExecutorId string evt.ExecutorId
Update AgentRunResponseUpdate evt.Update
Text (输出文本) string evt.Update.Text
WorkflowOutputEvent SourceId string evt.SourceId
Data (结果) object? evt.Data
类型检查 bool evt.Is()`
类型转换 T? evt.As()
SuperStepCompletedEvent StepNumber int evt.StepNumber
  • 错误处理检查清单
    • 监听 ExecutorFailedEvent 处理异常
    • 检查 WorkflowOutputEvent 是否为 null
    • 使用类型安全的 Is<T>()As<T>() 方法
    • 在事件处理中捕获异常,避免崩溃
    • 记录关键错误信息(ExecutorId、异常类型、堆栈)
  • 性能优化检查清单
    • 使用过滤器减少不必要的事件处理
    • 避免在事件处理中执行耗时操作
    • 对于密集事件(如 AgentRunUpdateEvent),考虑批处理
    • 使用异步操作,避免阻塞事件流
    • 监控事件处理的性能瓶颈
  • AgentRunUpdateEvent 必备条件:
// 正确:会产生 AgentRunUpdateEvent
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

// 错误:不会产生 AgentRunUpdateEvent
await run.TrySendMessageAsync(new TurnToken(emitEvents: false));
await run.TrySendMessageAsync(new TurnToken()); // 默认 false
  • 调试技巧速查
技巧 实现方式 用途
查看所有事件 Console.WriteLine(evt) 了解事件序列
记录事件时间戳 DateTime.Now 在事件处理时记录 性能分析
统计事件数量 维护计数器 验证执行完整性
追踪执行路径 记录 ExecutorId 序列 理解执行流程
捕获中间结果 保存 ExecutorCompletedEvent.Data 数据流分析

五、检查点

1. 概念

在现实生活中,我们经常会遇到需要"保存进度"的场景:

场景 检查点机制 作用
玩游戏 存档点 游戏失败后可以从存档点重新开始
写文档 自动保存 意外关闭后可以恢复到最近保存的版本
Git 版本控制 Commit 提交点 可以回退到任意历史提交点
数据库事务 事务日志 系统崩溃后可以从日志恢复数据

在 Microsoft Agent Framework (MAF) 的 Workflow 中,检查点(Checkpoint) 就是这样一种机制:

定义:检查点是 Workflow 执行过程中的状态快照,它记录了某个时刻所有 Executor 的状态数据,使得工作流可以在未来的某个时间点从该状态继续执行或重新执行。

检查点机制为 Workflow 应用提供了强大的能力:

  • 容错恢复(Fault Tolerance)

    场景示例:长时间运行的数据处理工作流,在处理了 1000 条记录后系统崩溃,可以从最近的检查点恢复,而不必从头开始。

  • 状态回溯(State Rollback)

    场景示例:在 AI Agent 决策过程中,如果某条路径的结果不理想,可以回退到之前的检查点,尝试其他决策路径。

flowchart LR
    A[初始状态] --> B[检查点 1]
    B --> C[检查点 2]
    C --> D[检查点 3]
    D --> E[当前状态]
    E -.回退.-> C
    C --> F[新路径探索]
  • 分支探索(Branch Exploration)

    场景示例:从同一个检查点创建多个独立的工作流实例,并行探索不同的执行路径(A/B 测试、参数调优)。

  • 人工介入(Human-in-the-Loop)

    场景示例:在审批流程中,工作流执行到需要人工审批的节点时创建检查点,等待人工决策后再从该检查点继续执行。

维度 无检查点 有检查点
系统崩溃 需要从头开始,浪费计算资源 从最近检查点恢复,节省时间
调试复杂流程 每次调试都要完整运行 可以从特定检查点开始调试
长时间任务 中断后全部重来 断点续传,不丢失进度
多路径探索 需要多次完整执行 从检查点分叉,提高效率
人工干预 难以实现异步等待 保存状态后等待,自然支持

2. 核心概念:Super Step(超级步骤)

定义:Super Step 是 Workflow 执行的基本阶段单位。一个 Super Step 包含一组 Executor 的完整执行周期,从接收信号开始,到所有 Executor 完成工作为止。

Super Step 的执行流程

flowchart TD
    A[开始 Super Step] --> B{有待执行的 Executor?}
    B -->|是| C[执行 Executor 1]
    B -->|是| D[执行 Executor 2]
    B -->|是| E[执行 Executor N]
    C --> F[等待所有 Executor 完成]
    D --> F
    E --> F
    F --> G[Super Step 完成]
    G --> H[自动创建检查点]
    H --> I{还有后续信号?}
    I -->|是| A
    I -->|否| J[工作流结束]
特性 说明
并发执行 同一 Super Step 中的多个 Executor 可以并行执行
完整性保证 只有当前 Super Step 的所有 Executor 完成后,才进入下一个 Super Step
检查点边界 每个 Super Step 结束时自动创建检查点

3. 核心概念:Checkpoint(检查点)

当为 Workflow 提供了 CheckpointManager 时,系统会在每个 Super Step 结束时自动创建检查点

// 创建带检查点的工作流运行
var checkpointManager = CheckpointManager.Default;
await using Checkpointed<StreamingRun> checkpointedRun = 
    await InProcessExecution.StreamAsync(workflow, initialSignal, checkpointManager);

检查点通过 CheckpointInfo 对象表示,它包含:

属性 类型 说明
RunId Guid 工作流运行实例的唯一标识
Sequence int 检查点序号(第几个 Super Step)
StateData Dictionary 所有 Executor 保存的状态数据
Timestamp DateTime 检查点创建时间

检查点通过 SuperStepCompletedEvent 事件获取:

await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // 从事件中获取自动创建的检查点
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
        
        if (checkpoint is not null)
        {
            // 保存检查点以备后用
            checkpoints.Add(checkpoint);
            Console.WriteLine($"检查点已创建:序号 {checkpoint.Sequence}");
        }
    }
}

4. 核心概念:CheckpointManager(检查点管理器)

CheckpointManager 是负责存储和读取检查点的组件:

flowchart LR
    A[Workflow Engine] -->|保存检查点| B[CheckpointManager]
    B -->|存储到| C[(存储介质)]
    C -->|读取检查点| B
    B -->|恢复状态| A
    
    style C fill:#e1f5ff

MAF 提供了开箱即用的默认实现:CheckpointManager.Default

MAF 支持自定义 ICheckpointManager 接口实现:

public interface ICheckpointManager
{
    // 保存检查点
    ValueTask SaveAsync(CheckpointInfo checkpoint, CancellationToken cancellationToken);
    
    // 加载检查点
    ValueTask<CheckpointInfo> LoadAsync(Guid runId, int sequence, CancellationToken cancellationToken);
    
    // 列出所有检查点
    IAsyncEnumerable<CheckpointInfo> ListAsync(Guid runId, CancellationToken cancellationToken);
}

提示:在生产环境中,可以实现基于 Azure Blob Storage、SQL Server 或 Redis 的 CheckpointManager。

5. 核心概念:Executor 的检查点生命周期

要让 Executor 支持检查点,需要实现两个关键方法:保存状态恢复状态

检查点生命周期流程

sequenceDiagram
    participant WF as Workflow Engine
    participant Ex as Executor
    participant CM as CheckpointManager
    
    Note over WF,CM: Super Step 执行阶段
    WF->>Ex: HandleAsync(signal)
    Ex->>Ex: 执行业务逻辑
    Ex->>Ex: 更新内部状态
    
    Note over WF,CM: Super Step 完成阶段
    WF->>Ex: OnCheckpointingAsync()
    Ex->>WF: QueueStateUpdate("key", stateData)
    WF->>CM: SaveAsync(checkpoint)
    
    Note over WF,CM: 状态恢复阶段
    CM->>WF: LoadAsync(checkpointInfo)
    WF->>Ex: OnCheckpointRestoredAsync()
    Ex->>WF: GetStateAsync("key")
    Ex->>Ex: 恢复内部状态

让我们看看源码中的完整实现:

/// <summary>
/// 猜数字游戏的 Executor,支持检查点机制
/// </summary>
internal sealed class GuessNumberExecutor() : Executor<NumberSignal>("Guess")
{
    // 状态数据:猜测范围的上下界
    public int LowerBound { get; private set; }
    public int UpperBound { get; private set; }    

    // 用于在检查点中存储状态的键
    private const string StateKey = "GuessNumberExecutorState";
    
    // 构造函数:初始化范围
    public GuessNumberExecutor(int lowerBound, int upperBound) : this()
    {
        this.LowerBound = lowerBound;
        this.UpperBound = upperBound;
    }
    
    // 计算下一次猜测的值(二分法)
    private int NextGuess => (this.LowerBound + this.UpperBound) / 2;
    
    // 业务逻辑:处理不同信号
    public override async ValueTask HandleAsync(
        NumberSignal message, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        switch (message)
        {
            case NumberSignal.Init:
                await context.SendMessageAsync(this.NextGuess, cancellationToken: cancellationToken);
                break;
            case NumberSignal.Above:
                this.UpperBound = this.NextGuess - 1; // 调整上界
                await context.SendMessageAsync(this.NextGuess, cancellationToken: cancellationToken);
                break;
            case NumberSignal.Below:
                this.LowerBound = this.NextGuess + 1; // 调整下界
                await context.SendMessageAsync(this.NextGuess, cancellationToken: cancellationToken);
                break;
        }
    }
    
    /// <summary>
    /// 关键方法 1:保存检查点状态
    /// 当 Super Step 结束时,框架会调用此方法
    /// </summary>
    protected override ValueTask OnCheckpointingAsync(
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        // 将需要保存的状态数据排队(支持多个键值对)
        return context.QueueStateUpdateAsync(
            StateKey, 
            (this.LowerBound, this.UpperBound), 
            cancellationToken: cancellationToken);
    }
    
    /// <summary>
    /// 关键方法 2:恢复检查点状态
    /// 当从检查点恢复时,框架会调用此方法
    /// </summary>
    protected override async ValueTask OnCheckpointRestoredAsync(
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        // 从检查点中读取保存的状态数据
        (int lowerBound, int upperBound) = await context
            .GetStateAsync<(int, int)>(StateKey, cancellationToken: cancellationToken);
        
        // 恢复实例的状态
        this.LowerBound = lowerBound;
        this.UpperBound = upperBound;
    }

}

// 定义信号枚举
internal enum NumberSignal
{
    Init,   // 初始化
    Above,  // 猜大了
    Below   // 猜小了
}

Console.WriteLine("GuessNumberExecutor 定义完成");

代码详解

  1. OnCheckpointingAsync() - 保存状态
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, ...)
{
    // 将状态数据添加到检查点
    return context.QueueStateUpdateAsync(StateKey, (LowerBound, UpperBound), ...);
}
  • 调用时机:每个 Super Step 结束时自动调用
  • 状态数据:可以是任何可序列化的对象(基本类型、元组、自定义类)
  • StateKey:用于标识状态数据的键,恢复时需要使用相同的键
  • 多状态支持:可以多次调用 QueueStateUpdateAsync() 保存多个状态
  1. OnCheckpointRestoredAsync() - 恢复状态
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, ...)
{
    // 从检查点读取状态数据
    var state = await context.GetStateAsync<(int, int)>(StateKey, ...);
    
    // 恢复实例字段
    this.LowerBound = state.Item1;
    this.UpperBound = state.Item2;
}
  • 调用时机:从检查点恢复工作流时调用
  • 类型匹配:GetStateAsync() 的类型必须与保存时一致
  • 键匹配:必须使用与保存时相同的 StateKey
  • 必须实现:如果 Executor 有状态,这两个方法都必须正确实现
  1. 状态数据的序列化要求
  • 支持的类型:

    • 基本类型:int, string, bool, DateTime 等

    • 值类型:元组 (int, int)、结构体 struct

    • 引用类型:自定义类(需支持 JSON 序列化)

    • 集合类型:List, Dictionary<K,V>

  • 不支持的类型:

    • 不可序列化的对象(如数据库连接、文件句柄)

    • 包含循环引用的对象

    • 包含委托或事件的对象

6. Super Step、Checkpoint、Executor 的关系总结

flowchart TB
    subgraph "Workflow 执行流程"
        A[开始] --> B[Super Step 1]
        B --> C[检查点 1]
        C --> D[Super Step 2]
        D --> E[检查点 2]
        E --> F[Super Step 3]
        F --> G[检查点 3]
        G --> H[结束]
    end
    
    subgraph "Super Step 内部"
        I[接收信号] --> J[Executor 1<br/>HandleAsync]
        I --> K[Executor 2<br/>HandleAsync]
        J --> L[等待完成]
        K --> L
        L --> M[OnCheckpointingAsync]
        M --> N[保存状态到检查点]
    end
    
    subgraph "检查点恢复"
        O[加载检查点] --> P[创建新 Workflow 实例]
        P --> Q[OnCheckpointRestoredAsync]
        Q --> R[恢复 Executor 状态]
        R --> S[继续执行]
    end
    
    B -.内部执行.-> I
    C -.恢复流程.-> O
概念 核心要点
Super Step • Workflow 执行的基本单位
• 包含一组 Executor 的完整执行
• 是检查点创建的边界
Checkpoint • 在 Super Step 结束时自动创建
• 包含所有 Executor 的状态快照
• 通过 SuperStepCompletedEvent 获取
CheckpointManager • 负责存储和读取检查点
• 默认实现是内存存储
• 可自定义实现(文件、数据库、云存储)
Executor 生命周期 • OnCheckpointingAsync() 保存状态
• OnCheckpointRestoredAsync() 恢复状态
• 状态数据必须可序列化

六、工作流状态共享

在真实业务中,以下痛点最常见:

场景 传统做法 痛点 Workflow Shared State 方案
多步骤共享原始输入 通过参数层层传递 易丢失、难维护 在入口步骤写入状态,后续随取随用
并发执行器共享缓存 使用静态变量 线程不安全 作用域隔离 + 状态管理器负责并发
人机协作回填审批结论 HTTP 回调携带参数 状态丢失、难追踪 WaitForInput + 状态恢复
Agent 输出给多个消费者 多次调用同一个 Agent 成本高 把输出写入状态,其他步骤复用

1. WorkflowContext 状态 API 速查

API 作用 典型使用方式
QueueStateUpdateAsync(key, value, scope) 将状态写入队列,当前步骤完成后统一提交,避免脏读 适合在 Executor 内写入结果、缓存、审计信息
ReadStateAsync(key, scope) 读取指定键,找不到则返回 null 读取上游步骤输出、缓存的 Agent 响应
ReadOrInitStateAsync(key, factory, scope) 如果不存在则初始化,兼顾懒加载与并发一致性 首次接入业务实体、惰性缓存、计数器 / 重试次数
QueueClearScopeAsync(scope) 清理整个作用域 敏感信息擦除、生命周期结束时清理
ReadStateKeysAsync(scope) 列出作用域中所有键 调试、监控、构建动态路由
  • ReadOrInitStateAsync 应用场景

    • 首次加载业务上下文:如 fileId 对应的元数据、审批信息等只需初始化一次的结构。

    • 惰性构建缓存:在第一次命中时生成摘要、Embedding 或模型响应,后续 Executor 复用。

    • 并发累加器:计数器、重试次数、Fan-in 进度等需要“读当前值 + 初始化默认值”的模式。

作用域 (Scope) 分类

Scope 类型 说明 建议命名
Executor Scope (默认) 键只对当前 Executor 可见 nameof(Executor)
自定义业务 Scope 多个 Executor 共享,例如 "FileContentState" 模块名 + State
System / Environment Scope 框架内部使用,不建议直接写入 使用 VariableScopeNames 常量时需谨慎

线程安全说明:QueueStateUpdateAsync 会在状态管理器内部加锁,所以不需要额外的 lock。但仍需控制数据粒度(例如传输 ID,而不是整个大对象)。

2. 案例:文档统计工作流

我们将复刻官方 Workflows/SharedStates 示例,并加入更多注释:

  1. 入口步骤读取文本内容,写入 FileContentState 作用域中,值为 fileId -> 文本内容
  2. Fan-out 出两个 Executor:
    • WordCountingExecutor 统计单词数
    • ParagraphCountingExecutor 统计段落数
  3. Fan-in 到 AggregationExecutor,汇总结果并输出结构化报表

该案例展示了一次写入,多处读取的模式,也是后续 MapReduce、并发协作的基础。

  • 准备工作
// 示例数据
internal static class SharedStateSampleData
{
    private static readonly IReadOnlyDictionary<string, string> Documents = new Dictionary<string, string>
    {
        ["ProductBrief"] = "MAF Workflow 让 .NET 团队可以像积木一样组合 Agent、Executor 与工具, 支持流式事件、并发节点和可观测性。\n\n它强调企业级能力, 包括状态管理、依赖注入、权限控制, 适合搭建端到端 AI 业务流程。",
        ["WeeklyReport"] = "本周平台完成了 Shared State 功能的代码走查, 已经覆盖 Fan-out/Fan-in, Loop, Human-in-the-Loop 三种场景。\n\n下周计划: 1) 集成多模型投票; 2) 增加异常回滚; 3) 落地监控指标。"
    };

    public static string GetDocument(string name)
        => Documents.TryGetValue(name, out var content)
            ? content
            : throw new ArgumentException($"未找到文档: {name}");
}

// 状态常量
internal static class FileContentStateConstants
{
    public const string ScopeName = "FileContentState";
}

// 模型定义
internal sealed class FileStats
{
    public int WordCount { get; init; }
    public int ParagraphCount { get; init; }
}

// 定义FileReadExecutor
internal sealed class FileReadExecutor() : Executor<string, string>("FileReadExecutor")
{
    public override async ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        string content = SharedStateSampleData.GetDocument(message);
        string fileId = Guid.NewGuid().ToString("N");

        await context.QueueStateUpdateAsync(fileId, content, FileContentStateConstants.ScopeName, cancellationToken);
        Console.WriteLine($"FileReadExecutor 将 {message} 写入 Scope:{FileContentStateConstants.ScopeName}");

        return fileId;
    }
}

// 定义并行统计执行器
internal sealed class WordCountingExecutor() : Executor<string, FileStats>("WordCountingExecutor")
{
    public override async ValueTask<FileStats> HandleAsync(string fileId, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        string? content = await context.ReadStateAsync<string>(fileId, FileContentStateConstants.ScopeName, cancellationToken);
        if (content is null)
        {
            throw new InvalidOperationException($"无法在 Scope:{FileContentStateConstants.ScopeName} 中找到 fileId={fileId}");
        }

        int wordCount = content.Split([' ', '\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
        return new FileStats { WordCount = wordCount };
    }
}

// 定义AggregationExecutor 
internal sealed class ParagraphCountingExecutor() : Executor<string, FileStats>("ParagraphCountingExecutor")
{
    public override async ValueTask<FileStats> HandleAsync(string fileId, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        string? content = await context.ReadStateAsync<string>(fileId, FileContentStateConstants.ScopeName, cancellationToken);
        if (content is null)
        {
            throw new InvalidOperationException($"无法在 Scope:{FileContentStateConstants.ScopeName} 中找到 fileId={fileId}");
        }

        int paragraphCount = content.Split(['\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
        return new FileStats { ParagraphCount = paragraphCount };
    }
}

internal sealed class AggregationExecutor() : Executor<FileStats>("AggregationExecutor")
{
    private readonly List<FileStats> _buffer = [];

    public override async ValueTask HandleAsync(FileStats message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._buffer.Add(message);
        if (this._buffer.Count < 2)
        {
            return;
        }

        int totalWords = this._buffer.Sum(x => x.WordCount);
        int totalParagraphs = this._buffer.Sum(x => x.ParagraphCount);

        var output = new
        {
            总词数 = totalWords,
            总段落数 = totalParagraphs,
            统计时间 = DateTimeOffset.UtcNow
        };

        Console.WriteLine("文档统计结果");
        output.Display();
        await context.YieldOutputAsync(output, cancellationToken);
    }
}

// 定义AggregationExecutor 
internal sealed class AggregationExecutor() : Executor<FileStats>("AggregationExecutor")
{
    private readonly List<FileStats> _buffer = [];

    public override async ValueTask HandleAsync(FileStats message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._buffer.Add(message);
        if (this._buffer.Count < 2)
        {
            return;
        }

        int totalWords = this._buffer.Sum(x => x.WordCount);
        int totalParagraphs = this._buffer.Sum(x => x.ParagraphCount);

        var output = new
        {
            总词数 = totalWords,
            总段落数 = totalParagraphs,
            统计时间 = DateTimeOffset.UtcNow
        };

        Console.WriteLine("文档统计结果");
        output.Display();
        await context.YieldOutputAsync(output, cancellationToken);
    }
}
  • 步骤 1:组装 Fan-out/Fan-in 工作流

我们将以下列顺序建图:

  1. FileReadExecutor 作为入口,并指定最终输出来自 AggregationExecutor
  2. AddFanOutEdge:把文件 ID 同时传给两个统计执行器
  3. AddFanInEdge:等待两个统计执行器完成后,再进入聚合节点

通过 Builder 定义拓扑后,我们会在后续步骤用 RunStreamingAsync 查看事件流。

var fileRead = new FileReadExecutor();
var wordCounting = new WordCountingExecutor();
var paragraphCounting = new ParagraphCountingExecutor();
var aggregate = new AggregationExecutor();

var sharedStateWorkflow = new WorkflowBuilder(fileRead)
    .AddFanOutEdge(fileRead, [wordCounting, paragraphCounting])
    .AddFanInEdge([wordCounting, paragraphCounting], aggregate)
    .WithOutputFrom(aggregate)
    .Build();
Console.WriteLine("Shared State Workflow 构建完成");
  • 步骤 2:流式观察状态读写

我们使用 InProcessExecution.RunStreamingAsync 以事件流的方式运行 workflow,并关注三类事件:

  1. WorkflowStartedEvent / WorkflowCompletedEvent:整体生命周期
  2. ExecutorCompletedEvent:确认每个节点的执行顺序
  3. WorkflowOutputEvent:最终业务输出

同时,我们打印状态写入时生成的 fileId,以便验证 Fan-out 节点读取的是同一个共享数据。

static async Task RunSharedStateDemoAsync(Workflow sharedWorkflow, string documentKey)
{
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine($"演示文档: {documentKey}");
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");

    await using (var run = await InProcessExecution.StreamAsync(sharedWorkflow, documentKey)){
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            switch (evt)
            {
                case WorkflowStartedEvent started:
                    Console.WriteLine($"Workflow Started");
                    break;
                case ExecutorCompletedEvent executorCompleted:
                    Console.WriteLine($"{executorCompleted.ExecutorId} 完成");
                    break;
                case WorkflowOutputEvent outputEvent:
                    Console.WriteLine("收到 Workflow Output Event:");
                    outputEvent.Display();
                    break;
                case WorkflowErrorEvent  errorEvent:
                    Console.WriteLine("收到 Workflow Error Event:");
                    errorEvent.Display();
                    break;
                default:
                    Console.WriteLine($"其他事件: {evt.GetType().Name}");
                    evt.Display();
                    break;
            }
        }
        await run.DisposeAsync();
    }
}
await RunSharedStateDemoAsync(sharedStateWorkflow, "ProductBrief");

3. 最佳实践

  • 命名规范

    • 使用 Scope + 功能命名(例:FileContentState)

    • 键值建议为业务 ID(如 fileId、turnId),避免直接储存整段文本作为 key

  • 状态生命周期

    • 在数据敏感的场景结束后调用 QueueClearScopeAsync

    • 长时间运行的 Workflow 可定期写入 UpdatedAt,便于调试

  • 并发注意事项

    • Fan-out 节点共享同一 state 时,只写 ID,不写大对象

    • 如果必须写大对象,可在读取端做 null 检查并提供降级策略

  • 调试建议

    • 使用 ReadStateKeysAsync 打印当前作用域的所有键

    • 配合 WorkflowEvent 日志,快速定位状态缺失或覆盖问题

  • 常见踩坑

    • 在 Executor 中保存可变引用(List/Dictionary),会被后续步骤修改

    • 在多个 scope 中重复使用相同 key,导致取值混乱

    • 忘记 await QueueStateUpdateAsync,导致状态未落盘

七、工作流上下文

1. API 分类速查表

IWorkflowContext 是 Executor 与 Workflow 引擎交互的唯一入口,它提供了以下能力:

分类 API 用途 常见场景
输出管理 SendMessageAsync 向下游 Executor 发送消息 流程编排、数据传递
AddEventAsync 触发自定义业务事件 进度通知、审计日志
YieldOutputAsync 输出最终业务结果 工作流返回值
流程控制 RequestHaltAsync 暂停工作流执行 人工审批、等待外部系统
状态管理 ReadStateAsync 读取状态值 获取共享数据
ReadOrInitStateAsync 读取或初始化状态 计数器、缓存、配置
QueueStateUpdateAsync 更新状态值 写入业务数据
ReadStateKeysAsync 枚举作用域中的所有键 调试、动态路由
QueueClearScopeAsync 清理整个作用域 敏感数据清理
元数据 TraceContext 链路追踪上下文 分布式追踪
ConcurrentRunsEnabled 是否支持并发运行 并发控制

2. 核心概念:三种输出机制的本质区别

这是最容易混淆的部分!理解它们的区别是掌握 Context 的关键。

  • SendMessageAsync - 流程内部通信

    特点:

    • 目标是下游 Executor(通过 Edge 连接的节点)

    • 消息在下一个 SuperStep才会被接收

    • 可以指定 targetId 进行精确路由

    • 如果没有连接关系,消息会被丢弃

    典型场景:

    • 动态路由(根据条件发送给不同的 Executor)

    • 广播模式(一个 Executor 发送给多个下游)

    • 流程编排中的数据传递

  • AddEventAsync - 业务事件通知

    特点:

    • 目标是外部订阅者(调用 workflow 的客户端)

    • 事件在当前 SuperStep 结束时立即发出

    • 用于传递业务进度、审计日志、中间状态

    • 不影响工作流的执行流程

    典型场景:

    • 进度通知("正在生成大纲…"、"已完成 3/10 个文档")

    • 审计日志("用户操作已记录"、"检测到敏感词")

    • 调试信息(自定义 Event 携带中间变量)

  • YieldOutputAsync - 最终业务结果

    特点:

    • 目标是工作流的最终返回值

    • 通过 WorkflowOutputEvent 封装返回

    • 通常在 WithOutputFrom() 指定的步骤中调用

    • 一个工作流可以有多个输出(多次调用 YieldOutputAsync)

    典型场景:

    • 工作流的最终结果(生成的文章、处理后的数据)

    • 批量输出(MapReduce 的 Reduce 结果)

    • 结构化返回值(JSON、DTO)

  • 记忆口诀:

    • Send = 内部流程通信(Executor → Executor)
    • Event = 进度广播(Executor → 订阅者)
    • Output = 最终结果(Workflow → 调用者)
  • 对比总结

维度 SendMessageAsync AddEventAsync YieldOutputAsync
接收者 下游 Executor 外部订阅者 外部订阅者
触发时机 下一个 SuperStep 当前 SuperStep 结束 当前 SuperStep 结束
用途 流程内部数据传递 进度通知/审计日志 最终业务结果
是否影响流程 是(触发下游执行)
典型事件类型 N/A 自定义 WorkflowEvent WorkflowOutputEvent

2. 实战演示:Context Explorer

我们将创建一个专门的 ContextExplorerExecutor,它会依次调用 IWorkflowContext 的所有关键 API,并通过事件流展示每个方法的效果。

演示场景设计

flowchart TD
    Start[用户输入: 探索模式] --> Explorer[ContextExplorerExecutor]
    
    Explorer --> S1[1.状态管理演示]
    S1 --> S1A[QueueStateUpdateAsync]
    S1A --> S1B[ReadStateAsync]
    S1B --> S1C[ReadOrInitStateAsync]
    S1C --> S1D[ReadStateKeysAsync]
    
    Explorer --> S2[2.输出机制演示]
    S2 --> S2A[SendMessageAsync]
    S2A --> S2B[AddEventAsync]
    S2B --> S2C[YieldOutputAsync]
    
    Explorer --> S3[3.元数据演示]
    S3 --> S3A[TraceContext]
    S3A --> S3B[ConcurrentRunsEnabled]
    
    S1D --> Receiver[DownstreamExecutor]
    S2C --> Output[WorkflowOutputEvent]
    
    style Explorer fill:#e1f5ff
    style S1 fill:#fff4e1
    style S2 fill:#f3e5f5
    style S3 fill:#e8f5e9
  • 步骤 1:定义自定义事件和 Context Explorer
// 自定义事件:用于报告探索进度
internal sealed class ExplorationProgressEvent(string category, string api, string result) : WorkflowEvent
{
    public string Category { get; } = category;
    public string API { get; } = api;
    public string Result { get; } = result;
}

internal sealed class ContextExplorerExecutor() : Executor<string, string>("ContextExplorerExecutor")
{
    private const string DemoScope = "ExplorerDemoScope";

    public override async ValueTask<string> HandleAsync(string mode, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"ContextExplorer 开始工作 (模式: {mode})");
        Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    
        // ============ 1.状态管理演示 ============
        await this.DemonstrateStateManagementAsync(context, cancellationToken);
    
        // ============ 2.输出机制演示 ============
        await this.DemonstrateOutputMechanismsAsync(context, cancellationToken);
    
        // ============ 3.元数据演示 ============
        await this.DemonstrateMetadataAsync(context, cancellationToken);
    
        Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
        Console.WriteLine("ContextExplorer 工作完成");
    
        return "探索完成";
    }
    
    private async ValueTask DemonstrateStateManagementAsync(IWorkflowContext context, CancellationToken cancellationToken)
    {
        Console.WriteLine("\n【1.状态管理 API 演示】");
    
        // 1. QueueStateUpdateAsync - 写入状态
        await context.QueueStateUpdateAsync("counter", 100, DemoScope, cancellationToken);
        await context.AddEventAsync(new ExplorationProgressEvent(
            "状态管理", 
            "QueueStateUpdateAsync", 
            "写入 counter=100"), cancellationToken);
        Console.WriteLine(" QueueStateUpdateAsync: 写入 counter=100");
    
        // 2. ReadStateAsync - 读取状态
        var counter = await context.ReadStateAsync<int>("counter", DemoScope, cancellationToken);
        await context.AddEventAsync(new ExplorationProgressEvent(
            "状态管理", 
            "ReadStateAsync", 
            $"读取到 counter={counter}"), cancellationToken);
        Console.WriteLine($" ReadStateAsync: 读取到 counter={counter}");
    
        // 3. ReadOrInitStateAsync - 惰性初始化
        var config = await context.ReadOrInitStateAsync(
            "config", 
            () => new { MaxRetry = 3, Timeout = 30 }, 
            DemoScope, 
            cancellationToken);
        await context.AddEventAsync(new ExplorationProgressEvent(
            "状态管理", 
            "ReadOrInitStateAsync", 
            $"初始化配置: MaxRetry={config.MaxRetry}"), cancellationToken);
        Console.WriteLine($" ReadOrInitStateAsync: 初始化配置 MaxRetry={config.MaxRetry}");
    
        // 4. ReadStateKeysAsync - 枚举所有键
        var keys = await context.ReadStateKeysAsync(DemoScope, cancellationToken);
        await context.AddEventAsync(new ExplorationProgressEvent(
            "状态管理", 
            "ReadStateKeysAsync", 
            $"作用域中的键: {string.Join(", ", keys)}"), cancellationToken);
        Console.WriteLine($" ReadStateKeysAsync: 共 {keys.Count} 个键 [{string.Join(", ", keys)}]");
    
        // 5. QueueClearScopeAsync - 清理作用域(演示中不实际执行,避免影响后续测试)
        // await context.QueueClearScopeAsync(DemoScope, cancellationToken);
        Console.WriteLine(" QueueClearScopeAsync: 已跳过(避免清理演示数据)");
    }
    
    private async ValueTask DemonstrateOutputMechanismsAsync(IWorkflowContext context, CancellationToken cancellationToken)
    {
        Console.WriteLine("\n【2.输出机制 API 演示】");
    
        // 1. SendMessageAsync - 发送给下游 Executor
        await context.SendMessageAsync("来自 ContextExplorer 的消息", targetId: null, cancellationToken);
        await context.AddEventAsync(new ExplorationProgressEvent(
            "输出机制", 
            "SendMessageAsync", 
            "已发送消息到下游"), cancellationToken);
        Console.WriteLine(" SendMessageAsync: 已发送消息到下游 Executor");
    
        // 2. AddEventAsync - 触发自定义事件
        await context.AddEventAsync(new ExplorationProgressEvent(
            "输出机制", 
            "AddEventAsync", 
            "这是一个自定义业务事件"), cancellationToken);
        Console.WriteLine(" AddEventAsync: 已触发自定义事件");
    
        // 3. YieldOutputAsync - 输出最终结果
        var finalResult = new
        {
            探索时间 = DateTimeOffset.UtcNow,
            API总数 = 11,
            状态作用域 = DemoScope
        };
        await context.YieldOutputAsync(finalResult, cancellationToken);
        Console.WriteLine(" YieldOutputAsync: 已输出最终结果");
    }
    
    private async ValueTask DemonstrateMetadataAsync(IWorkflowContext context, CancellationToken cancellationToken)
    {
        Console.WriteLine("\n【3.元数据 API 演示】");
    
        // 1. TraceContext - 链路追踪上下文
        var traceContext = context.TraceContext;
        var traceInfo = traceContext != null 
            ? string.Join(", ", traceContext.Select(kv => $"{kv.Key}={kv.Value}"))
            : "null";
        await context.AddEventAsync(new ExplorationProgressEvent(
            "元数据", 
            "TraceContext", 
            traceInfo), cancellationToken);
        Console.WriteLine($" TraceContext: {traceInfo}");
    
        // 2. ConcurrentRunsEnabled - 并发支持
        var concurrentEnabled = context.ConcurrentRunsEnabled;
        await context.AddEventAsync(new ExplorationProgressEvent(
            "元数据", 
            "ConcurrentRunsEnabled", 
            concurrentEnabled.ToString()), cancellationToken);
        Console.WriteLine($" ConcurrentRunsEnabled: {concurrentEnabled}");
    }
}
  • 步骤 2:创建下游接收器

为了演示 SendMessageAsync 的效果,我们需要一个下游 Executor 来接收消息。

internal sealed class DownstreamReceiverExecutor() : Executor<string, string>("DownstreamReceiverExecutor")
{
    public override async ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"DownstreamReceiver 收到消息: {message}");        
        await context.AddEventAsync(new ExplorationProgressEvent(
            "下游接收", 
            "HandleAsync", 
            $"已处理消息: {message}"), cancellationToken);    
        return "消息已接收";
    }
}
  • 步骤 3:构建工作流

将 ContextExplorerExecutor 和DownstreamReceiverExecutor 连接起来,形成完整的工作流。

var explorer = new ContextExplorerExecutor();
var receiver = new DownstreamReceiverExecutor();

var contextExplorerWorkflow = new WorkflowBuilder(explorer)
    .AddEdge(explorer, receiver)
    .WithOutputFrom(explorer) // 输出来自 explorer 的 YieldOutputAsync
    .Build();
  • 步骤 4:流式运行并观察事件

通过流式运行,我们可以实时观察每个 API 调用产生的事件。

static async Task RunContextExplorerAsync(Workflow workflow, string mode)
{
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine($"开始探索 IWorkflowContext API (模式: {mode})");
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");

    await using (var run = await InProcessExecution.StreamAsync(workflow, mode))
    {
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            switch (evt)
            {
                case WorkflowStartedEvent:
                    Console.WriteLine("[系统事件] Workflow 已启动\n");
                    break;
    
                case ExplorationProgressEvent progress:
                    Console.WriteLine($"[业务事件] 分类: {progress.Category}");
                    Console.WriteLine($" API: {progress.API}");
                    Console.WriteLine($" 结果: {progress.Result}\n");
                    break;
    
                case ExecutorCompletedEvent executorCompleted:
                    Console.WriteLine($"[系统事件] {executorCompleted.ExecutorId} 执行完成\n");
                    break;
    
                case WorkflowOutputEvent outputEvent:
                    Console.WriteLine("[输出事件] WorkflowOutputEvent:");
                    outputEvent.Display();
                    Console.WriteLine();
                    break;
    
                default:
                    Console.WriteLine($"[其他事件] {evt.GetType().Name}");
                    break;
            }
        }
    }    
    Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
}

await RunContextExplorerAsync(contextExplorerWorkflow, "完整探索");
  • SendMessageAsync targetId 精准路由实战

典型场景:Intent Router 同时连接图像、文本、兜底三个 Executor,希望只唤醒匹配的下游。

flowchart LR
    R[IntentRouter] --> I[ImageGenerator]
    R --> T[TextGenerator]
    R --> F[FallbackExecutor]
    R -.targetId=image.-> I
    R -.targetId=report.-> T
    R -.targetId=FallbackExecutor.-> F
    style R fill:#e1f5ff
    style I fill:#fff4e1
    style T fill:#f3e5f5
    style F fill:#e8f5e9
internal sealed class IntentRouterExecutor() : Executor<string, string>("IntentRouter")
{
    private readonly Dictionary<string, string> _routingTable = new(StringComparer.OrdinalIgnoreCase)
    {
        ["image"] = "ImageGenerator",
        ["report"] = "TextGenerator"
    };

    public override async ValueTask<string> HandleAsync(string intent, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        var targetId = this._routingTable.TryGetValue(intent, out var candidate) ? candidate : "FallbackExecutor";
        Console.WriteLine($"Router 判定 {intent} 应派发给 {targetId}");
    
        await context.SendMessageAsync($"指令: {intent}", targetId, cancellationToken);
        return $"消息已定向到 {targetId}";
    }
}

internal sealed class ImageGeneratorExecutor() : Executor<string, string>("ImageGenerator")
{
    public override ValueTask<string> HandleAsync(string payload, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"ImageGenerator 收到: {payload}");
        return ValueTask.FromResult("图像任务完成");
    }
}

internal sealed class TextGeneratorExecutor() : Executor<string, string>("TextGenerator")
{
    public override ValueTask<string> HandleAsync(string payload, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"TextGenerator 收到: {payload}");
        return ValueTask.FromResult("文本任务完成");
    }
}

internal sealed class FallbackExecutor() : Executor<string, string>("FallbackExecutor")
{
    public override ValueTask<string> HandleAsync(string payload, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"FallbackExecutor 接管: {payload}");
        return ValueTask.FromResult("兜底处理完成");
    }
}

static async Task RunTargetIdDemoAsync(string intent)
{
    var router = new IntentRouterExecutor();
    var imageExecutor = new ImageGeneratorExecutor();
    var textExecutor = new TextGeneratorExecutor();
    var fallbackExecutor = new FallbackExecutor();

    var workflow = new WorkflowBuilder(router)
        .AddEdge(router, imageExecutor)
        .AddEdge(router, textExecutor)
        .AddEdge(router, fallbackExecutor)
        .WithOutputFrom(router)
        .Build();
    
    Console.WriteLine($"\n输入: {intent}");
    
    await using (var run = await InProcessExecution.StreamAsync(workflow, intent)){
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine("Router 输出:");
                outputEvent.Display();
            }
        }
    }
}
await RunTargetIdDemoAsync("image");
await RunTargetIdDemoAsync("report");
await RunTargetIdDemoAsync("speech");

关键提示:

  • targetId 必须等于下游 Executor 的 Id(构造函数里指定)
  • 上游可以保留多条 Edge,但只有 targetId 命中的节点会收到消息
  • 适合意图路由、A/B 测试、租户隔离等需要精确派发的场景

3. 高级场景:QueueClearScopeAsync 的正确使用

QueueClearScopeAsync 用于清理整个作用域中的所有状态,这在处理敏感数据时非常重要。

场景 说明 示例
敏感数据清理 处理完用户隐私数据后立即清理 银行交易、医疗记录
临时状态回收 中间计算结果不再需要 MapReduce 的 Map 阶段结果
生命周期管理 某个业务流程结束后清理上下文 订单处理完成后清理订单缓存
测试隔离 每个测试用例结束后清理状态 单元测试、集成测试
internal sealed class SecureDataExecutor() : Executor<string, string>("SecureDataExecutor")
{
    private const string SensitiveDataScope = "SensitiveData";

    public override async ValueTask<string> HandleAsync(
        string userId, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        // 1. 读取敏感数据
        var userData = await LoadUserDataAsync(userId);
        await context.QueueStateUpdateAsync("userData", userData, SensitiveDataScope, cancellationToken);

        // 2. 处理业务逻辑
        var result = await ProcessSecureDataAsync(userData);

        // 3. 处理完成后立即清理敏感数据
        await context.QueueClearScopeAsync(SensitiveDataScope, cancellationToken);
        Console.WriteLine("敏感数据已清理");

        return result;
    }

    private Task<string> LoadUserDataAsync(string userId) => Task.FromResult($"UserData-{userId}");

    private Task<string> ProcessSecureDataAsync(string data) => Task.FromResult($"Processed-{data}");
}

4. 最佳实践

  • 输出机制选择指南
flowchart TD
    Start{需要输出什么?}
    Start -->|给下游 Executor| A[SendMessageAsync]
    Start -->|进度/日志| B[AddEventAsync]
    Start -->|最终结果| C[YieldOutputAsync]
    
    A --> A1[流程编排<br/>动态路由<br/>数据传递]
    B --> B1[进度通知<br/>审计日志<br/>调试信息]
    C --> C1[业务返回值<br/>批量输出<br/>结构化结果]
  • 状态管理最佳实践
实践 说明 示例
使用常量定义 Scope 避免硬编码字符串 const string MyScope = "OrderContext";
命名约定 {模块名}Scope 或 {功能}Context FileContentScope, UserSessionContext
读写分离 写操作集中在入口步骤,读操作分散在下游 Fan-out/Fan-in 模式
惰性初始化 使用 ReadOrInitStateAsync 避免重复初始化 配置、缓存、计数器
及时清理 处理完敏感数据后立即调用 QueueClearScopeAsync 用户隐私数据、临时文件
  • 元数据利用
public override async ValueTask<string> HandleAsync(
    string message, 
    IWorkflowContext context, 
    CancellationToken cancellationToken = default)
{
    // 1. 检查并发支持
    if (context.ConcurrentRunsEnabled)
    {
        Console.WriteLine("当前环境支持并发执行");
    }

    // 2. 传递链路追踪上下文
    if (context.TraceContext != null)
    {
        var traceId = context.TraceContext.GetValueOrDefault("traceId");
        Console.WriteLine($"TraceId: {traceId}");
        
        // 传递给下游服务(如 HTTP 请求)
        await CallExternalServiceAsync(traceId);
    }

    return "完成";
}
  • 常见错误与解决方案
错误 原因 解决方案
SendMessageAsync 后下游没收到 没有建立 Edge 连接 使用 AddEdge() 连接节点
状态读取返回 null Scope 名称不一致 使用常量统一管理 Scope 名称
并发冲突 多个 Executor 同时写入同一个 Key 使用不同的 Key 或加入步骤序号
事件没有触发 忘记 await AddEventAsync 确保所有异步方法都被 await