Spiga

Net+AI智能体11:A2A智能体跨系统协作

2025-12-13 20:14:17

一、A2A 协议

1. 为什么需要 A2A 协议?

随着 AI 技术的发展,企业中的 AI Agent 数量不断增加。这些 Agent 可能来自不同的团队、不同的技术栈,甚至不同的组织:

flowchart LR
    subgraph 企业A
        A1[客服 Agent]
        A2[销售 Agent]
    end
    
    subgraph 企业B
        B1[物流 Agent]
        B2[库存 Agent]
    end
    
    subgraph 第三方服务
        C1[支付 Agent]
        C2[天气 Agent]
    end
    
    A1 -.->|如何通信?| B1
    A2 -.->|如何协作?| C1
    B2 -.->|如何发现?| C2
    
    style A1 fill:#e1f5ff
    style A2 fill:#e1f5ff
    style B1 fill:#ffe1e1
    style B2 fill:#ffe1e1
    style C1 fill:#e1ffe1
    style C2 fill:#e1ffe1

在没有标准协议的情况下,Agent 之间的互联面临诸多挑战:

问题 描述 影响
协议不统一 每个 Agent 使用不同的 API 格式 集成成本高,难以扩展
发现困难 无法自动发现其他 Agent 的能力 需要人工配置,耦合度高
状态不透明 无法追踪跨 Agent 任务的执行状态 调试困难,可靠性差
安全性参差 缺乏统一的认证授权机制 安全风险高

Agent-to-Agent (A2A) Protocol 提供了一套标准化的解决方案,核心价值:

  • 统一发现机制:通过 Agent Card 声明和发现能力
  • 标准化通信:基于 JSON-RPC 2.0 的统一消息格式
  • 任务生命周期:完整的任务状态管理和追踪
  • 流式支持:SSE 实时事件推送
  • 安全机制:内置认证和授权方案

2. 什么是 A2A 协议?

Agent-to-Agent (A2A) Protocol 是由 Google 发起并开源的一个开放协议,旨在实现 AI Agent 之间的标准化通信与协作。

A2A 是一个开放协议,用于实现 AI Agent 之间的无缝通信与协作。它提供标准化的 Agent 发现、消息交换和任务管理机制,让 AI Agent 能够跨平台、跨服务安全地互联互通。

特性 描述
开放标准 开源协议,任何人都可以实现和使用
语言无关 基于 HTTP + JSON-RPC,支持任何编程语言
双向通信 Agent 之间是对等关系,可以互相发起请求
状态管理 支持长时间运行的任务和状态追踪
流式传输 通过 SSE 支持实时事件推送
安全内置 支持多种认证和授权方案

A2A 协议目前处于活跃开发阶段:

3. A2A vs MCP:两种协议的对比

flowchart TB
    subgraph MCP协议 ["MCP 协议 - 工具扩展"]
        direction LR
        LLM[LLM/Agent]
        Tools[工具]
        Resources[资源]
        Prompts[提示]
        LLM --> Tools
        LLM --> Resources
        LLM --> Prompts
    end
    
    subgraph A2A协议 ["A2A 协议 - Agent 互联"]
        direction LR
        Agent1[Agent A]
        Agent2[Agent B]
        Agent3[Agent C]
        Agent1 <--> Agent2
        Agent2 <--> Agent3
        Agent1 <--> Agent3
    end
    
    style MCP协议 fill:#e1f5ff
    style A2A协议 fill:#ffe1e1
维度 MCP Protocol A2A Protocol
核心定位 LLM ↔️ Tools/Resources Agent ↔️ Agent
通信对象 被动工具(等待调用) 自主 Agent(主动决策)
发现机制 Initialize 握手协商 Agent Card 能力声明
状态管理 无状态(请求/响应) 有状态(Task 生命周期)
交互模式 Client-Server(单向调用) 双向对等(互相请求)
传输协议 JSON-RPC over stdio/SSE JSON-RPC over HTTP/SSE
典型场景 扩展 AI 能力(搜索、计算) 跨 Agent 协作(任务分发)

MCP 像是"工具箱":

  • Agent 是工人,MCP Server 是工具箱
  • 工人使用工具完成任务
  • 工具是被动的,等待工人使用

A2A 像是"同事协作":

  • 多个 Agent 是不同的同事
  • 同事之间可以互相委托任务
  • 每个同事都是自主的,可以主动决策

在实际企业场景中,A2A 和 MCP 通常协同使用:

flowchart TB
    User[用户] --> AgentA[Agent A]
    
    subgraph AgentA内部
        AgentA --> MCP1[MCP: 数据库工具]
        AgentA --> MCP2[MCP: 搜索工具]
    end
    
    AgentA <-->|A2A 协议| AgentB[Agent B]
    AgentA <-->|A2A 协议| AgentC[Agent C]
    
    subgraph AgentB内部
        AgentB --> MCP3[MCP: 支付工具]
    end
    
    subgraph AgentC内部
        AgentC --> MCP4[MCP: 邮件工具]
    end
    
    style User fill:#fff4e1
    style AgentA fill:#e1f5ff
    style AgentB fill:#e1ffe1
    style AgentC fill:#ffe1f5
  • MCP:每个 Agent 内部使用 MCP 扩展自己的能力
  • A2A:Agent 之间使用 A2A 进行跨 Agent 协作

4. A2A 核心架构

A2A 协议定义了三个核心组件:ClientServerTaskManager

  • 整体架构
flowchart TB
    subgraph 客户端
        Client[A2AClient]
        Resolver[A2ACardResolver]
    end
    
    subgraph 服务端
        Server[A2A Server<br/>ASP.NET Core]
        TM[TaskManager]
        Agent[Your Agent<br/>业务逻辑]
        
        Server --> TM
        TM --> Agent
    end
    
    Resolver -->|1. 获取 Agent Card| Server
    Client -->|2. 发送消息/任务| Server
    Server -->|3. 返回响应/事件| Client
    
    style Client fill:#e1f5ff
    style Resolver fill:#e1f5ff
    style Server fill:#ffe1e1
    style TM fill:#fff4e1
    style Agent fill:#e1ffe1
  • 核心组件说明
组件 职责 .NET 类
A2AClient 发送请求、接收响应、订阅事件 A2AClient
A2ACardResolver 发现 Agent、获取 Agent Card A2ACardResolver
TaskManager 管理任务生命周期、分发回调 TaskManager
Agent 处理实际业务逻辑 自定义实现

核心概念:

  • Agent Card(身份名片),Agent Card 是 Agent 的"名片",声明 Agent 的基本信息和能力:
┌───────────────────────────────────┐
│          Agent Card               │
├───────────────────────────────────┤
│ Name: "Research Agent"            │
│ Description: "研究分析助手"         │
│ URL: "https://agent.example.com"  │
│ Version: "1.0.0"                  │
├───────────────────────────────────┤
│ Capabilities:                     │
│  ・Streaming: ✅                  │
│  ・PushNotifications: ✅          │
├───────────────────────────────────┤
│ Skills:                           │
│  ・web-research: 网页研究           │
│  ・data-analysis: 数据分析          │
└───────────────────────────────────┘
  • Message(消息),Message 是 Agent 之间交换的基本数据单元:
┌───────────────────────────────────┐
│          Message                  │
├───────────────────────────────────┤
│ MessageId: "msg-001"              │
│ ContextId: "ctx-001"              │
│ Role: User / Agent                │
├───────────────────────────────────┤
│ Parts:                            │
│  ・TextPart: "你好"                │
│  ・FilePart: attachment.pdf       │
│  ・DataPart: { json: data }       │
└───────────────────────────────────┘
  • Task(任务),Task 代表一个可追踪的长时间运行操作:
┌───────────────────────────────────┐
│            Task                   │
├───────────────────────────────────┤
│ Id: "task-001"                    │
│ Status:                           │
│  ・State: Working / Completed     │
│  ・Timestamp: 2024-01-01T...      │
├───────────────────────────────────┤
│ History: [Message, Message...]    │
│ Artifacts: [文件, 数据...]         │
└───────────────────────────────────┘

5. A2A 通信模式

A2A 协议支持两种主要的通信模式:Message-based(消息模式)Task-based(任务模式)

  • Message-based 模式
    • 特点:简单直接,适合快速请求/响应场景。
    • 适用场景:
      • 简单问答
      • 快速查询
      • 无需追踪状态
sequenceDiagram
    participant C as A2AClient
    participant S as A2A Server
    participant A as Agent
    
    C->>S: message/send
    S->>A: OnMessageReceived
    A->>S: AgentMessage
    S->>C: AgentMessage (响应)
    
    Note over C,A: 同步请求/响应,无状态
  • Task-based 模式

    • 特点:支持长时间运行、状态追踪、可取消。

      sequenceDiagram
          participant C as A2AClient
          participant S as A2A Server
          participant A as Agent
      
          C->>S: message/send (createTask)
          S->>A: OnTaskCreated
          S->>C: AgentTask (submitted)
      
          loop 任务执行中
              A->>S: 更新状态
              S-->>C: SSE: TaskStatusUpdate
          end
      
          A->>S: 完成任务
          S-->>C: SSE: TaskStatusUpdate (completed)
      
          Note over C,A: 异步执行,状态可追踪
      
    • Task 状态流转:

stateDiagram-v2
    [*] --> Submitted: 创建任务
    Submitted --> Working: 开始处理
    Working --> Working: 更新进度
    Working --> InputRequired: 需要用户输入
    InputRequired --> Working: 收到输入
    Working --> Completed: 完成
    Working --> Failed: 失败
    Working --> Canceled: 取消
    Completed --> [*]
    Failed --> [*]
    Canceled --> [*]

6. 流式通信(Streaming)

无论是 Message-based 还是 Task-based,A2A 都支持 SSE 流式通信:

方法 用途
SendMessageAsync 非流式消息发送
SendMessageStreamingAsync 流式消息发送(SSE)
SubscribeToTaskAsync 订阅任务状态更新(SSE)

A2A 事件类型:

事件类型 描述
TaskStatusUpdateEvent 任务状态变更通知
TaskArtifactUpdateEvent 任务产物更新通知

7. JSON-RPC 2.0 通信协议

A2A 基于 JSON-RPC 2.0 作为底层通信协议。了解 JSON-RPC 有助于我们理解 A2A 的通信机制。

JSON-RPC 是一种轻量级的远程过程调用协议:

  • 使用 JSON 作为数据格式
  • 支持请求/响应和通知模式
  • 简单、易于实现

请求格式

{
    "jsonrpc": "2.0",
    "method": "message/send",
    "params": {
        "message": {
            "role": "user",
            "parts": [{ "kind": "text", "text": "你好" }]
        }
    },
    "id": "req-001"
}

成功响应:

{
    "jsonrpc": "2.0",
    "result": {
        "role": "agent",
        "messageId": "msg-001",
        "parts": [{ "kind": "text", "text": "你好!有什么可以帮您的?" }]
    },
    "id": "req-001"
}

错误响应:

{
    "jsonrpc": "2.0",
    "error": {
        "code": -32600,
        "message": "Invalid Request"
    },
    "id": "req-001"
}

A2A 支持的方法

方法 描述
message/send 发送消息(Message-based)
message/stream 流式发送消息
task/get 获取任务信息
task/cancel 取消任务
task/subscribe 订阅任务事件
pushNotificationConfig/set 设置推送通知
pushNotificationConfig/get 获取推送通知配置

二、Agent Card

在 A2A (Agent-to-Agent) 协议中,Agent Card 是 Agent 的"身份证"和"说明书"。它是一个标准化的 JSON 结构,用于向外界声明 Agent 的身份信息、具备的能力(Capabilities)、支持的技能(Skills)以及输入输出模态(Modalities)。

通过 Agent Card,其他 Agent 或客户端可以:

  1. 识别身份:了解 Agent 的名称、描述、版本和维护者信息。
  2. 发现能力:判断 Agent 是否支持流式传输、推送通知等高级特性。
  3. 了解技能:获取 Agent 提供的具体功能列表。
  4. 建立连接:获取连接所需的 URL 和认证方式。

1. Agent Card 核心结构

AgentCard 类是 A2A 协议的核心数据模型。让我们先通过代码直观地看一个完整的 Agent Card 定义。

var agentCard = new AgentCard
{
    // 1. 基本身份信息
    Name = "Research Agent",
    Description = "专业的研究分析助手,擅长搜索网络并生成深度报告。",
    Url = "https://agent.example.com/research",
    Version = "1.0.0",
    ProtocolVersion = "0.3.0", // 遵循的 A2A 协议版本    

    // 2. 能力声明 (Capabilities)
    Capabilities = new AgentCapabilities
    {
        Streaming = true,           // 支持 SSE 流式传输
        PushNotifications = true,   // 支持 Webhook 推送通知
        StateTransitionHistory = true // 支持查询任务状态变更历史
    },
    
    // 3. 输入输出模态 (Modalities)
    DefaultInputModes = ["text", "image"], // 支持文本和图片输入
    DefaultOutputModes = ["text", "file"], // 输出文本和文件
    
    // 4. 技能列表 (Skills)
    Skills = [
        new AgentSkill
        {
            Id = "web-research",
            Name = "网页研究",
            Description = "搜索和分析网页内容"
        },
        new AgentSkill
        {
            Id = "report-generation",
            Name = "报告生成",
            Description = "基于研究结果生成 PDF 报告"
        }
    ],
    
    // 5. 认证配置 (SecuritySchemes)
    SecuritySchemes = new Dictionary<string, SecurityScheme>
    {
        ["apiKey"] = new ApiKeySecurityScheme("X-API-Key", "header")
    },
    
    Security = new List<Dictionary<string, string[]>>
    {
        new()
        {
            ["apiKey"] = []
        }
    },
};

核心属性详解:

  1. 基本信息 (Name, Description, Url, Version):

    • 用于人类可读的展示和唯一标识。
    • Url 是 Agent 的根地址,客户端将基于此地址构建 API 请求(如 /message/send)。
  2. 能力声明 (Capabilities):

    • Streaming: 是否支持 Server-Sent Events (SSE) 流式响应。如果为 true,客户端可以使用 SendMessageStreamingAsync。
    • PushNotifications: 是否支持通过 Webhook 异步推送任务状态和产物。这对于耗时任务非常重要。
    • StateTransitionHistory: 是否保存并允许查询任务的状态流转历史。
  3. 模态 (DefaultInputModes, DefaultOutputModes):

    • 声明 Agent 能够理解和生成的媒体类型(如 text, image, audio)。这有助于编排器(Orchestrator)选择合适的 Agent。
  4. 技能 (Skills):

    • 列出 Agent 具备的具体功能。这类似于 Semantic Kernel 的 Plugins 或 OpenAI 的 Functions,但这里仅作为元数据声明,用于发现和路由。
  5. 安全方案 (SecuritySchemes):

    • 遵循 OpenAPI 规范的安全定义。告诉客户端如何进行身份验证(例如 API Key 的名称和位置)。

2. Agent 发现 (Discovery)

在 A2A 架构中,客户端通常不知道 Agent 的具体实现细节,而是通过 Agent 发现机制来获取 Agent Card。

A2A SDK 提供了 A2ACardResolver 类,用于从指定的 URL 获取 Agent Card。

// 模拟一个运行中的 Agent Server 地址 (这里我们假设它运行在 localhost:5048)
// 在实际场景中,这通常是一个真实的 HTTP 地址
var agentUrl = new Uri("http://localhost:5048/echo");

// 创建解析器
var cardResolver = new A2ACardResolver(agentUrl);

// 注意:由于我们没有实际启动 Server,这里仅演示代码用法。
Console.WriteLine($"解析器已创建,目标地址: {agentUrl}");
// 模拟获取到的 Agent Card (为了演示,我们直接使用上面定义的对象)
var discoveredCard = agentCard; 

// 检查 Agent 能力的辅助方法
void CheckAgentCapabilities(AgentCard card)
{
    Console.WriteLine($"分析 Agent: {card.Name}");
    if (card.Capabilities.Streaming)
    {
        Console.WriteLine("  支持流式通信 (Streaming)");
    }
    else
    {
        Console.WriteLine("  不支持流式通信");
    }
    
    if (card.Capabilities.PushNotifications)
    {
        Console.WriteLine("  支持推送通知 (Push Notifications)");
    }
    
    Console.WriteLine($"  包含技能: {string.Join(", ", card.Skills.Select(s => s.Name))}");
}
CheckAgentCapabilities(discoveredCard);

3. 动态生成 Agent Card

在服务端开发中,我们通常不需要手动 new AgentCard,而是通过实现 OnAgentCardQuery 委托来动态返回。这允许我们根据配置或环境动态调整 Agent 的声明。

// 定义一个获取 Agent Card 的委托
Task<AgentCard> GetAgentCardAsync(string agentUrl, CancellationToken cancellationToken)
{
    // 可以在这里读取配置文件或数据库
    var version = "1.0.0"; 
    var card = new AgentCard
    {
        Name = "Dynamic Agent",
        Description = $"动态生成的 Agent (v{version})",
        Url = agentUrl, // 使用传入的 agentUrl,确保与请求上下文一致
        Version = version,
        Capabilities = new AgentCapabilities { Streaming = true }
    };    
    return Task.FromResult(card);
}

// 模拟调用
var card = await GetAgentCardAsync("https://myservice.com/agent", CancellationToken.None);
card.Display();

三、A2A Server 快速开始

1. 定义 Echo Agent

首先,我们需要定义 Agent 的逻辑。在 A2A 中,Agent 通过实现 ITaskManager 的回调函数来处理请求。

对于简单的消息处理,我们主要关注两个回调:

  1. OnAgentCardQuery:当客户端查询 Agent 信息时调用。
  2. OnMessageReceived:当客户端发送消息时调用。
public class EchoAgent
{
    public void Attach(ITaskManager taskManager)
    {
        // 处理 Agent Card 查询
        taskManager.OnAgentCardQuery = GetAgentCardAsync;        

        // 处理消息接收(Message-based 模式)
        taskManager.OnMessageReceived = ProcessMessageAsync;
    }
    
    private Task<A2AResponse> ProcessMessageAsync(MessageSendParams messageSendParams, CancellationToken cancellationToken)
    {
        var messageText = messageSendParams.Message.Parts
            .OfType<TextPart>().First().Text;
    
        var response = new AgentMessage
        {
            Role = MessageRole.Agent,
            MessageId = Guid.NewGuid().ToString(),
            ContextId = messageSendParams.Message.ContextId,
            Parts = [new TextPart { Text = $"Echo: {messageText}" }]
        };
    
        return Task.FromResult<A2AResponse>(response);
    }
    
    private Task<AgentCard> GetAgentCardAsync(string agentUrl, CancellationToken cancellationToken)
    {
        return Task.FromResult(new AgentCard
        {
            Name = "Echo Agent",
            Description = "回显收到的所有消息",
            Url = agentUrl,
            Version = "1.0.0",
            DefaultInputModes = ["text"],
            DefaultOutputModes = ["text"],
            Capabilities = new AgentCapabilities { Streaming = true }
        });
    }
}
Console.WriteLine("EchoAgent 类定义完成");

2. 创建 A2A Server

在 ASP.NET Core 中,我们使用 TaskManager 来管理 Agent,并使用 MapA2A 扩展方法将其映射到 HTTP 端点。

我们来完成一个完整示例:

  • 核心代码 (Program.cs)
var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();

// 1. 创建 TaskManager
var taskManager = new TaskManager();

// 2. 创建并挂载 Agent
var agent = new EchoAgent();
agent.Attach(taskManager);

// 3. 映射 A2A 端点
app.MapA2A(taskManager, "/echo");                    // JSON-RPC 端点
app.MapWellKnownAgentCard(taskManager, "/echo");     // Agent Card 端点
app.MapHttpA2A(taskManager, "/echo");                // HTTP 端点(可选)

app.Run();
  • 运行示例项目

请在终端中运行以下命令来启动 Echo Agent 服务器:

dotnet run --agent echo --urls https://localhost:5048

A2A 端点将位于 /echo 路径下。可以直接通过浏览器访问 https://localhost:5048/echo/agent-card.json 来查看 Agent Card 信息。

3. 客户端访问

可以使用任何支持 A2A 协议的客户端来与 Echo Agent 交互。发送消息时,Agent 会将收到的消息原样返回。

  • 获取 Agent Card
// Create agent card resolver
A2ACardResolver agentCardResolver = new(new Uri("https://localhost:5048/echo"));

// Get agent card
AgentCard agentCard = await agentCardResolver.GetAgentCardAsync();
agentCard.Display();
  • 使用 A2AClient 发送消息
A2AClient agentClient = new(new Uri(agentCard.Url));
// Create a message to send to the agent
AgentMessage userMessage = new()
{
    Role = MessageRole.User,
    MessageId = Guid.NewGuid().ToString(),
    Parts = [
        new TextPart
        {
            Text = "Hello from the message-based communication sample! Please echo this message."
        }
    ]
};

var sendParams = new MessageSendParams { Message = userMessage };
// Send the message and get the response
AgentMessage agentResponse = (AgentMessage)await agentClient.SendMessageAsync(sendParams);

Console.WriteLine("Received response from agent:");
agentResponse.Display();
  • 发送流式消息 (SSE),使用 SendMessageStreamingAsync 方法可以获取一个异步枚举器,逐个接收事件。
using System.Net.ServerSentEvents;

await foreach (SseItem<A2AEvent> sseItem in agentClient.SendMessageStreamingAsync(new MessageSendParams { Message = userMessage }))
{
   // sseItem.Data 是 A2AEvent 基类
   // 常见的事件类型包括:
   // - TaskStatusUpdateEvent: 任务状态更新
   // - TaskArtifactUpdateEvent: 任务产物更新
   // - AgentMessage: 直接返回的消息(在简单场景下)    
   Console.WriteLine($"收到事件类型: {sseItem.EventType}");    
   if (sseItem.Data is AgentMessage msg)
   {
        var text = msg.Parts.OfType<TextPart>().FirstOrDefault()?.Text;
        Console.WriteLine($"消息内容: {text}");
   }
}

四、Task 任务管理

前面主要使用了 Message-based(基于消息)的通信模式,这种模式类似于即时聊天,适合简单的问答场景。

然而,在复杂的 Agent 协作场景中,我们往往需要处理更复杂的工作流,例如:

  • 需要长时间运行的任务
  • 需要多轮交互和确认的任务
  • 需要追踪任务进度和状态的任务

这时,我们就需要使用 Task-based(基于任务)的通信模式。

1. Task 任务管理基础

