Net+AI智能体进阶8:Workflow工作流
2025-11-22 10:34:18一、工作流基础
1. 什么是 Workflow Orchestration
在企业级 AI 应用开发中,我们经常面临以下挑战:
- 单个 Agent 难以处理复杂任务:一个 Agent 无法同时擅长需求分析、代码生成、测试等多个领域
- 业务逻辑与 AI 调用耦合:复杂的条件判断、循环、并发控制分散在代码各处
- 流程难以可视化和维护:多步骤的 AI 处理流程缺乏清晰的结构
- 缺乏统一的状态管理:多个步骤之间的数据传递和状态共享容易出错
Workflow Orchestration (工作流编排) 正是为了解决这些问题而生:
- 模块化设计:将复杂任务拆分为独立的 Executor 和 Agent
- 清晰的流程定义:使用 Builder API 构建可读、可维护的流程图
- 灵活的流程控制:支持条件分支、循环迭代、并发执行等复杂模式
- 统一的状态管理:内置 WorkflowContext 管理跨步骤的状态和数据
- 实时流式反馈:通过事件机制实时监控工作流的执行进度
MAF Workflow 位于应用层和 Agent 层之间,负责:
- 编排多个 Agent:决定 Agent 的执行顺序、条件和并发策略
- 管理数据流:在 Agent 和 Executor 之间传递数据
- 监控执行状态:实时报告工作流的执行进度和结果
- 错误处理:统一处理执行过程中的异常和重试逻辑
2. Workflow 的核心构建块
Workflow 由以下三个核心概念组成:
Executor (执行器) - 处理单元:Executor 是工作流的基本处理单元,负责执行具体的业务逻辑。
特点:
- 强类型输入:TInput 定义接收的数据类型
- 强类型输出:TOutput 定义返回的数据类型
- 纯逻辑处理:可以是数据转换、验证、格式化等任何操作
- Agent 也是 Executor:AIAgent 可以直接作为 Executor 使用
示例用途:
- 文本转换 (大写、反转、清理)
- 数据验证 (格式检查、安全过滤)
- 结果聚合 (合并多个输出)
- AI 调用 (通过 Agent)
public abstract class Executor<TInput, TOutput> : Executor
{
public abstract ValueTask<TOutput> HandleAsync(
TInput input,
IWorkflowContext context,
CancellationToken cancellationToken = default);
}
Edge (边) - 数据流路径:Edge 定义了 Executor 之间的数据流动方向和连接关系。
类型:
顺序边 (Sequential Edge):A → B - 数据从 A 流向 B
条件边 (Conditional Edge):A → B or C - 根据条件选择路径
Fan-out 边:A → [B, C, D] - 一个输出分发给多个 Executor
Fan-in 边:[B, C, D] → E - 多个输出汇聚到一个 Executor
Workflow (工作流) - 完整流程定义:Workflow 是由多个 Executor 通过 Edge 连接而成的完整处理流程。
执行方式:
- 同步执行:RunAsync() - 等待完整结果
- 流式执行:StreamAsync() - 实时接收事件流
构建方式:
// 使用 WorkflowBuilder 构建工作流
WorkflowBuilder builder = new(startExecutor);
builder.AddEdge(executorA, executorB);
builder.AddEdge(executorB, executorC);
builder.WithOutputFrom(executorC);
Workflow workflow = builder.Build();
3. 第一个工作流:文本处理管道
业务场景:将用户输入的文本 → 转为大写 → 反转顺序
步骤 1:定义第一个 Executor - 大写转换。
继承 Executor<TInput, TOutput>:
TInput = string:接收字符串输入
TOutput = string:返回字符串输出
构造函数传入 "UppercaseExecutor" 作为唯一标识符
实现 HandleAsync 方法:
message:接收上一个步骤传递的数据 (对于第一个 Executor,是工作流的输入)
context:工作流上下文,用于访问共享状态、发布事件等 (后续课程详解)
cancellationToken:用于取消操作
返回值:会自动作为消息沿着 Edge 传递给下一个 Executor
业务逻辑:
使用 ToUpperInvariant() 进行文化无关的大写转换
返回 ValueTask
(高性能异步模式)
/// <summary>
/// 大写转换执行器 - 将输入文本转换为大写
/// </summary>
public class UppercaseExecutor : Executor<string, string>
{
public UppercaseExecutor() : base("UppercaseExecutor") { }
/// <summary>
/// 核心处理方法 - 将输入文本转为大写
/// </summary>
public override ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// 执行业务逻辑: 转为大写
string result = message.ToUpperInvariant();
// 返回处理结果 (会自动沿着 Edge 传递给下一个 Executor)
return ValueTask.FromResult(result);
}
}
- 步骤 2:定义第二个 Executor - 文本反转
/// <summary>
/// 文本反转执行器 - 将输入文本反转
/// </summary>
public class ReverseTextExecutor : Executor<string, string>
{
public ReverseTextExecutor() : base("ReverseTextExecutor") { }
/// <summary>
/// 核心处理方法 - 反转输入文本
/// </summary>
public override ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// 执行业务逻辑: 反转字符串
string result = string.Concat(message.Reverse());
// 返回处理结果
return ValueTask.FromResult(result);
}
步骤 3: 使用 WorkflowBuilder 构建工作流:将两个 Executor 通过 Edge 连接起来,组成完整的工作流。
new WorkflowBuilder(uppercaseExecutor):
创建 WorkflowBuilder 并指定起始 Executor
工作流的输入数据会首先传递给这个 Executor
AddEdge(source, target):
添加一条有向边 (Edge),定义数据流动方向
source 的输出会自动传递给 target 的输入
类型检查:source 的 TOutput 必须匹配 target 的 TInput
WithOutputFrom(executor):
指定工作流的最终输出来自哪个 Executor
如果不调用此方法,默认输出来自最后一个 Executor
Build():
完成构建,返回一个不可变的 Workflow 对象
此时会进行拓扑验证:检查是否有循环依赖、孤立节点等
// 创建 Executor 实例
UppercaseExecutor uppercaseExecutor = new UppercaseExecutor();
ReverseTextExecutor reverseExecutor = new ReverseTextExecutor();
// 使用 WorkflowBuilder 构建工作流
WorkflowBuilder builder = new WorkflowBuilder(uppercaseExecutor); // 指定起始 Executor
builder.AddEdge(uppercaseExecutor, reverseExecutor); // 添加边: uppercase → reverse
builder.WithOutputFrom(reverseExecutor); // 指定输出来自 reverse
// 构建最终的工作流对象
Workflow workflow = builder.Build();
Console.WriteLine("工作流构建完成");
new
{
起始节点 = "UppercaseExecutor",
边 = "UppercaseExecutor → ReverseTextExecutor",
输出节点 = "ReverseTextExecutor"
}.Display();
步骤 4: 同步执行工作流
InProcessExecution.RunAsync(workflow, input):
在当前进程内执行工作流 (同步等待完成)
input 参数会传递给起始 Executor
返回 Run 对象,包含执行结果和事件
Run.NewEvents:
获取工作流执行过程中产生的所有事件
主要事件类型:
WorkflowStartedEvent:工作流启动
ExecutorCompletedEvent:某个 Executor 执行完成
WorkflowCompletedEvent:工作流完成
ExecutorCompletedEvent:
ExecutorId:执行器的唯一标识符
Data:执行器的输出数据 (object 类型,需要转换)
事件按执行顺序排列
// 执行工作流 - 同步模式
string input = "Hello, World!";
Console.WriteLine($"输入: {input}");
Console.WriteLine("开始执行工作流...\n");
await using (Run run = await InProcessExecution.RunAsync(workflow, input))
{
// 处理执行结果
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("执行结果:\n");
foreach (WorkflowEvent evt in run.NewEvents)
{
if (evt is ExecutorCompletedEvent executorCompleted)
{
new
{
执行器 = executorCompleted.ExecutorId,
输出数据 = executorCompleted.Data
}.Display();
Console.WriteLine();
}
}
Console.WriteLine("工作流执行完成");
}
4. 流式执行
流式执行允许我们在工作流运行过程中实时接收事件流,而不是等待全部完成。
sequenceDiagram
participant App as 应用代码
participant WF as Workflow
participant E1 as UppercaseExecutor
participant E2 as ReverseTextExecutor
App->>WF: StreamAsync(input)
WF->>E1: 执行
E1-->>App: ExecutorCompletedEvent (立即返回)
WF->>E2: 执行
E2-->>App: ExecutorCompletedEvent (立即返回)
WF-->>App: WorkflowCompletedEvent
对比:
| 特性 | 同步执行 (RunAsync) | 流式执行 (StreamAsync) |
|---|---|---|
| 执行模式 | 阻塞等待全部完成 | 返回异步流 (IAsyncEnumerable) |
| 事件获取 | 一次性获取所有事件 | 实时逐个接收事件 |
| 适用场景 | 短时间任务、批处理 | 长时间任务、实时反馈 |
| 用户体验 | 等待后一次性显示结果 | 逐步显示进度 |
步骤 5:使用 StreamAsync 实现流式执行
InProcessExecution.StreamAsync(workflow, input):
启动流式执行,立即返回 StreamingRun 对象
不会阻塞等待工作流完成
streamingRun.WatchStreamAsync():
返回 IAsyncEnumerable
异步流 使用 await foreach 实时迭代事件
每当一个 Executor 完成,就会产生一个新的 ExecutorCompletedEvent
时间戳 DateTime.Now:
演示事件是逐个实时产生的
在实际的长时间任务中,你会看到明显的时间间隔
主要事件类型:
WorkflowStartedEvent:工作流开始
ExecutorCompletedEvent:单个 Executor 完成
WorkflowOutputEvent:整个工作流完成
// 执行工作流 - 流式模式
string streamingInput = "Workflow Streaming!";
Console.WriteLine($"输入: {streamingInput}");
Console.WriteLine("开始流式执行工作流...\n");
await using (StreamingRun streamingRun = await InProcessExecution.StreamAsync(workflow, streamingInput))
{
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("实时执行进度:\n");
// 实时监听事件流
await foreach (WorkflowEvent evt in streamingRun.WatchStreamAsync())
{
if (evt is ExecutorCompletedEvent executorCompleted)
{
Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] {executorCompleted.ExecutorId} 完成");
new
{
输出数据 = executorCompleted.Data
}.Display();
Console.WriteLine();
}
else if (evt is WorkflowStartedEvent)
{
Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 工作流启动\n");
}
else if (evt is WorkflowOutputEvent)
{
Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 工作流完成");
Console.WriteLine($"工作流输出:{evt.Data}");
}
}
}
5. 简化写法
使用 Lambda 表达式创建 Executor:对于简单的转换逻辑,每次都定义完整的 Executor 类显得过于繁琐。MAF 提供了 BindAsExecutor 扩展方法,允许我们将普通函数绑定为 Executor。
// 使用 Lambda 表达式定义转换逻辑
Func<string, string> toUpperFunc = input => input.ToUpperInvariant();
Func<string, string> reverseFunc = input => string.Concat(input.Reverse());
// 将函数绑定为 Executor
var upperExecutor = toUpperFunc.BindAsExecutor("UpperExecutor");
var reverseExecutor2 = reverseFunc.BindAsExecutor("ReverseExecutor");
// 构建工作流 (与之前相同的流程)
WorkflowBuilder lambdaBuilder = new WorkflowBuilder(upperExecutor);
lambdaBuilder.AddEdge(upperExecutor, reverseExecutor2);
lambdaBuilder.WithOutputFrom(reverseExecutor2);
Workflow lambdaWorkflow = lambdaBuilder.Build();
代码解析:
BindAsExecutor(name):
将 Func<TInput, TOutput> 绑定为 Executor<TInput, TOutput>
参数 name:Executor 的唯一标识符
限制:只适用于同步、无副作用的纯函数转换
适用场景:
数据格式转换 (大写、小写、Trim)
简单计算 (字符串拼接、数学运算)
数据映射 (DTO 转换)
不适用场景
需要访问 IWorkflowContext 的逻辑
需要异步操作 (如数据库查询、API调用)
复杂的状态管理
建议:
- 简单转换用 Lambda
- 复杂逻辑用完整的 Executor 类
二、集成 Agent 到工作流
1. 为什么要 Agent in Workflow
在工作流中,我们有两种主要的执行单元:
| 特性 | Executor | Agent |
|---|---|---|
| 定义 | 同步/异步的业务逻辑处理单元 | 基于 LLM 的智能对话式处理单元 |
| 适用场景 | 确定性逻辑(格式化、验证、计算) | 需要智能判断、生成、理解的任务 |
| 输入输出 | 任意类型 (TInput/TOutput) |
ChatMessage |
| 执行方式 | 立即执行 | 需要 TurnToken 触发 |
| 成本 | 几乎无成本 | LLM API 调用成本 |
| 可预测性 | 100% 可预测 | 基于模型能力,存在不确定性 |
将 Agent 集成到工作流的优势:
- 自动化数据流:工作流引擎自动管理 Agent 之间的消息传递
- 统一监控:通过 WorkflowEvent 统一监听所有 Agent 的执行状态
- 灵活组合:Agent 可以与 Executor 自由组合,构建复杂业务逻辑
- 错误处理:工作流级别的异常处理和重试机制
- 可视化:清晰的流程图,便于理解和维护
2. 将 Agent 添加到工作流
- 创建一个简单的翻译 Agent
// 创建一个法语翻译 Agent
var frenchAgent = new ChatClientAgent(
chatClient,
"You are a translation assistant that translates the provided text to French."
);
- 将 Agent 添加到工作流
// 构建包含单个 Agent 的工作流
var simpleWorkflow = new WorkflowBuilder(frenchAgent)
.Build();
- 执行工作流 - TurnToken 机制详解
关键概念:当 Agent 被用作工作流步骤时,它不会立即执行。需要通过 TurnToken 来触发 Agent 的处理。
sequenceDiagram
participant User as 用户
participant Workflow as 工作流引擎
participant Agent as Agent (Executor)
participant LLM as LLM
User->>Workflow: StreamAsync(input)
Workflow->>Agent: 传递 ChatMessage
Note over Agent: 消息被缓存,等待触发
User->>Workflow: TrySendMessageAsync(TurnToken)
Workflow->>Agent: 传递 TurnToken
Agent->>LLM: 发送请求
LLM-->>Agent: 返回响应
Agent-->>Workflow: AgentRunUpdateEvent
Workflow-->>User: 流式输出事件
// 执行工作流
Console.WriteLine("开始执行工作流...");
Console.WriteLine();
var input = new ChatMessage(ChatRole.User, "Hello World!");
await using (StreamingRun run = await InProcessExecution.StreamAsync(simpleWorkflow, input))
{
Console.WriteLine("发送 TurnToken 触发 Agent...");
// 关键步骤:发送 TurnToken 来触发 Agent 执行
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
Console.WriteLine();
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("监听工作流事件...");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine();
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is AgentRunUpdateEvent agentUpdate)
{
Console.WriteLine($"Agent 输出: {agentUpdate.ExecutorId}");
Console.WriteLine($"内容: {agentUpdate.Data}");
Console.WriteLine();
}
}
}
3. 关键概念解析
为什么需要 TurnToken?
在对话式 AI 系统中,Agent 可能需要等待多条消息(如用户输入、工具调用结果)后再统一处理。
TurnToken作为一个 的信号,告诉 Agent:"所有输入已就绪,可以开始处理了"
"这是一个完整的对话回合"
TurnToken 的参数
emitEvents:是否发出执行事件(如
AgentRunUpdateEvent)设置为 true 可以实时监控 Agent 的输出
new TurnToken(emitEvents: true)
AgentRunUpdateEvent
这是 Agent 在工作流中执行时发出的事件:
public class AgentRunUpdateEvent : WorkflowEvent
{
public string ExecutorId { get; } // Agent 的 ID
public object Data { get; } // Agent 的输出内容
public string Status { get; } // 执行状态
}
4. 实战:翻译链
让我们构建一个更有趣的案例:将英文依次翻译为法语、西班牙语,最后再翻译回英语,看看经过三次翻译后内容会有什么变化。
- 创建三个翻译 Agent
// 定义一个辅助方法来创建翻译 Agent
AIAgent CreateTranslationAgent(string targetLanguage, IChatClient client)
{
return new ChatClientAgent(
client,
$"You are a translation assistant that translates the provided text to {targetLanguage}."
);
}
// 创建三个翻译 Agent
var frenchTranslator = CreateTranslationAgent("French", chatClient);
var spanishTranslator = CreateTranslationAgent("Spanish", chatClient);
var englishTranslator = CreateTranslationAgent("English", chatClient);
- 构建翻译链工作流:使用 AddEdge 将三个 Agent 顺序连接
// 构建工作流:English → French → Spanish → English
var translationChainWorkflow = new WorkflowBuilder(frenchTranslator)
.AddEdge(frenchTranslator, spanishTranslator)
.AddEdge(spanishTranslator, englishTranslator)
.Build();
- 执行翻译链
Console.WriteLine("开始执行翻译链工作流...");
Console.WriteLine();
var inputMessage = new ChatMessage(ChatRole.User, "Artificial Intelligence is transforming the world!");
Console.WriteLine($"原始输入: {inputMessage.Text}");
Console.WriteLine();
await using (StreamingRun chainRun = await InProcessExecution.StreamAsync(translationChainWorkflow, inputMessage)){
// 发送 TurnToken 触发所有 Agent
await chainRun.TrySendMessageAsync(new TurnToken(emitEvents: true));
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("翻译过程跟踪");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine();
int stepNumber = 1;
await foreach (WorkflowEvent evt in chainRun.WatchStreamAsync())
{
if (evt is AgentRunUpdateEvent agentEvent)
{
Console.WriteLine($"Step {stepNumber}: {agentEvent.ExecutorId}");
Console.WriteLine($" 翻译结果: {agentEvent.Data}");
Console.WriteLine();
stepNumber++;
}
}
Console.WriteLine("翻译链执行完成");
}
观察与分析
通过上面的输出,会发现:
- 顺序执行:三个 Agent 按照定义的顺序依次执行
- 自动传递:每个 Agent 的输出自动成为下一个 Agent 的输入
- 实时反馈:通过 AgentRunUpdateEvent 可以实时看到每一步的翻译结果
- 语义变化:经过多次翻译后,原始语义可能会有细微变化(这是 LLM 的特性)
5. 核心概念深入
Agent 作为 Executor 的内部机制,当将 AIAgent 添加到工作流时,MAF 会自动将其封装为一个特殊的 Executor。
关键特性:
消息缓存:Agent Wrapper 会缓存收到的 ChatMessage
等待触发:只有收到 TurnToken 才会调用内部的 Agent.RunAsync()
事件发射:执行过程中会发出 AgentRunUpdateEvent
类型固定:输入输出都是 ChatMessage
TurnToken 的作用域:单个 TurnToken 触发所有 Agent,在一个工作流中,只需发送一次 TurnToken,它会触发所有等待中的 Agent。
sequenceDiagram
participant User
participant Workflow
participant Agent1
participant Agent2
User->>Workflow: TurnToken
Workflow->>Agent1: 传递 TurnToken
Agent1->>Agent1: 处理并输出
Agent1->>Workflow: 输出 (ChatMessage)
Workflow->>Agent2: 传递输出 + TurnToken
Agent2->>Agent2: 处理并输出
Agent2->>Workflow: 输出 (ChatMessage)
AgentRunUpdateEvent 详解,这是监控 Agent 执行的核心事件。
AgentRunUpdateEvent 的用途:
- 实时进度:显示 Agent 的执行进度给用户
- 调试:检查每个 Agent 的输入输出
- 日志记录:记录完整的执行链路
- 错误诊断:发现哪个 Agent 出现了问题
// 典型的事件处理模式
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is AgentRunUpdateEvent agentEvent)
{
// 事件属性
Console.WriteLine($"Agent ID: {agentEvent.ExecutorId}");
Console.WriteLine($"数据: {agentEvent.Data}");
Console.WriteLine($"状态: {agentEvent.Status}");
Console.WriteLine($"时间戳: {agentEvent.Timestamp}");
}
}
5. 最佳实践
何时使用 Agent vs Executor
| 场景 | 推荐 | 理由 |
|---|---|---|
| 文本格式化、验证 | Executor | 确定性逻辑,无需 LLM |
| 内容生成、改写 | Agent | 需要理解和创造性 |
| 数学计算、数据转换 | Executor | 精确计算,成本低 |
| 意图识别、分类 | Agent | 需要语义理解 |
| API 调用、数据库查询 | Executor | 确定性操作 |
| 多轮对话、推理 | Agent | 需要上下文理解 |
Agent 在工作流中的命名规范
推荐的命名模式:
- 职责导向:名称应反映 Agent 的具体职责
- 动词+名词:如 translateText, reviewCode
- 避免泛化:不要使用 agent1, helper 等通用名称
- 一致性:在同一工作流中保持命名风格一致
// 好的命名 - 清晰描述职责
var frenchTranslator = CreateTranslationAgent("French", client);
var contentReviewer = new ChatClientAgent(client, "Review content for quality");
var codeGenerator = new ChatClientAgent(client, "Generate C# code");
// 不好的命名 - 过于抽象
var agent1 = new ChatClientAgent(...);
var helper = new ChatClientAgent(...);
错误处理策略
在工作流中使用 Agent 时的错误处理:
// 带错误处理的 Agent 工作流执行
async Task<bool> ExecuteWorkflowWithErrorHandlingAsync(Workflow workflow, ChatMessage input, int maxRetries = 3)
{
for (int attempt = 1; attempt <= maxRetries; attempt++)
{
try
{
Console.WriteLine($"尝试执行 (第 {attempt} 次)...");
await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, input);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is AgentRunUpdateEvent agentEvent)
{
// 检查 Agent 输出是否有效
if (agentEvent.Data == null || string.IsNullOrWhiteSpace(agentEvent.Data.ToString()))
{
throw new InvalidOperationException($"Agent {agentEvent.ExecutorId} 返回了空结果");
}
}
}
Console.WriteLine("工作流执行成功");
return true;
}
catch (Exception ex)
{
Console.WriteLine($"执行失败: {ex.Message}");
if (attempt < maxRetries)
{
var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt)); // 指数退避
Console.WriteLine($"等待 {delay.TotalSeconds} 秒后重试...");
await Task.Delay(delay);
}
else
{
Console.WriteLine($"达到最大重试次数,放弃执行");
return false;
}
}
}
return false;
}
性能优化建议
- 减少不必要的 Agent 调用
// 不好的做法 - 每次都调用 Agent
var validatorAgent = new ChatClientAgent(client, "Validate input format");
// 好的做法 - 用 Executor 处理格式验证
public class FormatValidatorExecutor : Executor<string, string>
{
protected override Task<string> HandleAsync(string input, IWorkflowContext context, CancellationToken ct)
{
if (string.IsNullOrWhiteSpace(input))
throw new ArgumentException("输入不能为空");
return Task.FromResult(input);
}
}
- 合理使用 emitEvents
// 生产环境:可以关闭事件以提高性能
await run.TrySendMessageAsync(new TurnToken(emitEvents: false));
// 开发/调试:开启事件以便监控
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
Agent 粒度控制
太细:多个简单 Agent 导致过多的 LLM 调用
太粗:单个复杂 Agent 难以复用和维护
适中:每个 Agent 负责一个明确的子任务
三、Workflow as Agent(工作流即 Agent)
1. 为什么需要 Workflow as Agent
在实际应用中,我们经常需要将多个 Agent 组成的协作工作流作为一个整体单元来使用:
| 特性 | 独立的 Workflow | Workflow as Agent |
|---|---|---|
| 执行方式 | StreamAsync() + TurnToken | RunAsync() / RunStreamingAsync() |
| 输入类型 | ChatMessage + 手动发送 TurnToken | ChatMessage / string (自动处理) |
| 输出类型 | WorkflowEvent 流 | AgentRunResponse |
| 接口 | Workflow | AIAgent |
| 复用方式 | 需要手动管理执行流程 | 作为工具/插件/子Agent |
问题:直接使用 Workflow 接口存在以下挑战
- 调用复杂:需要手动创建 StreamingRun、发送 TurnToken、监听事件
- 接口不统一:Workflow 和 Agent 使用不同的调用方式
- 复用困难:无法将 Workflow 直接作为 Agent 的工具
- 组合受限:无法在其他 Agent 中轻松集成 Workflow
Workflow as Agent 的解决方案
AsAgent() 扩展方法将基于 Agent 的 Workflow 包装为标准的 AIAgent 接口,使其可以像普通 Agent 一样使用
- 核心优势:
- 统一接口:使用 AIAgent 接口调用 Workflow
- 无缝集成:可以作为 Agent 的工具使用
- 跨协议复用:支持 A2A、MCP 等协议
- 生态兼容:享受 AIAgent 生态的所有中间件
- 使用场景
- 复杂任务封装:将多步骤的业务逻辑封装为可复用的 Agent
- 跨系统集成:需要在 A2A 协作中使用 Workflow
- 工具化封装:将 Workflow 作为其他 Agent 的工具
- 中间件增强:需要对 Workflow 应用 AIAgent 中间件
- 不建议场景
- 简单任务:如果只是简单的顺序调用,直接使用 WorkflowBuilder 即可
- 性能敏感:WorkflowAgent 会增加一层适配开销
2. 创建多 Agent 协助工作流
我们使用前面创建的多Agent 翻译链工作流,现在我们将这个 Workflow 封装为一个 Agent,使其可以像普通 Agent 一样通过 AIAgent 接口调用。
AsAgent() 方法将 Workflow 包装为一个 AIAgent,内部自动处理:
- 接收 ChatMessage 输入:提取用户消息
- 自动管理 TurnToken:无需手动发送 TurnToken
- 返回 AgentRunResponse:将 Workflow 输出包装为标准响应
sequenceDiagram
participant User as 调用方
participant WA as Workflow Agent
participant WF as Workflow
User->>WA: RunAsync(ChatMessage)
WA->>WA: 内部创建 StreamingRun
WA->>WF: 自动发送 TurnToken
WF->>WF: 执行所有 Agent
WF-->>WA: WorkflowEvent 流
WA->>WA: 包装为 AgentRunResponse
WA-->>User: AgentRunResponse
重要限制:ChatProtocol 支持要求
并非所有 Workflow 都能转换为 Agent! AsAgent() 方法有严格的协议要求,只有满足 ChatProtocol 的 Workflow 才能使用 AsAgent(),必须同时支持:
- 输入类型:List
或 ChatMessage - 触发类型:TurnToken(用于触发 Agent 执行)
如果 Workflow 不满足这两个条件,调用 AsAgent() 会抛出异常:
// 错误示例:基于 Executor 的 Workflow
var workflow = new WorkflowBuilder()
.AddExecutor(new MyCustomExecutor()) // 使用自定义 Executor
.Build();
// 运行时报错!
var agent = workflow.AsAgent("my-agent", "MyAgent");
// System.InvalidOperationException:
// Workflow does not support ChatProtocol:
// At least List<ChatMessage> and TurnToken must be supported as input.
正确做法:使用 AgentWorkflowBuilder,要让 Workflow 支持 AsAgent(),必须使用 AgentWorkflowBuilder 创建基于 Agent 的 Workflow
// 正确:使用 AgentWorkflowBuilder
var workflow = AgentWorkflowBuilder.BuildSequential(
"my-workflow",
[agent1, agent2, agent3] // 使用 AIAgent 实例
);
var workflowAgent = workflow.AsAgent("my-agent", "MyAgent"); // 成功
为什么有这个限制?
核心原因:
- AIAgent 接口的 RunAsync() 方法接收 ChatMessage 作为输入
- Agent 通过 TurnToken 触发执行
- Executor-based Workflow 使用自定义输入类型(如 string),不符合 ChatProtocol 规范
| Workflow 类型 | 输入类型 | 是否支持 TurnToken | 能否转 Agent |
|---|---|---|---|
| Executor-based(WorkflowBuilder) | 自定义类型(如 string, MyData) | 不支持 | 不能 |
| Agent-based(AgentWorkflowBuilder) | ChatMessage | 支持 | 可以 |
将 Workflow 转换为 Agent
// 使用 AsAgent() 将 Workflow 转换为 AIAgent
var translationWorkflowAgent = translationWorkflow.AsAgent(
id: "translation-workflow",
name: "TranslationChain"
);
3. 测试 Workflow Agent
- 使用 RunAsync 调用
// 使用 AIAgent 接口调用 Workflow Agent
var userMessage = "The future belongs to those who believe in the beauty of their dreams.";
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("用户输入:");
Console.WriteLine($" '{userMessage}'");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
// 创建 Thread
var thread = translationWorkflowAgent.GetNewThread();
// 关键:像调用普通 Agent 一样调用 Workflow Agent
var response = await translationWorkflowAgent.RunAsync(userMessage, thread);
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("Workflow Agent 响应:");
foreach (var msg in response.Messages)
{
if (msg.Role == ChatRole.Assistant)
{
Console.WriteLine($" {msg.Text}");
}
}
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
- 使用 RunStreamingAsync 流式调用
// 使用流式调用
var streamingMessage = "Technology is best when it brings people together.";
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("用户输入 (流式):");
Console.WriteLine($" '{streamingMessage}'");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
Console.WriteLine("流式输出:");
var streamThread = translationWorkflowAgent.GetNewThread();
await foreach (var update in translationWorkflowAgent.RunStreamingAsync(streamingMessage, streamThread))
{
if (!string.IsNullOrEmpty(update.Text))
{
Console.WriteLine($" {update.Text}");
}
}
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("流式调用完成");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
4. 将 Workflow Agent 作为工具使用
现在让我们演示 Workflow Agent 最强大的功能:将复杂的多 Agent 协作流程作为工具提供给其他 Agent 使用。
场景说明,我们要创建一个 语言专家 Agent,它需要:
- 接收用户请求(如"测试这段文字的翻译一致性")
- 调用翻译链 Workflow (Workflow Agent) 进行多轮翻译
- 分析并报告翻译质量
这是一个典型的 Agent 调用 Workflow Agent"场景。
- 将 Workflow Agent 转换为 AIFunction
// Workflow Agent 可以直接转换为 AIFunction
var translationChainFunction = translationWorkflowAgent.AsAIFunction(
new AIFunctionFactoryOptions
{
Name = "translation_chain",
Description = "对文本进行多轮翻译测试:英文→法语→西班牙语→英文,用于测试翻译一致性"
}
);
- 创建内容审核 Agent (使用 WorkflowAgent 作为工具)
// 创建语言专家 Agent,配置翻译链工具
var linguistAgent = chatClient.CreateAIAgent(
instructions: @"你是一位语言学专家。你可以使用 translation_chain 工具来测试文本的翻译一致性。
工作流程:
1. 使用 translation_chain 工具对用户提供的文本进行多轮翻译
2. 比较原始文本和最终翻译结果
3. 分析翻译过程中的语义变化
4. 提供翻译质量评估和改进建议
返回格式化的分析报告。",
name: "LinguistExpert",
tools: [translationChainFunction]
);
Console.WriteLine("语言专家 Agent 创建完成");
Console.WriteLine(" 配置工具: translation_chain (Workflow Agent)");
- 测试:Agent 自动调用 WorkflowAgent
// 创建线程并提交请求
var linguistThread = linguistAgent.GetNewThread();
var userRequest = "请测试这段文字的翻译一致性: The only way to do great work is to love what you do.";
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("用户请求:");
Console.WriteLine($" {userRequest}");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
Console.WriteLine("语言专家处理中(将自动调用翻译链工具)...\n");
// Agent 会自动调用 translation_chain 工具 (Workflow Agent)
var response = await linguistAgent.RunAsync(userRequest, linguistThread);
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("分析报告:");
foreach (var msg in response.Messages)
{
if (msg.Role == ChatRole.Assistant)
{
Console.WriteLine($"\n{msg.Text}");
}
}
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
5. 直接调用 vs WorkflowAgent
对比总结
| 特性 | 直接调用 Workflow | 使用 WorkflowAgent |
|---|---|---|
| 调用方式 | RunAsync(TInput) | RunAsync(ChatMessage) |
| 接口 | Workflow | AIAgent |
| 输入类型 | 强类型 (string) | ChatMessage |
| 输出类型 | 强类型 (string) | AgentRunResponse |
| 作为工具 | 不支持 | 支持 |
| 中间件 | 不支持 | 支持 AIAgent 中间件 |
| A2A 协作 | 不支持 | 支持 |
| MCP 集成 | 不支持 | 支持 |
| 性能开销 | 低 | 稍高 (适配层) |
架构对比图
flowchart TB
subgraph "直接调用 Workflow"
A1[应用代码] -->|RunAsync| B1[Workflow]
B1 -->|string| A1
style A1 fill:#FF5252,stroke:#C62828,stroke-width:2px,color:#fff
style B1 fill:#FF5252,stroke:#C62828,stroke-width:2px,color:#fff
end
subgraph "WorkflowAgent 封装"
A2[Agent/应用] -->|RunAsync| B2[WorkflowAgent]
B2 -->|调用| C2[Workflow]
C2 -->|结果| B2
B2 -->|AgentRunResponse| A2
B2 -.可应用.-> D2[中间件]
B2 -.可注册为.-> E2[Agent工具]
B2 -.可用于.-> F2[A2A/MCP]
style A2 fill:#4CAF50,stroke:#2E7D32,stroke-width:2px,color:#fff
style B2 fill:#2196F3,stroke:#1565C0,stroke-width:2px,color:#fff
style C2 fill:#4CAF50,stroke:#2E7D32,stroke-width:2px,color:#fff
end
6. 最佳实践
何时使用 WorkflowAgent?
推荐场景:
需要将 Workflow 作为工具提供给其他 Agent
需要在 A2A 协作中使用 Workflow
需要对 Workflow 应用 AIAgent 中间件 (如日志、缓存)
需要通过 MCP 协议暴露 Workflow
构建可复用的业务逻辑组件
不推荐场景:
简单的内部调用,直接使用 RunAsync 更高效
性能敏感的场景 (适配层会有额外开销)
不需要对外暴露的内部工作流
使用建议
- 命名规范:WorkflowAgent 转为工具时,使用清晰的名称和描述
var tool = workflowAgent.AsAIFunction(new AIFunctionFactoryOptions
{
Name = "clean_data", // 动词_名词格式
Description = "清晰描述功能和用途"
});
- 错误处理:确保 Workflow 内部有完善的错误处理
protected override async Task<string> ExecuteAsync(...)
{
try {
// 业务逻辑
} catch (Exception ex) {
// 记录日志并返回友好错误信息
return $"处理失败: {ex.Message}";
}
}
- 性能优化:对于频繁调用的 WorkflowAgent,考虑添加缓存中间件
// 示例:使用缓存中间件 (需自行实现或引用相关库)
// var cachedWorkflowAgent = new CachingAgent(workflowAgent);
四、工作流的事件机制
| 事件类别 | 作用域 | 代表事件 | 使用频率 |
|---|---|---|---|
| Executor 事件 | 单个执行器 | ExecutorCompletedEvent | ⭐⭐⭐⭐⭐ |
| SuperStep 事件 | 执行步骤 | SuperStepCompletedEvent | ⭐⭐ |
| Workflow 事件 | 整个工作流 | WorkflowOutputEvent | ⭐⭐⭐⭐⭐ |
| Agent 事件 | AI Agent | AgentRunUpdateEvent | ⭐⭐⭐⭐ |
| 交互事件 | 人机协作 | RequestInfoEvent | ⭐⭐⭐ |
1. WorkflowEvent 基类剖析
- 类定义
public class WorkflowEvent
{
// 事件携带的数据负载
public object? Data { get; }
// 构造函数
public WorkflowEvent(object? data = null)
{
Data = data;
}
// 提供友好的字符串表示
public override string ToString() =>
Data is not null ?
$"{GetType().Name}(Data: {Data.GetType()} = {Data})" :
$"{GetType().Name}()";
}
- 核心特性
| 特性 | 说明 | 示例 |
|---|---|---|
| 通用性 | 所有事件的基类,提供统一接口 | 可以用 WorkflowEvent 类型接收所有事件 |
| 数据承载 | Data 属性存储事件的业务数据 | 执行结果、错误信息、状态对象等 |
| 可扩展 | 子类可以添加特定属性 | ExecutorEvent 添加 ExecutorId |
| 调试友好 | ToString() 提供清晰的事件描述 | 便于日志输出和调试 |
ExecutorEvent - Executor 作用域事件,ExecutorEvent 是最常用的事件类别,它表示与特定 Executor 执行相关的事件。
关键属性
- ExecutorId:标识哪个 Executor 触发了这个事件
- 用于追踪执行流程
- 区分并发执行的不同 Executor
- 实现基于 Executor 的过滤逻辑
四大子事件
- ExecutorInvokedEvent:开始执行
- ExecutorCompletedEvent:成功完成
- ExecutorFailedEvent:执行失败
- AgentRunUpdateEvent:Agent 更新
- ExecutorId:标识哪个 Executor 触发了这个事件
public class ExecutorEvent : WorkflowEvent
{
// 触发此事件的 Executor 的唯一标识符
public string ExecutorId { get; }
public ExecutorEvent(string executorId, object? data) : base(data)
{
ExecutorId = executorId;
}
public override string ToString() =>
Data is not null ?
$"{GetType().Name}(Executor = {ExecutorId}, Data: {Data.GetType()} = {Data})" :
$"{GetType().Name}(Executor = {ExecutorId})";
}
SuperStepEvent - 执行步骤事件,SuperStepEvent 用于表示工作流的执行步骤(SuperStep)级别的事件。
在 MAF Workflow 中,SuperStep 是一个执行单元,可以包含:
- 一个或多个 Executor 的并发执行
- 一个完整的顺序执行链
- 两个子事件
| 事件类型 | 触发时机 | 主要用途 |
|---|---|---|
| SuperStepStartedEvent | SuperStep 开始执行 | 监控执行进度 |
| SuperStepCompletedEvent | SuperStep 完成执行 | 调试、Checkpoint 保存 |
使用场景:在调试复杂的并发工作流时,SuperStep 事件可以帮助你理解执行的阶段划分。
public class SuperStepEvent : WorkflowEvent
{
// SuperStep 的索引(从 0 开始)
public int StepNumber { get; }
public SuperStepEvent(int stepNumber, object? data = null) : base(data)
{
StepNumber = stepNumber;
}
}
2. 流式执行机制详解
MAF Workflow 提供两种执行模式:
| 执行模式 | 方法 | 特点 | 适用场景 |
|---|---|---|---|
| 同步执行 | RunAsync() | 等待完成后返回结果 | 简单工作流、不需要中间状态 |
| 流式执行 | StreamAsync() | 实时返回事件流 | 需要进度监控、Agent 工作流 |
sequenceDiagram
participant User as 用户代码
participant Workflow as 工作流引擎
participant Executor as Executors
Note over User,Executor: 同步执行 (RunAsync)
User->>Workflow: RunAsync(input)
Workflow->>Executor: 执行所有步骤
Executor-->>Workflow: 返回结果
Workflow-->>User: 返回 Run (包含结果)
Note over User: 阻塞等待完成
Note over User,Executor: 流式执行 (StreamAsync)
User->>Workflow: StreamAsync(input)
Workflow-->>User: 返回 StreamingRun
Note over User: 立即返回
User->>Workflow: WatchStreamAsync()
loop 执行过程中
Workflow->>Executor: 执行步骤
Executor-->>Workflow: 产生事件
Workflow-->>User: 实时推送事件
Note over User: 实时处理
end
- 核心 API:WatchStreamAsync:是流式执行的核心方法,它返回一个 IAsyncEnumerable
,可以使用 await foreach 遍历。
// 1. 启动流式执行
StreamingRun run = await InProcessExecution.StreamAsync(workflow, input);
// 2. 订阅事件流
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
// 3. 处理每个事件
Console.WriteLine($"收到事件: {evt.GetType().Name}");
}
关键概念
| 概念 | 说明 |
|---|---|
| StreamingRun | 流式执行的句柄,用于监听事件和发送消息 |
| IAsyncEnumerable | 异步流,事件按产生顺序依次返回 |
| await foreach | 异步迭代,每收到一个事件就处理一次 |
| 实时性 | 事件一产生就推送,不等待工作流完成 |
执行流程图
flowchart TB
A[StreamAsync] --> B[返回 StreamingRun]
B --> C[WatchStreamAsync]
C --> D[await foreach 循环]
D --> E{收到事件?}
E -->|是| F[处理事件]
F --> E
E -->|工作流完成| G[退出循环]
H[Workflow Engine] -.产生事件.-> E
style A fill:#2196F3,stroke:#1565C0,stroke-width:2px,color:#fff
style C fill:#4CAF50,stroke:#2E7D32,stroke-width:2px,color:#fff
style F fill:#FF9800,stroke:#E65100,stroke-width:2px,color:#fff
- 实验 1:观察基础事件流
我们使用前面章节创建的文本处理管道
// 构建工作流
var uppercaseExecutor = new UppercaseExecutor();
var reverseExecutor = new ReverseExecutor();
var simpleWorkflow = new WorkflowBuilder(uppercaseExecutor)
.AddEdge(uppercaseExecutor, reverseExecutor)
.WithOutputFrom(reverseExecutor)
.Build();
// 执行工作流并监听所有事件
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("开始执行工作流 (流式模式)");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
var input = "Hello Workflow Events!";
Console.WriteLine($"输入: \"{input}\"\n");
// 启动流式执行
StreamingRun run = await InProcessExecution.StreamAsync(simpleWorkflow, input);
Console.WriteLine("开始监听事件流...\n");
Console.WriteLine(new string('─', 60));
int eventCount = 0;
// 订阅事件流
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
eventCount++;
// 显示事件的完整信息
Console.WriteLine($"\n事件 #{eventCount}");
Console.WriteLine($" 类型: {evt.GetType().Name}");
// // 根据事件类型显示详细信息
// switch (evt)
// {
// case WorkflowStartedEvent:
// Console.WriteLine($" 说明: 工作流开始执行");
// break;
// case ExecutorInvokedEvent invokedEvent:
// Console.WriteLine($" Executor: {invokedEvent.ExecutorId}");
// Console.WriteLine($" 说明: Executor 开始执行");
// Console.WriteLine($" 输入: {invokedEvent.Data}");
// break;
// case ExecutorCompletedEvent completedEvent:
// Console.WriteLine($" Executor: {completedEvent.ExecutorId}");
// Console.WriteLine($" 说明: Executor 执行完成");
// Console.WriteLine($" 输出: {completedEvent.Data}");
// break;
// case SuperStepCompletedEvent stepEvent:
// Console.WriteLine($" SuperStep: #{stepEvent.StepNumber}");
// Console.WriteLine($" 说明: 执行步骤完成");
// break;
// case WorkflowOutputEvent outputEvent:
// Console.WriteLine($" 来源: {outputEvent.SourceId}");
// Console.WriteLine($" 说明: 工作流产生输出");
// Console.WriteLine($" 结果: {outputEvent.Data}");
// break;
// default:
// Console.WriteLine($" 数据: {evt.Data}");
// break;
// }
Console.WriteLine(new string('─', 60));
}
Console.WriteLine($"\n工作流执行完成,共产生 {eventCount} 个事件");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
通过上面的实验,我们可以观察到一个典型的事件序列:
1. SuperStepStartedEvent ← SuperStep 0 开始
2. ExecutorInvokedEvent ← UppercaseExecutor 开始
3. ExecutorCompletedEvent ← UppercaseExecutor 完成
4. SuperStepCompletedEvent ← SuperStep 0 完成
5. SuperStepStartedEvent ← SuperStep 1 开始
6. ExecutorInvokedEvent ← ReverseExecutor 开始
7. ExecutorCompletedEvent ← ReverseExecutor 完成
8. WorkflowOutputEvent ← 工作流输出结果
9. SuperStepCompletedEvent ← SuperStep 1 完成
关键发现
- SuperStep 结构:每个 SuperStep 都有明确的开始和结束事件
- 执行顺序:SuperStepStarted → ExecutorInvoked → ExecutorCompleted → SuperStepCompleted
- 完整性:每个 Executor 都有 Invoked 和 Completed 事件对
- 输出时机:WorkflowOutputEvent 在最后一个 Executor 完成后、SuperStep 完成前触发
- 可追溯:通过事件序号和类型可以完整追踪执行流程
3. 系统内置事件详解
ExecutorCompletedEvent - 最常用的事件:ExecutorCompletedEvent 是工作流开发中使用频率最高的事件,它表示一个 Executor 成功完成执行。
public sealed class ExecutorCompletedEvent : ExecutorEvent
{
public ExecutorCompletedEvent(string executorId, object? result)
: base(executorId, data: result)
{
}
}
核心用途
| 用途 | 说明 | 示例 |
|---|---|---|
| 获取结果 | Data 属性包含 Executor 的返回值 | 提取中间处理结果 |
| 进度追踪 | 标记某个步骤已完成 | 更新进度条 |
| 链路追踪 | 记录执行顺序 | 构建执行日志 |
| 调试分析 | 查看每个步骤的输出 | 定位数据转换问题 |
实战示例
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is ExecutorCompletedEvent completed)
{
Console.WriteLine($"{completed.ExecutorId} 完成");
Console.WriteLine($" 输出: {completed.Data}");
// 根据 ExecutorId 做不同处理
if (completed.ExecutorId == "ValidationExecutor")
{
bool isValid = (bool)completed.Data;
if (!isValid)
{
Console.WriteLine("验证失败,终止后续流程");
}
}
}
}
- AgentRunUpdateEvent - Agent 专用事件:当 Agent 在工作流中执行时,会产生 AgentRunUpdateEvent 来报告执行进度和输出内容。
public class AgentRunUpdateEvent : ExecutorEvent
{
public AgentRunUpdateEvent(string executorId, AgentRunResponseUpdate update)
: base(executorId, data: update)
{
Update = update;
}
public AgentRunResponseUpdate Update { get; }
}
关键属性 AgentRunResponseUpdate对象,包含:
- Text:Agent 输出的文本内容
- Contents:结构化内容(可能包含 FunctionCall 等)
- 其他 Agent 运行时信息
触发条件:只有在发送 TurnToken 时设置 emitEvents: true,才会产生此事件!
// 正确:会产生 AgentRunUpdateEvent
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
// 错误:不会产生 AgentRunUpdateEvent
await run.TrySendMessageAsync(new TurnToken(emitEvents: false));
实战示例
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is AgentRunUpdateEvent agentEvent)
{
// 实时显示 Agent 输出
Console.Write(agentEvent.Update.Text);
// 检查是否有函数调用
var functionCall = agentEvent.Update.Contents
.OfType<FunctionCallContent>()
.FirstOrDefault();
if (functionCall != null)
{
Console.WriteLine($"\n调用函数: {functionCall.Name}");
}
}
}
- WorkflowOutputEvent - 工作流输出事件:WorkflowOutputEvent 标志着工作流产生了输出,通常意味着工作流即将完成或已经完成。
public sealed class WorkflowOutputEvent : WorkflowEvent
{
internal WorkflowOutputEvent(object data, string sourceId) : base(data)
{
SourceId = sourceId;
}
public string SourceId { get; }
// 类型安全的数据提取方法
public bool Is<T>() => Data is T;
public bool Is<T>([NotNullWhen(true)] out T? maybeValue)
{
if (Data is T value)
{
maybeValue = value;
return true;
}
maybeValue = default;
return false;
}
public T? As<T>() => Data is T value ? value : default;
}
核心特性
| 特性 | 说明 |
|---|---|
| SourceId | 产生输出的 Executor ID |
| 类型安全 | 提供 Is |
| 终止标志 | 通常是事件流的最后一个事件 |
实战示例
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"工作流完成,来自: {outputEvent.SourceId}");
// 类型安全的数据提取
if (outputEvent.Is<string>(out var textResult))
{
Console.WriteLine($"文本结果: {textResult}");
}
else if (outputEvent.Is<List<ChatMessage>>(out var messages))
{
Console.WriteLine($"对话消息: {messages.Count} 条");
}
// 或者使用 As<T>
var result = outputEvent.As<MyCustomType>();
if (result != null)
{
// 处理自定义类型
}
}
}
- ExecutorFailedEvent - 错误处理事件:当 Executor 执行过程中抛出异常时,会产生 ExecutorFailedEvent。
public sealed class ExecutorFailedEvent : ExecutorEvent
{
public ExecutorFailedEvent(string executorId, Exception? err)
: base(executorId, data: err)
{
Data = err;
}
public new Exception? Data { get; }
}
实战示例
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is ExecutorFailedEvent failedEvent)
{
Console.WriteLine($"{failedEvent.ExecutorId} 执行失败");
Console.WriteLine($" 错误类型: {failedEvent.Data?.GetType().Name}");
Console.WriteLine($" 错误消息: {failedEvent.Data?.Message}");
Console.WriteLine($" 堆栈跟踪: {failedEvent.Data?.StackTrace}");
// 根据错误类型做不同处理
switch (failedEvent.Data)
{
case HttpRequestException httpEx:
Console.WriteLine("网络错误,建议重试");
break;
case TimeoutException timeoutEx:
Console.WriteLine("超时错误,可能需要增加超时时间");
break;
default:
Console.WriteLine("未知错误,请检查日志");
break;
}
}
}
- RequestInfoEvent - 人机协作事件:RequestInfoEvent 用于 Human-in-the-Loop 场景,表示工作流需要外部输入(通常是人工干预)。
public sealed class RequestInfoEvent : WorkflowEvent
{
public RequestInfoEvent(ExternalRequest request) : base(request)
{
Request = request;
}
public ExternalRequest Request { get; }
}
使用场景
- 审批流程:等待管理员批准
- 内容审核:等待人工审核敏感内容
- 用户交互:等待用户提供额外信息
- 配置选择:等待用户选择执行路径
实战示例
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is RequestInfoEvent requestEvent)
{
Console.WriteLine("工作流暂停,等待外部输入");
Console.WriteLine($" 请求ID: {requestEvent.Request.RequestId}");
// 获取用户输入(实际场景可能是 Web API 或 UI)
Console.Write("请输入响应内容: ");
var userInput = Console.ReadLine();
// 创建响应
var response = new ExternalResponse
{
RequestId = requestEvent.Request.RequestId,
Data = userInput
};
// 将响应发送回工作流
await run.TrySendMessageAsync(response);
Console.WriteLine("响应已发送,工作流继续执行");
}
}
4. 事件监听与过滤
在实际应用中,我们通常不需要处理所有事件,而是只关注特定类型的事件,接下来我们看看多种事件过滤技巧。
- 基于事件类型的过滤,最基础的过滤方式是使用 is 模式匹配或 switch 表达式。
// 示例:只处理 ExecutorCompletedEvent
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is ExecutorCompletedEvent completed)
{
Console.WriteLine($"{completed.ExecutorId}: {completed.Data}");
}
// 忽略其他事件
}
// 使用 switch 表达式处理多种事件类型
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
var message = evt switch
{
ExecutorCompletedEvent completed =>
$"{completed.ExecutorId} 完成: {completed.Data}",
ExecutorFailedEvent failed =>
$"{failed.ExecutorId} 失败: {failed.Data?.Message}",
AgentRunUpdateEvent agentUpdate =>
$"Agent 更新: {agentUpdate.Update.Text}",
WorkflowOutputEvent output =>
$"输出结果: {output.Data}",
_ => null // 忽略其他事件
};
if (message != null)
{
Console.WriteLine(message);
}
}
基于 ExecutorId 的过滤,在复杂的工作流中,我们可能只关注特定 Executor 的事件。
使用场景
进度监控:只追踪关键步骤的执行
问题调试:定位某个特定 Executor 的问题
日志记录:只记录重要步骤的日志
性能优化:减少不必要的事件处理
// 只监听特定 Executor 的完成事件
var targetExecutorId = "ReverseExecutor";
Console.WriteLine($"只监听 {targetExecutorId} 的事件\n");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
// 方式 1: 直接在 ExecutorEvent 上过滤
if (evt is ExecutorEvent executorEvent &&
executorEvent.ExecutorId == targetExecutorId)
{
Console.WriteLine($"{evt.GetType().Name}");
Console.WriteLine($" Executor: {executorEvent.ExecutorId}");
Console.WriteLine($" 数据: {executorEvent.Data}\n");
}
}
// 方式 2: 组合多个条件
var interestedExecutors = new[] { "UppercaseExecutor", "ReverseExecutor" };
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is ExecutorCompletedEvent completed &&
interestedExecutors.Contains(completed.ExecutorId))
{
Console.WriteLine($"{completed.ExecutorId}: {completed.Data}");
}
}
- LINQ 过滤(最优雅的方式),利用 C# 的 LINQ 能力,我们可以创建更加优雅和可复用的过滤逻辑。
| 优势 | 说明 |
|---|---|
| 链式调用 | 可以组合多个过滤条件 |
| 类型安全 | 强类型支持,编译时检查 |
| 代码复用 | 过滤逻辑可以封装为扩展方法 |
| 可读性强 | 语义清晰,接近自然语言 |
// 定义 LINQ 扩展方法
//public static class WorkflowEventExtensions
//{
// 过滤特定类型的事件
public static async IAsyncEnumerable<T> OfEventType<T>(
this IAsyncEnumerable<WorkflowEvent> events) where T : WorkflowEvent
{
await foreach (var evt in events)
{
if (evt is T typedEvent)
{
yield return typedEvent;
}
}
}
// 过滤特定 Executor 的事件
public static async IAsyncEnumerable<ExecutorEvent> FromExecutor(
this IAsyncEnumerable<WorkflowEvent> events,
string executorId)
{
await foreach (var evt in events)
{
if (evt is ExecutorEvent executorEvent &&
executorEvent.ExecutorId == executorId)
{
yield return executorEvent;
}
}
}
// 过滤多个 Executor 的事件
public static async IAsyncEnumerable<ExecutorEvent> FromExecutors(
this IAsyncEnumerable<WorkflowEvent> events,
params string[] executorIds)
{
var executorSet = new HashSet<string>(executorIds);
await foreach (var evt in events)
{
if (evt is ExecutorEvent executorEvent &&
executorSet.Contains(executorEvent.ExecutorId))
{
yield return executorEvent;
}
}
}
//}
使用 LINQ 扩展方法的优雅示例
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("LINQ 过滤示例");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
var simpleWorkflow = new WorkflowBuilder(uppercaseExecutor)
.AddEdge(uppercaseExecutor, reverseExecutor)
.WithOutputFrom(reverseExecutor)
.Build();
// 重新执行工作流
var run2 = await InProcessExecution.StreamAsync(simpleWorkflow, "Hello LINQ");
// 示例 1: 只监听 ExecutorCompletedEvent
Console.WriteLine("示例 1: 只监听完成事件\n");
await foreach (var completed in run2.WatchStreamAsync().OfEventType<ExecutorCompletedEvent>())
{
Console.WriteLine($"{completed.ExecutorId}: {completed.Data}");
}
await run2.DisposeAsync();
Console.WriteLine("\n" + new string('─', 60) + "\n");
// 示例 2: 只监听特定 Executor
var run3 = await InProcessExecution.StreamAsync(simpleWorkflow, "Hello Filter");
Console.WriteLine("示例 2: 只监听 ReverseExecutor\n");
await foreach (var evt in run3.WatchStreamAsync().FromExecutor("ReverseExecutor"))
{
Console.WriteLine($"{evt.GetType().Name}: {evt.Data}");
}
await run3.DisposeAsync();
Console.WriteLine("\n" + new string('─', 60) + "\n");
// 示例 3: 链式组合(只监听特定 Executor 的完成事件)
var run4 = await InProcessExecution.StreamAsync(simpleWorkflow, "Hello Chain");
Console.WriteLine("示例 3: 链式过滤 (ReverseExecutor + Completed)\n");
await foreach (var completed in run4.WatchStreamAsync()
.FromExecutor("ReverseExecutor")
.OfEventType<ExecutorCompletedEvent>())
{
Console.WriteLine($"{completed.ExecutorId}: {completed.Data}");
}
await run4.DisposeAsync();
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("LINQ 过滤示例完成");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
5. 实战案例 - 构建事件监控系统
现在让我们构建一个完整的事件监控系统,展示如何在实际项目中应用事件系统。
我们将构建一个 WorkflowMonitor 类,它可以:
- 实时统计:记录每个 Executor 的执行时间
- 日志记录:记录关键事件
- 错误监控:捕获并报告错误
- 进度展示:显示执行进度
- 性能分析:识别性能瓶颈
监控系统架构
flowchart TB
subgraph "事件流"
A[WorkflowEvent Stream]
end
subgraph "WorkflowMonitor"
B[事件接收器]
C[统计收集器]
D[日志记录器]
E[错误处理器]
F[进度追踪器]
end
subgraph "输出"
G[控制台日志]
H[性能报告]
I[错误报告]
end
A --> B
B --> C
B --> D
B --> E
B --> F
C --> H
D --> G
E --> I
F --> G
style A fill:#2196F3,stroke:#1565C0,stroke-width:2px,color:#fff
style B fill:#4CAF50,stroke:#2E7D32,stroke-width:2px,color:#fff
style H fill:#FF9800,stroke:#E65100,stroke-width:2px,color:#fff
- 监控类定义
// 完整的工作流监控系统
public class WorkflowMonitor
{
// 执行统计信息
private class ExecutorStats
{
public string ExecutorId { get; set; } = "";
public DateTime? StartTime { get; set; }
public DateTime? EndTime { get; set; }
public TimeSpan? Duration => EndTime - StartTime;
public bool Success { get; set; }
public object? Result { get; set; }
public Exception? Error { get; set; }
}
private readonly Dictionary<string, ExecutorStats> _stats = new();
private readonly List<string> _logs = new();
private DateTime _workflowStartTime;
private DateTime _workflowEndTime;
private int _totalEvents = 0;
private int _superStepCount = 0;
// 监控入口方法
public async Task MonitorAsync(StreamingRun run)
{
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("工作流监控系统已启动");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
_totalEvents++;
await HandleEventAsync(evt);
}
// 执行完成后生成报告
GenerateReport();
}
// 处理单个事件
private async Task HandleEventAsync(WorkflowEvent evt)
{
switch (evt)
{
case WorkflowStartedEvent:
HandleWorkflowStarted();
break;
case ExecutorInvokedEvent invoked:
HandleExecutorInvoked(invoked);
break;
case ExecutorCompletedEvent completed:
HandleExecutorCompleted(completed);
break;
case ExecutorFailedEvent failed:
HandleExecutorFailed(failed);
break;
case SuperStepCompletedEvent step:
HandleSuperStepCompleted(step);
break;
case WorkflowOutputEvent output:
HandleWorkflowOutput(output);
break;
case AgentRunUpdateEvent agentUpdate:
HandleAgentUpdate(agentUpdate);
break;
}
await Task.CompletedTask;
}
// 工作流开始
private void HandleWorkflowStarted()
{
_workflowStartTime = DateTime.Now;
Log("工作流开始执行");
}
// Executor 调用
private void HandleExecutorInvoked(ExecutorInvokedEvent evt)
{
_stats[evt.ExecutorId] = new ExecutorStats
{
ExecutorId = evt.ExecutorId,
StartTime = DateTime.Now
};
Log($" [{evt.ExecutorId}] 开始执行");
}
// Executor 完成
private void HandleExecutorCompleted(ExecutorCompletedEvent evt)
{
if (_stats.TryGetValue(evt.ExecutorId, out var stats))
{
stats.EndTime = DateTime.Now;
stats.Success = true;
stats.Result = evt.Data;
Log($"[{evt.ExecutorId}] 执行成功 (耗时: {stats.Duration?.TotalMilliseconds:F2}ms)");
}
}
// Executor 失败
private void HandleExecutorFailed(ExecutorFailedEvent evt)
{
if (_stats.TryGetValue(evt.ExecutorId, out var stats))
{
stats.EndTime = DateTime.Now;
stats.Success = false;
stats.Error = evt.Data;
Log($"[{evt.ExecutorId}] 执行失败: {evt.Data?.Message}");
}
}
// SuperStep 完成
private void HandleSuperStepCompleted(SuperStepCompletedEvent evt)
{
_superStepCount++;
Log($"SuperStep #{evt.StepNumber} 完成");
}
// 工作流输出
private void HandleWorkflowOutput(WorkflowOutputEvent evt)
{
_workflowEndTime = DateTime.Now;
Log($"工作流输出结果 (来自: {evt.SourceId})");
}
// Agent 更新
private void HandleAgentUpdate(AgentRunUpdateEvent evt)
{
if (!string.IsNullOrEmpty(evt.Update.Text))
{
Log($"[{evt.ExecutorId}] Agent 输出: {evt.Update.Text}");
}
}
// 记录日志
private void Log(string message)
{
var timestamp = DateTime.Now.ToString("HH:mm:ss.fff");
var logMessage = $"[{timestamp}] {message}";
_logs.Add(logMessage);
Console.WriteLine(logMessage);
}
// 生成性能报告
private void GenerateReport()
{
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("执行性能报告");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
// 总体统计
var totalDuration = _workflowEndTime - _workflowStartTime;
Console.WriteLine("### 总体统计");
new
{
总事件数 = _totalEvents,
SuperStep数量 = _superStepCount,
Executor数量 = _stats.Count,
总耗时 = $"{totalDuration.TotalMilliseconds:F2}ms",
成功数 = _stats.Values.Count(s => s.Success),
失败数 = _stats.Values.Count(s => !s.Success)
}.Display();
Console.WriteLine("\n### Executor 性能明细\n");
// Executor 性能表格
foreach (var stats in _stats.Values.OrderBy(s => s.StartTime))
{
var status = stats.Success ? "成功" : "失败";
var duration = stats.Duration?.TotalMilliseconds.ToString("F2") ?? "N/A";
Console.WriteLine($"{stats.ExecutorId}");
Console.WriteLine($" 状态: {status}");
Console.WriteLine($" 耗时: {duration}ms");
if (stats.Error != null)
{
Console.WriteLine($" 错误: {stats.Error.Message}");
}
Console.WriteLine();
}
// 性能分析
if (_stats.Values.Any(s => s.Duration.HasValue))
{
var slowest = _stats.Values
.Where(s => s.Duration.HasValue)
.OrderByDescending(s => s.Duration)
.First();
Console.WriteLine("### 性能瓶颈");
Console.WriteLine($"最慢的 Executor: {slowest.ExecutorId}");
Console.WriteLine($" 耗时: {slowest.Duration?.TotalMilliseconds:F2}ms");
}
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
}
}
- 使用监控系统
// 创建一个更复杂的工作流来测试监控系统
public class DataProcessorExecutor : Executor<string, string>
{
public DataProcessorExecutor() : base("DataProcessor") { }
public override async ValueTask<string> HandleAsync(
string input,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 模拟一些处理时间
await Task.Delay(100);
return $"[Processed: {input}]";
}
}
public class DataValidatorExecutor : Executor<string, bool>
{
public DataValidatorExecutor() : base("DataValidator") { }
public override async ValueTask<bool> HandleAsync(
string input,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 模拟验证时间
await Task.Delay(50);
return input.Length > 0;
}
}
public class DataFormatterExecutor : Executor<string, string>
{
public DataFormatterExecutor() : base("DataFormatter") { }
public override async ValueTask<string> HandleAsync(
string input,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 模拟格式化时间
await Task.Delay(80);
return $"**{input}**";
}
}
// 构建测试工作流
var processor = new DataProcessorExecutor();
var validator = new DataValidatorExecutor();
var formatter = new DataFormatterExecutor();
var testWorkflow = new WorkflowBuilder(processor)
.AddEdge(processor, validator)
.AddEdge(validator, formatter)
.Build();
5. 最佳实践建议
DO(推荐做法)
使用 LINQ 扩展进行过滤 - 代码更清晰、可复用
监听特定事件类型 - 避免处理不需要的事件
记录关键事件 - 便于调试和审计
异步处理事件 - 避免阻塞事件流
使用 switch 表达式 - 处理多种事件类型时更简洁
DON'T(避免做法)
- 在事件处理中做耗时操作 - 会阻塞事件流
- 忽略 ExecutorFailedEvent - 可能导致静默失败
- 过度监听所有事件 - 影响性能
- 在事件处理中修改工作流状态 - 可能导致意外行为
- 忘记设置 emitEvents: true - Agent 事件不会产生
事件监听模式速查
| 场景 | 代码示例 | 适用情况 |
|---|---|---|
| 监听所有事件 | await foreach (var e in run.WatchStreamAsync()) | 调试、完整日志 |
| 监听特定类型 | run.WatchStreamAsync().OfEventType |
只关注某类事件 |
| 监听特定 Executor | run.WatchStreamAsync().FromExecutor("ExecutorId") | 追踪特定步骤 |
| 组合过滤 | run.WatchStreamAsync().FromExecutor("X").OfEventType |
精确过滤 |
| 多类型处理 | evt switch | 差异化处理 |
- 常用事件属性速查
| 事件类型 | 关键属性 | 数据类型 | 获取方式 |
|---|---|---|---|
| ExecutorCompletedEvent | ExecutorId | string | evt.ExecutorId |
| Data (结果) | object? | evt.Data | |
| ExecutorFailedEvent | ExecutorId | string | evt.ExecutorId |
| Data (异常) | Exception? | evt.Data | |
| AgentRunUpdateEvent | ExecutorId | string | evt.ExecutorId |
| Update | AgentRunResponseUpdate | evt.Update | |
| Text (输出文本) | string | evt.Update.Text | |
| WorkflowOutputEvent | SourceId | string | evt.SourceId |
| Data (结果) | object? | evt.Data | |
| 类型检查 | bool | evt.Is |
|
| 类型转换 | T? | evt.As |
|
| SuperStepCompletedEvent | StepNumber | int | evt.StepNumber |
- 错误处理检查清单
- 监听
ExecutorFailedEvent处理异常 - 检查
WorkflowOutputEvent是否为 null - 使用类型安全的
Is<T>()和As<T>()方法 - 在事件处理中捕获异常,避免崩溃
- 记录关键错误信息(ExecutorId、异常类型、堆栈)
- 监听
- 性能优化检查清单
- 使用过滤器减少不必要的事件处理
- 避免在事件处理中执行耗时操作
- 对于密集事件(如 AgentRunUpdateEvent),考虑批处理
- 使用异步操作,避免阻塞事件流
- 监控事件处理的性能瓶颈
- AgentRunUpdateEvent 必备条件:
// 正确:会产生 AgentRunUpdateEvent
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
// 错误:不会产生 AgentRunUpdateEvent
await run.TrySendMessageAsync(new TurnToken(emitEvents: false));
await run.TrySendMessageAsync(new TurnToken()); // 默认 false
- 调试技巧速查
| 技巧 | 实现方式 | 用途 |
|---|---|---|
| 查看所有事件 | Console.WriteLine(evt) | 了解事件序列 |
| 记录事件时间戳 | DateTime.Now 在事件处理时记录 | 性能分析 |
| 统计事件数量 | 维护计数器 | 验证执行完整性 |
| 追踪执行路径 | 记录 ExecutorId 序列 | 理解执行流程 |
| 捕获中间结果 | 保存 ExecutorCompletedEvent.Data | 数据流分析 |
五、人机协作
1. 为什么需要人机协作?
虽然 AI 能力强大,但在以下场景中,人工干预仍然不可或缺:
| 场景 | 说明 | 示例 |
|---|---|---|
| 合规审核 | 需要人工判断内容合规性 | 内容发布前的审核流程 |
| 高风险决策 | 涉及资金或重要资源的操作 | 大额转账审批、资源调配 |
| 创意评审 | 需要主观判断的创意内容 | 营销文案终审、设计方案选择 |
| 质量把控 | AI 输出需要专家验证 | 医疗诊断辅助、法律文书生成 |
| 异常处理 | 遇到 AI 无法处理的边界情况 | 复杂客诉的升级处理 |
MAF 提供了三个核心组件来实现 Human-in-the-Loop:
classDiagram
class RequestPort {
+string PortName
+Type RequestType
+Type ResponseType
+Create~TRequest,TResponse~()
}
class RequestInfoEvent {
+ExternalRequest Request
+string PortName
+Type RequestType
}
class ExternalResponse {
+object Data
+string RequestId
+CreateResponse(data)
}
RequestPort ..> RequestInfoEvent: 发出事件
RequestInfoEvent ..> ExternalResponse: 触发响应
ExternalResponse ..> RequestPort: 恢复执行
RequestPort (请求端口)
作用:在工作流中插入"暂停点"
特性:像 Executor 一样使用,但不执行逻辑
创建:RequestPort.Create<TRequest, TResponse>("PortName")
RequestInfoEvent (请求信息事件)
- 作用:通知外部世界"需要人工干预"
- 内容:包含请求的数据类型、端口名称
- 监听:通过流式运行的事件流捕获
ExternalResponse (外部响应)
- 作用:将人工决策结果传回工作流
- 创建:request.CreateResponse(data)
- 发送:handle.SendResponseAsync(response)
2. 基础示例 - 猜数字游戏
我们实现一个猜数字游戏,演示最基本的 HITL 交互:
- 工作流设定一个目标数字(如 42)
- 通过 RequestPort 向用户请求猜测
- JudgeExecutor 判断猜测结果(太大/太小/正确)
- 循环直到猜对
graph LR
A[RequestPort<br/>请求猜测] --> B[JudgeExecutor<br/>判断结果]
B -->|太小| C[发送Below信号]
B -->|太大| D[发送Above信号]
B -->|正确| E[输出成功]
C --> A
D --> A
style A fill:#ffeb3b
style E fill:#4caf50
- 步骤1:创建 JudgeExecutor (判断执行器)
/// <summary>
/// 游戏信号:用于 RequestPort 和 JudgeExecutor 之间的通信
/// </summary>
public enum NumberSignal
{
Init, // 初始状态
Above, // 猜的数字太大
Below // 猜的数字太小
}
/// <summary>
/// 裁判执行器:判断用户猜测的数字
/// </summary>
public class JudgeExecutor : Executor<int>
{
private readonly int _targetNumber; // 目标数字
private int _tries = 0; // 尝试次数
public JudgeExecutor(int targetNumber) : base("Judge")
{
_targetNumber = targetNumber;
}
public override async ValueTask HandleAsync(
int guess,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
_tries++;
Console.WriteLine($"━━━━━━━━━━━━━━━━━━");
Console.WriteLine($"第 {_tries} 次尝试,您猜测的数字是: {guess}");
if (guess == _targetNumber)
{
// 猜对了!输出结果并结束
await context.YieldOutputAsync(
$"恭喜!数字 {_targetNumber} 在 {_tries} 次尝试后找到!",
cancellationToken);
}
else if (guess < _targetNumber)
{
// 太小了
Console.WriteLine($"提示: 太小了!");
await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
}
else
{
// 太大了
Console.WriteLine($"提示: 太大了!");
await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
}
}
}
步骤2:构建工作流
核心要点:
- RequestPort 接收 NumberSignal 作为请求,返回 int 作为响应
- 工作流从 RequestPort 开始
- 创建循环边:JudgeExecutor → RequestPort → JudgeExecutor
// 创建 RequestPort
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
// 创建 JudgeExecutor(目标数字是 42)
var judgeExecutor = new JudgeExecutor(42);
// 构建工作流
var guessWorkflow = new WorkflowBuilder(numberRequestPort)
.AddEdge(numberRequestPort, judgeExecutor) // RequestPort → Judge
.AddEdge(judgeExecutor, numberRequestPort) // Judge → RequestPort (循环)
.WithOutputFrom(judgeExecutor) // 从 Judge 输出结果
.Build();
- 步骤3:定义外部请求处理逻辑,这是 HITL 的核心:处理 RequestInfoEvent 并返回 ExternalResponse
/// <summary>
/// 处理外部请求:根据不同的 NumberSignal 提示用户输入
/// </summary>
async Task<ExternalResponse> HandleExternalRequest(ExternalRequest request)
{
if (request.DataIs<NumberSignal>())
{
var signal = request.DataAs<NumberSignal>();
switch (signal)
{
case NumberSignal.Init:
var prompt = "\n游戏开始!请输入您的初始猜测 (1-100):";
Console.WriteLine(prompt);
int initialGuess = await PolyglotHelper.ReadConsoleInputAsync<int>(prompt);
return request.CreateResponse(initialGuess);
case NumberSignal.Above:
var promptAbove = "\n上次猜的太大了,请输入一个更小的数字:";
Console.WriteLine(promptAbove);
int lowerGuess = await PolyglotHelper.ReadConsoleInputAsync<int>(promptAbove);
return request.CreateResponse(lowerGuess);
case NumberSignal.Below:
var promptBelow = "\n上次猜的太小了,请输入一个更大的数字:";
Console.WriteLine(promptBelow);
int higherGuess = await PolyglotHelper.ReadConsoleInputAsync<int>(promptBelow);
return request.CreateResponse(higherGuess);
}
}
throw new NotSupportedException($"不支持的请求类型: {request.PortInfo.RequestType}");
}
Console.WriteLine("外部请求处理函数定义完成");
步骤4:运行工作流(流式执行 + 事件处理)
关键流程:
- 使用 InProcessExecution.StreamAsync() 流式运行
- 初始消息发送 NumberSignal.Init 启动游戏
- 监听 RequestInfoEvent → 调用 HandleExternalRequest() → 发送 ExternalResponse
- 监听 WorkflowOutputEvent 获取最终结果
Console.WriteLine("开始猜数字游戏!\n");
// 流式运行工作流,初始消息是 NumberSignal.Init
await using (StreamingRun handle = await InProcessExecution.StreamAsync(guessWorkflow, NumberSignal.Init)){
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInfoEvent:
// 收到外部请求事件
Console.WriteLine($"\n收到 RequestInfoEvent (PortName: {requestInfoEvent.Request.PortInfo.PortId})");
// 处理请求并返回响应
ExternalResponse response = await HandleExternalRequest(requestInfoEvent.Request);
await handle.SendResponseAsync(response);
break;
case WorkflowOutputEvent outputEvent:
// 工作流完成
Console.WriteLine($"\n━━━━━━━━━━━━━━━━━━");
Console.WriteLine($"{outputEvent.Data}");
return;
}
}
}
3. 企业场景 - 内容审核工作流
场景说明:某企业的内容发布系统需要实现以下审核流程:
- 用户提交文章内容
- AI 进行初步敏感词检测
- 如果检测到风险内容,暂停工作流,通知人工审核员
- 审核员决定:通过 / 拒绝 / 需要修改
- 根据审核结果继续后续流程
- 步骤1:定义数据模型
/// <summary>
/// 内容提交
/// </summary>
public record ContentSubmission(string Title, string Body, string Author);
/// <summary>
/// AI 检测结果
/// </summary>
public record DetectionResult(string Content, bool IsRisky, string Reason);
/// <summary>
/// 审核决策
/// </summary>
public enum ReviewDecision
{
Approve, // 通过
Reject, // 拒绝
NeedEdit // 需要修改
}
/// <summary>
/// 审核请求(用于 RequestPort)
/// </summary>
public record ReviewRequest(string Content, string Reason);
- 步骤2:实现 AI 检测执行器
/// <summary>
/// AI 初步检测执行器
/// </summary>
public class ContentDetectionExecutor : Executor<ContentSubmission>
{
private readonly IChatClient _chatClient;
public ContentDetectionExecutor(IChatClient chatClient) : base("ContentDetection")
{
_chatClient = chatClient;
}
public override async ValueTask HandleAsync(
ContentSubmission content,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
Console.WriteLine($"\nAI 正在检测内容...");
Console.WriteLine($" 标题: {content.Title}");
Console.WriteLine($" 作者: {content.Author}");
// 构建检测 Prompt
var prompt = $@"
请检测以下内容是否包含敏感信息、违规内容或不当表述。
内容:
{content.Body}
请以 JSON 格式返回:
{{
""isRisky"": true/false,
""reason"": ""检测原因""
}}
";
try
{
var response = await _chatClient.GetResponseAsync<DetectionResult>(prompt, cancellationToken: cancellationToken);
var result = new DetectionResult(content.Body, response.Result!.IsRisky, response.Result.Reason);
if (result.IsRisky)
{
Console.WriteLine($"检测到风险内容: {result.Reason}");
}
else
{
Console.WriteLine($"内容安全,可以自动发布");
}
// 发送检测结果到下游
await context.SendMessageAsync(result, cancellationToken: cancellationToken);
}
catch (Exception ex)
{
Console.WriteLine($"AI 检测失败: {ex.Message}");
// 失败时默认走人工审核
await context.SendMessageAsync(
new DetectionResult(content.Body, true, "AI检测失败,需要人工审核"),
cancellationToken: cancellationToken);
}
}
}
- 步骤3:实现路由执行器(决定是否需要人工审核)
/// <summary>
/// 路由执行器:根据风险判断是否需要人工审核
/// </summary>
public class RoutingExecutor : Executor<DetectionResult>
{
public RoutingExecutor() : base("Routing") { }
public override async ValueTask HandleAsync(
DetectionResult result,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
if (result.IsRisky)
{
// 高风险,发送到人工审核端口
Console.WriteLine($"触发人工审核流程");
await context.SendMessageAsync(
new ReviewRequest(result.Content, result.Reason),
cancellationToken: cancellationToken);
}
else
{
// 低风险,直接发布
Console.WriteLine($"自动通过审核,准备发布");
await context.YieldOutputAsync($"内容已自动发布", cancellationToken);
}
}
}
- 步骤4:实现发布执行器
/// <summary>
/// 发布执行器:处理人工审核后的结果
/// </summary>
public class PublishExecutor : Executor<ReviewDecision>
{
public PublishExecutor() : base("Publish") { }
public override async ValueTask HandleAsync(
ReviewDecision decision,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
Console.WriteLine($"\n处理审核决策: {decision}");
switch (decision)
{
case ReviewDecision.Approve:
await context.YieldOutputAsync("人工审核通过,内容已发布", cancellationToken);
break;
case ReviewDecision.Reject:
await context.YieldOutputAsync("人工审核拒绝,内容未发布", cancellationToken);
break;
case ReviewDecision.NeedEdit:
await context.YieldOutputAsync("需要作者修改后重新提交", cancellationToken);
break;
}
}
}
- 步骤5:构建审核工作流
// 创建执行器
var detectionExecutor = new ContentDetectionExecutor(chatClient);
var routingExecutor = new RoutingExecutor();
var publishExecutor = new PublishExecutor();
// 创建 RequestPort(用于人工审核)
var reviewPort = RequestPort.Create<ReviewRequest, ReviewDecision>("HumanReview");
// 构建工作流
var reviewWorkflow = new WorkflowBuilder(detectionExecutor)
.AddEdge(detectionExecutor, routingExecutor) // 检测 → 路由
.AddEdge(routingExecutor, reviewPort) // 路由 → 人工审核端口
.AddEdge(reviewPort, publishExecutor) // 审核端口 → 发布
.WithOutputFrom(routingExecutor) // 自动发布的输出
.WithOutputFrom(publishExecutor) // 人工审核后的输出
.Build();
Console.WriteLine("内容审核工作流构建完成");
- 步骤6:定义外部审核处理逻辑
/// <summary>
/// 模拟人工审核员的决策
/// </summary>
async Task<ExternalResponse> HandleReviewRequest(ExternalRequest request)
{
if (request.DataIs<ReviewRequest>())
{
var reviewRequest = request.DataAs<ReviewRequest>();
Console.WriteLine($"\n━━━━━━━━━━━━━━━━━━");
Console.WriteLine($"人工审核界面");
Console.WriteLine($"━━━━━━━━━━━━━━━━━━");
Console.WriteLine($"风险原因: {reviewRequest.Reason}");
Console.WriteLine($"内容摘要: {reviewRequest.Content.Substring(0, Math.Min(50, reviewRequest.Content.Length))}...");
Console.WriteLine($"\n请输入审核决策:");
Console.WriteLine($"1 - 通过 (Approve)");
Console.WriteLine($"2 - 拒绝 (Reject)");
Console.WriteLine($"3 - 需要修改 (NeedEdit)");
Console.WriteLine($"━━━━━━━━━━━━━━━━━━\n");
int choice = await PolyglotHelper.ReadConsoleInputAsync<int>("请输入您的选择 (1-3): 1:通过,2:拒绝,3:需要修改");
ReviewDecision decision = choice switch
{
1 => ReviewDecision.Approve,
2 => ReviewDecision.Reject,
3 => ReviewDecision.NeedEdit,
_ => ReviewDecision.Reject // 默认拒绝
};
Console.WriteLine($"\n审核决策已提交: {decision}");
return request.CreateResponse(decision);
}
throw new NotSupportedException($"不支持的请求类型: {request.PortInfo.RequestType}");
}
- 步骤7:测试工作流 - 高风险内容
// 构造一个高风险内容的测试用例
var riskyContent = new ContentSubmission(
Title: "测试文章",
Body: "这是一篇包含敏感政治话题和未经证实的健康信息的文章内容,可能违反平台规定。",
Author: "测试作者"
);
Console.WriteLine("开始测试内容审核工作流(高风险场景)\n");
await using (StreamingRun handle = await InProcessExecution.StreamAsync(reviewWorkflow, riskyContent)){
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInfoEvent:
// 收到人工审核请求
Console.WriteLine($"\n收到 RequestInfoEvent (需要人工审核)");
ExternalResponse response = await HandleReviewRequest(requestInfoEvent.Request);
await handle.SendResponseAsync(response);
break;
case WorkflowOutputEvent outputEvent:
// 工作流完成
Console.WriteLine($"\n━━━━━━━━━━━━━━━━━━");
Console.WriteLine($"工作流完成: {outputEvent.Data}");
return;
}
}
}
- 步骤8:测试工作流 - 低风险内容(自动通过)
// 构造一个低风险内容的测试用例
var safeContent = new ContentSubmission(
Title: "技术分享",
Body: "本文将介绍如何使用 .NET 开发 AI Agent,包括 MAF 框架的核心概念和最佳实践。",
Author: "技术博主"
);
Console.WriteLine("开始测试内容审核工作流(低风险场景)\n");
await using (StreamingRun handle2 = await InProcessExecution.StreamAsync(reviewWorkflow, safeContent)){
await foreach (WorkflowEvent evt in handle2.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInfoEvent:
// 这个场景不应该触发人工审核
Console.WriteLine($"意外触发人工审核");
ExternalResponse response = await HandleReviewRequest(requestInfoEvent.Request);
await handle2.SendResponseAsync(response);
break;
case WorkflowOutputEvent outputEvent:
Console.WriteLine($"\n━━━━━━━━━━━━━━━━━━");
Console.WriteLine($"工作流完成: {outputEvent.Data}");
return;
}
}
}
4. 高级模式 - 多轮交互
场景说明:在某些场景中,可能需要多轮人工交互。例如:
- 需求澄清:AI 生成代码后,用户可能多次提出修改意见
- 设计迭代:设计稿需要经过多轮审核和调整
- 调查问卷:根据上一个答案动态生成下一个问题
实现要点,多轮交互的关键是使用循环边和状态管理:
graph LR
A[AI生成初稿] --> B[RequestPort<br/>用户反馈]
B --> C{是否满意?}
C -->|否| D[AI修改]
D --> B
C -->|是| E[完成]
style B fill:#ffeb3b
style C fill:#ff9800
- 简易实现
/// <summary>
/// 需求澄清执行器
/// </summary>
public class RequirementClarificationExecutor : Executor<string>
{
private int _roundCount = 0;
private const int MaxRounds = 3; // 最多3轮交互
public RequirementClarificationExecutor() : base("RequirementClarification") { }
public override async ValueTask HandleAsync(
string userFeedback,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
_roundCount++;
Console.WriteLine($"\n第 {_roundCount} 轮澄清");
Console.WriteLine($" 用户反馈: {userFeedback}");
if (userFeedback.Contains("满意") || userFeedback.Contains("确认"))
{
// 用户满意,结束流程
await context.YieldOutputAsync($"需求已确认!共进行了 {_roundCount} 轮澄清。", cancellationToken);
}
else if (_roundCount >= MaxRounds)
{
// 达到最大轮数
await context.YieldOutputAsync($"已达到最大轮数 ({MaxRounds}),流程结束。", cancellationToken);
}
else
{
// 继续下一轮
await context.SendMessageAsync("需要更多信息", cancellationToken: cancellationToken);
}
}
}
// 构建多轮交互工作流
var clarificationPort = RequestPort.Create<string, string>("UserInput");
var clarificationExecutor = new RequirementClarificationExecutor();
var multiRoundWorkflow = new WorkflowBuilder(clarificationPort)
.AddEdge(clarificationPort, clarificationExecutor)
.AddEdge(clarificationExecutor, clarificationPort) // 循环边
.WithOutputFrom(clarificationExecutor)
.Build();
- 测试多轮交互工作流
Console.WriteLine("开始多轮需求澄清流程\n");
await using (StreamingRun handle3 = await InProcessExecution.StreamAsync(multiRoundWorkflow, "开始")){
await foreach (WorkflowEvent evt in handle3.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInfoEvent:
string feedback = await PolyglotHelper.ReadConsoleInputAsync<string>("请输入您的反馈 (输入'满意'结束流程):");
var response = requestInfoEvent.Request.CreateResponse(feedback);
await handle3.SendResponseAsync(response);
break;
case WorkflowOutputEvent outputEvent:
Console.WriteLine($"\n━━━━━━━━━━━━━━━━━━");
Console.WriteLine($"{outputEvent.Data}");
return;
}
}
}
5. 最佳实践
HITL 最佳实践
| 实践 | 说明 | 示例 |
|---|---|---|
| 最大迭代保护 | 防止无限循环 | _roundCount >= MaxRounds |
| 超时机制 | 人工响应超时处理 | CancellationTokenSource.CancelAfter() |
| 状态持久化 | 长时间等待需保存状态 | 结合 Checkpoint 机制 |
| 清晰的上下文 | 提供足够的决策信息 | 在 ReviewRequest 中包含完整上下文 |
| 通知机制 | 及时提醒审核人员 | 发送邮件/IM 通知 |
| 审计日志 | 记录所有人工决策 | 在 PublishEventAsync 中记录 |
常见陷阱
- 忘记发送响应
// 错误:收到 RequestInfoEvent 后没有调用 SendResponseAsync
case RequestInfoEvent evt:
HandleReviewRequest(evt.Request);
// 工作流会永久卡住!
break;
// 正确
case RequestInfoEvent evt:
var response = HandleReviewRequest(evt.Request);
await handle.SendResponseAsync(response);
break;
- 类型不匹配
// 错误:RequestPort 期望 int,但返回了 string
var port = RequestPort.Create<string, int>("Port");
var response = request.CreateResponse("42"); // string!
// 正确
var response = request.CreateResponse(42); // int
- 缺少循环边
// 错误:RequestPort 只有入边,没有出边
.AddEdge(judgeExecutor, requestPort) // 只有这一条边
// 正确:需要双向边形成循环
.AddEdge(requestPort, judgeExecutor)
.AddEdge(judgeExecutor, requestPort)
架构建议
对于生产环境的 HITL 系统,建议:
- 解耦工作流与 UI
- 工作流不应直接依赖 Console.ReadLine()
- 使用消息队列或 WebSocket 进行通信
- 持久化 Checkpoint
- 在 RequestPort 前保存 Checkpoint
- 允许长时间等待和系统重启
- 监控与告警
- 监控等待中的请求数量
- 超时自动升级或降级处理
六、工作流状态共享
在真实业务中,以下痛点最常见:
| 场景 | 传统做法 | 痛点 | Workflow Shared State 方案 |
|---|---|---|---|
| 多步骤共享原始输入 | 通过参数层层传递 | 易丢失、难维护 | 在入口步骤写入状态,后续随取随用 |
| 并发执行器共享缓存 | 使用静态变量 | 线程不安全 | 作用域隔离 + 状态管理器负责并发 |
| 人机协作回填审批结论 | HTTP 回调携带参数 | 状态丢失、难追踪 | WaitForInput + 状态恢复 |
| Agent 输出给多个消费者 | 多次调用同一个 Agent | 成本高 | 把输出写入状态,其他步骤复用 |
1. WorkflowContext 状态 API 速查
| API | 作用 | 典型使用方式 |
|---|---|---|
| QueueStateUpdateAsync(key, value, scope) | 将状态写入队列,当前步骤完成后统一提交,避免脏读 | 适合在 Executor 内写入结果、缓存、审计信息 |
| ReadStateAsync |
读取指定键,找不到则返回 null |
读取上游步骤输出、缓存的 Agent 响应 |
| ReadOrInitStateAsync |
如果不存在则初始化,兼顾懒加载与并发一致性 | 首次接入业务实体、惰性缓存、计数器 / 重试次数 |
| QueueClearScopeAsync(scope) | 清理整个作用域 | 敏感信息擦除、生命周期结束时清理 |
| ReadStateKeysAsync(scope) | 列出作用域中所有键 | 调试、监控、构建动态路由 |
ReadOrInitStateAsync 应用场景
首次加载业务上下文:如 fileId 对应的元数据、审批信息等只需初始化一次的结构。
惰性构建缓存:在第一次命中时生成摘要、Embedding 或模型响应,后续 Executor 复用。
并发累加器:计数器、重试次数、Fan-in 进度等需要“读当前值 + 初始化默认值”的模式。
作用域 (Scope) 分类
| Scope 类型 | 说明 | 建议命名 |
|---|---|---|
| Executor Scope (默认) | 键只对当前 Executor 可见 | nameof(Executor) |
| 自定义业务 Scope | 多个 Executor 共享,例如 "FileContentState" | 模块名 + State |
| System / Environment Scope | 框架内部使用,不建议直接写入 | 使用 VariableScopeNames 常量时需谨慎 |
线程安全说明:QueueStateUpdateAsync 会在状态管理器内部加锁,所以不需要额外的 lock。但仍需控制数据粒度(例如传输 ID,而不是整个大对象)。
2. 案例:文档统计工作流
我们将复刻官方 Workflows/SharedStates 示例,并加入更多注释:
- 入口步骤读取文本内容,写入 FileContentState 作用域中,值为 fileId -> 文本内容
- Fan-out 出两个 Executor:
- WordCountingExecutor 统计单词数
- ParagraphCountingExecutor 统计段落数
- Fan-in 到 AggregationExecutor,汇总结果并输出结构化报表
该案例展示了一次写入,多处读取的模式,也是后续 MapReduce、并发协作的基础。
- 准备工作
// 示例数据
internal static class SharedStateSampleData
{
private static readonly IReadOnlyDictionary<string, string> Documents = new Dictionary<string, string>
{
["ProductBrief"] = "MAF Workflow 让 .NET 团队可以像积木一样组合 Agent、Executor 与工具, 支持流式事件、并发节点和可观测性。\n\n它强调企业级能力, 包括状态管理、依赖注入、权限控制, 适合搭建端到端 AI 业务流程。",
["WeeklyReport"] = "本周平台完成了 Shared State 功能的代码走查, 已经覆盖 Fan-out/Fan-in, Loop, Human-in-the-Loop 三种场景。\n\n下周计划: 1) 集成多模型投票; 2) 增加异常回滚; 3) 落地监控指标。"
};
public static string GetDocument(string name)
=> Documents.TryGetValue(name, out var content)
? content
: throw new ArgumentException($"未找到文档: {name}");
}
// 状态常量
internal static class FileContentStateConstants
{
public const string ScopeName = "FileContentState";
}
// 模型定义
internal sealed class FileStats
{
public int WordCount { get; init; }
public int ParagraphCount { get; init; }
}
// 定义FileReadExecutor
internal sealed class FileReadExecutor() : Executor<string, string>("FileReadExecutor")
{
public override async ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
string content = SharedStateSampleData.GetDocument(message);
string fileId = Guid.NewGuid().ToString("N");
await context.QueueStateUpdateAsync(fileId, content, FileContentStateConstants.ScopeName, cancellationToken);
Console.WriteLine($"FileReadExecutor 将 {message} 写入 Scope:{FileContentStateConstants.ScopeName}");
return fileId;
}
}
// 定义并行统计执行器
internal sealed class WordCountingExecutor() : Executor<string, FileStats>("WordCountingExecutor")
{
public override async ValueTask<FileStats> HandleAsync(string fileId, IWorkflowContext context, CancellationToken cancellationToken = default)
{
string? content = await context.ReadStateAsync<string>(fileId, FileContentStateConstants.ScopeName, cancellationToken);
if (content is null)
{
throw new InvalidOperationException($"无法在 Scope:{FileContentStateConstants.ScopeName} 中找到 fileId={fileId}");
}
int wordCount = content.Split([' ', '\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
return new FileStats { WordCount = wordCount };
}
}
// 定义AggregationExecutor
internal sealed class ParagraphCountingExecutor() : Executor<string, FileStats>("ParagraphCountingExecutor")
{
public override async ValueTask<FileStats> HandleAsync(string fileId, IWorkflowContext context, CancellationToken cancellationToken = default)
{
string? content = await context.ReadStateAsync<string>(fileId, FileContentStateConstants.ScopeName, cancellationToken);
if (content is null)
{
throw new InvalidOperationException($"无法在 Scope:{FileContentStateConstants.ScopeName} 中找到 fileId={fileId}");
}
int paragraphCount = content.Split(['\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
return new FileStats { ParagraphCount = paragraphCount };
}
}
internal sealed class AggregationExecutor() : Executor<FileStats>("AggregationExecutor")
{
private readonly List<FileStats> _buffer = [];
public override async ValueTask HandleAsync(FileStats message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._buffer.Add(message);
if (this._buffer.Count < 2)
{
return;
}
int totalWords = this._buffer.Sum(x => x.WordCount);
int totalParagraphs = this._buffer.Sum(x => x.ParagraphCount);
var output = new
{
总词数 = totalWords,
总段落数 = totalParagraphs,
统计时间 = DateTimeOffset.UtcNow
};
Console.WriteLine("文档统计结果");
output.Display();
await context.YieldOutputAsync(output, cancellationToken);
}
}
// 定义AggregationExecutor
internal sealed class AggregationExecutor() : Executor<FileStats>("AggregationExecutor")
{
private readonly List<FileStats> _buffer = [];
public override async ValueTask HandleAsync(FileStats message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._buffer.Add(message);
if (this._buffer.Count < 2)
{
return;
}
int totalWords = this._buffer.Sum(x => x.WordCount);
int totalParagraphs = this._buffer.Sum(x => x.ParagraphCount);
var output = new
{
总词数 = totalWords,
总段落数 = totalParagraphs,
统计时间 = DateTimeOffset.UtcNow
};
Console.WriteLine("文档统计结果");
output.Display();
await context.YieldOutputAsync(output, cancellationToken);
}
}
- 步骤 1:组装 Fan-out/Fan-in 工作流
我们将以下列顺序建图:
- FileReadExecutor 作为入口,并指定最终输出来自 AggregationExecutor
- AddFanOutEdge:把文件 ID 同时传给两个统计执行器
- AddFanInEdge:等待两个统计执行器完成后,再进入聚合节点
通过 Builder 定义拓扑后,我们会在后续步骤用 RunStreamingAsync 查看事件流。
var fileRead = new FileReadExecutor();
var wordCounting = new WordCountingExecutor();
var paragraphCounting = new ParagraphCountingExecutor();
var aggregate = new AggregationExecutor();
var sharedStateWorkflow = new WorkflowBuilder(fileRead)
.AddFanOutEdge(fileRead, [wordCounting, paragraphCounting])
.AddFanInEdge([wordCounting, paragraphCounting], aggregate)
.WithOutputFrom(aggregate)
.Build();
Console.WriteLine("Shared State Workflow 构建完成");
- 步骤 2:流式观察状态读写
我们使用 InProcessExecution.RunStreamingAsync 以事件流的方式运行 workflow,并关注三类事件:
- WorkflowStartedEvent / WorkflowCompletedEvent:整体生命周期
- ExecutorCompletedEvent:确认每个节点的执行顺序
- WorkflowOutputEvent:最终业务输出
同时,我们打印状态写入时生成的 fileId,以便验证 Fan-out 节点读取的是同一个共享数据。
static async Task RunSharedStateDemoAsync(Workflow sharedWorkflow, string documentKey)
{
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine($"演示文档: {documentKey}");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
await using (var run = await InProcessExecution.StreamAsync(sharedWorkflow, documentKey)){
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
switch (evt)
{
case WorkflowStartedEvent started:
Console.WriteLine($"Workflow Started");
break;
case ExecutorCompletedEvent executorCompleted:
Console.WriteLine($"{executorCompleted.ExecutorId} 完成");
break;
case WorkflowOutputEvent outputEvent:
Console.WriteLine("收到 Workflow Output Event:");
outputEvent.Display();
break;
case WorkflowErrorEvent errorEvent:
Console.WriteLine("收到 Workflow Error Event:");
errorEvent.Display();
break;
default:
Console.WriteLine($"其他事件: {evt.GetType().Name}");
evt.Display();
break;
}
}
await run.DisposeAsync();
}
}
await RunSharedStateDemoAsync(sharedStateWorkflow, "ProductBrief");
3. 最佳实践
命名规范
使用 Scope + 功能命名(例:FileContentState)
键值建议为业务 ID(如 fileId、turnId),避免直接储存整段文本作为 key
状态生命周期
在数据敏感的场景结束后调用 QueueClearScopeAsync
长时间运行的 Workflow 可定期写入 UpdatedAt,便于调试
并发注意事项
Fan-out 节点共享同一 state 时,只写 ID,不写大对象
如果必须写大对象,可在读取端做 null 检查并提供降级策略
调试建议
使用 ReadStateKeysAsync 打印当前作用域的所有键
配合 WorkflowEvent 日志,快速定位状态缺失或覆盖问题
常见踩坑
在 Executor 中保存可变引用(List/Dictionary),会被后续步骤修改
在多个 scope 中重复使用相同 key,导致取值混乱
忘记 await QueueStateUpdateAsync,导致状态未落盘
七、工作流上下文
1. API 分类速查表
IWorkflowContext 是 Executor 与 Workflow 引擎交互的唯一入口,它提供了以下能力:
| 分类 | API | 用途 | 常见场景 |
|---|---|---|---|
| 输出管理 | SendMessageAsync | 向下游 Executor 发送消息 | 流程编排、数据传递 |
| AddEventAsync | 触发自定义业务事件 | 进度通知、审计日志 | |
| YieldOutputAsync | 输出最终业务结果 | 工作流返回值 | |
| 流程控制 | RequestHaltAsync | 暂停工作流执行 | 人工审批、等待外部系统 |
| 状态管理 | ReadStateAsync |
读取状态值 | 获取共享数据 |
| ReadOrInitStateAsync |
读取或初始化状态 | 计数器、缓存、配置 | |
| QueueStateUpdateAsync |
更新状态值 | 写入业务数据 | |
| ReadStateKeysAsync | 枚举作用域中的所有键 | 调试、动态路由 | |
| QueueClearScopeAsync | 清理整个作用域 | 敏感数据清理 | |
| 元数据 | TraceContext | 链路追踪上下文 | 分布式追踪 |
| ConcurrentRunsEnabled | 是否支持并发运行 | 并发控制 |
2. 核心概念:三种输出机制的本质区别
这是最容易混淆的部分!理解它们的区别是掌握 Context 的关键。
SendMessageAsync - 流程内部通信
特点:
目标是下游 Executor(通过 Edge 连接的节点)
消息在下一个 SuperStep才会被接收
可以指定 targetId 进行精确路由
如果没有连接关系,消息会被丢弃
典型场景:
动态路由(根据条件发送给不同的 Executor)
广播模式(一个 Executor 发送给多个下游)
流程编排中的数据传递
AddEventAsync - 业务事件通知
特点:
目标是外部订阅者(调用 workflow 的客户端)
事件在当前 SuperStep 结束时立即发出
用于传递业务进度、审计日志、中间状态
不影响工作流的执行流程
典型场景:
进度通知("正在生成大纲…"、"已完成 3/10 个文档")
审计日志("用户操作已记录"、"检测到敏感词")
调试信息(自定义 Event 携带中间变量)
YieldOutputAsync - 最终业务结果
特点:
目标是工作流的最终返回值
通过 WorkflowOutputEvent 封装返回
通常在 WithOutputFrom() 指定的步骤中调用
一个工作流可以有多个输出(多次调用 YieldOutputAsync)
典型场景:
工作流的最终结果(生成的文章、处理后的数据)
批量输出(MapReduce 的 Reduce 结果)
结构化返回值(JSON、DTO)
记忆口诀:
- Send = 内部流程通信(Executor → Executor)
- Event = 进度广播(Executor → 订阅者)
- Output = 最终结果(Workflow → 调用者)
对比总结
| 维度 | SendMessageAsync | AddEventAsync | YieldOutputAsync |
|---|---|---|---|
| 接收者 | 下游 Executor | 外部订阅者 | 外部订阅者 |
| 触发时机 | 下一个 SuperStep | 当前 SuperStep 结束 | 当前 SuperStep 结束 |
| 用途 | 流程内部数据传递 | 进度通知/审计日志 | 最终业务结果 |
| 是否影响流程 | 是(触发下游执行) | 否 | 否 |
| 典型事件类型 | N/A | 自定义 WorkflowEvent | WorkflowOutputEvent |
2. 实战演示:Context Explorer
我们将创建一个专门的 ContextExplorerExecutor,它会依次调用 IWorkflowContext 的所有关键 API,并通过事件流展示每个方法的效果。
演示场景设计

- 步骤 1:定义自定义事件和 Context Explorer
// 自定义事件:用于报告探索进度
internal sealed class ExplorationProgressEvent(string category, string api, string result) : WorkflowEvent
{
public string Category { get; } = category;
public string API { get; } = api;
public string Result { get; } = result;
}
internal sealed class ContextExplorerExecutor() : Executor<string, string>("ContextExplorerExecutor")
{
private const string DemoScope = "ExplorerDemoScope";
public override async ValueTask<string> HandleAsync(string mode, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine($"ContextExplorer 开始工作 (模式: {mode})");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
// ============ 1.状态管理演示 ============
await this.DemonstrateStateManagementAsync(context, cancellationToken);
// ============ 2.输出机制演示 ============
await this.DemonstrateOutputMechanismsAsync(context, cancellationToken);
// ============ 3.元数据演示 ============
await this.DemonstrateMetadataAsync(context, cancellationToken);
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("ContextExplorer 工作完成");
return "探索完成";
}
private async ValueTask DemonstrateStateManagementAsync(IWorkflowContext context, CancellationToken cancellationToken)
{
Console.WriteLine("\n【1.状态管理 API 演示】");
// 1. QueueStateUpdateAsync - 写入状态
await context.QueueStateUpdateAsync("counter", 100, DemoScope, cancellationToken);
await context.AddEventAsync(new ExplorationProgressEvent(
"状态管理",
"QueueStateUpdateAsync",
"写入 counter=100"), cancellationToken);
Console.WriteLine(" QueueStateUpdateAsync: 写入 counter=100");
// 2. ReadStateAsync - 读取状态
var counter = await context.ReadStateAsync<int>("counter", DemoScope, cancellationToken);
await context.AddEventAsync(new ExplorationProgressEvent(
"状态管理",
"ReadStateAsync",
$"读取到 counter={counter}"), cancellationToken);
Console.WriteLine($" ReadStateAsync: 读取到 counter={counter}");
// 3. ReadOrInitStateAsync - 惰性初始化
var config = await context.ReadOrInitStateAsync(
"config",
() => new { MaxRetry = 3, Timeout = 30 },
DemoScope,
cancellationToken);
await context.AddEventAsync(new ExplorationProgressEvent(
"状态管理",
"ReadOrInitStateAsync",
$"初始化配置: MaxRetry={config.MaxRetry}"), cancellationToken);
Console.WriteLine($" ReadOrInitStateAsync: 初始化配置 MaxRetry={config.MaxRetry}");
// 4. ReadStateKeysAsync - 枚举所有键
var keys = await context.ReadStateKeysAsync(DemoScope, cancellationToken);
await context.AddEventAsync(new ExplorationProgressEvent(
"状态管理",
"ReadStateKeysAsync",
$"作用域中的键: {string.Join(", ", keys)}"), cancellationToken);
Console.WriteLine($" ReadStateKeysAsync: 共 {keys.Count} 个键 [{string.Join(", ", keys)}]");
// 5. QueueClearScopeAsync - 清理作用域(演示中不实际执行,避免影响后续测试)
// await context.QueueClearScopeAsync(DemoScope, cancellationToken);
Console.WriteLine(" QueueClearScopeAsync: 已跳过(避免清理演示数据)");
}
private async ValueTask DemonstrateOutputMechanismsAsync(IWorkflowContext context, CancellationToken cancellationToken)
{
Console.WriteLine("\n【2.输出机制 API 演示】");
// 1. SendMessageAsync - 发送给下游 Executor
await context.SendMessageAsync("来自 ContextExplorer 的消息", targetId: null, cancellationToken);
await context.AddEventAsync(new ExplorationProgressEvent(
"输出机制",
"SendMessageAsync",
"已发送消息到下游"), cancellationToken);
Console.WriteLine(" SendMessageAsync: 已发送消息到下游 Executor");
// 2. AddEventAsync - 触发自定义事件
await context.AddEventAsync(new ExplorationProgressEvent(
"输出机制",
"AddEventAsync",
"这是一个自定义业务事件"), cancellationToken);
Console.WriteLine(" AddEventAsync: 已触发自定义事件");
// 3. YieldOutputAsync - 输出最终结果
var finalResult = new
{
探索时间 = DateTimeOffset.UtcNow,
API总数 = 11,
状态作用域 = DemoScope
};
await context.YieldOutputAsync(finalResult, cancellationToken);
Console.WriteLine(" YieldOutputAsync: 已输出最终结果");
}
private async ValueTask DemonstrateMetadataAsync(IWorkflowContext context, CancellationToken cancellationToken)
{
Console.WriteLine("\n【3.元数据 API 演示】");
// 1. TraceContext - 链路追踪上下文
var traceContext = context.TraceContext;
var traceInfo = traceContext != null
? string.Join(", ", traceContext.Select(kv => $"{kv.Key}={kv.Value}"))
: "null";
await context.AddEventAsync(new ExplorationProgressEvent(
"元数据",
"TraceContext",
traceInfo), cancellationToken);
Console.WriteLine($" TraceContext: {traceInfo}");
// 2. ConcurrentRunsEnabled - 并发支持
var concurrentEnabled = context.ConcurrentRunsEnabled;
await context.AddEventAsync(new ExplorationProgressEvent(
"元数据",
"ConcurrentRunsEnabled",
concurrentEnabled.ToString()), cancellationToken);
Console.WriteLine($" ConcurrentRunsEnabled: {concurrentEnabled}");
}
}
- 步骤 2:创建下游接收器
为了演示 SendMessageAsync 的效果,我们需要一个下游 Executor 来接收消息。
internal sealed class DownstreamReceiverExecutor() : Executor<string, string>("DownstreamReceiverExecutor")
{
public override async ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine($"DownstreamReceiver 收到消息: {message}");
await context.AddEventAsync(new ExplorationProgressEvent(
"下游接收",
"HandleAsync",
$"已处理消息: {message}"), cancellationToken);
return "消息已接收";
}
}
- 步骤 3:构建工作流
将 ContextExplorerExecutor 和DownstreamReceiverExecutor 连接起来,形成完整的工作流。
var explorer = new ContextExplorerExecutor();
var receiver = new DownstreamReceiverExecutor();
var contextExplorerWorkflow = new WorkflowBuilder(explorer)
.AddEdge(explorer, receiver)
.WithOutputFrom(explorer) // 输出来自 explorer 的 YieldOutputAsync
.Build();
- 步骤 4:流式运行并观察事件
通过流式运行,我们可以实时观察每个 API 调用产生的事件。
static async Task RunContextExplorerAsync(Workflow workflow, string mode)
{
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine($"开始探索 IWorkflowContext API (模式: {mode})");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
await using (var run = await InProcessExecution.StreamAsync(workflow, mode))
{
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
switch (evt)
{
case WorkflowStartedEvent:
Console.WriteLine("[系统事件] Workflow 已启动\n");
break;
case ExplorationProgressEvent progress:
Console.WriteLine($"[业务事件] 分类: {progress.Category}");
Console.WriteLine($" API: {progress.API}");
Console.WriteLine($" 结果: {progress.Result}\n");
break;
case ExecutorCompletedEvent executorCompleted:
Console.WriteLine($"[系统事件] {executorCompleted.ExecutorId} 执行完成\n");
break;
case WorkflowOutputEvent outputEvent:
Console.WriteLine("[输出事件] WorkflowOutputEvent:");
outputEvent.Display();
Console.WriteLine();
break;
default:
Console.WriteLine($"[其他事件] {evt.GetType().Name}");
break;
}
}
}
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
}
await RunContextExplorerAsync(contextExplorerWorkflow, "完整探索");
- SendMessageAsync
targetId精准路由实战
典型场景:Intent Router 同时连接图像、文本、兜底三个 Executor,希望只唤醒匹配的下游。
flowchart LR
R[IntentRouter] --> I[ImageGenerator]
R --> T[TextGenerator]
R --> F[FallbackExecutor]
R -.targetId=image.-> I
R -.targetId=report.-> T
R -.targetId=FallbackExecutor.-> F
style R fill:#e1f5ff
style I fill:#fff4e1
style T fill:#f3e5f5
style F fill:#e8f5e9
internal sealed class IntentRouterExecutor() : Executor<string, string>("IntentRouter")
{
private readonly Dictionary<string, string> _routingTable = new(StringComparer.OrdinalIgnoreCase)
{
["image"] = "ImageGenerator",
["report"] = "TextGenerator"
};
public override async ValueTask<string> HandleAsync(string intent, IWorkflowContext context, CancellationToken cancellationToken = default)
{
var targetId = this._routingTable.TryGetValue(intent, out var candidate) ? candidate : "FallbackExecutor";
Console.WriteLine($"Router 判定 {intent} 应派发给 {targetId}");
await context.SendMessageAsync($"指令: {intent}", targetId, cancellationToken);
return $"消息已定向到 {targetId}";
}
}
internal sealed class ImageGeneratorExecutor() : Executor<string, string>("ImageGenerator")
{
public override ValueTask<string> HandleAsync(string payload, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine($"ImageGenerator 收到: {payload}");
return ValueTask.FromResult("图像任务完成");
}
}
internal sealed class TextGeneratorExecutor() : Executor<string, string>("TextGenerator")
{
public override ValueTask<string> HandleAsync(string payload, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine($"📝 TextGenerator 收到: {payload}");
return ValueTask.FromResult("文本任务完成");
}
}
internal sealed class FallbackExecutor() : Executor<string, string>("FallbackExecutor")
{
public override ValueTask<string> HandleAsync(string payload, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine($"FallbackExecutor 接管: {payload}");
return ValueTask.FromResult("兜底处理完成");
}
}
static async Task RunTargetIdDemoAsync(string intent)
{
var router = new IntentRouterExecutor();
var imageExecutor = new ImageGeneratorExecutor();
var textExecutor = new TextGeneratorExecutor();
var fallbackExecutor = new FallbackExecutor();
var workflow = new WorkflowBuilder(router)
.AddEdge(router, imageExecutor)
.AddEdge(router, textExecutor)
.AddEdge(router, fallbackExecutor)
.WithOutputFrom(router)
.Build();
Console.WriteLine($"\n输入: {intent}");
await using (var run = await InProcessExecution.StreamAsync(workflow, intent)){
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine("Router 输出:");
outputEvent.Display();
}
}
}
}
await RunTargetIdDemoAsync("image");
await RunTargetIdDemoAsync("report");
await RunTargetIdDemoAsync("speech");
关键提示:
- targetId 必须等于下游 Executor 的 Id(构造函数里指定)
- 上游可以保留多条 Edge,但只有 targetId 命中的节点会收到消息
- 适合意图路由、A/B 测试、租户隔离等需要精确派发的场景
3. 高级场景:QueueClearScopeAsync 的正确使用
QueueClearScopeAsync 用于清理整个作用域中的所有状态,这在处理敏感数据时非常重要。
| 场景 | 说明 | 示例 |
|---|---|---|
| 敏感数据清理 | 处理完用户隐私数据后立即清理 | 银行交易、医疗记录 |
| 临时状态回收 | 中间计算结果不再需要 | MapReduce 的 Map 阶段结果 |
| 生命周期管理 | 某个业务流程结束后清理上下文 | 订单处理完成后清理订单缓存 |
| 测试隔离 | 每个测试用例结束后清理状态 | 单元测试、集成测试 |
internal sealed class SecureDataExecutor() : Executor<string, string>("SecureDataExecutor")
{
private const string SensitiveDataScope = "SensitiveData";
public override async ValueTask<string> HandleAsync(
string userId,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 1. 读取敏感数据
var userData = await LoadUserDataAsync(userId);
await context.QueueStateUpdateAsync("userData", userData, SensitiveDataScope, cancellationToken);
// 2. 处理业务逻辑
var result = await ProcessSecureDataAsync(userData);
// 3. 处理完成后立即清理敏感数据
await context.QueueClearScopeAsync(SensitiveDataScope, cancellationToken);
Console.WriteLine("敏感数据已清理");
return result;
}
private Task<string> LoadUserDataAsync(string userId) => Task.FromResult($"UserData-{userId}");
private Task<string> ProcessSecureDataAsync(string data) => Task.FromResult($"Processed-{data}");
}
4. 最佳实践
- 输出机制选择指南

