From f547dcb4eb2b88f95e77413aa157de596c0afce2 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Sun, 27 Apr 2025 16:03:04 +0800 Subject: [PATCH] =?UTF-8?q?aiohttp=E6=9B=B4=E6=8D=A2=E4=B8=BAhttpx?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/common/oidc_provider/authhub.py | 71 ++++++------ apps/common/oidc_provider/openeuler.py | 53 ++++----- apps/llm/embedding.py | 46 ++++---- apps/manager/token.py | 10 +- apps/scheduler/call/api/api.py | 147 +++++++++---------------- apps/scheduler/call/rag/rag.py | 16 +-- apps/scheduler/call/sql/sql.py | 48 ++++---- apps/service/knowledge_base.py | 23 ++-- apps/service/rag.py | 21 ++-- 9 files changed, 205 insertions(+), 230 deletions(-) diff --git a/apps/common/oidc_provider/authhub.py b/apps/common/oidc_provider/authhub.py index f893a9da4..c97d70af7 100644 --- a/apps/common/oidc_provider/authhub.py +++ b/apps/common/oidc_provider/authhub.py @@ -3,7 +3,7 @@ import logging from typing import Any -import aiohttp +import httpx from fastapi import status from apps.common.config import Config @@ -41,15 +41,18 @@ class AuthhubOIDCProvider(OIDCProviderBase): } url = await cls.get_access_token_url() result = None - async with ( - aiohttp.ClientSession() as session, - session.post(url, headers=headers, json=data, timeout=aiohttp.ClientTimeout(total=10)) as resp, - ): - if resp.status != status.HTTP_200_OK: - err = f"[Authhub] 获取OIDC Token失败: {resp.status},完整输出: {await resp.text()}" + async with httpx.AsyncClient() as client: + resp = await client.post( + url, + headers=headers, + json=data, + timeout=10, + ) + if resp.status_code != status.HTTP_200_OK: + err = f"[Authhub] 获取OIDC Token失败: {resp.status_code},完整输出: {resp.text}" raise RuntimeError(err) - logger.info("[Authhub] 获取OIDC Token成功: %s", await resp.text()) - result = await resp.json() + logger.info("[Authhub] 获取OIDC Token成功: %s", resp.text) + result = resp.json() return { "access_token": result["data"]["access_token"], "refresh_token": result["data"]["refresh_token"], @@ -72,15 +75,18 @@ class AuthhubOIDCProvider(OIDCProviderBase): "client_id": login_config.app_id, } result = None - async with ( - aiohttp.ClientSession() as session, - session.post(url, headers=headers, json=data, timeout=aiohttp.ClientTimeout(total=10)) as resp, - ): - if resp.status != status.HTTP_200_OK: - err = f"[Authhub] 获取用户信息失败: {resp.status},完整输出: {await resp.text()}" + async with httpx.AsyncClient() as client: + resp = await client.post( + url, + headers=headers, + json=data, + timeout=10, + ) + if resp.status_code != status.HTTP_200_OK: + err = f"[Authhub] 获取用户信息失败: {resp.status_code},完整输出: {resp.text}" raise RuntimeError(err) - logger.info("[Authhub] 获取用户信息成功: %s", await resp.text()) - result = await resp.json() + logger.info("[Authhub] 获取用户信息成功: %s", resp.text) + result = resp.json() return { "user_sub": result["data"], @@ -98,20 +104,18 @@ class AuthhubOIDCProvider(OIDCProviderBase): "Content-Type": "application/json", } url = login_config.host_inner.rstrip("/") + "/oauth2/login-status" - async with ( - aiohttp.ClientSession() as session, - session.post( + async with httpx.AsyncClient() as client: + resp = await client.post( url, headers=headers, json=data, cookies=cookie, - timeout=aiohttp.ClientTimeout(total=10), - ) as resp, - ): - if resp.status != status.HTTP_200_OK: - err = f"[Authhub] 获取登录状态失败: {resp.status},完整输出: {await resp.text()}" + timeout=10, + ) + if resp.status_code != status.HTTP_200_OK: + err = f"[Authhub] 获取登录状态失败: {resp.status_code},完整输出: {resp.text}" raise RuntimeError(err) - result = await resp.json() + result = resp.json() return { "access_token": result["data"]["access_token"], "refresh_token": result["data"]["refresh_token"], @@ -126,12 +130,15 @@ class AuthhubOIDCProvider(OIDCProviderBase): "Content-Type": "application/json", } url = login_config.host_inner.rstrip("/") + "/oauth2/logout" - async with ( - aiohttp.ClientSession() as session, - session.get(url, headers=headers, cookies=cookie, timeout=aiohttp.ClientTimeout(total=10)) as resp, - ): - if resp.status != status.HTTP_200_OK: - err = f"[Authhub] 登出失败: {resp.status},完整输出: {await resp.text()}" + async with httpx.AsyncClient() as client: + resp = await client.get( + url, + headers=headers, + cookies=cookie, + timeout=10, + ) + if resp.status_code != status.HTTP_200_OK: + err = f"[Authhub] 登出失败: {resp.status_code},完整输出: {resp.text}" raise RuntimeError(err) @classmethod diff --git a/apps/common/oidc_provider/openeuler.py b/apps/common/oidc_provider/openeuler.py index 278bac40d..e8c045d9d 100644 --- a/apps/common/oidc_provider/openeuler.py +++ b/apps/common/oidc_provider/openeuler.py @@ -3,7 +3,7 @@ import logging from typing import Any -import aiohttp +import httpx from fastapi import status from apps.common.config import Config @@ -39,19 +39,22 @@ class OpenEulerOIDCProvider(OIDCProviderBase): "code": code, } url = await cls.get_access_token_url() - headers = { - "Content-Type": "application/x-www-form-urlencoded", - } - result = None - async with ( - aiohttp.ClientSession() as session, - session.post(url, headers=headers, data=data, timeout=aiohttp.ClientTimeout(total=10)) as resp, - ): - if resp.status != status.HTTP_200_OK: - err = f"[OpenEuler] 获取OIDC Token失败: {resp.status},完整输出: {await resp.text()}" + + async with httpx.AsyncClient() as client: + resp = await client.post( + url, + headers={ + "Content-Type": "application/x-www-form-urlencoded", + }, + data=data, + timeout=10.0, + ) + if resp.status_code != status.HTTP_200_OK: + err = f"[OpenEuler] 获取OIDC Token失败: {resp.status_code},完整输出: {resp.text}" raise RuntimeError(err) - logger.info("[OpenEuler] 获取OIDC Token成功: %s", await resp.text()) - result = await resp.json() + logger.info("[OpenEuler] 获取OIDC Token成功: %s", resp.text) + result = resp.json() + return { "access_token": result["access_token"], "refresh_token": result["refresh_token"], @@ -67,20 +70,20 @@ class OpenEulerOIDCProvider(OIDCProviderBase): err = "Access token is empty." raise RuntimeError(err) url = login_config.host_inner.rstrip("/") + "/oneid/oidc/user" - headers = { - "Authorization": access_token, - } - result = None - async with ( - aiohttp.ClientSession() as session, - session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp, - ): - if resp.status != status.HTTP_200_OK: - err = f"[OpenEuler] 获取OIDC用户失败: {resp.status},完整输出: {await resp.text()}" + async with httpx.AsyncClient() as client: + resp = await client.get( + url, + headers={ + "Authorization": access_token, + }, + timeout=10.0, + ) + if resp.status_code != status.HTTP_200_OK: + err = f"[OpenEuler] 获取OIDC用户失败: {resp.status_code},完整输出: {resp.text}" raise RuntimeError(err) - logger.info("[OpenEuler] 获取OIDC用户成功: %s", await resp.text()) - result = await resp.json() + logger.info("[OpenEuler] 获取OIDC用户成功: %s", resp.text) + result = resp.json() if not result["phone_number_verified"]: err = "Could not validate credentials." diff --git a/apps/llm/embedding.py b/apps/llm/embedding.py index 103c80ffd..eca65294a 100644 --- a/apps/llm/embedding.py +++ b/apps/llm/embedding.py @@ -1,6 +1,6 @@ """Embedding模型""" -import aiohttp +import httpx from apps.common.config import Config @@ -9,6 +9,12 @@ class Embedding: """Embedding模型""" # TODO: 应当自动检测向量维度 + @classmethod + async def _get_embedding_dimension(cls) -> int: + """获取Embedding的维度""" + embedding = await cls.get_embedding(["测试文本"]) + return len(embedding[0]) + @classmethod async def _get_openai_embedding(cls, text: list[str]) -> list[list[float]]: @@ -26,16 +32,14 @@ class Embedding: if Config().get_config().embedding.api_key: headers["Authorization"] = f"Bearer {Config().get_config().embedding.api_key}" - async with ( - aiohttp.ClientSession() as session, - session.post( + async with httpx.AsyncClient() as client: + response = await client.post( api, json=data, headers=headers, - timeout=aiohttp.ClientTimeout(total=60), - ) as response, - ): - json = await response.json() + timeout=60.0, + ) + json = response.json() return [item["embedding"] for item in json["data"]] @classmethod @@ -48,22 +52,20 @@ class Embedding: if Config().get_config().embedding.api_key: headers["Authorization"] = f"Bearer {Config().get_config().embedding.api_key}" - session = aiohttp.ClientSession() - - result = [] - for single_text in text: - data = { - "inputs": single_text, - "normalize": True, - } - async with session.post( - api, json=data, headers=headers, timeout=aiohttp.ClientTimeout(total=60), - ) as response: - json = await response.json() + async with httpx.AsyncClient() as client: + result = [] + for single_text in text: + data = { + "inputs": single_text, + "normalize": True, + } + response = await client.post( + api, json=data, headers=headers, timeout=60.0, + ) + json = response.json() result.append(json[0]) - await session.close() - return result + return result @classmethod async def get_embedding(cls, text: list[str]) -> list[list[float]]: diff --git a/apps/manager/token.py b/apps/manager/token.py index eb329783e..d78279b04 100644 --- a/apps/manager/token.py +++ b/apps/manager/token.py @@ -7,7 +7,7 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. import logging from datetime import UTC, datetime, timedelta -import aiohttp +import httpx from fastapi import status from apps.common.config import Config @@ -109,16 +109,16 @@ class TokenManager: err = "Refresh token均过期,需要重新登录" raise RuntimeError(err) - async with aiohttp.ClientSession() as session: - response = await session.post( + async with httpx.AsyncClient() as client: + response = await client.post( url=access_token_url, json={ "client_id": oidc_config.app_id, "access_token": oidc_access_token, }, ) - ret = await response.json() - if response.status != status.HTTP_200_OK: + ret = response.json() + if response.status_code != status.HTTP_200_OK: logger.error("[TokenManager] 获取 %s 插件所需的token失败", plugin_name) return None diff --git a/apps/scheduler/call/api/api.py b/apps/scheduler/call/api/api.py index 550f1edf1..887cafd11 100644 --- a/apps/scheduler/call/api/api.py +++ b/apps/scheduler/call/api/api.py @@ -7,10 +7,10 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. import json import logging from collections.abc import AsyncGenerator +from functools import partial from typing import Any -import aiohttp -from aiohttp.client import _RequestContextManager +import httpx from fastapi import status from pydantic import Field from pydantic.json_schema import SkipJsonSchema @@ -99,8 +99,7 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: """调用API,然后返回LLM解析后的数据""" - self._session = aiohttp.ClientSession() - self._timeout = aiohttp.ClientTimeout(total=self.timeout) + self._client = httpx.AsyncClient(timeout=self.timeout) input_obj = APIInput.model_validate(input_data) try: result = await self._call_api(input_obj) @@ -109,10 +108,10 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): content=result.model_dump(exclude_none=True, by_alias=True), ) finally: - await self._session.close() + await self._client.aclose() - async def _make_api_call(self, data: APIInput, files: aiohttp.FormData) -> _RequestContextManager: - """组装API请求Session""" + async def _make_api_call(self, data: APIInput, files: dict[str, tuple[str, bytes, str]]) -> httpx.Response: + """组装API请求""" # 获取必要参数 if self._auth: req_header, req_cookie, req_params = await self._apply_auth() @@ -121,16 +120,45 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): req_cookie = {} req_params = {} + # 创建请求工厂 + request_factory = partial( + self._client.request, + method=self.method, + url=self.url, + cookies=req_cookie, + ) + + # 根据HTTP方法创建请求 if self.method in [HTTPMethod.GET.value, HTTPMethod.DELETE.value]: - return await self._handle_query_request(data, req_header, req_cookie, req_params) + # GET/DELETE 请求处理 + req_params.update(data.query) + return await request_factory(params=req_params) if self.method in [HTTPMethod.POST.value, HTTPMethod.PUT.value, HTTPMethod.PATCH.value]: - return await self._handle_body_request(data, files, req_header, req_cookie) + # POST/PUT/PATCH 请求处理 + if not self.content_type: + raise CallError(message="API接口的Content-Type未指定", data={}) - raise CallError( - message="API接口的HTTP Method不支持", - data={}, - ) + # 根据Content-Type设置请求参数 + req_body = data.body + req_header.update({"Content-Type": self.content_type}) + + # 根据Content-Type决定如何发送请求体 + content_type_handlers = { + ContentType.JSON.value: lambda body, _: {"json": body}, + ContentType.FORM_URLENCODED.value: lambda body, _: {"data": body}, + ContentType.MULTIPART_FORM_DATA.value: lambda body, files: {"data": body, "files": files}, + } + + handler = content_type_handlers.get(self.content_type) + if not handler: + raise CallError(message="API接口的Content-Type不支持", data={}) + + request_kwargs = {} + request_kwargs.update(handler(req_body, files)) + return await request_factory(**request_kwargs) + + raise CallError(message="API接口的HTTP Method不支持", data={}) async def _apply_auth(self) -> tuple[dict[str, str], dict[str, str], dict[str, str]]: """应用认证信息到请求参数中""" @@ -160,93 +188,24 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): return req_header, req_cookie, req_params - async def _handle_query_request( - self, data: APIInput, req_header: dict[str, str], req_cookie: dict[str, str], req_params: dict[str, str], - ) -> _RequestContextManager: - """处理GET和DELETE请求""" - req_params.update(data.query) - return self._session.request( - self.method, - self.url, - params=req_params, - headers=req_header, - cookies=req_cookie, - timeout=self._timeout, - ) - - async def _handle_body_request( - self, data: APIInput, files: aiohttp.FormData, req_header: dict[str, str], req_cookie: dict[str, str], - ) -> _RequestContextManager: - """处理POST、PUT和PATCH请求""" - if not self.content_type: - raise CallError( - message="API接口的Content-Type未指定", - data={}, - ) - - req_body = data.body - - if self.content_type in [ContentType.FORM_URLENCODED.value, ContentType.MULTIPART_FORM_DATA.value]: - return await self._handle_form_request(req_body, files, req_header, req_cookie) - - if self.content_type == ContentType.JSON.value: - return await self._handle_json_request(req_body, req_header, req_cookie) - - raise CallError( - message="API接口的Content-Type不支持", - data={}, - ) - - async def _handle_form_request( - self, - req_body: dict[str, Any], - form_data: aiohttp.FormData, - req_header: dict[str, str], - req_cookie: dict[str, str], - ) -> _RequestContextManager: - """处理表单类型的请求""" - for key, val in req_body.items(): - form_data.add_field(key, val) - - return self._session.request( - self.method, - self.url, - data=form_data, - headers=req_header, - cookies=req_cookie, - timeout=self._timeout, - ) - - async def _handle_json_request( - self, req_body: dict[str, Any], req_header: dict[str, str], req_cookie: dict[str, str], - ) -> _RequestContextManager: - """处理JSON类型的请求""" - return self._session.request( - self.method, - self.url, - json=req_body, - headers=req_header, - cookies=req_cookie, - timeout=self._timeout, - ) - async def _call_api(self, final_data: APIInput) -> APIOutput: """实际调用API,并处理返回值""" # 获取必要参数 logger.info("[API] 调用接口 %s,请求数据为 %s", self.url, final_data) - session_context = await self._make_api_call(final_data, aiohttp.FormData()) - async with session_context as response: - if response.status not in SUCCESS_HTTP_CODES: - text = f"API发生错误:API返回状态码{response.status}, 原因为{response.reason}。" - logger.error(text) - raise CallError( - message=text, - data={"api_response_data": await response.text()}, - ) + files = {} # httpx需要使用字典格式的files参数 + response = await self._make_api_call(final_data, files) + + if response.status_code not in SUCCESS_HTTP_CODES: + text = f"API发生错误:API返回状态码{response.status_code}, 原因为{response.reason_phrase}。" + logger.error(text) + raise CallError( + message=text, + data={"api_response_data": response.text}, + ) - response_status = response.status - response_data = await response.text() + response_status = response.status_code + response_data = response.text logger.info("[API] 调用接口 %s,结果为 %s", self.url, response_data) diff --git a/apps/scheduler/call/rag/rag.py b/apps/scheduler/call/rag/rag.py index 26612687e..43c62efed 100644 --- a/apps/scheduler/call/rag/rag.py +++ b/apps/scheduler/call/rag/rag.py @@ -8,7 +8,7 @@ import logging from collections.abc import AsyncGenerator from typing import Any, Literal -import aiohttp +import httpx from fastapi import status from pydantic import Field @@ -65,11 +65,13 @@ class RAG(CoreCall, input_model=RAGInput, output_model=RAGOutput): "Content-Type": "application/json", } - # 发送 GET 请求 - async with aiohttp.ClientSession() as session, session.post(url, headers=headers, json=input_data) as response: + # 发送请求 + async with httpx.AsyncClient() as client: + response = await client.post(url, headers=headers, json=input_data) + # 检查响应状态码 - if response.status == status.HTTP_200_OK: - result = await response.json() + if response.status_code == status.HTTP_200_OK: + result = response.json() chunk_list = result["data"] corpus = [] @@ -86,14 +88,14 @@ class RAG(CoreCall, input_model=RAGInput, output_model=RAGOutput): ) return - text = await response.text() + text = response.text logger.error("[RAG] 调用失败:%s", text) raise CallError( message=f"rag调用失败:{text}", data={ "question": data.content, - "status": response.status, + "status": response.status_code, "text": text, }, ) diff --git a/apps/scheduler/call/sql/sql.py b/apps/scheduler/call/sql/sql.py index 2d4a973a8..c85e1d8ca 100644 --- a/apps/scheduler/call/sql/sql.py +++ b/apps/scheduler/call/sql/sql.py @@ -9,7 +9,7 @@ import logging from collections.abc import AsyncGenerator from typing import Any -import aiohttp +import httpx from fastapi import status from pydantic import Field @@ -66,19 +66,19 @@ class SQL(CoreCall, input_model=SQLInput, output_model=SQLOutput): while retry < max_retry and len(sql_list) < self.top_k: try: - async with aiohttp.ClientSession() as session, session.post( - Config().get_config().extra.sql_url + "/database/sql", - headers=headers, - json=post_data, - timeout=aiohttp.ClientTimeout(total=60), - ) as response: - if response.status == status.HTTP_200_OK: - result = await response.json() + async with httpx.AsyncClient() as client: + response = await client.post( + Config().get_config().extra.sql_url + "/database/sql", + headers=headers, + json=post_data, + timeout=60.0, + ) + if response.status_code == status.HTTP_200_OK: + result = response.json() if result["code"] == status.HTTP_200_OK: sql_list.extend(result["result"]["sql_list"]) else: - text = await response.text() - logger.error("[SQL] 生成失败:%s", text) + logger.error("[SQL] 生成失败:%s", response.text) retry += 1 except Exception: logger.exception("[SQL] 生成失败") @@ -96,22 +96,22 @@ class SQL(CoreCall, input_model=SQLInput, output_model=SQLOutput): for sql_dict in sql_list: try: - async with aiohttp.ClientSession() as session, session.post( - Config().get_config().extra.sql_url + "/sql/execute", - headers=headers, - json={ - "database_id": sql_dict["database_id"], - "sql": sql_dict["sql"], - }, - timeout=aiohttp.ClientTimeout(total=60), - ) as response: - if response.status == status.HTTP_200_OK: - result = await response.json() + async with httpx.AsyncClient() as client: + response = await client.post( + Config().get_config().extra.sql_url + "/sql/execute", + headers=headers, + json={ + "database_id": sql_dict["database_id"], + "sql": sql_dict["sql"], + }, + timeout=60.0, + ) + if response.status_code == status.HTTP_200_OK: + result = response.json() if result["code"] == status.HTTP_200_OK: return result["result"], sql_dict["sql"] else: - text = await response.text() - logger.error("[SQL] 调用失败:%s", text) + logger.error("[SQL] 调用失败:%s", response.text) except Exception: logger.exception("[SQL] 调用失败") diff --git a/apps/service/knowledge_base.py b/apps/service/knowledge_base.py index 2ce24d119..c1c33d1e6 100644 --- a/apps/service/knowledge_base.py +++ b/apps/service/knowledge_base.py @@ -4,7 +4,7 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """ -import aiohttp +import httpx from fastapi import status from apps.common.config import Config @@ -36,9 +36,10 @@ class KnowledgeBaseService: ] post_data = RAGFileParseReq(document_list=rag_docs).model_dump(exclude_none=True, by_alias=True) - async with aiohttp.ClientSession() as session, session.post(_RAG_DOC_PARSE_URI, json=post_data) as resp: - resp_data = await resp.json() - if resp.status != status.HTTP_200_OK: + async with httpx.AsyncClient() as client: + resp = await client.post(_RAG_DOC_PARSE_URI, json=post_data) + resp_data = resp.json() + if resp.status_code != status.HTTP_200_OK: return [] return resp_data["data"] @@ -46,9 +47,10 @@ class KnowledgeBaseService: async def delete_doc_from_rag(doc_ids: list[str]) -> list[str]: """删除文件""" post_data = {"ids": doc_ids} - async with aiohttp.ClientSession() as session, session.post(_RAG_DOC_DELETE_URI, json=post_data) as resp: - resp_data = await resp.json() - if resp.status != status.HTTP_200_OK: + async with httpx.AsyncClient() as client: + resp = await client.post(_RAG_DOC_DELETE_URI, json=post_data) + resp_data = resp.json() + if resp.status_code != status.HTTP_200_OK: return [] return resp_data["data"] @@ -56,8 +58,9 @@ class KnowledgeBaseService: async def get_doc_status_from_rag(doc_ids: list[str]) -> list[RAGFileStatusRspItem]: """获取文件状态""" post_data = {"ids": doc_ids} - async with aiohttp.ClientSession() as session, session.post(_RAG_DOC_STATUS_URI, json=post_data) as resp: - resp_data = await resp.json() - if resp.status != status.HTTP_200_OK: + async with httpx.AsyncClient() as client: + resp = await client.post(_RAG_DOC_STATUS_URI, json=post_data) + resp_data = resp.json() + if resp.status_code != status.HTTP_200_OK: return [] return [RAGFileStatusRspItem.model_validate(item) for item in resp_data["data"]] diff --git a/apps/service/rag.py b/apps/service/rag.py index e3d68dc15..485d6791e 100644 --- a/apps/service/rag.py +++ b/apps/service/rag.py @@ -8,7 +8,7 @@ import json import logging from collections.abc import AsyncGenerator -import aiohttp +import httpx from fastapi import status from apps.common.config import Config @@ -32,20 +32,19 @@ class RAG: # asyncio HTTP请求 - async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=300)) as session, session.post( - url, headers=headers, data=payload, ssl=False, - ) as response: - if response.status != status.HTTP_200_OK: - logger.error("[RAG] RAG服务返回错误码: %s\n%s", response.status, await response.text()) + async with ( + httpx.AsyncClient(timeout=300, verify=False) as client, # noqa: S501 + client.stream("POST", url, headers=headers, content=payload) as response, + ): + if response.status_code != status.HTTP_200_OK: + logger.error("[RAG] RAG服务返回错误码: %s\n%s", response.status_code, await response.aread()) return - async for line in response.content: - line_str = line.decode("utf-8") - + async for line in response.aiter_lines(): if not await Activity.is_active(user_sub): return - if "data: [DONE]" in line_str: + if "data: [DONE]" in line: return - yield line_str + yield line -- Gitee