核心概念:AgentTask 与 TaskState,在 A2A 协议中,Task 是一个核心概念,它封装了任务的生命周期、状态、历史记录和产物。

  • AgentTask 模型,一个 AgentTask 对象主要包含以下属性:

    • Id: 任务的唯一标识符

    • Status: 任务的当前状态(包含 State 枚举和进度信息)

    • History: 任务的对话历史(AgentMessage 列表)

    • Artifacts: 任务生成的产物(如文件、报告等)

  • TaskState 状态流转,任务状态 (TaskState) 描述了任务当前的生命周期阶段:

    • Submitted: 任务已提交
    • Working: 正在处理中
    • InputRequired: 需要用户或调用方提供更多输入
    • Completed: 任务成功完成
    • Failed: 任务失败
    • Canceled: 任务被取消
flowchart LR
    Submitted --> Working
    Working --> InputRequired
    InputRequired --> Working
    Working --> Completed
    Working --> Failed
    Working --> Canceled

2. 定义 Task-based Agent

要创建一个支持任务管理的 Agent,我们需要在 Attach 方法中订阅 OnTaskCreated 和 OnTaskUpdated 事件,而不是 OnMessageReceived。

下面我们定义一个 EchoAgentWithTasks,它会模拟一个耗时任务,并返回一个产物。

public class EchoAgentWithTasks
{
    private ITaskManager? _taskManager;

    public void Attach(ITaskManager taskManager)
    {
        _taskManager = taskManager;
        
        // 1. 订阅任务创建事件
        taskManager.OnTaskCreated = ProcessTaskAsync;
        
        // 2. 订阅任务更新事件(用于多轮对话,本例暂不涉及)
        taskManager.OnTaskUpdated = ProcessTaskAsync;
        
        // 3. 提供 Agent Card
        taskManager.OnAgentCardQuery = (url, ct) => Task.FromResult(new AgentCard
        {
            Name = "Task Echo Agent",
            Description = "基于任务的回显 Agent",
            Url = url,
            Version = "1.0.0",
            Capabilities = new AgentCapabilities { Streaming = true },
            DefaultInputModes = ["text"],
            DefaultOutputModes = ["text"]
        });
    }
    
    private async Task ProcessTaskAsync(AgentTask task, CancellationToken ct)
    {
        // 获取最新的一条消息
        var lastMessage = task.History!.Last();
        var messageText = lastMessage.Parts.OfType<TextPart>().First().Text;
    
        Console.WriteLine($"[Server] 收到任务 {task.Id} 消息: {messageText}");
    
        // 模拟处理时间 (1秒)
        await Task.Delay(1000, ct);
    
        // 1. 返回产物 (Artifact)
        // 产物通常是任务的最终结果,如生成的文件、报告等
        await _taskManager!.ReturnArtifactAsync(task.Id, new Artifact
        {
            Parts = [new TextPart { Text = $"Echo Result: {messageText}" }]
        }, ct);
    
        // 2. 更新任务状态为 Completed
        // final: true 表示这是最终状态,任务结束
        await _taskManager!.UpdateStatusAsync(
            task.Id,
            status: TaskState.Completed,
            final: true,
            cancellationToken: ct);
            
        Console.WriteLine($"[Server] 任务 {task.Id} 已完成");
    }
}
  • 运行示例项目,请在终端中运行以下命令来启动 Echo Tasks Agent 服务器:
dotnet run --agent echotasks --urls https://localhost:5048

A2A 端点将位于 /echotasks 路径下。可以直接通过浏览器访问 https://localhost:5048/.well-known/agent-card.json 来查看 Agent Card 信息。

  • 客户端调用与任务交互,现在使用 A2AClient 来创建一个任务,并查看返回的结果。
var agentEndpoint = "https://localhost:5048/echotasks";
var agentCard = await A2AHelper.GetAgentCard(agentEndpoint);
Console.WriteLine("已获取 Agent Card 信息:");
agentCard.Display();
var agentUrl = new Uri(agentCard.Url);
var client = new A2AClient(agentUrl);

Console.WriteLine("发送任务请求...");

// 发送消息 (这将触发 OnTaskCreated)
//  Create a new task by sending the message to the agent
var response = await client.SendMessageAsync(new MessageSendParams
{
    Message = new AgentMessage
    {
        Role = MessageRole.User,
        Parts = [new TextPart { Text = "Hello Task World!" }]
    }
});

// 检查响应类型
if (response is AgentTask task)
{
    Console.WriteLine($"收到任务响应,ID: {task.Id}");    

    // 显示任务详情
    new {
        TaskId = task.Id,
        Status = task.Status.State,
        ArtifactsCount = task.Artifacts?.Count ?? 0,
        FirstArtifact = task.Artifacts?.FirstOrDefault()?.Parts.OfType<TextPart>().FirstOrDefault()?.Text
    }.Display();
}
else
{
    Console.WriteLine("收到了非 Task 类型的响应");
    response.Display();
}

从上面的输出中,我们可以看到:

  • SendMessageAsync 返回了一个 AgentTask 对象。
  • 任务状态为 Completed(因为我们在 Server 端等待了处理完成)。
  • Artifacts 中包含了我们返回的 "Echo Result"。

这就是 Task-based 模式的基本流程:提交任务 -> 处理 -> 返回任务对象(含状态和产物)

3. Task 任务管理核心概念

  • 任务状态查询与控制,除了创建和更新任务,Client 端还需要能够查询任务的当前状态,或者在必要时取消任务。A2A 提供了以下 API:

    • GetTaskAsync: 查询任务当前状态

    • CancelTaskAsync: 取消正在进行的任务

    • SubscribeToTaskAsync: 订阅任务事件

  • 多轮交互 (Human-in-the-loop),很多复杂的任务不能一次性完成,可能需要 Agent 在中间步骤请求用户的确认或补充信息。A2A 协议通过 InputRequired 状态支持这种交互模式。

  • 任务元数据与历史管理

    • Metadata: 任务的自定义元数据,可以存储额外的业务信息

    • History: 完整的消息历史记录

  • 任务存储 (Task Store),为了支持长时间运行的任务和历史记录查询,TaskManager 需要持久化任务数据。ITaskStore 接口定义了存储规范,默认提供了 InMemoryTaskStore。

4. ITaskStore 与 InMemoryTaskStore

TaskManager 依赖 ITaskStore 来保存任务数据。默认情况下,它使用 InMemoryTaskStore,这意味着重启后数据会丢失。

  • ITaskStore 接口,定义了任务的 CRUD 操作:

    • GetTaskAsync - 获取任务

    • SetTaskAsync - 保存任务

    • UpdateStatusAsync - 更新状态

    • AddArtifactAsync - 添加产物

    • AddHistoryAsync - 添加历史记录

  • InMemoryTaskStore,SDK 默认提供的内存存储实现:

    • 无需额外配置,开箱即用

    • 性能高,适合开发和测试

    • 数据不持久化,重启后丢失

    • 不支持分布式部署

在生产环境中,通常会实现自定义 ITaskStore(例如基于 SQL 数据库、Redis、Cosmos DB 等)。

// 创建一个显式的 InMemoryTaskStore 实例(通常不需要手动创建,TaskManager 默认会使用它)
var taskStore = new InMemoryTaskStore();

// 创建 TaskManager 时注入 TaskStore
var taskManager = new TaskManager(taskStore:taskStore); // 使用命名参数避免歧义

5. 构建多轮交互 Agent (Researcher Agent)

我们将构建一个 ResearcherAgent,它模拟一个研究任务。这个任务分为三个阶段:

  1. Planning: 制定研究计划
  2. WaitingForFeedback: 等待用户确认计划 (InputRequired)
  3. Researching: 执行研究并完成

这个示例将展示如何使用 InputRequired 状态来实现 Human-in-the-loop 工作流。

public class ResearcherAgent
{
    private ITaskManager? _taskManager;
    private readonly Dictionary<string, AgentState> _agentStates = [];
    public static readonly ActivitySource ActivitySource = new("A2A.ResearcherAgent", "1.0.0");

