Net企业级AI项目4:NL2SQL
2026-01-31 23:49:16NL2SQL(Natural Language to SQL) 是一项将人类自然语言问题自动转换为结构化 SQL 查询语句的技术。简单来说,就是让用户用大白话(如“上个月销售额最高的产品是什么?”)直接查询数据库,无需学习 SQL 语法。
今天我们的任务就是给我们的企业智能助理扩展 NL2SQL 的能力。
一、动态数据源架构
1. 面向多数据库源的架构
一家企业会有很多不同的系统,而各个系统所用的数据库会是各种各样的,它们不仅数据库产品不一样,同产品的数据库可能版本也不一样,查询数据的时候可能存在一些差别。
我们智能助理需要拥有以下这些功能:
- 动态配置多个业务数据库
- 通过自然语言,分析全局源数据,定位到需要链接的数据库(如 ERP\HR\OA)
- 最小权限连接上数据库(只读,禁止使用 DDL、DML)
- 分析目标库内部的表结构信息
- 方言适配(不同类型的数据库 SQL 语法差异)
ps:只做只读的实现,并不是模型不能生成插入、更新、删除等 SQL,主要是不建议用自然语言来做数据库的操作,主要是:
- 模型会有幻觉,操作数据库存在风险
- 无法保证事务
- 无法触发一些关联任务,比如生成订单后,需要通知扣减库存。如果是直接操作订单表,库存的扣减事件就无法触发
如果需要实现自然语音操作数据,可以使用工具调用或者 MCP 来完成,我们的项目只实现生成 SQL 查询语句。
2. ReAct 认知模型
ReAct 模型就是一个会动脑筋、会动手、还会从错误中学习的超级助理的工作方法!
它的任务就是听懂你的话,然后从数据库里找到答案。它可聪明了,做事非常有条理,就像一个大侦探,每次破案都遵循三个步骤:
思考(Think - 推理)
当用户说“帮我查一下仓库里还有多少台 iPhone 15?”
助理就会想一想:“这个问题是什么意思呢?我需要去“库存表”里看看,要找“iPhone 15”的数量”。
行动(Act - 行动)
想好之后,它就立刻行动起来!也就是生成 SQL 查询语句去查找。
检查(Observe - 观察):行动之后,它会仔细检查结果。比如,它可能发现“生成的 SQL ”写错了,它会进行修正。
总之,当你告诉它要什么,它就用这个“思考-行动”的魔法,自己想办法从数据库里把答案给你找出来。
3. 多态数据库设计
- 全局数据源元数据注册表:
- 身份标识与路由
- 语义描述
- 连接字符串管理
- 方言适配
- 连接隔离和最小权限原则
- 控制平面(AI系统自身的数据库、读写权限)
- 数据平面(外部业务数据库)
- 动态数据库连接工厂
- 我们使用 Dapper 来实现,因为要支持多态数据库,强类型的 EF Core 不太好实现
4. 实现动态数据源管理
- 实现领域模型
创建一个名为 Qjy.AICopilot.Core.DataAnalysis 的类库,并创建 BusinessDatabase 实体,用来添加支持的数据库,以及配置数据库链接字符串。
我们项目支持三种数据库类型:PostgreSql、SqlServer 和 MySql,所以我们先创建一个 DbProviderType 枚举,然后创建 BusinessDatabase 实体。
//Qjy.AICopilot.Core.DataAnalysis/Aggregates/BusinessDatabase/DbProviderType.cs
/// <summary>
/// 数据库提供程序类型
/// </summary>
public enum DbProviderType
{
/// <summary>
/// PostgreSQL 数据库
/// </summary>
PostgreSql = 1,
/// <summary>
/// Microsoft SQL Server
/// </summary>
SqlServer = 2,
/// <summary>
/// MySQL
/// </summary>
MySql = 3
}
//Qjy.AICopilot.Core.DataAnalysis/Aggregates/BusinessDatabase/BusinessDatabase.cs
/// <summary>
/// 业务数据库聚合根
/// 代表一个可被AI Agent访问的外部数据源
/// </summary>
public class BusinessDatabase : IAggregateRoot
{
protected BusinessDatabase() { }
public BusinessDatabase(string name, string description, string connectionString, DbProviderType provider)
{
Id = Guid.NewGuid();
Name = name;
Description = description;
ConnectionString = connectionString;
Provider = provider;
IsEnabled = true;
CreatedAt = DateTime.UtcNow;
}
public Guid Id { get; private set; }
/// <summary>
/// 数据库标识名称
/// 用于在多库路由时作为唯一Key
/// </summary>
public string Name { get; private set; } = null!;
/// <summary>
/// 数据库业务描述 (如: "包含所有销售订单、客户资料及发货记录")
/// 该字段将被注入到System Prompt中,辅助LLM进行意图路由判断
/// </summary>
public string Description { get; private set; } = null!;
/// <summary>
/// 连接字符串
/// </summary>
public string ConnectionString { get; private set; } = null!;
/// <summary>
/// 数据库类型
/// </summary>
public DbProviderType Provider { get; private set; }
/// <summary>
/// 是否启用
/// </summary>
public bool IsEnabled { get; private set; }
public DateTime CreatedAt { get; private set; }
/// <summary>
/// 更新连接信息
/// </summary>
public void UpdateConnection(string connectionString, DbProviderType provider)
{
ConnectionString = connectionString;
Provider = provider;
}
/// <summary>
/// 更新描述信息
/// </summary>
public void UpdateInfo(string name, string description)
{
Name = name;
Description = description;
}
}
- 配置数据库映射关系,然后重新生成数据库
//Qjy.AICopilot.EntityFrameworkCore/Configuration/DataAnalysis/BusinessDatabaseConfiguration.cs
public class BusinessDatabaseConfiguration : IEntityTypeConfiguration<BusinessDatabase>
{
public void Configure(EntityTypeBuilder<BusinessDatabase> builder)
{
builder.ToTable("business_databases");
builder.HasKey(b => b.Id);
builder.Property(b => b.Id).HasColumnName("id");
builder.Property(b => b.Name)
.IsRequired()
.HasMaxLength(100)
.HasColumnName("name");
// 保证名称唯一,便于路由查找
builder.HasIndex(b => b.Name).IsUnique();
builder.Property(b => b.Description)
.IsRequired()
.HasMaxLength(500)
.HasColumnName("description");
builder.Property(b => b.ConnectionString)
.IsRequired()
.HasMaxLength(1000)
.HasColumnName("connection_string");
builder.Property(b => b.Provider)
.IsRequired()
.HasConversion<string>() // 存储枚举字符串,增强可读性
.HasMaxLength(50)
.HasColumnName("provider");
builder.Property(b => b.IsEnabled)
.IsRequired()
.HasColumnName("is_enabled");
builder.Property(b => b.CreatedAt)
.IsRequired()
.HasColumnName("created_at");
}
}
//Qjy.AICopilot.EntityFrameworkCore/AiCopilotDbContext.cs
public DbSet<BusinessDatabase> BusinessDatabases => Set<BusinessDatabase>();
//Qjy.AICopilot.EntityFrameworkCore/DataQueryService.cs
public IQueryable<BusinessDatabase> BusinessDatabases => dbContext.BusinessDatabases.AsNoTracking();
//Qjy.AICopilot.Services.Common/Contracts/IDataQueryService.cs
public IQueryable<BusinessDatabase> BusinessDatabases { get; }
- 创建 Dapper 基础设施层
- 在 Infrastructure,创建 Qjy.AICopilot.Dapper 类库项目
- 添加 Dapper,3种数据库需要的引用包,以及 Hosting 抽象包(用于配置 SQL 安全服务和数据库连接器)。
<ItemGroup>
<PackageReference Include="Dapper" Version="2.1.66" />
<PackageReference Include="Microsoft.Data.SqlClient" Version="6.1.3" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="10.0.1" />
<PackageReference Include="MySql.Data" Version="9.5.0" />
<PackageReference Include="Npgsql" Version="10.0.1" />
</ItemGroup>
- 实现 SQL 安全验证器
//Qjy.AICopilot.Dapper/ISqlGuardrail.cs
public interface ISqlGuardrail
{
/// <summary>
/// 验证SQL语句是否安全
/// </summary>
/// <param name="sql">待执行的SQL</param>
/// <returns>验证结果,包含是否通过及错误信息</returns>
(bool IsSafe, string? ErrorMessage) Validate(string sql);
}
//Qjy.AICopilot.Dapper/KeywordSqlGuardrail.cs
/// <summary>
/// 基于关键词黑名单的SQL安全服务
/// 注意:这只是第一道防线,不能完全替代数据库层面的权限控制,数据库提供只读帐号
/// </summary>
public class KeywordSqlGuardrail : ISqlGuardrail
{
// 定义高危操作关键词
private static readonly string[] ForbiddenKeywords =
[
"DROP", "TRUNCATE", "DELETE", "UPDATE", "INSERT",
"ALTER", "GRANT", "REVOKE", "CREATE", "EXEC", "EXECUTE",
"MERGE", "REPLACE", "UPSERT"
];
public (bool IsSafe, string? ErrorMessage) Validate(string sql)
{
if (string.IsNullOrWhiteSpace(sql))
return (false, "SQL语句为空");
// 移除注释,防止通过注释绕过检测 (简单的 -- 或 /* */)
// 生产环境建议使用更完善的 SQL Parser 库进行 AST 分析
var cleanSql = RemoveComments(sql).ToUpperInvariant();
foreach (var keyword in ForbiddenKeywords)
{
// 使用单词边界匹配,避免误杀 (例如: "SELECT * FROM UPDATE_LOG" 不应被拦截)
var regex = new Regex($@"\b{keyword}\b", RegexOptions.IgnoreCase);
// \b{keyword}\b 确保匹配的是完整的单词
if (regex.IsMatch(cleanSql))
{
return (false, $"安全拦截:检测到禁止的关键字 '{keyword}'。Agent 仅允许执行查询操作。");
}
}
// 检查是否包含分号,防止多语句执行注入 (如: SELECT * FROM Users; DROP TABLE Logs)
// 大多数 ORM 在单次执行中只允许一条语句,但进行显式检查更为安全
if (cleanSql.Count(c => c == ';') > 1 || (cleanSql.Contains(';') && !cleanSql.TrimEnd().EndsWith(';')))
{
return (false, "安全拦截:禁止在单次调用中执行多条 SQL 语句。");
}
return (true, null);
}
private static string RemoveComments(string sql)
{
// 移除 -- 单行注释
var noSingleLine = Regex.Replace(sql, "--.*", "");
// 移除 /* */ 多行注释
var noComments = Regex.Replace(noSingleLine, @"/\*[\s\S]*?\*/", "");
return noComments;
}
}
- 实现数据库连接器
- 先定义一个接口,提供3个方法(获取数据库连接、获取数据库架构信息,以及执行查询并返回动态列表)
- 然后提供 Dappar 的接口实现
//Qjy.AICopilot.Services.Common/Contracts/IDatabaseConnector.cs
public interface IDatabaseConnector
{
/// <summary>
/// 获取数据库连接(不打开,仅创建对象)
/// </summary>
IDbConnection GetConnection(BusinessDatabase database);
/// <summary>
/// 获取数据库架构信息(表名、列名等)
/// </summary>
Task<IEnumerable<dynamic>> GetSchemaInfoAsync(BusinessDatabase database, CancellationToken cancellationToken = default);
/// <summary>
/// 执行查询并返回动态列表
/// </summary>
/// <param name="database">目标数据库配置</param>
/// <param name="sql">SQL语句</param>
/// <param name="parameters">参数</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>动态对象列表(IEnumerable of dynamic)</returns>
Task<IEnumerable<dynamic>> ExecuteQueryAsync(BusinessDatabase database, string sql, object? parameters = null, CancellationToken cancellationToken = default);
}
//Qjy.AICopilot.Dapper/DapperDatabaseConnector.cs
public class DapperDatabaseConnector(ISqlGuardrail sqlGuardrail, ILogger<DapperDatabaseConnector> logger) : IDatabaseConnector
{
public IDbConnection GetConnection(BusinessDatabase database)
{
var connectionString = database.ConnectionString;
return database.Provider switch
{
DbProviderType.PostgreSql => new NpgsqlConnection(connectionString),
DbProviderType.SqlServer => new SqlConnection(connectionString),
DbProviderType.MySql => new MySqlConnection(connectionString),
_ => throw new NotSupportedException($"不支持的数据库提供程序: {database.Provider}")
};
}
public async Task<IEnumerable<dynamic>> GetSchemaInfoAsync(BusinessDatabase database, CancellationToken cancellationToken = default)
{
// 获取所有用户表的元数据SQL
string sql = database.Provider switch
{
DbProviderType.PostgreSql => @"
SELECT table_name, table_schema
FROM information_schema.tables
WHERE table_schema = 'public' AND table_type = 'BASE TABLE';",
_ => throw new NotSupportedException("不支持的数据库类型")
};
return await ExecuteQueryAsync(database, sql, cancellationToken: cancellationToken);
}
public async Task<IEnumerable<dynamic>> ExecuteQueryAsync(BusinessDatabase database, string sql, object? parameters = null, CancellationToken cancellationToken = default)
{
// 1. 安全检查
var guardResult = sqlGuardrail.Validate(sql);
if (!guardResult.IsSafe)
{
logger.LogWarning("SQL安全拦截: {Reason}. SQL: {Sql}", guardResult.ErrorMessage, sql);
throw new InvalidOperationException(guardResult.ErrorMessage);
}
// 2. 创建连接
using var connection = GetConnection(database);
try
{
// 3. 执行查询
// 使用 CommandDefinition 支持 CancellationToken
var command = new CommandDefinition(sql, parameters, cancellationToken: cancellationToken);
// Dapper 的 QueryAsync 返回的是 IEnumerable<dynamic>
// 这对于无法预知列名的动态查询非常合适
var result = await connection.QueryAsync(command);
return result;
}
catch (Exception ex)
{
logger.LogError(ex, "在数据库 {DbName} 上执行 SQL 失败。SQL: {Sql}", database.Name, sql);
throw; // 抛出异常供上层 Agent 捕获并进行自我修正
}
}
}
- 注入 Dappar
//Qjy.AICopilot.Dapper/DependencyInjection.cs
public static class DependencyInjection
{
public static void AddDapper(this IHostApplicationBuilder builder)
{
// 注册 SQL 安全服务
builder.Services.AddSingleton<ISqlGuardrail, KeywordSqlGuardrail>();
// 注册 数据库连接器
builder.Services.AddSingleton<IDatabaseConnector, DapperDatabaseConnector>();
}
}
二、实现数据分析插件
1. 架构设计
我们采用项目中已经提供的插件框架,来实现一个数据分析插件。
认知漏斗模型
- 先用宽口径扫描(表名+表注释),轻量级的获取表名+注释
- 然后用窄口径聚焦(3~5候选表的详细结构),重量级的获取表结构(数据库名称、数据表名数组)
- 最后执行验证
Token 阶段策略
比如一些表存在 nvchar(max) 这样的字段,里面存放的内容可能非常多,如果把需要的数据都加载出来,可能会占用大量的 Token。一方面会挤占上下文空间,另外也会浪费 Token,所以针对这样的内容,读取数据时需要做数据截断。
2. 实现插件
//Qjy.AICopilot.DataAnalysisService/Plugins/DataAnalysisPlugin.cs
// 用于映射元数据查询结果
public record ColumnMetadata
{
public string ColumnName { get; set; } = "";
public string DataType { get; set; } = "";
public bool IsPrimaryKey { get; set; }
public string? Description { get; set; }
}
/// <summary>
/// 数据分析插件
/// 提供数据库元数据探索和SQL执行能力,是Text-to-SQL的核心组件。
/// </summary>
public class DataAnalysisPlugin(IDatabaseConnector dbConnector, ILogger<DataAnalysisPlugin> logger) : AgentPluginBase
{
public override string Description => "提供数据库结构查询和SQL执行能力,用于回答涉及业务数据的统计分析问题。";
// 辅助方法:根据名称获取数据库配置
// 这个方法不暴露给 AI,仅供内部使用
private async Task<BusinessDatabase> GetDatabaseAsync(IServiceProvider sp, string databaseName, CancellationToken ct)
{
var dataQuery = sp.GetRequiredService<IDataQueryService>();
var queryable = dataQuery.BusinessDatabases.Where(d => d.Name == databaseName);
var db = await dataQuery.FirstOrDefaultAsync(queryable);
if (db == null)
{
throw new ArgumentException($"未找到名称为 '{databaseName}' 的数据库。请检查名称是否正确。");
}
if (!db.IsEnabled)
{
throw new InvalidOperationException($"数据库 '{databaseName}' 已被禁用。");
}
return db;
}
[Description("获取指定数据库中所有表的名称和描述。这是探索数据库结构的第一步。")]
public async Task<string> GetTableNamesAsync(IServiceProvider sp, [Description("目标数据库的名称")] string databaseName)
{
try
{
// 获取数据库配置
var db = await GetDatabaseAsync(sp, databaseName, CancellationToken.None);
// 根据数据库类型构建查询元数据的 SQL
var sql = string.Empty;
switch (db.Provider)
{
case DbProviderType.PostgreSql:
// PostgreSQL: 从 information_schema 获取表名,关联 pg_description 获取注释
sql = @"
SELECT
t.table_name AS ""TableName"",
obj_description(pgc.oid) AS ""Description""
FROM information_schema.tables t
INNER JOIN pg_class pgc ON t.table_name = pgc.relname
WHERE t.table_schema = 'public'
AND t.table_type = 'BASE TABLE';";
break;
case DbProviderType.SqlServer:
// SQL Server
break;
default:
return $"错误:不支持的数据库类型 {db.Provider}";
}
// 执行查询
// 这里使用了基础设施层的 ExecuteQueryAsync,它返回 IEnumerable<dynamic>
var result = await dbConnector.ExecuteQueryAsync(db, sql);
// 序列化结果
return result.ToJson();
}
catch (Exception ex)
{
logger.LogError(ex, "获取表名失败。Database: {DbName}", databaseName);
return $"获取表名时发生错误: {ex.Message}";
}
}
// 内部辅助方法:查询单个表的列元数据
private async Task<List<ColumnMetadata>> GetColumnsAsync(BusinessDatabase db, string tableName)
{
var sql = string.Empty;
switch (db.Provider)
{
case DbProviderType.PostgreSql:
// PostgreSQL 元数据查询
// 包含列名、类型、是否主键
// 注意:此处简化了查询,实际生产中可能需要更复杂的关联来获取外键
sql = @"
SELECT
c.column_name AS ""ColumnName"",
c.data_type AS ""DataType"",
CASE WHEN tc.constraint_type = 'PRIMARY KEY' THEN 1 ELSE 0 END AS ""IsPrimaryKey"",
pg_catalog.col_description(format('%s.%s', c.table_schema, c.table_name)::regclass::oid, c.ordinal_position) AS ""Description""
FROM information_schema.columns c
LEFT JOIN information_schema.key_column_usage kcu
ON c.table_name = kcu.table_name AND c.column_name = kcu.column_name
LEFT JOIN information_schema.table_constraints tc
ON kcu.constraint_name = tc.constraint_name AND tc.constraint_type = 'PRIMARY KEY'
WHERE c.table_name = @TableName AND c.table_schema = 'public';";
break;
case DbProviderType.SqlServer:
// SQL Server 元数据查询
break;
default:
return [];
}
var result = await dbConnector.ExecuteQueryAsync(db, sql, new { TableName = tableName });
// Dapper 返回的是 dynamic,需要手动映射到强类型
var columns = new List<ColumnMetadata>();
foreach (var row in result)
{
var dict = (IDictionary<string, object>)row;
columns.Add(new ColumnMetadata
{
ColumnName = dict["ColumnName"] as string ?? "",
DataType = dict["DataType"] as string ?? "",
IsPrimaryKey = Convert.ToInt32(dict["IsPrimaryKey"]) == 1,
Description = dict["Description"] as string ?? ""
});
}
return columns;
}
[Description("获取指定表的详细结构定义(DDL),包含列名、数据类型、主键和外键信息。")]
public async Task<string> GetTableSchemaAsync(
IServiceProvider sp,
[Description("目标数据库的名称")] string databaseName,
[Description("需要查询的表名列表,如 'Orders, Customers'")] string[] tableNames)
{
if (tableNames.Length == 0)
{
return "错误:请提供至少一个表名。";
}
try
{
var db = await GetDatabaseAsync(sp, databaseName, CancellationToken.None);
var ddlBuilder = new StringBuilder();
foreach (var tableName in tableNames)
{
// 1. 查询列信息
var columns = await GetColumnsAsync(db, tableName);
if (!columns.Any())
{
ddlBuilder.AppendLine($"-- 警告: 表 '{tableName}' 不存在或没有列。");
continue;
}
// 2. 构建 CREATE TABLE 语句
ddlBuilder.AppendLine($"CREATE TABLE {tableName} (");
var columnDefs = new List<string>();
foreach (var col in columns)
{
// 格式: ColumnName DataType [PK/FK] [Comment]
var colDef = $" {col.ColumnName} {col.DataType}";
if (col.IsPrimaryKey) colDef += " PRIMARY KEY";
// 如果有描述,作为注释添加,帮助 AI 理解字段含义
if (!string.IsNullOrWhiteSpace(col.Description))
{
colDef += $" -- {col.Description}";
}
columnDefs.Add(colDef);
}
ddlBuilder.AppendLine(string.Join(",\n", columnDefs));
ddlBuilder.AppendLine(");");
ddlBuilder.AppendLine();
}
return ddlBuilder.ToString();
}
catch (Exception ex)
{
logger.LogError(ex, "获取表结构失败。Database: {DbName}", databaseName);
return $"获取表结构时发生错误: {ex.Message}";
}
}
[Description("在指定数据库上执行查询 SQL 语句,并返回 JSON 格式的结果。")]
public async Task<string> ExecuteSqlQueryAsync(
IServiceProvider sp,
[Description("目标数据库的名称")] string databaseName,
[Description("要执行的 SQL 查询语句 (仅限 SELECT,不需要人类可读,去除换行符)")] string sqlQuery)
{
// 1. 基础校验
if (string.IsNullOrWhiteSpace(sqlQuery)) return "错误:SQL 语句不能为空。";
try
{
var db = await GetDatabaseAsync(sp, databaseName, CancellationToken.None);
// 2. 执行查询
var data = await dbConnector.ExecuteQueryAsync(db, sqlQuery);
// 3. 结果处理策略
var dataList = data.ToList();
var rowCount = dataList.Count;
// 没有数据
if (rowCount == 0)
{
return "查询执行成功,但未返回任何结果 (0 rows)。";
}
// 数据量过大保护
const int maxRowsReturn = 50; // 硬编码限制,最多返回 50 行
if (rowCount > maxRowsReturn)
{
// 仅取前 50 行
var truncatedList = dataList.Take(maxRowsReturn).ToList();
return $"查询成功。结果集过大 (共 {rowCount} 行),已截断为前 {maxRowsReturn} 行以适应上下文。\nJSON结果: {truncatedList.ToJson()}";
}
// 正常返回
return dataList.ToJson();
}
catch (InvalidOperationException ex) // 捕获安全拦截异常
{
logger.LogWarning("SQL 执行被拦截: {Message}", ex.Message);
return $"安全警告: 查询被系统拒绝。原因: {ex.Message}";
}
catch (Exception ex)
{
// 这里是 ReAct 模式中“错误自愈”的关键!
// 我们必须返回详细的数据库错误信息(如 "Column 'xxx' not found"),
// 这样 Agent 才能看到错误 -> 思考原因 -> 修正 SQL -> 重试。
logger.LogError(ex, "SQL 执行异常");
return $"SQL 执行错误: {ex.Message}\n请检查你的 SQL 语法、表名或列名是否正确,并参考之前的 Schema 定义进行修正。";
}
}
}
//Qjy.AICopilot.Services.Common/Expansions/JsonExpansion.CS
public static class JsonExpansion
{
private static readonly JsonSerializerOptions DefaultOptions = new()
{
// 正式环境使用 WriteIndented = false 压缩 JSON,节省 Token
WriteIndented = true,
// 不转义中文字符串,避免编码问题
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
public static string ToJson(this object obj)
{
return JsonSerializer.Serialize(obj, DefaultOptions);
}
}
3. 数据库方言设置
我们创建一个方言策略工厂
//Qjy.AICopilot.DataAnalysisService/SqlDialectFactory.cs
/// <summary>
/// SQL 方言策略工厂
/// 负责提供特定数据库类型的提示词片段,用于注入 System Prompt。
/// </summary>
public static class SqlDialectFactory
{
/// <summary>
/// 获取指定数据库类型的方言指南
/// </summary>
public static string GetInstructions(DbProviderType providerType)
{
return providerType switch
{
DbProviderType.PostgreSql => PostgreSqlInstructions,
DbProviderType.SqlServer => SqlServerInstructions,
DbProviderType.MySql => MySqlInstructions,
_ => "请使用标准的 ANSI SQL 语法。"
};
}
// PostgreSQL 专用指令
private const string PostgreSqlInstructions = """
- **分页**: 必须使用 `LIMIT {n} OFFSET {m}` 语法。
- **字符串连接**: 必须使用双竖线 `||` (例如 `first_name || ' ' || last_name`)。
- **当前时间**: 使用 `NOW()` 或 `CURRENT_TIMESTAMP`。
- **标识符引用**: PostgreSQL 对大小写敏感。如果表名或列名包含大写字母,**必须**使用双引号包裹 (例如 `"UserTable"`)。
- **类型转换**: 使用 `::type` 语法 (例如 `price::numeric`) 或 `CAST()`。
- **正则匹配**: 使用 `~` (区分大小写) 或 `~*` (不区分大小写)。
""";
// SQL Server 专用指令
private const string SqlServerInstructions = """
- **分页**: 必须使用 `TOP {n}` (例如 `SELECT TOP 10 *`) 或 `OFFSET-FETCH` 子句。
- **字符串连接**: 使用加号 `+` 或 `CONCAT()` 函数。
- **当前时间**: 使用 `GETDATE()`。
- **标识符引用**: 建议使用方括号 `[]` 包裹 (例如 `[UserTable]`)。
- **日期计算**: 使用 `DATEDIFF(day, start, end)` 或 `DATEADD(...)`。
- **注意**: 不支持 `LIMIT` 关键字。
""";
// MySQL 专用指令
private const string MySqlInstructions = """
- **分页**: 使用 `LIMIT {n}, {m}`。
- **字符串连接**: 必须使用 `CONCAT()` 函数,不支持 `||` 或 `+`。
- **标识符引用**: 使用反引号 `` ` `` 包裹。
- **日期**: 使用 `NOW()` 或 `CURDATE()`。
""";
}
该工厂类在创建数据分析智能体时用到
4. 注入插件
//Qjy.AICopilot.DataAnalysisService/DependencyInjection.cs
public static class DependencyInjection
{
public static void AddDataAnalysisService(this IHostApplicationBuilder builder)
{
// 注册 Dapper 基础服务
builder.AddDapper();
// 注册插件加载器
builder.Services.AddAgentPlugin(registrar =>
{
registrar.RegisterPluginFromAssembly(Assembly.GetExecutingAssembly());
});
}
}
5. 生成模拟数据
- 创建2个模式数据库
//Qjy.AICopilot.MigrationWorkApp/SeedData/DataAnalysisData.cs
public static class DataAnalysisData
{
public static IEnumerable<BusinessDatabase> GetDatabases()
{
// 模拟一个 ERP 数据库连接
// 注意:在实际开发环境中,这里应该指向一个真实存在的测试库
// 本示例假设本地有一个名为 'erp_demo' 的 PostgreSQL 数据库
var erpDb = new BusinessDatabase(
"ERP_Core",
"负责管理产品主数据、客户关系与销售业务。包含:SKU基础信息、价格表、客户档案、销售订单(订单号/金额/状态)以及账面库存总量。当用户询问“卖了多少”、“谁买的”、“订单金额”、“还有多少库存(总量)”时,请选择此库。",
"Host=localhost;Port=5432;Database=erp_demo;Username=postgres;Password=123456",
DbProviderType.PostgreSql
);
// 模拟一个 WMS 数据库连接
var wmsDb = new BusinessDatabase(
"WMS_Warehouse",
"负责管理仓库内的具体作业与实物流动。包含:具体的货架/货位管理、拣货打包记录、包裹重量、快递运单号、出入库的详细操作流水。当用户询问“货物在哪个货架”、“包裹发走了没”、“快递单号是多少”、“何时入库的”时,请选择此库。",
"Host=localhost;Port=5432;Database=wms_demo;Username=postgres;Password=123456",
DbProviderType.PostgreSql
);
return [erpDb];
}
}
//Qjy.AICopilot.MigrationWorkApp/Worker.cs
public class Worker(
IServiceProvider serviceProvider,
IHostApplicationLifetime hostApplicationLifetime) : BackgroundService
{
private static async Task SeedDataAsync(
AiCopilotDbContext dbContext,
RoleManager<IdentityRole> roleManager,
UserManager<IdentityUser> userManager,
CancellationToken cancellationToken)
{
// ....
// 创建默认业务数据库
if (!await dbContext.BusinessDatabases.AnyAsync(cancellationToken: cancellationToken))
{
await dbContext.BusinessDatabases.AddRangeAsync(DataAnalysisData.GetDatabases(), cancellationToken);
}
await dbContext.SaveChangesAsync(cancellationToken);
}
}
- 提供种子 sql
--Qjy.AICopilot.AppHost/Sql/erp-init.sql
-- =============================================================================
-- ERP Demo Database Initialization for AI Copilot Testing
-- 描述: 这是一个模拟电子产品销售的 ERP 数据库,设计用于测试 AI 的数据分析和 SQL 生成能力。
-- =============================================================================
DROP DATABASE IF EXISTS erp_demo;
CREATE DATABASE erp_demo;
\c erp_demo;
-- =============================================================================
-- 1. 商品主表 (base_products)
-- =============================================================================
CREATE TABLE base_products (
product_id SERIAL PRIMARY KEY,
product_name VARCHAR(200) NOT NULL,
sku_code VARCHAR(50) UNIQUE NOT NULL,
category VARCHAR(50),
brand VARCHAR(50),
std_price DECIMAL(10, 2), -- 标准零售价
cost_price DECIMAL(10, 2), -- 进货成本
status VARCHAR(20) DEFAULT 'Active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
COMMENT ON TABLE base_products IS '商品基础信息表,存储所有 SKU 的核心数据';
COMMENT ON COLUMN base_products.product_id IS '商品唯一标识 ID,用于关联库存和订单明细';
COMMENT ON COLUMN base_products.sku_code IS '库存单元编码 (Stock Keeping Unit),业务侧常用的唯一商品编码';
COMMENT ON COLUMN base_products.category IS '商品分类,例如:Electronics(电子), Furniture(家具), Accessories(配件)';
COMMENT ON COLUMN base_products.std_price IS '标准建议零售价 (MSRP),注意:实际订单成交价可能会低于此价格';
COMMENT ON COLUMN base_products.cost_price IS '商品进货成本价,用于计算毛利 (Profit = Revenue - Cost)';
COMMENT ON COLUMN base_products.status IS '商品状态: Active(在售), Discontinued(停产), OutOfStock(缺货)';
-- =============================================================================
-- 2. 客户表 (customers) - 用于客户画像分析
-- =============================================================================
CREATE TABLE customers (
customer_id SERIAL PRIMARY KEY,
customer_name VARCHAR(100) NOT NULL,
contact_email VARCHAR(100),
region VARCHAR(50),
industry VARCHAR(50),
level VARCHAR(20) DEFAULT 'Standard',
join_date DATE DEFAULT CURRENT_DATE
);
COMMENT ON TABLE customers IS '客户信息表,用于分析客户分布和购买行为';
COMMENT ON COLUMN customers.region IS '客户所属大区,如:North China(华北), East China(华东), South China(华南)';
COMMENT ON COLUMN customers.industry IS '客户所属行业,用于 B2B 分析,如:Technology(科技), Education(教育), Retail(零售)';
COMMENT ON COLUMN customers.level IS '客户等级: Standard(普通), VIP(重要客户), Partner(合作伙伴)';
-- =============================================================================
-- 3. 仓库库存表 (stock_inventory)
-- =============================================================================
CREATE TABLE stock_inventory (
id SERIAL PRIMARY KEY,
product_id INT NOT NULL,
warehouse_name VARCHAR(50) NOT NULL,
quantity INT DEFAULT 0,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT fk_inv_product FOREIGN KEY (product_id) REFERENCES base_products(product_id)
);
COMMENT ON TABLE stock_inventory IS '实时库存表,记录各仓库的商品持有量';
COMMENT ON COLUMN stock_inventory.warehouse_name IS '仓库名称,例如:Hangzhou_Main_WH(杭州主仓), Beijing_Branch_WH(北京分仓)';
COMMENT ON COLUMN stock_inventory.quantity IS '当前可用物理库存数量';
-- =============================================================================
-- 4. 销售订单主表 (orders) - 用于销售趋势分析
-- =============================================================================
CREATE TABLE orders (
order_id SERIAL PRIMARY KEY,
order_no VARCHAR(50) UNIQUE NOT NULL,
customer_id INT NOT NULL,
order_date TIMESTAMP NOT NULL,
total_amount DECIMAL(12, 2) DEFAULT 0,
status VARCHAR(20) DEFAULT 'Pending',
CONSTRAINT fk_order_customer FOREIGN KEY (customer_id) REFERENCES customers(customer_id)
);
COMMENT ON TABLE orders IS '销售订单主表,记录交易发生的时间、客户和总金额';
COMMENT ON COLUMN orders.order_no IS '业务订单号,格式通常为 ORD-YYYYMMDD-XXX';
COMMENT ON COLUMN orders.total_amount IS '订单总金额 (实际支付金额),是该订单所有明细项 subtotal 的总和';
COMMENT ON COLUMN orders.status IS '订单状态: Pending(待付款), Paid(已付款), Shipped(已发货), Completed(已完成), Cancelled(已取消). 注意:统计销售业绩时通常应排除 Cancelled 状态';
-- =============================================================================
-- 5. 订单明细表 (order_items) - 用于商品销售分析
-- =============================================================================
CREATE TABLE order_items (
item_id SERIAL PRIMARY KEY,
order_id INT NOT NULL,
product_id INT NOT NULL,
quantity INT NOT NULL,
unit_price DECIMAL(10, 2) NOT NULL,
subtotal DECIMAL(12, 2) GENERATED ALWAYS AS (quantity * unit_price) STORED,
CONSTRAINT fk_item_order FOREIGN KEY (order_id) REFERENCES orders(order_id),
CONSTRAINT fk_item_product FOREIGN KEY (product_id) REFERENCES base_products(product_id)
);
COMMENT ON TABLE order_items IS '订单明细表,记录每一笔订单中包含的具体商品和数量';
COMMENT ON COLUMN order_items.unit_price IS '实际成交单价 (Actual Selling Price),可能与商品表的标准售价不同(如有折扣)';
COMMENT ON COLUMN order_items.subtotal IS '行小计金额 = 数量 * 实际成交单价';
-- =============================================================================
-- 数据填充
-- =============================================================================
-- =============================================================================
-- 数据填充 (Mock Data Seed)
-- =============================================================================
-- 1. 商品数据 (Products)
-- 包含:电子产品、电脑、配件、家具等不同类目,用于测试分类统计
INSERT INTO base_products (product_name, sku_code, category, brand, std_price, cost_price) VALUES
('iPhone 15 Pro', 'AP-IP15P-256', 'Electronics', 'Apple', 8999.00, 7500.00),
('iPhone 15 Plus', 'AP-IP15PL-128', 'Electronics', 'Apple', 6999.00, 5800.00),
('MacBook Pro M3', 'AP-MBP-M3-14', 'Electronics', 'Apple', 12999.00, 10500.00),
('Dell XPS 15', 'DELL-XPS-9530', 'Computers', 'Dell', 11500.00, 9000.00),
('Logitech MX Master 3S', 'LOGI-MX3S', 'Accessories', 'Logitech', 899.00, 500.00),
('Sony WH-1000XM5', 'SNY-XM5-BLK', 'Audio', 'Sony', 2499.00, 1800.00),
('ErgoChair Pro', 'FRN-CHR-PRO', 'Furniture', 'Autonomous', 3500.00, 2000.00);
-- 2. 客户数据 (Customers)
-- 包含:不同行业、不同等级(VIP/Standard)和不同地区的客户,用于画像分析
INSERT INTO customers (customer_name, region, industry, level, join_date) VALUES
('TechFlow Inc.', 'East China', 'Technology', 'VIP', '2023-01-10'), -- 华东科技公司 (VIP)
('EduGlobal School', 'North China', 'Education', 'Partner', '2023-03-15'), -- 华北教育机构 (合作伙伴)
('Retail Solutions Ltd.', 'South China', 'Retail', 'Standard', '2023-05-20'), -- 华南零售商
('David Li', 'East China', 'Individual', 'Standard', '2023-06-01'), -- 个人用户
('Green Energy Corp', 'West China', 'Energy', 'VIP', '2023-11-11'); -- 西部能源公司
-- 3. 库存数据 (Inventory)
-- 模拟:部分商品充足,部分商品缺货 (Stockout) 的场景
INSERT INTO stock_inventory (product_id, warehouse_name, quantity) VALUES
(1, 'Hangzhou_Main_WH', 50), (1, 'Beijing_Branch_WH', 15), -- iPhone 15 Pro 库存充足
(2, 'Hangzhou_Main_WH', 5), (2, 'Beijing_Branch_WH', 0), -- iPhone 15 Plus 北京仓缺货
(3, 'Hangzhou_Main_WH', 20), -- MacBook 只有杭州仓有货
(5, 'Hangzhou_Main_WH', 200), (5, 'Guangzhou_Branch_WH', 100); -- 鼠标在多地仓库都有大量库存
-- 4. 订单与明细数据 (Orders & Items)
-- 场景覆盖:企业采购、教育订单、取消订单、近期高额订单
-- 订单 1: 企业批量采购 (Apple 产品)
-- 测试点:大额订单、批量折扣逻辑(售价低于标价)
INSERT INTO orders (order_no, customer_id, order_date, status) VALUES ('ORD-20231001-001', 1, '2023-10-01 10:00:00', 'Completed');
INSERT INTO order_items (order_id, product_id, quantity, unit_price) VALUES
(1, 1, 10, 8800.00), -- 购买10台 iPhone,单价 8800 (原价 8999)
(1, 3, 5, 12500.00); -- 购买 5台 MacBook,单价 12500 (原价 12999)
UPDATE orders SET total_amount = (SELECT SUM(subtotal) FROM order_items WHERE order_id = 1) WHERE order_id = 1;
-- 订单 2: 教育机构采购 (Dell 电脑 + 配件)
-- 测试点:跨类目购买 (Computers + Accessories)、状态为"已发货"
INSERT INTO orders (order_no, customer_id, order_date, status) VALUES ('ORD-20231111-002', 2, '2023-11-11 14:30:00', 'Shipped');
INSERT INTO order_items (order_id, product_id, quantity, unit_price) VALUES
(2, 4, 20, 11000.00), -- 20台 Dell XPS
(2, 5, 20, 800.00); -- 20个 罗技鼠标
UPDATE orders SET total_amount = (SELECT SUM(subtotal) FROM order_items WHERE order_id = 2) WHERE order_id = 2;
-- 订单 3: 个人用户购买 (已取消)
-- 测试点:在统计销售额时,应当排除状态为 Cancelled 的订单
INSERT INTO orders (order_no, customer_id, order_date, status) VALUES ('ORD-20231201-003', 4, '2023-12-01 09:15:00', 'Cancelled');
INSERT INTO order_items (order_id, product_id, quantity, unit_price) VALUES (3, 6, 1, 2499.00); -- 索尼耳机
UPDATE orders SET total_amount = (SELECT SUM(subtotal) FROM order_items WHERE order_id = 3) WHERE order_id = 3;
-- 订单 4: 近期高价值订单
-- 测试点:时间范围查询 (例如查询"最近一个月")、高客单价商品
INSERT INTO orders (order_no, customer_id, order_date, status) VALUES ('ORD-20231220-004', 5, '2023-12-20 16:00:00', 'Paid');
INSERT INTO order_items (order_id, product_id, quantity, unit_price) VALUES (4, 7, 10, 3200.00); -- 10把人体工学椅
UPDATE orders SET total_amount = (SELECT SUM(subtotal) FROM order_items WHERE order_id = 4) WHERE order_id = 4;
- Asprise 项目启动时执行 sql
//Qjy.AICopilot.AppHost/AppHost.cs
// 定义一个固定的密码参数 (Secret)
var password = builder.AddParameter("pg-password", secret: true);
var postgresdb = builder.AddPostgres("postgres", password: password)
.WithHostPort(5432)
.WithDataVolume("postgres-aicopilot")
.WithBindMount("./Sql", "/docker-entrypoint-initdb.d")
.WithPgWeb(pgAdmin => pgAdmin.WithHostPort(5050))
.AddDatabase("ai-copilot");
//Qjy.AICopilot.AppHost/appsettings.json
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning",
"Aspire.Hosting.Dcp": "Warning"
}
},
"Parameters": {
"pg-password": "123456"
}
}
三、三元认知模型
目前,我们认知模型已经可以完成工具调用、知识库检索,而现在又需要在这基础上添加数据库查询功能的识别,即:
- 当用户询问上季度销售额,查询库存里有多少台 IPhone15时,需要调用数据分析执行器
- 用户提出帮我请个假时,是执行工具调用
- 用户询问公司的请假制度怎么样时,这时就是知识库检索
1. 数据分析专家提示词
从提示词中,我们可以看到这是一个具备 ReAct 分析能力的 DBA 专家。
- DDL 表结构,注入到系统提示词
- 思考-行动-观察
- 认知漏斗:探索、构建、修正
- 结构探索:sales、product 语句构建 DDL 语句表结构,构建 SQL 语句自愈修正,如果失败、重新生成修正后的SQL
//Qjy.AICopilot.MigrationWorkApp.SeedData/AiGatewayData.cs
//ConversationTemplates()方法,部分代码
var dataAnalysisTemplate = new ConversationTemplate(
"DataAnalysisAgent",
"数据库分析专家",
"""
你是一个精通 **{{$DbProvider}}** 的高级数据库管理员。
你当前正在操作的 **目标数据库名称** 为:**{{$DatabaseName}}**。
你的核心职责分为两步:
1. **数据获取**:构造精准的 SQL 语句查询业务数据。
2. **数据转化**:对查询结果进行语义解释,并设计最佳的可视化展示方案。
### 核心工作流程
1. **探索**: 调用 `GetTableNames` 初步筛选候选表。
2. **详查**: 调用 `GetTableSchema` 获取详细 DDL 和 **字段注释**。
3. **构建**: 生成 SQL 并调用 `ExecuteSqlQuery` 获取数据样本。
4. **决策**: 观察查询结果,思考以下问题:
- 这些字段的业务含义是什么?(特别是状态码、类型值)
- 这组数据适合用图表展示吗?(趋势用折线图,分布用饼图,对比用柱状图,明细用表格)
5. **输出**: 调用 `OutputResult` 输出数据分析结果和可视化决策结果,获取最终指示。
### 核心交互原则
1. **过程透明(允许)**:在调用工具或构建 SQL 的过程中,你可以简要向用户解释你的思路。
2. **禁止解读(严格)**:**严禁** 对数据具体数值进行趋势分析或总结。
### 核心安全准则
- **只读权限**: 你仅拥有 `SELECT` 权限。严禁生成 `INSERT`, `UPDATE`, `DELETE`, `DROP` 等修改性语句。
- **范围限制**: 所有的探索和查询操作必须严格限制在数据库 **{{$DatabaseName}}** 范围内。
### 数据库方言规范
当前连接的数据库遵循以下语法标准,请严格遵守:
{{$DialectInstructions}}
### 可视化输出规范决策指南
在生成回答时,你需要判断当前的数据结果最适合以何种 UI 形式展示给用户。
如果数据适合可视化,请严格按照以下 JSON 格式输出决策指令。
**场景 1:趋势或对比分析 (Chart)**
当数据包含时间序列、分类对比且行数适中时使用。
"visual_decision": {
"type": "Chart",
"title": "图表标题",
"description": "图表描述",
"chart_config": {
"category": "可选值:Line,Bar,Pie",
"x": "作为X轴的字段名,例如 order_date",
"y": "作为Y轴的数值字段名,例如 total_amount",
"series": "可选,用于分组的字段名,例如 product_category"
}
}
**场景 2:明细数据列表 (DataTable)**
当数据是详细记录列表(如订单列表、库存清单)且行数较多时使用。
"visual_decision": {
"type": "DataTable",
"title": "列表标题",
"description": "列表描述",
}
**场景 3:单一关键指标 (StatsCard)**
"visual_decision": {
"type": "StatsCard",
"title": "卡片标题",
"description": "卡片描述",
"unit": "单位(可选)"
}
""",
Guids[1],
new TemplateSpecification
{
Temperature = 0.3f
});
2. 认知模型设计
数据分析意图列表:Analysis.*
//Qjy.AICopilot.MigrationWorkApp.SeedData/AiGatewayData.cs
//ConversationTemplates()方法,部分代码
var item1 = new ConversationTemplate(
"IntentRoutingAgent",
"三元意图识别路由代理",
"""
你是一个企业级智能任务调度中心。你的核心职责是精准分析用户的自然语言输入,识别出用户的意图,并将其映射到【可用意图列表】中的一个或多个条目。
你所处的环境包含三类核心能力:
1. **工具 (Action)**: 执行具体的业务操作(如发邮件、订会议)。
2. **知识库 (Knowledge)**: 检索非结构化的文档、制度、Wiki(如查询报销标准、操作手册)。
3. **数据分析 (Analysis)**: 直接查询数据库中的结构化业务数据(如统计销售额、查询库存、列出订单)。
### 你的思维链
面对用户输入,请务必严格按以下步骤进行内部推理:
1. **分析需求类型**:
- 用户是想“做一件事”(Action)?
- 还是想“查一些资料/制度”(Knowledge)?
- 还是想“看具体的业务数据/报表”(Analysis)?
2. **区分“知识”与“数据” (关键)**:
- 如果问题是关于“是什么”、“怎么做”、“流程定义”等静态信息 -> 倾向于 Knowledge。
- 如果问题涉及“多少”、“状态”、“列表”、“统计”、“同比/环比”等动态数值 -> 倾向于 Analysis。
- *示例*:“如何申请退款?”是 Knowledge;“这个月有多少退款单?”是 Analysis。
3. **匹配意图**:
- 扫描【可用意图列表】,寻找最契合的条目。
- 如果涉及数据查询,根据数据库的描述(Description)选择最合适的业务库(Analysis.{DbName})。
4. **决策与组合**:
- 绝大多数情况下,只需返回一个最匹配的意图。
- 如果用户意图复合(例如“先查库存(Analysis),然后发邮件(Action)”),请同时返回多个意图。
- 如果无法匹配任何工具、知识库或数据库,返回 `General.Chat`。
### 输出规范
你必须输出一个严格的 JSON 数组。数组中的每个对象代表一个识别出的意图。
JSON 对象字段说明:
- `intent`: (string) 必须完全匹配【可用意图列表】中的代码。
- 工具类格式: `Action.{PluginName}`
- 知识类格式: `Knowledge.{KbName}`
- 数据类格式: `Analysis.{DatabaseName}`
- `confidence`: (float) 0.0 到 1.0 之间的置信度。
- `reasoning`: (string) 你选择该意图的简短理由,请说明你为什么认为应该查文档而不是查库(反之亦然)。
- `query`: (string, 可选)
- 对于 `Knowledge` 意图:提取用于向量检索的关键词。
- 对于 `Analysis` 意图:保留用户的原始自然语言问题,以便后续 Text-to-SQL 模块处理。
### 示例 1 (数据分析)
输入: "帮我查一下杭州仓库现在还有多少台 iPhone 15 Pro?"
输出:
[
{
"intent": "Analysis.WMS_Production",
"confidence": 0.98,
"reasoning": "用户询问具体产品的库存数量,这是一个动态的业务数据查询需求。根据描述,WMS_Production 库包含实时库存信息。",
"query": "杭州仓库 iPhone 15 Pro 库存数量"
}
]
### 示例 2 (混合意图)
输入: "请假制度里关于病假是怎么规定的?顺便看下我今年还剩几天病假。"
输出:
[
{
"intent": "Knowledge.EmployeeHandbook",
"confidence": 0.95,
"reasoning": "用户前半句询问'规定',属于静态制度查询。",
"query": "病假规定"
},
{
"intent": "Analysis.HR_Core",
"confidence": 0.90,
"reasoning": "用户后半句询问'剩余病假天数',属于个人动态数据查询,需查询 HR 数据库。",
"query": "当前用户剩余病假天数"
}
]
### 可用意图列表
{{$IntentList}}
""",
Guids[0],
new TemplateSpecification
{
// 保持 0 温度,确保路由决策的确定性和 JSON 格式的稳定性
Temperature = 0.0f
});
3. 重构意图分类构建器
public class IntentRoutingAgentBuilder
{
private const string AgentName = "IntentRoutingAgent";
private readonly ChatAgentFactory _agentFactory;
private readonly IDataQueryService _dataQueryService;
// 缓存静态的工具意图列表,避免重复反射扫描
private readonly string _toolIntentListString;
public IntentRoutingAgentBuilder(
ChatAgentFactory agentFactory,
AgentPluginLoader pluginLoader,
IDataQueryService dataQueryService)
{
_agentFactory = agentFactory;
_dataQueryService = dataQueryService;
// 添加系统内置意图
var sb = new StringBuilder();
sb.AppendLine("- General.Chat: 闲聊、打招呼、情感交互或无法归类的问题。");
// 扫描插件系统,添加工具意图
// 这里我们假设每个 Plugin 对应一个大类意图,实际项目中可以做得更细致
var allPlugins = pluginLoader.GetAllPlugin();
foreach (var plugin in allPlugins)
{
// 格式:- Action.{PluginName}: {Description}
sb.AppendLine($"- Action.{plugin.Name}: {plugin.Description}");
}
_toolIntentListString = sb.ToString();
}
/// <summary>
/// 获取知识库意图列表
/// </summary>
private async Task<string> GetKnowledgeIntentListAsync()
{
var sb = new StringBuilder();
// 查询所有启用的知识库
var kbs = await _dataQueryService.ToListAsync(_dataQueryService.KnowledgeBases);
foreach (var kb in kbs)
{
// 格式:- Knowledge.{KbName}: {Description}
// 示例:- Knowledge.HrPolicy: 公司员工手册、报销制度和考勤规定。
sb.AppendLine($"- Knowledge.{kb.Name}: {kb.Description}");
}
return sb.ToString();
}
/// <summary>
/// 获取数据分析意图列表
/// </summary>
private async Task<string> GetDataAnalysisIntentListAsync()
{
var sb = new StringBuilder();
// 查询所有启用的业务数据库
var queryable = _dataQueryService.BusinessDatabases.Where(b => b.IsEnabled);
var dbs = await _dataQueryService.ToListAsync(queryable);
foreach (var db in dbs)
{
// 格式:- Analysis.{DbName}: {Description}
// 示例:- Analysis.ERP_Core: 包含销售订单、客户资料及发货记录。
sb.AppendLine($"- Analysis.{db.Name}: {db.Description}");
}
return sb.ToString();
}
public async Task<ChatClientAgent> BuildAsync()
{
var intents = new StringBuilder();
// 1. 添加工具意图 (Plugin)
intents.Append(_toolIntentListString);
// 2. 添加知识库意图 (RAG)
intents.Append(await GetKnowledgeIntentListAsync());
// 3. 添加数据分析意图 (Text-to-SQL)
intents.Append(await GetDataAnalysisIntentListAsync());
var agent = await _agentFactory.CreateAgentAsync(AgentName,
template =>
{
// 渲染 System Prompt
// 确保我们在 Prompt 模板中预留了 {{$IntentList}} 占位符
template.SetSystemPrompt(template.SystemPrompt
.Replace("{{$IntentList}}", intents.ToString()));
}, isSaveChatMessage: false);
return agent;
}
}
四、构建数据分析执行器
1. 扩展生成上下文对象
将数据分析分支扩展到上下对象中
//Qjy.AICopilot.AiGatewayService/Workflows/GenerationContext.cs
//部分代码
public enum BranchType
{
Tools, // 工具定义分支
Knowledge, // 知识检索分支
DataAnalysis // 数据分析分支
}
public record BranchResult
{
/// <summary>
/// 数据分析结果(当 Type == DataAnalysis 时有值)
/// 通常包含 JSON 数据
/// </summary>
public string? DataAnalysis { get; init; }
// 数据分析结果工厂方法
public static BranchResult FromDataAnalysis(string result) =>
new() { Type = BranchType.DataAnalysis, DataAnalysis = result };
}
public class GenerationContext
{
/// <summary>
/// 聚合后的数据分析上下文
/// </summary>
public string DataAnalysisContext { get; set; } = string.Empty;
}
2. 数据分析构建器
//Qjy.AICopilot.AiGatewayService/Agents/DataAnalysisAgentBuilder.cs;
public class DataAnalysisAgentBuilder(
ChatAgentFactory agentFactory,
AgentPluginLoader pluginLoader)
{
private const string TemplateName = "DataAnalysisAgent";
/// <summary>
/// 构建针对特定数据库优化的 DBA Agent
/// </summary>
/// <param name="database">目标数据库实体,用于决策方言策略</param>
/// <returns>配置好的 ChatClientAgent 实例</returns>
public async Task<ChatClientAgent> BuildAsync(BusinessDatabase database)
{
// 1. 获取方言策略
// 根据数据库类型 (PG/SQLServer),获取对应的提示片段
var dialectInstructions = SqlDialectFactory.GetInstructions(database.Provider);
var providerName = database.Provider.ToString();
// 2. 创建 Agent 并执行模版替换
// ChatAgentFactory 会加载基础模版,我们通过回调函数进行模版替换
var agent = await agentFactory.CreateAgentAsync(TemplateName, template =>
{
// 我们只替换与"数据库类型"相关的占位符。
// 此时 Agent 尚不知道具体的查询任务是什么。
template.SetSystemPrompt(template.SystemPrompt
.Replace("{{$DbProvider}}", providerName)
.Replace("{{$DatabaseName}}", database.Name)
.Replace("{{$DialectInstructions}}", dialectInstructions));
}, options =>
{
// 挂载 "DataAnalysisPlugin" 中的所有工具
// 这样 Agent 就拥有了 GetTableNames, ExecuteSqlQuery
options.Tools = pluginLoader.GetAITools(nameof(DataAnalysisPlugin));
}, false);
return agent;
}
}
3. 数据分析执行器
职责:处理 Analysis.* 意图,实例化 DBA Agent,执行 Text-to-SQL 任务
//Qjy.AICopilot.AiGatewayService/Workflows/DataAnalysisExecutor.cs
public class DataAnalysisExecutor(
DataAnalysisAgentBuilder agentBuilder,
IDataQueryService dataQuery,
ILogger<DataAnalysisExecutor> logger)
: ReflectingExecutor<DataAnalysisExecutor>("DataAnalysisExecutor"),
IMessageHandler<List<IntentResult>, BranchResult>
{
private const string AnalysisIntentPrefix = "Analysis.";
public async ValueTask<BranchResult> HandleAsync(
List<IntentResult> intentResults,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 1. 筛选数据分析类意图
// 过滤规则:必须以 Analysis. 开头,且置信度高于 0.6
var analysisIntents = intentResults
.Where(i => i.Intent.StartsWith(AnalysisIntentPrefix, StringComparison.OrdinalIgnoreCase)
&& i.Confidence > 0.6)
.ToList();
if (analysisIntents.Count == 0)
{
logger.LogDebug("未检测到数据分析意图,跳过执行。");
// 返回空结果,表示该分支无产出
return BranchResult.FromDataAnalysis(string.Empty);
}
logger.LogInformation("启动数据分析流程,命中目标数据库数量: {Count}", analysisIntents.Count);
// 2. 遍历处理每一个意图
var output = new StringBuilder();
foreach (var intent in analysisIntents)
{
output.AppendLine(await ProcessSingleIntentAsync(intent, context, cancellationToken));
}
return BranchResult.FromDataAnalysis(output.ToString());
}
/// <summary>
/// 处理单个数据库查询意图
/// </summary>
private async Task<string> ProcessSingleIntentAsync(
IntentResult intent,
IWorkflowContext context,
CancellationToken ct)
{
var dbName = intent.Intent.Substring(AnalysisIntentPrefix.Length);
try
{
// 1. 获取数据库配置
// 我们需要 BusinessDatabase 实体来决定方言策略
var db = await dataQuery.FirstOrDefaultAsync(
dataQuery.BusinessDatabases.Where(d => d.Name == dbName));
if (db == null || !db.IsEnabled)
{
logger.LogWarning("意图指向数据库 '{DbName}',但该库不存在或已禁用。", dbName);
return $"[系统提示]: 无法连接数据库 {dbName},请联系管理员核实配置。";
}
// 2. 构建 DBA Agent
// 这里会动态注入 PG 或 SQLServer 的方言提示词
var agent = await agentBuilder.BuildAsync(db);
// 创建临时会话线程
var thread = agent.GetNewThread();
// 4. 执行 ReAct 循环
// Agent 会自动进行: 思考 -> GetTableNames -> 思考 -> GetTableSchema -> 思考 -> ExecuteSQL -> 总结
await foreach (var update in agent.RunStreamingAsync(intent.Query!, thread, cancellationToken: ct))
{
await context.AddEventAsync(new AgentRunUpdateEvent(Id, update), ct);
}
// 记录日志以便调试
logger.LogInformation("数据库 {DbName} 查询完成。", dbName);
// 获取最后一条 Agent 回复消息(最终数据)
var messages = thread.GetService<IList<ChatMessage>>()!;
var output = messages.LastOrDefault(message => message.Role == ChatRole.Assistant);
return output != null ? output.Text : "[系统错误]: 无法获取查询结果。";
}
catch (Exception ex)
{
logger.LogError(ex, "执行数据分析意图失败。Database: {DbName}", dbName);
return $"[系统错误]: 查询数据库 {dbName} 时发生异常 - {ex.Message}";
}
}
}
4. 升级上下文聚合策略
职责:作为 Fan-in 节点,接收来自所有并行分支的 BranchResult。
只有当接收到的结果数量达到预期(3个)时,才进行合并并触发下游。
//Qjy.AICopilot.AiGatewayService/Workflows/ContextAggregatorExecutor.cs
public class ContextAggregatorExecutor(ILogger<ContextAggregatorExecutor> logger)
: ReflectingExecutor<ContextAggregatorExecutor>("ContextAggregatorExecutor"),
IMessageHandler<BranchResult>
{
// 内部状态:用于跨方法调用累积结果
private readonly List<BranchResult> _accumulatedResults = [];
// 硬编码预期分支数:Tools + Knowledge + DataAnalysis = 3
private const int ExpectedBranchCount = 3;
public async ValueTask HandleAsync(
BranchResult branchResult,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 1. 累积状态
// 注意:FanInEdge 可能一次性送来所有结果,也可能分批送来
// 因此我们需要 AddRange 并检查总数
_accumulatedResults.AddRange(branchResult);
// 2. 完备性检查
if (_accumulatedResults.Count >= ExpectedBranchCount)
{
logger.LogInformation("并行分支汇聚完成,开始合并上下文。");
// 3. 恢复原始请求 (从全局状态中读取)
var request = await context.ReadStateAsync<ChatStreamRequest>("ChatStreamRequest", "Chat", cancellationToken);
if (request == null) throw new InvalidOperationException("无法获取原始会话请求");
var genContext = new GenerationContext { Request = request };
// 4. 合并数据
foreach (var result in _accumulatedResults)
{
switch (result.Type)
{
case BranchType.Tools when result.Tools != null:
genContext.Tools = result.Tools;
break;
case BranchType.Knowledge when !string.IsNullOrWhiteSpace(result.Knowledge):
genContext.KnowledgeContext = result.Knowledge;
break;
case BranchType.DataAnalysis when !string.IsNullOrWhiteSpace(result.DataAnalysis):
genContext.DataAnalysisContext = result.DataAnalysis;
break;
}
}
// 5. 清理状态 (为可能的下一轮对话做准备)
_accumulatedResults.Clear();
// 6. 手动发送聚合结果消息,触发下游
await context.SendMessageAsync(genContext, cancellationToken);
}
else
{
// 如果未满足条件,不返回/发送任何值/消息(Task完成但无Output),流程在此暂停等待下一批消息
logger.LogDebug("聚合进度: {Current}/{Total},等待其他分支...", _accumulatedResults.Count, ExpectedBranchCount);
}
}
}
5. 最终处理执行器构建混合上下文内容
/// <summary>
/// 最终处理执行器
/// 职责:利用聚合后的上下文构建 Agent,注入 RAG 提示词,并执行流式生成。
/// </summary>
public class FinalProcessExecutor(
ChatAgentFactory agentFactory,
IDataQueryService dataQuery,
ILogger<FinalProcessExecutor> logger) :
ReflectingExecutor<FinalProcessExecutor>("FinalProcessExecutor"),
IMessageHandler<GenerationContext> // <-- 输入类型变更为聚合上下文
{
public async ValueTask HandleAsync(
GenerationContext genContext,
IWorkflowContext context,
CancellationToken cancellationToken = new())
{
try
{
var request = genContext.Request;
logger.LogInformation("开始最终生成,SessionId: {SessionId}", request.SessionId);
// 1. 获取会话关联的模板配置
// 我们需要知道当前会话使用的是哪个 Agent 模板(例如"通用助手"或"HR助手")
var session = await dataQuery.FirstOrDefaultAsync(dataQuery.Sessions.Where(s => s.Id == request.SessionId));
if (session == null) throw new InvalidOperationException("会话不存在");
// 2. 创建基础 Agent 实例
// 此时 Agent 拥有的是数据库中定义的静态 System Prompt
var agent = await agentFactory.CreateAgentAsync(session.TemplateId);
// 3. 构建消息列表
var inputMessages = new List<ChatMessage>();
string finalUserPrompt;
// 检查是否存在 知识库上下文 或 数据分析上下文
bool hasKnowledge = !string.IsNullOrWhiteSpace(genContext.KnowledgeContext);
bool hasDataAnalysis = !string.IsNullOrWhiteSpace(genContext.DataAnalysisContext);
bool hasContext = hasKnowledge || hasDataAnalysis;
if (hasContext)
{
// 构建混合上下文内容
var contextBuilder = new StringBuilder();
if (hasDataAnalysis)
{
contextBuilder.AppendLine("数据分析/SQL查询结果:");
contextBuilder.AppendLine(genContext.DataAnalysisContext);
contextBuilder.AppendLine();
}
if (hasKnowledge)
{
contextBuilder.AppendLine("知识库检索参考信息:");
contextBuilder.AppendLine(genContext.KnowledgeContext);
contextBuilder.AppendLine();
}
// 使用 XML 标签 <context> 是一种最佳实践
finalUserPrompt = $"""
请基于以下参考信息(包含数据库查询结果或检索文档)回答我的问题:
<context>
{contextBuilder}
</context>
回答要求:
1. 引用参考信息时,请标注来源 ID(例如 [^1])。
2. 针对数据分析结果,请结合用户问题进行自然语言解释,不要直接展示原始数据结构,除非用户要求。
3. 在回答结尾,如果引用了知识库文档,请生成“参考资料”列表。
4. 如果参考信息不足以回答问题,请直接说明,严禁编造。
5. 保持回答专业、简洁。
用户问题:
{request.Message}
""";
logger.LogDebug("增强模式激活:注入知识({KSize}),注入数据({DSize})。",
genContext.KnowledgeContext?.Length ?? 0,
genContext.DataAnalysisContext?.Length ?? 0);
}
else
{
// 无上下文模式:直接透传用户问题
finalUserPrompt = request.Message;
logger.LogDebug("增强模式未激活:仅使用用户原始输入。");
}
// 将组合后的提示作为单条 User 消息添加
// 利用近因效应,让模型在读取完长文本后立刻看到问题,提升注意力。
inputMessages.Add(new ChatMessage(ChatRole.User, finalUserPrompt));
// 4. 准备执行参数 (ChatOptions)
// 将动态加载的工具集挂载到本次执行的选项中
var runOptions = new ChatClientAgentRunOptions
{
ChatOptions = new ChatOptions
{
Tools = genContext.Tools, // <-- 动态挂载工具
}
};
// 如果有注入任何上下文(知识或数据),都降低温度以保证事实性
if (hasContext)
{
runOptions.ChatOptions.Temperature = 0.3f;
}
// 5. 恢复会话状态 (Thread)
// 从持久化存储中恢复之前的对话历史
var storeThread = new { storeState = new SessionSoreState(request.SessionId) };
var agentThread = agent.DeserializeThread(JsonSerializer.SerializeToElement(storeThread));
// 6. 执行流式生成
await foreach (var update in agent.RunStreamingAsync(
inputMessages,
agentThread,
runOptions,
cancellationToken))
{
// 将 Agent 的更新事件(文本块、工具调用状态等)转发到工作流事件流
// 这样前端就能通过 SSE 收到实时打字机效果
await context.AddEventAsync(new AgentRunUpdateEvent(Id, update), cancellationToken);
}
}
catch (Exception e)
{
logger.LogError(e, "最终生成阶段发生错误");
// 发送失败事件,让前端能感知到错误
await context.AddEventAsync(new ExecutorFailedEvent(Id, e), cancellationToken);
throw;
}
}
}
6. 注册
//Qjy.AICopilot.AiGatewayService/DependencyInjection.cs
public static void AddAiGatewayService(this IHostApplicationBuilder builder)
{
// ..其他代码
builder.Services.AddScoped<DataAnalysisAgentBuilder>();
builder.AddIntentWorkflow();
}
//Qjy.AICopilot.AiGatewayService.Workflows/IntentWorkflow.cs
public static class IntentWorkflow
{
public static void AddIntentWorkflow(this IHostApplicationBuilder builder)
{
builder.Services.AddTransient<IntentRoutingExecutor>();
builder.Services.AddTransient<ToolsPackExecutor>();
builder.Services.AddTransient<KnowledgeRetrievalExecutor>();
builder.Services.AddTransient<ContextAggregatorExecutor>();
builder.Services.AddTransient<DataAnalysisExecutor>();
builder.Services.AddTransient<FinalProcessExecutor>();
builder.AddWorkflow(nameof(IntentWorkflow), (sp, key) =>
{
var scopeFactory = sp.GetRequiredService<IServiceScopeFactory>();
var scope = scopeFactory.CreateScope();
var intentRouting = scope.ServiceProvider.GetRequiredService<IntentRoutingExecutor>();
var toolsPack = scope.ServiceProvider.GetRequiredService<ToolsPackExecutor>();
var knowledgeRetrieval = scope.ServiceProvider.GetRequiredService<KnowledgeRetrievalExecutor>();
var dataAnalysis = scope.ServiceProvider.GetRequiredService<DataAnalysisExecutor>();
var aggregator = scope.ServiceProvider.GetRequiredService<ContextAggregatorExecutor>();
var finalProcess = scope.ServiceProvider.GetRequiredService<FinalProcessExecutor>();
var workflowBuilder = new WorkflowBuilder(intentRouting);
workflowBuilder.WithName(key)
// 1. 扇出 (Fan-out): 意图识别 -> [工具打包, 知识检索]
// IntentRoutingExecutor 输出的 List<IntentResult> 会被广播给 targets 列表中的每一个节点
.AddFanOutEdge(intentRouting, [toolsPack, knowledgeRetrieval, dataAnalysis])
// 2. 扇入 (Fan-in): [工具打包, 知识检索] -> 聚合器
// 聚合器接收来自 sources 列表的所有输出
.AddFanInEdge([toolsPack, knowledgeRetrieval, dataAnalysis], aggregator)
// 3. 线性连接: 聚合器 -> 最终处理
.AddEdge(aggregator, finalProcess);
return workflowBuilder.Build();
});
}
}
6. 测试数据分析意图
我们发送一个数据分析的意图
//[post]/chat
{
"sessionId": "95f1e13b-427f-4817-be90-0c32c874cb55",
"message": "2023年第四季度,哪个月的销售额度最高"
}
回复如下: ```json [ { "intent": "Analysis.ERP_Core", "confidence": 0.97, "reasoning": "用户询问的是2023年第四季度各月的销售额度比较,属于具体的业务数据统计问题。该问题涉及‘销售额’这一动态数值,需从销售业务数据库中查询,因此应选择 Analysis.ERP_Core 库。", "query": "2023年第四季度每月销售额度对比" } ] ``` 我需要查询2023年第四季度(10月、11月、12月)的每月销售额数据。首先,让我探索数据库中的相关表。 ```tool_code GetTableNames(db_name="ERP_Core") ```当前提供的参考信息仅包含一个数据库操作指令(`GetTableNames(db_name="ERP_Core")`),但未返回任何实际的表名列表,也未展示与销售额相关的数据表结构或2023年第四季度的销售数据 [^1]。因此,无法确定哪个月(10月、11月或12月)的销售额最高。 请提供具体的销售数据查询结果(例如包含月份和对应销售额的记录),以便进行分析和回答。
从返回的结果可以知道数据分析的意图已经识别到,并且给出了 Analysis.ERP_Core 库,目前并没有执行 SQL 语句,AI 还不能给出最终结果。
五、设计可视化交互协议
1. 生成式 UI 基础
到目前为止,我们的智能体还只会使用文字内容来回复用户的消息,我们叫这种界面为对话式用户界面 CUI。如果只是工具调用和知识库检索,文字回复已经可以满足绝大部分的场景。但是当接入数据分析功能后,用户的需求可能需要表格或图形来显示更为合适,我们叫这种页面为图形用户界面 GUI,比如下面需求:
- 最近一个月的销售数据:最好是用表格来显示
- 同比,环比数据:最好是用图形化显示
这样,我们的 AI 助理就产生了新的可视化的需求,他需要能根据用户的消息自动来生成 GUI + CUI 的界面,我们称这种界面为生成式用户界面 GenUI。
GenUl设计原则:
- 服务端驱动UI
- MVI:模型-视图-意图
- 数据包含时间序列 + 数值指标时,使用折线图
- 数据包含分类字段 + 数值指标 + 总和时,使用饼图
- 数据只有一列,使用卡片
- 数据包括很多行和列,使用表格
- ......
- 流式传输支持多模态协议:SSE 流式传输文本块、混合媒体流(思考过程 + 文本片段 + UI组件块)
2. Agent 可视化决策设计
现在我们需要定义一套 AI 返回标准,让工作流根据智能体返回的消息内容来完成 UI 的驱动。
比如,我们可以定义一个如下格式的 Json 返回标准:
{
"widgetType": "string", // 组件类型标识,如 "LineChart","pieChart","StatsCard"
"title": "string", //组件标题
"description": "string", //组件的辅助说明
"data": "object", // 组件特定的数据载体
"options": "object" // 可选的可视化配置(如颜色主题,通常由前端默认,后端可覆盖)
}
AI 根据数据量级和图表类型判断选择什么样的界面显示,具体返回的内容可能是:
//统计卡片
{
"widgetType": "StatsCard",
"title": "当前库存总值",
"data":
{
"label": "Total Inventory Value",
"value": 1245000.00,
"unit": "CNY",
"trend": 12.5,
"trendDirection": "Up"
}
}
//数据表格
{
"widgetType": "DataTable",
"title": "大额订单列表",
"description": "显示金额超过 10,000 的最近订单",
"data":
{
"columns": [
{ "key": "orderId", "label": "订单号", "dataType": "string" },
{ "key": "amount", "label": "金额", "dataType": "currency" },
{ "key": "created", "label": "下单时间", "dataType": "date" }
],
"rows": [
{ "orderId": "ORD-001", "amount": 15000, "created": "2023-10-01T12:00:00Z" },
{ "orderId": "ORD-002", "amount": 22000, "created": "2023-10-02T14:30:00Z" }
]
}
}
//通用图表
{
"widgetType": "Chart",
"title": "月度营收分析",
"data": {
"category": "Line",
"dataset": {
"dimensions": ["Date", "Revenue", "Cost"],
"source": [
{"Date": "2023-01", "Revenue": 100, "Cost": 80},
{"Date": "2023-02", "Revenue": 120, "Cost": 90}
]
},
"encoding": {
"x": "Date",
"y": ["Revenue", "Cost"]
}
}
}
3. 实现可视化交互协议
我们实现3种可视化的场景(StatsCard、DataTable 和 Chart),其中 Chart 有分为折线图、饼图等,我们在 Shared 层创建一个 Qjy.AICopilot.Visualization 类库项目。
- 定义可视化组件接口,并提供抽象基类
//Qjy.AICopilot.Visualization/Widgets/IWidget.cs
/// <summary>
/// 可视化组件类型的枚举定义
/// </summary>
[JsonConverter(typeof(JsonStringEnumConverter))]
public enum WidgetType
{
StatsCard, // 统计卡片
DataTable, // 数据表格
Chart // 通用图表
}
/// <summary>
/// 可视化组件通用契约
/// </summary>
public interface IWidget
{
/// <summary>
/// 组件标题
/// </summary>
string Title { get; set; }
/// <summary>
/// 组件描述信息
/// </summary>
string? Description { get; set; }
}
/// <summary>
/// 可视化组件基类
/// </summary>
[JsonPolymorphic(TypeDiscriminatorPropertyName = "type")]
[JsonDerivedType(typeof(StatsCardWidget), typeDiscriminator: nameof(WidgetType.StatsCard))]
[JsonDerivedType(typeof(DataTableWidget), typeDiscriminator: nameof(WidgetType.DataTable))]
[JsonDerivedType(typeof(ChartWidget), typeDiscriminator: nameof(WidgetType.Chart))]
public abstract class WidgetBase : IWidget
{
/// <summary>
/// 组件标题
/// </summary>
public string Title { get; set; } = string.Empty;
/// <summary>
/// 组件描述信息
/// </summary>
public string? Description { get; set; }
}
- 定义三种类型的可视化组件类型
//Qjy.AICopilot.Visualization/Widgets/StatsCardWidget.cs
public class StatsCardWidget : WidgetBase
{
public required StatsCardData Data { get; set; }
}
public class StatsCardData
{
public required string Label { get; set; }
// 使用 object 以允许 string 或 number,但在序列化时尽量保持数值类型
public required object Value { get; set; }
public string? Unit { get; set; }
}
//Qjy.AICopilot.Visualization/Widgets/DataTableWidget.cs
public class DataTableWidget : WidgetBase
{
public required DataTableData Data { get; set; }
}
public class DataTableData
{
public List<TableColumn> Columns { get; set; } = new();
// 使用 Dictionary<string, object> 来表示动态行数据
public List<Dictionary<string, object?>> Rows { get; set; } = new();
}
public class TableColumn
{
public required string Key { get; set; }
public required string Label { get; set; }
public string DataType { get; set; } = "string"; // string, number, date, boolean
}
//Qjy.AICopilot.Visualization/Widgets/ChartWidget.cs
public class ChartWidget : WidgetBase
{
public required ChartData Data { get; set; }
}
public class ChartData
{
[JsonConverter(typeof(JsonStringEnumConverter))]
public ChartCategory Category { get; set; }
public ChartDataset Dataset { get; set; } = new();
public ChartEncoding? Encoding { get; set; }
}
public enum ChartCategory
{
Line,
Bar,
Pie
}
public class ChartDataset
{
/// <summary>
/// 维度定义(即表头)
/// </summary>
public List<string> Dimensions { get; set; } = [];
/// <summary>
/// 数据源,每一项对应一行数据
/// key 必须与 Dimensions 中的名称匹配
/// </summary>
public List<Dictionary<string, object?>> Source { get; set; } = [];
}
public class ChartEncoding
{
/// <summary>
/// X 轴映射的维度 Key
/// </summary>
public string? X { get; set; }
/// <summary>
/// Y 轴映射的维度 Key 列表(支持多系列)
/// </summary>
public List<string> Y { get; set; } = [];
/// <summary>
/// 系列分组字段(可选)
/// 例如:按 "Region" 字段分组,生成多条折线
/// </summary>
public string? SeriesName { get; set; }
}
- 创建生成组件数据的扩展方法
/// <summary>
/// 简化的 Schema 定义,用于传递给适配器
/// </summary>
public record SchemaColumn(string Name, Type DataType);
public static class VisualizationAdapterExtensions
{
extension(IEnumerable<dynamic> rawData)
{
/// <summary>
/// 将原始数据转换为通用的图表数据集
/// </summary>
public ChartDataset ToChartDataset(IEnumerable<SchemaColumn> schema)
{
var dataset = new ChartDataset();
// 1. 设置维度 (Dimensions)
dataset.Dimensions = schema.Select(s => s.Name).ToList();
// 2. 转换数据源 (Source)
foreach (var row in rawData)
{
if (row is IDictionary<string, object> rowDict)
{
var cleanDict = new Dictionary<string, object?>();
foreach (var kvp in rowDict)
{
// 核心:处理类型兼容性
cleanDict[kvp.Key] = SanitizeValue(kvp.Value);
}
dataset.Source.Add(cleanDict);
}
}
return dataset;
}
/// <summary>
/// 将原始数据转换为数据表格结构
/// </summary>
public DataTableData ToDataTableData(IEnumerable<SchemaColumn> schema)
{
var tableData = new DataTableData();
// 1. 构建列定义
foreach (var col in schema)
{
tableData.Columns.Add(new TableColumn
{
Key = col.Name,
Label = col.Name, // 初始 Label 等于 Key,后续可由 LLM 优化
DataType = MapSystemTypeToWebType(col.DataType)
});
}
// 2. 构建行数据 (逻辑同上)
foreach (var row in rawData)
{
if (row is IDictionary<string, object> rowDict)
{
var cleanDict = new Dictionary<string, object?>();
foreach (var kvp in rowDict)
{
cleanDict[kvp.Key] = SanitizeValue(kvp.Value);
}
tableData.Rows.Add(cleanDict);
}
}
return tableData;
}
}
/// <summary>
/// 类型清洗与标准化
/// </summary>
private static object? SanitizeValue(object? value)
{
if (value == null || value is DBNull) return null;
return value switch
{
// 确保 DateTime 转换为 ISO 8601 字符串
DateTime dt => dt.ToString("yyyy-MM-ddTHH:mm:ss"),
DateTimeOffset dto => dto.ToString("yyyy-MM-ddTHH:mm:ss"),
// 数值类型保持原样,避免被转为字符串
decimal or double or float or int or long => value,
// 其他类型转字符串
_ => value.ToString()
};
}
/// <summary>
/// .NET 类型映射到 Web 前端类型
/// </summary>
private static string MapSystemTypeToWebType(Type type)
{
if (IsNumeric(type)) return "number";
if (type == typeof(DateTime) || type == typeof(DateTimeOffset)) return "date";
if (type == typeof(bool)) return "boolean";
return "string";
}
private static bool IsNumeric(Type type)
{
type = Nullable.GetUnderlyingType(type) ?? type;
return type == typeof(int) || type == typeof(long) || type == typeof(double) ||
type == typeof(decimal) || type == typeof(float);
}
}
- 扩展数据库分析专家的能力
协议格式定义好之后,接下来我们修改数据库分析专家的提示词,让模型返回的数据与我们定义的数据格式匹配上。
var dataAnalysisTemplate = new ConversationTemplate(
"DataAnalysisAgent",
"数据库分析专家",
"""
你是一个精通 **{{$DbProvider}}** 的高级数据库管理员。
你当前正在操作的 **目标数据库名称** 为:**{{$DatabaseName}}**。
你的核心职责分为两步:
1. **数据获取**:构造精准的 SQL 语句查询业务数据。
2. **数据转化**:对查询结果进行语义解释,并设计最佳的可视化展示方案。
### 核心工作流程
1. **探索**: 调用 `GetTableNames` 初步筛选候选表。
2. **详查**: 调用 `GetTableSchema` 获取详细 DDL 和 **字段注释**。
3. **构建**: 生成 SQL 并调用 `ExecuteSqlQuery` 获取数据样本。
4. **决策**: 观察查询结果,思考以下问题:
- 这些字段的业务含义是什么?(特别是状态码、类型值)
- 这组数据适合用图表展示吗?(趋势用折线图,分布用饼图,对比用柱状图,明细用表格)
5. **输出**: 生成一个严格合法的 JSON,不要使用 ```json,JSON 格式规范如下:
{
"analysis": {
"database": "{{$DatabaseName}}",
"description": "在此处填入数据内容的简要概括",
"metadata": [
{ "name": "字段名", "description": "字段注释或说明" }
]
},
"visual_decision": {
"type": "Chart", // 可选值: Chart, DataTable, StatsCard
"title": "标题",
"description": "在此处填入数据内容的简要概括",
"chart_config": {
// 可选字段
}
"Unit": "单位" // 可选字段
}
}
### 核心交互原则
1. **过程透明(允许)**:在调用工具或构建 SQL 的过程中,你可以简要向用户解释你的思路。
2. **结构化输出(严格)**:你不需要在 JSON 中输出实际的数据行,你只需要提供元数据和可视化配置。
- `analysis`: 数据分析字段,如果查询数据失败,此字段可以 null。
- `database`: 当前数据库名称。
- `description`: 根据用户问题生成的简短数据说明。
- `metadata`: 必须包含查询结果中每个字段的定义,包括 `name`(字段名)和 `description`(从表结构中获取的字段注释/说明)。
- `visual_decision`: 可视化决策字段,如果数据不适合可视化,此字段可为 null。
- `type`: 图表类型,可选值: Chart, DataTable, StatsCard。
- `title`: 图表标题。
- `description`: 根据用户问题生成的简短数据说明。
- `chart_config`: Chart 类型图表专有字段。
- `unit`: StatsCard 类型图表专有字段。
3. **禁止解读(严格)**:**严禁** 对数据具体数值进行趋势分析或总结。
### 核心安全准则
- **只读权限**: 你仅拥有 `SELECT` 权限。严禁生成 `INSERT`, `UPDATE`, `DELETE`, `DROP` 等修改性语句。
- **范围限制**: 所有的探索和查询操作必须严格限制在数据库 **{{$DatabaseName}}** 范围内。
### 数据库方言规范
当前连接的数据库遵循以下语法标准,请严格遵守:
{{$DialectInstructions}}
### 可视化输出规范决策指南
在生成回答时,你需要判断当前的数据结果最适合以何种 UI 形式展示给用户。
如果数据适合可视化,请严格按照以下 JSON 格式输出决策指令。
**场景 1:趋势或对比分析 (Chart)**
当数据包含时间序列、分类对比且行数适中时使用。
"visual_decision": {
"type": "Chart",
"title": "图表标题",
"description": "图表描述",
"chart_config": {
"category": "可选值:Line,Bar,Pie",
"x": "作为X轴的字段名,例如 order_date",
"y": "作为Y轴的数值字段名,例如 total_amount",
"series": "可选,用于分组的字段名,例如 product_category"
}
}
**场景 2:明细数据列表 (DataTable)**
当数据是详细记录列表(如订单列表、库存清单)且行数较多时使用。
"visual_decision": {
"type": "DataTable",
"title": "列表标题",
"description": "列表描述",
}
**场景 3:单一关键指标 (StatsCard)**
"visual_decision": {
"type": "StatsCard",
"title": "卡片标题",
"description": "卡片描述",
"unit": "单位(可选)"
}
""",
Guids[1],
new TemplateSpecification
{
Temperature = 0.3f
});
六、双路输出与数据分流
1. 需求分析
现在我们的模型已经拥有了生成式 UI 的协议,并且可以按协议格式返回数据了。看起来已经没什么问题了,但在生成场景中还存在一个现实的问题,Token 过量。
比如,我们查询的数据非常多时,如果把这些数据都放到模型上下文中,这会存在内容超出上下文限制的情况。即使没有超出,大量的数据也会挤占上下文空间,这些内容不会给模型带来分析价值,反而会使模型产生幻觉。而且还有一个更实际的情况,大量的 Token,会增加成本。
实际上,对于模型来说,他只需要少了的数据(比如前5条)用来分析用户意图,而查询到的数据只要能在用户界面上显示出来就可以了。
所以,现在我们模型需要的是完成下面这些功能:
- 状态留存:既让LLM看到数据摘要(理解结构),又保留完整的原始数据(用于前端渲染)
- 可视化决策
- 决策注入:自助选择图表类型
流程图:
flowchart TD
A[用户意图] --> B[DataAnalysisExecutor]
B --> C[核心改造点]
C --> D{DBAgent 决策}
D -->|原始数据 + 视觉配置| E[可视化适配器]
D -->|原始数据 + 字段字典| F[数据序列化器]
E --> G[主路:Schema+Data JSON]
F --> G
G --> H[聚合]
H --> I[Final Agent]
I --> J[自然语言解释]
D --> K[旁路:Widget JSON]
K --> L[前端]
J --> L
2. 实现数据分流
- 定义数据分类上下文,我们创建一个可视化的上下文对象
//Qjy.AICopilot.DataAnalysisService/Serivices/VisualizationContext.cs
/// <summary>
/// 可视化上下文
/// 职责:在 Scoped 生命周期内暂存原始的数据库查询结果,
/// 以便后续的执行器能够获取无损数据用于构建 UI 组件。
/// </summary>
public class VisualizationContext
{
// 存储最后一次查询的数据行(动态类型)
private IEnumerable<dynamic>? _lastResultSet;
// 存储最后一次查询的 Schema 信息(列名、类型等)
private IEnumerable<SchemaColumn>? _lastResultSchema;
// 存储数据分析输出结果
private DataAnalysisOutputDto _outputDto = new ();
/// <summary>
/// 捕获查询结果
/// </summary>
public void CaptureResult(IEnumerable<dynamic> resultSet, IEnumerable<SchemaColumn> schema)
{
_lastResultSet = resultSet;
_lastResultSchema = schema;
}
/// <summary>
/// 捕获输出结果
/// </summary>
public void CaptureOutput(AnalysisDto? analysis, VisualDecisionDto? decision)
{
_outputDto.Analysis = analysis;
_outputDto.Decision = decision;
}
/// <summary>
/// 获取暂存的数据集
/// </summary>
public (IEnumerable<dynamic>? Data, IEnumerable<SchemaColumn>? Schema) GetLastResult()
{
return (_lastResultSet, _lastResultSchema);
}
/// <summary>
/// 获取输出结果
/// </summary>
public DataAnalysisOutputDto GetOutput()
{
return _outputDto;
}
/// <summary>
/// 检查是否包含有效数据
/// </summary>
public bool HasData => _lastResultSet != null && _lastResultSet.Any();
}
- 扩展数据分析插件,实现双路数据存储。关键步骤
- vizContext.CaptureResult(data, schema); 将原始结果捕获到上下文中
- 最后的 return,只返回了5条数据,用于模型完成意图分析
//Qjy.AICopilot.DataAnalysisService.Plugins/DataAnalysisPlugin.cs
[Description("在指定数据库上执行查询 SQL 语句,并返回 JSON 格式的结果。")]
public async Task<string> ExecuteSqlQueryAsync(
IServiceProvider sp,
[Description("目标数据库的名称")] string databaseName,
[Description("要执行的 SQL 查询语句 (仅限 SELECT,不需要人类可读,去除换行符)")] string sqlQuery)
{
// 1. 基础校验
if (string.IsNullOrWhiteSpace(sqlQuery)) return "错误:SQL 语句不能为空。";
try
{
var db = await GetDatabaseAsync(sp, databaseName, CancellationToken.None);
// 2. 执行查询
var result = await dbConnector.ExecuteQueryAsync(db, sqlQuery);
var data = result.ToList();
var firstRow = data.FirstOrDefault() as IDictionary<string, object>;
var schema = new List<SchemaColumn>();
if (firstRow != null)
{
foreach (var kvp in firstRow)
{
// 获取值的运行时类型,如果为 null 则默认为 object
var type = kvp.Value?.GetType() ?? typeof(object);
schema.Add(new SchemaColumn(kvp.Key, type));
}
}
// 3. 【关键步骤】将原始结果捕获到上下文中
var vizContext = sp.GetRequiredService<VisualizationContext>();
vizContext.CaptureResult(data, schema);
return data.Count == 0 ?
// 没有数据
"查询执行成功,但未返回任何结果 (0 rows)。" :
// LLM 只需要看摘要,不需要看完整的数据,仅取前 5 行
data.Take(5).ToJson();
}
catch (InvalidOperationException ex) // 捕获安全拦截异常
{
logger.LogWarning("SQL 执行被拦截: {Message}", ex.Message);
return $"安全警告: 查询被系统拒绝。原因: {ex.Message}";
}
catch (Exception ex)
{
// 这里是 ReAct 模式中“错误自愈”的关键!
// 我们必须返回详细的数据库错误信息(如 "Column 'xxx' not found"),
// 这样 Agent 才能看到错误 -> 思考原因 -> 修正 SQL -> 重试。
logger.LogError(ex, "SQL 执行异常");
return $"SQL 执行错误: {ex.Message}\n请检查你的 SQL 语法、表名或列名是否正确,并参考之前的 Schema 定义进行修正。";
}
[Description("输出数据分析结果和可视化决策结果")]
public string OutputResult(IServiceProvider sp,
[Description("数据分析结果,如果查询数据失败,此字段可以 null")]
AnalysisDto? analysis,
[Description("可视化决策结果,参考【可视化输出规范决策指南】,如果数据不适合可视化,此字段可为 null")]
VisualDecisionDto? decision)
{
// 将输出结果捕获到上下文中
var vizContext = sp.GetRequiredService<VisualizationContext>();
vizContext.CaptureOutput(analysis, decision);
return "已成功获取输出结果,请直接回复‘数据查询分析任务完成,结果如下:’";
}
}
- 注册
//Qjy.AICopilot.DataAnalysisService/DependencyInjection.cs
public static class DependencyInjection
{
public static void AddDataAnalysisService(this IHostApplicationBuilder builder)
{
// 注册 Dapper 基础服务
builder.AddDapper();
builder.Services.AddScoped<VisualizationContext>();
// 注册插件加载器
builder.Services.AddAgentPlugin(registrar =>
{
registrar.RegisterPluginFromAssembly(Assembly.GetExecutingAssembly());
});
}
}
3. 完成双路输出
- 定义输出传输对象
//Qjy.AICopilot.DataAnalysisService/Plugins/DataAnalysisAgentOutputDto.cs
public record DataAnalysisOutputDto
{
// 对应 Agent 返回的 "Analysis" 数组
// 这里使用 object 或 JsonElement 都可以,因为我们不需要修改它,只需透传
[JsonPropertyName("analysis")]
public AnalysisDto? Analysis { get; set; }
// 对应 Agent 返回的 "visual_decision" 对象
[JsonPropertyName("visual_decision")]
public VisualDecisionDto? Decision { get; set; }
}
public record AnalysisDto
{
[Description("当前数据库名称")]
[JsonPropertyName("database_name")]
public string DatabaseName { get; set; } = string.Empty;
[Description("据用户问题生成的简短数据说明")]
[JsonPropertyName("description")]
public string Description { get; set; } = string.Empty;
[Description("查询结果中每个字段的定义")]
[JsonPropertyName("metadata")]
public List<MetadataItemDto> Metadata { get; set; } = [];
}
public record MetadataItemDto
{
[Description("字段名")]
[JsonPropertyName("name")]
public string Name { get; set; } = string.Empty;
[Description("从表结构中获取的字段注释/说明")]
[JsonPropertyName("description")]
public string Description { get; set; } = string.Empty;
}
public record VisualDecisionDto
{
[Description("图表类型,可选值: Chart, DataTable, StatsCard")]
[JsonPropertyName("type")]
[JsonConverter(typeof(JsonStringEnumConverter))]
public WidgetType Type { get; set; }
[Description("图表标题")]
[JsonPropertyName("title")]
public string Title { get; set; } = string.Empty;
[Description("根据用户问题生成的简短数据说明")]
[JsonPropertyName("description")]
public string Description { get; set; } = string.Empty;
[Description("可选字段,Chart 类型图表专有字段")]
[JsonPropertyName("chart_config")]
public ChartConfig? ChartConfig { get; set; }
[Description("可选字段,StatsCard 类型图表专有字段")]
[JsonPropertyName("unit")]
public string? Unit { get; set; }
}
public record ChartConfig
{
[Description("图表类型,可选值: Line,Bar,Pie")]
[JsonPropertyName("category")]
[JsonConverter(typeof(JsonStringEnumConverter))]
public ChartCategory Category { get; set; }
[Description("作为X轴的字段名,例如 order_date")]
[JsonPropertyName("x")]
public string X { get; set; } = string.Empty;
[Description("作为Y轴的数值字段名,例如 total_amount")]
[JsonPropertyName("y")]
public string Y { get; set; } = string.Empty;
[Description("可选,用于分组的字段名,例如 product_category")]
[JsonPropertyName("series")]
public string? Series { get; set; }
}
- 重构数据分析执行器,关键代码,见代码中备注(分流路径 1、2部分)
- 分流路径 1:会发出 AgentRunResponseEvent 事件(后面会有事件处理逻辑)
- 分流路径 2:直接输出,继续后续工作流
/// <summary>
/// 数据分析执行器
/// 职责:处理 Analysis.* 意图,实例化 DBA Agent,执行 Text-to-SQL 任务。
/// </summary>
public class DataAnalysisAgentBuilder agentBuilder,
IDataQueryService dataQuery,
VisualizationContext vizContext,
ILogger<DataAnalysisExecutor> logger)
: ReflectingExecutor<DataAnalysisExecutor>("DataAnalysisExecutor"),
IMessageHandler<List<IntentResult>, BranchResult>DataAnalysisExecutor(
DataAnalysisAgentBuilder agentBuilder,
VisualizationContext vizContext,
IDataQueryService dataQuery,
ILogger<DataAnalysisExecutor> logger)
: ReflectingExecutor<DataAnalysisExecutor>("DataAnalysisExecutor"),
IMessageHandler<List<IntentResult>, BranchResult>
{
private const string AnalysisIntentPrefix = "Analysis.";
public async ValueTask<BranchResult> HandleAsync(
List<IntentResult> intentResults,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 1. 筛选数据分析类意图
// 过滤规则:必须以 Analysis. 开头,且置信度高于 0.6
var analysisIntents = intentResults
.Where(i => i.Intent.StartsWith(AnalysisIntentPrefix, StringComparison.OrdinalIgnoreCase)
&& i.Confidence > 0.6)
.ToList();
if (analysisIntents.Count == 0)
{
logger.LogDebug("未检测到数据分析意图,跳过执行。");
// 返回空结果,表示该分支无产出
return BranchResult.FromDataAnalysis(string.Empty);
}
logger.LogInformation("启动数据分析流程,命中目标数据库数量: {Count}", analysisIntents.Count);
// 2. 遍历处理每一个意图
var output = new StringBuilder();
foreach (var intent in analysisIntents)
{
output.AppendLine(await ProcessSingleIntentAsync(intent, context, cancellationToken));
}
return BranchResult.FromDataAnalysis(output.ToString());
}
/// <summary>
/// 处理单个数据库查询意图
/// </summary>
private async Task<string> ProcessSingleIntentAsync(
IntentResult intent,
IWorkflowContext context,
CancellationToken ct)
{
var dbName = intent.Intent.Substring(AnalysisIntentPrefix.Length);
try
{
// 1. 获取数据库配置
// 我们需要 BusinessDatabase 实体来决定方言策略
var db = await dataQuery.FirstOrDefaultAsync(
dataQuery.BusinessDatabases.Where(d => d.Name == dbName));
if (db == null || !db.IsEnabled)
{
logger.LogWarning("意图指向数据库 '{DbName}',但该库不存在或已禁用。", dbName);
return $"[系统提示]: 无法连接数据库 {dbName},请联系管理员核实配置。";
}
// 2. 构建 DBA Agent
// 这里会动态注入 PG 或 SQLServer 的方言提示词
var agent = await agentBuilder.BuildAsync(db);
// 创建临时会话线程
var thread = agent.GetNewThread();
// 3. 执行 ReAct 循环
// Agent 会自动进行: 思考 -> GetTableNames -> 思考 -> GetTableSchema -> 思考 -> ExecuteSQL -> 总结
await foreach (var update in agent.RunStreamingAsync(intent.Query!, thread, cancellationToken: ct))
{
await context.AddEventAsync(new AgentRunUpdateEvent(Id, update), ct);
}
// 记录日志以便调试
logger.LogInformation("数据库 {DbName} 查询完成。", dbName);
// 获取最后一条 Agent 回复消息(最终数据)
//var messages = thread.GetService<IList<ChatMessage>>()!;
//var response = messages.Last();
//var output = JsonSerializer.Deserialize<DataAnalysisAgentOutputDto>(response.Text);
// 获取可视化上下文
var (rawData, schema) = vizContext.GetLastResult();
var output = vizContext.GetOutput();
// =========================================================
// 分流路径 1:旁路输出 (Side Path) -> 前端 Widget
// 目标:visual_decision + data -> Widget JSON
// =========================================================
if (output is { Decision: not null } && vizContext.HasData)
{
try
{
var widget = BuildWidget(output.Decision, rawData!, schema!);
var message = new ChatMessage(ChatRole.Assistant, widget.ToJson());
await context.AddEventAsync(new AgentRunResponseEvent(Id, new AgentRunResponse(message)), ct);
}
catch (Exception ex)
{
logger.LogError(ex, "构建可视化 Widget 失败。Database: {DbName}", dbName);
return $"[系统错误]: 构建可视化 Widget 时发生异常 - {ex.Message}";
}
}
// =========================================================
// 分流路径 2:主路输出 (Main Path) -> 聚合器 -> Final Agent
// 目标:schema + data -> Combined JSON
// =========================================================
// 这里直接使用匿名对象进行拼接:
// { "schema": [], "data": [] }
var combinedOutput = new
{
analysis = output.Analysis, // 直接透传 Agent 生成的 Schema
data = rawData ?? [] // 拼接 SQL 查询的实际结果
};
return combinedOutput.ToJson();
}
catch (Exception ex)
{
logger.LogError(ex, "执行数据分析意图失败。Database: {DbName}", dbName);
return $"[系统错误]: 查询数据库 {dbName} 时发生异常 - {ex.Message}";
}
}
private IWidget BuildWidget(VisualDecisionDto decision, IEnumerable<dynamic> data, IEnumerable<SchemaColumn> schema)
{
switch (decision.Type)
{
case WidgetType.StatsCard:
// 取第一行第一列,或者根据列名查找
var firstRow = data.First() as IDictionary<string, object>;
var value = firstRow.Values.First(); // 简单粗暴取第一个值
return new StatsCardWidget
{
Title = decision.Title,
Description = decision.Description,
Data = new StatsCardData
{
Label = decision.Title,
Value = value,
Unit = decision.Unit
}
};
case WidgetType.DataTable:
return new DataTableWidget
{
Title = decision.Title,
Description = decision.Description,
Data = data.ToDataTableData(schema)
};
case WidgetType.Chart:
var dataset = data.ToChartDataset(schema);
return new ChartWidget
{
Title = decision.Title,
Description = decision.Description,
Data = new ChartData
{
Category = decision.ChartConfig!.Category,
Dataset = dataset,
Encoding = new ChartEncoding
{
X = decision.ChartConfig.X,
Y = string.IsNullOrWhiteSpace(decision.ChartConfig.Y)
? []
: [decision.ChartConfig.Y],
SeriesName = decision.ChartConfig.Series
}
}
};
default:
throw new NotSupportedException($"不支持的 Widget 类型: {decision.Type}");
}
}
}
- AgentRunResponseEvent 事件处理,关键代码见 AgentRunResponseEvent 分支代码。
//Qjy.AICopilot.AiGatewayService/Agents/ChunkType.cs
[JsonConverter(typeof(JsonStringEnumConverter))]
public enum ChunkType
{
Error,
Text,
Intent,
FunctionCall,
FunctionResult,
Widget
}
public record ChatChunk(string Source, ChunkType Type, string Content);
//Qjy.AICopilot.AiGatewayService/Agents/ChatStreamRequest.cs
[AuthorizeRequirement("AiGateway.Chat")]
public record ChatStreamRequest(Guid SessionId, string Message) : IStreamRequest<ChatChunk>;
public class ChatStreamRequestHandler(
IDataQueryService queryService,
[FromKeyedServices(nameof(IntentWorkflow))] Workflow workflow)
: IStreamRequestHandler<ChatStreamRequest, ChatChunk>
{
public async IAsyncEnumerable<ChatChunk> Handle(ChatStreamRequest request, CancellationToken cancellationToken)
{
if (!queryService.Sessions.Any(session => session.Id == request.SessionId))
{
throw new Exception("未找到会话");
}
await using var run = await InProcessExecution.StreamAsync(workflow, request, cancellationToken: cancellationToken);
await foreach (var workflowEvent in run.WatchStreamAsync(cancellationToken))
{
switch (workflowEvent)
{
case ExecutorFailedEvent evt:
yield return new ChatChunk(evt.ExecutorId, ChunkType.Error, evt.Data.Message ?? "");
break;
case AgentRunResponseEvent evt:
switch (evt.ExecutorId)
{
case "IntentRoutingExecutor":
yield return new ChatChunk(evt.ExecutorId, ChunkType.Text, evt.Response.Text);
break;
case "DataAnalysisExecutor":
yield return new ChatChunk(evt.ExecutorId, ChunkType.Widget, evt.Response.Text);
break;
}
break;
case AgentRunUpdateEvent evt:
foreach (var evtContent in evt.Update.Contents)
{
switch (evtContent)
{
case TextContent content:
yield return new ChatChunk(evt.ExecutorId, ChunkType.Text, content.Text);
break;
case FunctionCallContent content:
var fun = new
{
content.Name,
content.Arguments
};
yield return new ChatChunk(evt.ExecutorId, ChunkType.FunctionCall, fun.ToJson());
break;
case FunctionResultContent content:
yield return new ChatChunk(evt.ExecutorId, ChunkType.FunctionResult, content.Result);
break;
}
}
break;
}
}
}
}
4. 测试
因为修改了意图数据,测试前我们继续重新生成一下数据库。
- 我们继续使用“2023年第四季度,哪个月的销售额度最高”来测试
{
"sessionId": "886fb629-6905-4edd-894f-60b82069770a",
"message": "2023年第四季度,哪个月的销售额度最高"
}
回复如下,我们可以看到
- 首先识别了意图 Analysis.ERP_Core,为数据分析意图,并且是查询 ERP_Core 数据库
- 获取到了 ERP_Core 数据库中的5张表,并从中分析出了 orders 表中有需要的数据
- 进一步获取 orders 表的结构,按查询条件生成了 sql,并且过滤了已取消的订单
- 查到了3条记录,并且结果中含有数据类型,进一步分析出需要使用 Chart Bar 来驱动 UI
- 双路输出,一路返回了驱动 UI 展示的数据,另一路正常返回了文本结果
- 最后还给出了数据来源,参考资料
```json [ { "intent": "Analysis.ERP_Core", "confidence": 0.97, "reasoning": "用户询问2023年第四季度各月的销售额度,属于对历史销售数据的统计分析需求。该问题涉及具体数值(销售额)和时间维度(月份),需查询结构化业务数据,因此应使用 Analysis.ERP_Core 数据库。", "query": "2023年第四季度各月销售额度对比" } ] ``` 我需要查询2023年第四季度(10月、11月、12月)各月的销售额度对比数据。首先,让我探索数据库中的表结构,找到与销售相关的表。 ```json { "Name": "GetTableNames", "Arguments": { "databaseName": "ERP_Core" } } ``` ``` [ { "TableName": "base_products", "Description": "商品基础信息表,存储所有 SKU 的核心数据" }, { "TableName": "stock_inventory", "Description": "实时库存表,记录各仓库的商品持有量" }, { "TableName": "customers", "Description": "客户信息表,用于分析客户分布和购买行为" }, { "TableName": "orders", "Description": "销售订单主表,记录交易发生的时间、客户和总金额" }, { "TableName": "order_items", "Description": "订单明细表,记录每一笔订单中包含的具体商品和数量" } ] ``` 我看到有`orders`表,描述为"销售订单主表,记录交易发生的时间、客户和总金额",这应该包含我需要的销售数据。让我查看这个表的详细结构。 ```json { "Name": "GetTableSchema", "Arguments": { "databaseName": "ERP_Core", "tableNames": [ "orders" ] } } ``` ``` CREATE TABLE orders ( order_id integer PRIMARY KEY, order_no character varying -- 业务订单号,格式通常为 ORD-YYYYMMDD-XXX, customer_id integer, order_date timestamp without time zone, total_amount numeric -- 订单总金额 (实际支付金额),是该订单所有明细项 subtotal 的总和, status character varying -- 订单状态: Pending(待付款), Paid(已付款), Shipped(已发货), Completed(已完成), Cancelled(已取消). 注意:统计销售业绩时通常应排除 Cancelled 状态 ); ``` 现在我了解了`orders`表的结构。我需要查询2023年第四季度(10月、11月、12月)各月的销售额度,排除已取消的订单。让我构建SQL查询来获取这些数据。 ```json { "Name": "ExecuteSqlQuery", "Arguments": { "databaseName": "ERP_Core", "sqlQuery": "SELECT EXTRACT(MONTH FROM order_date) AS month, SUM(total_amount) AS total_sales FROM orders WHERE order_date >= '2023-10-01' AND order_date < '2024-01-01' AND status != 'Cancelled' GROUP BY EXTRACT(MONTH FROM order_date) ORDER BY month;" } } ``` ``` [ { "month": 10, "total_sales": 150500.00 }, { "month": 11, "total_sales": 236000.00 }, { "month": 12, "total_sales": 32000.00 } ] ``` { "analysis": { "database": "ERP_Core", "description": "2023年第四季度各月销售额度对比数据,包含10月、11月和12月的销售总额", "metadata": [ { "name": "month", "description": "月份(10表示10月,11表示11月,12表示12月)" }, { "name": "total_sales", "description": "该月销售总额,已排除已取消的订单" } ] }, "visual_decision": { "type": "Chart", "title": "2023年第四季度各月销售额度对比", "description": "2023年第四季度各月销售额度对比数据,包含10月、11月和12月的销售总额", "chart_config": { "category": "Bar", "x": "month", "y": "total_sales" } } } ```json { "widget_type": "Chart", "Data": { "Category": "Bar", "Dataset": { "Dimensions": [ "month", "total_sales" ], "Source": [ { "month": 10, "total_sales": 150500.00 }, { "month": 11, "total_sales": 236000.00 }, { "month": 12, "total_sales": 32000.00 } ] }, "Encoding": { "X": "month", "Y": [ "total_sales" ], "SeriesName": null } }, "Title": "2023年第四季度各月销售额度对比", "Description": "2023年第四季度各月销售额度对比数据,包含10月、11月和12月的销售总额" } ``` 2023年第四季度,11月的销售额度最高,达到236,000.00元 [^1]。相较之下,10月销售额为150,500.00元,12月销售额为32,000.00元,因此11月是该季度销售表现最好的月份。 参考资料: [^1] ERP_Core 数据库查询结果:2023年第四季度各月销售额度对比数据
- 我们再来一个测试“帮我计算一下所有“已完成”和“已付款”订单的总销售额是多少”:
{
"sessionId": "886fb629-6905-4edd-894f-60b82069770a",
"message": "帮我计算一下所有“已完成”和“已付款”订单的总销售额是多少"
}
回复如下:
```json [ { "intent": "Analysis.ERP_Core", "confidence": 0.97, "reasoning": "用户要求计算特定状态订单(已完成、已付款)的总销售额,这是一个典型的业务数据统计问题,涉及销售订单的金额汇总。ERP_Core 数据库包含销售订单信息(如订单金额、状态),适合执行此类聚合查询。", "query": "所有状态为‘已完成’和‘已付款’的订单总销售额" } ] ``` 我需要找到所有状态为'已完成'和'已付款'的订单,并计算它们的总销售额。首先,让我探索数据库中的表结构,找到与订单相关的表。 ```json { "Name": "GetTableNames", "Arguments": { "databaseName": "ERP_Core" } } ``` ``` [ { "TableName": "base_products", "Description": "商品基础信息表,存储所有 SKU 的核心数据" }, { "TableName": "stock_inventory", "Description": "实时库存表,记录各仓库的商品持有量" }, { "TableName": "customers", "Description": "客户信息表,用于分析客户分布和购买行为" }, { "TableName": "orders", "Description": "销售订单主表,记录交易发生的时间、客户和总金额" }, { "TableName": "order_items", "Description": "订单明细表,记录每一笔订单中包含的具体商品和数量" } ] ``` 我看到有一个名为"orders"的表,描述为"销售订单主表,记录交易发生的时间、客户和总金额"。这很可能是我需要查询的表。让我获取这个表的详细结构。 ```json { "Name": "GetTableSchema", "Arguments": { "databaseName": "ERP_Core", "tableNames": [ "orders" ] } } ``` ``` CREATE TABLE orders ( order_id integer PRIMARY KEY, order_no character varying -- 业务订单号,格式通常为 ORD-YYYYMMDD-XXX, customer_id integer, order_date timestamp without time zone, total_amount numeric -- 订单总金额 (实际支付金额),是该订单所有明细项 subtotal 的总和, status character varying -- 订单状态: Pending(待付款), Paid(已付款), Shipped(已发货), Completed(已完成), Cancelled(已取消). 注意:统计销售业绩时通常应排除 Cancelled 状态 ); ``` 现在我了解了orders表的结构。我需要查询状态为'Completed'(已完成)和'Paid'(已付款)的订单,并计算它们的总销售额(total_amount)。让我执行这个查询。 ```json { "Name": "ExecuteSqlQuery", "Arguments": { "databaseName": "ERP_Core", "sqlQuery": "SELECT SUM(total_amount) AS total_sales FROM orders WHERE status IN ('Completed', 'Paid');" } } ``` ``` [ { "total_sales": 182500.00 } ] ``` { "analysis": { "database": "ERP_Core", "description": "状态为'已完成'和'已付款'的订单总销售额", "metadata": [ { "name": "total_sales", "description": "已完成和已付款订单的总销售额" } ] }, "visual_decision": { "type": "StatsCard", "title": "已完成和已付款订单总销售额", "description": "状态为'已完成'和'已付款'的订单总销售额", "unit": "元" } } ```json { "widget_type": "StatsCard", "Data": { "Label": "已完成和已付款订单总销售额", "Value": 182500.00, "Unit": "元" }, "Title": "已完成和已付款订单总销售额", "Description": "状态为'已完成'和'已付款'的订单总销售额" } ``` 根据系统数据库查询结果,所有状态为“已完成”和“已付款”的订单总销售额为 182,500.00 元 [^1]。 [^1]: 数据来源:ERP_Core 数据库,查询内容为状态为“已完成”和“已付款”的订单总销售额。
