Spiga

Net+AI智能体进阶8:Workflow工作流

2025-11-22 10:34:18

一、工作流基础

1. 什么是 Workflow Orchestration

在企业级 AI 应用开发中,我们经常面临以下挑战:

  • 单个 Agent 难以处理复杂任务:一个 Agent 无法同时擅长需求分析、代码生成、测试等多个领域
  • 业务逻辑与 AI 调用耦合:复杂的条件判断、循环、并发控制分散在代码各处
  • 流程难以可视化和维护:多步骤的 AI 处理流程缺乏清晰的结构
  • 缺乏统一的状态管理:多个步骤之间的数据传递和状态共享容易出错

Workflow Orchestration (工作流编排) 正是为了解决这些问题而生:

  • 模块化设计:将复杂任务拆分为独立的 Executor 和 Agent
  • 清晰的流程定义:使用 Builder API 构建可读、可维护的流程图
  • 灵活的流程控制:支持条件分支、循环迭代、并发执行等复杂模式
  • 统一的状态管理:内置 WorkflowContext 管理跨步骤的状态和数据
  • 实时流式反馈:通过事件机制实时监控工作流的执行进度

MAF Workflow 位于应用层和 Agent 层之间,负责:

  • 编排多个 Agent:决定 Agent 的执行顺序、条件和并发策略
  • 管理数据流:在 Agent 和 Executor 之间传递数据
  • 监控执行状态:实时报告工作流的执行进度和结果
  • 错误处理:统一处理执行过程中的异常和重试逻辑

2. Workflow 的核心构建块

Workflow 由以下三个核心概念组成:

  • Executor (执行器) - 处理单元:Executor 是工作流的基本处理单元,负责执行具体的业务逻辑。

    特点:

    • 强类型输入:TInput 定义接收的数据类型
    • 强类型输出:TOutput 定义返回的数据类型
    • 纯逻辑处理:可以是数据转换、验证、格式化等任何操作
    • Agent 也是 Executor:AIAgent 可以直接作为 Executor 使用

    示例用途:

    • 文本转换 (大写、反转、清理)
    • 数据验证 (格式检查、安全过滤)
    • 结果聚合 (合并多个输出)
    • AI 调用 (通过 Agent)
public abstract class Executor<TInput, TOutput> : Executor
{
    public abstract ValueTask<TOutput> HandleAsync(
        TInput input, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default);
}
  • Edge (边) - 数据流路径:Edge 定义了 Executor 之间的数据流动方向和连接关系。

    类型:

    • 顺序边 (Sequential Edge):A → B - 数据从 A 流向 B

    • 条件边 (Conditional Edge):A → B or C - 根据条件选择路径

    • Fan-out 边:A → [B, C, D] - 一个输出分发给多个 Executor

    • Fan-in 边:[B, C, D] → E - 多个输出汇聚到一个 Executor

  • Workflow (工作流) - 完整流程定义:Workflow 是由多个 Executor 通过 Edge 连接而成的完整处理流程。

    执行方式:

    • 同步执行:RunAsync() - 等待完整结果
    • 流式执行:StreamAsync() - 实时接收事件流

    构建方式:

// 使用 WorkflowBuilder 构建工作流
WorkflowBuilder builder = new(startExecutor);
builder.AddEdge(executorA, executorB);
builder.AddEdge(executorB, executorC);
builder.WithOutputFrom(executorC);
Workflow workflow = builder.Build();

3. 第一个工作流:文本处理管道

业务场景:将用户输入的文本 → 转为大写 → 反转顺序

  • 步骤 1:定义第一个 Executor - 大写转换。

    • 继承 Executor<TInput, TOutput>:

      • TInput = string:接收字符串输入

      • TOutput = string:返回字符串输出

      • 构造函数传入 "UppercaseExecutor" 作为唯一标识符

    • 实现 HandleAsync 方法:

      • message:接收上一个步骤传递的数据 (对于第一个 Executor,是工作流的输入)

      • context:工作流上下文,用于访问共享状态、发布事件等 (后续课程详解)

      • cancellationToken:用于取消操作

      • 返回值:会自动作为消息沿着 Edge 传递给下一个 Executor

    • 业务逻辑:

      • 使用 ToUpperInvariant() 进行文化无关的大写转换

      • 返回 ValueTask (高性能异步模式)

/// <summary>
/// 大写转换执行器 - 将输入文本转换为大写
/// </summary>
public class UppercaseExecutor : Executor<string, string>
{
    public UppercaseExecutor() : base("UppercaseExecutor") { }

    /// <summary>
    /// 核心处理方法 - 将输入文本转为大写
    /// </summary>
    public override ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // 执行业务逻辑: 转为大写
        string result = message.ToUpperInvariant();
        
        // 返回处理结果 (会自动沿着 Edge 传递给下一个 Executor)
        return ValueTask.FromResult(result);
    }
}
  • 步骤 2:定义第二个 Executor - 文本反转
/// <summary>
/// 文本反转执行器 - 将输入文本反转
/// </summary>
public class ReverseTextExecutor : Executor<string, string>
{
    public ReverseTextExecutor() : base("ReverseTextExecutor") { }

    /// <summary>
    /// 核心处理方法 - 反转输入文本
    /// </summary>
    public override ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // 执行业务逻辑: 反转字符串
        string result = string.Concat(message.Reverse());
        
        // 返回处理结果
        return ValueTask.FromResult(result);
    }
  • 步骤 3: 使用 WorkflowBuilder 构建工作流:将两个 Executor 通过 Edge 连接起来,组成完整的工作流。

    • new WorkflowBuilder(uppercaseExecutor):

      • 创建 WorkflowBuilder 并指定起始 Executor

      • 工作流的输入数据会首先传递给这个 Executor

    • AddEdge(source, target):

      • 添加一条有向边 (Edge),定义数据流动方向

      • source 的输出会自动传递给 target 的输入

      • 类型检查:source 的 TOutput 必须匹配 target 的 TInput

    • WithOutputFrom(executor):

      • 指定工作流的最终输出来自哪个 Executor

      • 如果不调用此方法,默认输出来自最后一个 Executor

    • Build():

      • 完成构建,返回一个不可变的 Workflow 对象

      • 此时会进行拓扑验证:检查是否有循环依赖、孤立节点等

// 创建 Executor 实例
UppercaseExecutor uppercaseExecutor = new UppercaseExecutor();
ReverseTextExecutor reverseExecutor = new ReverseTextExecutor();

// 使用 WorkflowBuilder 构建工作流
WorkflowBuilder builder = new WorkflowBuilder(uppercaseExecutor); // 指定起始 Executor
builder.AddEdge(uppercaseExecutor, reverseExecutor);              // 添加边: uppercase → reverse
builder.WithOutputFrom(reverseExecutor);                          // 指定输出来自 reverse

// 构建最终的工作流对象
Workflow workflow = builder.Build();
Console.WriteLine("工作流构建完成");
new 
{ 
    起始节点 = "UppercaseExecutor",
    边 = "UppercaseExecutor → ReverseTextExecutor",
    输出节点 = "ReverseTextExecutor"
}.Display();
  • 步骤 4: 同步执行工作流

    • InProcessExecution.RunAsync(workflow, input):

      • 在当前进程内执行工作流 (同步等待完成)

      • input 参数会传递给起始 Executor

      • 返回 Run 对象,包含执行结果和事件

    • Run.NewEvents:

      • 获取工作流执行过程中产生的所有事件

      • 主要事件类型:

        • WorkflowStartedEvent:工作流启动

        • ExecutorCompletedEvent:某个 Executor 执行完成

        • WorkflowCompletedEvent:工作流完成

    • ExecutorCompletedEvent:

      • ExecutorId:执行器的唯一标识符

      • Data:执行器的输出数据 (object 类型,需要转换)

      • 事件按执行顺序排列

// 执行工作流 - 同步模式
string input = "Hello, World!";
Console.WriteLine($"输入: {input}");
Console.WriteLine("开始执行工作流...\n");
await using (Run run = await InProcessExecution.RunAsync(workflow, input))
{
    // 处理执行结果
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine("执行结果:\n");
    foreach (WorkflowEvent evt in run.NewEvents)
    {
        if (evt is ExecutorCompletedEvent executorCompleted)
        {
            new
            {
                执行器 = executorCompleted.ExecutorId,
                输出数据 = executorCompleted.Data
            }.Display();
            Console.WriteLine();
        }
    }
    Console.WriteLine("工作流执行完成");
}

4. 流式执行

流式执行允许我们在工作流运行过程中实时接收事件流,而不是等待全部完成。

sequenceDiagram
    participant App as 应用代码
    participant WF as Workflow
    participant E1 as UppercaseExecutor
    participant E2 as ReverseTextExecutor

    App->>WF: StreamAsync(input)
    WF->>E1: 执行
    E1-->>App: ExecutorCompletedEvent (立即返回)
    WF->>E2: 执行
    E2-->>App: ExecutorCompletedEvent (立即返回)
    WF-->>App: WorkflowCompletedEvent

对比:

特性 同步执行 (RunAsync) 流式执行 (StreamAsync)
执行模式 阻塞等待全部完成 返回异步流 (IAsyncEnumerable)
事件获取 一次性获取所有事件 实时逐个接收事件
适用场景 短时间任务、批处理 长时间任务、实时反馈
用户体验 等待后一次性显示结果 逐步显示进度
  • 步骤 5:使用 StreamAsync 实现流式执行

    • InProcessExecution.StreamAsync(workflow, input):

      • 启动流式执行,立即返回 StreamingRun 对象

      • 不会阻塞等待工作流完成

    • streamingRun.WatchStreamAsync():

      • 返回 IAsyncEnumerable 异步流

      • 使用 await foreach 实时迭代事件

      • 每当一个 Executor 完成,就会产生一个新的 ExecutorCompletedEvent

    • 时间戳 DateTime.Now:

      • 演示事件是逐个实时产生的

      • 在实际的长时间任务中,你会看到明显的时间间隔

    • 主要事件类型:

      • WorkflowStartedEvent:工作流开始

      • ExecutorCompletedEvent:单个 Executor 完成

      • WorkflowOutputEvent:整个工作流完成

// 执行工作流 - 流式模式
string streamingInput = "Workflow Streaming!";
Console.WriteLine($"输入: {streamingInput}");
Console.WriteLine("开始流式执行工作流...\n");
await using (StreamingRun streamingRun = await InProcessExecution.StreamAsync(workflow, streamingInput))
{
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine("实时执行进度:\n");
    // 实时监听事件流
    await foreach (WorkflowEvent evt in streamingRun.WatchStreamAsync())
    {
        if (evt is ExecutorCompletedEvent executorCompleted)
        {
            Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] {executorCompleted.ExecutorId} 完成");
            new
            {
                输出数据 = executorCompleted.Data
            }.Display();
            Console.WriteLine();
        }
        else if (evt is WorkflowStartedEvent)
        {
            Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 工作流启动\n");
        }
        else if (evt is WorkflowOutputEvent)
        {
            Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 工作流完成");
            Console.WriteLine($"工作流输出:{evt.Data}");
        }
    }
}

5. 简化写法

使用 Lambda 表达式创建 Executor:对于简单的转换逻辑,每次都定义完整的 Executor 类显得过于繁琐。MAF 提供了 BindAsExecutor 扩展方法,允许我们将普通函数绑定为 Executor。

// 使用 Lambda 表达式定义转换逻辑
Func<string, string> toUpperFunc = input => input.ToUpperInvariant();
Func<string, string> reverseFunc = input => string.Concat(input.Reverse());

// 将函数绑定为 Executor
var upperExecutor = toUpperFunc.BindAsExecutor("UpperExecutor");
var reverseExecutor2 = reverseFunc.BindAsExecutor("ReverseExecutor");

// 构建工作流 (与之前相同的流程)
WorkflowBuilder lambdaBuilder = new WorkflowBuilder(upperExecutor);
lambdaBuilder.AddEdge(upperExecutor, reverseExecutor2);
lambdaBuilder.WithOutputFrom(reverseExecutor2);
Workflow lambdaWorkflow = lambdaBuilder.Build();

代码解析:

  • BindAsExecutor(name):

    • 将 Func<TInput, TOutput> 绑定为 Executor<TInput, TOutput>

    • 参数 name:Executor 的唯一标识符

    • 限制:只适用于同步、无副作用的纯函数转换

  • 适用场景:

    • 数据格式转换 (大写、小写、Trim)

    • 简单计算 (字符串拼接、数学运算)

    • 数据映射 (DTO 转换)

  • 不适用场景

    • 需要访问 IWorkflowContext 的逻辑

    • 需要异步操作 (如数据库查询、API调用)

    • 复杂的状态管理

建议:

  • 简单转换用 Lambda
  • 复杂逻辑用完整的 Executor 类

二、集成 Agent 到工作流

1. 为什么要 Agent in Workflow

在工作流中,我们有两种主要的执行单元:

特性 Executor Agent
定义 同步/异步的业务逻辑处理单元 基于 LLM 的智能对话式处理单元
适用场景 确定性逻辑(格式化、验证、计算) 需要智能判断、生成、理解的任务
输入输出 任意类型 (TInput/TOutput) ChatMessage
执行方式 立即执行 需要 TurnToken 触发
成本 几乎无成本 LLM API 调用成本
可预测性 100% 可预测 基于模型能力,存在不确定性

将 Agent 集成到工作流的优势:

  • 自动化数据流:工作流引擎自动管理 Agent 之间的消息传递
  • 统一监控:通过 WorkflowEvent 统一监听所有 Agent 的执行状态
  • 灵活组合:Agent 可以与 Executor 自由组合,构建复杂业务逻辑
  • 错误处理:工作流级别的异常处理和重试机制
  • 可视化:清晰的流程图,便于理解和维护

2. 将 Agent 添加到工作流

  • 创建一个简单的翻译 Agent
// 创建一个法语翻译 Agent
var frenchAgent = new ChatClientAgent(
    chatClient, 
    "You are a translation assistant that translates the provided text to French."
);
  • 将 Agent 添加到工作流
// 构建包含单个 Agent 的工作流
var simpleWorkflow = new WorkflowBuilder(frenchAgent)
    .Build();
  • 执行工作流 - TurnToken 机制详解

关键概念:当 Agent 被用作工作流步骤时,它不会立即执行。需要通过 TurnToken 来触发 Agent 的处理。

sequenceDiagram
    participant User as 用户
    participant Workflow as 工作流引擎
    participant Agent as Agent (Executor)
    participant LLM as LLM
    
    User->>Workflow: StreamAsync(input)
    Workflow->>Agent: 传递 ChatMessage
    Note over Agent: 消息被缓存,等待触发
    
    User->>Workflow: TrySendMessageAsync(TurnToken)
    Workflow->>Agent: 传递 TurnToken
    Agent->>LLM: 发送请求
    LLM-->>Agent: 返回响应
    Agent-->>Workflow: AgentRunUpdateEvent
    Workflow-->>User: 流式输出事件
