diff --git a/apps/common/oidc_provider/openeuler.py b/apps/common/oidc_provider/openeuler.py
index 5d85b30c48f4726d749014198f29b1ba309c89bb..8adfe09f76253e36bdc4ff462775e89c1ffd3baa 100644
--- a/apps/common/oidc_provider/openeuler.py
+++ b/apps/common/oidc_provider/openeuler.py
@@ -87,10 +87,6 @@ class OpenEulerOIDCProvider(OIDCProviderBase):
logger.info("[OpenEuler] 获取OIDC用户成功: %s", resp.text)
result = resp.json()
- if not result["phone_number_verified"]:
- err = "Could not validate credentials."
- raise RuntimeError(err)
-
return {
"user_sub": result["sub"],
}
diff --git a/apps/models/__init__.py b/apps/models/__init__.py
index 8db4b8394ac484ccaca0ea2c61e503177a4a5fc8..98bd73674641913a678db2b29abd355de3701db1 100644
--- a/apps/models/__init__.py
+++ b/apps/models/__init__.py
@@ -11,7 +11,7 @@ from .flow import Flow
from .llm import LLMData, LLMProvider, LLMType
from .mcp import MCPActivated, MCPInfo, MCPInstallStatus, MCPTools, MCPType
from .node import NodeInfo
-from .record import FootNoteType, Record, RecordFootNote, RecordMetadata
+from .record import Record, RecordMetadata
from .service import Service, ServiceACL, ServiceHashes
from .session import Session, SessionActivity, SessionType
from .tag import Tag
@@ -45,7 +45,6 @@ __all__ = [
"ExecutorHistory",
"ExecutorStatus",
"Flow",
- "FootNoteType",
"LLMData",
"LLMProvider",
"LLMType",
@@ -58,7 +57,6 @@ __all__ = [
"NodeInfo",
"PermissionType",
"Record",
- "RecordFootNote",
"RecordMetadata",
"Service",
"ServiceACL",
diff --git a/apps/models/record.py b/apps/models/record.py
index 0645fde9e2faa18d681ee8a91942b7e35ec95fc8..c99d8ba6448a80fc5858698dff3b37f6b66ee0ad 100644
--- a/apps/models/record.py
+++ b/apps/models/record.py
@@ -3,10 +3,9 @@
import uuid
from datetime import UTC, datetime
-from enum import Enum as PyEnum
from typing import Any
-from sqlalchemy import BigInteger, DateTime, Enum, Float, ForeignKey, Integer, String, Text
+from sqlalchemy import DateTime, Float, ForeignKey, Integer, String, Text
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import Mapped, mapped_column
@@ -57,31 +56,3 @@ class RecordMetadata(Base):
"""问答对输出token数"""
featureSwitch: Mapped[dict[str, Any]] = mapped_column(JSONB, default_factory=dict, nullable=False) # noqa: N815
"""问答对功能开关"""
-
-
-class FootNoteType(str, PyEnum):
- """脚注类型"""
-
- RAG = "rag"
- FRAMEWORK = "framework"
- WEB = "web"
-
-
-class RecordFootNote(Base):
- """问答对脚注"""
-
- __tablename__ = "framework_record_foot_note"
- id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True, init=False)
- """主键ID"""
- recordId: Mapped[uuid.UUID] = mapped_column( # noqa: N815
- UUID(as_uuid=True), ForeignKey("framework_record.id"), nullable=False,
- )
- """问答对ID"""
- releatedId: Mapped[str] = mapped_column(String(64), default="", nullable=False) # noqa: N815
- """脚注数字"""
- insertPosition: Mapped[int] = mapped_column(Integer, default=0, nullable=False) # noqa: N815
- """插入位置"""
- footSource: Mapped[str] = mapped_column(String(4096), default="", nullable=False) # noqa: N815
- """脚注来源"""
- footType: Mapped[FootNoteType] = mapped_column(Enum(FootNoteType), default=FootNoteType.RAG, nullable=False) # noqa: N815
- """脚注类型"""
diff --git a/apps/routers/appcenter.py b/apps/routers/appcenter.py
index 5fdff565f7123ea889ce8b033b7ebde325c58215..304aab52092bf825829ed1a7273787a8d3746d28 100644
--- a/apps/routers/appcenter.py
+++ b/apps/routers/appcenter.py
@@ -10,7 +10,8 @@ from fastapi.responses import JSONResponse
from apps.dependency.user import verify_personal_token, verify_session
from apps.exceptions import InstancePermissionError
-from apps.models import AppType
+from apps.models import AppType, PermissionType
+from apps.schemas.agent import AgentAppMetadata
from apps.schemas.appcenter import (
AppFlowInfo,
AppMcpServiceInfo,
@@ -28,6 +29,7 @@ from apps.schemas.appcenter import (
GetRecentAppListRsp,
)
from apps.schemas.enum_var import AppFilterType
+from apps.schemas.flow import AppMetadata
from apps.schemas.response_data import ResponseData
from apps.services.appcenter import AppCenterManager
from apps.services.mcp_service import MCPServiceManager
@@ -214,46 +216,83 @@ async def get_application(appId: Annotated[uuid.UUID, Path()]) -> JSONResponse:
result={},
).model_dump(exclude_none=True, by_alias=True),
)
- workflows = [
- AppFlowInfo(
- id=flow.id,
- name=flow.name,
- description=flow.description,
- debug=flow.debug,
+
+ # 根据 Metadata 类型组装对应的 GetAppPropertyMsg
+ if isinstance(app_data, AppMetadata):
+ # 处理工作流应用(FLOW类型)
+ workflows = [
+ AppFlowInfo(
+ id=flow.id,
+ name=flow.name,
+ description=flow.description,
+ debug=flow.debug,
+ )
+ for flow in app_data.flows
+ ]
+ result_msg = GetAppPropertyMsg(
+ appId=str(app_data.id),
+ appType=app_data.app_type,
+ published=app_data.published,
+ name=app_data.name,
+ description=app_data.description,
+ icon=app_data.icon,
+ links=app_data.links,
+ recommendedQuestions=app_data.first_questions,
+ dialogRounds=app_data.history_len,
+ permission=AppPermissionData(
+ visibility=app_data.permission.type if app_data.permission else PermissionType.PRIVATE,
+ authorizedUsers=app_data.permission.users if app_data.permission else [],
+ ),
+ workflows=workflows,
+ mcpService=[],
+ )
+ elif isinstance(app_data, AgentAppMetadata):
+ # 处理智能体应用(AGENT类型)
+ mcp_service = []
+ if app_data.mcp_service:
+ for service in app_data.mcp_service:
+ mcp_collection = await MCPServiceManager.get_mcp_service(service)
+ # 当 mcp_collection 为 None 时忽略当前的 MCP Server
+ if mcp_collection is not None:
+ mcp_service.append(AppMcpServiceInfo(
+ id=uuid.UUID(service) if isinstance(service, str) else service,
+ name=mcp_collection.name,
+ description=mcp_collection.description,
+ ))
+ result_msg = GetAppPropertyMsg(
+ appId=str(app_data.id),
+ appType=app_data.app_type,
+ published=app_data.published,
+ name=app_data.name,
+ description=app_data.description,
+ icon=app_data.icon,
+ links=[],
+ recommendedQuestions=[],
+ dialogRounds=app_data.history_len,
+ permission=AppPermissionData(
+ visibility=app_data.permission.type if app_data.permission else PermissionType.PRIVATE,
+ authorizedUsers=app_data.permission.users if app_data.permission else [],
+ ),
+ workflows=[],
+ mcpService=mcp_service,
+ )
+ else:
+ logger.error("[AppCenter] 未知的应用元数据类型: %s", type(app_data).__name__)
+ return JSONResponse(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ content=ResponseData(
+ code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ message="UNKNOWN_APP_TYPE",
+ result={},
+ ).model_dump(exclude_none=True, by_alias=True),
)
- for flow in app_data.flows
- ]
- mcp_service = []
- if app_data.mcp_service:
- for service in app_data.mcp_service:
- mcp_collection = await MCPServiceManager.get_mcp_service(service)
- mcp_service.append(AppMcpServiceInfo(
- id=mcp_collection.id,
- name=mcp_collection.name,
- description=mcp_collection.description,
- ))
+
return JSONResponse(
status_code=status.HTTP_200_OK,
content=GetAppPropertyRsp(
code=status.HTTP_200_OK,
message="OK",
- result=GetAppPropertyMsg(
- appId=app_data.id,
- appType=app_data.app_type,
- published=app_data.published,
- name=app_data.name,
- description=app_data.description,
- icon=app_data.icon,
- links=app_data.links,
- recommendedQuestions=app_data.first_questions,
- dialogRounds=app_data.history_len,
- permission=AppPermissionData(
- visibility=app_data.permission.type,
- authorizedUsers=app_data.permission.users,
- ),
- workflows=workflows,
- mcpService=mcp_service,
- ),
+ result=result_msg,
).model_dump(exclude_none=True, by_alias=True),
)
diff --git a/apps/routers/chat.py b/apps/routers/chat.py
index 61698c838017b8c66d3c52bf5ea06a969b42d0de..33d813cd054897ed9ca2b7c3965723d527328260 100644
--- a/apps/routers/chat.py
+++ b/apps/routers/chat.py
@@ -13,14 +13,12 @@ from apps.common.wordscheck import words_check
from apps.dependency import verify_personal_token, verify_session
from apps.models import ExecutorStatus
from apps.scheduler.scheduler import Scheduler
-from apps.scheduler.scheduler.context import save_data
from apps.schemas.request_data import RequestData
from apps.schemas.response_data import ResponseData
from apps.services.activity import Activity
from apps.services.blacklist import QuestionBlacklistManager, UserBlacklistManager
from apps.services.flow import FlowManager
-RECOMMEND_TRES = 5
_logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/api",
@@ -31,7 +29,6 @@ router = APIRouter(
],
)
-
async def chat_generator(post_body: RequestData, user_sub: str, session_id: str) -> AsyncGenerator[str, None]:
"""进行实际问答,并从MQ中获取消息"""
try:
@@ -44,15 +41,13 @@ async def chat_generator(post_body: RequestData, user_sub: str, session_id: str)
await Activity.remove_active(user_sub)
return
- task = await init_task(post_body, user_sub, session_id)
-
# 创建queue;由Scheduler进行关闭
queue = MessageQueue()
await queue.init()
# 在单独Task中运行Scheduler,拉齐queue.get的时机
scheduler = Scheduler()
- await scheduler.init(task.metadata.id, queue, post_body, user_sub)
+ await scheduler.init(queue, post_body, user_sub)
_logger.info(f"[Chat] 用户是否活跃: {await Activity.is_active(user_sub)}")
scheduler_task = asyncio.create_task(scheduler.run())
@@ -80,9 +75,6 @@ async def chat_generator(post_body: RequestData, user_sub: str, session_id: str)
await Activity.remove_active(user_sub)
return
- # 创建新Record,存入数据库
- await save_data(task, user_sub, post_body)
-
if post_body.app and post_body.app.flow_id:
await FlowManager.update_flow_debug_by_app_and_flow_id(
post_body.app.app_id,
diff --git a/apps/routers/record.py b/apps/routers/record.py
index b3e488691c4134a38c002ee636afd744ddceed96..a4cacae15e20ff0ddcec4acd2b61e59a9f1deddf 100644
--- a/apps/routers/record.py
+++ b/apps/routers/record.py
@@ -14,7 +14,7 @@ from apps.models import ExecutorHistory
from apps.schemas.record import (
RecordContent,
RecordData,
- RecordFlow,
+ RecordExecutor,
RecordFlowStep,
RecordMetadata,
)
@@ -86,7 +86,7 @@ async def get_record(request: Request, conversationId: Annotated[uuid.UUID, Path
# 获得Record关联的flow数据
flow_step_list = await TaskManager.get_context_by_record_id(record_group.id, record.id)
if flow_step_list:
- tmp_record.flow = RecordFlow(
+ tmp_record.flow = RecordExecutor(
id=record.flow.flow_id, # TODO: 此处前端应该用name
recordId=record.id,
flowId=record.flow.flow_id,
diff --git a/apps/scheduler/scheduler/context.py b/apps/scheduler/scheduler/context.py
deleted file mode 100644
index 2265e115ff7e4d52982e8256593a7f690be3a618..0000000000000000000000000000000000000000
--- a/apps/scheduler/scheduler/context.py
+++ /dev/null
@@ -1,108 +0,0 @@
-# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
-"""上下文管理"""
-
-import logging
-import re
-from datetime import UTC, datetime
-
-from apps.common.security import Security
-from apps.models import Record, RecordMetadata, StepStatus
-from apps.schemas.record import (
- FlowHistory,
- RecordContent,
-)
-from apps.services.appcenter import AppCenterManager
-from apps.services.document import DocumentManager
-from apps.services.record import RecordManager
-from apps.services.task import TaskManager
-
-logger = logging.getLogger(__name__)
-
-
-async def save_data(scheduler: "Scheduler") -> None:
- """保存当前Executor、Task、Record等的数据"""
- foot_note_pattern = re.compile(r"\[\[(\d+)\]\]")
- footnote_list = []
- offset = 0
- for match in foot_note_pattern.finditer(task.runtime.answer):
- order = int(match.group(1))
- if order in order_to_id:
- # 计算移除脚注后的插入位置
- original_start = match.start()
- new_position = original_start - offset
-
- footnote_list.append(
- FootNoteMetaData(
- releatedId=order_to_id[order],
- insertPosition=new_position,
- footSource="rag_search",
- footType="document",
- ),
- )
-
- # 更新偏移量,因为脚注被移除会导致后续内容前移
- offset += len(match.group(0))
-
- # 最后统一移除所有脚注
- task.runtime.answer = foot_note_pattern.sub("", task.runtime.answer).strip()
- record_content = RecordContent(
- question=task.runtime.question,
- answer=task.runtime.answer,
- facts=task.runtime.facts,
- data={},
- )
-
- try:
- # 加密Record数据
- encrypt_data, encrypt_config = Security.encrypt(record_content.model_dump_json(by_alias=True))
- except Exception:
- logger.exception("[Scheduler] 问答对加密错误")
- return
-
- # 保存Flow信息
- if task.state:
- # 遍历查找数据,并添加
- await TaskManager.save_flow_context(task.id, task.context)
-
- # 整理Record数据
- current_time = round(datetime.now(UTC).timestamp(), 2)
- record = Record(
- id=task.ids.record_id,
- conversationId=task.ids.conversation_id,
- taskId=task.id,
- userSub=user_sub,
- content=encrypt_data,
- key=encrypt_config,
- metadata=RecordMetadata(
- timeCost=task.tokens.full_time,
- inputTokens=task.tokens.input_tokens,
- outputTokens=task.tokens.output_tokens,
- feature={},
- footNoteMetadataList=footnote_list,
- ),
- createdAt=current_time,
- flow=FlowHistory(
- flow_id=task.state.flow_id,
- flow_name=task.state.flow_name,
- flow_status=task.state.flow_status,
- history_ids=[context.id for context in task.context],
- ),
- )
-
- # 修改文件状态
- await DocumentManager.change_doc_status(user_sub, post_body.conversation_id, record_group)
- # 保存Record
- await RecordManager.insert_record_data(user_sub, post_body.conversation_id, record)
- # 保存与答案关联的文件
- await DocumentManager.save_answer_doc(user_sub, record_group, used_docs)
-
- if post_body.app and post_body.app.app_id:
- # 更新最近使用的应用
- await AppCenterManager.update_recent_app(user_sub, post_body.app.app_id)
-
- # 若状态为成功,删除Task
- if not task.state or task.state.flow_status == StepStatus.SUCCESS or task.state.flow_status == StepStatus.ERROR or task.state.flow_status == StepStatus.CANCELLED:
- await TaskManager.delete_task_by_task_id(task.id)
- else:
- # 更新Task
- await TaskManager.save_task(task.id, task)
diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py
index 23180c3de387aff202cbf126adf3e8feb7426ae2..8295eb44c2a31a2022525325db005d2eae945a94 100644
--- a/apps/scheduler/scheduler/scheduler.py
+++ b/apps/scheduler/scheduler/scheduler.py
@@ -10,8 +10,19 @@ from jinja2 import BaseLoader
from jinja2.sandbox import SandboxedEnvironment
from apps.common.queue import MessageQueue
+from apps.common.security import Security
from apps.llm import Embedding, FunctionLLM, JsonGenerator, ReasoningLLM
-from apps.models import AppType, Conversation, ExecutorStatus, Task, TaskRuntime, User
+from apps.models import (
+ AppType,
+ Conversation,
+ ExecutorStatus,
+ Record,
+ RecordMetadata,
+ StepStatus,
+ Task,
+ TaskRuntime,
+ User,
+)
from apps.scheduler.executor.agent import MCPAgentExecutor
from apps.scheduler.executor.flow import FlowExecutor
from apps.scheduler.executor.qa import QAExecutor
@@ -21,13 +32,16 @@ from apps.schemas.message import (
InitContent,
InitContentFeature,
)
+from apps.schemas.record import FlowHistory, RecordContent
from apps.schemas.request_data import RequestData
from apps.schemas.scheduler import LLMConfig, TopFlow
from apps.schemas.task import TaskData
from apps.services.activity import Activity
from apps.services.appcenter import AppCenterManager
from apps.services.conversation import ConversationManager
+from apps.services.document import DocumentManager
from apps.services.llm import LLMManager
+from apps.services.record import RecordManager
from apps.services.task import TaskManager
from apps.services.user import UserManager
@@ -68,23 +82,8 @@ class Scheduler:
raise RuntimeError(err)
self.user = user
- # 获取Task
- task = await TaskManager.get_task_data_by_task_id(task_id)
- if not task:
- _logger.info("[Scheduler] 新建任务")
- task = TaskData(
- metadata=Task(
- id=task_id,
- userSub=user_sub,
- conversationId=self.post_body.conversation_id,
- ),
- runtime=TaskRuntime(
- taskId=task_id,
- ),
- state=None,
- context=[],
- )
- self.task = task
+ # 初始化Task
+ await self._init_task(task_id, user_sub)
# Jinja2
self._env = SandboxedEnvironment(
@@ -96,7 +95,7 @@ class Scheduler:
)
# LLM
- await self._get_scheduler_llm(post_body.llm_id)
+ self.llm = await self._get_scheduler_llm(post_body.llm_id)
async def _push_init_message(
@@ -131,6 +130,120 @@ class Scheduler:
data=InitContent(feature=feature, createdAt=created_at).model_dump(exclude_none=True, by_alias=True),
)
+ async def _push_done_message(self) -> None:
+ """推送任务完成消息"""
+ _logger.info("[Scheduler] 发送结束消息")
+ await self.queue.push_output(self.task, self.llm, event_type=EventType.DONE.value, data={})
+
+ async def _determine_app_id(self) -> uuid.UUID | None:
+ """
+ 确定最终使用的 app_id
+
+ Returns:
+ final_app_id: 最终使用的 app_id,如果为 None 则使用 QA 模式
+
+ """
+ conversation = None
+
+ if self.task.metadata.conversationId:
+ conversation = await ConversationManager.get_conversation_by_conversation_id(
+ self.task.metadata.userSub,
+ self.task.metadata.conversationId,
+ )
+
+ if conversation and conversation.appId:
+ # Conversation中有appId,使用它
+ final_app_id = conversation.appId
+ _logger.info("[Scheduler] 使用Conversation中的appId: %s", final_app_id)
+
+ # 如果post_body中也有app_id且与Conversation不符,忽略post_body中的app信息
+ if self.post_body.app and self.post_body.app.app_id and self.post_body.app.app_id != conversation.appId:
+ _logger.warning(
+ "[Scheduler] post_body中的app_id(%s)与Conversation中的appId(%s)不符,忽略post_body中的app信息",
+ self.post_body.app.app_id,
+ conversation.appId,
+ )
+ # 用Conversation中的appId覆盖post_body中的app_id,以Conversation为准
+ self.post_body.app.app_id = conversation.appId
+ elif self.post_body.app and self.post_body.app.app_id:
+ # Conversation中appId为None,使用post_body中的app信息
+ final_app_id = self.post_body.app.app_id
+ _logger.info("[Scheduler] Conversation中无appId,使用post_body中的app_id: %s", final_app_id)
+ else:
+ # 两者都为None,fallback到QAExecutor
+ final_app_id = None
+ _logger.info("[Scheduler] Conversation和post_body中均无appId,fallback到智能问答")
+
+ return final_app_id
+
+ async def _create_executor_task(self, final_app_id: uuid.UUID | None) -> asyncio.Task | None:
+ """
+ 根据 app_id 创建对应的执行器任务
+
+ Args:
+ final_app_id: 要使用的 app_id,None 表示使用 QA 模式
+
+ Returns:
+ 创建的异步任务,如果发生错误则返回 None
+
+ """
+ if final_app_id is None:
+ # 没有app相关信息,运行QAExecutor
+ _logger.info("[Scheduler] 运行智能问答模式")
+ await self._push_init_message(3, is_flow=False)
+ return asyncio.create_task(self._run_qa())
+
+ # 有app信息,获取app详情和元数据
+ try:
+ app_data = await AppCenterManager.fetch_app_metadata_by_id(final_app_id)
+ except ValueError:
+ _logger.exception("[Scheduler] App %s 不存在或元数据文件缺失", final_app_id)
+ await self.queue.close()
+ return None
+
+ # 获取上下文窗口并根据app类型决定执行器
+ context_num = app_data.history_len
+ _logger.info("[Scheduler] App上下文窗口: %d", context_num)
+
+ if app_data.app_type == AppType.FLOW:
+ _logger.info("[Scheduler] 运行Flow应用")
+ await self._push_init_message(context_num, is_flow=True)
+ return asyncio.create_task(self._run_flow())
+
+ _logger.info("[Scheduler] 运行MCP Agent应用")
+ await self._push_init_message(context_num, is_flow=False)
+ return asyncio.create_task(self._run_agent())
+
+ async def _handle_task_cancellation(self, main_task: asyncio.Task) -> None:
+ """
+ 处理任务取消的逻辑
+
+ Args:
+ main_task: 需要取消的主任务
+
+ """
+ _logger.warning("[Scheduler] 用户取消执行,正在终止...")
+ main_task.cancel()
+ try:
+ await main_task
+ except asyncio.CancelledError:
+ _logger.info("[Scheduler] 主任务已取消")
+ except Exception:
+ _logger.exception("[Scheduler] 终止工作流时发生错误")
+
+ # 检查ExecutorState,若为init、running或waiting,将状态改为cancelled
+ if self.task.state and self.task.state.executorStatus in [
+ ExecutorStatus.INIT,
+ ExecutorStatus.RUNNING,
+ ExecutorStatus.WAITING,
+ ]:
+ self.task.state.executorStatus = ExecutorStatus.CANCELLED
+ _logger.info("[Scheduler] ExecutorStatus已设置为CANCELLED")
+ elif self.task.state:
+ _logger.info("[Scheduler] ExecutorStatus为 %s,保持不变", self.task.state.executorStatus)
+ else:
+ _logger.warning("[Scheduler] task.state为None,无法更新ExecutorStatus")
+
async def _monitor_activity(self, kill_event: asyncio.Event, user_sub: str) -> None:
"""监控用户活动状态,不活跃时终止工作流"""
try:
@@ -263,76 +376,105 @@ class Scheduler:
return new_conv
- async def _init_task(self) -> None:
+ def _create_new_task(self, task_id: uuid.UUID, user_sub: str, conversation_id: uuid.UUID | None) -> TaskData:
+ """创建新的TaskData"""
+ return TaskData(
+ metadata=Task(
+ id=task_id,
+ userSub=user_sub,
+ conversationId=conversation_id,
+ ),
+ runtime=TaskRuntime(
+ taskId=task_id,
+ ),
+ state=None,
+ context=[],
+ )
+
+ async def _init_task(self, task_id: uuid.UUID, user_sub: str) -> None:
"""初始化Task"""
- self.task = await TaskManager.get_task_data_by_task_id(self.post_body.task_id)
- if not self.task:
- self.task = await TaskManager.init_new_task(self.post_body.task_id, self.post_body.conversation_id, self.post_body.language, self.post_body.app.app_id)
- self.task.runtime.question = self.post_body.question
- self.task.state.app_id = self.post_body.app.app_id if self.post_body.app else None
+ conversation_id = self.post_body.conversation_id
+
+ # 若没有Conversation ID则直接创建task
+ if not conversation_id:
+ _logger.info("[Scheduler] 无Conversation ID,直接创建新任务")
+ self.task = self._create_new_task(task_id, user_sub, None)
+ return
+
+ # 有ConversationID则尝试从ConversationID中恢复task
+ _logger.info("[Scheduler] 尝试从Conversation ID %s 恢复任务", conversation_id)
+
+ # 尝试恢复任务
+ restored = False
+ try:
+ # 先验证conversation是否存在且属于该用户
+ conversation = await ConversationManager.get_conversation_by_conversation_id(
+ user_sub,
+ conversation_id,
+ )
+
+ if conversation:
+ # 尝试从Conversation中获取最后一个Task
+ last_task = await TaskManager.get_task_by_conversation_id(conversation_id, user_sub)
+
+ # 如果能获取到task,则加载完整的TaskData
+ if last_task and last_task.id:
+ _logger.info("[Scheduler] 从Conversation恢复任务 %s", last_task.id)
+ task_data = await TaskManager.get_task_data_by_task_id(last_task.id)
+ if task_data:
+ self.task = task_data
+ # 更新task_id为新的task_id
+ self.task.metadata.id = task_id
+ self.task.runtime.taskId = task_id
+ if self.task.state:
+ self.task.state.taskId = task_id
+ restored = True
+ else:
+ _logger.warning(
+ "[Scheduler] Conversation %s 不存在或无权访问,创建新任务",
+ conversation_id,
+ )
+ except Exception:
+ _logger.exception("[Scheduler] 从Conversation恢复任务失败,创建新任务")
+
+ # 恢复不成功则新建task
+ if not restored:
+ _logger.info("[Scheduler] 无法恢复任务,创建新任务")
+ self.task = self._create_new_task(task_id, user_sub, conversation_id)
async def run(self) -> None:
"""运行调度器"""
- # 如果是智能问答,直接执行
_logger.info("[Scheduler] 开始执行")
- # 创建用于通信的事件
+
+ # 创建用于通信的事件和监控任务
kill_event = asyncio.Event()
monitor = asyncio.create_task(self._monitor_activity(kill_event, self.task.metadata.userSub))
- rag_method = True
- if self.post_body.app and self.post_body.app.app_id:
- rag_method = False
+ # 确定最终使用的 app_id
+ final_app_id = await self._determine_app_id()
+
+ # 根据 app_id 创建对应的执行器任务
+ main_task = await self._create_executor_task(final_app_id)
+ if main_task is None:
+ # 创建任务失败(通常是因为 app 不存在),直接返回
+ return
- if rag_method:
- kb_ids = await KnowledgeBaseManager.get_selected_kb(self.task.metadata.userSub)
- await self._push_init_message(3, is_flow=False)
- # 启动监控任务和主任务
- main_task = asyncio.create_task(self._run_qa(
- self.task, self.queue, self.task.ids.user_sub, llm, history, doc_ids, rag_data))
- else:
- # 查找对应的App元数据
- app_data = await AppCenterManager.fetch_app_metadata_by_id(self.post_body.app.app_id)
- if not app_data:
- _logger.error("[Scheduler] App %s 不存在", self.post_body.app.app_id)
- await self.queue.close()
- return
-
- # 获取上下文
- if app_data.app_type == AppType.FLOW:
- # 需要执行Flow
- is_flow = True
- else:
- # Agent 应用
- is_flow = False
- # 启动监控任务和主任务
- main_task = asyncio.create_task(self._run_agent(self.queue, self.post_body, executor_background))
# 等待任一任务完成
done, pending = await asyncio.wait(
[main_task, monitor],
return_when=asyncio.FIRST_COMPLETED,
)
- # 如果用户手动终止,则cancel主任务
+ # 如果用户手动终止,则取消主任务
if kill_event.is_set():
- _logger.warning("[Scheduler] 用户取消执行,正在终止...")
- main_task.cancel()
- if self.task.state.executorStatus in [ExecutorStatus.RUNNING, ExecutorStatus.WAITING]:
- self.task.state.executorStatus = ExecutorStatus.CANCELLED
- try:
- await main_task
- _logger.info("[Scheduler] 工作流执行已被终止")
- except Exception:
- _logger.exception("[Scheduler] 终止工作流时发生错误")
+ await self._handle_task_cancellation(main_task)
# 更新Task,发送结束消息
- _logger.info("[Scheduler] 发送结束消息")
- await self.queue.push_output(self.task, event_type=EventType.DONE.value, data={})
+ await self._push_done_message()
# 关闭Queue
await self.queue.close()
- return
-
async def _run_qa(self) -> None:
qa_executor = QAExecutor(
@@ -417,24 +559,88 @@ class Scheduler:
self.task = agent_exec.task
- async def _save_task(self) -> None:
- """保存Task"""
- # 构造RecordContent
+ async def _save_data(self) -> None:
+ """保存当前Executor、Task、Record等的数据"""
+ task = self.task
+ user_sub = self.task.metadata.userSub
+ post_body = self.post_body
+
+ # 构造文档列表
used_docs = []
- order_to_id = {}
- for docs in task.runtime.documents:
- used_docs.append(
- RecordGroupDocument(
- _id=docs["id"],
- author=docs.get("author", ""),
- order=docs.get("order", 0),
- name=docs["name"],
- abstract=docs.get("abstract", ""),
- extension=docs.get("extension", ""),
- size=docs.get("size", 0),
- associated="answer",
- created_at=docs.get("created_at", round(datetime.now(UTC).timestamp(), 3)),
- ),
- )
- if docs.get("order") is not None:
- order_to_id[docs["order"]] = docs["id"]
+ record_group = None # TODO: 需要从适当的地方获取record_group
+
+ # 处理文档
+ if hasattr(task.runtime, "documents") and task.runtime.documents:
+ for docs in task.runtime.documents:
+ doc_dict = docs if isinstance(docs, dict) else (docs.model_dump() if hasattr(docs, "model_dump") else docs)
+ used_docs.append(doc_dict)
+
+ # 组装RecordContent
+ record_content = RecordContent(
+ question=task.runtime.question if hasattr(task.runtime, "question") else "",
+ answer=task.runtime.answer if hasattr(task.runtime, "answer") else "",
+ facts=task.runtime.facts if hasattr(task.runtime, "facts") else [],
+ data={},
+ )
+
+ try:
+ # 加密Record数据
+ encrypt_data, encrypt_config = Security.encrypt(record_content.model_dump_json(by_alias=True))
+ except Exception:
+ _logger.exception("[Scheduler] 问答对加密错误")
+ return
+
+ # 保存Flow信息
+ if task.state:
+ # 遍历查找数据,并添加
+ await TaskManager.save_flow_context(task.context)
+
+ # 整理Record数据
+ current_time = round(datetime.now(UTC).timestamp(), 2)
+ record = Record(
+ id=task.metadata.id, # record_id
+ conversationId=task.metadata.conversationId,
+ taskId=task.metadata.id,
+ userSub=user_sub,
+ content=encrypt_data,
+ key=encrypt_config,
+ metadata=RecordMetadata(
+ timeCost=0, # TODO: 需要从task中获取时间成本
+ inputTokens=0, # TODO: 需要从task中获取token信息
+ outputTokens=0, # TODO: 需要从task中获取token信息
+ feature={},
+ ),
+ createdAt=current_time,
+ flow=FlowHistory(
+ flow_id=task.state.flow_id if task.state else "",
+ flow_name=task.state.flow_name if task.state else "",
+ flow_status=task.state.flow_status if task.state else StepStatus.SUCCESS,
+ history_ids=[context.id for context in task.context],
+ ) if task.state else None,
+ )
+
+ # 修改文件状态
+ if record_group and post_body.conversation_id:
+ await DocumentManager.change_doc_status(user_sub, post_body.conversation_id, record_group)
+
+ # 保存Record
+ if post_body.conversation_id:
+ await RecordManager.insert_record_data(user_sub, post_body.conversation_id, record)
+
+ # 保存与答案关联的文件
+ if record_group and used_docs:
+ await DocumentManager.save_answer_doc(user_sub, record_group, used_docs)
+
+ if post_body.app and post_body.app.app_id:
+ # 更新最近使用的应用
+ await AppCenterManager.update_recent_app(user_sub, post_body.app.app_id)
+
+ # 若无状态,或状态为成功、错误、已取消(执行已结束),删除Task
+ if not task.state or task.state.flow_status in [StepStatus.SUCCESS, StepStatus.ERROR, StepStatus.CANCELLED]:
+ await TaskManager.delete_task_by_task_id(task.metadata.id)
+ else:
+ # 更新Task
+ await TaskManager.save_task(task.metadata.id, task.metadata)
+ await TaskManager.save_task_runtime(task.runtime)
+ if task.state:
+ await TaskManager.save_executor_checkpoint(task.state)
diff --git a/apps/schemas/appcenter.py b/apps/schemas/appcenter.py
index 090909b4a2caccc1c302535c20e56d86f9690274..ac295a8db8fbb7c457609d6aca89f19cbccdb673 100644
--- a/apps/schemas/appcenter.py
+++ b/apps/schemas/appcenter.py
@@ -48,7 +48,7 @@ class AppLink(BaseModel):
class AppFlowInfo(BaseModel):
"""应用工作流数据结构"""
- id: str = Field(..., description="工作流ID")
+ id: uuid.UUID = Field(..., description="工作流ID")
name: str = Field(default="", description="工作流名称")
description: str = Field(default="", description="工作流简介")
debug: bool = Field(default=False, description="是否经过调试")
diff --git a/apps/schemas/record.py b/apps/schemas/record.py
index b8da8bd234ae57f9e33066fead7b04b11c7639a3..cc8f53957eb602a540a41671b28702643f4fd4a1 100644
--- a/apps/schemas/record.py
+++ b/apps/schemas/record.py
@@ -36,7 +36,7 @@ class RecordFlowStep(BaseModel):
ex_data: dict[str, Any] | None = Field(default=None, alias="exData")
-class RecordFlow(BaseModel):
+class RecordExecutor(BaseModel):
"""Flow的执行信息"""
id: str
@@ -83,7 +83,7 @@ class RecordData(BaseModel):
conversation_id: uuid.UUID = Field(alias="conversationId")
task_id: uuid.UUID | None = Field(alias="taskId", default=None)
document: list[RecordDocument] = []
- flow: RecordFlow | None = None
+ flow: RecordExecutor | None = None
content: RecordContent
metadata: RecordMetadata
comment: CommentType = Field(default=CommentType.NONE)
diff --git a/apps/services/flow_service.py b/apps/services/flow_service.py
index d12bd131239f769a5f40bbb20461c9eca32c6fba..5fa99df74701e0b42d8dcff125baf48bd9967f05 100644
--- a/apps/services/flow_service.py
+++ b/apps/services/flow_service.py
@@ -118,21 +118,16 @@ class FlowServiceManager:
# 验证节点ID并获取起始和终止节点
start_id, end_id = await FlowServiceManager._validate_node_ids(flow_item)
- # 验证边的合法性并获取节点的入度和出度
- in_deg, out_deg = await FlowServiceManager._validate_edges(flow_item.edges)
-
- # 验证起始和终止节点的入度和出度
- await FlowServiceManager._validate_node_degrees(str(start_id), str(end_id), in_deg, out_deg)
+ # 验证边的合法性
+ await FlowServiceManager._validate_edges(flow_item.edges)
return start_id, end_id
@staticmethod
- async def _validate_edges(edges: list[EdgeItem]) -> tuple[dict[str, int], dict[str, int]]:
- """验证边的合法性并计算节点的入度和出度;当边的ID重复、起始终止节点相同、分支重复或分支包含非法字符时抛出异常"""
+ async def _validate_edges(edges: list[EdgeItem]) -> None:
+ """验证边的合法性;当边的ID重复、起始终止节点相同、分支重复或分支包含非法字符时抛出异常"""
ids = set()
branches = {}
- in_deg = {}
- out_deg = {}
for e in edges:
# 验证分支ID是否包含非法字符
@@ -162,25 +157,6 @@ class FlowServiceManager:
branches[e.source_branch].add(e.branch_id)
- in_deg[e.target_branch] = in_deg.get(e.target_branch, 0) + 1
- out_deg[e.source_branch] = out_deg.get(e.source_branch, 0) + 1
-
- return in_deg, out_deg
-
- @staticmethod
- async def _validate_node_degrees(
- start_id: str, end_id: str, in_deg: dict[str, int], out_deg: dict[str, int],
- ) -> None:
- """验证起始和终止节点的入度和出度;当起始节点入度不为0或终止节点出度不为0时抛出异常"""
- if start_id in in_deg and in_deg[start_id] != 0:
- err = f"[FlowService] 起始节点{start_id}的入度不为0"
- logger.error(err)
- raise FlowNodeValidationError(err)
- if end_id in out_deg and out_deg[end_id] != 0:
- err = f"[FlowService] 终止节点{end_id}的出度不为0"
- logger.error(err)
- raise FlowNodeValidationError(err)
-
@staticmethod
async def validate_flow_connectivity(flow_item: FlowItem) -> bool: # noqa: C901
"""
diff --git a/design/services/comment.md b/design/services/comment.md
new file mode 100644
index 0000000000000000000000000000000000000000..fd6510cf30c633d76e5fbb4c302d595dbf3e689f
--- /dev/null
+++ b/design/services/comment.md
@@ -0,0 +1,819 @@
+# Comment Module Documentation
+
+## 模块概述
+
+评论模块(Comment Module)负责处理用户对问答记录(Record)的反馈功能,包括点赞(Like)、点踩(Dislike)以及详细的反馈意见收集。该模块是系统用户体验反馈机制的核心组件。
+
+## 目录结构
+
+```text
+apps/
+├── routers/
+│ └── comment.py # FastAPI路由层,处理HTTP请求
+├── services/
+│ └── comment.py # 业务逻辑层,评论管理器
+├── models/
+│ └── comment.py # 数据库模型定义
+└── schemas/
+ ├── comment.py # 请求数据模型
+ └── record.py # 记录相关数据模型(包含评论)
+```
+
+## 核心组件
+
+### 1. 数据模型层 (models/comment.py)
+
+#### Comment Table Schema
+
+| 字段名 | 类型 | 说明 | 约束 |
+|--------|------|------|------|
+| id | BigInteger | 主键ID | Primary Key, Auto Increment |
+| recordId | UUID | 问答对ID | Foreign Key → framework_record.id, Indexed |
+| userSub | String(50) | 用户标识 | Foreign Key → framework_user.userSub |
+| commentType | Enum(CommentType) | 评论类型 | Not Null |
+| feedbackType | ARRAY(String) | 投诉类别列表 | Not Null |
+| feedbackLink | String(1000) | 投诉相关链接 | Not Null |
+| feedbackContent | String(1000) | 投诉详细内容 | Not Null |
+| createdAt | DateTime(TZ) | 创建时间 | Not Null, Default: UTC Now |
+
+#### CommentType Enum
+
+```python
+class CommentType(str, PyEnum):
+ LIKE = "liked" # 点赞
+ DISLIKE = "disliked" # 点踩
+ NONE = "none" # 无评论
+```
+
+### 2. 业务逻辑层 (services/comment.py)
+
+#### CommentManager
+
+提供评论的核心业务逻辑操作:
+
+- **query_comment(record_id: str)**: 根据问答ID查询评论
+- **update_comment(record_id: str, data: RecordComment, user_sub: str)**: 创建或更新评论
+
+### 3. 路由层 (routers/comment.py)
+
+#### API Endpoint
+
+- **POST /api/comment**: 添加或更新评论
+ - 认证要求: Session验证 + Personal Token验证
+ - 请求体: AddCommentData
+ - 响应: ResponseData
+
+### 4. 数据传输对象 (schemas)
+
+#### AddCommentData (请求模型)
+
+```python
+{
+ "record_id": str, # 问答记录ID
+ "comment": CommentType, # 评论类型 (liked/disliked/none)
+ "dislike_reason": str, # 点踩原因 (分号分隔, max 200字符)
+ "reason_link": str, # 相关链接 (max 200字符)
+ "reason_description": str # 详细描述 (max 500字符)
+}
+```
+
+#### RecordComment (内部数据模型)
+
+```python
+{
+ "comment": CommentType, # 评论类型
+ "feedback_type": list[str], # 反馈类型列表 (别名: dislike_reason)
+ "feedback_link": str, # 反馈链接 (别名: reason_link)
+ "feedback_content": str, # 反馈内容 (别名: reason_description)
+ "feedback_time": float # 反馈时间戳
+}
+```
+
+## 架构设计
+
+### 系统架构图
+
+```mermaid
+graph TB
+ subgraph "Client Layer"
+ Client[前端客户端]
+ end
+
+ subgraph "API Layer"
+        Router[FastAPI Router<br/>routers/comment.py]
+ Auth1[Session验证]
+ Auth2[Token验证]
+ end
+
+ subgraph "Service Layer"
+        Manager[CommentManager<br/>services/comment.py]
+ end
+
+ subgraph "Data Layer"
+        Model[Comment Model<br/>models/comment.py]
+ DB[(PostgreSQL Database)]
+ end
+
+ subgraph "Schema Layer"
+ Schema1[AddCommentData]
+ Schema2[RecordComment]
+ end
+
+ Client -->|HTTP POST| Router
+ Router -->|依赖注入| Auth1
+ Router -->|依赖注入| Auth2
+ Router -->|数据验证| Schema1
+ Router -->|调用业务逻辑| Manager
+ Manager -->|数据转换| Schema2
+ Manager -->|ORM操作| Model
+ Model -->|SQLAlchemy| DB
+```
+
+### 数据流程图
+
+```mermaid
+flowchart TD
+ Start([用户提交评论]) --> Validate{数据验证}
+ Validate -->|验证失败| Error1[返回400错误]
+ Validate -->|验证成功| Auth{身份认证}
+
+ Auth -->|认证失败| Error2[返回401错误]
+    Auth -->|认证成功| ParseData[解析dislike_reason<br/>分号分隔转列表]
+
+ ParseData --> CreateDTO[创建RecordComment对象]
+    CreateDTO --> QueryDB{查询数据库<br/>记录是否存在?}
+
+    QueryDB -->|存在| Update[更新现有记录<br/>commentType<br/>feedbackType<br/>feedbackLink<br/>feedbackContent]
+    QueryDB -->|不存在| Create[创建新记录<br/>包含recordId<br/>userSub等字段]
+
+ Update --> Commit[提交事务]
+ Create --> Merge[Merge操作]
+ Merge --> Commit
+
+ Commit --> Success{提交成功?}
+ Success -->|失败| Error3[返回400错误]
+ Success -->|成功| Return[返回200 OK]
+
+ Error1 --> End([结束])
+ Error2 --> End
+ Error3 --> End
+ Return --> End
+```
+
+## 时序图
+
+### 添加评论完整流程
+
+```mermaid
+sequenceDiagram
+ actor User as 用户
+ participant Client as 前端客户端
+ participant Router as FastAPI Router
+ participant AuthMiddleware as 认证中间件
+ participant Manager as CommentManager
+ participant Session as DB Session
+ participant DB as PostgreSQL
+
+ User->>Client: 点赞/点踩并提交反馈
+    Client->>Router: POST /api/comment<br/>{record_id, comment, ...}
+
+ activate Router
+ Router->>AuthMiddleware: verify_session()
+ activate AuthMiddleware
+ AuthMiddleware-->>Router: session验证结果
+ deactivate AuthMiddleware
+
+ Router->>AuthMiddleware: verify_personal_token()
+ activate AuthMiddleware
+ AuthMiddleware-->>Router: token验证结果
+ deactivate AuthMiddleware
+
+    Router->>Router: 解析dislike_reason<br/>分号分隔 → list
+    Router->>Router: 创建RecordComment对象<br/>设置feedback_time
+
+ Router->>Manager: update_comment(record_id, data, user_sub)
+ activate Manager
+
+ Manager->>Session: 创建异步会话
+ activate Session
+
+    Manager->>DB: SELECT * FROM framework_comment<br/>WHERE recordId = ?
+ activate DB
+ DB-->>Manager: 查询结果
+ deactivate DB
+
+ alt 记录存在
+        Manager->>Manager: 更新现有对象字段<br/>commentType, feedbackType等
+ else 记录不存在
+ Manager->>Manager: 创建新Comment对象
+ Manager->>Session: merge(comment_info)
+ end
+
+ Manager->>Session: commit()
+ Session->>DB: 提交事务
+ activate DB
+ DB-->>Session: 提交成功
+ deactivate DB
+
+ Session-->>Manager: 完成
+ deactivate Session
+ Manager-->>Router: None (成功)
+ deactivate Manager
+
+    Router->>Client: 200 OK<br/>{code: 200, message: "success"}
+ deactivate Router
+ Client->>User: 显示反馈成功
+```
+
+### 查询评论流程
+
+```mermaid
+sequenceDiagram
+ participant Service as 业务服务
+ participant Manager as CommentManager
+ participant Session as DB Session
+ participant DB as PostgreSQL
+
+ Service->>Manager: query_comment(record_id)
+ activate Manager
+
+ Manager->>Session: 创建异步会话
+ activate Session
+
+    Manager->>DB: SELECT * FROM framework_comment<br/>WHERE recordId = UUID(record_id)
+ activate DB
+ DB-->>Manager: 查询结果
+ deactivate DB
+
+ alt 记录存在
+        Manager->>Manager: 构建RecordComment对象<br/>映射字段名称<br/>转换时间戳
+ Manager-->>Service: RecordComment对象
+ else 记录不存在
+ Manager-->>Service: None
+ end
+
+ deactivate Session
+ deactivate Manager
+```
+
+## 状态图
+
+### 评论状态转换
+
+```mermaid
+stateDiagram-v2
+ [*] --> NONE: 创建问答记录
+
+ NONE --> LIKE: 用户点赞
+ NONE --> DISLIKE: 用户点踩
+
+ LIKE --> DISLIKE: 改为点踩
+ LIKE --> NONE: 取消点赞
+
+ DISLIKE --> LIKE: 改为点赞
+ DISLIKE --> NONE: 取消点踩
+
+ NONE --> [*]
+ LIKE --> [*]
+ DISLIKE --> [*]
+
+ note right of DISLIKE
+ 点踩时需要提供:
+ - feedbackType (类别列表)
+ - feedbackLink (相关链接)
+ - feedbackContent (详细说明)
+ end note
+```
+
+## 类图
+
+```mermaid
+classDiagram
+ class Comment {
+ +int id
+ +UUID recordId
+ +str userSub
+ +CommentType commentType
+ +list~str~ feedbackType
+ +str feedbackLink
+ +str feedbackContent
+ +datetime createdAt
+ }
+
+ class CommentType {
+        <<enumeration>>
+ LIKE
+ DISLIKE
+ NONE
+ }
+
+ class AddCommentData {
+ +str record_id
+ +CommentType comment
+ +str dislike_reason
+ +str reason_link
+ +str reason_description
+ +model_validate()
+ }
+
+ class RecordComment {
+ +CommentType comment
+ +list~str~ feedback_type
+ +str feedback_link
+ +str feedback_content
+ +float feedback_time
+ }
+
+ class CommentManager {
+ +query_comment(record_id)$ RecordComment|None
+ +update_comment(record_id, data, user_sub)$ None
+ }
+
+ class Router {
+ +add_comment(request, post_body) JSONResponse
+ }
+
+ Comment --> CommentType : uses
+ AddCommentData --> CommentType : uses
+ RecordComment --> CommentType : uses
+ Router --> AddCommentData : validates
+ Router --> CommentManager : calls
+ CommentManager --> RecordComment : transforms
+ CommentManager --> Comment : operates
+```
+
+## 数据库ER图
+
+```mermaid
+erDiagram
+ FRAMEWORK_USER ||--o{ FRAMEWORK_COMMENT : creates
+ FRAMEWORK_RECORD ||--o| FRAMEWORK_COMMENT : has
+
+ FRAMEWORK_USER {
+ string userSub PK
+ string userName
+ datetime createdAt
+ }
+
+ FRAMEWORK_RECORD {
+ uuid id PK
+ uuid conversationId
+ string userSub FK
+ string content
+ datetime createdAt
+ }
+
+ FRAMEWORK_COMMENT {
+ bigint id PK
+ uuid recordId FK
+ string userSub FK
+ enum commentType
+ array feedbackType
+ string feedbackLink
+ string feedbackContent
+ datetime createdAt
+ }
+```
+
+## 核心业务逻辑
+
+### 1. 评论创建/更新逻辑
+
+评论模块采用幂等性设计,支持对同一问答记录进行多次评论更新。核心流程如下:
+
+#### 步骤一:查询现有评论
+
+系统首先根据问答记录ID查询数据库中是否已存在对应的评论记录。
+
+#### 步骤二:判断操作类型
+
+- 如果记录已存在,则执行更新操作,修改现有记录的评论类型、反馈类型、反馈链接和反馈内容
+- 如果记录不存在,则创建新的评论记录,包含问答记录ID、用户标识、评论类型等完整信息
+
+#### 步骤三:数据持久化
+
+使用数据库事务确保数据一致性,通过SQLAlchemy的merge操作实现UPSERT语义,自动处理插入或更新逻辑。
+
+### 2. 数据转换逻辑
+
+#### API → Service 层转换
+
+路由层接收到前端请求后,需要进行数据格式转换:
+
+- 将分号分隔的字符串格式的点踩原因转换为数组格式
+- 将API字段名映射为内部数据模型字段名
+- 生成当前时间戳作为反馈时间
+
+#### Service → Model 层映射
+
+业务逻辑层将处理后的数据映射到数据库模型:
+
+- 评论类型字段直接映射
+- 反馈类型列表映射为PostgreSQL数组类型
+- 反馈链接和反馈内容直接映射
+- 用户标识和记录ID保持原有格式
+
+#### Model → Schema 层查询映射
+
+查询操作时将数据库模型数据转换为API响应格式:
+
+- 数据库字段名转换为前端友好的字段名
+- 时间戳格式转换,将数据库的datetime对象转换为Unix时间戳
+- 数组类型数据保持原有格式返回给前端
+
+## 接口文档
+
+### POST /api/comment
+
+添加或更新评论
+
+#### 认证要求
+
+- Session验证 (verify_session)
+- Personal Token验证 (verify_personal_token)
+
+#### 请求
+
+**Headers:**
+
+```http
+Content-Type: application/json
+Authorization: Bearer <token>
+Cookie: session=<session_id>
+```
+
+**Body:**
+
+```json
+{
+ "record_id": "550e8400-e29b-41d4-a716-446655440000",
+ "comment": "disliked",
+ "dislike_reason": "答非所问;信息不准确;",
+ "reason_link": "https://example.com/issue/123",
+ "reason_description": "回答内容与问题不符,建议补充相关文档引用。"
+}
+```
+
+**字段说明:**
+
+| 字段 | 类型 | 必填 | 说明 | 限制 |
+|------|------|------|------|------|
+| record_id | string | 是 | 问答记录UUID | UUID格式 |
+| comment | string | 是 | 评论类型 | "liked", "disliked", "none" |
+| dislike_reason | string | 否 | 点踩原因 | 分号分隔,最长200字符 |
+| reason_link | string | 否 | 相关链接 | 最长200字符 |
+| reason_description | string | 否 | 详细描述 | 最长500字符 |
+
+#### 响应
+
+**成功 (200 OK):**
+
+```json
+{
+ "code": 200,
+ "message": "success",
+ "result": {}
+}
+```
+
+**失败 (400 Bad Request):**
+
+```json
+{
+ "code": 400,
+ "message": "record_id not found",
+ "result": {}
+}
+```
+
+**失败 (401 Unauthorized):**
+
+```json
+{
+ "code": 401,
+ "message": "Authentication failed"
+}
+```
+
+#### 示例
+
+**cURL:**
+
+```bash
+curl -X POST "http://localhost:8000/api/comment" \
+ -H "Content-Type: application/json" \
+ -H "Authorization: Bearer eyJhbGc..." \
+ --cookie "session=abc123..." \
+ -d '{
+ "record_id": "550e8400-e29b-41d4-a716-446655440000",
+ "comment": "liked",
+ "dislike_reason": "",
+ "reason_link": "",
+ "reason_description": ""
+ }'
+```
+
+**Python:**
+
+```python
+import requests
+
+response = requests.post(
+ "http://localhost:8000/api/comment",
+ json={
+ "record_id": "550e8400-e29b-41d4-a716-446655440000",
+ "comment": "disliked",
+ "dislike_reason": "答非所问;信息不准确;",
+ "reason_link": "https://example.com/issue/123",
+ "reason_description": "回答内容与问题不符"
+ },
+    headers={"Authorization": "Bearer <token>"},
+    cookies={"session": "<session_id>"}
+)
+```
+
+## 关键特性
+
+### 1. 幂等性设计
+
+- 同一个 record_id 的评论支持多次更新
+- 使用 SQLAlchemy 的 `merge` 操作实现 UPSERT 语义
+- 自动识别创建或更新操作
+
+### 2. 数据完整性
+
+- recordId 外键约束 → framework_record.id
+- userSub 外键约束 → framework_user.userSub
+- 索引优化:recordId 字段建立索引提高查询性能
+
+### 3. 字段别名映射
+
+使用 Pydantic 的 `Field(alias=...)` 实现前端友好的字段命名:
+
+| 内部字段 | API别名 |
+|----------|---------|
+| feedback_type | dislike_reason |
+| feedback_link | reason_link |
+| feedback_content | reason_description |
+
+### 4. 时间处理
+
+- 数据库存储:`datetime` 对象,带时区(UTC)
+- API传输:`float` 类型的 Unix 时间戳(秒,保留3位小数)
+
+### 5. 数组字段处理
+
+- API输入:分号分隔的字符串 `"reason1;reason2;"`
+- 数据转换:`split(";")[:-1]` → `["reason1", "reason2"]`
+- 数据库存储:PostgreSQL ARRAY 类型
+
+## 错误处理
+
+### 常见错误场景
+
+| 错误场景 | HTTP状态码 | 处理方式 |
+|----------|-----------|----------|
+| Session验证失败 | 401 | 依赖注入层拦截 |
+| Token验证失败 | 401 | 依赖注入层拦截 |
+| record_id不存在 | 400 | 业务逻辑层返回None |
+| UUID格式错误 | 400 | Pydantic验证失败 |
+| 字段长度超限 | 422 | Pydantic验证失败 |
+| 数据库连接失败 | 500 | 异常传播至错误处理中间件 |
+| 外键约束违反 | 500 | 数据库异常 |
+
+### 异常传播链
+
+```mermaid
+flowchart LR
+ A[Router Layer] --> B{验证异常?}
+ B -->|是| C[422 Unprocessable Entity]
+ B -->|否| D[Service Layer]
+ D --> E{业务异常?}
+ E -->|是| F[400 Bad Request]
+ E -->|否| G[Database Layer]
+ G --> H{DB异常?}
+ H -->|是| I[500 Internal Server Error]
+ H -->|否| J[200 OK]
+```
+
+## 性能优化
+
+### 1. 数据库索引
+
+```sql
+CREATE INDEX idx_comment_record_id ON framework_comment(recordId);
+```
+
+- 优化基于 recordId 的查询性能
+- 支持快速定位单个记录的评论
+
+### 2. 异步IO
+
+- 使用 SQLAlchemy 异步会话:`async with postgres.session()`
+- 非阻塞数据库操作
+- 提高并发处理能力
+
+### 3. 连接池管理
+
+- PostgreSQL 连接池复用
+- 避免频繁建立/关闭连接的开销
+
+### 4. 查询优化
+
+```python
+# 使用 one_or_none() 替代 all()[0]
+result = (await session.scalars(
+ select(Comment).where(Comment.recordId == uuid.UUID(record_id))
+)).one_or_none()
+```
+
+- 明确表达"至多一条结果"的语义:无结果时返回 `None`,结果多于1条时抛出 `MultipleResultsFound`(`one_or_none()` 本身不会为查询添加 LIMIT)
+- 减少数据传输量
+
+## 安全考虑
+
+### 1. 认证与授权
+
+- **双重认证**:Session + Personal Token
+- **用户隔离**:userSub 关联确保数据隔离
+- **依赖注入**:在路由层统一进行身份验证
+
+### 2. 输入验证
+
+- **字段长度限制**:
+ - dislike_reason: 200字符
+ - reason_link: 200字符
+ - reason_description: 500字符
+- **类型校验**:Pydantic自动验证数据类型
+- **枚举约束**:CommentType限定为三个固定值
+
+### 3. SQL注入防护
+
+- 使用 SQLAlchemy ORM
+- 参数化查询
+- 无原生SQL拼接
+
+### 4. 数据完整性
+
+- 外键约束防止孤立记录
+- 事务保证原子性操作
+- NOT NULL约束防止空值
+
+## 扩展性设计
+
+### 1. 评论类型扩展
+
+当前支持三种类型,如需扩展:
+
+```python
+class CommentType(str, PyEnum):
+ LIKE = "liked"
+ DISLIKE = "disliked"
+ NONE = "none"
+ # 未来扩展
+ REPORT = "reported" # 举报
+ FAVORITE = "favorited" # 收藏
+```
+
+### 2. 反馈类型扩展
+
+feedbackType 使用数组类型,支持多选和动态扩展:
+
+```python
+# 前端定义的反馈类型选项
+FEEDBACK_OPTIONS = [
+ "答非所问",
+ "信息不准确",
+ "内容不完整",
+ "格式混乱",
+ "其他问题"
+]
+```
+
+### 3. 多语言支持
+
+在 RecordComment 中添加语言字段:
+
+```python
+class RecordComment(BaseModel):
+ comment: CommentType
+ language: str = "zh-CN" # 新增字段
+ # ... 其他字段
+```
+
+### 4. 统计分析扩展
+
+可基于现有数据进行扩展:
+
+- 点赞率统计
+- 点踩原因分布
+- 时间趋势分析
+- 用户反馈热点
+
+## 部署注意事项
+
+### 1. 数据库迁移
+
+使用 Alembic 进行数据库版本管理:
+
+```bash
+# 生成迁移脚本
+alembic revision --autogenerate -m "create comment table"
+
+# 执行迁移
+alembic upgrade head
+```
+
+### 2. 环境变量配置
+
+```bash
+# PostgreSQL 连接配置
+DATABASE_URL=postgresql+asyncpg://user:pass@host:5432/dbname
+
+# 日志级别
+LOG_LEVEL=INFO
+```
+
+### 3. 性能调优
+
+```python
+# 连接池配置
+pool_size = 20 # 连接池大小
+max_overflow = 10 # 最大溢出连接数
+pool_timeout = 30 # 连接超时时间
+pool_recycle = 3600 # 连接回收时间
+```
+
+## 未来优化方向
+
+### 1. 缓存策略
+
+- Redis 缓存热点评论数据
+- 减少数据库查询压力
+- 设置合理的缓存过期时间
+
+### 2. 批量操作支持
+
+- 支持批量查询评论
+- 支持批量更新评论
+- 减少网络往返次数
+
+### 3. 评论审核机制
+
+- 敏感词过滤
+- 内容审核工作流
+- 人工复审接口
+
+### 4. 数据分析增强
+
+- 实时统计面板
+- 评论情感分析
+- 用户行为画像
+
+## 常见问题 (FAQ)
+
+### Q1: 为什么使用 merge 而不是 add?
+
+**A:** `merge` 操作实现了 UPSERT 语义,可以自动判断是插入还是更新,简化了业务逻辑。
+
+### Q2: dislike_reason 为什么使用分号分隔?
+
+**A:** 前端传递多个原因时使用分号分隔的字符串格式,后端进行split转换为数组存储到数据库。
+
+### Q3: 为什么 update_comment 返回 None?
+
+**A:** 当前设计中更新操作无需返回值,失败时通过异常处理。未来可以改为返回布尔值或更新后的对象。
+
+### Q4: 如何防止恶意刷评论?
+
+**A:**
+
+- 身份认证确保用户真实性
+- 可增加频率限制(Rate Limiting)
+- 可增加同一用户对同一记录的评论次数限制
+
+### Q5: 评论数据如何归档?
+
+**A:**
+
+- 可按时间分区存储历史数据
+- 定期将旧数据迁移到冷存储
+- 保留索引供查询分析使用
+
+## 参考资料
+
+- [FastAPI Documentation](https://fastapi.tiangolo.com/)
+- [SQLAlchemy Async Documentation](https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html)
+- [Pydantic V2 Documentation](https://docs.pydantic.dev/latest/)
+- [PostgreSQL ARRAY Types](https://www.postgresql.org/docs/current/arrays.html)
+
+## 版本历史
+
+| 版本 | 日期 | 变更说明 |
+|------|------|----------|
+| 1.0.0 | 2025-10-13 | 初始版本,完成基础评论功能 |
+
+---
+
+**文档维护者**: Euler Copilot Framework Team
+**最后更新**: 2025-10-13
diff --git a/design/services/flow.md b/design/services/flow.md
index f682d78536663804aa3a32c4ca5054b448bfdff9..89dcf269309646a59e3ce635054bbc2620536e8b 100644
--- a/design/services/flow.md
+++ b/design/services/flow.md
@@ -163,7 +163,6 @@ sequenceDiagram
API->>FSM: validate_flow_illegal()
FSM->>FSM: 验证节点ID唯一性
FSM->>FSM: 验证边的合法性
- FSM->>FSM: 验证起始/终止节点
FSM-->>API: 验证通过
API->>FSM: validate_flow_connectivity()
FSM->>FSM: BFS 检查连通性
@@ -234,13 +233,7 @@ flowchart TD
CheckSelfLoop -->|是| Error4[抛出异常: 起止节点相同]
CheckSelfLoop -->|否| CheckBranch{分支合法?}
CheckBranch -->|否| Error5[抛出异常: 分支非法]
- CheckBranch -->|是| CalcDegree[计算入度/出度]
-
- CalcDegree --> CheckStartDeg{起始节点入度=0?}
- CheckStartDeg -->|否| Error6[抛出异常: 起始节点有入边]
- CheckStartDeg -->|是| CheckEndDeg{终止节点出度=0?}
- CheckEndDeg -->|否| Error7[抛出异常: 终止节点有出边]
- CheckEndDeg -->|是| ValidateConn[验证连通性]
+ CheckBranch -->|是| ValidateConn[验证连通性]
ValidateConn --> BFS[BFS遍历图]
BFS --> CheckReachable{所有节点可达?}
@@ -259,8 +252,6 @@ flowchart TD
Error3 --> End
Error4 --> End
Error5 --> End
- Error6 --> End
- Error7 --> End
Success --> End
```
@@ -473,7 +464,6 @@ Authorization: Bearer
| `validate_flow_connectivity` | 验证工作流连通性(BFS) | - |
| `_validate_node_ids` | 验证节点ID唯一性 | `FlowNodeValidationError` |
| `_validate_edges` | 验证边的合法性 | `FlowEdgeValidationError` |
-| `_validate_node_degrees` | 验证起始/终止节点的度数 | `FlowNodeValidationError` |
### 5.3 FlowLoader
@@ -588,7 +578,7 @@ flowchart LR
| 异常类 | 触发条件 | 处理方式 |
|--------|----------|----------|
-| `FlowNodeValidationError` | 节点ID重复、起始/终止节点不存在、度数错误 | 返回400错误 |
+| `FlowNodeValidationError` | 节点ID重复、起始/终止节点不存在 | 返回400错误 |
| `FlowEdgeValidationError` | 边ID重复、自环、分支非法 | 返回400错误 |
| `FlowBranchValidationError` | 分支字段缺失/为空、分支重复、非法字符 | 返回400错误 |
| `ValueError` | 应用不存在、工作流不存在、配置错误 | 返回404/500错误 |