Net+AI智能体9:Workflow基础篇
2025-11-29 19:00:55一、快速开始
1. 什么是 Workflow Orchestration
在企业级 AI 应用开发中,我们经常面临以下挑战:
- 单个 Agent 难以处理复杂任务:一个 Agent 无法同时擅长需求分析、代码生成、测试等多个领域
- 业务逻辑与 AI 调用耦合:复杂的条件判断、循环、并发控制分散在代码各处
- 流程难以可视化和维护:多步骤的 AI 处理流程缺乏清晰的结构
- 缺乏统一的状态管理:多个步骤之间的数据传递和状态共享容易出错
Workflow Orchestration (工作流编排) 正是为了解决这些问题而生:
- 模块化设计:将复杂任务拆分为独立的 Executor 和 Agent
- 清晰的流程定义:使用 Builder API 构建可读、可维护的流程图
- 灵活的流程控制:支持条件分支、循环迭代、并发执行等复杂模式
- 统一的状态管理:内置 WorkflowContext 管理跨步骤的状态和数据
- 实时流式反馈:通过事件机制实时监控工作流的执行进度
MAF Workflow 位于应用层和 Agent 层之间,负责:
- 编排多个 Agent:决定 Agent 的执行顺序、条件和并发策略
- 管理数据流:在 Agent 和 Executor 之间传递数据
- 监控执行状态:实时报告工作流的执行进度和结果
- 错误处理:统一处理执行过程中的异常和重试逻辑
2. 第一个工作流:文本处理管道
业务场景:将用户输入的文本 → 转为大写 → 反转顺序
步骤 1:定义第一个 Executor - 大写转换。
继承 Executor<TInput, TOutput>:
TInput = string:接收字符串输入
TOutput = string:返回字符串输出
构造函数传入 "UppercaseExecutor" 作为唯一标识符
实现 HandleAsync 方法:
message:接收上一个步骤传递的数据 (对于第一个 Executor,是工作流的输入)
context:工作流上下文,用于访问共享状态、发布事件等 (后续课程详解)
cancellationToken:用于取消操作
返回值:会自动作为消息沿着 Edge 传递给下一个 Executor
业务逻辑:
使用 ToUpperInvariant() 进行文化无关的大写转换
返回 ValueTask
(高性能异步模式)
/// <summary>
/// 大写转换执行器 - 将输入文本转换为大写
/// </summary>
public class UppercaseExecutor : Executor<string, string>
{
public UppercaseExecutor() : base("UppercaseExecutor") { }
/// <summary>
/// 核心处理方法 - 将输入文本转为大写
/// </summary>
public override ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// 执行业务逻辑: 转为大写
string result = message.ToUpperInvariant();
// 返回处理结果 (会自动沿着 Edge 传递给下一个 Executor)
return ValueTask.FromResult(result);
}
}
- 步骤 2:定义第二个 Executor - 文本反转
/// <summary>
/// 文本反转执行器 - 将输入文本反转
/// </summary>
public class ReverseTextExecutor : Executor<string, string>
{
public ReverseTextExecutor() : base("ReverseTextExecutor") { }
/// <summary>
/// 核心处理方法 - 反转输入文本
/// </summary>
public override ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// 执行业务逻辑: 反转字符串
string result = string.Concat(message.Reverse());
// 返回处理结果
return ValueTask.FromResult(result);
}
步骤 3: 使用 WorkflowBuilder 构建工作流:将两个 Executor 通过 Edge 连接起来,组成完整的工作流。
new WorkflowBuilder(uppercaseExecutor):
创建 WorkflowBuilder 并指定起始 Executor
工作流的输入数据会首先传递给这个 Executor
AddEdge(source, target):
添加一条有向边 (Edge),定义数据流动方向
source 的输出会自动传递给 target 的输入
类型检查:source 的 TOutput 必须匹配 target 的 TInput
WithOutputFrom(executor):
指定工作流的最终输出来自哪个 Executor
如果不调用此方法,默认输出来自最后一个 Executor
Build():
完成构建,返回一个不可变的 Workflow 对象
此时会进行拓扑验证:检查是否有循环依赖、孤立节点等
// 创建 Executor 实例
UppercaseExecutor uppercaseExecutor = new UppercaseExecutor();
ReverseTextExecutor reverseExecutor = new ReverseTextExecutor();
// 使用 WorkflowBuilder 构建工作流
WorkflowBuilder builder = new WorkflowBuilder(uppercaseExecutor); // 指定起始 Executor
builder.AddEdge(uppercaseExecutor, reverseExecutor); // 添加边: uppercase → reverse
builder.WithOutputFrom(reverseExecutor); // 指定输出来自 reverse
// 构建最终的工作流对象
Workflow workflow = builder.Build();
Console.WriteLine("工作流构建完成");
new
{
起始节点 = "UppercaseExecutor",
边 = "UppercaseExecutor → ReverseTextExecutor",
输出节点 = "ReverseTextExecutor"
}.Display();
步骤 4: 同步执行工作流
InProcessExecution.RunAsync(workflow, input):
在当前进程内执行工作流 (同步等待完成)
input 参数会传递给起始 Executor
返回 Run 对象,包含执行结果和事件
Run.NewEvents:
获取工作流执行过程中产生的所有事件
主要事件类型:
WorkflowStartedEvent:工作流启动
ExecutorCompletedEvent:某个 Executor 执行完成
WorkflowCompletedEvent:工作流完成
ExecutorCompletedEvent:
ExecutorId:执行器的唯一标识符
Data:执行器的输出数据 (object 类型,需要转换)
事件按执行顺序排列
// 执行工作流 - 同步模式
string input = "Hello, World!";
Console.WriteLine($"输入: {input}");
Console.WriteLine("开始执行工作流...\n");
await using (Run run = await InProcessExecution.RunAsync(workflow, input))
{
// 处理执行结果
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("执行结果:\n");
foreach (WorkflowEvent evt in run.NewEvents)
{
if (evt is ExecutorCompletedEvent executorCompleted)
{
new
{
执行器 = executorCompleted.ExecutorId,
输出数据 = executorCompleted.Data
}.Display();
Console.WriteLine();
}
}
Console.WriteLine("工作流执行完成");
}
3. 流式执行
流式执行允许我们在工作流运行过程中实时接收事件流,而不是等待全部完成。
sequenceDiagram
participant App as 应用代码
participant WF as Workflow
participant E1 as UppercaseExecutor
participant E2 as ReverseTextExecutor
App->>WF: StreamAsync(input)
WF->>E1: 执行
E1-->>App: ExecutorCompletedEvent (立即返回)
WF->>E2: 执行
E2-->>App: ExecutorCompletedEvent (立即返回)
WF-->>App: WorkflowCompletedEvent
对比:
| 特性 | 同步执行 (RunAsync) | 流式执行 (StreamAsync) |
|---|---|---|
| 执行模式 | 阻塞等待全部完成 | 返回异步流 (IAsyncEnumerable) |
| 事件获取 | 一次性获取所有事件 | 实时逐个接收事件 |
| 适用场景 | 短时间任务、批处理 | 长时间任务、实时反馈 |
| 用户体验 | 等待后一次性显示结果 | 逐步显示进度 |
步骤 5:使用 StreamAsync 实现流式执行
InProcessExecution.StreamAsync(workflow, input):
启动流式执行,立即返回 StreamingRun 对象
不会阻塞等待工作流完成
streamingRun.WatchStreamAsync():
返回 IAsyncEnumerable
异步流 使用 await foreach 实时迭代事件
每当一个 Executor 完成,就会产生一个新的 ExecutorCompletedEvent
时间戳 DateTime.Now:
演示事件是逐个实时产生的
在实际的长时间任务中,你会看到明显的时间间隔
主要事件类型:
WorkflowStartedEvent:工作流开始
ExecutorCompletedEvent:单个 Executor 完成
WorkflowOutputEvent:整个工作流完成
// 执行工作流 - 流式模式
string streamingInput = "Workflow Streaming!";
Console.WriteLine($"输入: {streamingInput}");
Console.WriteLine("开始流式执行工作流...\n");
await using (StreamingRun streamingRun = await InProcessExecution.StreamAsync(workflow, streamingInput))
{
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("实时执行进度:\n");
// 实时监听事件流
await foreach (WorkflowEvent evt in streamingRun.WatchStreamAsync())
{
if (evt is ExecutorCompletedEvent executorCompleted)
{
Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] {executorCompleted.ExecutorId} 完成");
new
{
输出数据 = executorCompleted.Data
}.Display();
Console.WriteLine();
}
else if (evt is WorkflowStartedEvent)
{
Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 工作流启动\n");
}
else if (evt is WorkflowOutputEvent)
{
Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 工作流完成");
Console.WriteLine($"工作流输出:{evt.Data}");
}
}
}
4. 简化写法
使用 Lambda 表达式创建 Executor:对于简单的转换逻辑,每次都定义完整的 Executor 类显得过于繁琐。MAF 提供了 BindAsExecutor 扩展方法,允许我们将普通函数绑定为 Executor。
// 使用 Lambda 表达式定义转换逻辑
Func<string, string> toUpperFunc = input => input.ToUpperInvariant();
Func<string, string> reverseFunc = input => string.Concat(input.Reverse());
// 将函数绑定为 Executor
var upperExecutor = toUpperFunc.BindAsExecutor("UpperExecutor");
var reverseExecutor2 = reverseFunc.BindAsExecutor("ReverseExecutor");
// 构建工作流 (与之前相同的流程)
WorkflowBuilder lambdaBuilder = new WorkflowBuilder(upperExecutor);
lambdaBuilder.AddEdge(upperExecutor, reverseExecutor2);
lambdaBuilder.WithOutputFrom(reverseExecutor2);
Workflow lambdaWorkflow = lambdaBuilder.Build();
代码解析:
BindAsExecutor(name):
将 Func<TInput, TOutput> 绑定为 Executor<TInput, TOutput>
参数 name:Executor 的唯一标识符
限制:只适用于同步、无副作用的纯函数转换
适用场景:
数据格式转换 (大写、小写、Trim)
简单计算 (字符串拼接、数学运算)
数据映射 (DTO 转换)
不适用场景
需要访问 IWorkflowContext 的逻辑
需要异步操作 (如数据库查询、API调用)
复杂的状态管理
建议:
- 简单转换用 Lambda
- 复杂逻辑用完整的 Executor 类
二、集成 Agent 到工作流
1. 为什么要 Agent in Workflow
在工作流中,我们有两种主要的执行单元:
| 特性 | Executor | Agent |
|---|---|---|
| 定义 | 同步/异步的业务逻辑处理单元 | 基于 LLM 的智能对话式处理单元 |
| 适用场景 | 确定性逻辑(格式化、验证、计算) | 需要智能判断、生成、理解的任务 |
| 输入输出 | 任意类型 (TInput/TOutput) |
ChatMessage |
| 执行方式 | 立即执行 | 需要 TurnToken 触发 |
| 成本 | 几乎无成本 | LLM API 调用成本 |
| 可预测性 | 100% 可预测 | 基于模型能力,存在不确定性 |
将 Agent 集成到工作流的优势:
- 自动化数据流:工作流引擎自动管理 Agent 之间的消息传递
- 统一监控:通过 WorkflowEvent 统一监听所有 Agent 的执行状态
- 灵活组合:Agent 可以与 Executor 自由组合,构建复杂业务逻辑
- 错误处理:工作流级别的异常处理和重试机制
- 可视化:清晰的流程图,便于理解和维护
2. 将 Agent 添加到工作流
- 创建一个简单的翻译 Agent
// 创建一个法语翻译 Agent
var frenchAgent = new ChatClientAgent(
chatClient,
"You are a translation assistant that translates the provided text to French."
);
- 将 Agent 添加到工作流
// 构建包含单个 Agent 的工作流
var simpleWorkflow = new WorkflowBuilder(frenchAgent)
.Build();
- 执行工作流 - TurnToken 机制详解
关键概念:当 Agent 被用作工作流步骤时,它不会立即执行。需要通过 TurnToken 来触发 Agent 的处理。
sequenceDiagram
participant User as 用户
participant Workflow as 工作流引擎
participant Agent as Agent (Executor)
participant LLM as LLM
User->>Workflow: StreamAsync(input)
Workflow->>Agent: 传递 ChatMessage
Note over Agent: 消息被缓存,等待触发
User->>Workflow: TrySendMessageAsync(TurnToken)
Workflow->>Agent: 传递 TurnToken
Agent->>LLM: 发送请求
LLM-->>Agent: 返回响应
Agent-->>Workflow: AgentRunUpdateEvent
Workflow-->>User: 流式输出事件
// 执行工作流
Console.WriteLine("开始执行工作流...");
Console.WriteLine();
var input = new ChatMessage(ChatRole.User, "Hello World!");
await using (StreamingRun run = await InProcessExecution.StreamAsync(simpleWorkflow, input))
{
Console.WriteLine("发送 TurnToken 触发 Agent...");
// 关键步骤:发送 TurnToken 来触发 Agent 执行
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
Console.WriteLine();
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("监听工作流事件...");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine();
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is AgentRunUpdateEvent agentUpdate)
{
Console.WriteLine($"Agent 输出: {agentUpdate.ExecutorId}");
Console.WriteLine($"内容: {agentUpdate.Data}");
Console.WriteLine();
}
}
}
3. 关键概念解析
为什么需要 TurnToken?
在对话式 AI 系统中,Agent 可能需要等待多条消息(如用户输入、工具调用结果)后再统一处理。
TurnToken作为一个 的信号,告诉 Agent:"所有输入已就绪,可以开始处理了"
"这是一个完整的对话回合"
TurnToken 的参数
emitEvents:是否发出执行事件(如
AgentRunUpdateEvent)设置为 true 可以实时监控 Agent 的输出
new TurnToken(emitEvents: true)
AgentRunUpdateEvent
这是 Agent 在工作流中执行时发出的事件:
public class AgentRunUpdateEvent : WorkflowEvent
{
public string ExecutorId { get; } // Agent 的 ID
public object Data { get; } // Agent 的输出内容
public string Status { get; } // 执行状态
}
4. 实战:翻译链
让我们构建一个更有趣的案例:将英文依次翻译为法语、西班牙语,最后再翻译回英语,看看经过三次翻译后内容会有什么变化。
- 创建三个翻译 Agent
// 定义一个辅助方法来创建翻译 Agent
AIAgent CreateTranslationAgent(string targetLanguage, IChatClient client)
{
return new ChatClientAgent(
client,
$"You are a translation assistant that translates the provided text to {targetLanguage}."
);
}
// 创建三个翻译 Agent
var frenchTranslator = CreateTranslationAgent("French", chatClient);
var spanishTranslator = CreateTranslationAgent("Spanish", chatClient);
var englishTranslator = CreateTranslationAgent("English", chatClient);
- 构建翻译链工作流:使用 AddEdge 将三个 Agent 顺序连接
// 构建工作流:English → French → Spanish → English
var translationChainWorkflow = new WorkflowBuilder(frenchTranslator)
.AddEdge(frenchTranslator, spanishTranslator)
.AddEdge(spanishTranslator, englishTranslator)
.Build();
- 执行翻译链
Console.WriteLine("开始执行翻译链工作流...");
Console.WriteLine();
var inputMessage = new ChatMessage(ChatRole.User, "Artificial Intelligence is transforming the world!");
Console.WriteLine($"原始输入: {inputMessage.Text}");
Console.WriteLine();
await using (StreamingRun chainRun = await InProcessExecution.StreamAsync(translationChainWorkflow, inputMessage)){
// 发送 TurnToken 触发所有 Agent
await chainRun.TrySendMessageAsync(new TurnToken(emitEvents: true));
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("翻译过程跟踪");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine();
int stepNumber = 1;
await foreach (WorkflowEvent evt in chainRun.WatchStreamAsync())
{
if (evt is AgentRunUpdateEvent agentEvent)
{
Console.WriteLine($"Step {stepNumber}: {agentEvent.ExecutorId}");
Console.WriteLine($" 翻译结果: {agentEvent.Data}");
Console.WriteLine();
stepNumber++;
}
}
Console.WriteLine("翻译链执行完成");
}
观察与分析
通过上面的输出,会发现:
- 顺序执行:三个 Agent 按照定义的顺序依次执行
- 自动传递:每个 Agent 的输出自动成为下一个 Agent 的输入
- 实时反馈:通过 AgentRunUpdateEvent 可以实时看到每一步的翻译结果
- 语义变化:经过多次翻译后,原始语义可能会有细微变化(这是 LLM 的特性)
5. 核心概念深入
Agent 作为 Executor 的内部机制,当将 AIAgent 添加到工作流时,MAF 会自动将其封装为一个特殊的 Executor。
关键特性:
消息缓存:Agent Wrapper 会缓存收到的 ChatMessage
等待触发:只有收到 TurnToken 才会调用内部的 Agent.RunAsync()
事件发射:执行过程中会发出 AgentRunUpdateEvent
类型固定:输入输出都是 ChatMessage
TurnToken 的作用域:单个 TurnToken 触发所有 Agent,在一个工作流中,只需发送一次 TurnToken,它会触发所有等待中的 Agent。
sequenceDiagram
participant User
participant Workflow
participant Agent1
participant Agent2
User->>Workflow: TurnToken
Workflow->>Agent1: 传递 TurnToken
Agent1->>Agent1: 处理并输出
Agent1->>Workflow: 输出 (ChatMessage)
Workflow->>Agent2: 传递输出 + TurnToken
Agent2->>Agent2: 处理并输出
Agent2->>Workflow: 输出 (ChatMessage)
AgentRunUpdateEvent 详解,这是监控 Agent 执行的核心事件。
AgentRunUpdateEvent 的用途:
- 实时进度:显示 Agent 的执行进度给用户
- 调试:检查每个 Agent 的输入输出
- 日志记录:记录完整的执行链路
- 错误诊断:发现哪个 Agent 出现了问题
// 典型的事件处理模式
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is AgentRunUpdateEvent agentEvent)
{
// 事件属性
Console.WriteLine($"Agent ID: {agentEvent.ExecutorId}");
Console.WriteLine($"数据: {agentEvent.Data}");
Console.WriteLine($"状态: {agentEvent.Status}");
Console.WriteLine($"时间戳: {agentEvent.Timestamp}");
}
}
5. 最佳实践
何时使用 Agent vs Executor
| 场景 | 推荐 | 理由 |
|---|---|---|
| 文本格式化、验证 | Executor | 确定性逻辑,无需 LLM |
| 内容生成、改写 | Agent | 需要理解和创造性 |
| 数学计算、数据转换 | Executor | 精确计算,成本低 |
| 意图识别、分类 | Agent | 需要语义理解 |
| API 调用、数据库查询 | Executor | 确定性操作 |
| 多轮对话、推理 | Agent | 需要上下文理解 |
Agent 在工作流中的命名规范
推荐的命名模式:
- 职责导向:名称应反映 Agent 的具体职责
- 动词+名词:如 translateText, reviewCode
- 避免泛化:不要使用 agent1, helper 等通用名称
- 一致性:在同一工作流中保持命名风格一致
// 好的命名 - 清晰描述职责
var frenchTranslator = CreateTranslationAgent("French", client);
var contentReviewer = new ChatClientAgent(client, "Review content for quality");
var codeGenerator = new ChatClientAgent(client, "Generate C# code");
// 不好的命名 - 过于抽象
var agent1 = new ChatClientAgent(...);
var helper = new ChatClientAgent(...);
错误处理策略
在工作流中使用 Agent 时的错误处理:
// 带错误处理的 Agent 工作流执行
async Task<bool> ExecuteWorkflowWithErrorHandlingAsync(Workflow workflow, ChatMessage input, int maxRetries = 3)
{
for (int attempt = 1; attempt <= maxRetries; attempt++)
{
try
{
Console.WriteLine($"尝试执行 (第 {attempt} 次)...");
await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, input);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is AgentRunUpdateEvent agentEvent)
{
// 检查 Agent 输出是否有效
if (agentEvent.Data == null || string.IsNullOrWhiteSpace(agentEvent.Data.ToString()))
{
throw new InvalidOperationException($"Agent {agentEvent.ExecutorId} 返回了空结果");
}
}
}
Console.WriteLine("工作流执行成功");
return true;
}
catch (Exception ex)
{
Console.WriteLine($"执行失败: {ex.Message}");
if (attempt < maxRetries)
{
var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt)); // 指数退避
Console.WriteLine($"等待 {delay.TotalSeconds} 秒后重试...");
await Task.Delay(delay);
}
else
{
Console.WriteLine($"达到最大重试次数,放弃执行");
return false;
}
}
}
return false;
}
性能优化建议
- 减少不必要的 Agent 调用
// 不好的做法 - 每次都调用 Agent
var validatorAgent = new ChatClientAgent(client, "Validate input format");
// 好的做法 - 用 Executor 处理格式验证
public class FormatValidatorExecutor : Executor<string, string>
{
protected override Task<string> HandleAsync(string input, IWorkflowContext context, CancellationToken ct)
{
if (string.IsNullOrWhiteSpace(input))
throw new ArgumentException("输入不能为空");
return Task.FromResult(input);
}
}
- 合理使用 emitEvents
// 生产环境:可以关闭事件以提高性能
await run.TrySendMessageAsync(new TurnToken(emitEvents: false));
// 开发/调试:开启事件以便监控
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
Agent 粒度控制
太细:多个简单 Agent 导致过多的 LLM 调用
太粗:单个复杂 Agent 难以复用和维护
适中:每个 Agent 负责一个明确的子任务
三、工作流即 Agent
1. 为什么需要 Workflow as Agent
在实际应用中,我们经常需要将多个 Agent 组成的协作工作流作为一个整体单元来使用:
| 特性 | 独立的 Workflow | Workflow as Agent |
|---|---|---|
| 执行方式 | StreamAsync() + TurnToken | RunAsync() / RunStreamingAsync() |
| 输入类型 | ChatMessage + 手动发送 TurnToken | ChatMessage / string (自动处理) |
| 输出类型 | WorkflowEvent 流 | AgentRunResponse |
| 接口 | Workflow | AIAgent |
| 复用方式 | 需要手动管理执行流程 | 作为工具/插件/子Agent |
问题:直接使用 Workflow 接口存在以下挑战
- 调用复杂:需要手动创建 StreamingRun、发送 TurnToken、监听事件
- 接口不统一:Workflow 和 Agent 使用不同的调用方式
- 复用困难:无法将 Workflow 直接作为 Agent 的工具
- 组合受限:无法在其他 Agent 中轻松集成 Workflow
Workflow as Agent 的解决方案
AsAgent() 扩展方法将基于 Agent 的 Workflow 包装为标准的 AIAgent 接口,使其可以像普通 Agent 一样使用
- 核心优势:
- 统一接口:使用 AIAgent 接口调用 Workflow
- 无缝集成:可以作为 Agent 的工具使用
- 跨协议复用:支持 A2A、MCP 等协议
- 生态兼容:享受 AIAgent 生态的所有中间件
- 使用场景
- 复杂任务封装:将多步骤的业务逻辑封装为可复用的 Agent
- 跨系统集成:需要在 A2A 协作中使用 Workflow
- 工具化封装:将 Workflow 作为其他 Agent 的工具
- 中间件增强:需要对 Workflow 应用 AIAgent 中间件
- 不建议场景
- 简单任务:如果只是简单的顺序调用,直接使用 WorkflowBuilder 即可
- 性能敏感:WorkflowAgent 会增加一层适配开销
2. 创建多 Agent 协助工作流
我们使用前面创建的多Agent 翻译链工作流,现在我们将这个 Workflow 封装为一个 Agent,使其可以像普通 Agent 一样通过 AIAgent 接口调用。
AsAgent() 方法将 Workflow 包装为一个 AIAgent,内部自动处理:
- 接收 ChatMessage 输入:提取用户消息
- 自动管理 TurnToken:无需手动发送 TurnToken
- 返回 AgentRunResponse:将 Workflow 输出包装为标准响应
sequenceDiagram
participant User as 调用方
participant WA as Workflow Agent
participant WF as Workflow
User->>WA: RunAsync(ChatMessage)
WA->>WA: 内部创建 StreamingRun
WA->>WF: 自动发送 TurnToken
WF->>WF: 执行所有 Agent
WF-->>WA: WorkflowEvent 流
WA->>WA: 包装为 AgentRunResponse
WA-->>User: AgentRunResponse
重要限制:ChatProtocol 支持要求
并非所有 Workflow 都能转换为 Agent! AsAgent() 方法有严格的协议要求,只有满足 ChatProtocol 的 Workflow 才能使用 AsAgent(),必须同时支持:
- 输入类型:List
或 ChatMessage - 触发类型:TurnToken(用于触发 Agent 执行)
如果 Workflow 不满足这两个条件,调用 AsAgent() 会抛出异常:
// 错误示例:基于 Executor 的 Workflow
var workflow = new WorkflowBuilder()
.AddExecutor(new MyCustomExecutor()) // 使用自定义 Executor
.Build();
// 运行时报错!
var agent = workflow.AsAgent("my-agent", "MyAgent");
// System.InvalidOperationException:
// Workflow does not support ChatProtocol:
// At least List<ChatMessage> and TurnToken must be supported as input.
正确做法:使用 AgentWorkflowBuilder,要让 Workflow 支持 AsAgent(),必须使用 AgentWorkflowBuilder 创建基于 Agent 的 Workflow
// 正确:使用 AgentWorkflowBuilder
var workflow = AgentWorkflowBuilder.BuildSequential(
"my-workflow",
[agent1, agent2, agent3] // 使用 AIAgent 实例
);
var workflowAgent = workflow.AsAgent("my-agent", "MyAgent"); // 成功
为什么有这个限制?
核心原因:
- AIAgent 接口的 RunAsync() 方法接收 ChatMessage 作为输入
- Agent 通过 TurnToken 触发执行
- Executor-based Workflow 使用自定义输入类型(如 string),不符合 ChatProtocol 规范
| Workflow 类型 | 输入类型 | 是否支持 TurnToken | 能否转 Agent |
|---|---|---|---|
| Executor-based(WorkflowBuilder) | 自定义类型(如 string, MyData) | 不支持 | 不能 |
| Agent-based(AgentWorkflowBuilder) | ChatMessage | 支持 | 可以 |
将 Workflow 转换为 Agent
// 使用 AsAgent() 将 Workflow 转换为 AIAgent
var translationWorkflowAgent = translationWorkflow.AsAgent(
id: "translation-workflow",
name: "TranslationChain"
);
3. 测试 Workflow Agent
- 使用 RunAsync 调用
// 使用 AIAgent 接口调用 Workflow Agent
var userMessage = "The future belongs to those who believe in the beauty of their dreams.";
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("用户输入:");
Console.WriteLine($" '{userMessage}'");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
// 创建 Thread
var thread = translationWorkflowAgent.GetNewThread();
// 关键:像调用普通 Agent 一样调用 Workflow Agent
var response = await translationWorkflowAgent.RunAsync(userMessage, thread);
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("Workflow Agent 响应:");
foreach (var msg in response.Messages)
{
if (msg.Role == ChatRole.Assistant)
{
Console.WriteLine($" {msg.Text}");
}
}
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
- 使用 RunStreamingAsync 流式调用
// 使用流式调用
var streamingMessage = "Technology is best when it brings people together.";
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("用户输入 (流式):");
Console.WriteLine($" '{streamingMessage}'");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
Console.WriteLine("流式输出:");
var streamThread = translationWorkflowAgent.GetNewThread();
await foreach (var update in translationWorkflowAgent.RunStreamingAsync(streamingMessage, streamThread))
{
if (!string.IsNullOrEmpty(update.Text))
{
Console.WriteLine($" {update.Text}");
}
}
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("流式调用完成");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
4. 将 Workflow Agent 作为工具使用
现在让我们演示 Workflow Agent 最强大的功能:将复杂的多 Agent 协作流程作为工具提供给其他 Agent 使用。
场景说明,我们要创建一个 语言专家 Agent,它需要:
- 接收用户请求(如"测试这段文字的翻译一致性")
- 调用翻译链 Workflow (Workflow Agent) 进行多轮翻译
- 分析并报告翻译质量
这是一个典型的 Agent 调用 Workflow Agent"场景。
- 将 Workflow Agent 转换为 AIFunction
// Workflow Agent 可以直接转换为 AIFunction
var translationChainFunction = translationWorkflowAgent.AsAIFunction(
new AIFunctionFactoryOptions
{
Name = "translation_chain",
Description = "对文本进行多轮翻译测试:英文→法语→西班牙语→英文,用于测试翻译一致性"
}
);
- 创建内容审核 Agent (使用 WorkflowAgent 作为工具)
// 创建语言专家 Agent,配置翻译链工具
var linguistAgent = chatClient.CreateAIAgent(
instructions: @"你是一位语言学专家。你可以使用 translation_chain 工具来测试文本的翻译一致性。
工作流程:
1. 使用 translation_chain 工具对用户提供的文本进行多轮翻译
2. 比较原始文本和最终翻译结果
3. 分析翻译过程中的语义变化
4. 提供翻译质量评估和改进建议
返回格式化的分析报告。",
name: "LinguistExpert",
tools: [translationChainFunction]
);
Console.WriteLine("语言专家 Agent 创建完成");
Console.WriteLine(" 配置工具: translation_chain (Workflow Agent)");
- 测试:Agent 自动调用 WorkflowAgent
// 创建线程并提交请求
var linguistThread = linguistAgent.GetNewThread();
var userRequest = "请测试这段文字的翻译一致性: The only way to do great work is to love what you do.";
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("用户请求:");
Console.WriteLine($" {userRequest}");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
Console.WriteLine("语言专家处理中(将自动调用翻译链工具)...\n");
// Agent 会自动调用 translation_chain 工具 (Workflow Agent)
var response = await linguistAgent.RunAsync(userRequest, linguistThread);
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("分析报告:");
foreach (var msg in response.Messages)
{
if (msg.Role == ChatRole.Assistant)
{
Console.WriteLine($"\n{msg.Text}");
}
}
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
5. 直接调用 vs WorkflowAgent
对比总结
| 特性 | 直接调用 Workflow | 使用 WorkflowAgent |
|---|---|---|
| 调用方式 | RunAsync(TInput) | RunAsync(ChatMessage) |
| 接口 | Workflow | AIAgent |
| 输入类型 | 强类型 (string) | ChatMessage |
| 输出类型 | 强类型 (string) | AgentRunResponse |
| 作为工具 | 不支持 | 支持 |
| 中间件 | 不支持 | 支持 AIAgent 中间件 |
| A2A 协作 | 不支持 | 支持 |
| MCP 集成 | 不支持 | 支持 |
| 性能开销 | 低 | 稍高 (适配层) |
架构对比图
flowchart TB
subgraph "直接调用 Workflow"
A1[应用代码] -->|RunAsync| B1[Workflow]
B1 -->|string| A1
style A1 fill:#FF5252,stroke:#C62828,stroke-width:2px,color:#fff
style B1 fill:#FF5252,stroke:#C62828,stroke-width:2px,color:#fff
end
subgraph "WorkflowAgent 封装"
A2[Agent/应用] -->|RunAsync| B2[WorkflowAgent]
B2 -->|调用| C2[Workflow]
C2 -->|结果| B2
B2 -->|AgentRunResponse| A2
B2 -.可应用.-> D2[中间件]
B2 -.可注册为.-> E2[Agent工具]
B2 -.可用于.-> F2[A2A/MCP]
style A2 fill:#4CAF50,stroke:#2E7D32,stroke-width:2px,color:#fff
style B2 fill:#2196F3,stroke:#1565C0,stroke-width:2px,color:#fff
style C2 fill:#4CAF50,stroke:#2E7D32,stroke-width:2px,color:#fff
end
6. 最佳实践
何时使用 WorkflowAgent?
推荐场景:
需要将 Workflow 作为工具提供给其他 Agent
需要在 A2A 协作中使用 Workflow
需要对 Workflow 应用 AIAgent 中间件 (如日志、缓存)
需要通过 MCP 协议暴露 Workflow
构建可复用的业务逻辑组件
不推荐场景:
简单的内部调用,直接使用 RunAsync 更高效
性能敏感的场景 (适配层会有额外开销)
不需要对外暴露的内部工作流
使用建议
- 命名规范:WorkflowAgent 转为工具时,使用清晰的名称和描述
var tool = workflowAgent.AsAIFunction(new AIFunctionFactoryOptions
{
Name = "clean_data", // 动词_名词格式
Description = "清晰描述功能和用途"
});
- 错误处理:确保 Workflow 内部有完善的错误处理
protected override async Task<string> ExecuteAsync(...)
{
try {
// 业务逻辑
} catch (Exception ex) {
// 记录日志并返回友好错误信息
return $"处理失败: {ex.Message}";
}
}
- 性能优化:对于频繁调用的 WorkflowAgent,考虑添加缓存中间件
// 示例:使用缓存中间件 (需自行实现或引用相关库)
// var cachedWorkflowAgent = new CachingAgent(workflowAgent);
四、工作流的事件机制
| 事件类别 | 作用域 | 代表事件 | 使用频率 |
|---|---|---|---|
| Executor 事件 | 单个执行器 | ExecutorCompletedEvent | ⭐⭐⭐⭐⭐ |
| SuperStep 事件 | 执行步骤 | SuperStepCompletedEvent | ⭐⭐ |
| Workflow 事件 | 整个工作流 | WorkflowOutputEvent | ⭐⭐⭐⭐⭐ |
| Agent 事件 | AI Agent | AgentRunUpdateEvent | ⭐⭐⭐⭐ |
| 交互事件 | 人机协作 | RequestInfoEvent | ⭐⭐⭐ |
1. WorkflowEvent 基类剖析
- 类定义
public class WorkflowEvent
{
// 事件携带的数据负载
public object? Data { get; }
// 构造函数
public WorkflowEvent(object? data = null)
{
Data = data;
}
// 提供友好的字符串表示
public override string ToString() =>
Data is not null ?
$"{GetType().Name}(Data: {Data.GetType()} = {Data})" :
$"{GetType().Name}()";
}
- 核心特性
| 特性 | 说明 | 示例 |
|---|---|---|
| 通用性 | 所有事件的基类,提供统一接口 | 可以用 WorkflowEvent 类型接收所有事件 |
| 数据承载 | Data 属性存储事件的业务数据 | 执行结果、错误信息、状态对象等 |
| 可扩展 | 子类可以添加特定属性 | ExecutorEvent 添加 ExecutorId |
| 调试友好 | ToString() 提供清晰的事件描述 | 便于日志输出和调试 |
ExecutorEvent - Executor 作用域事件,ExecutorEvent 是最常用的事件类别,它表示与特定 Executor 执行相关的事件。
关键属性
- ExecutorId:标识哪个 Executor 触发了这个事件
- 用于追踪执行流程
- 区分并发执行的不同 Executor
- 实现基于 Executor 的过滤逻辑
四大子事件
- ExecutorInvokedEvent:开始执行
- ExecutorCompletedEvent:成功完成
- ExecutorFailedEvent:执行失败
- AgentRunUpdateEvent:Agent 更新
- ExecutorId:标识哪个 Executor 触发了这个事件
public class ExecutorEvent : WorkflowEvent
{
// 触发此事件的 Executor 的唯一标识符
public string ExecutorId { get; }
public ExecutorEvent(string executorId, object? data) : base(data)
{
ExecutorId = executorId;
}
public override string ToString() =>
Data is not null ?
$"{GetType().Name}(Executor = {ExecutorId}, Data: {Data.GetType()} = {Data})" :
$"{GetType().Name}(Executor = {ExecutorId})";
}
SuperStepEvent - 执行步骤事件,SuperStepEvent 用于表示工作流的执行步骤(SuperStep)级别的事件。
在 MAF Workflow 中,SuperStep 是一个执行单元,可以包含:
- 一个或多个 Executor 的并发执行
- 一个完整的顺序执行链
- 两个子事件
| 事件类型 | 触发时机 | 主要用途 |
|---|---|---|
| SuperStepStartedEvent | SuperStep 开始执行 | 监控执行进度 |
| SuperStepCompletedEvent | SuperStep 完成执行 | 调试、Checkpoint 保存 |
使用场景:在调试复杂的并发工作流时,SuperStep 事件可以帮助你理解执行的阶段划分。
public class SuperStepEvent : WorkflowEvent
{
// SuperStep 的索引(从 0 开始)
public int StepNumber { get; }
public SuperStepEvent(int stepNumber, object? data = null) : base(data)
{
StepNumber = stepNumber;
}
}
2. 流式执行机制详解
MAF Workflow 提供两种执行模式:
| 执行模式 | 方法 | 特点 | 适用场景 |
|---|---|---|---|
| 同步执行 | RunAsync() | 等待完成后返回结果 | 简单工作流、不需要中间状态 |
| 流式执行 | StreamAsync() | 实时返回事件流 | 需要进度监控、Agent 工作流 |
sequenceDiagram
participant User as 用户代码
participant Workflow as 工作流引擎
participant Executor as Executors
Note over User,Executor: 同步执行 (RunAsync)
User->>Workflow: RunAsync(input)
Workflow->>Executor: 执行所有步骤
Executor-->>Workflow: 返回结果
Workflow-->>User: 返回 Run (包含结果)
Note over User: 阻塞等待完成
Note over User,Executor: 流式执行 (StreamAsync)
User->>Workflow: StreamAsync(input)
Workflow-->>User: 返回 StreamingRun
Note over User: 立即返回
User->>Workflow: WatchStreamAsync()
loop 执行过程中
Workflow->>Executor: 执行步骤
Executor-->>Workflow: 产生事件
Workflow-->>User: 实时推送事件
Note over User: 实时处理
end
- 核心 API:WatchStreamAsync:是流式执行的核心方法,它返回一个 IAsyncEnumerable
,可以使用 await foreach 遍历。
// 1. 启动流式执行
StreamingRun run = await InProcessExecution.StreamAsync(workflow, input);
// 2. 订阅事件流
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
// 3. 处理每个事件
Console.WriteLine($"收到事件: {evt.GetType().Name}");
}
关键概念
| 概念 | 说明 |
|---|---|
| StreamingRun | 流式执行的句柄,用于监听事件和发送消息 |
| IAsyncEnumerable | 异步流,事件按产生顺序依次返回 |
| await foreach | 异步迭代,每收到一个事件就处理一次 |
| 实时性 | 事件一产生就推送,不等待工作流完成 |
执行流程图
flowchart TB
A[StreamAsync] --> B[返回 StreamingRun]
B --> C[WatchStreamAsync]
C --> D[await foreach 循环]
D --> E{收到事件?}
E -->|是| F[处理事件]
F --> E
E -->|工作流完成| G[退出循环]
H[Workflow Engine] -.产生事件.-> E
style A fill:#2196F3,stroke:#1565C0,stroke-width:2px,color:#fff
style C fill:#4CAF50,stroke:#2E7D32,stroke-width:2px,color:#fff
style F fill:#FF9800,stroke:#E65100,stroke-width:2px,color:#fff
- 实验 1:观察基础事件流
我们使用前面章节创建的文本处理管道
// 构建工作流
var uppercaseExecutor = new UppercaseExecutor();
var reverseExecutor = new ReverseExecutor();
var simpleWorkflow = new WorkflowBuilder(uppercaseExecutor)
.AddEdge(uppercaseExecutor, reverseExecutor)
.WithOutputFrom(reverseExecutor)
.Build();
// 执行工作流并监听所有事件
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("开始执行工作流 (流式模式)");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
var input = "Hello Workflow Events!";
Console.WriteLine($"输入: \"{input}\"\n");
// 启动流式执行
StreamingRun run = await InProcessExecution.StreamAsync(simpleWorkflow, input);
Console.WriteLine("开始监听事件流...\n");
Console.WriteLine(new string('─', 60));
int eventCount = 0;
// 订阅事件流
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
eventCount++;
// 显示事件的完整信息
Console.WriteLine($"\n事件 #{eventCount}");
Console.WriteLine($" 类型: {evt.GetType().Name}");
// // 根据事件类型显示详细信息
// switch (evt)
// {
// case WorkflowStartedEvent:
// Console.WriteLine($" 说明: 工作流开始执行");
// break;
// case ExecutorInvokedEvent invokedEvent:
// Console.WriteLine($" Executor: {invokedEvent.ExecutorId}");
// Console.WriteLine($" 说明: Executor 开始执行");
// Console.WriteLine($" 输入: {invokedEvent.Data}");
// break;
// case ExecutorCompletedEvent completedEvent:
// Console.WriteLine($" Executor: {completedEvent.ExecutorId}");
// Console.WriteLine($" 说明: Executor 执行完成");
// Console.WriteLine($" 输出: {completedEvent.Data}");
// break;
// case SuperStepCompletedEvent stepEvent:
// Console.WriteLine($" SuperStep: #{stepEvent.StepNumber}");
// Console.WriteLine($" 说明: 执行步骤完成");
// break;
// case WorkflowOutputEvent outputEvent:
// Console.WriteLine($" 来源: {outputEvent.SourceId}");
// Console.WriteLine($" 说明: 工作流产生输出");
// Console.WriteLine($" 结果: {outputEvent.Data}");
// break;
// default:
// Console.WriteLine($" 数据: {evt.Data}");
// break;
// }
Console.WriteLine(new string('─', 60));
}
Console.WriteLine($"\n工作流执行完成,共产生 {eventCount} 个事件");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
通过上面的实验,我们可以观察到一个典型的事件序列:
1. SuperStepStartedEvent ← SuperStep 0 开始
2. ExecutorInvokedEvent ← UppercaseExecutor 开始
3. ExecutorCompletedEvent ← UppercaseExecutor 完成
4. SuperStepCompletedEvent ← SuperStep 0 完成
5. SuperStepStartedEvent ← SuperStep 1 开始
6. ExecutorInvokedEvent ← ReverseExecutor 开始
7. ExecutorCompletedEvent ← ReverseExecutor 完成
8. WorkflowOutputEvent ← 工作流输出结果
9. SuperStepCompletedEvent ← SuperStep 1 完成
关键发现
- SuperStep 结构:每个 SuperStep 都有明确的开始和结束事件
- 执行顺序:SuperStepStarted → ExecutorInvoked → ExecutorCompleted → SuperStepCompleted
- 完整性:每个 Executor 都有 Invoked 和 Completed 事件对
- 输出时机:WorkflowOutputEvent 在最后一个 Executor 完成后、SuperStep 完成前触发
- 可追溯:通过事件序号和类型可以完整追踪执行流程
3. 系统内置事件详解
ExecutorCompletedEvent - 最常用的事件:ExecutorCompletedEvent 是工作流开发中使用频率最高的事件,它表示一个 Executor 成功完成执行。
public sealed class ExecutorCompletedEvent : ExecutorEvent
{
public ExecutorCompletedEvent(string executorId, object? result)
: base(executorId, data: result)
{
}
}
核心用途
| 用途 | 说明 | 示例 |
|---|---|---|
| 获取结果 | Data 属性包含 Executor 的返回值 | 提取中间处理结果 |
| 进度追踪 | 标记某个步骤已完成 | 更新进度条 |
| 链路追踪 | 记录执行顺序 | 构建执行日志 |
| 调试分析 | 查看每个步骤的输出 | 定位数据转换问题 |
实战示例
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is ExecutorCompletedEvent completed)
{
Console.WriteLine($"{completed.ExecutorId} 完成");
Console.WriteLine($" 输出: {completed.Data}");
// 根据 ExecutorId 做不同处理
if (completed.ExecutorId == "ValidationExecutor")
{
bool isValid = (bool)completed.Data;
if (!isValid)
{
Console.WriteLine("验证失败,终止后续流程");
}
}
}
}
- AgentRunUpdateEvent - Agent 专用事件:当 Agent 在工作流中执行时,会产生 AgentRunUpdateEvent 来报告执行进度和输出内容。
public class AgentRunUpdateEvent : ExecutorEvent
{
public AgentRunUpdateEvent(string executorId, AgentRunResponseUpdate update)
: base(executorId, data: update)
{
Update = update;
}
public AgentRunResponseUpdate Update { get; }
}
关键属性 AgentRunResponseUpdate对象,包含:
- Text:Agent 输出的文本内容
- Contents:结构化内容(可能包含 FunctionCall 等)
- 其他 Agent 运行时信息
触发条件:只有在发送 TurnToken 时设置 emitEvents: true,才会产生此事件!
// 正确:会产生 AgentRunUpdateEvent
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
// 错误:不会产生 AgentRunUpdateEvent
await run.TrySendMessageAsync(new TurnToken(emitEvents: false));
实战示例
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is AgentRunUpdateEvent agentEvent)
{
// 实时显示 Agent 输出
Console.Write(agentEvent.Update.Text);
// 检查是否有函数调用
var functionCall = agentEvent.Update.Contents
.OfType<FunctionCallContent>()
.FirstOrDefault();
if (functionCall != null)
{
Console.WriteLine($"\n调用函数: {functionCall.Name}");
}
}
}
- WorkflowOutputEvent - 工作流输出事件:WorkflowOutputEvent 标志着工作流产生了输出,通常意味着工作流即将完成或已经完成。
public sealed class WorkflowOutputEvent : WorkflowEvent
{
internal WorkflowOutputEvent(object data, string sourceId) : base(data)
{
SourceId = sourceId;
}
public string SourceId { get; }
// 类型安全的数据提取方法
public bool Is<T>() => Data is T;
public bool Is<T>([NotNullWhen(true)] out T? maybeValue)
{
if (Data is T value)
{
maybeValue = value;
return true;
}
maybeValue = default;
return false;
}
public T? As<T>() => Data is T value ? value : default;
}
核心特性
| 特性 | 说明 |
|---|---|
| SourceId | 产生输出的 Executor ID |
| 类型安全 | 提供 Is |
| 终止标志 | 通常是事件流的最后一个事件 |
实战示例
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine($"工作流完成,来自: {outputEvent.SourceId}");
// 类型安全的数据提取
if (outputEvent.Is<string>(out var textResult))
{
Console.WriteLine($"文本结果: {textResult}");
}
else if (outputEvent.Is<List<ChatMessage>>(out var messages))
{
Console.WriteLine($"对话消息: {messages.Count} 条");
}
// 或者使用 As<T>
var result = outputEvent.As<MyCustomType>();
if (result != null)
{
// 处理自定义类型
}
}
}
- ExecutorFailedEvent - 错误处理事件:当 Executor 执行过程中抛出异常时,会产生 ExecutorFailedEvent。
public sealed class ExecutorFailedEvent : ExecutorEvent
{
public ExecutorFailedEvent(string executorId, Exception? err)
: base(executorId, data: err)
{
Data = err;
}
public new Exception? Data { get; }
}
实战示例
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is ExecutorFailedEvent failedEvent)
{
Console.WriteLine($"{failedEvent.ExecutorId} 执行失败");
Console.WriteLine($" 错误类型: {failedEvent.Data?.GetType().Name}");
Console.WriteLine($" 错误消息: {failedEvent.Data?.Message}");
Console.WriteLine($" 堆栈跟踪: {failedEvent.Data?.StackTrace}");
// 根据错误类型做不同处理
switch (failedEvent.Data)
{
case HttpRequestException httpEx:
Console.WriteLine("网络错误,建议重试");
break;
case TimeoutException timeoutEx:
Console.WriteLine("超时错误,可能需要增加超时时间");
break;
default:
Console.WriteLine("未知错误,请检查日志");
break;
}
}
}
- RequestInfoEvent - 人机协作事件:RequestInfoEvent 用于 Human-in-the-Loop 场景,表示工作流需要外部输入(通常是人工干预)。
public sealed class RequestInfoEvent : WorkflowEvent
{
public RequestInfoEvent(ExternalRequest request) : base(request)
{
Request = request;
}
public ExternalRequest Request { get; }
}
使用场景
- 审批流程:等待管理员批准
- 内容审核:等待人工审核敏感内容
- 用户交互:等待用户提供额外信息
- 配置选择:等待用户选择执行路径
实战示例
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is RequestInfoEvent requestEvent)
{
Console.WriteLine("工作流暂停,等待外部输入");
Console.WriteLine($" 请求ID: {requestEvent.Request.RequestId}");
// 获取用户输入(实际场景可能是 Web API 或 UI)
Console.Write("请输入响应内容: ");
var userInput = Console.ReadLine();
// 创建响应
var response = new ExternalResponse
{
RequestId = requestEvent.Request.RequestId,
Data = userInput
};
// 将响应发送回工作流
await run.TrySendMessageAsync(response);
Console.WriteLine("响应已发送,工作流继续执行");
}
}
4. 事件监听与过滤
在实际应用中,我们通常不需要处理所有事件,而是只关注特定类型的事件,接下来我们看看多种事件过滤技巧。
- 基于事件类型的过滤,最基础的过滤方式是使用 is 模式匹配或 switch 表达式。
// 示例:只处理 ExecutorCompletedEvent
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is ExecutorCompletedEvent completed)
{
Console.WriteLine($"{completed.ExecutorId}: {completed.Data}");
}
// 忽略其他事件
}
// 使用 switch 表达式处理多种事件类型
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
var message = evt switch
{
ExecutorCompletedEvent completed =>
$"{completed.ExecutorId} 完成: {completed.Data}",
ExecutorFailedEvent failed =>
$"{failed.ExecutorId} 失败: {failed.Data?.Message}",
AgentRunUpdateEvent agentUpdate =>
$"Agent 更新: {agentUpdate.Update.Text}",
WorkflowOutputEvent output =>
$"输出结果: {output.Data}",
_ => null // 忽略其他事件
};
if (message != null)
{
Console.WriteLine(message);
}
}
基于 ExecutorId 的过滤,在复杂的工作流中,我们可能只关注特定 Executor 的事件。
使用场景
进度监控:只追踪关键步骤的执行
问题调试:定位某个特定 Executor 的问题
日志记录:只记录重要步骤的日志
性能优化:减少不必要的事件处理
// 只监听特定 Executor 的完成事件
var targetExecutorId = "ReverseExecutor";
Console.WriteLine($"只监听 {targetExecutorId} 的事件\n");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
// 方式 1: 直接在 ExecutorEvent 上过滤
if (evt is ExecutorEvent executorEvent &&
executorEvent.ExecutorId == targetExecutorId)
{
Console.WriteLine($"{evt.GetType().Name}");
Console.WriteLine($" Executor: {executorEvent.ExecutorId}");
Console.WriteLine($" 数据: {executorEvent.Data}\n");
}
}
// 方式 2: 组合多个条件
var interestedExecutors = new[] { "UppercaseExecutor", "ReverseExecutor" };
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is ExecutorCompletedEvent completed &&
interestedExecutors.Contains(completed.ExecutorId))
{
Console.WriteLine($"{completed.ExecutorId}: {completed.Data}");
}
}
- LINQ 过滤(最优雅的方式),利用 C# 的 LINQ 能力,我们可以创建更加优雅和可复用的过滤逻辑。
| 优势 | 说明 |
|---|---|
| 链式调用 | 可以组合多个过滤条件 |
| 类型安全 | 强类型支持,编译时检查 |
| 代码复用 | 过滤逻辑可以封装为扩展方法 |
| 可读性强 | 语义清晰,接近自然语言 |
// 定义 LINQ 扩展方法
//public static class WorkflowEventExtensions
//{
// 过滤特定类型的事件
public static async IAsyncEnumerable<T> OfEventType<T>(
this IAsyncEnumerable<WorkflowEvent> events) where T : WorkflowEvent
{
await foreach (var evt in events)
{
if (evt is T typedEvent)
{
yield return typedEvent;
}
}
}
// 过滤特定 Executor 的事件
public static async IAsyncEnumerable<ExecutorEvent> FromExecutor(
this IAsyncEnumerable<WorkflowEvent> events,
string executorId)
{
await foreach (var evt in events)
{
if (evt is ExecutorEvent executorEvent &&
executorEvent.ExecutorId == executorId)
{
yield return executorEvent;
}
}
}
// 过滤多个 Executor 的事件
public static async IAsyncEnumerable<ExecutorEvent> FromExecutors(
this IAsyncEnumerable<WorkflowEvent> events,
params string[] executorIds)
{
var executorSet = new HashSet<string>(executorIds);
await foreach (var evt in events)
{
if (evt is ExecutorEvent executorEvent &&
executorSet.Contains(executorEvent.ExecutorId))
{
yield return executorEvent;
}
}
}
//}
使用 LINQ 扩展方法的优雅示例
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("LINQ 过滤示例");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
var simpleWorkflow = new WorkflowBuilder(uppercaseExecutor)
.AddEdge(uppercaseExecutor, reverseExecutor)
.WithOutputFrom(reverseExecutor)
.Build();
// 重新执行工作流
var run2 = await InProcessExecution.StreamAsync(simpleWorkflow, "Hello LINQ");
// 示例 1: 只监听 ExecutorCompletedEvent
Console.WriteLine("示例 1: 只监听完成事件\n");
await foreach (var completed in run2.WatchStreamAsync().OfEventType<ExecutorCompletedEvent>())
{
Console.WriteLine($"{completed.ExecutorId}: {completed.Data}");
}
await run2.DisposeAsync();
Console.WriteLine("\n" + new string('─', 60) + "\n");
// 示例 2: 只监听特定 Executor
var run3 = await InProcessExecution.StreamAsync(simpleWorkflow, "Hello Filter");
Console.WriteLine("示例 2: 只监听 ReverseExecutor\n");
await foreach (var evt in run3.WatchStreamAsync().FromExecutor("ReverseExecutor"))
{
Console.WriteLine($"{evt.GetType().Name}: {evt.Data}");
}
await run3.DisposeAsync();
Console.WriteLine("\n" + new string('─', 60) + "\n");
// 示例 3: 链式组合(只监听特定 Executor 的完成事件)
var run4 = await InProcessExecution.StreamAsync(simpleWorkflow, "Hello Chain");
Console.WriteLine("示例 3: 链式过滤 (ReverseExecutor + Completed)\n");
await foreach (var completed in run4.WatchStreamAsync()
.FromExecutor("ReverseExecutor")
.OfEventType<ExecutorCompletedEvent>())
{
Console.WriteLine($"{completed.ExecutorId}: {completed.Data}");
}
await run4.DisposeAsync();
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("LINQ 过滤示例完成");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
5. 实战案例 - 构建事件监控系统
现在让我们构建一个完整的事件监控系统,展示如何在实际项目中应用事件系统。
我们将构建一个 WorkflowMonitor 类,它可以:
- 实时统计:记录每个 Executor 的执行时间
- 日志记录:记录关键事件
- 错误监控:捕获并报告错误
- 进度展示:显示执行进度
- 性能分析:识别性能瓶颈
监控系统架构
flowchart TB
subgraph "事件流"
A[WorkflowEvent Stream]
end
subgraph "WorkflowMonitor"
B[事件接收器]
C[统计收集器]
D[日志记录器]
E[错误处理器]
F[进度追踪器]
end
subgraph "输出"
G[控制台日志]
H[性能报告]
I[错误报告]
end
A --> B
B --> C
B --> D
B --> E
B --> F
C --> H
D --> G
E --> I
F --> G
style A fill:#2196F3,stroke:#1565C0,stroke-width:2px,color:#fff
style B fill:#4CAF50,stroke:#2E7D32,stroke-width:2px,color:#fff
style H fill:#FF9800,stroke:#E65100,stroke-width:2px,color:#fff
- 监控类定义
// 完整的工作流监控系统
public class WorkflowMonitor
{
// 执行统计信息
private class ExecutorStats
{
public string ExecutorId { get; set; } = "";
public DateTime? StartTime { get; set; }
public DateTime? EndTime { get; set; }
public TimeSpan? Duration => EndTime - StartTime;
public bool Success { get; set; }
public object? Result { get; set; }
public Exception? Error { get; set; }
}
private readonly Dictionary<string, ExecutorStats> _stats = new();
private readonly List<string> _logs = new();
private DateTime _workflowStartTime;
private DateTime _workflowEndTime;
private int _totalEvents = 0;
private int _superStepCount = 0;
// 监控入口方法
public async Task MonitorAsync(StreamingRun run)
{
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("工作流监控系统已启动");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
_totalEvents++;
await HandleEventAsync(evt);
}
// 执行完成后生成报告
GenerateReport();
}
// 处理单个事件
private async Task HandleEventAsync(WorkflowEvent evt)
{
switch (evt)
{
case WorkflowStartedEvent:
HandleWorkflowStarted();
break;
case ExecutorInvokedEvent invoked:
HandleExecutorInvoked(invoked);
break;
case ExecutorCompletedEvent completed:
HandleExecutorCompleted(completed);
break;
case ExecutorFailedEvent failed:
HandleExecutorFailed(failed);
break;
case SuperStepCompletedEvent step:
HandleSuperStepCompleted(step);
break;
case WorkflowOutputEvent output:
HandleWorkflowOutput(output);
break;
case AgentRunUpdateEvent agentUpdate:
HandleAgentUpdate(agentUpdate);
break;
}
await Task.CompletedTask;
}
// 工作流开始
private void HandleWorkflowStarted()
{
_workflowStartTime = DateTime.Now;
Log("工作流开始执行");
}
// Executor 调用
private void HandleExecutorInvoked(ExecutorInvokedEvent evt)
{
_stats[evt.ExecutorId] = new ExecutorStats
{
ExecutorId = evt.ExecutorId,
StartTime = DateTime.Now
};
Log($" [{evt.ExecutorId}] 开始执行");
}
// Executor 完成
private void HandleExecutorCompleted(ExecutorCompletedEvent evt)
{
if (_stats.TryGetValue(evt.ExecutorId, out var stats))
{
stats.EndTime = DateTime.Now;
stats.Success = true;
stats.Result = evt.Data;
Log($"[{evt.ExecutorId}] 执行成功 (耗时: {stats.Duration?.TotalMilliseconds:F2}ms)");
}
}
// Executor 失败
private void HandleExecutorFailed(ExecutorFailedEvent evt)
{
if (_stats.TryGetValue(evt.ExecutorId, out var stats))
{
stats.EndTime = DateTime.Now;
stats.Success = false;
stats.Error = evt.Data;
Log($"[{evt.ExecutorId}] 执行失败: {evt.Data?.Message}");
}
}
// SuperStep 完成
private void HandleSuperStepCompleted(SuperStepCompletedEvent evt)
{
_superStepCount++;
Log($"SuperStep #{evt.StepNumber} 完成");
}
// 工作流输出
private void HandleWorkflowOutput(WorkflowOutputEvent evt)
{
_workflowEndTime = DateTime.Now;
Log($"工作流输出结果 (来自: {evt.SourceId})");
}
// Agent 更新
private void HandleAgentUpdate(AgentRunUpdateEvent evt)
{
if (!string.IsNullOrEmpty(evt.Update.Text))
{
Log($"[{evt.ExecutorId}] Agent 输出: {evt.Update.Text}");
}
}
// 记录日志
private void Log(string message)
{
var timestamp = DateTime.Now.ToString("HH:mm:ss.fff");
var logMessage = $"[{timestamp}] {message}";
_logs.Add(logMessage);
Console.WriteLine(logMessage);
}
// 生成性能报告
private void GenerateReport()
{
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("执行性能报告");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
// 总体统计
var totalDuration = _workflowEndTime - _workflowStartTime;
Console.WriteLine("### 总体统计");
new
{
总事件数 = _totalEvents,
SuperStep数量 = _superStepCount,
Executor数量 = _stats.Count,
总耗时 = $"{totalDuration.TotalMilliseconds:F2}ms",
成功数 = _stats.Values.Count(s => s.Success),
失败数 = _stats.Values.Count(s => !s.Success)
}.Display();
Console.WriteLine("\n### Executor 性能明细\n");
// Executor 性能表格
foreach (var stats in _stats.Values.OrderBy(s => s.StartTime))
{
var status = stats.Success ? "成功" : "失败";
var duration = stats.Duration?.TotalMilliseconds.ToString("F2") ?? "N/A";
Console.WriteLine($"{stats.ExecutorId}");
Console.WriteLine($" 状态: {status}");
Console.WriteLine($" 耗时: {duration}ms");
if (stats.Error != null)
{
Console.WriteLine($" 错误: {stats.Error.Message}");
}
Console.WriteLine();
}
// 性能分析
if (_stats.Values.Any(s => s.Duration.HasValue))
{
var slowest = _stats.Values
.Where(s => s.Duration.HasValue)
.OrderByDescending(s => s.Duration)
.First();
Console.WriteLine("### 性能瓶颈");
Console.WriteLine($"最慢的 Executor: {slowest.ExecutorId}");
Console.WriteLine($" 耗时: {slowest.Duration?.TotalMilliseconds:F2}ms");
}
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
}
}
- 使用监控系统
// 创建一个更复杂的工作流来测试监控系统
public class DataProcessorExecutor : Executor<string, string>
{
public DataProcessorExecutor() : base("DataProcessor") { }
public override async ValueTask<string> HandleAsync(
string input,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 模拟一些处理时间
await Task.Delay(100);
return $"[Processed: {input}]";
}
}
public class DataValidatorExecutor : Executor<string, bool>
{
public DataValidatorExecutor() : base("DataValidator") { }
public override async ValueTask<bool> HandleAsync(
string input,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 模拟验证时间
await Task.Delay(50);
return input.Length > 0;
}
}
public class DataFormatterExecutor : Executor<string, string>
{
public DataFormatterExecutor() : base("DataFormatter") { }
public override async ValueTask<string> HandleAsync(
string input,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 模拟格式化时间
await Task.Delay(80);
return $"**{input}**";
}
}
// 构建测试工作流
var processor = new DataProcessorExecutor();
var validator = new DataValidatorExecutor();
var formatter = new DataFormatterExecutor();
var testWorkflow = new WorkflowBuilder(processor)
.AddEdge(processor, validator)
.AddEdge(validator, formatter)
.Build();
5. 最佳实践建议
DO(推荐做法)
使用 LINQ 扩展进行过滤 - 代码更清晰、可复用
监听特定事件类型 - 避免处理不需要的事件
记录关键事件 - 便于调试和审计
异步处理事件 - 避免阻塞事件流
使用 switch 表达式 - 处理多种事件类型时更简洁
DON'T(避免做法)
- 在事件处理中做耗时操作 - 会阻塞事件流
- 忽略 ExecutorFailedEvent - 可能导致静默失败
- 过度监听所有事件 - 影响性能
- 在事件处理中修改工作流状态 - 可能导致意外行为
- 忘记设置 emitEvents: true - Agent 事件不会产生
事件监听模式速查
| 场景 | 代码示例 | 适用情况 |
|---|---|---|
| 监听所有事件 | await foreach (var e in run.WatchStreamAsync()) | 调试、完整日志 |
| 监听特定类型 | run.WatchStreamAsync().OfEventType |
只关注某类事件 |
| 监听特定 Executor | run.WatchStreamAsync().FromExecutor("ExecutorId") | 追踪特定步骤 |
| 组合过滤 | run.WatchStreamAsync().FromExecutor("X").OfEventType |
精确过滤 |
| 多类型处理 | evt switch | 差异化处理 |
- 常用事件属性速查
| 事件类型 | 关键属性 | 数据类型 | 获取方式 |
|---|---|---|---|
| ExecutorCompletedEvent | ExecutorId | string | evt.ExecutorId |
| Data (结果) | object? | evt.Data | |
| ExecutorFailedEvent | ExecutorId | string | evt.ExecutorId |
| Data (异常) | Exception? | evt.Data | |
| AgentRunUpdateEvent | ExecutorId | string | evt.ExecutorId |
| Update | AgentRunResponseUpdate | evt.Update | |
| Text (输出文本) | string | evt.Update.Text | |
| WorkflowOutputEvent | SourceId | string | evt.SourceId |
| Data (结果) | object? | evt.Data | |
| 类型检查 | bool | evt.Is |
|
| 类型转换 | T? | evt.As |
|
| SuperStepCompletedEvent | StepNumber | int | evt.StepNumber |
- 错误处理检查清单
- 监听
ExecutorFailedEvent处理异常 - 检查
WorkflowOutputEvent是否为 null - 使用类型安全的
Is<T>()和As<T>()方法 - 在事件处理中捕获异常,避免崩溃
- 记录关键错误信息(ExecutorId、异常类型、堆栈)
- 监听
- 性能优化检查清单
- 使用过滤器减少不必要的事件处理
- 避免在事件处理中执行耗时操作
- 对于密集事件(如 AgentRunUpdateEvent),考虑批处理
- 使用异步操作,避免阻塞事件流
- 监控事件处理的性能瓶颈
- AgentRunUpdateEvent 必备条件:
// 正确:会产生 AgentRunUpdateEvent
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
// 错误:不会产生 AgentRunUpdateEvent
await run.TrySendMessageAsync(new TurnToken(emitEvents: false));
await run.TrySendMessageAsync(new TurnToken()); // 默认 false
- 调试技巧速查
| 技巧 | 实现方式 | 用途 |
|---|---|---|
| 查看所有事件 | Console.WriteLine(evt) | 了解事件序列 |
| 记录事件时间戳 | DateTime.Now 在事件处理时记录 | 性能分析 |
| 统计事件数量 | 维护计数器 | 验证执行完整性 |
| 追踪执行路径 | 记录 ExecutorId 序列 | 理解执行流程 |
| 捕获中间结果 | 保存 ExecutorCompletedEvent.Data | 数据流分析 |
五、检查点
1. 概念
在现实生活中,我们经常会遇到需要"保存进度"的场景:
| 场景 | 检查点机制 | 作用 |
|---|---|---|
| 玩游戏 | 存档点 | 游戏失败后可以从存档点重新开始 |
| 写文档 | 自动保存 | 意外关闭后可以恢复到最近保存的版本 |
| Git 版本控制 | Commit 提交点 | 可以回退到任意历史提交点 |
| 数据库事务 | 事务日志 | 系统崩溃后可以从日志恢复数据 |
在 Microsoft Agent Framework (MAF) 的 Workflow 中,检查点(Checkpoint) 就是这样一种机制:
定义:检查点是 Workflow 执行过程中的状态快照,它记录了某个时刻所有 Executor 的状态数据,使得工作流可以在未来的某个时间点从该状态继续执行或重新执行。
检查点机制为 Workflow 应用提供了强大的能力:
容错恢复(Fault Tolerance)
场景示例:长时间运行的数据处理工作流,在处理了 1000 条记录后系统崩溃,可以从最近的检查点恢复,而不必从头开始。
状态回溯(State Rollback)
场景示例:在 AI Agent 决策过程中,如果某条路径的结果不理想,可以回退到之前的检查点,尝试其他决策路径。
flowchart LR
A[初始状态] --> B[检查点 1]
B --> C[检查点 2]
C --> D[检查点 3]
D --> E[当前状态]
E -.回退.-> C
C --> F[新路径探索]
分支探索(Branch Exploration)
场景示例:从同一个检查点创建多个独立的工作流实例,并行探索不同的执行路径(A/B 测试、参数调优)。
人工介入(Human-in-the-Loop)
场景示例:在审批流程中,工作流执行到需要人工审批的节点时创建检查点,等待人工决策后再从该检查点继续执行。
| 维度 | 无检查点 | 有检查点 |
|---|---|---|
| 系统崩溃 | 需要从头开始,浪费计算资源 | 从最近检查点恢复,节省时间 |
| 调试复杂流程 | 每次调试都要完整运行 | 可以从特定检查点开始调试 |
| 长时间任务 | 中断后全部重来 | 断点续传,不丢失进度 |
| 多路径探索 | 需要多次完整执行 | 从检查点分叉,提高效率 |
| 人工干预 | 难以实现异步等待 | 保存状态后等待,自然支持 |
2. 核心概念:Super Step(超级步骤)
定义:Super Step 是 Workflow 执行的基本阶段单位。一个 Super Step 包含一组 Executor 的完整执行周期,从接收信号开始,到所有 Executor 完成工作为止。
Super Step 的执行流程
flowchart TD
A[开始 Super Step] --> B{有待执行的 Executor?}
B -->|是| C[执行 Executor 1]
B -->|是| D[执行 Executor 2]
B -->|是| E[执行 Executor N]
C --> F[等待所有 Executor 完成]
D --> F
E --> F
F --> G[Super Step 完成]
G --> H[自动创建检查点]
H --> I{还有后续信号?}
I -->|是| A
I -->|否| J[工作流结束]
| 特性 | 说明 |
|---|---|
| 并发执行 | 同一 Super Step 中的多个 Executor 可以并行执行 |
| 完整性保证 | 只有当前 Super Step 的所有 Executor 完成后,才进入下一个 Super Step |
| 检查点边界 | 每个 Super Step 结束时自动创建检查点 |
3. 核心概念:Checkpoint(检查点)
当为 Workflow 提供了 CheckpointManager 时,系统会在每个 Super Step 结束时自动创建检查点。
// 创建带检查点的工作流运行
var checkpointManager = CheckpointManager.Default;
await using Checkpointed<StreamingRun> checkpointedRun =
await InProcessExecution.StreamAsync(workflow, initialSignal, checkpointManager);
检查点通过 CheckpointInfo 对象表示,它包含:
| 属性 | 类型 | 说明 |
|---|---|---|
| RunId | Guid | 工作流运行实例的唯一标识 |
| Sequence | int | 检查点序号(第几个 Super Step) |
| StateData | Dictionary | 所有 Executor 保存的状态数据 |
| Timestamp | DateTime | 检查点创建时间 |
检查点通过 SuperStepCompletedEvent 事件获取:
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
if (evt is SuperStepCompletedEvent superStepCompletedEvt)
{
// 从事件中获取自动创建的检查点
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
if (checkpoint is not null)
{
// 保存检查点以备后用
checkpoints.Add(checkpoint);
Console.WriteLine($"检查点已创建:序号 {checkpoint.Sequence}");
}
}
}
4. 核心概念:CheckpointManager(检查点管理器)
CheckpointManager 是负责存储和读取检查点的组件:
flowchart LR
A[Workflow Engine] -->|保存检查点| B[CheckpointManager]
B -->|存储到| C[(存储介质)]
C -->|读取检查点| B
B -->|恢复状态| A
style C fill:#e1f5ff
MAF 提供了开箱即用的默认实现:CheckpointManager.Default
MAF 支持自定义 ICheckpointManager 接口实现:
public interface ICheckpointManager
{
// 保存检查点
ValueTask SaveAsync(CheckpointInfo checkpoint, CancellationToken cancellationToken);
// 加载检查点
ValueTask<CheckpointInfo> LoadAsync(Guid runId, int sequence, CancellationToken cancellationToken);
// 列出所有检查点
IAsyncEnumerable<CheckpointInfo> ListAsync(Guid runId, CancellationToken cancellationToken);
}
提示:在生产环境中,可以实现基于 Azure Blob Storage、SQL Server 或 Redis 的 CheckpointManager。
5. 核心概念:Executor 的检查点生命周期
要让 Executor 支持检查点,需要实现两个关键方法:保存状态 和 恢复状态。
检查点生命周期流程
sequenceDiagram
participant WF as Workflow Engine
participant Ex as Executor
participant CM as CheckpointManager
Note over WF,CM: Super Step 执行阶段
WF->>Ex: HandleAsync(signal)
Ex->>Ex: 执行业务逻辑
Ex->>Ex: 更新内部状态
Note over WF,CM: Super Step 完成阶段
WF->>Ex: OnCheckpointingAsync()
Ex->>WF: QueueStateUpdate("key", stateData)
WF->>CM: SaveAsync(checkpoint)
Note over WF,CM: 状态恢复阶段
CM->>WF: LoadAsync(checkpointInfo)
WF->>Ex: OnCheckpointRestoredAsync()
Ex->>WF: GetStateAsync("key")
Ex->>Ex: 恢复内部状态
让我们看看源码中的完整实现:
/// <summary>
/// 猜数字游戏的 Executor,支持检查点机制
/// </summary>
internal sealed class GuessNumberExecutor() : Executor<NumberSignal>("Guess")
{
// 状态数据:猜测范围的上下界
public int LowerBound { get; private set; }
public int UpperBound { get; private set; }
// 用于在检查点中存储状态的键
private const string StateKey = "GuessNumberExecutorState";
// 构造函数:初始化范围
public GuessNumberExecutor(int lowerBound, int upperBound) : this()
{
this.LowerBound = lowerBound;
this.UpperBound = upperBound;
}
// 计算下一次猜测的值(二分法)
private int NextGuess => (this.LowerBound + this.UpperBound) / 2;
// 业务逻辑:处理不同信号
public override async ValueTask HandleAsync(
NumberSignal message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
switch (message)
{
case NumberSignal.Init:
await context.SendMessageAsync(this.NextGuess, cancellationToken: cancellationToken);
break;
case NumberSignal.Above:
this.UpperBound = this.NextGuess - 1; // 调整上界
await context.SendMessageAsync(this.NextGuess, cancellationToken: cancellationToken);
break;
case NumberSignal.Below:
this.LowerBound = this.NextGuess + 1; // 调整下界
await context.SendMessageAsync(this.NextGuess, cancellationToken: cancellationToken);
break;
}
}
/// <summary>
/// 关键方法 1:保存检查点状态
/// 当 Super Step 结束时,框架会调用此方法
/// </summary>
protected override ValueTask OnCheckpointingAsync(
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 将需要保存的状态数据排队(支持多个键值对)
return context.QueueStateUpdateAsync(
StateKey,
(this.LowerBound, this.UpperBound),
cancellationToken: cancellationToken);
}
/// <summary>
/// 关键方法 2:恢复检查点状态
/// 当从检查点恢复时,框架会调用此方法
/// </summary>
protected override async ValueTask OnCheckpointRestoredAsync(
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 从检查点中读取保存的状态数据
(int lowerBound, int upperBound) = await context
.GetStateAsync<(int, int)>(StateKey, cancellationToken: cancellationToken);
// 恢复实例的状态
this.LowerBound = lowerBound;
this.UpperBound = upperBound;
}
}
// 定义信号枚举
internal enum NumberSignal
{
Init, // 初始化
Above, // 猜大了
Below // 猜小了
}
Console.WriteLine("GuessNumberExecutor 定义完成");
代码详解
- OnCheckpointingAsync() - 保存状态
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, ...)
{
// 将状态数据添加到检查点
return context.QueueStateUpdateAsync(StateKey, (LowerBound, UpperBound), ...);
}
- 调用时机:每个 Super Step 结束时自动调用
- 状态数据:可以是任何可序列化的对象(基本类型、元组、自定义类)
- StateKey:用于标识状态数据的键,恢复时需要使用相同的键
- 多状态支持:可以多次调用 QueueStateUpdateAsync() 保存多个状态
- OnCheckpointRestoredAsync() - 恢复状态
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, ...)
{
// 从检查点读取状态数据
var state = await context.GetStateAsync<(int, int)>(StateKey, ...);
// 恢复实例字段
this.LowerBound = state.Item1;
this.UpperBound = state.Item2;
}
- 调用时机:从检查点恢复工作流时调用
- 类型匹配:GetStateAsync
() 的类型必须与保存时一致 - 键匹配:必须使用与保存时相同的 StateKey
- 必须实现:如果 Executor 有状态,这两个方法都必须正确实现
- 状态数据的序列化要求
支持的类型:
基本类型:int, string, bool, DateTime 等
值类型:元组 (int, int)、结构体 struct
引用类型:自定义类(需支持 JSON 序列化)
集合类型:List
, Dictionary<K,V>
不支持的类型:
不可序列化的对象(如数据库连接、文件句柄)
包含循环引用的对象
包含委托或事件的对象
6. Super Step、Checkpoint、Executor 的关系总结
flowchart TB
subgraph "Workflow 执行流程"
A[开始] --> B[Super Step 1]
B --> C[检查点 1]
C --> D[Super Step 2]
D --> E[检查点 2]
E --> F[Super Step 3]
F --> G[检查点 3]
G --> H[结束]
end
subgraph "Super Step 内部"
I[接收信号] --> J[Executor 1<br/>HandleAsync]
I --> K[Executor 2<br/>HandleAsync]
J --> L[等待完成]
K --> L
L --> M[OnCheckpointingAsync]
M --> N[保存状态到检查点]
end
subgraph "检查点恢复"
O[加载检查点] --> P[创建新 Workflow 实例]
P --> Q[OnCheckpointRestoredAsync]
Q --> R[恢复 Executor 状态]
R --> S[继续执行]
end
B -.内部执行.-> I
C -.恢复流程.-> O
| 概念 | 核心要点 |
|---|---|
| Super Step | • Workflow 执行的基本单位 • 包含一组 Executor 的完整执行 • 是检查点创建的边界 |
| Checkpoint | • 在 Super Step 结束时自动创建 • 包含所有 Executor 的状态快照 • 通过 SuperStepCompletedEvent 获取 |
| CheckpointManager | • 负责存储和读取检查点 • 默认实现是内存存储 • 可自定义实现(文件、数据库、云存储) |
| Executor 生命周期 | • OnCheckpointingAsync() 保存状态 • OnCheckpointRestoredAsync() 恢复状态 • 状态数据必须可序列化 |
六、工作流状态共享
在真实业务中,以下痛点最常见:
| 场景 | 传统做法 | 痛点 | Workflow Shared State 方案 |
|---|---|---|---|
| 多步骤共享原始输入 | 通过参数层层传递 | 易丢失、难维护 | 在入口步骤写入状态,后续随取随用 |
| 并发执行器共享缓存 | 使用静态变量 | 线程不安全 | 作用域隔离 + 状态管理器负责并发 |
| 人机协作回填审批结论 | HTTP 回调携带参数 | 状态丢失、难追踪 | WaitForInput + 状态恢复 |
| Agent 输出给多个消费者 | 多次调用同一个 Agent | 成本高 | 把输出写入状态,其他步骤复用 |
1. WorkflowContext 状态 API 速查
| API | 作用 | 典型使用方式 |
|---|---|---|
| QueueStateUpdateAsync(key, value, scope) | 将状态写入队列,当前步骤完成后统一提交,避免脏读 | 适合在 Executor 内写入结果、缓存、审计信息 |
| ReadStateAsync |
读取指定键,找不到则返回 null |
读取上游步骤输出、缓存的 Agent 响应 |
| ReadOrInitStateAsync |
如果不存在则初始化,兼顾懒加载与并发一致性 | 首次接入业务实体、惰性缓存、计数器 / 重试次数 |
| QueueClearScopeAsync(scope) | 清理整个作用域 | 敏感信息擦除、生命周期结束时清理 |
| ReadStateKeysAsync(scope) | 列出作用域中所有键 | 调试、监控、构建动态路由 |
ReadOrInitStateAsync 应用场景
首次加载业务上下文:如 fileId 对应的元数据、审批信息等只需初始化一次的结构。
惰性构建缓存:在第一次命中时生成摘要、Embedding 或模型响应,后续 Executor 复用。
并发累加器:计数器、重试次数、Fan-in 进度等需要“读当前值 + 初始化默认值”的模式。
作用域 (Scope) 分类
| Scope 类型 | 说明 | 建议命名 |
|---|---|---|
| Executor Scope (默认) | 键只对当前 Executor 可见 | nameof(Executor) |
| 自定义业务 Scope | 多个 Executor 共享,例如 "FileContentState" | 模块名 + State |
| System / Environment Scope | 框架内部使用,不建议直接写入 | 使用 VariableScopeNames 常量时需谨慎 |
线程安全说明:QueueStateUpdateAsync 会在状态管理器内部加锁,所以不需要额外的 lock。但仍需控制数据粒度(例如传输 ID,而不是整个大对象)。
2. 案例:文档统计工作流
我们将复刻官方 Workflows/SharedStates 示例,并加入更多注释:
- 入口步骤读取文本内容,写入 FileContentState 作用域中,值为 fileId -> 文本内容
- Fan-out 出两个 Executor:
- WordCountingExecutor 统计单词数
- ParagraphCountingExecutor 统计段落数
- Fan-in 到 AggregationExecutor,汇总结果并输出结构化报表
该案例展示了一次写入,多处读取的模式,也是后续 MapReduce、并发协作的基础。
- 准备工作
// 示例数据
internal static class SharedStateSampleData
{
private static readonly IReadOnlyDictionary<string, string> Documents = new Dictionary<string, string>
{
["ProductBrief"] = "MAF Workflow 让 .NET 团队可以像积木一样组合 Agent、Executor 与工具, 支持流式事件、并发节点和可观测性。\n\n它强调企业级能力, 包括状态管理、依赖注入、权限控制, 适合搭建端到端 AI 业务流程。",
["WeeklyReport"] = "本周平台完成了 Shared State 功能的代码走查, 已经覆盖 Fan-out/Fan-in, Loop, Human-in-the-Loop 三种场景。\n\n下周计划: 1) 集成多模型投票; 2) 增加异常回滚; 3) 落地监控指标。"
};
public static string GetDocument(string name)
=> Documents.TryGetValue(name, out var content)
? content
: throw new ArgumentException($"未找到文档: {name}");
}
// 状态常量
internal static class FileContentStateConstants
{
public const string ScopeName = "FileContentState";
}
// 模型定义
internal sealed class FileStats
{
public int WordCount { get; init; }
public int ParagraphCount { get; init; }
}
// 定义FileReadExecutor
internal sealed class FileReadExecutor() : Executor<string, string>("FileReadExecutor")
{
public override async ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
string content = SharedStateSampleData.GetDocument(message);
string fileId = Guid.NewGuid().ToString("N");
await context.QueueStateUpdateAsync(fileId, content, FileContentStateConstants.ScopeName, cancellationToken);
Console.WriteLine($"FileReadExecutor 将 {message} 写入 Scope:{FileContentStateConstants.ScopeName}");
return fileId;
}
}
// 定义并行统计执行器
internal sealed class WordCountingExecutor() : Executor<string, FileStats>("WordCountingExecutor")
{
public override async ValueTask<FileStats> HandleAsync(string fileId, IWorkflowContext context, CancellationToken cancellationToken = default)
{
string? content = await context.ReadStateAsync<string>(fileId, FileContentStateConstants.ScopeName, cancellationToken);
if (content is null)
{
throw new InvalidOperationException($"无法在 Scope:{FileContentStateConstants.ScopeName} 中找到 fileId={fileId}");
}
int wordCount = content.Split([' ', '\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
return new FileStats { WordCount = wordCount };
}
}
// 定义AggregationExecutor
internal sealed class ParagraphCountingExecutor() : Executor<string, FileStats>("ParagraphCountingExecutor")
{
public override async ValueTask<FileStats> HandleAsync(string fileId, IWorkflowContext context, CancellationToken cancellationToken = default)
{
string? content = await context.ReadStateAsync<string>(fileId, FileContentStateConstants.ScopeName, cancellationToken);
if (content is null)
{
throw new InvalidOperationException($"无法在 Scope:{FileContentStateConstants.ScopeName} 中找到 fileId={fileId}");
}
int paragraphCount = content.Split(['\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
return new FileStats { ParagraphCount = paragraphCount };
}
}
internal sealed class AggregationExecutor() : Executor<FileStats>("AggregationExecutor")
{
private readonly List<FileStats> _buffer = [];
public override async ValueTask HandleAsync(FileStats message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._buffer.Add(message);
if (this._buffer.Count < 2)
{
return;
}
int totalWords = this._buffer.Sum(x => x.WordCount);
int totalParagraphs = this._buffer.Sum(x => x.ParagraphCount);
var output = new
{
总词数 = totalWords,
总段落数 = totalParagraphs,
统计时间 = DateTimeOffset.UtcNow
};
Console.WriteLine("文档统计结果");
output.Display();
await context.YieldOutputAsync(output, cancellationToken);
}
}
// 定义AggregationExecutor
internal sealed class AggregationExecutor() : Executor<FileStats>("AggregationExecutor")
{
private readonly List<FileStats> _buffer = [];
public override async ValueTask HandleAsync(FileStats message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._buffer.Add(message);
if (this._buffer.Count < 2)
{
return;
}
int totalWords = this._buffer.Sum(x => x.WordCount);
int totalParagraphs = this._buffer.Sum(x => x.ParagraphCount);
var output = new
{
总词数 = totalWords,
总段落数 = totalParagraphs,
统计时间 = DateTimeOffset.UtcNow
};
Console.WriteLine("文档统计结果");
output.Display();
await context.YieldOutputAsync(output, cancellationToken);
}
}
- 步骤 1:组装 Fan-out/Fan-in 工作流
我们将以下列顺序建图:
- FileReadExecutor 作为入口,并指定最终输出来自 AggregationExecutor
- AddFanOutEdge:把文件 ID 同时传给两个统计执行器
- AddFanInEdge:等待两个统计执行器完成后,再进入聚合节点
通过 Builder 定义拓扑后,我们会在后续步骤用 RunStreamingAsync 查看事件流。
var fileRead = new FileReadExecutor();
var wordCounting = new WordCountingExecutor();
var paragraphCounting = new ParagraphCountingExecutor();
var aggregate = new AggregationExecutor();
var sharedStateWorkflow = new WorkflowBuilder(fileRead)
.AddFanOutEdge(fileRead, [wordCounting, paragraphCounting])
.AddFanInEdge([wordCounting, paragraphCounting], aggregate)
.WithOutputFrom(aggregate)
.Build();
Console.WriteLine("Shared State Workflow 构建完成");
- 步骤 2:流式观察状态读写
我们使用 InProcessExecution.RunStreamingAsync 以事件流的方式运行 workflow,并关注三类事件:
- WorkflowStartedEvent / WorkflowCompletedEvent:整体生命周期
- ExecutorCompletedEvent:确认每个节点的执行顺序
- WorkflowOutputEvent:最终业务输出
同时,我们打印状态写入时生成的 fileId,以便验证 Fan-out 节点读取的是同一个共享数据。
static async Task RunSharedStateDemoAsync(Workflow sharedWorkflow, string documentKey)
{
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine($"演示文档: {documentKey}");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
await using (var run = await InProcessExecution.StreamAsync(sharedWorkflow, documentKey)){
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
switch (evt)
{
case WorkflowStartedEvent started:
Console.WriteLine($"Workflow Started");
break;
case ExecutorCompletedEvent executorCompleted:
Console.WriteLine($"{executorCompleted.ExecutorId} 完成");
break;
case WorkflowOutputEvent outputEvent:
Console.WriteLine("收到 Workflow Output Event:");
outputEvent.Display();
break;
case WorkflowErrorEvent errorEvent:
Console.WriteLine("收到 Workflow Error Event:");
errorEvent.Display();
break;
default:
Console.WriteLine($"其他事件: {evt.GetType().Name}");
evt.Display();
break;
}
}
await run.DisposeAsync();
}
}
await RunSharedStateDemoAsync(sharedStateWorkflow, "ProductBrief");
3. 最佳实践
命名规范
使用 Scope + 功能命名(例:FileContentState)
键值建议为业务 ID(如 fileId、turnId),避免直接储存整段文本作为 key
状态生命周期
在数据敏感的场景结束后调用 QueueClearScopeAsync
长时间运行的 Workflow 可定期写入 UpdatedAt,便于调试
并发注意事项
Fan-out 节点共享同一 state 时,只写 ID,不写大对象
如果必须写大对象,可在读取端做 null 检查并提供降级策略
调试建议
使用 ReadStateKeysAsync 打印当前作用域的所有键
配合 WorkflowEvent 日志,快速定位状态缺失或覆盖问题
常见踩坑
在 Executor 中保存可变引用(List/Dictionary),会被后续步骤修改
在多个 scope 中重复使用相同 key,导致取值混乱
忘记 await QueueStateUpdateAsync,导致状态未落盘
七、工作流上下文
1. API 分类速查表
IWorkflowContext 是 Executor 与 Workflow 引擎交互的唯一入口,它提供了以下能力:
| 分类 | API | 用途 | 常见场景 |
|---|---|---|---|
| 输出管理 | SendMessageAsync | 向下游 Executor 发送消息 | 流程编排、数据传递 |
| AddEventAsync | 触发自定义业务事件 | 进度通知、审计日志 | |
| YieldOutputAsync | 输出最终业务结果 | 工作流返回值 | |
| 流程控制 | RequestHaltAsync | 暂停工作流执行 | 人工审批、等待外部系统 |
| 状态管理 | ReadStateAsync |
读取状态值 | 获取共享数据 |
| ReadOrInitStateAsync |
读取或初始化状态 | 计数器、缓存、配置 | |
| QueueStateUpdateAsync |
更新状态值 | 写入业务数据 | |
| ReadStateKeysAsync | 枚举作用域中的所有键 | 调试、动态路由 | |
| QueueClearScopeAsync | 清理整个作用域 | 敏感数据清理 | |
| 元数据 | TraceContext | 链路追踪上下文 | 分布式追踪 |
| ConcurrentRunsEnabled | 是否支持并发运行 | 并发控制 |
2. 核心概念:三种输出机制的本质区别
这是最容易混淆的部分!理解它们的区别是掌握 Context 的关键。
SendMessageAsync - 流程内部通信
特点:
目标是下游 Executor(通过 Edge 连接的节点)
消息在下一个 SuperStep才会被接收
可以指定 targetId 进行精确路由
如果没有连接关系,消息会被丢弃
典型场景:
动态路由(根据条件发送给不同的 Executor)
广播模式(一个 Executor 发送给多个下游)
流程编排中的数据传递
AddEventAsync - 业务事件通知
特点:
目标是外部订阅者(调用 workflow 的客户端)
事件在当前 SuperStep 结束时立即发出
用于传递业务进度、审计日志、中间状态
不影响工作流的执行流程
典型场景:
进度通知("正在生成大纲…"、"已完成 3/10 个文档")
审计日志("用户操作已记录"、"检测到敏感词")
调试信息(自定义 Event 携带中间变量)
YieldOutputAsync - 最终业务结果
特点:
目标是工作流的最终返回值
通过 WorkflowOutputEvent 封装返回
通常在 WithOutputFrom() 指定的步骤中调用
一个工作流可以有多个输出(多次调用 YieldOutputAsync)
典型场景:
工作流的最终结果(生成的文章、处理后的数据)
批量输出(MapReduce 的 Reduce 结果)
结构化返回值(JSON、DTO)
记忆口诀:
- Send = 内部流程通信(Executor → Executor)
- Event = 进度广播(Executor → 订阅者)
- Output = 最终结果(Workflow → 调用者)
对比总结
| 维度 | SendMessageAsync | AddEventAsync | YieldOutputAsync |
|---|---|---|---|
| 接收者 | 下游 Executor | 外部订阅者 | 外部订阅者 |
| 触发时机 | 下一个 SuperStep | 当前 SuperStep 结束 | 当前 SuperStep 结束 |
| 用途 | 流程内部数据传递 | 进度通知/审计日志 | 最终业务结果 |
| 是否影响流程 | 是(触发下游执行) | 否 | 否 |
| 典型事件类型 | N/A | 自定义 WorkflowEvent | WorkflowOutputEvent |
2. 实战演示:Context Explorer
我们将创建一个专门的 ContextExplorerExecutor,它会依次调用 IWorkflowContext 的所有关键 API,并通过事件流展示每个方法的效果。
演示场景设计
flowchart TD
Start[用户输入: 探索模式] --> Explorer[ContextExplorerExecutor]
Explorer --> S1[1.状态管理演示]
S1 --> S1A[QueueStateUpdateAsync]
S1A --> S1B[ReadStateAsync]
S1B --> S1C[ReadOrInitStateAsync]
S1C --> S1D[ReadStateKeysAsync]
Explorer --> S2[2.输出机制演示]
S2 --> S2A[SendMessageAsync]
S2A --> S2B[AddEventAsync]
S2B --> S2C[YieldOutputAsync]
Explorer --> S3[3.元数据演示]
S3 --> S3A[TraceContext]
S3A --> S3B[ConcurrentRunsEnabled]
S1D --> Receiver[DownstreamExecutor]
S2C --> Output[WorkflowOutputEvent]
style Explorer fill:#e1f5ff
style S1 fill:#fff4e1
style S2 fill:#f3e5f5
style S3 fill:#e8f5e9
- 步骤 1:定义自定义事件和 Context Explorer
// 自定义事件:用于报告探索进度
internal sealed class ExplorationProgressEvent(string category, string api, string result) : WorkflowEvent
{
public string Category { get; } = category;
public string API { get; } = api;
public string Result { get; } = result;
}
internal sealed class ContextExplorerExecutor() : Executor<string, string>("ContextExplorerExecutor")
{
private const string DemoScope = "ExplorerDemoScope";
public override async ValueTask<string> HandleAsync(string mode, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine($"ContextExplorer 开始工作 (模式: {mode})");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
// ============ 1.状态管理演示 ============
await this.DemonstrateStateManagementAsync(context, cancellationToken);
// ============ 2.输出机制演示 ============
await this.DemonstrateOutputMechanismsAsync(context, cancellationToken);
// ============ 3.元数据演示 ============
await this.DemonstrateMetadataAsync(context, cancellationToken);
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("ContextExplorer 工作完成");
return "探索完成";
}
private async ValueTask DemonstrateStateManagementAsync(IWorkflowContext context, CancellationToken cancellationToken)
{
Console.WriteLine("\n【1.状态管理 API 演示】");
// 1. QueueStateUpdateAsync - 写入状态
await context.QueueStateUpdateAsync("counter", 100, DemoScope, cancellationToken);
await context.AddEventAsync(new ExplorationProgressEvent(
"状态管理",
"QueueStateUpdateAsync",
"写入 counter=100"), cancellationToken);
Console.WriteLine(" QueueStateUpdateAsync: 写入 counter=100");
// 2. ReadStateAsync - 读取状态
var counter = await context.ReadStateAsync<int>("counter", DemoScope, cancellationToken);
await context.AddEventAsync(new ExplorationProgressEvent(
"状态管理",
"ReadStateAsync",
$"读取到 counter={counter}"), cancellationToken);
Console.WriteLine($" ReadStateAsync: 读取到 counter={counter}");
// 3. ReadOrInitStateAsync - 惰性初始化
var config = await context.ReadOrInitStateAsync(
"config",
() => new { MaxRetry = 3, Timeout = 30 },
DemoScope,
cancellationToken);
await context.AddEventAsync(new ExplorationProgressEvent(
"状态管理",
"ReadOrInitStateAsync",
$"初始化配置: MaxRetry={config.MaxRetry}"), cancellationToken);
Console.WriteLine($" ReadOrInitStateAsync: 初始化配置 MaxRetry={config.MaxRetry}");
// 4. ReadStateKeysAsync - 枚举所有键
var keys = await context.ReadStateKeysAsync(DemoScope, cancellationToken);
await context.AddEventAsync(new ExplorationProgressEvent(
"状态管理",
"ReadStateKeysAsync",
$"作用域中的键: {string.Join(", ", keys)}"), cancellationToken);
Console.WriteLine($" ReadStateKeysAsync: 共 {keys.Count} 个键 [{string.Join(", ", keys)}]");
// 5. QueueClearScopeAsync - 清理作用域(演示中不实际执行,避免影响后续测试)
// await context.QueueClearScopeAsync(DemoScope, cancellationToken);
Console.WriteLine(" QueueClearScopeAsync: 已跳过(避免清理演示数据)");
}
private async ValueTask DemonstrateOutputMechanismsAsync(IWorkflowContext context, CancellationToken cancellationToken)
{
Console.WriteLine("\n【2.输出机制 API 演示】");
// 1. SendMessageAsync - 发送给下游 Executor
await context.SendMessageAsync("来自 ContextExplorer 的消息", targetId: null, cancellationToken);
await context.AddEventAsync(new ExplorationProgressEvent(
"输出机制",
"SendMessageAsync",
"已发送消息到下游"), cancellationToken);
Console.WriteLine(" SendMessageAsync: 已发送消息到下游 Executor");
// 2. AddEventAsync - 触发自定义事件
await context.AddEventAsync(new ExplorationProgressEvent(
"输出机制",
"AddEventAsync",
"这是一个自定义业务事件"), cancellationToken);
Console.WriteLine(" AddEventAsync: 已触发自定义事件");
// 3. YieldOutputAsync - 输出最终结果
var finalResult = new
{
探索时间 = DateTimeOffset.UtcNow,
API总数 = 11,
状态作用域 = DemoScope
};
await context.YieldOutputAsync(finalResult, cancellationToken);
Console.WriteLine(" YieldOutputAsync: 已输出最终结果");
}
private async ValueTask DemonstrateMetadataAsync(IWorkflowContext context, CancellationToken cancellationToken)
{
Console.WriteLine("\n【3.元数据 API 演示】");
// 1. TraceContext - 链路追踪上下文
var traceContext = context.TraceContext;
var traceInfo = traceContext != null
? string.Join(", ", traceContext.Select(kv => $"{kv.Key}={kv.Value}"))
: "null";
await context.AddEventAsync(new ExplorationProgressEvent(
"元数据",
"TraceContext",
traceInfo), cancellationToken);
Console.WriteLine($" TraceContext: {traceInfo}");
// 2. ConcurrentRunsEnabled - 并发支持
var concurrentEnabled = context.ConcurrentRunsEnabled;
await context.AddEventAsync(new ExplorationProgressEvent(
"元数据",
"ConcurrentRunsEnabled",
concurrentEnabled.ToString()), cancellationToken);
Console.WriteLine($" ConcurrentRunsEnabled: {concurrentEnabled}");
}
}
- 步骤 2:创建下游接收器
为了演示 SendMessageAsync 的效果,我们需要一个下游 Executor 来接收消息。
internal sealed class DownstreamReceiverExecutor() : Executor<string, string>("DownstreamReceiverExecutor")
{
public override async ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine($"DownstreamReceiver 收到消息: {message}");
await context.AddEventAsync(new ExplorationProgressEvent(
"下游接收",
"HandleAsync",
$"已处理消息: {message}"), cancellationToken);
return "消息已接收";
}
}
- 步骤 3:构建工作流
将 ContextExplorerExecutor 和DownstreamReceiverExecutor 连接起来,形成完整的工作流。
var explorer = new ContextExplorerExecutor();
var receiver = new DownstreamReceiverExecutor();
var contextExplorerWorkflow = new WorkflowBuilder(explorer)
.AddEdge(explorer, receiver)
.WithOutputFrom(explorer) // 输出来自 explorer 的 YieldOutputAsync
.Build();
- 步骤 4:流式运行并观察事件
通过流式运行,我们可以实时观察每个 API 调用产生的事件。
static async Task RunContextExplorerAsync(Workflow workflow, string mode)
{
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine($"开始探索 IWorkflowContext API (模式: {mode})");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
await using (var run = await InProcessExecution.StreamAsync(workflow, mode))
{
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
switch (evt)
{
case WorkflowStartedEvent:
Console.WriteLine("[系统事件] Workflow 已启动\n");
break;
case ExplorationProgressEvent progress:
Console.WriteLine($"[业务事件] 分类: {progress.Category}");
Console.WriteLine($" API: {progress.API}");
Console.WriteLine($" 结果: {progress.Result}\n");
break;
case ExecutorCompletedEvent executorCompleted:
Console.WriteLine($"[系统事件] {executorCompleted.ExecutorId} 执行完成\n");
break;
case WorkflowOutputEvent outputEvent:
Console.WriteLine("[输出事件] WorkflowOutputEvent:");
outputEvent.Display();
Console.WriteLine();
break;
default:
Console.WriteLine($"[其他事件] {evt.GetType().Name}");
break;
}
}
}
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
}
await RunContextExplorerAsync(contextExplorerWorkflow, "完整探索");
- SendMessageAsync
targetId精准路由实战
典型场景:Intent Router 同时连接图像、文本、兜底三个 Executor,希望只唤醒匹配的下游。
flowchart LR
R[IntentRouter] --> I[ImageGenerator]
R --> T[TextGenerator]
R --> F[FallbackExecutor]
R -.targetId=image.-> I
R -.targetId=report.-> T
R -.targetId=FallbackExecutor.-> F
style R fill:#e1f5ff
style I fill:#fff4e1
style T fill:#f3e5f5
style F fill:#e8f5e9
internal sealed class IntentRouterExecutor() : Executor<string, string>("IntentRouter")
{
private readonly Dictionary<string, string> _routingTable = new(StringComparer.OrdinalIgnoreCase)
{
["image"] = "ImageGenerator",
["report"] = "TextGenerator"
};
public override async ValueTask<string> HandleAsync(string intent, IWorkflowContext context, CancellationToken cancellationToken = default)
{
var targetId = this._routingTable.TryGetValue(intent, out var candidate) ? candidate : "FallbackExecutor";
Console.WriteLine($"Router 判定 {intent} 应派发给 {targetId}");
await context.SendMessageAsync($"指令: {intent}", targetId, cancellationToken);
return $"消息已定向到 {targetId}";
}
}
internal sealed class ImageGeneratorExecutor() : Executor<string, string>("ImageGenerator")
{
public override ValueTask<string> HandleAsync(string payload, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine($"ImageGenerator 收到: {payload}");
return ValueTask.FromResult("图像任务完成");
}
}
internal sealed class TextGeneratorExecutor() : Executor<string, string>("TextGenerator")
{
public override ValueTask<string> HandleAsync(string payload, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine($"TextGenerator 收到: {payload}");
return ValueTask.FromResult("文本任务完成");
}
}
internal sealed class FallbackExecutor() : Executor<string, string>("FallbackExecutor")
{
public override ValueTask<string> HandleAsync(string payload, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine($"FallbackExecutor 接管: {payload}");
return ValueTask.FromResult("兜底处理完成");
}
}
static async Task RunTargetIdDemoAsync(string intent)
{
var router = new IntentRouterExecutor();
var imageExecutor = new ImageGeneratorExecutor();
var textExecutor = new TextGeneratorExecutor();
var fallbackExecutor = new FallbackExecutor();
var workflow = new WorkflowBuilder(router)
.AddEdge(router, imageExecutor)
.AddEdge(router, textExecutor)
.AddEdge(router, fallbackExecutor)
.WithOutputFrom(router)
.Build();
Console.WriteLine($"\n输入: {intent}");
await using (var run = await InProcessExecution.StreamAsync(workflow, intent)){
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine("Router 输出:");
outputEvent.Display();
}
}
}
}
await RunTargetIdDemoAsync("image");
await RunTargetIdDemoAsync("report");
await RunTargetIdDemoAsync("speech");
关键提示:
- targetId 必须等于下游 Executor 的 Id(构造函数里指定)
- 上游可以保留多条 Edge,但只有 targetId 命中的节点会收到消息
- 适合意图路由、A/B 测试、租户隔离等需要精确派发的场景
3. 高级场景:QueueClearScopeAsync 的正确使用
QueueClearScopeAsync 用于清理整个作用域中的所有状态,这在处理敏感数据时非常重要。
| 场景 | 说明 | 示例 |
|---|---|---|
| 敏感数据清理 | 处理完用户隐私数据后立即清理 | 银行交易、医疗记录 |
| 临时状态回收 | 中间计算结果不再需要 | MapReduce 的 Map 阶段结果 |
| 生命周期管理 | 某个业务流程结束后清理上下文 | 订单处理完成后清理订单缓存 |
| 测试隔离 | 每个测试用例结束后清理状态 | 单元测试、集成测试 |
internal sealed class SecureDataExecutor() : Executor<string, string>("SecureDataExecutor")
{
private const string SensitiveDataScope = "SensitiveData";
public override async ValueTask<string> HandleAsync(
string userId,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 1. 读取敏感数据
var userData = await LoadUserDataAsync(userId);
await context.QueueStateUpdateAsync("userData", userData, SensitiveDataScope, cancellationToken);
// 2. 处理业务逻辑
var result = await ProcessSecureDataAsync(userData);
// 3. 处理完成后立即清理敏感数据
await context.QueueClearScopeAsync(SensitiveDataScope, cancellationToken);
Console.WriteLine("敏感数据已清理");
return result;
}
private Task<string> LoadUserDataAsync(string userId) => Task.FromResult($"UserData-{userId}");
private Task<string> ProcessSecureDataAsync(string data) => Task.FromResult($"Processed-{data}");
}
4. 最佳实践
- 输出机制选择指南
flowchart TD
Start{需要输出什么?}
Start -->|给下游 Executor| A[SendMessageAsync]
Start -->|进度/日志| B[AddEventAsync]
Start -->|最终结果| C[YieldOutputAsync]
A --> A1[流程编排<br/>动态路由<br/>数据传递]
B --> B1[进度通知<br/>审计日志<br/>调试信息]
C --> C1[业务返回值<br/>批量输出<br/>结构化结果]
- 状态管理最佳实践
| 实践 | 说明 | 示例 |
|---|---|---|
| 使用常量定义 Scope | 避免硬编码字符串 | const string MyScope = "OrderContext"; |
| 命名约定 | {模块名}Scope 或 {功能}Context | FileContentScope, UserSessionContext |
| 读写分离 | 写操作集中在入口步骤,读操作分散在下游 | Fan-out/Fan-in 模式 |
| 惰性初始化 | 使用 ReadOrInitStateAsync 避免重复初始化 | 配置、缓存、计数器 |
| 及时清理 | 处理完敏感数据后立即调用 QueueClearScopeAsync | 用户隐私数据、临时文件 |
- 元数据利用
public override async ValueTask<string> HandleAsync(
string message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 1. 检查并发支持
if (context.ConcurrentRunsEnabled)
{
Console.WriteLine("当前环境支持并发执行");
}
// 2. 传递链路追踪上下文
if (context.TraceContext != null)
{
var traceId = context.TraceContext.GetValueOrDefault("traceId");
Console.WriteLine($"TraceId: {traceId}");
// 传递给下游服务(如 HTTP 请求)
await CallExternalServiceAsync(traceId);
}
return "完成";
}
- 常见错误与解决方案
| 错误 | 原因 | 解决方案 |
|---|---|---|
| SendMessageAsync 后下游没收到 | 没有建立 Edge 连接 | 使用 AddEdge() 连接节点 |
| 状态读取返回 null | Scope 名称不一致 | 使用常量统一管理 Scope 名称 |
| 并发冲突 | 多个 Executor 同时写入同一个 Key | 使用不同的 Key 或加入步骤序号 |
| 事件没有触发 | 忘记 await AddEventAsync | 确保所有异步方法都被 await |