// 执行工作流
Console.WriteLine("开始执行工作流...");
Console.WriteLine();
var input = new ChatMessage(ChatRole.User, "Hello World!");
await using (StreamingRun run = await InProcessExecution.StreamAsync(simpleWorkflow, input))
{
    Console.WriteLine("发送 TurnToken 触发 Agent...");

    // 关键步骤:发送 TurnToken 来触发 Agent 执行
    await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

    Console.WriteLine();
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine("监听工作流事件...");
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine();

    await foreach (WorkflowEvent evt in run.WatchStreamAsync())
    {
        if (evt is AgentRunUpdateEvent agentUpdate)
        {
            Console.WriteLine($"Agent 输出: {agentUpdate.ExecutorId}");
            Console.WriteLine($"内容: {agentUpdate.Data}");
            Console.WriteLine();
        }
    }
}

3. 关键概念解析

  • 为什么需要 TurnToken?

    在对话式 AI 系统中,Agent 可能需要等待多条消息(如用户输入、工具调用结果)后再统一处理。TurnToken 作为一个 的信号,告诉 Agent:

    • "所有输入已就绪,可以开始处理了"

    • "这是一个完整的对话回合"

  • TurnToken 的参数

    • emitEvents:是否发出执行事件(如 AgentRunUpdateEvent

    • 设置为 true 可以实时监控 Agent 的输出

new TurnToken(emitEvents: true)
  • AgentRunUpdateEvent

    这是 Agent 在工作流中执行时发出的事件:

public class AgentRunUpdateEvent : WorkflowEvent
{
    public string ExecutorId { get; }    // Agent 的 ID
    public object Data { get; }          // Agent 的输出内容
    public string Status { get; }        // 执行状态
}

4. 实战:翻译链

让我们构建一个更有趣的案例:将英文依次翻译为法语、西班牙语,最后再翻译回英语,看看经过三次翻译后内容会有什么变化。

  • 创建三个翻译 Agent
// 定义一个辅助方法来创建翻译 Agent
AIAgent CreateTranslationAgent(string targetLanguage, IChatClient client)
{
    return new ChatClientAgent(
        client,
        $"You are a translation assistant that translates the provided text to {targetLanguage}."
    );
}
// 创建三个翻译 Agent
var frenchTranslator = CreateTranslationAgent("French", chatClient);
var spanishTranslator = CreateTranslationAgent("Spanish", chatClient);
var englishTranslator = CreateTranslationAgent("English", chatClient);
  • 构建翻译链工作流:使用 AddEdge 将三个 Agent 顺序连接
// 构建工作流:English → French → Spanish → English
var translationChainWorkflow = new WorkflowBuilder(frenchTranslator)
    .AddEdge(frenchTranslator, spanishTranslator)
    .AddEdge(spanishTranslator, englishTranslator)
    .Build();
  • 执行翻译链
Console.WriteLine("开始执行翻译链工作流...");
Console.WriteLine();
var inputMessage = new ChatMessage(ChatRole.User, "Artificial Intelligence is transforming the world!");
Console.WriteLine($"原始输入: {inputMessage.Text}");
Console.WriteLine();

await using (StreamingRun chainRun = await InProcessExecution.StreamAsync(translationChainWorkflow, inputMessage)){
    // 发送 TurnToken 触发所有 Agent
    await chainRun.TrySendMessageAsync(new TurnToken(emitEvents: true));
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine("翻译过程跟踪");
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine();
    int stepNumber = 1;
    await foreach (WorkflowEvent evt in chainRun.WatchStreamAsync())
    {
        if (evt is AgentRunUpdateEvent agentEvent)
        {
            Console.WriteLine($"Step {stepNumber}: {agentEvent.ExecutorId}");
            Console.WriteLine($"   翻译结果: {agentEvent.Data}");
            Console.WriteLine();
            stepNumber++;
        }
    }
    Console.WriteLine("翻译链执行完成");
}

观察与分析

通过上面的输出,会发现:

  • 顺序执行:三个 Agent 按照定义的顺序依次执行
  • 自动传递:每个 Agent 的输出自动成为下一个 Agent 的输入
  • 实时反馈:通过 AgentRunUpdateEvent 可以实时看到每一步的翻译结果
  • 语义变化:经过多次翻译后,原始语义可能会有细微变化(这是 LLM 的特性)

5. 核心概念深入

  • Agent 作为 Executor 的内部机制,当将 AIAgent 添加到工作流时,MAF 会自动将其封装为一个特殊的 Executor。

    关键特性:

    • 消息缓存:Agent Wrapper 会缓存收到的 ChatMessage

    • 等待触发:只有收到 TurnToken 才会调用内部的 Agent.RunAsync()

    • 事件发射:执行过程中会发出 AgentRunUpdateEvent

    • 类型固定:输入输出都是 ChatMessage

  • TurnToken 的作用域:单个 TurnToken 触发所有 Agent,在一个工作流中,只需发送一次 TurnToken,它会触发所有等待中的 Agent。

sequenceDiagram
    participant User
    participant Workflow
    participant Agent1
    participant Agent2
    
    User->>Workflow: TurnToken
    Workflow->>Agent1: 传递 TurnToken
    Agent1->>Agent1: 处理并输出
    Agent1->>Workflow: 输出 (ChatMessage)
    Workflow->>Agent2: 传递输出 + TurnToken
    Agent2->>Agent2: 处理并输出
    Agent2->>Workflow: 输出 (ChatMessage)
  • AgentRunUpdateEvent 详解,这是监控 Agent 执行的核心事件。

    AgentRunUpdateEvent 的用途:

    • 实时进度:显示 Agent 的执行进度给用户
    • 调试:检查每个 Agent 的输入输出
    • 日志记录:记录完整的执行链路
    • 错误诊断:发现哪个 Agent 出现了问题
// 典型的事件处理模式
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentRunUpdateEvent agentEvent)
    {
        // 事件属性
        Console.WriteLine($"Agent ID: {agentEvent.ExecutorId}");
        Console.WriteLine($"数据: {agentEvent.Data}");
        Console.WriteLine($"状态: {agentEvent.Status}");
        Console.WriteLine($"时间戳: {agentEvent.Timestamp}");
    }
}

5. 最佳实践

何时使用 Agent vs Executor

场景 推荐 理由
文本格式化、验证 Executor 确定性逻辑,无需 LLM
内容生成、改写 Agent 需要理解和创造性
数学计算、数据转换 Executor 精确计算,成本低
意图识别、分类 Agent 需要语义理解
API 调用、数据库查询 Executor 确定性操作
多轮对话、推理 Agent 需要上下文理解

Agent 在工作流中的命名规范

  • 推荐的命名模式:

    • 职责导向:名称应反映 Agent 的具体职责
    • 动词+名词:如 translateText, reviewCode
    • 避免泛化:不要使用 agent1, helper 等通用名称
    • 一致性:在同一工作流中保持命名风格一致
// 好的命名 - 清晰描述职责
var frenchTranslator = CreateTranslationAgent("French", client);
var contentReviewer = new ChatClientAgent(client, "Review content for quality");
var codeGenerator = new ChatClientAgent(client, "Generate C# code");

// 不好的命名 - 过于抽象
var agent1 = new ChatClientAgent(...);
var helper = new ChatClientAgent(...);

错误处理策略

在工作流中使用 Agent 时的错误处理:

// 带错误处理的 Agent 工作流执行
async Task<bool> ExecuteWorkflowWithErrorHandlingAsync(Workflow workflow, ChatMessage input, int maxRetries = 3)
{
    for (int attempt = 1; attempt <= maxRetries; attempt++)
    {
        try
        {
            Console.WriteLine($"尝试执行 (第 {attempt} 次)...");            
            await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, input);
            await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
            
            await foreach (WorkflowEvent evt in run.WatchStreamAsync())
            {
                if (evt is AgentRunUpdateEvent agentEvent)
                {
                    // 检查 Agent 输出是否有效
                    if (agentEvent.Data == null || string.IsNullOrWhiteSpace(agentEvent.Data.ToString()))
                    {
                        throw new InvalidOperationException($"Agent {agentEvent.ExecutorId} 返回了空结果");
                    }
                }
            }            
            Console.WriteLine("工作流执行成功");
            return true;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"执行失败: {ex.Message}");
            
            if (attempt < maxRetries)
            {
                var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt)); // 指数退避
                Console.WriteLine($"等待 {delay.TotalSeconds} 秒后重试...");
                await Task.Delay(delay);
            }
            else
            {
                Console.WriteLine($"达到最大重试次数,放弃执行");
                return false;
            }
        }
    }    
    return false;
}

性能优化建议

  • 减少不必要的 Agent 调用
// 不好的做法 - 每次都调用 Agent
var validatorAgent = new ChatClientAgent(client, "Validate input format");

// 好的做法 - 用 Executor 处理格式验证
public class FormatValidatorExecutor : Executor<string, string>
{
    protected override Task<string> HandleAsync(string input, IWorkflowContext context, CancellationToken ct)
    {
        if (string.IsNullOrWhiteSpace(input))
            throw new ArgumentException("输入不能为空");
        return Task.FromResult(input);
    }
}
  • 合理使用 emitEvents
// 生产环境:可以关闭事件以提高性能
await run.TrySendMessageAsync(new TurnToken(emitEvents: false));

// 开发/调试:开启事件以便监控
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
  • Agent 粒度控制

    • 太细:多个简单 Agent 导致过多的 LLM 调用

    • 太粗:单个复杂 Agent 难以复用和维护

    • 适中:每个 Agent 负责一个明确的子任务

三、Workflow as Agent(工作流即 Agent)

1. 为什么需要 Workflow as Agent

在实际应用中,我们经常需要将多个 Agent 组成的协作工作流作为一个整体单元来使用:

特性 独立的 Workflow Workflow as Agent
执行方式 StreamAsync() + TurnToken RunAsync() / RunStreamingAsync()
输入类型 ChatMessage + 手动发送 TurnToken ChatMessage / string (自动处理)
输出类型 WorkflowEvent 流 AgentRunResponse
接口 Workflow AIAgent
复用方式 需要手动管理执行流程 作为工具/插件/子Agent

问题:直接使用 Workflow 接口存在以下挑战

  • 调用复杂:需要手动创建 StreamingRun、发送 TurnToken、监听事件
  • 接口不统一:Workflow 和 Agent 使用不同的调用方式
  • 复用困难:无法将 Workflow 直接作为 Agent 的工具
  • 组合受限:无法在其他 Agent 中轻松集成 Workflow

Workflow as Agent 的解决方案

AsAgent() 扩展方法将基于 Agent 的 Workflow 包装为标准的 AIAgent 接口,使其可以像普通 Agent 一样使用

  • 核心优势:
    • 统一接口:使用 AIAgent 接口调用 Workflow
    • 无缝集成:可以作为 Agent 的工具使用
    • 跨协议复用:支持 A2A、MCP 等协议
    • 生态兼容:享受 AIAgent 生态的所有中间件
  • 使用场景
    • 复杂任务封装:将多步骤的业务逻辑封装为可复用的 Agent
    • 跨系统集成:需要在 A2A 协作中使用 Workflow
    • 工具化封装:将 Workflow 作为其他 Agent 的工具
    • 中间件增强:需要对 Workflow 应用 AIAgent 中间件
  • 不建议场景
    • 简单任务:如果只是简单的顺序调用,直接使用 WorkflowBuilder 即可
    • 性能敏感:WorkflowAgent 会增加一层适配开销

2. 创建多 Agent 协助工作流

我们使用前面创建的多Agent 翻译链工作流,现在我们将这个 Workflow 封装为一个 Agent,使其可以像普通 Agent 一样通过 AIAgent 接口调用。

AsAgent() 方法将 Workflow 包装为一个 AIAgent,内部自动处理:

  • 接收 ChatMessage 输入:提取用户消息
  • 自动管理 TurnToken:无需手动发送 TurnToken
  • 返回 AgentRunResponse:将 Workflow 输出包装为标准响应
sequenceDiagram
    participant User as 调用方
    participant WA as Workflow Agent
    participant WF as Workflow
    
    User->>WA: RunAsync(ChatMessage)
    WA->>WA: 内部创建 StreamingRun
    WA->>WF: 自动发送 TurnToken
    WF->>WF: 执行所有 Agent
    WF-->>WA: WorkflowEvent 流
    WA->>WA: 包装为 AgentRunResponse
    WA-->>User: AgentRunResponse

重要限制:ChatProtocol 支持要求

并非所有 Workflow 都能转换为 Agent! AsAgent() 方法有严格的协议要求,只有满足 ChatProtocol 的 Workflow 才能使用 AsAgent(),必须同时支持:

  • 输入类型:List 或 ChatMessage
  • 触发类型:TurnToken(用于触发 Agent 执行)

如果 Workflow 不满足这两个条件,调用 AsAgent() 会抛出异常:

// 错误示例:基于 Executor 的 Workflow
var workflow = new WorkflowBuilder()
    .AddExecutor(new MyCustomExecutor())  // 使用自定义 Executor
    .Build();

// 运行时报错!
var agent = workflow.AsAgent("my-agent", "MyAgent");
// System.InvalidOperationException: 
// Workflow does not support ChatProtocol: 
// At least List<ChatMessage> and TurnToken must be supported as input.

正确做法:使用 AgentWorkflowBuilder,要让 Workflow 支持 AsAgent(),必须使用 AgentWorkflowBuilder 创建基于 Agent 的 Workflow

// 正确:使用 AgentWorkflowBuilder
var workflow = AgentWorkflowBuilder.BuildSequential(
    "my-workflow",
    [agent1, agent2, agent3]  // 使用 AIAgent 实例
);

var workflowAgent = workflow.AsAgent("my-agent", "MyAgent");  // 成功

为什么有这个限制?

核心原因:

  • AIAgent 接口的 RunAsync() 方法接收 ChatMessage 作为输入
  • Agent 通过 TurnToken 触发执行
  • Executor-based Workflow 使用自定义输入类型(如 string),不符合 ChatProtocol 规范
Workflow 类型 输入类型 是否支持 TurnToken 能否转 Agent
Executor-based(WorkflowBuilder) 自定义类型(如 string, MyData) 不支持 不能
Agent-based(AgentWorkflowBuilder) ChatMessage 支持 可以

将 Workflow 转换为 Agent

// 使用 AsAgent() 将 Workflow 转换为 AIAgent
var translationWorkflowAgent = translationWorkflow.AsAgent(
    id: "translation-workflow",
    name: "TranslationChain"
);

3. 测试 Workflow Agent

  • 使用 RunAsync 调用
// 使用 AIAgent 接口调用 Workflow Agent
var userMessage = "The future belongs to those who believe in the beauty of their dreams.";
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("用户输入:");
Console.WriteLine($" '{userMessage}'");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");

// 创建 Thread
var thread = translationWorkflowAgent.GetNewThread();
// 关键:像调用普通 Agent 一样调用 Workflow Agent
var response = await translationWorkflowAgent.RunAsync(userMessage, thread);
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("Workflow Agent 响应:");
foreach (var msg in response.Messages)
{
    if (msg.Role == ChatRole.Assistant)
    {
        Console.WriteLine($" {msg.Text}");
    }
}
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
  • 使用 RunStreamingAsync 流式调用
