程序员的AI体验(六):LangChain 链
2025-05-17 18:43:27一、链的概念
- 定义与作用:链是LangChain中最核心的概念,指将多个模块化组件按特定顺序链接起来,执行多步骤任务的序列,可将复杂任务分解为更小、更易于管理的独立步骤。
- 核心优势:链在LangChain生态系统中扮演“粘合剂”和“骨架”的角色,连接不同人工智能组件,促成协同工作,简化应用开发过程,提升开发效率。
- 链的演进
- 在早期LangChain版本中,链通过继承Chain抽象基类实现,采用命令式的面向对象继承编程模型,存在组合灵活性不足等问题。
- LangChain引入了LangChain表达式语言(LCEL),采用声明式的管道操作符|连接组件,具有可组合性、可读性、灵活性、原生支持高级功能和可观察性等优势。
- 构建链的核心组件
- 模型:模型是链的核心引擎,包括大型语言模型(LLMs)和聊天模型(Chat Models),负责执行主要的智能任务。
- 提示模板:提示模板负责构建和格式化模型的输入,将用户输入与预设指令和示例结合,生成完整、结构化的提示。
- 输出解析器:输出解析器将模型返回的原始输出转换为更结构化、易于处理的格式,是连接非结构化智能与结构化数据的桥梁。
二、构建第一个LLM应用
1. LangChain表达式语言基础
- 基础链结构:最基础的链结构为提示 | 模型 | 解析器,是所有复杂链条的基础构建单元,体现了与大型语言模型交互的三个基本阶段。
- 工作原理:详细追踪了从输入到输出的完整数据流动过程,包括提示模板的格式化、模型的推理生成和输出解析器的结构化处理。
2. 从零构建翻译链
import os
# 1. 导入所需的核心组件
from dotenv import load_dotenv
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
# 从.env 文件加载我们配置的环境变量
load_dotenv()
api_base = os.getenv("DASHSCOPE_API_BASE")
api_key = os.getenv("DASHSCOPE_API_KEY")
# 2. 定义我们的提示模板
# 这个模板包含了两个动态变量:language 和 text。
# 它清晰地指示模型,将 text 从英语翻译成指定的 language。
system_template = "将下列内容从英语翻译为{language}"
prompt_template = ChatPromptTemplate.from_messages([
("system", system_template),
("human", "{text}")
])
# 3. 初始化我们的语言模型
# temperature=0 表示我们希望模型的输出更具确定性,减少随机发挥。
model = ChatOpenAI(
base_url=api_base,
api_key=api_key,
model_name="qwen-plus",
temperature=0,
extra_body={"enable_thinking": False})
# 4. 初始化输出解析器
# StrOutputParser 会将模型返回的聊天消息对象,转换为我们更易于使用的简单字符串。
parser = StrOutputParser()
# 5. 使用 LCEL 管道操作符 | 将三大组件链接成链
# 这行代码可以说是 LangChain 现代用法的精髓所在。
# 它以声明式的方式,定义了一个清晰的执行流程:输入 -> 提示模板 -> 模型 -> 解析器 -> 输出
chain = prompt_template | model | parser
# 6. 调用链并传入我们的输入数据
# 我们使用 invoke 方法来执行这条链。
# 输入是一个字典,其键必须与提示模板中定义的变量名完全对应。
input_data = {"language": "中文", "text": "Hello, how are you?"}
response = chain.invoke(input_data)
print(f"输入: {input_data}")
print(f"翻译结果: {response}")
input_data_japanese = {"language": "日语", "text": "I love programming."}
response_japanese = chain.invoke(input_data_japanese)
print(f"\n输入: {input_data_japanese}")
print(f"翻译结果: {response_japanese}")
- 通过完整的Python代码示例,展示了如何从零构建一个简单的翻译应用,包括定义提示模板、初始化模型、初始化输出解析器、链接组件和调用链等步骤。
- 该翻译链可将英文文本翻译成用户指定的任何语言,具有实际应用价值,体现了LangChain在构建实用AI应用中的强大能力。
3. LangSmith
- LangSmith是LangChain团队打造的用于调试、测试、评估和监控LLM应用的平台,提供了前所未有的“可观察性”,帮助开发者深入理解系统内部行为。
- 如何通过设置环境变量,将LangChain应用的运行轨迹数据发送到LangSmith平台,并通过平台查看详细的追踪信息,实现对复杂AI应用的调试和监控。
三、链的几种形式
1. 顺序链
- 顺序链定义:顺序链是将多个独立链串联形成复杂处理流水线,前一链输出是后一链输入,适用于多步任务,如先生成书名再写简介。
- 顺序链价值:顺序链可分解复杂任务为小任务,清晰易开发调试;控制逻辑流,按特定顺序执行操作;组合不同功能链,实现复杂功能。
- 简单顺序链
- 工作原理:简单顺序链处理简单线性任务流,假设每个子链只有单一字符串输入输出,依次传递直到最后一个子链完成处理。
- 局限性:简单顺序链只能处理单一输入输出,丢失中间结果,不适用于复杂场景。
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 加载环境变量
load_dotenv()
api_base = os.getenv("DASHSCOPE_API_BASE")
api_key = os.getenv("DASHSCOPE_API_KEY")
llm = ChatOpenAI(
base_url=api_base,
api_key=api_key,
model="qwen-plus",
extra_body={"enable_thinking": False}
)
# 第一个链:根据主题生成书名
prompt_title = ChatPromptTemplate.from_template(
"你是一位专业的作家。请为一个关于 '{topic}' 主题的书籍想一个吸引人的标题。"
)
# 第二个链:为书名生成宣传语
prompt_tagline = ChatPromptTemplate.from_template(
"你是一位市场营销专家。请为书籍《{title}》创作一句引人注目的宣传语。"
)
# sequential_chain = (
# prompt_title | llm | StrOutputParser()
# | {"title": lambda title: title} # 将上一步的输出(标题字符串)包装成一个字典
# | prompt_tagline | llm | StrOutputParser()
# )
sequential_chain = (
{"title": prompt_title | llm | StrOutputParser()}
| prompt_tagline | llm | StrOutputParser()
)
# 运行链
topic = "人工智能在医疗领域的未来"
# 注意:LCEL 链的输入现在是一个字典,以匹配第一个提示模板的输入变量
final_tagline = sequential_chain.invoke({"topic": topic})
print("\n" + "="*20)
print(f"输入的主题: {topic}")
print(f"最终生成的宣传语: {final_tagline}")
print("="*20)
通过代码示例,展示如何使用简单顺序链实现根据主题生成书名和宣传语的功能,体现其简洁性。
- 通用顺序链
- 工作原理:通用顺序链通过共享上下文(字典)管理和传递变量,子链可接收多个输入变量并产生多个输出变量,最终返回完整字典。
- 优势:通用顺序链可处理复杂工作流,保留中间结果,提供灵活性和控制力,适用于非严格线性任务。
import os
import json
from operator import itemgetter # 导入 itemgetter 以便轻松选择字典键
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
# 加载环境变量
load_dotenv()
api_base = os.getenv("DASHSCOPE_API_BASE")
api_key = os.getenv("DASHSCOPE_API_KEY")
# 初始化模型
llm = ChatOpenAI(
base_url=api_base,
api_key=api_key,
model="qwen-plus",
extra_body={"enable_thinking": False}
)
# 提示 1: 将评论翻译成中文
prompt_translate = ChatPromptTemplate.from_template(
"将以下评论翻译成中文:\n\n{review}"
)
# 提示 2: 总结中文评论
prompt_summarize = ChatPromptTemplate.from_template(
"用一句话总结以下评论:\n\n{chinese_review}"
)
# 提示 3: 识别原始评论的语言
prompt_identify_lang = ChatPromptTemplate.from_template(
"识别以下评论所使用的语言:\n\n{review}"
)
# 提示 4: 生成后续回复
prompt_followup = ChatPromptTemplate.from_template(
"请根据以下摘要,用指定的语言撰写一条礼貌的后续回复:\n\n摘要: {summary}\n\n语言: {language}"
)
translator = prompt_translate | llm | StrOutputParser()
summarizer = prompt_summarize | llm | StrOutputParser()
identifier = prompt_identify_lang | llm | StrOutputParser()
responder = prompt_followup | llm | StrOutputParser()
overall_chain = RunnablePassthrough.assign(
# 步骤 a: 翻译评论。它需要原始的 "review"。
chinese_review=itemgetter("review") | translator
).assign(
# 步骤 b: 总结。它需要上一步生成的 "english_review"。
summary=itemgetter("chinese_review") | summarizer
).assign(
# 步骤 c: 识别语言。它也需要原始的 "review"。
language=itemgetter("review") | identifier
).assign(
# 步骤 d: 生成回复。它需要 "summary" 和 "language"。
# responder 链会自动从传入的字典中提取它需要的键。
followup_message=responder
)
# 运行链
original_review = "Ce produit est absolument fantastique! La qualité est bien au-delà de mes attentes."
result = overall_chain.invoke({"review": original_review})
# {
# "review": "原始语言",
# "chinese_review": "翻译后的中文",
# "summary": "翻译后的摘要",
# "language": "语种"
# followup_message=responder
# }
# 打印结果
# 最终的字典包含了所有中间步骤的结果
print("\n" + "="*20)
print(f"原始评论: {original_review}")
print("\n--- 执行结果 ---")
# 为了更清晰地展示,只打印我们关心的最终输出变量
final_output = {
"english_review": result.get("chinese_review"),
"summary": result.get("summary"),
"language": result.get("language"),
"followup_message": result.get("followup_message"),
}
print(json.dumps(final_output, indent=2, ensure_ascii=False))
print("="*20)
通过多语言电商平台用户评论处理的复杂工作流,展示通用顺序链如何实现翻译、总结、语言识别和生成回复的功能。
2. 路由链
- 路由链定义:路由链是“调度中心”,分析输入后将任务路由到多个预定义目标链中的某一个去执行,实现动态选择处理路径。
- 与顺序链区别:顺序链执行路径固定、线性,路由链执行路径非确定性、条件性,引入分支逻辑。
- 与Agent关系:路由链决策逻辑预定义,Agent决策逻辑动态自主,路由链是构建复杂代理系统的基础之一。
- 实现过程
- 创建分类链:构建分类链对用户输入进行分类,输出清晰标签,如问答机器人将问题分类为不同领域。
- 创建路由函数:创建路由函数根据分类结果返回相应的专家链,用RunnableLambda包装使其能集成到LangChain表达式链中。
- 构建多主题专家问答机器人:通过代码示例,构建一个多主题专家问答机器人,展示路由链如何实现动态选择专家链回答问题。
import os
from dotenv import load_dotenv
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
# 加载环境变量
load_dotenv()
api_base = os.getenv("DASHSCOPE_API_BASE")
api_key = os.getenv("DASHSCOPE_API_KEY")
# 初始化模型
# 使用一个速度较快的模型用于分类
classifier_llm = ChatOpenAI(
base_url=api_base,
api_key=api_key,
model="qwen-turbo",
temperature=0,
extra_body={"enable_thinking": False}
)
# 使用一个能力更强的模型用于回答问题
expert_llm = ChatOpenAI(
base_url=api_base,
api_key=api_key,
model="qwen-plus",
temperature=0.7,
extra_body={"enable_thinking": False}
)
# --- 第一步:创建分类链 ---
classification_prompt = ChatPromptTemplate.from_template(
"""
请根据以下用户问题,将其分类为 `LangChain`、`Dify` 或 `Other` 中的一种。
你只能回复分类列表中指定的词。
<question>
{question}
</question>
"""
)
classifier_chain = classification_prompt | classifier_llm | StrOutputParser()
# 测试分类链
test_question = "如何使用 LangChain 构建 Agent 应用?"
classification_result = classifier_chain.invoke({"question": test_question})
print(f"问题 '{test_question}' 的分类结果是: {classification_result}")
print("-" * 20)
# --- 第二步:创建各个领域的专家链 ---
# LangChain 专家链
langchain_prompt = ChatPromptTemplate.from_template(
"""你是一位 LangChain 专家。
请回答以下问题:
Question: {question}
Answer:"""
)
langchain_chain = langchain_prompt | expert_llm | StrOutputParser()
# Dify 专家链
dify_prompt = ChatPromptTemplate.from_template(
"""你是一位 Dify 专家。
请回答以下问题:
Question: {question}
Answer:"""
)
dify_chain = dify_prompt | expert_llm | StrOutputParser()
# 通用知识链
general_prompt = ChatPromptTemplate.from_template(
"""你是一位知识渊博的通用 AI 助手。
请回答以下问题:
Question: {question}
Answer:"""
)
general_chain = general_prompt | expert_llm | StrOutputParser()
# --- 第三步:创建路由函数 ---
def route(info):
topic = info["topic"].lower()
if "langchain" in topic:
return langchain_chain
elif "dify" in topic:
return dify_chain
else:
return general_chain
# --- 第四步:构建完整的路由链 ---
full_router_chain = (
# RunnableParallel
{
"topic": classifier_chain,
"question": lambda x: x["question"]
} | RunnableLambda(route))
# {
# "topic": "LangChain",
# "question": "如何使用 LangChain 的 LCEL 语法?"
# }
# --- 第五步:测试路由链 ---
# 测试 LangChain 问题
response_lc = full_router_chain.invoke({"question": "如何使用 LangChain 的 LCEL 语法?"})
print("--- LangChain 问题 ---")
print(response_lc)
print("\n")
# 测试 Dify 问题
response_hf = full_router_chain.invoke({"question": "如何在 Dify 中安装插件?"})
print("--- Dify 问题 ---")
print(response_hf)
print("\n")
# 测试通用问题
response_other = full_router_chain.invoke({"question": "中国的首都是哪里?"})
print("--- 通用问题 ---")
print(response_other)
3. API链
核心流程:API链使LLM与外部API交互,获取实时信息或执行操作,通过API文档让LLM理解API功能和使用方法。
import os
from dotenv import load_dotenv
import requests
import json
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
# 加载环境变量
load_dotenv()
api_base = os.getenv("DASHSCOPE_API_BASE")
api_key = os.getenv("DASHSCOPE_API_KEY")
# --- 步骤 1: 定义一个工具 ---
@tool
def get_weather(latitude: float, longitude: float) -> str:
"""
获取指定经纬度的当前天气信息。
"""
base_url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": latitude,
"longitude": longitude,
"current_weather": "true" # API 要求布尔值为字符串 "true"
}
try:
response = requests.get(base_url, params=params)
response.raise_for_status() # 如果请求失败则抛出异常
weather_data = response.json()
return json.dumps(weather_data["current_weather"])
except requests.exceptions.RequestException as e:
return f"获取天气 API 数据时出错: {e}"
except KeyError:
return f"API 响应中缺少 'current_weather' 键: {response.text}"
# --- 步骤 2: 将工具绑定到模型 ---
llm = ChatOpenAI(
base_url=api_base,
api_key=api_key,
model="qwen-plus",
temperature=0.7,
extra_body={"enable_thinking": False}
)
# 将我们定义的工具列表绑定到模型上
tools = [get_weather]
llm_with_tools = llm.bind_tools(tools)
# --- 步骤 3: 构建完整的、包含最终回复的 API 链 ---
# 定义用于生成最终回复的提示模板
answer_prompt = ChatPromptTemplate.from_template(
"""请根据以下工具输出,为用户的问题提供一个自然的语言回答。
用户问题: {question}
工具输出: {tool_output}
""")
# 构建完整的链
api_chain = (
RunnablePassthrough.assign(
tool_output=(lambda x: x["question"]) | llm_with_tools | (
lambda msg: msg.tool_calls[0]["args"]) | get_weather
)
| answer_prompt
| llm
| StrOutputParser()
)
# --- 步骤 4: 调用完整的链 ---
# 现在,我们只需要调用这个封装好的链,即可完成从自然语言到最终答案的全过程
query = "武汉现在天气怎么样?"
final_answer = api_chain.invoke({"question": query})
print(f"查询: {query}")
print(f"最终回答: {final_answer}")
通过天气API示例,展示如何定义工具、将工具绑定到模型、构建完整的API调用链并调用,实现从自然语言到最终答案的全过程。
四、LangGraph框架
LangGraph通过状态图建模,赋予开发者显式控制流、结构化状态管理、高度可定制与可扩展能力。
1. 核心概念
- 状态(State):状态是图中共享数据结构,通常为类型化字典,用于存储应用快照。
- 节点(Nodes):节点是图中工作单元,可为函数或Runnable对象,接收状态输入并返回状态更新。
- 边(Edges):边连接节点,定义控制流,包括普通边和条件边,实现循环和复杂决策逻辑。
2. 使用LangGraph构建ReAct Agent
1. 定义状态和工具:定义Agent状态和工具,初始化模型并绑定工具。
1. 创建节点和边:创建调用模型和工具的节点,设置条件边和普通边,构建完整图。
1. 图的运行与结果:编译图并运行,通过流式输出观察每一步执行结果。
3. LangGraph高级特性
- 预构建Agent:LangGraph提供预构建函数,简化常见Agent模式创建过程,如create_react_agent。
import os
from dotenv import load_dotenv
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from datetime import datetime
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import BaseMessage, HumanMessage
from langgraph.checkpoint.memory import MemorySaver
# 0. 加载环境变量
# 确保您的 .env 文件中有 DASHSCOPE_API_BASE 和 DASHSCOPE_API_KEY
load_dotenv()
api_base = os.getenv("DASHSCOPE_API_BASE")
api_key = os.getenv("DASHSCOPE_API_KEY")
# 2. 定义工具
@tool
def get_current_time() -> str:
"""返回当前的标准时间。"""
return datetime.now().isoformat()
@tool
def multiply(a: int, b: int) -> int:
"""计算两个整数的乘积。"""
return a * b
tools = [get_current_time, multiply]
# 初始化 LLM 并绑定工具
# 使用通义千问 qwen-plus 模型
llm = ChatOpenAI(
base_url=api_base,
api_key=api_key,
model="qwen-plus",
temperature=0,
streaming=True,
extra_body={"enable_thinking": False} # 根据需要设置
)
memory = MemorySaver()
agent_with_memory = create_react_agent(llm, tools, checkpointer=memory)
# 3. 在调用时,通过 config 指定一个唯一的线程 ID
# 同一个 thread_id 的调用会共享同一份记忆
config = {"configurable": {"thread_id": "my-session-1"}}
# 第一轮对话
inputs1 = {"messages": [HumanMessage(content="我的名字是张三。")]}
response1 = agent_with_memory.invoke(inputs1, config)
print("AI 回复 1:", response1['messages'][-1].content)
# 第二轮对话,在同一个会话中
inputs2 = {"messages": [HumanMessage(content="你知道我的名字吗?")]}
response2 = agent_with_memory.invoke(inputs2, config)
print("AI 回复 2:", response2['messages'][-1].content)
# 创建一个新会话
config_new = {"configurable": {"thread_id": "my-session-2"}}
inputs3 = {"messages": [HumanMessage(content="你知道我的名字吗?")]}
response3 = agent_with_memory.invoke(inputs3, config_new)
print("AI 回复 3 (新会话):", response3['messages'][-1].content)
- 记忆机制:通过检查点工具在图执行后持久化状态,实现长期记忆。
import os
from dotenv import load_dotenv
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from datetime import datetime
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import BaseMessage, HumanMessage
from langgraph.checkpoint.memory import MemorySaver
# 0. 加载环境变量
# 确保您的 .env 文件中有 DASHSCOPE_API_BASE 和 DASHSCOPE_API_KEY
load_dotenv()
api_base = os.getenv("DASHSCOPE_API_BASE")
api_key = os.getenv("DASHSCOPE_API_KEY")
# 2. 定义工具
@tool
def get_current_time() -> str:
"""返回当前的标准时间。"""
return datetime.now().isoformat()
@tool
def multiply(a: int, b: int) -> int:
"""计算两个整数的乘积。"""
return a * b
tools = [get_current_time, multiply]
# 初始化 LLM 并绑定工具
# 使用通义千问 qwen-plus 模型
llm = ChatOpenAI(
base_url=api_base,
api_key=api_key,
model="qwen-plus",
temperature=0,
streaming=True,
extra_body={"enable_thinking": False} # 根据需要设置
)
memory = MemorySaver()
agent_with_memory = create_react_agent(llm, tools, checkpointer=memory)
# 3. 在调用时,通过 config 指定一个唯一的线程 ID
# 同一个 thread_id 的调用会共享同一份记忆
config = {"configurable": {"thread_id": "my-session-1"}}
# 第一轮对话
inputs1 = {"messages": [HumanMessage(content="我的名字是张三。")]}
response1 = agent_with_memory.invoke(inputs1, config)
print("AI 回复 1:", response1['messages'][-1].content)
# 第二轮对话,在同一个会话中
inputs2 = {"messages": [HumanMessage(content="你知道我的名字吗?")]}
response2 = agent_with_memory.invoke(inputs2, config)
print("AI 回复 2:", response2['messages'][-1].content)
# 创建一个新会话
config_new = {"configurable": {"thread_id": "my-session-2"}}
inputs3 = {"messages": [HumanMessage(content="你知道我的名字吗?")]}
response3 = agent_with_memory.invoke(inputs3, config_new)
print("AI 回复 3 (新会话):", response3['messages'][-1].content)