Spiga

程序员的AI体验(七):LangChain RAG

2025-05-24 17:36:12

一、RAG基本概念

  • RAG定义与价值:RAG(检索增强生成)是一种AI技术框架,通过检索外部知识源来增强大语言模型的知识储备和生成能力,解决模型知识过时和内容幻觉问题,提升回答的准确性和可信度。
  • RAG工作原理:RAG工作流分为索引阶段和检索与生成阶段。索引阶段将数据处理成可检索格式,存储于向量数据库;检索与生成阶段实时响应用户查询,检索相关文本块并生成答案。
  • RAG应用场景:RAG适用于企业知识管理、智能客服、法律咨询等专业领域,能够针对特定信息源回答问题,发挥巨大价值。
  • 构建基础RAG流程:
    • 索引阶段:线下数据准备过程
      • 加载
      • 分割
      • 嵌入
      • 存储
    • 检索和生成阶段:线上阶段
      • 检索
      • 生成
  • 开发难题
    • 繁杂的数据接入
    • 复杂的文本处理
    • 多样的模型与接口
    • 专业的向量存储
    • 粘合所有环节
  • LangChain的价值
    • 从零开发RAG应用面临数据接入、文本处理、模型接口、向量存储和环节粘合等诸多难题,LangChain通过模块化架构简化了构建过程。
    • LangChain由多个独立包组成,如langchain-core提供核心接口和数据类型,langchain-community集成第三方服务,langchain-openai针对特定模型提供商集成。
    • LangChain具有流式处理、异步与并行执行、可观察性与调试、标准化接口等核心优势,让构建复杂AI应用变得简单高效。

二、文档加载

1. LCEL语言

  • LCEL(LangChain表达式语言)是一种声明式编程范式,用于将LangChain组件组合在一起,通过管道操作符|实现数据流的高效表达。
  • LCEL支持流式处理、异步与并行执行、可观察性与调试、标准化接口,为构建高性能AI应用提供强大支持。

2. 文档加载器

  • 文档加载器是RAG流程的入口,负责将各种数据源加载为LangChain能够处理的文档对象,其质量直接影响整个系统的性能。
  • 文档对象包含page_content和metadata两个核心属性,分别存储文本内容和相关元数据,元数据对于数据过滤、答案引用和来源追溯至关重要。
from langchain_core.documents import Document

doc = Document(
    page_content="这是一段示例文本内容,它将被用作文档的主要内容。",
    metadata={
        "source": "manual_example.py",
        "page": 2,
        "section": "2.3",
        "is_example": True
    }
)

# 我们可以直接打印这个 Document 对象,看看它的结构
print(doc)

# 也可以单独访问它的 page_content 和 metadata 属性
print("\n页面内容 (page_content):")
print(doc.page_content)

print("\n元数据 (metadata):")
print(doc.metadata)
  • 文档加载器遵循"基础加载器"接口规范,提供贪婪加载和懒加载两种方式,开发者需根据实际需求选择合适的加载方式。
加载器类 处理数据类型 所需依赖包 核心功能与特点
TextLoader 纯文本文件 langchain-community 最基础的加载器,将整个文本文件加载为一个Document对象
WebBaseLoader 网页 langchain-community beautifulsoup4 从URL加载静态HTML网页内容,并进行基础的文本清理
CSVLoader CSV文件 langchain-community 将CSV文件的每一个行转换为一个独立的Document对象
PyPDFLoader PDF文件 langchain-community pypdf 强打的PDF加载器,支持按页加载或将整个文档加载为单个Document
DirectoryLoader 整个目录 langchain-community(+ 特定文件加载器依赖) 加载指定目录下的所有文件,可以配合其他加载器来解析特定类型的文件

3. 代码示例

  • 文本加载器
import os
from langchain_community.document_loaders import TextLoader

# 定义我们要加载的文件路径
file_path = "example.txt"

# 1. 初始化 TextLoader
# 我们需要传入文件路径,并强烈建议指定编码为 utf-8 以避免中文乱码
loader = TextLoader(file_path, encoding="utf-8")

# 2. 调用 load() 方法加载文档
# 这会返回一个包含 Document 对象的列表
documents = loader.load()

# 打印加载结果
print(f"加载的文档数量: {len(documents)}")
print("\n打印第一个文档的详细信息:")
# 直接打印 document 对象,可以看到它的完整结构
print(documents[0])

print("\n第一个文档的内容 (page_content):")
print(documents[0].page_content)

print("\n第一个文档的元数据 (metadata):")
print(documents[0].metadata)
  • 网页加载器
from langchain_community.document_loaders import WebBaseLoader

# 1. 初始化 WebBaseLoader
# 它的参数可以是一个单独的 URL 字符串,也可以是一个包含多个 URL 的列表。
# loader = WebBaseLoader("https://www.example.com")
loader = WebBaseLoader(["https://python.langchain.com/docs/get_started/introduction"])

# 2. 调用 load() 方法加载文档
documents = loader.load()

# 打印结果
print(f"加载的文档数量: {len(documents)}")
# 网页内容通常很长,我们只打印一小部分看看
print("\n打印第一个文档内容的前300个字符:")
print(documents[0].page_content)
print("\n打印第一个文档的元数据:")
print(documents[0].metadata)
  • CSV加载器
from langchain_community.document_loaders import CSVLoader

csv_file_path = "example.csv"

# 1. 初始化 CSVLoader,指定文件路径和编码
loader = CSVLoader(file_path=csv_file_path, encoding="utf-8")

# 2. 调用 load() 方法加载文档
# CSVLoader 会将每一行(不包括表头)都转换为一个 Document 对象
documents = loader.load()

# 打印结果
print(f"加载的文档数量: {len(documents)}")

# 我们可以遍历并打印每个文档的详细信息
for i, doc in enumerate(documents):
    print(f"\n--- 文档 {i+1} ---")
    print(f"内容 (page_content):\n{doc.page_content}")
    print(f"元数据 (metadata): {doc.metadata}")
  • PDF加载器
from langchain_community.document_loaders import PyPDFLoader
import pprint # 导入pprint,让字典打印更美观

# 准备好的 PDF 文件路径
pdf_file_path = "./tutorial.pdf"

# # 1. 初始化 PyPDFLoader,采用默认加载模式
# loader_by_page = PyPDFLoader(pdf_file_path)
#
# # 2. 加载文档,这将按页分割
# docs_by_page = loader_by_page.load()
#
# # 打印结果
# print(f"按页加载模式,加载的文档(页面)数量: {len(docs_by_page)}")
#
# # 打印第一页的元数据,以观察其结构
# if docs_by_page:
#     print("\n第一页的元数据 (metadata):")
#     pprint.pprint(docs_by_page[0].metadata)

# 1. 初始化 PyPDFLoader,并设置加载模式为 "single"
loader_single_doc = PyPDFLoader(
    pdf_file_path,
    mode="single",
    pages_delimiter="\n--- 新页面 ---\n" # 自定义页面之间的分隔符
)

# 2. 加载文档
docs_single = loader_single_doc.load()

# 打印结果
print(f"\n单个文档模式,加载的文档数量: {len(docs_single)}")

# 打印这个唯一文档的元数据
if docs_single:
    print("\n单个文档的元数据 (metadata):")
    pprint.pprint(docs_single[0].metadata)
  • 目录加载器
from langchain_community.document_loaders import DirectoryLoader, TextLoader

# 准备我们的示例目录
dir_path = "my_documents"

# 1. 初始化 DirectoryLoader
loader = DirectoryLoader(
    path=dir_path,                 # 指定要加载的目录路径
    glob="**/*.md",                # 只加载所有子目录下的 markdown 文件
    loader_cls=TextLoader,         # 指定使用 TextLoader 来解析这些文件
    loader_kwargs={'encoding': 'utf-8'}, # 传递给 TextLoader 的参数
    use_multithreading=True,       # 开启多线程加载,提高效率
    show_progress=True,            # 显示加载进度条
    silent_errors=True             # 遇到无法加载的文件时静默跳过
)