// 使用流式调用
var streamingMessage = "Technology is best when it brings people together.";
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("用户输入 (流式):");
Console.WriteLine($" '{streamingMessage}'");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
Console.WriteLine("流式输出:");
var streamThread = translationWorkflowAgent.GetNewThread();
await foreach (var update in translationWorkflowAgent.RunStreamingAsync(streamingMessage, streamThread))
{
    if (!string.IsNullOrEmpty(update.Text))
    {
        Console.WriteLine($"   {update.Text}");
    }
}
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("流式调用完成");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");

4. 将 Workflow Agent 作为工具使用

现在让我们演示 Workflow Agent 最强大的功能:将复杂的多 Agent 协作流程作为工具提供给其他 Agent 使用。

场景说明,我们要创建一个 语言专家 Agent,它需要:

  1. 接收用户请求(如"测试这段文字的翻译一致性")
  2. 调用翻译链 Workflow (Workflow Agent) 进行多轮翻译
  3. 分析并报告翻译质量

这是一个典型的 Agent 调用 Workflow Agent"场景。

  • 将 Workflow Agent 转换为 AIFunction
// Workflow Agent 可以直接转换为 AIFunction
var translationChainFunction = translationWorkflowAgent.AsAIFunction(
    new AIFunctionFactoryOptions
    {
        Name = "translation_chain",
        Description = "对文本进行多轮翻译测试:英文→法语→西班牙语→英文,用于测试翻译一致性"
    }
);
  • 创建内容审核 Agent (使用 WorkflowAgent 作为工具)
// 创建语言专家 Agent,配置翻译链工具
var linguistAgent = chatClient.CreateAIAgent(
    instructions: @"你是一位语言学专家。你可以使用 translation_chain 工具来测试文本的翻译一致性。
    
    工作流程:
    1. 使用 translation_chain 工具对用户提供的文本进行多轮翻译
    2. 比较原始文本和最终翻译结果
    3. 分析翻译过程中的语义变化
    4. 提供翻译质量评估和改进建议
    
    返回格式化的分析报告。",
    name: "LinguistExpert",
    tools: [translationChainFunction]
);
Console.WriteLine("语言专家 Agent 创建完成");
Console.WriteLine(" 配置工具: translation_chain (Workflow Agent)");
  • 测试:Agent 自动调用 WorkflowAgent
// 创建线程并提交请求
var linguistThread = linguistAgent.GetNewThread();
var userRequest = "请测试这段文字的翻译一致性: The only way to do great work is to love what you do.";
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("用户请求:");
Console.WriteLine($" {userRequest}");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
Console.WriteLine("语言专家处理中(将自动调用翻译链工具)...\n");
// Agent 会自动调用 translation_chain 工具 (Workflow Agent)
var response = await linguistAgent.RunAsync(userRequest, linguistThread);
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("分析报告:");
foreach (var msg in response.Messages)
{
    if (msg.Role == ChatRole.Assistant)
    {
        Console.WriteLine($"\n{msg.Text}");
    }
}
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");

5. 直接调用 vs WorkflowAgent

对比总结

特性 直接调用 Workflow 使用 WorkflowAgent
调用方式 RunAsync(TInput) RunAsync(ChatMessage)
接口 Workflow AIAgent
输入类型 强类型 (string) ChatMessage
输出类型 强类型 (string) AgentRunResponse
作为工具 不支持 支持
中间件 不支持 支持 AIAgent 中间件
A2A 协作 不支持 支持
MCP 集成 不支持 支持
性能开销 稍高 (适配层)

架构对比图

flowchart TB
    subgraph "直接调用 Workflow"
        A1[应用代码] -->|RunAsync| B1[Workflow]
        B1 -->|string| A1
        style A1 fill:#FF5252,stroke:#C62828,stroke-width:2px,color:#fff
        style B1 fill:#FF5252,stroke:#C62828,stroke-width:2px,color:#fff
    end
    
    subgraph "WorkflowAgent 封装"
        A2[Agent/应用] -->|RunAsync| B2[WorkflowAgent]
        B2 -->|调用| C2[Workflow]
        C2 -->|结果| B2
        B2 -->|AgentRunResponse| A2
        
        B2 -.可应用.-> D2[中间件]
        B2 -.可注册为.-> E2[Agent工具]
        B2 -.可用于.-> F2[A2A/MCP]
        
        style A2 fill:#4CAF50,stroke:#2E7D32,stroke-width:2px,color:#fff
        style B2 fill:#2196F3,stroke:#1565C0,stroke-width:2px,color:#fff
        style C2 fill:#4CAF50,stroke:#2E7D32,stroke-width:2px,color:#fff
    end

6. 最佳实践

何时使用 WorkflowAgent?

  • 推荐场景:

    • 需要将 Workflow 作为工具提供给其他 Agent

    • 需要在 A2A 协作中使用 Workflow

    • 需要对 Workflow 应用 AIAgent 中间件 (如日志、缓存)

    • 需要通过 MCP 协议暴露 Workflow

    • 构建可复用的业务逻辑组件

  • 不推荐场景:

    • 简单的内部调用,直接使用 RunAsync 更高效

    • 性能敏感的场景 (适配层会有额外开销)

    • 不需要对外暴露的内部工作流

使用建议

  • 命名规范:WorkflowAgent 转为工具时,使用清晰的名称和描述
var tool = workflowAgent.AsAIFunction(new AIFunctionFactoryOptions
{
    Name = "clean_data",  // 动词_名词格式
    Description = "清晰描述功能和用途"
});
  • 错误处理:确保 Workflow 内部有完善的错误处理
protected override async Task<string> ExecuteAsync(...)
{
    try {
        // 业务逻辑
    } catch (Exception ex) {
        // 记录日志并返回友好错误信息
        return $"处理失败: {ex.Message}";
    }
}
  • 性能优化:对于频繁调用的 WorkflowAgent,考虑添加缓存中间件
// 示例:使用缓存中间件 (需自行实现或引用相关库)
// var cachedWorkflowAgent = new CachingAgent(workflowAgent);

四、工作流的事件机制

事件类别 作用域 代表事件 使用频率
Executor 事件 单个执行器 ExecutorCompletedEvent ⭐⭐⭐⭐⭐
SuperStep 事件 执行步骤 SuperStepCompletedEvent ⭐⭐
Workflow 事件 整个工作流 WorkflowOutputEvent ⭐⭐⭐⭐⭐
Agent 事件 AI Agent AgentRunUpdateEvent ⭐⭐⭐⭐
交互事件 人机协作 RequestInfoEvent ⭐⭐⭐

1. WorkflowEvent 基类剖析

  • 类定义
public class WorkflowEvent
{
    // 事件携带的数据负载
    public object? Data { get; }
    
    // 构造函数
    public WorkflowEvent(object? data = null)
    {
        Data = data;
    }
    
    // 提供友好的字符串表示
    public override string ToString() =>
        Data is not null ?
            $"{GetType().Name}(Data: {Data.GetType()} = {Data})" :
            $"{GetType().Name}()";
}
  • 核心特性
特性 说明 示例
通用性 所有事件的基类,提供统一接口 可以用 WorkflowEvent 类型接收所有事件
数据承载 Data 属性存储事件的业务数据 执行结果、错误信息、状态对象等
可扩展 子类可以添加特定属性 ExecutorEvent 添加 ExecutorId
调试友好 ToString() 提供清晰的事件描述 便于日志输出和调试
  • ExecutorEvent - Executor 作用域事件,ExecutorEvent 是最常用的事件类别,它表示与特定 Executor 执行相关的事件。

    关键属性

    • ExecutorId:标识哪个 Executor 触发了这个事件
      • 用于追踪执行流程
      • 区分并发执行的不同 Executor
      • 实现基于 Executor 的过滤逻辑

    四大子事件

    • ExecutorInvokedEvent:开始执行
    • ExecutorCompletedEvent:成功完成
    • ExecutorFailedEvent:执行失败
    • AgentRunUpdateEvent:Agent 更新
public class ExecutorEvent : WorkflowEvent
{
    // 触发此事件的 Executor 的唯一标识符
    public string ExecutorId { get; }
    
    public ExecutorEvent(string executorId, object? data) : base(data)
    {
        ExecutorId = executorId;
    }
    
    public override string ToString() =>
        Data is not null ?
            $"{GetType().Name}(Executor = {ExecutorId}, Data: {Data.GetType()} = {Data})" :
            $"{GetType().Name}(Executor = {ExecutorId})";
}
  • SuperStepEvent - 执行步骤事件,SuperStepEvent 用于表示工作流的执行步骤(SuperStep)级别的事件。

    在 MAF Workflow 中,SuperStep 是一个执行单元,可以包含:

    1. 一个或多个 Executor 的并发执行
    2. 一个完整的顺序执行链
    3. 两个子事件
事件类型 触发时机 主要用途
SuperStepStartedEvent SuperStep 开始执行 监控执行进度
SuperStepCompletedEvent SuperStep 完成执行 调试、Checkpoint 保存

使用场景:在调试复杂的并发工作流时,SuperStep 事件可以帮助你理解执行的阶段划分。

public class SuperStepEvent : WorkflowEvent
{
    // SuperStep 的索引(从 0 开始)
    public int StepNumber { get; }
    
    public SuperStepEvent(int stepNumber, object? data = null) : base(data)
    {
        StepNumber = stepNumber;
    }
}

2. 流式执行机制详解

MAF Workflow 提供两种执行模式:

执行模式 方法 特点 适用场景
同步执行 RunAsync() 等待完成后返回结果 简单工作流、不需要中间状态
流式执行 StreamAsync() 实时返回事件流 需要进度监控、Agent 工作流
sequenceDiagram
    participant User as 用户代码
    participant Workflow as 工作流引擎
    participant Executor as Executors
    
    Note over User,Executor: 同步执行 (RunAsync)
    User->>Workflow: RunAsync(input)
    Workflow->>Executor: 执行所有步骤
    Executor-->>Workflow: 返回结果
    Workflow-->>User: 返回 Run (包含结果)
    Note over User: 阻塞等待完成
    
    Note over User,Executor: 流式执行 (StreamAsync)
    User->>Workflow: StreamAsync(input)
    Workflow-->>User: 返回 StreamingRun
    Note over User: 立即返回
    User->>Workflow: WatchStreamAsync()
    loop 执行过程中
        Workflow->>Executor: 执行步骤
        Executor-->>Workflow: 产生事件
        Workflow-->>User: 实时推送事件
        Note over User: 实时处理
    end
  • 核心 API:WatchStreamAsync:是流式执行的核心方法,它返回一个 IAsyncEnumerable,可以使用 await foreach 遍历。
// 1. 启动流式执行
StreamingRun run = await InProcessExecution.StreamAsync(workflow, input);

// 2. 订阅事件流
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    // 3. 处理每个事件
    Console.WriteLine($"收到事件: {evt.GetType().Name}");
}

关键概念

概念 说明
StreamingRun 流式执行的句柄,用于监听事件和发送消息
IAsyncEnumerable 异步流,事件按产生顺序依次返回
await foreach 异步迭代,每收到一个事件就处理一次
实时性 事件一产生就推送,不等待工作流完成

执行流程图

flowchart TB
    A[StreamAsync] --> B[返回 StreamingRun]
    B --> C[WatchStreamAsync]
    C --> D[await foreach 循环]
    
    D --> E{收到事件?}
    E -->|是| F[处理事件]
    F --> E
    E -->|工作流完成| G[退出循环]
    
    H[Workflow Engine] -.产生事件.-> E
    
    style A fill:#2196F3,stroke:#1565C0,stroke-width:2px,color:#fff
    style C fill:#4CAF50,stroke:#2E7D32,stroke-width:2px,color:#fff
    style F fill:#FF9800,stroke:#E65100,stroke-width:2px,color:#fff
  • 实验 1:观察基础事件流

我们使用前面章节创建的文本处理管道

// 构建工作流
var uppercaseExecutor = new UppercaseExecutor();
var reverseExecutor = new ReverseExecutor();

var simpleWorkflow = new WorkflowBuilder(uppercaseExecutor)
    .AddEdge(uppercaseExecutor, reverseExecutor)
    .WithOutputFrom(reverseExecutor)
    .Build();
// 执行工作流并监听所有事件
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("开始执行工作流 (流式模式)");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
var input = "Hello Workflow Events!";
Console.WriteLine($"输入: \"{input}\"\n");

// 启动流式执行
StreamingRun run = await InProcessExecution.StreamAsync(simpleWorkflow, input);
Console.WriteLine("开始监听事件流...\n");
Console.WriteLine(new string('─', 60));

int eventCount = 0;
// 订阅事件流
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    eventCount++;
    
    // 显示事件的完整信息
    Console.WriteLine($"\n事件 #{eventCount}");
    Console.WriteLine($" 类型: {evt.GetType().Name}");
    
    // // 根据事件类型显示详细信息
    // switch (evt)
    // {
    //     case WorkflowStartedEvent:
    //         Console.WriteLine($" 说明: 工作流开始执行");
    //         break;
            
    //     case ExecutorInvokedEvent invokedEvent:
    //         Console.WriteLine($" Executor: {invokedEvent.ExecutorId}");
    //         Console.WriteLine($" 说明: Executor 开始执行");
    //         Console.WriteLine($" 输入: {invokedEvent.Data}");
    //         break;
            
    //     case ExecutorCompletedEvent completedEvent:
    //         Console.WriteLine($" Executor: {completedEvent.ExecutorId}");
    //         Console.WriteLine($" 说明: Executor 执行完成");
    //         Console.WriteLine($" 输出: {completedEvent.Data}");
    //         break;
            
    //     case SuperStepCompletedEvent stepEvent:
    //         Console.WriteLine($" SuperStep: #{stepEvent.StepNumber}");
    //         Console.WriteLine($" 说明: 执行步骤完成");
    //         break;
            
    //     case WorkflowOutputEvent outputEvent:
    //         Console.WriteLine($" 来源: {outputEvent.SourceId}");
    //         Console.WriteLine($" 说明: 工作流产生输出");
    //         Console.WriteLine($" 结果: {outputEvent.Data}");
    //         break;
            
    //     default:
    //         Console.WriteLine($" 数据: {evt.Data}");
    //         break;
    // }
    
    Console.WriteLine(new string('─', 60));
}
Console.WriteLine($"\n工作流执行完成,共产生 {eventCount} 个事件");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");

通过上面的实验,我们可以观察到一个典型的事件序列:

1. SuperStepStartedEvent         ← SuperStep 0 开始
2. ExecutorInvokedEvent          ← UppercaseExecutor 开始
3. ExecutorCompletedEvent        ← UppercaseExecutor 完成
4. SuperStepCompletedEvent       ← SuperStep 0 完成
5. SuperStepStartedEvent         ← SuperStep 1 开始
6. ExecutorInvokedEvent          ← ReverseExecutor 开始
7. ExecutorCompletedEvent        ← ReverseExecutor 完成
8. WorkflowOutputEvent           ← 工作流输出结果
9. SuperStepCompletedEvent       ← SuperStep 1 完成

