# data_chunker **Repository Path**: ugly-xue/data_chunker ## Basic Information - **Project Name**: data_chunker - **Description**: 高效数据分块处理模块,用于将大型字典列表数据分割成固定大小的块 - **Primary Language**: Python - **License**: MIT - **Default Branch**: master - **Homepage**: https://gitee.com/ugly-xue/data_chunker - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-09-18 - **Last Updated**: 2025-09-18 ## Categories & Tags **Categories**: Uncategorized **Tags**: pyt, Python ## README # DataChunker 高效数据分块处理模块,用于将大型字典列表数据分割成固定大小的块,并将每个块保存为JSON文件。 ## 功能特性 - 支持顺序和并行处理模式 - 支持流式数据处理 - 可配置的分块大小和输出目录 - 详细的日志记录 - 异常处理和错误恢复 - 文件管理功能 ## 安装 ### 从PyPI安装 ```bash pip install py-data-chunker ``` ### 从源代码安装 1. 克隆仓库或下载源代码 ```bash git clone https://gitee.com/ugly-xue/data_chunker.git ``` 2. 进入项目目录 ```bash cd data_chunker ``` 3. 运行安装命令 ```bash pip install . ``` ## 快速开始 ### 基本使用 ```python from data_chunker import DataChunker, generate_sample_data # 创建数据分块处理器 chunker = DataChunker(chunk_size=500, output_dir="output") # 生成示例数据 data = generate_sample_data(3000) # 处理数据 results = chunker.process(data) print(f"处理完成,共保存 {len(results)} 个文件") ``` ### 并行处理 ```python from data_chunker import DataChunker # 创建支持并行处理的实例 chunker = DataChunker(parallel=True, workers=4) # 处理数据 results = chunker.process(data) ``` ### 流式处理 ```python from data_chunker import DataChunker, json_stream_reader # 创建实例 chunker = DataChunker() # 处理流式数据 results = chunker.process_stream(json_stream_reader("large_data.json")) ``` ## API参考 ### DataChunker类 #### 初始化参数 - `chunk_size` (int): 每个分块的大小,默认为500 - `output_dir` (str): 输出目录,默认为"output" - `prefix` (str): 文件名前缀,默认为"chunk" - `timestamp` (bool): 是否在文件名中添加时间戳,默认为True - `parallel` (bool): 是否使用并行处理,默认为False - `workers` (int): 并行工作线程数,默认为4 - `logger` (Logger): 自定义日志记录器,默认为None #### 主要方法 - `process(data)`: 处理数据并将其分割保存为JSON文件 - `process_stream(stream_generator)`: 处理流式数据 - `split(data)`: 将数据分割成多个固定大小的块(生成器) - `set_config(**kwargs)`: 动态更新配置参数 - `get_file_list()`: 获取已保存的文件列表 - `clear_output()`: 清空输出目录 ### 工具函数 - `setup_logger()`: 初始化日志记录器 - `generate_sample_data(size=3000)`: 生成示例数据 - `json_stream_reader(file_path)`: 流式读取JSON文件 - `read_json_file(file_path)`: 读取JSON文件 - `write_json_file(data, file_path, indent=2)`: 写入JSON文件 ## 高级用法 ### 自定义处理逻辑 继承DataChunker类并覆盖`_process_chunk`方法,自定义处理逻辑。 ```python from datetime import datetime from data_chunker import DataChunker class CustomDataChunker(DataChunker): def _process_chunk(self, chunk, chunk_index): # 自定义处理逻辑 for item in chunk: item["processed"] = True item["process_time"] = datetime.now().isoformat() # 调用父类方法保存 return super()._process_chunk(chunk, chunk_index) # 使用自定义处理器 custom_chunker = CustomDataChunker() results = custom_chunker.process(data) ``` ### 集成到数据处理管道 集成DataChunker到数据处理管道中,实现数据处理和保存。 ```python from data_chunker import DataChunker def data_processing_pipeline(): # 创建分块处理器 chunker = DataChunker(output_dir="processed_data") # 从数据源获取数据 raw_data = get_data_from_source() # 处理数据 results = chunker.process(raw_data) # 对处理结果进行后续操作 for file_path in results: process_result_file(file_path) return results ``` ## 异常处理 模块定义了以下异常类: - `DataChunkerError`: 基础异常类 - `FileSaveError`: 文件保存错误 - `ChunkSizeError`: 分块大小错误 - `InvalidDataError`: 无效数据错误 ### 使用方法 ```python from data_chunker import DataChunker, FileSaveError try: chunker = DataChunker(chunk_size=0) # 这会抛出ChunkSizeError except ChunkSizeError as e: print(f"配置错误: {e}") try: results = chunker.process(data) except FileSaveError as e: print(f"文件保存失败: {e}") ``` ## 性能建议 1. **小数据集**:使用顺序处理模式(`parallel=False`) 2. **大数据集**:使用并行处理模式(`parallel=True`)并适当增加工作线程数 3. **极大数据集**:使用`process_stream`方法进行流式处理,避免内存溢出 4. **文件I/O密集型**:考虑使用SSD存储或增加并行工作线程数 ## 贡献指南 1. Fork本项目 2. 创建特性分支(`git checkout -b feature/AmazingFeature`) 3. 提交更改(`git commit -m 'Add some AmazingFeature'`) 4. 推送到分支(`git push origin feature/AmazingFeature`) 5. 开启Pull Request ## 许可证 本项目采用MIT许可证。详见[LICENSE](LICENSE)文件。 ## 支持 如果您遇到任何问题或有改进建议,请通过以下方式联系: - 邮箱: basui6996@gmail.com - Gitee: [提交Issue](https://gitee.com/ugly-xue/data_chunker/issues) ## 版本历史 - 1.0.0 (2025-09-18) - 初始版本发布 - 实现基本数据分块功能 - 支持顺序和并行处理 - 添加流式处理支持