Spiga

程序员的AI体验(六):LangChain 链

2025-05-17 18:43:27

一、链的概念

  • 定义与作用:链是LangChain中最核心的概念,指将多个模块化组件按特定顺序链接起来,执行多步骤任务的序列,可将复杂任务分解为更小、更易于管理的独立步骤。
  • 核心优势:链在LangChain生态系统中扮演“粘合剂”和“骨架”的角色,连接不同人工智能组件,促成协同工作,简化应用开发过程,提升开发效率。
  • 链的演进
    • 在早期LangChain版本中,链通过继承Chain抽象基类实现,采用命令式的面向对象继承编程模型,存在组合灵活性不足等问题。
    • LangChain引入了LangChain表达式语言(LCEL),采用声明式的管道操作符|连接组件,具有可组合性、可读性、灵活性、原生支持高级功能和可观察性等优势。
  • 构建链的核心组件
    • 模型:模型是链的核心引擎,包括大型语言模型(LLMs)和聊天模型(Chat Models),负责执行主要的智能任务。
    • 提示模板:提示模板负责构建和格式化模型的输入,将用户输入与预设指令和示例结合,生成完整、结构化的提示。
    • 输出解析器:输出解析器将模型返回的原始输出转换为更结构化、易于处理的格式,是连接非结构化智能与结构化数据的桥梁。

二、构建第一个LLM应用

1. LangChain表达式语言基础

  • 基础链结构:最基础的链结构为提示 | 模型 | 解析器,是所有复杂链条的基础构建单元,体现了与大型语言模型交互的三个基本阶段。
  • 工作原理:详细追踪了从输入到输出的完整数据流动过程,包括提示模板的格式化、模型的推理生成和输出解析器的结构化处理。

2. 从零构建翻译链

import os
# 1. 导入所需的核心组件
from dotenv import load_dotenv
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

# 从.env 文件加载我们配置的环境变量
load_dotenv()
api_base = os.getenv("DASHSCOPE_API_BASE")
api_key = os.getenv("DASHSCOPE_API_KEY")

# 2. 定义我们的提示模板
# 这个模板包含了两个动态变量:language 和 text。
# 它清晰地指示模型,将 text 从英语翻译成指定的 language。
system_template = "将下列内容从英语翻译为{language}"
prompt_template = ChatPromptTemplate.from_messages([
    ("system", system_template),
    ("human", "{text}")
])

# 3. 初始化我们的语言模型
# temperature=0 表示我们希望模型的输出更具确定性,减少随机发挥。
model = ChatOpenAI(
    base_url=api_base,
    api_key=api_key,
    model_name="qwen-plus",
    temperature=0,
    extra_body={"enable_thinking": False})

# 4. 初始化输出解析器
# StrOutputParser 会将模型返回的聊天消息对象,转换为我们更易于使用的简单字符串。
parser = StrOutputParser()

# 5. 使用 LCEL 管道操作符 | 将三大组件链接成链
# 这行代码可以说是 LangChain 现代用法的精髓所在。
# 它以声明式的方式,定义了一个清晰的执行流程:输入 -> 提示模板 -> 模型 -> 解析器 -> 输出
chain = prompt_template | model | parser

# 6. 调用链并传入我们的输入数据
# 我们使用 invoke 方法来执行这条链。
# 输入是一个字典,其键必须与提示模板中定义的变量名完全对应。
input_data = {"language": "中文", "text": "Hello, how are you?"}
response = chain.invoke(input_data)

print(f"输入: {input_data}")
print(f"翻译结果: {response}")

input_data_japanese = {"language": "日语", "text": "I love programming."}
response_japanese = chain.invoke(input_data_japanese)
print(f"\n输入: {input_data_japanese}")
print(f"翻译结果: {response_japanese}")
  • 通过完整的Python代码示例,展示了如何从零构建一个简单的翻译应用,包括定义提示模板、初始化模型、初始化输出解析器、链接组件和调用链等步骤。
  • 该翻译链可将英文文本翻译成用户指定的任何语言,具有实际应用价值,体现了LangChain在构建实用AI应用中的强大能力。

3. LangSmith

  • LangSmith是LangChain团队打造的用于调试、测试、评估和监控LLM应用的平台,提供了前所未有的“可观察性”,帮助开发者深入理解系统内部行为。
  • 如何通过设置环境变量,将LangChain应用的运行轨迹数据发送到LangSmith平台,并通过平台查看详细的追踪信息,实现对复杂AI应用的调试和监控。

