Net+AI智能体11:A2A智能体跨系统协作
2025-12-13 20:14:17一、A2A 协议
1. 为什么需要 A2A 协议?
随着 AI 技术的发展,企业中的 AI Agent 数量不断增加。这些 Agent 可能来自不同的团队、不同的技术栈,甚至不同的组织:
flowchart LR
subgraph 企业A
A1[客服 Agent]
A2[销售 Agent]
end
subgraph 企业B
B1[物流 Agent]
B2[库存 Agent]
end
subgraph 第三方服务
C1[支付 Agent]
C2[天气 Agent]
end
A1 -.->|如何通信?| B1
A2 -.->|如何协作?| C1
B2 -.->|如何发现?| C2
style A1 fill:#e1f5ff
style A2 fill:#e1f5ff
style B1 fill:#ffe1e1
style B2 fill:#ffe1e1
style C1 fill:#e1ffe1
style C2 fill:#e1ffe1
在没有标准协议的情况下,Agent 之间的互联面临诸多挑战:
| 问题 | 描述 | 影响 |
|---|---|---|
| 协议不统一 | 每个 Agent 使用不同的 API 格式 | 集成成本高,难以扩展 |
| 发现困难 | 无法自动发现其他 Agent 的能力 | 需要人工配置,耦合度高 |
| 状态不透明 | 无法追踪跨 Agent 任务的执行状态 | 调试困难,可靠性差 |
| 安全性参差 | 缺乏统一的认证授权机制 | 安全风险高 |
Agent-to-Agent (A2A) Protocol 提供了一套标准化的解决方案,核心价值:
- 统一发现机制:通过 Agent Card 声明和发现能力
- 标准化通信:基于 JSON-RPC 2.0 的统一消息格式
- 任务生命周期:完整的任务状态管理和追踪
- 流式支持:SSE 实时事件推送
- 安全机制:内置认证和授权方案
2. 什么是 A2A 协议?
Agent-to-Agent (A2A) Protocol 是由 Google 发起并开源的一个开放协议,旨在实现 AI Agent 之间的标准化通信与协作。
A2A 是一个开放协议,用于实现 AI Agent 之间的无缝通信与协作。它提供标准化的 Agent 发现、消息交换和任务管理机制,让 AI Agent 能够跨平台、跨服务安全地互联互通。
| 特性 | 描述 |
|---|---|
| 开放标准 | 开源协议,任何人都可以实现和使用 |
| 语言无关 | 基于 HTTP + JSON-RPC,支持任何编程语言 |
| 双向通信 | Agent 之间是对等关系,可以互相发起请求 |
| 状态管理 | 支持长时间运行的任务和状态追踪 |
| 流式传输 | 通过 SSE 支持实时事件推送 |
| 安全内置 | 支持多种认证和授权方案 |
A2A 协议目前处于活跃开发阶段:
- 当前版本:v0.3.0
- 官方网站:a2a-protocol.org
- .NET SDK:A2A NuGet 包
3. A2A vs MCP:两种协议的对比
flowchart TB
subgraph MCP协议 ["MCP 协议 - 工具扩展"]
direction LR
LLM[LLM/Agent]
Tools[工具]
Resources[资源]
Prompts[提示]
LLM --> Tools
LLM --> Resources
LLM --> Prompts
end
subgraph A2A协议 ["A2A 协议 - Agent 互联"]
direction LR
Agent1[Agent A]
Agent2[Agent B]
Agent3[Agent C]
Agent1 <--> Agent2
Agent2 <--> Agent3
Agent1 <--> Agent3
end
style MCP协议 fill:#e1f5ff
style A2A协议 fill:#ffe1e1
| 维度 | MCP Protocol | A2A Protocol |
|---|---|---|
| 核心定位 | LLM ↔️ Tools/Resources | Agent ↔️ Agent |
| 通信对象 | 被动工具(等待调用) | 自主 Agent(主动决策) |
| 发现机制 | Initialize 握手协商 | Agent Card 能力声明 |
| 状态管理 | 无状态(请求/响应) | 有状态(Task 生命周期) |
| 交互模式 | Client-Server(单向调用) | 双向对等(互相请求) |
| 传输协议 | JSON-RPC over stdio/SSE | JSON-RPC over HTTP/SSE |
| 典型场景 | 扩展 AI 能力(搜索、计算) | 跨 Agent 协作(任务分发) |
MCP 像是"工具箱":
- Agent 是工人,MCP Server 是工具箱
- 工人使用工具完成任务
- 工具是被动的,等待工人使用
A2A 像是"同事协作":
- 多个 Agent 是不同的同事
- 同事之间可以互相委托任务
- 每个同事都是自主的,可以主动决策
在实际企业场景中,A2A 和 MCP 通常协同使用:
flowchart TB
User[用户] --> AgentA[Agent A]
subgraph AgentA内部
AgentA --> MCP1[MCP: 数据库工具]
AgentA --> MCP2[MCP: 搜索工具]
end
AgentA <-->|A2A 协议| AgentB[Agent B]
AgentA <-->|A2A 协议| AgentC[Agent C]
subgraph AgentB内部
AgentB --> MCP3[MCP: 支付工具]
end
subgraph AgentC内部
AgentC --> MCP4[MCP: 邮件工具]
end
style User fill:#fff4e1
style AgentA fill:#e1f5ff
style AgentB fill:#e1ffe1
style AgentC fill:#ffe1f5
- MCP:每个 Agent 内部使用 MCP 扩展自己的能力
- A2A:Agent 之间使用 A2A 进行跨 Agent 协作
4. A2A 核心架构
A2A 协议定义了三个核心组件:Client、Server 和 TaskManager。
- 整体架构
flowchart TB
subgraph 客户端
Client[A2AClient]
Resolver[A2ACardResolver]
end
subgraph 服务端
Server[A2A Server<br/>ASP.NET Core]
TM[TaskManager]
Agent[Your Agent<br/>业务逻辑]
Server --> TM
TM --> Agent
end
Resolver -->|1. 获取 Agent Card| Server
Client -->|2. 发送消息/任务| Server
Server -->|3. 返回响应/事件| Client
style Client fill:#e1f5ff
style Resolver fill:#e1f5ff
style Server fill:#ffe1e1
style TM fill:#fff4e1
style Agent fill:#e1ffe1
- 核心组件说明
| 组件 | 职责 | .NET 类 |
|---|---|---|
| A2AClient | 发送请求、接收响应、订阅事件 | A2AClient |
| A2ACardResolver | 发现 Agent、获取 Agent Card | A2ACardResolver |
| TaskManager | 管理任务生命周期、分发回调 | TaskManager |
| Agent | 处理实际业务逻辑 | 自定义实现 |
核心概念:
- Agent Card(身份名片),Agent Card 是 Agent 的"名片",声明 Agent 的基本信息和能力:
┌───────────────────────────────────┐
│ Agent Card │
├───────────────────────────────────┤
│ Name: "Research Agent" │
│ Description: "研究分析助手" │
│ URL: "https://agent.example.com" │
│ Version: "1.0.0" │
├───────────────────────────────────┤
│ Capabilities: │
│ ・Streaming: ✅ │
│ ・PushNotifications: ✅ │
├───────────────────────────────────┤
│ Skills: │
│ ・web-research: 网页研究 │
│ ・data-analysis: 数据分析 │
└───────────────────────────────────┘
- Message(消息),Message 是 Agent 之间交换的基本数据单元:
┌───────────────────────────────────┐
│ Message │
├───────────────────────────────────┤
│ MessageId: "msg-001" │
│ ContextId: "ctx-001" │
│ Role: User / Agent │
├───────────────────────────────────┤
│ Parts: │
│ ・TextPart: "你好" │
│ ・FilePart: attachment.pdf │
│ ・DataPart: { json: data } │
└───────────────────────────────────┘
- Task(任务),Task 代表一个可追踪的长时间运行操作:
┌───────────────────────────────────┐
│ Task │
├───────────────────────────────────┤
│ Id: "task-001" │
│ Status: │
│ ・State: Working / Completed │
│ ・Timestamp: 2024-01-01T... │
├───────────────────────────────────┤
│ History: [Message, Message...] │
│ Artifacts: [文件, 数据...] │
└───────────────────────────────────┘
5. A2A 通信模式
A2A 协议支持两种主要的通信模式:Message-based(消息模式) 和 Task-based(任务模式)。
- Message-based 模式
- 特点:简单直接,适合快速请求/响应场景。
- 适用场景:
- 简单问答
- 快速查询
- 无需追踪状态
sequenceDiagram
participant C as A2AClient
participant S as A2A Server
participant A as Agent
C->>S: message/send
S->>A: OnMessageReceived
A->>S: AgentMessage
S->>C: AgentMessage (响应)
Note over C,A: 同步请求/响应,无状态
Task-based 模式
特点:支持长时间运行、状态追踪、可取消。
sequenceDiagram participant C as A2AClient participant S as A2A Server participant A as Agent C->>S: message/send (createTask) S->>A: OnTaskCreated S->>C: AgentTask (submitted) loop 任务执行中 A->>S: 更新状态 S-->>C: SSE: TaskStatusUpdate end A->>S: 完成任务 S-->>C: SSE: TaskStatusUpdate (completed) Note over C,A: 异步执行,状态可追踪Task 状态流转:
stateDiagram-v2
[*] --> Submitted: 创建任务
Submitted --> Working: 开始处理
Working --> Working: 更新进度
Working --> InputRequired: 需要用户输入
InputRequired --> Working: 收到输入
Working --> Completed: 完成
Working --> Failed: 失败
Working --> Canceled: 取消
Completed --> [*]
Failed --> [*]
Canceled --> [*]
6. 流式通信(Streaming)
无论是 Message-based 还是 Task-based,A2A 都支持 SSE 流式通信:
| 方法 | 用途 |
|---|---|
| SendMessageAsync | 非流式消息发送 |
| SendMessageStreamingAsync | 流式消息发送(SSE) |
| SubscribeToTaskAsync | 订阅任务状态更新(SSE) |
A2A 事件类型:
| 事件类型 | 描述 |
|---|---|
| TaskStatusUpdateEvent | 任务状态变更通知 |
| TaskArtifactUpdateEvent | 任务产物更新通知 |
7. JSON-RPC 2.0 通信协议
A2A 基于 JSON-RPC 2.0 作为底层通信协议。了解 JSON-RPC 有助于我们理解 A2A 的通信机制。
JSON-RPC 是一种轻量级的远程过程调用协议:
- 使用 JSON 作为数据格式
- 支持请求/响应和通知模式
- 简单、易于实现
请求格式
{
"jsonrpc": "2.0",
"method": "message/send",
"params": {
"message": {
"role": "user",
"parts": [{ "kind": "text", "text": "你好" }]
}
},
"id": "req-001"
}
成功响应:
{
"jsonrpc": "2.0",
"result": {
"role": "agent",
"messageId": "msg-001",
"parts": [{ "kind": "text", "text": "你好!有什么可以帮您的?" }]
},
"id": "req-001"
}
错误响应:
{
"jsonrpc": "2.0",
"error": {
"code": -32600,
"message": "Invalid Request"
},
"id": "req-001"
}
A2A 支持的方法
| 方法 | 描述 |
|---|---|
| message/send | 发送消息(Message-based) |
| message/stream | 流式发送消息 |
| task/get | 获取任务信息 |
| task/cancel | 取消任务 |
| task/subscribe | 订阅任务事件 |
| pushNotificationConfig/set | 设置推送通知 |
| pushNotificationConfig/get | 获取推送通知配置 |
二、Agent Card
在 A2A (Agent-to-Agent) 协议中,Agent Card 是 Agent 的"身份证"和"说明书"。它是一个标准化的 JSON 结构,用于向外界声明 Agent 的身份信息、具备的能力(Capabilities)、支持的技能(Skills)以及输入输出模态(Modalities)。
通过 Agent Card,其他 Agent 或客户端可以:
- 识别身份:了解 Agent 的名称、描述、版本和维护者信息。
- 发现能力:判断 Agent 是否支持流式传输、推送通知等高级特性。
- 了解技能:获取 Agent 提供的具体功能列表。
- 建立连接:获取连接所需的 URL 和认证方式。
1. Agent Card 核心结构
AgentCard 类是 A2A 协议的核心数据模型。让我们先通过代码直观地看一个完整的 Agent Card 定义。
var agentCard = new AgentCard
{
// 1. 基本身份信息
Name = "Research Agent",
Description = "专业的研究分析助手,擅长搜索网络并生成深度报告。",
Url = "https://agent.example.com/research",
Version = "1.0.0",
ProtocolVersion = "0.3.0", // 遵循的 A2A 协议版本
// 2. 能力声明 (Capabilities)
Capabilities = new AgentCapabilities
{
Streaming = true, // 支持 SSE 流式传输
PushNotifications = true, // 支持 Webhook 推送通知
StateTransitionHistory = true // 支持查询任务状态变更历史
},
// 3. 输入输出模态 (Modalities)
DefaultInputModes = ["text", "image"], // 支持文本和图片输入
DefaultOutputModes = ["text", "file"], // 输出文本和文件
// 4. 技能列表 (Skills)
Skills = [
new AgentSkill
{
Id = "web-research",
Name = "网页研究",
Description = "搜索和分析网页内容"
},
new AgentSkill
{
Id = "report-generation",
Name = "报告生成",
Description = "基于研究结果生成 PDF 报告"
}
],
// 5. 认证配置 (SecuritySchemes)
SecuritySchemes = new Dictionary<string, SecurityScheme>
{
["apiKey"] = new ApiKeySecurityScheme("X-API-Key", "header")
},
Security = new List<Dictionary<string, string[]>>
{
new()
{
["apiKey"] = []
}
},
};
核心属性详解:
基本信息 (Name, Description, Url, Version):
- 用于人类可读的展示和唯一标识。
- Url 是 Agent 的根地址,客户端将基于此地址构建 API 请求(如 /message/send)。
能力声明 (Capabilities):
- Streaming: 是否支持 Server-Sent Events (SSE) 流式响应。如果为 true,客户端可以使用 SendMessageStreamingAsync。
- PushNotifications: 是否支持通过 Webhook 异步推送任务状态和产物。这对于耗时任务非常重要。
- StateTransitionHistory: 是否保存并允许查询任务的状态流转历史。
模态 (DefaultInputModes, DefaultOutputModes):
- 声明 Agent 能够理解和生成的媒体类型(如 text, image, audio)。这有助于编排器(Orchestrator)选择合适的 Agent。
技能 (Skills):
- 列出 Agent 具备的具体功能。这类似于 Semantic Kernel 的 Plugins 或 OpenAI 的 Functions,但这里仅作为元数据声明,用于发现和路由。
安全方案 (SecuritySchemes):
- 遵循 OpenAPI 规范的安全定义。告诉客户端如何进行身份验证(例如 API Key 的名称和位置)。
2. Agent 发现 (Discovery)
在 A2A 架构中,客户端通常不知道 Agent 的具体实现细节,而是通过 Agent 发现机制来获取 Agent Card。
A2A SDK 提供了 A2ACardResolver 类,用于从指定的 URL 获取 Agent Card。
// 模拟一个运行中的 Agent Server 地址 (这里我们假设它运行在 localhost:5048)
// 在实际场景中,这通常是一个真实的 HTTP 地址
var agentUrl = new Uri("http://localhost:5048/echo");
// 创建解析器
var cardResolver = new A2ACardResolver(agentUrl);
// 注意:由于我们没有实际启动 Server,这里仅演示代码用法。
Console.WriteLine($"解析器已创建,目标地址: {agentUrl}");
// 模拟获取到的 Agent Card (为了演示,我们直接使用上面定义的对象)
var discoveredCard = agentCard;
// 检查 Agent 能力的辅助方法
void CheckAgentCapabilities(AgentCard card)
{
Console.WriteLine($"分析 Agent: {card.Name}");
if (card.Capabilities.Streaming)
{
Console.WriteLine(" 支持流式通信 (Streaming)");
}
else
{
Console.WriteLine(" 不支持流式通信");
}
if (card.Capabilities.PushNotifications)
{
Console.WriteLine(" 支持推送通知 (Push Notifications)");
}
Console.WriteLine($" 包含技能: {string.Join(", ", card.Skills.Select(s => s.Name))}");
}
CheckAgentCapabilities(discoveredCard);
3. 动态生成 Agent Card
在服务端开发中,我们通常不需要手动 new AgentCard,而是通过实现 OnAgentCardQuery 委托来动态返回。这允许我们根据配置或环境动态调整 Agent 的声明。
// 定义一个获取 Agent Card 的委托
Task<AgentCard> GetAgentCardAsync(string agentUrl, CancellationToken cancellationToken)
{
// 可以在这里读取配置文件或数据库
var version = "1.0.0";
var card = new AgentCard
{
Name = "Dynamic Agent",
Description = $"动态生成的 Agent (v{version})",
Url = agentUrl, // 使用传入的 agentUrl,确保与请求上下文一致
Version = version,
Capabilities = new AgentCapabilities { Streaming = true }
};
return Task.FromResult(card);
}
// 模拟调用
var card = await GetAgentCardAsync("https://myservice.com/agent", CancellationToken.None);
card.Display();
三、A2A Server 快速开始
1. 定义 Echo Agent
首先,我们需要定义 Agent 的逻辑。在 A2A 中,Agent 通过实现 ITaskManager 的回调函数来处理请求。
对于简单的消息处理,我们主要关注两个回调:
- OnAgentCardQuery:当客户端查询 Agent 信息时调用。
- OnMessageReceived:当客户端发送消息时调用。
public class EchoAgent
{
public void Attach(ITaskManager taskManager)
{
// 处理 Agent Card 查询
taskManager.OnAgentCardQuery = GetAgentCardAsync;
// 处理消息接收(Message-based 模式)
taskManager.OnMessageReceived = ProcessMessageAsync;
}
private Task<A2AResponse> ProcessMessageAsync(MessageSendParams messageSendParams, CancellationToken cancellationToken)
{
var messageText = messageSendParams.Message.Parts
.OfType<TextPart>().First().Text;
var response = new AgentMessage
{
Role = MessageRole.Agent,
MessageId = Guid.NewGuid().ToString(),
ContextId = messageSendParams.Message.ContextId,
Parts = [new TextPart { Text = $"Echo: {messageText}" }]
};
return Task.FromResult<A2AResponse>(response);
}
private Task<AgentCard> GetAgentCardAsync(string agentUrl, CancellationToken cancellationToken)
{
return Task.FromResult(new AgentCard
{
Name = "Echo Agent",
Description = "回显收到的所有消息",
Url = agentUrl,
Version = "1.0.0",
DefaultInputModes = ["text"],
DefaultOutputModes = ["text"],
Capabilities = new AgentCapabilities { Streaming = true }
});
}
}
Console.WriteLine("EchoAgent 类定义完成");
2. 创建 A2A Server
在 ASP.NET Core 中,我们使用 TaskManager 来管理 Agent,并使用 MapA2A 扩展方法将其映射到 HTTP 端点。
我们来完成一个完整示例:
- 核心代码 (Program.cs)
var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();
// 1. 创建 TaskManager
var taskManager = new TaskManager();
// 2. 创建并挂载 Agent
var agent = new EchoAgent();
agent.Attach(taskManager);
// 3. 映射 A2A 端点
app.MapA2A(taskManager, "/echo"); // JSON-RPC 端点
app.MapWellKnownAgentCard(taskManager, "/echo"); // Agent Card 端点
app.MapHttpA2A(taskManager, "/echo"); // HTTP 端点(可选)
app.Run();
- 运行示例项目
请在终端中运行以下命令来启动 Echo Agent 服务器:
dotnet run --agent echo --urls https://localhost:5048
A2A 端点将位于 /echo 路径下。可以直接通过浏览器访问 https://localhost:5048/echo/agent-card.json 来查看 Agent Card 信息。
3. 客户端访问
可以使用任何支持 A2A 协议的客户端来与 Echo Agent 交互。发送消息时,Agent 会将收到的消息原样返回。
- 获取 Agent Card
// Create agent card resolver
A2ACardResolver agentCardResolver = new(new Uri("https://localhost:5048/echo"));
// Get agent card
AgentCard agentCard = await agentCardResolver.GetAgentCardAsync();
agentCard.Display();
- 使用 A2AClient 发送消息
A2AClient agentClient = new(new Uri(agentCard.Url));
// Create a message to send to the agent
AgentMessage userMessage = new()
{
Role = MessageRole.User,
MessageId = Guid.NewGuid().ToString(),
Parts = [
new TextPart
{
Text = "Hello from the message-based communication sample! Please echo this message."
}
]
};
var sendParams = new MessageSendParams { Message = userMessage };
// Send the message and get the response
AgentMessage agentResponse = (AgentMessage)await agentClient.SendMessageAsync(sendParams);
Console.WriteLine("Received response from agent:");
agentResponse.Display();
- 发送流式消息 (SSE),使用 SendMessageStreamingAsync 方法可以获取一个异步枚举器,逐个接收事件。
using System.Net.ServerSentEvents;
await foreach (SseItem<A2AEvent> sseItem in agentClient.SendMessageStreamingAsync(new MessageSendParams { Message = userMessage }))
{
// sseItem.Data 是 A2AEvent 基类
// 常见的事件类型包括:
// - TaskStatusUpdateEvent: 任务状态更新
// - TaskArtifactUpdateEvent: 任务产物更新
// - AgentMessage: 直接返回的消息(在简单场景下)
Console.WriteLine($"收到事件类型: {sseItem.EventType}");
if (sseItem.Data is AgentMessage msg)
{
var text = msg.Parts.OfType<TextPart>().FirstOrDefault()?.Text;
Console.WriteLine($"消息内容: {text}");
}
}
四、Task 任务管理
前面主要使用了 Message-based(基于消息)的通信模式,这种模式类似于即时聊天,适合简单的问答场景。
然而,在复杂的 Agent 协作场景中,我们往往需要处理更复杂的工作流,例如:
- 需要长时间运行的任务
- 需要多轮交互和确认的任务
- 需要追踪任务进度和状态的任务
这时,我们就需要使用 Task-based(基于任务)的通信模式。
1. Task 任务管理基础
核心概念:AgentTask 与 TaskState,在 A2A 协议中,Task 是一个核心概念,它封装了任务的生命周期、状态、历史记录和产物。
AgentTask 模型,一个 AgentTask 对象主要包含以下属性:
Id: 任务的唯一标识符
Status: 任务的当前状态(包含 State 枚举和进度信息)
History: 任务的对话历史(AgentMessage 列表)
Artifacts: 任务生成的产物(如文件、报告等)
TaskState 状态流转,任务状态 (TaskState) 描述了任务当前的生命周期阶段:
- Submitted: 任务已提交
- Working: 正在处理中
- InputRequired: 需要用户或调用方提供更多输入
- Completed: 任务成功完成
- Failed: 任务失败
- Canceled: 任务被取消
flowchart LR
Submitted --> Working
Working --> InputRequired
InputRequired --> Working
Working --> Completed
Working --> Failed
Working --> Canceled
2. 定义 Task-based Agent
要创建一个支持任务管理的 Agent,我们需要在 Attach 方法中订阅 OnTaskCreated 和 OnTaskUpdated 事件,而不是 OnMessageReceived。
下面我们定义一个 EchoAgentWithTasks,它会模拟一个耗时任务,并返回一个产物。
public class EchoAgentWithTasks
{
private ITaskManager? _taskManager;
public void Attach(ITaskManager taskManager)
{
_taskManager = taskManager;
// 1. 订阅任务创建事件
taskManager.OnTaskCreated = ProcessTaskAsync;
// 2. 订阅任务更新事件(用于多轮对话,本例暂不涉及)
taskManager.OnTaskUpdated = ProcessTaskAsync;
// 3. 提供 Agent Card
taskManager.OnAgentCardQuery = (url, ct) => Task.FromResult(new AgentCard
{
Name = "Task Echo Agent",
Description = "基于任务的回显 Agent",
Url = url,
Version = "1.0.0",
Capabilities = new AgentCapabilities { Streaming = true },
DefaultInputModes = ["text"],
DefaultOutputModes = ["text"]
});
}
private async Task ProcessTaskAsync(AgentTask task, CancellationToken ct)
{
// 获取最新的一条消息
var lastMessage = task.History!.Last();
var messageText = lastMessage.Parts.OfType<TextPart>().First().Text;
Console.WriteLine($"[Server] 收到任务 {task.Id} 消息: {messageText}");
// 模拟处理时间 (1秒)
await Task.Delay(1000, ct);
// 1. 返回产物 (Artifact)
// 产物通常是任务的最终结果,如生成的文件、报告等
await _taskManager!.ReturnArtifactAsync(task.Id, new Artifact
{
Parts = [new TextPart { Text = $"Echo Result: {messageText}" }]
}, ct);
// 2. 更新任务状态为 Completed
// final: true 表示这是最终状态,任务结束
await _taskManager!.UpdateStatusAsync(
task.Id,
status: TaskState.Completed,
final: true,
cancellationToken: ct);
Console.WriteLine($"[Server] 任务 {task.Id} 已完成");
}
}
- 运行示例项目,请在终端中运行以下命令来启动 Echo Tasks Agent 服务器:
dotnet run --agent echotasks --urls https://localhost:5048
A2A 端点将位于 /echotasks 路径下。可以直接通过浏览器访问 https://localhost:5048/.well-known/agent-card.json 来查看 Agent Card 信息。
- 客户端调用与任务交互,现在使用 A2AClient 来创建一个任务,并查看返回的结果。
var agentEndpoint = "https://localhost:5048/echotasks";
var agentCard = await A2AHelper.GetAgentCard(agentEndpoint);
Console.WriteLine("已获取 Agent Card 信息:");
agentCard.Display();
var agentUrl = new Uri(agentCard.Url);
var client = new A2AClient(agentUrl);
Console.WriteLine("发送任务请求...");
// 发送消息 (这将触发 OnTaskCreated)
// Create a new task by sending the message to the agent
var response = await client.SendMessageAsync(new MessageSendParams
{
Message = new AgentMessage
{
Role = MessageRole.User,
Parts = [new TextPart { Text = "Hello Task World!" }]
}
});
// 检查响应类型
if (response is AgentTask task)
{
Console.WriteLine($"收到任务响应,ID: {task.Id}");
// 显示任务详情
new {
TaskId = task.Id,
Status = task.Status.State,
ArtifactsCount = task.Artifacts?.Count ?? 0,
FirstArtifact = task.Artifacts?.FirstOrDefault()?.Parts.OfType<TextPart>().FirstOrDefault()?.Text
}.Display();
}
else
{
Console.WriteLine("收到了非 Task 类型的响应");
response.Display();
}
从上面的输出中,我们可以看到:
- SendMessageAsync 返回了一个 AgentTask 对象。
- 任务状态为 Completed(因为我们在 Server 端等待了处理完成)。
- Artifacts 中包含了我们返回的 "Echo Result"。
这就是 Task-based 模式的基本流程:提交任务 -> 处理 -> 返回任务对象(含状态和产物)。
3. Task 任务管理核心概念
任务状态查询与控制,除了创建和更新任务,Client 端还需要能够查询任务的当前状态,或者在必要时取消任务。A2A 提供了以下 API:
GetTaskAsync: 查询任务当前状态
CancelTaskAsync: 取消正在进行的任务
SubscribeToTaskAsync: 订阅任务事件
多轮交互 (Human-in-the-loop),很多复杂的任务不能一次性完成,可能需要 Agent 在中间步骤请求用户的确认或补充信息。A2A 协议通过 InputRequired 状态支持这种交互模式。
任务元数据与历史管理
Metadata: 任务的自定义元数据,可以存储额外的业务信息
History: 完整的消息历史记录
任务存储 (Task Store),为了支持长时间运行的任务和历史记录查询,TaskManager 需要持久化任务数据。ITaskStore 接口定义了存储规范,默认提供了 InMemoryTaskStore。
4. ITaskStore 与 InMemoryTaskStore
TaskManager 依赖 ITaskStore 来保存任务数据。默认情况下,它使用 InMemoryTaskStore,这意味着重启后数据会丢失。
ITaskStore 接口,定义了任务的 CRUD 操作:
GetTaskAsync - 获取任务
SetTaskAsync - 保存任务
UpdateStatusAsync - 更新状态
AddArtifactAsync - 添加产物
AddHistoryAsync - 添加历史记录
InMemoryTaskStore,SDK 默认提供的内存存储实现:
无需额外配置,开箱即用
性能高,适合开发和测试
数据不持久化,重启后丢失
不支持分布式部署
在生产环境中,通常会实现自定义 ITaskStore(例如基于 SQL 数据库、Redis、Cosmos DB 等)。
// 创建一个显式的 InMemoryTaskStore 实例(通常不需要手动创建,TaskManager 默认会使用它)
var taskStore = new InMemoryTaskStore();
// 创建 TaskManager 时注入 TaskStore
var taskManager = new TaskManager(taskStore:taskStore); // 使用命名参数避免歧义
5. 构建多轮交互 Agent (Researcher Agent)
我们将构建一个 ResearcherAgent,它模拟一个研究任务。这个任务分为三个阶段:
- Planning: 制定研究计划
- WaitingForFeedback: 等待用户确认计划 (InputRequired)
- Researching: 执行研究并完成
这个示例将展示如何使用 InputRequired 状态来实现 Human-in-the-loop 工作流。
public class ResearcherAgent
{
private ITaskManager? _taskManager;
private readonly Dictionary<string, AgentState> _agentStates = [];
public static readonly ActivitySource ActivitySource = new("A2A.ResearcherAgent", "1.0.0");
private enum AgentState
{
Planning,
WaitingForFeedbackOnPlan,
Researching
}
public void Attach(ITaskManager taskManager)
{
_taskManager = taskManager;
_taskManager.OnTaskCreated = async (task, cancellationToken) =>
{
// Initialize the agent state for the task
_agentStates[task.Id] = AgentState.Planning;
// Ignore other content in the task, just assume it is a text message.
var message = ((TextPart?)task.History?.Last()?.Parts?.FirstOrDefault())?.Text ?? string.Empty;
await InvokeAsync(task.Id, message, cancellationToken);
};
_taskManager.OnTaskUpdated = async (task, cancellationToken) =>
{
// Note that the updated callback is helpful to know not to initialize the agent state again.
var message = ((TextPart?)task.History?.Last()?.Parts?.FirstOrDefault())?.Text ?? string.Empty;
await InvokeAsync(task.Id, message, cancellationToken);
};
_taskManager.OnAgentCardQuery = GetAgentCardAsync;
}
// This is the main entry point for the agent. It is called when a task is created or updated.
// It probably should have a cancellation token to enable the process to be cancelled.
public async Task InvokeAsync(string taskId, string message, CancellationToken cancellationToken)
{
if (_taskManager == null)
{
throw new InvalidOperationException("TaskManager is not attached.");
}
using var activity = ActivitySource.StartActivity("Invoke", ActivityKind.Server);
activity?.SetTag("task.id", taskId);
activity?.SetTag("message", message);
activity?.SetTag("state", _agentStates[taskId].ToString());
switch (_agentStates[taskId])
{
case AgentState.Planning:
await DoPlanningAsync(taskId, message, cancellationToken);
await _taskManager.UpdateStatusAsync(taskId, TaskState.InputRequired, new AgentMessage()
{
Parts = [new TextPart() { Text = "When ready say go ahead" }],
},
cancellationToken: cancellationToken);
break;
case AgentState.WaitingForFeedbackOnPlan:
if (message == "go ahead") // Dumb check for now to avoid using an LLM
{
await DoResearchAsync(taskId, message, cancellationToken);
}
else
{
// Take the message and redo planning
await DoPlanningAsync(taskId, message, cancellationToken);
await _taskManager.UpdateStatusAsync(taskId, TaskState.InputRequired, new AgentMessage()
{
Parts = [new TextPart() { Text = "When ready say go ahead" }],
},
cancellationToken: cancellationToken);
}
break;
case AgentState.Researching:
await DoResearchAsync(taskId, message, cancellationToken);
break;
}
}
private async Task DoResearchAsync(string taskId, string message, CancellationToken cancellationToken)
{
if (_taskManager == null)
{
throw new InvalidOperationException("TaskManager is not attached.");
}
using var activity = ActivitySource.StartActivity("DoResearch", ActivityKind.Server);
activity?.SetTag("task.id", taskId);
activity?.SetTag("message", message);
_agentStates[taskId] = AgentState.Researching;
await _taskManager.UpdateStatusAsync(taskId, TaskState.Working, cancellationToken: cancellationToken);
await _taskManager.ReturnArtifactAsync(
taskId,
new Artifact()
{
Parts = [new TextPart() { Text = $"{message} received." }],
},
cancellationToken);
await _taskManager.UpdateStatusAsync(taskId, TaskState.Completed, new AgentMessage()
{
Parts = [new TextPart() { Text = "Task completed successfully" }],
},
cancellationToken: cancellationToken);
}
private async Task DoPlanningAsync(string taskId, string message, CancellationToken cancellationToken)
{
if (_taskManager == null)
{
throw new InvalidOperationException("TaskManager is not attached.");
}
using var activity = ActivitySource.StartActivity("DoPlanning", ActivityKind.Server);
activity?.SetTag("task.id", taskId);
activity?.SetTag("message", message);
// Task should be in status Submitted
// Simulate being in a queue for a while
await Task.Delay(1000, cancellationToken);
// Simulate processing the task
await _taskManager.UpdateStatusAsync(taskId, TaskState.Working, cancellationToken: cancellationToken);
await _taskManager.ReturnArtifactAsync(
taskId,
new Artifact()
{
Parts = [new TextPart() { Text = $"{message} received." }],
},
cancellationToken);
await _taskManager.UpdateStatusAsync(taskId, TaskState.InputRequired, new AgentMessage()
{
Parts = [new TextPart() { Text = "When ready say go ahead" }],
},
cancellationToken: cancellationToken);
_agentStates[taskId] = AgentState.WaitingForFeedbackOnPlan;
}
private Task<AgentCard> GetAgentCardAsync(string agentUrl, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled<AgentCard>(cancellationToken);
}
var capabilities = new AgentCapabilities()
{
Streaming = true,
PushNotifications = false,
};
return Task.FromResult(new AgentCard()
{
Name = "Researcher Agent",
Description = "Agent which conducts research.",
Url = agentUrl,
Version = "1.0.0",
DefaultInputModes = ["text"],
DefaultOutputModes = ["text"],
Capabilities = capabilities,
Skills = [],
});
}
}
- 启动 Server 并运行交互流程
在终端中运行以下命令来启动 Echo Tasks Agent 服务器:
cd samples/AgentServer
dotnet run --agent researcher --urls https://localhost:5048
A2A 端点将位于 /researcher 路径下。可以直接通过浏览器访问 https://localhost:5048/.well-known/agent-card.json 来查看 Agent Card 信息。
var researcherEndpoint = "https://localhost:5048/researcher";
var researcherCard = await A2AHelper.GetAgentCard(researcherEndpoint);
Console.WriteLine("已获取 Agent Card 信息:");
researcherCard.Display();
6. 创建 Client 并开始多轮交互
现在让我们创建一个 A2A Client,与 Researcher Agent 进行多轮交互。我们将演示完整的工作流程:
- 创建任务(提交研究问题)
- Agent 进入 Planning 状态并返回计划
- Agent 切换到 InputRequired 状态,等待确认
- 用户确认计划
- Agent 执行研究并完成任务
这个过程展示了如何使用 InputRequired 实现 Human-in-the-loop。
// 创建 A2A Client
var client = new A2AClient(new Uri(researcherEndpoint));
// 步骤 1: 创建任务
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("步骤 1: 创建研究任务");
var taskResult = await client.SendMessageAsync(new MessageSendParams
{
Message = new AgentMessage
{
Parts = [new TextPart { Text = "Research the latest AI trends in 2025" }]
}
});
// 检查响应类型(应为 AgentTask)
if (taskResult is not AgentTask task)
{
throw new InvalidOperationException("预期返回 AgentTask,但收到其他类型");
}
var taskId = task.Id;
Console.WriteLine($"任务已创建: {taskId}");
Console.WriteLine($"初始状态: {task.Status.State}");
task = await client.GetTaskAsync(taskId);
await Task.Delay(2000);
// 步骤 2: 查询任务状态 (使用 GetTaskAsync)
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("步骤 2: 查询任务状态 (GetTaskAsync)");
var task = await client.GetTaskAsync(taskId);
Console.WriteLine($"当前状态: {task.Status.State}");
// 查看最新的消息和产物
if (task.Status.Message?.Parts != null)
{
var message = task.Status.Message.Parts.OfType<TextPart>().FirstOrDefault()?.Text;
Console.WriteLine($"Agent 消息: {message}");
}
if (task.Artifacts?.Count > 0)
{
var artifact = task.Artifacts.Last();
var artifactText = artifact.Parts?.OfType<TextPart>().FirstOrDefault()?.Text;
await client.SendMessageAsync(new MessageSendParams()
{
Message = new AgentMessage
{
// 步骤 3: 响应 InputRequired(发送确认)
ContextId = taskId, // 关联到现有任务
Parts = [new TextPart { Text = "go ahead" }]
}
});
Console.WriteLine("已发送确认消息");
}
// 等待 Agent 完成研究
await Task.Delay(2000);
// 步骤 4: 再次查询任务状态
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("步骤 4: 查询最终任务状态");
task = await client.GetTaskAsync(taskId);
Console.WriteLine($"最终状态: {task.Status.State}");
if (task.Status.Message?.Parts != null)
{
var message = task.Status.Message.Parts.OfType<TextPart>().FirstOrDefault()?.Text;
Console.WriteLine($"完成消息: {message}");
}
Console.WriteLine($"\n任务历史记录数量: {task.History?.Count ?? 0}");
Console.WriteLine($"产物总数: {task.Artifacts?.Count ?? 0}");
关键点解析:上面的代码展示了完整的多轮交互流程:
- CreateTaskAsync: 创建任务并提交初始消息
- GetTaskAsync: 随时查询任务状态,获取最新进展
- UpdateTaskAsync: 当任务处于 InputRequired 状态时,发送用户反馈
- 任务历史: 所有的消息交互都保存在 task.History 中
- 产物收集: Agent 返回的所有产物都存储在 task.Artifacts 中
这种模式非常适合需要人工审核、确认或补充信息的工作流程。
7. 任务元数据 (Metadata) 的使用
任务元数据允许你存储与任务相关的自定义信息。A2A 协议中有两种 Metadata:
- AgentMessage.Metadata,存储在消息级别,随消息一起传递,保存在任务历史中。
- AgentTask.Metadata ,存储在任务级别,需要在 Server 端(TaskManager)显式设置。
注意:MessageSendParams.Metadata 目前不会自动传递到 AgentTask.Metadata,这是 SDK 的一个限制。如果需要任务级别的 Metadata,需要在 Server 端处理。
让我们看看如何使用 Message Metadata:
// 创建带 Metadata 的任务
// 注意: Metadata 应该设置在 AgentMessage 上
var taskWithMetadata = await client.SendMessageAsync(new MessageSendParams
{
Message = new AgentMessage
{
Parts = [new TextPart { Text = "Research quantum computing applications" }],
// Metadata 设置在 Message 上
Metadata = new Dictionary<string, System.Text.Json.JsonElement>
{
["userId"] = System.Text.Json.JsonSerializer.SerializeToElement("user-12345"),
["priority"] = System.Text.Json.JsonSerializer.SerializeToElement("high"),
["department"] = System.Text.Json.JsonSerializer.SerializeToElement("R&D"),
["requestId"] = System.Text.Json.JsonSerializer.SerializeToElement(Guid.NewGuid().ToString())
}
}
});
// 检查响应类型
if (taskWithMetadata is not AgentTask taskMeta)
{
throw new InvalidOperationException("预期返回 AgentTask");
}
Console.WriteLine($"任务已创建: {taskMeta.Id}");
// 查询任务历史中的消息 Metadata
if (taskMeta.History != null && taskMeta.History.Count > 0)
{
var firstMessage = taskMeta.History[0];
Console.WriteLine("\n消息元数据 (Message.Metadata):");
if (firstMessage.Metadata != null)
{
foreach (var (key, value) in firstMessage.Metadata)
{
Console.WriteLine($" {key}: {value}");
}
}
else
{
Console.WriteLine(" (未设置)");
}
}
// 注意: AgentTask.Metadata 与 AgentMessage.Metadata 是不同的
Console.WriteLine("\n任务元数据 (Task.Metadata):");
if (taskMeta.Metadata != null)
{
foreach (var (key, value) in taskMeta.Metadata)
{
Console.WriteLine($" {key}: {value}");
}
}
else
{
Console.WriteLine(" (Task 级别的 Metadata 为空,因为它需要在 Server 端显式设置)");
}
// Metadata 的典型用途
new
{
说明 = "Metadata 在 A2A 中的使用",
Message_Metadata = new
{
用途 = "存储消息级别的元信息",
示例 = new[]
{
"消息来源标识",
"用户上下文信息",
"请求追踪ID"
}
},
Task_Metadata = new
{
用途 = "存储任务级别的元信息(需 Server 端设置)",
示例 = new[]
{
"任务优先级",
"业务分类",
"SLA 要求"
}
},
注意 = "MessageSendParams.Metadata 不会自动传递到 AgentTask.Metadata"
}.Display();
8. 任务取消 (CancelTaskAsync)
有时候任务执行时间过长,或者用户改变了主意,我们需要取消任务。A2A 提供了 CancelTaskAsync 方法。
取消任务的场景
- 用户主动取消请求
- 任务超时
- 检测到重复任务
- 资源不足需要终止
让我们模拟一个场景:
// 创建一个任务
var longRunningTask = await client.SendMessageAsync(new MessageSendParams
{
Message = new AgentMessage
{
Parts = [new TextPart { Text = "Research comprehensive AI history" }]
}
});
// 检查响应类型
if (longRunningTask is not AgentTask longTask)
{
throw new InvalidOperationException("预期返回 AgentTask");
}
Console.WriteLine($"任务已创建: {longTask.Id}");
Console.WriteLine($"初始状态: {longTask.Status.State}");
// 等待一会儿,然后决定取消
await Task.Delay(1500);
Console.WriteLine("\n用户决定取消任务...");
// 取消任务(使用扩展方法)
var cancelResult = await client.CancelTaskAsync(longTask.Id);
Console.WriteLine($"取消请求已发送");
Console.WriteLine($"取消后状态: {cancelResult.Status.State}");
// 再次查询确认状态
var cancelledTask = await client.GetTaskAsync(longTask.Id);
Console.WriteLine($"最终确认状态: {cancelledTask.Status.State}");
// 取消任务的注意事项
new
{
方法 = "CancelTaskAsync(taskId)",
状态变更 = "Working/InputRequired → Canceled",
注意事项 = new[]
{
"Agent 需要正确处理 CancellationToken",
"取消是异步操作,可能不会立即生效",
"已完成或已失败的任务无法取消",
"取消后的任务仍可查询,但不能更新"
},
最佳实践 = "在 Agent 实现中始终检查 cancellationToken.IsCancellationRequested"
}.Display();
9. 长时间运行任务的设计模式
对于耗时较长的任务(如数据分析、视频处理、大规模爬虫等),我们需要特殊的设计模式:
- 模式 1: 定期更新状态
await _taskManager.UpdateStatusAsync(
taskId,
TaskState.Working,
new AgentMessage { Parts = [new TextPart { Text = "进度: 50%" }] },
cancellationToken: ct);
- 模式 2: 分阶段返回产物
// 第一阶段完成
await _taskManager.ReturnArtifactAsync(taskId,
new Artifact { Parts = [new TextPart { Text = "Phase 1 完成" }] }, ct);
// 第二阶段完成
await _taskManager.ReturnArtifactAsync(taskId,
new Artifact { Parts = [new TextPart { Text = "Phase 2 完成" }] }, ct);
- 模式 3: 使用 Push Notifications(下节课)
对于超长任务,可以配置 Webhook,让 Agent 在状态变更时主动通知 Client,避免轮询。
设计建议
| 任务时长 | 推荐方案 | 说明 |
|---|---|---|
| < 5秒 | 直接等待 | 同步返回结果 |
| 5-30秒 | 轮询 GetTaskAsync | 每 1-2 秒查询一次 |
| 30秒-5分钟 | SSE 订阅 | 实时接收状态更新(下节课) |
| > 5分钟 | Push Notifications | Webhook 回调通知 |
10. SubscribeToTaskAsync
SubscribeToTaskAsync 是一个非常强大的功能,它使用 SSE 实现实时事件流。
主要用途
- 实时接收任务状态变更
- 实时获取新产物
- 避免轮询,减少网络开销
- 支持长时间运行任务的监控
基本用法
await foreach (var @event in client.SubscribeToTaskAsync(taskId))
{
if (@event.Event is TaskStatusChangedEvent statusEvent)
{
Console.WriteLine($"状态变更: {@event.Data.Status?.State}");
}
else if (@event.Event is ArtifactCreatedEvent artifactEvent)
{
Console.WriteLine("新产物生成");
}
}
与轮询的对比
| 特性 | 轮询 GetTaskAsync | SSE SubscribeToTaskAsync |
|---|---|---|
| 实时性 | 延迟 1-2 秒 | 即时(毫秒级) |
| 网络开销 | 高(频繁请求) | 低(单个连接) |
| 服务器压力 | 高 | 低 |
| 实现复杂度 | 简单 | 中等 |
| 适用场景 | 短任务 | 长任务、实时监控 |
11. 总结
核心功能:
GetTaskAsync - 查询任务状态
- 随时获取任务最新进展
- 查看历史记录和产物
- 检查任务元数据
CancelTaskAsync - 取消任务
- 用户主动取消或超时场景
- Agent 需正确处理 CancellationToken
- 取消后状态变为 Canceled
多轮交互 - InputRequired 状态
- 实现 Human-in-the-loop 工作流
- Agent 可主动请求用户反馈
- 支持复杂的审批和确认流程
Metadata - 任务元数据
- 存储自定义业务信息
- 不影响任务处理逻辑
- 用于跟踪、分类、优先级管理
ITaskStore - 任务存储
- 默认使用 InMemoryTaskStore
- 生产环境可自定义实现(数据库、Redis)
- 支持任务持久化和分布式部署
任务管理最佳实践:
graph TD
A[创建任务] --> B{任务类型?}
B -->|短任务<5s| C[直接等待结果]
B -->|中等任务5-30s| D[轮询 GetTaskAsync]
B -->|长任务>30s| E[SSE 订阅或 Webhook]
C --> F[GetTaskAsync 获取结果]
D --> G[定时查询状态]
E --> H[实时接收事件]
F --> I{需要交互?}
G --> I
H --> I
I -->|是| J[InputRequired 状态]
I -->|否| K[自动完成]
J --> L[UpdateTaskAsync 发送反馈]
L --> M[继续处理]
K --> N[Completed 状态]
M --> N
五、MAF 集成 - A2A Agent 作为工具
在复杂的 AI 系统中,不同的 Agent 可能使用不同的框架和协议。通过将 A2A Agent 封装为标准的 AIFunction,我们可以:
- 统一调用接口:让 MAF Agent 像调用本地函数一样调用远程 A2A Agent
- 能力复用:将 A2A Agent 的专业技能作为工具提供给其他 Agent
- 跨框架协作:打通 MAF、A2A、MCP 等不同 Agent 生态系统
1. 核心概念:A2A Agent 作为工具
架构设计:
flowchart TB
subgraph MAF["Microsoft Agent Framework"]
MainAgent["主 Agent<br/>(旅行规划助手)"]
Tools["Function Tools"]
end
subgraph A2A["A2A Protocol"]
CardResolver["A2ACardResolver"]
AgentCard["Agent Card"]
A2AAgent["A2A Agent"]
end
subgraph Remote["远程 A2A Server"]
RouteAgent["路线规划 Agent"]
end
MainAgent -->|"调用工具"| Tools
Tools -->|"技能 → AIFunction"| A2AAgent
CardResolver -->|"发现"| AgentCard
A2AAgent -->|"JSON-RPC"| RouteAgent
核心类说明:
| 类 | 作用 |
|---|---|
| A2ACardResolver | 发现 A2A Agent 并获取 Agent Card |
| AgentCard.GetAIAgent() | 将 Agent Card 转换为可调用的 AIAgent 实例 |
| AIFunctionFactory.Create() | 将委托方法转换为 AIFunction 工具 |
| AIAgent.RunAsync() | 执行 Agent 并获取响应 |
2、发现 A2A Agent
使用 A2ACardResolver 发现远程 A2A Agent 并获取其 Agent Card:
前置条件:确保已启动旅行规划 Agent。在终端中运行:
cd ./file-based-apps dotnet run TravelPlannerAgent.cs该 Agent 提供以下技能:
- 路线规划 - 规划最优出行路线
- 天气查询 - 查询目的地天气
- 酒店推荐 - 推荐合适的住宿
- 景点推荐 - 推荐热门景点
// A2A Agent 端点 - 旅行规划 Agent
var a2aAgentHost = "http://localhost:5081";
// 1. 创建 Agent Card 解析器
A2ACardResolver agentCardResolver = new(new Uri(a2aAgentHost));
// 2. 获取 Agent Card
AgentCard agentCard = await agentCardResolver.GetAgentCardAsync();
Console.WriteLine("已获取 A2A Agent Card");
Console.WriteLine($"名称: {agentCard.Name}");
Console.WriteLine($"描述: {agentCard.Description}");
Console.WriteLine($"URL: {agentCard.Url}");
Console.WriteLine($"版本: {agentCard.Version}");
agentCard.Display();
// 显示技能列表
if (agentCard.Skills?.Count > 0)
{
Console.WriteLine($"\n技能列表 ({agentCard.Skills.Count} 个):");
foreach (var skill in agentCard.Skills)
{
Console.WriteLine($"• {skill.Name}: {skill.Description}");
if (skill.Examples?.Count > 0)
{
Console.WriteLine($" 示例: {string.Join(", ", skill.Examples.Take(2))}");
}
}
}
else
{
Console.WriteLine("\nAgent 未声明任何技能,将使用默认技能");
}
3. 将 A2A Agent 转换为 AIAgent
在 MAF 中提供了Microsoft.Agents.AI.A2A 包,用于将 A2A Agent 集成到 MAF 环境中。其中提供了 AgentCard.GetAIAgent() 扩展方法,可以将 A2A Agent 封装为 MAF 的 AIAgent 实例:
#r "nuget: Microsoft.Agents.AI.A2A, 1.0.0-preview.251110.2"
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
// 将 A2A Agent 转换为 AIAgent 实例
AIAgent a2aAgent = agentCard.GetAIAgent();
Console.WriteLine("A2A Agent 已转换为 AIAgent 实例");
// 直接调用 A2A Agent
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("直接调用 A2A Agent:");
var response = await a2aAgent.RunAsync("你好,请自我介绍一下");
Console.WriteLine($"回答: {response.Text}");
4. 将 Agent 技能转换为 Function Tools
A2A Agent 的技能(Skills)可以转换为 MAF 的 AIFunction 工具。这样,主 Agent 就可以根据用户请求,自动选择并调用合适的技能。
- 核心逻辑
foreach (var skill in agentCard.Skills)
{
AIFunctionFactoryOptions options = new()
{
Name = skill.Name, // 技能名称
Description = skill.Description // 技能描述(供 LLM 理解用途)
};
yield return AIFunctionFactory.Create(
async (string input, CancellationToken ct) =>
await a2aAgent.RunAsync(input, cancellationToken: ct),
options
);
}
- 函数名规范化
由于 AIFunction 的名称必须符合一定规范(仅限字母、数字、下划线),我们需要对技能名称进行清理:
// 函数名规范化帮助类
static class FunctionNameSanitizer
{
private static readonly Regex InvalidNameCharsRegex = new Regex("[^0-9A-Za-z]+", RegexOptions.Compiled);
public static string Sanitize(string name)
{
return InvalidNameCharsRegex.Replace(name, "_");
}
}
Console.WriteLine("FunctionNameSanitizer 定义完成");
// 测试规范化
var testNames = new[] { "route-planning", "weather query", "AI助手" };
foreach (var name in testNames)
{
Console.WriteLine($"• '{name}' → '{FunctionNameSanitizer.Sanitize(name)}'");
}
- 创建 Function Tools,现在我们将 A2A Agent 的所有技能转换为 AIFunction 工具:
static IEnumerable<AIFunction> CreateFunctionTools(AIAgent a2aAgent, AgentCard agentCard)
{
foreach (var skill in agentCard.Skills)
{
// A2A agent skills don't have schemas describing the expected shape of their inputs and outputs.
// Schemas can be beneficial for AI models to better understand the skill's contract, generate
// the skill's input accordingly and to know what to expect in the skill's output.
// However, the A2A specification defines properties such as name, description, tags, examples,
// inputModes, and outputModes to provide context about the skill's purpose, capabilities, usage,
// and supported MIME types. These properties are added to the function tool description to help
// the model determine the appropriate shape of the skill's input and output.
AIFunctionFactoryOptions options = new()
{
Name = FunctionNameSanitizer.Sanitize(skill.Id),
Description = $$"""
{
"description": "{{skill.Description}}",
"tags": "[{{string.Join(", ", skill.Tags ?? [])}}]",
"examples": "[{{string.Join(", ", skill.Examples ?? [])}}]",
"inputModes": "[{{string.Join(", ", skill.InputModes ?? [])}}]",
"outputModes": "[{{string.Join(", ", skill.OutputModes ?? [])}}]"
}
""",
};
yield return AIFunctionFactory.Create(RunAgentAsync, options);
}
async Task<string> RunAgentAsync(string input, CancellationToken cancellationToken)
{
var response = await a2aAgent.RunAsync(input, cancellationToken: cancellationToken).ConfigureAwait(false);
return response.Text;
}
}
Console.WriteLine("CreateFunctionTools 方法定义完成");
// 生成 Function Tools
var functionTools = CreateFunctionTools(a2aAgent, agentCard).ToList();
Console.WriteLine($"已创建 {functionTools.Count} 个 Function Tool:");
foreach (var tool in functionTools)
{
Console.WriteLine($"• {tool.Name}");
Console.WriteLine($" 描述: {tool.Description?.Substring(0, Math.Min(50, tool.Description?.Length ?? 0))}...");
}
- 创建主 Agent 并集成 A2A 工具,现在我们创建一个主 Agent(旅行规划助手),并将 A2A Agent 的技能作为工具提供给它:
// 获取 Chat Client(用于主 Agent 的 LLM 能力)
var chatClient = AIClientHelper.GetDefaultChatClient();
// 创建主 Agent,并注册 A2A Agent 的技能作为工具
AIAgent mainAgent = chatClient.CreateAIAgent(
instructions: """
你是一个智能旅行规划助手。你可以利用可用的工具来帮助用户完成任务。
当用户询问时,请使用合适的工具获取信息,然后给出建议。
""",
tools: [.. functionTools]
);
Console.WriteLine("主 Agent 创建完成");
Console.WriteLine($"已注册 {functionTools.Count} 个工具");
5. 执行多 Agent 协作
现在让主 Agent 处理一个用户请求。主 Agent 会自动识别需要调用 A2A Agent 的技能:
// 用户请求 - 测试不同的技能调用
var userRequests = new[]
{
"帮我规划从北京到上海的旅行路线",
"查询一下上海的天气情况",
"推荐上海的酒店"
};
foreach (var userRequest in userRequests)
{
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine($"用户请求: {userRequest}");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
// 执行 Agent
Console.WriteLine("主 Agent 处理中...");
var result = await mainAgent.RunAsync(userRequest);
Console.WriteLine($"回答:\n{result.Text}");
Console.WriteLine();
}
6. 完整示例:多 A2A Agent 集成
在实际项目中,可能需要集成多个 A2A Agent。以下是一个完整的多 Agent 协作示例:
flowchart LR
User["用户"] --> MainAgent["主 Agent<br/>(协调者)"]
MainAgent --> RouteAgent["路线规划 Agent<br/>A2A"]
MainAgent --> WeatherAgent["天气查询 Agent<br/>A2A"]
MainAgent --> HotelAgent["酒店推荐 Agent<br/>A2A"]
RouteAgent --> Response["综合回答"]
WeatherAgent --> Response
HotelAgent --> Response
Response --> User
// 定义多个 A2A Agent 端点
var agentEndpoints = new[]
{
"https://route-agent.example.com/a2a",
"https://weather-agent.example.com/a2a",
"https://hotel-agent.example.com/a2a"
};
// 收集所有工具
var allTools = new List<AIFunction>();
foreach (var endpoint in agentEndpoints)
{
var resolver = new A2ACardResolver(new Uri(endpoint));
var card = await resolver.GetAgentCardAsync();
var agent = card.GetAIAgent();
allTools.AddRange(CreateFunctionTools(agent, card));
}
// 创建主 Agent
var orchestrator = chatClient.CreateAIAgent(
instructions: "你是一个旅行规划协调者,可以调用多个专业 Agent...",
tools: [.. allTools]
);
7. 最佳实践与注意事项
- 技能描述要详细,A2A Agent 的技能没有 schema 来描述输入输出的具体格式,因此需要通过 description、tags、examples 等属性提供足够的上下文,帮助 LLM 理解如何调用该技能:
new AgentSkill
{
Id = "route-planning",
Name = "Route Planning",
Description = "规划从起点到终点的最优路线,支持避开收费站、高速等选项",
Tags = ["navigation", "travel", "routing"],
Examples = ["规划从北京到上海的路线", "避开收费站从A到B"],
InputModes = ["text"],
OutputModes = ["text"]
}
- 错误处理
try
{
var response = await a2aAgent.RunAsync(input, cancellationToken: ct);
return response.Text;
}
catch (A2AException ex) when (ex.ErrorCode == A2AErrorCode.TaskFailed)
{
return $"任务执行失败: {ex.Message}";
}
catch (HttpRequestException ex)
{
return $"网络错误: {ex.Message}";
}
- 重试机制,建议为 A2A 调用添加重试逻辑:
var policy = Policy
.Handle<HttpRequestException>()
.WaitAndRetryAsync(3, retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
await policy.ExecuteAsync(async () =>
{
var response = await a2aAgent.RunAsync(input);
return response.Text;
});
- 性能考虑
| 场景 | 建议 |
|---|---|
| 短任务 | 直接调用 RunAsync |
| 长任务 | 使用 Task-based 模式 + Push Notifications |
| 高并发 | 考虑限流和熔断 |
| 低延迟 | 预热 Agent 连接,使用连接池 |
