diff --git a/apps/common/task.py b/apps/common/task.py index a7c9f5181455fd393ae13e3ca57dfb516062b3b0..9443cd5940f1298bfa6f9e07ab5a450adbfda29a 100644 --- a/apps/common/task.py +++ b/apps/common/task.py @@ -137,7 +137,7 @@ class Task: err = f"Task {task_id} not found" raise ValueError(err) - await task_collection.update_one({"_id": task_id}, {"$set": task.model_dump(by_alias=True)}, upsert=True) + await task_collection.update_one({"_id": task_id}, {"$set": task.model_dump(by_alias=True, exclude_none=True)}, upsert=True) # 从task_map中删除任务块,释放内存 del self._task_map[task_id] diff --git a/apps/constants.py b/apps/constants.py index af64c8776e7c794353032f5364f7efbcd11f8a2f..e16e834686242e4d4d42cbf8d7309407f38b6bde 100644 --- a/apps/constants.py +++ b/apps/constants.py @@ -14,8 +14,8 @@ SLIDE_WINDOW_TIME = 60 SLIDE_WINDOW_QUESTION_COUNT = 10 # API Call 最大返回值长度(字符) MAX_API_RESPONSE_LENGTH = 8192 -# Scheduler最大历史轮次 -MAX_SCHEDULER_HISTORY_SIZE = 3 +# Executor最大步骤历史数 +STEP_HISTORY_SIZE = 3 # 语义接口目录中工具子目录 CALL_DIR = "call" # 语义接口目录中服务子目录 diff --git a/apps/entities/flow.py b/apps/entities/flow.py index e40a37e0f06ad1585ef21d467537f94b8b087fa0..199912b8772b8c020d23f0de4008bf7c93fee1e3 100644 --- a/apps/entities/flow.py +++ b/apps/entities/flow.py @@ -32,7 +32,6 @@ class Edge(BaseModel): class Step(BaseModel): """Flow中Step的数据""" - id: str = Field(description="Step的ID") node: str = Field(description="Step的Node ID") type: str = Field(description="Step的类型") name: str = Field(description="Step的名称") @@ -54,7 +53,7 @@ class Flow(BaseModel): name: str = Field(description="Flow的名称", min_length=1) description: str = Field(description="Flow的描述") on_error: FlowError = FlowError(use_llm=True) - steps: list[Step] = Field(description="节点列表", default=[]) + steps: dict[str, Step] = Field(description="节点列表", default=[]) edges: list[Edge] = Field(description="边列表", default=[]) debug: bool = Field(description="是否经过调试", default=False) diff --git a/apps/entities/node.py b/apps/entities/node.py new file mode 100644 index 0000000000000000000000000000000000000000..530fe6717536a23d4401b1be4a3b0c0161747fb8 --- /dev/null +++ b/apps/entities/node.py @@ -0,0 +1,28 @@ +"""Node实体类""" +from typing import Any, Optional + +from pydantic import BaseModel, Field + +from apps.entities.pool import NodePool + + +class APINodeInput(BaseModel): + """API节点覆盖输入""" + + param_schema: Optional[dict[str, Any]] = Field(description="API节点输入参数Schema", default=None) + body_schema: Optional[dict[str, Any]] = Field(description="API节点输入请求体Schema", default=None) + +class APINodeOutput(BaseModel): + """API节点覆盖输出""" + + resp_schema: Optional[dict[str, Any]] = Field(description="API节点输出Schema", default=None) + + +class APINode(NodePool): + """API节点""" + + call_id: str = "API" + override_input: Optional[APINodeInput] = Field(description="API节点输入覆盖", default=None) + override_output: Optional[APINodeOutput] = Field(description="API节点输出覆盖", default=None) + + diff --git a/apps/llm/patterns/select.py b/apps/llm/patterns/select.py index 58baa7c59180d0c2d44e06efec526b42f7a97168..ca39fed460ed581342d98de64d98f493cf3cfc59 100644 --- a/apps/llm/patterns/select.py +++ b/apps/llm/patterns/select.py @@ -15,38 +15,47 @@ from apps.llm.reasoning import ReasoningLLM class Select(CorePattern): """通过投票选择最佳答案""" - system_prompt: str = r"" - """系统提示词""" - user_prompt: str = r""" - 根据历史对话(包括工具调用结果)和用户问题,从给出的选项列表中,选出最符合要求的那一项。 - 在输出之前,请先思考,并使用“”标签给出思考过程。 - - ==样例== - 用户问题: 使用天气API,查询明天杭州的天气信息 - - 选项列表: - - [API] 请求特定API,获得返回的JSON数据 - - [SQL] 查询数据库,获得数据库表中的数据 - - - API 工具可以通过 API 来获取外部数据,而天气信息可能就存储在外部数据中,由于用户说明中明确提到了天气 API 的使用,因此应该优先使用 API 工具。\ - SQL 工具用于从数据库中获取信息,考虑到天气数据的可变性和动态性,不太可能存储在数据库中,因此 SQL 工具的优先级相对较低,\ - 最佳选择似乎是“API:请求特定 API,获取返回的 JSON 数据”。 - - - 最符合要求的选项是: - API - ==结束样例== - - 用户问题: {question} - - 选项列表: - {choice_list} + + + 根据历史对话(包括工具调用结果)和用户问题,从给出的选项列表中,选出最符合要求的那一项。 + 在输出之前,请先思考,并使用“”标签给出思考过程。 + + + + + 使用天气API,查询明天杭州的天气信息 + + + [API] 请求特定API,获得返回的JSON数据 + [SQL] 查询数据库,获得数据库表中的数据 + + + + + API 工具可以通过 API 来获取外部数据,而天气信息可能就存储在外部数据中,由于用户说明中明确提到了天气 API 的使用,因此应该优先使用 API 工具。\ + SQL 工具用于从数据库中获取信息,考虑到天气数据的可变性和动态性,不太可能存储在数据库中,因此 SQL 工具的优先级相对较低,\ + 最佳选择似乎是“API:请求特定 API,获取返回的 JSON 数据”。 + + + + API + + + + + + + {question} + + + + {choice_list} + + - 思考: - 让我们一步一步思考。 + 让我们一步一步思考。 """ """用户提示词""" diff --git a/apps/manager/node.py b/apps/manager/node.py new file mode 100644 index 0000000000000000000000000000000000000000..05f0a3d76dce89b9e80f7c1a71611ae59409ecd5 --- /dev/null +++ b/apps/manager/node.py @@ -0,0 +1,16 @@ +"""Node管理器""" +from apps.models.mongo import MongoDB + + +class NodeManager: + """Node管理器""" + + @staticmethod + async def get_node_call_id(node_id: str) -> str: + """获取Node的call_id""" + node_collection = MongoDB().get_collection("node") + node = await node_collection.find_one({"id": node_id}, {"call_id": 1}) + if not node: + err = f"[NodeManager] Node {node_id} not found." + raise ValueError(err) + return node["call_id"] diff --git a/apps/routers/user.py b/apps/routers/user.py index 8fe0431628cdebe8d433d7bbcc560034100d57bb..ab6c85effa9d0fd70f22b79415f10c3639adddf1 100644 --- a/apps/routers/user.py +++ b/apps/routers/user.py @@ -1,11 +1,9 @@ from typing import Annotated -from fastapi import APIRouter, Depends, HTTPException, status -from fastapi.responses import JSONResponse, StreamingResponse +from fastapi import APIRouter, Depends, status +from fastapi.responses import JSONResponse -from apps.common.queue import MessageQueue -from apps.common.wordscheck import WordsCheck from apps.constants import LOGGER from apps.dependency import ( get_session, @@ -13,13 +11,9 @@ from apps.dependency import ( verify_csrf_token, verify_user, ) -from apps.entities.request_data import RequestData -from apps.entities.response_data import ResponseData, UserGetMsp, UserGetRsp +from apps.entities.response_data import UserGetMsp, UserGetRsp from apps.entities.user import UserInfo -from apps.manager.appcenter import AppCenterManager from apps.manager.user import UserManager -from apps.scheduler.scheduler import Scheduler -from apps.service.activity import Activity router = APIRouter( prefix="/api/user", diff --git a/apps/scheduler/call/api.py b/apps/scheduler/call/api.py index 8405e7bebbe623c4b7886490773b266b5e811cf6..8035ec4768b34c48cecb50a181ba93cc44dd66a3 100644 --- a/apps/scheduler/call/api.py +++ b/apps/scheduler/call/api.py @@ -128,11 +128,6 @@ class API(metaclass=CoreCall, param_cls=APIParams, output_cls=_APIOutput): response_status = response.status response_data = await response.text() - # 返回值只支持JSON的情况 - if "responses" in self._spec[2]: - response_schema = self._spec[2]["responses"]["content"]["application/json"]["schema"] - else: - response_schema = {} LOGGER.info(f"调用接口{params.url}, 结果为 {response_data}") # 组装message diff --git a/apps/scheduler/call/direct.py b/apps/scheduler/call/direct.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py index 97922ca8f56ac0cc9a253840742ddba044580075..6472d9f6ed216498ada0ff4f9c5b9ca532432a75 100644 --- a/apps/scheduler/executor/flow.py +++ b/apps/scheduler/executor/flow.py @@ -7,7 +7,7 @@ from typing import Any, Optional import ray -from apps.constants import LOGGER, MAX_SCHEDULER_HISTORY_SIZE +from apps.constants import LOGGER, STEP_HISTORY_SIZE from apps.entities.enum_var import StepStatus from apps.entities.flow import Step from apps.entities.scheduler import ( @@ -17,6 +17,7 @@ from apps.entities.scheduler import ( from apps.entities.task import ExecutorState, TaskBlock from apps.llm.patterns import ExecutorThought from apps.llm.patterns.executor import ExecutorBackground +from apps.manager.node import NodeManager from apps.manager.task import TaskManager from apps.scheduler.executor.message import ( push_flow_start, @@ -42,11 +43,12 @@ class Executor: async def load_state(self, sysexec_vars: SysExecVars) -> None: """从JSON中加载FlowExecutor的状态""" # 获取Task - task_pool = ray.get_actor("task") - task = await task_pool.get_task.remote(sysexec_vars.task_id) - if not task: - err = "[Executor] Task error." - raise ValueError(err) + task_actor = ray.get_actor("task") + try: + self._task: TaskBlock = await task_actor.get_task.remote(sysexec_vars.task_id) + except Exception as e: + err = f"[Executor] Task error. {e!s}" + raise ValueError(err) from e # 加载Flow信息 pool = ray.get_actor("pool") @@ -67,12 +69,12 @@ class Executor: self._flow_data = flow_data # 尝试恢复State - if task.flow_state: - self.flow_state = task.flow_state + if self._task.flow_state: + self.flow_state = self._task.flow_state # 如果flow_context为空,则从flow_history中恢复 - if not task.flow_context: - task.flow_context = await TaskManager.get_flow_history_by_task_id(self._vars.task_id) - task.new_context = [] + if not self._task.flow_context: + self._task.flow_context = await TaskManager.get_flow_history_by_task_id(self._vars.task_id) + self._task.new_context = [] else: # 创建ExecutorState self.flow_state = ExecutorState( @@ -86,7 +88,7 @@ class Executor: ) # 是否结束运行 self._stop = False - await task.set_task(self._vars.task_id, task) + await task_actor.set_task.remote(self._vars.task_id, self._task) async def _get_last_output(self, task: TaskBlock) -> dict[str, Any]: @@ -96,15 +98,8 @@ class Executor: return CallResult(**task.flow_context[self.flow_state.step_id].output_data) - async def _run_step(self, step_data: Step) -> dict[str, Any]: # noqa: PLR0915 + async def _run_step(self, step_data: Step) -> dict[str, Any]: """运行单个步骤""" - # 获取Task - task_pool = ray.get_actor("task") - task = await task_pool.get_task.remote(self._vars.task_id) - if not task: - err = "[Executor] Task error." - raise ValueError(err) - # 更新State self.flow_state.step_id = step_data.name self.flow_state.status = StepStatus.RUNNING @@ -112,20 +107,22 @@ class Executor: # Call类型为none,直接错误 node_id = step_data.node if node_id == "none": - self.flow_state.status = StepStatus.ERROR return {} + # 获取对应Node的call_id + call_id = await NodeManager.get_node_call_id(node_id) # 从Pool中获取对应的Call - call_data, call_cls = Pool().get_call(node_id, self.flow_state.app_id) - if call_data is None or call_cls is None: - err = f"[FlowExecutor] 尝试执行工具{node_id}时发生错误:找不到该工具。\n{traceback.format_exc()}" - LOGGER.error(err) + pool = ray.get_actor("pool") + try: + call_data, call_cls = await pool.get_call.remote(call_id, self.flow_state.app_id) + except Exception as e: + LOGGER.error(f"[FlowExecutor] 尝试执行工具{node_id}时发生错误:{e}。\n{traceback.format_exc()}") self.flow_state.status = StepStatus.ERROR return {} # 准备history - history = list(task.flow_context.values()) - length = min(MAX_SCHEDULER_HISTORY_SIZE, len(history)) + history = list(self._task.flow_context.values()) + length = min(STEP_HISTORY_SIZE, len(history)) history = history[-length:] # 准备SysCallVars diff --git a/apps/scheduler/executor/message.py b/apps/scheduler/executor/message.py index b5c0e6efa9eb79450d716e48ba8f545519a0736f..0aa7a87d522f91565a48a146f60148bf20ede7f3 100644 --- a/apps/scheduler/executor/message.py +++ b/apps/scheduler/executor/message.py @@ -53,12 +53,12 @@ async def push_step_input(task_id: str, queue: MessageQueue, state: ExecutorStat err = "当前步骤不存在错误处理步骤!" raise ValueError(err) content = StepInputContent( - callType=flow.on_error, + callType="llm", params=state.slot_data, ) else: content = StepInputContent( - callType=flow.steps[state.step_id].call_type, + callType=flow.steps[state.step_id].node, params=state.slot_data, ) # 推送消息 @@ -86,7 +86,7 @@ async def push_step_output(task_id: str, queue: MessageQueue, state: ExecutorSta # 组装消息;只保留message和output content = StepOutputContent( - callType=flow.steps[state.step_id].call_type, + callType=flow.steps[state.step_id].node, message=output["message"] if output and "message" in output else "", output=output["output"] if output and "output" in output else {}, ) @@ -133,7 +133,7 @@ async def push_flow_stop(task_id: str, queue: MessageQueue, state: ExecutorState ).model_dump(exclude_none=True, by_alias=True) elif call_type == "render": # 如果当前Flow是图表,则推送Chart - chart_option = CallResult(**task.flow_context[state.step_id].output_data).output + chart_option = task.flow_context[state.step_id].output_data["output"] content = FlowStopContent( type=FlowOutputType.CHART, data=chart_option, diff --git a/apps/scheduler/pool/loader/openapi.py b/apps/scheduler/pool/loader/openapi.py index 487c6b20cb6a0e2374ce38f48363c5e154214456..9a90a4eae46294855697a84879aa023777f048f1 100644 --- a/apps/scheduler/pool/loader/openapi.py +++ b/apps/scheduler/pool/loader/openapi.py @@ -12,7 +12,7 @@ from anyio import Path from apps.constants import LOGGER from apps.entities.enum_var import ContentType, HTTPMethod from apps.entities.flow import ServiceMetadata -from apps.entities.pool import NodePool +from apps.entities.node import APINode, APINodeInput, APINodeOutput from apps.scheduler.openapi import ( ReducedOpenAPIEndpoint, ReducedOpenAPISpec, @@ -54,7 +54,7 @@ class OpenAPILoader: return schema - async def _get_api_data(self, spec: ReducedOpenAPIEndpoint, service_metadata: ServiceMetadata) -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]: + async def _get_api_data(self, spec: ReducedOpenAPIEndpoint, service_metadata: ServiceMetadata) -> tuple[APINodeInput, APINodeOutput, dict[str, Any]]: """从OpenAPI文档中获取API数据""" try: method = HTTPMethod[spec.method.upper()] @@ -79,45 +79,23 @@ class OpenAPILoader: LOGGER.error(msg=err) raise RuntimeError(err) - input_schema = { - "type": "object", - "properties": { - "url_parameters": { - "type": "object", - "properties": {}, - "required": [], - }, - "post_body": { - "type": "object", - "properties": {}, - "required": [], - }, - }, - "required": ["url_parameters", "post_body"], - } - try: - input_schema["properties"]["url_parameters"] = await self.parameters_to_spec(spec.spec["parameters"]) - except KeyError: - err = f"接口{spec.name}不存在URL参数定义" - LOGGER.error(msg=err) - - try: - input_schema["properties"]["post_body"] = spec.spec["requestBody"]["content"][content_type]["schema"] + inp = APINodeInput( + param_schema=await self.parameters_to_spec(spec.spec["parameters"]) if "parameters" in spec.spec else None, + body_schema=spec.spec["requestBody"]["content"][content_type]["schema"] if "requestBody" in spec.spec else None, + ) except KeyError: - err = f"接口{spec.name}不存在请求体定义" + err = f"接口{spec.name}请求体定义错误" LOGGER.error(msg=err) try: - output_schema = spec.spec["responses"]["200"]["content"]["application/json"]["schema"] + out = APINodeOutput( + resp_schema=spec.spec["responses"]["200"]["content"]["application/json"]["schema"], + ) except KeyError: err = f"接口{spec.name}不存在响应体定义" LOGGER.error(msg=err) - output_schema = { - "type": "object", - "properties": {}, - "required": [], - } + out = APINodeOutput() known_params = { "url": url, @@ -125,15 +103,15 @@ class OpenAPILoader: "content_type": content_type, } - return input_schema, output_schema, known_params + return inp, out, known_params - async def _process_spec(self, service_id: str, yaml_filename: str, spec: ReducedOpenAPISpec, service_metadata: ServiceMetadata) -> list[NodePool]: + async def _process_spec(self, service_id: str, yaml_filename: str, spec: ReducedOpenAPISpec, service_metadata: ServiceMetadata) -> list[APINode]: """将OpenAPI文档拆解为Node""" nodes = [] for api_endpoint in spec.endpoints: # 组装新的NodePool item - node = NodePool( + node = APINode( _id=str(uuid.uuid4()), name=api_endpoint.name, # 此处固定Call的ID是"API" @@ -149,7 +127,7 @@ class OpenAPILoader: return nodes - async def load_one(self, service_id: str, yaml_path: Path, service_metadata: ServiceMetadata) -> list[NodePool]: + async def load_one(self, service_id: str, yaml_path: Path, service_metadata: ServiceMetadata) -> list[APINode]: """加载单个OpenAPI文档,可以直接指定路径""" try: spec = await self._read_yaml(yaml_path) diff --git a/apps/scheduler/pool/loader/service.py b/apps/scheduler/pool/loader/service.py index 61afaf8b396358e4b7d0de0bb0bb810d041d2532..0a2ecd2a13cb4f1a024767330a4ff1c7af14e4bb 100644 --- a/apps/scheduler/pool/loader/service.py +++ b/apps/scheduler/pool/loader/service.py @@ -46,8 +46,10 @@ class ServiceLoader: ] nodes = (await asyncio.gather(*nodes))[0] # 更新数据库 + nodes = [NodePool(**node.model_dump(exclude_none=True, by_alias=True)) for node in nodes] await self._update_db(nodes, metadata) + async def _update_db(self, nodes: list[NodePool], metadata: ServiceMetadata) -> None: """更新数据库""" # 更新MongoDB diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index 82b5e6dc558deac121728b464b261187b7ea8cba..d9c7a9a17a10774d628d6183a04e9ca6b4f9f22a 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -24,11 +24,10 @@ from apps.entities.scheduler import ExecutorBackground, SysExecVars from apps.entities.task import RequestDataApp from apps.manager.document import DocumentManager from apps.manager.record import RecordManager +from apps.manager.task import TaskManager from apps.manager.user import UserManager from apps.scheduler.executor import Executor from apps.scheduler.scheduler.context import generate_facts, get_context - -# from apps.scheduler.scheduler.flow import choose_flow from apps.scheduler.scheduler.message import ( push_document_message, push_init_message, @@ -90,8 +89,6 @@ class Scheduler: top_k=5, ) - # 状态位:是否需要生成推荐问题? - need_recommend = True # 如果是智能问答,直接执行 if not post_body.app or post_body.app.app_id == "": await push_init_message(self._task_id, self._queue, post_body, is_flow=False) @@ -112,13 +109,8 @@ class Scheduler: facts=facts, ) - # 生成推荐问题和事实提取 - # 如果需要生成推荐问题,则生成 - # routine_results = await asyncio.gather(generate_facts(self._task_id, post_body.question)) - - # 保存事实信息 - # self._facts = routine_results[0] - self._facts = [] + # 记忆提取 + self._facts = await generate_facts(self._task_id, post_body.question) # 发送结束消息 await self._queue.push_output(event_type=EventType.DONE, data={}) @@ -129,10 +121,11 @@ class Scheduler: await self._queue.close() - async def run_executor(self, session_id: str, post_body: RequestData, background: ExecutorBackground, user_selected_flow: RequestDataApp) -> bool: + async def run_executor(self, session_id: str, post_body: RequestData, background: ExecutorBackground, selected_flow: RequestDataApp) -> bool: """构造FlowExecutor,并执行所选择的流""" # 获取当前Task - task = await Task.get_task(self._task_id) + task_pool = ray.get_actor("task") + task = await task_pool.get_task.remote(self._task_id) if not task: err = "[Scheduler] Task error." raise ValueError(err) @@ -143,7 +136,7 @@ class Scheduler: question=post_body.question, task_id=self._task_id, session_id=session_id, - app_data=user_selected_flow, + app_data=selected_flow, background=background, ) diff --git a/sample/app/test_app/flows/test.yaml b/sample/app/test_app/flows/test.yaml index d25e4bfbeb46abf71742224b5c6f3f74354d4439..87393f33af6f24b7ee0c1129d3ad8fb4d57bb2fa 100644 --- a/sample/app/test_app/flows/test.yaml +++ b/sample/app/test_app/flows/test.yaml @@ -15,7 +15,7 @@ on_error: # 各个节点定义 steps: - - id: query_data # 节点的Pool ID + query_data: # 节点的Pool ID node: api name: 查询数据 # 节点名称 description: 从API中查询测试数据 # 节点描述 @@ -24,7 +24,7 @@ steps: y: 100 params: # 节点的参数 endpoint: GET /api/test # API Endpoint名称 - - id: check_data + check_data: node: choice name: 判断数据 description: 判断工具的返回值是否包含有效数据 @@ -38,7 +38,7 @@ steps: description: 返回值存在有效数据 # 选项说明,满足就会选择此项 - branch: invalid description: 返回值不存在有效数据 - - id: gen_reply + gen_reply: node: llm name: 生成回复 description: 使用大模型生成回复 @@ -62,7 +62,7 @@ steps: 测试数据:{{ storage[-1].output.result.machines[0].data }} 使用自然语言解释这一信息,并展示为Markdown列表。 - - id: format_output + format_output: node: convert name: 格式化输出 description: 按照特定格式输出 @@ -89,7 +89,7 @@ steps: "time": extras.time, "machines": [x for x.id in storage[-1].output.result.machines] } - - id: gen_suggest + gen_suggest: node: suggest name: 问题推荐 description: 推荐问题