# 2. 调用 load() 方法加载所有匹配的文件
documents = loader.load()

# 打印结果
print(f"\n成功加载的文档数量: {len(documents)}")
for doc in documents:
    print(f"\n--- 文档来源: {doc.metadata['source']} ---")
    print(doc.page_content)

三、文本分割

文本分割是RAG系统中的关键环节,连接原始数据和结构化知识库。它将长文档分割成小文本块,便于后续处理。若处理不当,后续步骤效率和准确性将受影响。

1. 文本分割的必要性

  • 模型上下文窗口限制:大语言模型存在上下文窗口限制,超出部分信息会被忽略。文本分割确保文档信息能被完整处理。
  • 对检索质量的直接影响:小而精的文本块能显著提高检索的准确率和召回率,使检索系统更精准地找到与用户查询相关的信息。
  • 成本、延迟与幻觉的考量:合理的文本分割策略可降低嵌入和存储成本,优化查询延迟,并有效抑制大语言模型产生幻觉的风险。

2. 核心参数解析

  • 块大小:块大小定义文本块的目标最大尺寸,单位取决于分割器类型。选择合适的块大小需权衡检索质量、成本和延迟。
  • 块重叠:块重叠定义连续块之间重复内容的长度,确保信息在相邻块之间平滑过渡,降低关键信息丢失风险。
  • 分块的难题:分块策略需在检索精度、上下文丰富度和系统效率之间进行权衡,不同场景需选择合适的策略。

3. 基础分割策略

  • 字符文本分割器:字符文本分割器依赖分隔符进行文本分割,优先按分隔符分割,再考虑块大小。其优点是简单快速,但局限性在于过于机械化。
from langchain_text_splitters import CharacterTextSplitter

# 准备一段示例文本
sample_text = """LangChain是一个强大的框架,旨在简化使用大型语言模型(LLM)构建应用程序的过程。它提供了一套全面的工具、组件和接口。

这些工具使得开发者可以轻松地将LLM与各种数据源连接起来,创建具有状态的、能够记忆上下文的复杂应用链。无论是构建问答机器人、文档摘要工具还是代码生成器,LangChain都提供了坚实的基础。"""

text_splitter = CharacterTextSplitter(
    separator="\n\n",
    chunk_size=80,
    chunk_overlap=10,
    length_function=len, # 使用 Python 内置的 len 函数计算长度
)

# 执行分割
chunks = text_splitter.split_text(sample_text)

# 打印结果
print(f"原始文本长度: {len(sample_text)}")
for i, chunk in enumerate(chunks):
    print(f"--- 块 {i+1} (长度: {len(chunk)}) ---")
    print(chunk)
    print("-" * (len(f"--- 块 {i+1} (长度: {len(chunk)}) ---")))

示例代码展示了字符文本分割器的使用,但其局限性在于无法处理复杂文本结构,可能导致超长块。

  • 递归字符文本分割器:递归字符文本分割器采用分而治之的递归策略,使用有序分隔符列表进行层层递进的分割,生成语义更连贯的文本块。
from langchain_text_splitters import RecursiveCharacterTextSplitter

# 使用与之前相同的示例文本
# 准备一段示例文本
sample_text = """LangChain是一个强大的框架,旨在简化使用大型语言模型(LLM)构建应用程序的过程。它提供了一套全面的工具、组件和接口。

这些工具能够让开发者可以轻松地将LLM与各种数据源连接起来,创建具有状态的、能够记忆上下文的复杂应用链。无论是构建问答机器人、文档摘要工具还是代码生成器,LangChain都提供了坚实的基础。"""

# 初始化 RecursiveCharacterTextSplitter
recursive_splitter = RecursiveCharacterTextSplitter(
    chunk_size=80,
    chunk_overlap=10,
    length_function=len,
)

# 执行分割
chunks = recursive_splitter.split_text(sample_text)

# 打印结果
print(f"原始文本长度: {len(sample_text)}")
for i, chunk in enumerate(chunks):
    print(f"--- 块 {i+1} (长度: {len(chunk)}) ---")
    print(chunk)
    print("-" * (len(f"--- 块 {i+1} (长度: {len(chunk)}) ---")))

示例代码展示了递归字符文本分割器的使用,其优势在于能更好地处理复杂文本结构,生成更符合语义的文本块。

4. 按Token划分

  • Token文本分割器:Token文本分割器先将文本编码成Token ID列表,再按设定的块大小和块重叠进行分割,最后解码回文本字符串。
# 导入 TokenTextSplitter
from langchain_text_splitters import TokenTextSplitter

# 示例文本
sample_text = "LangChain是一个强大的框架,旨在简化使用大型语言模型(LLM)构建应用程序的过程。它提供了一套全面的工具、组件和接口。"

# 初始化 TokenTextSplitter
token_splitter = TokenTextSplitter(
    encoding_name="cl100k_base",
    chunk_size=10,
    chunk_overlap=2
)

# 执行分割
chunks = token_splitter.split_text(sample_text)

# 打印结果
for i, chunk in enumerate(chunks):
    print(f"--- 块 {i+1} ---")
    print(chunk)
    print("-" * (len(f"--- 块 {i+1} ---") + len(chunk)))

# 验证Token数量
import tiktoken
encoding = tiktoken.get_encoding("cl100k_base")
for i, chunk in enumerate(chunks):
    num_tokens = len(encoding.encode(chunk))
    print(f"块 {i+1} 的Token数: {num_tokens}")

示例代码展示了Token文本分割器的使用,但其局限性在于可能破坏多字节Unicode字符,导致乱码。

  • 递归Token分割:递归Token分割器融合了递归字符文本分割器的逻辑和Token计数的精确性,既尊重文本结构,又确保块大小精确。
from langchain_text_splitters import RecursiveCharacterTextSplitter

sample_text = """第一部分:LangChain 简介。
LangChain是一个强大的框架,旨在简化使用大型语言模型(LLM)构建应用程序的过程。

第二部分:核心组件。
它提供了一套全面的工具、组件和接口,例如文本分割器、向量数据库和代理。"""

# 1. 使用 from_tiktoken_encoder 工厂方法创建分割器
recursive_token_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    model_name="gpt-4",
    chunk_size=30,
    chunk_overlap=5,
)

# 2. 执行分割
chunks = recursive_token_splitter.split_text(sample_text)

# 3. 打印结果
for i, chunk in enumerate(chunks):
    print(f"--- 块 {i+1} ---")
    print(chunk)
    print("-" * (len(f"--- 块 {i+1} ---") + len(chunk)))

示例代码展示了递归Token分割器的使用,其优势在于结合了两种方法的优点,适用于多种场景。

5. 结构感知分割

  • 分割编程语言代码:代码分割器通过特定语言的分隔符列表进行分割,优先保留代码结构和语义单元,如类和函数。
from langchain_text_splitters import RecursiveCharacterTextSplitter, Language

# 一段 Python 示例代码
python_code = """
class DataProcessor:
    \"\"\"A simple class to process data.\"\"\"

    def __init__(self, data):
        self.data = data

    def process(self):
        \"\"\"Process the data.\"\"\"
        print("Processing data...")
        # Imagine complex processing logic here
        processed_data = [d * 2 for d in self.data]
        return processed_data

def helper_function():
    \"\"\"A standalone helper function.\"\"\"
    print("This is a helper function.")

# Script execution part
processor = DataProcessor()
result = processor.process()
helper_function()
"""

# 使用 from_language 创建 Python 代码分割器
python_splitter = RecursiveCharacterTextSplitter.from_language(
    language=Language.PYTHON,
    chunk_size=200,  # 按字符计数的块大小
    chunk_overlap=20
)

# 对代码字符串进行分割
chunks = python_splitter.split_text(python_code)

# 打印结果
for i, chunk in enumerate(chunks):
    print(f"--- 代码块 {i + 1} ---")
    print(chunk)
    print("-" * (len(f"--- 代码块 {i + 1} ---") + len(chunk)))

