diff --git a/apps/entities/flow.py b/apps/entities/flow.py
index bafe3664f5156fe533135b8701f8117e84df7a29..accf440ab0e4b05584fe7c96d89eba2d7817e3f0 100644
--- a/apps/entities/flow.py
+++ b/apps/entities/flow.py
@@ -58,6 +58,7 @@ class Flow(BaseModel):
     edges: list[Edge] = Field(description="边列表", default=[])
     debug: bool = Field(description="是否经过调试", default=False)
 
+
 class MetadataBase(BaseModel):
     """Service或App的元数据"""
 
@@ -67,6 +68,7 @@ class MetadataBase(BaseModel):
     description: str = Field(description="元数据描述")
     version: str = Field(description="元数据版本")
     author: str = Field(description="创建者的用户名")
+    hashes: Optional[dict[str, str]] = Field(description="资源(App、Service等)下所有文件的hash值", default=None)
 
 
 class ServiceApiAuthOidc(BaseModel):
diff --git a/apps/scheduler/executor/message.py b/apps/scheduler/executor/message.py
index 2cea3e59e4081749f99d8144391d6b0020dc81d2..1d63144efb1f4c196fbb15b4047f6dd559ffe7d4 100644
--- a/apps/scheduler/executor/message.py
+++ b/apps/scheduler/executor/message.py
@@ -17,14 +17,6 @@ from apps.llm.patterns.executor import ExecutorResult
 from apps.manager.task import TaskManager
 
 
-async def _calculate_step_order(flow: Flow, step_name: str) -> str:
-    """计算步骤序号"""
-    for i, step in enumerate(flow.steps.keys()):
-        if step == step_name:
-            return f"{i + 1}/{len(flow.steps)}"
-    return f"{len(flow.steps) + 1}/{len(flow.steps)}"
-
-
 async def push_step_input(task_id: str, queue: MessageQueue, state: ExecutorState, flow: Flow) -> None:
     """推送步骤输入"""
     # 获取Task
@@ -40,9 +32,7 @@ async def push_step_input(task_id: str, queue: MessageQueue, state: ExecutorStat
     flow_history = FlowHistory(
         task_id=task_id,
         flow_id=state.name,
-        plugin_id=state.plugin_id,
         step_id=state.step_id,
-        step_order=await _calculate_step_order(flow, state.step_id),
         status=state.status,
         input_data=state.slot_data,
         output_data={},
diff --git a/apps/scheduler/openapi.py b/apps/scheduler/openapi.py
index af0fe621c1187daa4bf61f033e6628c2a5118e57..ec8c4ed2dc542ebd3fb095c4164b4b8d157ad861 100644
--- a/apps/scheduler/openapi.py
+++ b/apps/scheduler/openapi.py
@@ -13,7 +13,6 @@ from pydantic import BaseModel, Field
 class ReducedOpenAPIEndpoint(BaseModel):
     """精简后的OpenAPI文档中的单个API"""
 
-    id: Optional[str] = Field(default=None, description="API的Operation ID")
     uri: str = Field(..., description="API的URI")
     method: str = Field(..., description="API的请求方法")
     name: str = Field(..., description="API的自定义名称")
@@ -48,11 +47,11 @@ def _retrieve_ref(path: str, schema: dict) -> dict:
 
 
 def _dereference_refs_helper(
-    obj: Any,  # noqa: ANN401
+    obj: Any,
     full_schema: dict[str, Any],
     skip_keys: Sequence[str],
     processed_refs: Optional[set[str]] = None,
-) -> Any:  # noqa: ANN401
+) -> Any:
     """递归地将OpenAPI中的$ref替换为实际的schema"""
     if processed_refs is None:
         processed_refs = set()
@@ -154,7 +153,6 @@ def reduce_openapi_spec(spec: dict) -> ReducedOpenAPISpec:
                 msg = f'Endpoint error at "{operation_name.upper()} {route}": missing {", ".join(missing_fields)}.'
                 raise ValueError(msg)
             endpoint = ReducedOpenAPIEndpoint(
-                id=docs.get("operationId", None),
                 uri=route,
                 method=operation_name,
                 name=name,
""" from hashlib import sha256 from pathlib import Path +from typing import Optional from apps.common.config import config from apps.constants import APP_DIR, LOGGER, SERVICE_DIR @@ -16,12 +17,13 @@ class FileChecker: def __init__(self) -> None: """初始化文件检查器""" - self._hashes = {} + self.hashes = {} self._dir_path = Path(config["SEMANTICS_DIR"]) - def check_one(self, path: Path) -> None: + def check_one(self, path: Path) -> dict[str, str]: """检查单个App/Service文件是否有变动""" + hashes = {} if not path.exists(): err = FileNotFoundError(f"File {path} not found") raise err @@ -31,16 +33,20 @@ class FileChecker: for file in path.iterdir(): if file.is_file(): - self._hashes[str(file.relative_to(self._dir_path))] = sha256(file.read_bytes()).hexdigest() + relative_path = file.relative_to(self._resource_path) + hashes[relative_path.as_posix()] = sha256(file.read_bytes()).hexdigest() elif file.is_dir(): - self.check_one(file) + hashes.update(self.check_one(file)) + return hashes - def diff_one(self, path: Path, previous_hashes: dict[str, str]) -> bool: + + def diff_one(self, path: Path, previous_hashes: Optional[dict[str, str]] = None) -> bool: """检查文件是否发生变化""" - self._hashes = {} - self.check_one(path) - return self._hashes != previous_hashes + self._resource_path = path + path_diff = self._resource_path.relative_to(config["SEMANTICS_DIR"]) + self.hashes[path_diff.as_posix()] = self.check_one(path) + return self.hashes[path_diff.as_posix()] != previous_hashes async def diff(self, check_type: MetadataType) -> tuple[list[str], list[str]]: @@ -70,13 +76,15 @@ class FileChecker: deleted_list.append(list_item["_id"]) continue # 判断是否发生变化 - if "hashes" not in list_item or self.diff_one(self._dir_path / list_item["_id"], list_item["hashes"]): + if self.diff_one(self._dir_path / list_item["_id"], list_item.get("hashes", None)): changed_list.append(list_item["_id"]) # 遍历目录 for service_folder in self._dir_path.iterdir(): # 判断是否新增? 
diff --git a/apps/scheduler/pool/loader/call.py b/apps/scheduler/pool/loader/call.py
index eba0a750eabc1357de0b6bd4cd3ccf53d29c1e10..520cd3006e8ddee654868369e9acf446a5f7cbfd 100644
--- a/apps/scheduler/pool/loader/call.py
+++ b/apps/scheduler/pool/loader/call.py
@@ -159,10 +159,18 @@ class CallLoader:
     # TODO: 动态卸载
 
 
+    async def _delete_one(self, call_name: str) -> None:
+        """删除单个Call"""
+        pass
+
+
+    async def _delete_from_db(self, call_name: str) -> None:
+        """从数据库中删除单个Call"""
+        pass
 
 
     # 更新数据库
-    async def _update_db(self, call_metadata: list[CallPool]) -> None:
+    async def _add_to_db(self, call_metadata: list[CallPool]) -> None:
         """更新数据库"""
         # 更新MongoDB
         call_collection = MongoDB.get_collection("call")
@@ -219,7 +227,7 @@ class CallLoader:
         call_metadata = sys_call_metadata + user_call_metadata
 
         # 更新数据库
-        await self._update_db(call_metadata)
+        await self._add_to_db(call_metadata)
 
 
     async def load_one(self, call_name: str) -> None:
@@ -233,4 +241,4 @@ class CallLoader:
 
         # 有数据时更新数据库
         if call_metadata:
-            await self._update_db(call_metadata)
+            await self._add_to_db(call_metadata)
diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py
index 703a99b292308fb7c2b23284807b1331e91b314f..8f7e1ea026cc3e435cd31224a149ce7991ad2db6 100644
--- a/apps/scheduler/pool/loader/flow.py
+++ b/apps/scheduler/pool/loader/flow.py
@@ -2,9 +2,8 @@
 
 Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
 """
-from pathlib import Path
-
 import yaml
+from anyio import Path
 
 from apps.common.config import config
 from apps.entities.flow import Flow
@@ -14,21 +13,15 @@ class FlowLoader:
     """工作流加载器"""
 
     @classmethod
-    def load(cls, app_id: str, flow_id: str) -> Flow:
+    async def load(cls, app_id: str, flow_id: str) -> Flow:
         """从文件系统中加载【单个】工作流"""
-        flow_path = Path(config["SERVICE_DIR"]) / "app" / app_id / "flow" / f"{flow_id}.yaml"
-
-        with flow_path.open(encoding="utf-8") as f:
-            flow_yaml = yaml.safe_load(f)
+        flow_path = Path(config["SEMANTICS_DIR"]) / "app" / app_id / "flow" / f"{flow_id}.yaml"
 
-        if "name" not in flow_yaml:
-            err = f"工作流名称不能为空:{flow_path!s}"
-            raise ValueError(err)
-
-        if "::" in flow_yaml["id"]:
-            err = f"工作流名称包含非法字符:{flow_path!s}"
-            raise ValueError(err)
+        f = await flow_path.open(encoding="utf-8")
+        flow_yaml = yaml.safe_load(await f.read())
+        await f.aclose()
+        flow_yaml["id"] = flow_id
 
         try:
             # 检查Flow格式,并转换为Flow对象
             flow = Flow.model_validate(flow_yaml)
@@ -40,6 +33,10 @@ class FlowLoader:
 
 
     @classmethod
-    def save(cls, app_id: str, flow_id: str, flow: Flow) -> None:
+    async def save(cls, app_id: str, flow_id: str, flow: Flow) -> None:
         """保存工作流"""
-        pass
+        file = Path(config["SEMANTICS_DIR"]) / "app" / app_id / "flow" / f"{flow_id}.yaml"
+
+        file_handler = await file.open(mode="w", encoding="utf-8")
+        await file_handler.write(yaml.dump(flow.model_dump()))
+        await file_handler.aclose()
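The loader above switches from `pathlib.Path` to `anyio.Path`. Below is a sketch of the same async I/O pattern, assuming anyio's `AsyncFile` context-manager support (available in anyio 3/4); the path is a placeholder. Using `async with` also guarantees the handle is closed if `yaml.safe_load` raises, which an explicit `open()`/`aclose()` pair does not.

```python
import asyncio

import yaml
from anyio import Path


async def read_flow_yaml(flow_path: Path) -> dict:
    # async with closes the handle even if yaml.safe_load raises.
    async with await flow_path.open(encoding="utf-8") as f:
        return yaml.safe_load(await f.read())


async def write_flow_yaml(flow_path: Path, data: dict) -> None:
    # yaml.dump() needs a synchronous sink, so dump to a string first
    # and hand it to the async write.
    async with await flow_path.open(mode="w", encoding="utf-8") as f:
        await f.write(yaml.dump(data, allow_unicode=True))


async def main() -> None:
    # Placeholder path; the loader builds it from config["SEMANTICS_DIR"].
    path = Path("/tmp/example_flow.yaml")
    await write_flow_yaml(path, {"id": "demo", "steps": {}})
    print(await read_flow_yaml(path))


if __name__ == "__main__":
    asyncio.run(main())
```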
diff --git a/apps/scheduler/pool/loader/metadata.py b/apps/scheduler/pool/loader/metadata.py
index 7998c2c344c531bde768b9ac587e9525b695ece9..bfc4b5992adf706e090de63359a8d1efa0c8e236 100644
--- a/apps/scheduler/pool/loader/metadata.py
+++ b/apps/scheduler/pool/loader/metadata.py
@@ -55,7 +55,7 @@ class MetadataLoader:
 
         return metadata
 
-    async def save_one(self, metadata_type: MetadataType, metadata: dict[str, Any], resource_id: str) -> None:
+    async def save_one(self, metadata_type: MetadataType, metadata: Union[dict[str, Any], AppMetadata, ServiceMetadata], resource_id: str) -> None:
         """保存单个元数据"""
         class_dict = {
             MetadataType.APP: AppMetadata,
@@ -69,13 +69,17 @@ class MetadataLoader:
             resource_path = Path(config["SEMANTICS_DIR"]) / SERVICE_DIR / resource_id / "metadata.yaml"
 
         # 保存元数据
-        try:
-            metadata_class: type[Union[AppMetadata, ServiceMetadata]] = class_dict[metadata_type]
-            data = metadata_class(**metadata)
-        except Exception as e:
-            err = f"metadata.yaml格式错误: {e}"
-            LOGGER.error(err)
-            raise RuntimeError(err) from e
+        if isinstance(metadata, dict):
+            try:
+                # 检查类型匹配
+                metadata_class: type[Union[AppMetadata, ServiceMetadata]] = class_dict[metadata_type]
+                data = metadata_class(**metadata)
+            except Exception as e:
+                err = f"metadata.yaml格式错误: {e}"
+                LOGGER.error(err)
+                raise RuntimeError(err) from e
+        else:
+            data = metadata
 
-        yaml_data = data.model_dump(by_alias=True, exclude_none=True)
-        await resource_path.write_text(yaml.safe_dump(yaml_data))
+        yaml_data = yaml.safe_dump(data.model_dump(by_alias=True, exclude_none=True))
+        await resource_path.write_text(yaml_data)
diff --git a/apps/scheduler/pool/loader/service.py b/apps/scheduler/pool/loader/service.py
index b53fad13afb02a84722e34cf17cab0b2398e64a0..dbde4e4c9e92ca1491aaf15d3dc65eda00584bbc 100644
--- a/apps/scheduler/pool/loader/service.py
+++ b/apps/scheduler/pool/loader/service.py
@@ -24,7 +24,7 @@ class ServiceLoader:
     _collection = MongoDB.get_collection("service")
 
 
-    async def load(self, service_id: str) -> None:
+    async def load(self, service_id: str, hashes: dict[str, str]) -> None:
         """加载单个Service"""
         service_path = Path(config["SEMANTICS_DIR"]) / "service" / service_id
         # 载入元数据
@@ -33,6 +33,7 @@ class ServiceLoader:
             err = f"元数据类型错误: {service_path / 'metadata.yaml'}"
             LOGGER.error(err)
             raise TypeError(err)
+        metadata.hashes = hashes
 
         # 载入OpenAPI文档,获取Node列表
         openapi_loader = OpenAPILoader.remote()
@@ -89,10 +90,10 @@ class ServiceLoader:
 
 
     async def save(self, service_id: str, metadata: ServiceMetadata) -> None:
-        """在文件系统上保存Service"""
+        """在文件系统上保存Service,并更新数据库"""
         pass
 
 
     async def delete(self, service_id: str) -> None:
-        """删除Service"""
+        """删除Service,并更新数据库"""
         pass
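`MetadataLoader.save_one` now accepts either a raw dict or an already-validated model. A compact sketch of that dispatch, using a hypothetical stand-in model (the real `AppMetadata`/`ServiceMetadata` fields live in `apps/entities/flow.py`):

```python
from typing import Union

import yaml
from pydantic import BaseModel, Field


class DemoMetadata(BaseModel):
    """Hypothetical stand-in for AppMetadata/ServiceMetadata."""

    name: str = Field(description="资源名称")
    version: str = Field(description="元数据版本")


def dump_metadata(metadata: Union[dict, DemoMetadata]) -> str:
    # Mirrors save_one's branching: raw dicts get validated first,
    # already-validated models are serialized as-is.
    data = DemoMetadata(**metadata) if isinstance(metadata, dict) else metadata
    return yaml.safe_dump(data.model_dump(by_alias=True, exclude_none=True), allow_unicode=True)


print(dump_metadata({"name": "demo", "version": "1.0.0"}))
print(dump_metadata(DemoMetadata(name="demo", version="1.0.0")))
```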
""" +from pathlib import Path +from typing import Optional + import ray +from apps.common.config import config +from apps.constants import LOGGER, SERVICE_DIR from apps.entities.enum_var import MetadataType from apps.entities.flow_topology import FlowItem from apps.scheduler.pool.check import FileChecker @@ -19,13 +24,16 @@ class Pool: # 加载Call await CallLoader().load() - # 加载Services + # 检查文件变动 checker = FileChecker() changed_service, deleted_service = await checker.diff(MetadataType.SERVICE) + LOGGER.info(f"11111, checker.hashes: {checker.hashes}, changed_service: {changed_service}, deleted_service: {deleted_service}") + + # 处理Service service_loader = ServiceLoader() for service in changed_service: # 重载变化的Service - await service_loader.load(service) + await service_loader.load(service, checker.hashes[Path(SERVICE_DIR + "/" + service).as_posix()]) for service in deleted_service: # 删除消失的Service await service_loader.delete(service) @@ -39,5 +47,11 @@ class Pool: pass - def get_flow(self, app_id: str, flow_id: str) -> FlowItem: + def get_flow_metadata(self, app_id: str) -> Optional[FlowItem]: + """从数据库中获取全部Flow的元数据""" + pass + + + def get_flow(self, app_id: str, flow_id: str) -> Optional[FlowItem]: + """从数据库中获取单个Flow的全部数据""" pass diff --git a/apps/scheduler/scheduler/flow.py b/apps/scheduler/scheduler/flow.py index 155203268f769e4d013e09adcaf944f5a6d401bc..5e59626b877e5f35e258aa199a37f31092844d3f 100644 --- a/apps/scheduler/scheduler/flow.py +++ b/apps/scheduler/scheduler/flow.py @@ -4,21 +4,52 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ from typing import Optional +from apps.entities.flow import Flow from apps.entities.task import RequestDataApp from apps.llm.patterns import Select from apps.scheduler.pool.pool import Pool -async def choose_flow(task_id: str, question: str, origin_app: RequestDataApp) -> tuple[str, Optional[RequestDataApp]]: - """依据用户的输入和选择,构造对应的Flow。 +class PredifinedRAGFlow(Flow): + """预定义的RAG Flow""" - - 当用户没有选择任何Plugin时,直接进行智能问答 - - 当用户选择auto时,自动识别最合适的n个Plugin,并在其中挑选flow - - 当用户选择Plugin时,在plugin内挑选最适合的flow + name: str = "KnowledgeBase" + description: str = "当上述工具无法直接解决用户问题时,使用知识库进行回答。" - :param question: 用户输入(用户问题) - :param origin_app: 用户选择的app信息 - :result: 经LLM选择的App ID和Flow ID - """ - # TODO: 根据用户选择的App,选一次top_k flow - return "", None \ No newline at end of file + +class FlowChooser: + """Flow选择器""" + + def __init__(self, task_id: str, question: str, user_selected: Optional[RequestDataApp] = None): + """初始化Flow选择器""" + self._task_id = task_id + self._question = question + self._user_selected = user_selected + + + def get_top_flow(self) -> str: + """获取Top1 Flow""" + pass + + + def choose_flow(self) -> Optional[RequestDataApp]: + """依据用户的输入和选择,构造对应的Flow。 + + - 当用户没有选择任何app时,直接进行智能问答 + - 当用户选择了特定的app时,在plugin内挑选最适合的flow + """ + if not self._user_selected or not self._user_selected.app_id: + return None + + if self._user_selected.flow_id: + return self._user_selected + + top_flow = self.get_top_flow() + if top_flow == "KnowledgeBase": + return None + + return RequestDataApp( + appId=self._user_selected.app_id, + flowId=top_flow, + params={}, + ) diff --git a/apps/scheduler/slot/parser/const.py b/apps/scheduler/slot/parser/const.py index ed0d68dc53c36ab65422358864ad6ba9b0e657e9..9c0329eb47f40a3fb0d0f2fc17f875c30419a0f6 100644 --- a/apps/scheduler/slot/parser/const.py +++ b/apps/scheduler/slot/parser/const.py @@ -5,17 +5,16 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. 
diff --git a/apps/scheduler/slot/parser/const.py b/apps/scheduler/slot/parser/const.py
index ed0d68dc53c36ab65422358864ad6ba9b0e657e9..9c0329eb47f40a3fb0d0f2fc17f875c30419a0f6 100644
--- a/apps/scheduler/slot/parser/const.py
+++ b/apps/scheduler/slot/parser/const.py
@@ -5,17 +5,16 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
 from typing import Any
 
 from apps.entities.enum_var import SlotType
-from apps.scheduler.slot.parser.core import SlotParser
 
 
-class SlotConstParser(SlotParser):
+class SlotConstParser:
     """给字段设置固定值"""
 
     type: SlotType = SlotType.KEYWORD
     name: str = "const"
 
     @classmethod
-    def convert(cls, data: Any, **kwargs) -> Any:  # noqa: ANN003, ANN401
+    def convert(cls, data: Any, **kwargs) -> Any:  # noqa: ANN003
         """生成keyword的验证器
 
         如果没有对应逻辑则不实现
diff --git a/apps/scheduler/slot/parser/core.py b/apps/scheduler/slot/parser/core.py
deleted file mode 100644
index c2789958c4c29df4f5557a419ed7fae3af049f4f..0000000000000000000000000000000000000000
--- a/apps/scheduler/slot/parser/core.py
+++ /dev/null
@@ -1,53 +0,0 @@
-"""参数槽解析器类结构
-
-Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
-"""
-from typing import Any
-
-from jsonschema import TypeChecker
-from jsonschema.protocols import Validator
-
-from apps.entities.enum_var import SlotType
-
-
-class SlotParser:
-    """参数槽Schema处理器"""
-
-    type: SlotType = SlotType.TYPE
-    name: str = ""
-
-
-    @classmethod
-    def convert(cls, data: Any, **kwargs) -> Any:  # noqa: ANN003, ANN401
-        """将请求或返回的字段进行处理
-
-        若没有对应逻辑则不实现
-        """
-        raise NotImplementedError
-
-
-    @classmethod
-    def type_validate(cls, checker: TypeChecker, instance: Any) -> bool:  # noqa: ANN401
-        """生成type的验证器
-
-        若没有对应逻辑则不实现
-        """
-        raise NotImplementedError
-
-
-    @classmethod
-    def format_validate(cls) -> None:
-        """生成format的验证器
-
-        若没有对应逻辑则不实现
-        """
-        raise NotImplementedError
-
-
-    @classmethod
-    def keyword_processor(cls, validator: Validator, keyword_value: Any, instance: Any, schema: dict[str, Any]) -> None:  # noqa: ANN401
-        """生成keyword的验证器
-
-        如果没有对应逻辑则不实现
-        """
-        raise NotImplementedError
diff --git a/apps/scheduler/slot/parser/date.py b/apps/scheduler/slot/parser/date.py
index 5d8086cafd4f1d525473bd9d424b209e7c3b5282..f0f58530eb631521eb385be2ec644fb3d18f2d7f 100644
--- a/apps/scheduler/slot/parser/date.py
+++ b/apps/scheduler/slot/parser/date.py
@@ -11,10 +11,9 @@ from jsonschema import TypeChecker
 
 from apps.constants import LOGGER
 from apps.entities.enum_var import SlotType
-from apps.scheduler.slot.parser.core import SlotParser
 
 
-class SlotDateParser(SlotParser):
+class SlotDateParser:
     """日期解析器"""
 
     type: SlotType = SlotType.TYPE
@@ -50,7 +49,7 @@ class SlotDateParser(SlotParser):
 
 
     @classmethod
-    def type_validate(cls, _checker: TypeChecker, instance: Any) -> bool:  # noqa: ANN401
+    def type_validate(cls, _checker: TypeChecker, instance: Any) -> bool:
         """生成对应类型的验证器"""
         if not isinstance(instance, str):
             return False
diff --git a/apps/scheduler/slot/parser/default.py b/apps/scheduler/slot/parser/default.py
index 89ec9c1940483464bd2e6a399de2e7340661a1c9..998ebba92852f686e06eacc0d3324217484fb11f 100644
--- a/apps/scheduler/slot/parser/default.py
+++ b/apps/scheduler/slot/parser/default.py
@@ -5,17 +5,16 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
 from typing import Any
 
 from apps.entities.enum_var import SlotType
-from apps.scheduler.slot.parser.core import SlotParser
 
 
-class SlotDefaultParser(SlotParser):
+class SlotDefaultParser:
     """给字段设置默认值"""
 
     type: SlotType = SlotType.KEYWORD
     name: str = "default"
 
     @classmethod
-    def convert(cls, data: Any, **kwargs) -> Any:  # noqa: ANN003, ANN401
+    def convert(cls, data: Any, **kwargs) -> Any:  # noqa: ANN003
         """给字段设置默认值
 
         如果没有对应逻辑则不实现
diff --git a/apps/scheduler/slot/parser/timestamp.py b/apps/scheduler/slot/parser/timestamp.py
index 4dede63f79a4b1a5d1a7ad07141fb3b312249f0f..a0d643667233d5e7e718e21dbd31405a5bcffd09 100644
--- a/apps/scheduler/slot/parser/timestamp.py
+++ b/apps/scheduler/slot/parser/timestamp.py
@@ -10,10 +10,9 @@ from jsonschema import TypeChecker
 
 from apps.constants import LOGGER
 from apps.entities.enum_var import SlotType
-from apps.scheduler.slot.parser.core import SlotParser
 
 
-class SlotTimestampParser(SlotParser):
+class SlotTimestampParser:
     """时间戳解析器"""
 
     type: SlotType = SlotType.TYPE
@@ -31,7 +30,7 @@ class SlotTimestampParser(SlotParser):
 
 
     @classmethod
-    def type_validate(cls, _checker: TypeChecker, instance: Any) -> bool:  # noqa: ANN401
+    def type_validate(cls, _checker: TypeChecker, instance: Any) -> bool:
         """生成type的验证器
 
         若没有对应的处理逻辑则返回True
diff --git a/apps/scheduler/slot/slot.py b/apps/scheduler/slot/slot.py
index 8e90485395e159116f673d2ca2f3aa9325501bbd..927165aac175ea7e607fecbcace393ab01d1e6fb 100644
--- a/apps/scheduler/slot/slot.py
+++ b/apps/scheduler/slot/slot.py
@@ -92,7 +92,7 @@ class Slot:
 
 
     @staticmethod
-    def _process_json_value(json_value: Any, spec_data: dict[str, Any]) -> Any:  # noqa: ANN401, C901, PLR0911, PLR0912
+    def _process_json_value(json_value: Any, spec_data: dict[str, Any]) -> Any:  # noqa: C901, PLR0911, PLR0912
         """使用递归的方式对JSON返回值进行处理
 
         :param json_value: 返回值中的字段
@@ -270,7 +270,7 @@ class Slot:
 
         if previous_output is not None:
             tool_str = f"""I used a tool to get extra information from other sources. \
                 The output data of the tool is `{previous_output}`.
-                The schema of the output is `{json.dumps(previous_output.output_schema, ensure_ascii=False)}`, which contains description of the output.
+                The schema of the output is `{json.dumps(previous_output["output_schema"], ensure_ascii=False)}`, which contains description of the output.
             """
             conversation.append({"role": "tool", "content": tool_str})
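One convention worth keeping in mind across these changes: the key that `FileChecker.diff_one` stores into `checker.hashes` must match the key `Pool.init` looks up before calling `ServiceLoader.load`. A sketch of that invariant, assuming `SERVICE_DIR == "service"` and that the checker's base path points at `SEMANTICS_DIR/service` while diffing services (that assignment happens outside the hunks shown here):

```python
from pathlib import Path

# Assumed values -- SERVICE_DIR lives in apps/constants, SEMANTICS_DIR in config.
SEMANTICS_DIR = "/opt/semantics"
SERVICE_DIR = "service"
service_id = "demo"

# Key written by FileChecker.diff_one: resource path relative to SEMANTICS_DIR.
resource_path = Path(SEMANTICS_DIR) / SERVICE_DIR / service_id
written_key = resource_path.relative_to(SEMANTICS_DIR).as_posix()

# Key read back in Pool.init before calling ServiceLoader.load.
read_key = Path(SERVICE_DIR + "/" + service_id).as_posix()

# If these two ever diverge, the per-service hash dict is never found.
assert written_key == read_key == "service/demo"
```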