三、链的几种形式

1. 顺序链

  • 顺序链定义:顺序链是将多个独立链串联形成复杂处理流水线,前一链输出是后一链输入,适用于多步任务,如先生成书名再写简介。
  • 顺序链价值:顺序链可分解复杂任务为小任务,清晰易开发调试;控制逻辑流,按特定顺序执行操作;组合不同功能链,实现复杂功能。
  • 简单顺序链
    • 工作原理:简单顺序链处理简单线性任务流,假设每个子链只有单一字符串输入输出,依次传递直到最后一个子链完成处理。
    • 局限性:简单顺序链只能处理单一输入输出,丢失中间结果,不适用于复杂场景。
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 加载环境变量
load_dotenv()
api_base = os.getenv("DASHSCOPE_API_BASE")
api_key = os.getenv("DASHSCOPE_API_KEY")

llm = ChatOpenAI(
    base_url=api_base,
    api_key=api_key,
    model="qwen-plus",
    extra_body={"enable_thinking": False}
)

# 第一个链:根据主题生成书名
prompt_title = ChatPromptTemplate.from_template(
    "你是一位专业的作家。请为一个关于 '{topic}' 主题的书籍想一个吸引人的标题。"
)

# 第二个链:为书名生成宣传语
prompt_tagline = ChatPromptTemplate.from_template(
    "你是一位市场营销专家。请为书籍《{title}》创作一句引人注目的宣传语。"
)

# sequential_chain = (
#     prompt_title | llm | StrOutputParser()
#     | {"title": lambda title: title}  # 将上一步的输出(标题字符串)包装成一个字典
#     | prompt_tagline | llm | StrOutputParser()
# )

sequential_chain = (
    {"title": prompt_title | llm | StrOutputParser()}
    | prompt_tagline | llm | StrOutputParser()
)

# 运行链
topic = "人工智能在医疗领域的未来"
# 注意:LCEL 链的输入现在是一个字典,以匹配第一个提示模板的输入变量
final_tagline = sequential_chain.invoke({"topic": topic})

print("\n" + "="*20)
print(f"输入的主题: {topic}")
print(f"最终生成的宣传语: {final_tagline}")
print("="*20)

通过代码示例,展示如何使用简单顺序链实现根据主题生成书名和宣传语的功能,体现其简洁性。

  • 通用顺序链
    • 工作原理:通用顺序链通过共享上下文(字典)管理和传递变量,子链可接收多个输入变量并产生多个输出变量,最终返回完整字典。
    • 优势:通用顺序链可处理复杂工作流,保留中间结果,提供灵活性和控制力,适用于非严格线性任务。
import os
import json
from operator import itemgetter # 导入 itemgetter 以便轻松选择字典键
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# 加载环境变量
load_dotenv()
api_base = os.getenv("DASHSCOPE_API_BASE")
api_key = os.getenv("DASHSCOPE_API_KEY")

# 初始化模型
llm = ChatOpenAI(
    base_url=api_base,
    api_key=api_key,
    model="qwen-plus",
    extra_body={"enable_thinking": False}
)

# 提示 1: 将评论翻译成中文
prompt_translate = ChatPromptTemplate.from_template(
    "将以下评论翻译成中文:\n\n{review}"
)

# 提示 2: 总结中文评论
prompt_summarize = ChatPromptTemplate.from_template(
    "用一句话总结以下评论:\n\n{chinese_review}"
)

# 提示 3: 识别原始评论的语言
prompt_identify_lang = ChatPromptTemplate.from_template(
    "识别以下评论所使用的语言:\n\n{review}"
)

# 提示 4: 生成后续回复
prompt_followup = ChatPromptTemplate.from_template(
    "请根据以下摘要,用指定的语言撰写一条礼貌的后续回复:\n\n摘要: {summary}\n\n语言: {language}"
)

translator = prompt_translate | llm | StrOutputParser()
summarizer = prompt_summarize | llm | StrOutputParser()
identifier = prompt_identify_lang | llm | StrOutputParser()
responder = prompt_followup | llm | StrOutputParser()

overall_chain = RunnablePassthrough.assign(
    # 步骤 a: 翻译评论。它需要原始的 "review"。
    chinese_review=itemgetter("review") | translator
).assign(
    # 步骤 b: 总结。它需要上一步生成的 "english_review"。
    summary=itemgetter("chinese_review") | summarizer
).assign(
    # 步骤 c: 识别语言。它也需要原始的 "review"。
    language=itemgetter("review") | identifier
).assign(
    # 步骤 d: 生成回复。它需要 "summary" 和 "language"。
    # responder 链会自动从传入的字典中提取它需要的键。
    followup_message=responder
)