示例代码展示了如何使用代码分割器分割Python代码,生成的文本块保持了代码的完整性和逻辑性。

  • Markdown文本分割器:Markdown文本分割器依据Markdown标题级别进行分割,并将标题层级作为元数据附加到每个文本块。
from langchain_text_splitters import MarkdownHeaderTextSplitter

markdown_text = """
# LangChain 教程

## 第一章:入门

这是入门章节的内容。我们介绍 LangChain 的基本概念。

### 1.1 核心组件

组件是 LangChain 的基石。

## 第二章:进阶

这是进阶章节的内容。我们将深入探讨代理(Agent)和链(Chain)。

"""

# 定义要作为分割依据的标题
headers_to_split_on = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("###", "Header 3"),
]

# 初始化 MarkdownHeaderTextSplitter
markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)

# 执行分割
chunks = markdown_splitter.split_text(markdown_text)

# 打印结果
for chunk in chunks:
    print(f"--- Markdown 块 ---")
    print(f"元数据: {chunk.metadata}")
    print(f"内容: {chunk.page_content.strip()}")
    print("-" * 20)

示例代码展示了Markdown文本分割器的使用,生成的文本块保留了文档的结构信息。

  • HTML文本分割器:HTML文本分割器通过解析HTML DOM树,依据指定的标题标签进行分割,并将标题作为元数据附加到每个文本块。
from langchain_text_splitters import HTMLHeaderTextSplitter

html_string = """
<!DOCTYPE html>
<html>
<body>
<div>
    <h1>主题一:AI 简介</h1>
    <p>人工智能是计算机科学的一个分支。</p>
    <h2>子主题 1.1:机器学习</h2>
    <p>机器学习是实现 AI 的一种方法。</p>
</div>
<div>
    <h1>主题二:自然语言处理</h1>
    <p>NLP 是 AI 的一个重要领域。</p>
</div>
</body>
</html>
"""

# 定义要作为分割依据的标题标签
headers_to_split_on = [
    ("h1", "Header 1"),
    ("h2", "Header 2"),
]

# 初始化 HTMLHeaderTextSplitter
html_splitter = HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)

# 执行分割
# 注意:该分割器直接返回 Document 对象列表,类似于 split_documents
chunks = html_splitter.split_text(html_string)

# 打印结果
for chunk in chunks:
    print(f"--- HTML 块 ---")
    print(f"元数据: {chunk.metadata}")
    print(f"内容: {chunk.page_content.strip()}")
    print("-" * 20)

示例代码展示了HTML文本分割器的使用,生成的文本块保留了网页的结构信息。

四、文档嵌入

1. 什么是向量

  • 向量的定义:向量是具有大小和方向的几何对象。在计算机中,向量用有序数字列表表示,如二维向量 (x,y)。在机器学习中,向量通常有很高维度,每个分量代表数据在特定维度上的特征值,共同描绘出复杂对象的样貌。
  • 向量空间与运算:向量存在于向量空间中,可进行距离或角度计算。距离近或方向相似的向量被认为相关,这是语义检索的数学基础。
  • 文本的向量化:文本通过词嵌入技术转换为向量,模型学习海量数据后提炼出的向量浓缩了文本的语义信息。
  • 图片与用户行为的向量化:图片可转换为向量,蕴含物体类别、颜色分布等特征;用户行为历史也可聚合成向量,代表用户兴趣偏好。

2. 什么是文档嵌入

  • 文本嵌入是将文本映射到高维向量空间的技术,语义相近的文本在空间中位置靠近,可将判断语义相似性问题转化为计算向量距离问题。
  • 文本嵌入的应用:文本嵌入催生了语义检索,使系统能理解用户意图,找到与问题语义相关的内容,而不仅是关键词匹配。

3. LangChain的统一嵌入接口

  • 接口设计理念:LangChain设计了统一的嵌入接口,封装一致的调用范式,降低开发者在不同模型间切换的复杂度。
  • 核心方法:embed_documents 用于批量文档嵌入,embed_query 用于单个查询嵌入,映射模型的非对称语义搜索能力。
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

# 步骤一:加载文档
# 首先,我们指定要加载的文件路径和编码格式,创建一个TextLoader实例。
loader = TextLoader("./rag_sample_text.txt", encoding='utf-8')
# 然后调用load方法,将整个文件的内容加载到内存中。
raw_documents = loader.load()

# 步骤二:分割文档
# 接下来,我们实例化一个递归字符文本分割器,
# 设置每个块的最大长度为150个字符,块之间的重叠为20个字符。
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=150,
    chunk_overlap=20
)
# 使用这个分割器来处理我们刚刚加载的原始文档。
docs = text_splitter.split_documents(raw_documents)

# 检查分割结果
# 让我们打印一下,看看文档被分成了多少块。
print(f"文档被分割成了 {len(docs)} 个块。\n")
# 并且逐一查看每个块的内容和长度。
for i, doc in enumerate(docs):
    print(f"--- 块 {i + 1} (长度: {len(doc.page_content)}) ---")
    print(doc.page_content)

# 步骤三:创建嵌入模型实例
import os
from dotenv import load_dotenv
from langchain_community.embeddings import DashScopeEmbeddings

# 加载.env文件中的环境变量
load_dotenv()
# 从环境变量中获取API Key
api_key = os.getenv("DASHSCOPE_API_KEY")

# 实例化通义千问嵌入模型
embeddings_model = DashScopeEmbeddings(
    model="text-embedding-v4",
    dashscope_api_key=api_key
)

# 批量嵌入我们之前分割好的文档列表
document_vectors = embeddings_model.embed_documents([doc.page_content for doc in docs])

print(f"\n成功嵌入 {len(document_vectors)} 个文档块。")
print(f"第一个文档块的向量维度为: {len(document_vectors[0])}")
print(f"让我们看一下第一个向量的前5个值: {document_vectors[0][:5]}\n")

# 现在,我们来嵌入一个单独的用户查询
query = "RAG中的核心技术是什么?"
# 调用 embed_query 方法来获取查询向量
query_vector = embeddings_model.embed_query(query)

print(f"成功嵌入查询: '{query}'")
print(f"查询向量的维度为: {len(query_vector)}")
print(f"同样,我们看一下查询向量的前5个值: {query_vector[:5]}")

4. 本地嵌入模型

  • 本地化嵌入的优势

    • 数据隐私与安全:本地部署可确保数据处理在受控环境中进行,避免数据外泄风险。
    • 离线运行与系统稳定性:本地模型不依赖外部网络,避免因API服务波动导致应用故障。
    • 成本控制:本地化部署是一次性硬件投入,适合大规模、高频率应用,降低API费用。
    • 模型定制化:开源模型可在专有数据集上微调,提升特定领域检索效果。
  • 本地嵌入的核心引擎

    • Hugging Face与sentence-transformers:Hugging Face是开源模型社区,LangChain的 HuggingFaceEmbeddings 基于 sentence-transformers 库,简化了从模型加载到编码执行的过程。
    • 模型加载与编码执行:sentence-transformers 库分两步工作:模型加载和编码执行,LangChain的 HuggingFaceEmbeddings 类将这两步的配置参数通过 model_kwargs 和 encode_kwargs 解耦。
  • 使用HuggingFace嵌入模型

    • 模型下载与安装:可通过Git命令或Hugging Face官方CLI工具下载模型文件,安装 sentence-transformers库。
    • 实例化与嵌入计算:指定模型路径和参数,实例化 HuggingFaceEmbeddings,使用本地模型进行文档和查询的嵌入计算。
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

# 步骤一和二:加载和分割文档,这部分和之前的例子完全一样
loader = TextLoader("./rag_sample_text.txt", encoding='utf-8')
raw_documents = loader.load()

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=150,
    chunk_overlap=20
)
docs = text_splitter.split_documents(raw_documents)

# 打印分割结果以供检查
print(f"文档被分割成了 {len(docs)} 个块。\n")
for i, doc in enumerate(docs):
    print(f"--- 块 {i + 1} (长度: {len(doc.page_content)}) ---")
    print(doc.page_content)

