# 分布式数据实验 **Repository Path**: wylblog/distributed-data-experiment ## Basic Information - **Project Name**: 分布式数据实验 - **Description**: 桂林电子科技大学大数据管理与应用分布式数据实验 作  者:無以菱 联系邮箱:huangjing2001.guet@qq.com - **Primary Language**: Python - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-06-05 - **Last Updated**: 2025-06-09 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 实时支付数据分析系统 ## 项目概述 (个人实验课作业) ​ 本项目是一个实时支付数据分析系统,通过Kafka、Spark、HBase和Redis等组件,实现了从数据生成、处理到可视化展示的完整流程。系统模拟生成支付订单数据,进行实时统计分析,并通过Web界面展示结果。 ## 系统架构 ![image-20250605134011565](https://hj-typora-images-1319512400.cos.ap-guangzhou.myqcloud.com/images/202506051719403.png) 系统由以下几个主要组件构成: 1. **Kafka生产者**:模拟生成实时支付订单数据 2. **Kafka消费者**:从Kafka读取数据并写入HBase 3. **HBase数据库**:存储订单数据 4. **Spark分析引擎**:定期从HBase读取数据并进行统计分析 5. **Redis缓存**:存储分析结果 6. **Flask Web应用**:展示数据分析结果 ## 数据流向 ```mermaid sequenceDiagram participant Producer as Kafka生产者 participant Kafka as Kafka集群 participant Consumer as Kafka消费者 participant HBase as HBase数据库 participant Spark as Spark分析引擎 participant Redis as Redis缓存 participant Web as Flask Web应用 Producer->>Kafka: 1. 生成模拟订单数据 Kafka->>Consumer: 2. 消费订单数据 Consumer->>HBase: 3. 写入订单数据 Spark->>HBase: 4. 读取订单数据 Spark->>Redis: 5. 写入统计结果 Web->>Redis: 6. 读取统计结果 Web->>Web: 7. 展示数据分析结果 ``` 1. Kafka生产者生成模拟订单数据并发送到Kafka主题 2. Kafka消费者从Kafka读取数据并写入HBase数据库 3. Spark定期从HBase读取数据并进行分析 4. 分析结果存入Redis缓存 5. Flask Web应用从Redis读取结果并展示 ## 主要组件说明 ### Kafka生产者 (kafka_producer.py) - 功能:模拟生成支付订单数据,包含订单ID、支付金额、支付方式和创建时间 - 配置:连接到Kafka集群(192.168.10.10:9092等) - 主题:real_time_orders ### Kafka消费者 (kafka_consumer.py) - 功能:从Kafka读取订单数据并写入HBase - 配置:连接到Kafka集群和HBase服务器 - 消费组:order_processor ### HBase表创建 (create_hbase_table.py) - 功能:创建HBase表用于存储订单数据 - 表名:real_time_orders_table - 列族:cf(带有版本控制和缓存优化) ### Spark分析引擎 (spark_hbase_redis.py) - 功能:从HBase读取数据,按支付方式统计金额,结果存入Redis - 分析频率:每5秒执行一次 - 输出:JSON格式的统计结果 ### Flask Web应用 (flask_app.py) - 功能:提供Web界面展示统计结果 - 访问地址:http://localhost:5000 - API端点:/api/stats(返回最新统计数据) - 页面:支付方式金额统计图表(带自动刷新) ### 主程序 (main.py) - 功能:协调各组件运行 - 线程1:Kafka生产者(后台) - 线程2:Kafka消费者(后台) - 线程3:Spark分析任务(后台,定时执行) - 主线程:Flask Web应用 ### 配置管理 (config.py) - 功能:集中管理所有组件的配置参数 - 包含:Kafka、HBase、Redis、Spark、Flask等配置 ## 使用方法 ### 环境要求 - Python 3.10+ - Apache Kafka - Apache HBase - Apache Spark - Redis - 相关Python库:pyspark, happybase, kafka-python, redis, flask, pandas ### 版本兼容性 #### Java与PySpark版本兼容性 | PySpark版本 | 兼容的Java版本 | |------------|--------------| | 4.0.0 | Java 17+ | | 3.5.x | Java 8/11/17 | | 3.4.x | Java 8/11/17 | | 3.3.x | Java 8/11/17 | | 3.2.x | Java 8/11 | | 3.1.x | Java 8/11 | | 3.0.x | Java 8/11 | | 2.4.x | Java 8 | **注意**:本项目默认使用PySpark 3.3.3,兼容Java 8/11/17。 ### 环境设置 ##### 1. 设置Java环境 略 ##### 2. 安装Python依赖 使用提供的requirements.txt安装依赖: ```bash pip install -r requirements.txt ``` 或者手动安装: ```bash pip install pyspark==3.3.3 happybase kafka-python redis Flask pandas ``` ##### 3. 检查环境配置 运行环境检查脚本,确认所有组件都配置正确: ```bash python3 check_environment.py ``` ![image-20250605172249829](https://hj-typora-images-1319512400.cos.ap-guangzhou.myqcloud.com/images/202506051722908.png) ### 启动步骤 1. 确保Kafka、HBase和Redis服务已启动 2. 创建HBase表 ```bash python3 create_hbase_table.py ``` 3. 启动主程序 ```bash python3 main.py ``` 4. 访问Web界面:http://localhost:5000 ![image-20250605140630962](https://hj-typora-images-1319512400.cos.ap-guangzhou.myqcloud.com/images/202506051747696.png) ## 调试与测试 ### 组件单独调试 每个组件都可以单独运行进行调试: 1. **Kafka生产者调试** ```bash python3 kafka_producer.py ``` ![image-20250605173614188](https://hj-typora-images-1319512400.cos.ap-guangzhou.myqcloud.com/images/202506051736234.png) 2. **Kafka消费者调试** ```bash python3 kafka_consumer.py ``` ![image-20250605173750850](https://hj-typora-images-1319512400.cos.ap-guangzhou.myqcloud.com/images/202506051737919.png) 3. **HBase表创建调试** ```bash python3 create_hbase_table.py ``` ![image-20250605173929520](https://hj-typora-images-1319512400.cos.ap-guangzhou.myqcloud.com/images/202506051739573.png) 4. **Spark分析调试** ```bash python3 spark_hbase_redis.py ``` ![image-20250605174122554](https://hj-typora-images-1319512400.cos.ap-guangzhou.myqcloud.com/images/202506051741603.png) 5. **Flask应用调试** ```bash python3 flask_app.py ``` ![image-20250605174336153](https://hj-typora-images-1319512400.cos.ap-guangzhou.myqcloud.com/images/202506051743219.png) ### 连接测试 使用测试脚本测试各组件连接和功能: ```bash python3 test_components.py ``` ![image-20250605172501896](https://hj-typora-images-1319512400.cos.ap-guangzhou.myqcloud.com/images/202506051741650.png) 测试脚本会检查: - Kafka连接 - HBase连接 - Redis连接 - Kafka生产者和消费者功能 - HBase读写功能 ### 配置检查 检查当前配置参数: ```bash python config.py ``` ![image-20250605174513560](https://hj-typora-images-1319512400.cos.ap-guangzhou.myqcloud.com/images/202506051745621.png) ## 常见问题与解决方案 ### 1. Kafka功能测试错误 **问题**:Kafka功能测试失败,出现错误 `KafkaTimeoutError: Failed to update metadata after 60.0 secs.` **原因**: - Kafka集群内部使用主机名(node1, node2, node3)进行通信 - 测试环境无法解析这些主机名,导致DNS查找失败 **解决方案**: - 修改Kafka集群配置,使用IP地址而非主机名 - 或在测试环境中添加主机名解析(如修改/etc/hosts文件) ### 2. Spark分析组件错误 **问题**:Spark分析组件失败,出现错误 `UnsupportedClassVersionError: org/apache/spark/launcher/Main has been compiled by a more recent version of the Java Runtime (class file version 61.0)` **原因**: - PySpark 4.0.0需要Java 17,而系统中使用的是Java 11或Java 8 - Java版本与PySpark版本不兼容 **解决方案**: - 降级PySpark到与当前Java版本兼容的版本:`pip install pyspark==3.3.3` - 或升级Java到版本17 **执行命令**: ```bash pip uninstall -y pyspark pip install pyspark==3.3.3 ``` ### 3. Java环境配置问题 **问题**:系统无法找到Java或使用了错误的Java版本 **原因**: - JAVA_HOME环境变量未设置或设置不正确 - PATH中没有包含Java可执行文件路径 **解决方案**: ```bash # 检查当前Java版本 java -version # 设置JAVA_HOME环境变量 export JAVA_HOME=/usr/local/jdk # 将Java添加到PATH export PATH=$JAVA_HOME/bin:$PATH # 将以上设置添加到~/.bashrc文件以永久生效 echo 'export JAVA_HOME=/usr/local/jdk' >> ~/.bashrc echo 'export PATH=$JAVA_HOME/bin:$PATH' >> ~/.bashrc source ~/.bashrc ``` ## 注意事项 - 需要配置正确的Kafka、HBase和Redis连接信息 - 默认配置假设Kafka和HBase运行在192.168.10.10,Redis运行在localhost - 系统使用的端口:Kafka(9092)、HBase(9090)、Redis(6379)、Flask(5000) - 确保Java版本与PySpark版本兼容 - 项目默认使用/usr/local/jdk作为Java安装路径,如有不同请修改环境设置 ## 系统监控 - Kafka生产者会在控制台输出启动信息 - Kafka消费者会在控制台输出数据处理信息 - Spark分析任务会在控制台输出统计结果 - Flask应用会在控制台输出访问日志 - Web界面提供系统状态监控 ## 最近改进 1. **添加了Kafka消费者**:完善了从Kafka到HBase的数据流 2. **增强了错误处理**:所有组件都添加了异常处理和日志记录 3. **改进了Web界面**:添加了Bootstrap样式、自动刷新和状态监控 4. **优化了HBase表结构**:添加了版本控制和缓存配置 5. **添加了API端点**:提供JSON格式的数据接口 6. **完善了文档**:详细说明了系统架构、组件和使用方法 7. **添加了调试功能**:每个组件可独立运行和调试 8. **创建了测试脚本**:用于测试各组件连接和功能 9. **集中化配置**:创建config.py统一管理配置参数 10. **修复版本兼容性问题**:解决了Java与PySpark版本不兼容的问题 11. **自动化环境设置**:添加了环境检查脚本和自动设置脚本 12. **依赖管理**:提供了requirements.txt文件管理Python依赖 13. **优化Spark数据处理**:使用Spark DataFrame API替代pandas进行数据分析,提高大数据处理能力 14. **增强数据分析维度**:添加了订单数量和平均金额等多维度分析指标 15. **改进数据结构**:使用明确的Schema定义提高数据处理效率和可靠性 16. **优化JSON处理**:使用Spark原生JSON转换功能,提高大数据集的处理性能 ## Spark数据处理优化说明 ### 优化前后对比 #### 优化前 - 使用pandas进行数据分析,不适合大数据量处理 - 只计算支付方式的总金额一个维度 - 数据转换步骤繁琐,效率较低 - 无法利用分布式计算能力 #### 优化后 - 完全使用Spark DataFrame API进行数据分析 - 增加了订单数量和平均金额等多维度分析 - 使用明确的Schema定义,提高数据处理效率 - 充分利用Spark的分布式计算能力,适合大数据量场景 - 使用Spark的排序功能对结果进行排序,便于分析重要数据 - 优化了结果显示和JSON转换流程 ### 性能提升 - 大数据集处理能力显著提升 - 内存使用更加高效 - 处理速度更快,特别是在数据量增长时 - 系统扩展性更好,可以通过增加节点线性提升性能 ### 使用说明 运行优化后的Spark分析组件: ```bash python spark_hbase_redis.py ``` 分析结果将显示在控制台并存储到Redis中,包含以下指标: - PAYWAY: 支付方式 - TOTAL_AMOUNT: 支付总金额 - ORDER_COUNT: 订单数量 - AVG_AMOUNT: 平均订单金额