# threadpool_example **Repository Path**: rainbow2019/threadpool_example ## Basic Information - **Project Name**: threadpool_example - **Description**: No description available - **Primary Language**: Python - **License**: Not specified - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2025-06-01 - **Last Updated**: 2025-06-01 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ### 一、实际开发中使用线程池的好处 在实际的后端开发中,线程池是一种非常常用的并发处理手段,合理使用线程池可以带来以下几个明显的好处: #### ✅ 避免频繁创建和销毁线程带来的开销 问题:线程的创建和销毁是比较昂贵的操作,涉及内核资源分配、上下文切换等。 线程池的作用:线程池在初始化时会创建一定数量的线程(核心线程),任务来的时候复用这些线程,避免频繁创建销毁,提升性能。 #### ✅ 控制线程并发数量,避免系统资源耗尽 问题:如果每个任务都创建新线程,可能导致线程数量失控(线程爆炸),系统资源(内存、CPU)耗尽,严重时甚至会OOM。 线程池的作用:线程池有最大线程数和队列长度的限制,可以有计划地处理任务,平稳度过高峰期。 #### ✅ 提高系统吞吐量和响应能力 方式:任务可以被快速提交到线程池中排队执行,而不是阻塞在主线程中等待,释放主线程去处理更多请求。 场景举例: Web 服务中,处理 I/O 密集型任务(如访问数据库、远程接口)时,使用线程池能让主线程快速响应请求。 #### ✅ 支持任务调度与异步处理 比如: - 延迟任务(ScheduledThreadPool) - 定时任务 批量任务异步执行,提高整体效率 使用线程池 + 回调函数/Future/Channel,可以方便地实现异步逻辑、事件驱动编程。 #### ✅ 统一任务管理,提升代码结构整洁性 所有并发任务由线程池统一管理,便于调试和日志收集。 可以结合任务队列、限流、熔断等机制,打造稳健的任务处理系统。 #### ✅ 常见使用场景 | 场景 | 描述 | |------------------|---------------------------------------| | Web 后端并发任务 | 处理多个异步接口、数据库并发查询 | | 消息队列消费者 | 多线程消费消息,提高消费能力 | | 批量数据处理 | 并发处理多个文件、数据分片 | | 定时任务调度 | 使用定时线程池执行周期任务 | | 爬虫/采集系统 | 控制抓取任务的并发量,防止 IP 被封 | ### 二、项目功能说明 #### 自定义线程池`MyThreadPoolExecutor` - 限制同时执行的线程数量以及等待队列的大小 - 使用内置的`atexit`模块实现在flask项目终止前执行`shutdown_gracefully`方法,实现了: 项目重启优雅的停止线程池(等待线程池所有的任务结束后再释放线程池) - 日志实时打印等待队列的大小、提供一个接口检查等待队列的大小,用于确认线程池使用状态 - 封装了一个异步任务装饰器`async_thread`:被装饰函数可以加上 `raise_error=True` 决定当等待队列满的时候是被装饰的任务是抛出异常还是直接忽略 - **`测试等待队列满的情况.py`中测试了当等待队列满的时候,记录错误log的场景!** #### app.py中flask接口 - `/async/works`接口用于测试并发执行多个耗时的函数并整合所有函数的结果的使用方法 - `/hello_world`接口用于测试被`asyn_thread`装饰器装饰的函数是否能正常异步执行 - `/demo_error_api`接口跟hello_word接口比较: 看一下同步与异步的区别 - `/health/queue_size`接口查看当前等待队列的大小,便于监测线程池使用情况 ### 三、(拓展)使用组合的方式实现的自定义线程池 #### 代码位置 代码在 [threadpool_bounded.py](utils%2Fthreadpool_bounded.py) #### 优点 - 不继承 ThreadPoolExecutor,避免修改私有属性 - 使用组合 + 自定义 Queue 控制提交行为 - 兼容 Future,使用方式与原生线程池一致 - 可配置阻塞策略(你可以改成不阻塞而是抛异常) #### 队列满时的处理 如果你希望在提交时队列满了就抛出异常而不是阻塞,只需把: ```python self.task_queue.put((fn, args, kwargs, future), block=True) ``` 改成: ```python self.task_queue.put_nowait((fn, args, kwargs, future)) ```