    private enum AgentState
    {
        Planning,
        WaitingForFeedbackOnPlan,
        Researching
    }
    
    public void Attach(ITaskManager taskManager)
    {
        _taskManager = taskManager;
        _taskManager.OnTaskCreated = async (task, cancellationToken) =>
        {
            // Initialize the agent state for the task
            _agentStates[task.Id] = AgentState.Planning;
            // Ignore other content in the task, just assume it is a text message.
            var message = ((TextPart?)task.History?.Last()?.Parts?.FirstOrDefault())?.Text ?? string.Empty;
            await InvokeAsync(task.Id, message, cancellationToken);
        };
        _taskManager.OnTaskUpdated = async (task, cancellationToken) =>
        {
            // Note that the updated callback is helpful to know not to initialize the agent state again.
            var message = ((TextPart?)task.History?.Last()?.Parts?.FirstOrDefault())?.Text ?? string.Empty;
            await InvokeAsync(task.Id, message, cancellationToken);
        };
        _taskManager.OnAgentCardQuery = GetAgentCardAsync;
    }
    
    // This is the main entry point for the agent. It is called when a task is created or updated.
    // It probably should have a cancellation token to enable the process to be cancelled.
    public async Task InvokeAsync(string taskId, string message, CancellationToken cancellationToken)
    {
        if (_taskManager == null)
        {
            throw new InvalidOperationException("TaskManager is not attached.");
        }
    
        using var activity = ActivitySource.StartActivity("Invoke", ActivityKind.Server);
        activity?.SetTag("task.id", taskId);
        activity?.SetTag("message", message);
        activity?.SetTag("state", _agentStates[taskId].ToString());
    
        switch (_agentStates[taskId])
        {
            case AgentState.Planning:
                await DoPlanningAsync(taskId, message, cancellationToken);
                await _taskManager.UpdateStatusAsync(taskId, TaskState.InputRequired, new AgentMessage()
                {
                    Parts = [new TextPart() { Text = "When ready say go ahead" }],
                },
                cancellationToken: cancellationToken);
                break;
            case AgentState.WaitingForFeedbackOnPlan:
                if (message == "go ahead")  // Dumb check for now to avoid using an LLM
                {
                    await DoResearchAsync(taskId, message, cancellationToken);
                }
                else
                {
                    // Take the message and redo planning
                    await DoPlanningAsync(taskId, message, cancellationToken);
                    await _taskManager.UpdateStatusAsync(taskId, TaskState.InputRequired, new AgentMessage()
                    {
                        Parts = [new TextPart() { Text = "When ready say go ahead" }],
                    },
                    cancellationToken: cancellationToken);
                }
                break;
            case AgentState.Researching:
                await DoResearchAsync(taskId, message, cancellationToken);
                break;
        }
    }
    
    private async Task DoResearchAsync(string taskId, string message, CancellationToken cancellationToken)
    {
        if (_taskManager == null)
        {
            throw new InvalidOperationException("TaskManager is not attached.");
        }
    
        using var activity = ActivitySource.StartActivity("DoResearch", ActivityKind.Server);
        activity?.SetTag("task.id", taskId);
        activity?.SetTag("message", message);
    
        _agentStates[taskId] = AgentState.Researching;
        await _taskManager.UpdateStatusAsync(taskId, TaskState.Working, cancellationToken: cancellationToken);
    
        await _taskManager.ReturnArtifactAsync(
            taskId,
            new Artifact()
            {
                Parts = [new TextPart() { Text = $"{message} received." }],
            },
            cancellationToken);
    
        await _taskManager.UpdateStatusAsync(taskId, TaskState.Completed, new AgentMessage()
        {
            Parts = [new TextPart() { Text = "Task completed successfully" }],
        },
        cancellationToken: cancellationToken);
    }
    private async Task DoPlanningAsync(string taskId, string message, CancellationToken cancellationToken)
    {
        if (_taskManager == null)
        {
            throw new InvalidOperationException("TaskManager is not attached.");
        }
    
        using var activity = ActivitySource.StartActivity("DoPlanning", ActivityKind.Server);
        activity?.SetTag("task.id", taskId);
        activity?.SetTag("message", message);
    
        // Task should be in status Submitted
        // Simulate being in a queue for a while
        await Task.Delay(1000, cancellationToken);
    
        // Simulate processing the task
        await _taskManager.UpdateStatusAsync(taskId, TaskState.Working, cancellationToken: cancellationToken);
    
        await _taskManager.ReturnArtifactAsync(
            taskId,
            new Artifact()
            {
                Parts = [new TextPart() { Text = $"{message} received." }],
            },
            cancellationToken);
    
        await _taskManager.UpdateStatusAsync(taskId, TaskState.InputRequired, new AgentMessage()
        {
            Parts = [new TextPart() { Text = "When ready say go ahead" }],
        },
        cancellationToken: cancellationToken);
        _agentStates[taskId] = AgentState.WaitingForFeedbackOnPlan;
    }
    
    private Task<AgentCard> GetAgentCardAsync(string agentUrl, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return Task.FromCanceled<AgentCard>(cancellationToken);
        }
    
        var capabilities = new AgentCapabilities()
        {
            Streaming = true,
            PushNotifications = false,
        };
    
        return Task.FromResult(new AgentCard()
        {
            Name = "Researcher Agent",
            Description = "Agent which conducts research.",
            Url = agentUrl,
            Version = "1.0.0",
            DefaultInputModes = ["text"],
            DefaultOutputModes = ["text"],
            Capabilities = capabilities,
            Skills = [],
        });
    }
}
  • 启动 Server 并运行交互流程

在终端中运行以下命令来启动 Echo Tasks Agent 服务器:

cd samples/AgentServer
dotnet run --agent researcher --urls https://localhost:5048

A2A 端点将位于 /researcher 路径下。可以直接通过浏览器访问 https://localhost:5048/.well-known/agent-card.json 来查看 Agent Card 信息。

var researcherEndpoint = "https://localhost:5048/researcher";
var researcherCard = await A2AHelper.GetAgentCard(researcherEndpoint);
Console.WriteLine("已获取 Agent Card 信息:");
researcherCard.Display();

6. 创建 Client 并开始多轮交互

现在让我们创建一个 A2A Client,与 Researcher Agent 进行多轮交互。我们将演示完整的工作流程:

  1. 创建任务(提交研究问题)
  2. Agent 进入 Planning 状态并返回计划
  3. Agent 切换到 InputRequired 状态,等待确认
  4. 用户确认计划
  5. Agent 执行研究并完成任务

这个过程展示了如何使用 InputRequired 实现 Human-in-the-loop。

// 创建 A2A Client
var client = new A2AClient(new Uri(researcherEndpoint));

// 步骤 1: 创建任务
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("步骤 1: 创建研究任务");
var taskResult = await client.SendMessageAsync(new MessageSendParams
{
    Message = new AgentMessage
    {
        Parts = [new TextPart { Text = "Research the latest AI trends in 2025" }]
    }
});

// 检查响应类型(应为 AgentTask)
if (taskResult is not AgentTask task)
{
    throw new InvalidOperationException("预期返回 AgentTask,但收到其他类型");
}

var taskId = task.Id;
Console.WriteLine($"任务已创建: {taskId}");
Console.WriteLine($"初始状态: {task.Status.State}");

task = await client.GetTaskAsync(taskId);
await Task.Delay(2000);

// 步骤 2: 查询任务状态 (使用 GetTaskAsync)
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("步骤 2: 查询任务状态 (GetTaskAsync)");

var task = await client.GetTaskAsync(taskId);
Console.WriteLine($"当前状态: {task.Status.State}");