关键发现

  1. SuperStep 结构:每个 SuperStep 都有明确的开始和结束事件
  2. 执行顺序:SuperStepStarted → ExecutorInvoked → ExecutorCompleted → SuperStepCompleted
  3. 完整性:每个 Executor 都有 Invoked 和 Completed 事件对
  4. 输出时机:WorkflowOutputEvent 在最后一个 Executor 完成后、SuperStep 完成前触发
  5. 可追溯:通过事件序号和类型可以完整追踪执行流程

3. 系统内置事件详解

ExecutorCompletedEvent - 最常用的事件:ExecutorCompletedEvent 是工作流开发中使用频率最高的事件,它表示一个 Executor 成功完成执行。

public sealed class ExecutorCompletedEvent : ExecutorEvent
{
    public ExecutorCompletedEvent(string executorId, object? result) 
        : base(executorId, data: result)
    {
    }
}

核心用途

用途 说明 示例
获取结果 Data 属性包含 Executor 的返回值 提取中间处理结果
进度追踪 标记某个步骤已完成 更新进度条
链路追踪 记录执行顺序 构建执行日志
调试分析 查看每个步骤的输出 定位数据转换问题

实战示例

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is ExecutorCompletedEvent completed)
    {
        Console.WriteLine($"{completed.ExecutorId} 完成");
        Console.WriteLine($" 输出: {completed.Data}");
        
        // 根据 ExecutorId 做不同处理
        if (completed.ExecutorId == "ValidationExecutor")
        {
            bool isValid = (bool)completed.Data;
            if (!isValid)
            {
                Console.WriteLine("验证失败,终止后续流程");
            }
        }
    }
}
  • AgentRunUpdateEvent - Agent 专用事件:当 Agent 在工作流中执行时,会产生 AgentRunUpdateEvent 来报告执行进度和输出内容。
public class AgentRunUpdateEvent : ExecutorEvent
{
    public AgentRunUpdateEvent(string executorId, AgentRunResponseUpdate update) 
        : base(executorId, data: update)
    {
        Update = update;
    }
    
    public AgentRunResponseUpdate Update { get; }
}

关键属性 AgentRunResponseUpdate对象,包含:

  • Text:Agent 输出的文本内容
  • Contents:结构化内容(可能包含 FunctionCall 等)
  • 其他 Agent 运行时信息

触发条件:只有在发送 TurnToken 时设置 emitEvents: true,才会产生此事件!

// 正确:会产生 AgentRunUpdateEvent
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

// 错误:不会产生 AgentRunUpdateEvent
await run.TrySendMessageAsync(new TurnToken(emitEvents: false));

实战示例

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentRunUpdateEvent agentEvent)
    {
        // 实时显示 Agent 输出
        Console.Write(agentEvent.Update.Text);
        
        // 检查是否有函数调用
        var functionCall = agentEvent.Update.Contents
            .OfType<FunctionCallContent>()
            .FirstOrDefault();
            
        if (functionCall != null)
        {
            Console.WriteLine($"\n调用函数: {functionCall.Name}");
        }
    }
}
  • WorkflowOutputEvent - 工作流输出事件:WorkflowOutputEvent 标志着工作流产生了输出,通常意味着工作流即将完成或已经完成。
public sealed class WorkflowOutputEvent : WorkflowEvent
{
    internal WorkflowOutputEvent(object data, string sourceId) : base(data)
    {
        SourceId = sourceId;
    }
    
    public string SourceId { get; }
    
    // 类型安全的数据提取方法
    public bool Is<T>() => Data is T;
    
    public bool Is<T>([NotNullWhen(true)] out T? maybeValue)
    {
        if (Data is T value)
        {
            maybeValue = value;
            return true;
        }
        maybeValue = default;
        return false;
    }
    
    public T? As<T>() => Data is T value ? value : default;
}

核心特性

特性 说明
SourceId 产生输出的 Executor ID
类型安全 提供 Is() 和 As() 方法
终止标志 通常是事件流的最后一个事件

实战示例

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is WorkflowOutputEvent outputEvent)
    {
        Console.WriteLine($"工作流完成,来自: {outputEvent.SourceId}");
        
        // 类型安全的数据提取
        if (outputEvent.Is<string>(out var textResult))
        {
            Console.WriteLine($"文本结果: {textResult}");
        }
        else if (outputEvent.Is<List<ChatMessage>>(out var messages))
        {
            Console.WriteLine($"对话消息: {messages.Count} 条");
        }
        
        // 或者使用 As<T>
        var result = outputEvent.As<MyCustomType>();
        if (result != null)
        {
            // 处理自定义类型
        }
    }
}
  • ExecutorFailedEvent - 错误处理事件:当 Executor 执行过程中抛出异常时,会产生 ExecutorFailedEvent。
public sealed class ExecutorFailedEvent : ExecutorEvent
{
    public ExecutorFailedEvent(string executorId, Exception? err)
        : base(executorId, data: err)
    {
        Data = err;
    }
    
    public new Exception? Data { get; }
}

实战示例

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is ExecutorFailedEvent failedEvent)
    {
        Console.WriteLine($"{failedEvent.ExecutorId} 执行失败");
        Console.WriteLine($" 错误类型: {failedEvent.Data?.GetType().Name}");
        Console.WriteLine($" 错误消息: {failedEvent.Data?.Message}");
        Console.WriteLine($" 堆栈跟踪: {failedEvent.Data?.StackTrace}");
        
        // 根据错误类型做不同处理
        switch (failedEvent.Data)
        {
            case HttpRequestException httpEx:
                Console.WriteLine("网络错误,建议重试");
                break;
            case TimeoutException timeoutEx:
                Console.WriteLine("超时错误,可能需要增加超时时间");
                break;
            default:
                Console.WriteLine("未知错误,请检查日志");
                break;
        }
    }
}
  • RequestInfoEvent - 人机协作事件:RequestInfoEvent 用于 Human-in-the-Loop 场景,表示工作流需要外部输入(通常是人工干预)。
public sealed class RequestInfoEvent : WorkflowEvent
{
    public RequestInfoEvent(ExternalRequest request) : base(request)
    {
        Request = request;
    }
    
    public ExternalRequest Request { get; }
}

使用场景

  • 审批流程:等待管理员批准
  • 内容审核:等待人工审核敏感内容
  • 用户交互:等待用户提供额外信息
  • 配置选择:等待用户选择执行路径

实战示例

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is RequestInfoEvent requestEvent)
    {
        Console.WriteLine("工作流暂停,等待外部输入");
        Console.WriteLine($" 请求ID: {requestEvent.Request.RequestId}");
        
        // 获取用户输入(实际场景可能是 Web API 或 UI)
        Console.Write("请输入响应内容: ");
        var userInput = Console.ReadLine();
        
        // 创建响应
        var response = new ExternalResponse
        {
            RequestId = requestEvent.Request.RequestId,
            Data = userInput
        };
        
        // 将响应发送回工作流
        await run.TrySendMessageAsync(response);
        Console.WriteLine("响应已发送,工作流继续执行");
    }
}

4. 事件监听与过滤

在实际应用中,我们通常不需要处理所有事件,而是只关注特定类型的事件,接下来我们看看多种事件过滤技巧。

  • 基于事件类型的过滤,最基础的过滤方式是使用 is 模式匹配或 switch 表达式。
// 示例:只处理 ExecutorCompletedEvent
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is ExecutorCompletedEvent completed)
    {
        Console.WriteLine($"{completed.ExecutorId}: {completed.Data}");
    }
    // 忽略其他事件
}

// 使用 switch 表达式处理多种事件类型
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    var message = evt switch
    {
        ExecutorCompletedEvent completed => 
            $"{completed.ExecutorId} 完成: {completed.Data}",
        
        ExecutorFailedEvent failed => 
            $"{failed.ExecutorId} 失败: {failed.Data?.Message}",
        
        AgentRunUpdateEvent agentUpdate => 
            $"Agent 更新: {agentUpdate.Update.Text}",
        
        WorkflowOutputEvent output => 
            $"输出结果: {output.Data}",
        
        _ => null // 忽略其他事件
    };
    
    if (message != null)
    {
        Console.WriteLine(message);
    }
}
  • 基于 ExecutorId 的过滤,在复杂的工作流中,我们可能只关注特定 Executor 的事件。

    使用场景

    • 进度监控:只追踪关键步骤的执行

    • 问题调试:定位某个特定 Executor 的问题

    • 日志记录:只记录重要步骤的日志

    • 性能优化:减少不必要的事件处理

// 只监听特定 Executor 的完成事件
var targetExecutorId = "ReverseExecutor";
Console.WriteLine($"只监听 {targetExecutorId} 的事件\n");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    // 方式 1: 直接在 ExecutorEvent 上过滤
    if (evt is ExecutorEvent executorEvent && 
        executorEvent.ExecutorId == targetExecutorId)
    {
        Console.WriteLine($"{evt.GetType().Name}");
        Console.WriteLine($" Executor: {executorEvent.ExecutorId}");
        Console.WriteLine($" 数据: {executorEvent.Data}\n");
    }
}

// 方式 2: 组合多个条件
var interestedExecutors = new[] { "UppercaseExecutor", "ReverseExecutor" };
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is ExecutorCompletedEvent completed && 
        interestedExecutors.Contains(completed.ExecutorId))
    {
        Console.WriteLine($"{completed.ExecutorId}: {completed.Data}");
    }
}
  • LINQ 过滤(最优雅的方式),利用 C# 的 LINQ 能力,我们可以创建更加优雅和可复用的过滤逻辑。
优势 说明
链式调用 可以组合多个过滤条件
类型安全 强类型支持,编译时检查
代码复用 过滤逻辑可以封装为扩展方法
可读性强 语义清晰,接近自然语言
// 定义 LINQ 扩展方法
//public static class WorkflowEventExtensions
//{
    // 过滤特定类型的事件
    public static async IAsyncEnumerable<T> OfEventType<T>(
        this IAsyncEnumerable<WorkflowEvent> events) where T : WorkflowEvent
    {
        await foreach (var evt in events)
        {
            if (evt is T typedEvent)
            {
                yield return typedEvent;
            }
        }
    }
    
    // 过滤特定 Executor 的事件
    public static async IAsyncEnumerable<ExecutorEvent> FromExecutor(
        this IAsyncEnumerable<WorkflowEvent> events, 
        string executorId)
    {
        await foreach (var evt in events)
        {
            if (evt is ExecutorEvent executorEvent && 
                executorEvent.ExecutorId == executorId)
            {
                yield return executorEvent;
            }
        }
    }
    
    // 过滤多个 Executor 的事件
    public static async IAsyncEnumerable<ExecutorEvent> FromExecutors(
        this IAsyncEnumerable<WorkflowEvent> events, 
        params string[] executorIds)
    {
        var executorSet = new HashSet<string>(executorIds);        
        await foreach (var evt in events)
        {
            if (evt is ExecutorEvent executorEvent && 
                executorSet.Contains(executorEvent.ExecutorId))
            {
                yield return executorEvent;
            }
        }
    }
//}

使用 LINQ 扩展方法的优雅示例

Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("LINQ 过滤示例");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");

var simpleWorkflow = new WorkflowBuilder(uppercaseExecutor)
    .AddEdge(uppercaseExecutor, reverseExecutor)
    .WithOutputFrom(reverseExecutor)
    .Build();

// 重新执行工作流
var run2 = await InProcessExecution.StreamAsync(simpleWorkflow, "Hello LINQ");

// 示例 1: 只监听 ExecutorCompletedEvent
Console.WriteLine("示例 1: 只监听完成事件\n");
await foreach (var completed in run2.WatchStreamAsync().OfEventType<ExecutorCompletedEvent>())
{
    Console.WriteLine($"{completed.ExecutorId}: {completed.Data}");
}
await run2.DisposeAsync();
Console.WriteLine("\n" + new string('─', 60) + "\n");

// 示例 2: 只监听特定 Executor
var run3 = await InProcessExecution.StreamAsync(simpleWorkflow, "Hello Filter");

Console.WriteLine("示例 2: 只监听 ReverseExecutor\n");
await foreach (var evt in run3.WatchStreamAsync().FromExecutor("ReverseExecutor"))
{
    Console.WriteLine($"{evt.GetType().Name}: {evt.Data}");
}
await run3.DisposeAsync();
Console.WriteLine("\n" + new string('─', 60) + "\n");

// 示例 3: 链式组合(只监听特定 Executor 的完成事件)
var run4 = await InProcessExecution.StreamAsync(simpleWorkflow, "Hello Chain");
Console.WriteLine("示例 3: 链式过滤 (ReverseExecutor + Completed)\n");
await foreach (var completed in run4.WatchStreamAsync()
    .FromExecutor("ReverseExecutor")
    .OfEventType<ExecutorCompletedEvent>())
{
    Console.WriteLine($"{completed.ExecutorId}: {completed.Data}");
}
await run4.DisposeAsync();

Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("LINQ 过滤示例完成");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");

5. 实战案例 - 构建事件监控系统

现在让我们构建一个完整的事件监控系统,展示如何在实际项目中应用事件系统。

我们将构建一个 WorkflowMonitor 类,它可以:

  • 实时统计:记录每个 Executor 的执行时间
  • 日志记录:记录关键事件
  • 错误监控:捕获并报告错误
  • 进度展示:显示执行进度
  • 性能分析:识别性能瓶颈

监控系统架构

flowchart TB
    subgraph "事件流"
        A[WorkflowEvent Stream]
    end
    
    subgraph "WorkflowMonitor"
        B[事件接收器]
        C[统计收集器]
        D[日志记录器]
        E[错误处理器]
        F[进度追踪器]
    end
    
    subgraph "输出"
        G[控制台日志]
        H[性能报告]
        I[错误报告]
    end
    
    A --> B
    B --> C
    B --> D
    B --> E
    B --> F
    
    C --> H
    D --> G
    E --> I
    F --> G
    
    style A fill:#2196F3,stroke:#1565C0,stroke-width:2px,color:#fff
    style B fill:#4CAF50,stroke:#2E7D32,stroke-width:2px,color:#fff
    style H fill:#FF9800,stroke:#E65100,stroke-width:2px,color:#fff
  • 监控类定义
// 完整的工作流监控系统
public class WorkflowMonitor
{
    // 执行统计信息
    private class ExecutorStats
    {
        public string ExecutorId { get; set; } = "";
        public DateTime? StartTime { get; set; }
        public DateTime? EndTime { get; set; }
        public TimeSpan? Duration => EndTime - StartTime;
        public bool Success { get; set; }
        public object? Result { get; set; }
        public Exception? Error { get; set; }
    }
    