# 运行链
original_review = "Ce produit est absolument fantastique! La qualité est bien au-delà de mes attentes."
result = overall_chain.invoke({"review": original_review})

# {
#   "review": "原始语言",
#   "chinese_review": "翻译后的中文",
#   "summary": "翻译后的摘要",
#   "language": "语种"
#  followup_message=responder
# }


# 打印结果
# 最终的字典包含了所有中间步骤的结果
print("\n" + "="*20)
print(f"原始评论: {original_review}")
print("\n--- 执行结果 ---")
# 为了更清晰地展示,只打印我们关心的最终输出变量
final_output = {
    "english_review": result.get("chinese_review"),
    "summary": result.get("summary"),
    "language": result.get("language"),
    "followup_message": result.get("followup_message"),
}
print(json.dumps(final_output, indent=2, ensure_ascii=False))
print("="*20)

通过多语言电商平台用户评论处理的复杂工作流,展示通用顺序链如何实现翻译、总结、语言识别和生成回复的功能。

2. 路由链

  • 路由链定义:路由链是“调度中心”,分析输入后将任务路由到多个预定义目标链中的某一个去执行,实现动态选择处理路径。
  • 与顺序链区别:顺序链执行路径固定、线性,路由链执行路径非确定性、条件性,引入分支逻辑。
  • 与Agent关系:路由链决策逻辑预定义,Agent决策逻辑动态自主,路由链是构建复杂代理系统的基础之一。
  • 实现过程
    • 创建分类链:构建分类链对用户输入进行分类,输出清晰标签,如问答机器人将问题分类为不同领域。
    • 创建路由函数:创建路由函数根据分类结果返回相应的专家链,用RunnableLambda包装使其能集成到LangChain表达式链中。
    • 构建多主题专家问答机器人:通过代码示例,构建一个多主题专家问答机器人,展示路由链如何实现动态选择专家链回答问题。
import os
from dotenv import load_dotenv
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI

# 加载环境变量
load_dotenv()
api_base = os.getenv("DASHSCOPE_API_BASE")
api_key = os.getenv("DASHSCOPE_API_KEY")

# 初始化模型
# 使用一个速度较快的模型用于分类
classifier_llm = ChatOpenAI(
    base_url=api_base,
    api_key=api_key,
    model="qwen-turbo",
    temperature=0,
    extra_body={"enable_thinking": False}
)

# 使用一个能力更强的模型用于回答问题
expert_llm = ChatOpenAI(
    base_url=api_base,
    api_key=api_key,
    model="qwen-plus",
    temperature=0.7,
    extra_body={"enable_thinking": False}
)

# --- 第一步:创建分类链 ---
classification_prompt = ChatPromptTemplate.from_template(
"""
请根据以下用户问题,将其分类为 `LangChain`、`Dify` 或 `Other` 中的一种。
你只能回复分类列表中指定的词。

<question>
{question}
</question>
"""
)

classifier_chain = classification_prompt | classifier_llm | StrOutputParser()

# 测试分类链
test_question = "如何使用 LangChain 构建 Agent 应用?"
classification_result = classifier_chain.invoke({"question": test_question})
print(f"问题 '{test_question}' 的分类结果是: {classification_result}")
print("-" * 20)

# --- 第二步:创建各个领域的专家链 ---

# LangChain 专家链
langchain_prompt = ChatPromptTemplate.from_template(
"""你是一位 LangChain 专家。
请回答以下问题:
Question: {question}
Answer:"""
)
langchain_chain = langchain_prompt | expert_llm | StrOutputParser()

# Dify 专家链
dify_prompt = ChatPromptTemplate.from_template(
"""你是一位 Dify 专家。
请回答以下问题:
Question: {question}
Answer:"""
)
dify_chain = dify_prompt | expert_llm | StrOutputParser()

# 通用知识链
general_prompt = ChatPromptTemplate.from_template(
"""你是一位知识渊博的通用 AI 助手。
请回答以下问题:
Question: {question}
Answer:"""
)
general_chain = general_prompt | expert_llm | StrOutputParser()

# --- 第三步:创建路由函数 ---
def route(info):
    topic = info["topic"].lower()
    if "langchain" in topic:
        return langchain_chain
    elif "dify" in topic:
        return dify_chain
    else:
        return general_chain