// 查看最新的消息和产物
if (task.Status.Message?.Parts != null)
{
    var message = task.Status.Message.Parts.OfType<TextPart>().FirstOrDefault()?.Text;
    Console.WriteLine($"Agent 消息: {message}");
}

if (task.Artifacts?.Count > 0)
{
    var artifact = task.Artifacts.Last();
    var artifactText = artifact.Parts?.OfType<TextPart>().FirstOrDefault()?.Text;
    await client.SendMessageAsync(new MessageSendParams()
    {
        Message = new AgentMessage
        {
    		// 步骤 3: 响应 InputRequired(发送确认)
            ContextId = taskId,  // 关联到现有任务
            Parts = [new TextPart { Text = "go ahead" }]
        }
    });
    Console.WriteLine("已发送确认消息");
}
// 等待 Agent 完成研究
await Task.Delay(2000);

// 步骤 4: 再次查询任务状态
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("步骤 4: 查询最终任务状态");
task = await client.GetTaskAsync(taskId);
Console.WriteLine($"最终状态: {task.Status.State}");

if (task.Status.Message?.Parts != null)
{
    var message = task.Status.Message.Parts.OfType<TextPart>().FirstOrDefault()?.Text;
    Console.WriteLine($"完成消息: {message}");
}
Console.WriteLine($"\n任务历史记录数量: {task.History?.Count ?? 0}");
Console.WriteLine($"产物总数: {task.Artifacts?.Count ?? 0}");

关键点解析:上面的代码展示了完整的多轮交互流程:

  • CreateTaskAsync: 创建任务并提交初始消息
  • GetTaskAsync: 随时查询任务状态,获取最新进展
  • UpdateTaskAsync: 当任务处于 InputRequired 状态时,发送用户反馈
  • 任务历史: 所有的消息交互都保存在 task.History 中
  • 产物收集: Agent 返回的所有产物都存储在 task.Artifacts 中

这种模式非常适合需要人工审核、确认或补充信息的工作流程。

7. 任务元数据 (Metadata) 的使用

任务元数据允许你存储与任务相关的自定义信息。A2A 协议中有两种 Metadata:

  • AgentMessage.Metadata,存储在消息级别,随消息一起传递,保存在任务历史中。
  • AgentTask.Metadata ,存储在任务级别,需要在 Server 端(TaskManager)显式设置。

注意:MessageSendParams.Metadata 目前不会自动传递到 AgentTask.Metadata,这是 SDK 的一个限制。如果需要任务级别的 Metadata,需要在 Server 端处理。

让我们看看如何使用 Message Metadata:

// 创建带 Metadata 的任务
// 注意: Metadata 应该设置在 AgentMessage 上
var taskWithMetadata = await client.SendMessageAsync(new MessageSendParams
{
    Message = new AgentMessage
    {
        Parts = [new TextPart { Text = "Research quantum computing applications" }],
        // Metadata 设置在 Message 上
        Metadata = new Dictionary<string, System.Text.Json.JsonElement>
        {
            ["userId"] = System.Text.Json.JsonSerializer.SerializeToElement("user-12345"),
            ["priority"] = System.Text.Json.JsonSerializer.SerializeToElement("high"),
            ["department"] = System.Text.Json.JsonSerializer.SerializeToElement("R&D"),
            ["requestId"] = System.Text.Json.JsonSerializer.SerializeToElement(Guid.NewGuid().ToString())
        }
    }
});

// 检查响应类型
if (taskWithMetadata is not AgentTask taskMeta)
{
    throw new InvalidOperationException("预期返回 AgentTask");
}
Console.WriteLine($"任务已创建: {taskMeta.Id}");

// 查询任务历史中的消息 Metadata
if (taskMeta.History != null && taskMeta.History.Count > 0)
{
    var firstMessage = taskMeta.History[0];
    Console.WriteLine("\n消息元数据 (Message.Metadata):");
    if (firstMessage.Metadata != null)
    {
        foreach (var (key, value) in firstMessage.Metadata)
        {
            Console.WriteLine($"  {key}: {value}");
        }
    }
    else
    {
        Console.WriteLine("  (未设置)");
    }
}

// 注意: AgentTask.Metadata 与 AgentMessage.Metadata 是不同的
Console.WriteLine("\n任务元数据 (Task.Metadata):");
if (taskMeta.Metadata != null)
{
    foreach (var (key, value) in taskMeta.Metadata)
    {
        Console.WriteLine($"  {key}: {value}");
    }
}
else
{
    Console.WriteLine("  (Task 级别的 Metadata 为空,因为它需要在 Server 端显式设置)");
}

// Metadata 的典型用途
new 
{
    说明 = "Metadata 在 A2A 中的使用",
    Message_Metadata = new 
    {
        用途 = "存储消息级别的元信息",
        示例 = new[]
        {
            "消息来源标识",
            "用户上下文信息",
            "请求追踪ID"
        }
    },
    Task_Metadata = new 
    {
        用途 = "存储任务级别的元信息(需 Server 端设置)",
        示例 = new[]
        {
            "任务优先级",
            "业务分类",
            "SLA 要求"
        }
    },
    注意 = "MessageSendParams.Metadata 不会自动传递到 AgentTask.Metadata"
}.Display();

8. 任务取消 (CancelTaskAsync)

有时候任务执行时间过长,或者用户改变了主意,我们需要取消任务。A2A 提供了 CancelTaskAsync 方法。

取消任务的场景

  • 用户主动取消请求
  • 任务超时
  • 检测到重复任务
  • 资源不足需要终止

让我们模拟一个场景:

// 创建一个任务
var longRunningTask = await client.SendMessageAsync(new MessageSendParams
{
    Message = new AgentMessage
    {
        Parts = [new TextPart { Text = "Research comprehensive AI history" }]
    }
});

// 检查响应类型
if (longRunningTask is not AgentTask longTask)
{
    throw new InvalidOperationException("预期返回 AgentTask");
}

Console.WriteLine($"任务已创建: {longTask.Id}");
Console.WriteLine($"初始状态: {longTask.Status.State}");

// 等待一会儿,然后决定取消
await Task.Delay(1500);

Console.WriteLine("\n用户决定取消任务...");

// 取消任务(使用扩展方法)
var cancelResult = await client.CancelTaskAsync(longTask.Id);

Console.WriteLine($"取消请求已发送");
Console.WriteLine($"取消后状态: {cancelResult.Status.State}");

// 再次查询确认状态
var cancelledTask = await client.GetTaskAsync(longTask.Id);
Console.WriteLine($"最终确认状态: {cancelledTask.Status.State}");

// 取消任务的注意事项
new 
{
    方法 = "CancelTaskAsync(taskId)",
    状态变更 = "Working/InputRequired → Canceled",
    注意事项 = new[]
    {
        "Agent 需要正确处理 CancellationToken",
        "取消是异步操作,可能不会立即生效",
        "已完成或已失败的任务无法取消",
        "取消后的任务仍可查询,但不能更新"
    },
    最佳实践 = "在 Agent 实现中始终检查 cancellationToken.IsCancellationRequested"
}.Display();

9. 长时间运行任务的设计模式

对于耗时较长的任务(如数据分析、视频处理、大规模爬虫等),我们需要特殊的设计模式:

  • 模式 1: 定期更新状态
await _taskManager.UpdateStatusAsync(
    taskId, 
    TaskState.Working, 
    new AgentMessage { Parts = [new TextPart { Text = "进度: 50%" }] },
    cancellationToken: ct);
  • 模式 2: 分阶段返回产物
// 第一阶段完成
await _taskManager.ReturnArtifactAsync(taskId, 
    new Artifact { Parts = [new TextPart { Text = "Phase 1 完成" }] }, ct);