# 步骤三:创建本地嵌入模型实例
from langchain_huggingface import HuggingFaceEmbeddings

# 指定我们刚刚下载到本地的模型路径
model_path = "./Qwen3-Embedding-0.6B"

# model_kwargs 用于配置模型的加载行为。
# 这里我们指定模型在CPU上运行。
model_kwargs = {'device': 'cpu'} # cuda

# encode_kwargs 用于配置编码过程的行为。
# 这里我们设置对生成的向量进行归一化处理。
encode_kwargs = {'normalize_embeddings': True}

# 实例化HuggingFace嵌入模型
hf_embeddings = HuggingFaceEmbeddings(
    model_name=model_path,      # 指定模型路径
    model_kwargs=model_kwargs,  # 传入模型加载参数
    encode_kwargs=encode_kwargs # 传入编码参数
)

print(f"\n成功加载本地模型: {hf_embeddings.model_name}")

# 使用本地模型进行嵌入计算
# 请注意,第一次运行时,模型会被加载到内存,可能需要一些时间。
local_doc_vectors = hf_embeddings.embed_documents([doc.page_content for doc in docs])

print(f"\n成功嵌入 {len(local_doc_vectors)} 个文档块。")
print(f"第一个文档块的向量维度为: {len(local_doc_vectors[0])}")
print(f"第一个向量的前5个值为: {local_doc_vectors[0][:5]}\n")

query = "RAG中的核心技术是什么?"
local_query_vector = hf_embeddings.embed_query(query)

print(f"\n使用本地模型成功嵌入查询: '{query}'")
print(f"查询向量的维度为: {len(local_query_vector)}")
print(f"查询向量的前5个值为: {local_query_vector[:5]}")

5. 使用GPU加速嵌入模型

  • GPU擅长并行处理大规模数据,速度飞快但成本高;CPU适合串行处理,灵活性强,成本效益高。
  • GPU适用于数据密集型、计算可并行化的任务,如深度学习中的矩阵运算。
  • 什么是CUDA
    • 定义:CUDA是NVIDIA为自家GPU打造的并行计算平台和编程模型,开发者可通过它将计算任务转移到GPU上执行。
    • 做用:CUDA让GPU成为强大的并行计算协处理器,充分发挥其并行处理能力。
  • GPU加速嵌入的原理
    • 数据传输:调用嵌入方法时,数据从主机内存复制到GPU显存。
    • 并行计算:GPU调度CUDA核心,对批次中的每个文本项同时执行前向传播计算,提升性能。
    • 结果返回:计算完成后,结果从显存复制回主机内存。
  • 实现步骤
    1. 安装NVIDIA驱动程序:确保系统安装了最新或较新的NVIDIA专有驱动程序,通过 nvidia-smi 命令检查驱动版本和GPU状态。
    2. 安装CUDA兼容的PyTorch:根据系统环境和NVIDIA驱动版本,在PyTorch官网选择合适的安装命令,安装支持GPU加速的PyTorch版本。
    3. 验证环境:通过简单代码验证PyTorch是否能识别并使用GPU
    4. 使用CUDA参数:在代码中将模型配置参数中的设备从 cpu 改为 cuda ,实现GPU加速。
# 验证环境
import torch

if torch.cuda.is_available():
    # 如果CUDA可用,打印成功信息和GPU型号
    print(f"CUDA is available. GPU: {torch.cuda.get_device_name(0)}")
else:
    # 如果CUDA不可用,打印提示信息
    print("CUDA is not available. Running on CPU.")		
# 启用cuda
model_kwargs = {'device': 'cuda'} # cpu ==> cuda

五、向量存储

1. 向量存储的重要性

  • 核心组件:向量存储是RAG系统的关键,如同大脑记忆库,负责存储海量知识库信息,并高效检索与问题意图契合的内容片段,为LLM生成准确回答提供原材料,其检索效率和质量直接影响RAG系统输出内容的准确性、深度和可靠性。
  • 传统数据库局限:传统数据库依赖关键词、标签或元数据字段进行精确匹配搜索,无法理解人类语言的深层语义和复杂上下文,存在局限性,如可能错过未直接使用关键词但极具参考价值的文章。
  • 向量存储优势:向量存储通过深度学习模型将非结构化数据转换为数值化向量,捕捉语义信息,将数据匹配方式从“字符串匹配”提升到“语义概念匹配”,实现基于“含义”的智能搜索,为RAG流程提供核心技术基础。

2. 向量搜索本质

  • 相似性搜索:向量搜索是“相似性搜索”或“最近邻搜索”,其核心任务是在庞大向量集合中,快速找出与查询向量最“相似”的N个向量,通过计算向量之间的“距离”来衡量内容相似度。
  • 高维向量空间:高维向量空间是一个数学框架,每个维度代表数据的潜在特征,由嵌入模型自动提取。知识库文档片段在该空间中被表示为“点”,向量搜索过程包括查询向量化、相似度计算和结果返回三个步骤。
  • 嵌入模型关键:嵌入模型的质量是向量搜索效果的天花板,卓越的嵌入模型能生成结构优良、语义区分度高的向量空间,使语义相近的内容向量紧密相连,而质量不佳的模型会导致向量空间混乱无序,影响搜索结果。

3. 核心度量方法

  • 欧氏距离:欧氏距离衡量两个向量在空间中的直线距离,距离值越小,内容越相似,它关注向量的绝对位置差异,适用于某些特定场景,但在处理文本数据时可能受篇幅长短等因素影响。
  • 余弦相似度:余弦相似度衡量两个向量之间的夹角大小,只关心方向是否一致,不受向量长度影响。其相似度得分范围为-1到1,适用于文本数据,能更纯粹地衡量语义相关性,忽略篇幅长短等外部因素。
  • 应用场景选择:在实际应用中,特别是处理文本数据时,余弦相似度通常更受青睐,因为它能更好地衡量文本的语义相关性,而欧氏距离则在某些特定场景下有其独特优势,开发者需根据具体需求选择合适的度量方法。

4. 近似最近邻搜索

  • 暴力搜索局限:精确最近邻搜索通过一对一比较查询向量和数据库中所有向量来找到最相似结果,但当数据量巨大时,这种方法效率极低,无法满足实际应用需求。
  • ANN搜索优势:近似最近邻搜索通过构建数据索引或导航地图,将相似向量组织成“社区”,查询时直接跳到最可能包含答案的“社区”进行精确比较,以微小精度损失换取数量级上的查询速度提升,是一种“空间换时间”的策略。
  • 参数调整平衡:开发者可通过调整ANN索引的构建参数和搜索参数,在查询速度和准确性之间找到最适合业务需求的平衡点,以实现高效且可靠的向量搜索效果。

