程序员的AI体验(十二):Magentic-One与GraphFlow
2025-06-28 15:45:22一、Magentic-One系统解析
1. Magentic-One定位与价值
- 预构建专家系统:Magentic-One是一个基于AutoGen框架构建的预构建通用任务解决专家系统,具备处理复杂任务的能力。
- 双重价值:Magentic-One既可以作为强大的工具直接使用,也是学习复杂多智能体设计的最佳实践蓝图,为开发者提供设计参考。
2. 编排者双循环机制
- 内循环执行与反思:编排者的内循环围绕进度分类账进行任务执行与即时反思,快速迭代以确保任务按计划推进。
- 外循环规划调整:当内循环陷入停滞时,编排者启动外循环,重新审视任务分类账,调整宏观计划以应对复杂情况。
- 自适应能力:双循环机制赋予Magentic-One强大的自适应能力,使其能够灵活应对复杂多变的任务环境。
3. 专家角色能力矩阵
- WebSurfer:负责网页浏览与交互,能够控制浏览器状态,执行网页导航、交互和内容阅读等操作。
- FileSurfer:专注于本地文件系统操作,支持读取多种文件格式、浏览目录结构和文件管理。
- Coder:擅长代码编写和数据分析,根据任务需求生成Python脚本等代码,完成数据处理和信息整合。
- ComputerTerminal:为Coder提供安全的代码执行环境,确保代码安全运行并处理依赖库安装等。
4. 环境安装与安全守则
- 安装准备:运行Magentic-One前需安装autogen-ext[magentic-one]、Playwright及Chromium等依赖,确保环境完备。
- 安全运行建议:建议在Docker容器中运行Magentic-One,以隔离环境风险,保护系统安全。
- 人工审批机制:引入人工审批机制,对代码执行等关键操作进行审核,防止潜在风险。
pip install "autogen-ext[magentic-one]" # 时间比较长,耐心等待
playwrithr install --with-deps chromium
# PowerShell 下运行
# https://chocolatey.org/install
Set-ExecutionPolicy Bypass -Scope Process -Force; [System.Net.ServicePointManager]::SecurityProtocol = [System.Net.ServicePointManager]::SecurityProtocol -bor 3072; iex ((New-Object System.Net.WebClient).DownloadString('https://community.chocolatey.org/install.ps1'))
#
choco install ffmpeg
二、Magentic-One实战演练
1. 单智能体群聊演示
- 任务规划:Magentic-One编排器接收任务后,先进行自我规划,再将子任务分配给唯一成员Assistant。
- 结构化执行:即使仅有一个智能体,Magentic-One也能通过规划将模糊任务转化为有条不紊的执行过程。
import os
import dotenv
dotenv.load_dotenv()
import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import MagenticOneGroupChat
from autogen_agentchat.ui import Console
# 配置通义千问模型客户端
model_client = OpenAIChatCompletionClient(
# 指定要使用的通义千问模型,例如 qwen-plus, qwen-turbo, qwen-max 等
model="qwen-plus",
# 填入您在 DashScope 获得的 API Key
api_key=os.getenv("DASHSCOPE_API_KEY"),
# 填入通义千问的 OpenAI 兼容模式 API 地址
base_url=os.getenv("DASHSCOPE_API_BASE"),
# 为自定义模型提供信息
model_info={
"vision": False,
"function_calling": True,
"json_output": True,
"structured_output": True,
"family": "qwen",
}
)
# 创建一个标准的助理智能体实例
assistant = AssistantAgent(
name="Assistant",
model_client=model_client,
model_client_stream=True
)
# 创建一个 Magentic-One 群聊团队实例
# 第一个参数是一个列表,包含了所有团队成员,这里只有我们刚刚创建的 "助理"
# 第二个参数是团队(主要是编排者)自身进行思考和规划时使用的模型客户端
team = MagenticOneGroupChat([assistant], model_client=model_client)
async def main() -> None:
await Console(team.run_stream(task="请告诉我抖音的开发者是谁?"))
await model_client.close()
import asyncio
if __name__ == "__main__":
asyncio.run(main())
2. WebSurfer实时查询
- 任务识别:编排器识别出任务需实时网络数据后,调用WebSurfer进行搜索。
- 数据提取:WebSurfer从搜索结果中提取准确数据,完成任务。
- 高级抽象:专家智能体封装了工具能力,简化了开发者对底层工具的使用,降低了开发门槛。
import os
import dotenv
dotenv.load_dotenv()
# 配置通义千问模型客户端
from autogen_ext.models.openai import OpenAIChatCompletionClient
model_client = OpenAIChatCompletionClient(
model="qwen-plus",
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_API_BASE"),
model_info={
"vision": False,
"function_calling": True,
"json_output": True,
"structured_output": True,
"family": "qwen",
}
)
# 实例化专家智能体
# 我们创建一个 MultimodalWebSurfer 的实例,并为其命名为 "WebSurfer"
# 这个智能体天生就具备了所有浏览网页和解析内容的能力
from autogen_ext.agents.web_surfer import MultimodalWebSurfer
surfer = MultimodalWebSurfer(
name="WebSurfer",
model_client=model_client
)
# 创建 MagenticOne 团队
from autogen_agentchat.teams import MagenticOneGroupChat
team = MagenticOneGroupChat([surfer], model_client=model_client)
from autogen_agentchat.ui import Console
async def main() -> None:
# 执行任务
await Console(team.run_stream(task="武汉今天的天气如何?"))
await model_client.close()
import asyncio
if __name__ == "__main__":
asyncio.run(main())
3. 一站式辅助类部署
自动创建团队:MagenticOne辅助类自动创建包含网络爬虫、文件管理、编码、终端等角色的完整专家团队。
- 一键部署:开发者只需实例化MagenticOne类,即可快速部署全功能专家团队,简化开发流程。
- 人工审批机制:通过人工审批机制,在代码执行前进行审核,确保安全可控。
- 任务示例:以API数据保存脚本任务为例,展示一站式部署的强大功能。
import os
import dotenv
dotenv.load_dotenv()
# 配置通义千问模型客户端
from autogen_ext.models.openai import OpenAIChatCompletionClient
model_client = OpenAIChatCompletionClient(
model="qwen-plus",
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_API_BASE"),
model_info={
"vision": False,
"function_calling": True,
"json_output": True,
"structured_output": True,
"family": "qwen",
}
)
# 步骤 1: 定义人工审批函数
# 导入批准请求和批准响应
from autogen_agentchat.agents import ApprovalRequest, ApprovalResponse
def approval_func(request: ApprovalRequest) -> ApprovalResponse:
"""一个简单的审批函数,在代码执行前请求用户确认。"""
# 打印将要被执行的代码,让用户清晰地知道智能体想要做什么
print(f"--- 待执行代码审批 ---\n{request.code}\n--------------------")
# 向用户提问,并获取输入
user_input = input("您是否批准执行以上代码? (y/n):\n ").strip().lower()
# 根据用户的输入,返回一个包含批准状态和原因的 ApprovalResponse 对象
if user_input == 'y':
# 如果用户输入 'y',则批准执行
return ApprovalResponse(approved=True, reason="用户已批准代码执行")
else:
# 否则,拒绝执行
return ApprovalResponse(approved=False, reason="用户已拒绝代码执行")
# 步骤2: 创建一个代码执行器实例
# 导入代码执行器。我们将使用本地命令行执行器。
# 注意:出于安全考虑,官方推荐使用 DockerCommandLineCodeExecutor。
# 但如果环境中没有安装或运行 Docker,使用 LocalCommandLineCodeExecutor 是一个可行的替代方案。
# 使用本地执行器意味着代码将直接在当前计算机上运行,请确保只批准执行可信的代码。
from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor
code_executor = LocalCommandLineCodeExecutor()
# 步骤 3: 一键式创建 Magentic-One 团队
from autogen_ext.teams.magentic_one import MagenticOne
# 我们只需实例化 MagenticOne 类。
# - client: 传入我们创建的模型客户端
# - code_executor: 传入代码执行器实例
# - approval_func: 将我们刚刚定义的审批函数作为参数传入,从而激活代码执行前的人工审核机制
team = MagenticOne(client=model_client, code_executor=code_executor, approval_func=approval_func)
# 定义主函数
from autogen_agentchat.ui import Console
async def main() -> None:
# 执行任务
task = "编写一个 Python 脚本,从 'https://jsonplaceholder.typicode.com/todos/1' 获取数据,并将结果保存到D盘的 api_result.json 文件中。"
await Console(team.run_stream(task=task))
await model_client.close()
import asyncio
if __name__ == "__main__":
asyncio.run(main())
三、GraphFlow工作流编排
1. GraphFlow设计哲学
- 确定性流程引擎:GraphFlow作为确定性流程引擎,通过有向图精确控制智能体执行顺序,适用于企业级复杂流程。
- 补充自由对话模式:GraphFlow与自由对话模式互为补充,满足不同场景需求,扩展AutoGen的应用范围。
2. 核心组件与解耦思想
- DiGraphBuilder:DiGraphBuilder负责构建工作流蓝图,添加智能体节点和定义执行路径。
- Node与Edge:Node代表智能体,Edge定义节点间的执行依赖关系,共同构成工作流逻辑。
- 执行图与消息图分离:GraphFlow实现执行图与消息图解耦,提升系统灵活性和可维护性。
3. 顺序工作流
- 工作流构建:通过添加节点、定义边构建顺序工作流,确保任务按既定顺序传递。
- 任务执行:以气候变化短文任务为例,验证GraphFlow顺序工作流的有效性。
import os
import dotenv
dotenv.load_dotenv()
from autogen_ext.models.openai import OpenAIChatCompletionClient
# 配置通义千问模型客户端
model_client = OpenAIChatCompletionClient(
# 指定要使用的通义千问模型,例如 qwen-plus, qwen-turbo, qwen-max 等
model="qwen-plus",
# 填入您在 DashScope 获得的 API Key
api_key=os.getenv("DASHSCOPE_API_KEY"),
# 填入通义千问的 OpenAI 兼容模式 API 地址
base_url=os.getenv("DASHSCOPE_API_BASE"),
# 为自定义模型提供信息
model_info={
"vision": False,
"function_calling": True,
"json_output": True,
"structured_output": True,
"family": "qwen",
}
)
from autogen_agentchat.agents import AssistantAgent
# 创建“作家”智能体
# 它的名字是 "writer",这个名字在图中将作为节点的唯一标识
# 它被赋予了明确的系统指令:编写一段关于气候变化的短文
writer = AssistantAgent(
name="writer",
model_client=model_client,
model_client_stream=True,
system_message="你是一位才华横溢的作家。你的任务是撰写一段关于气候变化的短文。",
)
# 创建“评论家”智能体
# 它的名字是 "reviewer",同样是唯一的节点标识
# 它的系统指令是评审草稿并提出改进建议
reviewer = AssistantAgent(
name="reviewer", # 智能体的名字
model_client=model_client, # 指定模型客户端
model_client_stream=True,
system_message="你是一位经验丰富的编辑。你的任务是评审草稿,并对其提出具体的改进建议。", # 定义角色和行为
)
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
# 实例化一个有向图构建器
builder = DiGraphBuilder()
# 向图中添加作家节点和评论家节点
builder.add_node(writer).add_node(reviewer)
# 在作家和评论家之间添加一条边
# 这条边定义了执行的顺序:当作家完成任务后,轮到评论家行动
builder.add_edge(writer, reviewer)
# 调用 build() 方法来完成图的构建和验证
# 这个方法会返回一个 DiGraph 对象,它是工作流的静态表示
graph = builder.build()
# 创建 GraphFlow 实例
# participants 参数接收一个包含所有参与智能体的列表
# graph 参数接收我们刚刚构建的图对象
flow = GraphFlow(
participants=[writer, reviewer], # 明确告知流程中有哪些参与者
graph=graph, # 传入定义好的图结构
)
# 定义一个异步主函数来运行工作流
from autogen_agentchat.ui import Console
async def main():
# 使用 Console 将工作流的运行过程实时、格式化地输出到控制台
# run_stream 方法启动工作流,并传入初始任务
# 这个任务将首先被发送给图的源节点,即“作家”
await Console(flow.run_stream(task="请撰写一段关于气候变化的短文。"))
import asyncio
if __name__ == "__main__":
asyncio.run(main())
4. 并行分流与合流机制
- 并行处理:在出版流程中,语法编辑和风格编辑并行处理稿件,提升效率。
- 合流逻辑:最终审稿人等待语法和风格编辑都完成后才开始整合,确保任务完整。
- AND合流逻辑:GraphFlow默认采用AND合流逻辑,保障前置任务全部完成。
import os
import dotenv
dotenv.load_dotenv()
from autogen_ext.models.openai import OpenAIChatCompletionClient
# 配置通义千问模型客户端
model_client = OpenAIChatCompletionClient(
# 指定要使用的通义千问模型,例如 qwen-plus, qwen-turbo, qwen-max 等
model="qwen-plus",
# 填入您在 DashScope 获得的 API Key
api_key=os.getenv("DASHSCOPE_API_KEY"),
# 填入通义千问的 OpenAI 兼容模式 API 地址
base_url=os.getenv("DASHSCOPE_API_BASE"),
# 为自定义模型提供信息
model_info={
"vision": False,
"function_calling": True,
"json_output": True,
"structured_output": True,
"family": "qwen",
}
)
from autogen_agentchat.agents import AssistantAgent
# 创建“作家”智能体
writer = AssistantAgent(
"writer",
model_client=model_client,
model_client_stream=True,
system_message="你是一名作家,负责撰写关于可再生能源的短文初稿。",
)
# 创建“语法编辑”智能体
editor1 = AssistantAgent(
"editor1",
model_client=model_client,
model_client_stream=True,
system_message="你是一名语法专家,请仔细检查文稿中的语法、拼写和标点错误。",
)
# 创建“风格编辑”智能体
editor2 = AssistantAgent(
"editor2",
model_client=model_client,
model_client_stream=True,
system_message="你是一名风格编辑,请评估文稿的流畅性、清晰度和整体风格,并提出改进建议。",
)
# 创建“最终审稿人”智能体
final_reviewer = AssistantAgent(
"final_reviewer",
model_client=model_client,
model_client_stream=True,
system_message="你是一名主编,请整合语法和风格编辑的意见,形成最终的、可发布的版本。",
)
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
# 实例化构建器
builder = DiGraphBuilder()
# 添加所有四个智能体作为节点
builder.add_node(writer).add_node(editor1).add_node(editor2).add_node(final_reviewer)
# --- 定义分流逻辑 ---
# 从作家到语法编辑
builder.add_edge(writer, editor1)
# 从作家到风格编辑
builder.add_edge(writer, editor2)
# --- 定义合流逻辑 ---
# 从语法编辑到最终审稿人
builder.add_edge(editor1, final_reviewer)
# 从风格编辑到最终审稿人
builder.add_edge(editor2, final_reviewer)
# 构建并验证图
graph = builder.build()
# 创建 GraphFlow 实例
flow = GraphFlow(
participants=builder.get_participants(), # 使用 builder.get_participants() 可以方便地获取所有节点
graph=graph,
)
from autogen_agentchat.ui import Console
# 定义异步主函数
async def main():
await Console(flow.run_stream(task="请撰写一篇关于太阳能优势的短文。"))
import asyncio
# 运行主函数
if __name__ == "__main__":
asyncio.run(main())
5. 条件边与动态路由
- 条件函数:通过lambda条件函数将自然语言输出转为布尔路由信号,实现动态流程控制。
- 循环与终止:结合MaxMessageTermination防止无限循环,确保流程可控。
import os
import dotenv
dotenv.load_dotenv()
from autogen_ext.models.openai import OpenAIChatCompletionClient
# 配置通义千问模型客户端
model_client = OpenAIChatCompletionClient(
# 指定要使用的通义千问模型,例如 qwen-plus, qwen-turbo, qwen-max 等
model="qwen-plus",
# 填入您在 DashScope 获得的 API Key
api_key=os.getenv("DASHSCOPE_API_KEY"),
# 填入通义千问的 OpenAI 兼容模式 API 地址
base_url=os.getenv("DASHSCOPE_API_BASE"),
# 为自定义模型提供信息
model_info={
"vision": False,
"function_calling": True,
"json_output": True,
"structured_output": True,
"family": "qwen",
}
)
from autogen_agentchat.agents import AssistantAgent
# 创建“生成者”智能体
generator = AssistantAgent(
name="generator",
model_client=model_client,
model_client_stream=True,
system_message="你是一名创意大师,负责根据用户请求生成创新的想法。",
)
# 创建“评审者”智能体
reviewer = AssistantAgent(
name="reviewer",
model_client=model_client,
model_client_stream=True,
system_message="你是一名严格的评论家。评审生成的想法,如果满意,请明确说出 '赞同'。如果不满意,请提出具体修改意见。",
)
# 纠创建“总结者”智能体
summarizer = AssistantAgent(
name="summarizer",
model_client=model_client
model_client_strean=True,
system_message="你是一名秘书,负责总结用户的初始请求和评审者最终批准的想法。",
)
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
# 实例化构建器
builder = DiGraphBuilder()
# 添加所有节点
builder.add_node(generator).add_node(reviewer).add_node(summarizer)
# 1. 从生成者到评审者:这是一条无条件边,生成后总是需要评审
builder.add_edge(generator, reviewer)
# 2. 从评审者回到生成者(循环边)
# 只有当评审者的回复中不包含 "赞同" 时,这条边才会被激活
builder.add_edge(
reviewer,
generator,
condition=lambda msg: "赞同" not in msg.to_model_text(),
)
# 3. 从评审者到总结者(退出循环边)
# 只有当评审者的回复中包含 "赞同" 时,这条边才会被激活
builder.add_edge(
reviewer,
summarizer,
condition=lambda msg: "赞同" in msg.to_model_text(),
)
# 设置图的入口点
# 在有循环的图中,可能没有明确的“源节点”(因为每个节点都可能有入边)
# 此时必须使用 set_entry_point() 来告诉 GraphFlow 从哪里开始
builder.set_entry_point(generator)
# 构建图
graph = builder.build()
# 设置一个终止条件,防止无限循环
from autogen_agentchat.conditions import MaxMessageTermination
# 这里我们设定,总消息数达到10条后,无论如何都停止
termination_condition = MaxMessageTermination(10)
# 创建 GraphFlow 实例
flow = GraphFlow(
participants=builder.get_participants(),
graph=graph,
termination_condition=termination_condition, # 传入终止条件
)
from autogen_agentchat.ui import Console
async def main():
await Console(flow.run_stream(task="请提出三种解决内卷的方法。"))
import asyncio
if __name__ == "__main__":
asyncio.run(main())
6. 上下文隔离优化
- 消息过滤器:使用MessageFilterAgent为总结者过滤消息,仅保留关键信息。
- 执行流与信息流解耦:GraphFlow实现执行流与信息流解耦,提升系统灵活性和效率。
# 在上面的示例中我们加入隔离优化
# 纠创建“总结者”智能体
#summarizer = AssistantAgent(
# name="summarizer",
# model_client=model_client
# model_client_strean=True,
# system_message="你是一名秘书,负责总结用户的初始请求和评审者最终批准的想法。",
#)
# 导入 MessageFilterAgent 相关的类
from autogen_agentchat.agents import MessageFilterAgent, MessageFilterConfig, PerSourceFilter
# 创建一个核心的总结者智能体
summarizer_core = AssistantAgent(
name="summary", # 注意,名字现在是 "summary"
model_client=model_client,
model_client_stream=True,
system_message="你是一名秘书,负责总结用户的初始请求和评审者最终批准的想法。",
)
# 使用 MessageFilterAgent 包装核心总结者
summarizer = MessageFilterAgent(
name="summary", # 包装器的名字与核心智能体保持一致
wrapped_agent=summarizer_core, # 指定被包装的智能体
filter=MessageFilterConfig(
per_source=[PerSourceFilter(source="reviewer", position="last", count=1)]
),
)
四、综合实践与展望
1. 三种模式对比总结
- Swarm模式:Swarm模式强调去中心化灵活移交,适合任务流转灵活的场景。
- Magentic-One:Magentic-One聚焦通用任务解决与专家协作,适用于复杂任务处理。
- GraphFlow:GraphFlow提供确定性流程编排,适合企业级复杂流程。
2. 安全与治理最佳实践
- 通用安全准则:隔离环境、人工审批、敏感数据隔离等是跨模式通用的安全准则。
- 平衡能力与安全:在释放AI能力的同时,需严格遵守安全准则,确保可信AI系统落地。
3. 性能与成本优化策略
- 上下文管理:通过MessageFilter减少Token消耗,优化性能与成本。
- 并行执行:利用GraphFlow并行加速任务处理,提升效率。
- 模型选择:合理选择模型规格,平衡性能与成本。
4. 未来演进方向展望
- 专家智能体库:未来将提供更丰富的专家智能体库,拓展系统能力。
- 云原生集成:与云原生深度集成,提升系统可扩展性。
- 可视化流程设计器:开发可视化流程设计器,降低编排门槛。
- 强化学习驱动:引入强化学习驱动的动态编排,提升系统智能水平。