diff --git a/core/agent/api/v1/base_api.py b/core/agent/api/v1/base_api.py index 888b3758ccd7372a14b09ec757c41708e891c091..d953b3297e5b254ff6f403cdd14d117350128e1c 100644 --- a/core/agent/api/v1/base_api.py +++ b/core/agent/api/v1/base_api.py @@ -22,16 +22,36 @@ from infra import agent_config def json_serializer(obj: Any) -> Any: - """Custom JSON serializer to handle set objects.""" + """自定义JSON序列化器,用于处理集合对象 + + 参数: + obj: 需要序列化的对象 + + 返回: + 序列化后的对象 + + 异常: + TypeError: 当对象类型不支持序列化时抛出 + """ if isinstance(obj, set): - return list(obj) + return list(obj) # 将set转换为list进行序列化 raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable") @dataclass class RunContext: - """Runtime context parameters""" - + """运行时上下文参数 + + 用于在Completion执行过程中传递和共享状态信息 + + 属性: + error: 错误信息对象 + error_log: 错误日志详情 + chunk_logs: 响应块日志列表 + span: 分布式追踪Span + node_trace: 节点追踪信息 + meter: 指标计量器 + """ error: BaseExc error_log: str chunk_logs: List[str] @@ -41,6 +61,16 @@ class RunContext: class CompletionBase(BaseModel, ABC): + """Completion基类 - 定义AI补全请求的通用处理逻辑 + + 这是一个抽象基类,提供了Completion请求的标准处理流程, + 包括运行器构建、节点追踪、指标收集和流式响应生成。 + + 属性: + app_id: 应用标识 + inputs: 输入参数 + log_caller: 日志调用者标识 + """ app_id: str inputs: BaseInputs log_caller: str @@ -49,10 +79,35 @@ class CompletionBase(BaseModel, ABC): @abstractmethod async def build_runner(self, span: Span) -> Any: - """Subclasses need to implement the logic for building runner""" + """构建运行器 - 子类需要实现具体的运行器构建逻辑 + + 参数: + span: 分布式追踪Span对象 + + 返回: + 具体的运行器实例 + + 说明: + 这是一个抽象方法,子类必须实现以提供具体的业务逻辑运行器 + """ async def build_node_trace(self, bot_id: str, span: Span) -> NodeTracePatch: + """构建节点追踪信息 + + 参数: + bot_id: 机器人标识 + span: 分布式追踪Span对象 + + 返回: + NodeTracePatch: 节点追踪补丁对象 + + 功能: + - 初始化节点追踪信息 + - 记录开始时间 + - 添加追踪事件 + """ with span.start("BuildNodeTrace") as sp: + # 创建节点追踪对象,使用bot_id作为service_id node_trace: NodeTracePatch = NodeTracePatch( service_id=bot_id, # Use bot_id as service_id sid=sp.sid, @@ -64,14 +119,26 @@ class CompletionBase(BaseModel, ABC): log_caller=self.log_caller, question=self.inputs.get_last_message_content(), ) - node_trace.record_start() + node_trace.record_start() # 记录开始时间 + # 添加节点追踪信息到Span事件 sp.add_info_events({"node-trace": node_trace.model_dump_json()}) return node_trace async def build_meter(self, span: Span) -> Meter: - + """构建指标计量器 + + 参数: + span: 分布式追踪Span对象 + + 返回: + Meter: 指标计量器实例 + + 功能: + - 初始化计量器 + - 添加应用信息和函数标识 + """ with span.start("BuildMeter") as sp: sp.add_info_events({"app-id": self.app_id, "func": self.log_caller}) @@ -81,19 +148,32 @@ class CompletionBase(BaseModel, ABC): async def _process_chunk( self, chunk: Any, chunk_logs: List[str] ) -> AsyncGenerator[str, None]: - """Logic for processing individual chunk""" + """处理单个响应块的逻辑 + + 参数: + chunk: 响应块数据 + chunk_logs: 块日志列表 + + 返回: + AsyncGenerator: 处理后的块数据生成器 + + 功能: + - 过滤日志类型的块 + - 处理知识元数据块 + - 记录块日志信息 + """ if chunk.object == "chat.completion.log": - # span.add_info_events(attributes={ - # "log": json.dumps(chunk.log, ensure_ascii=False) - # }) + # 日志类型的块不生成输出,仅用于内部记录 return # Do not generate chunk output if chunk.object == "chat.completion.chunk": + # 标准补全块,记录日志并生成输出 chunk_logs.append(chunk.model_dump_json()) yield await self.create_chunk(chunk) return if chunk.object == "chat.completion.knowledge_metadata": + # 知识元数据块,根据调用者决定是否输出 if self.log_caller == "chat_open_api": return # Do not generate chunk output @@ -101,14 +181,29 @@ class CompletionBase(BaseModel, ABC): yield await self.create_chunk(chunk) async def _finalize_run(self, context: RunContext) -> AsyncGenerator[str, None]: - """Cleanup work after completing the run""" + """运行结束后的清理工作 + + 参数: + context: 运行时上下文 + + 返回: + AsyncGenerator: 最终块数据生成器 + + 功能: + - 错误处理和日志记录 + - 使用量统计汇总 + - 指标上报 + - 节点追踪上传 + """ + # 错误处理:如果有错误,添加到Span事件中 if context.error.c != 0: context.error.m += f",{context.span.sid}" context.span.add_error_events({"traceback": context.error_log}) + # 创建停止块 stop_chunk = await self.create_stop(context.span, context.error) - # Attach usage from node_trace if available + # 从节点追踪中汇总使用量统计 if context.node_trace.trace: from openai.types.completion_usage import CompletionUsage @@ -117,6 +212,7 @@ class CompletionBase(BaseModel, ABC): "prompt_tokens": 0, "total_tokens": 0 } + # 遍历所有追踪节点,累加使用量 for node in context.node_trace.trace: if hasattr(node, "data") and hasattr(node.data, "usage"): total_usage["completion_tokens"] += ( @@ -129,26 +225,30 @@ class CompletionBase(BaseModel, ABC): node.data.usage.total_tokens ) + # 如果有使用量数据,添加到停止块中 if total_usage["total_tokens"] > 0: stop_chunk.usage = CompletionUsage(**total_usage) + # 记录停止块日志 context.chunk_logs.append(stop_chunk.model_dump_json()) + # 将所有块日志添加到Span事件中 for chunk_log in context.chunk_logs: context.span.add_info_events({"response-chunk": chunk_log}) + # 生成停止块和完成标记 yield await self.create_chunk(stop_chunk) yield await self.create_done() + # 指标上报:错误计数 if agent_config.UPLOAD_METRICS: context.meter.in_error_count(context.error.c) - # context.meter.in_error_count( - # context.error.c, lables={"msg": context.error.m} - # ) + # 设置Span属性和信息事件 context.span.set_attributes(attributes={"code": context.error.c}) context.span.add_info_events({"message": context.error.m}) + # 节点追踪结束记录和上传 context.node_trace.record_end() if agent_config.UPLOAD_NODE_TRACE: node_trace_log = context.node_trace.upload( @@ -167,30 +267,50 @@ class CompletionBase(BaseModel, ABC): async def run_runner( self, node_trace: NodeTrace, meter: Meter, span: Span ) -> AsyncGenerator[str, None]: - + """执行运行器的主要方法 + + 参数: + node_trace: 节点追踪对象 + meter: 指标计量器 + span: 分布式追踪Span对象 + + 返回: + AsyncGenerator: 流式响应生成器 + + 功能: + - 构建并执行运行器 + - 处理流式响应块 + - 异常捕获和处理 + - 最终清理工作 + """ with span.start("RunRunner") as sp: - error: BaseExc = AgentNormalExc() + error: BaseExc = AgentNormalExc() # 默认正常异常 error_log: str = "" chunk_logs: List[str] = [] try: + # 构建运行器实例 runner = await self.build_runner(sp) if runner is None: raise AgentInternalExc("Failed to build runner") + # 执行运行器并处理流式响应 async for chunk in runner.run(span=sp, node_trace=node_trace): - chunk.id = span.sid + chunk.id = span.sid # 设置块ID为Span ID async for processed_chunk in self._process_chunk(chunk, chunk_logs): yield processed_chunk except BaseExc as e: + # 处理已知异常 error = e error_log = traceback.format_exc() except Exception as e: # pylint: disable=broad-exception-caught + # 处理未知异常,转换为内部异常 error = AgentInternalExc(str(e)) error_log = traceback.format_exc() finally: + # 创建运行时上下文并执行最终清理 context = RunContext( error=error, error_log=error_log, @@ -204,10 +324,27 @@ class CompletionBase(BaseModel, ABC): @staticmethod async def create_chunk(chunk: Any) -> str: + """创建标准化的块响应格式 + + 参数: + chunk: 块数据对象 + + 返回: + str: 格式化后的块响应字符串 + """ return f"data: {chunk.model_dump_json()}\n\n" @staticmethod async def create_stop(span: Span, e: BaseExc) -> ReasonChatCompletionChunk: + """创建停止块 + + 参数: + span: 分布式追踪Span对象 + e: 异常信息 + + 返回: + ReasonChatCompletionChunk: 停止块对象 + """ chunk = ReasonChatCompletionChunk( id=span.sid, code=e.c, @@ -223,4 +360,9 @@ class CompletionBase(BaseModel, ABC): @staticmethod async def create_done() -> str: - return "data: [DONE]\n\n" + """创建完成标记 + + 返回: + str: 完成标记字符串 + """ + return "data: [DONE]\n\n" \ No newline at end of file