// 第二阶段完成
await _taskManager.ReturnArtifactAsync(taskId, 
    new Artifact { Parts = [new TextPart { Text = "Phase 2 完成" }] }, ct);
  • 模式 3: 使用 Push Notifications(下节课)

对于超长任务,可以配置 Webhook,让 Agent 在状态变更时主动通知 Client,避免轮询。

设计建议

任务时长 推荐方案 说明
< 5秒 直接等待 同步返回结果
5-30秒 轮询 GetTaskAsync 每 1-2 秒查询一次
30秒-5分钟 SSE 订阅 实时接收状态更新(下节课)
> 5分钟 Push Notifications Webhook 回调通知

10. SubscribeToTaskAsync

SubscribeToTaskAsync 是一个非常强大的功能,它使用 SSE 实现实时事件流。

主要用途

  • 实时接收任务状态变更
  • 实时获取新产物
  • 避免轮询,减少网络开销
  • 支持长时间运行任务的监控

基本用法

await foreach (var @event in client.SubscribeToTaskAsync(taskId))
{
    if (@event.Event is TaskStatusChangedEvent statusEvent)
    {
        Console.WriteLine($"状态变更: {@event.Data.Status?.State}");
    }
    else if (@event.Event is ArtifactCreatedEvent artifactEvent)
    {
        Console.WriteLine("新产物生成");
    }
}

与轮询的对比

特性 轮询 GetTaskAsync SSE SubscribeToTaskAsync
实时性 延迟 1-2 秒 即时(毫秒级)
网络开销 高(频繁请求) 低(单个连接)
服务器压力
实现复杂度 简单 中等
适用场景 短任务 长任务、实时监控

11. 总结

核心功能:

  1. GetTaskAsync - 查询任务状态

    • 随时获取任务最新进展
    • 查看历史记录和产物
    • 检查任务元数据
  2. CancelTaskAsync - 取消任务

    • 用户主动取消或超时场景
    • Agent 需正确处理 CancellationToken
    • 取消后状态变为 Canceled
  3. 多轮交互 - InputRequired 状态

    • 实现 Human-in-the-loop 工作流
    • Agent 可主动请求用户反馈
    • 支持复杂的审批和确认流程
  4. Metadata - 任务元数据

    • 存储自定义业务信息
    • 不影响任务处理逻辑
    • 用于跟踪、分类、优先级管理
  5. ITaskStore - 任务存储

    • 默认使用 InMemoryTaskStore
    • 生产环境可自定义实现(数据库、Redis)
    • 支持任务持久化和分布式部署

任务管理最佳实践:

graph TD
    A[创建任务] --> B{任务类型?}
    B -->|短任务<5s| C[直接等待结果]
    B -->|中等任务5-30s| D[轮询 GetTaskAsync]
    B -->|长任务>30s| E[SSE 订阅或 Webhook]
    
    C --> F[GetTaskAsync 获取结果]
    D --> G[定时查询状态]
    E --> H[实时接收事件]
    
    F --> I{需要交互?}
    G --> I
    H --> I
    
    I -->|是| J[InputRequired 状态]
    I -->|否| K[自动完成]
    
    J --> L[UpdateTaskAsync 发送反馈]
    L --> M[继续处理]
    
    K --> N[Completed 状态]
    M --> N

五、MAF 集成 - A2A Agent 作为工具

在复杂的 AI 系统中,不同的 Agent 可能使用不同的框架和协议。通过将 A2A Agent 封装为标准的 AIFunction,我们可以:

  • 统一调用接口:让 MAF Agent 像调用本地函数一样调用远程 A2A Agent
  • 能力复用:将 A2A Agent 的专业技能作为工具提供给其他 Agent
  • 跨框架协作:打通 MAF、A2A、MCP 等不同 Agent 生态系统

1. 核心概念:A2A Agent 作为工具

架构设计:

flowchart TB
    subgraph MAF["Microsoft Agent Framework"]
        MainAgent["主 Agent<br/>(旅行规划助手)"]
        Tools["Function Tools"]
    end
    
    subgraph A2A["A2A Protocol"]
        CardResolver["A2ACardResolver"]
        AgentCard["Agent Card"]
        A2AAgent["A2A Agent"]
    end
    
    subgraph Remote["远程 A2A Server"]
        RouteAgent["路线规划 Agent"]
    end
    
    MainAgent -->|"调用工具"| Tools
    Tools -->|"技能 → AIFunction"| A2AAgent
    CardResolver -->|"发现"| AgentCard
    A2AAgent -->|"JSON-RPC"| RouteAgent

核心类说明:

作用
A2ACardResolver 发现 A2A Agent 并获取 Agent Card
AgentCard.GetAIAgent() 将 Agent Card 转换为可调用的 AIAgent 实例
AIFunctionFactory.Create() 将委托方法转换为 AIFunction 工具
AIAgent.RunAsync() 执行 Agent 并获取响应

2、发现 A2A Agent

使用 A2ACardResolver 发现远程 A2A Agent 并获取其 Agent Card:

前置条件:确保已启动旅行规划 Agent。在终端中运行:

cd ./file-based-apps
dotnet run TravelPlannerAgent.cs

该 Agent 提供以下技能:

  • 路线规划 - 规划最优出行路线
  • 天气查询 - 查询目的地天气
  • 酒店推荐 - 推荐合适的住宿
  • 景点推荐 - 推荐热门景点
// A2A Agent 端点 - 旅行规划 Agent
var a2aAgentHost = "http://localhost:5081";

// 1. 创建 Agent Card 解析器
A2ACardResolver agentCardResolver = new(new Uri(a2aAgentHost));

// 2. 获取 Agent Card
AgentCard agentCard = await agentCardResolver.GetAgentCardAsync();

Console.WriteLine("已获取 A2A Agent Card");
Console.WriteLine($"名称: {agentCard.Name}");
Console.WriteLine($"描述: {agentCard.Description}");
Console.WriteLine($"URL: {agentCard.Url}");
Console.WriteLine($"版本: {agentCard.Version}");
agentCard.Display();

// 显示技能列表
if (agentCard.Skills?.Count > 0)
{
    Console.WriteLine($"\n技能列表 ({agentCard.Skills.Count} 个):");
    foreach (var skill in agentCard.Skills)
    {
        Console.WriteLine($"• {skill.Name}: {skill.Description}");
        if (skill.Examples?.Count > 0)
        {
            Console.WriteLine($"  示例: {string.Join(", ", skill.Examples.Take(2))}");
        }
    }
}
else
{
    Console.WriteLine("\nAgent 未声明任何技能,将使用默认技能");
}

3. 将 A2A Agent 转换为 AIAgent

在 MAF 中提供了Microsoft.Agents.AI.A2A 包,用于将 A2A Agent 集成到 MAF 环境中。其中提供了 AgentCard.GetAIAgent() 扩展方法,可以将 A2A Agent 封装为 MAF 的 AIAgent 实例:

#r "nuget: Microsoft.Agents.AI.A2A, 1.0.0-preview.251110.2"
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
// 将 A2A Agent 转换为 AIAgent 实例
AIAgent a2aAgent = agentCard.GetAIAgent();

Console.WriteLine("A2A Agent 已转换为 AIAgent 实例");

// 直接调用 A2A Agent
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("直接调用 A2A Agent:");

var response = await a2aAgent.RunAsync("你好,请自我介绍一下");
Console.WriteLine($"回答: {response.Text}");

4. 将 Agent 技能转换为 Function Tools

A2A Agent 的技能(Skills)可以转换为 MAF 的 AIFunction 工具。这样,主 Agent 就可以根据用户请求,自动选择并调用合适的技能。

  • 核心逻辑