# --- 第四步:构建完整的路由链 ---
full_router_chain = (
# RunnableParallel
{
    "topic": classifier_chain,
    "question": lambda x: x["question"]
} | RunnableLambda(route))

# {
#   "topic": "LangChain",
#   "question": "如何使用 LangChain 的 LCEL 语法?"
# }

# --- 第五步:测试路由链 ---
# 测试 LangChain 问题
response_lc = full_router_chain.invoke({"question": "如何使用 LangChain 的 LCEL 语法?"})
print("--- LangChain 问题 ---")
print(response_lc)
print("\n")

# 测试 Dify 问题
response_hf = full_router_chain.invoke({"question": "如何在 Dify 中安装插件?"})
print("--- Dify 问题 ---")
print(response_hf)
print("\n")

# 测试通用问题
response_other = full_router_chain.invoke({"question": "中国的首都是哪里?"})
print("--- 通用问题 ---")
print(response_other)

3. API链

核心流程:API链使LLM与外部API交互,获取实时信息或执行操作,通过API文档让LLM理解API功能和使用方法。

import os
from dotenv import load_dotenv
import requests
import json
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

# 加载环境变量
load_dotenv()
api_base = os.getenv("DASHSCOPE_API_BASE")
api_key = os.getenv("DASHSCOPE_API_KEY")


# --- 步骤 1: 定义一个工具 ---
@tool
def get_weather(latitude: float, longitude: float) -> str:
    """
    获取指定经纬度的当前天气信息。
    """
    base_url = "https://api.open-meteo.com/v1/forecast"
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "current_weather": "true"  # API 要求布尔值为字符串 "true"
    }
    try:
        response = requests.get(base_url, params=params)
        response.raise_for_status()  # 如果请求失败则抛出异常
        weather_data = response.json()
        return json.dumps(weather_data["current_weather"])
    except requests.exceptions.RequestException as e:
        return f"获取天气 API 数据时出错: {e}"
    except KeyError:
        return f"API 响应中缺少 'current_weather' 键: {response.text}"

# --- 步骤 2: 将工具绑定到模型 ---
llm = ChatOpenAI(
    base_url=api_base,
    api_key=api_key,
    model="qwen-plus",
    temperature=0.7,
    extra_body={"enable_thinking": False}
)

# 将我们定义的工具列表绑定到模型上
tools = [get_weather]
llm_with_tools = llm.bind_tools(tools)

# --- 步骤 3: 构建完整的、包含最终回复的 API 链 ---

# 定义用于生成最终回复的提示模板
answer_prompt = ChatPromptTemplate.from_template(
    """请根据以下工具输出,为用户的问题提供一个自然的语言回答。
    用户问题: {question}
    工具输出: {tool_output}
    """)

# 构建完整的链
api_chain = (
        RunnablePassthrough.assign(
            tool_output=(lambda x: x["question"]) | llm_with_tools | (
                lambda msg: msg.tool_calls[0]["args"]) | get_weather
        )
        | answer_prompt
        | llm
        | StrOutputParser()
)

# --- 步骤 4: 调用完整的链 ---
# 现在,我们只需要调用这个封装好的链,即可完成从自然语言到最终答案的全过程
query = "武汉现在天气怎么样?"
final_answer = api_chain.invoke({"question": query})

print(f"查询: {query}")
print(f"最终回答: {final_answer}")

通过天气API示例,展示如何定义工具、将工具绑定到模型、构建完整的API调用链并调用,实现从自然语言到最终答案的全过程。

四、LangGraph框架

LangGraph通过状态图建模,赋予开发者显式控制流、结构化状态管理、高度可定制与可扩展能力。

1. 核心概念

  • 状态(State):状态是图中共享数据结构,通常为类型化字典,用于存储应用快照。
  • 节点(Nodes):节点是图中工作单元,可为函数或Runnable对象,接收状态输入并返回状态更新。
  • 边(Edges):边连接节点,定义控制流,包括普通边和条件边,实现循环和复杂决策逻辑。

2. 使用LangGraph构建ReAct Agent

1. 定义状态和工具:定义Agent状态和工具,初始化模型并绑定工具。
1. 创建节点和边:创建调用模型和工具的节点,设置条件边和普通边,构建完整图。
1. 图的运行与结果:编译图并运行,通过流式输出观察每一步执行结果。

3. LangGraph高级特性

  • 预构建Agent:LangGraph提供预构建函数,简化常见Agent模式创建过程,如create_react_agent。
