From 9c5b9313b50ece5ac1f0429bb8d5f80abf93ca5e Mon Sep 17 00:00:00 2001 From: z30057876 Date: Sun, 20 Apr 2025 16:28:52 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E8=AF=AD=E4=B9=89=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E7=9B=B8=E5=85=B3=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 2 +- apps/manager/flow.py | 5 +- apps/manager/service.py | 22 +++- apps/routers/chat.py | 8 +- apps/routers/flow.py | 6 +- apps/scheduler/call/api/api.py | 71 +++++++----- apps/scheduler/call/graph/graph.py | 7 +- apps/scheduler/call/suggest/schema.py | 2 +- apps/scheduler/call/suggest/suggest.py | 13 ++- apps/scheduler/pool/loader/flow.py | 6 +- apps/scheduler/pool/loader/openapi.py | 8 ++ apps/scheduler/scheduler/scheduler.py | 4 +- apps/scheduler/slot/slot.py | 147 ++++++++++++++++--------- apps/service/flow.py | 43 -------- 14 files changed, 198 insertions(+), 146 deletions(-) diff --git a/Dockerfile b/Dockerfile index 27c7bab7..4cabfda4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM hub.oepkgs.net/neocopilot/framework_base:0.9.5-x86-test +FROM hub.oepkgs.net/neocopilot/framework-baseimg:dev USER root RUN sed -i 's/umask 002/umask 027/g' /etc/bashrc && \ diff --git a/apps/manager/flow.py b/apps/manager/flow.py index 3cfe2d90..80a046dc 100644 --- a/apps/manager/flow.py +++ b/apps/manager/flow.py @@ -22,7 +22,7 @@ from apps.entities.flow_topology import ( from apps.manager.node import NodeManager from apps.models.mongo import MongoDB from apps.scheduler.pool.loader.flow import FlowLoader -from apps.service.flow import generate_from_schema +from apps.scheduler.slot.slot import Slot logger = logging.getLogger(__name__) @@ -87,8 +87,9 @@ class FlowManager: async for node_pool_record in cursor: params_schema, output_schema = await NodeManager.get_node_params(node_pool_record["_id"]) try: + # TODO: 由于现在没有动态表单,所以暂时使用Slot的create_empty_slot方法 parameters = { - "input_parameters": generate_from_schema(params_schema), + "input_parameters": Slot(params_schema).create_empty_slot(), "output_parameters": output_schema, } except Exception: diff --git a/apps/manager/service.py b/apps/manager/service.py index 5cb87079..c71fc73f 100644 --- a/apps/manager/service.py +++ b/apps/manager/service.py @@ -225,7 +225,7 @@ class ServiceCenterManager: """获取服务数据""" # 验证用户权限 service_collection = MongoDB.get_collection("service") - db_service = await service_collection.find_one({"_id": service_id}) + db_service = await service_collection.find_one({"_id": service_id, "author": user_sub}) if not db_service: msg = "Service not found" raise ServiceIDError(msg) @@ -241,6 +241,26 @@ class ServiceCenterManager: return service_pool_store.name, service_data + @staticmethod + async def get_service_metadata( + user_sub: str, + service_id: str, + ) -> ServiceMetadata: + """获取服务元数据""" + service_collection = MongoDB.get_collection("service") + db_service = await service_collection.find_one({"_id": service_id, "author": user_sub}) + if not db_service: + msg = "Service not found" + raise ServiceIDError(msg) + + metadata_path = ( + Path(Config().get_config().deploy.data_dir) / "semantics" / "service" / service_id / "metadata.yaml" + ) + async with await metadata_path.open() as f: + metadata_data = yaml.safe_load(await f.read()) + return ServiceMetadata.model_validate(metadata_data) + + @staticmethod async def delete_service( user_sub: str, diff --git a/apps/routers/chat.py b/apps/routers/chat.py index a8414cdd..a17f4ee5 100644 --- a/apps/routers/chat.py +++ b/apps/routers/chat.py @@ -25,6 +25,7 @@ from apps.entities.request_data import RequestData from apps.entities.response_data import ResponseData from apps.manager.flow import FlowManager from apps.manager.blacklist import QuestionBlacklistManager, UserBlacklistManager +from apps.manager.flow import FlowManager from apps.manager.task import TaskManager from apps.scheduler.scheduler import Scheduler from apps.scheduler.scheduler.context import save_data @@ -106,7 +107,12 @@ async def chat_generator(post_body: RequestData, user_sub: str, session_id: str) yield "data: [DONE]\n\n" if post_body.app and post_body.app.flow_id: - await FlowManager.update_flow_debug_by_app_and_flow_id(post_body.app.app_id, post_body.app.flow_id, debug=True) + await FlowManager.update_flow_debug_by_app_and_flow_id( + post_body.app.app_id, + post_body.app.flow_id, + debug=True, + ) + except Exception: logger.exception("[Chat] 生成答案失败") yield "data: [ERROR]\n\n" diff --git a/apps/routers/flow.py b/apps/routers/flow.py index e9844696..6060e61e 100644 --- a/apps/routers/flow.py +++ b/apps/routers/flow.py @@ -96,9 +96,9 @@ async def get_services( status.HTTP_404_NOT_FOUND: {"model": ResponseData}, }) async def get_flow( - user_sub: Annotated[str, Depends(get_user)], - app_id: Annotated[str, Query(alias="appId")], - flow_id: Annotated[str, Query(alias="flowId")], + user_sub: Annotated[str, Depends(get_user)], + app_id: Annotated[str, Query(alias="appId")], + flow_id: Annotated[str, Query(alias="flowId")], ) -> JSONResponse: """获取流拓扑结构""" if not await AppManager.validate_user_app_access(user_sub, app_id): diff --git a/apps/scheduler/call/api/api.py b/apps/scheduler/call/api/api.py index 0e42b973..dc7bf0e5 100644 --- a/apps/scheduler/call/api/api.py +++ b/apps/scheduler/call/api/api.py @@ -7,7 +7,7 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. import json import logging from collections.abc import AsyncGenerator -from typing import TYPE_CHECKING, Any, Self +from typing import Any import aiohttp from fastapi import status @@ -15,7 +15,6 @@ from pydantic import Field from apps.common.oidc import oidc_provider from apps.entities.enum_var import CallOutputType, ContentType, HTTPMethod -from apps.entities.flow import ServiceMetadata from apps.entities.scheduler import ( CallError, CallInfo, @@ -27,10 +26,6 @@ from apps.manager.token import TokenManager from apps.scheduler.call.api.schema import APIInput, APIOutput from apps.scheduler.call.core import CoreCall -if TYPE_CHECKING: - from apps.scheduler.executor.step import StepExecutor - - logger = logging.getLogger(__name__) SUCCESS_HTTP_CODES = [ status.HTTP_200_OK, @@ -72,22 +67,25 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): async def _init(self, call_vars: CallVars) -> APIInput: """初始化API调用工具""" - # 获取对应API的Service Metadata - try: - service_metadata = await ServiceCenterManager.get_service_data( - call_vars.ids.user_sub, self.node.service_id or "", - ) - service_metadata = ServiceMetadata.model_validate(service_metadata) - except Exception as e: - raise CallError( - message="API接口的Service Metadata获取失败", - data={}, - ) from e - - # 获取Service对应的Auth - self._auth = service_metadata.api.auth + self._service_id = "" self._session_id = call_vars.ids.session_id - self._service_id = call_vars.ids.service_id + self._auth = None + + if self.node: + # 获取对应API的Service Metadata + try: + service_metadata = await ServiceCenterManager.get_service_metadata( + call_vars.ids.user_sub, self.node.service_id or "", + ) + except Exception as e: + raise CallError( + message="API接口的Service Metadata获取失败", + data={}, + ) from e + + # 获取Service对应的Auth + self._auth = service_metadata.api.auth + self._service_id = self.node.service_id or "" return APIInput( url=self.url, @@ -111,7 +109,7 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): await self._session.close() - async def _make_api_call(self, data: dict | None, files: aiohttp.FormData): # noqa: ANN202 + async def _make_api_call(self, data: dict | None, files: aiohttp.FormData): # noqa: ANN202, C901 """组装API请求Session""" # 获取必要参数 req_header = { @@ -141,7 +139,10 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): ) req_header.update({"access-token": token}) - if self.method in ["get", "delete"]: + if self.method in [ + HTTPMethod.GET.value, + HTTPMethod.DELETE.value, + ]: req_params.update(data) return self._session.request( self.method, @@ -152,8 +153,15 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): timeout=self._timeout, ) - if self.method in ["post", "put", "patch"]: - if self.content_type == "form": + if self.method in [ + HTTPMethod.POST.value, + HTTPMethod.PUT.value, + HTTPMethod.PATCH.value, + ]: + if self.content_type in [ + ContentType.FORM_URLENCODED.value, + ContentType.MULTIPART_FORM_DATA.value, + ]: form_data = files for key, val in data.items(): form_data.add_field(key, val) @@ -174,8 +182,10 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): timeout=self._timeout, ) - err = "Method not implemented." - raise NotImplementedError(err) + raise CallError( + message="API接口的HTTP Method不支持", + data={}, + ) async def _call_api(self, final_data: dict[str, Any] | None = None) -> APIOutput: """实际调用API,并处理返回值""" @@ -207,8 +217,11 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): # 如果返回值是JSON try: response_dict = json.loads(response_data) - except Exception: - logger.exception("[API] 返回值不是JSON格式!") + except Exception as e: + raise CallError( + message="API接口的返回值不是JSON格式", + data={}, + ) from e return APIOutput( http_code=response_status, diff --git a/apps/scheduler/call/graph/graph.py b/apps/scheduler/call/graph/graph.py index 020326b6..8bfeb7c0 100644 --- a/apps/scheduler/call/graph/graph.py +++ b/apps/scheduler/call/graph/graph.py @@ -26,7 +26,7 @@ from apps.scheduler.call.graph.style import RenderStyle class Graph(CoreCall, input_model=RenderInput, output_model=RenderOutput): """Render Call,用于将SQL Tool查询出的数据转换为图表""" - data: list[dict[str, Any]] = Field(description="用于绘制图表的数据", exclude=True, frozen=True) + dataset_key: str = Field(description="图表的数据来源(字段名)") @classmethod @@ -37,8 +37,6 @@ class Graph(CoreCall, input_model=RenderInput, output_model=RenderOutput): async def _init(self, call_vars: CallVars) -> RenderInput: """初始化Render Call,校验参数,读取option模板""" - await super()._init(call_vars) - try: option_location = Path(__file__).parent / "option.json" f = await Path(option_location).open(encoding="utf-8") @@ -48,6 +46,9 @@ class Graph(CoreCall, input_model=RenderInput, output_model=RenderOutput): except Exception as e: raise CallError(message=f"图表模板读取失败:{e!s}", data={}) from e + # 获取数据 + needed_history = call_vars.history.values() + return RenderInput( question=call_vars.question, task_id=call_vars.ids.task_id, diff --git a/apps/scheduler/call/suggest/schema.py b/apps/scheduler/call/suggest/schema.py index c263192a..d7441070 100644 --- a/apps/scheduler/call/suggest/schema.py +++ b/apps/scheduler/call/suggest/schema.py @@ -24,6 +24,6 @@ class SuggestionOutput(DataBase): """问题推荐结果""" question: str - app_id: str = Field(alias="appId") + flow_name: str = Field(alias="flowName") flow_id: str = Field(alias="flowId") flow_description: str = Field(alias="flowDescription") diff --git a/apps/scheduler/call/suggest/suggest.py b/apps/scheduler/call/suggest/suggest.py index 443e5d2c..8b6ef3c5 100644 --- a/apps/scheduler/call/suggest/suggest.py +++ b/apps/scheduler/call/suggest/suggest.py @@ -89,7 +89,10 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO app_metadata = await AppCenterManager.fetch_app_data_by_id(self._app_id) self._avaliable_flows = {} for flow in app_metadata.flows: - self._avaliable_flows[flow.id] = flow.description + self._avaliable_flows[flow.id] = { + "name": flow.name, + "description": flow.description, + } return SuggestionInput( question=call_vars.question, @@ -154,9 +157,9 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO type=CallOutputType.DATA, content=SuggestionOutput( question=question, - appId=self._app_id, + flowName=self._avaliable_flows[config.flow_id]["name"], flowId=config.flow_id, - flowDescription=self._avaliable_flows[config.flow_id], + flowDescription=self._avaliable_flows[config.flow_id]["description"], ).model_dump(by_alias=True, exclude_none=True), ) pushed_questions += 1 @@ -181,9 +184,9 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO type=CallOutputType.DATA, content=SuggestionOutput( question=question, - appId=self._app_id, + flowName=self._avaliable_flows[self._flow_id]["name"], flowId=self._flow_id, - flowDescription=self._avaliable_flows[self._flow_id], + flowDescription=self._avaliable_flows[self._flow_id]["description"], ).model_dump(by_alias=True, exclude_none=True), ) pushed_questions += 1 diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index 78ee41eb..55f0b78e 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -88,7 +88,11 @@ class FlowLoader: step["description"] = "结束节点" step["type"] = "end" else: - step["type"] = await NodeManager.get_node_call_id(step["node"]) + try: + step["type"] = await NodeManager.get_node_call_id(step["node"]) + except ValueError as e: + logger.warning("[FlowLoader] 获取节点call_id失败:%s,错误信息:%s", step["node"], e) + step["type"] = "Empty" step["name"] = ( (await NodeManager.get_node_name(step["node"])) if "name" not in step or step["name"] == "" diff --git a/apps/scheduler/pool/loader/openapi.py b/apps/scheduler/pool/loader/openapi.py index 7b01ed97..59379887 100644 --- a/apps/scheduler/pool/loader/openapi.py +++ b/apps/scheduler/pool/loader/openapi.py @@ -19,6 +19,7 @@ from apps.scheduler.openapi import ( ReducedOpenAPISpec, reduce_openapi_spec, ) +from apps.scheduler.slot.slot import Slot from apps.scheduler.util import yaml_str_presenter logger = logging.getLogger(__name__) @@ -102,6 +103,10 @@ class OpenAPILoader: logger.warning(err) raise ValueError(err) from e + # 将数据转为已知JSON + known_body = Slot(inp.body).create_empty_slot() if inp.body else {} + known_query = Slot(inp.query).create_empty_slot() if inp.query else {} + try: out = APINodeOutput( result=spec.spec["responses"]["content"]["application/json"]["schema"], @@ -114,6 +119,9 @@ class OpenAPILoader: known_params = { "url": url, "method": method, + "content_type": content_type, + "body": known_body, + "query": known_query, } return inp, out, known_params diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index 2513ed27..af2ad92c 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -8,13 +8,12 @@ import asyncio import logging from apps.common.queue import MessageQueue -from apps.entities.enum_var import EventType, StepStatus +from apps.entities.enum_var import EventType from apps.entities.rag_data import RAGQueryReq from apps.entities.request_data import RequestData from apps.entities.scheduler import ExecutorBackground from apps.entities.task import Task from apps.manager.appcenter import AppCenterManager -from apps.manager.flow import FlowManager from apps.manager.task import TaskManager from apps.manager.user import UserManager from apps.scheduler.executor.flow import FlowExecutor @@ -175,5 +174,4 @@ class Scheduler: # 更新Task task = await TaskManager.get_task(task.id) - # 如果状态正常,则更新Flow的debug状态 return diff --git a/apps/scheduler/slot/slot.py b/apps/scheduler/slot/slot.py index 521e228a..49add954 100644 --- a/apps/scheduler/slot/slot.py +++ b/apps/scheduler/slot/slot.py @@ -95,65 +95,106 @@ class Slot: Draft7Validator, type_checker=type_checker, format_checker=format_checker, validators=_KEYWORD_CHECKER, ) - @staticmethod - def _process_json_value(json_value: Any, spec_data: dict[str, Any]) -> Any: # noqa: C901, PLR0911, PLR0912 - """ - 使用递归的方式对JSON返回值进行处理 - - :param json_value: 返回值中的字段 - :param spec_data: 返回值字段对应的JSON Schema - :return: 处理后的这部分返回值字段 - """ - if "allOf" in spec_data: - processed_dict = {} - for item in spec_data["allOf"]: - processed_dict.update(Slot._process_json_value(json_value, item)) - return processed_dict - - for key in ("anyOf", "oneOf"): - if key in spec_data: - for item in spec_data[key]: - processed_dict = Slot._process_json_value(json_value, item) - if processed_dict is not None: - return processed_dict - - if "type" in spec_data: - if spec_data["type"] == "array" and isinstance(json_value, list): - # 若Schema不标准,则不进行处理 - if "items" not in spec_data: - return json_value - # Schema标准 - return [Slot._process_json_value(item, spec_data["items"]) for item in json_value] - if spec_data["type"] == "object" and isinstance(json_value, dict): - # 若Schema不标准,则不进行处理 - if "properties" not in spec_data: - return json_value - # Schema标准 - processed_dict = {} - for key, val in json_value.items(): - if key not in spec_data["properties"]: - processed_dict[key] = val - continue - processed_dict[key] = Slot._process_json_value(val, spec_data["properties"][key]) - return processed_dict - for converter in _TYPE_CONVERTER: - # 如果是自定义类型 - if converter.name == spec_data["type"]: - # 如果类型有附加字段 - if converter.name in spec_data: - return converter.convert(json_value, **spec_data[converter.name]) - return converter.convert(json_value) - - return json_value - - def process_json(self, json_data: str | dict[str, Any]) -> dict[str, Any]: + def process_json(self, json_data: str | dict[str, Any]) -> dict[str, Any]: # noqa: C901 """将提供的JSON数据进行处理""" if isinstance(json_data, str): json_data = json.loads(json_data) + def _process_json_value(json_value: Any, spec_data: dict[str, Any]) -> Any: # noqa: C901, PLR0911, PLR0912 + """ + 使用递归的方式对JSON返回值进行处理 + + :param json_value: 返回值中的字段 + :param spec_data: 返回值字段对应的JSON Schema + :return: 处理后的这部分返回值字段 + """ + if "allOf" in spec_data: + processed_dict = {} + for item in spec_data["allOf"]: + processed_dict.update(_process_json_value(json_value, item)) + return processed_dict + + for key in ("anyOf", "oneOf"): + if key in spec_data: + for item in spec_data[key]: + processed_dict = _process_json_value(json_value, item) + if processed_dict is not None: + return processed_dict + + if "type" in spec_data: + if spec_data["type"] == "array" and isinstance(json_value, list): + # 若Schema不标准,则不进行处理 + if "items" not in spec_data: + return json_value + # Schema标准 + return [_process_json_value(item, spec_data["items"]) for item in json_value] + if spec_data["type"] == "object" and isinstance(json_value, dict): + # 若Schema不标准,则不进行处理 + if "properties" not in spec_data: + return json_value + # Schema标准 + processed_dict = {} + for key, val in json_value.items(): + if key not in spec_data["properties"]: + processed_dict[key] = val + continue + processed_dict[key] = _process_json_value(val, spec_data["properties"][key]) + return processed_dict + + for converter in _TYPE_CONVERTER: + # 如果是自定义类型 + if converter.name == spec_data["type"]: + # 如果类型有附加字段 + if converter.name in spec_data: + return converter.convert(json_value, **spec_data[converter.name]) + return converter.convert(json_value) + + return json_value + # 遍历JSON,处理每一个字段 - return Slot._process_json_value(json_data, self._schema) + return _process_json_value(json_data, self._schema) + + + def create_empty_slot(self) -> dict[str, Any]: + """创建一个空的槽位""" + def _generate_example(schema_node: dict) -> Any: # noqa: PLR0911 + # 处理类型为 object 的节点 + if "default" in schema_node: + return schema_node["default"] + + if "type" not in schema_node: + return None + + if schema_node["type"] == "object": + data = {} + properties = schema_node.get("properties", {}) + for name, schema in properties.items(): + data[name] = _generate_example(schema) + return data + + # 处理类型为 array 的节点 + if schema_node["type"] == "array": + items_schema = schema_node.get("items", {}) + return [_generate_example(items_schema)] + + # 处理类型为 string 的节点 + if schema_node["type"] == "string": + return "" + + # 处理类型为 number 或 integer 的节点 + if schema_node["type"] in ["number", "integer"]: + return 0 + + # 处理类型为 boolean 的节点 + if schema_node["type"] == "boolean": + return False + + # 处理其他类型或未定义类型 + return None + + return _generate_example(self._schema) + def _flatten_schema(self, schema: dict[str, Any]) -> tuple[dict[str, Any], list[str]]: """将JSON Schema扁平化""" diff --git a/apps/service/flow.py b/apps/service/flow.py index 1fe71d6b..71ec160c 100644 --- a/apps/service/flow.py +++ b/apps/service/flow.py @@ -256,46 +256,3 @@ class FlowService: # 检查是否能到达终止节点 return end_id in vis - - -def generate_from_schema(schema: dict) -> Any: - """ - 根据 JSON Schema 生成示例 JSON 数据。 - - :param schema: JSON Schema 字典 - :return: 生成的示例 JSON 数据,可能是字典、列表、字符串、数字、布尔值或 None - """ - - def _generate_example(schema_node: dict) -> Any: - # 处理类型为 object 的节点 - if "default" in schema_node: - return schema_node["default"] - if schema_node.get("type") == "object": - example = {} - properties = schema_node.get("properties", {}) - for prop_name, prop_schema in properties.items(): - example[prop_name] = _generate_example(prop_schema) - return example - - # 处理类型为 array 的节点 - if schema_node.get("type") == "array": - items_schema = schema_node.get("items", {}) - return [_generate_example(items_schema)] - - # 处理类型为 string 的节点 - if schema_node.get("type") == "string": - return "example_string" - - # 处理类型为 number 或 integer 的节点 - if schema_node.get("type") in ["number", "integer"]: - return 0 - - # 处理类型为 boolean 的节点 - if schema_node.get("type") == "boolean": - return False - - # 处理其他类型或未定义类型 - return None - - # 生成示例数据 - return _generate_example(schema) -- Gitee