From ca43e164bda23c8de583ad27f4b6fbf20dfc5b96 Mon Sep 17 00:00:00 2001
From: z30057876
Date: Mon, 7 Apr 2025 11:44:55 +0800
Subject: [PATCH 1/5] Fix the data returned when deleting a document
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 apps/routers/document.py | 87 ++++++++++++++++++++++++++--------------
 1 file changed, 56 insertions(+), 31 deletions(-)

diff --git a/apps/routers/document.py b/apps/routers/document.py
index cc2457c6a..dced25e87 100644
--- a/apps/routers/document.py
+++ b/apps/routers/document.py
@@ -1,7 +1,9 @@
-"""FastAPI文件上传路由
+"""
+FastAPI文件上传路由
 
-Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
+Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
 """
+
 from typing import Annotated
 
 from fastapi import APIRouter, Depends, File, Query, UploadFile, status
@@ -32,7 +34,9 @@ router = APIRouter(
 
 @router.post("/{conversation_id}", dependencies=[Depends(verify_csrf_token)])
 async def document_upload(  # noqa: ANN201
-    conversation_id: str, documents: Annotated[list[UploadFile], File(...)], user_sub: Annotated[str, Depends(get_user)],
+    conversation_id: str,
+    documents: Annotated[list[UploadFile], File(...)],
+    user_sub: Annotated[str, Depends(get_user)],
 ):
     """上传文档"""
     result = await DocumentManager.storage_docs(user_sub, conversation_id, documents)
@@ -45,19 +49,27 @@ async def document_upload(  # noqa: ANN201
             name=doc.name,
             type=doc.type,
             size=doc.size,
-        ) for doc in result
+        )
+        for doc in result
     ]
 
-    return JSONResponse(status_code=200, content=UploadDocumentRsp(
-        code=status.HTTP_200_OK,
-        message="上传成功",
-        result=UploadDocumentMsg(documents=succeed_document),
-    ).model_dump(exclude_none=True, by_alias=False))
+    return JSONResponse(
+        status_code=200,
+        content=UploadDocumentRsp(
+            code=status.HTTP_200_OK,
+            message="上传成功",
+            result=UploadDocumentMsg(documents=succeed_document),
+        ).model_dump(exclude_none=True, by_alias=False),
+    )
 
 
 @router.get("/{conversation_id}", response_model=ConversationDocumentRsp)
 async def get_document_list(  # noqa: ANN201
-    conversation_id: str, user_sub: Annotated[str, Depends(get_user)], used: Annotated[bool, Query()] = False, unused: Annotated[bool, Query()] = True,  # noqa: FBT002
+    conversation_id: str,
+    user_sub: Annotated[str, Depends(get_user)],
+    *,
+    used: Annotated[bool, Query()] = False,
+    unused: Annotated[bool, Query()] = True,
 ):
     """获取文档列表"""
     result = []
@@ -72,7 +84,8 @@ async def get_document_list(  # noqa: ANN201
                 size=round(item.size, 2),
                 status=DocumentStatus.USED,
                 created_at=item.created_at,
-            ) for item in docs
+            )
+            for item in docs
         ]
 
     if unused:
@@ -103,11 +116,14 @@ async def get_document_list(  # noqa: ANN201
         ]
 
     # 对外展示的时候用id,不用alias
-    return JSONResponse(status_code=status.HTTP_200_OK, content=ConversationDocumentRsp(
-        code=status.HTTP_200_OK,
-        message="获取成功",
-        result=ConversationDocumentMsg(documents=result),
-    ).model_dump(exclude_none=True, by_alias=False))
+    return JSONResponse(
+        status_code=status.HTTP_200_OK,
+        content=ConversationDocumentRsp(
+            code=status.HTTP_200_OK,
+            message="获取成功",
+            result=ConversationDocumentMsg(documents=result),
+        ).model_dump(exclude_none=True, by_alias=False),
+    )
 
 
 @router.delete("/{document_id}", response_model=ResponseData)
@@ -116,23 +132,32 @@ async def delete_single_document(document_id: str, user_sub: Annotated[str, Depends(get_user)]):  # noqa: ANN201
     # 在Framework侧删除
     result = await DocumentManager.delete_document(user_sub, [document_id])
     if not result:
-        return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=ResponseData(
-            code=status.HTTP_500_INTERNAL_SERVER_ERROR,
-            message="删除文件失败",
-            result={},
-        ).model_dump(exclude_none=True, by_alias=False))
+        return JSONResponse(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            content=ResponseData(
+                code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+                message="删除文件失败",
+                result={},
+            ).model_dump(exclude_none=True, by_alias=False),
+        )
 
     # 在RAG侧删除
     result = await KnowledgeBaseService.delete_doc_from_rag([document_id])
     if not result:
-        return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=ResponseData(
-            code=status.HTTP_500_INTERNAL_SERVER_ERROR,
-            message="RAG端删除文件失败",
+        return JSONResponse(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            content=ResponseData(
+                code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+                message="RAG端删除文件失败",
+                result={},
+            ).model_dump(exclude_none=True, by_alias=False),
+        )
+
+    return JSONResponse(
+        status_code=status.HTTP_200_OK,
+        content=ResponseData(
+            code=status.HTTP_200_OK,
+            message="删除成功",
             result={},
-        ).model_dump(exclude_none=True, by_alias=False))
-
-    return JSONResponse(status_code=status.HTTP_200_OK, content=ResponseData(
-        code=status.HTTP_200_OK,
-        message="删除成功",
-        result={},
-    ).model_dump(exclude_none=True, by_alias=False))
+        ).model_dump(exclude_none=True, by_alias=False),
+    )
-- 
Gitee
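[Editor's note] Patch 1 settles every route on one response idiom: build the pydantic response model, serialize it with model_dump(exclude_none=True, by_alias=False), and hand the resulting dict to JSONResponse. A minimal sketch of what those two flags do, using a stand-in model (DemoRsp and its alias are illustrative, not the real ResponseData):

    from pydantic import BaseModel, Field


    class DemoRsp(BaseModel):
        """Stand-in for ResponseData."""

        code: int
        message: str
        result: dict | None = Field(default=None, alias="resultData")

        model_config = {"populate_by_name": True}


    # exclude_none drops the unset "result"; by_alias=False keeps the Python
    # field name rather than emitting "resultData".
    print(DemoRsp(code=200, message="ok").model_dump(exclude_none=True, by_alias=False))
    # -> {'code': 200, 'message': 'ok'}

The bare `*` added to get_document_list makes used/unused keyword-only, which is why the old "# noqa: FBT002" suppression could be dropped.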
""" import logging from hashlib import sha256 -from typing import Optional from anyio import Path -from apps.common.config import config -from apps.constants import APP_DIR, SERVICE_DIR +from apps.common.config import Config from apps.entities.enum_var import MetadataType from apps.models.mongo import MongoDB -logger = logging.getLogger("ray") +logger = logging.getLogger(__name__) class FileChecker: @@ -23,7 +22,7 @@ class FileChecker: def __init__(self) -> None: """初始化文件检查器""" self.hashes = {} - self._dir_path = Path(config["SEMANTICS_DIR"]) + self._dir_path = Path(Config().get_config().deploy.data_dir) / "semantics" async def check_one(self, path: Path) -> dict[str, str]: """检查单个App/Service文件是否有变动""" @@ -45,10 +44,10 @@ class FileChecker: return hashes - async def diff_one(self, path: Path, previous_hashes: Optional[dict[str, str]] = None) -> bool: + async def diff_one(self, path: Path, previous_hashes: dict[str, str] | None = None) -> bool: """检查文件是否发生变化""" self._resource_path = path - path_diff = self._resource_path.relative_to(config["SEMANTICS_DIR"]) + path_diff = self._resource_path.relative_to(self._dir_path) self.hashes[path_diff.as_posix()] = await self.check_one(path) return self.hashes[path_diff.as_posix()] != previous_hashes @@ -57,10 +56,10 @@ class FileChecker: """生成更新列表和删除列表""" if check_type == MetadataType.APP: collection = MongoDB.get_collection("app") - self._dir_path = Path(config["SEMANTICS_DIR"]) / APP_DIR + self._dir_path = Path(Config().get_config().deploy.data_dir) / "semantics" / "app" elif check_type == MetadataType.SERVICE: collection = MongoDB.get_collection("service") - self._dir_path = Path(config["SEMANTICS_DIR"]) / SERVICE_DIR + self._dir_path = Path(Config().get_config().deploy.data_dir) / "semantics" / "service" changed_list = [] deleted_list = [] @@ -83,6 +82,7 @@ class FileChecker: if await self.diff_one(Path(self._dir_path / list_item["_id"]), list_item.get("hashes", None)): changed_list.append(list_item["_id"]) + logger.info("[FileChecker] 文件变动: %s;文件删除: %s", changed_list, deleted_list) # 遍历目录 item_names = [item["_id"] for item in items] async for service_folder in self._dir_path.iterdir(): diff --git a/apps/scheduler/pool/pool.py b/apps/scheduler/pool/pool.py index 33850ba51..e45926a2a 100644 --- a/apps/scheduler/pool/pool.py +++ b/apps/scheduler/pool/pool.py @@ -1,16 +1,16 @@ -"""资源池,包含语义接口、应用等的载入和保存 +""" +资源池,包含语义接口、应用等的载入和保存 -Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. +Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. 
""" import importlib import logging -from typing import Any, Optional +from typing import Any -import ray from anyio import Path -from apps.common.config import config -from apps.constants import APP_DIR, CALL_DIR, SERVICE_DIR +from apps.common.config import Config +from apps.common.singleton import SingletonMeta from apps.entities.enum_var import MetadataType from apps.entities.flow import Flow from apps.entities.pool import AppFlow, CallPool @@ -23,26 +23,25 @@ from apps.scheduler.pool.loader import ( ServiceLoader, ) -logger = logging.getLogger("ray") +logger = logging.getLogger(__name__) -@ray.remote -class Pool: +class Pool(metaclass=SingletonMeta): """资源池""" @staticmethod async def check_dir() -> None: """检查文件夹是否存在""" - root_dir = config["SEMANTICS_DIR"] + "/" - if not await Path(root_dir + APP_DIR).exists(): - logger.warning("[Pool] App目录%s不存在,创建中", root_dir + APP_DIR) - await Path(root_dir + APP_DIR).mkdir(parents=True, exist_ok=True) - if not await Path(root_dir + SERVICE_DIR).exists(): - logger.warning("[Pool] Service目录%s不存在,创建中", root_dir + SERVICE_DIR) - await Path(root_dir + SERVICE_DIR).mkdir(parents=True, exist_ok=True) - if not await Path(root_dir + CALL_DIR).exists(): - logger.warning("[Pool] Call目录%s不存在,创建中", root_dir + CALL_DIR) - await Path(root_dir + CALL_DIR).mkdir(parents=True, exist_ok=True) + root_dir = Config().get_config().deploy.data_dir.rstrip("/") + "/semantics/" + if not await Path(root_dir + "app").exists(): + logger.warning("[Pool] App目录%s不存在,创建中", root_dir + "app") + await Path(root_dir + "app").mkdir(parents=True, exist_ok=True) + if not await Path(root_dir + "service").exists(): + logger.warning("[Pool] Service目录%s不存在,创建中", root_dir + "service") + await Path(root_dir + "service").mkdir(parents=True, exist_ok=True) + if not await Path(root_dir + "call").exists(): + logger.warning("[Pool] Call目录%s不存在,创建中", root_dir + "call") + await Path(root_dir + "call").mkdir(parents=True, exist_ok=True) async def init(self) -> None: @@ -71,7 +70,7 @@ class Pool: # 批量加载 for service in changed_service: - hash_key = Path(SERVICE_DIR + "/" + service).as_posix() + hash_key = Path("service/" + service).as_posix() if hash_key in checker.hashes: await service_loader.load(service, checker.hashes[hash_key]) @@ -88,17 +87,11 @@ class Pool: # 批量加载App for app in changed_app: - hash_key = Path(APP_DIR + "/" + app).as_posix() + hash_key = Path("app/" + app).as_posix() if hash_key in checker.hashes: await app_loader.load(app, checker.hashes[hash_key]) - # TODO: 使用统一的保存入口 - async def save(self, *, is_deletion: bool = False) -> None: - """保存【单个】资源""" - pass - - async def get_flow_metadata(self, app_id: str) -> list[AppFlow]: """从数据库中获取特定App的全部Flow的元数据""" app_collection = MongoDB.get_collection("app") @@ -109,13 +102,14 @@ class Pool: return [] for flow in flow_list["flows"]: flow_metadata_list += [AppFlow.model_validate(flow)] - return flow_metadata_list except Exception: logger.exception("[Pool] 获取App %s 的Flow列表失败", app_id) return [] + else: + return flow_metadata_list - async def get_flow(self, app_id: str, flow_id: str) -> Optional[Flow]: + async def get_flow(self, app_id: str, flow_id: str) -> Flow | None: """从文件系统中获取单个Flow的全部数据""" logger.info("[Pool] 获取工作流 %s", flow_id) flow_loader = FlowLoader() -- Gitee From 01b3cbeab15a125606d5886cffad58d0817701ce Mon Sep 17 00:00:00 2001 From: z30057876 Date: Mon, 7 Apr 2025 11:46:54 +0800 Subject: [PATCH 3/5] CleanCode --- apps/scheduler/json_schema.py | 107 +++++++++++++++++----------------- apps/scheduler/openapi.py | 13 +++-- 2 files 
From 01b3cbeab15a125606d5886cffad58d0817701ce Mon Sep 17 00:00:00 2001
From: z30057876
Date: Mon, 7 Apr 2025 11:46:54 +0800
Subject: [PATCH 3/5] CleanCode
---
 apps/scheduler/json_schema.py | 107 +++++++++++++++++-----------------
 apps/scheduler/openapi.py     |  13 +++--
 2 files changed, 61 insertions(+), 59 deletions(-)

diff --git a/apps/scheduler/json_schema.py b/apps/scheduler/json_schema.py
index 12d18394b..79a4e3b78 100644
--- a/apps/scheduler/json_schema.py
+++ b/apps/scheduler/json_schema.py
@@ -1,10 +1,12 @@
-"""JSON Schema转为正则表达式
+"""
+JSON Schema转为正则表达式
 
 来源:https://github.com/dottxt-ai/outlines/blob/main/outlines/fsm/json_schema.py
 """
+
 import json
 import re
-from typing import Any, Optional, Union
+from typing import Any
 
 from jsonschema.protocols import Validator
 from pydantic import BaseModel
@@ -30,7 +32,9 @@ type_to_regex = {
     "null": NULL,
 }
 
-DATE_TIME = r'"(-?(?:[1-9][0-9]*)?[0-9]{4})-(1[0-2]|0[1-9])-(3[01]|0[1-9]|[12][0-9])T(2[0-3]|[01][0-9]):([0-5][0-9]):([0-5][0-9])(\.[0-9]{3})?(Z)?"'
+DATE_TIME = (
+    r'"(-?(?:[1-9][0-9]*)?[0-9]{4})-(1[0-2]|0[1-9])-(3[01]|0[1-9]|[12][0-9])T(2[0-3]|[01][0-9]):([0-5][0-9]):([0-5][0-9])(\.[0-9]{3})?(Z)?"'
+)
 DATE = r'"(?:\d{4})-(?:0[1-9]|1[0-2])-(?:0[1-9]|[1-2][0-9]|3[0-1])"'
 TIME = r'"(2[0-3]|[01][0-9]):([0-5][0-9]):([0-5][0-9])(\\.[0-9]+)?(Z)?"'
 UUID = r'"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"'
@@ -43,7 +47,7 @@ format_to_regex = {
 }
 
 
-def build_regex_from_schema(schema: str, whitespace_pattern: Optional[str] = None) -> str:
+def build_regex_from_schema(schema: str, whitespace_pattern: str | None = None) -> str:
     """将JSON Schema转换为正则表达式"""
     schema_dict: dict[str, Any] = json.loads(schema)
     Validator.check_schema(schema_dict)
@@ -58,10 +62,10 @@ def build_regex_from_schema(schema: str, whitespace_pattern: str | None = None)
     resolver = registry.resolver()
     content = schema_resource.contents
 
-    return to_regex(resolver, content, whitespace_pattern)
+    return to_regex(resolver, content, whitespace_pattern)  # type: ignore[arg-type]
 
 
-def convert_json_schema_to_str(json_schema: Union[dict, str, type[BaseModel]]) -> str:
+def convert_json_schema_to_str(json_schema: dict | str | type[BaseModel]) -> str:
     """将JSON Schema转换为字符串"""
     if isinstance(json_schema, dict):
         schema_str = json.dumps(json_schema)
@@ -73,7 +77,7 @@ def convert_json_schema_to_str(json_schema: dict | str | type[BaseModel]) -> str
     return schema_str
 
 
-def _get_num_items_pattern(min_items: int, max_items: Optional[int]) -> Optional[str]:
+def _get_num_items_pattern(min_items: int, max_items: int | None) -> str | None:
     """用于数组和对象的辅助函数"""
     min_items = int(min_items or 0)
     if max_items is None:
@@ -86,7 +90,9 @@ def _get_num_items_pattern(min_items: int, max_items: int | None) -> str | None
 
 
 def validate_quantifiers(
-    min_bound: Optional[str], max_bound: Optional[str], start_offset: int = 0,
+    min_bound: str | None,
+    max_bound: str | None,
+    start_offset: int = 0,
 ) -> tuple[str, str]:
     """确保数字的边界有效。边界用于正则表达式中的量化器"""
     min_bound = "" if min_bound is None else str(int(min_bound) - start_offset)
@@ -98,7 +104,9 @@ def validate_quantifiers(
 
 
 def to_regex(  # noqa: C901, PLR0911, PLR0912, PLR0915
-    resolver: Resolver, instance: dict, whitespace_pattern: Optional[str] = None,
+    resolver: Resolver,
+    instance: dict,
+    whitespace_pattern: str | None = None,
 ) -> str:
     """将 JSON Schema 实例转换为对应的正则表达式"""
     # set whitespace pattern
@@ -168,26 +176,20 @@ def to_regex(  # noqa: C901, PLR0911, PLR0912, PLR0915
     # To validate against allOf, the given data must be valid against all of the
     # given subschemas.
     if "allOf" in instance:
-        subregexes = [
-            to_regex(resolver, t, whitespace_pattern) for t in instance["allOf"]
-        ]
+        subregexes = [to_regex(resolver, t, whitespace_pattern) for t in instance["allOf"]]
         subregexes_str = [f"{subregex}" for subregex in subregexes]
         return rf"({''.join(subregexes_str)})"
 
     # To validate against `anyOf`, the given data must be valid against
     # any (one or more) of the given subschemas.
     if "anyOf" in instance:
-        subregexes = [
-            to_regex(resolver, t, whitespace_pattern) for t in instance["anyOf"]
-        ]
+        subregexes = [to_regex(resolver, t, whitespace_pattern) for t in instance["anyOf"]]
         return rf"({'|'.join(subregexes)})"
 
     # To validate against oneOf, the given data must be valid against exactly
     # one of the given subschemas.
     if "oneOf" in instance:
-        subregexes = [
-            to_regex(resolver, t, whitespace_pattern) for t in instance["oneOf"]
-        ]
+        subregexes = [to_regex(resolver, t, whitespace_pattern) for t in instance["oneOf"]]
 
         xor_patterns = [f"(?:{subregex})" for subregex in subregexes]
 
@@ -195,9 +197,7 @@ def to_regex(  # noqa: C901, PLR0911, PLR0912, PLR0915
 
     # Create pattern for tuples, per JSON Schema spec, `prefixItems` determines types at each idx
     if "prefixItems" in instance:
-        element_patterns = [
-            to_regex(resolver, t, whitespace_pattern) for t in instance["prefixItems"]
-        ]
+        element_patterns = [to_regex(resolver, t, whitespace_pattern) for t in instance["prefixItems"]]
         comma_split_pattern = rf"{whitespace_pattern},{whitespace_pattern}"
         tuple_inner = comma_split_pattern.join(element_patterns)
         return rf"\[{whitespace_pattern}{tuple_inner}{whitespace_pattern}\]"
@@ -284,10 +284,12 @@ def to_regex(  # noqa: C901, PLR0911, PLR0912, PLR0915
                 start_offset=1,
             )
             min_digits_fraction, max_digits_fraction = validate_quantifiers(
-                instance.get("minDigitsFraction"), instance.get("maxDigitsFraction"),
+                instance.get("minDigitsFraction"),
+                instance.get("maxDigitsFraction"),
             )
             min_digits_exponent, max_digits_exponent = validate_quantifiers(
-                instance.get("minDigitsExponent"), instance.get("maxDigitsExponent"),
+                instance.get("minDigitsExponent"),
+                instance.get("maxDigitsExponent"),
             )
             integers_quantifier = (
                 f"{{{min_digits_integer},{max_digits_integer}}}"
@@ -304,20 +306,27 @@ def to_regex(  # noqa: C901, PLR0911, PLR0912, PLR0915
                 if min_digits_exponent or max_digits_exponent
                 else "+"
             )
-            return rf"((-)?(0|[1-9][0-9]{integers_quantifier}))(\.[0-9]{fraction_quantifier})?([eE][+-][0-9]{exponent_quantifier})?"
+            return (
+                rf"((-)?(0|[1-9][0-9]{integers_quantifier}))"
+                rf"(\.[0-9]{fraction_quantifier})?"
+                rf"([eE][+-][0-9]{exponent_quantifier})?"
+            )
         return type_to_regex["number"]
 
     if instance_type == "integer":
         if "minDigits" in instance or "maxDigits" in instance:
             min_digits, max_digits = validate_quantifiers(
-                instance.get("minDigits"), instance.get("maxDigits"), start_offset=1,
+                instance.get("minDigits"),
+                instance.get("maxDigits"),
+                start_offset=1,
             )
             return rf"(-)?(0|[1-9][0-9]{{{min_digits},{max_digits}}})"
         return type_to_regex["integer"]
 
     if instance_type == "array":
         num_repeats = _get_num_items_pattern(
-            instance["minItems"], instance["maxItems"],
+            instance["minItems"],
+            instance["maxItems"],
         )
         if num_repeats is None:
             return rf"\[{whitespace_pattern}\]"
@@ -326,7 +335,10 @@ def to_regex(  # noqa: C901, PLR0911, PLR0912, PLR0915
 
         if "items" in instance:
             items_regex = to_regex(resolver, instance["items"], whitespace_pattern)
-            return rf"\[{whitespace_pattern}(({items_regex})(,{whitespace_pattern}({items_regex})){num_repeats}){allow_empty}{whitespace_pattern}\]"
+            return (
+                rf"\[{whitespace_pattern}(({items_regex})"
+                rf"(,{whitespace_pattern}({items_regex})){num_repeats}){allow_empty}{whitespace_pattern}\]"
+            )
 
         # Here we need to make the choice to exclude generating list of objects
         # if the specification of the object is not given, even though a JSON
@@ -343,10 +355,11 @@ def to_regex(  # noqa: C901, PLR0911, PLR0912, PLR0915
             legal_types.append({"type": "object", "depth": depth - 1})
             legal_types.append({"type": "array", "depth": depth - 1})
 
-        regexes = [
-            to_regex(resolver, t, whitespace_pattern) for t in legal_types
-        ]
-        return rf"\[{whitespace_pattern}({'|'.join(regexes)})(,{whitespace_pattern}({'|'.join(regexes)})){num_repeats}{allow_empty}{whitespace_pattern}\]"
+        regexes = [to_regex(resolver, t, whitespace_pattern) for t in legal_types]
+        return (
+            rf"\[{whitespace_pattern}({'|'.join(regexes)})"
+            rf"(,{whitespace_pattern}({'|'.join(regexes)})){num_repeats}{allow_empty}{whitespace_pattern}\]"
+        )
 
     if instance_type == "object":
         # pattern for json object with values defined by instance["additionalProperties"]
@@ -384,23 +397,17 @@ def to_regex(  # noqa: C901, PLR0911, PLR0912, PLR0915
             additional_properties = {"anyOf": legal_types}
 
         value_pattern = to_regex(
-            resolver, additional_properties, whitespace_pattern,
+            resolver,
+            additional_properties,
+            whitespace_pattern,
         )
-        key_value_pattern = (
-            f"{STRING}{whitespace_pattern}:{whitespace_pattern}{value_pattern}"
+        key_value_pattern = f"{STRING}{whitespace_pattern}:{whitespace_pattern}{value_pattern}"
+        key_value_successor_pattern = f"{whitespace_pattern},{whitespace_pattern}{key_value_pattern}"
+        multiple_key_value_pattern = (
+            f"({key_value_pattern}({key_value_successor_pattern}){num_repeats}){allow_empty}"
         )
-        key_value_successor_pattern = (
-            f"{whitespace_pattern},{whitespace_pattern}{key_value_pattern}"
-        )
-        multiple_key_value_pattern = f"({key_value_pattern}({key_value_successor_pattern}){num_repeats}){allow_empty}"
-        return (
-            r"\{"
-            + whitespace_pattern
-            + multiple_key_value_pattern
-            + whitespace_pattern
-            + r"\}"
-        )
+        return r"\{" + whitespace_pattern + multiple_key_value_pattern + whitespace_pattern + r"\}"
 
     if instance_type == "boolean":
         return type_to_regex["boolean"]
@@ -412,17 +419,11 @@ def to_regex(  # noqa: C901, PLR0911, PLR0912, PLR0915
     # Here we need to make the choice to exclude generating an object
     # if the specification of the object is not give, even though a JSON
     # object that contains an object here would be valid under the specification.
-        regexes = [
-            to_regex(resolver, {"type": t}, whitespace_pattern)
-            for t in instance_type
-            if t != "object"
-        ]
+        regexes = [to_regex(resolver, {"type": t}, whitespace_pattern) for t in instance_type if t != "object"]
         return rf"({'|'.join(regexes)})"
 
     # 以上都没有匹配到,则抛出错误
-    err = (
-        f"""Could not translate the instance {instance} to a
+    err = f"""Could not translate the instance {instance} to a
     regular expression. Make sure it is valid to the JSON Schema specification. If
     it is, please open an issue on the Outlines repository"""
-    )
     raise NotImplementedError(err)
diff --git a/apps/scheduler/openapi.py b/apps/scheduler/openapi.py
index ec8c4ed2d..59e5ce14f 100644
--- a/apps/scheduler/openapi.py
+++ b/apps/scheduler/openapi.py
@@ -1,11 +1,12 @@
-"""OpenAPI文档相关操作
+"""
+OpenAPI文档相关操作
 
-Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
+Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
 """
 
 from collections.abc import Sequence
 from copy import deepcopy
-from typing import Any, Optional
+from typing import Any
 
 from pydantic import BaseModel, Field
 
@@ -50,7 +51,7 @@ def _dereference_refs_helper(
     obj: Any,
     full_schema: dict[str, Any],
     skip_keys: Sequence[str],
-    processed_refs: Optional[set[str]] = None,
+    processed_refs: set[str] | None = None,
 ) -> Any:
     """递归地将OpenAPI中的$ref替换为实际的schema"""
     if processed_refs is None:
@@ -84,7 +85,7 @@ def _infer_skip_keys(
     obj: Any,
     full_schema: dict,
-    processed_refs: Optional[set[str]] = None,
+    processed_refs: set[str] | None = None,
 ) -> list[str]:
     """推断需要跳过的OpenAPI文档中的键"""
     if processed_refs is None:
@@ -111,7 +112,7 @@ def dereference_refs(
     schema_obj: dict,
     *,
-    full_schema: Optional[dict] = None,
+    full_schema: dict | None = None,
 ) -> dict:
     """将OpenAPI中的$ref替换为实际的schema"""
     full_schema = full_schema or schema_obj
-- 
Gitee
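[Editor's note] Most of the json_schema.py churn is re-wrapping over-long regex literals across adjacent rf-strings. Adjacent string literals concatenate at compile time, so the split form is byte-for-byte the same pattern as the original one-liner; a quick self-check:

    quant = "{1,3}"
    single = rf"((-)?(0|[1-9][0-9]{quant}))(\.[0-9]+)?([eE][+-][0-9]+)?"
    split = (
        rf"((-)?(0|[1-9][0-9]{quant}))"
        rf"(\.[0-9]+)?"
        rf"([eE][+-][0-9]+)?"
    )
    assert single == split

Note that when splitting, every character of the original — including opening parentheses such as the "(" before ",{whitespace_pattern}" — must land in one of the fragments, or the emitted regex ends up unbalanced.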
From 7100756111979fa66b4ee779fd5b0c295b7f6723 Mon Sep 17 00:00:00 2001
From: z30057876
Date: Mon, 7 Apr 2025 11:48:00 +0800
Subject: [PATCH 4/5] Add placeholder functions to avoid raising exceptions
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 apps/scheduler/slot/parser/__init__.py  |  6 ++++--
 apps/scheduler/slot/parser/const.py     | 16 +++++++++++++---
 apps/scheduler/slot/parser/date.py      | 12 ++++++++----
 apps/scheduler/slot/parser/default.py   | 16 +++++++++++++---
 apps/scheduler/slot/parser/timestamp.py | 22 +++++++++++++---------
 5 files changed, 51 insertions(+), 21 deletions(-)

diff --git a/apps/scheduler/slot/parser/__init__.py b/apps/scheduler/slot/parser/__init__.py
index 25804d688..29f5f0c44 100644
--- a/apps/scheduler/slot/parser/__init__.py
+++ b/apps/scheduler/slot/parser/__init__.py
@@ -1,7 +1,9 @@
-"""Slot处理模块
+"""
+Slot处理模块
 
-Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
+Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
 """
+
 from apps.scheduler.slot.parser.const import SlotConstParser
 from apps.scheduler.slot.parser.date import SlotDateParser
 from apps.scheduler.slot.parser.default import SlotDefaultParser
diff --git a/apps/scheduler/slot/parser/const.py b/apps/scheduler/slot/parser/const.py
index 9c0329eb4..3c0232302 100644
--- a/apps/scheduler/slot/parser/const.py
+++ b/apps/scheduler/slot/parser/const.py
@@ -1,9 +1,13 @@
-"""固定值设置器
+"""
+固定值设置器
 
-Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
+Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
 """
+
 from typing import Any
 
+from jsonschema import Validator
+
 from apps.entities.enum_var import SlotType
@@ -15,9 +19,15 @@ class SlotConstParser:
 
     @classmethod
     def convert(cls, data: Any, **kwargs) -> Any:  # noqa: ANN003
-        """生成keyword的验证器
+        """
+        生成keyword的验证器
 
         如果没有对应逻辑则不实现
         """
         raise NotImplementedError
+
+    @classmethod
+    def keyword_validate(cls, validator: Validator, keyword: str, instance: Any, schema: dict[str, Any]) -> bool:
+        """生成对应类型的验证器"""
+        ...
diff --git a/apps/scheduler/slot/parser/date.py b/apps/scheduler/slot/parser/date.py
index 8f0a52108..ee5931896 100644
--- a/apps/scheduler/slot/parser/date.py
+++ b/apps/scheduler/slot/parser/date.py
@@ -1,7 +1,9 @@
-"""日期解析器
+"""
+日期解析器
 
-Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
+Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
 """
+
 import logging
 from datetime import datetime
 from typing import Any
@@ -12,7 +14,7 @@ from jsonschema import TypeChecker
 
 from apps.entities.enum_var import SlotType
 
-logger = logging.getLogger("ray")
+logger = logging.getLogger(__name__)
 
 
 class SlotDateParser:
@@ -24,7 +26,8 @@ class SlotDateParser:
 
     @classmethod
     def convert(cls, data: str, **kwargs) -> tuple[str, str]:  # noqa: ANN003
-        """将日期字符串转换为日期对象
+        """
+        将日期字符串转换为日期对象
 
         返回的格式:(开始时间, 结束时间)
         """
@@ -59,6 +62,7 @@ class SlotDateParser:
         try:
             parse_time(instance)
         except Exception:
+            logger.exception("[Slot] Date解析失败: %s", instance)
             return False
         return True
diff --git a/apps/scheduler/slot/parser/default.py b/apps/scheduler/slot/parser/default.py
index 998ebba92..548d66b2c 100644
--- a/apps/scheduler/slot/parser/default.py
+++ b/apps/scheduler/slot/parser/default.py
@@ -1,9 +1,13 @@
-"""默认值设置器
+"""
+默认值设置器
 
-Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
+Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
 """
+
 from typing import Any
 
+from jsonschema import Validator
+
 from apps.entities.enum_var import SlotType
@@ -15,9 +19,15 @@ class SlotDefaultParser:
 
     @classmethod
     def convert(cls, data: Any, **kwargs) -> Any:  # noqa: ANN003
-        """给字段设置默认值
+        """
+        给字段设置默认值
 
         如果没有对应逻辑则不实现
         """
         raise NotImplementedError
+
+    @classmethod
+    def keyword_validate(cls, validator: Validator, keyword: str, instance: Any, schema: dict[str, Any]) -> bool:
+        """给字段设置默认值"""
+        ...
diff --git a/apps/scheduler/slot/parser/timestamp.py b/apps/scheduler/slot/parser/timestamp.py
index cfdde028e..9bb8c30f7 100644
--- a/apps/scheduler/slot/parser/timestamp.py
+++ b/apps/scheduler/slot/parser/timestamp.py
@@ -1,17 +1,19 @@
-"""时间戳解析器
+"""
+时间戳解析器
 
-Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
+Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
 """
+
 import logging
 from datetime import datetime
-from typing import Any, Union
+from typing import Any
 
 import pytz
 from jsonschema import TypeChecker
 
 from apps.entities.enum_var import SlotType
 
-logger = logging.getLogger("ray")
+logger = logging.getLogger(__name__)
 
 
 class SlotTimestampParser:
@@ -21,19 +23,21 @@ class SlotTimestampParser:
     name: str = "timestamp"
 
     @classmethod
-    def convert(cls, data: Union[str, int], **_kwargs) -> str:  # noqa: ANN003
+    def convert(cls, data: str | int, **_kwargs) -> str:  # noqa: ANN003
         """将日期字符串转换为日期对象"""
         try:
             timestamp_int = int(data)
-            return datetime.fromtimestamp(timestamp_int, tz=pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S")
+            return datetime.fromtimestamp(timestamp_int, tz=pytz.timezone("Asia/Shanghai")).strftime(
+                "%Y-%m-%d %H:%M:%S",
+            )
         except Exception:
             logger.exception("[SlotTimestampParser] Timestamp解析失败")
             return str(data)
 
-
     @classmethod
     def type_validate(cls, _checker: TypeChecker, instance: Any) -> bool:
-        """生成type的验证器
+        """
+        生成type的验证器
 
         若没有对应的处理逻辑则返回True
         """
@@ -46,7 +50,7 @@ class SlotTimestampParser:
             timestamp_int = int(instance)
             datetime.fromtimestamp(timestamp_int, tz=pytz.timezone("Asia/Shanghai"))
         except Exception:
+            logger.exception("[SlotTimestampParser] Timestamp验证失败: %s", instance)
             return False
         return True
-
-- 
Gitee
""" import logging @@ -8,32 +9,28 @@ import shutil from anyio import Path from fastapi.encoders import jsonable_encoder -from sqlalchemy import delete -from sqlalchemy.dialects.postgresql import insert -from apps.common.config import config -from apps.constants import APP_DIR, FLOW_DIR +from apps.common.config import Config from apps.entities.flow import AppFlow, AppMetadata, MetadataType, Permission from apps.entities.pool import AppPool -from apps.entities.vector import AppPoolVector from apps.models.mongo import MongoDB -from apps.models.postgres import PostgreSQL from apps.scheduler.pool.check import FileChecker from apps.scheduler.pool.loader.flow import FlowLoader from apps.scheduler.pool.loader.metadata import MetadataLoader -logger = logging.getLogger("ray") +logger = logging.getLogger(__name__) class AppLoader: """应用加载器""" async def load(self, app_id: str, hashes: dict[str, str]) -> None: - """从文件系统中加载应用 + """ + 从文件系统中加载应用 :param app_id: 应用 ID """ - app_path = Path(config["SEMANTICS_DIR"]) / APP_DIR / app_id + app_path = Path(Config().get_config().deploy.data_dir) / "semantics" / "app" / app_id metadata_path = app_path / "metadata.yaml" metadata = await MetadataLoader().load_one(metadata_path) if not metadata: @@ -46,7 +43,7 @@ class AppLoader: raise TypeError(err) # 加载工作流 - flow_path = app_path / FLOW_DIR + flow_path = app_path / "flow" flow_loader = FlowLoader() flow_ids = [app_flow.id for app_flow in metadata.flows] @@ -80,13 +77,14 @@ class AppLoader: async def save(self, metadata: AppMetadata, app_id: str) -> None: - """保存应用 + """ + 保存应用 :param metadata: 应用元数据 :param app_id: 应用 ID """ # 创建文件夹 - app_path = Path(config["SEMANTICS_DIR"]) / APP_DIR / app_id + app_path = Path(Config().get_config().deploy.data_dir) / "semantics" / "app" / app_id if not await app_path.exists(): await app_path.mkdir(parents=True, exist_ok=True) # 保存元数据 @@ -94,11 +92,12 @@ class AppLoader: # 重新载入 file_checker = FileChecker() await file_checker.diff_one(app_path) - await self.load(app_id, file_checker.hashes[f"{APP_DIR}/{app_id}"]) + await self.load(app_id, file_checker.hashes[f"app/{app_id}"]) async def delete(self, app_id: str, *, is_reload: bool = False) -> None: - """删除App,并更新数据库 + """ + 删除App,并更新数据库 :param app_id: 应用 ID """ @@ -119,17 +118,8 @@ class AppLoader: except Exception: logger.exception("[AppLoader] MongoDB删除App失败") - session = await PostgreSQL.get_session() - try: - await session.execute(delete(AppPoolVector).where(AppPoolVector.id == app_id)) - await session.commit() - except Exception: - logger.exception("[AppLoader] PostgreSQL删除App失败") - - await session.aclose() - if not is_reload: - app_path = Path(config["SEMANTICS_DIR"]) / APP_DIR / app_id + app_path = Path(Config().get_config().deploy.data_dir) / "semantics" / "app" / app_id if await app_path.exists(): shutil.rmtree(str(app_path), ignore_errors=True) @@ -167,21 +157,3 @@ class AppLoader: ) except Exception: logger.exception("[AppLoader] 更新 MongoDB 失败") - - # 向量化所有数据并保存 - session = await PostgreSQL.get_session() - service_embedding = await PostgreSQL.get_embedding([metadata.description]) - insert_stmt = ( - insert(AppPoolVector) - .values( - id=metadata.id, - embedding=service_embedding[0], - ) - .on_conflict_do_update( - index_elements=["id"], - set_={"embedding": service_embedding[0]}, - ) - ) - await session.execute(insert_stmt) - await session.commit() - await session.aclose() \ No newline at end of file -- Gitee