diff --git a/apps/entities/enum_var.py b/apps/entities/enum_var.py index 9979fee5a677992c76d1e6ba6219106d2e2b079e..328dd626d6e9cf3512f66cc04a383101a9279f5c 100644 --- a/apps/entities/enum_var.py +++ b/apps/entities/enum_var.py @@ -107,6 +107,8 @@ class PermissionType(str, Enum): PROTECTED = "protected" PUBLIC = "public" PRIVATE = "private" + + class SearchType(str, Enum): """搜索类型""" @@ -114,3 +116,21 @@ class SearchType(str, Enum): NAME = "name" DESCRIPTION = "description" AUTHOR = "author" + + +class HTTPMethod(str, Enum): + """HTTP方法""" + + GET = "get" + POST = "post" + PUT = "put" + DELETE = "delete" + PATCH = "patch" + + +class ContentType(str, Enum): + """Content-Type""" + + JSON = "application/json" + FORM_URLENCODED = "application/x-www-form-urlencoded" + MULTIPART_FORM_DATA = "multipart/form-data" diff --git a/apps/entities/pool.py b/apps/entities/pool.py index fc35cfce62a9c39d89581a83c639ce8052e8040a..28155cb78c602ff83ee24e72aa303a00fc76b0f6 100644 --- a/apps/entities/pool.py +++ b/apps/entities/pool.py @@ -47,9 +47,8 @@ class CallPool(BaseData): collection: call - 路径的格式: - 1. 系统Call的路径格式样例:“LLM” - 2. Python Call的路径格式样例:“tune::call.tune.CheckSystem” + “path”的格式如下: + 1. Python代码会被导入成包,路径格式为`python::::`,用于查找Call的包路径和类路径 """ type: CallType = Field(description="Call的类型") @@ -69,12 +68,19 @@ class NodePool(BaseData): """Node合并前的信息(作为附带信息的指针) collection: node + + annotation为Node的路径,指示Node的类型、来源等 + annotation的格式如下: + 1. 无路径(如对应的Call等):为None + 2. 从openapi中获取:`openapi::` """ service_id: Optional[str] = Field(description="Node所属的Service ID", default=None) call_id: str = Field(description="所使用的Call的ID") - path: Optional[str] = Field(description="Node的路径", default=None) - known_params: Optional[dict[str, Any]] = Field(description="已知的用于Call部分的参数", default=None) + annotation: Optional[str] = Field(description="Node的注释", default=None) + known_params: Optional[dict[str, Any]] = Field(description="已知的用于Call部分的参数,独立于输入和输出之外", default=None) + override_input: Optional[dict[str, Any]] = Field(description="Node的输入Schema;用于描述Call的参数中特定的字段", default=None) + override_output: Optional[dict[str, Any]] = Field(description="Node的输出Schema;用于描述Call的输出中特定的字段", default=None) class AppFlow(BaseData): diff --git a/apps/entities/vector.py b/apps/entities/vector.py index d7f207aed0dde7ee7cc431aa7e15de54555bb21f..ec8222b669b417a2a44cab31f1213c1dbad28501 100644 --- a/apps/entities/vector.py +++ b/apps/entities/vector.py @@ -25,6 +25,14 @@ class ServicePoolVector(Base): embedding = Column(Vector(1024), nullable=False) +class CallPoolVector(Base): + """Call向量信息""" + + __tablename__ = "call_vector" + id = Column(String(length=100), primary_key=True, nullable=False, unique=True) + embedding = Column(Vector(1024), nullable=False) + + class NodePoolVector(Base): """Node向量信息""" diff --git a/apps/llm/reasoning.py b/apps/llm/reasoning.py index 31b12e929d3601e6b09e78973a88d71b77bb97f8..39dfbb9b9d3802cb4060ab60375dc28bbb0e49ff 100644 --- a/apps/llm/reasoning.py +++ b/apps/llm/reasoning.py @@ -84,7 +84,7 @@ class ReasoningLLM(metaclass=Singleton): result = "" is_first_chunk = True - is_reasoning = True + is_reasoning = False reasoning_type = "" async for chunk in stream: diff --git a/apps/models/postgres.py b/apps/models/postgres.py index 461d398c2b945ef913f42c13d61208e57c4f0c47..1270626c4418a8032d10fd70026ba330e7141b3a 100644 --- a/apps/models/postgres.py +++ b/apps/models/postgres.py @@ -43,7 +43,7 @@ class PostgreSQL: @staticmethod - async def get_embedding(text: list[str]) -> list[float]: + async def get_embedding(text: list[str]) -> list[list[float]]: """访问OpenAI兼容的Embedding API,获得向量化数据 :param text: 待向量化文本(多条文本组成List) diff --git a/apps/scheduler/call/api.py b/apps/scheduler/call/api.py index bcca063d815e311b0063bb1249dee90d6af29127..ccaac5e90de5ba70ab109e9722a8979e9dce41bf 100644 --- a/apps/scheduler/call/api.py +++ b/apps/scheduler/call/api.py @@ -19,15 +19,17 @@ from apps.scheduler.slot.slot import Slot class APIParams(BaseModel): """API调用工具的参数""" - full_url: str = Field(description="API接口的完整URL") + url: str = Field(description="API接口的完整URL") method: Literal[ "get", "post", "put", "delete", "patch", ] = Field(description="API接口的HTTP Method") + content_type: Literal[ + "application/json", "application/x-www-form-urlencoded", "multipart/form-data", + ] = Field(description="API接口的Content-Type") timeout: int = Field(description="工具超时时间", default=300) - body_override: dict[str, Any] = Field(description="固定数据", default={}) - auth: dict[str, Any] = Field(description="API鉴权信息", default={}) + known_body: dict[str, Any] = Field(description="已知的部分请求体", default={}) input_schema: dict[str, Any] = Field(description="API请求体的JSON Schema", default={}) - service_id: Optional[str] = Field(description="服务ID") + auth: dict[str, Any] = Field(description="API鉴权信息", default={}) class _APIOutput(BaseModel): @@ -44,21 +46,6 @@ class API(metaclass=CoreCall, param_cls=APIParams, output_cls=_APIOutput): name: str = "api" description: str = "根据给定的用户输入和历史记录信息,向某一个API接口发送请求、获取数据。" - - def init(self, syscall_vars: SysCallVars, **kwargs) -> None: # noqa: ANN003 - """初始化API调用工具""" - if kwargs["method"] == "POST": - if "requestBody" in self._spec[2]: - self.slot_schema, self._data_type = self._check_data_type(self._spec[2]["requestBody"]["content"]) - elif kwargs["method"] == "GET": - if "parameters" in self._spec[2]: - self.slot_schema = self.parameters_to_spec(self._spec[2]["parameters"]) - self._data_type = "json" - else: - err = "[API] HTTP method not implemented." - raise NotImplementedError(err) - - async def __call__(self, slot_data: dict[str, Any]) -> _APIOutput: """调用API,然后返回LLM解析后的数据""" self._session = aiohttp.ClientSession() @@ -71,24 +58,6 @@ class API(metaclass=CoreCall, param_cls=APIParams, output_cls=_APIOutput): raise RuntimeError from e - @staticmethod - def parameters_to_spec(raw_schema: list[dict[str, Any]]) -> dict[str, Any]: - """将OpenAPI中GET接口List形式的请求体Spec转换为JSON Schema""" - schema = { - "type": "object", - "required": [], - "properties": {}, - } - for item in raw_schema: - if item["required"]: - schema["required"].append(item["name"]) - schema["properties"][item["name"]] = {} - schema["properties"][item["name"]]["description"] = item["description"] - for key, val in item["schema"].items(): - schema["properties"][item["name"]][key] = val - return schema - - async def _make_api_call(self, data: Optional[dict], files: aiohttp.FormData): # noqa: ANN202, C901 # 获取必要参数 params: APIParams = getattr(self, "_params") @@ -125,7 +94,7 @@ class API(metaclass=CoreCall, param_cls=APIParams, output_cls=_APIOutput): if params.method in ["get", "delete"]: req_params.update(data) - return self._session.request(params.method, params.full_url, params=req_params, headers=req_header, cookies=req_cookie, + return self._session.request(params.method, params.url, params=req_params, headers=req_header, cookies=req_cookie, timeout=params.timeout) if params.method in ["post", "put", "patch"]: @@ -133,31 +102,19 @@ class API(metaclass=CoreCall, param_cls=APIParams, output_cls=_APIOutput): form_data = files for key, val in data.items(): form_data.add_field(key, val) - return self._session.request(params.method, params.full_url, data=form_data, headers=req_header, cookies=req_cookie, + return self._session.request(params.method, params.url, data=form_data, headers=req_header, cookies=req_cookie, timeout=params.timeout) - return self._session.request(params.method, params.full_url, json=data, headers=req_header, cookies=req_cookie, + return self._session.request(params.method, params.url, json=data, headers=req_header, cookies=req_cookie, timeout=params.timeout) err = "Method not implemented." raise NotImplementedError(err) - @staticmethod - def _check_data_type(spec: dict) -> tuple[dict[str, Any], str]: - if "application/json" in spec: - return spec["application/json"]["schema"], "json" - if "x-www-form-urlencoded" in spec: - return spec["x-www-form-urlencoded"]["schema"], "form" - if "multipart/form-data" in spec: - return spec["multipart/form-data"]["schema"], "form" - - err = "Data type not implemented." - raise NotImplementedError(err) - async def _call_api(self, slot_data: Optional[dict[str, Any]] = None) -> _APIOutput: # 获取必要参数 params: APIParams = getattr(self, "_params") - LOGGER.info(f"调用接口{params.full_url},请求数据为{slot_data}") + LOGGER.info(f"调用接口{params.url},请求数据为{slot_data}") session_context = await self._make_api_call(slot_data, aiohttp.FormData()) async with session_context as response: @@ -176,10 +133,10 @@ class API(metaclass=CoreCall, param_cls=APIParams, output_cls=_APIOutput): response_schema = self._spec[2]["responses"]["content"]["application/json"]["schema"] else: response_schema = {} - LOGGER.info(f"调用接口{params.full_url}, 结果为 {response_data}") + LOGGER.info(f"调用接口{params.url}, 结果为 {response_data}") # 组装message - message = f"""You called the HTTP API "{params.full_url}", which is used to "{self._spec[2]['summary']}".""" + message = f"""You called the HTTP API "{params.url}", which is used to "{self._spec[2]['summary']}".""" # 如果没有返回结果 if response_data is None: return _APIOutput( diff --git a/apps/scheduler/call/core.py b/apps/scheduler/call/core.py index 61029e6f7ca9beb7050a969568d93a534e1d3ded..15d70f058a1862e41cf39321ab3ba3e53b08f2f4 100644 --- a/apps/scheduler/call/core.py +++ b/apps/scheduler/call/core.py @@ -22,8 +22,8 @@ class CoreCall(type): if "description" not in attrs: err = f"类{cls_name}中不存在属性description" raise AttributeError(err) - if "call" not in attrs or not callable(attrs["call"]): - err = f"类{cls_name}中不存在属性call" + if "__call__" not in attrs or not callable(attrs["__call__"]): + err = f"类{cls_name}中不存在属性__call__" raise AttributeError(err) @staticmethod diff --git a/apps/scheduler/pool/check.py b/apps/scheduler/pool/check.py index 9a2954e985d52ae7bb67059492a809c5d31c87c7..54bd1fce9bab99f4f75c50cb63600be4a42394dc 100644 --- a/apps/scheduler/pool/check.py +++ b/apps/scheduler/pool/check.py @@ -6,6 +6,7 @@ from hashlib import sha256 from pathlib import Path from apps.common.config import config +from apps.constants import APP_DIR, LOGGER, SERVICE_DIR from apps.entities.enum_var import MetadataType from apps.models.mongo import MongoDB @@ -46,29 +47,36 @@ class FileChecker: """生成更新列表和删除列表""" if check_type == MetadataType.APP: collection = MongoDB.get_collection("app") - self._dir_path = Path(config["SEMANTICS_DIR"]) / "app" + self._dir_path = Path(config["SEMANTICS_DIR"]) / APP_DIR elif check_type == MetadataType.SERVICE: collection = MongoDB.get_collection("service") - self._dir_path = Path(config["SEMANTICS_DIR"]) / "service" + self._dir_path = Path(config["SEMANTICS_DIR"]) / SERVICE_DIR changed_list = [] deleted_list = [] + # 查询所有条目 try: - # 查询所有条目 - cursor = collection.find({}) - async for item in cursor: - hashes = item.get("hashes", {}) - # 判断是否存在? - if not (self._dir_path / item.get("name")).exists(): - deleted_list.append(item.get("name")) - continue - # 判断是否发生变化 - if self.diff_one(self._dir_path / item.get("name"), hashes): - changed_list.append(item.get("name")) - + items = await collection.find({}).to_list(None) except Exception as e: - err = f"Failed to check {check_type} files: {e!s}" + err = f"{check_type}类型数据的条目为空: {e!s}" + LOGGER.error(err) raise RuntimeError(err) from e + # 遍历列表 + for list_item in items: + # 判断是否存在? + if not (self._dir_path / list_item["_id"]).exists(): + deleted_list.append(list_item["_id"]) + continue + # 判断是否发生变化 + if "hashes" not in list_item or self.diff_one(self._dir_path / list_item["_id"], list_item["hashes"]): + changed_list.append(list_item["_id"]) + + # 遍历目录 + for service_folder in self._dir_path.iterdir(): + # 判断是否新增? + if service_folder.name not in items: + changed_list += [service_folder.name] + return changed_list, deleted_list diff --git a/apps/scheduler/pool/loader/call.py b/apps/scheduler/pool/loader/call.py index 3332e5aa7dae76c14bc4935099a9e933d22e768e..eba0a750eabc1357de0b6bd4cd3ccf53d29c1e10 100644 --- a/apps/scheduler/pool/loader/call.py +++ b/apps/scheduler/pool/loader/call.py @@ -13,11 +13,8 @@ import apps.scheduler.call as system_call from apps.common.config import config from apps.constants import CALL_DIR, LOGGER from apps.entities.enum_var import CallType -from apps.entities.pool import ( - CallPool, - NodePool, -) -from apps.entities.vector import NodePoolVector +from apps.entities.pool import CallPool +from apps.entities.vector import CallPoolVector from apps.models.mongo import MongoDB from apps.models.postgres import PostgreSQL @@ -50,10 +47,9 @@ class CallLoader: return flag - async def _load_system_call(self) -> tuple[list[CallPool], list[NodePool]]: + async def _load_system_call(self) -> list[CallPool]: """加载系统Call""" call_metadata = [] - node_metadata = [] # 检查合法性 for call_id in system_call.__all__: @@ -69,26 +65,16 @@ class CallLoader: type=CallType.SYSTEM, name=call_cls.name, description=call_cls.description, - path=f"apps.scheduler.call.{call_id}", + path=f"python::apps.scheduler.call::{call_id}", ), ) - node_metadata.append( - NodePool( - _id=call_id, - call_id=call_id, - name=call_cls.name, - description=call_cls.description, - ), - ) + return call_metadata - return call_metadata, node_metadata - - async def _load_single_call_dir(self, call_name: str) -> tuple[list[CallPool], list[NodePool]]: + async def _load_single_call_dir(self, call_name: str) -> list[CallPool]: """加载单个Call package""" call_metadata = [] - node_metadata = [] call_dir = Path(config["SEMANTICS_DIR"]) / CALL_DIR / call_name if not (call_dir / "__init__.py").exists(): @@ -135,25 +121,16 @@ class CallLoader: type=CallType.PYTHON, name=call_cls.name, description=call_cls.description, - path=f"call.{call_name}.{call_id}", - ), - ) - node_metadata.append( - NodePool( - _id=cls_hash, - call_id=cls_hash, - name=call_cls.name, - description=call_cls.description, + path=f"python::call.{call_name}::{call_id}", ), ) - return call_metadata, node_metadata + return call_metadata - async def _load_all_user_call(self) -> tuple[list[CallPool], list[NodePool]]: + async def _load_all_user_call(self) -> list[CallPool]: """加载Python Call""" call_dir = Path(config["SEMANTICS_DIR"]) / CALL_DIR call_metadata = [] - node_metadata = [] # 载入父包 try: @@ -172,46 +149,39 @@ class CallLoader: continue # 载入包 try: - call_metadata, node_metadata = await self._load_single_call_dir(call_file.name) - call_metadata.extend(call_metadata) - node_metadata.extend(node_metadata) - + call_metadata.extend(await self._load_single_call_dir(call_file.name)) except Exception as e: err = f"载入模块{call_file}失败:{e},跳过载入。" LOGGER.info(msg=err) continue - return call_metadata, node_metadata + return call_metadata # TODO: 动态卸载 # 更新数据库 - async def _update_db(self, call_metadata: list[CallPool], node_metadata: list[NodePool]) -> None: - """更新数据库;call和node下标一致""" + async def _update_db(self, call_metadata: list[CallPool]) -> None: + """更新数据库""" # 更新MongoDB call_collection = MongoDB.get_collection("call") - node_collection = MongoDB.get_collection("node") + call_descriptions = [] try: - for call, node in zip(call_metadata, node_metadata): + for call in call_metadata: await call_collection.update_one({"_id": call.id}, {"$set": call.model_dump(exclude_none=True, by_alias=True)}, upsert=True) - await node_collection.update_one({"_id": node.id}, {"$set": node.model_dump(exclude_none=True, by_alias=True)}, upsert=True) + call_descriptions += [call.description] except Exception as e: err = f"更新MongoDB失败:{e}" LOGGER.error(msg=err) raise RuntimeError(err) from e # 进行向量化,更新PostgreSQL - node_descriptions = [] - for node in node_metadata: - node_descriptions += [node.description] - session = await PostgreSQL.get_session() - node_vecs = await PostgreSQL.get_embedding(node_descriptions) - for i, data in enumerate(node_vecs): - insert_stmt = insert(NodePoolVector).values( - id=node_metadata[i].id, + call_vecs = await PostgreSQL.get_embedding(call_descriptions) + for i, data in enumerate(call_vecs): + insert_stmt = insert(CallPoolVector).values( + id=call_metadata[i].id, embedding=data, ).on_conflict_do_update( index_elements=["id"], @@ -225,41 +195,37 @@ class CallLoader: """初始化Call信息""" # 清空collection call_collection = MongoDB.get_collection("call") - node_collection = MongoDB.get_collection("node") try: await call_collection.delete_many({}) - await node_collection.delete_many({}) except Exception as e: - LOGGER.error(msg=f"Call和Node的collection清空失败:{e}") + LOGGER.error(msg=f"Call的collection清空失败:{e}") # 载入所有已知的Call信息 try: - sys_call_metadata, sys_node_metadata = await self._load_system_call() + sys_call_metadata = await self._load_system_call() except Exception as e: err = f"载入系统Call信息失败:{e};停止运行。" LOGGER.error(msg=err) raise RuntimeError(err) from e try: - user_call_metadata, user_node_metadata = await self._load_all_user_call() + user_call_metadata = await self._load_all_user_call() except Exception as e: err = f"载入用户Call信息失败:{e};只可使用基本功能。" LOGGER.error(msg=err) user_call_metadata = [] - user_node_metadata = [] # 合并Call元数据 call_metadata = sys_call_metadata + user_call_metadata - node_metadata = sys_node_metadata + user_node_metadata # 更新数据库 - await self._update_db(call_metadata, node_metadata) + await self._update_db(call_metadata) async def load_one(self, call_name: str) -> None: """加载单个Call""" try: - call_metadata, node_metadata = await self._load_single_call_dir(call_name) + call_metadata = await self._load_single_call_dir(call_name) except Exception as e: err = f"载入Call信息失败:{e}。" LOGGER.error(msg=err) @@ -267,18 +233,4 @@ class CallLoader: # 有数据时更新数据库 if call_metadata: - await self._update_db(call_metadata, node_metadata) - - - async def get(self) -> list[CallPool]: - """获取当前已知的所有Python Call元数据""" - call_collection = MongoDB.get_collection("call") - result: list[CallPool] = [] - try: - cursor = call_collection.find({}) - async for item in cursor: - result.extend([CallPool.model_validate(item)]) - except Exception as e: - LOGGER.error(msg=f"获取Call元数据失败:{e}") - - return result + await self._update_db(call_metadata) diff --git a/apps/scheduler/pool/loader/metadata.py b/apps/scheduler/pool/loader/metadata.py index 3b087f47bf78dd7952f59c81d1cf273dfc70f733..7998c2c344c531bde768b9ac587e9525b695ece9 100644 --- a/apps/scheduler/pool/loader/metadata.py +++ b/apps/scheduler/pool/loader/metadata.py @@ -8,7 +8,7 @@ import yaml from anyio import Path from apps.common.config import config -from apps.constants import LOGGER +from apps.constants import APP_DIR, LOGGER, SERVICE_DIR from apps.entities.enum_var import MetadataType from apps.entities.flow import ( AppMetadata, @@ -19,8 +19,7 @@ from apps.entities.flow import ( class MetadataLoader: """元数据加载器""" - @classmethod - async def load_one(cls, file_path: Path) -> Union[AppMetadata, ServiceMetadata]: + async def load_one(self, file_path: Path) -> Union[AppMetadata, ServiceMetadata]: """加载单个元数据""" # 检查yaml格式 try: @@ -31,27 +30,32 @@ class MetadataLoader: LOGGER.error(err) raise RuntimeError(err) from e - if metadata_type not in MetadataType: + # 尝试匹配格式 + if metadata_type == MetadataType.APP.value: + try: + app_id = file_path.parent.name + metadata = AppMetadata(_id=app_id, **metadata_dict) + except Exception as e: + err = f"App metadata.yaml格式错误: {e}" + LOGGER.error(err) + raise RuntimeError(err) from e + elif metadata_type == MetadataType.SERVICE.value: + try: + service_id = file_path.parent.name + metadata = ServiceMetadata(_id=service_id, **metadata_dict) + except Exception as e: + err = f"Service metadata.yaml格式错误: {e}" + LOGGER.error(err) + raise RuntimeError(err) from e + else: err = f"metadata.yaml类型错误: {metadata_type}" LOGGER.error(err) raise RuntimeError(err) - # 尝试匹配格式 - try: - if metadata_type == MetadataType.APP: - metadata = AppMetadata(**metadata_dict) - elif metadata_type == MetadataType.SERVICE: - metadata = ServiceMetadata(**metadata_dict) - except Exception as e: - err = f"metadata.yaml格式错误: {e}" - LOGGER.error(err) - raise RuntimeError(err) from e - return metadata - @classmethod - async def save_one(cls, metadata_type: MetadataType, metadata: dict[str, Any], resource_id: str) -> None: + async def save_one(self, metadata_type: MetadataType, metadata: dict[str, Any], resource_id: str) -> None: """保存单个元数据""" class_dict = { MetadataType.APP: AppMetadata, @@ -59,10 +63,10 @@ class MetadataLoader: } # 检查资源路径 - if metadata_type == MetadataType.APP: - resource_path = Path(config["SEMANTICS_DIR"]) / "app" / resource_id / "metadata.yaml" - elif metadata_type == MetadataType.SERVICE: - resource_path = Path(config["SEMANTICS_DIR"]) / "service" / resource_id / "metadata.yaml" + if metadata_type == MetadataType.APP.value: + resource_path = Path(config["SEMANTICS_DIR"]) / APP_DIR / resource_id / "metadata.yaml" + elif metadata_type == MetadataType.SERVICE.value: + resource_path = Path(config["SEMANTICS_DIR"]) / SERVICE_DIR / resource_id / "metadata.yaml" # 保存元数据 try: diff --git a/apps/scheduler/pool/loader/openapi.py b/apps/scheduler/pool/loader/openapi.py index 9f72f701127f0024d9e7714d48598184b614a697..7b4c262eca3ac174d08f10fa0aed8529b58842a8 100644 --- a/apps/scheduler/pool/loader/openapi.py +++ b/apps/scheduler/pool/loader/openapi.py @@ -3,26 +3,30 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ import uuid +from typing import Any +import ray import yaml from anyio import Path from apps.constants import LOGGER +from apps.entities.enum_var import ContentType, HTTPMethod from apps.entities.flow import ServiceMetadata from apps.entities.pool import NodePool from apps.scheduler.openapi import ( + ReducedOpenAPIEndpoint, ReducedOpenAPISpec, reduce_openapi_spec, ) +@ray.remote class OpenAPILoader: """OpenAPI文档载入器""" - @classmethod - async def _read_yaml(cls, yaml_path: Path) -> ReducedOpenAPISpec: + async def _read_yaml(self, yaml_path: Path) -> ReducedOpenAPISpec: """从本地磁盘加载OpenAPI文档""" - if not yaml_path.exists(): + if not await yaml_path.exists(): msg = f"File not found: {yaml_path}" raise FileNotFoundError(msg) @@ -33,52 +37,131 @@ class OpenAPILoader: return reduce_openapi_spec(spec) - @classmethod - def _process_spec(cls, service_id: str, spec: ReducedOpenAPISpec, service_metadata: ServiceMetadata) -> list[NodePool]: + def parameters_to_spec(self, raw_schema: list[dict[str, Any]]) -> dict[str, Any]: + """将OpenAPI中GET接口List形式的请求体Spec转换为JSON Schema""" + schema = { + "type": "object", + "required": [], + "properties": {}, + } + for item in raw_schema: + if item["required"]: + schema["required"].append(item["name"]) + schema["properties"][item["name"]] = {} + schema["properties"][item["name"]]["description"] = item["description"] + for key, val in item["schema"].items(): + schema["properties"][item["name"]][key] = val + return schema + + + async def _get_api_data(self, spec: ReducedOpenAPIEndpoint, service_metadata: ServiceMetadata) -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]: + """从OpenAPI文档中获取API数据""" + try: + method = HTTPMethod[spec.method.upper()] + except KeyError as e: + err = f"HTTP方法{spec.method}不支持。" + LOGGER.error(msg=err) + raise RuntimeError(err) from e + + url = service_metadata.api.server.rstrip("/") + spec.uri + + if method in (HTTPMethod.POST, HTTPMethod.PUT, HTTPMethod.PATCH): + body_spec = spec.spec["requestBody"]["content"] + + # 从body_spec中找到第一个支持的content type + content_type = next( + (ct for ct in ContentType if ct.value in body_spec), + None, + ) + + if content_type is None: + err = f"接口{spec.name}的Content-Type不支持" + LOGGER.error(msg=err) + raise RuntimeError(err) + + input_schema = { + "type": "object", + "properties": { + "url_parameters": { + "type": "object", + "properties": {}, + "required": [], + }, + "post_body": { + "type": "object", + "properties": {}, + "required": [], + }, + }, + "required": ["url_parameters", "post_body"], + } + + try: + input_schema["properties"]["url_parameters"] = self.parameters_to_spec(spec.spec["parameters"]) + except KeyError: + err = f"接口{spec.name}不存在URL参数定义" + LOGGER.error(msg=err) + + try: + input_schema["properties"]["post_body"] = spec.spec["requestBody"]["content"][content_type]["schema"] + except KeyError: + err = f"接口{spec.name}不存在请求体定义" + LOGGER.error(msg=err) + + try: + output_schema = spec.spec["responses"]["200"]["content"]["application/json"]["schema"] + except KeyError: + err = f"接口{spec.name}不存在响应体定义" + LOGGER.error(msg=err) + output_schema = { + "type": "object", + "properties": {}, + "required": [], + } + + known_params = { + "url": url, + "method": method, + "content_type": content_type, + } + + return input_schema, output_schema, known_params + + + async def _process_spec(self, service_id: str, yaml_filename: str, spec: ReducedOpenAPISpec, service_metadata: ServiceMetadata) -> list[NodePool]: """将OpenAPI文档拆解为Node""" nodes = [] for api_endpoint in spec.endpoints: - # 判断用户是否手动设置了ID - node_id = api_endpoint.id if api_endpoint.id else str(uuid.uuid4()) - # 组装新的NodePool item node = NodePool( - _id=node_id, + _id=str(uuid.uuid4()), name=api_endpoint.name, - # 此处固定Call的ID是“API” + # 此处固定Call的ID是"API" call_id="API", description=api_endpoint.description, service_id=service_id, - path="", + annotation=f"openapi::{yaml_filename}", ) # 合并参数 - node.known_params = { - "method": api_endpoint.method, - "full_url": service_metadata.api.server + api_endpoint.uri, - } - + node.override_input, node.override_output, node.known_params = await self._get_api_data(api_endpoint, service_metadata) nodes.append(node) - return nodes - @classmethod - async def load_one(cls, yaml_folder: Path, service_metadata: ServiceMetadata) -> list[NodePool]: + async def load_one(self, service_id: str, yaml_path: Path, service_metadata: ServiceMetadata) -> list[NodePool]: """加载单个OpenAPI文档,可以直接指定路径""" - async for yaml_path in yaml_folder.rglob("*.yaml"): - try: - spec = await cls._read_yaml(yaml_path) - except Exception as e: - err = f"加载OpenAPI文档{yaml_path}失败:{e}" - LOGGER.error(msg=err) - continue + try: + spec = await self._read_yaml(yaml_path) + except Exception as e: + err = f"加载OpenAPI文档{yaml_path}失败:{e}" + LOGGER.error(msg=err) + raise RuntimeError(err) from e - service_id = yaml_folder.parent.name - return cls._process_spec(service_id, spec, service_metadata) + yaml_filename = yaml_path.name + return await self._process_spec(service_id, yaml_filename, spec, service_metadata) - @classmethod - async def save_one(cls, nodes: list[NodePool]) -> None: + async def save_one(self, nodes: list[NodePool]) -> None: """保存单个OpenAPI文档""" pass diff --git a/apps/scheduler/pool/loader/service.py b/apps/scheduler/pool/loader/service.py index 4ad9182a57c404ac0b898f37f3dfff09d58979bf..b53fad13afb02a84722e34cf17cab0b2398e64a0 100644 --- a/apps/scheduler/pool/loader/service.py +++ b/apps/scheduler/pool/loader/service.py @@ -2,9 +2,10 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ -from typing import Any +import asyncio from anyio import Path +from sqlalchemy.dialects.postgresql import insert from apps.common.config import config from apps.constants import LOGGER @@ -23,32 +24,51 @@ class ServiceLoader: _collection = MongoDB.get_collection("service") - @classmethod - async def load_one(cls, service_id: str) -> None: + async def load(self, service_id: str) -> None: """加载单个Service""" service_path = Path(config["SEMANTICS_DIR"]) / "service" / service_id # 载入元数据 - metadata = await MetadataLoader.load(service_path / "metadata.yaml") + metadata = await MetadataLoader().load_one(service_path / "metadata.yaml") if not isinstance(metadata, ServiceMetadata): err = f"元数据类型错误: {service_path / 'metadata.yaml'}" LOGGER.error(err) raise TypeError(err) # 载入OpenAPI文档,获取Node列表 - nodes = await OpenAPILoader.load_one(service_path / "openapi", metadata) - + openapi_loader = OpenAPILoader.remote() + nodes = [openapi_loader.load_one.remote(service_id, yaml_path, metadata) # type: ignore[arg-type] + async for yaml_path in (service_path / "openapi").rglob("*.yaml")] + nodes = (await asyncio.gather(*nodes))[0] + # 更新数据库 + await self._update_db(nodes, metadata) @classmethod async def _update_db(cls, nodes: list[NodePool], metadata: ServiceMetadata) -> None: """更新数据库""" - # 向量化所有数据 + # 更新MongoDB + service_collection = MongoDB.get_collection("service") + node_collection = MongoDB.get_collection("node") + try: + await service_collection.update_one({"_id": metadata.id}, {"$set": metadata.model_dump(exclude_none=True, by_alias=True)}, upsert=True) + for node in nodes: + await node_collection.update_one({"_id": node.id}, {"$set": node.model_dump(exclude_none=True, by_alias=True)}, upsert=True) + except Exception as e: + err = f"更新MongoDB失败:{e}" + LOGGER.error(err) + raise RuntimeError(err) from e + + # 向量化所有数据并保存 session = await PostgreSQL.get_session() - service_vec = ServicePoolVector( - _id=metadata.id, - embedding=PostgreSQL.get_embedding([metadata.description]), + service_embedding = await PostgreSQL.get_embedding([metadata.description]) + insert_stmt = insert(ServicePoolVector).values( + id=metadata.id, + embedding=service_embedding[0], + ).on_conflict_do_update( + index_elements=["id"], + set_={"embedding": service_embedding[0]}, ) - session.add(service_vec) + await session.execute(insert_stmt) node_descriptions = [] for node in nodes: @@ -56,22 +76,23 @@ class ServiceLoader: node_vecs = await PostgreSQL.get_embedding(node_descriptions) for i, data in enumerate(node_vecs): - node_vec = NodePoolVector( + insert_stmt = insert(NodePoolVector).values( id=nodes[i].id, embedding=data, + ).on_conflict_do_update( + index_elements=["id"], + set_={"embedding": data}, ) - session.add(node_vec) + await session.execute(insert_stmt) await session.commit() - @staticmethod - async def save(cls) -> dict[str, Any]: - """加载所有Service""" + async def save(self, service_id: str, metadata: ServiceMetadata) -> None: + """在文件系统上保存Service""" pass - @staticmethod - async def init() -> None: - """在初始化时加载所有Service""" + async def delete(self, service_id: str) -> None: + """删除Service""" pass diff --git a/apps/scheduler/pool/pool.py b/apps/scheduler/pool/pool.py index db79c9db991d5434a697a88e60e2bb8c6f119914..ab5efa04dea5722216c518bae30a7d194faa7a13 100644 --- a/apps/scheduler/pool/pool.py +++ b/apps/scheduler/pool/pool.py @@ -7,7 +7,7 @@ import ray from apps.entities.enum_var import MetadataType from apps.entities.flow_topology import FlowItem from apps.scheduler.pool.check import FileChecker -from apps.scheduler.pool.loader import CallLoader +from apps.scheduler.pool.loader import CallLoader, ServiceLoader @ray.remote @@ -22,6 +22,14 @@ class Pool: # 加载Services checker = FileChecker() changed_service, deleted_service = await checker.diff(MetadataType.SERVICE) + service_loader = ServiceLoader() + for service in changed_service: + # 重载变化的Service + await service_loader.load(service) + for service in deleted_service: + # 删除消失的Service + await service_loader.delete(service) + # 加载App changed_app, deleted_app = await checker.diff(MetadataType.APP) diff --git a/deploy/chart/euler_copilot/configs/framework/.env b/deploy/chart/euler_copilot/configs/framework/.env index bf15c85bad099fc1590ac2e40c777bb66a9e2135..46edca6e1700b258757a63dd7fa1912d3525bc9a 100644 --- a/deploy/chart/euler_copilot/configs/framework/.env +++ b/deploy/chart/euler_copilot/configs/framework/.env @@ -86,5 +86,5 @@ SCHEDULER_API_KEY={{ default .Values.models.answer.key .Values.models.functionca SCHEDULER_MAX_TOKENS={{default .Values.models.answer.max_tokens .Values.models.functioncall.max_tokens }} # Agent -PLUGIN_DIR=/euler-copilot-frame/apps/plugin +SEMANTICS_DIR=/euler-copilot-frame/apps/semantics SQL_URL= diff --git a/deploy/chart/euler_copilot/templates/framework/framework.yaml b/deploy/chart/euler_copilot/templates/framework/framework.yaml index 53d10fb1b5b9d8e6f0bfe6648c8e2410f0866dae..d93266e9d3e32a222f7c687af9879fd27ed7cb17 100644 --- a/deploy/chart/euler_copilot/templates/framework/framework.yaml +++ b/deploy/chart/euler_copilot/templates/framework/framework.yaml @@ -67,7 +67,7 @@ spec: name: framework-shared - mountPath: /tmp name: framework-tmp-volume - - mountPath: /euler-copilot-frame/apps/plugin + - mountPath: /euler-copilot-frame/apps/semantics name: framework-semantics-vl securityContext: readOnlyRootFilesystem: {{ default false .Values.euler_copilot.framework.readOnly }} diff --git a/docs/sig-intelligence-operations/responsibility-plot.md b/docs/sig-intelligence-operations/responsibility-plot.md index e0f5878fe8607419a7adffe53bcce05aa12ca88e..7ac15b3611136795229c42ffdb9390b65aadfef0 100644 --- a/docs/sig-intelligence-operations/responsibility-plot.md +++ b/docs/sig-intelligence-operations/responsibility-plot.md @@ -4,7 +4,6 @@ |-------------------------------------------|------------------------|----------------------------| | openeuler/euler-copilot-framework | EulerCopilot 框架 | 何守成[@fromhsc](https://gitee.com/fromhsc)、王铮[@zhengw99](https://gitee.com/zhengw99) | | openeuler/euler-copilot-rag | EulerCopilot RAG 服务 | 赵家麒[@zxstty](https://gitee.com/zxstty) | -| openeuler/euler-copilot-vectorize-agent | EulerCopilot 数据向量化微服务 | | | openeuler/euler-copilot-shell | EulerCopilot Shell 客户端 | 史鸿宇[@hongyu-shi](https://gitee.com/hongyu-shi) | | openeuler/euler-copilot-web | EulerCopilot Web 前端 | | | src-openeuler/metagpt | MetaGPT | 刘峯[@liu-feng-D](https://gitee.com/liu-feng-D) | diff --git "a/docs/user-guide/\351\203\250\347\275\262\346\214\207\345\215\227/\346\217\222\344\273\266\351\203\250\347\275\262\346\214\207\345\215\227/\346\231\272\350\203\275\350\260\203\344\274\230/\346\217\222\344\273\266\342\200\224\346\231\272\350\203\275\350\260\203\344\274\230\351\203\250\347\275\262\346\214\207\345\215\227.md" "b/docs/user-guide/\351\203\250\347\275\262\346\214\207\345\215\227/\346\217\222\344\273\266\351\203\250\347\275\262\346\214\207\345\215\227/\346\231\272\350\203\275\350\260\203\344\274\230/\346\217\222\344\273\266\342\200\224\346\231\272\350\203\275\350\260\203\344\274\230\351\203\250\347\275\262\346\214\207\345\215\227.md" index 50a589da381c58012ae700031d7165301faa7361..667f04f2b6b47b665ffdd1f62b8f9f3a1b7bce35 100644 --- "a/docs/user-guide/\351\203\250\347\275\262\346\214\207\345\215\227/\346\217\222\344\273\266\351\203\250\347\275\262\346\214\207\345\215\227/\346\231\272\350\203\275\350\260\203\344\274\230/\346\217\222\344\273\266\342\200\224\346\231\272\350\203\275\350\260\203\344\274\230\351\203\250\347\275\262\346\214\207\345\215\227.md" +++ "b/docs/user-guide/\351\203\250\347\275\262\346\214\207\345\215\227/\346\217\222\344\273\266\351\203\250\347\275\262\346\214\207\345\215\227/\346\231\272\350\203\275\350\260\203\344\274\230/\346\217\222\344\273\266\342\200\224\346\231\272\350\203\275\350\260\203\344\274\230\351\203\250\347\275\262\346\214\207\345\215\227.md" @@ -104,7 +104,6 @@ kubectl delete pod framework-deploy-service-bb5b58678-jxzqr -n eulercopilot mysql-deploy-databases-57f5f94ccf-sbhzp 2/2 Running 0 17d framework-deploy-service-bb5b58678-jxzqr 2/2 Running 0 16d rag-deploy-service-5b7887644c-sm58z 2/2 Running 0 110m - vectorize-deploy-service-57f5f94ccf-sbhzp 2/2 Running 0 17d web-deploy-service-74fbf7999f-r46rg 1/1 Running 0 2d tune-deploy-agents-5d46bfdbd4-xph7b 1/1 Running 0 2d ``` diff --git "a/docs/user-guide/\351\203\250\347\275\262\346\214\207\345\215\227/\346\227\240\347\275\221\347\273\234\347\216\257\345\242\203\344\270\213\351\203\250\347\275\262\346\214\207\345\215\227.md" "b/docs/user-guide/\351\203\250\347\275\262\346\214\207\345\215\227/\346\227\240\347\275\221\347\273\234\347\216\257\345\242\203\344\270\213\351\203\250\347\275\262\346\214\207\345\215\227.md" index e4135d9c7a3afd487a74553ffb65ec73417f75a2..2a58109e68add15bc13073f9ff0cf50a1a517114 100644 --- "a/docs/user-guide/\351\203\250\347\275\262\346\214\207\345\215\227/\346\227\240\347\275\221\347\273\234\347\216\257\345\242\203\344\270\213\351\203\250\347\275\262\346\214\207\345\215\227.md" +++ "b/docs/user-guide/\351\203\250\347\275\262\346\214\207\345\215\227/\346\227\240\347\275\221\347\273\234\347\216\257\345\242\203\344\270\213\351\203\250\347\275\262\346\214\207\345\215\227.md" @@ -11,7 +11,6 @@ openEuler Copilot System 是一款智能问答工具,使用 openEuler Copilot | euler-copilot-framework | 8002 (内部端口) | 智能体框架服务 | | euler-copilot-web | 8080 | 智能体前端界面 | | euler-copilot-rag | 9988 (内部端口) | 检索增强服务 | -| euler-copilot-vectorize-agent | 8001 (内部端口) | 文本向量化服务 | | mysql | 3306 (内部端口) | MySQL数据库 | | redis | 6379 (内部端口) | Redis数据库 | | postgres | 5432 (内部端口) | 向量数据库 | @@ -118,7 +117,6 @@ openEuler Copilot System 是一款智能问答工具,使用 openEuler Copilot ```bash # X86架构镜像地址如下所示,ARM架构修改tag为arm hub.oepkgs.net/neocopilot/euler-copilot-framework:0.9.3-x86 - hub.oepkgs.net/neocopilot/euler-copilot-vectorize-agent:0.9.3-x86 hub.oepkgs.net/neocopilot/euler-copilot-web:0.9.3-x86 hub.oepkgs.net/neocopilot/data_chain_back_end:0.9.3-x86 hub.oepkgs.net/neocopilot/data_chain_web:0.9.3-x86 @@ -263,7 +261,6 @@ openEuler Copilot System 是一款智能问答工具,使用 openEuler Copilot ├── templates │   ├── NOTES.txt │   ├── rag - │   ├── vectorize │   └── web └── values.yaml ``` @@ -443,7 +440,6 @@ openEuler Copilot System 是一款智能问答工具,使用 openEuler Copilot mysql-deploy-databases-57f5f94ccf-sbhzp 2/2 Running 0 17d framework-deploy-service-bb5b58678-jxzqr 2/2 Running 0 16d rag-deploy-service-5b7887644c-sm58z 2/2 Running 0 110m - vectorize-deploy-service-57f5f94ccf-sbhzp 2/2 Running 0 17d web-deploy-service-74fbf7999f-r46rg 1/1 Running 0 2d ``` diff --git "a/docs/user-guide/\351\203\250\347\275\262\346\214\207\345\215\227/\347\275\221\347\273\234\347\216\257\345\242\203\344\270\213\351\203\250\347\275\262\346\214\207\345\215\227.md" "b/docs/user-guide/\351\203\250\347\275\262\346\214\207\345\215\227/\347\275\221\347\273\234\347\216\257\345\242\203\344\270\213\351\203\250\347\275\262\346\214\207\345\215\227.md" index 8bf2dfd42d4fe63b512385a60359af6ccfa0005a..370c478fb10d74598c88ddcc80813478bd968b54 100644 --- "a/docs/user-guide/\351\203\250\347\275\262\346\214\207\345\215\227/\347\275\221\347\273\234\347\216\257\345\242\203\344\270\213\351\203\250\347\275\262\346\214\207\345\215\227.md" +++ "b/docs/user-guide/\351\203\250\347\275\262\346\214\207\345\215\227/\347\275\221\347\273\234\347\216\257\345\242\203\344\270\213\351\203\250\347\275\262\346\214\207\345\215\227.md" @@ -11,7 +11,6 @@ openEuler Copilot System 是一款智能问答工具,使用 openEuler Copilot | euler-copilot-framework | 8002 (内部端口) | 智能体框架服务 | | euler-copilot-web | 8080 | 智能体前端界面 | | euler-copilot-rag | 9988 (内部端口) | 检索增强服务 | -| euler-copilot-vectorize-agent | 8001 (内部端口) | 文本向量化服务 | | mysql | 3306 (内部端口) | MySQL数据库 | | redis | 6379 (内部端口) | Redis数据库 | | postgres | 5432 (内部端口) | 向量数据库 | @@ -133,7 +132,6 @@ cd /home/euler-copilot-framework/deploy/scripts && tree ├── templates │   ├── NOTES.txt │   ├── rag - │   ├── vectorize │   └── web └── values.yaml ``` @@ -306,15 +304,14 @@ cd /home/euler-copilot-framework/deploy/scripts && tree ```bash NAME READY STATUS RESTARTS AGE - authhub-backend-deploy-authhub-64896f5cdc-m497f 2/2 Running 0 16d - authhub-web-deploy-authhub-7c48695966-h8d2p 1/1 Running 0 17d - pgsql-deploy-databases-86b4dc4899-ppltc 1/1 Running 0 17d - redis-deploy-databases-f8866b56-kj9jz 1/1 Running 0 17d - mysql-deploy-databases-57f5f94ccf-sbhzp 2/2 Running 0 17d - framework-deploy-service-bb5b58678-jxzqr 2/2 Running 0 16d - rag-deploy-service-5b7887644c-sm58z 2/2 Running 0 110m - vectorize-deploy-service-57f5f94ccf-sbhzp 2/2 Running 0 17d - web-deploy-service-74fbf7999f-r46rg 1/1 Running 0 2d + authhub-backend-deploy-authhub-64896f5cdc-m497f 2/2 Running 0 10m + authhub-web-deploy-authhub-7c48695966-h8d2p 1/1 Running 0 10m + pgsql-deploy-databases-86b4dc4899-ppltc 1/1 Running 0 10m + redis-deploy-databases-f8866b56-kj9jz 1/1 Running 0 10m + mysql-deploy-databases-57f5f94ccf-sbhzp 2/2 Running 0 10m + framework-deploy-service-bb5b58678-jxzqr 2/2 Running 0 8m + rag-deploy-service-5b7887644c-sm58z 2/2 Running 0 10m + web-deploy-service-74fbf7999f-r46rg 1/1 Running 0 10m ``` **故障排查:** diff --git a/sample/apps/test_app/flows/test.yaml b/sample/app/test_app/flows/test.yaml similarity index 100% rename from sample/apps/test_app/flows/test.yaml rename to sample/app/test_app/flows/test.yaml diff --git a/sample/apps/test_app/icon.ico b/sample/app/test_app/icon.ico similarity index 100% rename from sample/apps/test_app/icon.ico rename to sample/app/test_app/icon.ico diff --git a/sample/apps/test_app/metadata.yaml b/sample/app/test_app/metadata.yaml similarity index 96% rename from sample/apps/test_app/metadata.yaml rename to sample/app/test_app/metadata.yaml index cc717e78002f194a0cb866fd4af8912977964737..1f125e64bf1643c9b567eda2741aec2c0305a839 100644 --- a/sample/apps/test_app/metadata.yaml +++ b/sample/app/test_app/metadata.yaml @@ -1,8 +1,5 @@ # 元数据种类 type: app - -# 应用的ID -id: test_app # 应用的名称(展示用) name: 测试应用 # 应用的描述(展示用,需少于150字) diff --git a/sample/calls/__init__.py b/sample/call/__init__.py similarity index 100% rename from sample/calls/__init__.py rename to sample/call/__init__.py diff --git a/sample/calls/test_call/__init__.py b/sample/call/test_call/__init__.py similarity index 100% rename from sample/calls/test_call/__init__.py rename to sample/call/test_call/__init__.py diff --git a/sample/calls/test_call/sub_lib/__init__.py b/sample/call/test_call/sub_lib/__init__.py similarity index 100% rename from sample/calls/test_call/sub_lib/__init__.py rename to sample/call/test_call/sub_lib/__init__.py diff --git a/sample/calls/test_call/sub_lib/add.py b/sample/call/test_call/sub_lib/add.py similarity index 100% rename from sample/calls/test_call/sub_lib/add.py rename to sample/call/test_call/sub_lib/add.py diff --git a/sample/calls/test_call/user_tool.py b/sample/call/test_call/user_tool.py similarity index 100% rename from sample/calls/test_call/user_tool.py rename to sample/call/test_call/user_tool.py diff --git a/sample/services/test_service/metadata.yaml b/sample/service/test_service/metadata.yaml similarity index 96% rename from sample/services/test_service/metadata.yaml rename to sample/service/test_service/metadata.yaml index 8d13eaf892fa121751bb45312ac8e410531bae1f..38e4e6192c4f4b19bdceb5cf2ab730c755200bb4 100644 --- a/sample/services/test_service/metadata.yaml +++ b/sample/service/test_service/metadata.yaml @@ -1,8 +1,5 @@ # 元数据种类 type: service - -# 服务的ID -id: test_service # 服务的名称(展示用) name: 测试服务 # 服务的描述(展示用) diff --git a/sample/services/test_service/openapi/api.yaml b/sample/service/test_service/openapi/api.yaml similarity index 100% rename from sample/services/test_service/openapi/api.yaml rename to sample/service/test_service/openapi/api.yaml