Net+AI智能体进阶4:MCP进阶扩展
2025-10-25 11:20:48一、自定义传输协议
我们已经知道 MCP 的 Stdio 和 HTTP 传输协议了,今天我们深入探索一下 InMemory Transport(进程内传输)的实现原理,以及如何创建自定义传输协议。InMemory Transport 特别适合单进程内的 MCP 通信、单元测试和高性能场景。
1. InMemory Transport 原理
InMemory Transport 是一种在单进程内实现 MCP Client 和 Server 通信的传输方式。它使用内存中的数据结构(如 Pipe、Channel)进行消息传递,避免了跨进程通信的开销。
主要优势:
- 高性能:无需序列化到文件或网络,直接在内存中传递消息
- 易于测试:非常适合编写单元测试,不依赖外部进程或网络
- 同步控制:Client 和 Server 在同一进程,便于调试和状态管理
- 零依赖:不需要启动外部进程或监听端口
实现原理:InMemory Transport 基于 Pipe(管道)实现双向通信:
创建两个 Pipe:
clientToServerPipe:Client → Server 方向
serverToClientPipe:Server → Client 方向
连接读写端:
Client 写入 clientToServerPipe.Writer,Server 读取 clientToServerPipe.Reader
Server 写入 serverToClientPipe.Writer,Client 读取 serverToClientPipe.Reader
消息序列化:
使用 JSON-RPC 格式序列化消息
每条消息以换行符 \n 分隔
sequenceDiagram
participant C as McpClient
participant CTP as clientToServerPipe
participant STP as serverToClientPipe
participant S as McpServer
Note over C,S: 初始化连接
C->>CTP: Write (Request)
CTP->>S: Read (Request)
S->>STP: Write (Response)
STP->>C: Read (Response)
Note over C,S: 持续通信
C->>CTP: Write (CallTool)
CTP->>S: Read (CallTool)
S->>STP: Write (Result)
STP->>C: Read (Result)
2. 实现 InMemory Transport 通信
- 创建简单的 Echo Tool
// 创建一个简单的 Echo Tool
var echoTool = McpServerTool.Create(
method: (string arg) => $"Echo: {arg}",
options: new McpServerToolCreateOptions(){
Name = "EchoTool",
Description = "A simple echo tool that returns the input string."
});
- 使用 Pipe 创建 InMemory Transport。创建两个 Pipe 用于双向通信,并配置 Client 和 Server。核心步骤:
- 创建两个 Pipe 实例
- Server 使用 StreamServerTransport 连接 Pipe
- Client 使用 StreamClientTransport 连接 Pipe
- 启动 Server 和 Client
// 步骤 1:创建两个 Pipe(双向管道)
Pipe clientToServerPipe = new(); // Client 写,Server 读
Pipe serverToClientPipe = new(); // Server 写,Client 读
Console.WriteLine("创建了两个 Pipe:");
Console.WriteLine("- clientToServerPipe: Client → Server");
Console.WriteLine("- serverToClientPipe: Server → Client");
// 步骤 2:创建 Server,使用 StreamServerTransport
// Server 从 clientToServerPipe 读取,向 serverToClientPipe 写入
var serverTransport = new StreamServerTransport(
clientToServerPipe.Reader.AsStream(),
serverToClientPipe.Writer.AsStream());
var server = McpServer.Create(
serverTransport,
new McpServerOptions()
{
ToolCollection = [echoTool]
});
Console.WriteLine("McpServer 创建完成");
// 步骤 3:启动 Server(后台运行)
_ = server.RunAsync();
Console.WriteLine("McpServer 已启动(后台运行)");
// 步骤 4:创建 Client,使用 StreamClientTransport
// Client 向 clientToServerPipe 写入,从 serverToClientPipe 读取
var client = await McpClient.CreateAsync(
new StreamClientTransport(
clientToServerPipe.Writer.AsStream(),
serverToClientPipe.Reader.AsStream()));
Console.WriteLine("McpClient 创建完成并已连接");
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine(" InMemory Transport 连接建立成功!");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
- 测试 InMemory Transport 通信。列出工具、调用工具。代码关键点解析:
- 两个 Pipe 实现了全双工通信
- AsStream() 将 PipeReader/PipeWriter 转换为 Stream
- 消息以 JSON-RPC 格式序列化,每行一个消息
- Server 的 RunAsync() 在后台持续监听消息
try
{
Console.WriteLine("步骤 1:列出所有 Tools\n");
// 列出所有工具
var tools = await client.ListToolsAsync();
new
{
工具数量 = tools.Count,
工具列表 = tools.Select(t => new {
名称 = t.Name,
描述 = t.Description
})
}.Display();
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine("步骤 2:调用 Echo Tool\n");
// 调用 Echo 工具
var echo = tools.First(t => t.Name == "EchoTool");
var result = await echo.InvokeAsync(new()
{
["arg"] = "Hello from InMemory Transport!"
});
new
{
调用的工具 = echo.Name,
输入参数 = "Hello from InMemory Transport!",
返回结果 = result
}.Display();
Console.WriteLine("\nInMemory Transport 通信测试成功!");
}
catch (Exception ex)
{
Console.WriteLine("测试失败:");
new { 错误类型 = ex.GetType().Name, 错误消息 = ex.Message }.Display();
}
3. 传输层接口解析
MCP 传输层由三个核心接口组成:
classDiagram
class IClientTransport {
<<interface>>
+string Name
+ConnectAsync() Task~ITransport~
}
class ITransport {
<<interface>>
+string SessionId
+ChannelReader~JsonRpcMessage~ MessageReader
+SendMessageAsync(message) Task
+DisposeAsync() ValueTask
}
class TransportBase {
<<abstract>>
#SetConnected()
#SetDisconnected()
#WriteMessageAsync(message)
+SendMessageAsync(message)*
+DisposeAsync()*
}
ITransport <|.. TransportBase
IClientTransport ..> ITransport : creates
TransportBase <|-- StreamServerTransport
TransportBase <|-- StreamClientSessionTransport
- IClientTransport 接口:客户端传输的工厂接口,负责建立连接。
// 展示 IClientTransport 接口的定义和使用
var clientTransportInfo = new
{
接口名称 = "IClientTransport",
职责 = "客户端传输的工厂接口,负责建立连接",
核心方法 = new[]
{
new {
方法 = "ConnectAsync()",
返回 = "Task<ITransport>",
说明 = "异步建立与 MCP Server 的连接,返回传输会话"
}
},
核心属性 = new[]
{
new {
属性 = "Name",
类型 = "string",
说明 = "传输标识符,用于日志记录"
}
},
实现类示例 = new[]
{
"StdioClientTransport - 标准输入输出传输",
"HttpClientTransport - HTTP 传输",
"StreamClientTransport - 流传输(InMemory)"
},
使用场景 = "在创建 McpClient 时,通过 McpClient.CreateAsync(transport) 传入"
};
clientTransportInfo.Display();
- ITransport 接口:表示一个已建立的传输会话,提供消息的双向通信。
// 展示 ITransport 接口的定义和使用
var transportInfo = new
{
接口名称 = "ITransport",
职责 = "表示已建立的传输会话,提供消息的双向通信",
继承 = "IAsyncDisposable",
核心属性 = new[]
{
new {
属性 = "SessionId",
类型 = "string?",
说明 = "会话标识符,用于多会话传输(HTTP/SSE)"
},
new {
属性 = "MessageReader",
类型 = "ChannelReader<JsonRpcMessage>",
说明 = "接收来自对端的消息的通道读取器"
}
},
核心方法 = new[]
{
new {
方法 = "SendMessageAsync(message)",
返回 = "Task",
说明 = "发送 JSON-RPC 消息到对端"
},
new {
方法 = "DisposeAsync()",
返回 = "ValueTask",
说明 = "异步释放传输资源,关闭连接"
}
},
通信模型 = "生产者-消费者模式,使用 Channel 实现线程安全的消息队列",
实现类示例 = new[]
{
"StreamServerTransport - 基于流的服务端传输",
"StreamClientSessionTransport - 基于流的客户端传输",
"SseResponseStreamTransport - SSE 响应流传输",
"StreamableHttpServerTransport - Streamable HTTP 传输"
}
};
transportInfo.Display();
- TransportBase 抽象基类:为 ITransport 实现提供公共功能,包括消息通道管理、连接状态跟踪、日志记录。
// 展示 TransportBase 的核心功能
var transportBaseInfo = new
{
类名 = "TransportBase",
类型 = "abstract class",
实现接口 = "ITransport",
职责 = "为 ITransport 实现提供公共功能",
提供的核心功能 = new[]
{
"消息通道管理 - 使用 Channel<JsonRpcMessage> 实现无界队列",
"连接状态跟踪 - StateInitial → StateConnected → StateDisconnected",
"日志记录 - 集成 ILogger,记录连接/发送/接收事件",
"线程安全 - 使用 Interlocked 确保状态变更的原子性"
},
protected方法供子类使用 = new[]
{
new {
方法 = "SetConnected()",
说明 = "将传输设置为已连接状态"
},
new {
方法 = "SetDisconnected(error?)",
说明 = "将传输设置为已断开状态,完成消息通道"
},
new {
方法 = "WriteMessageAsync(message)",
说明 = "将接收到的消息写入通道供上层读取"
}
},
abstract方法需子类实现 = new[]
{
new {
方法 = "SendMessageAsync(message)",
说明 = "子类实现具体的发送逻辑(序列化、写入流等)"
},
new {
方法 = "DisposeAsync()",
说明 = "子类实现资源释放逻辑"
}
},
Channel配置 = new
{
类型 = "UnboundedChannel",
原因 = "避免在写入时阻塞,确保消息接收的高性能",
配置 = "SingleReader=true, SingleWriter=false"
}
};
transportBaseInfo.Display();
4. 实现自定义传输协议
现在我们将实现一个自定义的传输协议:基于内存队列的传输,模拟消息队列场景(如 RabbitMQ、Kafka)。
设计思路:我们将创建一个 QueueClientTransport,使用两个 Channel 作为消息队列:
一个用于 Client → Server
一个用于 Server → Client
实现 QueueTransport(基于 TransportBase)
/// <summary>
/// 基于 Channel 的自定义传输实现
/// 参考 StreamServerTransport 的实现模式
/// </summary>
public sealed class QueueTransport : TransportBase
{
private readonly ChannelWriter<JsonRpcMessage> _sendChannel;
private readonly ChannelReader<JsonRpcMessage> _receiveChannel;
private readonly CancellationTokenSource _shutdownCts = new();
private readonly Task _readLoopCompleted;
private int _disposed = 0;
public QueueTransport(
ChannelWriter<JsonRpcMessage> sendChannel,
ChannelReader<JsonRpcMessage> receiveChannel,
string name,
ILoggerFactory? loggerFactory = null)
: base(name, loggerFactory)
{
_sendChannel = sendChannel;
_receiveChannel = receiveChannel;
// 设置为已连接状态
SetConnected();
// 启动后台读取循环
_readLoopCompleted = Task.Run(ReadMessagesAsync, _shutdownCts.Token);
}
public override async Task SendMessageAsync(JsonRpcMessage message, CancellationToken cancellationToken = default)
{
if (!IsConnected)
{
throw new InvalidOperationException("Transport is not connected.");
}
try
{
// 写入发送通道
await _sendChannel.WriteAsync(message, cancellationToken);
}
catch (Exception ex)
{
throw new IOException($"Failed to send message via queue transport: {ex.Message}", ex);
}
}
private async Task ReadMessagesAsync()
{
CancellationToken shutdownToken = _shutdownCts.Token;
Exception? error = null;
try
{
// 持续从接收通道读取消息
await foreach (var message in _receiveChannel.ReadAllAsync(shutdownToken))
{
// 写入到 TransportBase 管理的消息通道
await WriteMessageAsync(message, shutdownToken);
}
}
catch (OperationCanceledException)
{
// 正常关闭
}
catch (Exception ex)
{
error = ex;
}
finally
{
SetDisconnected(error);
}
}
public override async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _disposed, 1) != 0)
{
return;
}
try
{
// 取消读取循环
await _shutdownCts.CancelAsync();
_shutdownCts.Dispose();
// 等待读取循环完成
try
{
await _readLoopCompleted;
}
catch (OperationCanceledException)
{
}
}
finally
{
SetDisconnected();
}
GC.SuppressFinalize(this);
}
}
- 实现 QueueClientTransport(实现 IClientTransport),创建客户端传输工厂。
/// <summary>
/// 自定义的客户端传输,基于 Channel 队列
/// 参考 StreamClientTransport 的实现模式
/// </summary>
public sealed class QueueClientTransport : IClientTransport
{
private readonly ChannelWriter<JsonRpcMessage> _sendChannel;
private readonly ChannelReader<JsonRpcMessage> _receiveChannel;
private readonly ILoggerFactory? _loggerFactory;
public QueueClientTransport(
ChannelWriter<JsonRpcMessage> sendChannel,
ChannelReader<JsonRpcMessage> receiveChannel,
ILoggerFactory? loggerFactory = null)
{
_sendChannel = sendChannel;
_receiveChannel = receiveChannel;
_loggerFactory = loggerFactory;
}
public string Name => "queue-transport";
public Task<ITransport> ConnectAsync(CancellationToken cancellationToken = default)
{
// 创建传输会话
var transport = new QueueTransport(
_sendChannel,
_receiveChannel,
"Client (queue)",
_loggerFactory);
return Task.FromResult<ITransport>(transport);
}
}
- 测试自定义传输协议
try
{
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine(" 测试自定义 Queue Transport ");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
// 步骤 1:创建两个 Channel 作为消息队列
var clientToServerChannel = Channel.CreateUnbounded<JsonRpcMessage>();
var serverToClientChannel = Channel.CreateUnbounded<JsonRpcMessage>();
Console.WriteLine("创建了两个 Channel 队列");
// 步骤 2:创建 Server(使用 QueueTransport)
var serverTransport2 = new QueueTransport(
serverToClientChannel.Writer,
clientToServerChannel.Reader,
"Server (queue)");
var server2 = McpServer.Create(
serverTransport2,
new McpServerOptions()
{
ToolCollection = [echoTool]
});
_ = server2.RunAsync();
Console.WriteLine("Server 已创建并启动(使用 QueueTransport)");
// 步骤 3:创建 Client(使用 QueueClientTransport)
var clientTransport = new QueueClientTransport(
clientToServerChannel.Writer,
serverToClientChannel.Reader);
var client2 = await McpClient.CreateAsync(clientTransport);
Console.WriteLine("Client 已创建并连接(使用 QueueClientTransport)");
// 步骤 4:测试通信
Console.WriteLine("\n列出 Tools:");
var tools2 = await client2.ListToolsAsync();
tools2.Select(t => t.Name).ToArray().Display();
Console.WriteLine("\n调用 Echo Tool:");
var echo2 = tools2.First(t => t.Name == "EchoTool");
var result2 = await echo2.InvokeAsync(new()
{
["arg"] = "Hello from Queue Transport!"
});
new
{
输入 = "Hello from Queue Transport!",
输出 = result2
}.Display();
Console.WriteLine("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine(" 自定义 Queue Transport 测试成功!");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
// 清理资源
await client2.DisposeAsync();
await server2.DisposeAsync();
}
catch (Exception ex)
{
Console.WriteLine("测试失败:");
new { 错误类型 = ex.GetType().Name, 错误消息 = ex.Message }.Display();
}
5. 高级特性与最佳实践
背压控制(Backpressure)
什么是背压?:当消息生产速度超过消费速度时,未处理的消息会积压,导致内存溢出。
Channel 的背压策略:
| Channel 类型 | 行为 | 适用场景 |
|---|---|---|
| CreateUnbounded | 无界队列,可能导致内存溢出 | 消息量可控,需要高吞吐 |
| CreateBounded(capacity) | 有界队列,写入时可能阻塞 | 需要控制内存使用 |
| BoundedChannelFullMode.Wait | 队列满时阻塞写入 | 不允许丢消息 |
| BoundedChannelFullMode.DropOldest | 队列满时丢弃最旧的消息 | 实时性优先 |
// 推荐:使用有界 Channel + DropWrite 策略
var channel = Channel.CreateBounded<JsonRpcMessage>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.DropWrite // 队列满时丢弃新消息
});
性能优化策略
- 传输层性能优化要点:
| 优化项 | 推荐做法 | 原因 |
|---|---|---|
| 消息序列化 | 使用 System.Text.Json |
高性能,内存友好 |
| 缓冲区管理 | 使用 ArrayPool 复用数组 |
减少 GC 压力 |
| 异步 I/O | 使用 async/await |
避免线程阻塞 |
| Channel 配置 | SingleReader=true |
减少锁竞争 |
| 连接复用 | 长连接 + 会话管理 | 减少建立连接开销 |
- Pipe vs Channel 性能对比:
// Pipe: 更底层,性能更高,适合大量数据流
Pipe pipe = new();
// Channel: 更高层,易用性好,适合消息队列
Channel<T> channel = Channel.CreateUnbounded<T>();
选择建议:
Pipe:适合流式数据(如文件、网络流)
Channel:适合离散消息(如命令、事件)
单元测试最佳实践
InMemory Transport 的优势:
快速:无需启动外部进程
可靠:不依赖网络或文件系统
可控:完全控制消息传递时机
隔离:每个测试独立的传输实例
测试模版
[Fact]
public async Task TestMcpServerTool()
{
// Arrange: 创建 InMemory Transport
Pipe clientToServer = new(), serverToClient = new();
var server = McpServer.Create(
new StreamServerTransport(
clientToServer.Reader.AsStream(),
serverToClient.Writer.AsStream()),
new McpServerOptions { /* ... */ });
_ = server.RunAsync();
var client = await McpClient.CreateAsync(
new StreamClientTransport(
clientToServer.Writer.AsStream(),
serverToClient.Reader.AsStream()));
// Act: 调用 Tool
var tools = await client.ListToolsAsync();
var result = await tools[0].InvokeAsync(new { arg = "test" });
// Assert: 验证结果
Assert.Equal("Expected", result);
// Cleanup
await client.DisposeAsync();
await server.DisposeAsync();
}
错误处理和异常设计
- 传输层错误分类:
| 错误类型 | 处理策略 | 示例 |
|---|---|---|
| 连接错误 | 重试或失败 | 无法建立连接 |
| 序列化错误 | 记录日志,跳过消息 | JSON 格式错误 |
| 超时错误 | 取消操作,通知上层 | 长时间无响应 |
| 资源耗尽 | 背压控制,拒绝新消息 | Channel 已满 |
- 完整的错误处理
public override async Task SendMessageAsync(JsonRpcMessage message, ...)
{
if (!IsConnected)
{
throw new InvalidOperationException("Transport is not connected.");
}
try
{
await _sendChannel.WriteAsync(message, cancellationToken);
}
catch (ChannelClosedException ex)
{
// Channel 已关闭,标记传输为断开
SetDisconnected(ex);
throw new IOException("Transport channel closed.", ex);
}
catch (OperationCanceledException)
{
// 操作被取消,直接重新抛出
throw;
}
catch (Exception ex)
{
// 其他异常,记录日志
LogTransportSendFailed(Name, "message-id", ex);
throw new IOException("Failed to send message.", ex);
}
}
日志记录建议:
连接事件:LogLevel.Information
消息发送/接收:LogLevel.Debug
错误和异常:LogLevel.Error 或 LogLevel.Warning
传输方式对比总结
| 特性 | Stdio | HTTP | InMemory (Pipe/Channel) |
|---|---|---|---|
| 跨进程 | 是 | 是 | 否(单进程内) |
| 网络开销 | 无 | 有 | 无 |
| 序列化开销 | 有(JSON) | 有(JSON) | 有(JSON) |
| 性能 | 高 | 中 | 极高 |
| 调试难度 | 难 | 中 | 易 |
| 适用场景 | CLI 工具 | Web 服务 | 单元测试、高性能场景 |
| 多客户端 | 否 | 是 | 否 |
| 安全性 | 进程隔离 | TLS/Auth | 进程内安全 |
注:InMemory 传输虽然需要 JSON 序列化,但因无网络/文件 I/O,整体性能最高。
自定义传输的应用场景
真实世界的自定义传输示例:
消息队列传输:RabbitMQ、Kafka、Azure Service Bus
WebSocket 传输:实时双向通信,适合 Web 应用
数据库传输:使用数据库表作为消息队列
加密传输:在标准传输基础上添加加密层
gRPC 传输:使用 gRPC 进行跨语言通信
二、自定义 Handler
1. 理解 Handler 的工作机制
Handler 是一个委托(Delegate),用于处理特定类型的 MCP 请求。签名如下:
public delegate Task<TResult> McpRequestHandler<TParams, TResult>(
McpRequestContext<TParams> context,
CancellationToken cancellationToken
);
Handler 与 Attribute 的关系
flowchart LR
Client[客户端请求] --> Server[MCP Server]
Server --> Check{工具存在?}
Check -->|是| Attr[Attribute 定义的工具]
Check -->|否| Handler[Handler 处理]
Attr --> Response[返回结果]
Handler --> Response
执行顺序:
- 优先查找 Attribute 方式注册的工具/资源
- 如果没找到,调用 Handler 处理
- 合并两种方式的结果返回给客户端
常用 Handler 类型对照表:
| Handler 方法 | 请求类型 | 参数类型 | 返回类型 | 用途 |
|---|---|---|---|---|
| WithListToolsHandler | tools/list | ListToolsRequestParams | ListToolsResult | 列出工具 |
| WithCallToolHandler | tools/call | CallToolRequestParams | CallToolResult | 调用工具 |
| WithListResourcesHandler | resources/list | ListResourcesRequestParams | ListResourcesResult | 列出资源 |
| WithReadResourceHandler | resources/read | ReadResourceRequestParams | ReadResourceResult | 读取资源 |
| WithListPromptsHandler | prompts/list | ListPromptsRequestParams | ListPromptsResult | 列出提示 |
| WithGetPromptHandler | prompts/get | GetPromptRequestParams | GetPromptResult | 获取提示 |
| WithCompleteHandler | completion/complete | CompleteRequestParams | CompleteResult | 自动补全 |
2. 自定义 Tool Handler
场景说明:实现一个动态的工具系统,工具列表可以根据运行时条件变化。
- 定义工具管理器和辅助方法
/// <summary>
/// 动态工具管理器
/// </summary>
public class DynamicToolManager
{
private readonly List<Tool> _tools = new();
public void AddTool(string name, string description, JsonElement schema)
{
_tools.Add(new Tool
{
Name = name,
Description = description,
InputSchema = schema
});
}
public List<Tool> GetAllTools() => _tools;
}
// 辅助方法
string HandleCalculate(IReadOnlyDictionary<string, JsonElement> args)
{
var a = args["a"].GetDouble();
var b = args["b"].GetDouble();
var op = args["operation"].GetString() ?? "+";
var result = op switch
{
"+" => a + b,
"-" => a - b,
"*" => a * b,
"/" => a / b,
_ => throw new Exception($"不支持的运算符: {op}")
};
return $"{a} {op} {b} = {result}";
}
string HandleEcho(IReadOnlyDictionary<string, JsonElement> args)
{
var message = args["message"].GetString() ?? "";
return $"Echo: {message}";
}
- 配置 Tools Handlers。关键要点:
- McpServerOptions.Handlers:通过 serverOptions.Handlers.XxxHandler 配置各类 Handler
- 扩展方法方式:也可以使用 .WithListToolsHandler() 等扩展方法配置(下一个示例)
- 两种方式等效:两种配置方式实现相同的功能,按需选择
- McpServerOptions.Handlers 适应简单场景,快速配置
- 扩展方法,适应复杂场景,链式调用
// 创建工具管理器实例
var toolManager = new DynamicToolManager();
// 配置 Handler
var serverOptions = new McpServerOptions();
// 配置 ListToolsHandler
serverOptions.Handlers.ListToolsHandler = async (context, ct) =>
{
Console.WriteLine("收到 list_tools 请求");
var tools = toolManager.GetAllTools();
Console.WriteLine($"返回 {tools.Count} 个工具");
return new ListToolsResult
{
Tools = tools.ToArray()
};
};
// 配置 CallToolHandler
serverOptions.Handlers.CallToolHandler = async (context, ct) =>
{
Console.WriteLine($"调用工具: {context.Params.Name}");
Console.WriteLine($"参数: {JsonSerializer.Serialize(context.Params.Arguments)}");
// 根据工具名称执行不同的逻辑
var result = context.Params.Name switch
{
"get_time" => $"当前时间: {DateTime.Now:yyyy-MM-dd HH:mm:ss}",
"calculate" => HandleCalculate(context.Params.Arguments),
"echo" => HandleEcho(context.Params.Arguments),
_ => throw new Exception($"未知工具: {context.Params.Name}")
};
Console.WriteLine($"工具执行完成: {result}");
return new CallToolResult
{
Content = new List<ContentBlock>
{
new TextContentBlock
{
Text = result
}
}
};
};
- 测试 Tools Handler,动态添加工具并测试
// 添加 get_time 工具
toolManager.AddTool(
"get_time",
"获取当前时间",
JsonSerializer.SerializeToElement(new
{
type = "object",
properties = new { },
required = new string[] { }
})
);
// 添加 calculate 工具
toolManager.AddTool(
"calculate",
"执行数学计算",
JsonSerializer.SerializeToElement(new
{
type = "object",
properties = new
{
a = new { type = "number", description = "第一个数字" },
b = new { type = "number", description = "第二个数字" },
operation = new
{
type = "string",
@enum = new[] { "+", "-", "*", "/" },
description = "运算符"
}
},
required = new[] { "a", "b", "operation" }
})
);
// 添加 echo 工具
toolManager.AddTool(
"echo",
"回显消息",
JsonSerializer.SerializeToElement(new
{
type = "object",
properties = new
{
message = new { type = "string", description = "要回显的消息" }
},
required = new[] { "message" }
})
);
// 创建 Client 和 Server
var (toolsClient, toolsServer) = await McpHelper.CreateInMemoryClientAndServerAsync(
tools: new List<McpServerTool>(),
serverOptions: serverOptions
);
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine(" 测试: 列出工具 ");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
try
{
var tools = await toolsClient.ListToolsAsync();
Console.WriteLine($"成功获取 {tools.Count} 个工具:\n");
foreach (var tool in tools)
{
Console.WriteLine($" {tool.Name}");
Console.WriteLine($" {tool.Description}");
Console.WriteLine();
}
}
catch (Exception ex)
{
Console.WriteLine($"错误: {ex.Message}");
}
3. 自定义 Resources Handler
场景说明:实现一个动态的资源系统,可以从数据库、文件系统等外部数据源加载资源。
- 定义 DynamicResourceManager 和 Resources Handlers
/// <summary>
/// 动态资源管理器
/// </summary>
public class DynamicResourceManager
{
private readonly Dictionary<string, string> _resources = new();
public void AddResource(string uri, string content)
{
_resources[uri] = content;
}
public List<Resource> GetAllResources()
{
return _resources.Select(kvp => new Resource
{
Uri = kvp.Key,
Name = Path.GetFileName(kvp.Key),
Description = $"资源: {Path.GetFileName(kvp.Key)}",
MimeType = "text/plain"
}).ToList();
}
public string? GetResourceContent(string uri)
{
return _resources.TryGetValue(uri, out var content) ? content : null;
}
}
// 创建资源管理器实例
var resourceManager = new DynamicResourceManager();
// 配置 Resources Handler
var resourceServerOptions = new McpServerOptions();
resourceServerOptions.Handlers.ListResourcesHandler = async (context, ct) =>
{
Console.WriteLine("收到 list_resources 请求");
var resources = resourceManager.GetAllResources();
Console.WriteLine($"返回 {resources.Count} 个资源");
return new ListResourcesResult
{
Resources = resources.ToArray()
};
};
resourceServerOptions.Handlers.ReadResourceHandler = async (context, ct) =>
{
Console.WriteLine($"读取资源: {context.Params.Uri}");
var content = resourceManager.GetResourceContent(context.Params.Uri.ToString());
if (content == null)
{
throw new Exception($"资源不存在: {context.Params.Uri}");
}
Console.WriteLine($"资源读取完成,长度: {content.Length} 字符");
return new ReadResourceResult
{
Contents = new List<ResourceContents>
{
new TextResourceContents
{
Uri = context.Params.Uri,
MimeType = "text/plain",
Text = content
}
}
};
};
- 扩展方法方式(可选),关键要点:
- 两种配置方式实现相同的功能
- McpServerOptions.Handlers 方式更直观简洁
- 扩展方法方式支持链式调用,适合多个 Handler 配置
// 也可以使用扩展方法配置 Handler(链式调用)
builder.Services.AddMcpServer()
.WithListResourcesHandler(async (context, ct) => { ... })
.WithReadResourceHandler(async (context, ct) => { ... });
- 测试 Resources Handler,添加资源并测试
// 添加配置文件资源
resourceManager.AddResource(
"file:///config/app.json",
JsonSerializer.Serialize(new
{
AppName = "MyApp",
Version = "1.0.0",
Database = new { Host = "localhost", Port = 5432 }
}, new JsonSerializerOptions { WriteIndented = true })
);
// 添加日志文件资源
resourceManager.AddResource(
"file:///logs/app.log",
"2025-01-20 10:00:00 - 应用启动\n2025-01-20 10:01:15 - 用户登录\n2025-01-20 10:05:30 - 数据处理完成"
);
// 添加文档资源
resourceManager.AddResource(
"file:///docs/readme.md",
"# 项目文档\n\n## 简介\n这是一个示例项目。\n\n## 使用方法\n1. 安装依赖\n2. 运行程序"
);
Console.WriteLine("已添加 3 个动态资源");
// 创建 Client 和 Server
var (resourcesClient, resourcesServer) = await McpHelper.CreateInMemoryClientAndServerAsync(
tools: new List<McpServerTool>(),
serverOptions: resourceServerOptions
);
// 测试资源列表
var resources = await resourcesClient.ListResourcesAsync();
new
{
资源数量 = resources.Count,
资源列表 = resources.Select(r => new
{
Uri = r.Uri.ToString(),
r.Name,
r.MimeType
}).ToArray()
}.Display();
4. 自定义 Prompts Handler
场景说明:实现一个动态的提示模板系统,可以从配置文件或数据库加载提示模板。
- 定义 DynamicPromptManager 和 Prompts Handlers
/// <summary>
/// 动态提示管理器
/// </summary>
public class DynamicPromptManager
{
private readonly Dictionary<string, PromptTemplate> _prompts = new();
public void AddPrompt(string name, string description, List<PromptArgument> arguments, string template)
{
_prompts[name] = new PromptTemplate
{
Name = name,
Description = description,
Arguments = arguments,
Template = template
};
}
public List<Prompt> GetAllPrompts()
{
return _prompts.Values.Select(p => new Prompt
{
Name = p.Name,
Description = p.Description,
Arguments = p.Arguments.ToArray()
}).ToList();
}
public PromptTemplate? GetPrompt(string name)
{
return _prompts.TryGetValue(name, out var prompt) ? prompt : null;
}
}
public class PromptTemplate
{
public string Name { get; set; } = "";
public string Description { get; set; } = "";
public List<PromptArgument> Arguments { get; set; } = new();
public string Template { get; set; } = "";
}
// 创建提示管理器实例
var promptManager = new DynamicPromptManager();
// 配置 Prompts Handler
var promptServerOptions = new McpServerOptions();
promptServerOptions.Handlers.ListPromptsHandler = async (context, ct) =>
{
Console.WriteLine("收到 list_prompts 请求");
var prompts = promptManager.GetAllPrompts();
Console.WriteLine($"返回 {prompts.Count} 个提示模板");
return new ListPromptsResult
{
Prompts = prompts.ToArray()
};
};
promptServerOptions.Handlers.GetPromptHandler = async (context, ct) =>
{
Console.WriteLine($"获取提示模板: {context.Params.Name}");
var prompt = promptManager.GetPrompt(context.Params.Name);
if (prompt == null)
{
throw new Exception($"提示模板不存在: {context.Params.Name}");
}
// 填充参数
var template = prompt.Template;
if (context.Params.Arguments != null)
{
foreach (var arg in context.Params.Arguments)
{
template = template.Replace($"{{{arg.Key}}}", arg.Value.ToString() ?? "");
}
}
Console.WriteLine($"提示模板已生成");
return new GetPromptResult
{
Description = prompt.Description,
Messages = new[]
{
new PromptMessage
{
Role = Role.User,
Content = new TextContentBlock { Text = template }
}
}
};
};
- 测试 Prompts Handler,添加提示模板并测试。
// 添加代码审查模板
promptManager.AddPrompt(
"code_review",
"代码审查提示模板",
new List<PromptArgument>
{
new() { Name = "language", Description = "编程语言", Required = true },
new() { Name = "code", Description = "要审查的代码", Required = true }
},
"请审查以下 {language} 代码:\n\n```{language}\n{code}\n```\n\n请指出潜在的问题、改进建议和最佳实践。"
);
// 添加翻译模板
promptManager.AddPrompt(
"translate",
"文本翻译提示模板",
new List<PromptArgument>
{
new() { Name = "text", Description = "要翻译的文本", Required = true },
new() { Name = "target_language", Description = "目标语言", Required = true }
},
"请将以下文本翻译成 {target_language}:\n\n{text}"
);
// 添加总结模板
promptManager.AddPrompt(
"summarize",
"文档总结提示模板",
new List<PromptArgument>
{
new() { Name = "document", Description = "要总结的文档", Required = true },
new() { Name = "max_words", Description = "最大字数", Required = false }
},
"请用不超过 {max_words} 个字总结以下文档:\n\n{document}"
);
Console.WriteLine("已添加 3 个动态提示模板");
// 创建 Client 和 Server
var (promptsClient, promptsServer) = await McpHelper.CreateInMemoryClientAndServerAsync(
tools: new List<McpServerTool>(),
serverOptions: promptServerOptions
);
// 测试提示模板列表
var prompts = await promptsClient.ListPromptsAsync();
new
{
模板数量 = prompts.Count,
模板列表 = prompts.Select(p => new
{
p.Name,
p.Description,
参数数量 = p.ProtocolPrompt.Arguments.Count
}).ToArray()
}.Display();
5. 其他高级 Handler
CompleteHandler - 自动补全,提供参数值的自动补全建议。
// 配置 CompleteHandler
var completeServerOptions = new McpServerOptions();
completeServerOptions.Handlers.CompleteHandler = async (context, ct) =>
{
Console.WriteLine($"自动补全请求");
Console.WriteLine($"引用类型: {context.Params.Ref.Type}");
Console.WriteLine($"参数名: {context.Params.Argument.Name}");
Console.WriteLine($"当前值: {context.Params.Argument.Value}");
var suggestions = new List<string>();
// 根据不同的引用类型和参数提供建议
if (context.Params.Ref.Type == "ref/prompt")
{
if (context.Params.Argument.Name == "language")
{
suggestions.AddRange(new[] { "C#", "Python", "JavaScript", "Java", "Go" });
}
else if (context.Params.Argument.Name == "target_language")
{
suggestions.AddRange(new[] { "中文", "英文", "日文", "韩文", "法文" });
}
}
else if (context.Params.Ref.Type == "ref/resource")
{
// 为资源 URI 提供建议
suggestions.AddRange(new[]
{
"file:///config/app.json",
"file:///logs/app.log",
"file:///docs/readme.md"
});
}
// 过滤匹配当前输入的建议
var currentValue = context.Params.Argument.Value?.ToString() ?? "";
var filteredSuggestions = suggestions
.Where(s => s.Contains(currentValue, StringComparison.OrdinalIgnoreCase))
.ToList();
Console.WriteLine($"返回 {filteredSuggestions.Count} 个补全建议");
return new CompleteResult
{
Completion = new Completion
{
Values = filteredSuggestions,
HasMore = false,
Total = filteredSuggestions.Count
}
};
};
// 创建 Client 和 Server
var (mcpClient, mcpServer) = await McpHelper.CreateInMemoryClientAndServerAsync(
tools: new List<McpServerTool>(),
serverOptions: completeServerOptions
);
var reference = new PromptReference
{
Name = "abc"
};
var res = await mcpClient.CompleteAsync(reference,"language", "C");
res.Display();
资源订阅 Handler
实现资源变化通知功能。核心概念:
- 订阅:客户端表示对某个资源感兴趣
- 通知:资源变化时服务器主动推送更新
- 取消订阅:客户端不再需要接收通知
/// <summary>
/// 订阅管理器
/// </summary>
public class SubscriptionManager
{
private readonly HashSet<string> _subscriptions = new();
public void Subscribe(string uri)
{
_subscriptions.Add(uri);
Console.WriteLine($"新增订阅: {uri}");
}
public void Unsubscribe(string uri)
{
if (_subscriptions.Remove(uri))
{
Console.WriteLine($"取消订阅: {uri}");
}
}
public bool IsSubscribed(string uri) => _subscriptions.Contains(uri);
public int GetSubscriptionCount() => _subscriptions.Count;
}
// 创建订阅管理器实例
var subscriptionManager = new SubscriptionManager();
// 配置订阅 Handlers
var subscriptionServerOptions = new McpServerOptions();
subscriptionServerOptions.Handlers.SubscribeToResourcesHandler = async (context, ct) =>
{
Console.WriteLine($"收到订阅请求: {context.Params.Uri}");
subscriptionManager.Subscribe(context.Params.Uri.ToString());
Console.WriteLine($"订阅成功,当前订阅数: {subscriptionManager.GetSubscriptionCount()}");
return new EmptyResult();
};
subscriptionServerOptions.Handlers.UnsubscribeFromResourcesHandler = async (context, ct) =>
{
Console.WriteLine($"收到取消订阅请求: {context.Params.Uri}");
subscriptionManager.Unsubscribe(context.Params.Uri.ToString());
Console.WriteLine($"取消订阅成功,当前订阅数: {subscriptionManager.GetSubscriptionCount()}");
return new EmptyResult();
};
日志级别设置 Handler
允许客户端动态调整服务器的日志级别。
// 用于存储当前日志级别
string currentLogLevel = "info";
// 配置日志级别 Handler
var loggingServerOptions = new McpServerOptions();
loggingServerOptions.Handlers.SetLoggingLevelHandler = async (context, ct) =>
{
var oldLevel = currentLogLevel;
currentLogLevel = context.Params.Level.ToString();
Console.WriteLine($"日志级别变更: {oldLevel} → {currentLogLevel}");
Console.WriteLine($"变更说明: 客户端请求调整日志输出级别");
// 在实际应用中,这里会更新 ILogger 的过滤级别
// 例如:loggerFactory.SetMinimumLevel(ParseLogLevel(currentLogLevel));
Console.WriteLine($"日志级别已更新");
return new EmptyResult();
};
Console.WriteLine("SetLoggingLevelHandler 定义完成");
// 显示支持的日志级别
new
{
当前日志级别 = currentLogLevel,
支持的级别 = new[] { "debug", "info", "notice", "warning", "error", "critical", "alert", "emergency" }
}.Display();
6. 高级场景与最佳实践
Handler 的配置方式
| 特性 | McpServerOptions.Handlers | 扩展方法 |
|---|---|---|
| 代码风格 | 直接赋值 | 链式调用 |
| 依赖注入 | 需要通过 context.Services |
自动集成 |
| 适用场景 | 简单快速配置 | 复杂企业级应用 |
| 配置管理 | 集中在 options 对象 | 分散在多个方法调用 |
Handler 推荐做法
- 访问外部依赖
serverOptions.Handlers.ListToolsHandler = async (context, ct) =>
{
// 通过闭包访问外部变量
var tools = toolManager.GetTools();
return new ListToolsResult { Tools = tools };
};
// 或者使用扩展方法从服务容器获取
builder.Services.AddMcpServer()
.WithListToolsHandler(async (context, ct) =>
{
// 从服务容器获取依赖
var manager = context.Services.GetRequiredService<ToolManager>();
var tools = manager.GetTools();
return new ListToolsResult { Tools = tools };
});
- 支持取消令牌
serverOptions.Handlers.CallToolHandler = async (context, ct) =>
{
// 传递取消令牌
var result = await LongRunningOperation(ct);
return new CallToolResult { Content = result };
};
- 完善的错误处理
serverOptions.Handlers.ReadResourceHandler = async (context, ct) =>
{
try
{
var content = await ReadAsync(context.Params.Uri, ct);
return new ReadResourceResult { Contents = content };
}
catch (FileNotFoundException ex)
{
throw new Exception($"资源不存在: {context.Params.Uri}", ex);
}
};
常见错误与解决方案
| 错误场景 | 原因 | 解决方案 |
|---|---|---|
| Handler 未被调用 | Attribute 方式已注册同名项 | 检查是否重复注册 |
| 访问外部变量失败 | 作用域问题 | 使用闭包或依赖注入 |
| 取消令牌未生效 | 未传递 CancellationToken | 在异步操作中传递 ct 参数 |
| 返回结果格式错误 | 类型不匹配 | 检查返回类型是否符合协议定义 |
性能优化建议
- 异步操作:所有 I/O 操作使用 async/await
- 缓存策略:对频繁访问的数据进行缓存
- 分页支持:大数据集使用 cursor 实现分页
- 资源管理:及时释放不再使用的资源
安全考虑
- 输入验证:验证所有客户端输入
- 权限检查:在 Handler 中实现访问控制
- 资源限制:防止资源耗尽攻击
- 敏感信息:不在日志中输出敏感数据
三、自定义 Filter
1. 基础 Filter 实现
- 创建一个简单的日志 Filter
// 定义一个简单的工具类
public class CalculatorTools
{
[McpServerTool]
public static int Add(int a, int b) => a + b;
[McpServerTool]
public static int Multiply(int a, int b) => a * b;
}
// 创建日志 Filter
// Filter 接收 next handler,返回一个新的 handler
McpRequestHandler<ListToolsRequestParams, ListToolsResult> LoggingFilter(
McpRequestHandler<ListToolsRequestParams, ListToolsResult> next)
{
// 返回一个新的 handler
return async (request, cancellationToken) =>
{
// 前置逻辑:记录请求开始
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine($"[日志Filter] 收到 ListTools 请求");
Console.WriteLine($"游标参数: {request.Params.Cursor ?? "null"}");
var sw = Stopwatch.StartNew();
try
{
// 调用下一个处理器
var response = await next(request, cancellationToken);
sw.Stop();
// 后置逻辑:记录响应结果
Console.WriteLine($"[日志Filter] 请求完成");
Console.WriteLine($"返回工具数: {response.Tools?.Count ?? 0}");
Console.WriteLine($"耗时: {sw.ElapsedMilliseconds}ms");
Console.WriteLine($"下一页游标: {response.NextCursor ?? "null"}");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
return response;
}
catch (Exception ex)
{
sw.Stop();
Console.WriteLine($"[日志Filter] 请求失败: {ex.Message}");
Console.WriteLine($"耗时: {sw.ElapsedMilliseconds}ms");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
throw;
}
};
}
- 注册 Filter 并创建 Server
- 方法1:使用扩展方法(推荐),适用于使用 DI 容器构建 Server 的场景,代码更简洁
- 方法2:使用 McpServerOptions.Filters,适用于手动创建 Server 的场景,提供更精细的控制
// 方法 1
var services = new ServiceCollection();
services.AddMcpServer()
.WithTools<CalculatorTools>()
.AddListToolsFilter(LoggingFilter); // 直接使用扩展方法
// 方法 2
var tools = McpHelper.GetToolsForType<CalculatorTools>();
var serverOptions = new McpServerOptions();
serverOptions.Filters.ListToolsFilters.Add(LoggingFilter); // 添加到 Filters 集合
var (client, server) = await McpHelper.CreateInMemoryClientAndServerAsync(
tools,
serverOptions
);
- 测试日志 Filter
// 使用 Client 请求工具列表
try
{
var result = await client.ListToolsAsync();
Console.WriteLine("\n最终返回的工具列表:");
foreach (var tool in result ?? [])
{
new {
工具名称 = tool.Name,
工具描述 = tool.Description
}.Display();
}
}
catch (Exception ex)
{
Console.WriteLine($"请求失败: {ex.Message}");
}
2. 修改响应的 Filter
在企业环境中,我们可能需要根据用户权限动态过滤工具列表。让我们创建一个 Filter 来演示如何修改响应。
- 创建一个过滤敏感工具的 Filter
// 定义包含敏感工具的类
public class ExtendedCalculatorTools
{
[McpServerTool]
public static int Add(int a, int b) => a + b;
[McpServerTool]
public static int Multiply(int a, int b) => a * b;
[McpServerTool]
[Description("危险操作:删除数据")]
public static string DeleteData(string target) => $"已删除 {target}";
[McpServerTool, Description("管理员专用:重置系统")]
public static string ResetSystem() => "系统已重置";
}
// 创建工具过滤 Filter
McpRequestHandler<ListToolsRequestParams, ListToolsResult> ToolFilteringFilter(
McpRequestHandler<ListToolsRequestParams, ListToolsResult> next)
{
return async (request, cancellationToken) =>
{
// 调用下一个处理器获取完整工具列表
var response = await next(request, cancellationToken);
// 定义敏感工具列表(实际场景中可能从配置或数据库读取)
var sensitiveTools = new[] { "DeleteData", "ResetSystem" };
// 过滤敏感工具
if (response.Tools != null)
{
var originalCount = response.Tools.Count;
response.Tools = response.Tools
.Where(tool => !sensitiveTools.Contains(tool.Name))
.ToList();
var filteredCount = originalCount - response.Tools.Count;
if (filteredCount > 0)
{
Console.WriteLine($"[过滤Filter] 已过滤 {filteredCount} 个敏感工具");
}
}
return response;
};
}
- 组合多个 Filter
现在让我们创建一个包含多个 Filter 的 MCP Server,观察它们的执行顺序。
// 获取扩展工具列表
var tools2 = McpHelper.GetToolsForType<ExtendedCalculatorTools>();
// 创建 Server 配置选项
var serverOptions2 = new McpServerOptions();
// 添加多个 Filter(按顺序执行)
serverOptions2.Filters.ListToolsFilters.Add(LoggingFilter); // Filter 1: 日志记录
serverOptions2.Filters.ListToolsFilters.Add(ToolFilteringFilter); // Filter 2: 工具过滤
// 创建 Client 和 Server
var (client2, server2) = await McpHelper.CreateInMemoryClientAndServerAsync(
tools2,
serverOptions2
);
- 测试多个 Filter 的组合
try
{
Console.WriteLine("发送请求...\n");
var result2 = await client2.ListToolsAsync();
Console.WriteLine("\n最终返回的工具列表:");
foreach (var tool in result2 ?? [])
{
new {
工具名称 = tool.Name,
描述 = tool.Description
}.Display();
}
Console.WriteLine($"\n观察:敏感工具已被过滤,只返回了 {result2?.Count ?? 0} 个安全工具");
}
catch (Exception ex)
{
Console.WriteLine($"请求失败: {ex.Message}");
}
3. CallTool Filter
除了过滤工具列表,我们还可以拦截工具的实际调用。让我们创建一个 CallTool Filter 来演示。
- 创建工具调用拦截 Filter
// 创建工具调用日志 Filter
McpRequestHandler<CallToolRequestParams, CallToolResult> CallToolLoggingFilter(
McpRequestHandler<CallToolRequestParams, CallToolResult> next)
{
return async (request, cancellationToken) =>
{
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine($"[CallTool Filter] 工具调用开始");
Console.WriteLine($"工具名称: {request.Params.Name}");
Console.WriteLine($"参数数量: {request.Params.Arguments?.Count ?? 0}");
if (request.Params.Arguments?.Count > 0)
{
Console.WriteLine("参数详情:");
foreach (var arg in request.Params.Arguments)
{
Console.WriteLine($"- {arg.Key}: {arg.Value}");
}
}
var sw = Stopwatch.StartNew();
try
{
// 调用实际的工具
var response = await next(request, cancellationToken);
sw.Stop();
Console.WriteLine($"[CallTool Filter] 工具执行成功");
Console.WriteLine($"执行耗时: {sw.ElapsedMilliseconds}ms");
Console.WriteLine($"响应内容数: {response.Content?.Count ?? 0}");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
return response;
}
catch (Exception ex)
{
sw.Stop();
Console.WriteLine($"[CallTool Filter] 工具执行失败: {ex.Message}");
Console.WriteLine($"执行耗时: {sw.ElapsedMilliseconds}ms");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
throw;
}
};
}
- 注册 CallTool Filter 并测试
// 获取工具列表
var tools3 = McpHelper.GetToolsForType<CalculatorTools>();
// 创建 Server 配置选项
var serverOptions3 = new McpServerOptions();
// 添加 CallTool Filter
serverOptions3.Filters.CallToolFilters.Add(CallToolLoggingFilter);
// 创建 Client 和 Server
var (client3, server3) = await McpHelper.CreateInMemoryClientAndServerAsync(
tools3,
serverOptions3
);
// 测试工具调用
try
{
Console.WriteLine("调用工具 Add...\n");
var callResult = await client3.CallToolAsync("add", new Dictionary<string, object?>
{
["a"] = 10,
["b"] = 20
});
Console.WriteLine("工具执行结果:");
callResult.Display();
}
catch (Exception ex)
{
Console.WriteLine($"调用失败: {ex.Message}");
}
5. 企业级 Filter 实践
在实际应用中,我们通常会组合多个 Filter 来实现完整的功能。让我们创建一个企业级的示例。
- 创建企业级多功能 Filter 管道
// 1. 性能监控 Filter
McpRequestHandler<CallToolRequestParams, CallToolResult> PerformanceMonitoringFilter(
McpRequestHandler<CallToolRequestParams, CallToolResult> next)
{
return async (request, cancellationToken) =>
{
var sw = Stopwatch.StartNew();
try
{
var response = await next(request, cancellationToken);
sw.Stop();
// 性能警告
if (sw.ElapsedMilliseconds > 1000)
{
Console.WriteLine($"[性能监控] 工具 '{request.Params.Name}' 执行缓慢: {sw.ElapsedMilliseconds}ms");
}
return response;
}
catch
{
sw.Stop();
Console.WriteLine($"[性能监控] 工具 '{request.Params.Name}' 失败前耗时: {sw.ElapsedMilliseconds}ms");
throw;
}
};
}
// 2. 权限验证 Filter
McpRequestHandler<CallToolRequestParams, CallToolResult> AuthorizationFilter(
McpRequestHandler<CallToolRequestParams, CallToolResult> next)
{
return async (request, cancellationToken) =>
{
// 模拟权限检查
var restrictedTools = new[] { "DeleteData", "ResetSystem" };
if (restrictedTools.Contains(request.Params.Name))
{
Console.WriteLine($"[权限验证] 工具 '{request.Params.Name}' 需要管理员权限");
// 模拟权限检查失败
throw new UnauthorizedAccessException($"无权访问工具 '{request.Params.Name}'");
}
Console.WriteLine($"[权限验证] 工具 '{request.Params.Name}' 通过权限检查");
return await next(request, cancellationToken);
};
}
// 3. 审计日志 Filter
McpRequestHandler<CallToolRequestParams, CallToolResult> AuditLogFilter(
McpRequestHandler<CallToolRequestParams, CallToolResult> next)
{
return async (request, cancellationToken) =>
{
var timestamp = DateTime.Now;
var userId = "user_12345"; // 实际场景中从上下文获取
Console.WriteLine($"[审计日志] 时间: {timestamp:yyyy-MM-dd HH:mm:ss}, 用户: {userId}, 工具: {request.Params.Name}");
var response = await next(request, cancellationToken);
// 记录到审计数据库(这里只是模拟)
Console.WriteLine($"[审计日志] 已记录工具调用: {request.Params.Name}");
return response;
};
}
- 构建完整的企业级 Filter 管道
// 获取工具列表
var enterpriseTools = McpHelper.GetToolsForType<CalculatorTools>();
// 创建企业级 Server 配置选项
var enterpriseOptions = new McpServerOptions();
// 按最佳实践顺序添加 Filter
// 1. 外层:性能监控(记录总体性能)
enterpriseOptions.Filters.CallToolFilters.Add(PerformanceMonitoringFilter);
// 2. 中层:权限验证(早期拦截未授权请求)
enterpriseOptions.Filters.CallToolFilters.Add(AuthorizationFilter);
// 3. 内层:审计日志(记录通过验证的调用)
enterpriseOptions.Filters.CallToolFilters.Add(AuditLogFilter);
// 4. 最内层:实际的工具调用 Filter
enterpriseOptions.Filters.CallToolFilters.Add(CallToolLoggingFilter);
// 创建 Client 和 Server
var (enterpriseClient, enterpriseServer) = await McpHelper.CreateInMemoryClientAndServerAsync(
enterpriseTools,
enterpriseOptions
);
Console.WriteLine("企业级 MCP Client 和 Server 创建完成");
Console.WriteLine("\nFilter 管道结构:");
Console.WriteLine(" 1. 性能监控 Filter");
Console.WriteLine(" 2. 权限验证 Filter");
Console.WriteLine(" 3. 审计日志 Filter");
Console.WriteLine(" 4. 调用日志 Filter");
Console.WriteLine(" 5. 核心工具处理器");
- 测试企业级 Filter 管道
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine(" 企业级 MCP Server Filter 管道测试 ");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
// 测试 1:正常的工具调用
Console.WriteLine("【测试 1】正常工具调用:Add\n");
try
{
var result1 = await enterpriseClient.CallToolAsync("add", new Dictionary<string, object?>
{
["a"] = 5,
["b"] = 3
});
Console.WriteLine("测试 1 通过\n");
}
catch (Exception ex)
{
Console.WriteLine($"测试 1 失败: {ex.Message}\n");
}
// 测试 2:调用受限工具(权限验证应该拦截)
Console.WriteLine("\n【测试 2】调用受限工具:DeleteData\n");
try
{
var result2 = await enterpriseClient.CallToolAsync("DeleteData", new Dictionary<string, object?>
{
["target"] = "important_data"
});
Console.WriteLine("测试 2 失败:应该被拦截但未被拦截\n");
}
catch (McpException ex) when (ex.InnerException is UnauthorizedAccessException)
{
Console.WriteLine($"测试 2 通过:权限验证成功拦截 - {ex.InnerException.Message}\n");
}
catch (Exception ex)
{
Console.WriteLine($"测试 2 异常: {ex.Message}\n");
}
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
Console.WriteLine(" 测试完成 ");
Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
6. 高级场景与最佳实践
Filter 执行顺序的最佳实践
- 性能监控在最外层:可以测量整个请求的总耗时
- 权限验证在前面:早期拦截未授权请求,避免浪费资源
- 审计日志在中间:只记录通过验证的请求
- 业务逻辑靠内层:在基础检查通过后执行
// 方式 1:使用扩展方法
services.AddMcpServer()
.WithTools<MyTools>()
.AddCallToolFilter(PerformanceMonitoringFilter)
.AddCallToolFilter(AuthorizationFilter)
.AddCallToolFilter(AuditLogFilter)
.AddCallToolFilter(BusinessLogicFilter);
// 方式 2:使用 McpServerOptions.Filters
var options = new McpServerOptions();
options.Filters.CallToolFilters.Add(PerformanceMonitoringFilter);
options.Filters.CallToolFilters.Add(AuthorizationFilter);
options.Filters.CallToolFilters.Add(AuditLogFilter);
options.Filters.CallToolFilters.Add(BusinessLogicFilter);
常见的 Filter 应用场景
| Filter 类型 | 应用场景 | 实现要点 |
|---|---|---|
| 日志 Filter | 记录所有请求响应 | 在 try-finally 中确保日志记录 |
| 权限 Filter | 验证用户权限 | 在调用 next() 前检查,失败直接抛异常 |
| 缓存 Filter | 缓存频繁请求的结果 | 检查缓存,命中则不调用 next() |
| 限流 Filter | 防止请求过载 | 使用 SemaphoreSlim 或 RateLimiter |
| 转换 Filter | 修改请求或响应 | 修改参数后调用 next(),修改响应后返回 |
| 监控 Filter | 收集性能指标 | 使用 Stopwatch 测量,记录到监控系统 |
Filter 中的错误处理
关键点:
- 使用 try-catch 捕获异常
- 记录详细的错误信息
- 通常应该重新抛出异常(除非有特殊处理)
- 可以在捕获后返回默认响应(谨慎使用)
McpRequestHandler<TParams, TResult> SafeFilter(
McpRequestHandler<TParams, TResult> next)
{
return async (request, cancellationToken) =>
{
try
{
// 前置逻辑
ValidateRequest(request.Params);
// 调用下一个处理器
var response = await next(request, cancellationToken);
// 后置逻辑
ProcessResponse(response);
return response;
}
catch (ValidationException ex)
{
// 记录验证错误
LogValidationError(ex);
throw; // 向上传播
}
catch (Exception ex)
{
// 记录未知错误
LogError(ex);
throw; // 向上传播
}
};
}
性能优化建议
避免过度使用 Filter:
每个 Filter 都会增加调用开销
过多的 Filter 会使调试变得困难
建议将相关功能合并到一个 Filter 中
异步操作注意事项:
Filter 是异步的,避免阻塞操作
使用 ConfigureAwait(false) 优化性能(在非 UI 场景)
考虑使用 ValueTask 减少分配
// 不推荐:多个简单 Filter
options.Filters.CallToolFilters.Add(LogStartFilter);
options.Filters.CallToolFilters.Add(LogEndFilter);
options.Filters.CallToolFilters.Add(MeasureTimeFilter);
// 推荐:合并为一个综合 Filter
options.Filters.CallToolFilters.Add(ComprehensiveLoggingFilter);
Filter 配置方式的选择建议
使用扩展方法(AddXxxFilter)的场景:
构建 ASP.NET Core 应用
使用完整的 DI 容器
需要服务生命周期管理
团队熟悉 ASP.NET Core 风格
使用 McpServerOptions.Filters 的场景:
编写单元测试
快速原型开发
需要动态添加/移除 Filter
使用 McpHelper 等辅助工具
需要更精细的控制
7. MCP Server 支持的所有 Filter 类型
- Tools(工具)相关
- ListToolsFilters - 工具列表查询过滤
- 用途:过滤、排序、分页工具列表
- 方法:AddListToolsFilter()
- CallToolFilters - 工具调用过滤
- 用途:拦截、记录、验证工具调用
- 方法:AddCallToolFilter()
- ListToolsFilters - 工具列表查询过滤
- Prompts(提示)相关
- ListPromptsFilters - 提示列表查询过滤
- 用途:过滤、排序、分页提示列表
- 方法:AddListPromptsFilter()
- GetPromptFilters - 获取提示过滤
- 用途:动态生成或修改提示内容
- 方法:AddGetPromptFilter()
- ListPromptsFilters - 提示列表查询过滤
- Resources(资源)相关
- ListResourceTemplatesFilters - 资源模板列表过滤
- 用途:管理资源模板的可见性
- 方法:AddListResourceTemplatesFilter()
- ListResourcesFilters - 资源列表过滤
- 用途:过滤、排序资源列表
- 方法:AddListResourcesFilter()
- ReadResourceFilters - 读取资源过滤
- 用途:权限验证、内容转换
- 方法:AddReadResourceFilter()
- ListResourceTemplatesFilters - 资源模板列表过滤
- 其他功能
- CompleteFilters - 自动补全过滤
- 用途:提供智能补全建议
- 方法:AddCompleteFilter()
- SubscribeToResourcesFilters - 资源订阅过滤
- 用途:管理资源变更订阅
- 方法:AddSubscribeToResourcesFilter()
- UnsubscribeFromResourcesFilters - 取消订阅过滤
- 用途:管理取消订阅请求
- 方法:AddUnsubscribeFromResourcesFilter()
- SetLoggingLevelFilters - 设置日志级别过滤
- 用途:动态调整日志级别
- 方法:AddSetLoggingLevelFilter()
- CompleteFilters - 自动补全过滤
四、Filter vs Handler
1. 核心概念对比
Filter
- 定义:Filter 是一种中间件模式,用于在请求处理的前后添加横切关注点(Cross-cutting Concerns)。
- 本质:Filter 是一个高阶函数,它接收一个 Handler,返回一个新的 Handler。
- 工作方式:拦截请求 → 前置处理 → 调用 next() → 后置处理 → 返回响应
Handler
- 定义:Handler 是请求处理的核心逻辑,负责实际执行请求并返回结果。
- 本质:Handler 是请求处理的终点,直接处理业务逻辑。
- 工作方式:接收请求 → 执行业务逻辑 → 返回结果
详细对比表
| 维度 | Filter(过滤器) | Handler(处理器) |
|---|---|---|
| 设计目的 | 添加横切关注点(日志、权限、监控等) | 实现核心业务逻辑 |
| 函数类型 | 高阶函数(接收 Handler,返回 Handler) | 普通异步函数 |
| 执行位置 | 请求处理的前后 | 请求处理的核心 |
| 是否调用 next | ||
| 可否修改请求 | ||
| 可否修改响应 | ||
| 可否短路请求 | ||
| 组合方式 | 链式组合,形成管道 | 单个 Handler 替换默认处理 |
| 典型应用 | 日志、权限、性能监控、数据转换 | 动态工具列表、自定义业务逻辑 |
| 配置方式 | AddXxxFilter() 或 Filters.XxxFilters.Add() | WithXxxHandler() 或 Handlers.XxxHandler = |
| 与 Attribute 方式的关系 | 在 Attribute 注册的工具之上增强 | 完全替换 Attribute 方式的处理 |
| 优先级 | 在 Handler 之前执行 | 在 Filter 之后执行 |
| 数量限制 | 可以添加多个,形成管道 | 每种类型只能有一个 |
2. 执行流程对比
Filter
- Filter 按添加顺序形成管道
- 第一个添加的 Filter 在最外层
- 每个 Filter 都有前置和后置两个执行点
- 可以在任何 Filter 中短路请求(不调用 next())
Handler
- Handler 直接处理请求
- 没有 next(),是处理的终点
- 完全控制响应生成过程
- 每种请求类型只能有一个 Handler
Filter + Handler 的组合流程
flowchart LR
A[请求] --> B[Filter 1 前置]
B --> C[Filter 2 前置]
C --> D[Handler]
D --> E[Filter 2 后置]
E --> F[Filter 1 后置]
F --> G[响应]
style D fill:#ff9800
style B fill:#e3f2fd
style C fill:#f3e5f5
说明:
- Filter 在外层,Handler 在内核
- Filter 可以在 Handler 执行前后增强功能
- Handler 负责核心业务逻辑
3. 代码示例对比
Filter 示例:日志记录
// Filter 是高阶函数:接收 Handler,返回新的 Handler
McpRequestHandler<ListToolsRequestParams, ListToolsResult> LoggingFilter(
McpRequestHandler<ListToolsRequestParams, ListToolsResult> next)
{
// 返回一个新的 Handler
return async (request, cancellationToken) =>
{
// 前置逻辑:记录请求开始
Console.WriteLine($"收到 ListTools 请求");
var sw = Stopwatch.StartNew();
try
{
// 调用下一个处理器(可能是另一个 Filter 或最终的 Handler)
var response = await next(request, cancellationToken);
sw.Stop();
// 后置逻辑:记录响应结果
Console.WriteLine($"请求完成,返回 {response.Tools?.Count} 个工具");
Console.WriteLine($"耗时: {sw.ElapsedMilliseconds}ms");
return response;
}
catch (Exception ex)
{
sw.Stop();
Console.WriteLine($"请求失败: {ex.Message}");
throw;
}
};
}
// 配置方式 1:使用扩展方法
services.AddMcpServer()
.WithTools<MyTools>()
.AddListToolsFilter(LoggingFilter);
// 配置方式 2:使用 McpServerOptions
var options = new McpServerOptions();
options.Filters.ListToolsFilters.Add(LoggingFilter);
Handler 示例:动态工具列表
// Handler 直接处理请求,返回结果
async Task<ListToolsResult> CustomListToolsHandler(
McpRequestContext<ListToolsRequestParams> context,
CancellationToken cancellationToken)
{
Console.WriteLine($"自定义 Handler 处理 ListTools 请求");
// 动态生成工具列表
var tools = new List<Tool>
{
new Tool
{
Name = "dynamic_tool_1",
Description = "动态生成的工具 1",
InputSchema = JsonSerializer.SerializeToElement(new { type = "object" })
},
new Tool
{
Name = "dynamic_tool_2",
Description = "动态生成的工具 2",
InputSchema = JsonSerializer.SerializeToElement(new { type = "object" })
}
};
// 可以从数据库、配置文件或外部 API 加载工具
// var toolsFromDb = await _toolRepository.GetToolsAsync();
// tools.AddRange(toolsFromDb);
Console.WriteLine($"返回 {tools.Count} 个动态工具");
return new ListToolsResult
{
Tools = tools
};
}
// 配置方式 1:使用扩展方法
services.AddMcpServer()
.WithListToolsHandler(CustomListToolsHandler);
// 配置方式 2:使用 McpServerOptions
var options = new McpServerOptions();
options.Handlers.ListToolsHandler = CustomListToolsHandler;
4. 使用场景对比
何时使用 Filter:需要在不改变核心逻辑的情况下添加功能
| 场景 | 说明 | 示例 |
|---|---|---|
| 日志记录 | 记录所有请求和响应 | 记录工具调用次数、参数、结果 |
| 权限验证 | 验证用户是否有权访问 | 检查用户角色,过滤受限工具 |
| 性能监控 | 测量请求处理时间 | 记录每个请求的耗时 |
| 数据转换 | 修改请求或响应格式 | 添加额外的工具元数据 |
| 错误处理 | 统一捕获和处理异常 | 将异常转换为友好的错误消息 |
| 缓存 | 缓存频繁请求的结果 | 缓存工具列表,避免重复计算 |
| 审计 | 记录操作日志到数据库 | 记录谁在何时调用了哪个工具 |
| 限流 | 限制请求频率 | 防止滥用,保护系统资源 |
代码特征:
// Filter 的典型结构
McpRequestHandler<TParams, TResult> MyFilter(McpRequestHandler<TParams, TResult> next)
{
return async (request, ct) =>
{
// 前置处理
DoSomethingBefore();
// 调用下一个处理器
var response = await next(request, ct);
// 后置处理
DoSomethingAfter(response);
return response;
};
}
何时使用 Handler:需要完全替换默认行为或动态生成内容
| 场景 | 说明 | 示例 |
|---|---|---|
| 动态工具 | 运行时动态生成工具列表 | 从数据库加载工具配置 |
| 外部集成 | 从外部系统获取资源 | 连接文件系统、数据库、API |
| 自定义逻辑 | 实现复杂的业务逻辑 | 根据上下文返回不同的提示 |
| 企业级集成 | 与现有系统深度集成 | 连接企业知识库、工作流系统 |
| 完全自定义 | 不使用 Attribute 方式 | 完全自己控制工具列表和调用 |
代码特征:
// Handler 的典型结构
async Task<TResult> MyHandler(
McpRequestContext<TParams> context,
CancellationToken ct)
{
// 直接实现业务逻辑
var result = await GenerateResponseAsync(context.Params);
return result;
}
Filter 与 Handler 的组合使用
// 场景:企业级工具管理系统
// 1. 使用 Handler 动态生成工具列表
options.Handlers.ListToolsHandler = async (context, ct) =>
{
// 从数据库加载工具配置
var tools = await _toolRepository.GetToolsAsync();
return new ListToolsResult { Tools = tools };
};
// 2. 使用 Filter 添加权限验证
options.Filters.ListToolsFilters.Add(next => async (request, ct) =>
{
var response = await next(request, ct);
// 过滤用户无权访问的工具
var user = GetCurrentUser();
response.Tools = response.Tools
.Where(tool => user.HasPermission(tool.Name))
.ToList();
return response;
});
// 3. 使用 Filter 添加日志记录
options.Filters.ListToolsFilters.Add(next => async (request, ct) =>
{
var sw = Stopwatch.StartNew();
var response = await next(request, ct);
sw.Stop();
_logger.LogInformation(
"ListTools returned {Count} tools in {Ms}ms",
response.Tools.Count,
sw.ElapsedMilliseconds
);
return response;
});
执行顺序
请求 → 日志 Filter 前置
→ 权限 Filter 前置
→ Handler(加载工具)
→ 权限 Filter 后置(过滤工具)
→ 日志 Filter 后置(记录结果)
→ 响应
5. 配置方式对比
Filter 的两种配置方式
- 方式 1:使用扩展方法(推荐用于 DI 场景)
services.AddMcpServer()
.WithTools<MyTools>()
.AddListToolsFilter(LoggingFilter)
.AddCallToolFilter(AuthorizationFilter);
- 方式 2:使用 McpServerOptions(推荐用于测试/手动创建)
var options = new McpServerOptions();
options.Filters.ListToolsFilters.Add(LoggingFilter);
options.Filters.CallToolFilters.Add(AuthorizationFilter);
var (client, server) = await McpHelper.CreateInMemoryClientAndServerAsync(
tools, options
);
Handler 的两种配置方式
- 方式 1:使用扩展方法
services.AddMcpServer()
.WithListToolsHandler(CustomListToolsHandler)
.WithCallToolHandler(CustomCallToolHandler);
- 方式 2:使用 McpServerOptions
var options = new McpServerOptions();
options.Handlers.ListToolsHandler = CustomListToolsHandler;
options.Handlers.CallToolHandler = CustomCallToolHandler;
var (client, server) = await McpHelper.CreateInMemoryClientAndServerAsync(
tools, options
);