import os
from dotenv import load_dotenv
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from datetime import datetime
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import BaseMessage, HumanMessage
from langgraph.checkpoint.memory import MemorySaver

# 0. 加载环境变量
# 确保您的 .env 文件中有 DASHSCOPE_API_BASE 和 DASHSCOPE_API_KEY
load_dotenv()
api_base = os.getenv("DASHSCOPE_API_BASE")
api_key = os.getenv("DASHSCOPE_API_KEY")

# 2. 定义工具
@tool
def get_current_time() -> str:
    """返回当前的标准时间。"""
    return datetime.now().isoformat()

@tool
def multiply(a: int, b: int) -> int:
    """计算两个整数的乘积。"""
    return a * b

tools = [get_current_time, multiply]

# 初始化 LLM 并绑定工具
# 使用通义千问 qwen-plus 模型
llm = ChatOpenAI(
    base_url=api_base,
    api_key=api_key,
    model="qwen-plus",
    temperature=0,
    streaming=True,
    extra_body={"enable_thinking": False} # 根据需要设置
)

memory = MemorySaver()

agent_with_memory = create_react_agent(llm, tools, checkpointer=memory)

# 3. 在调用时,通过 config 指定一个唯一的线程 ID
#    同一个 thread_id 的调用会共享同一份记忆
config = {"configurable": {"thread_id": "my-session-1"}}

# 第一轮对话
inputs1 = {"messages": [HumanMessage(content="我的名字是张三。")]}
response1 = agent_with_memory.invoke(inputs1, config)
print("AI 回复 1:", response1['messages'][-1].content)

# 第二轮对话,在同一个会话中
inputs2 = {"messages": [HumanMessage(content="你知道我的名字吗?")]}
response2 = agent_with_memory.invoke(inputs2, config)
print("AI 回复 2:", response2['messages'][-1].content)

# 创建一个新会话
config_new = {"configurable": {"thread_id": "my-session-2"}}
inputs3 = {"messages": [HumanMessage(content="你知道我的名字吗?")]}
response3 = agent_with_memory.invoke(inputs3, config_new)
print("AI 回复 3 (新会话):", response3['messages'][-1].content)

  • 记忆机制:通过检查点工具在图执行后持久化状态,实现长期记忆。
import os
from dotenv import load_dotenv
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from datetime import datetime
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import BaseMessage, HumanMessage
from langgraph.checkpoint.memory import MemorySaver

# 0. 加载环境变量
# 确保您的 .env 文件中有 DASHSCOPE_API_BASE 和 DASHSCOPE_API_KEY
load_dotenv()
api_base = os.getenv("DASHSCOPE_API_BASE")
api_key = os.getenv("DASHSCOPE_API_KEY")

# 2. 定义工具
@tool
def get_current_time() -> str:
    """返回当前的标准时间。"""
    return datetime.now().isoformat()

@tool
def multiply(a: int, b: int) -> int:
    """计算两个整数的乘积。"""
    return a * b

tools = [get_current_time, multiply]

# 初始化 LLM 并绑定工具
# 使用通义千问 qwen-plus 模型
llm = ChatOpenAI(
    base_url=api_base,
    api_key=api_key,
    model="qwen-plus",
    temperature=0,
    streaming=True,
    extra_body={"enable_thinking": False} # 根据需要设置
)

memory = MemorySaver()

agent_with_memory = create_react_agent(llm, tools, checkpointer=memory)

# 3. 在调用时,通过 config 指定一个唯一的线程 ID
#    同一个 thread_id 的调用会共享同一份记忆
config = {"configurable": {"thread_id": "my-session-1"}}

# 第一轮对话
inputs1 = {"messages": [HumanMessage(content="我的名字是张三。")]}
response1 = agent_with_memory.invoke(inputs1, config)
print("AI 回复 1:", response1['messages'][-1].content)

# 第二轮对话,在同一个会话中
inputs2 = {"messages": [HumanMessage(content="你知道我的名字吗?")]}
response2 = agent_with_memory.invoke(inputs2, config)
print("AI 回复 2:", response2['messages'][-1].content)

# 创建一个新会话
config_new = {"configurable": {"thread_id": "my-session-2"}}
inputs3 = {"messages": [HumanMessage(content="你知道我的名字吗?")]}
response3 = agent_with_memory.invoke(inputs3, config_new)
print("AI 回复 3 (新会话):", response3['messages'][-1].content)