From 7b2585ee739835e713c2ce8de94549bfc902384e Mon Sep 17 00:00:00 2001 From: z30057876 Date: Fri, 17 Oct 2025 10:57:37 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E5=8D=87=E7=BA=A7=E5=8C=85=E7=89=88?= =?UTF-8?q?=E6=9C=AC=EF=BC=8C=E6=9B=B4=E6=96=B0=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- design/call/api.md | 29 +- design/call/choice.md | 2 +- design/call/convert.md | 57 +--- design/call/core.md | 666 +++++++++++++++++++++++++++++++++++++++++ design/call/empty.md | 26 +- design/call/facts.md | 18 +- design/call/graph.md | 27 +- design/call/llm.md | 87 +----- design/call/slot.md | 24 +- design/call/sql.md | 15 +- design/call/suggest.md | 2 +- design/call/summary.md | 22 +- pyproject.toml | 13 +- 13 files changed, 746 insertions(+), 242 deletions(-) create mode 100644 design/call/core.md diff --git a/design/call/api.md b/design/call/api.md index aef4a5f45..aa8deaf28 100644 --- a/design/call/api.md +++ b/design/call/api.md @@ -21,28 +21,6 @@ API Call 模块是 Scheduler 框架中的核心调用工具,用于向外部 AP ```mermaid classDiagram - class CoreCall { - <> - +name: str - +description: str - +node: NodeInfo - +enable_filling: bool - +to_user: bool - +info(language) CallInfo - +instance(executor, node) Self - +exec(executor, input_data) AsyncGenerator - #_init(call_vars) DataBase - #_exec(input_data) AsyncGenerator - #_after_exec(input_data) None - #_llm(messages, streaming) AsyncGenerator - #_json(messages, schema) dict - } - - class DataBase { - <> - +model_json_schema(override, kwargs) dict - } - class API { +enable_filling: bool +url: str @@ -75,11 +53,12 @@ classDiagram +result: dict } - CoreCall <|-- API - DataBase <|-- APIInput - DataBase <|-- APIOutput API ..> APIInput : uses API ..> APIOutput : produces + + note for API "继承自CoreCall基类
详见core.md" + note for APIInput "继承自DataBase
详见core.md" + note for APIOutput "继承自DataBase
详见core.md" ``` ## 执行流程图 diff --git a/design/call/choice.md b/design/call/choice.md index 2d933435a..43496b8d1 100644 --- a/design/call/choice.md +++ b/design/call/choice.md @@ -16,7 +16,7 @@ Choice 模块是一个条件分支选择器,用于根据程序化条件判断或 模块位于 `apps/scheduler/call/choice/` 目录下,主要包含以下文件: -- [choice.py](../../apps/scheduler/call/choice/choice.py) - Choice 工具核心实现 +- [choice.py](../../apps/scheduler/call/choice/choice.py) - Choice 工具核心实现(继承自CoreCall基类,详见[core.md](core.md)) - [schema.py](../../apps/scheduler/call/choice/schema.py) - 数据结构定义 - [condition_handler.py](../../apps/scheduler/call/choice/condition_handler.py) - 条件处理器 diff --git a/design/call/convert.md b/design/call/convert.md index ab309df1f..9f246ad40 100644 --- a/design/call/convert.md +++ b/design/call/convert.md @@ -120,14 +120,6 @@ apps/scheduler/call/convert/ ```mermaid classDiagram - class CoreCall { - +input_model - +output_model - +info() - +_init() - +_exec() - } - class Convert { +text_template: str | None +data_template: str | None @@ -135,44 +127,27 @@ classDiagram +_init() +_exec() } - + class ConvertInput { +text_template: str | None +data_template: str | None +extras: dict[str, Any] } - + class ConvertOutput { +text: str +data: dict } - - class CallVars { - +language: LanguageType - +ids: CallIds - +question: str - +step_data: dict[str, ExecutorHistory] - +step_order: list[str] - +background: ExecutorBackground - +thinking: str - } - - class CallOutputChunk { - +type: CallOutputType - +content: Any - } - + class SandboxedEnvironment { +from_string() } - - Convert --> CoreCall + Convert --> ConvertInput Convert --> ConvertOutput - Convert --> CallVars - Convert --> CallOutputChunk Convert --> SandboxedEnvironment - CallOutputChunk --> CallOutputType + + note for Convert "继承自CoreCall基类
详见core.md" ``` ### 4.2 详细字段说明 @@ -192,17 +167,17 @@ classDiagram | `text` | `str` | 是 | 格式化后的文字信息 | `"当前时间:2023-01-01 12:00:00"` | | `data` | `dict` | 是 | 格式化后的结果数据 | `{"question": "你好", "time": "2023-01-01 12:00:00"}` | -#### 4.2.3 CallVars 系统变量结构 +#### 4.2.3 extras 扩展变量结构 -| 字段名 | 类型 | 必需 | 说明 | 示例值 | -|--------|------|------|------|--------| -| `thinking` | `str` | 是 | 上下文思考信息 | `"用户想了解当前时间,我需要调用时间工具获取信息。"` | -| `question` | `str` | 是 | 改写后的用户输入问题 | `"现在几点了?"` | -| `step_data` | `dict[str, ExecutorHistory]` | 是 | 历史工具的结构化数据 | `{"time_tool": ExecutorHistory(...)` | -| `step_order` | `list[str]` | 是 | 历史工具的执行顺序 | `["time_tool", "llm"]` | -| `background` | `ExecutorBackground` | 是 | 执行器的背景信息 | `ExecutorBackground(...)` | -| `ids` | `CallIds` | 是 | 调用相关的ID信息 | `CallIds(...)` | -| `language` | `LanguageType` | 是 | 当前使用的语言类型 | `LanguageType.CHINESE` | +`extras` 字段包含模板渲染所需的所有变量,来源于 CallVars 系统变量(详见[core.md](core.md#callvars-系统变量结构))。主要包括: + +| 字段名 | 说明 | +|--------|------| +| `time` | 当前时间(亚洲/上海时区) | +| `history` | 历史步骤数据字典 | +| `question` | 用户问题 | +| `background` | 背景信息 | +| `ids` | ID信息集合 | ## 5. 流程图与时序图 diff --git a/design/call/core.md b/design/call/core.md new file mode 100644 index 000000000..e0216dabf --- /dev/null +++ b/design/call/core.md @@ -0,0 +1,666 @@ +# CoreCall基类模块文档 + +## 1. 模块概述 + +CoreCall是欧拉助手框架中所有Call类的抽象基类,定义了Call工具的通用接口规范和核心执行逻辑。该模块基于Pydantic构建,提供了标准化的输入输出定义、生命周期管理、系统变量组装、历史数据访问等基础能力,确保所有具体的Call实现遵循统一的调用契约和执行流程。 + +## 2. 代码结构 + +CoreCall基类位于 `apps/scheduler/call/` 目录下: + +```text +apps/scheduler/call/ +└── core.py # CoreCall基类和DataBase基类定义 +``` + +## 3. 核心类与方法 + +### 3.1 DataBase类 + +`DataBase` 是所有Call输入输出的基类,继承自Pydantic的BaseModel。 + +#### 3.1.1 核心功能 + +DataBase提供了动态Schema填充能力,允许子类在运行时通过override参数动态调整JSON Schema定义。 + +#### 3.1.2 model_json_schema方法 + +**功能描述**:生成类的JSON Schema,并支持通过override参数动态覆盖属性定义。 + +**执行流程**: + +1. 调用父类的model_json_schema方法生成基础Schema +2. 检查是否提供了override参数 +3. 如果提供了override,遍历其中的键值对 +4. 将override中的每个属性定义覆盖到Schema的properties字段中 +5. 返回更新后的完整Schema + +**应用场景**:支持在不修改类定义的情况下,根据运行时需求调整字段的Schema描述,实现灵活的类型系统。 + +### 3.2 CoreCall类 + +`CoreCall` 是所有Call工具的抽象父类,定义了Call的完整生命周期接口。 + +### 3.3 主要属性 + +| 属性名 | 类型 | 默认值 | 描述 | +|--------|------|--------|------| +| `name` | str | - | Step的名称,在JSON Schema中被排除 | +| `description` | str | - | Step的描述信息,在JSON Schema中被排除 | +| `node` | NodeInfo \| None | - | 节点信息,包含执行环境相关配置 | +| `enable_filling` | bool | False | 是否启用自动参数填充功能 | +| `input_model` | ClassVar[type[DataBase]] | - | Call的输入数据类型模板(类变量) | +| `output_model` | ClassVar[type[DataBase]] | - | Call的输出数据类型模板(类变量) | +| `to_user` | bool | False | 是否需要将输出返回给用户 | + +**配置说明**: + +- `arbitrary_types_allowed`: 允许使用任意类型的Python对象 +- `extra="allow"`: 允许接受未在模型中声明的额外字段 + +### 3.4 主要方法 + +#### 3.4.1 __init_subclass__方法 + +**功能描述**:在子类定义时自动执行的类初始化钩子方法,用于设置子类的输入输出模型。 + +**执行时机**:当定义一个继承自CoreCall的新类时,Python解释器会自动调用此方法。 + +**执行流程**: + +1. 调用父类的__init_subclass__方法完成基础初始化 +2. 从类定义参数中提取input_model参数 +3. 从类定义参数中提取output_model参数 +4. 将这两个模型类赋值给子类的类变量 + +**设计意图**:强制所有子类在定义时必须指定输入输出模型,确保类型安全。 + +#### 3.4.2 info方法 + +**功能描述**:返回Call工具的元信息,包括名称和描述,支持国际化。 + +**方法签名**:接受一个language参数,默认为中文,返回CallInfo对象。 + +**实现要求**:这是一个抽象方法,所有子类必须实现此方法提供具体的工具信息。如果子类未实现,调用时会抛出NotImplementedError异常。 + +#### 3.4.3 _assemble_call_vars方法 + +**功能描述**:从执行器中提取和组装系统变量,构建完整的调用上下文。 + +**执行流程**: + +1. **状态检查**:验证执行器的任务状态是否存在,如不存在则记录错误日志并抛出ValueError异常 +2. **历史数据提取**:遍历任务的上下文列表(executor.task.context) +3. **构建历史字典**:以步骤ID为键,执行历史对象为值,创建history字典 +4. **记录执行顺序**:将步骤ID按顺序添加到history_order列表中 +5. **组装CallVars对象**:从执行器的不同部分提取信息,包括: + - 语言配置(language) + - 各类ID信息(task_id、executor_id、session_id等) + - 用户问题(question) + - 历史数据(step_data和step_order) + - 背景信息(background) + - 思考过程(thinking) +6. **返回组装结果**:返回完整的CallVars对象供后续使用 + +**数据来源映射**: + +- `language`: executor.task.runtime.language +- `task_id`: executor.task.metadata.id +- `executor_id`: executor.task.state.executorId +- `session_id`: executor.task.runtime.sessionId +- `user_sub`: executor.task.metadata.userSub +- `app_id`: executor.task.state.appId +- `conversation_id`: executor.task.metadata.conversationId +- `question`: executor.question +- `background`: executor.background +- `thinking`: executor.task.runtime.reasoning + +#### 3.4.4 _extract_history_variables方法 + +**功能描述**:根据路径表达式从历史数据中提取特定变量值。 + +**路径格式**:使用斜杠分隔的路径字符串,格式为 `step_id/key/to/variable`,其中第一段为步骤ID,后续段为数据键的嵌套路径。 + +**执行流程**: + +1. **路径解析**:使用斜杠分割路径字符串为段列表 +2. **路径验证**:检查路径至少包含一个段(步骤ID),否则记录错误并返回None +3. **步骤查找**:检查第一段指定的步骤ID是否存在于历史字典中,不存在则记录错误并返回None +4. **数据定位**:获取该步骤的outputData作为起始数据对象 +5. **路径遍历**:按顺序遍历剩余的路径段 + - 对于每个键,检查是否存在于当前数据对象中 + - 如果键不存在,记录错误并返回None + - 如果存在,将当前数据对象更新为该键对应的值 +6. **返回结果**:返回最终定位到的变量值 + +**错误处理**:所有错误情况都会记录日志并返回None,不会抛出异常。 + +**示例路径**: + +- `step1/result` - 获取step1的输出中的result字段 +- `weather/data/temperature` - 获取weather步骤输出的data.temperature字段 + +#### 3.4.5 instance方法 + +**功能描述**:工厂方法,用于创建Call类的实例并完成初始化。 + +**执行流程**: + +1. **实例创建**:使用类构造函数创建Call对象 +2. **基础属性设置**:从执行器中提取名称和描述信息 +3. **节点信息注入**:设置节点配置信息 +4. **额外参数应用**:通过kwargs传入其他自定义参数 +5. **输入数据初始化**:调用_set_input方法完成输入数据的准备 +6. **返回实例**:返回完全初始化的Call实例 + +**设计模式**:这是一个异步工厂方法,封装了复杂的实例化逻辑。 + +#### 3.4.6 _set_input方法 + +**功能描述**:准备Call执行所需的输入数据和系统变量。 + +**执行流程**: + +1. **LLM对象保存**:将执行器的LLM对象存储到实例变量_llm_obj中,供后续调用 +2. **系统变量组装**:调用_assemble_call_vars方法构建CallVars对象 +3. **输入数据初始化**:调用子类实现的_init方法,传入系统变量 +4. **数据序列化**:将_init返回的DataBase对象转换为字典格式 + - 使用by_alias=True选项,按字段别名序列化 + - 使用exclude_none=True选项,排除值为None的字段 +5. **保存输入数据**:将序列化后的字典赋值给self.input属性 + +**作用范围**:此方法在instance方法中被调用,确保每个Call实例都有正确的输入数据。 + +#### 3.4.7 _init方法 + +**功能描述**:子类必须实现的抽象方法,用于根据系统变量准备Call的具体输入数据。 + +**方法签名**:接受CallVars参数,返回DataBase类型的输入对象。 + +**实现要求**: + +- 所有继承CoreCall的子类必须实现此方法 +- 方法应当根据call_vars中的信息构建输入数据模型实例 +- 如果子类未实现,调用时会抛出NotImplementedError异常 + +**设计意图**:将输入数据的构建逻辑委托给子类,使不同的Call可以有不同的输入准备策略。 + +#### 3.4.8 _exec方法 + +**功能描述**:子类可以重载的执行方法,定义Call的核心业务逻辑,以流式方式返回输出。 + +**方法签名**:接受输入数据字典,返回异步生成器,生成CallOutputChunk对象。 + +**默认实现**:生成一个空内容的文本类型输出块。 + +**重载要求**: + +- 子类应当实现自己的_exec逻辑 +- 使用yield语句逐块返回执行结果 +- 每个输出块应当是CallOutputChunk类型 + +**流式设计**:支持大模型等需要流式输出的场景,提升响应速度。 + +#### 3.4.9 _after_exec方法 + +**功能描述**:在执行完成后调用的钩子方法,用于执行清理或后处理逻辑。 + +**方法签名**:接受输入数据字典,无返回值。 + +**默认实现**:空实现,不执行任何操作。 + +**重载场景**: + +- 需要记录执行日志 +- 需要释放资源 +- 需要执行状态更新 +- 需要触发后续操作 + +#### 3.4.10 exec方法 + +**功能描述**:Call的统一执行入口,编排完整的执行流程。 + +**执行流程**: + +1. **流式执行**:调用_exec方法开始执行核心逻辑 +2. **结果转发**:使用async for循环接收_exec生成的每个输出块 +3. **逐块输出**:使用yield将每个CallOutputChunk传递给调用者 +4. **后处理执行**:在所有输出块生成完毕后,调用_after_exec方法 +5. **异步等待**:使用await等待后处理完成 + +**设计模式**:这是一个模板方法,定义了Call执行的标准流程,子类通过重载_exec和_after_exec方法来定制行为。 + +#### 3.4.11 _llm方法 + +**功能描述**:提供给子类使用的便捷LLM调用接口,封装了与大语言模型的交互。 + +**方法签名**: + +- 接受消息列表(messages)和流式标志(streaming) +- 返回异步生成器,逐块生成字符串内容 + +**执行流程**: + +1. **模型调用**:调用_llm_obj.reasoning.call方法 +2. **传递参数**:将messages和streaming参数传递给底层LLM +3. **结果处理**:接收LLM返回的块对象 +4. **内容提取**:从每个块中提取content字段,如果为None则返回空字符串 +5. **流式输出**:使用yield逐块返回内容字符串 + +**使用场景**:子类在_exec方法中需要调用大模型时,可以直接使用此方法。 + +#### 3.4.12 _json方法 + +**功能描述**:提供给子类使用的结构化JSON生成接口,支持基于Schema的受约束生成。 + +**方法签名**: + +- 接受消息列表(messages)和JSON Schema(schema) +- 返回符合Schema定义的JSON字典对象 + +**执行流程**: + +1. **模型检查**:验证_llm_obj.function是否已配置 +2. **异常处理**:如果未设置函数调用模型,记录错误日志并抛出CallError异常 +3. **函数调用**:调用_llm_obj.function.call方法 +4. **参数传递**:传入消息列表和Schema定义 +5. **返回结果**:等待并返回符合Schema的JSON对象 + +**应用场景**:需要让大模型生成结构化数据,如表单填充、参数提取等。 + +## 4. 类关系图 + +以下是CoreCall模块中主要类的继承和依赖关系: + +```mermaid +classDiagram + %% 基础类定义 + class BaseModel { + <> + +model_dump() dict + +model_json_schema() dict + } + + class DataBase { + +model_json_schema(override: dict | None, **kwargs) dict + } + note for DataBase "所有Call输入输出的基类\n提供动态Schema填充能力" + + class CoreCall { + +name: str + +description: str + +node: NodeInfo | None + +enable_filling: bool + +input_model: ClassVar[type[DataBase]] + +output_model: ClassVar[type[DataBase]] + +to_user: bool + +__init_subclass__(input_model, output_model, **kwargs) None + +info(language: LanguageType) CallInfo + +instance(executor: StepExecutor, node: NodeInfo | None) Self + +exec(executor: StepExecutor, input_data: dict) AsyncGenerator + -_assemble_call_vars(executor: StepExecutor) CallVars + -_extract_history_variables(path: str, history: dict) Any + -_set_input(executor: StepExecutor) None + -_init(call_vars: CallVars) DataBase + -_exec(input_data: dict) AsyncGenerator + -_after_exec(input_data: dict) None + -_llm(messages: list, streaming: bool) AsyncGenerator + -_json(messages: list, schema: dict) dict + } + + class CallVars { + +language: LanguageType + +ids: CallIds + +question: str + +step_data: dict[str, ExecutorHistory] + +step_order: list[str] + +background: ExecutorBackground + +thinking: str + } + note for CallVars "系统变量容器\n包含执行上下文信息" + + class CallIds { + +task_id: UUID + +executor_id: str + +session_id: str | None + +user_sub: str + +app_id: UUID | None + +conversation_id: UUID | None + } + note for CallIds "ID信息集合" + + class ExecutorHistory { + +stepId: str + +outputData: dict + } + note for ExecutorHistory "步骤执行历史记录" + + class ExecutorBackground { + +num: int + +conversation: list[dict] + +facts: list[str] + } + note for ExecutorBackground "执行器背景信息" + + class CallOutputChunk { + +type: CallOutputType + +content: str | dict + } + note for CallOutputChunk "流式输出块" + + class NodeInfo { + } + note for NodeInfo "节点配置信息" + + class CallInfo { + +name: str + +description: str + } + note for CallInfo "Call元信息" + + %% 继承关系 + DataBase --|> BaseModel + CoreCall --|> BaseModel + + %% 依赖关系 + CoreCall --> DataBase : 使用input_model/output_model + CoreCall --> CallVars : 组装和使用 + CoreCall --> CallOutputChunk : 生成输出 + CoreCall --> NodeInfo : 持有节点信息 + CoreCall --> CallInfo : 返回元信息 + CallVars --> CallIds : 包含ID信息 + CallVars --> ExecutorHistory : 引用历史数据 + CallVars --> ExecutorBackground : 包含背景信息 +``` + +## 5. 生命周期流程图 + +以下是CoreCall的完整生命周期流程,从实例化到执行完成: + +```mermaid +sequenceDiagram + participant Executor as 执行器 + participant Factory as CoreCall.instance + participant Call as Call实例 + participant Init as _init方法 + participant Exec as _exec方法 + participant AfterExec as _after_exec方法 + + %% 实例化阶段 + Executor->>Factory: 调用instance方法 + Note over Factory: 创建阶段 + Factory->>Call: 创建实例(__init__) + Factory->>Call: 设置name、description、node + + Note over Factory,Call: 输入准备阶段 + Factory->>Call: 调用_set_input方法 + Call->>Call: 保存LLM对象 + Call->>Call: 调用_assemble_call_vars + Call->>Call: 组装CallVars对象 + Call->>Init: 调用_init方法(传入CallVars) + Init-->>Call: 返回DataBase输入对象 + Call->>Call: 序列化为self.input字典 + + Call-->>Factory: 返回初始化完成的实例 + Factory-->>Executor: 返回Call实例 + + %% 执行阶段 + Executor->>Call: 调用exec方法 + Note over Call,Exec: 核心执行阶段 + Call->>Exec: 调用_exec方法(传入input_data) + + loop 流式输出 + Exec->>Exec: 处理业务逻辑 + Exec-->>Call: yield CallOutputChunk + Call-->>Executor: yield CallOutputChunk + end + + Note over Call,AfterExec: 后处理阶段 + Call->>AfterExec: 调用_after_exec方法 + AfterExec->>AfterExec: 执行清理或后续操作 + AfterExec-->>Call: 完成后处理 + + Call-->>Executor: 执行完成 +``` + +## 6. 数据流转图 + +以下是系统变量和数据在CoreCall中的流转过程: + +```mermaid +flowchart TD + Start([开始:Executor调用]) --> CreateInstance[创建Call实例] + CreateInstance --> SetInput[调用_set_input] + + SetInput --> SaveLLM[保存LLM对象到_llm_obj] + SaveLLM --> AssembleVars[调用_assemble_call_vars] + + AssembleVars --> CheckState{检查Executor
状态是否存在} + CheckState -->|否| StateError[抛出ValueError异常] + CheckState -->|是| ExtractHistory[遍历task.context
构建history字典] + + ExtractHistory --> BuildOrder[构建history_order列表] + BuildOrder --> CreateCallVars[创建CallVars对象] + + CreateCallVars --> ExtractLanguage[提取language] + ExtractLanguage --> ExtractIds[提取各类ID信息] + ExtractIds --> ExtractQuestion[提取question] + ExtractQuestion --> ExtractStepData[提取step_data和step_order] + ExtractStepData --> ExtractBackground[提取background] + ExtractBackground --> ExtractThinking[提取thinking] + ExtractThinking --> ReturnCallVars[返回CallVars对象] + + ReturnCallVars --> CallInit[调用子类_init方法] + CallInit --> ReturnDataBase[返回DataBase对象] + ReturnDataBase --> Serialize[序列化为字典
by_alias=True
exclude_none=True] + Serialize --> SaveInput[保存到self.input] + + SaveInput --> ExecutorCall[Executor调用exec方法] + ExecutorCall --> CallExec[调用_exec方法] + + CallExec --> ProcessLogic[执行业务逻辑] + ProcessLogic --> YieldChunk{有更多输出块} + YieldChunk -->|是| OutputChunk[yield CallOutputChunk] + OutputChunk --> YieldChunk + YieldChunk -->|否| CallAfterExec[调用_after_exec方法] + + CallAfterExec --> CleanUp[执行清理操作] + CleanUp --> End([执行完成]) + + StateError --> ErrorEnd([异常终止]) +``` + +## 7. 历史变量提取流程图 + +以下是_extract_history_variables方法的详细执行流程: + +```mermaid +flowchart TD + Start([接收path和history参数]) --> SplitPath[使用斜杠分割路径] + SplitPath --> CheckLength{路径段数
是否至少为1} + + CheckLength -->|否| LogPathError[记录路径格式错误日志] + LogPathError --> ReturnNone1[返回None] + + CheckLength -->|是| ExtractStepId[提取第一段作为步骤ID] + ExtractStepId --> CheckStepExists{步骤ID是否
存在于history中} + + CheckStepExists -->|否| LogStepError[记录步骤不存在日志] + LogStepError --> ReturnNone2[返回None] + + CheckStepExists -->|是| GetOutputData[获取步骤的outputData] + GetOutputData --> InitData[将data初始化为outputData] + + InitData --> IterateKeys[遍历剩余路径段] + IterateKeys --> HasMoreKeys{还有更多
路径段} + + HasMoreKeys -->|否| ReturnData[返回data] + HasMoreKeys -->|是| GetNextKey[获取下一个键] + + GetNextKey --> CheckKeyExists{键是否存在
于当前data中} + + CheckKeyExists -->|否| LogKeyError[记录键不存在日志] + LogKeyError --> ReturnNone3[返回None] + + CheckKeyExists -->|是| UpdateData[更新data为该键的值] + UpdateData --> IterateKeys + + ReturnNone1 --> End([结束]) + ReturnNone2 --> End + ReturnNone3 --> End + ReturnData --> End +``` + +## 8. 辅助方法调用关系 + +以下是CoreCall提供给子类的辅助方法调用关系: + +```mermaid +graph TD + subgraph "子类实现区域" + SubInit[子类_init方法] + SubExec[子类_exec方法] + SubAfterExec[子类_after_exec方法] + end + + subgraph "CoreCall提供的辅助方法" + LLMMethod[_llm方法] + JSONMethod[_json方法] + ExtractMethod[_extract_history_variables方法] + end + + subgraph "外部资源" + LLMObj[_llm_obj.reasoning] + FunctionObj[_llm_obj.function] + HistoryData[call_vars.step_data] + end + + SubInit -.->|可以调用| ExtractMethod + SubExec -.->|可以调用| LLMMethod + SubExec -.->|可以调用| JSONMethod + SubExec -.->|可以调用| ExtractMethod + + LLMMethod -->|调用| LLMObj + JSONMethod -->|检查并调用| FunctionObj + ExtractMethod -->|访问| HistoryData + + SubInit -.->|访问| HistoryData + + style SubInit fill:#e1f5ff + style SubExec fill:#e1f5ff + style SubAfterExec fill:#e1f5ff + style LLMMethod fill:#ffe1f5 + style JSONMethod fill:#ffe1f5 + style ExtractMethod fill:#ffe1f5 +``` + +## 9. 数据结构详解 + +### 9.1 CallVars 系统变量结构 + +CallVars是CoreCall中最重要的数据结构,包含了执行Call所需的所有上下文信息。 + +| 字段名 | 类型 | 必需 | 说明 | +|--------|------|------|------| +| `language` | LanguageType | ✅ | 当前使用的语言类型(中文/英文) | +| `ids` | CallIds | ✅ | 包含任务ID、执行器ID、会话ID等标识信息 | +| `question` | str | ✅ | 改写或原始的用户问题 | +| `step_data` | dict[str, ExecutorHistory] | ✅ | 步骤执行历史的字典,键为步骤ID | +| `step_order` | list[str] | ✅ | 步骤执行的顺序列表 | +| `background` | ExecutorBackground | ✅ | 执行器的背景信息,包含对话历史和事实 | +| `thinking` | str | ✅ | AI的推理思考过程文本 | + +### 9.2 CallIds ID信息结构 + +| 字段名 | 类型 | 必需 | 说明 | +|--------|------|------|------| +| `task_id` | UUID | ✅ | 当前任务的唯一标识符 | +| `executor_id` | str | ✅ | Flow执行器的ID | +| `session_id` | str \| None | ❌ | 用户会话ID(可选) | +| `user_sub` | str | ✅ | 用户唯一标识符 | +| `app_id` | UUID \| None | ❌ | 应用ID(可选) | +| `conversation_id` | UUID \| None | ❌ | 对话ID(可选) | + +### 9.3 ExecutorHistory 历史记录结构 + +ExecutorHistory记录了每个步骤的执行结果,是提取历史变量的数据源。 + +| 字段名 | 类型 | 说明 | +|--------|------|------| +| `stepId` | str | 步骤的唯一标识符 | +| `outputData` | dict[str, Any] | 步骤的输出数据,可以是嵌套的字典结构 | + +### 9.4 ExecutorBackground 背景信息结构 + +| 字段名 | 类型 | 说明 | +|--------|------|------| +| `num` | int | 对话记录的最大保留数量 | +| `conversation` | list[dict[str, str]] | 历史对话记录列表,每条包含role和content | +| `facts` | list[str] | 当前执行器关联的背景事实信息列表 | + +### 9.5 CallOutputChunk 输出块结构 + +| 字段名 | 类型 | 说明 | +|--------|------|------| +| `type` | CallOutputType | 输出类型枚举(TEXT/DATA/ERROR) | +| `content` | str \| dict[str, Any] | 输出内容,可以是文本或结构化数据 | + +## 10. 子类实现要求 + +继承CoreCall的子类必须遵循以下规范: + +### 10.1 必需实现的方法 + +1. **info方法**:返回Call的名称和描述信息 +2. **_init方法**:根据CallVars准备输入数据 +3. **__init_subclass__参数**:在类定义时提供input_model和output_model + +### 10.2 可选重载的方法 + +1. **_exec方法**:实现核心业务逻辑 +2. **_after_exec方法**:实现后处理逻辑 + +### 10.3 可用的辅助方法 + +1. **_llm方法**:调用大语言模型 +2. **_json方法**:生成结构化JSON数据 +3. **_extract_history_variables方法**:提取历史变量 + +### 10.4 子类定义示例 + +```python +class MyCall(CoreCall, input_model=MyInput, output_model=MyOutput): + """自定义Call实现""" + + @classmethod + def info(cls, language: LanguageType = LanguageType.CHINESE) -> CallInfo: + if language == LanguageType.CHINESE: + return CallInfo(name="我的工具", description="这是一个自定义工具") + return CallInfo(name="My Tool", description="This is a custom tool") + + async def _init(self, call_vars: CallVars) -> MyInput: + # 准备输入数据 + return MyInput(field1="value1", field2="value2") + + async def _exec( + self, input_data: dict[str, Any] + ) -> AsyncGenerator[CallOutputChunk, None]: + # 执行业务逻辑 + result = "处理结果" + yield CallOutputChunk(type=CallOutputType.TEXT, content=result) +``` + +## 11. 工作原理总结 + +CoreCall基类通过以下机制实现了统一的Call执行框架: + +1. **类型系统**:使用Pydantic模型定义输入输出,提供强类型检查和序列化能力 +2. **生命周期管理**:通过instance、_set_input、_init、exec、_exec、_after_exec等方法定义了清晰的执行流程 +3. **上下文组装**:_assemble_call_vars方法从执行器中提取所有必需的上下文信息 +4. **历史数据访问**:_extract_history_variables方法提供了便捷的历史变量访问接口 +5. **LLM集成**:_llm和_json方法封装了与大语言模型的交互逻辑 +6. **流式输出**:通过异步生成器支持流式数据返回,提升响应速度 +7. **模板方法模式**:exec方法定义执行流程框架,子类通过重载特定方法定制行为 +8. **错误处理**:在关键位置提供日志记录和异常处理机制 + +通过这些设计,CoreCall确保了所有Call实现的一致性和可维护性,同时为子类提供了足够的灵活性来实现各自的业务逻辑。 diff --git a/design/call/empty.md b/design/call/empty.md index 94044672e..08a0e6275 100644 --- a/design/call/empty.md +++ b/design/call/empty.md @@ -41,45 +41,37 @@ Empty工具的数据结构非常简单,它使用DataBase作为输入和输出 ```mermaid classDiagram - class DataBase { - <> - +model_json_schema() dict - } - class Empty { +input_model: type[DataBase] +output_model: type[DataBase] +to_user: bool +enable_filling: bool +info() CallInfo - +instance() Self +_init() DataBase +_exec() AsyncGenerator - +exec() AsyncGenerator } - + class CallInfo { +name: str +description: str } - + class CallOutputChunk { +type: CallOutputType +content: str | dict } - - Empty --> DataBase : 输入 - Empty --> DataBase : 输出 + Empty --> CallInfo : 返回 Empty --> CallOutputChunk : 生成 + + note for Empty "继承自CoreCall基类
使用DataBase作为输入输出模型
详见core.md" ``` ### 数据模型说明 -- **DataBase**: 所有Call的输入基类,提供通用的数据验证和序列化功能 - **CallInfo**: 包含工具名称和描述的信息类 - **CallOutputChunk**: Call的输出块,包含类型和内容 -- **Empty**: 主要的工具类,使用DataBase作为输入和输出模型 +- **Empty**: 主要的工具类,使用DataBase作为输入和输出模型(详见[core.md](core.md)) ### 数据流转关系 @@ -285,11 +277,11 @@ graph LR ## 依赖关系 -- `CoreCall`: 基础调用框架,提供通用的Call生命周期管理 -- `DataBase`: 基础数据模型,提供通用的数据验证和序列化功能 +- `CoreCall`: 基础调用框架,提供通用的Call生命周期管理(详见[core.md](core.md)) +- `DataBase`: 基础数据模型,提供通用的数据验证和序列化功能(详见[core.md](core.md)) - `CallInfo`: 工具信息模型,包含名称和描述 - `CallOutputChunk`: 输出块模型,定义输出格式 -- `CallVars`: 系统变量模型,包含执行上下文信息 +- `CallVars`: 系统变量模型,包含执行上下文信息(详见[core.md](core.md)) ## 相关模块 diff --git a/design/call/facts.md b/design/call/facts.md index d7a19483f..32efd0653 100644 --- a/design/call/facts.md +++ b/design/call/facts.md @@ -55,33 +55,27 @@ graph TB ```mermaid classDiagram - class DataBase { - <> - } - class FactsInput { +str user_sub +list~dict~ message } - + class FactsOutput { +list~str~ facts +list~str~ domain } - + class FactsGen { +list~str~ facts } - + class DomainGen { +list~str~ keywords } - - DataBase <|-- FactsInput - DataBase <|-- FactsOutput + FactsGen -- FactsOutput : 生成 DomainGen -- FactsOutput : 生成 - + note for FactsInput "用户ID和对话消息列表" note for FactsOutput "包含提取的事实和领域标签" note for FactsGen "LLM生成的事实条目" @@ -208,7 +202,7 @@ graph TB ### 依赖组件 -- `CoreCall` - 基础调用框架 +- `CoreCall` - 基础调用框架(详见[core.md](core.md)) - `UserTagManager` - 用户标签管理服务 - `LLM` - 大语言模型服务 - `Jinja2` - 模板渲染引擎 diff --git a/design/call/graph.md b/design/call/graph.md index c634f7821..3dd7e637f 100644 --- a/design/call/graph.md +++ b/design/call/graph.md @@ -32,26 +32,6 @@ Graph Call 模块是 Scheduler 框架中的图表渲染工具,用于将 SQL 查 ```mermaid classDiagram - class CoreCall { - <> - +name: str - +description: str - +node: NodeInfo - +enable_filling: bool - +to_user: bool - +info(language) CallInfo - +instance(executor, node) Self - +exec(executor, input_data) AsyncGenerator - #_init(call_vars) DataBase - #_exec(input_data) AsyncGenerator - #_llm(messages, streaming) AsyncGenerator - } - - class DataBase { - <> - +model_json_schema(override, kwargs) dict - } - class Graph { +dataset_key: str +info(language) CallInfo @@ -92,14 +72,15 @@ classDiagram +scale_type: Literal } - CoreCall <|-- Graph - DataBase <|-- RenderInput - DataBase <|-- RenderOutput RenderOutput *-- RenderFormat RenderFormat *-- RenderAxis Graph ..> RenderInput : uses Graph ..> RenderOutput : produces Graph ..> RenderStyleResult : uses + + note for Graph "继承自CoreCall基类
详见core.md" + note for RenderInput "继承自DataBase
详见core.md" + note for RenderOutput "继承自DataBase
详见core.md" ``` ## 执行流程图 diff --git a/design/call/llm.md b/design/call/llm.md index 540cfa23f..759609363 100644 --- a/design/call/llm.md +++ b/design/call/llm.md @@ -162,63 +162,28 @@ LLM调用模块使用了两种主要的提示词模板,定义在`prompt.py`文 ```mermaid classDiagram - %% 核心数据结构说明 - class DataBase { - +model_json_schema(override: dict[str, Any] | None = None, **kwargs: Any) -> dict[str, Any] - // 所有Call输入输出的基类,提供动态填充Schema的能力 - } - class LLMInput { +message: list[dict[str, str]] // LLM工具调用的输入数据结构 } - + class LLMOutput { // LLM工具调用的输出数据结构(当前为空实现) } - - class CallVars { - +thinking: str - +question: str - +step_data: dict[str, ExecutorHistory] - +step_order: list[str] - +background: ExecutorBackground - +ids: CallIds - +language: LanguageType - // 由Executor填充的系统变量,包含调用上下文信息 - } - - class ExecutorBackground { - +num: int - +conversation: list[dict[str, str]] - +facts: list[str] - // 执行器的背景信息 - } - - class CallIds { - +task_id: uuid.UUID - +executor_id: str - +session_id: str | None - +app_id: uuid.UUID | None - +user_sub: str - +conversation_id: uuid.UUID | None - // 调用相关的ID信息 - } - + class CallOutputChunk { +type: CallOutputType +content: str | dict[str, Any] // LLM工具的流式输出数据结构 } - - LLMInput --|> DataBase - LLMOutput --|> DataBase - CallVars --> ExecutorBackground - CallVars --> CallIds + LLM --> LLMInput LLM --> LLMOutput - LLM --> CallVars LLM --> CallOutputChunk + + note for LLM "继承自CoreCall基类
详见core.md" + note for LLMInput "继承自DataBase
详见core.md" + note for LLMOutput "继承自DataBase
详见core.md" ``` ### 5.2 详细字段说明 @@ -236,36 +201,9 @@ classDiagram #### 5.2.2 CallVars 系统变量结构 -| 字段名 | 类型 | 必需 | 说明 | 示例值 | -|--------|------|------|------|--------| -| `thinking` | `str` | ✅ | 上下文思考信息,包含AI的推理过程 | `"用户想了解天气情况,我需要调用天气工具获取信息。"` | -| `question` | `str` | ✅ | 改写后的用户输入问题 | `"今天北京的天气怎么样?"` | -| `step_data` | `dict[str, ExecutorHistory]` | ✅ | 历史工具的结构化数据字典,key为工具名称 | `{"weather": ExecutorHistory(...)}` | -| `step_order` | `list[str]` | ✅ | 历史工具的执行顺序列表 | `["weather", "llm"]` | -| `background` | `ExecutorBackground` | ✅ | 执行器的背景信息对象 | `ExecutorBackground(...)` | -| `ids` | `CallIds` | ✅ | 调用相关的ID信息对象 | `CallIds(...)` | -| `language` | `LanguageType` | ✅ | 当前使用的语言类型 | `LanguageType.CHINESE` | - -#### 5.2.3 ExecutorBackground 背景信息结构 - -| 字段名 | 类型 | 必需 | 说明 | 示例值 | -|--------|------|------|------|--------| -| `num` | `int` | ✅ | 对话记录最大数量限制 | `10` | -| `conversation` | `list[dict[str, str]]` | ✅ | 历史对话记录列表,每个元素包含role和content | `[{"role": "user", "content": "你好"}, {"role": "assistant", "content": "你好!有什么可以帮助您的吗?"}]` | -| `facts` | `list[str]` | ✅ | 当前执行器的背景事实信息列表 | `["用户位于北京", "当前时间是2024年"]` | - -#### 5.2.4 CallIds ID信息结构 - -| 字段名 | 类型 | 必需 | 说明 | 示例值 | -|--------|------|------|------|--------| -| `task_id` | `uuid.UUID` | ✅ | 当前任务的唯一标识符 | `123e4567-e89b-12d3-a456-426614174000` | -| `executor_id` | `str` | ✅ | Flow执行器的ID,对应Flow ID | `"flow_001"` | -| `session_id` | `str \| None` | ❌ | 用户会话ID,可能为空 | `"session_123"` 或 `None` | -| `app_id` | `uuid.UUID \| None` | ❌ | 应用ID,可能为空 | `123e4567-e89b-12d3-a456-426614174001` 或 `None` | -| `user_sub` | `str` | ✅ | 用户唯一标识符 | `"user_12345"` | -| `conversation_id` | `uuid.UUID \| None` | ❌ | 对话ID,可能为空 | `123e4567-e89b-12d3-a456-426614174002` 或 `None` | +LLM模块使用CallVars获取执行上下文信息,详细结构请参见[core.md - CallVars系统变量结构](core.md#callvars-系统变量结构)。 -#### 5.2.5 CallOutputChunk 输出块结构 +#### 5.2.3 CallOutputChunk 输出块结构 | 字段名 | 类型 | 必需 | 说明 | 示例值 | |--------|------|------|------|--------| @@ -280,17 +218,14 @@ classDiagram ### 5.3 数据类型枚举 -#### LanguageType 语言类型 - -- `CHINESE`: 中文 -- `ENGLISH`: 英文 - #### CallOutputType 输出类型 - `TEXT`: 文本输出 - `DATA`: 数据输出 - `ERROR`: 错误输出 +关于LanguageType等其他枚举类型,请参见[core.md](core.md#数据结构详解)。 + ## 6. 调用流程图 以下是LLM调用模块的核心调用流程图,展示了从初始化到执行再到输出结果的完整过程: diff --git a/design/call/slot.md b/design/call/slot.md index 768ca0c8c..0a48e6bda 100644 --- a/design/call/slot.md +++ b/design/call/slot.md @@ -42,20 +42,15 @@ Slot工具涉及多个数据模型,它们之间的关系如下: ```mermaid classDiagram - class DataBase { - <> - +model_json_schema() dict - } - class SlotInput { +remaining_schema: dict } - + class SlotOutput { +slot_data: dict +remaining_schema: dict } - + class Slot { +data: dict +current_schema: dict @@ -69,7 +64,7 @@ classDiagram +_llm_slot_fill() tuple +_function_slot_fill() dict } - + class SlotProcessor { +_validator_cls: type +_validator: Validator @@ -79,21 +74,22 @@ classDiagram +check_json() dict +add_null_to_basic_types() dict } - - DataBase <|-- SlotInput : 继承 - DataBase <|-- SlotOutput : 继承 + Slot --> SlotInput : 输入 Slot --> SlotOutput : 输出 Slot --> SlotProcessor : 使用 SlotProcessor --> SlotInput : 生成remaining_schema SlotProcessor --> SlotOutput : 验证和转换数据 + + note for Slot "继承自CoreCall基类
详见core.md" + note for SlotInput "继承自DataBase
详见core.md" + note for SlotOutput "继承自DataBase
详见core.md" ``` ### 数据模型说明 -- **DataBase**: 所有Call的输入基类,提供通用的数据验证和序列化功能 -- **SlotInput**: 继承自DataBase,包含剩余需要填充的Schema信息 -- **SlotOutput**: 继承自DataBase,包含填充后的数据和剩余Schema +- **SlotInput**: 包含剩余需要填充的Schema信息(继承自DataBase,详见[core.md](core.md)) +- **SlotOutput**: 包含填充后的数据和剩余Schema(继承自DataBase,详见[core.md](core.md)) - **SlotProcessor**: 参数槽处理器,负责JSON Schema验证和数据转换 ### 数据流转关系 diff --git a/design/call/sql.md b/design/call/sql.md index 5e412801c..50d8ab156 100644 --- a/design/call/sql.md +++ b/design/call/sql.md @@ -33,24 +33,17 @@ SQL模块是openEuler Intelligence框架中的一个工具,用于通过将自 ```mermaid classDiagram - class DataBase { - <> - } - class SQLInput { +question: str } - + class SQLOutput { +result: list[dict[str, Any]] +sql: str } - - DataBase <|-- SQLInput : 继承 - DataBase <|-- SQLOutput : 继承 - - note for SQLInput "SQL工具的输入数据结构
包含用户查询问题" - note for SQLOutput "SQL工具的输出数据结构
包含执行结果和SQL语句" + + note for SQLInput "SQL工具的输入数据结构
包含用户查询问题
继承自DataBase(详见core.md)" + note for SQLOutput "SQL工具的输出数据结构
包含执行结果和SQL语句
继承自DataBase(详见core.md)" ``` ## 执行流程 diff --git a/design/call/suggest.md b/design/call/suggest.md index 940dc24a1..7f745e12d 100644 --- a/design/call/suggest.md +++ b/design/call/suggest.md @@ -16,7 +16,7 @@ Suggest 模块是一个智能问题推荐系统,用于在对话场景中为用 模块位于 `apps/scheduler/call/suggest/` 目录下,主要包含以下文件: -- [suggest.py](../../apps/scheduler/call/suggest/suggest.py) - 问题推荐核心实现 +- [suggest.py](../../apps/scheduler/call/suggest/suggest.py) - 问题推荐核心实现(继承自CoreCall基类,详见[core.md](core.md)) - [schema.py](../../apps/scheduler/call/suggest/schema.py) - 数据结构定义 - [prompt.py](../../apps/scheduler/call/suggest/prompt.py) - 提示词模板定义 diff --git a/design/call/summary.md b/design/call/summary.md index 9a5c1af8b..eb7f27a56 100644 --- a/design/call/summary.md +++ b/design/call/summary.md @@ -29,21 +29,16 @@ Summary工具涉及多个数据模型,它们之间的关系如下: ```mermaid classDiagram - class DataBase { - <> - +model_json_schema() dict - } - class SummaryOutput { +summary: str } - + class ExecutorBackground { +num: int +conversation: list[dict[str, str]] +facts: list[str] } - + class Summary { +context: ExecutorBackground +info() CallInfo @@ -52,18 +47,19 @@ classDiagram +_exec() AsyncGenerator +exec() AsyncGenerator } - - DataBase <|-- SummaryOutput : 继承 + Summary --> ExecutorBackground : 使用 Summary --> SummaryOutput : 输出 - Summary --> DataBase : 输入 + + note for Summary "继承自CoreCall基类
输入使用DataBase
详见core.md" + note for SummaryOutput "继承自DataBase
详见core.md" + note for ExecutorBackground "详见core.md" ``` #### 数据模型说明 -- **DataBase**: 所有Call的输入基类,提供通用的数据验证和序列化功能 -- **SummaryOutput**: 继承自DataBase,包含总结内容的输出模型 -- **ExecutorBackground**: 执行器背景信息,包含对话记录和关键事实 +- **SummaryOutput**: 包含总结内容的输出模型(继承自DataBase,详见[core.md](core.md)) +- **ExecutorBackground**: 执行器背景信息,包含对话记录和关键事实(详见[core.md](core.md)) #### 数据流转关系 diff --git a/pyproject.toml b/pyproject.toml index 6aa0bd603..bf5fbbf34 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "euler-copilot-framework" -version = "0.9.6" +version = "0.10.1" description = "EulerCopilot 后端服务" requires-python = "==3.11.6" dependencies = [ @@ -14,10 +14,10 @@ dependencies = [ "jinja2==3.1.6", "jionlp==1.5.20", "jsonschema==4.23.0", - "mcp==1.13.1", + "mcp==1.17.0", "minio==7.2.15", "ollama==0.5.3", - "openai==1.107.1", + "openai==2.3.0", "pandas==2.2.3", "pgvector==0.4.1", "pillow==10.3.0", @@ -26,12 +26,12 @@ dependencies = [ "python-magic==0.4.27", "python-multipart==0.0.20", "pyyaml==6.0.2", - "rich==14.1.0", + "rich==14.2.0", "sqlalchemy==2.0.41", "tiktoken==0.9.0", "toml==0.10.2", "uvicorn==0.34.0", - "xmltodict>=1.0.0", + "xmltodict==1.0.0", ] [[tool.uv.index]] @@ -47,7 +47,4 @@ dev = [ "pytest==8.3.5", "pytest-mock==3.14.0", "ruff==0.12.5", - "sphinx==8.2.3", - "sphinx-rtd-theme==3.0.2", - "sphinxcontrib-plantuml>=0.31", ] -- Gitee From 55d539b9779b24e11311810dc507e4bd9798975d Mon Sep 17 00:00:00 2001 From: z30057876 Date: Fri, 17 Oct 2025 10:58:03 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E4=BD=BF=E7=94=A8PostgreSQL=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E6=B1=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/common/postgres.py | 45 ++++++++++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/apps/common/postgres.py b/apps/common/postgres.py index c0d11824b..ff2d03ed4 100644 --- a/apps/common/postgres.py +++ b/apps/common/postgres.py @@ -6,7 +6,7 @@ import urllib.parse from collections.abc import AsyncGenerator from contextlib import asynccontextmanager -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine from apps.models import Base @@ -27,24 +27,45 @@ class Postgres: f"postgresql+asyncpg://{urllib.parse.quote_plus(config.postgres.user)}:" f"{urllib.parse.quote_plus(config.postgres.password)}@{config.postgres.host}:" f"{config.postgres.port}/{config.postgres.database}", + pool_size=20, # 连接池大小 + max_overflow=10, # 允许的最大溢出连接数 + pool_pre_ping=True, # 使用前检查连接是否有效 + pool_recycle=3600, # 连接回收时间(秒) + echo_pool=False, # 是否打印连接池日志 ) - self._session = async_sessionmaker(self.engine, expire_on_commit=False) logger.info("[Postgres] 创建表") async with self.engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) + async def close(self) -> None: + """关闭Postgres连接器,释放所有连接""" + logger.info("[Postgres] 关闭Postgres连接器") + await self.engine.dispose() + @asynccontextmanager async def session(self) -> AsyncGenerator[AsyncSession, None]: - """获取会话""" - async with self._session() as session: - try: - yield session - except Exception: - logger.exception("[Postgres] 会话错误") - await session.rollback() - raise - finally: - await session.close() + """ + 获取会话,每次从连接池创建新的数据库连接 + + 注意: 此方法不会自动提交,调用方需要显式调用 session.commit() + """ + # 每次调用都直接从 engine 创建新的 session + session = AsyncSession( + self.engine, + expire_on_commit=False, + autoflush=False, + autocommit=False, + ) + try: + yield session + except Exception: + logger.exception("[Postgres] 会话错误") + # 发生异常时回滚 + await session.rollback() + raise + finally: + # 确保连接被正确关闭并归还到连接池 + await session.close() postgres = Postgres() -- Gitee From 1dd9fb472a22ad4b265090dd9b64e096efaf28a4 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Fri, 17 Oct 2025 10:58:39 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E5=88=9D=E5=A7=8B?= =?UTF-8?q?=E5=8C=96=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/main.py | 26 ++++++++++++++++---------- apps/models/task.py | 2 +- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/apps/main.py b/apps/main.py index a33010894..9f26b4a47 100644 --- a/apps/main.py +++ b/apps/main.py @@ -1,8 +1,9 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """主程序""" -import asyncio import logging +from collections.abc import AsyncGenerator +from contextlib import asynccontextmanager import uvicorn from fastapi import FastAPI @@ -33,9 +34,22 @@ from .routers import ( ) from .scheduler.pool.pool import pool + +@asynccontextmanager +async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: + """应用生命周期管理""" + # 启动时初始化资源 + await postgres.init() + await pool.init() + yield + # 关闭时释放资源 + await postgres.close() + + # 定义FastAPI app -app = FastAPI(redoc_url=None) +app = FastAPI(redoc_url=None, lifespan=lifespan) Profiler(app) + # 定义FastAPI全局中间件 app.add_middleware( CORSMiddleware, @@ -81,15 +95,7 @@ logging.basicConfig( ) -async def init_resources() -> None: - """初始化必要资源""" - await postgres.init() - await pool.init() - # 运行 if __name__ == "__main__": - # 初始化必要资源 - asyncio.run(init_resources()) - # 启动FastAPI uvicorn.run(app, host="0.0.0.0", port=8002, log_level="info", log_config=None) diff --git a/apps/models/task.py b/apps/models/task.py index 948d87e4a..9c7a8c7ba 100644 --- a/apps/models/task.py +++ b/apps/models/task.py @@ -67,7 +67,7 @@ class Task(Base): """任务""" __tablename__ = "framework_task" - userSub: Mapped[str] = mapped_column(String(255), ForeignKey("framework_user.sub")) # noqa: N815 + userSub: Mapped[str] = mapped_column(String(255), ForeignKey("framework_user.userSub")) # noqa: N815 """用户ID""" conversationId: Mapped[uuid.UUID | None] = mapped_column( # noqa: N815 UUID(as_uuid=True), ForeignKey("framework_conversation.id"), nullable=True, -- Gitee From eae8a85b4ea4bf1dc9e71ed5248d8ade0f495c4e Mon Sep 17 00:00:00 2001 From: z30057876 Date: Fri, 17 Oct 2025 10:59:16 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E6=9B=B4=E6=96=B0Personal=20Token=E9=89=B4?= =?UTF-8?q?=E6=9D=83=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/dependency/user.py | 87 +++++++++++++++++++++++---------- apps/services/personal_token.py | 8 +-- 2 files changed, 64 insertions(+), 31 deletions(-) diff --git a/apps/dependency/user.py b/apps/dependency/user.py index 104ca4e1f..da6d2bfc8 100644 --- a/apps/dependency/user.py +++ b/apps/dependency/user.py @@ -15,69 +15,102 @@ from apps.services.user import UserManager logger = logging.getLogger(__name__) -async def verify_personal_token(request: HTTPConnection) -> None: +async def verify_session(request: HTTPConnection) -> None: """ - 验证Personal Token是否有效;作为第一层鉴权检查 + 验证Session是否已鉴权;作为第一层鉴权检查 - - 如果Authorization头不存在,抛出401 - - 如果Authorization头存在,检测是否为合法的API Key - - 合法则设置user_sub,不合法则不抛出异常(由后续依赖处理) + - 如果Authorization头不存在或不以Bearer开头,抛出401 + - 如果Bearer token以sk-开头,跳过(由verify_personal_token处理) + - 如果Bearer token不以sk-开头,则作为Session ID校验 + - 如果是合法session则设置user_sub :param request: HTTP请求 :return: """ auth_header = request.headers.get("Authorization") if not auth_header: + logger.warning("鉴权失败:缺少Authorization头") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="鉴权失败") - # 尝试验证是否为合法的Personal Token - user_sub = await PersonalTokenManager.get_user_by_personal_token(auth_header) - if user_sub is not None: - request.state.user_sub = user_sub - # 不合法时不抛出异常,由verify_session继续处理 + if not auth_header.startswith("Bearer "): + logger.warning("鉴权失败:Authorization格式错误,需要Bearer token") + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="鉴权失败:需要Bearer token", + ) + token = auth_header.split(" ", 1)[1] -async def verify_session(request: HTTPConnection) -> None: + # 如果以sk-开头,说明是Personal Token,跳过由verify_personal_token处理 + if token.startswith("sk-"): + return + + # 作为Session ID校验 + session_id = token + request.state.session_id = session_id + user = await SessionManager.get_user(session_id) + if not user: + logger.warning("Session ID鉴权失败:无效的session_id=%s", session_id) + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Session ID 鉴权失败", + ) + request.state.user_sub = user + logger.info("Session鉴权成功,user_sub=%s", user) + + +async def verify_personal_token(request: HTTPConnection) -> None: """ - 验证Session是否已鉴权;作为第二层鉴权检查 + 验证Personal Token是否有效;作为第二层鉴权检查 - - 如果已经通过verify_personal_token设置了user_sub,则跳过 - - 如果Authorization不以Bearer开头,抛出401 - - 如果不是合法session,抛出401 - - 是合法session则设置user + - 如果已经通过verify_session设置了user_sub,则跳过 + - 如果Bearer token以sk-开头,则作为Personal Token校验 + - 合法则设置user_sub,不合法则抛出401 :param request: HTTP请求 :return: """ - # 如果已经通过Personal Token验证,则跳过 + # 如果已经通过Session验证,则跳过 if hasattr(request.state, "user_sub"): return auth_header = request.headers.get("Authorization") + # Authorization头格式已在verify_session中检查过 if not auth_header or not auth_header.startswith("Bearer "): + logger.warning("Personal Token鉴权失败:缺少或格式错误的Bearer token") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, - detail="Session 鉴权失败:需要Bearer token", + detail="Personal Token 鉴权失败:需要Bearer token", ) - session_id = auth_header.split(" ", 1)[1] - request.state.session_id = session_id - user = await SessionManager.get_user(session_id) - if not user: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Session ID 鉴权失败", - ) - request.state.user_sub = user + token = auth_header.split(" ", 1)[1] + + # 检查是否为Personal Token(以sk-开头) + if token.startswith("sk-"): + # 验证是否为合法的Personal Token + user_sub = await PersonalTokenManager.get_user_by_personal_token(token) + if user_sub is not None: + request.state.user_sub = user_sub + logger.info("Personal Token鉴权成功,user_sub=%s", user_sub) + else: + # Personal Token无效,抛出401 + logger.warning("Personal Token鉴权失败:无效的token") + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Personal Token 鉴权失败", + ) async def verify_admin(request: HTTPConnection) -> None: """验证用户是否为管理员""" if not hasattr(request.state, "user_sub"): + logger.warning("管理员鉴权失败:用户未登录") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="用户未登录") user_sub = request.state.user_sub user = await UserManager.get_user(user_sub) request.state.user = user if not user: + logger.warning("管理员鉴权失败:用户不存在,user_sub=%s", user_sub) raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="用户不存在") if user.userSub not in config.login.admin_user: + logger.warning("管理员鉴权失败:用户无管理员权限,user_sub=%s", user_sub) raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="用户无权限") diff --git a/apps/services/personal_token.py b/apps/services/personal_token.py index 07db03310..d72be9854 100644 --- a/apps/services/personal_token.py +++ b/apps/services/personal_token.py @@ -1,7 +1,6 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """API Key管理""" -import hashlib import logging import uuid @@ -46,14 +45,15 @@ class PersonalTokenManager: :param user_sub: 用户ID :return: 更新后的Personal Token """ - personal_token = hashlib.sha256(str(uuid.uuid4().hex).encode()).hexdigest()[:16] + personal_token = uuid.uuid4().hex + personal_token_with_prefix = f"sk-{personal_token}" try: async with postgres.session() as session: await session.execute( - update(User).where(User.id == user_sub).values(personal_token=personal_token), + update(User).where(User.id == user_sub).values(personal_token=personal_token_with_prefix), ) await session.commit() except Exception: logger.exception("[PersonalTokenManager] 更新Personal Token失败") return None - return personal_token + return personal_token_with_prefix -- Gitee