foreach (var skill in agentCard.Skills)
{
    AIFunctionFactoryOptions options = new()
    {
        Name = skill.Name,           // 技能名称
        Description = skill.Description  // 技能描述(供 LLM 理解用途)
    };

    yield return AIFunctionFactory.Create(
        async (string input, CancellationToken ct) => 
            await a2aAgent.RunAsync(input, cancellationToken: ct),
        options
    );
}
  • 函数名规范化

由于 AIFunction 的名称必须符合一定规范(仅限字母、数字、下划线),我们需要对技能名称进行清理:

// 函数名规范化帮助类
static class FunctionNameSanitizer
{
    private static readonly Regex InvalidNameCharsRegex = new Regex("[^0-9A-Za-z]+", RegexOptions.Compiled);    

    public static string Sanitize(string name)
    {
        return InvalidNameCharsRegex.Replace(name, "_");
    }
}
Console.WriteLine("FunctionNameSanitizer 定义完成");

// 测试规范化
var testNames = new[] { "route-planning", "weather query", "AI助手" };
foreach (var name in testNames)
{
    Console.WriteLine($"• '{name}' → '{FunctionNameSanitizer.Sanitize(name)}'");
}
  • 创建 Function Tools,现在我们将 A2A Agent 的所有技能转换为 AIFunction 工具:
static IEnumerable<AIFunction> CreateFunctionTools(AIAgent a2aAgent, AgentCard agentCard)
{
    foreach (var skill in agentCard.Skills)
    {
        // A2A agent skills don't have schemas describing the expected shape of their inputs and outputs. 
        // Schemas can be beneficial for AI models to better understand the skill's contract, generate 
        // the skill's input accordingly and to know what to expect in the skill's output.
        // However, the A2A specification defines properties such as name, description, tags, examples, 
        // inputModes, and outputModes to provide context about the skill's purpose, capabilities, usage, 
        // and supported MIME types. These properties are added to the function tool description to help 
        // the model determine the appropriate shape of the skill's input and output.
        AIFunctionFactoryOptions options = new()
        {
            Name = FunctionNameSanitizer.Sanitize(skill.Id),
            Description = $$"""
            {
                "description": "{{skill.Description}}",
                "tags": "[{{string.Join(", ", skill.Tags ?? [])}}]",
                "examples": "[{{string.Join(", ", skill.Examples ?? [])}}]",
                "inputModes": "[{{string.Join(", ", skill.InputModes ?? [])}}]",
                "outputModes": "[{{string.Join(", ", skill.OutputModes ?? [])}}]"
            }
            """,
        };

        yield return AIFunctionFactory.Create(RunAgentAsync, options);
    }
    
    async Task<string> RunAgentAsync(string input, CancellationToken cancellationToken)
    {
        var response = await a2aAgent.RunAsync(input, cancellationToken: cancellationToken).ConfigureAwait(false);
    
        return response.Text;
    }
}
Console.WriteLine("CreateFunctionTools 方法定义完成");
// 生成 Function Tools
var functionTools = CreateFunctionTools(a2aAgent, agentCard).ToList();


Console.WriteLine($"已创建 {functionTools.Count} 个 Function Tool:");
foreach (var tool in functionTools)
{
    Console.WriteLine($"• {tool.Name}");
    Console.WriteLine($"  描述: {tool.Description?.Substring(0, Math.Min(50, tool.Description?.Length ?? 0))}...");
}
  • 创建主 Agent 并集成 A2A 工具,现在我们创建一个主 Agent(旅行规划助手),并将 A2A Agent 的技能作为工具提供给它:
// 获取 Chat Client(用于主 Agent 的 LLM 能力)
var chatClient = AIClientHelper.GetDefaultChatClient();

// 创建主 Agent,并注册 A2A Agent 的技能作为工具
AIAgent mainAgent = chatClient.CreateAIAgent(
    instructions: """
    你是一个智能旅行规划助手。你可以利用可用的工具来帮助用户完成任务。
    当用户询问时,请使用合适的工具获取信息,然后给出建议。
    """,
    tools: [.. functionTools]
);
Console.WriteLine("主 Agent 创建完成");
Console.WriteLine($"已注册 {functionTools.Count} 个工具");

5. 执行多 Agent 协作

现在让主 Agent 处理一个用户请求。主 Agent 会自动识别需要调用 A2A Agent 的技能:

// 用户请求 - 测试不同的技能调用
var userRequests = new[]
{
    "帮我规划从北京到上海的旅行路线",
    "查询一下上海的天气情况",
    "推荐上海的酒店"
};

foreach (var userRequest in userRequests)
{
    Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine($"用户请求: {userRequest}");
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    // 执行 Agent
    Console.WriteLine("主 Agent 处理中...");
    var result = await mainAgent.RunAsync(userRequest);    
    Console.WriteLine($"回答:\n{result.Text}");
    Console.WriteLine();

}

6. 完整示例:多 A2A Agent 集成

在实际项目中,可能需要集成多个 A2A Agent。以下是一个完整的多 Agent 协作示例:

flowchart LR
    User["用户"] --> MainAgent["主 Agent<br/>(协调者)"]
    MainAgent --> RouteAgent["路线规划 Agent<br/>A2A"]
    MainAgent --> WeatherAgent["天气查询 Agent<br/>A2A"]
    MainAgent --> HotelAgent["酒店推荐 Agent<br/>A2A"]
    
    RouteAgent --> Response["综合回答"]
    WeatherAgent --> Response
    HotelAgent --> Response
    Response --> User
// 定义多个 A2A Agent 端点
var agentEndpoints = new[]
{
    "https://route-agent.example.com/a2a",
    "https://weather-agent.example.com/a2a",
    "https://hotel-agent.example.com/a2a"
};

// 收集所有工具
var allTools = new List<AIFunction>();

foreach (var endpoint in agentEndpoints)
{
    var resolver = new A2ACardResolver(new Uri(endpoint));
    var card = await resolver.GetAgentCardAsync();
    var agent = card.GetAIAgent();
    
    allTools.AddRange(CreateFunctionTools(agent, card));
}

// 创建主 Agent
var orchestrator = chatClient.CreateAIAgent(
    instructions: "你是一个旅行规划协调者,可以调用多个专业 Agent...",
    tools: [.. allTools]
);

7. 最佳实践与注意事项

  • 技能描述要详细,A2A Agent 的技能没有 schema 来描述输入输出的具体格式,因此需要通过 description、tags、examples 等属性提供足够的上下文,帮助 LLM 理解如何调用该技能:
new AgentSkill
{
    Id = "route-planning",
    Name = "Route Planning",
    Description = "规划从起点到终点的最优路线,支持避开收费站、高速等选项",
    Tags = ["navigation", "travel", "routing"],
    Examples = ["规划从北京到上海的路线", "避开收费站从A到B"],
    InputModes = ["text"],
    OutputModes = ["text"]
}
  • 错误处理
try
{
    var response = await a2aAgent.RunAsync(input, cancellationToken: ct);
    return response.Text;
}
catch (A2AException ex) when (ex.ErrorCode == A2AErrorCode.TaskFailed)
{
    return $"任务执行失败: {ex.Message}";
}
catch (HttpRequestException ex)
{
    return $"网络错误: {ex.Message}";
}
  • 重试机制,建议为 A2A 调用添加重试逻辑:
var policy = Policy
    .Handle<HttpRequestException>()
    .WaitAndRetryAsync(3, retryAttempt => 
        TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));

await policy.ExecuteAsync(async () =>
{
    var response = await a2aAgent.RunAsync(input);
    return response.Text;
});
  • 性能考虑
场景 建议
短任务 直接调用 RunAsync
长任务 使用 Task-based 模式 + Push Notifications
高并发 考虑限流和熔断
低延迟 预热 Agent 连接,使用连接池