Spiga

通用AI聊天平台实现4:扩展MCP工具

2025-08-02 22:16:40

一、MCP概述

MCP(Model Context Protocol)是由Anthropic于2024年底提出并开源的一种协议,旨在为AI系统(如AI编程助手、Agent等)提供安全、标准化的数据访问方式。它采用客户端-服务器架构,使AI工具(如Claude Desktop、IDE插件等)能够通过MCP客户端与MCP服务端交互,访问本地或远程数据源。

1. 基本流程

  • 应用准备函数列表--->连同用户问题一起发送给大模型--->大模型判断要调用的函数并生成调用参数--->应用执行函数--->将结果返回给大模型--->生成最终回复
  • 函数调用(Function Calling)约定了模型与应用之间的交互模式和调用规范
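
下面用一个极简的草图示意这一流程(基于 OpenAI 兼容接口,这里沿用正文中的 DashScope 地址与 qwen3-max 模型;其中 get_weather 是假设的示例函数,仅用于说明):

import json
import os

from openai import OpenAI

# 与正文保持一致,使用 OpenAI 兼容接口(这里以 DashScope 为例)
client = OpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)

# 1. 应用准备函数列表(get_weather 为假设的示例函数)
tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "查询指定城市的天气",
        "parameters": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        },
    },
}]

messages = [{"role": "user", "content": "北京今天天气怎么样?"}]

# 2. 函数列表连同用户问题一起发送给大模型
resp = client.chat.completions.create(model="qwen3-max", messages=messages, tools=tools)
msg = resp.choices[0].message

# 3. 大模型判断要调用的函数并生成参数,应用执行后把结果返回给大模型
if msg.tool_calls:
    call = msg.tool_calls[0]
    args = json.loads(call.function.arguments)
    result = {"city": args.get("city"), "weather": "晴,26°C"}  # 这里用固定结果代替真实实现
    messages.append(msg)
    messages.append({
        "role": "tool",
        "tool_call_id": call.id,
        "content": json.dumps(result, ensure_ascii=False),
    })
    final = client.chat.completions.create(model="qwen3-max", messages=messages)
    print(final.choices[0].message.content)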

2. MCP的意义

日常开发中,由于接口碎片化,我们可能需要分别开发搜索、SQL数据库查询、API调用等各种工具。

  • 有了MCP,我们可以把开发工作拆分为两部分

    • 工具开发:封装成MCP服务器

    • AI应用开发:只需连接MCP服务器

  • 数据处理与隐私安全:MCP服务器可以在本地进程中运行,对接本地设备的私有数据

  • 服务集成与扩展效率

    • MCP服务器可以方便集成
    • MCP服务器可以复用,配置即接入
    • AI应用只需要实现一次MCP接入,就拥有了与所有MCP服务器通信的能力

应用场景

  • 企业办公场景
  • 个人AI助手场景

总之:MCP就是AI应用与外部工具、数据源之间的标准化桥梁。

3. 基础概念

MCP 是客户端-服务端架构,一个 Host 可以连接多个 MCP Server。

  • MCP Hosts(宿主程序):如Claude Desktop、IDE等,通过MCP访问数据。
  • MCP Clients(客户端):与服务器建立1:1连接,处理通信。
  • MCP Servers(服务端):轻量级程序,提供标准化的数据或工具访问能力。
  • Local Data Sources(本地数据源):如文件、数据库等,由MCP服务端安全访问。
  • Remote Services(远程服务):如API、云服务等,MCP服务端可代理访问。

4. 协议层与传输层

  • 协议层:负责消息封装(framing)、请求/响应关联、高级通信模式管理。

  • 传输层:支持两种通信方式

    • Stdio传输(标准输入/输出):适用于本地进程间通信。

    • HTTP + SSE传输:

      • 服务端→客户端:Server-Sent Events(SSE)
      • 客户端→服务端:HTTP POST
      • 适用于远程网络通信。

      所有传输均采用JSON-RPC 2.0进行消息交换。

5. 消息类型

MCP 拥有多种类型的消息来处理不同的场景

  • 请求(Request)(期望获得响应)
interface Request {
  method: string;
  params?: { ... };
}
  • 成功响应(Result)
interface Result {
  [key: string]: unknown;
}
  • 错误响应(Error)
interface Error {
  code: number;
  message: string;
  data?: unknown;
}
  • 通知(Notification)(单向,无需响应)
interface Notification {
  method: string;
  params?: { ... };
}
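
结合上面的消息类型,下面给出一次工具调用在传输层上的示意消息(JSON-RPC 2.0 格式,这里用 Python 字典表示;工具名 read_file 与具体字段仅为示例,以 MCP 规范为准):

# 请求(Request):客户端请求调用某个工具
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {"name": "read_file", "arguments": {"path": "C:/Test/a.txt"}},
}

# 成功响应(Result):服务端返回工具执行结果
result = {
    "jsonrpc": "2.0",
    "id": 1,
    "result": {"content": [{"type": "text", "text": "文件内容..."}]},
}

# 错误响应(Error)
error = {
    "jsonrpc": "2.0",
    "id": 1,
    "error": {"code": -32602, "message": "Invalid params"},
}

# 通知(Notification):没有 id,不需要响应
notification = {
    "jsonrpc": "2.0",
    "method": "notifications/tools/list_changed",
}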

6. 生命周期

类似于TCP的三次握手,MCP客户端与MCP服务端建立连接时会进行以下步骤:

  • 初始化(Initialization)
    1. 客户端发送initialize请求(含协议版本、能力集)。
    2. 服务端返回版本及能力信息。
    3. 客户端发送initialized通知确认。
    4. 进入正常通信阶段。
  • 消息交换(Message Exchange):当初始化完毕,就可以进行通信了,目前支持:
    • 请求-响应模式(Request-Response):双向通信。
    • 通知模式(Notification):单向消息。
  • 终止(Termination):有以下几种方式会关闭连接
    • 主动关闭(close())。
    • 传输层断开。
    • 错误触发终止。
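
下面是初始化握手阶段三条消息的简化示意(用 Python 字典表示,具体字段和协议版本号以 MCP 规范为准):

# 1. 客户端 -> 服务端:initialize 请求(含协议版本、能力集)
initialize_request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "initialize",
    "params": {
        "protocolVersion": "2024-11-05",
        "capabilities": {},
        "clientInfo": {"name": "my-client", "version": "0.1.0"},
    },
}

# 2. 服务端 -> 客户端:返回自身版本与能力信息
initialize_result = {
    "jsonrpc": "2.0",
    "id": 1,
    "result": {
        "protocolVersion": "2024-11-05",
        "capabilities": {"tools": {}},
        "serverInfo": {"name": "filesystem", "version": "1.0.0"},
    },
}

# 3. 客户端 -> 服务端:initialized 通知,之后进入正常通信阶段
initialized_notification = {
    "jsonrpc": "2.0",
    "method": "notifications/initialized",
}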

7. MCP交互流程

假设有一个AI助手应用(MCP主机),用户要求它"在数据库中查找最新的销售报告,并通过电子邮件发送给我"。我们分析一下这个场景下MCP的交互流程:

  1. 初始化:MCP客户端连接到两个MCP服务器(数据库访问、邮件发送)
  2. 用户查询:用户输入上述请求
  3. LLM推理:大模型规划步骤,先查询销售数据,再发送邮件
  4. 工具调用:大模型返回包含工具名和参数的调用请求,应用请求MCP客户端执行调用
  5. 执行:MCP客户端向对应的MCP服务器发送调用请求,服务器执行后返回结果
  6. 中间响应:应用把结果返回给大模型,LLM拿到销售数据后,生成邮件发送工具的调用请求(包含收件人、内容),返回给应用
  7. 再次执行:应用拿到工具调用请求,通过MCP客户端调用MCP服务器执行,并把确认信息返回给LLM
  8. 生成最终答复

8. MCP生态

二、构建MCP主机应用

1. 安装MCP文件系统服务

这里我们以GitHub上modelcontextprotocol组织提供的filesystem服务来做示例。

这个文件系统服务提供了多种安装方式,细节可以查看项目说明。我们这里先通过npm全局安装(后面也可以直接用npx运行,无需预先安装):

npm install -g @modelcontextprotocol/server-filesystem

2. 构建MCP主机

接下来我们构建一个MCP主机,来使用这个文件系统服务。启动服务的命令在项目文档里也有介绍:

npx -y @modelcontextprotocol/server-filesystem C:/Test

  • C:/Test 是允许访问的目录。基于最小配置原则,必须至少配置一个目录,文件系统服务只能操作这些指定的目录。多个目录用空格分隔。

  • 执行命令后就启动了一个MCP服务,这个服务使用的是stdio通信方式,没有对外暴露任何网络端口,所以无法从外部直接访问它,只能由宿主进程与之交互。

    stdio包括: 标准输入 stdin,标准输出 stdout,标准错误 stderr。它的通信方式是由宿主进程以子进程的方式启动服务,通过标准输入/输出交换消息。

    我们可以用上面这条命令来确认服务是否能正常启动(检查正常后可以关闭)。

3. MCP Client Demo

接下来我们写一个Demo,把文件系统服务嵌入到项目中。

import asyncio
import json
import os
from contextlib import AsyncExitStack
from typing import Optional

from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import OpenAI

load_dotenv()

class FileAgentClient:
    def __init__(self):
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.openai_client = OpenAI(
            api_key=os.getenv("DASHSCOPE_API_KEY"),
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
        )
        self.conversation_history = []

    async def connect_to_server(self, allowed_directory: str):
        """启动并连接到 Filesystem MCP server。"""
        print(f"正在启动 Filesystem Server,授权访问目录: {allowed_directory}...")
        server_params = StdioServerParameters(
            command="npx",
            args=["-y", "@modelcontextprotocol/server-filesystem", allowed_directory]
        )

        # 启动服务器子进程并建立 stdio 连接
        read, write = await self.exit_stack.enter_async_context(stdio_client(server_params))

        # 创建并初始化客户端会话
        self.session = await self.exit_stack.enter_async_context(ClientSession(read, write))
        await self.session.initialize()

        response = await self.session.list_tools()
        tool_names = [tool.name for tool in response.tools]
        print(f"连接成功!可用文件工具: {tool_names}")

    async def process_query(self, user_query: str):
        """处理用户查询,与 LLM 和 MCP 服务器交互。"""
        self.conversation_history.append({"role": "user", "content": user_query})

        # 1. 从服务器获取可用工具的定义
        list_tools_response = await self.session.list_tools()

        available_tools = []

        # 转换为 LLM 能识别的 function 工具格式
        for tool in list_tools_response.tools:
            tool_schema = getattr(
                tool,
                "inputSchema",
                {"type": "object", "properties": {}, "required": []},
            )

            openai_tool = {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool_schema,
                },
            }
            available_tools.append(openai_tool)

        # 2. 调用 LLM,并提供工具定义
        print("正在思考...")
        response = self.openai_client.chat.completions.create(
            model="qwen3-max",
            messages=self.conversation_history,
            tools=available_tools
        )

        choice = response.choices[0]
        message = choice.message

        # 3. 判断是否有工具调用(tool_calls)
        if hasattr(message, "tool_calls") and message.tool_calls:
            # 先把包含 tool_calls 的助手消息加入历史,再逐个追加工具结果
            self.conversation_history.append(message)

            for tool_call in message.tool_calls:
                tool_name = tool_call.function.name
                tool_args = json.loads(tool_call.function.arguments or "{}")
                print(f"LLM 请求调用 MCP 工具: {tool_name}({tool_args})")

                # 调用 MCP 服务器对应工具
                result = await self.session.call_tool(tool_name, tool_args)
                print(f"工具执行结果: {result}")

                # 把工具结果反馈给 LLM
                self.conversation_history.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "name": tool_name,
                    "content": str(result)
                })

            # 再次调用 LLM,让它基于工具结果总结回复
            print("正在生成最终回答...")
            final_response = self.openai_client.chat.completions.create(
                model="qwen3-max",
                messages=self.conversation_history
            )
            answer = final_response.choices[0].message.content
        else:
            # 没有工具调用,直接输出 LLM 回复
            answer = message.content

        # 记录回复内容
        self.conversation_history.append({"role": "assistant", "content": answer})
        return answer

    async def chat(self):
        """运行交互式聊天循环。"""
        print("\n--- 文件系统代理已启动 ---")
        print("输入 'quit' 退出。")

        while True:
            try:
                query = input("\n> ").strip()
                if query.lower() == 'quit':
                    break
                if not query:
                    continue

                response_text = await self.process_query(query)
                print(f"\nAI: {response_text}")

            except Exception as e:
                print(f"\n发生错误: {e}")

async def main():
    client = FileAgentClient()

    allowed_dir = os.getenv("ALLOWED_DIRECTORY", ".")
    try:
        await client.connect_to_server(allowed_dir)
        await client.chat()
    finally:
        # 退出前关闭子进程与会话,释放资源
        await client.exit_stack.aclose()

if __name__ == "__main__":
    asyncio.run(main())
# .env

DASHSCOPE_API_KEY="sk-XXXXXXXXXXXXXXXXXX"
ALLOWED_DIRECTORY="C:\Test"

三、AI聊天应用集成MCP设计

1. 集成需求分析

  • 服务的生命周期管理
  • 多样化连接支持
  • 配置持久化
  • 服务自启动
  • 动态工具发现

2. 功能设计

  • API层:管理服务配置
  • 业务服务层:API层和底层AI逻辑的协调者
    • 创建一个服务请求
    • 将服务配置写入数据库
    • 调用AI层去启动服务
  • AI核心层(MCP模块):客户端管理器(全局单例)
    • 连接管理
    • 适配协议
    • 工具加载
    • 资源回收
  • 持久化层:存储MCP服务的配置信息,与会话多对多关联

3. MCP核心工作流程

配置一个MCP服务:

  1. 创建一个新的MCP服务器(文件系统服务、stdio传输、启动参数)
  2. API层接受请求,转交给MCP业务层
  3. 业务层:将服务的配置信息(名称、参数配置等)存入数据库,再把服务配置传递给AI层,启动并连接该服务
  4. AI层:收到启动请求,根据服务类型选择对应的连接逻辑

MCP服务自启动:

  1. 应用生命周期事件
  2. 业务层:查询所有已启用的MCP服务配置,返回配置列表
  3. 应用启动逻辑遍历列表,调用AI层逐个启动服务器

聊天中使用工具:

  1. 创建聊天会话时选择MCP服务,会话与MCP服务绑定
  2. 用户在会话中输入问题
  3. AI聊天服务接收请求,加载会话信息,向AI层的MCP客户端管理器请求绑定服务的工具列表
  4. AI聊天服务把工具列表与用户提问一起发送给大模型,由大模型决定是否调用工具

提问:MCP工具是大模型自动去调用的吗?

MCP 只是提供工具列表:

  • MCP主机(就是我们的应用)会把消息和可使用的工具列表一起发送给大模型
  • 大模型给出工具调用请求,返回给我们的应用
  • 我们的应用再通过MCP客户端去调用工具,拿到执行结果
  • 然后,应用再把结果交给模型,生成最终答复

四、实现AI聊天集成MCP

1. 实现AI层MCP核心功能

  • 抽象一个MCP模块基类
# app/ai/mcp/base.py

"""
MCP 模块抽象基类,提供统一的接口和数据模型
"""
from abc import ABC, abstractmethod
from enum import Enum
from typing import List, Dict, Optional
from langchain_core.tools import BaseTool
from pydantic import BaseModel

class MCPTransportType(str, Enum):
    """MCP 传输类型枚举"""
    STDIO = "stdio"
    STREAMABLE_HTTP = "streamable_http"
    SSE = "sse"

class MCPServiceConfig(BaseModel):
    """MCP 服务配置"""
    service_id: int
    name: str
    transport_type: MCPTransportType
    command: Optional[str] = None
    args: Optional[List[str]] = None
    env: Optional[Dict[str, str]] = None
    url: Optional[str] = None
    headers: Optional[Dict[str, str]] = None
    enabled: bool = True

class MCPServiceStatus(BaseModel):
    """MCP 服务状态"""
    service_id: int
    name: str
    running: bool
    error: Optional[str] = None
    tools_count: int = 0

class MCPProvider(ABC):
    """MCP 服务提供者抽象基类"""

    @abstractmethod
    async def start_service(self, service: MCPServiceConfig) -> bool:
        """启动单个MCP服务"""
        pass

    @abstractmethod
    async def stop_service(self, service_id: int) -> bool:
        """停止单个MCP服务"""
        pass

    @abstractmethod
    async def update_service(self, service: MCPServiceConfig) -> bool:
        """更新MCP服务配置"""
        pass

    @abstractmethod
    async def get_service_status(self, service_id: int) -> MCPServiceStatus:
        """获取MCP服务状态"""
        pass

    @abstractmethod
    async def get_tools(self, service_id: int) -> List[BaseTool]:
        """获取指定服务的工具列表"""
        pass

    @abstractmethod
    async def get_tools_by_ids(self, service_ids: List[int]) -> List[BaseTool]:
        """根据服务ID列表获取工具列表"""
        pass
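
以文件系统服务为例,一个stdio类型的服务配置大致如下(目录路径仅为示例):

from app.ai.mcp.base import MCPServiceConfig, MCPTransportType

filesystem_config = MCPServiceConfig(
    service_id=1,
    name="filesystem",
    transport_type=MCPTransportType.STDIO,
    command="npx",
    args=["-y", "@modelcontextprotocol/server-filesystem", "C:/Test"],
)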

  • 实现MCP客户端管理器
# app/ai/mcp/client_manager.py

import logging
logger = logging.getLogger(__name__)

from langchain_core.tools import BaseTool
from typing import Dict, List
from contextlib import AsyncExitStack
from mcp import ClientSession
from mcp.client.stdio import stdio_client, StdioServerParameters
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.sse import sse_client
from langchain_mcp_adapters.tools import load_mcp_tools
from .base import MCPProvider, MCPServiceConfig, MCPServiceStatus, MCPTransportType

class MCPClientManager(MCPProvider):
    """MCP客户端管理器"""

    def __init__(self):
        self.services: Dict[int, MCPServiceConfig] = {}
        self.sessions: Dict[int, ClientSession] = {}
        self.exit_stacks: Dict[int, AsyncExitStack] = {}

    async def start_service(self, service: MCPServiceConfig) -> bool:
        """启动单个MCP服务"""
        try:
            self.services[service.service_id] = service

            if service.transport_type == MCPTransportType.STDIO:
                await self._start_stdio_service(service)
            elif service.transport_type == MCPTransportType.STREAMABLE_HTTP:
                await self._start_streamable_http_service(service)
            elif service.transport_type == MCPTransportType.SSE:
                await self._start_sse_service(service)
            else:
                raise ValueError(f"不支持的传输类型: {service.transport_type}")

            logger.info(f"成功启动MCP服务: {service.name} (ID: {service.service_id})")
            return True
        except Exception as e:
            logger.error(f"启动MCP服务失败 {service.name}: {e}")
            return False

    async def _start_stdio_service(self, service: MCPServiceConfig) -> None:
        """启动stdio类型的MCP服务"""
        command = service.command
        args = service.args or []

        if not command:
            raise ValueError(f"stdio服务 {service.name} 缺少command配置")

        server_params = StdioServerParameters(
            command=command,
            args=args
        )

        # 使用stdio_client连接
        exit_stack = AsyncExitStack()
        read, write = await exit_stack.enter_async_context(
            stdio_client(server_params)
        )

        # 创建客户端会话
        session = ClientSession(read, write)
        await exit_stack.enter_async_context(session)

        # 初始化会话
        await session.initialize()

        self.sessions[service.service_id] =  session
        self.exit_stacks[service.service_id] = exit_stack

        logger.info(f"stdio服务 {service.name} 启动成功")

    async def _start_streamable_http_service(self, service: MCPServiceConfig) -> None:
        """启动streamable_http类型的MCP服务"""
        url = service.url
        headers = service.headers or {}

        if not url:
            raise ValueError(f"streamable_http服务 {service.name} 缺少url配置")

        # 使用streamablehttp_client连接(直接传入url和headers)
        exit_stack = AsyncExitStack()
        read, write, _ = await exit_stack.enter_async_context(
            streamablehttp_client(url, headers=headers)
        )

        # 创建客户端会话
        session = ClientSession(read, write)
        await exit_stack.enter_async_context(session)

        # 初始化会话
        await session.initialize()

        self.sessions[service.service_id] = session
        self.exit_stacks[service.service_id] = exit_stack

        logger.info(f"streamable_http服务 {service.name} 启动成功")

    async def _start_sse_service(self, service: MCPServiceConfig) -> None:
        """启动sse类型的MCP服务"""
        url = service.url
        headers = service.headers or {}

        if not url:
            raise ValueError(f"sse服务 {service.name} 缺少url配置")

        # 使用sse_client连接(直接传入url和headers)
        exit_stack = AsyncExitStack()
        read, write = await exit_stack.enter_async_context(
            sse_client(url, headers=headers)
        )

        # 创建客户端会话
        session = ClientSession(read, write)
        await exit_stack.enter_async_context(session)

        # 初始化会话
        await session.initialize()

        self.sessions[service.service_id] = session
        self.exit_stacks[service.service_id] = exit_stack

        logger.info(f"sse服务 {service.name} 启动成功")

    async def stop_service(self, service_id: int) -> bool:
        """停止单个MCP服务"""
        try:
            exit_stack = self.exit_stacks.get(service_id)
            if not exit_stack:
                logger.warning(f"服务 {service_id} 不存在或未运行")
                return True  # 认为已经停止

            # 关闭该服务的AsyncExitStack,释放所有相关连接(包括stdio、session等)
            try:
                await exit_stack.aclose()
                logger.info(f"成功关闭服务 {service_id} 的所有连接")
            except Exception as e:
                logger.warning(f"关闭服务 {service_id} 连接时出错: {e}")

            self.exit_stacks.pop(service_id, None)
            self.sessions.pop(service_id, None)

            logger.info(f"成功停止MCP服务: {service_id}")
            return True
        except Exception as e:
            logger.error(f"停止MCP服务失败 {service_id}: {e}")
            return False

    async def stop_all_service(self) -> None:
        """停止所有MCP服务,释放连接资源"""
        for key, stack in self.exit_stacks.items():
            await stack.aclose()
        self.exit_stacks.clear()
        self.sessions.clear()

    async def update_service(self, service: MCPServiceConfig) -> bool:
        """更新MCP服务配置"""
        try:
            service_id = service.service_id
            if service_id in self.services:
                # 先停止旧服务
                await self.stop_service(service_id)

            # 启动新服务
            await self.start_service(service)
            logger.info(f"成功更新MCP服务: {service.name} (ID: {service_id})")
            return True
        except Exception as e:
            logger.error(f"更新MCP服务失败 {service.name}: {e}")
            return False

    def get_service_status(self, service_id: int) -> MCPServiceStatus:
        """获取服务状态"""
        if service_id not in self.exit_stacks:
            return MCPServiceStatus(
                service_id=service_id,
                name=f"Service-{service_id}",
                running=False,
                error="服务未启动"
            )

        service = self.services[service_id]
        session = self.sessions.get(service_id)
        if not session:
            return MCPServiceStatus(
                service_id=service_id,
                name=service.name,
                running=False,
                error="会话不存在"
            )

        return MCPServiceStatus(
            service_id=service_id,
            name=service.name,
            running=True
        )

    async def get_tools(self, service_id: int) -> List[BaseTool]:
        """获取指定服务的工具列表"""
        session = self.sessions.get(service_id)
        if not session:
            logger.warning(f"服务 {service_id} 不存在或未运行")
            return []

        try:
            # 使用 langchain-mcp-adapters 加载工具
            tools = await load_mcp_tools(session)

            service_name = self.services[service_id].name
            logger.info(f"成功获取服务 {service_name} 的 {len(tools)} 个工具")
            return tools

        except Exception as e:
            logger.error(f"获取服务 {service_id} 工具失败: {e}")
            return []

    async def get_tools_by_ids(self, service_ids: List[int]) -> List[BaseTool]:
        """
        根据服务ID列表获取工具列表
        """
        all_tools = []
        for service_id in service_ids:
            tools = await self.get_tools(service_id)
            all_tools.extend(tools)

        return all_tools
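
下面是客户端管理器的一个使用示意(独立脚本,目录路径仅为示例),演示启动文件系统服务并加载工具:

import asyncio
from app.ai.mcp.base import MCPServiceConfig, MCPTransportType
from app.ai.mcp.client_manager import MCPClientManager

async def demo():
    manager = MCPClientManager()
    config = MCPServiceConfig(
        service_id=1,
        name="filesystem",
        transport_type=MCPTransportType.STDIO,
        command="npx",
        args=["-y", "@modelcontextprotocol/server-filesystem", "C:/Test"],
    )

    # 启动服务并加载该服务暴露的工具
    if await manager.start_service(config):
        tools = await manager.get_tools(1)
        print([tool.name for tool in tools])

    # 用完后关闭所有服务,释放子进程
    await manager.stop_all_service()

asyncio.run(demo())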
  • 提供全局单例函数
# app/ai/mcp/dependency.py

from .client_manager import MCPClientManager

# 全局单例变量
mcp_client_manager: MCPClientManager | None = None

def get_mcp_client_manager() -> MCPClientManager:
    """获取 MCPClientManager 单例"""
    global mcp_client_manager
    if mcp_client_manager is None:
        mcp_client_manager = MCPClientManager()
    return mcp_client_manager
  • 重新实现chat方法

由于之前我们的chat方法是基于LangChain链来实现的,无法完成工具调用,所以这里改用代理(Agent)的方式重写聊天逻辑。我们在app/ai文件夹下新建一个agent.py文件,代码如下:

import json
from typing import List, AsyncGenerator, Optional, Any
from langchain.agents import create_agent
from langchain_core.messages import BaseMessage, trim_messages, SystemMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import Runnable
from langchain_openai import ChatOpenAI
from app.ai.mcp.client_manager import MCPClientManager
from app.ai.vectorization.service import DocumentVectorizationService
from app.data.models import Role

class AgentService:
    def __init__(self, role: Role, history: List[BaseMessage], max_tokens = 20,
                 vectorization_service: Optional[DocumentVectorizationService] = None,
                 mcp_manager: Optional[MCPClientManager] = None,
                 mcp_service_ids: Optional[List[int]] = None):

        self.role = role
        self.vectorization_service = vectorization_service
        self.mcp_manager = mcp_manager
        self.mcp_service_ids = mcp_service_ids

        # 创建ChatOpenAI实例
        self.llm = ChatOpenAI(
            base_url=role.provider.endpoint,
            api_key=role.provider.api_key,
            model=role.provider.model,
            temperature=role.temperature,
            streaming=True
        )

        # 构建聊天历史消息的修剪器
        trimmer = trim_messages(
            token_counter=len,
            max_tokens=max_tokens,
            start_on=("human", "ai")
        )

        self.history = trimmer.invoke(history)

    async def call_agent(self, messages: List[BaseMessage]) -> AsyncGenerator[str, None]:
        """调用代理模型进行聊天"""
        tools = []
        if self.mcp_manager and self.mcp_service_ids:
            tools = await self.mcp_manager.get_tools_by_ids(self.mcp_service_ids)

        agent: Runnable = create_agent(model=self.llm, tools=tools)

        async for token, metadata in agent.astream(
                { "messages": messages },
                stream_mode="messages"
        ):
            node = metadata["langgraph_node"]
            if node == "tools":
                yield f"```\n调用工具:{token.name}\n调用结果:\n{token.content}\n```\n"
            if node == "model":
                yield token.content


    async def chat_stream(self, user_input: str) -> AsyncGenerator[str, None]:
        """处理聊天请求"""
        # 构建Prompt模板
        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content=self.role.system_prompt),
            MessagesPlaceholder(variable_name="history"),
            ("human", "{input}"),
        ])

        messages = prompt.invoke({"history": self.history, "input": user_input}).to_messages()
        async for chunk in self.call_agent(messages):
            yield chunk

    async def _search_knowledge_base(self, query: str, knowledge_base_ids: List[int]) -> List[Any] | None:
        """搜索知识库并返回带有元数据的JSON格式结果"""
        if not self.vectorization_service or not knowledge_base_ids:
            return None

        # 直接调用DocumentVectorizationService搜索相关文档
        search_results = await self.vectorization_service.search_similar_documents(
            query=query,
            knowledge_base_ids=knowledge_base_ids,
            k=20
        )

        if not search_results:
            return None

        # 将搜索结果转换为带有元数据的JSON格式
        formatted_results = []
        for i, result in enumerate(search_results, 1):
            metadata = result.get('metadata', {})
            formatted_result = {
                "index": i,
                "content": result['content'],
                "metadata": {
                    "filename": metadata.get('filename', '未知文件'),
                    "chunk_index": metadata.get('chunk_index', 0),
                    "total_chunks": metadata.get('total_chunks', 1)
                }
            }
            formatted_results.append(formatted_result)

        return formatted_results

    async def rag_chat_stream(self, user_input: str, knowledge_base_ids: List[int]) -> AsyncGenerator[str, None]:
        """基于知识库的RAG流式聊天"""
        # 搜索知识库
        knowledge_data = await self._search_knowledge_base(user_input, knowledge_base_ids)
        # 将知识库数据转换为JSON字符串
        knowledge_json = json.dumps(knowledge_data, ensure_ascii=False, indent=2)

        # 构建增强的系统提示词
        enhanced_system_prompt = f"""{self.role.system_prompt}

基于以下知识库信息回答用户问题:

知识库搜索结果:
{knowledge_json}

重要要求:
1. 请根据上述知识库信息回答用户的问题
2. 在回答中必须明确指出信息来源,使用以下格式引用:
   - 当引用某个文档片段时,使用 [1]、[2] 等格式标注
   - 在回答末尾提供完整的来源列表
3. 如果知识库中没有相关信息,请明确说明"根据知识库信息,未找到相关内容"
4. 回答要准确、详细,并确保每个关键信息都有明确的来源标注
5. 充分利用JSON中的元数据信息,包括文件名、片段位置等

文档来源格式说明:
- [1] 表示第1个引用
- 每个结果包含:文件名、片段位置等元数据
- 在回答末尾提供完整的来源列表"""

        # 构建Prompt模板
        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content=enhanced_system_prompt),
            MessagesPlaceholder(variable_name="history"),
            ("human", "{input}"),
        ])

        messages = prompt.invoke({"history": self.history, "input": user_input}).to_messages()

        async for chunk in self.call_agent(messages):
            yield chunk

以上代码大部分逻辑跟chat.py一样,只是链式调用改成了代理实现。
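
下面是AgentService的一个使用示意(假设 role 与 history 已经从数据库加载,MCP服务ID为示例值,且对应的MCP服务已在当前进程中启动):

from app.ai.agent import AgentService
from app.ai.mcp.dependency import get_mcp_client_manager

async def demo(role, history):
    agent = AgentService(
        role, history,
        mcp_manager=get_mcp_client_manager(),
        mcp_service_ids=[1],   # 会话绑定的MCP服务ID,示例值
    )
    # 流式输出回答,其中工具调用的过程也会以代码块形式输出
    async for chunk in agent.chat_stream("列出 C:/Test 目录下的文件"):
        print(chunk, end="")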

2. 实现业务层MCP管理功能

  • 修改数据层模型
# app/data/models/mcp_service.py

# ---------------------------
# MCP 服务
# ---------------------------
from datetime import datetime, timezone
from sqlalchemy import Column, Integer, String, DateTime, Boolean, JSON, func
from sqlalchemy.orm import relationship

from app.data.models.base import Base

class MCPService(Base):
    __tablename__ = "mcp_services"

    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False, unique=True)  # 服务名称(唯一)
    description = Column(String(500), nullable=True)         # 服务描述
    transport = Column(String(20), nullable=False)          # 传输类型(stdio/streamable_http/sse)
    config = Column(JSON, nullable=False)                   # JSON 配置(存储 command/args/url/headers 等)
    enabled = Column(Boolean, default=True)                 # 是否启用
    created_at = Column(DateTime(timezone=True),
                        default=lambda: datetime.now(timezone.utc),
                        server_default=func.now())          # 创建时间

    # 定义关系:MCP 服务 -> 会话(多对多)
    sessions = relationship("Session", secondary="session_mcp", back_populates="mcp_services")
# app/data/models/session.py

# ---------------------------
# 会话
# ---------------------------
from datetime import datetime, timezone
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, func
from sqlalchemy.orm import relationship

from app.data.models.base import Base

class Session(Base):
    __tablename__ = "sessions"

    id = Column(Integer, primary_key=True)
    title = Column(String(100), nullable=False)             # 标题
    created_at = Column(DateTime(timezone=True),
                        default=lambda: datetime.now(timezone.utc),
                        server_default=func.now())  # 创建时间

    # 关联角色ID
    role_id = Column(Integer, ForeignKey("roles.id"), nullable=False)
    # 定义模型关系:会话 -> 角色(多对一)
    role = relationship("Role", back_populates="sessions")

    # 定义关系:会话 -> 消息(一对多)
    messages = relationship("Message", back_populates="session", cascade="all, delete-orphan", passive_deletes=True)

    # 定义关系:会话 -> 知识库(多对多)
    knowledge_bases = relationship("KnowledgeBase", secondary="session_knowledge", back_populates="sessions", passive_deletes=True)

    # 定义关系:会话 -> MCP服务(多对多)
    mcp_services = relationship("MCPService", secondary="session_mcp", back_populates="sessions", passive_deletes=True)
# app/data/models/session_mcp.py

# ---------------------------
# 会话-MCP服务 关系表(多对多中间表)
# ---------------------------
from sqlalchemy import Table, Column, Integer, ForeignKey
from app.data.models.base import Base

session_mcp = Table(
    "session_mcp",
    Base.metadata,
    Column("session_id", Integer, ForeignKey("sessions.id", ondelete="CASCADE"), primary_key=True),
    Column("mcp_service_id", Integer, ForeignKey("mcp_services.id", ondelete="CASCADE"), primary_key=True)
)
# app/data/models/__init__.py

from .mcp_service import MCPService
from .session_mcp import session_mcp
  • 修改DTO
# app/service/message.py

from datetime import datetime
from pydantic import BaseModel, Field
from enum import Enum

class MessageRole(str, Enum):
    """消息角色枚举"""
    User = "user"
    Assistant = "assistant"
    System = "system"

class MessageBase(BaseModel):
    """Message基础DTO模型"""
    role: MessageRole = Field(..., description="消息角色:user/assistant/system")
    content: str = Field(..., description="消息内容")
    session_id: int = Field(..., description="关联的会话ID")

class MessageResponse(MessageBase):
    """Message响应DTO模型"""
    id: int = Field(..., description="消息ID")
    created_at: datetime = Field(..., description="创建时间")

    class Config:
        from_attributes = True

class ChatRequest(BaseModel):
    """聊天请求DTO模型"""
    message: str = Field(..., description="用户消息内容")
    session_id: int = Field(..., description="会话ID")
    use_knowledge_base: bool = Field(default=False, description="是否启用知识库问答")
    use_mcp_tools: bool = Field(default=False, description="是否启用MCP工具")
# app/service/session.py

from typing import Optional, List
from datetime import datetime
from pydantic import BaseModel, Field

class SessionBase(BaseModel):
    """Session基础DTO模型"""
    title: str = Field(..., max_length=100, description="会话标题")
    role_id: int = Field(..., description="关联的角色ID")

class SessionCreate(SessionBase):
    """创建Session的DTO模型"""
    knowledge_base_ids: Optional[List[int]] = Field(default=[], description="绑定的知识库ID列表")
    mcp_service_ids: Optional[List[int]] = Field(default=[], description="绑定的MCP服务ID列表")

class SessionResponse(SessionBase):
    """Session响应DTO模型"""
    id: int = Field(..., description="会话ID")
    created_at: datetime = Field(..., description="创建时间")
    message_count: Optional[int] = Field(None, description="消息数量")

    class Config:
        from_attributes = True

class SessionUpdate(BaseModel):
    """更新Session的DTO模型"""
    title: Optional[str] = Field(None, max_length=100, description="会话标题")
    role_id: Optional[int] = Field(None, description="关联的角色ID")
    knowledge_base_ids: Optional[List[int]] = Field(default=[], description="绑定的知识库ID列表")
    mcp_service_ids: Optional[List[int]] = Field(default=[], description="绑定的MCP服务ID列表")

class SessionListResponse(BaseModel):
    """Session列表响应DTO模型"""
    id: int = Field(..., description="会话ID")
    title: str = Field(..., description="会话标题")
    created_at: datetime = Field(..., description="创建时间")
    role_name: str = Field(..., description="角色名称")

    class Config:
        from_attributes = True

class SessionWithKnowledgeBases(SessionBase):
    """包含知识库详细信息的Session响应DTO模型"""
    id: int = Field(..., description="会话ID")
    created_at: datetime = Field(..., description="创建时间")
    message_count: Optional[int] = Field(None, description="消息数量")
    knowledge_bases: Optional[List[dict]] = Field(default=[], description="绑定的知识库详细信息")

    class Config:
        from_attributes = True
# app/schemas/mcp_service.py

from datetime import datetime
from typing import Optional, Dict, Any
from pydantic import BaseModel, Field

from app.ai.mcp.base import MCPTransportType

class MCPServiceBase(BaseModel):
    """MCP 服务基础字段"""
    name: str = Field(..., max_length=100, description="服务名称")
    description: Optional[str] = Field(None, max_length=500, description="服务描述")
    transport: MCPTransportType = Field(..., description="传输类型")
    config: Dict[str, Any] = Field(..., description="服务配置")
    enabled: bool = Field(True, description="是否启用")


class MCPServiceCreate(MCPServiceBase):
    """创建 MCP 服务 DTO"""
    pass


class MCPServiceUpdate(BaseModel):
    """更新 MCP 服务 DTO"""
    name: Optional[str] = Field(None, max_length=100, description="服务名称")
    description: Optional[str] = Field(None, max_length=500, description="服务描述")
    transport: Optional[MCPTransportType] = Field(None, description="传输类型")
    config: Optional[Dict[str, Any]] = Field(None, description="服务配置")
    enabled: Optional[bool] = Field(None, description="是否启用")


class MCPServiceResponse(MCPServiceBase):
    """MCP 服务响应 DTO"""
    id: int = Field(..., description="服务ID")
    created_at: datetime = Field(..., description="创建时间")

    class Config:
        from_attributes = True
  • 实现业务层逻辑
# app/service/mcp_service.py

from typing import List, Optional
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.data.models import MCPService
from app.schemas.mcp_service import MCPServiceCreate, MCPServiceUpdate, MCPServiceResponse
from app.ai.mcp.client_manager import MCPClientManager
from app.ai.mcp.base import MCPServiceConfig, MCPTransportType

class MCPManagerService:
    def __init__(self, db: AsyncSession, mcp_client_manager: Optional[MCPClientManager] = None):
        self.db = db
        self.mcp_client_manager = mcp_client_manager

    async def create(self, mcp_service: MCPServiceCreate) -> MCPServiceResponse:
        """创建MCP服务"""
        # 检查服务名称是否已存在
        existing = await self.db.scalar(
            select(MCPService).where(MCPService.name == mcp_service.name)
        )
        if existing:
            raise ValueError(f"MCP服务名称 '{mcp_service.name}' 已存在")

        # 创建新服务
        service_data = mcp_service.model_dump()
        service = MCPService(**service_data)

        self.db.add(service)
        await self.db.commit()
        await self.db.refresh(service)

        if service.enabled:
            service_config = self.convert_to_mcp_config(service)
            await self.mcp_client_manager.start_service(service_config)

        return MCPServiceResponse.model_validate(service)

    async def get_by_id(self, service_id: int) -> Optional[MCPServiceResponse]:
        """根据ID获取MCP服务"""
        service = await self.db.get(MCPService, service_id)
        if not service:
            return None
        return MCPServiceResponse.model_validate(service)

    async def get_all(self) -> List[MCPServiceResponse]:
        """获取所有MCP服务"""
        result = await self.db.execute(
            select(MCPService).order_by(MCPService.created_at.desc())
        )
        services = result.scalars().all()
        return [MCPServiceResponse.model_validate(service) for service in services]

    async def get_enabled_services_as_config(self) -> List[MCPServiceConfig]:
        """获取所有启用的MCP服务配置"""
        result = await self.db.execute(
            select(MCPService)
            .where(MCPService.enabled == True)
        )
        services = result.scalars().all()
        return [self.convert_to_mcp_config(service) for service in services]

    async def update(self, service_id: int, mcp_service: MCPServiceUpdate) -> Optional[MCPServiceResponse]:
        """更新MCP服务"""
        service: MCPService | None = await self.db.get(MCPService, service_id)
        if not service:
            return None

        # 检查名称冲突
        if mcp_service.name and mcp_service.name != service.name:
            existing = await self.db.scalar(
                select(MCPService).where(MCPService.name == mcp_service.name)
            )
            if existing:
                raise ValueError(f"MCP服务名称 '{mcp_service.name}' 已存在")

        # 更新字段
        update_data = mcp_service.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(service, field, value)

        await self.db.commit()
        await self.db.refresh(service)

        if service.enabled:
            service_config = self.convert_to_mcp_config(service)
            await self.mcp_client_manager.update_service(service_config)

        return MCPServiceResponse.model_validate(service)

    async def delete(self, service_id: int) -> bool:
        """删除MCP服务"""
        service = await self.db.get(MCPService, service_id)
        if not service:
            return False

        await self.db.delete(service)
        await self.db.commit()

        mcp_status = self.mcp_client_manager.get_service_status(service_id)
        if mcp_status.running:
            await self.mcp_client_manager.stop_service(service_id)
        return True

    async def get_services_by_ids(self, service_ids: List[int]) -> List[MCPServiceResponse]:
        """根据ID列表获取MCP服务"""
        if not service_ids:
            return []

        result = await self.db.execute(
            select(MCPService)
            .where(MCPService.id.in_(service_ids))
            .where(MCPService.enabled == True)
        )
        services = result.scalars().all()
        return [MCPServiceResponse.model_validate(service) for service in services]

    @staticmethod
    def convert_to_mcp_config(service: MCPService) -> MCPServiceConfig:
        """将数据库模型转换为MCP内部配置"""
        config = MCPServiceConfig(
            service_id=service.id,
            name=service.name,
            transport_type=MCPTransportType(service.transport),
            enabled=service.enabled
        )

        match config.transport_type:
            case MCPTransportType.STDIO:
                config.command = service.config.get('command')
                config.args = service.config.get('args', [])
            case MCPTransportType.SSE | MCPTransportType.STREAMABLE_HTTP:
                config.url = service.config.get('url')
                config.headers = service.config.get('headers', {})

        return config
  • 修改消息服务chat方法
# app/service/message.py

class MessageService:
    def __init__(self, db: AsyncSession,
                 vectorization_service: Optional[DocumentVectorizationService] = None,
                 mcp_client_manager: Optional[MCPClientManager] = None):
        self.db = db
        self.vectorization_service = vectorization_service
        self.mcp_client_manager = mcp_client_manager
        
    async def _get_mcp_service_ids(self, session_id: int) -> List[int]:
        """获取会话关联的MCP服务ID列表"""
        session = await self.db.scalar(
            select(Session)
            .options(
                selectinload(Session.mcp_services)
            )
            .where(Session.id == session_id)
        )

        # 收集会话关联的MCP服务ID
        mcp_service_ids = set()
        for mcp in session.mcp_services:
            mcp_service_ids.add(mcp.id)

        return list(mcp_service_ids)

    async def chat(self, chat_request: ChatRequest) -> AsyncGenerator[str, None]:
        """处理聊天请求"""
        # 验证session是否存在
        session = await self.db.get(Session, chat_request.session_id)
        if not session:
            raise ValueError(f"会话 ID {chat_request.session_id} 不存在")

        # 获取会话绑定的角色与模型信息
        role = await self.db.scalar(
            select(Role)
            .options(selectinload(Role.provider))  # 贪婪加载 provider
            .where(Role.id == session.role_id)
        )
        if not role:
            raise ValueError(f"角色 ID {session.role_id} 不存在")

        # 创建聊天历史管理器
        history_mgr = ChatHistoryManager(self.db, chat_request.session_id)
        history = await history_mgr.load_history()
        assistant_content_buffer = ""

        mcp_service_ids = None
        if chat_request.use_mcp_tools:
            mcp_service_ids = await self._get_mcp_service_ids(chat_request.session_id)

        # 使用前面 agent.py 中实现的代理服务
        ai = AgentService(role, history,
                          vectorization_service=self.vectorization_service,
                          mcp_manager=self.mcp_client_manager,
                          mcp_service_ids=mcp_service_ids)

        # 根据用户选择决定使用RAG还是普通聊天
        if chat_request.use_knowledge_base:
            # 检查是否有知识库可用
            knowledge_base_ids = await self._get_knowledge_base_ids(chat_request.session_id)
            async for chunk in ai.rag_chat_stream(chat_request.message, knowledge_base_ids):
                assistant_content_buffer += chunk
                payload = json.dumps({
                    "content": chunk
                }, ensure_ascii=False)
                yield payload
        else:
            # 调用 AI 服务的流式方法
            async for chunk in ai.chat_stream(chat_request.message):
                assistant_content_buffer += chunk
                payload = json.dumps({
                    "content": chunk
                }, ensure_ascii=False)
                yield payload  # 边生成边返回给调用方

        # 保存消息到 DB + Redis
        await history_mgr.save_message(MessageBase(
            role=MessageRole.User,
            content=chat_request.message,
            session_id=chat_request.session_id
        ))
        await history_mgr.save_message(MessageBase(
            role=MessageRole.Assistant,
            content=assistant_content_buffer,
            session_id=chat_request.session_id
        ))
  • 实现API方法
# app/api/mcp_service.py

from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.ai.mcp.dependency import get_mcp_client_manager
from app.data.db import get_db
from app.service.mcp_service import MCPManagerService
from app.schemas.mcp_service import (
    MCPServiceCreate,
    MCPServiceUpdate,
    MCPServiceResponse
)

router = APIRouter(prefix="/api/mcp-services", tags=["MCP服务管理"])

@router.post("/", response_model=MCPServiceResponse, status_code=status.HTTP_201_CREATED)
async def create_mcp_service(
        mcp_service: MCPServiceCreate,
        db: AsyncSession = Depends(get_db),
        mcp_client_manager = Depends(get_mcp_client_manager)
):
    """创建MCP服务"""
    service = MCPManagerService(db, mcp_client_manager)
    try:
        # 创建数据库记录
        result = await service.create(mcp_service)

        return result
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))


@router.get("/", response_model=List[MCPServiceResponse])
async def get_mcp_services(db: AsyncSession = Depends(get_db)):
    """获取所有MCP服务"""
    service = MCPManagerService(db)
    return await service.get_all()


@router.get("/{service_id}", response_model=MCPServiceResponse)
async def get_mcp_service(
        service_id: int,
        db: AsyncSession = Depends(get_db)
):
    """获取指定MCP服务"""
    service = MCPManagerService(db)
    result = await service.get_by_id(service_id)
    if not result:
        raise HTTPException(status_code=404, detail="MCP服务不存在")
    return result


@router.put("/{service_id}", response_model=MCPServiceResponse)
async def update_mcp_service(
        service_id: int,
        mcp_service: MCPServiceUpdate,
        db: AsyncSession = Depends(get_db),
        mcp_client_manager = Depends(get_mcp_client_manager)
):
    """更新MCP服务"""
    service = MCPManagerService(db, mcp_client_manager)
    result = await service.update(service_id, mcp_service)
    if not result:
        raise HTTPException(status_code=404, detail="MCP服务不存在")

    return result

@router.delete("/{service_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_mcp_service(
        service_id: int,
        db: AsyncSession = Depends(get_db),
        mcp_client_manager = Depends(get_mcp_client_manager)
):
    """删除MCP服务"""
    service = MCPManagerService(db, mcp_client_manager)
    # 删除数据库记录
    success = await service.delete(service_id)
    if not success:
        raise HTTPException(status_code=404, detail="MCP服务不存在")
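
接口完成后,可以创建一个文件系统MCP服务来验证。下面是一个示意性的调用脚本(假设应用运行在 http://localhost:8000,环境中已安装 httpx;目录路径仅为示例):

import httpx

payload = {
    "name": "filesystem",
    "description": "本地文件系统服务",
    "transport": "stdio",
    "config": {
        "command": "npx",
        "args": ["-y", "@modelcontextprotocol/server-filesystem", "C:/Test"]
    },
    "enabled": True
}

# 创建MCP服务,成功后会自动启动并连接该服务
resp = httpx.post("http://localhost:8000/api/mcp-services/", json=payload)
print(resp.status_code, resp.json())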
  • 启动函数注入mcp单例对象,并注册MCP API
# app/main.py

import logging
logging.basicConfig(
    level=logging.INFO,  # 设置全局日志级别
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s'
)

from app.api import mcp_service
from app.ai.mcp.dependency import get_mcp_client_manager
from app.service.mcp_service import MCPManagerService

logger = logging.getLogger(__name__)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """应用生命周期:启动 -> 运行 -> 关闭"""
    logger.info("正在启动MCP服务...")
    try:
        # 获取所有启用的MCP服务
        async with SessionLocal() as db:
            mcp = MCPManagerService(db)
            enabled_services = await mcp.get_enabled_services_as_config()

            if enabled_services:
                # 初始化MCP客户端管理器单例
                mcp_client_manager = get_mcp_client_manager()
                for service in enabled_services:
                    await mcp_client_manager.start_service(service)
                logger.info(f"成功启动 {len(enabled_services)} 个MCP服务")
            else:
                logger.info("没有启用的MCP服务")
    except Exception as e:
        logger.error(f"启动MCP服务失败: {e}")
    yield
    # 应用关闭时,停止所有MCP服务,释放子进程与连接
    await get_mcp_client_manager().stop_all_service()

# 注册MCP服务路由(路由内部已声明 /api/mcp-services 前缀,这里无需再加 prefix)
app.include_router(mcp_service.router)

至此,我们的通用AI工具的后端应用已经全部完成,UI部分就不再实现了。