5. 向量数据库选型

  • Pinecone:Pinecone是一个完全托管的云原生服务,提供Serverless架构,自动扩容缩容,性能稳定可靠,API简洁易用,支持元数据过滤和混合搜索等高级功能,但闭源且成本较高,适合企业级应用。

  • Milvus:Milvus是一个强大的开源项目,专为处理海量向量数据设计,性能卓越,可扩展性强,支持多种高级索引类型和混合搜索,能充分利用GPU加速,但学习曲线较陡,适合大规模企业级应用。

  • Qdrant:Qdrant是一个使用Rust语言编写的开源向量数据库,高性能且过滤能力强,允许在向量搜索时施加复杂元数据过滤条件,API友好,部署方式灵活,但生态系统和某些高级功能还不够完善,适合需要将向量搜索与复杂业务逻辑结合的应用。

  • Weaviate:Weaviate是一个开源的云原生向量数据库,模块化系统可自动处理数据向量化,降低开发者使用门槛,强调端到端的语义搜索能力,支持GraphQL API查询,适合希望快速构建具备深度语义理解能力的应用。

  • Elasticsearch:Elasticsearch是一个功能强大的传统搜索引擎,近年来增加了向量搜索支持,可实现传统关键词搜索与现代语义搜索的结合,适合已深度使用Elastic Stack的团队,但在向量搜索性能和效率方面不如专业向量数据库。

  • Redis:Redis是一个高性能内存键值数据库,通过Redis Stack等模块扩展了向量搜索支持,核心优势是低延迟,适合对延迟要求苛刻的实时应用,但存储成本高,不适合作为海量向量数据的主存储方案。

  • pgvector:pgvector是PostgreSQL的一个扩展插件,使PostgreSQL具备存储和查询向量的能力,可在同一数据库中处理结构化数据和向量数据,享受PostgreSQL的事务、备份、安全等能力,但在向量搜索性能和可扩展性上不如专业向量数据库。

  • FAISS

    • FAISS是一个高性能的相似性搜索库,由Facebook AI开源,使用C++实现,提供Python接口,核心优势是极致性能、大规模数据处理能力、GPU加速和灵活的索引选项,但学习曲线陡峭,功能单一,需自行实现数据持久化等外围功能。
    • 数据点与核心优势:FAISS是一个高性能相似性搜索库,使用C++实现,提供Python接口,核心优势包括极致性能、大规模数据处理能力、GPU加速和灵活的索引选项,但学习曲线陡峭,功能单一,需自行实现数据持久化等外围功能。
    • 缺点与挑战:FAISS学习曲线陡峭,C++内核和丰富参数使其上手难度大;作为库,不提供数据库功能,如数据持久化、事务管理和复杂元数据过滤等,需开发者自行实现。
  • ChromaDB

    • ChromaDB是一个为AI应用而生的开源向量数据库,优先考虑易用性和开发者体验,提供简单直观的API,内置数据持久化、元数据过滤等功能,部署方式灵活,但性能和可扩展性不如FAISS,适合快速原型开发和中小型项目。
    • 数据点与核心优势:ChromaDB是一个功能完备的开源向量数据库,提供简单直观的Python API,内置数据持久化、元数据过滤等功能,部署方式灵活,适合快速原型开发和中小型项目,但性能和可扩展性不如FAISS。
    • 缺点与挑战:ChromaDB在性能和可扩展性方面不如FAISS,处理超大规模数据集时存在差距;作为较新的项目,生态系统和稳定性曾受诟病,但随着社区发展和版本迭代,问题正在改善。

6. FAISS实战

  1. 创建项目文件夹:创建项目文件夹,将教程资料 tutorial.pdf 复制到该文件夹中,并准备好本地文本嵌入模型 Qwen3-Embedding-0.6B ,创建Python文件rag.py。
  2. 加载与分割文档:使用 TextLoader 加载文档, RecursiveCharacterTextSplitter 分割文档,将文档内容分割成多个小块,为后续向量存储创建做准备。
  3. 创建嵌入模型实例:使用 HuggingFaceEmbeddings 创建本地嵌入模型实例,指定模型路径、加载设备和编码参数,为文档块生成向量提供支持。
  4. 创建FAISS向量存储
    • 安装FAISS库:安装FAISS的Python库,根据需求选择 faiss-cpu 或 faiss-gpu 版本,为创建向量存储提供基础支持。
    • 使用LangChain创建:使用LangChain的 FAISS.from_documents 方法,从文档块和嵌入模型直接创建FAISS向量存储,LangChain在后台处理复杂工作,包括批量嵌入、初始化索引、添加数据等,最终返回功能完备的向量存储对象。
    • 向量存储工作流程: from_documents 方法工作流程包括接收输入、批量嵌入、初始化FAISS索引、添加数据和返回实例,实现了从文档块到向量存储的高效转换。
  5. 实现持久化
    • 保存到本地磁盘:使用LangChain的 save_local 方法,将向量存储保存到本地磁盘指定文件夹,生成 index.faiss 和 index.pkl 文件,实现向量存储的持久化,方便后续快速加载和重用。
    • 从本地加载:使用LangChain的 load_local 方法,从本地磁盘加载已保存的FAISS向量存储,需传入与创建时相同的嵌入模型实例和 allow_dangerous_deserialization=True 参数,确保加载过程安全可靠。
    • 加载注意事项:加载时需确保嵌入模型实例与创建时一致,避免查询向量和索引向量位于不同语义空间,导致搜索结果无意义;同时需注意 pickle 模块的安全性,仅在确定索引文件来源可信时,才设置 allow_dangerous_deserialization=True。
#rag.py  存储向量

from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

# 步骤一:加载文档
# 我们先指定要加载的文本文件路径
file_path = "./rag_sample_text.txt"
# 然后创建一个TextLoader实例
loader = TextLoader(file_path, encoding="utf-8")
# 调用load方法,将文件内容加载成Document对象
documents = loader.load()

# 步骤二:分割文档
# 创建一个递归字符文本分割器实例
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=100,  # 我们设定每个块的最大字符数为100
    chunk_overlap=20,  # 并且设定块与块之间有20个字符的重叠,这有助于保持上下文的连续性
    length_function=len, # 使用内置的len函数来计算文本长度
    add_start_index=True, # 在元数据中自动添加每个块在原始文档中的起始位置索引
)
# 调用split_documents方法,将加载的文档分割成多个小块
all_splits = text_splitter.split_documents(documents)
print(f"分割后文档块数量: {len(all_splits)}")

# 步骤三:创建本地嵌入模型实例
from langchain_huggingface import HuggingFaceEmbeddings
# 指定本地模型的存放路径
model_path = "./Qwen3-Embedding-0.6B"
# 指定模型加载时使用的设备,这里我们使用CPU
model_kwargs = {'device': 'cpu'}
# 指定编码时进行归一化,这对很多相似度计算有好处
encode_kwargs = {'normalize_embeddings': True}
# 创建HuggingFaceEmbeddings实例
hf_embeddings = HuggingFaceEmbeddings(
    model_name=model_path,      # 传入模型路径
    model_kwargs=model_kwargs,  # 传入模型加载参数
    encode_kwargs=encode_kwargs # 传入编码参数
)
print(f"\n成功加载本地模型: {hf_embeddings.model_name}")

# 步骤四:创建向量数据库实例
from langchain_community.vectorstores import FAISS
# 使用 LangChain 提供的 from_documents 类方法,从文档块和嵌入模型直接创建 FAISS 向量存储
# 这是最关键的一步,LangChain 在后台为我们处理了所有复杂的工作。
vector_store = FAISS.from_documents(documents=all_splits, embedding=hf_embeddings)
print("\nFAISS 向量存储创建成功!")
print(f"向量存储中包含了 {vector_store.index.ntotal} 个向量。")

# 步骤五: 保存向量存储到本地磁盘
# 我们只需要指定一个文件夹路径,LangChain 会自动在该路径下创建所需的文件。
folder_path = "my_faiss_index"
vector_store.save_local(folder_path)
print(f"\n向量存储已成功保存到文件夹: {folder_path}")
# laod.py  加载向量

# 这里大家一定要注意:加载向量存储时,我们必须传入一个与创建时完全相同类型的嵌入模型实例。
from langchain_huggingface import HuggingFaceEmbeddings
model_path = "./Qwen3-Embedding-0.6B"
model_kwargs = {'device': 'cpu'}
encode_kwargs = {'normalize_embeddings': True}
hf_embeddings = HuggingFaceEmbeddings(
    model_name=model_path,      # 指定模型路径
    model_kwargs=model_kwargs,  # 传入模型加载参数
    encode_kwargs=encode_kwargs # 传入编码参数
)
print(f"\n成功加载本地模型: {hf_embeddings.model_name}")

# 从本地磁盘加载向量存储
from langchain_community.vectorstores import FAISS
print("\n正在从本地加载向量存储...")
folder_path = "my_faiss_index"
# 调用 load_local 方法进行加载
vector_store = FAISS.load_local(
    folder_path,
    hf_embeddings,
    allow_dangerous_deserialization=True # 允许加载 pickle 文件
)
print("向量存储加载成功!")
print(f"向量存储中包含了 {vector_store.index.ntotal} 个向量。")

