From 858d93ec5f1fdbd6be1b213b4ec8df4b14fdb0fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B2=E9=B8=BF=E5=AE=87?= Date: Mon, 24 Feb 2025 16:12:56 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20ServiceLoader=20=E4=BF=9D=E5=AD=98?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=EF=BC=88=E9=99=84=E5=B8=A6clean=20code?= =?UTF-8?q?=E4=BC=98=E5=8C=96=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 史鸿宇 --- apps/scheduler/pool/loader/service.py | 68 +++++++++++++++++---------- 1 file changed, 44 insertions(+), 24 deletions(-) diff --git a/apps/scheduler/pool/loader/service.py b/apps/scheduler/pool/loader/service.py index a4b8d6d91..61afaf8b3 100644 --- a/apps/scheduler/pool/loader/service.py +++ b/apps/scheduler/pool/loader/service.py @@ -2,6 +2,7 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ + import asyncio import uuid @@ -39,13 +40,14 @@ class ServiceLoader: # 载入OpenAPI文档,获取Node列表 openapi_loader = OpenAPILoader.remote() - nodes = [openapi_loader.load_one.remote(service_id, yaml_path, metadata) # type: ignore[arg-type] - async for yaml_path in (service_path / "openapi").rglob("*.yaml")] + nodes = [ + openapi_loader.load_one.remote(service_id, yaml_path, metadata) # type: ignore[arg-type] + async for yaml_path in (service_path / "openapi").rglob("*.yaml") + ] nodes = (await asyncio.gather(*nodes))[0] # 更新数据库 await self._update_db(nodes, metadata) - async def _update_db(self, nodes: list[NodePool], metadata: ServiceMetadata) -> None: """更新数据库""" # 更新MongoDB @@ -53,15 +55,25 @@ class ServiceLoader: node_collection = MongoDB.get_collection("node") try: # 部分映射 ServiceMetadata 到 ServicePool, 忽略其他字段 - await service_collection.update_one({"_id": metadata.id}, {"$set": { - "name": metadata.name, - "description": metadata.description, - "author": metadata.author, - "permission": metadata.permission, - "hashes": metadata.hashes, - }}, upsert=True) + await service_collection.update_one( + {"_id": metadata.id}, + { + "$set": { + "name": metadata.name, + "description": metadata.description, + "author": metadata.author, + "permission": metadata.permission, + "hashes": metadata.hashes, + }, + }, + upsert=True, + ) for node in nodes: - await node_collection.update_one({"_id": node.id}, {"$set": node.model_dump(exclude_none=True, by_alias=True)}, upsert=True) + await node_collection.update_one( + {"_id": node.id}, + {"$set": node.model_dump(exclude_none=True, by_alias=True)}, + upsert=True, + ) except Exception as e: err = f"更新MongoDB失败:{e}" LOGGER.error(err) @@ -70,12 +82,16 @@ class ServiceLoader: # 向量化所有数据并保存 session = await PostgreSQL.get_session() service_embedding = await PostgreSQL.get_embedding([metadata.description]) - insert_stmt = insert(ServicePoolVector).values( - id=metadata.id, - embedding=service_embedding[0], - ).on_conflict_do_update( - index_elements=["id"], - set_={"embedding": service_embedding[0]}, + insert_stmt = ( + insert(ServicePoolVector) + .values( + id=metadata.id, + embedding=service_embedding[0], + ) + .on_conflict_do_update( + index_elements=["id"], + set_={"embedding": service_embedding[0]}, + ) ) await session.execute(insert_stmt) @@ -85,12 +101,16 @@ class ServiceLoader: node_vecs = await PostgreSQL.get_embedding(node_descriptions) for i, data in enumerate(node_vecs): - insert_stmt = insert(NodePoolVector).values( - id=nodes[i].id, - embedding=data, - ).on_conflict_do_update( - index_elements=["id"], - set_={"embedding": data}, + insert_stmt = ( + insert(NodePoolVector) + .values( + id=nodes[i].id, + embedding=data, + ) + .on_conflict_do_update( + index_elements=["id"], + set_={"embedding": data}, + ) ) await session.execute(insert_stmt) @@ -111,7 +131,7 @@ class ServiceLoader: call_id="api", ) nodes.append(node_data) - + await self._update_db(nodes, metadata) async def delete(self, service_id: str) -> None: """删除Service,并更新数据库""" -- Gitee