Net+AI智能体8:Workflow概念篇
2025-11-22 18:21:43一、核心概念
1. Executor(执行器)
Executor(执行器) 是 Workflow 中的最小工作单元,类似于:
| 类比 | 说明 |
|---|---|
| 工厂里的工人 | 每个工人负责一道工序 |
| 乐高积木块 | 每个积木有特定功能,组合成整体 |
| 电路中的元件 | 接收输入信号,输出处理结果 |
flowchart LR
Input[输入消息] --> Executor[Executor\n执行器]
Executor --> Output[输出消息]
style Executor fill:#4CAF50,color:white
Executor 的核心特征
- 唯一标识(Id):每个 Executor 有一个唯一的 ID,用于在 Workflow 中引用
- 消息处理:接收特定类型的输入消息,处理后产生输出消息
- 路由配置:通过 ConfigureRoutes 方法定义能处理哪些类型的消息
- 状态感知:可以通过 IWorkflowContext 访问和修改工作流状态
Executor 的类型层次:MAF 提供了多种 Executor 类型,满足不同场景需求
classDiagram
class Executor {
+string Id
+ExecuteAsync()
#ConfigureRoutes()
}
class Executor~TInput~ {
+HandleAsync(TInput)
}
class Executor~TInput,TOutput~ {
+HandleAsync(TInput) TOutput
}
class FunctionExecutor~TInput~ {
+委托函数处理
}
class FunctionExecutor~TInput,TOutput~ {
+委托函数处理
}
class StatefulExecutor~TState~ {
+TState State
+ReadStateAsync()
+QueueStateUpdateAsync()
}
Executor <|-- Executor~TInput~
Executor <|-- Executor~TInput,TOutput~
Executor~TInput~ <|-- FunctionExecutor~TInput~
Executor~TInput,TOutput~ <|-- FunctionExecutor~TInput,TOutput~
Executor <|-- StatefulExecutor~TState~
| 类型 | 用途 | 示例场景 |
|---|---|---|
| Executor |
处理消息,无返回值 | 日志记录、通知发送 |
| Executor<TInput, TOutput> | 处理消息,有返回值 | 数据转换、AI 调用 |
| FunctionExecutor |
用委托函数快速创建 | 简单处理逻辑 |
| StatefulExecutor |
需要维护状态的执行器 | 会话管理、计数器 |
- Executor 源码解析,关键点:
- 每个 Executor 有唯一 ID
- 通过 ConfigureRoutes 声明能处理的消息类型
- 执行过程会产生事件(ExecutorInvokedEvent、ExecutorCompletedEvent 等)
// 来自 Executor.cs
public abstract class Executor : IIdentified
{
// 唯一标识符
public string Id { get; }
// 配置消息路由(子类必须实现)
protected abstract RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder);
// 执行消息处理
public async ValueTask<object?> ExecuteAsync(
object message,
TypeId messageType,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 记录调用事件
await context.AddEventAsync(new ExecutorInvokedEvent(this.Id, message));
// 路由消息到正确的处理器
CallResult? result = await this.Router.RouteMessageAsync(message, context, ...);
// 记录完成或失败事件
ExecutorEvent executionResult = result?.IsSuccess is not false
? new ExecutorCompletedEvent(this.Id, result?.Result)
: new ExecutorFailedEvent(this.Id, result.Exception);
await context.AddEventAsync(executionResult);
return result.Result;
}
}
2.Edge(边)
Edge(边) 是连接两个 Executor 的消息通道,类似于:
| 类比 | 说明 |
|---|---|
| 工厂传送带 | 把上一道工序的产品传送到下一道工序 |
| 水管 | 把水从一个容器引导到另一个容器 |
| 邮路 | 把信件从发件人送到收件人 |
flowchart LR
A[Executor A] -->|Edge 边| B[Executor B]
subgraph "Edge 的作用"
direction TB
E1["1. 定义消息流向"]
E2["2. 可设置条件"]
E3["3. 支持扇入/扇出"]
end
- Edge 的三种类型
flowchart TB
subgraph "Direct 直连边"
A1[A] --> B1[B]
end
subgraph "FanOut 扇出边"
A2[A] --> B2[B]
A2 --> C2[C]
A2 --> D2[D]
end
subgraph "FanIn 扇入边"
B3[B] --> E3[E]
C3[C] --> E3
D3[D] --> E3
end
| 类型 | 说明 | 使用场景 |
|---|---|---|
| Direct(直连) | 一对一连接 | 顺序处理流程 |
| FanOut(扇出) | 一对多连接 | 并行分发任务 |
| FanIn(扇入) | 多对一连接 | 汇聚多个结果 |
- Edge 源码解析,关键点:
- Edge 定义了消息从哪里来、到哪里去
- Direct Edge 支持条件路由(只有满足条件的消息才传递)
- FanOut Edge 可以实现广播或分区逻辑
// 来自 Edge.cs
public enum EdgeKind
{
Direct, // 直连:一对一
FanOut, // 扇出:一对多
FanIn // 扇入:多对一
}
public sealed class Edge
{
public EdgeKind Kind { get; init; } // 边的类型
public EdgeData Data { get; init; } // 边的具体数据
}
// 来自 DirectEdgeData.cs - 直连边的数据
public sealed class DirectEdgeData : EdgeData
{
public string SourceId { get; } // 源 Executor ID
public string SinkId { get; } // 目标 Executor ID
public Func<object?, bool>? Condition; // 可选的条件判断
}
3. Workflow(工作流)
Workflow(工作流) 是将多个 Executor 通过 Edge 连接起来的完整流程定义,类似于:
| 类比 | 说明 |
|---|---|
| 流程图 | 定义了从开始到结束的完整流程 |
| 生产线 | 多个工位通过传送带连接成完整生产线 |
| 乐谱 | 规定了演奏的顺序和节奏 |
flowchart TB
subgraph "Workflow 的组成"
direction LR
S[StartExecutor<br>起点] --> E1[Executor 1]
E1 --> E2[Executor 2]
E2 --> E3[Output<br>终点]
end
subgraph "Workflow 元数据"
N["Name: 工作流名称"]
D["Description: 工作流描述"]
O["OutputExecutors: 输出节点集合"]
end
- Workflow 的核心属性
// 来自 Workflow.cs
public class Workflow
{
// 起始 Executor 的 ID
public string StartExecutorId { get; }
// 工作流名称(可选)
public string? Name { get; internal init; }
// 工作流描述(可选)
public string? Description { get; internal init; }
// Executor 绑定字典
internal Dictionary<string, ExecutorBinding> ExecutorBindings { get; init; }
// 边的集合(按源节点分组)
internal Dictionary<string, HashSet<Edge>> Edges { get; init; }
// 输出 Executor 集合
internal HashSet<string> OutputExecutors { get; init; }
}
- 使用 WorkflowBuilder 构建工作流:MAF 采用建造者模式(Builder Pattern)来构建 Workflow,这使得工作流的定义更加直观
// 创建工作流示例
var workflow = new WorkflowBuilder(startExecutor) // 指定起点
.WithName("订单处理工作流") // 设置名称
.WithDescription("处理电商订单的完整流程") // 设置描述
.AddEdge(receiveOrder, validateOrder) // 添加边
.AddEdge(validateOrder, processPayment,
condition: x => x.IsValid) // 条件边
.AddEdge(processPayment, sendNotification)
.WithOutputFrom(sendNotification) // 指定输出节点
.Build(); // 构建工作流
4. SuperStep(超步)
SuperStep(超步) 是 Workflow 执行的基本处理周期。可以类比为:
| 类比 | 说明 |
|---|---|
| 游戏中的"回合" | 每个回合内所有玩家同时行动 |
| 工厂的"班次" | 每个班次内完成一批任务 |
| 海浪的"一波" | 一波消息被处理,然后产生下一波 |
sequenceDiagram
participant W as Workflow
participant S1 as SuperStep 1
participant S2 as SuperStep 2
participant S3 as SuperStep 3
W->>S1: 开始执行
Note over S1: 处理初始消息
S1->>S2: 传递输出消息
Note over S2: 处理第二批消息
S2->>S3: 传递输出消息
Note over S3: 处理完成
S3->>W: 返回结果
- SuperStep 的执行流程,关键事件:
- SuperStepStartedEvent:超步开始
- SuperStepCompletedEvent:超步完成
flowchart TB
subgraph "SuperStep N"
A[1.开始] --> B[2.收集待处理消息]
B --> C[3.执行所有 Executor]
C --> D[4.收集输出消息]
D --> E[5.应用状态更新]
E --> F[6.触发事件]
F --> G[7.完成]
end
G --> H{还有消息?}
H -->|是| I[SuperStep N+1]
H -->|否| J[工作流结束]
// SuperStep 事件定义
public class SuperStepEvent(int stepNumber, object? data = null) : WorkflowEvent(data)
{
// 超步的序号(从 0 开始)
public int StepNumber => stepNumber;
}
5. WorkflowContext(工作流上下文)
WorkflowContext(工作流上下文) 是 Executor 执行时的运行环境,类似于:
| 类比 | 说明 |
|---|---|
| 工人的工作台 | 提供工具、材料和通信渠道 |
| 通信枢纽 | 允许各个工位之间传递信息 |
| 共享内存 | 存储和读取状态数据 |
flowchart TB
subgraph "IWorkflowContext 提供的能力"
A[发送消息<br>SendMessageAsync]
B[添加事件<br>AddEventAsync]
C[输出结果<br>YieldOutputAsync]
D[读取状态<br>ReadStateAsync]
E[更新状态<br>QueueStateUpdateAsync]
F[请求停止<br>RequestHaltAsync]
end
- IWorkflowContext 核心接口,关键点:
- 消息传递:通过 SendMessageAsync 在 Executor 之间传递消息
- 状态管理:支持读取、初始化和更新状态
- 事件通知:通过 AddEventAsync 发出事件
- 流程控制:通过 RequestHaltAsync 停止工作流
// 来自 IWorkflowContext.cs
public interface IWorkflowContext
{
// 添加工作流事件(在当前 SuperStep 结束时触发)
ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default);
// 发送消息给下游 Executor(在下一个 SuperStep 处理)
ValueTask SendMessageAsync(object message, string? targetId, CancellationToken cancellationToken = default);
// 输出工作流结果
ValueTask YieldOutputAsync(object output, CancellationToken cancellationToken = default);
// 请求在当前 SuperStep 结束时停止工作流
ValueTask RequestHaltAsync();
// 读取状态
ValueTask<T?> ReadStateAsync<T>(string key, string? scopeName = null, CancellationToken cancellationToken = default);
// 读取或初始化状态
ValueTask<T> ReadOrInitStateAsync<T>(string key, Func<T> initialStateFactory, string? scopeName = null, CancellationToken cancellationToken = default);
// 更新状态(排队更新,在 SuperStep 结束时应用)
ValueTask QueueStateUpdateAsync<T>(string key, T value, string? scopeName = null, CancellationToken cancellationToken = default);
}
6. WorkflowEvent(工作流事件)
WorkflowEvent(工作流事件) 是工作流执行过程中产生的通知消息,类似于:
| 类比 | 说明 |
|---|---|
| 广播通知 | 向所有人广播系统状态变化 |
| 日志记录 | 记录系统执行过程中的关键节点 |
| 事件订阅 | 允许外部监听并响应特定事件 |
classDiagram
class WorkflowEvent {
+object? Data
+ToString()
}
class ExecutorEvent {
+string ExecutorId
}
class SuperStepEvent {
+int StepNumber
}
class WorkflowStartedEvent
class WorkflowOutputEvent
class WorkflowErrorEvent
class WorkflowWarningEvent
class ExecutorInvokedEvent
class ExecutorCompletedEvent
class ExecutorFailedEvent
class SuperStepStartedEvent
class SuperStepCompletedEvent
WorkflowEvent <|-- ExecutorEvent
WorkflowEvent <|-- SuperStepEvent
WorkflowEvent <|-- WorkflowStartedEvent
WorkflowEvent <|-- WorkflowOutputEvent
WorkflowEvent <|-- WorkflowErrorEvent
WorkflowEvent <|-- WorkflowWarningEvent
ExecutorEvent <|-- ExecutorInvokedEvent
ExecutorEvent <|-- ExecutorCompletedEvent
ExecutorEvent <|-- ExecutorFailedEvent
SuperStepEvent <|-- SuperStepStartedEvent
SuperStepEvent <|-- SuperStepCompletedEvent
- 事件分类
| 事件层级 | 事件类型 | 说明 |
|---|---|---|
| 工作流级别 | WorkflowStartedEvent | 工作流开始执行 |
| WorkflowOutputEvent | 工作流产生输出 | |
| WorkflowErrorEvent | 工作流发生错误 | |
| WorkflowWarningEvent | 工作流产生警告 | |
| 超步级别 | SuperStepStartedEvent | 超步开始 |
| SuperStepCompletedEvent | 超步完成 | |
| 执行器级别 | ExecutorInvokedEvent | Executor 被调用 |
| ExecutorCompletedEvent | Executor 完成处理 | |
| ExecutorFailedEvent | Executor 处理失败 |
7. 核心概念:Run(运行实例)
Run(运行实例)是 Workflow 的一次具体执行,类似于:
| 类比 | 说明 |
|---|---|
| 电影的一场放映 | 同一部电影可以放映多场 |
| 生产线的一个批次 | 同一条生产线可以生产多个批次 |
| 游戏的一局 | 同一个游戏可以玩多局 |
flowchart LR
subgraph "Workflow(工作流定义)"
W[流程定义]
end
subgraph "Runs(运行实例)"
R1[Run 1]
R2[Run 2]
R3[Run 3]
end
W --> R1
W --> R2
W --> R3
- Run 的核心特性
// 来自 Run.cs
public sealed class Run : IAsyncDisposable
{
// 运行实例的唯一标识符
public string RunId => this._runHandle.RunId;
// 获取当前执行状态
public ValueTask<RunStatus> GetStatusAsync(CancellationToken cancellationToken = default);
// 获取所有产生的事件
public IEnumerable<WorkflowEvent> OutgoingEvents => this._eventSink;
// 获取自上次访问后的新事件
public IEnumerable<WorkflowEvent> NewEvents { get; }
// 恢复执行(带外部响应)
public async ValueTask<bool> ResumeAsync(IEnumerable<ExternalResponse> responses, CancellationToken cancellationToken = default);
}
- RunStatus(运行状态)
public enum RunStatus
{
NotStarted, // 尚未开始
Idle, // 空闲(已暂停,无待处理请求)
PendingRequests, // 等待外部响应
Ended, // 已结束
Running // 正在运行
}
stateDiagram-v2
[*] --> NotStarted
NotStarted --> Running : 开始执行
Running --> Idle : 暂停
Running --> PendingRequests : 等待外部响应
Running --> Ended : 完成/终止
Idle --> Running : 恢复
PendingRequests --> Running : 收到响应
Ended --> [*]
8. Checkpoint(检查点)
Checkpoint(检查点) 是工作流在某个时刻的完整状态快照,类似于:
| 类比 | 说明 |
|---|---|
| 游戏存档 | 保存游戏进度,随时可以读档继续 |
| 照片 | 记录某一时刻的完整状态 |
| 书签 | 标记阅读进度,下次从这里继续 |
flowchart LR
subgraph "工作流执行"
S1[SuperStep 1] --> C1[Checkpoint 1]
C1 --> S2[SuperStep 2]
S2 --> C2[Checkpoint 2]
C2 --> S3[SuperStep 3]
S3 --> C3[Checkpoint 3]
end
subgraph "恢复执行"
C2 -.->|从此处恢复| S3'[SuperStep 3]
end
- Checkpoint 的核心信息
// 来自 CheckpointInfo.cs
public sealed class CheckpointInfo
{
// 运行实例的唯一标识符
public string RunId { get; }
// 检查点的唯一标识符
public string CheckpointId { get; }
}
// 检查点的完整数据(来自 Checkpoint.cs)
internal sealed class Checkpoint
{
public int StepNumber { get; } // 超步编号
public WorkflowInfo Workflow { get; } // 工作流信息
public RunnerStateData RunnerData { get; } // 运行器状态
public Dictionary<ScopeKey, PortableValue> StateData { get; } // 状态数据
public Dictionary<EdgeId, PortableValue> EdgeStateData { get; } // 边状态数据
public CheckpointInfo? Parent { get; } // 父检查点
}
- Checkpoint 的使用场景
| 场景 | 说明 |
|---|---|
| 故障恢复 | 系统崩溃后从最近的检查点恢复 |
| 长时间运行 | 分段执行,每段结束保存进度 |
| 人机交互 | 等待用户输入时保存状态 |
| 调试回放 | 从任意检查点重新执行 |
| 版本分支 | 从同一个检查点创建多个分支执行 |
flowchart TB
CP[Checkpoint] --> R1[恢复为 Run A]
CP --> R2[恢复为 Run B]
CP --> R3[恢复为 Run C]
R1 -->|不同输入| O1[结果 A]
R2 -->|不同输入| O2[结果 B]
R3 -->|不同输入| O3[结果 C]
9. 核心概念关系图
让我们把所有核心概念联系起来,看看它们是如何协作的:
flowchart TB
subgraph "定义层"
WB[WorkflowBuilder<br>工作流建造器] -->|构建| W[Workflow<br>工作流]
E1[Executor 1] --> WB
E2[Executor 2] --> WB
Edge[Edge 边] --> WB
end
subgraph "执行层"
W -->|创建| R[Run<br>运行实例]
R -->|包含多个| SS[SuperStep<br>超步]
SS -->|调用| Ex[Executor 执行]
end
subgraph "运行时"
Ex -->|使用| WC[WorkflowContext<br>工作流上下文]
WC -->|产生| WE[WorkflowEvent<br>工作流事件]
WC -->|管理| State[状态数据]
end
subgraph "持久化层"
SS -->|保存| CP[Checkpoint<br>检查点]
CP -->|包含| State
CP -->|恢复| R
end
- 生命周期视角
sequenceDiagram
participant Dev as 开发者
participant WB as WorkflowBuilder
participant W as Workflow
participant R as Run
participant SS as SuperStep
participant E as Executor
participant WC as WorkflowContext
participant CP as Checkpoint
Note over Dev,CP: 1.定义阶段
Dev->>WB: 创建 WorkflowBuilder
Dev->>WB: 添加 Executors
Dev->>WB: 添加 Edges
WB->>W: Build() 构建
Note over Dev,CP: 2.执行阶段
Dev->>W: 启动 Run
W->>R: 创建运行实例
loop 每个 SuperStep
R->>SS: 开始超步
SS->>E: 调用 Executor
E->>WC: 使用上下文
WC-->>E: 发送消息/读写状态
E-->>SS: 完成处理
SS->>CP: 保存检查点
end
Note over Dev,CP: 3.恢复阶段(可选)
Dev->>CP: 加载检查点
CP->>R: 恢复运行
- 消息流视角,关键理解:
- 消息驱动:Executor 之间通过消息传递数据
- 异步批处理:同一 SuperStep 内的 Executor 可以并行执行
- 边控制流向:Edge 决定消息从哪里到哪里
- 状态隔离:每个 SuperStep 结束时应用状态更新
flowchart LR
subgraph "SuperStep 1"
I[输入消息] --> E1[Executor A]
E1 -->|通过 Edge| M1[消息队列]
end
subgraph "SuperStep 2"
M1 --> E2[Executor B]
M1 --> E3[Executor C]
E2 -->|通过 Edge| M2[消息队列]
E3 -->|通过 Edge| M2
end
subgraph "SuperStep 3"
M2 --> E4[Executor D]
E4 --> O[输出结果]
end
10. 总结
- 记忆口诀
- 工作流程边连接,执行器来做处理
- 超步批量往前走,上下文中读写态
- 事件通知全过程,运行实例跑一次
- 检查点来存进度,随时恢复不怕挂
- 核心概念速查表
| 概念 | 定义 | 关键类 | 核心职责 |
|---|---|---|---|
| Executor | 执行器/处理节点 | Executor, Executor |
处理消息,产生输出 |
| Edge | 连接边 | Edge, EdgeData, DirectEdgeData | 定义消息流向和条件 |
| Workflow | 工作流定义 | Workflow, WorkflowBuilder | 组织 Executor 和 Edge |
| SuperStep | 超步/批量处理周期 | SuperStepEvent, SuperStepStartedEvent | 批量处理消息 |
| WorkflowContext | 工作流上下文 | IWorkflowContext | 提供运行时服务 |
| WorkflowEvent | 工作流事件 | WorkflowEvent, ExecutorEvent | 通知执行状态 |
| Run | 运行实例 | Run, RunStatus | 管理一次执行 |
| Checkpoint | 检查点 | CheckpointInfo, ICheckpointStore | 保存和恢复状态 |
二、条件工作流
1. 业务场景:企业邮箱智能卫士
- 角色:客服专员、内容安全团队、邮件助手 Agent
- 输入:每天成百上千封顾客邮件,内容既包含正常咨询也夹杂垃圾信息
- 流程:
- Spam Detection Agent 判定邮件是否为垃圾
- Conditional Edge 决策下一步:
- Legit → Email Assistant Agent → SendEmail 执行器
- Spam → HandleSpam 执行器,记录原因并触发人工跟进
- 价值:极大降低人工分拣成本,同时保留风控可观测性,可扩展多级路由(Switch/Multi-Selection)。
2. 核心构建块速览
| 构建块 | 中文描述 | 关注点 |
|---|---|---|
| Step / Executor | 执行单元,输入类型 → 输出类型 | SpamDetectionExecutor : Executor<ChatMessage, DetectionResult> |
| Edge | 步骤之间的连接 | AddEdge(A, B) 表示顺序执行 |
| Conditional Edge | 带决策函数的 Edge | AddEdge(A, B, condition: Func<object?, bool>) |
| Shared State | 跨步骤共享数据 | context.QueueStateUpdateAsync() / ReadStateAsync() |
| WorkflowEvent | 运行事件流 | WorkflowOutputEvent, ExecutorCompletedEvent 等 |
| 3. 实践流程 |
flowchart LR
A[客户邮件] --> B[SpamDetectionExecutor]
B -->|IsSpam == false| C[EmailAssistantExecutor]
C --> D[SendEmailExecutor]
B -->|IsSpam == true| E[HandleSpamExecutor]
D --> F[WorkflowOutputEvent]
E --> F
- 步骤 1:定义领域模型与共享状态键
using System.Text.Json.Serialization;
using Microsoft.Agents.AI.Workflows;
// 统一的共享状态 scope
internal static class EmailStateConstants
{
public const string EmailStateScope = "EmailState";
}
/// <summary>
/// 垃圾邮件检测结果
/// Spam 判断结果(is_spam + reason)
/// </summary>
public sealed class DetectionResult
{
[JsonPropertyName("is_spam")]
public bool IsSpam { get; set; }
/// <summary>
/// 判定理由(用于审计和调试)
/// </summary>
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
/// <summary>
/// 邮件ID(用于关联 Shared State 中的原始内容)
/// </summary>
[JsonIgnore] // 不参与 JSON 序列化
public string EmailId { get; set; } = string.Empty;
}
/// <summary>
/// 邮件内容(存储在 Shared State 中)
/// 在 Shared State 中保存原始正文,便于下游执行器读取
/// </summary>
internal sealed class Email
{
[JsonPropertyName("email_id")]
public string EmailId { get; set; } = string.Empty;
[JsonPropertyName("email_content")]
public string EmailContent { get; set; } = string.Empty;
}
/// <summary>
/// 邮件助手生成的回复内容
/// </summary>
public sealed class EmailResponse
{
[JsonPropertyName("response")]
public string Response { get; set; } = string.Empty;
}
步骤 2:实现 SpamDetectionExecutor(入口节点)
职责:
- 生成唯一 EmailId,把邮件正文写入 Shared State
- 调用 SpamDetectionAgent,拿到结构化 DetectionResult
- 把 EmailId 写回结果,供下游执行器继续查找 Shared State
var spamDetectionAgent = new ChatClientAgent(
ChatClient,
new ChatClientAgentOptions(instructions: "You are a spam detection assistant that labels spam emails with reasons.")
{
ChatOptions = new ChatOptions
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<DetectionResult>()
}
});
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
private readonly AIAgent _spamDetectionAgent;
public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
{
_spamDetectionAgent = spamDetectionAgent;
}
public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
var trackedEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(trackedEmail.EmailId, trackedEmail, scopeName: EmailStateConstants.EmailStateScope, cancellationToken);
var agentResponse = await _spamDetectionAgent.RunAsync(message, cancellationToken: cancellationToken);
var detection = JsonSerializer.Deserialize<DetectionResult>(agentResponse.Text)
?? throw new InvalidOperationException("无法解析 Spam Detection 响应。");
detection.EmailId = trackedEmail.EmailId;
return detection;
}
}
步骤 3:实现下游执行器(EmailAssistant + SendEmail + HandleSpam)
EmailAssistantExecutor:读取 Shared State 中的正文,再调用第二个 Agent 输出 JSON 回复
SendEmailExecutor:模拟发送,使用 context.YieldOutputAsync 输出成功消息
HandleSpamExecutor:命中垃圾时输出风险提示,若误入则抛出异常,帮助我们在调试阶段及时发现边误判
var emailAssistantAgent = new ChatClientAgent(
ChatClient,
new ChatClientAgentOptions(instructions: "You are an enterprise email assistant. Provide professional Chinese responses.")
{
ChatOptions = new ChatOptions
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
}
});
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
_emailAssistantAgent = emailAssistantAgent;
}
public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (message.IsSpam)
{
throw new InvalidOperationException("Spam 邮件不应进入 EmailAssistantExecutor。");
}
var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope, cancellationToken)
?? throw new InvalidOperationException("找不到对应 Email 内容。");
var agentResponse = await _emailAssistantAgent.RunAsync(email.EmailContent, cancellationToken: cancellationToken);
return JsonSerializer.Deserialize<EmailResponse>(agentResponse.Text)
?? throw new InvalidOperationException("无法解析 Email Assistant 响应。");
}
}
internal sealed class SendEmailExecutor() : Executor<EmailResponse>("SendEmailExecutor")
{
public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
await context.YieldOutputAsync($"Email sent: {message.Response}", cancellationToken);
}
}
internal sealed class HandleSpamExecutor() : Executor<DetectionResult>("HandleSpamExecutor")
{
public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
if (!message.IsSpam)
{
throw new InvalidOperationException("非垃圾邮件不应进入 HandleSpamExecutor。");
}
await context.YieldOutputAsync($"Spam captured: {message.Reason}", cancellationToken);
}
}
步骤 4:使用 Conditional Edge 构建工作流
核心代码:
GetCondition(bool expected):返回 Func<object?, bool>,转型后比对 DetectionResult.IsSpam
AddEdge(spamDetection, emailAssistant, condition: GetCondition(false))
AddEdge(spamDetection, handleSpam, condition: GetCondition(true))
WithOutputFrom(handleSpam, sendEmail):串联事件输出
运行结果将随输入邮件内容决定走向,实现真正的 if-else 路由。
Func<object?, bool> BuildCondition(bool expectedSpamFlag) =>
detection => detection is DetectionResult dr && dr.IsSpam == expectedSpamFlag;
var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
var conditionalWorkflow = new WorkflowBuilder(spamDetectionExecutor)
.AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: BuildCondition(false))
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
.AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: BuildCondition(true))
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
.Build();
步骤 5:运行工作流并观察事件
使用 InProcessExecution.StreamAsync 获取 StreamingRun
通过 run.TrySendMessageAsync(new TurnToken(emitEvents: true)) 打开事件推送
订阅 run.WatchStreamAsync(),把 WorkflowEvent 统一输出
static async Task RunWorkflowAsync(Workflow conditionalWorkflow, string scenarioName, string emailBody)
{
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine($"场景:{scenarioName}");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
await using StreamingRun run = await InProcessExecution.StreamAsync(conditionalWorkflow, new ChatMessage(ChatRole.User, emailBody));
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
switch (evt)
{
case ExecutorCompletedEvent completed:
completed.Display();
break;
case WorkflowOutputEvent outputEvent:
outputEvent.Display();
break;
}
}
}
var legitimateEmail = @"客服团队你好,我想确认上周提交的采购订单是否已经发货,如果还有缺少信息请告知。";
await RunWorkflowAsync(conditionalWorkflow, "正常咨询 → EmailAssistant 分支", legitimateEmail);
步骤 6:触发垃圾邮件分支以验证条件
使用 Resources/spam.txt(与源码一致)或自定义明显的垃圾内容
预期路径:SpamDetectionExecutor → HandleSpamExecutor
输出信息应包含 Spam captured,表明 Conditional Edge 跳转正确
var spamEmail = @"令人惊喜的投资机会!只需支付保证金即可在 24 小时内获得 10 倍收益,点击可疑链接领取奖励。";
await RunWorkflowAsync(conditionalWorkflow, "垃圾邮件 → HandleSpam 分支", spamEmail);
4. 高级场景与最佳实践
条件函数设计技巧
始终进行类型检查:result is DetectionResult detection
将条件函数提取为独立方法,避免在 AddEdge 中写长匿名函数
尽量保持纯函数(无副作用),保证工作流可预测
调试建议
打印 ExecutorCompletedEvent 的输入输出摘要,若条件无法命中可快速定位
可将 WorkflowEvent 推送到前端(SignalR/AGUI)实时可视化
常见错误与解决方案
- 条件未命中:确认 WithOutputFrom 是否把输出连接到正确节点
- 状态缺失:运行前先确认入口执行器已把数据写入 Shared State
- 类型不匹配:确保执行器泛型签名与实际输入输出一致
5. 实际应用示例:多级风控 + 工单系统
在企业环境中,可将工作流作为“一级过滤器”,再将输出接入更多节点:
- HandleSpamExecutor 输出后追加 RiskAuditExecutor,把 spam 详情写入 Cosmos DB 或内部审计系统
- 将 EmailAssistantExecutor 的结果包装成 WorkflowAgent,供客服 Copilot 直接调用,实现自动回信
- 若检测为高风险,可通过 AddEdge 连到 ManualReviewExecutor,在其中调用 Teams / AGUI 通知人工审批
var extendedWorkflow = new WorkflowBuilder(spamDetectionExecutor)
.AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: BuildCondition(false))
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
.AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: BuildCondition(true))
.AddEdge(handleSpamExecutor, riskAuditExecutor)
.AddEdge(riskAuditExecutor, manualReviewExecutor, condition: BuildCondition(true))
.Build();
通过“Conditional Edge + 多级 Executor”,可以轻松演进到复杂的内容安全与客服协同系统。
三、分支工作流
1. 业务场景:企业智能邮件安全中心
前面我们实现了简单的二分法邮件过滤(垃圾/正常)。但在企业实际场景中,邮件安全判定往往需要更精细的分类:
- 角色:企业安全团队、客服专员、AI 安全助手
- 挑战:每天处理数千封邮件,其中包含:
- 正常邮件(NotSpam):客户咨询、业务往来
- 垃圾邮件(Spam):明显的诈骗、广告
- 不确定邮件(Uncertain):可能是钓鱼邮件,需要人工审核
2. 业务流程
flowchart LR
A[客户邮件] --> B[SpamDetectionExecutor]
B -->|NotSpam| C[EmailAssistantExecutor]
C --> D[SendEmailExecutor]
B -->|Spam| E[HandleSpamExecutor]
B -->|Uncertain| F[HandleUncertainExecutor]
D --> G[WorkflowOutput]
E --> G
F --> G
3. 核心构建块
| 构建块 | 中文描述 | 本课关注点 |
|---|---|---|
| AddSwitch | Switch-Case 路由构建器 | builder.AddSwitch(sourceStep, switchBuilder => ...) |
| AddCase | 单个 Case 分支 | switchBuilder.AddCase(condition, targetStep) |
| WithDefault | Default 兜底分支 | switchBuilder.WithDefault(defaultStep) |
| Enum 条件 | 基于枚举的精确匹配 | SpamDecision.NotSpam / Spam / Uncertain |
| Shared State | 跨步骤共享数据 | 存储邮件内容供下游读取 |
API 对比:
// 多个 Conditional Edge
builder
.AddEdge(source, target1, condition: c => c.IsSpam == false)
.AddEdge(source, target2, condition: c => c.IsSpam == true);
// Switch-Case
builder.AddSwitch(source, sb => sb
.AddCase(c => c.Decision == NotSpam, target1)
.AddCase(c => c.Decision == Spam, target2)
.WithDefault(target3)
);
3. 实践流程
核心代码结构:
var workflow = new WorkflowBuilder(spamDetectionExecutor)
.AddSwitch(spamDetectionExecutor, switchBuilder =>
switchBuilder
.AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
.AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
.WithDefault(handleUncertainExecutor) // 兜底机制
)
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor, handleUncertainExecutor)
.Build();
- 步骤 1:定义领域模型与三分类枚举
public enum SpamDecision
{
NotSpam, // 正常邮件,可以自动回复
Spam, // 明显的垃圾邮件,需要拦截
Uncertain // 不确定的邮件,需要人工审核
}
public sealed class DetectionResult
{
public bool IsSpam { get; set; } // 只有两种状态
/// <summary>
/// 检测决策(NotSpam / Spam / Uncertain)
/// </summary>
[JsonPropertyName("spam_decision")]
[JsonConverter(typeof(JsonStringEnumConverter))] // JSON 序列化为字符串
public SpamDecision spamDecision { get; set; }
}
步骤 2:实现 Spam Detection Executor(支持三分类)
核心职责
- 生成唯一 EmailId:为每封邮件分配唯一标识
- 保存到 Shared State:将邮件内容存储,供下游 Executor 读取
- 调用 AI Agent 检测:使用结构化输出获取 DetectionResult
- 返回检测结果:包含 spamDecision 枚举值和判定理由
关键技术点
结构化输出:ChatResponseFormat.ForJsonSchema
() 枚举序列化:JsonStringEnumConverter 确保 JSON 中使用字符串而非数字
状态管理:context.QueueStateUpdateAsync() 写入邮件内容
三分类提示词:指示 AI 在不确定时选择 Uncertain
var spamDetectionAgent = new ChatClientAgent(
ChatClient,
new ChatClientAgentOptions(
instructions: @"你是一个垃圾邮件检测助手。判定规则:
- NotSpam: 明显的正常业务邮件(订单查询、售后咨询等)
- Spam: 明显的垃圾邮件(诈骗、广告、钓鱼)
- Uncertain: 无法明确判断,包含可疑元素但不确定(如含可疑链接但内容模糊)
对于模棱两可的情况,倾向于标记为 Uncertain 以保证安全。"
)
{
ChatOptions = new ChatOptions
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<DetectionResult>()
}
}
);
/// <summary>
/// 垃圾邮件检测执行器
/// 输入: ChatMessage(邮件内容)
/// 输出: DetectionResult(三分类结果)
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
private readonly AIAgent _spamDetectionAgent;
public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
{
_spamDetectionAgent = spamDetectionAgent;
}
public override async ValueTask<DetectionResult> HandleAsync(
ChatMessage message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 1.生成唯一邮件ID并保存内容到 Shared State
var trackedEmail = new Email
{
EmailId = Guid.NewGuid().ToString("N"),
EmailContent = message.Text
};
await context.QueueStateUpdateAsync(
trackedEmail.EmailId,
trackedEmail,
scopeName: EmailStateConstants.EmailStateScope,
cancellationToken
);
// 2.调用 AI Agent 进行三分类检测
var agentResponse = await _spamDetectionAgent.RunAsync(
message,
cancellationToken: cancellationToken
);
// 3.解析结构化输出
var detection = JsonSerializer.Deserialize<DetectionResult>(agentResponse.Text)
?? throw new InvalidOperationException("无法解析 Spam Detection 响应");
// 4.关联 EmailId(供下游 Executor 查找原始内容)
detection.EmailId = trackedEmail.EmailId;
return detection;
}
}
- 步骤 3:实现三个下游处理器,根据 Switch-Case 的三个分支,需要实现三个下游 Executor:
flowchart LR
A[SpamDetectionExecutor] -->|NotSpam| B[EmailAssistantExecutor]
B --> C[SendEmailExecutor]
A -->|Spam| D[HandleSpamExecutor]
A -->|Uncertain| E[HandleUncertainExecutor]
处理器职责
| Executor | 触发条件 | 主要职责 |
|---|---|---|
| EmailAssistantExecutor | NotSpam | 调用 AI 生成专业回复 |
| SendEmailExecutor | EmailAssistant 之后 | 模拟发送邮件 |
| HandleSpamExecutor | Spam | 记录拦截信息 |
| HandleUncertainExecutor | Uncertain (Default) | 标记为待审核 |
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 1. 正常邮件分支:EmailAssistant + SendEmail
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
var emailAssistantAgent = new ChatClientAgent(
chatClient,
new ChatClientAgentOptions(
instructions: "你是一个企业邮件助手,为客户邮件生成专业、友好的中文回复。"
)
{
ChatOptions = new ChatOptions
{
ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
}
}
);
/// <summary>
/// 邮件助手执行器(仅处理正常邮件)
/// 输入: DetectionResult(必须是 NotSpam)
/// 输出: EmailResponse(AI 生成的回复)
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
private readonly AIAgent _emailAssistantAgent;
public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
{
_emailAssistantAgent = emailAssistantAgent;
}
public override async ValueTask<EmailResponse> HandleAsync(
DetectionResult message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 防御性检查:确保只处理正常邮件
if (message.spamDecision == SpamDecision.Spam)
{
throw new InvalidOperationException(
"EmailAssistantExecutor 不应处理垃圾邮件,请检查路由配置。"
);
}
// 1.从 Shared State 读取原始邮件内容
var email = await context.ReadStateAsync<Email>(
message.EmailId,
scopeName: EmailStateConstants.EmailStateScope,
cancellationToken
) ?? throw new InvalidOperationException($"找不到 EmailId={message.EmailId} 的邮件内容");
// 2.调用 AI Agent 生成回复
var agentResponse = await _emailAssistantAgent.RunAsync(
email.EmailContent,
cancellationToken: cancellationToken
);
// 3.解析结构化输出
return JsonSerializer.Deserialize<EmailResponse>(agentResponse.Text)
?? throw new InvalidOperationException("无法解析 Email Assistant 响应");
}
}
/// <summary>
/// 邮件发送执行器(模拟发送)
/// 输入: EmailResponse
/// 输出: 工作流事件
/// </summary>
internal sealed class SendEmailExecutor() : Executor<EmailResponse>("SendEmailExecutor")
{
public override async ValueTask HandleAsync(
EmailResponse message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 模拟邮件发送(实际项目中可调用 SMTP、SendGrid 等服务)
await context.YieldOutputAsync(
$"邮件已发送: {message.Response}",
cancellationToken
);
}
}
/// <summary>
/// 2. 垃圾邮件分支 + 不确定邮件分支
/// 输入: DetectionResult(必须是 Spam)
/// 输出: 工作流事件
/// </summary>
internal sealed class HandleSpamExecutor() : Executor<DetectionResult>("HandleSpamExecutor")
{
public override async ValueTask HandleAsync(
DetectionResult message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 防御性检查:确保只处理垃圾邮件
if (message.spamDecision != SpamDecision.Spam)
{
throw new InvalidOperationException(
"HandleSpamExecutor 只应处理 Spam 类型的邮件,请检查路由配置。"
);
}
// 记录垃圾邮件(实际项目中可写入数据库或日志系统)
await context.YieldOutputAsync(
$"垃圾邮件已拦截: {message.Reason}",
cancellationToken
);
}
}
/// <summary>
/// 不确定邮件处理执行器(Default Case)
/// 输入: DetectionResult(应该是 Uncertain,但也处理其他未匹配情况)
/// 输出: 工作流事件
/// </summary>
internal sealed class HandleUncertainExecutor() : Executor<DetectionResult>("HandleUncertainExecutor")
{
public override async ValueTask HandleAsync(
DetectionResult message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 防御性检查:确保只处理不确定邮件
if (message.spamDecision != SpamDecision.Uncertain)
{
throw new InvalidOperationException(
"HandleUncertainExecutor 只应处理 Uncertain 类型的邮件(或作为 Default Case)。"
);
}
// 1.从 Shared State 读取原始邮件内容(用于人工审核)
var email = await context.ReadStateAsync<Email>(
message.EmailId,
scopeName: EmailStateConstants.EmailStateScope,
cancellationToken
);
// 2.输出待审核信息
await context.YieldOutputAsync(
$"不确定邮件需人工审核:\n" +
$"原因: {message.Reason}\n" +
$"内容预览: {email?.EmailContent?.Substring(0, Math.Min(100, email.EmailContent.Length))}...",
cancellationToken
);
}
}
步骤 4:使用 AddSwitch 构建 Switch-Case 工作流
为了保持代码简洁,我们定义一个工厂方法来生成条件函数,为什么这样设计?
- 类型安全:使用 is 模式匹配确保类型正确
- 可复用:通过参数化生成不同的条件函数
- 可读性强:条件逻辑清晰明确
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 条件函数工厂方法
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
/// <summary>
/// 创建基于 SpamDecision 枚举的条件函数
/// </summary>
/// <param name="expectedDecision">期望的垃圾邮件判定结果</param>
/// <returns>条件函数,用于 Switch-Case 路由</returns>
Func<object?, bool> GetCondition(SpamDecision expectedDecision) =>
detectionResult =>
detectionResult is DetectionResult result &&
result.spamDecision == expectedDecision;
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 创建执行器实例
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
var handleUncertainExecutor = new HandleUncertainExecutor();
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 使用 AddSwitch 构建 Switch-Case 工作流
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
var switchCaseWorkflow = new WorkflowBuilder(spamDetectionExecutor)
// AddSwitch: 定义 Switch-Case 路由
.AddSwitch(spamDetectionExecutor, switchBuilder =>
switchBuilder
// Case 1: NotSpam → EmailAssistant
.AddCase(
GetCondition(expectedDecision: SpamDecision.NotSpam),
emailAssistantExecutor
)
// Case 2: Spam → HandleSpam
.AddCase(
GetCondition(expectedDecision: SpamDecision.Spam),
handleSpamExecutor
)
// Default: Uncertain (或任何未匹配的情况) → HandleUncertain
.WithDefault(
handleUncertainExecutor
)
)
// EmailAssistant 之后自动发送邮件
.AddEdge(emailAssistantExecutor, sendEmailExecutor)
// 配置输出节点(三个终点执行器都会产生输出)
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor, handleUncertainExecutor)
.Build();
5. 测试
- 测试策略
| 邮件类型 | 预期路径 | 验证目标 |
|---|---|---|
| 正常邮件 | NotSpam → EmailAssistant → SendEmail | 验证 Case 1 |
| 垃圾邮件 | Spam → HandleSpam | 验证 Case 2 |
| 不确定邮件 | Uncertain → HandleUncertain | 验证 Default |
- 为了避免重复代码,我们定义一个统一的运行函数
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 通用工作流运行函数
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
/// <summary>
/// 运行工作流并输出事件流
/// </summary>
static async Task RunWorkflowAsync(
Workflow workflow,
string scenarioName,
string emailContent)
{
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine($"测试场景:{scenarioName}");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine($"邮件内容:{emailContent.Substring(0, Math.Min(80, emailContent.Length))}...\n");
await using StreamingRun run = await InProcessExecution.StreamAsync(
workflow,
new ChatMessage(ChatRole.User, emailContent)
);
// 发送 Turn Token,启用事件推送
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
// 订阅事件流
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
switch (evt)
{
case ExecutorCompletedEvent completed:
completed.Display();
break;
case WorkflowOutputEvent outputEvent:
outputEvent.Display();
break;
}
}
Console.WriteLine();
}
测试正常邮件路径(Case 1: NotSpam)
预期行为:
- SpamDetectionExecutor 判定为 NotSpam
- Switch 路由到 EmailAssistantExecutor
- EmailAssistantExecutor 生成回复
- SendEmailExecutor 发送邮件
- 输出:“邮件已发送: ...“
// 测试场景 1:正常客户咨询邮件
var legitimateEmail = @"
尊敬的客服团队:
您好!我是贵公司的长期客户,订单号为 #2025-001。
我想确认一下上周提交的采购订单是否已经安排发货。
如果需要补充任何信息,请随时告知。
期待您的回复,谢谢!
客户:张先生
";
await RunWorkflowAsync(
switchCaseWorkflow,
"正常邮件 → EmailAssistant → SendEmail",
legitimateEmail
);
测试垃圾邮件路径(Case 2: Spam)
预期行为:
- SpamDetectionExecutor 判定为 Spam
- Switch 路由到 HandleSpamExecutor
- 输出:"垃圾邮件已拦截: ..."
- 不会进入 EmailAssistant 分支
// 测试场景 2:明显的垃圾邮件
var spamEmail = @"
恭喜您中奖啦!
您已被选中获得 100 万现金大奖!
立即点击以下链接领取:
http://suspicious-site.com/claim-prize
仅限今日有效,过期作废!
不需要任何手续费,完全免费!
快速行动,机不可失!
";
await RunWorkflowAsync(
switchCaseWorkflow,
"垃圾邮件 → HandleSpam",
spamEmail
);
测试不确定邮件路径(Default Case: Uncertain)
预期行为:
- SpamDetectionExecutor 判定为 Uncertain
- Switch 路由到 HandleUncertainExecutor(Default Case)
- 输出:“不确定邮件需人工审核: ...“
- 包含邮件内容预览,便于人工判断
为什么需要 Default Case?
兜底机制:即使 AI 无法明确判断,也有对应处理流程
安全优先:可疑邮件不会被错误归类为正常或垃圾
人工介入:为需要深度审核的情况预留通道
// 测试场景 3:不确定的可疑邮件(来自源码的 ambiguous_email.txt)
var uncertainEmail = @"
主题:需要验证您的账户
尊敬的客户:
我们检测到您的账户存在异常活动,需要验证您的身份以确保账户安全。
请登录您的账户并完成验证流程,以继续使用服务。
账户详情:
- 用户名:johndoe@contoso.com
- 最后登录:08/15/2025
- 登录地点:西雅图,华盛顿州
- 登录设备:移动设备
这是一项自动安全措施。如果您认为此邮件是错误发送的,请立即联系我们的支持团队。
此致
安全团队
客户服务部门
";
await RunWorkflowAsync(
switchCaseWorkflow,
"不确定邮件 → HandleUncertain (Default)",
uncertainEmail
);
6. Default Case 的最佳实践
推荐使用的场景:
- 枚举值可能扩展:未来可能添加新的分类类型
- 防御性编程:确保所有情况都有处理路径
- 不确定判断:AI 模型输出可能包含"不确定"状态
- 兜底机制:为意外情况提供安全降级
不需要 Default 的场景:
- 枚举值完全穷尽且不会变化
- 每个 Case 都已明确定义
- 希望未匹配时抛出异常(快速失败)
Default Case 的三种处理策略
策略 1:人工审核(推荐)
优势:安全优先,可疑情况由人工判断
.WithDefault(humanReviewExecutor)
策略 2:保守处理
优势:适用于安全敏感场景
.WithDefault(treatAsSpamExecutor) // 宁可误拦,不可放过
策略 3:日志记录
优势:适用于非关键路径
.WithDefault(logAndIgnoreExecutor) // 记录后忽略
Default Case 的防御性检查,即使配置了 Default,执行器内部仍应进行验证:
为什么需要双重检查?
- 捕获配置错误:及早发现路由配置问题
- 明确职责边界:执行器清楚知道自己应该处理什么
- 便于调试:输出警告信息,快速定位问题
public override async ValueTask HandleAsync(DetectionResult message, ...)
{
// 推荐:检查是否是预期的类型
if (message.spamDecision != SpamDecision.Uncertain)
{
// 记录警告或抛出异常
Console.WriteLine($"警告:HandleUncertainExecutor 收到非预期类型: {message.spamDecision}");
}
// 继续处理...
}
7. 高级场景与最佳实践
多维度分类路由,在实际企业场景中,可能需要更细粒度的分类:
优势:
- 更精细的安全分级
- 不同风险级别有不同的响应策略
- 易于根据业务需求调整分类标准
public enum EmailRiskLevel
{
Safe, // 完全安全
LowRisk, // 低风险(可能是营销邮件)
MediumRisk, // 中风险(可疑但不确定)
HighRisk, // 高风险(疑似钓鱼)
Malicious // 恶意(确认的威胁)
}
// 对应的 Switch-Case 工作流
.AddSwitch(riskAssessmentExecutor, sb => sb
.AddCase(GetRiskCondition(Safe), autoReplyExecutor)
.AddCase(GetRiskCondition(LowRisk), marketingFilterExecutor)
.AddCase(GetRiskCondition(MediumRisk), humanReviewExecutor)
.AddCase(GetRiskCondition(HighRisk), securityTeamAlertExecutor)
.AddCase(GetRiskCondition(Malicious), blockAndReportExecutor)
.WithDefault(escalateExecutor)
)
动态 Case 构建,某些场景下,Case 分支可能需要根据配置动态生成:
适用场景:
- 需要热更新分类规则
- 多租户系统,每个租户有不同的分类策略
- A/B 测试不同的路由策略
// 从配置加载分类策略
var classificationRules = LoadClassificationRules();
var switchBuilder = new WorkflowBuilder(classifierExecutor);
var innerSwitchBuilder = switchBuilder.AddSwitch(classifierExecutor, sb => {
foreach (var rule in classificationRules)
{
sb.AddCase(
result => result is ClassificationResult r && r.Category == rule.Category,
rule.TargetExecutor
);
}
return sb.WithDefault(defaultExecutor);
});
常见错误与解决方案
错误 1:条件函数重叠
解决方案:确保条件互斥或使用优先级顺序
错误 2:忘记配置输出
解决方案:确保所有终点执行器都在 WithOutputFrom 中
错误 3:Default Case 中的逻辑错误
解决方案:始终进行防御性检查
// 1.条件函数重叠
// 错误:两个条件可能同时为 true
.AddCase(r => r is DetectionResult dr && dr.Reason.Contains("spam"), executor1)
.AddCase(r => r is DetectionResult dr && dr.Reason.Contains("phishing"), executor2)
// 如果 Reason 同时包含 "spam" 和 "phishing",会路由到哪里?
// 正确:使用枚举确保互斥
.AddCase(GetCondition(SpamDecision.Spam), executor1)
.AddCase(GetCondition(SpamDecision.Phishing), executor2)
// 2:忘记配置输出
// 错误:HandleSpamExecutor 的输出没有配置
var workflow = new WorkflowBuilder(spamDetectionExecutor)
.AddSwitch(...)
.WithOutputFrom(sendEmailExecutor) // 只配置了一个输出
.Build();
// 正确:配置所有输出节点
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor, handleUncertainExecutor)
// 3. Default Case 中的逻辑错误
// 错误:Default 执行器假设只会收到 Uncertain
public override async ValueTask HandleAsync(DetectionResult message, ...)
{
// 直接使用,没有检查
await ProcessUncertainEmail(message); // 如果 message 是其他类型呢?
}
// 正确:验证输入类型
if (message.spamDecision != SpamDecision.Uncertain)
{
throw new InvalidOperationException($"意外的决策类型: {message.spamDecision}");
}
8. 实际应用示例:企业级内容审核系统
在基础上,我们可以将 Switch-Case 模式扩展到更广泛的内容审核场景:
flowchart TD
A[用户提交内容] --> B[内容审核Executor]
B -->|安全| C[自动发布]
B -->|轻微违规| D[自动过滤敏感词]
B -->|中度违规| E[人工审核]
B -->|严重违规| F[拒绝并记录]
B -->|不确定| G[AI二次审核]
以下代码展示如何构建一个企业级内容审核工作流:
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 内容审核分类枚举
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
public enum ContentModerationType
{
Safe, // 安全内容,可直接发布
MinorViolation, // 轻微违规,自动过滤后发布
ModerateRisk, // 中度风险,需要人工审核
SevereViolation,// 严重违规,拒绝发布
Uncertain // 不确定,需要二次审核
}
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 构建多级内容审核工作流
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
var contentModerationWorkflow = new WorkflowBuilder(contentAnalysisExecutor)
.AddSwitch(contentAnalysisExecutor, sb => sb
// Case 1: 安全内容 → 直接发布
.AddCase(
GetModerationCondition(ContentModerationType.Safe),
autoPublishExecutor
)
// Case 2: 轻微违规 → 过滤后发布
.AddCase(
GetModerationCondition(ContentModerationType.MinorViolation),
filterAndPublishExecutor
)
// Case 3: 中度风险 → 人工审核
.AddCase(
GetModerationCondition(ContentModerationType.ModerateRisk),
humanModerationExecutor
)
// Case 4: 严重违规 → 拒绝并记录
.AddCase(
GetModerationCondition(ContentModerationType.SevereViolation),
rejectAndLogExecutor
)
// Default: 不确定 → AI 二次审核
.WithDefault(
aiSecondaryReviewExecutor
)
)
// 配置所有可能的输出节点
.WithOutputFrom(
autoPublishExecutor,
filterAndPublishExecutor,
humanModerationExecutor,
rejectAndLogExecutor,
aiSecondaryReviewExecutor
)
.Build();
扩展点,基于这个框架,可以进一步扩展:
- 多级审核流程
// AI 二次审核后可能再次路由到人工审核
.AddEdge(aiSecondaryReviewExecutor, humanModerationExecutor,
condition: result => result.ConfidenceScore < 0.7)
- 审核结果统计
// 每个执行器都可以记录统计数据到 Shared State
await context.QueueStateUpdateAsync("stats", new ModerationStats
{
TotalProcessed = total,
AutoApproved = autoApproved,
HumanReviewed = humanReviewed
});
- 实时监控仪表盘
// 通过 WorkflowEvent 推送到前端实时显示
await context.AddEventAsync(new ModerationDashboardEvent
{
ModerationType = result.Type,
ProcessingTime = elapsed,
ExecutorName = this.Name
});
实际部署建议,在生产环境中使用时:
- 日志记录:所有分支都记录详细日志
- 性能监控:跟踪每个 Case 的执行时间
- A/B 测试:对比不同分类策略的效果
- 人工反馈:将人工审核结果反馈给 AI 模型训练
四、循环工作流
1. 业务场景:智能客服工单质检与自动改进
- 背景:某电商平台每天产生大量客服工单回复。AI 助手先生成回复草稿,然后经过多维度质检(礼貌度、准确性、合规性),不合格则自动改进,直到达标或转人工处理。
- 角色:回复生成 Agent、质检 Agent、智能改进 Executor、人工客服(兜底)
- 痛点:
- 初稿常见问题:语气生硬、信息不全、包含敏感词
- 需要多维度评分:礼貌度 ≥ 85、准确性 ≥ 90、合规性 100%
- 需要可控的重试次数,避免无限循环
- 需要可观测的状态,便于人工介入和审计
- 目标:构建可配置的 Loop,自动迭代"生成 → 质检 → 改进",只要满足门槛即可退出,否则转人工处理
示例输入数据:
var tickets = new []
{
new { Id = "TKT-2025-001", Query = "我买的手机充不进电了", Category = "电子产品", Priority = "High" },
new { Id = "TKT-2025-002", Query = "订单3天没发货,能退款吗", Category = "物流问题", Priority = "Medium" }
};
2. 核心概念
| 概念 | 中文释义 | 在 Loop 中的作用 |
|---|---|---|
| Loop Edge | 循环边 | 将质检结果重新路由到生成/改进节点,实现多次尝试 |
| Self-Correction Pipeline | 自我修正管道 | 组合生成、质检、改进三个步骤,自动闭环 |
| Multi-Dimension Quality | 多维度质检 | 礼貌度、准确性、合规性三重评分,全部达标才通过 |
| Max Attempts | 最大尝试次数 | 防止无限循环,触发转人工策略 |
| WorkflowContext State | 工作流上下文状态 | 记录 AttemptCount、各维度得分、问题列表 |
| Streaming Events | 流式事件 | 实时观测每轮执行情况,生成审计日志 |
3. 实践流程
步骤 1:准备业务数据与质检基线
我们使用三份工单请求,模拟不同优先级和问题类型,并设置多维度质检阈值。
using System.Text.Json.Serialization;
internal record TicketRequest(string Id, string Query, string Category, string Priority);
// 质检结果的结构化输出模型
internal class QualityReportDto
{
[JsonPropertyName("politenessScore")]
public int PolitenessScore { get; set; }
[JsonPropertyName("accuracyScore")]
public int AccuracyScore { get; set; }
[JsonPropertyName("compliancePassed")]
public bool CompliancePassed { get; set; }
[JsonPropertyName("issues")]
public List<QualityIssueDto> Issues { get; set; } = new();
}
internal class QualityIssueDto
{
[JsonPropertyName("type")]
public string Type { get; set; } = "";
[JsonPropertyName("description")]
public string Description { get; set; } = "";
[JsonPropertyName("scoreImpact")]
public int ScoreImpact { get; set; }
}
internal record QualityIssue(string Type, string Description, int ScoreImpact);
var ticketRequests = new []
{
new TicketRequest("TKT-2025-001", "我买的手机充不进电了,什么情况?", "电子产品", "High"),
new TicketRequest("TKT-2025-002", "订单已经3天没发货,能退款吗?", "物流问题", "Medium"),
new TicketRequest("TKT-2025-003", "会员积分为什么突然清零了?", "账户问题", "Low")
};
// 质检标准:礼貌度 ≥ 85,准确性 ≥ 90,合规性必须 100%
// 注意:阈值设置较高,配合第一次生成简化版本,确保能体现循环改进过程
const int politenessThreshold = 85;
const int accuracyThreshold = 90;
new
{
工单总数 = ticketRequests.Length,
礼貌度阈值 = politenessThreshold,
准确性阈值 = accuracyThreshold,
合规性要求 = "100% 通过",
示例工单 = ticketRequests.First()
}.Display();
步骤 2:实现基础循环(生成 → 质检)
先定义两个 Executor:
- ReplyDraftExecutor:根据客户问题生成回复草稿(模拟 AI 产出)。
- QualityCheckExecutor:执行多维度质检,根据得分决定是否继续循环。
Loop 逻辑:当任一维度不达标时,发送 QCSignal.Revise,将结果回流到生成节点。
AI 集成说明,AI 的三大作用:
- 生成回复 (ReplyDraftExecutor)
- 使用 AI 根据客户问题生成专业的客服回复
- Prompt 包含:客户问题、产品类别、优先级、回复要求
- 质检评分 (QualityCheckExecutor)
- AI 担任质检专家角色,对回复进行多维度评分
- 返回结构化 JSON:礼貌度、准确性、合规性 + 具体问题列表
- 自动解析评分结果,判断是否通过质量门槛
- 智能改进 (IntelligentImproveExecutor)
- 根据质检报告中的问题列表,AI 针对性优化回复内容
- 礼貌度低 → 增加感谢语和尊称
- 准确性低 → 补充具体解决方案和时间
- 合规性问题 → 移除敏感词,规范表述
技术亮点:
使用结构化输出 (GetResponseAsync
) 确保类型安全和自动反序列化 渐进式生成策略:第一次故意生成简化版本(简短、缺少礼貌用语),确保触发循环改进
高阈值质检:礼貌度≥85、准确性≥90,配合初始低质量输出,体现多次迭代价值
上下文传递:通过 WorkflowContext 保存改进后的内容
迭代优化:每次改进都基于上一次的质检反馈
internal enum QCSignal
{
Init,
Revise
}
internal record ReplyDraft(string TicketId, string Content, int Attempt);
internal record QualityReport(string TicketId, int PolitenessScore, int AccuracyScore, bool CompliancePassed, IReadOnlyList<QualityIssue> Issues);
internal record TicketOutcome(string TicketId, string Status, int Attempts, QualityReport FinalReport);
internal sealed class BaselineReplyDraftExecutor : Executor<QCSignal>
{
private readonly TicketRequest _ticket;
private readonly IChatClient _chatClient;
public BaselineReplyDraftExecutor(TicketRequest ticket, IChatClient chatClient) : base("BaselineReplyDraft")
{
_ticket = ticket;
_chatClient = chatClient;
}
public override async ValueTask HandleAsync(QCSignal message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
int attempt = await context.ReadOrInitStateAsync("attempt", () => 0, cancellationToken);
attempt++;
await context.QueueStateUpdateAsync("attempt", attempt, cancellationToken);
var prompt = $"""
你是一位电商客服。请针对以下客户问题生成一条简短回复:
客户问题:{_ticket.Query}
产品类别:{_ticket.Category}
直接返回回复内容,不要添加任何前缀或说明。
""";
var response = await _chatClient.GetResponseAsync(prompt, cancellationToken: cancellationToken);
Console.WriteLine($"第 {attempt} 次生成回复草稿完成");
var content = response.Text ?? "抱歉,我们会尽快处理您的问题。";
Console.WriteLine($"回复内容:{content}");
var draft = new ReplyDraft(_ticket.Id, content, attempt);
await context.SendMessageAsync(draft, targetId: "BaselineQualityCheck", cancellationToken);
}
}
internal sealed class BaselineQualityCheckExecutor : Executor<ReplyDraft>
{
private readonly int _politenessThreshold;
private readonly int _accuracyThreshold;
private readonly IChatClient _chatClient;
public BaselineQualityCheckExecutor(int politenessThreshold, int accuracyThreshold, IChatClient chatClient)
: base("BaselineQualityCheck")
{
_politenessThreshold = politenessThreshold;
_accuracyThreshold = accuracyThreshold;
_chatClient = chatClient;
}
public override async ValueTask HandleAsync(ReplyDraft draft, IWorkflowContext context, CancellationToken cancellationToken = default)
{
int attempt = await context.ReadOrInitStateAsync("attempt", () => 1, cancellationToken);
// 使用 AI 进行多维度质检评分(结构化输出,严格标准)
var prompt = $"""
你是一位严格的客服质检专家。请对以下客服回复进行多维度评分(0-100分):
回复内容:{draft.Content}
评分维度和严格标准:
1. 礼貌度(0-100):必须包含称呼语(您、亲)、感谢语(感谢、谢谢)、结束语(祝、期待)等,语气亲和温暖
- 缺少任何一项扣20分
- 语气生硬、机械扣10-30分
2. 准确性(0-100):必须提供具体的解决方案、明确的处理时间或有效的后续步骤
- 只说"会处理"但无具体方案扣30分
- 无明确时间承诺扣20分
- 信息过于笼统扣10-20分
3. 合规性(通过/不通过):不得包含敏感词、不当表述、推诿责任的话语
- 发现任何敏感词或不当表述直接判定为"不通过"
请对每个维度进行严格评分,并在issues字段中列出所有发现的问题。
""";
// 使用结构化输出:GetResponseAsync<T> 自动生成 JSON Schema 并反序列化
var response = await _chatClient.GetResponseAsync<QualityReportDto>(prompt, cancellationToken: cancellationToken);
var reportDto = response.Result;
// 转换为业务模型
var issues = reportDto.Issues.Select(i => new QualityIssue(i.Type, i.Description, i.ScoreImpact)).ToList();
var report = new QualityReport(draft.TicketId, reportDto.PolitenessScore, reportDto.AccuracyScore, reportDto.CompliancePassed, issues);
await context.AddEventAsync(new BaselineQualityScoreEvent(draft.TicketId, attempt, reportDto.PolitenessScore, reportDto.AccuracyScore, reportDto.CompliancePassed), cancellationToken);
await context.QueueStateUpdateAsync("attempt", ++attempt, cancellationToken);
if (reportDto.PolitenessScore >= _politenessThreshold && reportDto.AccuracyScore >= _accuracyThreshold && reportDto.CompliancePassed)
{
await context.YieldOutputAsync(new TicketOutcome(draft.TicketId, "Approved", attempt, report), cancellationToken);
}
else
{
await context.SendMessageAsync(QCSignal.Revise, targetId: "BaselineReplyDraft", cancellationToken);
}
}
}
internal sealed class BaselineQualityScoreEvent : WorkflowEvent
{
public BaselineQualityScoreEvent(string ticketId, int attempt, int politenessScore, int accuracyScore, bool compliancePassed)
: base(new { TicketId = ticketId, Attempt = attempt, PolitenessScore = politenessScore, AccuracyScore = accuracyScore, CompliancePassed = compliancePassed })
{
}
}
Console.WriteLine("基础循环执行器定义完成(已集成 AI 结构化输出 + 严格质检标准)");
- 步骤 3:构建 Workflow 并验证基础 Loop,使用 WorkflowBuilder 将两个步骤连接成闭环,并运行单次请求,观察得分随循环变化。
// 获取 AI 客户端
var targetTicket = ticketRequests.First();
var draftExecutor = new BaselineReplyDraftExecutor(targetTicket, chatClient);
var qcExecutor = new BaselineQualityCheckExecutor(politenessThreshold, accuracyThreshold, chatClient);
var baselineWorkflow = new WorkflowBuilder(draftExecutor)
.AddEdge(draftExecutor, qcExecutor)
.AddEdge(qcExecutor, draftExecutor)
.WithOutputFrom(qcExecutor)
.Build();
await using (var baselineRun = await InProcessExecution.StreamAsync(baselineWorkflow, QCSignal.Init)){
var scoreTimeline = new List<object>();
await foreach (var evt in baselineRun.WatchStreamAsync())
{
// 强制中断(最多5次尝试)
if (scoreTimeline.Count == 5)
{
Console.WriteLine("强制中断工作流执行(已完成5次评估)");
break;
}
switch (evt)
{
case BaselineQualityScoreEvent scoreEvent:
dynamic payload = scoreEvent.Data!;
scoreTimeline.Add(new {
尝试次数 = payload.Attempt,
礼貌度 = payload.PolitenessScore,
准确性 = payload.AccuracyScore,
合规性 = payload.CompliancePassed ? "✅" : "❌"
});
Console.WriteLine($"AI 质检结果 => 礼貌度:{payload.PolitenessScore} 准确性:{payload.AccuracyScore} 合规性:{(payload.CompliancePassed ? "通过" : "不通过")}");
break;
case WorkflowOutputEvent outputEvent:
Console.WriteLine("工作流完成");
outputEvent.Data.Display();
break;
}
}
步骤 4:扩展自我修正节点
为了解决"质检发现问题但无修复动作"的缺陷,我们新增 IntelligentImproveExecutor:
- 根据质检报告的 Issues 列表,针对性改进回复内容
- 礼貌度问题 → 优化语气,添加称呼和感谢语
- 准确性问题 → 补充具体信息和解决方案
- 合规性问题 → 移除敏感词,规范表述
- 将改进后的草稿反馈给质检节点,形成更智能的 Loop
internal sealed class AdaptiveReplyDraftExecutor : Executor<QCSignal>
{
private readonly TicketRequest _ticket;
private readonly IChatClient _chatClient;
- public AdaptiveReplyDraftExecutor(TicketRequest ticket, IChatClient chatClient) : base("AdaptiveReplyDraft")
{
_ticket = ticket;
_chatClient = chatClient;
}
public override async ValueTask HandleAsync(QCSignal message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
int attempt = await context.ReadOrInitStateAsync("attempt", () => 0, cancellationToken);
attempt++;
await context.QueueStateUpdateAsync("attempt", attempt, cancellationToken);
// 使用 AI 生成客服回复(渐进式生成策略)
var prompt = attempt == 1
? $"""
你是一位电商客服。请针对以下客户问题生成一条简短回复(刻意保持简短、缺少礼貌用语):
客户问题:{_ticket.Query}
产品类别:{_ticket.Category}
要求:
1. 只用1-2句话回答,不要称呼语和感谢语
2. 只说结论,不提供具体处理时间
3. 字数控制在30字以内
直接返回回复内容,不要添加任何前缀或说明。
"""
: $"""
你是一位专业的电商客服。请针对以下客户问题生成一条改进后的回复:
客户问题:{_ticket.Query}
产品类别:{_ticket.Category}
优先级:{_ticket.Priority}
要求:
1. 语气亲和、专业,使用恰当的称呼和感谢语
2. 提供具体的解决方案或处理时间
3. 符合客服规范,不包含敏感词
4. 字数控制在80-100字
直接返回回复内容,不要添加任何前缀或说明。
""";
var response = await _chatClient.GetResponseAsync(prompt, cancellationToken: cancellationToken);
var content = response.Text ?? "抱歉,我们会尽快处理您的问题。";
Console.WriteLine($"第 {attempt} 次生成回复草稿 (策略: {(attempt == 1 ? "简化版" : "完整版")})");
Console.WriteLine($"回复内容:{content}");
var draft = new ReplyDraft(_ticket.Id, content, attempt);
await context.SendMessageAsync(draft, targetId: "AdaptiveQualityCheck", cancellationToken);
}
}
internal sealed class IntelligentImproveExecutor : Executor<QualityReport>
{
private readonly TicketRequest _ticket;
private readonly IChatClient _chatClient;
public IntelligentImproveExecutor(TicketRequest ticket, IChatClient chatClient) : base("IntelligentImprove")
{
_ticket = ticket;
_chatClient = chatClient;
}
public override async ValueTask HandleAsync(QualityReport report, IWorkflowContext context, CancellationToken cancellationToken = default)
{
int attempt = await context.ReadOrInitStateAsync("attempt", () => 1, cancellationToken);
// 构建改进提示词,基于质检反馈
var issuesSummary = string.Join("\n", report.Issues.Select(i => $"- {i.Type}: {i.Description}"));
var prompt = $"""
你是一位客服优化专家。请根据以下质检反馈,改进客服回复内容:
原始问题:{_ticket.Query}
产品类别:{_ticket.Category}
优先级:{_ticket.Priority}
当前评分:
- 礼貌度:{report.PolitenessScore}/100 (要求≥{politenessThreshold})
- 准确性:{report.AccuracyScore}/100 (要求≥{accuracyThreshold})
- 合规性:{(report.CompliancePassed ? "通过" : "不通过")}
发现的问题:
{issuesSummary}
请生成一条改进后的客服回复,针对性解决上述问题:
1. 如果礼貌度不足,增加称呼语、感谢语,使用更亲和的表述
2. 如果准确性不足,补充具体的解决方案、处理时间、后续步骤
3. 如果合规性不通过,移除敏感词,规范表述
4. 字数控制在80-100字
直接返回改进后的回复内容,不要添加任何前缀或说明。
""";
var response = await _chatClient.GetResponseAsync(prompt, cancellationToken: cancellationToken);
var improvedContent = response.Text ?? "抱歉,我们会尽快处理您的问题。";
await context.AddEventAsync(new LoopProgressEvent(_ticket.Id, attempt, report.PolitenessScore, report.AccuracyScore, report.CompliancePassed, "Improve"), cancellationToken);
Console.WriteLine($"第 {attempt} 次智能改进完成");
Console.WriteLine($"改进后内容:{improvedContent}");
// 触发下一次生成(使用改进后的内容作为上下文)
await context.SendMessageAsync(QCSignal.Revise, targetId: "AdaptiveReplyDraft", cancellationToken);
}
}
internal sealed class LoopProgressEvent : WorkflowEvent
{
public LoopProgressEvent(string ticketId, int attempt, int politenessScore, int accuracyScore, bool compliancePassed, string stage)
: base(new { ticketId, attempt, politenessScore, accuracyScore, compliancePassed, stage })
{
}
}
Console.WriteLine("自适应生成器和智能改进器定义完成");
internal sealed class AdaptiveQualityCheckExecutor : Executor<ReplyDraft>
{
private readonly int _politenessThreshold;
private readonly int _accuracyThreshold;
private readonly int _maxAttempts;
private readonly IChatClient _chatClient;
// 默认最大尝试次数为5次
public AdaptiveQualityCheckExecutor(int politenessThreshold, int accuracyThreshold, IChatClient chatClient)
: this(politenessThreshold, accuracyThreshold, 5, chatClient)
{
}
public AdaptiveQualityCheckExecutor(int politenessThreshold, int accuracyThreshold, int maxAttempts, IChatClient chatClient)
: base("AdaptiveQualityCheck")
{
_politenessThreshold = politenessThreshold;
_accuracyThreshold = accuracyThreshold;
_maxAttempts = maxAttempts;
_chatClient = chatClient;
}
public override async ValueTask HandleAsync(ReplyDraft draft, IWorkflowContext context, CancellationToken cancellationToken = default)
{
int attempt = await context.ReadOrInitStateAsync("attempt", () => 1, cancellationToken);
// 使用 AI 进行多维度质检评分(结构化输出,严格标准)
var prompt = $"""
你是一位严格的客服质检专家。请对以下客服回复进行多维度评分(0-100分):
回复内容:{draft.Content}
评分维度和严格标准:
1. 礼貌度(0-100):必须包含称呼语(您、亲)、感谢语(感谢、谢谢)、结束语(祝、期待)等,语气亲和温暖
- 缺少任何一项扣20分
- 语气生硬、机械扣10-30分
2. 准确性(0-100):必须提供具体的解决方案、明确的处理时间或有效的后续步骤
- 只说"会处理"但无具体方案扣30分
- 无明确时间承诺扣20分
- 信息过于笼统扣10-20分
3. 合规性(通过/不通过):不得包含敏感词、不当表述、推诿责任的话语
- 发现任何敏感词或不当表述直接判定为"不通过"
请对每个维度进行严格评分,并在issues字段中列出所有发现的问题。
""";
// 使用结构化输出:GetResponseAsync<T> 自动生成 JSON Schema 并反序列化
var response = await _chatClient.GetResponseAsync<QualityReportDto>(prompt, cancellationToken: cancellationToken);
var reportDto = response.Result;
// 转换为业务模型
var issues = reportDto.Issues.Select(i => new QualityIssue(i.Type, i.Description, i.ScoreImpact)).ToList();
var report = new QualityReport(draft.TicketId, reportDto.PolitenessScore, reportDto.AccuracyScore, reportDto.CompliancePassed, issues);
await context.AddEventAsync(new AdaptiveQualityScoreEvent(draft.TicketId, attempt, reportDto.PolitenessScore, reportDto.AccuracyScore, reportDto.CompliancePassed), cancellationToken);
if (reportDto.PolitenessScore >= _politenessThreshold && reportDto.AccuracyScore >= _accuracyThreshold && reportDto.CompliancePassed)
{
await context.YieldOutputAsync(new TicketOutcome(draft.TicketId, "Approved", attempt, report), cancellationToken);
}
else if (attempt >= _maxAttempts)
{
await context.AddEventAsync(new AdaptiveMaxAttemptsReachedEvent(draft.TicketId, _maxAttempts), cancellationToken);
await context.RequestHaltAsync();//质检失败:已达到最大尝试次数
}
else
{
await context.QueueStateUpdateAsync("attempt", attempt + 1, cancellationToken);
// 发送质检报告到改进环节
await context.SendMessageAsync(report, targetId: "IntelligentImprove", cancellationToken);
}
}
}
internal sealed class AdaptiveQualityScoreEvent : WorkflowEvent
{
public AdaptiveQualityScoreEvent(string ticketId, int attempt, int politenessScore, int accuracyScore, bool compliancePassed)
: base(new { TicketId = ticketId, Attempt = attempt, PolitenessScore = politenessScore, AccuracyScore = accuracyScore, CompliancePassed = compliancePassed })
{
}
}
internal sealed class AdaptiveMaxAttemptsReachedEvent : WorkflowEvent
{
public AdaptiveMaxAttemptsReachedEvent(string ticketId, int maxAttempts)
: base(new { TicketId = ticketId, MaxAttempts = maxAttempts })
{
}
}
Console.WriteLine("自适应质检执行器定义完成(已集成 AI 结构化输出 + 严格质检标准)");
步骤 5:运行自我修正 Loop(流式监控)
使用 RunStreamingAsync 获取实时事件,区分 Review 阶段与 AutoFix 阶段,输出更丰富的轨迹信息。
var adaptiveDraft = new AdaptiveReplyDraftExecutor(targetTicket, chatClient);
var adaptiveQC = new AdaptiveQualityCheckExecutor(politenessThreshold, accuracyThreshold, chatClient);
var intelligentImprove = new IntelligentImproveExecutor(targetTicket, chatClient);
var adaptiveWorkflow = new WorkflowBuilder(adaptiveDraft)
.AddEdge(adaptiveDraft, adaptiveQC)
.AddEdge(adaptiveQC, intelligentImprove)
.AddEdge(intelligentImprove, adaptiveDraft)
.WithOutputFrom(adaptiveQC)
.Build();
await using (var adaptiveRun = await InProcessExecution.StreamAsync(adaptiveWorkflow, QCSignal.Init)){
var adaptiveTimeline = new List<object>();
await foreach (var evt in adaptiveRun.WatchStreamAsync())
{
switch (evt)
{
case AdaptiveQualityScoreEvent scoreEvent:
dynamic scoreData = scoreEvent.Data!;
adaptiveTimeline.Add(new {
工单 = scoreData.TicketId,
尝试 = scoreData.Attempt,
礼貌度 = scoreData.PolitenessScore,
准确性 = scoreData.AccuracyScore,
合规 = scoreData.CompliancePassed ? "✅" : "❌",
阶段 = "质检"
});
Console.WriteLine($"AI 质检结果 => 礼貌度:{scoreData.PolitenessScore} 准确性:{scoreData.AccuracyScore} 合规性:{(scoreData.CompliancePassed ? "通过" : "不通过")}");
break;
case WorkflowOutputEvent outputEvent:
Console.WriteLine("AI 自我修正流程结束");
outputEvent.Data.Display();
break;
}
}
4. 高级场景与最佳实践
Loop 设计守则
明确退出条件:结合最大尝试次数、多维度阈值、人工信号,避免无限循环。
多维度质检:礼貌度、准确性、合规性等多个指标独立评估,全部达标才通过
状态最小可见:使用 ReadOrInitStateAsync + QueueStateUpdateAsync 将指标限定在必要范围,防止脏读。
阶段事件:自定义 LoopProgressEvent,方便前端 UI 或监控系统订阅。
转人工兜底:超过最大尝试次数后,通过 RequestHaltAsync 触发人工介入。
常见错误与解决
忘记清理状态:循环结束后可调用 QueueClearScopeAsync 清理历史,避免下一次 run 复用旧数据。
类型不匹配:Edge 之间的消息类型必须与 Executor 泛型对应,必要时添加 Adapter。
重复触发 Halt:RequestHaltAsync 只在关键节点调用一次,防止产生过多暂停事件。
质检标准不合理:阈值设置过高导致永远无法通过,过低则失去质量保障意义。
性能优化建议
并行质检:可在 Loop 内拆分礼貌度/准确性/合规性多条并行链路,然后使用 Fan-In 汇总。
缓存改进策略:若相同问题类型多次出现,可在状态中缓存改进模板以加速循环。
指标上报:结合 OpenTelemetry 记录 Loop 时长、成功率、转人工率。
自适应阈值:根据历史数据动态调整质检标准,平衡效率与质量。
AI 评分最佳实践
结构化输出:使用 JSON 格式确保 AI 返回可解析的结果,避免自由文本。
Prompt 工程:明确评分标准和范围(0-100),提供评分示例。
异常处理:JSON 解析失败时使用默认评分,记录日志便于排查。
成本控制:评估 Token 消耗,考虑使用更小的模型(如 gpt-4o-mini)进行质检。
一致性保障:在 Prompt 中强调评分标准的一致性,避免同样内容得分波动过大。
上下文传递:将改进历史传递给 AI,让后续改进更有针对性。
五、并行工作流
1. 业务场景:电商多平台价格监控与决策系统
- 背景:跨境电商公司需要实时监控同一商品在亚马逊、eBay、Shopee等多个平台的定价策略,在检测到竞争对手降价时快速做出响应决策。
- 角色:Amazon价格Agent、eBay价格Agent、Shopee价格Agent、定价策略聚合Executor。
- 挑战:
- 多平台API查询需要在3秒内全部返回(串行执行需10+秒);
- 每个平台的数据格式不同,需要标准化处理;
- 聚合后需要给出智能调价建议,而非简单罗列数据。
- 目标:构建 Fan-out(并发查询)+ Fan-in(策略汇总)工作流,实现一次查询、并行抓取、智能决策的企业级模式。
2. 核心概念速览
| 概念 | 中文释义 | 在并发模式中的作用 |
|---|---|---|
| Fan-out Edge | 并发分支 | 将商品查询请求广播给多个平台Agent |
| Fan-in Edge | 汇聚边 | 收集所有平台价格数据,交由策略Executor分析 |
| ChatClientAgent | LLM 执行器 | 模拟平台API返回,生成价格与库存信息 |
| Aggregation Executor | 聚合执行器 | 组合多平台数据,计算最优定价策略 |
| Streaming Events | 流式事件 | 实时监控各平台查询状态,快速定位超时 |
3. 实践流程
步骤 1:建模业务问题与输入数据
先把商品查询请求抽象成
PriceQuery,确保 Fan-out 阶段广播的是结构化数据,包含商品ID、区域等关键参数。
internal record PriceQuery(string ProductId, string ProductName, string TargetRegion);
var priceQuery = new PriceQuery(
ProductId: "IPHONE15-PRO-256",
ProductName: "iPhone 15 Pro 256GB",
TargetRegion: "US"
);
步骤 2:定义 Agent 阵列与自定义 Executor
PlatformPriceExecutor 封装平台查询逻辑,模拟各平台API响应;
PriceQueryStartExecutor 负责广播查询请求并发放 TurnToken;
PricingStrategyExecutor 做 Fan-in 汇总,生成智能定价建议报告。
// 定义自定义 Executor 包装 Agent 功能
internal sealed class PlatformPriceExecutor(string id, IChatClient chatClient, string platformInstructions) : Executor<ChatMessage>(id)
{
private readonly IChatClient _chatClient = chatClient;
private readonly string _instructions = platformInstructions;
public override async ValueTask HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
var messages = new List<ChatMessage>
{
new(ChatRole.System, _instructions),
message
};
var response = await _chatClient.GetResponseAsync(messages, cancellationToken: cancellationToken);
var replyMessage = new ChatMessage(ChatRole.Assistant, response.Text ?? string.Empty)
{
AuthorName = this.Id
};
await context.SendMessageAsync(replyMessage, cancellationToken: cancellationToken);
Console.WriteLine($"{this.Id} 完成查询");
}
}
var amazonExecutor = new PlatformPriceExecutor(
"AmazonPriceAgent",
chatClient,
"你是Amazon平台价格查询Agent。返回格式:价格=$XXX,库存状态=充足/紧张,配送说明=Prime会员免运费/标准配送。"
);
var ebayExecutor = new PlatformPriceExecutor(
"eBayPriceAgent",
chatClient,
"你是eBay平台价格查询Agent。返回格式:价格=$XXX,商品状态=全新/二手XX新,运费说明=包邮/买家承担。"
);
var shopeeExecutor = new PlatformPriceExecutor(
"ShopeePriceAgent",
chatClient,
"你是Shopee平台价格查询Agent。返回格式:价格=$XXX(含税),区域=东南亚/台湾,促销信息=满减活动/无。"
);
internal sealed class PriceQueryStartExecutor() : Executor<PriceQuery>(nameof(PriceQueryStartExecutor))
{
public override async ValueTask HandleAsync(PriceQuery query, IWorkflowContext context, CancellationToken cancellationToken = default)
{
var userPrompt = $@"商品ID: {query.ProductId}
商品名称: {query.ProductName}
目标区域: {query.TargetRegion}
请查询该商品在你的平台上的当前价格、库存状态和配送信息。";
await context.SendMessageAsync(new ChatMessage(ChatRole.User, userPrompt), cancellationToken: cancellationToken);
await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken: cancellationToken);
Console.WriteLine("Fan-out 价格查询广播已发送");
}
}
// 修改为接收单个 ChatMessage,而不是 List<ChatMessage>
internal sealed class PricingStrategyExecutor(int targetCount) : Executor<ChatMessage>(nameof(PricingStrategyExecutor))
{
private readonly List<ChatMessage> _messages = [];
private readonly int _targetCount = targetCount;
public override async ValueTask HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._messages.Add(message);
Console.WriteLine($"已收集 {_messages.Count}/{_targetCount} 个平台数据 - 来自 {message.AuthorName}");
if (this._messages.Count == this._targetCount)
{
var platformData = string.Join("\n", this._messages.Select(m => $"• {m.AuthorName}: {m.Text}"));
var strategyReport = $@"━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
多平台价格汇总(共 {this._messages.Count} 个平台)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
{platformData}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
智能定价建议
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
基于以上数据,建议分析竞争对手价格区间,制定差异化定价策略。
考虑因素:库存压力、配送成本、平台佣金率、目标利润率。";
await context.YieldOutputAsync(strategyReport, cancellationToken);
Console.WriteLine("Fan-in 定价策略生成完成");
}
}
}
步骤 3:构建 Fan-out / Fan-in 工作流
使用 WorkflowBuilder 将广播、并发 Agent、聚合三个阶段串联成完整拓扑,并指定输出来自聚合 Executor。
public Workflow GetWorkflow()
{
var startExecutor = new PriceQueryStartExecutor();
var strategyExecutor = new PricingStrategyExecutor(3);
var priceMonitorWorkflow = new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, [amazonExecutor, ebayExecutor, shopeeExecutor])
.AddFanInEdge([amazonExecutor, ebayExecutor, shopeeExecutor], strategyExecutor)
.WithOutputFrom(strategyExecutor)
.Build();
Console.WriteLine("价格监控 Workflow 构建完成");
return priceMonitorWorkflow;
}
- 步骤 4:Execute vs Stream,观察并发执行细节
- 先执行一次非流式任务获取最终报告,再切换到流式模式及时感知每个 Executor 的生命周期事件。
flowchart TD PriceQueryStartExecutor["PriceQueryStartExecutor (Start)"]; AmazonPriceAgent["AmazonPriceAgent"]; eBayPriceAgent["eBayPriceAgent"]; ShopeePriceAgent["ShopeePriceAgent"]; PricingStrategyExecutor["PricingStrategyExecutor"]; fan_in_PricingStrategyExecutor_719E34D9((fan-in)) AmazonPriceAgent --> fan_in_PricingStrategyExecutor_719E34D9; ShopeePriceAgent --> fan_in_PricingStrategyExecutor_719E34D9; eBayPriceAgent --> fan_in_PricingStrategyExecutor_719E34D9; fan_in_PricingStrategyExecutor_719E34D9 --> PricingStrategyExecutor; PriceQueryStartExecutor --> AmazonPriceAgent; PriceQueryStartExecutor --> eBayPriceAgent; PriceQueryStartExecutor --> ShopeePriceAgent;
var pricingReport = await InProcessExecution.RunAsync(priceMonitorWorkflow, priceQuery);
new
{
priceQuery.ProductName,
priceQuery.TargetRegion,
定价策略报告 = pricingReport
}.Display();
var status = await pricingReport.GetStatusAsync();
await pricingReport.DisposeAsync();
Console.WriteLine("StreamAsync:实时监控平台查询进度");
var priceMonitorWorkflow = GetWorkflow();
await using (var streamingRun = await InProcessExecution.StreamAsync(priceMonitorWorkflow, priceQuery))
{
await foreach (WorkflowEvent evt in streamingRun.WatchStreamAsync())
{
switch (evt)
{
case ExecutorInvokedEvent started:
Console.WriteLine($"{started.ExecutorId} 启动");
break;
case ExecutorCompletedEvent completed:
Console.WriteLine($"{completed.ExecutorId} 完成");
break;
case WorkflowOutputEvent output:
Console.WriteLine("Fan-in 汇总输出:");
Console.WriteLine(output.Data);
break;
}
}
await streamingRun.DisposeAsync();
}
Console.WriteLine("并发执行演示完成");
4. 高级实践与最佳实践
- 性能监控:在聚合 Executor 中追加 Stopwatch,记录最慢平台API响应时间,作为SLA监控依据。
- 动态平台:agentRoster 可根据商品类目动态调整,例如3C产品加入京东Agent,服装类加入Zalando Agent。
- 降级策略:搭配自我修正模式,当某个平台API超时时可使用缓存数据或标记为"暂无数据"。
- 实时告警:streamingRun 输出可接入监控系统,当检测到竞对降价超过5%时触发Slack/钉钉通知。
- 缓存优化:为高频查询商品设置Redis缓存(TTL=30秒),减少API调用成本。
六、多选工作流
1. 业务场景:内容多渠道分发引擎
- 背景:营销团队需要根据内容长度、风险级别、语言类型,将一份稿件同时推送到不同媒介。
- 角色:分类器 Executor、长文渠道、短文渠道、合规审核。
- 挑战:
- 一个决策可能需要同时推送多个渠道;
- 合规风险需要额外介入,但不能阻塞其他渠道;
- 必须记录每次路由的去向,方便 BI 分析。
- 目标:构建 Multi-Selection 工作流,让分类器一次决策即可触发多个下游执行器。
2. 核心概念
| 概念 | 中文释义 | 在 Multi-Selection 中的作用 |
|---|---|---|
| Target Assigner | 目标分配器 | 返回需要触发的下游索引集合 |
| Partition Fan-out | 分区式并发 | 按照 IEnumerable |
| Shared Output | 多路输出 | 可以一次返回多个渠道的执行结果 |
| Conditional Logging | 条件日志 | 记录每次路由路径,支持审计 |
3. 实践流程
- 步骤 1:建模内容请求与样本数据
internal record ContentSubmission(string Id, string Title, string Category, int Length, bool ContainsRisk, string Language);
internal sealed class DistributionPlan
{
public required string SubmissionId { get; init; }
public bool PublishToLongForm { get; init; }
public bool PublishToShortForm { get; init; }
public bool EscalateToModerator { get; init; }
public string Reason { get; init; } = string.Empty;
}
ContentSubmission[] submissions =
{
new("CNT-1001", "6月能耗报告", "Operations", 1200, false, "zh-CN"),
new("CNT-1002", "618 预热短文案", "Marketing", 260, false, "zh-CN"),
new("CNT-1003", "跨境补贴政策说明", "Compliance", 620, true, "en-US")
};
步骤 2:定义分类器与渠道 Executor
为演示重点放在路由逻辑,此处分类器使用简单规则(长度/风险),生产环境可替换为多 Agent 推理或参考源码中的 JSON Schema LLM。
using System.Threading;
internal sealed class ContentClassifierExecutor() : Executor<ContentSubmission, DistributionPlan>(nameof(ContentClassifierExecutor))
{
public override ValueTask<DistributionPlan> HandleAsync(ContentSubmission submission, IWorkflowContext context, CancellationToken cancellationToken = default)
{
bool publishLong = submission.Length > 600;
bool publishShort = submission.Length <= 600 || submission.Category is "Marketing";
bool needModerator = submission.ContainsRisk || submission.Category is "Compliance";
return ValueTask.FromResult(new DistributionPlan
{
SubmissionId = submission.Id,
PublishToLongForm = publishLong,
PublishToShortForm = publishShort,
EscalateToModerator = needModerator,
Reason = needModerator ? "命中风险或合规模块" : "常规稿件"
});
}
}
internal sealed class LongFormChannelExecutor() : Executor<DistributionPlan>(nameof(LongFormChannelExecutor))
{
public override async ValueTask HandleAsync(DistributionPlan plan, IWorkflowContext context, CancellationToken cancellationToken = default)
{
await context.YieldOutputAsync($"" + plan.SubmissionId + " 发布到长文渠道", cancellationToken);
}
}
internal sealed class ShortFormChannelExecutor() : Executor<DistributionPlan>(nameof(ShortFormChannelExecutor))
{
public override async ValueTask HandleAsync(DistributionPlan plan, IWorkflowContext context, CancellationToken cancellationToken = default)
{
await context.YieldOutputAsync($"" + plan.SubmissionId + " 发布到短文渠道", cancellationToken);
}
}
internal sealed class ModeratorSignalExecutor() : Executor<DistributionPlan>(nameof(ModeratorSignalExecutor))
{
public override async ValueTask HandleAsync(DistributionPlan plan, IWorkflowContext context, CancellationToken cancellationToken = default)
{
await context.YieldOutputAsync($"{plan.SubmissionId} 触发人工审核:{plan.Reason}", cancellationToken);
}
}
步骤 3:构建 Multi-Selection 工作流
关键在于 Target Assigner,根据 DistributionPlan 返回需要触发的目标索引集合。
var classifier = new ContentClassifierExecutor();
var longFormExecutor = new LongFormChannelExecutor();
var shortFormExecutor = new ShortFormChannelExecutor();
var moderatorExecutor = new ModeratorSignalExecutor();
static Func<DistributionPlan?, int, IEnumerable<int>> BuildChannelRouter() => (plan, targetCount) =>
{
if (plan is null)
{
return Array.Empty<int>();
}
List<int> targets = [];
if (plan.PublishToLongForm) targets.Add(0);
if (plan.PublishToShortForm) targets.Add(1);
if (plan.EscalateToModerator) targets.Add(2);
return targets;
};
var multiSelectionWorkflow = new WorkflowBuilder(classifier)
.AddFanOutEdge(
classifier,
[longFormExecutor, shortFormExecutor, moderatorExecutor],
BuildChannelRouter())
.WithOutputFrom(longFormExecutor, shortFormExecutor, moderatorExecutor)
.Build();
Console.WriteLine("Multi-Selection Workflow 构建完成");
flowchart TD ContentClassifierExecutor["ContentClassifierExecutor (Start)"]; LongFormChannelExecutor["LongFormChannelExecutor"]; ShortFormChannelExecutor["ShortFormChannelExecutor"]; ModeratorSignalExecutor["ModeratorSignalExecutor"]; ContentClassifierExecutor --> LongFormChannelExecutor; ContentClassifierExecutor --> ShortFormChannelExecutor; ContentClassifierExecutor --> ModeratorSignalExecutor;
步骤 4:运行工作流并观察路由结果
分别演示批量执行与流式监控,确认一条稿件可以同时触发多条通路。
Console.WriteLine("批量执行 3 条稿件");
foreach (var submission in submissions)
{
await using var run = await InProcessExecution.StreamAsync(multiSelectionWorkflow, submission);
List<string> outputs = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent outputEvent)
{
outputs.Add(outputEvent.Data?.ToString() ?? string.Empty);
}
}
new
{
submission.Id,
submission.Category,
submission.Length,
发布路径 = outputs
}.Display();
}
Console.WriteLine("单条稿件的实时事件");
var spotlightSubmission = submissions.Last();
await using (var spotlightRun = await InProcessExecution.StreamAsync(multiSelectionWorkflow, spotlightSubmission)){
await foreach (WorkflowEvent evt in spotlightRun.WatchStreamAsync())
{
switch (evt)
{
case ExecutorInvokedEvent started:
Console.WriteLine($"{started.ExecutorId} 启动");
break;
case ExecutorCompletedEvent completed:
Console.WriteLine($" {completed.ExecutorId} 完成");
break;
case WorkflowOutputEvent outputEvent:
Console.WriteLine($"输出: {outputEvent.Data}");
break;
}
}
}
Console.WriteLine("Multi-Selection 演示结束");
4. 最佳实践
- 使用 JSON Schema LLM:将 ContentClassifierExecutor 替换为 ChatClientAgent + ChatResponseFormat.ForJsonSchema,提升弹性。
- 记录路由链路:利用 WorkflowContext.QueueStateUpdateAsync 保留每一步所选通路,方便追溯。
- 组合其他模式:与并发执行结合,实现“多路选择 + 每路继续并行”复杂场景。
- 可观测性:将 outputs 写入 Application Insights,构建渠道 KPI 看板。
七、Map Reduce 工作流
1. 业务场景:企业安全白皮书汇总
- 背景:安全团队每周生成数万字白皮书,需要自动拆分章节、并行摘要并最终生成执行摘要。
- 角色:拆分 Executor、多个摘要 Agent、共识归并、发布器。
- 挑战:
- 文本极长,单次推理成本高;
- 需要保持段落顺序,避免信息错位;
- 汇总阶段要补充 KPI 与推荐动作。
- 目标:用 MapReduce 将巨文拆为可控块,Mapper 并行摘要,Reducer 组合并打上标签。
2. 核心概念
| 概念 | 中文释义 | 在本课的作用 |
|---|---|---|
| Chunk Splitter | 文本拆分器 | Map 阶段入口,写入共享状态并广播事件 |
| DocumentSummarizer | 摘要执行器 | Map 阶段,处理分配到的段落 |
| Consensus Reducer | 归并器 | Reduce 阶段,收敛所有摘要并排序 |
| Publisher | 发布器 | 将 Reduce 结果格式化输出 |
| Shared State | 共享状态 | 存放段落文本、总段数,用于协调并发 |
3. 实践流程
- 步骤 1:准备长文档样本
internal static class DocumentState
{
public const string Scope = "DocMapReduce";
public const string TotalChunksKey = "TOTAL_CHUNKS";
}
internal record ChunkEnvelope(string ChunkStateKey, string Text, int Order);
internal sealed class ChunkReadyEvent(string ChunkStateKey, int Order) : WorkflowEvent
{
public string ChunkStateKey { get; } = ChunkStateKey;
public int Order { get; } = Order;
}
internal sealed class ChunkSummaryCompletedEvent(int Order, string Summary) : WorkflowEvent
{
public int Order { get; } = Order;
public string Summary { get; } = Summary;
}
internal sealed class ReduceCompletedEvent(IReadOnlyList<string> Summaries) : WorkflowEvent
{
public IReadOnlyList<string> Summaries { get; } = Summaries;
}
- 步骤 2:实现 Map 阶段(拆分 + 摘要)
internal sealed class ChunkSplitterExecutor(string[] summarizerIds) : Executor<string>(nameof(ChunkSplitterExecutor))
{
private readonly string[] _summarizerIds = summarizerIds;
public override async ValueTask HandleAsync(string manuscript, IWorkflowContext context, CancellationToken cancellationToken = default)
{
var paragraphs = manuscript
.Split(["\r\n\r\n", "\n\n"], StringSplitOptions.RemoveEmptyEntries)
.Select(p => p.Trim())
.Where(p => !string.IsNullOrWhiteSpace(p))
.ToArray();
await context.QueueStateUpdateAsync(DocumentState.TotalChunksKey, paragraphs.Length, scopeName: DocumentState.Scope, cancellationToken);
Console.WriteLine($"Map 阶段:段落数 = {paragraphs.Length}");
for (int i = 0; i < paragraphs.Length; i++)
{
var targetId = this._summarizerIds[i % this._summarizerIds.Length];
var chunkStateKey = $"chunk_{i}"; // 简化键,避免ID耦合
var envelope = new ChunkEnvelope(chunkStateKey, paragraphs[i], i);
await context.QueueStateUpdateAsync(chunkStateKey, envelope, scopeName: DocumentState.Scope, cancellationToken);
Console.WriteLine($"发送 ChunkReadyEvent(order={i}) 到 {targetId}");
await context.SendMessageAsync(new ChunkReadyEvent(chunkStateKey, i), targetId: targetId, cancellationToken: cancellationToken);
}
Console.WriteLine($"Map 阶段:已拆分 {paragraphs.Length} 个段落");
}
}
internal sealed class DocumentSummarizerExecutor(string id, string reducerId) : Executor<ChunkReadyEvent>(id)
{
private readonly string _reducerId = reducerId;
public override async ValueTask HandleAsync(ChunkReadyEvent message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine($"{this.Id} 收到 ChunkReadyEvent(order={message.Order})");
var envelope = await context.ReadStateAsync<ChunkEnvelope>(message.ChunkStateKey, scopeName: DocumentState.Scope, cancellationToken);
var summary = Summarize(envelope!.Text);
await context.SendMessageAsync(new ChunkSummaryCompletedEvent(envelope.Order, summary), targetId: this._reducerId, cancellationToken: cancellationToken);
Console.WriteLine($"{this.Id} 完成段落 {envelope.Order}");
}
private static string Summarize(string text)
{
var sentences = text.Split(['。', '!', '?'], StringSplitOptions.RemoveEmptyEntries);
var focus = sentences.Length > 0 ? sentences[0] : text;
var trimmed = focus.Length > 80 ? focus[..80] + "..." : focus;
var normalized = trimmed.Replace("\r", string.Empty).Replace("\n", " ").Trim();
return $"• {normalized}";
}
}
- 步骤 3:实现 Reduce & Publish 阶段
internal sealed class ConsensusReducerExecutor(string id, string publisherId) : Executor<ChunkSummaryCompletedEvent>(id)
{
private readonly SortedDictionary<int, string> _summaries = new();
private readonly string _publisherId = publisherId;
private int? _expectedChunks;
public override async ValueTask HandleAsync(ChunkSummaryCompletedEvent message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
_expectedChunks ??= await context.ReadStateAsync<int>(DocumentState.TotalChunksKey, scopeName: DocumentState.Scope, cancellationToken);
_summaries[message.Order] = message.Summary;
Console.WriteLine($"Reduce 进度: {_summaries.Count}/{_expectedChunks}");
if (_expectedChunks.HasValue && _summaries.Count >= _expectedChunks.Value)
{
var ordered = _summaries.OrderBy(kvp => kvp.Key).Select(kvp => kvp.Value).ToList();
await context.SendMessageAsync(new ReduceCompletedEvent(ordered), targetId: this._publisherId, cancellationToken: cancellationToken);
Console.WriteLine("Reduce 阶段完成");
}
}
}
internal sealed class DocumentPublisherExecutor(string id) : Executor<ReduceCompletedEvent>(id)
{
public override async ValueTask HandleAsync(ReduceCompletedEvent message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
var builder = new StringBuilder();
builder.AppendLine("《本周安全白皮书摘要》");
builder.AppendLine(new string('━', 30));
int order = 1;
foreach (var summary in message.Summaries)
{
builder.AppendLine($"{order++}. {summary}");
}
builder.AppendLine(new string('━', 30));
builder.AppendLine("推荐动作:请检查 OTA 签名、同步 SOC 新规则、与供应链共享告警。");
await context.YieldOutputAsync(builder.ToString(), cancellationToken);
}
}
步骤 4:构建 Workflow 并运行
三组 Map 执行器并行处理段落,Reduce 负责聚合,Publish 统一输出。
string[] summarizerIds = ["mapper_secops", "mapper_rnd", "mapper_compliance"];
var reducerId = "reducer_consensus";
var publisherId = "publisher_whitepaper";
var splitter = new ChunkSplitterExecutor(summarizerIds);
var summarizers = summarizerIds.Select(id => new DocumentSummarizerExecutor(id, reducerId)).ToArray();
var reducer = new ConsensusReducerExecutor(reducerId, publisherId);
var publisher = new DocumentPublisherExecutor(publisherId);
var mapReduceWorkflow = new WorkflowBuilder(splitter)
.AddFanOutEdge(splitter, [..summarizers])
.AddFanInEdge([..summarizers], reducer)
.AddEdge(reducer, publisher)
.WithOutputFrom(publisher)
.Build();
flowchart TD ChunkSplitterExecutor["ChunkSplitterExecutor (Start)"]; mapper_secops["mapper_secops"]; mapper_rnd["mapper_rnd"]; mapper_compliance["mapper_compliance"]; reducer_consensus["reducer_consensus"]; publisher_whitepaper["publisher_whitepaper"]; fan_in_reducer_consensus_26615B5E((fan-in)) mapper_compliance --> fan_in_reducer_consensus_26615B5E; mapper_rnd --> fan_in_reducer_consensus_26615B5E; mapper_secops --> fan_in_reducer_consensus_26615B5E; fan_in_reducer_consensus_26615B5E --> reducer_consensus; ChunkSplitterExecutor --> mapper_secops; ChunkSplitterExecutor --> mapper_rnd; ChunkSplitterExecutor --> mapper_compliance; reducer_consensus --> publisher_whitepaper;
string manuscript = """
第一段:本周我们在生产环境中检测到三起高危漏洞利用尝试,分别针对身份验证与API速率限制。我们已临时封禁相关IP,并更新WAF规则以缓解风险。
第二段:研发团队完成了对零信任网络访问策略的回顾,新增设备基线检查与会话时长限制,预计下周进入灰度发布阶段。
第三段:合规方面,依据最新的行业标准,我们优化了日志保留周期与隐私数据访问审批流程,减少人工审批时间。
第四段:对外联动方面,已与供应链伙伴同步威胁情报,建议对固件OTA签名策略进行交叉验证,并提升告警共享频率。
""";
Console.WriteLine("长文档样本文本已准备");
await using (var run = await InProcessExecution.StreamAsync(mapReduceWorkflow, manuscript)){
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
switch (evt)
{
case ExecutorInvokedEvent started:
Console.WriteLine($"{started.ExecutorId} started");
break;
case ExecutorCompletedEvent completed:
Console.WriteLine($"{completed.ExecutorId} completed");
break;
case WorkflowOutputEvent outputEvent:
Console.WriteLine("最终摘要:");
Console.WriteLine(outputEvent.Data);
break;
default:
Console.WriteLine($"事件: {evt.GetType().Name}");
break;
}
}
}
Console.WriteLine("MapReduce 演示完成");
4. 最佳实践
- 共享状态命名空间:使用 Scope 隔离任务,避免与其他 Workflow 相互影响。
- 持久化中间结果:长文档/大数据场景可参考源码将 Map/Reduce 中间件落地磁盘。
- LLM 评估:Map 阶段可接入 AIClientHelper.GetDefaultChatClient(),通过 ChatResponseFormat 生成结构化摘要。
- 可视化:利用 mapReduceWorkflow.ToMermaidString() 输出拓扑,供架构评审使用。
八、子工作流
1. 业务场景:客户投诉智能处理流水线
- 背景:企业客服中心每天收到大量投诉,需要根据类型(产品质量/物流问题)分发到不同的标准化处理子流程。
- 角色:投诉分类器 → 产品质量子流程 / 物流问题子流程 → 合规审核 → 情绪评估。
- 目标:将处理子流程封装为可复用的 Workflow,主流程统一管理路由和审核,所有处理记录写入共享状态。
- AI 能力:使用真实 AI 生成回复模板、分析投诉情绪、提供处理建议。
2. 核心概念
| 概念 | 中文释义 | 场景价值 |
|---|---|---|
| Sub-Workflow | 子工作流 | 封装标准化处理流程,多渠道复用 |
| ExecutorBinding | 执行器绑定 | 将子工作流作为单个 Executor 暴露 |
| Conditional Routing | 条件路由 | 根据投诉类型分发到不同子流程 |
| Shared State | 共享状态 | 投诉记录在各执行器间传递和更新 |
| AI Integration | AI 集成 | 真实 AI 生成回复模板和分析建议 |
3. 实践路径
- 步骤 1:建模投诉数据与共享状态
// 投诉数据模型
internal record CustomerComplaint(
string OrderId,
string CustomerName,
string ComplaintText,
DateTime SubmittedAt
);
// 共享状态:投诉处理记录(各执行器会更新此对象)
internal class ComplaintProcessingRecord
{
public CustomerComplaint Original { get; set; }
public string Category { get; set; } = "未分类";
public string Handler { get; set; } = "待分配";
public List<string> ProcessingSteps { get; set; } = new();
public string AIGeneratedResponse { get; set; } = "";
public string ComplianceStatus { get; set; } = "待审核";
public string SentimentScore { get; set; } = "未评估";
}
var complaint = new CustomerComplaint(
OrderId: "ORD-2025-8821",
CustomerName: "张先生",
ComplaintText: "收到的手机屏幕有明显划痕,要求退货退款",
SubmittedAt: DateTime.Now
);
var processingRecord = new ComplaintProcessingRecord { Original = complaint };
- 步骤 2:实现产品质量处理子工作流(评估 → 退换货判定 → AI生成回复)
// === 产品质量子工作流 ===
// 1. 问题评估执行器
internal sealed class ProductEvaluationExecutor() : Executor<ComplaintProcessingRecord, ComplaintProcessingRecord>(nameof(ProductEvaluationExecutor))
{
public override ValueTask<ComplaintProcessingRecord> HandleAsync(ComplaintProcessingRecord record, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine("正在评估产品质量问题...");
record.ProcessingSteps.Add("[产品评估] 检测到屏幕外观缺陷,符合质量问题定义");
record.Handler = "产品质量团队";
return ValueTask.FromResult(record);
}
}
// 2. 退换货判定执行器
internal sealed class ReturnPolicyExecutor() : Executor<ComplaintProcessingRecord, ComplaintProcessingRecord>(nameof(ReturnPolicyExecutor))
{
public override ValueTask<ComplaintProcessingRecord> HandleAsync(ComplaintProcessingRecord record, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine("判定退换货政策...");
var daysFromOrder = (DateTime.Now - record.Original.SubmittedAt).Days;
if (daysFromOrder <= 7)
{
record.ProcessingSteps.Add("[退换货判定] 符合7天无理由退货政策,批准全额退款");
}
else
{
record.ProcessingSteps.Add("[退换货判定] 超过退货期限,建议换货或部分补偿");
}
return ValueTask.FromResult(record);
}
}
// 3. AI生成回复执行器
internal sealed class AIResponseGeneratorExecutor(IChatClient chatClient) : Executor<ComplaintProcessingRecord, ComplaintProcessingRecord>(nameof(AIResponseGeneratorExecutor))
{
public override async ValueTask<ComplaintProcessingRecord> HandleAsync(ComplaintProcessingRecord record, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine("AI 正在生成客户回复...");
var prompt = $@"你是专业的客服主管。根据以下投诉处理信息,生成一封正式、有同理心的客户回复邮件(150字内):
客户:{record.Original.CustomerName}
订单号:{record.Original.OrderId}
投诉内容:{record.Original.ComplaintText}
处理步骤:
{string.Join("\n", record.ProcessingSteps)}
要求:
1. 表达歉意和理解
2. 说明处理方案
3. 提供后续联系方式
4. 语气真诚、专业";
var response = await chatClient.GetResponseAsync(prompt, cancellationToken: cancellationToken);
record.AIGeneratedResponse = response.Text ?? "AI 生成失败";
record.ProcessingSteps.Add($"[AI 回复] 已生成客户回复模板({record.AIGeneratedResponse.Length}字)");
return record;
}
}
var chatClient = AIClientHelper.GetDefaultChatClient();
var productEvalExecutor = new ProductEvaluationExecutor();
var returnPolicyExecutor = new ReturnPolicyExecutor();
var aiResponseExecutor = new AIResponseGeneratorExecutor(chatClient);
var productQualitySubWorkflow = new WorkflowBuilder(productEvalExecutor)
.AddEdge(productEvalExecutor, returnPolicyExecutor)
.AddEdge(returnPolicyExecutor, aiResponseExecutor)
.WithOutputFrom(aiResponseExecutor)
.Build();
var productQualitySubExecutor = productQualitySubWorkflow.BindAsExecutor("ProductQualitySubWorkflow");
- 步骤 3:实现物流问题处理子工作流(追踪 → 延迟分析 → AI生成回复)
// === 物流问题子工作流 ===
// 1. 物流追踪执行器
internal sealed class LogisticsTrackingExecutor() : Executor<ComplaintProcessingRecord, ComplaintProcessingRecord>(nameof(LogisticsTrackingExecutor))
{
public override ValueTask<ComplaintProcessingRecord> HandleAsync(ComplaintProcessingRecord record, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine("正在查询物流信息...");
record.ProcessingSteps.Add("[物流追踪] 包裹在中转站滞留3天,当前状态:运输中");
record.Handler = "物流运营团队";
return ValueTask.FromResult(record);
}
}
// 2. 延迟分析执行器
internal sealed class DelayAnalysisExecutor() : Executor<ComplaintProcessingRecord, ComplaintProcessingRecord>(nameof(DelayAnalysisExecutor))
{
public override ValueTask<ComplaintProcessingRecord> HandleAsync(ComplaintProcessingRecord record, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine("分析延迟原因...");
record.ProcessingSteps.Add("[延迟分析] 延迟原因:暴雨导致道路封闭,预计2天内恢复配送");
record.ProcessingSteps.Add("[补偿方案] 提供50元优惠券 + 免运费");
return ValueTask.FromResult(record);
}
}
var logisticsTrackExecutor = new LogisticsTrackingExecutor();
var delayAnalysisExecutor = new DelayAnalysisExecutor();
var logisticsAIResponseExecutor = new AIResponseGeneratorExecutor(chatClient);
var logisticsSubWorkflow = new WorkflowBuilder(logisticsTrackExecutor)
.AddEdge(logisticsTrackExecutor, delayAnalysisExecutor)
.AddEdge(delayAnalysisExecutor, logisticsAIResponseExecutor)
.WithOutputFrom(logisticsAIResponseExecutor)
.Build();
var logisticsSubExecutor = logisticsSubWorkflow.BindAsExecutor("LogisticsSubWorkflow");
- 步骤 4:构建主工作流(分类器 → 条件路由 → 合规审核 → 情绪评估)
// 1. 投诉分类执行器
internal sealed class ComplaintClassifierExecutor() : Executor<ComplaintProcessingRecord, ComplaintProcessingRecord>(nameof(ComplaintClassifierExecutor))
{
public override ValueTask<ComplaintProcessingRecord> HandleAsync(ComplaintProcessingRecord record, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine("正在分类投诉类型...");
// 简单规则分类(实际可用 AI 分类)
if (record.Original.ComplaintText.Contains("划痕") || record.Original.ComplaintText.Contains("质量") || record.Original.ComplaintText.Contains("退货"))
{
record.Category = "产品质量";
}
else if (record.Original.ComplaintText.Contains("延迟") || record.Original.ComplaintText.Contains("物流") || record.Original.ComplaintText.Contains("配送"))
{
record.Category = "物流问题";
}
else
{
record.Category = "其他";
}
record.ProcessingSteps.Add($"[分类器] 投诉类型识别为:{record.Category}");
return ValueTask.FromResult(record);
}
}
// 2. 合规审核执行器
internal sealed class ComplianceCheckExecutor() : Executor<ComplaintProcessingRecord, ComplaintProcessingRecord>(nameof(ComplianceCheckExecutor))
{
public override ValueTask<ComplaintProcessingRecord> HandleAsync(ComplaintProcessingRecord record, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine("合规审核中...");
var checks = new List<string>();
if (record.AIGeneratedResponse.Contains("歉意")) checks.Add("包含道歉");
if (record.AIGeneratedResponse.Length <= 300) checks.Add("字数合规");
if (!record.AIGeneratedResponse.Contains("法律") && !record.AIGeneratedResponse.Contains("诉讼"))
checks.Add("无敏感法律词汇");
record.ComplianceStatus = checks.Count >= 2 ? "审核通过" : "需人工复审";
record.ProcessingSteps.Add($"[合规审核] {record.ComplianceStatus} - {string.Join(", ", checks)}");
return ValueTask.FromResult(record);
}
}
// 3. 情绪评估执行器(使用 AI)
internal sealed class SentimentAnalysisExecutor(IChatClient chatClient) : Executor<ComplaintProcessingRecord, ComplaintProcessingRecord>(nameof(SentimentAnalysisExecutor))
{
public override async ValueTask<ComplaintProcessingRecord> HandleAsync(ComplaintProcessingRecord record, IWorkflowContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine("AI 情绪评估中...");
var prompt = $@"分析以下客服回复的情绪基调,用一个词概括(如:友好、专业、冷淡、热情):
{record.AIGeneratedResponse}
只返回一个词,不要解释。";
var response = await chatClient.GetResponseAsync(prompt, cancellationToken: cancellationToken);
record.SentimentScore = response.Text?.Trim() ?? "中性";
record.ProcessingSteps.Add($"[情绪评估] AI 评估语气为:{record.SentimentScore}");
return record;
}
}
var classifierExecutor = new ComplaintClassifierExecutor();
var complianceExecutor = new ComplianceCheckExecutor();
var sentimentExecutor = new SentimentAnalysisExecutor(chatClient);
// 主工作流:分类 → 条件路由到子流程 → 合规 → 情绪
var mainWorkflow = new WorkflowBuilder(classifierExecutor)
.AddEdge<ComplaintProcessingRecord>(classifierExecutor, productQualitySubExecutor,
condition: record => record.Category == "产品质量")
.AddEdge<ComplaintProcessingRecord>(classifierExecutor, logisticsSubExecutor,
condition: record => record.Category == "物流问题")
.AddEdge(productQualitySubExecutor, complianceExecutor)
.AddEdge(logisticsSubExecutor, complianceExecutor)
.AddEdge(complianceExecutor, sentimentExecutor)
.WithOutputFrom(sentimentExecutor)
.Build();
- 步骤 5:执行工作流并观察共享状态流转
Console.WriteLine("开始执行投诉处理流水线...");
Console.WriteLine(new string('━', 60));
await using (var streaming = await InProcessExecution.StreamAsync(mainWorkflow, processingRecord))
{
await foreach (WorkflowEvent evt in streaming.WatchStreamAsync())
{
switch (evt)
{
case ExecutorInvokedEvent started:
Console.WriteLine($"\n{started.ExecutorId} 开始执行");
break;
case ExecutorCompletedEvent completed when completed.Data is ComplaintProcessingRecord rec:
Console.WriteLine($" 共享状态更新 → 处理步骤数:{rec.ProcessingSteps.Count}");
break;
case WorkflowOutputEvent outputEvt when outputEvt.Data is ComplaintProcessingRecord finalRecord:
Console.WriteLine("\n" + new string('━', 60));
Console.WriteLine("投诉处理完成!最终处理记录:\n");
new {
订单号 = finalRecord.Original.OrderId,
客户 = finalRecord.Original.CustomerName,
投诉类型 = finalRecord.Category,
处理团队 = finalRecord.Handler,
处理步骤数 = finalRecord.ProcessingSteps.Count,
合规状态 = finalRecord.ComplianceStatus,
情绪评分 = finalRecord.SentimentScore
}.Display();
Console.WriteLine("\n处理步骤详情:");
foreach (var step in finalRecord.ProcessingSteps)
{
Console.WriteLine($" {step}");
}
Console.WriteLine("\nAI 生成的客户回复:");
Console.WriteLine(new string('─', 60));
Console.WriteLine(finalRecord.AIGeneratedResponse);
Console.WriteLine(new string('─', 60));
break;
}
}
}
4. 最佳实践
- 标准化接口:子工作流使用统一的状态对象(如 ComplaintProcessingRecord),便于主流程复用和扩展。
- 拆分粒度:保持子工作流 3-4 个步骤,聚焦单一职责(产品质量 vs 物流问题)。
- 共享状态设计:使用引用类型(class)而非值类型(record),确保各执行器修改同一对象。
- AI 集成策略:将 IChatClient 注入执行器构造函数,便于测试时 Mock。
- 条件路由:通过 AddEdge 的 condition 参数实现动态分流,避免硬编码。
- 可观测性:在共享状态中记录 ProcessingSteps,便于审计和调试。
- 多渠道复用:此子工作流可被"电话客服系统"、"在线聊天机器人"、"邮件处理系统"共同复用。