    private readonly Dictionary<string, ExecutorStats> _stats = new();
    private readonly List<string> _logs = new();
    private DateTime _workflowStartTime;
    private DateTime _workflowEndTime;
    private int _totalEvents = 0;
    private int _superStepCount = 0;
    
    // 监控入口方法
    public async Task MonitorAsync(StreamingRun run)
    {
        Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
        Console.WriteLine("工作流监控系统已启动");
        Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
        
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            _totalEvents++;
            await HandleEventAsync(evt);
        }
        
        // 执行完成后生成报告
        GenerateReport();
    }
    
    // 处理单个事件
    private async Task HandleEventAsync(WorkflowEvent evt)
    {
        switch (evt)
        {
            case WorkflowStartedEvent:
                HandleWorkflowStarted();
                break;
                
            case ExecutorInvokedEvent invoked:
                HandleExecutorInvoked(invoked);
                break;
                
            case ExecutorCompletedEvent completed:
                HandleExecutorCompleted(completed);
                break;
                
            case ExecutorFailedEvent failed:
                HandleExecutorFailed(failed);
                break;
                
            case SuperStepCompletedEvent step:
                HandleSuperStepCompleted(step);
                break;
                
            case WorkflowOutputEvent output:
                HandleWorkflowOutput(output);
                break;
                
            case AgentRunUpdateEvent agentUpdate:
                HandleAgentUpdate(agentUpdate);
                break;
        }
        
        await Task.CompletedTask;
    }
    
    // 工作流开始
    private void HandleWorkflowStarted()
    {
        _workflowStartTime = DateTime.Now;
        Log("工作流开始执行");
    }
    
    // Executor 调用
    private void HandleExecutorInvoked(ExecutorInvokedEvent evt)
    {
        _stats[evt.ExecutorId] = new ExecutorStats
        {
            ExecutorId = evt.ExecutorId,
            StartTime = DateTime.Now
        };
        
        Log($" [{evt.ExecutorId}] 开始执行");
    }
    
    // Executor 完成
    private void HandleExecutorCompleted(ExecutorCompletedEvent evt)
    {
        if (_stats.TryGetValue(evt.ExecutorId, out var stats))
        {
            stats.EndTime = DateTime.Now;
            stats.Success = true;
            stats.Result = evt.Data;
            
            Log($"[{evt.ExecutorId}] 执行成功 (耗时: {stats.Duration?.TotalMilliseconds:F2}ms)");
        }
    }
    
    // Executor 失败
    private void HandleExecutorFailed(ExecutorFailedEvent evt)
    {
        if (_stats.TryGetValue(evt.ExecutorId, out var stats))
        {
            stats.EndTime = DateTime.Now;
            stats.Success = false;
            stats.Error = evt.Data;
            
            Log($"[{evt.ExecutorId}] 执行失败: {evt.Data?.Message}");
        }
    }
    
    // SuperStep 完成
    private void HandleSuperStepCompleted(SuperStepCompletedEvent evt)
    {
        _superStepCount++;
        Log($"SuperStep #{evt.StepNumber} 完成");
    }
    
    // 工作流输出
    private void HandleWorkflowOutput(WorkflowOutputEvent evt)
    {
        _workflowEndTime = DateTime.Now;
        Log($"工作流输出结果 (来自: {evt.SourceId})");
    }
    
    // Agent 更新
    private void HandleAgentUpdate(AgentRunUpdateEvent evt)
    {
        if (!string.IsNullOrEmpty(evt.Update.Text))
        {
            Log($"[{evt.ExecutorId}] Agent 输出: {evt.Update.Text}");
        }
    }
    
    // 记录日志
    private void Log(string message)
    {
        var timestamp = DateTime.Now.ToString("HH:mm:ss.fff");
        var logMessage = $"[{timestamp}] {message}";
        _logs.Add(logMessage);
        Console.WriteLine(logMessage);
    }
    
    // 生成性能报告
    private void GenerateReport()
    {
        Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
        Console.WriteLine("执行性能报告");
        Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
        
        // 总体统计
        var totalDuration = _workflowEndTime - _workflowStartTime;
        Console.WriteLine("### 总体统计");
        new
        {
            总事件数 = _totalEvents,
            SuperStep数量 = _superStepCount,
            Executor数量 = _stats.Count,
            总耗时 = $"{totalDuration.TotalMilliseconds:F2}ms",
            成功数 = _stats.Values.Count(s => s.Success),
            失败数 = _stats.Values.Count(s => !s.Success)
        }.Display();
        
        Console.WriteLine("\n### Executor 性能明细\n");
        
        // Executor 性能表格
        foreach (var stats in _stats.Values.OrderBy(s => s.StartTime))
        {
            var status = stats.Success ? "成功" : "失败";
            var duration = stats.Duration?.TotalMilliseconds.ToString("F2") ?? "N/A";
            
            Console.WriteLine($"{stats.ExecutorId}");
            Console.WriteLine($" 状态: {status}");
            Console.WriteLine($" 耗时: {duration}ms");
            
            if (stats.Error != null)
            {
                Console.WriteLine($" 错误: {stats.Error.Message}");
            }
            
            Console.WriteLine();
        }
        
        // 性能分析
        if (_stats.Values.Any(s => s.Duration.HasValue))
        {
            var slowest = _stats.Values
                .Where(s => s.Duration.HasValue)
                .OrderByDescending(s => s.Duration)
                .First();
                
            Console.WriteLine("### 性能瓶颈");
            Console.WriteLine($"最慢的 Executor: {slowest.ExecutorId}");
            Console.WriteLine($" 耗时: {slowest.Duration?.TotalMilliseconds:F2}ms");
        }
        
        Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    }
}
  • 使用监控系统
// 创建一个更复杂的工作流来测试监控系统
public class DataProcessorExecutor : Executor<string, string>
{
    public DataProcessorExecutor() : base("DataProcessor") { }
    
    public override async ValueTask<string> HandleAsync(
        string input, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        // 模拟一些处理时间
        await Task.Delay(100);
        return $"[Processed: {input}]";
    }
}

public class DataValidatorExecutor : Executor<string, bool>
{
    public DataValidatorExecutor() : base("DataValidator") { }
    
    public override async ValueTask<bool> HandleAsync(
        string input, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        // 模拟验证时间
        await Task.Delay(50);
        return input.Length > 0;
    }
}

public class DataFormatterExecutor : Executor<string, string>
{
    public DataFormatterExecutor() : base("DataFormatter") { }
    
    public override async ValueTask<string> HandleAsync(
        string input, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        // 模拟格式化时间
        await Task.Delay(80);
        return $"**{input}**";
    }
}

// 构建测试工作流
var processor = new DataProcessorExecutor();
var validator = new DataValidatorExecutor();
var formatter = new DataFormatterExecutor();
var testWorkflow = new WorkflowBuilder(processor)
    .AddEdge(processor, validator)
    .AddEdge(validator, formatter)
    .Build();

5. 最佳实践建议

  • DO(推荐做法)

    • 使用 LINQ 扩展进行过滤 - 代码更清晰、可复用

    • 监听特定事件类型 - 避免处理不需要的事件

    • 记录关键事件 - 便于调试和审计

    • 异步处理事件 - 避免阻塞事件流

    • 使用 switch 表达式 - 处理多种事件类型时更简洁

  • DON'T(避免做法)

    • 在事件处理中做耗时操作 - 会阻塞事件流
    • 忽略 ExecutorFailedEvent - 可能导致静默失败
    • 过度监听所有事件 - 影响性能
    • 在事件处理中修改工作流状态 - 可能导致意外行为
    • 忘记设置 emitEvents: true - Agent 事件不会产生
  • 事件监听模式速查

场景 代码示例 适用情况
监听所有事件 await foreach (var e in run.WatchStreamAsync()) 调试、完整日志
监听特定类型 run.WatchStreamAsync().OfEventType() 只关注某类事件
监听特定 Executor run.WatchStreamAsync().FromExecutor("ExecutorId") 追踪特定步骤
组合过滤 run.WatchStreamAsync().FromExecutor("X").OfEventType() 精确过滤
多类型处理 evt switch 差异化处理
  • 常用事件属性速查
