From bfae1935cee872e3a8bfa7e1845983a42893b723 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Tue, 5 Aug 2025 12:16:52 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E8=BF=81=E7=A7=BB=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/models/app.py | 2 +- apps/models/user.py | 2 +- apps/routers/appcenter.py | 1 + apps/scheduler/pool/loader/app.py | 2 +- apps/scheduler/pool/loader/flow.py | 72 ++++----- apps/scheduler/scheduler/flow.py | 1 + apps/schemas/appcenter.py | 4 +- apps/schemas/flow.py | 2 +- apps/schemas/flow_topology.py | 21 +-- apps/schemas/response_data.py | 4 +- apps/services/appcenter.py | 242 +++++++++++++---------------- apps/services/flow.py | 157 +++++++------------ apps/services/service.py | 8 +- 13 files changed, 227 insertions(+), 291 deletions(-) diff --git a/apps/models/app.py b/apps/models/app.py index b6b415f0..27400fcb 100644 --- a/apps/models/app.py +++ b/apps/models/app.py @@ -22,7 +22,7 @@ class App(Base): """应用描述""" author: Mapped[str] = mapped_column(String(50), ForeignKey("framework_user.userSub"), nullable=False) """应用作者""" - type: Mapped[AppType] = mapped_column(Enum(AppType), nullable=False) + appType: Mapped[AppType] = mapped_column(Enum(AppType), nullable=False) # noqa: N815 """应用类型""" id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default_factory=uuid.uuid4) """应用ID""" diff --git a/apps/models/user.py b/apps/models/user.py index d63564e1..185d1184 100644 --- a/apps/models/user.py +++ b/apps/models/user.py @@ -55,7 +55,7 @@ class UserFavorite(Base): """用户收藏ID""" userSub: Mapped[str] = mapped_column(String(50), ForeignKey("framework_user.userSub"), index=True, nullable=False) # noqa: N815 """用户名""" - type: Mapped[UserFavoriteType] = mapped_column(Enum(UserFavoriteType), nullable=False) + favouriteType: Mapped[UserFavoriteType] = mapped_column(Enum(UserFavoriteType), nullable=False) # noqa: N815 """收藏类型""" itemId: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), index=True, nullable=False) # noqa: N815 """收藏项目ID(App/Service ID)""" diff --git a/apps/routers/appcenter.py b/apps/routers/appcenter.py index a38aaa61..192e3eb0 100644 --- a/apps/routers/appcenter.py +++ b/apps/routers/appcenter.py @@ -57,6 +57,7 @@ async def get_applications( # noqa: PLR0913 result={}, ).model_dump(exclude_none=True, by_alias=True), ) + try: filter_type = ( AppFilterType.USER if createdByMe else (AppFilterType.FAVORITE if favorited else AppFilterType.ALL) diff --git a/apps/scheduler/pool/loader/app.py b/apps/scheduler/pool/loader/app.py index 8cab6899..30503a0c 100644 --- a/apps/scheduler/pool/loader/app.py +++ b/apps/scheduler/pool/loader/app.py @@ -144,7 +144,7 @@ class AppLoader: name=metadata.name, description=metadata.description, author=metadata.author, - type=metadata.app_type, + appType=metadata.app_type, isPublished=metadata.published, permission=metadata.permission.type if metadata.permission else PermissionType.PRIVATE, ) diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index 477aca4d..4c9d9671 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -110,48 +110,50 @@ class FlowLoader: @staticmethod - async def load(app_id: uuid.UUID, flow_id: uuid.UUID) -> Flow | None: + async def load(app_id: uuid.UUID, flow_id: uuid.UUID) -> Flow: """从文件系统中加载【单个】工作流""" logger.info("[FlowLoader] 应用 %s:加载工作流 %s...", flow_id, app_id) # 构建工作流文件路径 flow_path = BASE_PATH / str(app_id) / "flow" / f"{flow_id}.yaml" if not await flow_path.exists(): - logger.error("[FlowLoader] 应用 %s:工作流文件 %s 不存在", app_id, flow_path) - return None - - try: - # 加载YAML文件 - flow_yaml = await FlowLoader._load_yaml_file(flow_path) + err = f"[FlowLoader] 应用 {app_id}:工作流文件 {flow_path} 不存在" + logger.error(err) + raise FileNotFoundError(err) + + # 加载YAML文件 + flow_yaml = await FlowLoader._load_yaml_file(flow_path) + if not flow_yaml: + err = f"[FlowLoader] 应用 {app_id}:工作流文件 {flow_path} 加载失败" + logger.error(err) + raise RuntimeError(err) + + # 按顺序处理工作流配置 + for processor in [ + lambda y: FlowLoader._validate_basic_fields(y, flow_path), + lambda y: FlowLoader._process_edges(y, flow_id, app_id), + lambda y: FlowLoader._process_steps(y, flow_id, app_id), + ]: + flow_yaml = await processor(flow_yaml) if not flow_yaml: - return None - - # 按顺序处理工作流配置 - for processor in [ - lambda y: FlowLoader._validate_basic_fields(y, flow_path), - lambda y: FlowLoader._process_edges(y, flow_id, app_id), - lambda y: FlowLoader._process_steps(y, flow_id, app_id), - ]: - flow_yaml = await processor(flow_yaml) - if not flow_yaml: - return None - flow_config = Flow.model_validate(flow_yaml) - await FlowLoader._update_db( - app_id, - FlowInfo( - appId=app_id, - id=flow_id, - name=flow_config.name, - description=flow_config.description, - enabled=True, - path=str(flow_path), - debug=flow_config.debug, - ), - ) - return Flow.model_validate(flow_yaml) - except Exception: - logger.exception("[FlowLoader] 应用 %s:工作流 %s 格式不合法", app_id, flow_id) - return None + err = f"[FlowLoader] 应用 {app_id}:工作流文件 {flow_path} 格式不合法" + logger.error(err) + raise RuntimeError(err) + + flow_config = Flow.model_validate(flow_yaml) + await FlowLoader._update_db( + app_id, + FlowInfo( + appId=app_id, + id=flow_id, + name=flow_config.name, + description=flow_config.description, + enabled=True, + path=str(flow_path), + debug=flow_config.debug, + ), + ) + return Flow.model_validate(flow_yaml) @staticmethod diff --git a/apps/scheduler/scheduler/flow.py b/apps/scheduler/scheduler/flow.py index fd7dc3b1..8fc5754b 100644 --- a/apps/scheduler/scheduler/flow.py +++ b/apps/scheduler/scheduler/flow.py @@ -57,6 +57,7 @@ class FlowChooser: return self._user_selected top_flow = await self.get_top_flow() + # FIXME KnowledgeBase不是UUID,要改个值 if top_flow == "KnowledgeBase": return None diff --git a/apps/schemas/appcenter.py b/apps/schemas/appcenter.py index 42bb1d48..b37716f3 100644 --- a/apps/schemas/appcenter.py +++ b/apps/schemas/appcenter.py @@ -1,6 +1,8 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. """应用中心相关 API 基础数据结构定义""" +import uuid + from pydantic import BaseModel, Field from .enum_var import AppType, PermissionType @@ -9,7 +11,7 @@ from .enum_var import AppType, PermissionType class AppCenterCardItem(BaseModel): """应用中心卡片数据结构""" - app_id: str = Field(..., alias="appId", description="应用ID") + app_id: uuid.UUID = Field(..., alias="appId", description="应用ID") app_type: AppType = Field(..., alias="appType", description="应用类型") icon: str = Field(..., description="应用图标") name: str = Field(..., description="应用名称") diff --git a/apps/schemas/flow.py b/apps/schemas/flow.py index ffa9fb98..5cacb2ab 100644 --- a/apps/schemas/flow.py +++ b/apps/schemas/flow.py @@ -28,7 +28,7 @@ class Edge(BaseModel): class Step(BaseModel): """Flow中Step的数据""" - node: uuid.UUID = Field(description="Step的Node ID") + node: str = Field(description="Step的Node ID") type: str = Field(description="Step的类型") name: str = Field(description="Step的名称") description: str = Field(description="Step的描述") diff --git a/apps/schemas/flow_topology.py b/apps/schemas/flow_topology.py index 690a2bf4..09839373 100644 --- a/apps/schemas/flow_topology.py +++ b/apps/schemas/flow_topology.py @@ -12,10 +12,10 @@ from .enum_var import EdgeType class NodeMetaDataBase(BaseModel): """节点元数据基类""" - node_id: uuid.UUID = Field(alias="nodeId") + node_id: str = Field(alias="nodeId") call_id: str = Field(alias="callId") name: str - created_at: float | None = Field(alias="createdAt") + updated_at: float | None = Field(alias="updatedAt") class NodeMetaDataItem(NodeMetaDataBase): @@ -30,7 +30,7 @@ class NodeServiceItem(BaseModel): service_id: uuid.UUID = Field(..., alias="serviceId", description="服务ID") name: str = Field(..., description="服务名称") - node_meta_datas: list[NodeMetaDataItem] = Field(alias="nodeMetaDatas", default=[]) + data: list[NodeMetaDataBase] = Field(default=[]) created_at: str | None = Field(default=None, alias="createdAt", description="创建时间") @@ -41,27 +41,17 @@ class PositionItem(BaseModel): y: float = Field(default=0.0) -class DependencyItem(BaseModel): - """请求/响应中的节点依赖变量类""" - - node_id: str = Field(alias="nodeId") - type: str - - class NodeItem(BaseModel): """请求/响应中的节点变量类""" step_id: str = Field(alias="stepId", default="") service_id: str = Field(alias="serviceId", default="") - node_id: uuid.UUID = Field(alias="nodeId", default=uuid.UUID("00000000-0000-0000-0000-000000000000")) + node_id: str = Field(alias="nodeId", default="") name: str = Field(default="") call_id: str = Field(alias="callId", default="Empty") description: str = Field(default="") - enable: bool = Field(default=True) parameters: dict[str, Any] = Field(default={}) - depedency: DependencyItem | None = None position: PositionItem = Field(default=PositionItem()) - editable: bool = Field(default=True) class EdgeItem(BaseModel): @@ -77,11 +67,10 @@ class EdgeItem(BaseModel): class FlowItem(BaseModel): """请求/响应中的流变量类""" - flow_id: str | None = Field(alias="flowId", default="工作流ID") + flow_id: uuid.UUID = Field(alias="flowId", default=uuid.UUID("00000000-0000-0000-0000-000000000000")) name: str = Field(default="工作流名称") description: str = Field(default="工作流描述") enable: bool = Field(default=True) - editable: bool = Field(default=True) nodes: list[NodeItem] = Field(default=[]) edges: list[EdgeItem] = Field(default=[]) created_at: float | None = Field(alias="createdAt", default=0) diff --git a/apps/schemas/response_data.py b/apps/schemas/response_data.py index 966f7d4b..3908c29b 100644 --- a/apps/schemas/response_data.py +++ b/apps/schemas/response_data.py @@ -18,7 +18,7 @@ from .flow_topology import ( NodeServiceItem, PositionItem, ) -from .mcp import MCPTool, MCPType +from .mcp import MCPType from .record import RecordData from .user import UserInfo @@ -288,7 +288,7 @@ class GetAppListRsp(ResponseData): class RecentAppListItem(BaseModel): """GET /api/app/recent 列表项数据结构""" - app_id: str = Field(..., alias="appId", description="应用ID") + app_id: uuid.UUID = Field(..., alias="appId", description="应用ID") name: str = Field(..., description="应用名称") diff --git a/apps/services/appcenter.py b/apps/services/appcenter.py index 525a024a..17bc8275 100644 --- a/apps/services/appcenter.py +++ b/apps/services/appcenter.py @@ -6,13 +6,13 @@ import uuid from datetime import UTC, datetime from typing import Any -from sqlalchemy import and_, select +from sqlalchemy import and_, func, or_, select from apps.common.postgres import postgres from apps.constants import APP_DEFAULT_HISTORY_LEN, SERVICE_PAGE_SIZE from apps.exceptions import InstancePermissionError -from apps.models.app import App -from apps.models.user import User, UserAppUsage, UserFavorite, UserFavoriteType +from apps.models.app import App, AppACL +from apps.models.user import UserAppUsage, UserFavorite, UserFavoriteType from apps.scheduler.pool.loader.app import AppLoader from apps.schemas.agent import AgentAppMetadata from apps.schemas.appcenter import AppCenterCardItem, AppData, AppPermissionData @@ -21,7 +21,6 @@ from apps.schemas.flow import AppMetadata, MetadataType, Permission from apps.schemas.response_data import RecentAppList, RecentAppListItem from .mcp_service import MCPServiceManager -from .user import UserManager logger = logging.getLogger(__name__) @@ -100,64 +99,86 @@ class AppCenterManager: :param filter_type: 过滤类型 :return: 应用列表, 总应用数 """ - filters: dict[str, Any] = { - "$or": [ - {"permission.type": PermissionType.PUBLIC.value}, - {"$and": [ - {"permission.type": PermissionType.PROTECTED.value}, - {"permission.users": {"$in": [user_sub]}}, - ]}, - {"$and": [ - {"permission.type": PermissionType.PRIVATE.value}, - {"author": user_sub}, - ]}, - ], - } - - user_favorite_app_ids = await AppCenterManager._get_favorite_app_ids_by_user(user_sub) - - if filter_type == AppFilterType.ALL: - # 获取所有已发布的应用 - filters["published"] = True - elif filter_type == AppFilterType.USER: - # 获取用户创建的应用 - filters["author"] = user_sub - elif filter_type == AppFilterType.FAVORITE: - # 获取用户收藏的应用 - filters = { - "_id": {"$in": user_favorite_app_ids}, - "published": True, - } - - # 添加关键字搜索条件 - if keyword: - filters["$or"] = [ - {"name": {"$regex": keyword, "$options": "i"}}, - {"description": {"$regex": keyword, "$options": "i"}}, - {"author": {"$regex": keyword, "$options": "i"}}, - ] + async with postgres.session() as session: + protected_apps = select(AppACL.appId).where( + AppACL.userSub == user_sub, + ) + app_data = select(App).where( + or_( + App.permission == PermissionType.PUBLIC, + and_( + App.permission == PermissionType.PRIVATE, + App.author == user_sub, + ), + and_( + App.permission == PermissionType.PROTECTED, + App.id.in_(protected_apps), + ), + ), + ).cte() - # 添加应用类型过滤条件 - if app_type is not None: - filters["app_type"] = app_type.value - - # 获取应用列表 - apps, total_apps = await AppCenterManager._search_apps_by_filter(filters, page, SERVICE_PAGE_SIZE) - - # 构建返回的应用卡片列表 - app_cards = [ - AppCenterCardItem( - appId=app.id, - appType=app.app_type, - icon=app.icon, - name=app.name, - description=app.description, - author=app.author, - favorited=(app.id in user_favorite_app_ids), - published=app.published, + # 获取用户所有收藏的应用 + user_favourite_apps = select(UserFavorite.itemId).where( + and_( + UserFavorite.userSub == user_sub, + UserFavorite.favouriteType == UserFavoriteType.APP, + ), ) - for app in apps - ] + + # 根据搜索类型加入搜索条件 + if filter_type == AppFilterType.ALL: + filtered_apps = select(app_data).where(App.isPublished == True).cte() # noqa: E712 + elif filter_type == AppFilterType.USER: + filtered_apps = select(app_data).where(App.author == user_sub).cte() + elif filter_type == AppFilterType.FAVORITE: + filtered_apps = select(app_data).where( + and_( + App.id.in_(user_favourite_apps), + App.isPublished == True, # noqa: E712 + ), + ).cte() + + # 根据应用类型加入搜索条件 + if app_type is not None: + type_apps = select(filtered_apps).where(App.appType == AppType(app_type)).cte() + else: + type_apps = filtered_apps + + # 加入关键字搜索条件 + if keyword: + keyword_apps = select(type_apps).where( + or_( + App.name.ilike(f"%{keyword}%"), + App.description.ilike(f"%{keyword}%"), + App.author.ilike(f"%{keyword}%"), + ), + ).cte() + else: + keyword_apps = type_apps + + # 进行搜索 + total_apps = (await session.scalars( + select(func.count()).select_from(keyword_apps), + )).one() + result: list[App] = list((await session.scalars( + select(keyword_apps).order_by(App.updatedAt.desc()) + .offset((page - 1) * SERVICE_PAGE_SIZE).limit(SERVICE_PAGE_SIZE), + )).all()) + + # 构建返回的应用卡片列表 + app_cards = [ + AppCenterCardItem( + appId=app.id, + appType=app.appType, + icon=app.icon, + name=app.name, + description=app.description, + author=app.author, + favorited=(app.id in user_favourite_apps), + published=app.isPublished, + ) + for app in result + ] return app_cards, total_apps @@ -213,7 +234,7 @@ class AppCenterManager: # 不允许更改应用类型 if app_data.appType != data.app_type: - err = f"【AppCenterManager】不允许更改应用类型: {app_data.app_type} -> {data.app_type}" + err = f"【AppCenterManager】不允许更改应用类型: {app_data.appType} -> {data.app_type}" raise ValueError(err) await AppCenterManager._process_app_and_save( @@ -257,13 +278,12 @@ class AppCenterManager: await session.commit() await AppCenterManager._process_app_and_save( - app_type=app_data.app_type, + app_type=app_data.appType, app_id=app_id, user_sub=user_sub, app_data=app_data, published=published, ) - return published @@ -289,7 +309,7 @@ class AppCenterManager: and_( UserFavorite.userSub == user_sub, UserFavorite.itemId == app_id, - UserFavorite.type == UserFavoriteType.APP, + UserFavorite.favouriteType == UserFavoriteType.APP, ), ), )).one_or_none() @@ -297,7 +317,7 @@ class AppCenterManager: # 添加收藏 app_favorite = UserFavorite( userSub=user_sub, - type=UserFavoriteType.APP, + favouriteType=UserFavoriteType.APP, itemId=app_id, ) session.add(app_favorite) @@ -394,28 +414,7 @@ class AppCenterManager: @staticmethod - async def _search_apps_by_filter( - search_conditions: dict[str, Any], - page: int, - page_size: int, - ) -> tuple[list[App], int]: - """根据过滤条件搜索应用并计算总页数""" - mongo = MongoDB() - app_collection = mongo.get_collection("app") - total_apps = await app_collection.count_documents(search_conditions) - db_data = ( - await app_collection.find(search_conditions) - .sort("created_at", -1) - .skip((page - 1) * page_size) - .limit(page_size) - .to_list(length=page_size) - ) - apps = [App.model_validate(doc) for doc in db_data] - return apps, total_apps - - - @staticmethod - async def _get_app_data(app_id: uuid.UUID, user_sub: str, *, check_author: bool = True) -> AppData: + async def _get_app_data(app_id: uuid.UUID, user_sub: str, *, check_author: bool = True) -> App: """ 从数据库获取应用数据并验证权限 @@ -454,7 +453,7 @@ class AppCenterManager: metadata.first_questions = data.first_questions elif app_data: metadata.links = app_data.links - metadata.first_questions = app_data.first_questions + metadata.first_questions = app_data.firstQuestions # 处理 'flows' 字段 if app_data: @@ -516,11 +515,14 @@ class AppCenterManager: return metadata @staticmethod - async def _create_metadata( + async def _create_metadata( # noqa: PLR0913 app_type: AppType, app_id: uuid.UUID, user_sub: str, - **kwargs: Any, + data: AppData | None = None, + app_data: App | None = None, + *, + published: bool | None = None, ) -> AppMetadata | AgentAppMetadata: """ 创建应用元数据 @@ -528,17 +530,12 @@ class AppCenterManager: :param app_type: 应用类型 :param app_id: 应用唯一标识 :param user_sub: 用户唯一标识 - :param kwargs: 可选参数,包含: - - data: 应用数据,用于新建或更新时提供 - - app_data: 现有应用数据,用于更新时提供 - - published: 发布状态,用于更新时提供 + :param data: 应用数据,用于新建或更新时提供 + :param app_data: 现有应用数据,用于更新时提供 + :param published: 发布状态,用于更新时提供 :return: 应用元数据 :raises ValueError: 无效应用类型或缺少必要数据 """ - data: AppData | None = kwargs.get("data") - app_data: App | None = kwargs.get("app_data") - published: bool | None = kwargs.get("published") - # 验证必要数据 source = data if data else app_data if not source: @@ -554,33 +551,31 @@ class AppCenterManager: "name": source.name, "description": source.description, "history_len": data.history_len if data else APP_DEFAULT_HISTORY_LEN, - } - # 设置权限 - if data: - common_params["permission"] = Permission( + "permission": Permission( type=data.permission.type, users=data.permission.users or [], - ) - elif app_data: + ) if data else app_data.permission, + } + # 设置权限 + if app_data: common_params["permission"] = app_data.permission # 根据应用类型创建不同的元数据 - if app_type == AppType.FLOW: - return AppCenterManager._create_flow_metadata(common_params, data, app_data, published) - if app_type == AppType.AGENT: - return AppCenterManager._create_agent_metadata(common_params, user_sub, data, app_data, published) + return AppCenterManager._create_agent_metadata(common_params, user_sub, data, app_data, published=published) - msg = "无效的应用类型" - raise ValueError(msg) + return AppCenterManager._create_flow_metadata(common_params, data, app_data, published=published) @staticmethod - async def _process_app_and_save( + async def _process_app_and_save( # noqa: PLR0913 app_type: AppType, app_id: uuid.UUID, user_sub: str, - **kwargs: Any, + data: AppData | None = None, + app_data: App | None = None, + *, + published: bool | None = None, ) -> Any: """ 处理应用元数据创建和保存 @@ -596,29 +591,12 @@ class AppCenterManager: app_type=app_type, app_id=app_id, user_sub=user_sub, - **kwargs, + data=data, + app_data=app_data, + published=published, ) # 保存应用 app_loader = AppLoader() await app_loader.save(metadata, app_id) return metadata - - - @staticmethod - async def _get_favorite_app_ids_by_user(user_sub: str) -> list[uuid.UUID]: - """获取用户收藏的应用ID""" - async with postgres.session() as session: - favorite_apps = list((await session.scalars( - select(UserFavorite.itemId).where( - and_( - UserFavorite.userSub == user_sub, - UserFavorite.type == UserFavoriteType.APP, - ), - ), - )).all()) - if not favorite_apps: - msg = f"[AppCenterManager] 用户错误或收藏为空: {user_sub}" - raise ValueError(msg) - - return favorite_apps diff --git a/apps/services/flow.py b/apps/services/flow.py index 5ab0108f..8d371c99 100644 --- a/apps/services/flow.py +++ b/apps/services/flow.py @@ -8,9 +8,10 @@ from sqlalchemy import and_, select from apps.common.postgres import postgres from apps.models.app import App +from apps.models.flow import Flow as FlowInfo from apps.models.node import NodeInfo from apps.models.service import Service -from apps.models.user import User, UserFavorite, UserFavoriteType +from apps.models.user import UserFavorite, UserFavoriteType from apps.scheduler.pool.loader.flow import FlowLoader from apps.scheduler.slot.slot import Slot from apps.schemas.enum_var import EdgeType @@ -27,6 +28,7 @@ from apps.schemas.flow_topology import ( from .node import NodeManager +FLOW_SPLIT_LEN = 2 logger = logging.getLogger(__name__) @@ -41,36 +43,23 @@ class FlowManager: :param service_id: 服务id :return: 节点基础信息的列表,按创建时间排序 """ - node_pool_collection = MongoDB().get_collection("node") # 获取节点集合 - try: - cursor = node_pool_collection.find({"service_id": service_id}).sort("created_at", ASCENDING) - - nodes_meta_data_items = [] - async for node_pool_record in cursor: - params_schema, output_schema = await NodeManager.get_node_params(node_pool_record["_id"]) - try: - # TODO: 由于现在没有动态表单,所以暂时使用Slot的create_empty_slot方法 - parameters = { - "input_parameters": Slot(params_schema).create_empty_slot(), - "output_parameters": Slot(output_schema).extract_type_desc_from_schema(), - } - except Exception: - logger.exception("[FlowManager] generate_from_schema 失败") - continue - node_meta_data_item = NodeMetaDataItem( - nodeId=node_pool_record["_id"], - callId=node_pool_record["call_id"], - name=node_pool_record["name"], - description=node_pool_record["description"], - createdAt=node_pool_record["created_at"], - parameters=parameters, # 添加 parametersTemplate 参数 - ) - nodes_meta_data_items.append(node_meta_data_item) - except Exception: - logger.exception("[FlowManager] 获取节点元数据失败") - return None - else: - return nodes_meta_data_items + async with postgres.session() as session: + node_data = list((await session.scalars( + select(NodeInfo.id, NodeInfo.callId, NodeInfo.name, NodeInfo.updatedAt).where( + NodeInfo.serviceId == service_id, + ).order_by(NodeInfo.updatedAt.desc()), + )).all()) + + # 进行格式转换 + result = [] + for item in node_data: + result += [NodeMetaDataBase( + nodeId=item.id, + callId=item.callId, + name=item.name, + updatedAt=round(item.updatedAt.timestamp(), 3), + )] + return result @staticmethod @@ -88,7 +77,7 @@ class FlowManager: select(UserFavorite.itemId).where( and_( UserFavorite.userSub == user_sub, - UserFavorite.type == UserFavoriteType.SERVICE, + UserFavorite.favouriteType == UserFavoriteType.SERVICE, ), ), )).all()) @@ -113,14 +102,14 @@ class FlowManager: service_items = [NodeServiceItem( serviceId=uuid.UUID("00000000-0000-0000-0000-000000000000"), name="系统", - nodeMetaDatas=[], + data=[], createdAt=None, )] service_items += [ NodeServiceItem( serviceId=item.id, name=item.name, - nodeMetaDatas=[], + data=[], createdAt=str(item.updatedAt.timestamp()), ) for item in service_data @@ -130,7 +119,7 @@ class FlowManager: node_meta_datas = await FlowManager.get_node_id_by_service_id(service_item.service_id) if node_meta_datas is None: node_meta_datas = [] - service_item.node_meta_datas = node_meta_datas + service_item.data = node_meta_datas return service_items @@ -143,31 +132,35 @@ class FlowManager: :param node_id: node的id :return: node_id对应的节点元数据信息 """ - node_pool_collection = MongoDB().get_collection("node") # 获取节点集合 - try: - node_pool_record = await node_pool_collection.find_one({"_id": node_id}) - if node_pool_record is None: - logger.error("[FlowManager] 节点元数据 %s 不存在", node_id) - return None + async with postgres.session() as session: + node_data = (await session.scalars( + select(NodeInfo).where(NodeInfo.id == node_id), + )).one_or_none() + # 判断如果没有Node + if node_data is None: + err = f"[FlowManager] 节点元数据 {node_id} 不存在" + logger.error(err) + raise ValueError(err) + + # 处理Schema + params_schema, output_schema = await NodeManager.get_node_params(node_data.id) parameters = { - "input_parameters": node_pool_record["params_schema"], - "output_parameters": node_pool_record["output_schema"], + "input_parameters": Slot(params_schema).create_empty_slot(), + "output_parameters": Slot(output_schema).extract_type_desc_from_schema(), } + return NodeMetaDataItem( - nodeId=node_pool_record["_id"], - callId=node_pool_record["call_id"], - name=node_pool_record["name"], - description=node_pool_record["description"], + nodeId=node_data.id, + callId=node_data.callId, + name=node_data.name, + description=node_data.description, parameters=parameters, - createdAt=node_pool_record["created_at"], + updatedAt=round(node_data.updatedAt.timestamp(), 3), ) - except Exception: - logger.exception("[FlowManager] 获取节点元数据失败") - return None @staticmethod - async def get_flow_by_app_and_flow_id(app_id: str, flow_id: str) -> FlowItem | None: # noqa: C901, PLR0911, PLR0912 + async def get_flow_by_app_and_flow_id(app_id: uuid.UUID, flow_id: uuid.UUID) -> FlowItem | None: """ 通过appId flowId获取flow config的路径和focus,并通过flow config的路径获取flow config,并将其转换为flow item。 @@ -175,72 +168,45 @@ class FlowManager: :param flow_id: 流的id :return: 流的item和用户在这个流上的视觉焦点 """ - try: - app_collection = MongoDB().get_collection("app") - app_record = await app_collection.find_one({"_id": app_id}) - if app_record is None: - logger.error("[FlowManager] 应用 %s 不存在", app_id) - return None - cursor = app_collection.find( - {"_id": app_id, "flows.id": flow_id}, - {"flows.$": 1}, # 只返回 flows 数组中符合条件的第一个元素 - ) - # 获取结果列表,并限制长度为1,因为我们只期待一个结果 - app_records = await cursor.to_list(length=1) - if len(app_records) == 0: - return None - app_record = app_records[0] - if "flows" not in app_record or len(app_record["flows"]) == 0: - return None - for flow in app_record["flows"]: - if flow["id"] == flow_id: - flow_record = flow - break - if flow_record is None: - return None - except Exception: - logger.exception("[FlowManager] 获取流失败") - return None - - try: + async with postgres.session() as session: + flow_data = (await session.scalars( + select(FlowInfo).where(and_(FlowInfo.appId == app_id, FlowInfo.id == flow_id)), + )).one_or_none() + if flow_data is None: + err = f"[FlowManager] 流 {flow_id} 不存在" + logger.error(err) + raise ValueError(err) + flow_config = await FlowLoader.load(app_id, flow_id) - if not flow_config: - logger.error("[FlowManager] 获取流配置失败") - return None focus_point = flow_config.focus_point or PositionItem(x=0, y=0) flow_item = FlowItem( flowId=flow_id, name=flow_config.name, description=flow_config.description, enable=True, - editable=True, nodes=[], edges=[], focusPoint=focus_point, connectivity=flow_config.connectivity, debug=flow_config.debug, ) + for node_id, node_config in flow_config.steps.items(): input_parameters = node_config.params - if node_config.node not in ("Empty"): - _, output_parameters = await NodeManager.get_node_params(node_config.node) - else: - output_parameters = {} + _, output_parameters = await NodeManager.get_node_params(node_config.node) parameters = { "input_parameters": input_parameters, "output_parameters": Slot(output_parameters).extract_type_desc_from_schema(), } + node_item = NodeItem( stepId=node_id, nodeId=node_config.node, name=node_config.name, description=node_config.description, - enable=True, - editable=True, callId=node_config.type, parameters=parameters, - position=PositionItem( - x=node_config.pos.x, y=node_config.pos.y), + position=PositionItem(x=node_config.pos.x, y=node_config.pos.y), ) flow_item.nodes.append(node_item) @@ -248,10 +214,10 @@ class FlowManager: edge_from = edge_config.edge_from branch_id = "" tmp_list = edge_config.edge_from.split(".") - if len(tmp_list) == 0 or len(tmp_list) > 2: + if len(tmp_list) == 0 or len(tmp_list) > FLOW_SPLIT_LEN: logger.error("[FlowManager] Flow中边的格式错误") continue - if len(tmp_list) == 2: + if len(tmp_list) == FLOW_SPLIT_LEN: edge_from = tmp_list[0] branch_id = tmp_list[1] flow_item.edges.append( @@ -264,9 +230,6 @@ class FlowManager: ), ) return flow_item - except Exception: - logger.exception("[FlowManager] 获取流失败") - return None @staticmethod diff --git a/apps/services/service.py b/apps/services/service.py index d6648309..e4ded160 100644 --- a/apps/services/service.py +++ b/apps/services/service.py @@ -471,7 +471,7 @@ class ServiceCenterManager: await session.execute( delete(UserFavorite).where( UserFavorite.itemId == service_id, - UserFavorite.type == UserFavoriteType.SERVICE, + UserFavorite.favouriteType == UserFavoriteType.SERVICE, ), ) await session.commit() @@ -512,7 +512,7 @@ class ServiceCenterManager: and_( UserFavorite.itemId == service_id, UserFavorite.userSub == user_sub, - UserFavorite.type == UserFavoriteType.SERVICE, + UserFavorite.favouriteType == UserFavoriteType.SERVICE, ), ), )).one_or_none() @@ -521,7 +521,7 @@ class ServiceCenterManager: user_favourite = UserFavorite( itemId=service_id, userSub=user_sub, - type=UserFavoriteType.SERVICE, + favouriteType=UserFavoriteType.SERVICE, ) session.add(user_favourite) await session.commit() @@ -538,7 +538,7 @@ class ServiceCenterManager: user_favourite = (await session.scalars( select(UserFavorite).where( UserFavorite.userSub == user_sub, - UserFavorite.type == UserFavoriteType.SERVICE, + UserFavorite.favouriteType == UserFavoriteType.SERVICE, ), )).all() return [user_favourite.itemId for user_favourite in user_favourite] -- Gitee From d3fa021f7b1065d6fb41833b6a1bc040e988c014 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Tue, 5 Aug 2025 17:04:09 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E5=9B=9E=E5=90=88agent=E5=88=86=E6=94=AF?= =?UTF-8?q?=E8=87=B3!597?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/common/queue.py | 2 +- apps/llm/patterns/rewrite.py | 2 +- apps/main.py | 2 + apps/routers/chat.py | 9 +++- apps/routers/parameter.py | 74 ++++++++++++++++++++++++++++++++ apps/routers/record.py | 21 ++++----- apps/schemas/enum_var.py | 3 ++ apps/schemas/mcp.py | 2 + apps/schemas/message.py | 9 +++- apps/schemas/parameters.py | 69 ++++++++++++++++++++++++++++++ apps/schemas/record.py | 5 +++ apps/schemas/request_data.py | 5 ++- apps/schemas/response_data.py | 51 +++++++++++++++++++++- apps/schemas/task.py | 3 ++ apps/services/parameter.py | 80 +++++++++++++++++++++++++++++++++++ apps/services/rag.py | 4 ++ 16 files changed, 323 insertions(+), 18 deletions(-) create mode 100644 apps/routers/parameter.py create mode 100644 apps/schemas/parameters.py create mode 100644 apps/services/parameter.py diff --git a/apps/common/queue.py b/apps/common/queue.py index 5601c93a..96a87073 100644 --- a/apps/common/queue.py +++ b/apps/common/queue.py @@ -58,7 +58,7 @@ class MessageQueue: flowId=task.state.flow_id, stepId=task.state.step_id, stepName=task.state.step_name, - stepStatus=task.state.status, + stepStatus=task.state.step_status, ) else: flow = None diff --git a/apps/llm/patterns/rewrite.py b/apps/llm/patterns/rewrite.py index 05db345d..46d89e4a 100644 --- a/apps/llm/patterns/rewrite.py +++ b/apps/llm/patterns/rewrite.py @@ -29,7 +29,7 @@ class QuestionRewrite(CorePattern): user_prompt: str = r""" - 根据上面的对话,推断用户的实际意图并补全用户的提问内容。 + 根据历史对话,推断用户的实际意图并补全用户的提问内容,历史对话被包含在标签中,用户意图被包含在标签中。 要求: 1. 请使用JSON格式输出,参考下面给出的样例;不要包含任何XML标签,不要包含任何解释说明; 2. 若用户当前提问内容与对话上文不相关,或你认为用户的提问内容已足够完整,请直接输出用户的提问内容。 diff --git a/apps/main.py b/apps/main.py index f132e9c2..7b1338e7 100644 --- a/apps/main.py +++ b/apps/main.py @@ -33,6 +33,7 @@ from .routers import ( knowledge, llm, mcp_service, + parameter, personal_token, record, service, @@ -71,6 +72,7 @@ app.include_router(llm.admin_router) app.include_router(mcp_service.router) app.include_router(flow.router) app.include_router(user.router) +app.include_router(parameter.router) app.include_router(tag.admin_router) app.include_router(tag.router) diff --git a/apps/routers/chat.py b/apps/routers/chat.py index 1db26e31..b1e92b8a 100644 --- a/apps/routers/chat.py +++ b/apps/routers/chat.py @@ -35,10 +35,17 @@ router = APIRouter( async def init_task(post_body: RequestData, user_sub: str, session_id: str) -> Task: """初始化Task""" + if post_body.new_task: + # 创建或还原Task + task = await TaskManager.get_task(session_id=session_id, post_body=post_body, user_sub=user_sub) + if task: + await TaskManager.delete_task_by_task_id(task.id) # 创建或还原Task task = await TaskManager.get_task(session_id=session_id, post_body=post_body, user_sub=user_sub) # 更改信息并刷新数据库 - task.runtime.question = post_body.question + if post_body.new_task: + task.runtime.question = post_body.question + task.ids.group_id = post_body.group_id return task diff --git a/apps/routers/parameter.py b/apps/routers/parameter.py new file mode 100644 index 00000000..833e0cab --- /dev/null +++ b/apps/routers/parameter.py @@ -0,0 +1,74 @@ +from typing import Annotated + +from fastapi import APIRouter, Depends, Query, status +from fastapi.responses import JSONResponse + +from apps.dependency import get_user +from apps.dependency.user import verify_user +from apps.schemas.response_data import GetOperaRsp, GetParamsRsp +from apps.services.appcenter import AppCenterManager +from apps.services.flow import FlowManager +from apps.services.parameter import ParameterManager + +router = APIRouter( + prefix="/api/parameter", + tags=["parameter"], + dependencies=[ + Depends(verify_user), + ], +) + + +@router.get("", response_model=GetParamsRsp) +async def get_parameters( + user_sub: Annotated[str, Depends(get_user)], + app_id: Annotated[str, Query(alias="appId")], + flow_id: Annotated[str, Query(alias="flowId")], + step_id: Annotated[str, Query(alias="stepId")], +) -> JSONResponse: + """Get parameters for node choice.""" + if not await AppManager.validate_user_app_access(user_sub, app_id): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content=GetParamsRsp( + code=status.HTTP_403_FORBIDDEN, + message="用户没有权限访问该流", + result=[], + ).model_dump(exclude_none=True, by_alias=True), + ) + flow = await FlowManager.get_flow_by_app_and_flow_id(app_id, flow_id) + if not flow: + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content=GetParamsRsp( + code=status.HTTP_404_NOT_FOUND, + message="未找到该流", + result=[], + ).model_dump(exclude_none=True, by_alias=True), + ) + result = await ParameterManager.get_pre_params_by_flow_and_step_id(flow, step_id) + return JSONResponse( + status_code=status.HTTP_200_OK, + content=GetParamsRsp( + code=status.HTTP_200_OK, + message="获取参数成功", + result=result, + ).model_dump(exclude_none=True, by_alias=True), + ) + + +@router.get("/operate", response_model=GetOperaRsp) +async def get_operate_parameters( + user_sub: Annotated[str, Depends(get_user)], + param_type: Annotated[str, Query(alias="ParamType")], +) -> JSONResponse: + """Get parameters for node choice.""" + result = await ParameterManager.get_operate_and_bind_type(param_type) + return JSONResponse( + status_code=status.HTTP_200_OK, + content=GetOperaRsp( + code=status.HTTP_200_OK, + message="获取操作成功", + result=result, + ).model_dump(exclude_none=True, by_alias=True), + ) diff --git a/apps/routers/record.py b/apps/routers/record.py index 9ed98825..4f424e81 100644 --- a/apps/routers/record.py +++ b/apps/routers/record.py @@ -86,22 +86,23 @@ async def get_record(request: Request, conversationId: Annotated[uuid.UUID, Path tmp_record.document = await DocumentManager.get_used_docs_by_record(user_sub, record_group.id) # 获得Record关联的flow数据 - flow_list = await TaskManager.get_context_by_record_id(record_group.id, record.id) - if flow_list: - first_flow = FlowStepHistory.model_validate(flow_list[0]) + flow_step_list = await TaskManager.get_context_by_record_id(record_group.id, record.id) + if flow_step_list: + first_step_history = FlowStepHistory.model_validate(flow_step_list[0]) tmp_record.flow = RecordFlow( - id=first_flow.flow_name, #TODO: 此处前端应该用name + id=first_step_history.flow_name, # TODO: 此处前端应该用name recordId=record.id, - flowId=first_flow.id, - stepNum=len(flow_list), + flowStatus=first_step_history.flow_status, + flowId=first_step_history.id, + stepNum=len(flow_step_list), steps=[], ) - for flow in flow_list: - flow_step = FlowStepHistory.model_validate(flow) + for flow_step in flow_step_list: + flow_step = FlowStepHistory.model_validate(flow_step) tmp_record.flow.steps.append( RecordFlowStep( - stepId=flow_step.step_name, #TODO: 此处前端应该用name - stepStatus=flow_step.status, + stepId=flow_step.step_name, # TODO: 此处前端应该用name + stepStatus=flow_step.step_status, input=flow_step.input_data, output=flow_step.output_data, ), diff --git a/apps/schemas/enum_var.py b/apps/schemas/enum_var.py index 02e42993..6970781e 100644 --- a/apps/schemas/enum_var.py +++ b/apps/schemas/enum_var.py @@ -15,6 +15,7 @@ class SlotType(str, Enum): class StepStatus(str, Enum): """步骤状态""" + WAITING = "waiting" RUNNING = "running" SUCCESS = "success" ERROR = "error" @@ -38,6 +39,8 @@ class EventType(str, Enum): TEXT_ADD = "text.add" GRAPH = "graph" DOCUMENT_ADD = "document.add" + STEP_WAITING_FOR_START = "step.waiting_for_start" + STEP_WAITING_FOR_PARAM = "step.waiting_for_param" FLOW_START = "flow.start" STEP_INPUT = "step.input" STEP_OUTPUT = "step.output" diff --git a/apps/schemas/mcp.py b/apps/schemas/mcp.py index c6366ea1..41f5b303 100644 --- a/apps/schemas/mcp.py +++ b/apps/schemas/mcp.py @@ -1,6 +1,7 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """MCP 相关数据结构""" +import uuid from enum import Enum from pydantic import BaseModel, Field @@ -66,6 +67,7 @@ class MCPSelectResult(BaseModel): class MCPPlanItem(BaseModel): """MCP 计划""" + id: str = Field(default_factory=lambda: str(uuid.uuid4())) content: str = Field(description="计划内容") tool: str = Field(description="工具名称") instruction: str = Field(description="工具指令") diff --git a/apps/schemas/message.py b/apps/schemas/message.py index 1c41cfdf..0a441dd2 100644 --- a/apps/schemas/message.py +++ b/apps/schemas/message.py @@ -2,6 +2,7 @@ """队列中的消息结构""" import uuid +from datetime import UTC, datetime from typing import Any from pydantic import BaseModel, Field @@ -25,6 +26,8 @@ class MessageFlow(BaseModel): flow_id: str = Field(description="Flow ID", alias="flowId") step_id: str = Field(description="当前步骤ID", alias="stepId") step_name: str = Field(description="当前步骤名称", alias="stepName") + sub_step_id: str | None = Field(description="当前子步骤ID", alias="subStepId", default=None) + sub_step_name: str | None = Field(description="当前子步骤名称", alias="subStepName", default=None) step_status: StepStatus = Field(description="当前步骤状态", alias="stepStatus") @@ -59,12 +62,16 @@ class TextAddContent(BaseModel): class DocumentAddContent(BaseModel): """document.add消息的content""" - document_id: str = Field(min_length=36, max_length=36, description="文档UUID", alias="documentId") + document_id: str = Field(description="文档UUID", alias="documentId") document_order: int = Field(description="文档在对话中的顺序,从1开始", alias="documentOrder") + document_author: str = Field(description="文档作者", alias="documentAuthor", default="") document_name: str = Field(description="文档名称", alias="documentName") document_abstract: str = Field(description="文档摘要", alias="documentAbstract", default="") document_type: str = Field(description="文档MIME类型", alias="documentType", default="") document_size: float = Field(ge=0, description="文档大小,单位是KB,保留两位小数", alias="documentSize", default=0) + created_at: float = Field( + description="文档创建时间,单位是秒", alias="createdAt", default_factory=lambda: round(datetime.now(tz=UTC).timestamp(), 3) + ) class FlowStartContent(BaseModel): diff --git a/apps/schemas/parameters.py b/apps/schemas/parameters.py new file mode 100644 index 00000000..bd908d23 --- /dev/null +++ b/apps/schemas/parameters.py @@ -0,0 +1,69 @@ +from enum import Enum + + +class NumberOperate(str, Enum): + """Choice 工具支持的数字运算符""" + + EQUAL = "number_equal" + NOT_EQUAL = "number_not_equal" + GREATER_THAN = "number_greater_than" + LESS_THAN = "number_less_than" + GREATER_THAN_OR_EQUAL = "number_greater_than_or_equal" + LESS_THAN_OR_EQUAL = "number_less_than_or_equal" + + +class StringOperate(str, Enum): + """Choice 工具支持的字符串运算符""" + + EQUAL = "string_equal" + NOT_EQUAL = "string_not_equal" + CONTAINS = "string_contains" + NOT_CONTAINS = "string_not_contains" + STARTS_WITH = "string_starts_with" + ENDS_WITH = "string_ends_with" + LENGTH_EQUAL = "string_length_equal" + LENGTH_GREATER_THAN = "string_length_greater_than" + LENGTH_GREATER_THAN_OR_EQUAL = "string_length_greater_than_or_equal" + LENGTH_LESS_THAN = "string_length_less_than" + LENGTH_LESS_THAN_OR_EQUAL = "string_length_less_than_or_equal" + REGEX_MATCH = "string_regex_match" + + +class ListOperate(str, Enum): + """Choice 工具支持的列表运算符""" + + EQUAL = "list_equal" + NOT_EQUAL = "list_not_equal" + CONTAINS = "list_contains" + NOT_CONTAINS = "list_not_contains" + LENGTH_EQUAL = "list_length_equal" + LENGTH_GREATER_THAN = "list_length_greater_than" + LENGTH_GREATER_THAN_OR_EQUAL = "list_length_greater_than_or_equal" + LENGTH_LESS_THAN = "list_length_less_than" + LENGTH_LESS_THAN_OR_EQUAL = "list_length_less_than_or_equal" + + +class BoolOperate(str, Enum): + """Choice 工具支持的布尔运算符""" + + EQUAL = "bool_equal" + NOT_EQUAL = "bool_not_equal" + + +class DictOperate(str, Enum): + """Choice 工具支持的字典运算符""" + + EQUAL = "dict_equal" + NOT_EQUAL = "dict_not_equal" + CONTAINS_KEY = "dict_contains_key" + NOT_CONTAINS_KEY = "dict_not_contains_key" + + +class Type(str, Enum): + """Choice 工具支持的类型""" + + STRING = "string" + NUMBER = "number" + LIST = "list" + DICT = "dict" + BOOL = "bool" diff --git a/apps/schemas/record.py b/apps/schemas/record.py index 3fc44aa7..7f4f3103 100644 --- a/apps/schemas/record.py +++ b/apps/schemas/record.py @@ -14,8 +14,10 @@ class RecordDocument(BaseModel): """GET /api/record/{conversation_id} Result中的document数据结构""" id: str = Field(alias="_id", default="") + order: int = Field(default=0, description="文档顺序") abstract: str = Field(default="", description="文档摘要") user_sub: None = None + author: str = Field(default="", description="文档作者") associated: Literal["question", "answer"] class Config: @@ -91,11 +93,14 @@ class RecordGroupDocument(BaseModel): """RecordGroup关联的文件""" id: str = Field(default_factory=lambda: str(uuid.uuid4()), alias="_id") + order: int = Field(default=0, description="文档顺序") + author: str = Field(default="", description="文档作者") name: str = Field(description="文档名称") abstract: str = Field(description="文档摘要", default="") extension: str = Field(description="文档扩展名", default="") size: int = Field(description="文档大小,单位是KB", default=0) associated: Literal["question", "answer"] + created_at: float = Field(default=0.0, description="文档创建时间") class Record(RecordData): diff --git a/apps/schemas/request_data.py b/apps/schemas/request_data.py index 8303a96d..ceca9e74 100644 --- a/apps/schemas/request_data.py +++ b/apps/schemas/request_data.py @@ -16,8 +16,8 @@ class RequestDataApp(BaseModel): """模型对话中包含的app信息""" app_id: uuid.UUID = Field(description="应用ID", alias="appId") - flow_id: uuid.UUID = Field(description="Flow ID", alias="flowId") - params: dict[str, Any] = Field(description="插件参数") + flow_id: str | None = Field(default=None, description="Flow ID", alias="flowId") + params: dict[str, Any] | None = Field(default=None, description="插件参数") class RequestData(BaseModel): @@ -31,6 +31,7 @@ class RequestData(BaseModel): files: list[str] = Field(default=[], description="文件列表") app: RequestDataApp | None = Field(default=None, description="应用") debug: bool = Field(default=False, description="是否调试") + new_task: bool = Field(default=True, description="是否新建任务") class QuestionBlacklistRequest(BaseModel): diff --git a/apps/schemas/response_data.py b/apps/schemas/response_data.py index 3908c29b..f41fdb02 100644 --- a/apps/schemas/response_data.py +++ b/apps/schemas/response_data.py @@ -14,11 +14,17 @@ from .appcenter import AppCenterCardItem, AppData from .enum_var import DocumentStatus from .flow_topology import ( FlowItem, - NodeMetaDataItem, NodeServiceItem, - PositionItem, ) from .mcp import MCPType +from .parameters import ( + BoolOperate, + DictOperate, + ListOperate, + NumberOperate, + StringOperate, + Type, +) from .record import RecordData from .user import UserInfo @@ -588,3 +594,44 @@ class ListLLMRsp(ResponseData): """GET /api/llm 返回数据结构""" result: list[LLMProviderInfo] = Field(default=[], title="Result") + + +class ParamsNode(BaseModel): + """参数数据结构""" + + param_name: str = Field(..., description="参数名称", alias="paramName") + param_path: str = Field(..., description="参数路径", alias="paramPath") + param_type: Type = Field(..., description="参数类型", alias="paramType") + sub_params: list["ParamsNode"] | None = Field( + default=None, description="子参数列表", alias="subParams", + ) + + +class StepParams(BaseModel): + """参数数据结构""" + + step_id: str = Field(..., description="步骤ID", alias="stepId") + name: str = Field(..., description="Step名称") + params_node: ParamsNode | None = Field( + default=None, description="参数节点", alias="paramsNode") + + +class GetParamsRsp(ResponseData): + """GET /api/params 返回数据结构""" + + result: list[StepParams] = Field( + default=[], description="参数列表", alias="result", + ) + + +class OperateAndBindType(BaseModel): + """操作和绑定类型数据结构""" + + operate: NumberOperate | StringOperate | ListOperate | BoolOperate | DictOperate = Field(description="操作类型") + bind_type: Type = Field(description="绑定类型") + + +class GetOperaRsp(ResponseData): + """GET /api/operate 返回数据结构""" + + result: list[OperateAndBindType] = Field(..., title="Result") diff --git a/apps/schemas/task.py b/apps/schemas/task.py index 555eade9..afb445d1 100644 --- a/apps/schemas/task.py +++ b/apps/schemas/task.py @@ -9,6 +9,7 @@ from pydantic import BaseModel, Field from .enum_var import StepStatus from .flow import Step +from .mcp import MCPPlan class FlowStepHistory(BaseModel): @@ -42,6 +43,7 @@ class ExecutorState(BaseModel): # 附加信息 step_id: str = Field(description="当前步骤ID") step_name: str = Field(description="当前步骤名称") + step_description: str = Field(description="当前步骤描述", default="") app_id: str = Field(description="应用ID") slot: dict[str, Any] = Field(description="待填充参数的JSON Schema", default={}) error_info: dict[str, Any] = Field(description="错误信息", default={}) @@ -74,6 +76,7 @@ class TaskRuntime(BaseModel): summary: str = Field(description="摘要", default="") filled: dict[str, Any] = Field(description="填充的槽位", default={}) documents: list[dict[str, Any]] = Field(description="文档列表", default=[]) + temporary_plans: MCPPlan | None = Field(description="临时计划列表", default=None) class Task(BaseModel): diff --git a/apps/services/parameter.py b/apps/services/parameter.py new file mode 100644 index 00000000..3105af85 --- /dev/null +++ b/apps/services/parameter.py @@ -0,0 +1,80 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. +"""Parameter Manager""" + +import logging + +from pymongo import ASCENDING + +from apps.scheduler.call.choice.condition_handler import ConditionHandler +from apps.scheduler.call.choice.schema import BoolOperate, DictOperate, ListOperate, NumberOperate, StringOperate, Type +from apps.scheduler.slot.slot import Slot +from apps.schemas.flow_topology import FlowItem +from apps.schemas.response_data import ( + OperateAndBindType, + ParamsNode, + StepParams, +) +from apps.services.node import NodeManager + +logger = logging.getLogger(__name__) + + +class ParameterManager: + """Parameter Manager""" + + @staticmethod + async def get_operate_and_bind_type(param_type: Type) -> list[OperateAndBindType]: + """Get operate and bind type""" + result = [] + operate = None + if param_type == Type.NUMBER: + operate = NumberOperate + elif param_type == Type.STRING: + operate = StringOperate + elif param_type == Type.LIST: + operate = ListOperate + elif param_type == Type.BOOL: + operate = BoolOperate + elif param_type == Type.DICT: + operate = DictOperate + if operate: + for item in operate: + result.append(OperateAndBindType( + operate=item, + bind_type=ConditionHandler.get_value_type_from_operate(item))) + return result + + @staticmethod + async def get_pre_params_by_flow_and_step_id(flow: FlowItem, step_id: str) -> list[StepParams]: + """Get pre params by flow and step id""" + index = 0 + q = [step_id] + in_edges = {} + step_id_to_node_id = {} + for step in flow.nodes: + step_id_to_node_id[step.step_id] = step.node_id + for edge in flow.edges: + if edge.target_node not in in_edges: + in_edges[edge.target_node] = [] + in_edges[edge.target_node].append(edge.source_node) + while index < len(q): + tmp_step_id = q[index] + index += 1 + for i in range(len(in_edges.get(tmp_step_id, []))): + pre_node_id = in_edges[tmp_step_id][i] + if pre_node_id not in q: + q.append(pre_node_id) + pre_step_params = [] + for step_id in q: + node_id = step_id_to_node_id.get(step_id) + params_schema, output_schema = await NodeManager.get_node_params(node_id) + slot = Slot(output_schema) + params_node = slot.get_params_node_from_schema(root='/output') + pre_step_params.append( + StepParams( + stepId=node_id, + name=params_schema.get("name", ""), + paramsNode=params_node, + ) + ) + return pre_step_params diff --git a/apps/services/rag.py b/apps/services/rag.py index 9691ce8d..148e39cf 100644 --- a/apps/services/rag.py +++ b/apps/services/rag.py @@ -4,6 +4,7 @@ import json import logging from collections.abc import AsyncGenerator +from datetime import UTC, datetime from typing import Any import httpx @@ -39,6 +40,7 @@ class RAG: 2.脚注的格式为[[1]],[[2]],[[3]]等,脚注的内容为提供的文档的id。 3.脚注只出现在回答的句子的末尾,例如句号、问号等标点符号后面。 4.不要对脚注本身进行解释或说明。 + 5.请不要使用中的文档的id作为脚注。 @@ -157,9 +159,11 @@ class RAG: "id": doc_chunk["docId"], "order": doc_cnt, "name": doc_chunk.get("docName", ""), + "author": doc_chunk.get("docAuthor", ""), "extension": doc_chunk.get("docExtension", ""), "abstract": doc_chunk.get("docAbstract", ""), "size": doc_chunk.get("docSize", 0), + "created_at": doc_chunk.get("docCreatedAt", round(datetime.now(UTC).timestamp(), 3)), }) doc_id_map[doc_chunk["docId"]] = doc_cnt doc_index = doc_id_map[doc_chunk["docId"]] -- Gitee