From 02a73e2f7963bf45c13c0490a86f54a19ecd05f2 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Mon, 13 Oct 2025 20:28:38 +0800 Subject: [PATCH 1/8] =?UTF-8?q?=E6=9B=B4=E6=96=B0flowexecutor=E5=92=8Ccomm?= =?UTF-8?q?ent=E6=A8=A1=E5=9D=97=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- design/services/comment.md | 819 +++++++++++++++++++++++++++++++++++++ design/services/flow.md | 14 +- 2 files changed, 821 insertions(+), 12 deletions(-) create mode 100644 design/services/comment.md diff --git a/design/services/comment.md b/design/services/comment.md new file mode 100644 index 000000000..fd6510cf3 --- /dev/null +++ b/design/services/comment.md @@ -0,0 +1,819 @@ +# Comment Module Documentation + +## 模块概述 + +评论模块(Comment Module)负责处理用户对问答记录(Record)的反馈功能,包括点赞(Like)、点踩(Dislike)以及详细的反馈意见收集。该模块是系统用户体验反馈机制的核心组件。 + +## 目录结构 + +```text +apps/ +├── routers/ +│ └── comment.py # FastAPI路由层,处理HTTP请求 +├── services/ +│ └── comment.py # 业务逻辑层,评论管理器 +├── models/ +│ └── comment.py # 数据库模型定义 +└── schemas/ + ├── comment.py # 请求数据模型 + └── record.py # 记录相关数据模型(包含评论) +``` + +## 核心组件 + +### 1. 数据模型层 (models/comment.py) + +#### Comment Table Schema + +| 字段名 | 类型 | 说明 | 约束 | +|--------|------|------|------| +| id | BigInteger | 主键ID | Primary Key, Auto Increment | +| recordId | UUID | 问答对ID | Foreign Key → framework_record.id, Indexed | +| userSub | String(50) | 用户标识 | Foreign Key → framework_user.userSub | +| commentType | Enum(CommentType) | 评论类型 | Not Null | +| feedbackType | ARRAY(String) | 投诉类别列表 | Not Null | +| feedbackLink | String(1000) | 投诉相关链接 | Not Null | +| feedbackContent | String(1000) | 投诉详细内容 | Not Null | +| createdAt | DateTime(TZ) | 创建时间 | Not Null, Default: UTC Now | + +#### CommentType Enum + +```python +class CommentType(str, PyEnum): + LIKE = "liked" # 点赞 + DISLIKE = "disliked" # 点踩 + NONE = "none" # 无评论 +``` + +### 2. 
业务逻辑层 (services/comment.py) + +#### CommentManager + +提供评论的核心业务逻辑操作: + +- **query_comment(record_id: str)**: 根据问答ID查询评论 +- **update_comment(record_id: str, data: RecordComment, user_sub: str)**: 创建或更新评论 + +### 3. 路由层 (routers/comment.py) + +#### API Endpoint + +- **POST /api/comment**: 添加或更新评论 + - 认证要求: Session验证 + Personal Token验证 + - 请求体: AddCommentData + - 响应: ResponseData + +### 4. 数据传输对象 (schemas) + +#### AddCommentData (请求模型) + +```python +{ + "record_id": str, # 问答记录ID + "comment": CommentType, # 评论类型 (liked/disliked/none) + "dislike_reason": str, # 点踩原因 (分号分隔, max 200字符) + "reason_link": str, # 相关链接 (max 200字符) + "reason_description": str # 详细描述 (max 500字符) +} +``` + +#### RecordComment (内部数据模型) + +```python +{ + "comment": CommentType, # 评论类型 + "feedback_type": list[str], # 反馈类型列表 (别名: dislike_reason) + "feedback_link": str, # 反馈链接 (别名: reason_link) + "feedback_content": str, # 反馈内容 (别名: reason_description) + "feedback_time": float # 反馈时间戳 +} +``` + +## 架构设计 + +### 系统架构图 + +```mermaid +graph TB + subgraph "Client Layer" + Client[前端客户端] + end + + subgraph "API Layer" + Router[FastAPI Router
routers/comment.py] + Auth1[Session验证] + Auth2[Token验证] + end + + subgraph "Service Layer" + Manager[CommentManager<br/>services/comment.py] + end + + subgraph "Data Layer" + Model[Comment Model
models/comment.py] + DB[(PostgreSQL Database)] + end + + subgraph "Schema Layer" + Schema1[AddCommentData] + Schema2[RecordComment] + end + + Client -->|HTTP POST| Router + Router -->|依赖注入| Auth1 + Router -->|依赖注入| Auth2 + Router -->|数据验证| Schema1 + Router -->|调用业务逻辑| Manager + Manager -->|数据转换| Schema2 + Manager -->|ORM操作| Model + Model -->|SQLAlchemy| DB +``` + +### 数据流程图 + +```mermaid +flowchart TD + Start([用户提交评论]) --> Validate{数据验证} + Validate -->|验证失败| Error1[返回422错误] + Validate -->|验证成功| Auth{身份认证} + + Auth -->|认证失败| Error2[返回401错误] + Auth -->|认证成功| ParseData[解析dislike_reason
分号分隔转列表] + + ParseData --> CreateDTO[创建RecordComment对象] + CreateDTO --> QueryDB{查询数据库
记录是否存在?} + + QueryDB -->|存在| Update[更新现有记录<br/>commentType<br/>feedbackType<br/>feedbackLink<br/>feedbackContent] + QueryDB -->|不存在| Create[创建新记录
包含recordId
userSub等字段] + + Update --> Commit[提交事务] + Create --> Merge[Merge操作] + Merge --> Commit + + Commit --> Success{提交成功?} + Success -->|失败| Error3[返回400错误] + Success -->|成功| Return[返回200 OK] + + Error1 --> End([结束]) + Error2 --> End + Error3 --> End + Return --> End +``` + +## 时序图 + +### 添加评论完整流程 + +```mermaid +sequenceDiagram + actor User as 用户 + participant Client as 前端客户端 + participant Router as FastAPI Router + participant AuthMiddleware as 认证中间件 + participant Manager as CommentManager + participant Session as DB Session + participant DB as PostgreSQL + + User->>Client: 点赞/点踩并提交反馈 + Client->>Router: POST /api/comment
{record_id, comment, ...} + + activate Router + Router->>AuthMiddleware: verify_session() + activate AuthMiddleware + AuthMiddleware-->>Router: session验证结果 + deactivate AuthMiddleware + + Router->>AuthMiddleware: verify_personal_token() + activate AuthMiddleware + AuthMiddleware-->>Router: token验证结果 + deactivate AuthMiddleware + + Router->>Router: 解析dislike_reason
分号分隔 → list + Router->>Router: 创建RecordComment对象
设置feedback_time + + Router->>Manager: update_comment(record_id, data, user_sub) + activate Manager + + Manager->>Session: 创建异步会话 + activate Session + + Manager->>DB: SELECT * FROM framework_comment
WHERE recordId = ? + activate DB + DB-->>Manager: 查询结果 + deactivate DB + + alt 记录存在 + Manager->>Manager: 更新现有对象字段
commentType, feedbackType等 + else 记录不存在 + Manager->>Manager: 创建新Comment对象 + Manager->>Session: merge(comment_info) + end + + Manager->>Session: commit() + Session->>DB: 提交事务 + activate DB + DB-->>Session: 提交成功 + deactivate DB + + Session-->>Manager: 完成 + deactivate Session + Manager-->>Router: None (成功) + deactivate Manager + + Router->>Client: 200 OK
{code: 200, message: "success"} + deactivate Router + Client->>User: 显示反馈成功 +``` + +### 查询评论流程 + +```mermaid +sequenceDiagram + participant Service as 业务服务 + participant Manager as CommentManager + participant Session as DB Session + participant DB as PostgreSQL + + Service->>Manager: query_comment(record_id) + activate Manager + + Manager->>Session: 创建异步会话 + activate Session + + Manager->>DB: SELECT * FROM framework_comment
WHERE recordId = UUID(record_id) + activate DB + DB-->>Manager: 查询结果 + deactivate DB + + alt 记录存在 + Manager->>Manager: 构建RecordComment对象<br/>映射字段名称
转换时间戳 + Manager-->>Service: RecordComment对象 + else 记录不存在 + Manager-->>Service: None + end + + deactivate Session + deactivate Manager +``` + +## 状态图 + +### 评论状态转换 + +```mermaid +stateDiagram-v2 + [*] --> NONE: 创建问答记录 + + NONE --> LIKE: 用户点赞 + NONE --> DISLIKE: 用户点踩 + + LIKE --> DISLIKE: 改为点踩 + LIKE --> NONE: 取消点赞 + + DISLIKE --> LIKE: 改为点赞 + DISLIKE --> NONE: 取消点踩 + + NONE --> [*] + LIKE --> [*] + DISLIKE --> [*] + + note right of DISLIKE + 点踩时需要提供: + - feedbackType (类别列表) + - feedbackLink (相关链接) + - feedbackContent (详细说明) + end note +``` + +## 类图 + +```mermaid +classDiagram + class Comment { + +int id + +UUID recordId + +str userSub + +CommentType commentType + +list~str~ feedbackType + +str feedbackLink + +str feedbackContent + +datetime createdAt + } + + class CommentType { + <> + LIKE + DISLIKE + NONE + } + + class AddCommentData { + +str record_id + +CommentType comment + +str dislike_reason + +str reason_link + +str reason_description + +model_validate() + } + + class RecordComment { + +CommentType comment + +list~str~ feedback_type + +str feedback_link + +str feedback_content + +float feedback_time + } + + class CommentManager { + +query_comment(record_id)$ RecordComment|None + +update_comment(record_id, data, user_sub)$ None + } + + class Router { + +add_comment(request, post_body) JSONResponse + } + + Comment --> CommentType : uses + AddCommentData --> CommentType : uses + RecordComment --> CommentType : uses + Router --> AddCommentData : validates + Router --> CommentManager : calls + CommentManager --> RecordComment : transforms + CommentManager --> Comment : operates +``` + +## 数据库ER图 + +```mermaid +erDiagram + FRAMEWORK_USER ||--o{ FRAMEWORK_COMMENT : creates + FRAMEWORK_RECORD ||--o| FRAMEWORK_COMMENT : has + + FRAMEWORK_USER { + string userSub PK + string userName + datetime createdAt + } + + FRAMEWORK_RECORD { + uuid id PK + uuid conversationId + string userSub FK + string content + datetime createdAt + } + + FRAMEWORK_COMMENT { + bigint id PK + 
uuid recordId FK + string userSub FK + enum commentType + array feedbackType + string feedbackLink + string feedbackContent + datetime createdAt + } +``` + +## 核心业务逻辑 + +### 1. 评论创建/更新逻辑 + +评论模块采用幂等性设计,支持对同一问答记录进行多次评论更新。核心流程如下: + +#### 步骤一:查询现有评论 + +系统首先根据问答记录ID查询数据库中是否已存在对应的评论记录。 + +#### 步骤二:判断操作类型 + +- 如果记录已存在,则执行更新操作,修改现有记录的评论类型、反馈类型、反馈链接和反馈内容 +- 如果记录不存在,则创建新的评论记录,包含问答记录ID、用户标识、评论类型等完整信息 + +#### 步骤三:数据持久化 + +使用数据库事务确保数据一致性,通过SQLAlchemy的merge操作实现UPSERT语义,自动处理插入或更新逻辑。 + +### 2. 数据转换逻辑 + +#### API → Service 层转换 + +路由层接收到前端请求后,需要进行数据格式转换: + +- 将分号分隔的字符串格式的点踩原因转换为数组格式 +- 将API字段名映射为内部数据模型字段名 +- 生成当前时间戳作为反馈时间 + +#### Service → Model 层映射 + +业务逻辑层将处理后的数据映射到数据库模型: + +- 评论类型字段直接映射 +- 反馈类型列表映射为PostgreSQL数组类型 +- 反馈链接和反馈内容直接映射 +- 用户标识和记录ID保持原有格式 + +#### Model → Schema 层查询映射 + +查询操作时将数据库模型数据转换为API响应格式: + +- 数据库字段名转换为前端友好的字段名 +- 时间戳格式转换,将数据库的datetime对象转换为Unix时间戳 +- 数组类型数据保持原有格式返回给前端 + +## 接口文档 + +### POST /api/comment + +添加或更新评论 + +#### 认证要求 + +- Session验证 (verify_session) +- Personal Token验证 (verify_personal_token) + +#### 请求 + +**Headers:** + +```http +Content-Type: application/json +Authorization: Bearer +Cookie: session= +``` + +**Body:** + +```json +{ + "record_id": "550e8400-e29b-41d4-a716-446655440000", + "comment": "disliked", + "dislike_reason": "答非所问;信息不准确;", + "reason_link": "https://example.com/issue/123", + "reason_description": "回答内容与问题不符,建议补充相关文档引用。" +} +``` + +**字段说明:** + +| 字段 | 类型 | 必填 | 说明 | 限制 | +|------|------|------|------|------| +| record_id | string | 是 | 问答记录UUID | UUID格式 | +| comment | string | 是 | 评论类型 | "liked", "disliked", "none" | +| dislike_reason | string | 否 | 点踩原因 | 分号分隔,最长200字符 | +| reason_link | string | 否 | 相关链接 | 最长200字符 | +| reason_description | string | 否 | 详细描述 | 最长500字符 | + +#### 响应 + +**成功 (200 OK):** + +```json +{ + "code": 200, + "message": "success", + "result": {} +} +``` + +**失败 (400 Bad Request):** + +```json +{ + "code": 400, + "message": "record_id not found", + "result": {} +} +``` + +**失败 (401 Unauthorized):** + +```json +{ 
+ "code": 401, + "message": "Authentication failed" +} +``` + +#### 示例 + +**cURL:** + +```bash +curl -X POST "http://localhost:8000/api/comment" \ + -H "Content-Type: application/json" \ + -H "Authorization: Bearer eyJhbGc..." \ + --cookie "session=abc123..." \ + -d '{ + "record_id": "550e8400-e29b-41d4-a716-446655440000", + "comment": "liked", + "dislike_reason": "", + "reason_link": "", + "reason_description": "" + }' +``` + +**Python:** + +```python +import requests + +response = requests.post( + "http://localhost:8000/api/comment", + json={ + "record_id": "550e8400-e29b-41d4-a716-446655440000", + "comment": "disliked", + "dislike_reason": "答非所问;信息不准确;", + "reason_link": "https://example.com/issue/123", + "reason_description": "回答内容与问题不符" + }, + headers={"Authorization": "Bearer "}, + cookies={"session": ""} +) +``` + +## 关键特性 + +### 1. 幂等性设计 + +- 同一个 record_id 的评论支持多次更新 +- 使用 SQLAlchemy 的 `merge` 操作实现 UPSERT 语义 +- 自动识别创建或更新操作 + +### 2. 数据完整性 + +- recordId 外键约束 → framework_record.id +- userSub 外键约束 → framework_user.userSub +- 索引优化:recordId 字段建立索引提高查询性能 + +### 3. 字段别名映射 + +使用 Pydantic 的 `Field(alias=...)` 实现前端友好的字段命名: + +| 内部字段 | API别名 | +|----------|---------| +| feedback_type | dislike_reason | +| feedback_link | reason_link | +| feedback_content | reason_description | + +### 4. 时间处理 + +- 数据库存储:`datetime` 对象,带时区(UTC) +- API传输:`float` 类型的 Unix 时间戳(秒,保留3位小数) + +### 5. 
数组字段处理 + +- API输入:分号分隔的字符串 `"reason1;reason2;"` +- 数据转换:`split(";")[:-1]` → `["reason1", "reason2"]` +- 数据库存储:PostgreSQL ARRAY 类型 + +## 错误处理 + +### 常见错误场景 + +| 错误场景 | HTTP状态码 | 处理方式 | +|----------|-----------|----------| +| Session验证失败 | 401 | 依赖注入层拦截 | +| Token验证失败 | 401 | 依赖注入层拦截 | +| record_id不存在 | 400 | 业务逻辑层返回None | +| UUID格式错误 | 400 | Pydantic验证失败 | +| 字段长度超限 | 422 | Pydantic验证失败 | +| 数据库连接失败 | 500 | 异常传播至错误处理中间件 | +| 外键约束违反 | 500 | 数据库异常 | + +### 异常传播链 + +```mermaid +flowchart LR + A[Router Layer] --> B{验证异常?} + B -->|是| C[422 Unprocessable Entity] + B -->|否| D[Service Layer] + D --> E{业务异常?} + E -->|是| F[400 Bad Request] + E -->|否| G[Database Layer] + G --> H{DB异常?} + H -->|是| I[500 Internal Server Error] + H -->|否| J[200 OK] +``` + +## 性能优化 + +### 1. 数据库索引 + +```sql +CREATE INDEX idx_comment_record_id ON framework_comment(recordId); +``` + +- 优化基于 recordId 的查询性能 +- 支持快速定位单个记录的评论 + +### 2. 异步IO + +- 使用 SQLAlchemy 异步会话:`async with postgres.session()` +- 非阻塞数据库操作 +- 提高并发处理能力 + +### 3. 连接池管理 + +- PostgreSQL 连接池复用 +- 避免频繁建立/关闭连接的开销 + +### 4. 查询优化 + +```python +# 使用 one_or_none() 替代 all()[0] +result = (await session.scalars( + select(Comment).where(Comment.recordId == uuid.UUID(record_id)) +)).one_or_none() +``` + +- 提前终止查询(最多返回1条) +- 减少数据传输量 + +## 安全考虑 + +### 1. 认证与授权 + +- **双重认证**:Session + Personal Token +- **用户隔离**:userSub 关联确保数据隔离 +- **依赖注入**:在路由层统一进行身份验证 + +### 2. 输入验证 + +- **字段长度限制**: + - dislike_reason: 200字符 + - reason_link: 200字符 + - reason_description: 500字符 +- **类型校验**:Pydantic自动验证数据类型 +- **枚举约束**:CommentType限定为三个固定值 + +### 3. SQL注入防护 + +- 使用 SQLAlchemy ORM +- 参数化查询 +- 无原生SQL拼接 + +### 4. 数据完整性 + +- 外键约束防止孤立记录 +- 事务保证原子性操作 +- NOT NULL约束防止空值 + +## 扩展性设计 + +### 1. 评论类型扩展 + +当前支持三种类型,如需扩展: + +```python +class CommentType(str, PyEnum): + LIKE = "liked" + DISLIKE = "disliked" + NONE = "none" + # 未来扩展 + REPORT = "reported" # 举报 + FAVORITE = "favorited" # 收藏 +``` + +### 2. 
反馈类型扩展 + +feedbackType 使用数组类型,支持多选和动态扩展: + +```python +# 前端定义的反馈类型选项 +FEEDBACK_OPTIONS = [ + "答非所问", + "信息不准确", + "内容不完整", + "格式混乱", + "其他问题" +] +``` + +### 3. 多语言支持 + +在 RecordComment 中添加语言字段: + +```python +class RecordComment(BaseModel): + comment: CommentType + language: str = "zh-CN" # 新增字段 + # ... 其他字段 +``` + +### 4. 统计分析扩展 + +可基于现有数据进行扩展: + +- 点赞率统计 +- 点踩原因分布 +- 时间趋势分析 +- 用户反馈热点 + +## 部署注意事项 + +### 1. 数据库迁移 + +使用 Alembic 进行数据库版本管理: + +```bash +# 生成迁移脚本 +alembic revision --autogenerate -m "create comment table" + +# 执行迁移 +alembic upgrade head +``` + +### 2. 环境变量配置 + +```bash +# PostgreSQL 连接配置 +DATABASE_URL=postgresql+asyncpg://user:pass@host:5432/dbname + +# 日志级别 +LOG_LEVEL=INFO +``` + +### 3. 性能调优 + +```python +# 连接池配置 +pool_size = 20 # 连接池大小 +max_overflow = 10 # 最大溢出连接数 +pool_timeout = 30 # 连接超时时间 +pool_recycle = 3600 # 连接回收时间 +``` + +## 未来优化方向 + +### 1. 缓存策略 + +- Redis 缓存热点评论数据 +- 减少数据库查询压力 +- 设置合理的缓存过期时间 + +### 2. 批量操作支持 + +- 支持批量查询评论 +- 支持批量更新评论 +- 减少网络往返次数 + +### 3. 评论审核机制 + +- 敏感词过滤 +- 内容审核工作流 +- 人工复审接口 + +### 4. 数据分析增强 + +- 实时统计面板 +- 评论情感分析 +- 用户行为画像 + +## 常见问题 (FAQ) + +### Q1: 为什么使用 merge 而不是 add? + +**A:** `merge` 操作实现了 UPSERT 语义,可以自动判断是插入还是更新,简化了业务逻辑。 + +### Q2: dislike_reason 为什么使用分号分隔? + +**A:** 前端传递多个原因时使用分号分隔的字符串格式,后端进行split转换为数组存储到数据库。 + +### Q3: 为什么 update_comment 返回 None? + +**A:** 当前设计中更新操作无需返回值,失败时通过异常处理。未来可以改为返回布尔值或更新后的对象。 + +### Q4: 如何防止恶意刷评论? + +**A:** + +- 身份认证确保用户真实性 +- 可增加频率限制(Rate Limiting) +- 可增加同一用户对同一记录的评论次数限制 + +### Q5: 评论数据如何归档? 
+ +**A:** + +- 可按时间分区存储历史数据 +- 定期将旧数据迁移到冷存储 +- 保留索引供查询分析使用 + +## 参考资料 + +- [FastAPI Documentation](https://fastapi.tiangolo.com/) +- [SQLAlchemy Async Documentation](https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html) +- [Pydantic V2 Documentation](https://docs.pydantic.dev/latest/) +- [PostgreSQL ARRAY Types](https://www.postgresql.org/docs/current/arrays.html) + +## 版本历史 + +| 版本 | 日期 | 变更说明 | +|------|------|----------| +| 1.0.0 | 2025-10-13 | 初始版本,完成基础评论功能 | + +--- + +**文档维护者**: Euler Copilot Framework Team +**最后更新**: 2025-10-13 diff --git a/design/services/flow.md b/design/services/flow.md index f682d7853..89dcf2693 100644 --- a/design/services/flow.md +++ b/design/services/flow.md @@ -163,7 +163,6 @@ sequenceDiagram API->>FSM: validate_flow_illegal() FSM->>FSM: 验证节点ID唯一性 FSM->>FSM: 验证边的合法性 - FSM->>FSM: 验证起始/终止节点 FSM-->>API: 验证通过 API->>FSM: validate_flow_connectivity() FSM->>FSM: BFS 检查连通性 @@ -234,13 +233,7 @@ flowchart TD CheckSelfLoop -->|是| Error4[抛出异常: 起止节点相同] CheckSelfLoop -->|否| CheckBranch{分支合法?} CheckBranch -->|否| Error5[抛出异常: 分支非法] - CheckBranch -->|是| CalcDegree[计算入度/出度] - - CalcDegree --> CheckStartDeg{起始节点入度=0?} - CheckStartDeg -->|否| Error6[抛出异常: 起始节点有入边] - CheckStartDeg -->|是| CheckEndDeg{终止节点出度=0?} - CheckEndDeg -->|否| Error7[抛出异常: 终止节点有出边] - CheckEndDeg -->|是| ValidateConn[验证连通性] + CheckBranch -->|是| ValidateConn[验证连通性] ValidateConn --> BFS[BFS遍历图] BFS --> CheckReachable{所有节点可达?} @@ -259,8 +252,6 @@ flowchart TD Error3 --> End Error4 --> End Error5 --> End - Error6 --> End - Error7 --> End Success --> End ``` @@ -473,7 +464,6 @@ Authorization: Bearer | `validate_flow_connectivity` | 验证工作流连通性(BFS) | - | | `_validate_node_ids` | 验证节点ID唯一性 | `FlowNodeValidationError` | | `_validate_edges` | 验证边的合法性 | `FlowEdgeValidationError` | -| `_validate_node_degrees` | 验证起始/终止节点的度数 | `FlowNodeValidationError` | ### 5.3 FlowLoader @@ -588,7 +578,7 @@ flowchart LR | 异常类 | 触发条件 | 处理方式 | |--------|----------|----------| -| `FlowNodeValidationError` | 
节点ID重复、起始/终止节点不存在、度数错误 | 返回400错误 | +| `FlowNodeValidationError` | 节点ID重复、起始/终止节点不存在 | 返回400错误 | | `FlowEdgeValidationError` | 边ID重复、自环、分支非法 | 返回400错误 | | `FlowBranchValidationError` | 分支字段缺失/为空、分支重复、非法字符 | 返回400错误 | | `ValueError` | 应用不存在、工作流不存在、配置错误 | 返回404/500错误 | -- Gitee From 3a4c4b80fe2e4e662cae3a124d559c31de16ec87 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Mon, 13 Oct 2025 20:29:03 +0800 Subject: [PATCH 2/8] =?UTF-8?q?=E5=8E=BB=E9=99=A4=E6=89=8B=E6=9C=BA?= =?UTF-8?q?=E5=8F=B7=E9=AA=8C=E8=AF=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/common/oidc_provider/openeuler.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/apps/common/oidc_provider/openeuler.py b/apps/common/oidc_provider/openeuler.py index 5d85b30c4..8adfe09f7 100644 --- a/apps/common/oidc_provider/openeuler.py +++ b/apps/common/oidc_provider/openeuler.py @@ -87,10 +87,6 @@ class OpenEulerOIDCProvider(OIDCProviderBase): logger.info("[OpenEuler] 获取OIDC用户成功: %s", resp.text) result = resp.json() - if not result["phone_number_verified"]: - err = "Could not validate credentials." 
- raise RuntimeError(err) - return { "user_sub": result["sub"], } -- Gitee From 11ce37e8c7dfbb5dd8a19b4dfbb1b7dcdaca78fa Mon Sep 17 00:00:00 2001 From: z30057876 Date: Mon, 13 Oct 2025 20:29:38 +0800 Subject: [PATCH 3/8] =?UTF-8?q?=E5=88=A0=E9=99=A4=E8=84=9A=E6=B3=A8?= =?UTF-8?q?=E7=9B=B8=E5=85=B3=E7=8B=AC=E7=AB=8B=E8=A1=A8=EF=BC=8C=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=E5=85=B6=E4=BB=96=E6=96=B9=E6=B3=95=E5=A4=84=E7=90=86?= =?UTF-8?q?=E8=84=9A=E6=B3=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/models/__init__.py | 4 +--- apps/models/record.py | 31 +------------------------------ 2 files changed, 2 insertions(+), 33 deletions(-) diff --git a/apps/models/__init__.py b/apps/models/__init__.py index 8db4b8394..98bd73674 100644 --- a/apps/models/__init__.py +++ b/apps/models/__init__.py @@ -11,7 +11,7 @@ from .flow import Flow from .llm import LLMData, LLMProvider, LLMType from .mcp import MCPActivated, MCPInfo, MCPInstallStatus, MCPTools, MCPType from .node import NodeInfo -from .record import FootNoteType, Record, RecordFootNote, RecordMetadata +from .record import Record, RecordMetadata from .service import Service, ServiceACL, ServiceHashes from .session import Session, SessionActivity, SessionType from .tag import Tag @@ -45,7 +45,6 @@ __all__ = [ "ExecutorHistory", "ExecutorStatus", "Flow", - "FootNoteType", "LLMData", "LLMProvider", "LLMType", @@ -58,7 +57,6 @@ __all__ = [ "NodeInfo", "PermissionType", "Record", - "RecordFootNote", "RecordMetadata", "Service", "ServiceACL", diff --git a/apps/models/record.py b/apps/models/record.py index 0645fde9e..c99d8ba64 100644 --- a/apps/models/record.py +++ b/apps/models/record.py @@ -3,10 +3,9 @@ import uuid from datetime import UTC, datetime -from enum import Enum as PyEnum from typing import Any -from sqlalchemy import BigInteger, DateTime, Enum, Float, ForeignKey, Integer, String, Text +from sqlalchemy import DateTime, Float, ForeignKey, Integer, String, Text from 
sqlalchemy.dialects.postgresql import JSONB, UUID from sqlalchemy.orm import Mapped, mapped_column @@ -57,31 +56,3 @@ class RecordMetadata(Base): """问答对输出token数""" featureSwitch: Mapped[dict[str, Any]] = mapped_column(JSONB, default_factory=dict, nullable=False) # noqa: N815 """问答对功能开关""" - - -class FootNoteType(str, PyEnum): - """脚注类型""" - - RAG = "rag" - FRAMEWORK = "framework" - WEB = "web" - - -class RecordFootNote(Base): - """问答对脚注""" - - __tablename__ = "framework_record_foot_note" - id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True, init=False) - """主键ID""" - recordId: Mapped[uuid.UUID] = mapped_column( # noqa: N815 - UUID(as_uuid=True), ForeignKey("framework_record.id"), nullable=False, - ) - """问答对ID""" - releatedId: Mapped[str] = mapped_column(String(64), default="", nullable=False) # noqa: N815 - """脚注数字""" - insertPosition: Mapped[int] = mapped_column(Integer, default=0, nullable=False) # noqa: N815 - """插入位置""" - footSource: Mapped[str] = mapped_column(String(4096), default="", nullable=False) # noqa: N815 - """脚注来源""" - footType: Mapped[FootNoteType] = mapped_column(Enum(FootNoteType), default=FootNoteType.RAG, nullable=False) # noqa: N815 - """脚注类型""" -- Gitee From 20b8e7bf6539f5580aa6e87540d4741091553905 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Mon, 13 Oct 2025 20:30:01 +0800 Subject: [PATCH 4/8] =?UTF-8?q?=E4=BF=AE=E6=AD=A3AppCenter=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/routers/appcenter.py | 109 ++++++++++++++++++++++++++------------ 1 file changed, 74 insertions(+), 35 deletions(-) diff --git a/apps/routers/appcenter.py b/apps/routers/appcenter.py index 5fdff565f..304aab520 100644 --- a/apps/routers/appcenter.py +++ b/apps/routers/appcenter.py @@ -10,7 +10,8 @@ from fastapi.responses import JSONResponse from apps.dependency.user import verify_personal_token, verify_session from apps.exceptions import 
InstancePermissionError -from apps.models import AppType +from apps.models import AppType, PermissionType +from apps.schemas.agent import AgentAppMetadata from apps.schemas.appcenter import ( AppFlowInfo, AppMcpServiceInfo, @@ -28,6 +29,7 @@ from apps.schemas.appcenter import ( GetRecentAppListRsp, ) from apps.schemas.enum_var import AppFilterType +from apps.schemas.flow import AppMetadata from apps.schemas.response_data import ResponseData from apps.services.appcenter import AppCenterManager from apps.services.mcp_service import MCPServiceManager @@ -214,46 +216,83 @@ async def get_application(appId: Annotated[uuid.UUID, Path()]) -> JSONResponse: result={}, ).model_dump(exclude_none=True, by_alias=True), ) - workflows = [ - AppFlowInfo( - id=flow.id, - name=flow.name, - description=flow.description, - debug=flow.debug, + + # 根据 Metadata 类型组装对应的 GetAppPropertyMsg + if isinstance(app_data, AppMetadata): + # 处理工作流应用(FLOW类型) + workflows = [ + AppFlowInfo( + id=flow.id, + name=flow.name, + description=flow.description, + debug=flow.debug, + ) + for flow in app_data.flows + ] + result_msg = GetAppPropertyMsg( + appId=str(app_data.id), + appType=app_data.app_type, + published=app_data.published, + name=app_data.name, + description=app_data.description, + icon=app_data.icon, + links=app_data.links, + recommendedQuestions=app_data.first_questions, + dialogRounds=app_data.history_len, + permission=AppPermissionData( + visibility=app_data.permission.type if app_data.permission else PermissionType.PRIVATE, + authorizedUsers=app_data.permission.users if app_data.permission else [], + ), + workflows=workflows, + mcpService=[], + ) + elif isinstance(app_data, AgentAppMetadata): + # 处理智能体应用(AGENT类型) + mcp_service = [] + if app_data.mcp_service: + for service in app_data.mcp_service: + mcp_collection = await MCPServiceManager.get_mcp_service(service) + # 当 mcp_collection 为 None 时忽略当前的 MCP Server + if mcp_collection is not None: + mcp_service.append(AppMcpServiceInfo( + 
id=uuid.UUID(service) if isinstance(service, str) else service, + name=mcp_collection.name, + description=mcp_collection.description, + )) + result_msg = GetAppPropertyMsg( + appId=str(app_data.id), + appType=app_data.app_type, + published=app_data.published, + name=app_data.name, + description=app_data.description, + icon=app_data.icon, + links=[], + recommendedQuestions=[], + dialogRounds=app_data.history_len, + permission=AppPermissionData( + visibility=app_data.permission.type if app_data.permission else PermissionType.PRIVATE, + authorizedUsers=app_data.permission.users if app_data.permission else [], + ), + workflows=[], + mcpService=mcp_service, + ) + else: + logger.error("[AppCenter] 未知的应用元数据类型: %s", type(app_data).__name__) + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content=ResponseData( + code=status.HTTP_500_INTERNAL_SERVER_ERROR, + message="UNKNOWN_APP_TYPE", + result={}, + ).model_dump(exclude_none=True, by_alias=True), ) - for flow in app_data.flows - ] - mcp_service = [] - if app_data.mcp_service: - for service in app_data.mcp_service: - mcp_collection = await MCPServiceManager.get_mcp_service(service) - mcp_service.append(AppMcpServiceInfo( - id=mcp_collection.id, - name=mcp_collection.name, - description=mcp_collection.description, - )) + return JSONResponse( status_code=status.HTTP_200_OK, content=GetAppPropertyRsp( code=status.HTTP_200_OK, message="OK", - result=GetAppPropertyMsg( - appId=app_data.id, - appType=app_data.app_type, - published=app_data.published, - name=app_data.name, - description=app_data.description, - icon=app_data.icon, - links=app_data.links, - recommendedQuestions=app_data.first_questions, - dialogRounds=app_data.history_len, - permission=AppPermissionData( - visibility=app_data.permission.type, - authorizedUsers=app_data.permission.users, - ), - workflows=workflows, - mcpService=mcp_service, - ), + result=result_msg, ).model_dump(exclude_none=True, by_alias=True), ) -- Gitee From 
229250d023d2575a26746990fb4dc4ab17217173 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Mon, 13 Oct 2025 20:30:47 +0800 Subject: [PATCH 5/8] =?UTF-8?q?=E6=95=B4=E7=90=86Scheduler=E9=80=BB?= =?UTF-8?q?=E8=BE=91=EF=BC=8C=E5=88=A0=E9=99=A4=E6=97=A7=E7=9A=84context.p?= =?UTF-8?q?y?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/scheduler/context.py | 108 -------- apps/scheduler/scheduler/scheduler.py | 380 ++++++++++++++++++++------ 2 files changed, 293 insertions(+), 195 deletions(-) delete mode 100644 apps/scheduler/scheduler/context.py diff --git a/apps/scheduler/scheduler/context.py b/apps/scheduler/scheduler/context.py deleted file mode 100644 index 2265e115f..000000000 --- a/apps/scheduler/scheduler/context.py +++ /dev/null @@ -1,108 +0,0 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. -"""上下文管理""" - -import logging -import re -from datetime import UTC, datetime - -from apps.common.security import Security -from apps.models import Record, RecordMetadata, StepStatus -from apps.schemas.record import ( - FlowHistory, - RecordContent, -) -from apps.services.appcenter import AppCenterManager -from apps.services.document import DocumentManager -from apps.services.record import RecordManager -from apps.services.task import TaskManager - -logger = logging.getLogger(__name__) - - -async def save_data(scheduler: "Scheduler") -> None: - """保存当前Executor、Task、Record等的数据""" - foot_note_pattern = re.compile(r"\[\[(\d+)\]\]") - footnote_list = [] - offset = 0 - for match in foot_note_pattern.finditer(task.runtime.answer): - order = int(match.group(1)) - if order in order_to_id: - # 计算移除脚注后的插入位置 - original_start = match.start() - new_position = original_start - offset - - footnote_list.append( - FootNoteMetaData( - releatedId=order_to_id[order], - insertPosition=new_position, - footSource="rag_search", - footType="document", - ), - ) - - # 更新偏移量,因为脚注被移除会导致后续内容前移 - offset += 
len(match.group(0)) - - # 最后统一移除所有脚注 - task.runtime.answer = foot_note_pattern.sub("", task.runtime.answer).strip() - record_content = RecordContent( - question=task.runtime.question, - answer=task.runtime.answer, - facts=task.runtime.facts, - data={}, - ) - - try: - # 加密Record数据 - encrypt_data, encrypt_config = Security.encrypt(record_content.model_dump_json(by_alias=True)) - except Exception: - logger.exception("[Scheduler] 问答对加密错误") - return - - # 保存Flow信息 - if task.state: - # 遍历查找数据,并添加 - await TaskManager.save_flow_context(task.id, task.context) - - # 整理Record数据 - current_time = round(datetime.now(UTC).timestamp(), 2) - record = Record( - id=task.ids.record_id, - conversationId=task.ids.conversation_id, - taskId=task.id, - userSub=user_sub, - content=encrypt_data, - key=encrypt_config, - metadata=RecordMetadata( - timeCost=task.tokens.full_time, - inputTokens=task.tokens.input_tokens, - outputTokens=task.tokens.output_tokens, - feature={}, - footNoteMetadataList=footnote_list, - ), - createdAt=current_time, - flow=FlowHistory( - flow_id=task.state.flow_id, - flow_name=task.state.flow_name, - flow_status=task.state.flow_status, - history_ids=[context.id for context in task.context], - ), - ) - - # 修改文件状态 - await DocumentManager.change_doc_status(user_sub, post_body.conversation_id, record_group) - # 保存Record - await RecordManager.insert_record_data(user_sub, post_body.conversation_id, record) - # 保存与答案关联的文件 - await DocumentManager.save_answer_doc(user_sub, record_group, used_docs) - - if post_body.app and post_body.app.app_id: - # 更新最近使用的应用 - await AppCenterManager.update_recent_app(user_sub, post_body.app.app_id) - - # 若状态为成功,删除Task - if not task.state or task.state.flow_status == StepStatus.SUCCESS or task.state.flow_status == StepStatus.ERROR or task.state.flow_status == StepStatus.CANCELLED: - await TaskManager.delete_task_by_task_id(task.id) - else: - # 更新Task - await TaskManager.save_task(task.id, task) diff --git a/apps/scheduler/scheduler/scheduler.py 
b/apps/scheduler/scheduler/scheduler.py index 23180c3de..8295eb44c 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -10,8 +10,19 @@ from jinja2 import BaseLoader from jinja2.sandbox import SandboxedEnvironment from apps.common.queue import MessageQueue +from apps.common.security import Security from apps.llm import Embedding, FunctionLLM, JsonGenerator, ReasoningLLM -from apps.models import AppType, Conversation, ExecutorStatus, Task, TaskRuntime, User +from apps.models import ( + AppType, + Conversation, + ExecutorStatus, + Record, + RecordMetadata, + StepStatus, + Task, + TaskRuntime, + User, +) from apps.scheduler.executor.agent import MCPAgentExecutor from apps.scheduler.executor.flow import FlowExecutor from apps.scheduler.executor.qa import QAExecutor @@ -21,13 +32,16 @@ from apps.schemas.message import ( InitContent, InitContentFeature, ) +from apps.schemas.record import FlowHistory, RecordContent from apps.schemas.request_data import RequestData from apps.schemas.scheduler import LLMConfig, TopFlow from apps.schemas.task import TaskData from apps.services.activity import Activity from apps.services.appcenter import AppCenterManager from apps.services.conversation import ConversationManager +from apps.services.document import DocumentManager from apps.services.llm import LLMManager +from apps.services.record import RecordManager from apps.services.task import TaskManager from apps.services.user import UserManager @@ -68,23 +82,8 @@ class Scheduler: raise RuntimeError(err) self.user = user - # 获取Task - task = await TaskManager.get_task_data_by_task_id(task_id) - if not task: - _logger.info("[Scheduler] 新建任务") - task = TaskData( - metadata=Task( - id=task_id, - userSub=user_sub, - conversationId=self.post_body.conversation_id, - ), - runtime=TaskRuntime( - taskId=task_id, - ), - state=None, - context=[], - ) - self.task = task + # 初始化Task + await self._init_task(task_id, user_sub) # Jinja2 self._env = 
SandboxedEnvironment( @@ -96,7 +95,7 @@ class Scheduler: ) # LLM - await self._get_scheduler_llm(post_body.llm_id) + self.llm = await self._get_scheduler_llm(post_body.llm_id) async def _push_init_message( @@ -131,6 +130,120 @@ class Scheduler: data=InitContent(feature=feature, createdAt=created_at).model_dump(exclude_none=True, by_alias=True), ) + async def _push_done_message(self) -> None: + """推送任务完成消息""" + _logger.info("[Scheduler] 发送结束消息") + await self.queue.push_output(self.task, self.llm, event_type=EventType.DONE.value, data={}) + + async def _determine_app_id(self) -> uuid.UUID | None: + """ + 确定最终使用的 app_id + + Returns: + final_app_id: 最终使用的 app_id,如果为 None 则使用 QA 模式 + + """ + conversation = None + + if self.task.metadata.conversationId: + conversation = await ConversationManager.get_conversation_by_conversation_id( + self.task.metadata.userSub, + self.task.metadata.conversationId, + ) + + if conversation and conversation.appId: + # Conversation中有appId,使用它 + final_app_id = conversation.appId + _logger.info("[Scheduler] 使用Conversation中的appId: %s", final_app_id) + + # 如果post_body中也有app_id且与Conversation不符,忽略post_body中的app信息 + if self.post_body.app and self.post_body.app.app_id and self.post_body.app.app_id != conversation.appId: + _logger.warning( + "[Scheduler] post_body中的app_id(%s)与Conversation中的appId(%s)不符,忽略post_body中的app信息", + self.post_body.app.app_id, + conversation.appId, + ) + # 清空post_body中的app信息,以Conversation为准 + self.post_body.app.app_id = conversation.appId + elif self.post_body.app and self.post_body.app.app_id: + # Conversation中appId为None,使用post_body中的app信息 + final_app_id = self.post_body.app.app_id + _logger.info("[Scheduler] Conversation中无appId,使用post_body中的app_id: %s", final_app_id) + else: + # 两者都为None,fallback到QAExecutor + final_app_id = None + _logger.info("[Scheduler] Conversation和post_body中均无appId,fallback到智能问答") + + return final_app_id + + async def _create_executor_task(self, final_app_id: uuid.UUID | None) -> asyncio.Task | None: + 
""" + 根据 app_id 创建对应的执行器任务 + + Args: + final_app_id: 要使用的 app_id,None 表示使用 QA 模式 + + Returns: + 创建的异步任务,如果发生错误则返回 None + + """ + if final_app_id is None: + # 没有app相关信息,运行QAExecutor + _logger.info("[Scheduler] 运行智能问答模式") + await self._push_init_message(3, is_flow=False) + return asyncio.create_task(self._run_qa()) + + # 有app信息,获取app详情和元数据 + try: + app_data = await AppCenterManager.fetch_app_metadata_by_id(final_app_id) + except ValueError: + _logger.exception("[Scheduler] App %s 不存在或元数据文件缺失", final_app_id) + await self.queue.close() + return None + + # 获取上下文窗口并根据app类型决定执行器 + context_num = app_data.history_len + _logger.info("[Scheduler] App上下文窗口: %d", context_num) + + if app_data.app_type == AppType.FLOW: + _logger.info("[Scheduler] 运行Flow应用") + await self._push_init_message(context_num, is_flow=True) + return asyncio.create_task(self._run_flow()) + + _logger.info("[Scheduler] 运行MCP Agent应用") + await self._push_init_message(context_num, is_flow=False) + return asyncio.create_task(self._run_agent()) + + async def _handle_task_cancellation(self, main_task: asyncio.Task) -> None: + """ + 处理任务取消的逻辑 + + Args: + main_task: 需要取消的主任务 + + """ + _logger.warning("[Scheduler] 用户取消执行,正在终止...") + main_task.cancel() + try: + await main_task + except asyncio.CancelledError: + _logger.info("[Scheduler] 主任务已取消") + except Exception: + _logger.exception("[Scheduler] 终止工作流时发生错误") + + # 检查ExecutorState,若为init、running或waiting,将状态改为cancelled + if self.task.state and self.task.state.executorStatus in [ + ExecutorStatus.INIT, + ExecutorStatus.RUNNING, + ExecutorStatus.WAITING, + ]: + self.task.state.executorStatus = ExecutorStatus.CANCELLED + _logger.info("[Scheduler] ExecutorStatus已设置为CANCELLED") + elif self.task.state: + _logger.info("[Scheduler] ExecutorStatus为 %s,保持不变", self.task.state.executorStatus) + else: + _logger.warning("[Scheduler] task.state为None,无法更新ExecutorStatus") + async def _monitor_activity(self, kill_event: asyncio.Event, user_sub: str) -> None: """监控用户活动状态,不活跃时终止工作流""" 
try: @@ -263,76 +376,105 @@ class Scheduler: return new_conv - async def _init_task(self) -> None: + def _create_new_task(self, task_id: uuid.UUID, user_sub: str, conversation_id: uuid.UUID | None) -> TaskData: + """创建新的TaskData""" + return TaskData( + metadata=Task( + id=task_id, + userSub=user_sub, + conversationId=conversation_id, + ), + runtime=TaskRuntime( + taskId=task_id, + ), + state=None, + context=[], + ) + + async def _init_task(self, task_id: uuid.UUID, user_sub: str) -> None: """初始化Task""" - self.task = await TaskManager.get_task_data_by_task_id(self.post_body.task_id) - if not self.task: - self.task = await TaskManager.init_new_task(self.post_body.task_id, self.post_body.conversation_id, self.post_body.language, self.post_body.app.app_id) - self.task.runtime.question = self.post_body.question - self.task.state.app_id = self.post_body.app.app_id if self.post_body.app else None + conversation_id = self.post_body.conversation_id + + # 若没有Conversation ID则直接创建task + if not conversation_id: + _logger.info("[Scheduler] 无Conversation ID,直接创建新任务") + self.task = self._create_new_task(task_id, user_sub, None) + return + + # 有ConversationID则尝试从ConversationID中恢复task + _logger.info("[Scheduler] 尝试从Conversation ID %s 恢复任务", conversation_id) + + # 尝试恢复任务 + restored = False + try: + # 先验证conversation是否存在且属于该用户 + conversation = await ConversationManager.get_conversation_by_conversation_id( + user_sub, + conversation_id, + ) + + if conversation: + # 尝试从Conversation中获取最后一个Task + last_task = await TaskManager.get_task_by_conversation_id(conversation_id, user_sub) + + # 如果能获取到task,则加载完整的TaskData + if last_task and last_task.id: + _logger.info("[Scheduler] 从Conversation恢复任务 %s", last_task.id) + task_data = await TaskManager.get_task_data_by_task_id(last_task.id) + if task_data: + self.task = task_data + # 更新task_id为新的task_id + self.task.metadata.id = task_id + self.task.runtime.taskId = task_id + if self.task.state: + self.task.state.taskId = task_id + restored = True + 
else: + _logger.warning( + "[Scheduler] Conversation %s 不存在或无权访问,创建新任务", + conversation_id, + ) + except Exception: + _logger.exception("[Scheduler] 从Conversation恢复任务失败,创建新任务") + + # 恢复不成功则新建task + if not restored: + _logger.info("[Scheduler] 无法恢复任务,创建新任务") + self.task = self._create_new_task(task_id, user_sub, conversation_id) async def run(self) -> None: """运行调度器""" - # 如果是智能问答,直接执行 _logger.info("[Scheduler] 开始执行") - # 创建用于通信的事件 + + # 创建用于通信的事件和监控任务 kill_event = asyncio.Event() monitor = asyncio.create_task(self._monitor_activity(kill_event, self.task.metadata.userSub)) - rag_method = True - if self.post_body.app and self.post_body.app.app_id: - rag_method = False + # 确定最终使用的 app_id + final_app_id = await self._determine_app_id() + + # 根据 app_id 创建对应的执行器任务 + main_task = await self._create_executor_task(final_app_id) + if main_task is None: + # 创建任务失败(通常是因为 app 不存在),直接返回 + return - if rag_method: - kb_ids = await KnowledgeBaseManager.get_selected_kb(self.task.metadata.userSub) - await self._push_init_message(3, is_flow=False) - # 启动监控任务和主任务 - main_task = asyncio.create_task(self._run_qa( - self.task, self.queue, self.task.ids.user_sub, llm, history, doc_ids, rag_data)) - else: - # 查找对应的App元数据 - app_data = await AppCenterManager.fetch_app_metadata_by_id(self.post_body.app.app_id) - if not app_data: - _logger.error("[Scheduler] App %s 不存在", self.post_body.app.app_id) - await self.queue.close() - return - - # 获取上下文 - if app_data.app_type == AppType.FLOW: - # 需要执行Flow - is_flow = True - else: - # Agent 应用 - is_flow = False - # 启动监控任务和主任务 - main_task = asyncio.create_task(self._run_agent(self.queue, self.post_body, executor_background)) # 等待任一任务完成 done, pending = await asyncio.wait( [main_task, monitor], return_when=asyncio.FIRST_COMPLETED, ) - # 如果用户手动终止,则cancel主任务 + # 如果用户手动终止,则取消主任务 if kill_event.is_set(): - _logger.warning("[Scheduler] 用户取消执行,正在终止...") - main_task.cancel() - if self.task.state.executorStatus in [ExecutorStatus.RUNNING, ExecutorStatus.WAITING]: - 
self.task.state.executorStatus = ExecutorStatus.CANCELLED - try: - await main_task - _logger.info("[Scheduler] 工作流执行已被终止") - except Exception: - _logger.exception("[Scheduler] 终止工作流时发生错误") + await self._handle_task_cancellation(main_task) # 更新Task,发送结束消息 - _logger.info("[Scheduler] 发送结束消息") - await self.queue.push_output(self.task, event_type=EventType.DONE.value, data={}) + await self._push_done_message() # 关闭Queue await self.queue.close() - return - async def _run_qa(self) -> None: qa_executor = QAExecutor( @@ -417,24 +559,88 @@ class Scheduler: self.task = agent_exec.task - async def _save_task(self) -> None: - """保存Task""" - # 构造RecordContent + async def _save_data(self) -> None: + """保存当前Executor、Task、Record等的数据""" + task = self.task + user_sub = self.task.metadata.userSub + post_body = self.post_body + + # 构造文档列表 used_docs = [] - order_to_id = {} - for docs in task.runtime.documents: - used_docs.append( - RecordGroupDocument( - _id=docs["id"], - author=docs.get("author", ""), - order=docs.get("order", 0), - name=docs["name"], - abstract=docs.get("abstract", ""), - extension=docs.get("extension", ""), - size=docs.get("size", 0), - associated="answer", - created_at=docs.get("created_at", round(datetime.now(UTC).timestamp(), 3)), - ), - ) - if docs.get("order") is not None: - order_to_id[docs["order"]] = docs["id"] + record_group = None # TODO: 需要从适当的地方获取record_group + + # 处理文档 + if hasattr(task.runtime, "documents") and task.runtime.documents: + for docs in task.runtime.documents: + doc_dict = docs if isinstance(docs, dict) else (docs.model_dump() if hasattr(docs, "model_dump") else docs) + used_docs.append(doc_dict) + + # 组装RecordContent + record_content = RecordContent( + question=task.runtime.question if hasattr(task.runtime, "question") else "", + answer=task.runtime.answer if hasattr(task.runtime, "answer") else "", + facts=task.runtime.facts if hasattr(task.runtime, "facts") else [], + data={}, + ) + + try: + # 加密Record数据 + encrypt_data, encrypt_config = 
Security.encrypt(record_content.model_dump_json(by_alias=True)) + except Exception: + _logger.exception("[Scheduler] 问答对加密错误") + return + + # 保存Flow信息 + if task.state: + # 遍历查找数据,并添加 + await TaskManager.save_flow_context(task.context) + + # 整理Record数据 + current_time = round(datetime.now(UTC).timestamp(), 2) + record = Record( + id=task.metadata.id, # record_id + conversationId=task.metadata.conversationId, + taskId=task.metadata.id, + userSub=user_sub, + content=encrypt_data, + key=encrypt_config, + metadata=RecordMetadata( + timeCost=0, # TODO: 需要从task中获取时间成本 + inputTokens=0, # TODO: 需要从task中获取token信息 + outputTokens=0, # TODO: 需要从task中获取token信息 + feature={}, + ), + createdAt=current_time, + flow=FlowHistory( + flow_id=task.state.flow_id if task.state else "", + flow_name=task.state.flow_name if task.state else "", + flow_status=task.state.flow_status if task.state else StepStatus.SUCCESS, + history_ids=[context.id for context in task.context], + ) if task.state else None, + ) + + # 修改文件状态 + if record_group and post_body.conversation_id: + await DocumentManager.change_doc_status(user_sub, post_body.conversation_id, record_group) + + # 保存Record + if post_body.conversation_id: + await RecordManager.insert_record_data(user_sub, post_body.conversation_id, record) + + # 保存与答案关联的文件 + if record_group and used_docs: + await DocumentManager.save_answer_doc(user_sub, record_group, used_docs) + + if post_body.app and post_body.app.app_id: + # 更新最近使用的应用 + await AppCenterManager.update_recent_app(user_sub, post_body.app.app_id) + + # 若状态为成功,删除Task + if not task.state or task.state.flow_status in [StepStatus.SUCCESS, StepStatus.ERROR, StepStatus.CANCELLED]: + await TaskManager.delete_task_by_task_id(task.metadata.id) + else: + # 更新Task + await TaskManager.save_task(task.metadata.id, task.metadata) + await TaskManager.save_task_runtime(task.runtime) + if task.state: + await TaskManager.save_executor_checkpoint(task.state) -- Gitee From 2ce1ac2dbc0768ccfd1ebfffbbae40c4c85c234b Mon 
Sep 17 00:00:00 2001 From: z30057876 Date: Mon, 13 Oct 2025 20:31:18 +0800 Subject: [PATCH 6/8] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E7=BB=93=E6=9E=84=E5=AD=97=E6=AE=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/schemas/appcenter.py | 2 +- apps/schemas/record.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/schemas/appcenter.py b/apps/schemas/appcenter.py index 090909b4a..ac295a8db 100644 --- a/apps/schemas/appcenter.py +++ b/apps/schemas/appcenter.py @@ -48,7 +48,7 @@ class AppLink(BaseModel): class AppFlowInfo(BaseModel): """应用工作流数据结构""" - id: str = Field(..., description="工作流ID") + id: uuid.UUID = Field(..., description="工作流ID") name: str = Field(default="", description="工作流名称") description: str = Field(default="", description="工作流简介") debug: bool = Field(default=False, description="是否经过调试") diff --git a/apps/schemas/record.py b/apps/schemas/record.py index b8da8bd23..cc8f53957 100644 --- a/apps/schemas/record.py +++ b/apps/schemas/record.py @@ -36,7 +36,7 @@ class RecordFlowStep(BaseModel): ex_data: dict[str, Any] | None = Field(default=None, alias="exData") -class RecordFlow(BaseModel): +class RecordExecutor(BaseModel): """Flow的执行信息""" id: str @@ -83,7 +83,7 @@ class RecordData(BaseModel): conversation_id: uuid.UUID = Field(alias="conversationId") task_id: uuid.UUID | None = Field(alias="taskId", default=None) document: list[RecordDocument] = [] - flow: RecordFlow | None = None + flow: RecordExecutor | None = None content: RecordContent metadata: RecordMetadata comment: CommentType = Field(default=CommentType.NONE) -- Gitee From 6f0728a26779c7d5ec523e35ca5767440a4b3e1f Mon Sep 17 00:00:00 2001 From: z30057876 Date: Mon, 13 Oct 2025 20:31:56 +0800 Subject: [PATCH 7/8] =?UTF-8?q?=E7=A7=BB=E9=99=A4Chat=E4=B8=AD=E5=88=9B?= =?UTF-8?q?=E5=BB=BATask=E7=9A=84=E9=80=BB=E8=BE=91=EF=BC=8C=E7=A7=BB?= =?UTF-8?q?=E5=85=A5Scheduler?= MIME-Version: 1.0 Content-Type: 
text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/routers/chat.py | 10 +--------- apps/routers/record.py | 4 ++-- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/apps/routers/chat.py b/apps/routers/chat.py index 61698c838..33d813cd0 100644 --- a/apps/routers/chat.py +++ b/apps/routers/chat.py @@ -13,14 +13,12 @@ from apps.common.wordscheck import words_check from apps.dependency import verify_personal_token, verify_session from apps.models import ExecutorStatus from apps.scheduler.scheduler import Scheduler -from apps.scheduler.scheduler.context import save_data from apps.schemas.request_data import RequestData from apps.schemas.response_data import ResponseData from apps.services.activity import Activity from apps.services.blacklist import QuestionBlacklistManager, UserBlacklistManager from apps.services.flow import FlowManager -RECOMMEND_TRES = 5 _logger = logging.getLogger(__name__) router = APIRouter( prefix="/api", @@ -31,7 +29,6 @@ router = APIRouter( ], ) - async def chat_generator(post_body: RequestData, user_sub: str, session_id: str) -> AsyncGenerator[str, None]: """进行实际问答,并从MQ中获取消息""" try: @@ -44,15 +41,13 @@ async def chat_generator(post_body: RequestData, user_sub: str, session_id: str) await Activity.remove_active(user_sub) return - task = await init_task(post_body, user_sub, session_id) - # 创建queue;由Scheduler进行关闭 queue = MessageQueue() await queue.init() # 在单独Task中运行Scheduler,拉齐queue.get的时机 scheduler = Scheduler() - await scheduler.init(task.metadata.id, queue, post_body, user_sub) + await scheduler.init(queue, post_body, user_sub) _logger.info(f"[Chat] 用户是否活跃: {await Activity.is_active(user_sub)}") scheduler_task = asyncio.create_task(scheduler.run()) @@ -80,9 +75,6 @@ async def chat_generator(post_body: RequestData, user_sub: str, session_id: str) await Activity.remove_active(user_sub) return - # 创建新Record,存入数据库 - await save_data(task, user_sub, post_body) - if post_body.app and post_body.app.flow_id: await 
FlowManager.update_flow_debug_by_app_and_flow_id( post_body.app.app_id, diff --git a/apps/routers/record.py b/apps/routers/record.py index b3e488691..a4cacae15 100644 --- a/apps/routers/record.py +++ b/apps/routers/record.py @@ -14,7 +14,7 @@ from apps.models import ExecutorHistory from apps.schemas.record import ( RecordContent, RecordData, - RecordFlow, + RecordExecutor, RecordFlowStep, RecordMetadata, ) @@ -86,7 +86,7 @@ async def get_record(request: Request, conversationId: Annotated[uuid.UUID, Path # 获得Record关联的flow数据 flow_step_list = await TaskManager.get_context_by_record_id(record_group.id, record.id) if flow_step_list: - tmp_record.flow = RecordFlow( + tmp_record.flow = RecordExecutor( id=record.flow.flow_id, # TODO: 此处前端应该用name recordId=record.id, flowId=record.flow.flow_id, -- Gitee From 0559066d639376f989aa708c1565da2e02062188 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Mon, 13 Oct 2025 20:32:23 +0800 Subject: [PATCH 8/8] =?UTF-8?q?=E5=8E=BB=E9=99=A4start=E5=92=8Cend?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=E6=A3=80=E6=B5=8B=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/services/flow_service.py | 32 ++++---------------------------- 1 file changed, 4 insertions(+), 28 deletions(-) diff --git a/apps/services/flow_service.py b/apps/services/flow_service.py index d12bd1312..5fa99df74 100644 --- a/apps/services/flow_service.py +++ b/apps/services/flow_service.py @@ -118,21 +118,16 @@ class FlowServiceManager: # 验证节点ID并获取起始和终止节点 start_id, end_id = await FlowServiceManager._validate_node_ids(flow_item) - # 验证边的合法性并获取节点的入度和出度 - in_deg, out_deg = await FlowServiceManager._validate_edges(flow_item.edges) - - # 验证起始和终止节点的入度和出度 - await FlowServiceManager._validate_node_degrees(str(start_id), str(end_id), in_deg, out_deg) + # 验证边的合法性 + await FlowServiceManager._validate_edges(flow_item.edges) return start_id, end_id @staticmethod - async def _validate_edges(edges: list[EdgeItem]) -> tuple[dict[str, 
int], dict[str, int]]: - """验证边的合法性并计算节点的入度和出度;当边的ID重复、起始终止节点相同、分支重复或分支包含非法字符时抛出异常""" + async def _validate_edges(edges: list[EdgeItem]) -> None: + """验证边的合法性;当边的ID重复、起始终止节点相同、分支重复或分支包含非法字符时抛出异常""" ids = set() branches = {} - in_deg = {} - out_deg = {} for e in edges: # 验证分支ID是否包含非法字符 @@ -162,25 +157,6 @@ class FlowServiceManager: branches[e.source_branch].add(e.branch_id) - in_deg[e.target_branch] = in_deg.get(e.target_branch, 0) + 1 - out_deg[e.source_branch] = out_deg.get(e.source_branch, 0) + 1 - - return in_deg, out_deg - - @staticmethod - async def _validate_node_degrees( - start_id: str, end_id: str, in_deg: dict[str, int], out_deg: dict[str, int], - ) -> None: - """验证起始和终止节点的入度和出度;当起始节点入度不为0或终止节点出度不为0时抛出异常""" - if start_id in in_deg and in_deg[start_id] != 0: - err = f"[FlowService] 起始节点{start_id}的入度不为0" - logger.error(err) - raise FlowNodeValidationError(err) - if end_id in out_deg and out_deg[end_id] != 0: - err = f"[FlowService] 终止节点{end_id}的出度不为0" - logger.error(err) - raise FlowNodeValidationError(err) - @staticmethod async def validate_flow_connectivity(flow_item: FlowItem) -> bool: # noqa: C901 """ -- Gitee