- 状态管理最佳实践
| 实践 | 说明 | 示例 |
|---|---|---|
| 使用常量定义 Scope | 避免硬编码字符串 | const string MyScope = "OrderContext"; |
| 命名约定 | {模块名}Scope 或 {功能}Context | FileContentScope, UserSessionContext |
| 读写分离 | 写操作集中在入口步骤,读操作分散在下游 | Fan-out/Fan-in 模式 |
| 惰性初始化 | 使用 ReadOrInitStateAsync 避免重复初始化 | 配置、缓存、计数器 |
| 及时清理 | 处理完敏感数据后立即调用 QueueClearScopeAsync | 用户隐私数据、临时文件 |
- 元数据利用
public override async ValueTask<string> HandleAsync(
string message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 1. 检查并发支持
if (context.ConcurrentRunsEnabled)
{
Console.WriteLine("当前环境支持并发执行");
}
// 2. 传递链路追踪上下文
if (context.TraceContext != null)
{
var traceId = context.TraceContext.GetValueOrDefault("traceId");
Console.WriteLine($"TraceId: {traceId}");
// 传递给下游服务(如 HTTP 请求)
await CallExternalServiceAsync(traceId);
}
return "完成";
}
- 常见错误与解决方案
| 错误 | 原因 | 解决方案 |
|---|---|---|
| SendMessageAsync 后下游没收到 | 没有建立 Edge 连接 | 使用 AddEdge() 连接节点 |
| 状态读取返回 null | Scope 名称不一致 | 使用常量统一管理 Scope 名称 |
| 并发冲突 | 多个 Executor 同时写入同一个 Key | 使用不同的 Key 或加入步骤序号 |
| 事件没有触发 | 忘记 await AddEventAsync | 确保所有异步方法都被 await |