事件类型 关键属性 数据类型 获取方式
ExecutorCompletedEvent ExecutorId string evt.ExecutorId
Data (结果) object? evt.Data
ExecutorFailedEvent ExecutorId string evt.ExecutorId
Data (异常) Exception? evt.Data
AgentRunUpdateEvent ExecutorId string evt.ExecutorId
Update AgentRunResponseUpdate evt.Update
Text (输出文本) string evt.Update.Text
WorkflowOutputEvent SourceId string evt.SourceId
Data (结果) object? evt.Data
类型检查 bool evt.Is()`
类型转换 T? evt.As()
SuperStepCompletedEvent StepNumber int evt.StepNumber
  • 错误处理检查清单
    • 监听 ExecutorFailedEvent 处理异常
    • 检查 WorkflowOutputEvent 是否为 null
    • 使用类型安全的 Is<T>()As<T>() 方法
    • 在事件处理中捕获异常,避免崩溃
    • 记录关键错误信息(ExecutorId、异常类型、堆栈)
  • 性能优化检查清单
    • 使用过滤器减少不必要的事件处理
    • 避免在事件处理中执行耗时操作
    • 对于密集事件(如 AgentRunUpdateEvent),考虑批处理
    • 使用异步操作,避免阻塞事件流
    • 监控事件处理的性能瓶颈
  • AgentRunUpdateEvent 必备条件:
// 正确:会产生 AgentRunUpdateEvent
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

// 错误:不会产生 AgentRunUpdateEvent
await run.TrySendMessageAsync(new TurnToken(emitEvents: false));
await run.TrySendMessageAsync(new TurnToken()); // 默认 false
  • 调试技巧速查
技巧 实现方式 用途
查看所有事件 Console.WriteLine(evt) 了解事件序列
记录事件时间戳 DateTime.Now 在事件处理时记录 性能分析
统计事件数量 维护计数器 验证执行完整性
追踪执行路径 记录 ExecutorId 序列 理解执行流程
捕获中间结果 保存 ExecutorCompletedEvent.Data 数据流分析

五、人机协作

1. 为什么需要人机协作?

虽然 AI 能力强大,但在以下场景中,人工干预仍然不可或缺:

场景 说明 示例
合规审核 需要人工判断内容合规性 内容发布前的审核流程
高风险决策 涉及资金或重要资源的操作 大额转账审批、资源调配
创意评审 需要主观判断的创意内容 营销文案终审、设计方案选择
质量把控 AI 输出需要专家验证 医疗诊断辅助、法律文书生成
异常处理 遇到 AI 无法处理的边界情况 复杂客诉的升级处理

MAF 提供了三个核心组件来实现 Human-in-the-Loop:

classDiagram
    class RequestPort {
        +string PortName
        +Type RequestType
        +Type ResponseType
        +Create~TRequest,TResponse~()
    }
    
    class RequestInfoEvent {
        +ExternalRequest Request
        +string PortName
        +Type RequestType
    }
    
    class ExternalResponse {
        +object Data
        +string RequestId
        +CreateResponse(data)
    }
    
    RequestPort ..> RequestInfoEvent: 发出事件
    RequestInfoEvent ..> ExternalResponse: 触发响应
    ExternalResponse ..> RequestPort: 恢复执行
  • RequestPort (请求端口)

    • 作用:在工作流中插入"暂停点"

    • 特性:像 Executor 一样使用,但不执行逻辑

    • 创建:RequestPort.Create<TRequest, TResponse>("PortName")

  • RequestInfoEvent (请求信息事件)

    • 作用:通知外部世界"需要人工干预"
    • 内容:包含请求的数据类型、端口名称
    • 监听:通过流式运行的事件流捕获
  • ExternalResponse (外部响应)

    • 作用:将人工决策结果传回工作流
    • 创建:request.CreateResponse(data)
    • 发送:handle.SendResponseAsync(response)

2. 基础示例 - 猜数字游戏

我们实现一个猜数字游戏,演示最基本的 HITL 交互:

  1. 工作流设定一个目标数字(如 42)
  2. 通过 RequestPort 向用户请求猜测
  3. JudgeExecutor 判断猜测结果(太大/太小/正确)
  4. 循环直到猜对
graph LR
    A[RequestPort<br/>请求猜测] --> B[JudgeExecutor<br/>判断结果]
    B -->|太小| C[发送Below信号]
    B -->|太大| D[发送Above信号]
    B -->|正确| E[输出成功]
    C --> A
    D --> A
    
    style A fill:#ffeb3b
    style E fill:#4caf50
  • 步骤1:创建 JudgeExecutor (判断执行器)
/// <summary>
/// 游戏信号:用于 RequestPort 和 JudgeExecutor 之间的通信
/// </summary>
public enum NumberSignal
{
    Init,   // 初始状态
    Above,  // 猜的数字太大
    Below   // 猜的数字太小
}

/// <summary>
/// 裁判执行器:判断用户猜测的数字
/// </summary>
public class JudgeExecutor : Executor<int>
{
    private readonly int _targetNumber; // 目标数字
    private int _tries = 0;             // 尝试次数
    
    public JudgeExecutor(int targetNumber) : base("Judge")
    {
        _targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(
        int guess, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        _tries++;
        Console.WriteLine($"━━━━━━━━━━━━━━━━━━");
        Console.WriteLine($"第 {_tries} 次尝试,您猜测的数字是: {guess}");

        if (guess == _targetNumber)
        {
            // 猜对了!输出结果并结束
            await context.YieldOutputAsync(
                $"恭喜!数字 {_targetNumber} 在 {_tries} 次尝试后找到!", 
                cancellationToken);
        }
        else if (guess < _targetNumber)
        {
            // 太小了
            Console.WriteLine($"提示: 太小了!");
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
        }
        else
        {
            // 太大了
            Console.WriteLine($"提示: 太大了!");
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
        }
    }
}
  • 步骤2:构建工作流

    核心要点:

    • RequestPort 接收 NumberSignal 作为请求,返回 int 作为响应
    • 工作流从 RequestPort 开始
    • 创建循环边:JudgeExecutor → RequestPort → JudgeExecutor
// 创建 RequestPort
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

// 创建 JudgeExecutor(目标数字是 42)
var judgeExecutor = new JudgeExecutor(42);

// 构建工作流
var guessWorkflow = new WorkflowBuilder(numberRequestPort)
    .AddEdge(numberRequestPort, judgeExecutor)  // RequestPort → Judge
    .AddEdge(judgeExecutor, numberRequestPort)  // Judge → RequestPort (循环)
    .WithOutputFrom(judgeExecutor)              // 从 Judge 输出结果
    .Build();
  • 步骤3:定义外部请求处理逻辑,这是 HITL 的核心:处理 RequestInfoEvent 并返回 ExternalResponse
/// <summary>
/// 处理外部请求:根据不同的 NumberSignal 提示用户输入
/// </summary>
async Task<ExternalResponse> HandleExternalRequest(ExternalRequest request)
{
    if (request.DataIs<NumberSignal>())
    {
        var signal = request.DataAs<NumberSignal>();
        
        switch (signal)
        {
            case NumberSignal.Init:
                var prompt = "\n游戏开始!请输入您的初始猜测 (1-100):";
                Console.WriteLine(prompt);
                int initialGuess = await PolyglotHelper.ReadConsoleInputAsync<int>(prompt);
                return request.CreateResponse(initialGuess);
                
            case NumberSignal.Above:
                var promptAbove = "\n上次猜的太大了,请输入一个更小的数字:";
                Console.WriteLine(promptAbove);
                int lowerGuess = await PolyglotHelper.ReadConsoleInputAsync<int>(promptAbove);
                return request.CreateResponse(lowerGuess);
                
            case NumberSignal.Below:
                var promptBelow = "\n上次猜的太小了,请输入一个更大的数字:";
                Console.WriteLine(promptBelow);
                int higherGuess = await PolyglotHelper.ReadConsoleInputAsync<int>(promptBelow);
                return request.CreateResponse(higherGuess);
        }
    }
    
    throw new NotSupportedException($"不支持的请求类型: {request.PortInfo.RequestType}");
}

Console.WriteLine("外部请求处理函数定义完成");
  • 步骤4:运行工作流(流式执行 + 事件处理)

    关键流程:

    • 使用 InProcessExecution.StreamAsync() 流式运行
    • 初始消息发送 NumberSignal.Init 启动游戏
    • 监听 RequestInfoEvent → 调用 HandleExternalRequest() → 发送 ExternalResponse
    • 监听 WorkflowOutputEvent 获取最终结果
Console.WriteLine("开始猜数字游戏!\n");
// 流式运行工作流,初始消息是 NumberSignal.Init
await using (StreamingRun handle = await InProcessExecution.StreamAsync(guessWorkflow, NumberSignal.Init)){
    await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
    {
        switch (evt)
        {
            case RequestInfoEvent requestInfoEvent:
                // 收到外部请求事件
                Console.WriteLine($"\n收到 RequestInfoEvent (PortName: {requestInfoEvent.Request.PortInfo.PortId})");

                // 处理请求并返回响应
                ExternalResponse response = await HandleExternalRequest(requestInfoEvent.Request);
                await handle.SendResponseAsync(response);
                break;

            case WorkflowOutputEvent outputEvent:
                // 工作流完成
                Console.WriteLine($"\n━━━━━━━━━━━━━━━━━━");
                Console.WriteLine($"{outputEvent.Data}");
                return;
        }
    }
}

3. 企业场景 - 内容审核工作流

场景说明:某企业的内容发布系统需要实现以下审核流程:

  1. 用户提交文章内容
  2. AI 进行初步敏感词检测
  3. 如果检测到风险内容,暂停工作流,通知人工审核员
  4. 审核员决定:通过 / 拒绝 / 需要修改
  5. 根据审核结果继续后续流程
  • 步骤1:定义数据模型
/// <summary>
/// 内容提交
/// </summary>
public record ContentSubmission(string Title, string Body, string Author);

/// <summary>
/// AI 检测结果
/// </summary>
public record DetectionResult(string Content, bool IsRisky, string Reason);

/// <summary>
/// 审核决策
/// </summary>
public enum ReviewDecision
{
    Approve,   // 通过
    Reject,    // 拒绝
    NeedEdit   // 需要修改
}

/// <summary>
/// 审核请求(用于 RequestPort)
/// </summary>
public record ReviewRequest(string Content, string Reason);
  • 步骤2:实现 AI 检测执行器
/// <summary>
/// AI 初步检测执行器
/// </summary>
public class ContentDetectionExecutor : Executor<ContentSubmission>
{
    private readonly IChatClient _chatClient;

    public ContentDetectionExecutor(IChatClient chatClient) : base("ContentDetection")
    {
        _chatClient = chatClient;
    }

    public override async ValueTask HandleAsync(
        ContentSubmission content, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"\nAI 正在检测内容...");
        Console.WriteLine($" 标题: {content.Title}");
        Console.WriteLine($" 作者: {content.Author}");

        // 构建检测 Prompt
        var prompt = $@"
请检测以下内容是否包含敏感信息、违规内容或不当表述。

内容:
{content.Body}

请以 JSON 格式返回:
{{
  ""isRisky"": true/false,
  ""reason"": ""检测原因""
}}
";

        try
        {
            var response = await _chatClient.GetResponseAsync<DetectionResult>(prompt, cancellationToken: cancellationToken);
            var result = new DetectionResult(content.Body, response.Result!.IsRisky, response.Result.Reason);

            if (result.IsRisky)
            {
                Console.WriteLine($"检测到风险内容: {result.Reason}");
            }
            else
            {
                Console.WriteLine($"内容安全,可以自动发布");
            }

            // 发送检测结果到下游
            await context.SendMessageAsync(result, cancellationToken: cancellationToken);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"AI 检测失败: {ex.Message}");
            // 失败时默认走人工审核
            await context.SendMessageAsync(
                new DetectionResult(content.Body, true, "AI检测失败,需要人工审核"), 
                cancellationToken: cancellationToken);
        }
    }
}
  • 步骤3:实现路由执行器(决定是否需要人工审核)
/// <summary>
/// 路由执行器:根据风险判断是否需要人工审核
/// </summary>
public class RoutingExecutor : Executor<DetectionResult>
{
    public RoutingExecutor() : base("Routing") { }

    public override async ValueTask HandleAsync(
        DetectionResult result, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        if (result.IsRisky)
        {
            // 高风险,发送到人工审核端口
            Console.WriteLine($"触发人工审核流程");
            await context.SendMessageAsync(
                new ReviewRequest(result.Content, result.Reason), 
                cancellationToken: cancellationToken);
        }
        else
        {
            // 低风险,直接发布
            Console.WriteLine($"自动通过审核,准备发布");
            await context.YieldOutputAsync($"内容已自动发布", cancellationToken);
        }
    }
}
  • 步骤4:实现发布执行器
/// <summary>
/// 发布执行器:处理人工审核后的结果
/// </summary>
public class PublishExecutor : Executor<ReviewDecision>
{
    public PublishExecutor() : base("Publish") { }

    public override async ValueTask HandleAsync(
        ReviewDecision decision, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"\n处理审核决策: {decision}");

        switch (decision)
        {
            case ReviewDecision.Approve:
                await context.YieldOutputAsync("人工审核通过,内容已发布", cancellationToken);
                break;
                
            case ReviewDecision.Reject:
                await context.YieldOutputAsync("人工审核拒绝,内容未发布", cancellationToken);
                break;
                
            case ReviewDecision.NeedEdit:
                await context.YieldOutputAsync("需要作者修改后重新提交", cancellationToken);
                break;
        }
    }
}
  • 步骤5:构建审核工作流
// 创建执行器
var detectionExecutor = new ContentDetectionExecutor(chatClient);
var routingExecutor = new RoutingExecutor();
var publishExecutor = new PublishExecutor();

// 创建 RequestPort(用于人工审核)
var reviewPort = RequestPort.Create<ReviewRequest, ReviewDecision>("HumanReview");

// 构建工作流
var reviewWorkflow = new WorkflowBuilder(detectionExecutor)
    .AddEdge(detectionExecutor, routingExecutor)   // 检测 → 路由
    .AddEdge(routingExecutor, reviewPort)          // 路由 → 人工审核端口
    .AddEdge(reviewPort, publishExecutor)          // 审核端口 → 发布
    .WithOutputFrom(routingExecutor)               // 自动发布的输出
    .WithOutputFrom(publishExecutor)               // 人工审核后的输出
    .Build();

Console.WriteLine("内容审核工作流构建完成");
  • 步骤6:定义外部审核处理逻辑
/// <summary>
/// 模拟人工审核员的决策
/// </summary>
async Task<ExternalResponse> HandleReviewRequest(ExternalRequest request)
{
    if (request.DataIs<ReviewRequest>())
    {
        var reviewRequest = request.DataAs<ReviewRequest>();        
        Console.WriteLine($"\n━━━━━━━━━━━━━━━━━━");
        Console.WriteLine($"人工审核界面");
        Console.WriteLine($"━━━━━━━━━━━━━━━━━━");
        Console.WriteLine($"风险原因: {reviewRequest.Reason}");
        Console.WriteLine($"内容摘要: {reviewRequest.Content.Substring(0, Math.Min(50, reviewRequest.Content.Length))}...");
        Console.WriteLine($"\n请输入审核决策:");
        Console.WriteLine($"1 - 通过 (Approve)");
        Console.WriteLine($"2 - 拒绝 (Reject)");
        Console.WriteLine($"3 - 需要修改 (NeedEdit)");
        Console.WriteLine($"━━━━━━━━━━━━━━━━━━\n");
        
        int choice = await PolyglotHelper.ReadConsoleInputAsync<int>("请输入您的选择 (1-3): 1:通过,2:拒绝,3:需要修改");        
        ReviewDecision decision = choice switch
        {
            1 => ReviewDecision.Approve,
            2 => ReviewDecision.Reject,
            3 => ReviewDecision.NeedEdit,
            _ => ReviewDecision.Reject // 默认拒绝
        };        
        Console.WriteLine($"\n审核决策已提交: {decision}");
        return request.CreateResponse(decision);
    }
    
    throw new NotSupportedException($"不支持的请求类型: {request.PortInfo.RequestType}");
}
  • 步骤7:测试工作流 - 高风险内容
// 构造一个高风险内容的测试用例
var riskyContent = new ContentSubmission(
    Title: "测试文章",
    Body: "这是一篇包含敏感政治话题和未经证实的健康信息的文章内容,可能违反平台规定。",
    Author: "测试作者"
);

Console.WriteLine("开始测试内容审核工作流(高风险场景)\n");
await using (StreamingRun handle = await InProcessExecution.StreamAsync(reviewWorkflow, riskyContent)){
    await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
    {
        switch (evt)
        {
            case RequestInfoEvent requestInfoEvent:
                // 收到人工审核请求
                Console.WriteLine($"\n收到 RequestInfoEvent (需要人工审核)");
                ExternalResponse response = await HandleReviewRequest(requestInfoEvent.Request);
                await handle.SendResponseAsync(response);
                break;

            case WorkflowOutputEvent outputEvent:
                // 工作流完成
                Console.WriteLine($"\n━━━━━━━━━━━━━━━━━━");
                Console.WriteLine($"工作流完成: {outputEvent.Data}");
                return;
        }
    }
}
  • 步骤8:测试工作流 - 低风险内容(自动通过)
// 构造一个低风险内容的测试用例
var safeContent = new ContentSubmission(
    Title: "技术分享",
    Body: "本文将介绍如何使用 .NET 开发 AI Agent,包括 MAF 框架的核心概念和最佳实践。",
    Author: "技术博主"
);

Console.WriteLine("开始测试内容审核工作流(低风险场景)\n");
await using (StreamingRun handle2 = await InProcessExecution.StreamAsync(reviewWorkflow, safeContent)){
    await foreach (WorkflowEvent evt in handle2.WatchStreamAsync())
    {
        switch (evt)
        {
            case RequestInfoEvent requestInfoEvent:
                // 这个场景不应该触发人工审核
                Console.WriteLine($"意外触发人工审核");
                ExternalResponse response = await HandleReviewRequest(requestInfoEvent.Request);
                await handle2.SendResponseAsync(response);
                break;

            case WorkflowOutputEvent outputEvent:
                Console.WriteLine($"\n━━━━━━━━━━━━━━━━━━");
                Console.WriteLine($"工作流完成: {outputEvent.Data}");
                return;
        }
    }
}

4. 高级模式 - 多轮交互

场景说明:在某些场景中,可能需要多轮人工交互。例如:

  • 需求澄清:AI 生成代码后,用户可能多次提出修改意见
  • 设计迭代:设计稿需要经过多轮审核和调整
  • 调查问卷:根据上一个答案动态生成下一个问题

实现要点,多轮交互的关键是使用循环边和状态管理:

graph LR
    A[AI生成初稿] --> B[RequestPort<br/>用户反馈]
    B --> C{是否满意?}
    C -->|否| D[AI修改]
    D --> B
    C -->|是| E[完成]
    
    style B fill:#ffeb3b
    style C fill:#ff9800
  • 简易实现
/// <summary>
/// 需求澄清执行器
/// </summary>
public class RequirementClarificationExecutor : Executor<string>
{
    private int _roundCount = 0;
    private const int MaxRounds = 3; // 最多3轮交互

    public RequirementClarificationExecutor() : base("RequirementClarification") { }

    public override async ValueTask HandleAsync(
        string userFeedback, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        _roundCount++;
        Console.WriteLine($"\n第 {_roundCount} 轮澄清");
        Console.WriteLine($" 用户反馈: {userFeedback}");

        if (userFeedback.Contains("满意") || userFeedback.Contains("确认"))
        {
            // 用户满意,结束流程
            await context.YieldOutputAsync($"需求已确认!共进行了 {_roundCount} 轮澄清。", cancellationToken);
        }
        else if (_roundCount >= MaxRounds)
        {
            // 达到最大轮数
            await context.YieldOutputAsync($"已达到最大轮数 ({MaxRounds}),流程结束。", cancellationToken);
        }
        else
        {
            // 继续下一轮
            await context.SendMessageAsync("需要更多信息", cancellationToken: cancellationToken);
        }
    }
}

// 构建多轮交互工作流
var clarificationPort = RequestPort.Create<string, string>("UserInput");
var clarificationExecutor = new RequirementClarificationExecutor();

var multiRoundWorkflow = new WorkflowBuilder(clarificationPort)
    .AddEdge(clarificationPort, clarificationExecutor)
    .AddEdge(clarificationExecutor, clarificationPort) // 循环边
    .WithOutputFrom(clarificationExecutor)
    .Build();
  • 测试多轮交互工作流
Console.WriteLine("开始多轮需求澄清流程\n");
await using (StreamingRun handle3 = await InProcessExecution.StreamAsync(multiRoundWorkflow, "开始")){
    await foreach (WorkflowEvent evt in handle3.WatchStreamAsync())
    {
        switch (evt)
        {
            case RequestInfoEvent requestInfoEvent:
                string feedback = await PolyglotHelper.ReadConsoleInputAsync<string>("请输入您的反馈 (输入'满意'结束流程):");
                var response = requestInfoEvent.Request.CreateResponse(feedback);
                await handle3.SendResponseAsync(response);
                break;

            case WorkflowOutputEvent outputEvent:
                Console.WriteLine($"\n━━━━━━━━━━━━━━━━━━");
                Console.WriteLine($"{outputEvent.Data}");
                return;
        }
    }
}

5. 最佳实践

HITL 最佳实践

实践 说明 示例
最大迭代保护 防止无限循环 _roundCount >= MaxRounds
超时机制 人工响应超时处理 CancellationTokenSource.CancelAfter()
状态持久化 长时间等待需保存状态 结合 Checkpoint 机制
清晰的上下文 提供足够的决策信息 在 ReviewRequest 中包含完整上下文
通知机制 及时提醒审核人员 发送邮件/IM 通知
审计日志 记录所有人工决策 在 PublishEventAsync 中记录

常见陷阱

  • 忘记发送响应
// 错误:收到 RequestInfoEvent 后没有调用 SendResponseAsync
case RequestInfoEvent evt:
    HandleReviewRequest(evt.Request);
    // 工作流会永久卡住!
    break;

// 正确
case RequestInfoEvent evt:
    var response = HandleReviewRequest(evt.Request);
    await handle.SendResponseAsync(response);
    break;
  • 类型不匹配
// 错误:RequestPort 期望 int,但返回了 string
var port = RequestPort.Create<string, int>("Port");
var response = request.CreateResponse("42"); // string!

// 正确
var response = request.CreateResponse(42); // int
  • 缺少循环边
// 错误:RequestPort 只有入边,没有出边
.AddEdge(judgeExecutor, requestPort) // 只有这一条边

// 正确:需要双向边形成循环
.AddEdge(requestPort, judgeExecutor)
.AddEdge(judgeExecutor, requestPort)

架构建议

对于生产环境的 HITL 系统,建议:

  • 解耦工作流与 UI
    • 工作流不应直接依赖 Console.ReadLine()
    • 使用消息队列或 WebSocket 进行通信
  • 持久化 Checkpoint
    • 在 RequestPort 前保存 Checkpoint
    • 允许长时间等待和系统重启
  • 监控与告警
    • 监控等待中的请求数量
    • 超时自动升级或降级处理

六、工作流状态共享

在真实业务中,以下痛点最常见:

场景 传统做法 痛点 Workflow Shared State 方案
多步骤共享原始输入 通过参数层层传递 易丢失、难维护 在入口步骤写入状态,后续随取随用
并发执行器共享缓存 使用静态变量 线程不安全 作用域隔离 + 状态管理器负责并发
人机协作回填审批结论 HTTP 回调携带参数 状态丢失、难追踪 WaitForInput + 状态恢复
Agent 输出给多个消费者 多次调用同一个 Agent 成本高 把输出写入状态,其他步骤复用

1. WorkflowContext 状态 API 速查

API 作用 典型使用方式
QueueStateUpdateAsync(key, value, scope) 将状态写入队列,当前步骤完成后统一提交,避免脏读 适合在 Executor 内写入结果、缓存、审计信息
ReadStateAsync(key, scope) 读取指定键,找不到则返回 null 读取上游步骤输出、缓存的 Agent 响应
ReadOrInitStateAsync(key, factory, scope) 如果不存在则初始化,兼顾懒加载与并发一致性 首次接入业务实体、惰性缓存、计数器 / 重试次数
QueueClearScopeAsync(scope) 清理整个作用域 敏感信息擦除、生命周期结束时清理
ReadStateKeysAsync(scope) 列出作用域中所有键 调试、监控、构建动态路由
  • ReadOrInitStateAsync 应用场景

    • 首次加载业务上下文:如 fileId 对应的元数据、审批信息等只需初始化一次的结构。

    • 惰性构建缓存:在第一次命中时生成摘要、Embedding 或模型响应,后续 Executor 复用。

    • 并发累加器:计数器、重试次数、Fan-in 进度等需要“读当前值 + 初始化默认值”的模式。

作用域 (Scope) 分类

Scope 类型 说明 建议命名
Executor Scope (默认) 键只对当前 Executor 可见 nameof(Executor)
自定义业务 Scope 多个 Executor 共享,例如 "FileContentState" 模块名 + State
System / Environment Scope 框架内部使用,不建议直接写入 使用 VariableScopeNames 常量时需谨慎

线程安全说明:QueueStateUpdateAsync 会在状态管理器内部加锁,所以不需要额外的 lock。但仍需控制数据粒度(例如传输 ID,而不是整个大对象)。

2. 案例:文档统计工作流

我们将复刻官方 Workflows/SharedStates 示例,并加入更多注释:

  1. 入口步骤读取文本内容,写入 FileContentState 作用域中,值为 fileId -> 文本内容
  2. Fan-out 出两个 Executor:
    • WordCountingExecutor 统计单词数
    • ParagraphCountingExecutor 统计段落数
  3. Fan-in 到 AggregationExecutor,汇总结果并输出结构化报表

该案例展示了一次写入,多处读取的模式,也是后续 MapReduce、并发协作的基础。

  • 准备工作
// 示例数据
internal static class SharedStateSampleData
{
    private static readonly IReadOnlyDictionary<string, string> Documents = new Dictionary<string, string>
    {
        ["ProductBrief"] = "MAF Workflow 让 .NET 团队可以像积木一样组合 Agent、Executor 与工具, 支持流式事件、并发节点和可观测性。\n\n它强调企业级能力, 包括状态管理、依赖注入、权限控制, 适合搭建端到端 AI 业务流程。",
        ["WeeklyReport"] = "本周平台完成了 Shared State 功能的代码走查, 已经覆盖 Fan-out/Fan-in, Loop, Human-in-the-Loop 三种场景。\n\n下周计划: 1) 集成多模型投票; 2) 增加异常回滚; 3) 落地监控指标。"
    };

    public static string GetDocument(string name)
        => Documents.TryGetValue(name, out var content)
            ? content
            : throw new ArgumentException($"未找到文档: {name}");
}

// 状态常量
internal static class FileContentStateConstants
{
    public const string ScopeName = "FileContentState";
}

// 模型定义
internal sealed class FileStats
{
    public int WordCount { get; init; }
    public int ParagraphCount { get; init; }
}

// 定义FileReadExecutor
internal sealed class FileReadExecutor() : Executor<string, string>("FileReadExecutor")
{
    public override async ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        string content = SharedStateSampleData.GetDocument(message);
        string fileId = Guid.NewGuid().ToString("N");

        await context.QueueStateUpdateAsync(fileId, content, FileContentStateConstants.ScopeName, cancellationToken);
        Console.WriteLine($"FileReadExecutor 将 {message} 写入 Scope:{FileContentStateConstants.ScopeName}");

        return fileId;
    }
}

// 定义并行统计执行器
internal sealed class WordCountingExecutor() : Executor<string, FileStats>("WordCountingExecutor")
{
    public override async ValueTask<FileStats> HandleAsync(string fileId, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        string? content = await context.ReadStateAsync<string>(fileId, FileContentStateConstants.ScopeName, cancellationToken);
        if (content is null)
        {
            throw new InvalidOperationException($"无法在 Scope:{FileContentStateConstants.ScopeName} 中找到 fileId={fileId}");
        }

        int wordCount = content.Split([' ', '\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
        return new FileStats { WordCount = wordCount };
    }
}

// 定义AggregationExecutor 
internal sealed class ParagraphCountingExecutor() : Executor<string, FileStats>("ParagraphCountingExecutor")
{
    public override async ValueTask<FileStats> HandleAsync(string fileId, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        string? content = await context.ReadStateAsync<string>(fileId, FileContentStateConstants.ScopeName, cancellationToken);
        if (content is null)
        {
            throw new InvalidOperationException($"无法在 Scope:{FileContentStateConstants.ScopeName} 中找到 fileId={fileId}");
        }

        int paragraphCount = content.Split(['\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
        return new FileStats { ParagraphCount = paragraphCount };
    }
}

internal sealed class AggregationExecutor() : Executor<FileStats>("AggregationExecutor")
{
    private readonly List<FileStats> _buffer = [];

    public override async ValueTask HandleAsync(FileStats message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._buffer.Add(message);
        if (this._buffer.Count < 2)
        {
            return;
        }

        int totalWords = this._buffer.Sum(x => x.WordCount);
        int totalParagraphs = this._buffer.Sum(x => x.ParagraphCount);

        var output = new
        {
            总词数 = totalWords,
            总段落数 = totalParagraphs,
            统计时间 = DateTimeOffset.UtcNow
        };

        Console.WriteLine("文档统计结果");
        output.Display();
        await context.YieldOutputAsync(output, cancellationToken);
    }
}

// 定义AggregationExecutor 
internal sealed class AggregationExecutor() : Executor<FileStats>("AggregationExecutor")
{
    private readonly List<FileStats> _buffer = [];

    public override async ValueTask HandleAsync(FileStats message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._buffer.Add(message);
        if (this._buffer.Count < 2)
        {
            return;
        }

        int totalWords = this._buffer.Sum(x => x.WordCount);
        int totalParagraphs = this._buffer.Sum(x => x.ParagraphCount);

        var output = new
        {
            总词数 = totalWords,
            总段落数 = totalParagraphs,
            统计时间 = DateTimeOffset.UtcNow
        };

        Console.WriteLine("文档统计结果");
        output.Display();
        await context.YieldOutputAsync(output, cancellationToken);
    }
}
  • 步骤 1:组装 Fan-out/Fan-in 工作流

我们将以下列顺序建图:

  1. FileReadExecutor 作为入口,并指定最终输出来自 AggregationExecutor
  2. AddFanOutEdge:把文件 ID 同时传给两个统计执行器
  3. AddFanInEdge:等待两个统计执行器完成后,再进入聚合节点

通过 Builder 定义拓扑后,我们会在后续步骤用 RunStreamingAsync 查看事件流。

var fileRead = new FileReadExecutor();
var wordCounting = new WordCountingExecutor();
var paragraphCounting = new ParagraphCountingExecutor();
var aggregate = new AggregationExecutor();

var sharedStateWorkflow = new WorkflowBuilder(fileRead)
    .AddFanOutEdge(fileRead, [wordCounting, paragraphCounting])
    .AddFanInEdge([wordCounting, paragraphCounting], aggregate)
    .WithOutputFrom(aggregate)
    .Build();
Console.WriteLine("Shared State Workflow 构建完成");
  • 步骤 2:流式观察状态读写

我们使用 InProcessExecution.RunStreamingAsync 以事件流的方式运行 workflow,并关注三类事件:

  1. WorkflowStartedEvent / WorkflowCompletedEvent:整体生命周期
  2. ExecutorCompletedEvent:确认每个节点的执行顺序
  3. WorkflowOutputEvent:最终业务输出

同时,我们打印状态写入时生成的 fileId,以便验证 Fan-out 节点读取的是同一个共享数据。

static async Task RunSharedStateDemoAsync(Workflow sharedWorkflow, string documentKey)
{
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine($"演示文档: {documentKey}");
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");

    await using (var run = await InProcessExecution.StreamAsync(sharedWorkflow, documentKey)){
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            switch (evt)
            {
                case WorkflowStartedEvent started:
                    Console.WriteLine($"Workflow Started");
                    break;
                case ExecutorCompletedEvent executorCompleted:
                    Console.WriteLine($"{executorCompleted.ExecutorId} 完成");
                    break;
                case WorkflowOutputEvent outputEvent:
                    Console.WriteLine("收到 Workflow Output Event:");
                    outputEvent.Display();
                    break;
                case WorkflowErrorEvent  errorEvent:
                    Console.WriteLine("收到 Workflow Error Event:");
                    errorEvent.Display();
                    break;
                default:
                    Console.WriteLine($"其他事件: {evt.GetType().Name}");
                    evt.Display();
                    break;
            }
        }
        await run.DisposeAsync();
    }
}
await RunSharedStateDemoAsync(sharedStateWorkflow, "ProductBrief");

3. 最佳实践

  • 命名规范

    • 使用 Scope + 功能命名(例:FileContentState)

    • 键值建议为业务 ID(如 fileId、turnId),避免直接储存整段文本作为 key

  • 状态生命周期

    • 在数据敏感的场景结束后调用 QueueClearScopeAsync

    • 长时间运行的 Workflow 可定期写入 UpdatedAt,便于调试

  • 并发注意事项

    • Fan-out 节点共享同一 state 时,只写 ID,不写大对象

    • 如果必须写大对象,可在读取端做 null 检查并提供降级策略

  • 调试建议

    • 使用 ReadStateKeysAsync 打印当前作用域的所有键

    • 配合 WorkflowEvent 日志,快速定位状态缺失或覆盖问题

  • 常见踩坑

    • 在 Executor 中保存可变引用(List/Dictionary),会被后续步骤修改

    • 在多个 scope 中重复使用相同 key,导致取值混乱

    • 忘记 await QueueStateUpdateAsync,导致状态未落盘

七、工作流上下文

1. API 分类速查表

IWorkflowContext 是 Executor 与 Workflow 引擎交互的唯一入口,它提供了以下能力:

分类 API 用途 常见场景
输出管理 SendMessageAsync 向下游 Executor 发送消息 流程编排、数据传递
AddEventAsync 触发自定义业务事件 进度通知、审计日志
YieldOutputAsync 输出最终业务结果 工作流返回值
流程控制 RequestHaltAsync 暂停工作流执行 人工审批、等待外部系统
状态管理 ReadStateAsync 读取状态值 获取共享数据
ReadOrInitStateAsync 读取或初始化状态 计数器、缓存、配置
QueueStateUpdateAsync 更新状态值 写入业务数据
ReadStateKeysAsync 枚举作用域中的所有键 调试、动态路由
QueueClearScopeAsync 清理整个作用域 敏感数据清理
元数据 TraceContext 链路追踪上下文 分布式追踪
ConcurrentRunsEnabled 是否支持并发运行 并发控制

2. 核心概念:三种输出机制的本质区别

这是最容易混淆的部分!理解它们的区别是掌握 Context 的关键。

  • SendMessageAsync - 流程内部通信

    特点:

    • 目标是下游 Executor(通过 Edge 连接的节点)

    • 消息在下一个 SuperStep才会被接收

    • 可以指定 targetId 进行精确路由

    • 如果没有连接关系,消息会被丢弃

    典型场景:

    • 动态路由(根据条件发送给不同的 Executor)

    • 广播模式(一个 Executor 发送给多个下游)

    • 流程编排中的数据传递

  • AddEventAsync - 业务事件通知

    特点:

    • 目标是外部订阅者(调用 workflow 的客户端)

    • 事件在当前 SuperStep 结束时立即发出

    • 用于传递业务进度、审计日志、中间状态

    • 不影响工作流的执行流程

    典型场景:

    • 进度通知("正在生成大纲…"、"已完成 3/10 个文档")

    • 审计日志("用户操作已记录"、"检测到敏感词")

    • 调试信息(自定义 Event 携带中间变量)

  • YieldOutputAsync - 最终业务结果

    特点:

    • 目标是工作流的最终返回值

    • 通过 WorkflowOutputEvent 封装返回

    • 通常在 WithOutputFrom() 指定的步骤中调用

    • 一个工作流可以有多个输出(多次调用 YieldOutputAsync)

    典型场景:

    • 工作流的最终结果(生成的文章、处理后的数据)

    • 批量输出(MapReduce 的 Reduce 结果)

    • 结构化返回值(JSON、DTO)

  • 记忆口诀:

    • Send = 内部流程通信(Executor → Executor)
    • Event = 进度广播(Executor → 订阅者)
    • Output = 最终结果(Workflow → 调用者)
  • 对比总结

维度 SendMessageAsync AddEventAsync YieldOutputAsync
接收者 下游 Executor 外部订阅者 外部订阅者
触发时机 下一个 SuperStep 当前 SuperStep 结束 当前 SuperStep 结束
用途 流程内部数据传递 进度通知/审计日志 最终业务结果
是否影响流程 是(触发下游执行)
典型事件类型 N/A 自定义 WorkflowEvent WorkflowOutputEvent

2. 实战演示:Context Explorer

我们将创建一个专门的 ContextExplorerExecutor,它会依次调用 IWorkflowContext 的所有关键 API,并通过事件流展示每个方法的效果。

演示场景设计

  • 步骤 1:定义自定义事件和 Context Explorer
// 自定义事件:用于报告探索进度
internal sealed class ExplorationProgressEvent(string category, string api, string result) : WorkflowEvent
{
    public string Category { get; } = category;
    public string API { get; } = api;
    public string Result { get; } = result;
}

internal sealed class ContextExplorerExecutor() : Executor<string, string>("ContextExplorerExecutor")
{
    private const string DemoScope = "ExplorerDemoScope";

    public override async ValueTask<string> HandleAsync(string mode, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"ContextExplorer 开始工作 (模式: {mode})");
        Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    
        // ============ 1.状态管理演示 ============
        await this.DemonstrateStateManagementAsync(context, cancellationToken);
    
        // ============ 2.输出机制演示 ============
        await this.DemonstrateOutputMechanismsAsync(context, cancellationToken);
    
        // ============ 3.元数据演示 ============
        await this.DemonstrateMetadataAsync(context, cancellationToken);
    
        Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
        Console.WriteLine("ContextExplorer 工作完成");
    
        return "探索完成";
    }
    
    private async ValueTask DemonstrateStateManagementAsync(IWorkflowContext context, CancellationToken cancellationToken)
    {
        Console.WriteLine("\n【1.状态管理 API 演示】");
    
        // 1. QueueStateUpdateAsync - 写入状态
        await context.QueueStateUpdateAsync("counter", 100, DemoScope, cancellationToken);
        await context.AddEventAsync(new ExplorationProgressEvent(
            "状态管理", 
            "QueueStateUpdateAsync", 
            "写入 counter=100"), cancellationToken);
        Console.WriteLine(" QueueStateUpdateAsync: 写入 counter=100");
    
        // 2. ReadStateAsync - 读取状态
        var counter = await context.ReadStateAsync<int>("counter", DemoScope, cancellationToken);
        await context.AddEventAsync(new ExplorationProgressEvent(
            "状态管理", 
            "ReadStateAsync", 
            $"读取到 counter={counter}"), cancellationToken);
        Console.WriteLine($" ReadStateAsync: 读取到 counter={counter}");
    
        // 3. ReadOrInitStateAsync - 惰性初始化
        var config = await context.ReadOrInitStateAsync(
            "config", 
            () => new { MaxRetry = 3, Timeout = 30 }, 
            DemoScope, 
            cancellationToken);
        await context.AddEventAsync(new ExplorationProgressEvent(
            "状态管理", 
            "ReadOrInitStateAsync", 
            $"初始化配置: MaxRetry={config.MaxRetry}"), cancellationToken);
        Console.WriteLine($" ReadOrInitStateAsync: 初始化配置 MaxRetry={config.MaxRetry}");
    
        // 4. ReadStateKeysAsync - 枚举所有键
        var keys = await context.ReadStateKeysAsync(DemoScope, cancellationToken);
        await context.AddEventAsync(new ExplorationProgressEvent(
            "状态管理", 
            "ReadStateKeysAsync", 
            $"作用域中的键: {string.Join(", ", keys)}"), cancellationToken);
        Console.WriteLine($" ReadStateKeysAsync: 共 {keys.Count} 个键 [{string.Join(", ", keys)}]");
    
        // 5. QueueClearScopeAsync - 清理作用域(演示中不实际执行,避免影响后续测试)
        // await context.QueueClearScopeAsync(DemoScope, cancellationToken);
        Console.WriteLine(" QueueClearScopeAsync: 已跳过(避免清理演示数据)");
    }
    
    private async ValueTask DemonstrateOutputMechanismsAsync(IWorkflowContext context, CancellationToken cancellationToken)
    {
        Console.WriteLine("\n【2.输出机制 API 演示】");
    
        // 1. SendMessageAsync - 发送给下游 Executor
        await context.SendMessageAsync("来自 ContextExplorer 的消息", targetId: null, cancellationToken);
        await context.AddEventAsync(new ExplorationProgressEvent(
            "输出机制", 
            "SendMessageAsync", 
            "已发送消息到下游"), cancellationToken);
        Console.WriteLine(" SendMessageAsync: 已发送消息到下游 Executor");
    
        // 2. AddEventAsync - 触发自定义事件
        await context.AddEventAsync(new ExplorationProgressEvent(
            "输出机制", 
            "AddEventAsync", 
            "这是一个自定义业务事件"), cancellationToken);
        Console.WriteLine(" AddEventAsync: 已触发自定义事件");
    
        // 3. YieldOutputAsync - 输出最终结果
        var finalResult = new
        {
            探索时间 = DateTimeOffset.UtcNow,
            API总数 = 11,
            状态作用域 = DemoScope
        };
        await context.YieldOutputAsync(finalResult, cancellationToken);
        Console.WriteLine(" YieldOutputAsync: 已输出最终结果");
    }
    
    private async ValueTask DemonstrateMetadataAsync(IWorkflowContext context, CancellationToken cancellationToken)
    {
        Console.WriteLine("\n【3.元数据 API 演示】");
    
        // 1. TraceContext - 链路追踪上下文
        var traceContext = context.TraceContext;
        var traceInfo = traceContext != null 
            ? string.Join(", ", traceContext.Select(kv => $"{kv.Key}={kv.Value}"))
            : "null";
        await context.AddEventAsync(new ExplorationProgressEvent(
            "元数据", 
            "TraceContext", 
            traceInfo), cancellationToken);
        Console.WriteLine($" TraceContext: {traceInfo}");
    
        // 2. ConcurrentRunsEnabled - 并发支持
        var concurrentEnabled = context.ConcurrentRunsEnabled;
        await context.AddEventAsync(new ExplorationProgressEvent(
            "元数据", 
            "ConcurrentRunsEnabled", 
            concurrentEnabled.ToString()), cancellationToken);
        Console.WriteLine($" ConcurrentRunsEnabled: {concurrentEnabled}");
    }
}
  • 步骤 2:创建下游接收器

为了演示 SendMessageAsync 的效果,我们需要一个下游 Executor 来接收消息。

internal sealed class DownstreamReceiverExecutor() : Executor<string, string>("DownstreamReceiverExecutor")
{
    public override async ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"DownstreamReceiver 收到消息: {message}");        
        await context.AddEventAsync(new ExplorationProgressEvent(
            "下游接收", 
            "HandleAsync", 
            $"已处理消息: {message}"), cancellationToken);    
        return "消息已接收";
    }
}
  • 步骤 3:构建工作流

将 ContextExplorerExecutor 和DownstreamReceiverExecutor 连接起来,形成完整的工作流。

var explorer = new ContextExplorerExecutor();
var receiver = new DownstreamReceiverExecutor();

var contextExplorerWorkflow = new WorkflowBuilder(explorer)
    .AddEdge(explorer, receiver)
    .WithOutputFrom(explorer) // 输出来自 explorer 的 YieldOutputAsync
    .Build();
  • 步骤 4:流式运行并观察事件

通过流式运行,我们可以实时观察每个 API 调用产生的事件。

static async Task RunContextExplorerAsync(Workflow workflow, string mode)
{
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine($"开始探索 IWorkflowContext API (模式: {mode})");
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");

    await using (var run = await InProcessExecution.StreamAsync(workflow, mode))
    {
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            switch (evt)
            {
                case WorkflowStartedEvent:
                    Console.WriteLine("[系统事件] Workflow 已启动\n");
                    break;
    
                case ExplorationProgressEvent progress:
                    Console.WriteLine($"[业务事件] 分类: {progress.Category}");
                    Console.WriteLine($" API: {progress.API}");
                    Console.WriteLine($" 结果: {progress.Result}\n");
                    break;
    
                case ExecutorCompletedEvent executorCompleted:
                    Console.WriteLine($"[系统事件] {executorCompleted.ExecutorId} 执行完成\n");
                    break;
    
                case WorkflowOutputEvent outputEvent:
                    Console.WriteLine("[输出事件] WorkflowOutputEvent:");
                    outputEvent.Display();
                    Console.WriteLine();
                    break;
    
                default:
                    Console.WriteLine($"[其他事件] {evt.GetType().Name}");
                    break;
            }
        }
    }    
    Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
}

await RunContextExplorerAsync(contextExplorerWorkflow, "完整探索");
  • SendMessageAsync targetId 精准路由实战

典型场景:Intent Router 同时连接图像、文本、兜底三个 Executor,希望只唤醒匹配的下游。

flowchart LR
    R[IntentRouter] --> I[ImageGenerator]
    R --> T[TextGenerator]
    R --> F[FallbackExecutor]
    R -.targetId=image.-> I
    R -.targetId=report.-> T
    R -.targetId=FallbackExecutor.-> F
    style R fill:#e1f5ff
    style I fill:#fff4e1
    style T fill:#f3e5f5
    style F fill:#e8f5e9
internal sealed class IntentRouterExecutor() : Executor<string, string>("IntentRouter")
{
    private readonly Dictionary<string, string> _routingTable = new(StringComparer.OrdinalIgnoreCase)
    {
        ["image"] = "ImageGenerator",
        ["report"] = "TextGenerator"
    };

    public override async ValueTask<string> HandleAsync(string intent, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        var targetId = this._routingTable.TryGetValue(intent, out var candidate) ? candidate : "FallbackExecutor";
        Console.WriteLine($"Router 判定 {intent} 应派发给 {targetId}");
    
        await context.SendMessageAsync($"指令: {intent}", targetId, cancellationToken);
        return $"消息已定向到 {targetId}";
    }
}

internal sealed class ImageGeneratorExecutor() : Executor<string, string>("ImageGenerator")
{
    public override ValueTask<string> HandleAsync(string payload, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"ImageGenerator 收到: {payload}");
        return ValueTask.FromResult("图像任务完成");
    }
}

internal sealed class TextGeneratorExecutor() : Executor<string, string>("TextGenerator")
{
    public override ValueTask<string> HandleAsync(string payload, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"📝 TextGenerator 收到: {payload}");
        return ValueTask.FromResult("文本任务完成");
    }
}

internal sealed class FallbackExecutor() : Executor<string, string>("FallbackExecutor")
{
    public override ValueTask<string> HandleAsync(string payload, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"FallbackExecutor 接管: {payload}");
        return ValueTask.FromResult("兜底处理完成");
    }
}

static async Task RunTargetIdDemoAsync(string intent)
{
    var router = new IntentRouterExecutor();
    var imageExecutor = new ImageGeneratorExecutor();
    var textExecutor = new TextGeneratorExecutor();
    var fallbackExecutor = new FallbackExecutor();

    var workflow = new WorkflowBuilder(router)
        .AddEdge(router, imageExecutor)
        .AddEdge(router, textExecutor)
        .AddEdge(router, fallbackExecutor)
        .WithOutputFrom(router)
        .Build();
    
    Console.WriteLine($"\n输入: {intent}");
    
    await using (var run = await InProcessExecution.StreamAsync(workflow, intent)){
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            if (evt is WorkflowOutputEvent outputEvent)
            {
                Console.WriteLine("Router 输出:");
                outputEvent.Display();
            }
        }
    }
}
await RunTargetIdDemoAsync("image");
await RunTargetIdDemoAsync("report");
await RunTargetIdDemoAsync("speech");

关键提示:

  • targetId 必须等于下游 Executor 的 Id(构造函数里指定)
  • 上游可以保留多条 Edge,但只有 targetId 命中的节点会收到消息
  • 适合意图路由、A/B 测试、租户隔离等需要精确派发的场景

3. 高级场景:QueueClearScopeAsync 的正确使用

QueueClearScopeAsync 用于清理整个作用域中的所有状态,这在处理敏感数据时非常重要。

场景 说明 示例
敏感数据清理 处理完用户隐私数据后立即清理 银行交易、医疗记录
临时状态回收 中间计算结果不再需要 MapReduce 的 Map 阶段结果
生命周期管理 某个业务流程结束后清理上下文 订单处理完成后清理订单缓存
测试隔离 每个测试用例结束后清理状态 单元测试、集成测试
internal sealed class SecureDataExecutor() : Executor<string, string>("SecureDataExecutor")
{
    private const string SensitiveDataScope = "SensitiveData";

    public override async ValueTask<string> HandleAsync(
        string userId, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        // 1. 读取敏感数据
        var userData = await LoadUserDataAsync(userId);
        await context.QueueStateUpdateAsync("userData", userData, SensitiveDataScope, cancellationToken);

        // 2. 处理业务逻辑
        var result = await ProcessSecureDataAsync(userData);

        // 3. 处理完成后立即清理敏感数据
        await context.QueueClearScopeAsync(SensitiveDataScope, cancellationToken);
        Console.WriteLine("敏感数据已清理");

        return result;
    }

    private Task<string> LoadUserDataAsync(string userId) => Task.FromResult($"UserData-{userId}");

    private Task<string> ProcessSecureDataAsync(string data) => Task.FromResult($"Processed-{data}");
}

4. 最佳实践

  • 输出机制选择指南

  • 状态管理最佳实践
实践 说明 示例
使用常量定义 Scope 避免硬编码字符串 const string MyScope = "OrderContext";
命名约定 {模块名}Scope 或 {功能}Context FileContentScope, UserSessionContext
读写分离 写操作集中在入口步骤,读操作分散在下游 Fan-out/Fan-in 模式
惰性初始化 使用 ReadOrInitStateAsync 避免重复初始化 配置、缓存、计数器
及时清理 处理完敏感数据后立即调用 QueueClearScopeAsync 用户隐私数据、临时文件
  • 元数据利用
public override async ValueTask<string> HandleAsync(
    string message, 
    IWorkflowContext context, 
    CancellationToken cancellationToken = default)
{
    // 1. 检查并发支持
    if (context.ConcurrentRunsEnabled)
    {
        Console.WriteLine("当前环境支持并发执行");
    }

    // 2. 传递链路追踪上下文
    if (context.TraceContext != null)
    {
        var traceId = context.TraceContext.GetValueOrDefault("traceId");
        Console.WriteLine($"TraceId: {traceId}");
        
        // 传递给下游服务(如 HTTP 请求)
        await CallExternalServiceAsync(traceId);
    }

    return "完成";
}
  • 常见错误与解决方案
错误 原因 解决方案
SendMessageAsync 后下游没收到 没有建立 Edge 连接 使用 AddEdge() 连接节点
状态读取返回 null Scope 名称不一致 使用常量统一管理 Scope 名称
并发冲突 多个 Executor 同时写入同一个 Key 使用不同的 Key 或加入步骤序号
事件没有触发 忘记 await AddEventAsync 确保所有异步方法都被 await