Building a General-Purpose AI Chat Platform, Part 4: Extending with MCP Tools
2025-08-02 22:16:40
I. MCP Overview
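MCP (Model Context Protocol) is an open protocol that Anthropic introduced and open-sourced in late 2024. Its goal is to give AI systems (AI coding assistants, agents, and so on) a secure, standardized way to access data. It uses a client-server architecture: AI tools such as Claude Desktop or IDE plugins talk to MCP servers through MCP clients in order to reach local or remote data sources.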
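1. Basic flow

- LLM function calling: the application sends a list of functions to the model ---> the model decides which function to call and generates the arguments ---> the application executes the call and returns the result to the model
- Function calling defines the interaction pattern; MCP standardizes the calling convention on top of it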
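2. Why MCP matters
Because interfaces are fragmented, day-to-day development often means building separate tools for search, SQL databases, API calls, and so on.
With MCP, these concerns can be separated:
- Tool development: wrap tools as MCP servers
- AI application development: connect to MCP servers
- Data handling and privacy: an MCP server can run as a local process and work with private data on the local device
Service integration and extensibility
- MCP servers are easy to integrate
- MCP servers are reusable: configure one and it is connected
- An AI application implements MCP once and can then talk to any MCP server
Use cases
- Enterprise office scenarios
- Personal AI assistant scenarios
In short: MCP is the bridge between AI applications and external tools.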
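3. Basic concepts
MCP uses a client-server architecture, and one host can connect to multiple MCP servers.
- MCP Hosts: programs such as Claude Desktop or an IDE that access data through MCP.
- MCP Clients: each client keeps a 1:1 connection with one server and handles the communication.
- MCP Servers: lightweight programs that expose data or tools in a standardized way.
- Local Data Sources: files, databases, and similar resources that an MCP server accesses securely.
- Remote Services: APIs, cloud services, and similar resources that an MCP server can proxy.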

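4. Protocol layer and transport layer
Protocol layer: handles message framing, request/response correlation, and higher-level communication patterns.
Transport layer: two transports are supported
- Stdio transport (standard input/output): for communication between local processes.
- HTTP + SSE transport: for communication over a network.
  - Server → client: Server-Sent Events (SSE)
  - Client → server: HTTP POST
Newer revisions of the MCP specification replace HTTP + SSE with a Streamable HTTP transport.
All transports exchange messages as JSON-RPC 2.0.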
5. Message types
MCP defines several message types for different situations:
- Request (expects a response)
interface Request {
  method: string;
  params?: { ... };
}
- Result (successful response)
interface Result {
  [key: string]: unknown;
}
- Error (failed response)
interface Error {
  code: number;
  message: string;
  data?: unknown;
}
- Notification (one-way, no response expected)
interface Notification {
  method: string;
  params?: { ... };
}
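The four message shapes above map onto JSON-RPC 2.0 envelopes. The following is a minimal sketch of that framing, not SDK code: the helper names (`make_request`, `make_notification`, `make_result`, `make_error`, `encode_line`) are invented for illustration, and the `read_file` tool name and path are only sample values.

```python
import json
from itertools import count

_next_id = count(1)  # hypothetical request-id source for this sketch


def make_request(method, params=None):
    """A request carries an id, so the receiver must answer it."""
    msg = {"jsonrpc": "2.0", "id": next(_next_id), "method": method}
    if params is not None:
        msg["params"] = params
    return msg


def make_notification(method, params=None):
    """A notification has no id, so no response is expected."""
    msg = {"jsonrpc": "2.0", "method": method}
    if params is not None:
        msg["params"] = params
    return msg


def make_result(request, result):
    """A success response echoes the id of the request it answers."""
    return {"jsonrpc": "2.0", "id": request["id"], "result": result}


def make_error(request, code, message, data=None):
    """An error response carries code/message (and optional data) instead of a result."""
    error = {"code": code, "message": message}
    if data is not None:
        error["data"] = data
    return {"jsonrpc": "2.0", "id": request["id"], "error": error}


def encode_line(msg):
    """Serialize one message as a single newline-terminated line, as the stdio transport does."""
    return (json.dumps(msg, ensure_ascii=False) + "\n").encode("utf-8")


call = make_request("tools/call", {"name": "read_file", "arguments": {"path": "C:/Test/a.txt"}})
print(encode_line(call))
print(make_error(call, -32601, "Method not found"))
```

The only structural difference between a request and a notification is the `id` field, which is what lets the protocol layer match responses to requests.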
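6. Lifecycle
Much like a three-way handshake, an MCP client and server go through the following phases when they connect:
- Initialization
  - The client sends an initialize request (protocol version and capabilities).
  - The server responds with its protocol version and capabilities.
  - The client sends an initialized notification to confirm.
  - Normal communication begins.
- Message exchange: once initialization completes, both sides can communicate. Supported patterns:
  - Request-response: either side can send a request and the other side replies.
  - Notification: a one-way message.
- Termination: the connection is closed in one of these ways
  - An explicit close (close()).
  - The transport disconnects.
  - An error triggers termination.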
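The initialization steps above can be sketched as a small state tracker. This is an illustrative sketch only: the `Lifecycle` class and `Phase` enum are hypothetical helpers, the client name and version are sample values, and `2024-11-05` is used as an example protocol revision string.

```python
from enum import Enum


class Phase(Enum):
    NEW = "new"
    INITIALIZING = "initializing"
    READY = "ready"
    CLOSED = "closed"


class Lifecycle:
    """Hypothetical helper that tracks which step of the handshake we are in."""

    def __init__(self):
        self.phase = Phase.NEW
        self.server_capabilities = {}

    def initialize_request(self, client_name, protocol_version="2024-11-05"):
        # Step 1: the client opens with an initialize request
        if self.phase is not Phase.NEW:
            raise RuntimeError("initialize must be the first message, sent once")
        self.phase = Phase.INITIALIZING
        return {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": protocol_version,
                "capabilities": {},
                "clientInfo": {"name": client_name, "version": "0.1.0"},
            },
        }

    def on_initialize_result(self, response):
        # Steps 2 and 3: record the server's capabilities, then confirm with a notification
        if self.phase is not Phase.INITIALIZING:
            raise RuntimeError("unexpected initialize result")
        self.server_capabilities = response["result"].get("capabilities", {})
        self.phase = Phase.READY
        return {"jsonrpc": "2.0", "method": "notifications/initialized"}

    def ensure_ready(self):
        # Normal requests are only legal once the handshake has finished
        if self.phase is not Phase.READY:
            raise RuntimeError(f"cannot exchange messages in phase {self.phase.value}")

    def close(self):
        self.phase = Phase.CLOSED


lifecycle = Lifecycle()
print(lifecycle.initialize_request("demo-host"))
fake_server_reply = {
    "jsonrpc": "2.0",
    "id": 1,
    "result": {"protocolVersion": "2024-11-05", "capabilities": {"tools": {}}},
}
print(lifecycle.on_initialize_result(fake_server_reply))
lifecycle.ensure_ready()
```

In the Python SDK used later in this article, `ClientSession.initialize()` performs this exchange, so application code normally never builds these messages by hand.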
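7. MCP interaction flow
Suppose an AI assistant application (the MCP host) is asked to find the latest sales report in the database and email it to me. Here is how the MCP interactions play out in this scenario:
- Initialization: the MCP clients connect to two MCP servers (database access and email sending)
- User query: the user enters the request
- LLM reasoning: the model plans to query the data first, then send the email
- Tool call: the model returns a tool-call request (tool name and arguments) to the application, which asks the MCP client to execute it
- Execution: the MCP client sends the call to the MCP server, which runs it and returns the result
- Intermediate response: the application passes the result back to the model; now holding the sales data, the LLM requests the email tool (with recipient and content) and returns that request to the application
- Second execution: the application receives the tool-call request and asks the MCP client to call the MCP server, which sends the email and returns a confirmation that is passed back to the LLM
- The LLM generates the final answer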
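The flow above boils down to a loop that the host runs. The sketch below simulates it without any network or model: `scripted_model` is a stand-in for the LLM, the two tool functions stand in for MCP servers, and every name, address, and figure in it is made up for illustration.

```python
def query_sales_report(arguments):
    """Stand-in for a tool exposed by a database MCP server."""
    return {"title": "Sales report", "period": arguments["period"], "total": 1200}


def send_email(arguments):
    """Stand-in for a tool exposed by an email MCP server."""
    return {"status": "sent", "to": arguments["to"]}


# Tool name -> callable; a real host would route these through MCP client sessions
TOOLS = {"query_sales_report": query_sales_report, "send_email": send_email}


def scripted_model(messages):
    """Fake LLM: picks the next step from how many tool results it has seen."""
    results = [m for m in messages if m["role"] == "tool"]
    if not results:
        return {"tool_call": {"name": "query_sales_report", "arguments": {"period": "latest"}}}
    if len(results) == 1:
        return {"tool_call": {"name": "send_email",
                              "arguments": {"to": "me@example.com", "body": results[0]["content"]}}}
    return {"content": "The latest sales report has been emailed to you."}


def run_host(user_query, max_steps=5):
    """Host loop: ask the model, execute any requested tool, feed the result back."""
    messages = [{"role": "user", "content": user_query}]
    for _ in range(max_steps):
        reply = scripted_model(messages)
        if "tool_call" not in reply:
            return reply["content"], messages
        call = reply["tool_call"]
        result = TOOLS[call["name"]](call["arguments"])
        messages.append({"role": "assistant", "tool_call": call})
        messages.append({"role": "tool", "name": call["name"], "content": str(result)})
    raise RuntimeError("model kept requesting tools")


answer, transcript = run_host("Find the latest sales report and email it to me")
print(answer)
print([m["name"] for m in transcript if m["role"] == "tool"])
```

Note that the model never executes anything itself: it only names a tool and its arguments, and the host decides whether and how to run it.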
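8. MCP ecosystem
II. Building an MCP Host Application
1. Install the MCP filesystem server
As an example, we use the filesystem server that modelcontextprotocol publishes on GitHub.
There are several ways to install it; see the project README for details. Here we install it globally with npm:
npm install -g @modelcontextprotocol/server-filesystem
2. Build the MCP host
Next we build an MCP host that uses this filesystem server. The project documentation also shows how to start it:
npx -y @modelcontextprotocol/server-filesystem C:/Test

C:/Test is the allowed directory. Following the principle of least privilege, at least one accessible directory must be configured, and the filesystem server only operates inside the configured directories. Separate multiple directories with spaces.
The command starts an MCP server that uses the stdio transport. A stdio server exposes no network endpoint, so nothing outside the launching process can talk to it.
stdio consists of standard input (stdin), standard output (stdout), and standard error (stderr). A stdio server is meant to be embedded in another program, which launches it as a child process.
We can use this command to check that the server runs correctly (close it once verified).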
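To make the child-process idea concrete, here is a minimal sketch of a parent that launches a child and exchanges one newline-delimited JSON message over stdin/stdout. The child is a toy echo script, not an MCP server, and `CHILD_SERVER` and `call_child` are names invented for this example.

```python
import json
import subprocess
import sys

# Toy child program: answers every request line with a JSON-RPC style result
CHILD_SERVER = r'''
import json, sys
for line in sys.stdin:
    request = json.loads(line)
    response = {"jsonrpc": "2.0", "id": request["id"], "result": {"echo": request["method"]}}
    sys.stdout.write(json.dumps(response) + "\n")
    sys.stdout.flush()
'''


def call_child(method):
    """Start the child, send one request on its stdin, read one response from its stdout."""
    with subprocess.Popen(
        [sys.executable, "-c", CHILD_SERVER],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,
    ) as proc:
        request = {"jsonrpc": "2.0", "id": 1, "method": method}
        proc.stdin.write(json.dumps(request) + "\n")
        proc.stdin.flush()
        response = json.loads(proc.stdout.readline())
        proc.stdin.close()  # EOF on stdin lets the child's read loop end
    return response


print(call_child("ping"))
```

The parent owns the child's lifetime, which is why a stdio MCP server has to be started by the host rather than connected to after the fact.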
3. MCP Client Demo
Next we write a demo that embeds the filesystem server in our project.
import asyncio
import json
import os
from contextlib import AsyncExitStack
from typing import Optional
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import OpenAI
load_dotenv()
class FileAgentClient:
    def __init__(self):
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        # DashScope exposes an OpenAI-compatible endpoint, so the OpenAI SDK works here
        self.openai_client = OpenAI(
            api_key=os.getenv("DASHSCOPE_API_KEY"),
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
        )
        self.conversation_history = []

    async def connect_to_server(self, allowed_directory: str):
        """Start the Filesystem MCP server and connect to it."""
        print(f"Starting Filesystem Server with access to: {allowed_directory}...")
        server_params = StdioServerParameters(
            command="npx",
            args=["-y", "@modelcontextprotocol/server-filesystem", allowed_directory]
        )
        # Launch the server as a child process and open the stdio connection
        read, write = await self.exit_stack.enter_async_context(stdio_client(server_params))
        # Create and initialize the client session
        self.session = await self.exit_stack.enter_async_context(ClientSession(read, write))
        await self.session.initialize()
        response = await self.session.list_tools()
        tool_names = [tool.name for tool in response.tools]
        print(f"Connected. Available file tools: {tool_names}")
    async def process_query(self, user_query: str):
        """Handle a user query by coordinating the LLM and the MCP server."""
        self.conversation_history.append({"role": "user", "content": user_query})
        # 1. Fetch the available tool definitions from the server
        list_tools_response = await self.session.list_tools()
        available_tools = []
        # Convert them to the function-tool format the LLM understands
        for tool in list_tools_response.tools:
            tool_schema = getattr(
                tool,
                "inputSchema",
                {"type": "object", "properties": {}, "required": []},
            )
            openai_tool = {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool_schema,
                },
            }
            available_tools.append(openai_tool)
        # 2. Call the LLM with the tool definitions
        print("Thinking...")
        response = self.openai_client.chat.completions.create(
            model="qwen3-max",
            messages=self.conversation_history,
            tools=available_tools
        )
        message = response.choices[0].message
        # 3. Check whether the model asked for tool calls
        if message.tool_calls:
            # Record the assistant message once, before the tool results that answer it
            self.conversation_history.append(message)
            for tool_call in message.tool_calls:
                tool_name = tool_call.function.name
                tool_args = json.loads(tool_call.function.arguments or "{}")
                print(f"LLM requested MCP tool: {tool_name}({tool_args})")
                # Run the tool on the MCP server
                result = await self.session.call_tool(tool_name, tool_args)
                print(f"Tool result: {result}")
                # Feed the result back to the LLM
                self.conversation_history.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "name": tool_name,
                    "content": str(result)
                })
            # Call the LLM again so it can answer based on the tool results
            print("Generating the final answer...")
            final_response = self.openai_client.chat.completions.create(
                model="qwen3-max",
                messages=self.conversation_history
            )
            answer = final_response.choices[0].message.content
        else:
            # No tool call: use the LLM reply directly
            answer = message.content
        # Record the reply
        self.conversation_history.append({"role": "assistant", "content": answer})
        return answer
    async def chat(self):
        """Run the interactive chat loop."""
        print("\n--- Filesystem agent started ---")
        print("Type 'quit' to exit.")
        while True:
            try:
                query = input("\n> ").strip()
                if query.lower() == 'quit':
                    break
                if not query:
                    continue
                response_text = await self.process_query(query)
                print(f"\nAI: {response_text}")
            except Exception as e:
                print(f"\nError: {e}")


async def main():
    client = FileAgentClient()
    allowed_dir = os.getenv("ALLOWED_DIRECTORY", ".")
    try:
        await client.connect_to_server(allowed_dir)
        await client.chat()
    finally:
        # Close the session and stop the server child process
        await client.exit_stack.aclose()


if __name__ == "__main__":
    asyncio.run(main())
# .env
DASHSCOPE_API_KEY="sk-XXXXXXXXXXXXXXXXXX"
ALLOWED_DIRECTORY="C:/Test"
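III. Design: Integrating MCP into the AI Chat Application
1. Integration requirements
- Service lifecycle management
- Support for multiple connection types
- Persistent configuration
- Automatic service startup
- Dynamic tool discovery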
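2. Functional design
- API layer: manages service configuration
- Business service layer: coordinates the API layer and the underlying AI logic
  - Accepts a request to create a service
  - Writes the service configuration to the database
  - Calls the AI layer to start the service
- AI core layer (MCP module): a client manager (global singleton)
  - Connection management
  - Protocol adaptation
  - Tool loading
  - Resource cleanup
- Persistence layer: stores MCP service configurations, associated many-to-many with sessions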
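3. Core MCP workflows
Configuring an MCP service:
- The user creates a new MCP server entry (for example the filesystem server, stdio transport, and its arguments)
- The API layer accepts the request and hands it to the MCP business layer
- Business layer: stores the service configuration (name, parameters) in the database, then passes it to the AI layer to start and connect the service
- AI layer: receives the start request and picks the connection logic that matches the transport type
Automatic startup of MCP services:
- Triggered by the application lifecycle event
- Business layer: queries all enabled MCP service configurations and returns them as a list
- The application startup logic iterates over the list and calls the AI layer to start each server
Using tools in a chat:
- When a chat session is created, the user selects services, which binds the session to those MCP services
- The user types a question in the session
- The AI chat service accepts the request, loads the session, and asks the MCP module in the AI layer for the tool list of the bound services
- The AI chat service sends the tool list together with the user's question to the model, which may then request tool calls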
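Question: does the model call MCP automatically?
No. MCP only supplies the tool list:
- The MCP host (our application) sends the message to the model together with the tools it may use
- The model produces a tool-call request and returns it to our application
- The host then asks the MCP server to run the tool and receives the result
- Finally, the application passes the result back to the model, which generates the final answer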
IV. Implementing MCP Integration in the AI Chat Application
1. Implement the core MCP functionality in the AI layer
- Define an abstract base class for the MCP module
# app/ai/mcp/base.py
"""
Abstract base class for the MCP module: a shared interface and data models
"""
from abc import ABC, abstractmethod
from enum import Enum
from typing import List, Dict, Optional
from langchain_core.tools import BaseTool
from pydantic import BaseModel


class MCPTransportType(str, Enum):
    """MCP transport types"""
    STDIO = "stdio"
    STREAMABLE_HTTP = "streamable_http"
    SSE = "sse"


class MCPServiceConfig(BaseModel):
    """MCP service configuration"""
    service_id: int
    name: str
    transport_type: MCPTransportType
    command: Optional[str] = None
    args: Optional[List[str]] = None
    env: Optional[Dict[str, str]] = None
    url: Optional[str] = None
    headers: Optional[Dict[str, str]] = None
    enabled: bool = True


class MCPServiceStatus(BaseModel):
    """MCP service status"""
    service_id: int
    name: str
    running: bool
    error: Optional[str] = None
    tools_count: int = 0


class MCPProvider(ABC):
    """Abstract base class for MCP service providers"""

    @abstractmethod
    async def start_service(self, service: MCPServiceConfig) -> bool:
        """Start a single MCP service"""
        pass

    @abstractmethod
    async def stop_service(self, service_id: int) -> bool:
        """Stop a single MCP service"""
        pass

    @abstractmethod
    async def update_service(self, service: MCPServiceConfig) -> bool:
        """Update an MCP service configuration"""
        pass

    # Synchronous on purpose: the implementation only reads in-memory state,
    # and callers invoke it without await
    @abstractmethod
    def get_service_status(self, service_id: int) -> MCPServiceStatus:
        """Get the status of an MCP service"""
        pass

    @abstractmethod
    async def get_tools(self, service_id: int) -> List[BaseTool]:
        """Get the tools of one service"""
        pass

    @abstractmethod
    async def get_tools_by_ids(self, service_ids: List[int]) -> List[BaseTool]:
        """Get the tools of several services by their IDs"""
        pass
- Implement the MCP client manager
# app/ai/mcp/client_manager.py
import logging
from contextlib import AsyncExitStack
from typing import Dict, List
from langchain_core.tools import BaseTool
from langchain_mcp_adapters.tools import load_mcp_tools
from mcp import ClientSession
from mcp.client.sse import sse_client
from mcp.client.stdio import stdio_client, StdioServerParameters
from mcp.client.streamable_http import streamablehttp_client
from .base import MCPProvider, MCPServiceConfig, MCPServiceStatus, MCPTransportType

logger = logging.getLogger(__name__)


class MCPClientManager(MCPProvider):
    """MCP client manager"""

    def __init__(self):
        self.services: Dict[int, MCPServiceConfig] = {}
        self.sessions: Dict[int, ClientSession] = {}
        self.exit_stacks: Dict[int, AsyncExitStack] = {}

    async def start_service(self, service: MCPServiceConfig) -> bool:
        """Start a single MCP service"""
        try:
            self.services[service.service_id] = service
            if service.transport_type == MCPTransportType.STDIO:
                await self._start_stdio_service(service)
            elif service.transport_type == MCPTransportType.STREAMABLE_HTTP:
                await self._start_streamable_http_service(service)
            elif service.transport_type == MCPTransportType.SSE:
                await self._start_sse_service(service)
            else:
                raise ValueError(f"Unsupported transport type: {service.transport_type}")
            logger.info(f"Started MCP service: {service.name} (ID: {service.service_id})")
            return True
        except Exception as e:
            logger.error(f"Failed to start MCP service {service.name}: {e}")
            return False

    async def _open_session(self, service: MCPServiceConfig, transport) -> None:
        """Enter the transport, run the MCP handshake, and register the session"""
        exit_stack = AsyncExitStack()
        try:
            streams = await exit_stack.enter_async_context(transport)
            # stdio and SSE yield (read, write); streamable HTTP yields a third item as well
            read, write = streams[0], streams[1]
            session = await exit_stack.enter_async_context(ClientSession(read, write))
            await session.initialize()
        except Exception:
            # Do not leak a child process or connection if the handshake fails
            await exit_stack.aclose()
            raise
        self.sessions[service.service_id] = session
        self.exit_stacks[service.service_id] = exit_stack

    async def _start_stdio_service(self, service: MCPServiceConfig) -> None:
        """Start an MCP service that uses the stdio transport"""
        if not service.command:
            raise ValueError(f"stdio service {service.name} is missing 'command'")
        server_params = StdioServerParameters(
            command=service.command,
            args=service.args or [],
            env=service.env,
        )
        await self._open_session(service, stdio_client(server_params))
        logger.info(f"stdio service {service.name} started")

    async def _start_streamable_http_service(self, service: MCPServiceConfig) -> None:
        """Start an MCP service that uses the streamable_http transport"""
        if not service.url:
            raise ValueError(f"streamable_http service {service.name} is missing 'url'")
        # streamablehttp_client takes the URL and headers directly
        transport = streamablehttp_client(service.url, headers=service.headers or {})
        await self._open_session(service, transport)
        logger.info(f"streamable_http service {service.name} started")

    async def _start_sse_service(self, service: MCPServiceConfig) -> None:
        """Start an MCP service that uses the sse transport"""
        if not service.url:
            raise ValueError(f"sse service {service.name} is missing 'url'")
        # sse_client takes the URL and headers directly
        transport = sse_client(service.url, headers=service.headers or {})
        await self._open_session(service, transport)
        logger.info(f"sse service {service.name} started")
    async def stop_service(self, service_id: int) -> bool:
        """Stop a single MCP service"""
        exit_stack = self.exit_stacks.pop(service_id, None)
        self.sessions.pop(service_id, None)
        if exit_stack is None:
            logger.warning(f"Service {service_id} does not exist or is not running")
            return True  # treat it as already stopped
        try:
            # Closing the exit stack tears down the session and its transport
            await exit_stack.aclose()
            logger.info(f"Closed all connections of service {service_id}")
        except Exception as e:
            logger.warning(f"Error while closing connections of service {service_id}: {e}")
        logger.info(f"Stopped MCP service: {service_id}")
        return True

    async def stop_all_service(self) -> None:
        """Stop every running MCP service"""
        # Copy the keys first because stop_service mutates the dict
        for service_id in list(self.exit_stacks):
            await self.stop_service(service_id)
    async def update_service(self, service: MCPServiceConfig) -> bool:
        """Update an MCP service configuration by restarting the service"""
        try:
            service_id = service.service_id
            if service_id in self.services:
                # Stop the old instance first
                await self.stop_service(service_id)
            # Start with the new configuration and report whether that worked
            started = await self.start_service(service)
            if started:
                logger.info(f"Updated MCP service: {service.name} (ID: {service_id})")
            return started
        except Exception as e:
            logger.error(f"Failed to update MCP service {service.name}: {e}")
            return False
    def get_service_status(self, service_id: int) -> MCPServiceStatus:
        """Get the status of a service"""
        if service_id not in self.exit_stacks:
            return MCPServiceStatus(
                service_id=service_id,
                name=f"Service-{service_id}",
                running=False,
                error="Service not started"
            )
        service = self.services[service_id]
        session = self.sessions.get(service_id)
        if not session:
            return MCPServiceStatus(
                service_id=service_id,
                name=service.name,
                running=False,
                error="Session does not exist"
            )
        return MCPServiceStatus(
            service_id=service_id,
            name=service.name,
            running=True
        )
    async def get_tools(self, service_id: int) -> List[BaseTool]:
        """Get the tools of one service"""
        session = self.sessions.get(service_id)
        if not session:
            logger.warning(f"Service {service_id} does not exist or is not running")
            return []
        try:
            # Load the tools through langchain-mcp-adapters
            tools = await load_mcp_tools(session)
            service_name = self.services[service_id].name
            logger.info(f"Loaded {len(tools)} tools from service {service_name}")
            return tools
        except Exception as e:
            logger.error(f"Failed to load tools from service {service_id}: {e}")
            return []

    async def get_tools_by_ids(self, service_ids: List[int]) -> List[BaseTool]:
        """Get the tools of several services by their IDs"""
        all_tools = []
        for service_id in service_ids:
            tools = await self.get_tools(service_id)
            all_tools.extend(tools)
        return all_tools
- Provide a global singleton accessor
# app/ai/mcp/dependency.py
from .client_manager import MCPClientManager

# Module-level singleton instance
mcp_client_manager: MCPClientManager | None = None


def get_mcp_client_manager() -> MCPClientManager:
    """Return the MCPClientManager singleton, creating it on first use"""
    global mcp_client_manager
    if mcp_client_manager is None:
        mcp_client_manager = MCPClientManager()
    return mcp_client_manager
- Reimplement the chat method
Our previous chat method was built on a LangChain chain, which cannot perform tool calls, so we rewrite it using an agent. Create a new file agent.py in the app/ai folder with the following code:
import json
from typing import List, AsyncGenerator, Optional, Any
from langchain.agents import create_agent
from langchain_core.messages import BaseMessage, trim_messages, SystemMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import Runnable
from langchain_openai import ChatOpenAI
from app.ai.mcp.client_manager import MCPClientManager
from app.ai.vectorization.service import DocumentVectorizationService
from app.data.models import Role


class AgentService:
    def __init__(self, role: Role, history: List[BaseMessage], max_tokens: int = 20,
                 vectorization_service: Optional[DocumentVectorizationService] = None,
                 mcp_manager: Optional[MCPClientManager] = None,
                 mcp_service_ids: Optional[List[int]] = None):
        self.role = role
        self.vectorization_service = vectorization_service
        self.mcp_manager = mcp_manager
        self.mcp_service_ids = mcp_service_ids
        # Create the ChatOpenAI instance
        self.llm = ChatOpenAI(
            base_url=role.provider.endpoint,
            api_key=role.provider.api_key,
            model=role.provider.model,
            temperature=role.temperature,
            streaming=True
        )
        # Build the history trimmer. token_counter=len counts messages,
        # so max_tokens is the number of history messages to keep
        trimmer = trim_messages(
            token_counter=len,
            max_tokens=max_tokens,
            start_on=("human", "ai")
        )
        self.history = trimmer.invoke(history)
    async def call_agent(self, messages: List[BaseMessage]) -> AsyncGenerator[str, None]:
        """Run the agent and stream its output"""
        tools = []
        if self.mcp_manager and self.mcp_service_ids:
            tools = await self.mcp_manager.get_tools_by_ids(self.mcp_service_ids)
        agent: Runnable = create_agent(model=self.llm, tools=tools)
        async for token, metadata in agent.astream(
            {"messages": messages},
            stream_mode="messages"
        ):
            node = metadata["langgraph_node"]
            if node == "tools":
                # Show each tool call and its result as a fenced block
                yield f"```\nTool called: {token.name}\nResult:\n{token.content}\n```\n"
            elif node == "model" and token.content:
                yield token.content
    async def chat_stream(self, user_input: str) -> AsyncGenerator[str, None]:
        """Handle a chat request"""
        # Build the prompt template
        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content=self.role.system_prompt),
            MessagesPlaceholder(variable_name="history"),
            ("human", "{input}"),
        ])
        messages = prompt.invoke({"history": self.history, "input": user_input}).to_messages()
        async for chunk in self.call_agent(messages):
            yield chunk
    async def _search_knowledge_base(self, query: str, knowledge_base_ids: List[int]) -> List[Any] | None:
        """Search the knowledge bases and return JSON-ready results with metadata"""
        if not self.vectorization_service or not knowledge_base_ids:
            return None
        # Query DocumentVectorizationService for similar documents
        search_results = await self.vectorization_service.search_similar_documents(
            query=query,
            knowledge_base_ids=knowledge_base_ids,
            k=20
        )
        if not search_results:
            return None
        # Convert the search results to a JSON-ready structure with metadata
        formatted_results = []
        for i, result in enumerate(search_results, 1):
            metadata = result.get('metadata', {})
            formatted_result = {
                "index": i,
                "content": result['content'],
                "metadata": {
                    "filename": metadata.get('filename', 'unknown file'),
                    "chunk_index": metadata.get('chunk_index', 0),
                    "total_chunks": metadata.get('total_chunks', 1)
                }
            }
            formatted_results.append(formatted_result)
        return formatted_results
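    async def rag_chat_stream(self, user_input: str, knowledge_base_ids: List[int]) -> AsyncGenerator[str, None]:
        """Streaming RAG chat backed by the knowledge bases"""
        # Search the knowledge bases
        knowledge_data = await self._search_knowledge_base(user_input, knowledge_base_ids)
        # Serialize the results as JSON (None becomes "null" when nothing was found)
        knowledge_json = json.dumps(knowledge_data, ensure_ascii=False, indent=2)
        # Build the enhanced system prompt
        enhanced_system_prompt = f"""{self.role.system_prompt}
Answer the user's question based on the following knowledge base information:
Knowledge base search results:
{knowledge_json}
Important requirements:
1. Answer the user's question based on the knowledge base information above
2. State the source of the information explicitly, citing in this format:
- When quoting a document chunk, mark it as [1], [2], and so on
- Provide the full list of sources at the end of the answer
3. If the knowledge base contains nothing relevant, say so explicitly: "No relevant content was found in the knowledge base"
4. Be accurate and detailed, and make sure every key statement has a clear source marker
5. Make full use of the metadata in the JSON, including file names and chunk positions
Source format notes:
- [1] refers to the first cited result
- Each result includes metadata such as file name and chunk position
- Provide the full list of sources at the end of the answer"""
        # Build the prompt template
        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content=enhanced_system_prompt),
            MessagesPlaceholder(variable_name="history"),
            ("human", "{input}"),
        ])
        messages = prompt.invoke({"history": self.history, "input": user_input}).to_messages()
        async for chunk in self.call_agent(messages):
            yield chunk
Most of the logic above is the same as in chat.py; the only change is that the chain-based call is replaced by an agent.
2. Implement MCP management in the business layer
- Update the data-layer models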
# app/data/models/mcp_service.py
# ---------------------------
# MCP service
# ---------------------------
from datetime import datetime, timezone
from sqlalchemy import Column, Integer, String, DateTime, Boolean, JSON, func
from sqlalchemy.orm import relationship
from app.data.models.base import Base


class MCPService(Base):
    __tablename__ = "mcp_services"
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False, unique=True)  # service name (unique)
    description = Column(String(500), nullable=True)  # service description
    transport = Column(String(20), nullable=False)  # transport type (stdio/streamable_http/sse)
    config = Column(JSON, nullable=False)  # JSON config (command/args/url/headers, etc.)
    enabled = Column(Boolean, default=True)  # whether the service is enabled
    created_at = Column(DateTime(timezone=True),
                        default=lambda: datetime.now(timezone.utc),
                        server_default=func.now())  # creation time
    # Relationship: MCP service -> sessions (many-to-many)
    sessions = relationship("Session", secondary="session_mcp", back_populates="mcp_services")
# app/data/models/session.py
# ---------------------------
# Session
# ---------------------------
from datetime import datetime, timezone
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, func
from sqlalchemy.orm import relationship
from app.data.models.base import Base


class Session(Base):
    __tablename__ = "sessions"
    id = Column(Integer, primary_key=True)
    title = Column(String(100), nullable=False)  # title
    created_at = Column(DateTime(timezone=True),
                        default=lambda: datetime.now(timezone.utc),
                        server_default=func.now())  # creation time
    # ID of the associated role
    role_id = Column(Integer, ForeignKey("roles.id"), nullable=False)
    # Relationship: session -> role (many-to-one)
    role = relationship("Role", back_populates="sessions")
    # Relationship: session -> messages (one-to-many)
    messages = relationship("Message", back_populates="session", cascade="all, delete-orphan", passive_deletes=True)
    # Relationship: session -> knowledge bases (many-to-many)
    knowledge_bases = relationship("KnowledgeBase", secondary="session_knowledge", back_populates="sessions", passive_deletes=True)
    # Relationship: session -> MCP services (many-to-many)
    mcp_services = relationship("MCPService", secondary="session_mcp", back_populates="sessions", passive_deletes=True)
# app/data/models/session_mcp.py
# ---------------------------
# Session <-> MCP service association table (many-to-many)
# ---------------------------
from sqlalchemy import Table, Column, Integer, ForeignKey
from app.data.models.base import Base

session_mcp = Table(
    "session_mcp",
    Base.metadata,
    Column("session_id", Integer, ForeignKey("sessions.id", ondelete="CASCADE"), primary_key=True),
    Column("mcp_service_id", Integer, ForeignKey("mcp_services.id", ondelete="CASCADE"), primary_key=True)
)
# app/data/models/__init__.py
from .mcp_service import MCPService
from .session_mcp import session_mcp
- Update the DTOs
# app/schemas/message.py
from datetime import datetime
from pydantic import BaseModel, Field
from enum import Enum


class MessageRole(str, Enum):
    """Message roles"""
    User = "user"
    Assistant = "assistant"
    System = "system"


class MessageBase(BaseModel):
    """Base DTO for messages"""
    role: MessageRole = Field(..., description="Message role: user/assistant/system")
    content: str = Field(..., description="Message content")
    session_id: int = Field(..., description="ID of the associated session")


class MessageResponse(MessageBase):
    """Response DTO for messages"""
    id: int = Field(..., description="Message ID")
    created_at: datetime = Field(..., description="Creation time")

    class Config:
        from_attributes = True


class ChatRequest(BaseModel):
    """Request DTO for chat"""
    message: str = Field(..., description="User message content")
    session_id: int = Field(..., description="Session ID")
    use_knowledge_base: bool = Field(default=False, description="Whether to answer from the knowledge base")
    use_mcp_tools: bool = Field(default=False, description="Whether to enable MCP tools")
# app/schemas/session.py
from typing import Optional, List
from datetime import datetime
from pydantic import BaseModel, Field


class SessionBase(BaseModel):
    """Base DTO for sessions"""
    title: str = Field(..., max_length=100, description="Session title")
    role_id: int = Field(..., description="ID of the associated role")


class SessionCreate(SessionBase):
    """DTO for creating a session"""
    knowledge_base_ids: Optional[List[int]] = Field(default=[], description="IDs of the bound knowledge bases")
    mcp_service_ids: Optional[List[int]] = Field(default=[], description="IDs of the bound MCP services")


class SessionResponse(SessionBase):
    """Response DTO for sessions"""
    id: int = Field(..., description="Session ID")
    created_at: datetime = Field(..., description="Creation time")
    message_count: Optional[int] = Field(None, description="Number of messages")

    class Config:
        from_attributes = True


class SessionUpdate(BaseModel):
    """DTO for updating a session"""
    title: Optional[str] = Field(None, max_length=100, description="Session title")
    role_id: Optional[int] = Field(None, description="ID of the associated role")
    knowledge_base_ids: Optional[List[int]] = Field(default=[], description="IDs of the bound knowledge bases")
    mcp_service_ids: Optional[List[int]] = Field(default=[], description="IDs of the bound MCP services")


class SessionListResponse(BaseModel):
    """Response DTO for session lists"""
    id: int = Field(..., description="Session ID")
    title: str = Field(..., description="Session title")
    created_at: datetime = Field(..., description="Creation time")
    role_name: str = Field(..., description="Role name")

    class Config:
        from_attributes = True


class SessionWithKnowledgeBases(SessionBase):
    """Session response DTO that includes knowledge base details"""
    id: int = Field(..., description="Session ID")
    created_at: datetime = Field(..., description="Creation time")
    message_count: Optional[int] = Field(None, description="Number of messages")
    knowledge_bases: Optional[List[dict]] = Field(default=[], description="Details of the bound knowledge bases")

    class Config:
        from_attributes = True
# app/schemas/mcp_service.py
from datetime import datetime
from typing import Optional, Dict, Any
from pydantic import BaseModel, Field
from app.ai.mcp.base import MCPTransportType


class MCPServiceBase(BaseModel):
    """Base fields of an MCP service"""
    name: str = Field(..., max_length=100, description="Service name")
    description: Optional[str] = Field(None, max_length=500, description="Service description")
    transport: MCPTransportType = Field(..., description="Transport type")
    config: Dict[str, Any] = Field(..., description="Service configuration")
    enabled: bool = Field(True, description="Whether the service is enabled")


class MCPServiceCreate(MCPServiceBase):
    """DTO for creating an MCP service"""
    pass


class MCPServiceUpdate(BaseModel):
    """DTO for updating an MCP service"""
    name: Optional[str] = Field(None, max_length=100, description="Service name")
    description: Optional[str] = Field(None, max_length=500, description="Service description")
    transport: Optional[MCPTransportType] = Field(None, description="Transport type")
    config: Optional[Dict[str, Any]] = Field(None, description="Service configuration")
    enabled: Optional[bool] = Field(None, description="Whether the service is enabled")


class MCPServiceResponse(MCPServiceBase):
    """Response DTO for MCP services"""
    id: int = Field(..., description="Service ID")
    created_at: datetime = Field(..., description="Creation time")

    class Config:
        from_attributes = True
- Implement the business-layer logic
# app/service/mcp_service.py
from typing import List, Optional
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.data.models import MCPService
from app.schemas.mcp_service import MCPServiceCreate, MCPServiceUpdate, MCPServiceResponse
from app.ai.mcp.client_manager import MCPClientManager
from app.ai.mcp.base import MCPServiceConfig, MCPTransportType


class MCPManagerService:
    def __init__(self, db: AsyncSession, mcp_client_manager: Optional[MCPClientManager] = None):
        self.db = db
        self.mcp_client_manager = mcp_client_manager

    async def create(self, mcp_service: MCPServiceCreate) -> MCPServiceResponse:
        """Create an MCP service"""
        # Reject duplicate service names
        existing = await self.db.scalar(
            select(MCPService).where(MCPService.name == mcp_service.name)
        )
        if existing:
            raise ValueError(f"MCP service name '{mcp_service.name}' already exists")
        # Create the new service record
        service_data = mcp_service.model_dump()
        service = MCPService(**service_data)
        self.db.add(service)
        await self.db.commit()
        await self.db.refresh(service)
        # Start the service right away if it is enabled and a manager was injected
        if service.enabled and self.mcp_client_manager:
            service_config = self.convert_to_mcp_config(service)
            await self.mcp_client_manager.start_service(service_config)
        return MCPServiceResponse.model_validate(service)
    async def get_by_id(self, service_id: int) -> Optional[MCPServiceResponse]:
        """Get an MCP service by ID"""
        service = await self.db.get(MCPService, service_id)
        if not service:
            return None
        return MCPServiceResponse.model_validate(service)

    async def get_all(self) -> List[MCPServiceResponse]:
        """Get all MCP services, newest first"""
        result = await self.db.execute(
            select(MCPService).order_by(MCPService.created_at.desc())
        )
        services = result.scalars().all()
        return [MCPServiceResponse.model_validate(service) for service in services]

    async def get_enabled_services_as_config(self) -> List[MCPServiceConfig]:
        """Get the configuration of every enabled MCP service"""
        result = await self.db.execute(
            select(MCPService)
            .where(MCPService.enabled.is_(True))
        )
        services = result.scalars().all()
        return [self.convert_to_mcp_config(service) for service in services]
    async def update(self, service_id: int, mcp_service: MCPServiceUpdate) -> Optional[MCPServiceResponse]:
        """Update an MCP service"""
        service: MCPService | None = await self.db.get(MCPService, service_id)
        if not service:
            return None
        # Check for a name conflict
        if mcp_service.name and mcp_service.name != service.name:
            existing = await self.db.scalar(
                select(MCPService).where(MCPService.name == mcp_service.name)
            )
            if existing:
                raise ValueError(f"MCP service name '{mcp_service.name}' already exists")
        # Apply only the fields that were sent
        update_data = mcp_service.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(service, field, value)
        await self.db.commit()
        await self.db.refresh(service)
        if self.mcp_client_manager:
            if service.enabled:
                service_config = self.convert_to_mcp_config(service)
                await self.mcp_client_manager.update_service(service_config)
            else:
                # A service that was just disabled must not keep running
                await self.mcp_client_manager.stop_service(service_id)
        return MCPServiceResponse.model_validate(service)
    async def delete(self, service_id: int) -> bool:
        """Delete an MCP service"""
        service = await self.db.get(MCPService, service_id)
        if not service:
            return False
        await self.db.delete(service)
        await self.db.commit()
        # Stop the running instance, if any
        if self.mcp_client_manager:
            mcp_status = self.mcp_client_manager.get_service_status(service_id)
            if mcp_status.running:
                await self.mcp_client_manager.stop_service(service_id)
        return True
    async def get_services_by_ids(self, service_ids: List[int]) -> List[MCPServiceResponse]:
        """Get enabled MCP services by a list of IDs"""
        if not service_ids:
            return []
        result = await self.db.execute(
            select(MCPService)
            .where(MCPService.id.in_(service_ids))
            .where(MCPService.enabled.is_(True))
        )
        services = result.scalars().all()
        return [MCPServiceResponse.model_validate(service) for service in services]
    @staticmethod
    def convert_to_mcp_config(service: MCPService) -> MCPServiceConfig:
        """Convert the database model to the MCP module's internal config"""
        config = MCPServiceConfig(
            service_id=service.id,
            name=service.name,
            transport_type=MCPTransportType(service.transport),
            enabled=service.enabled
        )
        # Use .get so that optional keys (args, headers) may be omitted from the JSON config
        match config.transport_type:
            case MCPTransportType.STDIO:
                config.command = service.config.get('command')
                config.args = service.config.get('args') or []
            case MCPTransportType.SSE | MCPTransportType.STREAMABLE_HTTP:
                config.url = service.config.get('url')
                config.headers = service.config.get('headers') or {}
        return config
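Because `convert_to_mcp_config` reads transport-specific keys out of the JSON `config` column, it can help to check a payload before saving it. The sketch below is one possible way to do that; `REQUIRED_KEYS` and `missing_config_keys` are hypothetical names that do not exist in the project, and the sample URL is a placeholder.

```python
# Hypothetical table of keys each transport cannot work without
REQUIRED_KEYS = {
    "stdio": ("command",),
    "sse": ("url",),
    "streamable_http": ("url",),
}


def missing_config_keys(transport: str, config: dict) -> list:
    """Return the required keys that are absent or empty for the given transport."""
    if transport not in REQUIRED_KEYS:
        raise ValueError(f"Unsupported transport type: {transport}")
    return [key for key in REQUIRED_KEYS[transport] if not config.get(key)]


# Sample payloads in the shape the `config` column is expected to hold
stdio_config = {
    "command": "npx",
    "args": ["-y", "@modelcontextprotocol/server-filesystem", "C:/Test"],
}
http_config = {"url": "http://localhost:8000/mcp", "headers": {}}

print(missing_config_keys("stdio", stdio_config))
print(missing_config_keys("streamable_http", http_config))
print(missing_config_keys("sse", {"headers": {}}))
```

A check like this could run in `create` and `update` before the commit, so that a bad configuration is rejected with a 400 instead of failing later when the service is started.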
- Update the chat method of the message service
# app/service/message.py
class MessageService:
    def __init__(self, db: AsyncSession,
                 vectorization_service: Optional[DocumentVectorizationService] = None,
                 mcp_client_manager: Optional[MCPClientManager] = None):
        self.db = db
        self.vectorization_service = vectorization_service
        self.mcp_client_manager = mcp_client_manager

    async def _get_mcp_service_ids(self, session_id: int) -> List[int]:
        """Get the IDs of the MCP services bound to a session"""
        session = await self.db.scalar(
            select(Session)
            .options(
                selectinload(Session.mcp_services)
            )
            .where(Session.id == session_id)
        )
        if not session:
            return []
        # Collect the IDs of the MCP services bound to the session, without duplicates
        mcp_service_ids = set()
        for mcp in session.mcp_services:
            mcp_service_ids.add(mcp.id)
        return list(mcp_service_ids)
    async def chat(self, chat_request: ChatRequest) -> AsyncGenerator[str, None]:
        """Handle a chat request"""
        # Make sure the session exists
        session = await self.db.get(Session, chat_request.session_id)
        if not session:
            raise ValueError(f"Session ID {chat_request.session_id} does not exist")
        # Load the role and model bound to the session
        role = await self.db.scalar(
            select(Role)
            .options(selectinload(Role.provider))  # eagerly load provider
            .where(Role.id == session.role_id)
        )
        if not role:
            # role is None here, so report the ID rather than role.name
            raise ValueError(f"Role ID {session.role_id} does not exist")
        # Create the chat history manager
        history_mgr = ChatHistoryManager(self.db, chat_request.session_id)
        history = await history_mgr.load_history()
        assistant_content_buffer = ""
        mcp_service_ids = None
        if chat_request.use_mcp_tools:
            mcp_service_ids = await self._get_mcp_service_ids(chat_request.session_id)
        # AgentService is the agent-based class defined in app/ai/agent.py; it takes the history up front
        ai = AgentService(role, history,
                          vectorization_service=self.vectorization_service,
                          mcp_manager=self.mcp_client_manager,
                          mcp_service_ids=mcp_service_ids)
        # Use RAG or plain chat depending on the user's choice
        if chat_request.use_knowledge_base:
            # Look up the knowledge bases bound to the session
            knowledge_base_ids = await self._get_knowledge_base_ids(chat_request.session_id)
            async for chunk in ai.rag_chat_stream(chat_request.message, knowledge_base_ids):
                assistant_content_buffer += chunk
                payload = json.dumps({
                    "content": chunk
                }, ensure_ascii=False)
                yield payload
        else:
            # Call the streaming method of the AI service
            async for chunk in ai.chat_stream(chat_request.message):
                assistant_content_buffer += chunk
                payload = json.dumps({
                    "content": chunk
                }, ensure_ascii=False)
                yield payload  # return each chunk to the caller as it is generated
        # Save the messages to DB + Redis
        await history_mgr.save_message(MessageBase(
            role=MessageRole.User,
            content=chat_request.message,
            session_id=chat_request.session_id
        ))
        await history_mgr.save_message(MessageBase(
            role=MessageRole.Assistant,
            content=assistant_content_buffer,
            session_id=chat_request.session_id
        ))
- Implement the API endpoints
# app/api/mcp_service.py
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.ai.mcp.dependency import get_mcp_client_manager
from app.data.db import get_db
from app.service.mcp_service import MCPManagerService
from app.schemas.mcp_service import (
    MCPServiceCreate,
    MCPServiceUpdate,
    MCPServiceResponse
)

router = APIRouter(prefix="/api/mcp-services", tags=["MCP service management"])


@router.post("/", response_model=MCPServiceResponse, status_code=status.HTTP_201_CREATED)
async def create_mcp_service(
    mcp_service: MCPServiceCreate,
    db: AsyncSession = Depends(get_db),
    mcp_client_manager = Depends(get_mcp_client_manager)
):
    """Create an MCP service"""
    service = MCPManagerService(db, mcp_client_manager)
    try:
        # Create the database record (and start the service if enabled)
        result = await service.create(mcp_service)
        return result
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
@router.get("/", response_model=List[MCPServiceResponse])
async def get_mcp_services(db: AsyncSession = Depends(get_db)):
    """List all MCP services"""
    service = MCPManagerService(db)
    return await service.get_all()


@router.get("/{service_id}", response_model=MCPServiceResponse)
async def get_mcp_service(
    service_id: int,
    db: AsyncSession = Depends(get_db)
):
    """Get one MCP service"""
    service = MCPManagerService(db)
    result = await service.get_by_id(service_id)
    if not result:
        raise HTTPException(status_code=404, detail="MCP service not found")
    return result
@router.put("/{service_id}", response_model=MCPServiceResponse)
async def update_mcp_service(
    service_id: int,
    mcp_service: MCPServiceUpdate,
    db: AsyncSession = Depends(get_db),
    mcp_client_manager = Depends(get_mcp_client_manager)
):
    """Update an MCP service"""
    service = MCPManagerService(db, mcp_client_manager)
    try:
        result = await service.update(service_id, mcp_service)
    except ValueError as e:
        # A name conflict is a client error, not a server error
        raise HTTPException(status_code=400, detail=str(e))
    if not result:
        raise HTTPException(status_code=404, detail="MCP service not found")
    return result


@router.delete("/{service_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_mcp_service(
    service_id: int,
    db: AsyncSession = Depends(get_db),
    mcp_client_manager = Depends(get_mcp_client_manager)
):
    """Delete an MCP service"""
    service = MCPManagerService(db, mcp_client_manager)
    # Delete the database record (and stop the running instance)
    success = await service.delete(service_id)
    if not success:
        raise HTTPException(status_code=404, detail="MCP service not found")
- Inject the MCP singleton at startup and register the MCP API
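# app/main.py
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.ai.mcp.dependency import get_mcp_client_manager
from app.api import mcp_service
from app.service.mcp_service import MCPManagerService
# SessionLocal (the project's async session factory) and the `app` instance,
# created with lifespan=lifespan, come from the existing setup in this file

logging.basicConfig(
    level=logging.INFO,  # global log level
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s'
)
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifecycle: startup -> running -> shutdown"""
    logger.info("Starting MCP services...")
    try:
        # Load every enabled MCP service
        async with SessionLocal() as db:
            mcp = MCPManagerService(db)
            enabled_services = await mcp.get_enabled_services_as_config()
        if enabled_services:
            # Initialize the MCP client manager singleton
            mcp_client_manager = get_mcp_client_manager()
            started = 0
            for service in enabled_services:
                # start_service returns False on failure instead of raising
                if await mcp_client_manager.start_service(service):
                    started += 1
            logger.info(f"Started {started} of {len(enabled_services)} MCP services")
        else:
            logger.info("No enabled MCP services")
    except Exception as e:
        logger.error(f"Failed to start MCP services: {e}")
    yield
    # Shutdown: close every MCP session and stop the child processes
    await get_mcp_client_manager().stop_all_service()


# Register the MCP service routes; the router already carries the /api/mcp-services prefix
app.include_router(mcp_service.router)
With that, the backend of our general-purpose AI tool is complete. The UI is not covered here.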
