diff --git a/apps/entities/enum_var.py b/apps/entities/enum_var.py index fea0806e6ec1c2c28ad3980fba0037f78b78f76e..bb3cdfebd1f1bc57689f453fa17c7aac0f051743 100644 --- a/apps/entities/enum_var.py +++ b/apps/entities/enum_var.py @@ -77,3 +77,13 @@ class AppPermissionType(str, Enum): PROTECTED = "protected" PUBLIC = "public" PRIVATE = "private" + + +class EdgeType(str, Enum): + """边类型 + + 注:此处为临时定义,待扩展 + """ + + NORMAL = "normal" + LOOP = "loop" diff --git a/apps/entities/vector.py b/apps/entities/vector.py index 029283d52bf11b325570539a0477e2192b6ef4e6..13aea86ac578804f908196f547d0b0d93a1693ce 100644 --- a/apps/entities/vector.py +++ b/apps/entities/vector.py @@ -9,8 +9,8 @@ from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() -class FlowVector(Base): - """Flow向量信息""" +class AppVector(Base): + """App向量信息""" __tablename__ = "flow_vector" id = Column(String(length=100), primary_key=True, nullable=False, unique=True) @@ -25,9 +25,9 @@ class ServiceVector(Base): embedding = Column(Vector(1024), nullable=False) -class StepPoolVector(Base): - """StepPool向量信息""" +class NodeVector(Base): + """Node向量信息""" - __tablename__ = "step_pool_vector" + __tablename__ = "node_vector" id = Column(String(length=100), primary_key=True, nullable=False, unique=True) embedding = Column(Vector(1024), nullable=False) diff --git a/apps/llm/reasoning.py b/apps/llm/reasoning.py index d31bb56bde64b89cd6065de2992f2960d76f2733..7eb636716c349f7d7876cc78d956eae7eefce2e4 100644 --- a/apps/llm/reasoning.py +++ b/apps/llm/reasoning.py @@ -5,8 +5,7 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. 
from collections.abc import AsyncGenerator
 
 import tiktoken
-from langchain_core.messages import ChatMessage
-from langchain_openai import ChatOpenAI
+from openai import AsyncOpenAI
 
 from apps.common.config import config
 from apps.common.singleton import Singleton
@@ -20,12 +19,9 @@ class ReasoningLLM(metaclass=Singleton):
 
     def __init__(self) -> None:
         """判断配置文件里用了哪种大模型;初始化大模型客户端"""
-        self._client = ChatOpenAI(
+        self._client = AsyncOpenAI(
             api_key=config["LLM_KEY"],
             base_url=config["LLM_URL"],
-            model=config["LLM_MODEL"],
-            tiktoken_model_name="cl100k_base",
-            streaming=True,
         )
 
     def _calculate_token_length(self, messages: list[dict[str, str]], *, pure_text: bool = False) -> int:
@@ -38,6 +34,17 @@ class ReasoningLLM(metaclass=Singleton):
             result += len(self._encoder.encode(msg["content"]))
 
         return result
+
+    def _validate_messages(self, messages: list[dict[str, str]]) -> list[dict[str, str]]:
+        """验证消息格式是否正确"""
+        if messages[0]["role"] != "system":
+            # 添加默认系统消息
+            messages.insert(0, {"role": "system", "content": "You are a helpful assistant."})
+
+        if messages[-1]["role"] != "user":
+            raise ValueError("消息格式错误:最后一个消息必须是用户消息。")
+
+        return messages
 
     async def call(self, task_id: str, messages: list[dict[str, str]],
                    max_tokens: int = 8192, temperature: float = 0.07,
                    *, streaming: bool = True) -> AsyncGenerator[str, None]:
@@ -50,17 +57,30 @@
         :param temperature: 模型温度(随机化程度)
         """
         input_tokens = self._calculate_token_length(messages)
-        msg_list = [ChatMessage(content=msg["content"], role=msg["role"]) for msg in messages]
+        try:
+            msg_list = self._validate_messages(messages)
+        except ValueError as e:
+            raise ValueError(f"消息格式错误:{e}") from e
+
+        stream = await self._client.chat.completions.create(  # must await: create() on AsyncOpenAI is a coroutine; awaiting yields the AsyncStream
+            model=config["LLM_MODEL"],
+            messages=msg_list,
+            stream=True,
+            max_tokens=max_tokens,
+            temperature=temperature,
+        )
 
         if streaming:
             result = ""
-            async for chunk in self._client.astream(msg_list, max_tokens=max_tokens, temperature=temperature):  # type: 
ignore[arg-type] - yield str(chunk.content) - result += str(chunk.content) + async for chunk in stream: + text = chunk.choices[0].delta.content or "" + yield text + result += text else: - result = await self._client.ainvoke(msg_list, max_tokens=max_tokens, temperature=temperature) # type: ignore[arg-type] - yield str(result.content) - result = str(result.content) + result = "" + async for chunk in stream: + result += chunk.choices[0].delta.content or "" + yield result output_tokens = self._calculate_token_length([{"role": "assistant", "content": result}], pure_text=True) await TaskManager.update_token_summary(task_id, input_tokens, output_tokens) diff --git a/apps/main.py b/apps/main.py index 4f2f8f3748ec2be400f4f8315f9c34dcd84c9b4a..1189366819eb382b74ebb9f392c8e2d3ba67bde6 100644 --- a/apps/main.py +++ b/apps/main.py @@ -26,7 +26,6 @@ from apps.routers import ( document, health, knowledge, - plugin, record, ) from apps.scheduler.pool.loader import Loader @@ -49,7 +48,6 @@ app.include_router(api_key.router) app.include_router(comment.router) app.include_router(record.router) app.include_router(health.router) -app.include_router(plugin.router) app.include_router(chat.router) app.include_router(client.router) app.include_router(blacklist.router) diff --git a/apps/routers/plugin.py b/apps/routers/plugin.py deleted file mode 100644 index 693cee187d62bc2fa8b145c59c016cf8977c1b2f..0000000000000000000000000000000000000000 --- a/apps/routers/plugin.py +++ /dev/null @@ -1,37 +0,0 @@ -"""FastAPI 插件信息接口 - -Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. 
-""" -from fastapi import APIRouter, Depends, status -from fastapi.responses import JSONResponse - -from apps.dependency.user import verify_user -from apps.entities.response_data import GetPluginListMsg, GetPluginListRsp -from apps.scheduler.pool.pool import Pool - -router = APIRouter( - prefix="/api/plugin", - tags=["plugin"], - dependencies=[ - Depends(verify_user), - ], -) - - -# 前端展示插件详情 -@router.get("", response_model=GetPluginListRsp) -async def get_plugin_list(): # noqa: ANN201 - """获取插件列表""" - plugins = Pool().get_plugin_list() - return JSONResponse(status_code=status.HTTP_200_OK, content=GetPluginListRsp( - code=status.HTTP_200_OK, - message="success", - result=GetPluginListMsg(plugins=plugins), - ).model_dump(exclude_none=True, by_alias=True), - ) - -# TODO(zwt): 热重载插件 -# 004 -# @router.post("") -# async def reload_plugin(): -# pass diff --git a/apps/scheduler/pool/check.py b/apps/scheduler/pool/check.py new file mode 100644 index 0000000000000000000000000000000000000000..6270b3340c2502a417b43b2cece4ec6729596196 --- /dev/null +++ b/apps/scheduler/pool/check.py @@ -0,0 +1,74 @@ +"""文件检查器;检查文件是否存在、Hash是否发生变化;生成更新列表和删除列表 + +Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. 
+""" +from pathlib import Path + +from apps.common.config import config +from apps.entities.enum_var import MetadataType +from apps.models.mongo import MongoDB +from apps.scheduler.pool.util import get_long_hash + + +class FileChecker: + """文件检查器""" + + def __init__(self) -> None: + """初始化文件检查器""" + self._hashes = {} + self._dir_path = Path(config["SERVICE_DIR"]) + + + def check_one(self, path: Path) -> None: + """检查单个App/Service文件是否有变动""" + if not path.exists(): + err = FileNotFoundError(f"File {path} not found") + raise err + if path.is_file(): + err = NotADirectoryError(f"File {path} is not a directory") + raise err + + for file in path.iterdir(): + if file.is_file(): + self._hashes[str(file.relative_to(self._dir_path))] = get_long_hash(file.read_bytes()) + elif file.is_dir(): + self.check_one(file) + + + def diff_one(self, path: Path, previous_hashes: dict[str, str]) -> bool: + """检查文件是否发生变化""" + self._hashes = {} + self.check_one(path) + return self._hashes != previous_hashes + + + async def diff(self, check_type: MetadataType) -> tuple[list[str], list[str]]: + """生成更新列表和删除列表""" + if check_type == MetadataType.APP: + collection = MongoDB.get_collection("app") + self._dir_path = Path(config["SERVICE_DIR"]) / "app" + elif check_type == MetadataType.SERVICE: + collection = MongoDB.get_collection("service") + self._dir_path = Path(config["SERVICE_DIR"]) / "service" + + changed_list = [] + deleted_list = [] + + try: + # 查询所有条目 + cursor = collection.find({}) + async for item in cursor: + hashes = item.get("hashes", {}) + # 判断是否存在? 
+ if not (self._dir_path / item.get("name")).exists(): + deleted_list.append(item.get("name")) + continue + # 判断是否发生变化 + if self.diff_one(self._dir_path / item.get("name"), hashes): + changed_list.append(item.get("name")) + + except Exception as e: + err = f"Failed to check {check_type} files: {e!s}" + raise RuntimeError(err) from e + + return changed_list, deleted_list diff --git a/requirements.txt b/requirements.txt index 1c2d791b156978267e952dcd96e09ca878ff7ab1..889a1146cad116afb51c786952b54259dd8e4cc1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,11 +2,9 @@ JSON-minify==0.3.0 aiofiles==24.1.0 aiohttp==3.10.11 apscheduler==3.10.4 -asgiref==3.8.1 asyncer==0.0.8 coverage==7.6.4 cryptography==43.0.3 -eval-type-backport==0.2.0 fastapi==0.115.4 gunicorn==23.0.0 jinja2==3.1.4 @@ -15,17 +13,10 @@ jsonnet-binary==0.17.0 jsonschema==4.23.0 jieba==0.42.1 httpx==0.27.2 -langchain-community==0.3.5 -langchain-core==0.3.15 -langchain-openai==0.2.5 -langchain==0.3.7 -limits==3.7.0 minio==7.2.11 -numpy==1.26.4 ollama==0.4.4 openai==1.57.0 openpyxl==3.1.5 -paramiko==3.4.0 pgvector==0.3.6 pillow==11.1.0 psycopg2-binary==2.9.9 diff --git a/sample/apps/test_app/flows/flow.yaml b/sample/apps/test_app/flows/flow.yaml index c0efd3a3ccc71ecd460f39e2cc41c9055654c6aa..5d558dc40849792308049c571e03a1c62f539bc8 100644 --- a/sample/apps/test_app/flows/flow.yaml +++ b/sample/apps/test_app/flows/flow.yaml @@ -1,46 +1,51 @@ -# Flow ID -id: test -# Flow 描述 +# [必填] Flow 名称 +name: test +# [必填] Flow 描述。将影响大模型效果。 description: 测试工作流 -# Flow无法恢复时的错误处理步骤 +# Flow出错时的错误处理步骤 on_error: - # Call类型 - call_type: llm - # Call参数 - params: - system_prompt: 你是一个擅长Linux系统性能优化,且能够根据具体情况撰写分析报告的智能助手。 # 系统提示词,jinja2语法 - user_prompt: | # 用户提示词,jinja2语法,多行;有预定义变量:last - 最后一个Step报错后的数据 - {% if context %} - 背景信息: - {{ context }} - {% endif %} - - 错误信息: - {{ last.output }} + # [必填] 是否直接使用大模型自动向用户报错? 
+ use_llm: false + # 不使用大模型时,使用以下模板拼接提示信息并返回 + output_format: | + **当前工作流{{ flow.name }}执行发生错误!** + + 错误信息:{{ error.message }} - 使用自然语言解释这一信息,并给出可能的解决方法。 -# 各个步骤定义 -steps: - - name: start # start步骤,入口点 - call_type: api # Call类型:API - confirm: true # 是否操作前向用户确认,默认为False - params: +# 各个节点定义 +nodes: + - id: query_data # 节点的Pool ID + name: 查询数据 # 节点名称 + description: 从API中查询测试数据 # 节点描述 + pos: # 节点在画布上的位置 + x: 100 + y: 100 + params: # 节点的参数 endpoint: GET /api/test # API Endpoint名称 - next: flow_choice # 下一个Step的名称 - - name: flow_choice - call_type: choice # Call类型:Choice + - id: check_data + name: 判断数据 + description: 判断工具的返回值是否包含有效数据 + pos: # 节点在画布上的位置 + x: 200 + y: 100 params: - propose: 工具的返回值是否包含有效数据? # 判断命题 + propose: 上一步返回值是否包含有效数据? # 判断命题 choices: # 判断选项 - - step: get_report # 跳转步骤 - description: 返回值存在有效数据时,选择此项 # 选项说明,满足就会选择此项 - - step: get_data - description: 返回值不存在有效数据时,选择此项 - - name: get_report - call_type: llm + - branch: valid + description: 返回值存在有效数据 # 选项说明,满足就会选择此项 + - branch: invalid + description: 返回值不存在有效数据 + - id: gen_reply + name: 生成回复 + description: 使用大模型生成回复 + pos: # 节点在画布上的位置 + x: 300 + y: 100 + depends: # 节点的后向依赖 + - format_output params: - system_prompt: 你是一个擅长Linux系统性能优化,且能够根据具体情况撰写分析报告的智能助手。 # 系统提示词,jinja2语法 - user_prompt: | # 用户提示词,jinja2语法,多行;可以使用step name引用对应的数据;可以使用storage[-1]引用上一个步骤的数据 + system: 你是一个擅长Linux系统性能优化,且能够根据具体情况撰写分析报告的智能助手。 # 系统提示词,jinja2语法 + user: | # 用户提示词,jinja2语法,多行;可以使用step name引用对应的数据;可以使用storage[-1]引用上一个步骤的数据 上下文: {{ context }} @@ -53,18 +58,14 @@ steps: 测试数据:{{ storage[-1].output.result.machines[0].data }} 使用自然语言解释这一信息,并展示为Markdown列表。 - next: end - - name: get_data - call_type: sql # Call类型:SQL - params: - statement: select * from test limit 30; # 固定的SQL语句;不设置则使用大模型猜解 - next: test - - name: test - call_type: render # Call类型:Render,没有参数 - - name: end - call_type: reformat # Call类型:Reformat,用于重新格式化数据 + - id: format_output + name: 格式化输出 + description: 按照特定格式输出 + pos: # 节点在画布上的位置 + x: 400 + y: 100 params: - text: | # 
对生成的文字信息进行格式化,没有则不改动;jinja2语法 + message: | # 对生成的文字信息进行格式化,没有则不改动;jinja2语法 # 测试报告 声明: 这是一份由AI生成的报告,仅供参考。 @@ -76,15 +77,39 @@ steps: ## 数据解析 ...... {% endif %} - data: | # 对生成的原始数据(JSON)进行格式化,没有则不改动;jsonnet语法 + output: | # 对生成的结构化数据进行格式化,没有则不改动;jsonnet语法 # 注意: 只能使用storage访问之前的数据,不能通过step名访问;其他内容在extra变量中 { "id": storage[-1].id, "time": extras.time, "machines": [x for x.id in storage[-1].output.result.machines] } - + +# 各个边定义 +# 格式:边ID: 来源节点名称 --> 目标节点名称 +# 若来源节点有多个输出分支,则使用“节点名称.分支名称”表示 +edges: + - id: edge_00 + from: + to: query_data + type: normal # 边的类型,可不填 + - id: edge_01 + from: query_data + to: check_data + - id: edge_02 + from: check_data.valid + to: gen_reply + - id: edge_03 + from: check_data.invalid + to: format_output + - id: edge_04 + from: gen_reply + to: + - id: edge_05 + from: format_output + to: + # 手动设置Flow推荐 next_flow: - - id: test2 # 展示在推荐区域的Flow + - flow_id: test2 # 展示在推荐区域的Flow question: xxxxx # 固定的推荐问题