7. ChromaDB实战

  1. 创建新Python文件:创建新的Python文件 chroma.py ,将FAISS示例中的数据准备和嵌入模型创建代码复制过来,为使用ChromaDB做准备。
  2. 安装ChromaDB包:安装与ChromaDB对接的专属包 langchain_chroma,为后续创建和使用ChromaDB向量存储提供支持。
  3. 创建并持久化ChromaDB
    • 定义数据库路径:定义ChromaDB数据库文件存放路径 chroma_db_path,为创建和持久化ChromaDB向量存储指定存储位置。
    • 使用LangChain创建:使用LangChain的 Chroma.from_documents 方法,从文档块和嵌入模型创建并持久化ChromaDB向量存储,只需提供 persist_directory 路径,ChromaDB自动将数据保存到指定位置,实现无缝持久化。
    • 持久化优势:ChromaDB内置持久化功能,在创建向量存储时直接指定本地路径即可实现数据保存,无需手动调用 save 方法,简化了操作流程,提高了开发效率。
  4. 加载ChromaDB并搜索
    • 重新加载嵌入模型:重新加载嵌入模型实例,为从磁盘加载ChromaDB并执行搜索提供支持,确保加载过程与创建时一致。
    • 从磁盘加载:使用LangChain的 Chroma 类,传入 persist_directory 路径和嵌入模型实例,从磁盘加载已持久化的ChromaDB向量存储,返回立即可用的向量存储对象,方便后续搜索操作。
    • 加载与搜索流程:加载ChromaDB时,需确保嵌入模型实例与创建时一致,避免语义空间不匹配问题;加载后可直接执行搜索操作,利用ChromaDB的高效搜索能力,为AI应用提供强大的向量存储支持。
# chroma.py

from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

# 步骤一:加载文档
file_path = "./rag_sample_text.txt"
loader = TextLoader(file_path, encoding="utf-8")
documents = loader.load()

# 步骤二:分割文档
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=100,
    chunk_overlap=20,
    length_function=len,
    add_start_index=True,
)
all_splits = text_splitter.split_documents(documents)
print(f"分割后文档块数量: {len(all_splits)}")

# 步骤三:创建本地嵌入模型实例
from langchain_huggingface import HuggingFaceEmbeddings
model_path = "./Qwen3-Embedding-0.6B"
model_kwargs = {'device': 'cpu'}
encode_kwargs = {'normalize_embeddings': True}
hf_embeddings = HuggingFaceEmbeddings(
    model_name=model_path,
    model_kwargs=model_kwargs,
    encode_kwargs=encode_kwargs
)
print(f"\n成功加载本地模型: {hf_embeddings.model_name}")

# 步骤四:创建并持久化 ChromaDB 向量存储
from langchain_chroma import Chroma
# 我们先定义一个路径,用于存放数据库文件
chroma_db_path = "./my_chroma_db"

# 使用 from_documents 方法创建并持久化 Chroma 向量存储
# 大家注意看,这里我们只需提供一个 persist_directory 路径,数据就会被自动保存,非常方便
chroma_vector_store = Chroma.from_documents(
    documents=all_splits,
    embedding=hf_embeddings,
    persist_directory=chroma_db_path
)

print(f"ChromaDB 向量存储创建成功并已持久化到: {chroma_db_path}")

# loaded_chroma_store = Chroma(
#     persist_directory=chroma_db_path,
#     embedding_function=hf_embeddings
# )

8. FAISS与ChromaDB对比

  • 核心特性对比:FAISS定位为高性能相似性搜索库,核心优势是极致性能和大规模数据处理能力;ChromaDB定位为功能齐全的AI原生向量数据库,核心优势是易用性和开发体验。FAISS适合需要极致性能的场景,ChromaDB适合快速原型开发和中小型项目。
  • 适用场景差异:FAISS适用于需要极致性能和可扩展性的大规模系统、研究项目和底层机器学习管道;ChromaDB适用于快速原型开发、中小型项目、Web应用以及优先考虑开发效率和易用性的场景。
  • LangChain封装价值:LangChain通过强大的抽象层,为FAISS提供了类似ChromaDB的持久化能力,让开发者专注于上层业务逻辑创新,无需纠结底层工具差异。选择FAISS还是ChromaDB更多是架构决策,而非编码决策。

六、检索

1. LangChain检索器基础

  • 检索器定义:在LangChain框架中,检索器是实现信息检索的关键组件,负责从海量知识库中精准获取所需信息,为RAG应用提供数据支持。
  • 接口设计优势:检索器采用高度抽象化的接口设计,形成标准化的数据获取层,屏蔽底层向量数据库的复杂细节,使上层应用逻辑更简洁,体现软件工程中的“解耦”原则。
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

# 步骤一:加载文档
loader = WebBaseLoader("https://lilianweng.github.io/posts/2023-06-23-agent/")
documents = loader.load()

# 步骤二:分割文档
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=20,
    add_start_index=True,
)
all_splits = text_splitter.split_documents(documents)
print(f"分割后文档块数量: {len(all_splits)}")

# 步骤三:创建本地嵌入模型实例
from langchain_huggingface import HuggingFaceEmbeddings
hf_embeddings = HuggingFaceEmbeddings(
    model_name="./Qwen3-Embedding-0.6B",
    model_kwargs={'device': 'cuda'},
    encode_kwargs={'normalize_embeddings': True}
)
print(f"\n成功加载本地模型: {hf_embeddings.model_name}")

# 步骤四:创建并持久化 FAISS 向量存储
from langchain_community.vectorstores import FAISS
vector_store = FAISS.from_documents(
    documents=all_splits,
    embedding=hf_embeddings
)

# 步骤五:将向量存储转换为检索器
retriever = vector_store.as_retriever(
    search_type="similarity",
    search_kwargs={'k': 4}
)

# 未完,后面代码见下一个示例

通过代码示例展示检索器的创建过程,从加载文档、分割文档到创建向量存储并转换为检索器,直观呈现其在实际RAG流程中的应用。

2. 检索器的搜索类型

  • 相似性搜索

    • 工作原理:相似性搜索将查询文本转换为向量,计算与知识库中文档向量的相似度,返回最接近的 k 个文档。

    • 应用场景:适用于通用问答场景,当目标是快速找到与查询语义最接近的文档时,相似性搜索是高效可靠的选择。

    • 核心参数 k:参数 k 决定返回的最相似文档数量,默认值通常为4,可根据需求调整。

  • 最大边际相关性(MMR)

    • 解决的问题:针对相似性搜索中可能出现的文档内容高度重复问题,MMR算法通过平衡相关性和多样性,避免结果冗余。
    • 工作原理:MMR先找出与查询最相关的候选文档,然后迭代选择既相关又具多样性的文档,综合考量与查询的相关性和与已选文档的不相似性。
    • 核心参数:参数 k、fetch_k 和 lambda_mult 分别控制最终返回数量、候选文档数量和相关性与多样性的权重平衡。
  • 相似性得分阈值

    • 解决的问题:防止在查询不相关时返回大量不相关结果,避免误导大语言模型生成错误答案,浪费计算资源。
    • 工作原理:设置相似度得分“及格线”,只有得分高于阈值的文档才被返回,确保结果的相关性。
    • 核心参数:参数 score_threshold 设定相似度得分的最小值, k 用于限制返回结果数量。
# 代码承接上面部分

# 创建一个使用 MMR 的检索器,旨在获取 3 个既相关又具多样性的文档
# retriever_mmr = vectorstore.as_retriever(
#     search_type="mmr",
#     search_kwargs={'k': 3, 'fetch_k': 20, 'lambda_mult': 0.5}
# )
# retrieved_docs = retriever_mmr.invoke("什么是信息检索算法?")

# 创建一个只返回相似度得分高于 0.75 的文档的检索器
# retriever_threshold = vector_store.as_retriever(
#     search_type="similarity_score_threshold",
#     search_kwargs={'score_threshold': 0.75}
# )
# retrieved_docs = retriever_threshold.invoke("一个知识库里完全没有的话题")
# 预期当查询无关时,返回一个空列表,而不是强制返回不相关的结果

