diff --git a/apps/common/queue.py b/apps/common/queue.py index 0965fc6363ef6db9b4d82431c8746343faabee5f..e5ad4e1bc9cc1819112313c9506600cfb6383a63 100644 --- a/apps/common/queue.py +++ b/apps/common/queue.py @@ -78,7 +78,7 @@ class MessageQueue: async def get(self) -> AsyncGenerator[str, None]: """从Queue中获取消息;变为async generator""" while True: - if self._close: + if self._close and self._queue.empty(): break try: diff --git a/apps/manager/service.py b/apps/manager/service.py index 683d573c8913a955acaa7e4297daff282a816ad3..8d3a0d6d2d420a4b47c8f55630a98c61eae68684 100644 --- a/apps/manager/service.py +++ b/apps/manager/service.py @@ -68,9 +68,8 @@ class ServiceCenterManager: page_size: int, ) -> tuple[list[ServiceCardItem], int]: """获取用户创建的服务""" - if search_type == SearchType.AUTHOR: - if keyword is not None and keyword not in user_sub: - return [], 0 + if search_type == SearchType.AUTHOR and keyword is not None and keyword not in user_sub: + return [], 0 base_filter = {"author": user_sub} filters = ServiceCenterManager._build_filters(base_filter, search_type, keyword) if keyword else base_filter service_pools, total_count = await ServiceCenterManager._search_service(filters, page, page_size) diff --git a/apps/routers/chat.py b/apps/routers/chat.py index c00e0a4098ef6411f65d4d5ef2b54540a2f5c479..f4af2ba507330504d6d78df549bec883e7e96adf 100644 --- a/apps/routers/chat.py +++ b/apps/routers/chat.py @@ -23,7 +23,6 @@ from apps.dependency import ( ) from apps.entities.request_data import RequestData from apps.entities.response_data import ResponseData -from apps.manager.flow import FlowManager from apps.manager.blacklist import QuestionBlacklistManager, UserBlacklistManager from apps.manager.flow import FlowManager from apps.manager.task import TaskManager @@ -81,8 +80,6 @@ async def chat_generator(post_body: RequestData, user_sub: str, session_id: str) break yield "data: " + content + "\n\n" - import time - time.sleep(0.3) # 等待Scheduler运行完毕 await scheduler_task diff --git a/apps/scheduler/call/api/api.py b/apps/scheduler/call/api/api.py index dc7bf0e569f5fadde8fa66247f8230033b28d4a9..d4db20faf6355bc3cf16ecff29c51dab2c58a672 100644 --- a/apps/scheduler/call/api/api.py +++ b/apps/scheduler/call/api/api.py @@ -50,6 +50,8 @@ SUCCESS_HTTP_CODES = [ class API(CoreCall, input_model=APIInput, output_model=APIOutput): """API调用工具""" + enable_filling: bool = Field(description="是否需要进行自动参数填充", default=True) + url: str = Field(description="API接口的完整URL") method: HTTPMethod = Field(description="API接口的HTTP Method") content_type: ContentType = Field(description="API接口的Content-Type") @@ -99,8 +101,9 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): """调用API,然后返回LLM解析后的数据""" self._session = aiohttp.ClientSession() self._timeout = aiohttp.ClientTimeout(total=self.timeout) + input_obj = APIInput.model_validate(input_data) try: - result = await self._call_api(input_data) + result = await self._call_api(input_obj) yield CallOutputChunk( type=CallOutputType.DATA, content=result.model_dump(exclude_none=True, by_alias=True), @@ -109,7 +112,7 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): await self._session.close() - async def _make_api_call(self, data: dict | None, files: aiohttp.FormData): # noqa: ANN202, C901 + async def _make_api_call(self, data: APIInput, files: aiohttp.FormData): # noqa: ANN202, C901 """组装API请求Session""" # 获取必要参数 req_header = { @@ -118,8 +121,6 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): req_cookie = {} req_params = {} - data = data or {} - if self._auth: if self._auth.header: for item in self._auth.header: @@ -143,7 +144,7 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): HTTPMethod.GET.value, HTTPMethod.DELETE.value, ]: - req_params.update(data) + req_params.update(data.query) return self._session.request( self.method, self.url, @@ -158,12 +159,13 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): HTTPMethod.PUT.value, HTTPMethod.PATCH.value, ]: + req_body = data.body if self.content_type in [ ContentType.FORM_URLENCODED.value, ContentType.MULTIPART_FORM_DATA.value, ]: form_data = files - for key, val in data.items(): + for key, val in req_body.items(): form_data.add_field(key, val) return self._session.request( self.method, @@ -176,7 +178,7 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): return self._session.request( self.method, self.url, - json=data, + json=req_body, headers=req_header, cookies=req_cookie, timeout=self._timeout, @@ -187,14 +189,14 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): data={}, ) - async def _call_api(self, final_data: dict[str, Any] | None = None) -> APIOutput: + async def _call_api(self, final_data: APIInput) -> APIOutput: """实际调用API,并处理返回值""" # 获取必要参数 logger.info("[API] 调用接口 %s,请求数据为 %s", self.url, final_data) session_context = await self._make_api_call(final_data, aiohttp.FormData()) async with session_context as response: - if response.status in SUCCESS_HTTP_CODES: + if response.status not in SUCCESS_HTTP_CODES: text = f"API发生错误:API返回状态码{response.status}, 原因为{response.reason}。" logger.error(text) raise CallError( diff --git a/apps/scheduler/call/core.py b/apps/scheduler/call/core.py index 1ab76c808fe88ad8bf749c9a8f4d06eb05bca8c6..f7e09cea06595c7d74813dbf446423c53259e345 100644 --- a/apps/scheduler/call/core.py +++ b/apps/scheduler/call/core.py @@ -57,6 +57,7 @@ class CoreCall(BaseModel): ) to_user: bool = Field(description="是否需要将输出返回给用户", default=False) + enable_filling: bool = Field(description="是否需要进行自动参数填充", default=False) model_config = ConfigDict( arbitrary_types_allowed=True, diff --git a/apps/scheduler/call/llm/llm.py b/apps/scheduler/call/llm/llm.py index 6bb3ad1cf5fa9e9fd8657a29b7c12777331020a5..60c25370471493572cb25aa4277d9bf0a018078f 100644 --- a/apps/scheduler/call/llm/llm.py +++ b/apps/scheduler/call/llm/llm.py @@ -31,6 +31,9 @@ logger = logging.getLogger(__name__) class LLM(CoreCall, input_model=LLMInput, output_model=LLMOutput): """大模型调用工具""" + to_user: bool = Field(default=True) + + # 大模型参数 temperature: float = Field(description="大模型温度(随机化程度)", default=0.7) enable_context: bool = Field(description="是否启用上下文", default=True) step_history_size: int = Field(description="上下文信息中包含的步骤历史数量", default=3, ge=1, le=10) diff --git a/apps/scheduler/executor/step.py b/apps/scheduler/executor/step.py index 999df2ecaf339df760a49de84748a14ec5eb25ee..9ee1dac5b8834db476c5666aa77ae13cd44122b6 100644 --- a/apps/scheduler/executor/step.py +++ b/apps/scheduler/executor/step.py @@ -9,7 +9,6 @@ from pydantic import ConfigDict from apps.entities.enum_var import ( EventType, - SpecialCallType, StepStatus, ) from apps.entities.message import TextAddContent @@ -98,15 +97,8 @@ class StepExecutor(BaseExecutor): # 判断State是否为空 self.validate_flow_state(self.task) - # 特殊步骤跳过填参 - if self.step.step.type in [ - SpecialCallType.SUMMARY.value, - SpecialCallType.FACTS.value, - SpecialCallType.SLOT.value, - SpecialCallType.EMPTY.value, - SpecialCallType.START.value, - SpecialCallType.END.value, - ]: + # 判断是否需要进行自动参数填充 + if not self.obj.enable_filling: return # 暂存旧数据 @@ -136,6 +128,7 @@ class StepExecutor(BaseExecutor): async for chunk in iterator: result: SlotOutput = SlotOutput.model_validate(chunk.content) + await self.push_message(EventType.STEP_OUTPUT.value, result.slot_data) self.obj.input.update(result.slot_data) # 恢复State @@ -182,6 +175,9 @@ class StepExecutor(BaseExecutor): self.validate_flow_state(self.task) logger.info("[StepExecutor] 运行步骤 %s", self.step.step.name) + # 进行自动参数填充 + await self._run_slot_filling() + # 推送输入 await self.push_message(EventType.STEP_INPUT.value, self.obj.input) diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index af2ad92cdf08bcd3ae473bb7ca5a66fdb43b87e8..a963669f4043a3867e38dd020e0ab626eba72c51 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -119,6 +119,7 @@ class Scheduler: return + async def run_executor( self, task: Task, queue: MessageQueue, post_body: RequestData, background: ExecutorBackground, ) -> None: