# loguru_volcengine **Repository Path**: OB19227/loguru_volcengine ## Basic Information - **Project Name**: loguru_volcengine - **Description**: 为 Loguru 提供"即插即用"的火山引擎云日志上传能力,支持结构化日志、异常日志增强、批量与错误通道分流、异步推送与重试策略,适用于 FastAPI 等异步服务场景。 - **Primary Language**: Unknown - **License**: AGPL-3.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-08-16 - **Last Updated**: 2025-08-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ### loguru_volcengine 云日志集成模块 为 Loguru 提供"即插即用"的火山引擎云日志上传能力,支持结构化日志、异常日志增强、批量与错误通道分流、异步推送与重试策略,适用于 FastAPI 等异步服务场景。 --- ### 功能特性 - 🚀 **一行接入**:`add_cloud_handler(config)` 立即把云日志接入 Loguru(显式传入配置) - 🔧 **手动管理**:`CloudLogManager` 独立启动/停止,精细化控制生命周期 - 📦 **结构化日志**:自动收集基础字段,支持 `extra` 自定义字段 - 🐛 **异常增强**:默认异常格式化器,错误日志自动补充堆栈详情 - ⚡ **异步推送**:基于事件循环与专用线程池,批量/定时刷新,高吞吐 - 🔄 **错误分通道**:普通日志与错误日志分开队列与刷新策略 - 🛡️ **重试与退避**:可配置最大重试、固定/指数退避 - 📊 **模块日志等级**:可单独配置模块内部日志等级,便于排障 - 🔒 **并发安全**:改进的并发控制和锁机制,确保线程安全 --- ### 组件与目录 - `cloud_logger.py`:核心管理器与配置 - `handlers.py`:面向 Loguru 的便捷集成函数,使用状态管理类避免全局变量 - `loguru_cloud_example.py`:完整示例 - `volcengine_uplog_example.py`:火山引擎专用示例 --- ### 代码质量 - 遵循 Python 最佳实践,避免使用 `global` 语句 - 使用 `CloudLogState` 类管理内部状态,提高代码可测试性和维护性 - 通过属性访问器提供清晰的状态接口 --- ### 安装与依赖 - Python 3.10+ - `loguru` - `volcengine`(火山引擎 TLS SDK,代码使用 `volcengine.tls.*`) - 其他标准库(`asyncio`、`concurrent.futures` 等) - 网络出站权限(访问火山引擎 TLS 服务) ```bash pip install loguru volcengine ``` --- ### 快速开始 #### 1. 最简单的用法(推荐在已有事件循环环境中,如 FastAPI lifespan) ```python from loguru import logger from dygmserver.libs.loguru_volcengine import CloudLogConfig, add_cloud_handler config = CloudLogConfig( topic_id="你的_topic_id", access_key_id="你的_access_key_id", access_key_secret="你的_access_key_secret", ) if add_cloud_handler(config): logger.info("云日志接入成功") else: logger.warning("云日志接入失败,请检查配置") ``` #### 2. 手动管理(需要自行在事件循环中启动/停止) ```python from loguru import logger from dygmserver.libs.loguru_volcengine import CloudLogConfig, CloudLogManager config = CloudLogConfig( topic_id="你的_topic_id", access_key_id="你的_access_key_id", access_key_secret="你的_access_key_secret", ) cloud_manager = CloudLogManager(config) # 在异步上下文中 await cloud_manager.start() handler_id = logger.add(cloud_manager.get_handler()) # ... 业务日志 logger.info("手动管理云日志测试", extra={"component": "demo"}) # 关闭 logger.remove(handler_id) await cloud_manager.stop() ``` #### 3. 便捷移除与状态查询 ```python from dygmserver.libs.loguru_volcengine import ( remove_cloud_handler, is_cloud_handler_active, get_cloud_manager, get_cloud_state, ) ok = remove_cloud_handler() active = is_cloud_handler_active() manager = get_cloud_manager() state = get_cloud_state() ``` --- ### 与 FastAPI 集成(lifespan 模式) 在 FastAPI 应用启动时启用云日志,关闭时移除 ```python # lifespan.py from contextlib import asynccontextmanager from fastapi import FastAPI from dygmserver.utils.log import logger, log_manager # log_manager 封装了 add/remove @asynccontextmanager async def lifespan(app: FastAPI): # 启动阶段(事件循环已就绪) log_manager.add_volcengine_logger() # ... 你的依赖检查/DB/缓存/任务等启动逻辑 # await check_dependencies(); _setup_db(app); ... yield # 关闭阶段(确保在事件循环内调用移除函数) log_manager.remove_volcengine_logger() ``` 注意: - 避免在模块导入阶段(无事件循环)调用云日志启动/移除,否则可能出现 "no running event loop"。 - 多进程(如 Gunicorn)环境下,每个进程在其自身 lifespan 内独立启停云日志。 --- ### 配置说明 CloudLogConfig #### 必填参数 - `topic_id`: 云日志 Topic ID - `access_key_id`: 访问 Key ID - `access_key_secret`: 访问 Key Secret > 说明:本模块不会自动读取环境变量,必须显式提供以上凭证;传入 `None` 将导致配置构造失败。 #### 默认可选参数(服务端点) - `endpoint`: 云服务端点,默认 "https://tls-cn-beijing.volces.com" - `region`: 云服务区域,默认 "cn-beijing" - `compression`: 压缩方式,默认 "zlib" #### 常用可选项(日志元信息) - `source`: 日志来源标识,默认 "cloud_logger" - `filename`: 日志文件名,默认 "cloud_logger.log" #### 批量与刷新 - `batch_size`: 普通日志批量大小(队列满即触发推送),默认 100 - `error_batch_size`: 错误日志批量大小(错误优先,小批量快推),默认 20 - `flush_interval`: 普通日志定时刷新间隔(秒),默认 60 - `error_flush_interval`: 错误日志定时刷新间隔(秒),默认 10 - `adaptive_batch_enabled`: 启用基于队列压力的动态批量,默认 True #### 队列与内存控制 - `max_queue_size`: 普通日志最大队列长度,默认 10000 - `error_max_queue_size`: 错误日志最大队列长度,默认 2000 - `overflow_strategy`: 队列溢出策略,`drop_oldest` 或 `drop_newest`,默认 `drop_oldest` #### 推送与重试 - `max_retry_attempts`: 最大重试次数,默认 3 - `retry_delay`: 固定重试延迟(秒),默认 1.0 - `exponential_backoff`: 是否启用指数退避,默认 True - `thread_pool_max_workers`: 推送线程池大小,默认 5 #### 模块日志与自定义处理 - `module_logger_level`: 模块内部日志等级,如 "DEBUG"/"INFO"... - `log_formatter`: 自定义格式化器,签名 `f(level, message, **kwargs) -> dict` - `log_filter`: 自定义过滤器,签名 `f(level, message, **kwargs) -> bool` - `extra_fields`: 追加到每条日志的固定字段(dict) #### 配置示例(自定义格式化与过滤) ```python import time def custom_formatter(level, message, **kwargs): return { "lvl": level, "msg": message, "timestamp": int(time.time()), **kwargs, } def custom_filter(level, message, **kwargs): keywords = ["error", "warning", "important"] return any(keyword in message.lower() for keyword in keywords) config = CloudLogConfig( topic_id="xxx", access_key_id="xxx", access_key_secret="xxx", ).copy_with( log_formatter=custom_formatter, log_filter=custom_filter, extra_fields={"service": "order", "env": "prod", "app_version": "1.0.0"}, ) add_cloud_handler(config) ``` --- ### 日志结构与字段 云日志 sink 默认收集下列基础字段,并自动合并 `extra`: - `log_name`: 日志记录器名称 - `log_level`: 等级名(INFO/ERROR…) - `log_path`: 文件路径与行号 - `log_time`: 日志时间戳(毫秒) - `exception`: 异常摘要(如有) - `...extra`: 你通过 `logger.*(..., extra={...})` 传入的扩展字段 对于错误级别(ERROR/CRITICAL)的日志,默认格式化器还会补充: - `exception_type` - `exception_message` - `exception_module` - `exception_stacktrace`(完整堆栈数组) #### 结构化日志示例 ```python import time logger.info("用户操作", extra={ "user_id": "12345", "action": "login", "ip": "192.168.1.100", "user_agent": "Mozilla/5.0...", "timestamp": time.time() }) logger.error("系统错误", extra={ "error_code": "E001", "error_type": "DatabaseConnectionError", "component": "database", "retry_count": 3 }) ``` --- ### API 参考 #### 主要函数(位于 `handlers`) ##### `add_cloud_handler(config)` 添加云日志处理器到 loguru。 - **参数:** - `config` (CloudLogConfig, 必填): 云日志配置。传入 `None` 将导致配置构造失败(本模块不自动读取环境变量)。 - **返回:** - `bool`: 是否成功添加处理器 > 说明:`add_cloud_handler` 同时在包根导出(`dygmserver.libs.loguru_volcengine`)。其他便捷函数请从 `handlers` 导入。 ##### `remove_cloud_handler()` 移除云日志处理器。 - **返回:** - `bool`: 是否成功移除处理器 ##### `get_cloud_manager()` 获取当前云日志管理器实例。 - **返回:** - `CloudLogManager | None`: 云日志管理器实例 ##### `is_cloud_handler_active()` 检查云日志处理器是否已激活。 - **返回:** - `bool`: 是否已激活 ##### `get_cloud_state()` 获取云日志状态管理器(主要用于测试和调试)。 - **返回:** - `CloudLogState`: 云日志状态管理器实例 #### 类 ##### `CloudLogConfig` 云日志配置类。 - **方法:** - `is_valid()`: 检查凭证字段是否齐全 - `copy_with(**overrides)`: 复制配置并覆盖某些参数 ##### `CloudLogManager` 云日志管理器类。 - **方法:** - `start()`: 启动云日志管理器 - `stop()`: 停止云日志管理器 - `get_handler()`: 获取 loguru sink handler - `add_log(level, message, **kwargs)`: 添加日志 ##### `CloudLogState` 云日志状态管理类(内部使用)。 - **属性:** - `cloud_manager`: 当前云日志管理器实例 - `cloud_handler_id`: 当前 loguru handler ID - **方法:** - `set_manager(manager)`: 设置管理器实例 - `set_handler_id(handler_id)`: 设置 handler ID - `clear()`: 清空所有状态 --- ### 运行时行为 - 队列满或到达刷新间隔即推送普通日志;错误日志走独立快速通道。 - 启用动态批量后,会根据队列压力在 [0.5x, 2x] 基准批量范围自适应调整。 - 异步任务后台运行,异常会被采集并记录在模块日志中。 - 停止时会尝试冲刷队列与关闭线程池,尽量保证不丢日志。 --- ### 常见问题与排障 - **"no running event loop"** - 症状:在服务尚未建立事件循环时调用了 `add_cloud_handler` 或 `remove_cloud_handler`。 - 解决:放到 FastAPI `lifespan` 的启动/关闭阶段,或任何已存在事件循环的异步上下文中。 - **无法看到云端日志** - 核对凭证/Topic 是否正确、网络可达性。 - 检查是否命中过滤器(自定义 `log_filter`)。 - 增加 `module_logger_level="DEBUG"` 观察模块内部日志。 - **多进程重复日志** - 每个进程应独立启停云日志(lifespan 内初始化);避免在父进程或模块导入期加 handler。 - **性能问题** - 调整 `batch_size`、`flush_interval`、是否启用 `adaptive_batch_enabled`。 - 检查网络连接和云服务状态。 --- ### 安全与合规 - 切勿在代码库中硬编码 `access_key_secret` 等敏感信息。 - 建议在业务侧通过配置系统注入,并使用这些配置构造 `CloudLogConfig`(本模块不自动读取环境变量)。 --- ### 示例入口 - 基础示例:`loguru_cloud_example.py` - 火山引擎专用示例:`volcengine_uplog_example.py` - 在项目中的接入位置参考: - 启动:`dygmserver/web/lifespan.py` 内调用 `log_manager.add_volcengine_logger()` - 关闭:`log_manager.remove_volcengine_logger()` - 日志初始化:`dygmserver/utils/log.py` --- ### 版本与兼容性 - 适配 Python 3.10+,Loguru 最新稳定版。 - Windows/Linux/macOS 皆可;生产建议 Linux 部署。 --- ### 注意事项 1. **配置验证**: 确保提供有效的 `topic_id`、`access_key_id` 和 `access_key_secret` 2. **网络连接**: 确保网络可以访问云服务端点 3. **异步环境**: 在异步环境中使用时,确保有运行中的事件循环 4. **资源清理**: 应用关闭时建议调用 `remove_cloud_handler()` 清理资源 5. **性能调优**: 根据实际需求调整批量大小、刷新间隔与动态批量开关 --- ### License - 随项目 License。若独立开源,请补充相应协议。