diff --git a/Dockerfile b/Dockerfile index 27c7bab79d4372d614e85c2ef97503b2a6fbe699..4cabfda46abbf4a2a37034301eeba3a0060792b7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM hub.oepkgs.net/neocopilot/framework_base:0.9.5-x86-test +FROM hub.oepkgs.net/neocopilot/framework-baseimg:dev USER root RUN sed -i 's/umask 002/umask 027/g' /etc/bashrc && \ diff --git a/apps/manager/flow.py b/apps/manager/flow.py index 3cfe2d90791e810b9991d32533c5449dc8e54f83..80a046dc3af1bb72da97bd519ad29ac410894e3f 100644 --- a/apps/manager/flow.py +++ b/apps/manager/flow.py @@ -22,7 +22,7 @@ from apps.entities.flow_topology import ( from apps.manager.node import NodeManager from apps.models.mongo import MongoDB from apps.scheduler.pool.loader.flow import FlowLoader -from apps.service.flow import generate_from_schema +from apps.scheduler.slot.slot import Slot logger = logging.getLogger(__name__) @@ -87,8 +87,9 @@ class FlowManager: async for node_pool_record in cursor: params_schema, output_schema = await NodeManager.get_node_params(node_pool_record["_id"]) try: + # TODO: 由于现在没有动态表单,所以暂时使用Slot的create_empty_slot方法 parameters = { - "input_parameters": generate_from_schema(params_schema), + "input_parameters": Slot(params_schema).create_empty_slot(), "output_parameters": output_schema, } except Exception: diff --git a/apps/manager/service.py b/apps/manager/service.py index 5cb870795a6ab67c3e25f057138bb3c89ade914b..c71fc73ff5afdb2f44452f99056a51cb48d70964 100644 --- a/apps/manager/service.py +++ b/apps/manager/service.py @@ -225,7 +225,7 @@ class ServiceCenterManager: """获取服务数据""" # 验证用户权限 service_collection = MongoDB.get_collection("service") - db_service = await service_collection.find_one({"_id": service_id}) + db_service = await service_collection.find_one({"_id": service_id, "author": user_sub}) if not db_service: msg = "Service not found" raise ServiceIDError(msg) @@ -241,6 +241,26 @@ class ServiceCenterManager: return service_pool_store.name, service_data + @staticmethod + async def get_service_metadata( + user_sub: str, + service_id: str, + ) -> ServiceMetadata: + """获取服务元数据""" + service_collection = MongoDB.get_collection("service") + db_service = await service_collection.find_one({"_id": service_id, "author": user_sub}) + if not db_service: + msg = "Service not found" + raise ServiceIDError(msg) + + metadata_path = ( + Path(Config().get_config().deploy.data_dir) / "semantics" / "service" / service_id / "metadata.yaml" + ) + async with await metadata_path.open() as f: + metadata_data = yaml.safe_load(await f.read()) + return ServiceMetadata.model_validate(metadata_data) + + @staticmethod async def delete_service( user_sub: str, diff --git a/apps/routers/chat.py b/apps/routers/chat.py index a8414cddd98a6b6f28b7a0ac972cf2aa6ae71710..a17f4ee57116a9a149acedb7cbe12fada6fde176 100644 --- a/apps/routers/chat.py +++ b/apps/routers/chat.py @@ -25,6 +25,7 @@ from apps.entities.request_data import RequestData from apps.entities.response_data import ResponseData from apps.manager.flow import FlowManager from apps.manager.blacklist import QuestionBlacklistManager, UserBlacklistManager +from apps.manager.flow import FlowManager from apps.manager.task import TaskManager from apps.scheduler.scheduler import Scheduler from apps.scheduler.scheduler.context import save_data @@ -106,7 +107,12 @@ async def chat_generator(post_body: RequestData, user_sub: str, session_id: str) yield "data: [DONE]\n\n" if post_body.app and post_body.app.flow_id: - await FlowManager.update_flow_debug_by_app_and_flow_id(post_body.app.app_id, post_body.app.flow_id, debug=True) + await FlowManager.update_flow_debug_by_app_and_flow_id( + post_body.app.app_id, + post_body.app.flow_id, + debug=True, + ) + except Exception: logger.exception("[Chat] 生成答案失败") yield "data: [ERROR]\n\n" diff --git a/apps/routers/flow.py b/apps/routers/flow.py index e9844696c317eb484f08791b7f473e09f7479744..6060e61e2ea622de7be8e52af5481c3bd13ed4a4 100644 --- a/apps/routers/flow.py +++ b/apps/routers/flow.py @@ -96,9 +96,9 @@ async def get_services( status.HTTP_404_NOT_FOUND: {"model": ResponseData}, }) async def get_flow( - user_sub: Annotated[str, Depends(get_user)], - app_id: Annotated[str, Query(alias="appId")], - flow_id: Annotated[str, Query(alias="flowId")], + user_sub: Annotated[str, Depends(get_user)], + app_id: Annotated[str, Query(alias="appId")], + flow_id: Annotated[str, Query(alias="flowId")], ) -> JSONResponse: """获取流拓扑结构""" if not await AppManager.validate_user_app_access(user_sub, app_id): diff --git a/apps/scheduler/call/api/api.py b/apps/scheduler/call/api/api.py index 0e42b973cc0ee65cd35730c9e8626dba979d3b61..dc7bf0e569f5fadde8fa66247f8230033b28d4a9 100644 --- a/apps/scheduler/call/api/api.py +++ b/apps/scheduler/call/api/api.py @@ -7,7 +7,7 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. import json import logging from collections.abc import AsyncGenerator -from typing import TYPE_CHECKING, Any, Self +from typing import Any import aiohttp from fastapi import status @@ -15,7 +15,6 @@ from pydantic import Field from apps.common.oidc import oidc_provider from apps.entities.enum_var import CallOutputType, ContentType, HTTPMethod -from apps.entities.flow import ServiceMetadata from apps.entities.scheduler import ( CallError, CallInfo, @@ -27,10 +26,6 @@ from apps.manager.token import TokenManager from apps.scheduler.call.api.schema import APIInput, APIOutput from apps.scheduler.call.core import CoreCall -if TYPE_CHECKING: - from apps.scheduler.executor.step import StepExecutor - - logger = logging.getLogger(__name__) SUCCESS_HTTP_CODES = [ status.HTTP_200_OK, @@ -72,22 +67,25 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): async def _init(self, call_vars: CallVars) -> APIInput: """初始化API调用工具""" - # 获取对应API的Service Metadata - try: - service_metadata = await ServiceCenterManager.get_service_data( - call_vars.ids.user_sub, self.node.service_id or "", - ) - service_metadata = ServiceMetadata.model_validate(service_metadata) - except Exception as e: - raise CallError( - message="API接口的Service Metadata获取失败", - data={}, - ) from e - - # 获取Service对应的Auth - self._auth = service_metadata.api.auth + self._service_id = "" self._session_id = call_vars.ids.session_id - self._service_id = call_vars.ids.service_id + self._auth = None + + if self.node: + # 获取对应API的Service Metadata + try: + service_metadata = await ServiceCenterManager.get_service_metadata( + call_vars.ids.user_sub, self.node.service_id or "", + ) + except Exception as e: + raise CallError( + message="API接口的Service Metadata获取失败", + data={}, + ) from e + + # 获取Service对应的Auth + self._auth = service_metadata.api.auth + self._service_id = self.node.service_id or "" return APIInput( url=self.url, @@ -111,7 +109,7 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): await self._session.close() - async def _make_api_call(self, data: dict | None, files: aiohttp.FormData): # noqa: ANN202 + async def _make_api_call(self, data: dict | None, files: aiohttp.FormData): # noqa: ANN202, C901 """组装API请求Session""" # 获取必要参数 req_header = { @@ -141,7 +139,10 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): ) req_header.update({"access-token": token}) - if self.method in ["get", "delete"]: + if self.method in [ + HTTPMethod.GET.value, + HTTPMethod.DELETE.value, + ]: req_params.update(data) return self._session.request( self.method, @@ -152,8 +153,15 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): timeout=self._timeout, ) - if self.method in ["post", "put", "patch"]: - if self.content_type == "form": + if self.method in [ + HTTPMethod.POST.value, + HTTPMethod.PUT.value, + HTTPMethod.PATCH.value, + ]: + if self.content_type in [ + ContentType.FORM_URLENCODED.value, + ContentType.MULTIPART_FORM_DATA.value, + ]: form_data = files for key, val in data.items(): form_data.add_field(key, val) @@ -174,8 +182,10 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): timeout=self._timeout, ) - err = "Method not implemented." - raise NotImplementedError(err) + raise CallError( + message="API接口的HTTP Method不支持", + data={}, + ) async def _call_api(self, final_data: dict[str, Any] | None = None) -> APIOutput: """实际调用API,并处理返回值""" @@ -207,8 +217,11 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): # 如果返回值是JSON try: response_dict = json.loads(response_data) - except Exception: - logger.exception("[API] 返回值不是JSON格式!") + except Exception as e: + raise CallError( + message="API接口的返回值不是JSON格式", + data={}, + ) from e return APIOutput( http_code=response_status, diff --git a/apps/scheduler/call/graph/graph.py b/apps/scheduler/call/graph/graph.py index 020326b62dfd8f023d4b406abb7903704ae9335d..8bfeb7c037a18e7a69a16ee9d74c85913acde6b8 100644 --- a/apps/scheduler/call/graph/graph.py +++ b/apps/scheduler/call/graph/graph.py @@ -26,7 +26,7 @@ from apps.scheduler.call.graph.style import RenderStyle class Graph(CoreCall, input_model=RenderInput, output_model=RenderOutput): """Render Call,用于将SQL Tool查询出的数据转换为图表""" - data: list[dict[str, Any]] = Field(description="用于绘制图表的数据", exclude=True, frozen=True) + dataset_key: str = Field(description="图表的数据来源(字段名)") @classmethod @@ -37,8 +37,6 @@ class Graph(CoreCall, input_model=RenderInput, output_model=RenderOutput): async def _init(self, call_vars: CallVars) -> RenderInput: """初始化Render Call,校验参数,读取option模板""" - await super()._init(call_vars) - try: option_location = Path(__file__).parent / "option.json" f = await Path(option_location).open(encoding="utf-8") @@ -48,6 +46,9 @@ class Graph(CoreCall, input_model=RenderInput, output_model=RenderOutput): except Exception as e: raise CallError(message=f"图表模板读取失败:{e!s}", data={}) from e + # 获取数据 + needed_history = call_vars.history.values() + return RenderInput( question=call_vars.question, task_id=call_vars.ids.task_id, diff --git a/apps/scheduler/call/suggest/schema.py b/apps/scheduler/call/suggest/schema.py index c263192a0c4d2a3017f7c11a9eee7d854a74016d..d744107045f240fd7841d16f9aabcab6b41d91b0 100644 --- a/apps/scheduler/call/suggest/schema.py +++ b/apps/scheduler/call/suggest/schema.py @@ -24,6 +24,6 @@ class SuggestionOutput(DataBase): """问题推荐结果""" question: str - app_id: str = Field(alias="appId") + flow_name: str = Field(alias="flowName") flow_id: str = Field(alias="flowId") flow_description: str = Field(alias="flowDescription") diff --git a/apps/scheduler/call/suggest/suggest.py b/apps/scheduler/call/suggest/suggest.py index 443e5d2cad15b2cc0a881e28f974dfe2830e7a6f..8b6ef3c5614cbf7cab9ac25fc6eab526d01963a8 100644 --- a/apps/scheduler/call/suggest/suggest.py +++ b/apps/scheduler/call/suggest/suggest.py @@ -89,7 +89,10 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO app_metadata = await AppCenterManager.fetch_app_data_by_id(self._app_id) self._avaliable_flows = {} for flow in app_metadata.flows: - self._avaliable_flows[flow.id] = flow.description + self._avaliable_flows[flow.id] = { + "name": flow.name, + "description": flow.description, + } return SuggestionInput( question=call_vars.question, @@ -154,9 +157,9 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO type=CallOutputType.DATA, content=SuggestionOutput( question=question, - appId=self._app_id, + flowName=self._avaliable_flows[config.flow_id]["name"], flowId=config.flow_id, - flowDescription=self._avaliable_flows[config.flow_id], + flowDescription=self._avaliable_flows[config.flow_id]["description"], ).model_dump(by_alias=True, exclude_none=True), ) pushed_questions += 1 @@ -181,9 +184,9 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO type=CallOutputType.DATA, content=SuggestionOutput( question=question, - appId=self._app_id, + flowName=self._avaliable_flows[self._flow_id]["name"], flowId=self._flow_id, - flowDescription=self._avaliable_flows[self._flow_id], + flowDescription=self._avaliable_flows[self._flow_id]["description"], ).model_dump(by_alias=True, exclude_none=True), ) pushed_questions += 1 diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index 78ee41eb540a780c3701aebd4c52b7647a598f89..55f0b78e6f77be21321dc5a4160ffc038a596321 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -88,7 +88,11 @@ class FlowLoader: step["description"] = "结束节点" step["type"] = "end" else: - step["type"] = await NodeManager.get_node_call_id(step["node"]) + try: + step["type"] = await NodeManager.get_node_call_id(step["node"]) + except ValueError as e: + logger.warning("[FlowLoader] 获取节点call_id失败:%s,错误信息:%s", step["node"], e) + step["type"] = "Empty" step["name"] = ( (await NodeManager.get_node_name(step["node"])) if "name" not in step or step["name"] == "" diff --git a/apps/scheduler/pool/loader/openapi.py b/apps/scheduler/pool/loader/openapi.py index 7b01ed97ddf043b72ba279f029c4eb7b592bdb59..59379887025d68d5346608f54a50abd75c243328 100644 --- a/apps/scheduler/pool/loader/openapi.py +++ b/apps/scheduler/pool/loader/openapi.py @@ -19,6 +19,7 @@ from apps.scheduler.openapi import ( ReducedOpenAPISpec, reduce_openapi_spec, ) +from apps.scheduler.slot.slot import Slot from apps.scheduler.util import yaml_str_presenter logger = logging.getLogger(__name__) @@ -102,6 +103,10 @@ class OpenAPILoader: logger.warning(err) raise ValueError(err) from e + # 将数据转为已知JSON + known_body = Slot(inp.body).create_empty_slot() if inp.body else {} + known_query = Slot(inp.query).create_empty_slot() if inp.query else {} + try: out = APINodeOutput( result=spec.spec["responses"]["content"]["application/json"]["schema"], @@ -114,6 +119,9 @@ class OpenAPILoader: known_params = { "url": url, "method": method, + "content_type": content_type, + "body": known_body, + "query": known_query, } return inp, out, known_params diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index 2513ed2798bf1c07669fcc1d0232738ed3fd569c..af2ad92cdf08bcd3ae473bb7ca5a66fdb43b87e8 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -8,13 +8,12 @@ import asyncio import logging from apps.common.queue import MessageQueue -from apps.entities.enum_var import EventType, StepStatus +from apps.entities.enum_var import EventType from apps.entities.rag_data import RAGQueryReq from apps.entities.request_data import RequestData from apps.entities.scheduler import ExecutorBackground from apps.entities.task import Task from apps.manager.appcenter import AppCenterManager -from apps.manager.flow import FlowManager from apps.manager.task import TaskManager from apps.manager.user import UserManager from apps.scheduler.executor.flow import FlowExecutor @@ -175,5 +174,4 @@ class Scheduler: # 更新Task task = await TaskManager.get_task(task.id) - # 如果状态正常,则更新Flow的debug状态 return diff --git a/apps/scheduler/slot/slot.py b/apps/scheduler/slot/slot.py index 521e228a6bd37d86fbf0c5aea4ef96c2b4c591f3..49add954aa442eea0d0fa7460438f44c2bbecb92 100644 --- a/apps/scheduler/slot/slot.py +++ b/apps/scheduler/slot/slot.py @@ -95,65 +95,106 @@ class Slot: Draft7Validator, type_checker=type_checker, format_checker=format_checker, validators=_KEYWORD_CHECKER, ) - @staticmethod - def _process_json_value(json_value: Any, spec_data: dict[str, Any]) -> Any: # noqa: C901, PLR0911, PLR0912 - """ - 使用递归的方式对JSON返回值进行处理 - - :param json_value: 返回值中的字段 - :param spec_data: 返回值字段对应的JSON Schema - :return: 处理后的这部分返回值字段 - """ - if "allOf" in spec_data: - processed_dict = {} - for item in spec_data["allOf"]: - processed_dict.update(Slot._process_json_value(json_value, item)) - return processed_dict - - for key in ("anyOf", "oneOf"): - if key in spec_data: - for item in spec_data[key]: - processed_dict = Slot._process_json_value(json_value, item) - if processed_dict is not None: - return processed_dict - - if "type" in spec_data: - if spec_data["type"] == "array" and isinstance(json_value, list): - # 若Schema不标准,则不进行处理 - if "items" not in spec_data: - return json_value - # Schema标准 - return [Slot._process_json_value(item, spec_data["items"]) for item in json_value] - if spec_data["type"] == "object" and isinstance(json_value, dict): - # 若Schema不标准,则不进行处理 - if "properties" not in spec_data: - return json_value - # Schema标准 - processed_dict = {} - for key, val in json_value.items(): - if key not in spec_data["properties"]: - processed_dict[key] = val - continue - processed_dict[key] = Slot._process_json_value(val, spec_data["properties"][key]) - return processed_dict - for converter in _TYPE_CONVERTER: - # 如果是自定义类型 - if converter.name == spec_data["type"]: - # 如果类型有附加字段 - if converter.name in spec_data: - return converter.convert(json_value, **spec_data[converter.name]) - return converter.convert(json_value) - - return json_value - - def process_json(self, json_data: str | dict[str, Any]) -> dict[str, Any]: + def process_json(self, json_data: str | dict[str, Any]) -> dict[str, Any]: # noqa: C901 """将提供的JSON数据进行处理""" if isinstance(json_data, str): json_data = json.loads(json_data) + def _process_json_value(json_value: Any, spec_data: dict[str, Any]) -> Any: # noqa: C901, PLR0911, PLR0912 + """ + 使用递归的方式对JSON返回值进行处理 + + :param json_value: 返回值中的字段 + :param spec_data: 返回值字段对应的JSON Schema + :return: 处理后的这部分返回值字段 + """ + if "allOf" in spec_data: + processed_dict = {} + for item in spec_data["allOf"]: + processed_dict.update(_process_json_value(json_value, item)) + return processed_dict + + for key in ("anyOf", "oneOf"): + if key in spec_data: + for item in spec_data[key]: + processed_dict = _process_json_value(json_value, item) + if processed_dict is not None: + return processed_dict + + if "type" in spec_data: + if spec_data["type"] == "array" and isinstance(json_value, list): + # 若Schema不标准,则不进行处理 + if "items" not in spec_data: + return json_value + # Schema标准 + return [_process_json_value(item, spec_data["items"]) for item in json_value] + if spec_data["type"] == "object" and isinstance(json_value, dict): + # 若Schema不标准,则不进行处理 + if "properties" not in spec_data: + return json_value + # Schema标准 + processed_dict = {} + for key, val in json_value.items(): + if key not in spec_data["properties"]: + processed_dict[key] = val + continue + processed_dict[key] = _process_json_value(val, spec_data["properties"][key]) + return processed_dict + + for converter in _TYPE_CONVERTER: + # 如果是自定义类型 + if converter.name == spec_data["type"]: + # 如果类型有附加字段 + if converter.name in spec_data: + return converter.convert(json_value, **spec_data[converter.name]) + return converter.convert(json_value) + + return json_value + # 遍历JSON,处理每一个字段 - return Slot._process_json_value(json_data, self._schema) + return _process_json_value(json_data, self._schema) + + + def create_empty_slot(self) -> dict[str, Any]: + """创建一个空的槽位""" + def _generate_example(schema_node: dict) -> Any: # noqa: PLR0911 + # 处理类型为 object 的节点 + if "default" in schema_node: + return schema_node["default"] + + if "type" not in schema_node: + return None + + if schema_node["type"] == "object": + data = {} + properties = schema_node.get("properties", {}) + for name, schema in properties.items(): + data[name] = _generate_example(schema) + return data + + # 处理类型为 array 的节点 + if schema_node["type"] == "array": + items_schema = schema_node.get("items", {}) + return [_generate_example(items_schema)] + + # 处理类型为 string 的节点 + if schema_node["type"] == "string": + return "" + + # 处理类型为 number 或 integer 的节点 + if schema_node["type"] in ["number", "integer"]: + return 0 + + # 处理类型为 boolean 的节点 + if schema_node["type"] == "boolean": + return False + + # 处理其他类型或未定义类型 + return None + + return _generate_example(self._schema) + def _flatten_schema(self, schema: dict[str, Any]) -> tuple[dict[str, Any], list[str]]: """将JSON Schema扁平化""" diff --git a/apps/service/flow.py b/apps/service/flow.py index 1fe71d6b6c7b97120e5b1fa48663ea001db79ffb..71ec160c40590e602a8233fdc31938958361b539 100644 --- a/apps/service/flow.py +++ b/apps/service/flow.py @@ -256,46 +256,3 @@ class FlowService: # 检查是否能到达终止节点 return end_id in vis - - -def generate_from_schema(schema: dict) -> Any: - """ - 根据 JSON Schema 生成示例 JSON 数据。 - - :param schema: JSON Schema 字典 - :return: 生成的示例 JSON 数据,可能是字典、列表、字符串、数字、布尔值或 None - """ - - def _generate_example(schema_node: dict) -> Any: - # 处理类型为 object 的节点 - if "default" in schema_node: - return schema_node["default"] - if schema_node.get("type") == "object": - example = {} - properties = schema_node.get("properties", {}) - for prop_name, prop_schema in properties.items(): - example[prop_name] = _generate_example(prop_schema) - return example - - # 处理类型为 array 的节点 - if schema_node.get("type") == "array": - items_schema = schema_node.get("items", {}) - return [_generate_example(items_schema)] - - # 处理类型为 string 的节点 - if schema_node.get("type") == "string": - return "example_string" - - # 处理类型为 number 或 integer 的节点 - if schema_node.get("type") in ["number", "integer"]: - return 0 - - # 处理类型为 boolean 的节点 - if schema_node.get("type") == "boolean": - return False - - # 处理其他类型或未定义类型 - return None - - # 生成示例数据 - return _generate_example(schema)