# 步骤六: 执行检索 ---
query = "如何减少检索结果的重复性?"
retrieved_docs = retriever.invoke(query)
print(f"\n执行检索,查询: '{query}'")

print("\n检索到的文档:")
for i, doc in enumerate(retrieved_docs):
    print(f"  文档 {i+1}:")
    print(f"  {doc.page_content}")
    
    
    
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

setup_and_retrieval = RunnableParallel(
    {"context": retriever, "question": RunnablePassthrough()}
)

setup_and_retrieval.invoke({"quesion": "什么是信息检索算法?"})

# {'context': [docs...], 'question': '...'}

from langchain_core.prompts import ChatPromptTemplate

template = """
你是一个用于问答任务的助手,使用以下检索到的上下文片段来回答问题。
你只能使用上下文片段中的内容回答问题,如果上下文片段中没有相关内容,你只需回答不知道。

<问题>
{question}
</问题>

<上下文>
{context}
</上下文>
"""
prompt = ChatPromptTemplate.from_template(template)

七、构建完整的RAG流程

1. RAG链的逻辑流与数据流

  • 数据流转过程:详细描述RAG链中数据的流动过程,包括输入、分叉、检索、传递、聚合、格式化、生成和解析等关键步骤。
  • 核心组件作用:解析直通管道、并行执行器、提示词模板、语言模型和输出解析器等组件在RAG链中的作用和实现方式。
  • 组装并运行RAG链
    • 环境与模型设置:介绍准备环境变量文件和初始化语言模型等步骤,为RAG链的运行提供基础支持。
    • 数据准备工作:展示加载、分割、嵌入和存储文档的过程,为检索器的创建和RAG链的运行奠定数据基础。
    • RAG链构建与调用:通过代码示例演示如何使用LangChain表达式语言将核心组件链接起来,形成完整的RAG链,并调用链生成答案。
  • 流程图

# --- 1. 加载、分块和索引---
# 导入所需的模块
import os
from dotenv import load_dotenv
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

# 使用文档加载器从网页加载内容
loader = WebBaseLoader("https://lilianweng.github.io/posts/2023-06-23-agent/")
docs = loader.load()

# 使用文本分割器将文档分割成小块
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)

# 创建本地嵌入模型实例
model_path = "./Qwen3-Embedding-0.6B"
model_kwargs = {'device': 'cuda'}
encode_kwargs = {'normalize_embeddings': True}
hf_embeddings = HuggingFaceEmbeddings(
    model_name=model_path,      # 指定模型路径
    model_kwargs=model_kwargs,  # 传入模型加载参数
    encode_kwargs=encode_kwargs # 传入编码参数
)

# 创建向量存储,用于索引和搜索嵌入
# 这里我们使用FAISS作为向量数据库,并使用本地嵌入模型
vectorstore = FAISS.from_documents(documents=splits, embedding=hf_embeddings)

# 将向量存储转换为检索器接口
# 检索器是RAG链中用于获取相关文档的核心组件
retriever = vectorstore.as_retriever()

# 定义提示词模板,指导LLM如何利用上下文回答问题
template = """
你是一个用于问答任务的助手,使用以下检索到的上下文片段来回答问题。
你只能使用上下文片段中的内容回答问题,如果上下文片段中没有相关内容,你只需回答不知道。

<问题>
{question}
</问题>

<上下文>
{context}
</上下文>
"""
prompt = ChatPromptTemplate.from_template(template)

# 初始化语言模型
load_dotenv()
api_base = os.getenv("DASHSCOPE_API_BASE")
api_key = os.getenv("DASHSCOPE_API_KEY")
llm = ChatOpenAI(
    base_url=api_base,
    api_key=api_key,
    model="qwen-plus",
    temperature=0.7,
    streaming=True,
    extra_body={"enable_thinking": False} # 根据需要设置
)

# 使用LCEL构建RAG链
rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

# --- 3. 调用链并打印结果---
question = "LangChain是什么?" # 它有什么特点?
response = rag_chain.invoke(question)

print("--- 问题 ---")
print(question)
print("\n--- RAG链的回答 ---")
print(response)

2. 构建对话式RAG

  • 无状态RAG的局限性
    • 对话场景问题:以对话场景为例,说明无状态RAG在处理连续追问时的局限,强调引入对话历史和状态管理的必要性。
    • 查询重写的重要性:强调查询重写在对话式RAG中的关键作用,通过改写用户问题为独立查询,解决上下文依赖问题。
# 两阶段高级检索策略示例

import os
from dotenv import load_dotenv
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain_openai import ChatOpenAI
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter

# 使用文档加载器从网页加载内容
loader = WebBaseLoader("https://lilianweng.github.io/posts/2023-06-23-agent/")
docs = loader.load()

# 使用文本分割器将文档分割成小块
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)

# 创建本地嵌入模型实例
model_path = "./Qwen3-Embedding-0.6B"
model_kwargs = {'device': 'cuda'}
encode_kwargs = {'normalize_embeddings': True}
hf_embeddings = HuggingFaceEmbeddings(
    model_name=model_path,      # 指定模型路径
    model_kwargs=model_kwargs,  # 传入模型加载参数
    encode_kwargs=encode_kwargs # 传入编码参数
)

# 创建向量存储,用于索引和搜索嵌入
# 这里我们使用FAISS作为向量数据库,并使用本地嵌入模型
vectorstore = FAISS.from_documents(documents=splits, embedding=hf_embeddings)

# 将向量存储转换为检索器接口
# 检索器是RAG链中用于获取相关文档的核心组件
retriever = vectorstore.as_retriever()

# 定义用于生成多查询的LLM
# 创建语言模型实例
load_dotenv()
api_base = os.getenv("DASHSCOPE_API_BASE")
api_key = os.getenv("DASHSCOPE_API_KEY")
llm = ChatOpenAI(
    base_url=api_base,
    api_key=api_key,
    model="qwen-plus",
    temperature=0,
    streaming=True,
    extra_body={"enable_thinking": False} # 根据需要设置
)

# 创建多查询检索器
# from_llm方法是一个方便的工厂函数,它会自动处理prompt和输出解析
multi_query_retriever = MultiQueryRetriever.from_llm(
    retriever=vectorstore.as_retriever(),
    llm=llm
)

# 示例:查看它如何为问题生成变体
import logging
logging.basicConfig()
logging.getLogger("langchain.retrievers.multi_query").setLevel(logging.INFO)

question = "AI Agent 的主要组成部分是什么"
# 在实际调用中,这些日志会显示生成的查询
# unique_docs = multi_query_retriever.invoke(question)

from langchain.retrievers import ContextualCompressionRetriever
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
from langchain.retrievers.document_compressors import CrossEncoderReranker

# 这里我们使用 HuggingFace 提供的一个常见的交叉编码器模型
cross_encoder_model = HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-base")

# 创建重排器
reranker = CrossEncoderReranker(model=cross_encoder_model, top_n=5)

# 将重排器与多查询检索器结合,创建我们的最终检索器
compression_retriever = ContextualCompressionRetriever(
    base_compressor=reranker,
    base_retriever=multi_query_retriever
)

# 示例调用
compressed_docs = compression_retriever.invoke(question)
# compressed_docs将只包含经过重排后得分最高的5个文档
print(compressed_docs)
  • 使用LangGraph构建RAG流程
    • LangGraph核心概念:回顾LangGraph的核心思想,包括节点、边和状态的概念,以及如何利用LangGraph进行状态管理和复杂应用开发。
    • 核心组件实现:介绍两阶段高级检索策略、查询重写节点、检索节点、生成节点和决策节点的实现方法,展示如何将这些组件整合到LangGraph中。
    • 图的构建与运行:展示如何构建并编译LangGraph工作流,以及如何通过命令行界面运行交互式对话,实现对话式RAG Agent的功能。
# --- 1. 环境与模型设置 ---
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI

load_dotenv()
api_base = os.getenv("DASHSCOPE_API_BASE")
api_key = os.getenv("DASHSCOPE_API_KEY")

# 初始化用于不同任务的LLM
# Agent使用能力更强的模型以保证决策质量,生成模型可以使用更经济的选项
llm = ChatOpenAI(
    base_url=api_base,
    api_key=api_key,
    model="qwen-plus",
    temperature=0,
    streaming=True,
    extra_body={"enable_thinking": False}
)

# generation_llm  = ChatOpenAI(
#     base_url=api_base,
#     api_key=api_key,
#     model="qwen-plus",
#     temperature=0,
#     streaming=True,
#     extra_body={"enable_thinking": False}
# )

# --- 2. 知识库与高级检索器构建 ---
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
from langchain_community.document_loaders import WebBaseLoader
from langchain.retrievers import ContextualCompressionRetriever

# 加载文档
loader = WebBaseLoader("https://lilianweng.github.io/posts/2023-06-23-agent/")
docs = loader.load()
# 分割文档
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)
# 创建向量存储
model_path = "./Qwen3-Embedding-0.6B"
model_kwargs = {'device': 'cuda'}
encode_kwargs = {'normalize_embeddings': True}
hf_embeddings = HuggingFaceEmbeddings(
    model_name=model_path,      # 指定模型路径
    model_kwargs=model_kwargs,  # 传入模型加载参数
    encode_kwargs=encode_kwargs # 传入编码参数
)
vectorstore = FAISS.from_documents(documents=splits, embedding=hf_embeddings)
base_retriever = vectorstore.as_retriever(search_kwargs={"k": 10}) # 初始检索更多文档

# 创建重排器 (Cross-Encoder)
reranker_model = HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-base")
reranker = CrossEncoderReranker(model=reranker_model, top_n=3) # 重排后保留前3个

# 创建上下文压缩检索器
compression_retriever = ContextualCompressionRetriever(
    base_compressor=reranker, base_retriever=base_retriever
)

# --- 3. 定义图状态 ---
from typing import List, TypedDict, Annotated
from langchain_core.documents import Document
from langchain_core.messages import BaseMessage
import operator

class GraphState(TypedDict):
    """
    定义图的状态。

    Attributes:
        messages: 对话消息列表,会不断累积。
        question: 用户最新的原始问题。
        standalone_question: 经过重写后的独立问题。
        documents: 检索到的文档列表。
        generation: LLM生成的最终回答。
    """
    messages: Annotated[List[BaseMessage], operator.add]
    question: str
    standalone_question: str
    documents: List[Document]
    generation: str

# --- 4. 定义LangGraph节点 ---
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.output_parsers import StrOutputParser

def rewrite_node(state: GraphState) -> dict:
    """查询重写节点:根据对话历史,将用户最新问题改写为独立的查询。"""
    print("--- 节点: rewrite ---")
    question = state["question"]
    messages = state["messages"]

    # 如果只有一条消息,说明是对话开始,无需重写
    if len(messages) <= 1:
        standalone_question = question
    else:
        # 否则,创建查询重写链
        rewrite_prompt = ChatPromptTemplate.from_messages(
            [
                ("system",
                 "你是一个专业的AI助手。你的任务是根据对话历史,将用户最新的问题改写成一个独立的、无需上下文就能理解的问题。"),
                MessagesPlaceholder(variable_name="chat_history"),
                ("human", "{question}"),
            ]
        )
        rewriter_chain = rewrite_prompt | llm | StrOutputParser()
        standalone_question = rewriter_chain.invoke({
            "chat_history": messages[:-1],
            "question": question
        })

    print(f"重写后的问题: {standalone_question}")
    return {"standalone_question": standalone_question}

def retrieve_node(state: GraphState) -> dict:
    """检索节点:使用高级检索器获取相关文档。"""
    print("--- 节点: retrieve ---")
    standalone_question = state["standalone_question"]
    documents = compression_retriever.invoke(standalone_question)
    print(f"检索到 {len(documents)} 篇文档")
    return {"documents": documents}

def generate_node(state: GraphState) -> dict:
    """生成节点:根据上下文和问题生成最终回答。"""
    print("--- 节点: generate ---")
    standalone_question = state["standalone_question"]
    documents = state.get("documents", [])
    messages = state["messages"]

    # 根据是否存在文档来选择不同的提示
    if not documents:
        # 如果没有文档,说明是闲聊或无法回答
        prompt = ChatPromptTemplate.from_messages([
            ("system", "你是一个乐于助人的AI助手。请直接回答用户的问题。"),
            # 传递完整的消息历史以支持多轮对话
            MessagesPlaceholder(variable_name="chat_history"),
        ])
        chain = prompt | llm | StrOutputParser()
        input_data = {"chat_history": messages}
    else:
        # 如果有文档,使用RAG提示
        rag_prompt = ChatPromptTemplate.from_template(
            "你是一位智能问答助手。请根据以下提供的上下文信息来回答用户的问题。\n\n上下文:\n{context}\n\n问题:\n{question}"
        )

        # 将检索到的文档内容拼接成一个字符串
        context = "\n\n---\n\n".join([doc.page_content for doc in documents])

        chain = rag_prompt | llm | StrOutputParser()
        input_data = {"context": context, "question": standalone_question}

    # 将生成的回答添加到消息历史中
    generation = ""
    print("助手: ", end="", flush=True)
    for chunk in chain.stream(input_data):
        print(chunk, end="", flush=True)  # 逐块打印,实现流式效果
        generation += chunk  # 将块累加起来,形成完整回复

    # 换行,使下一次用户输入在新的一行开始
    print()

    # 将生成的完整回答添加到消息历史中
    new_messages = state["messages"] + [AIMessage(content=generation)]
    return {"generation": generation, "messages": new_messages}

def agent_router(state: GraphState) -> str:
    """
    条件路由:判断问题类型,决定是进行检索还是直接生成回答。
    """
    print("--- 决策: agent_router ---")
    standalone_question = state["standalone_question"]

    prompt = ChatPromptTemplate.from_template(
        """你是一个路由专家。根据用户的问题,将其分类为 'rag' 或 'chat'。
        'rag' 类型的问题需要从知识库中检索信息来回答。
        'chat' 类型的问题是闲聊、问候或不需要知识库就能回答的问题。
        只返回 'rag' 或 'chat'。

        问题: {question}"""
    )

    routing_chain = prompt | llm | StrOutputParser()
    decision = routing_chain.invoke({"question": standalone_question})

    print(f"路由决策: {decision}")
    # 增加对输出格式的健壮性处理
    if 'rag' in decision.lower():
        return "retrieve"
    else:
        return "generate"

# --- 5. 编排工作流 ---
from langgraph.graph import StateGraph, END
workflow = StateGraph(GraphState)

# 添加节点
workflow.add_node("rewrite", rewrite_node)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)

# 设置入口点
workflow.set_entry_point("rewrite")

# 添加边
workflow.add_conditional_edges(
    "rewrite",
    agent_router,
    {
        "retrieve": "retrieve",
        "generate": "generate",
    },
)
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", END)

# 编译图
app = workflow.compile()

# --- 6. 运行交互式对话 ---
def run_conversation():
    """在一个循环中运行对话。"""
    # 初始化对话历史
    messages = []
    print("您好!我是对话式RAG助手。输入 'exit' 退出。")

    while True:
        user_input = input("您: ")
        if user_input.lower() == 'exit':
            print("再见!")
            break

        # 将用户输入添加到消息历史中
        messages.append(HumanMessage(content=user_input))

        # 准备图的输入
        graph_input = {"messages": messages, "question": user_input}

        # 调用图的核心步骤
        final_state = app.invoke(graph_input)

        # 从最终状态获取AI的回答并更新到messages列表
        # 这一步是为了让下一轮对话拥有完整的历史记录
        ai_response_content = final_state.get('generation', '我不知道该如何回答。')
        messages = final_state.get('messages', messages)


run_conversation()