# flink-hbase **Repository Path**: mpv945/flink-hbase ## Basic Information - **Project Name**: flink-hbase - **Description**: flink实现Hbase操作示例 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-07-14 - **Last Updated**: 2025-07-18 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Flink HBase ETL 基于 **Apache Flink 1.17.x** 的高性能 ETL 作业,支持从 Kerberos 认证的 HBase 异步并行读取数据,转换后写入 HDFS 的 Parquet 格式文件。 ## 🚀 功能特性 - **异步并行读取**: 使用 HBase AsyncTable API 和线程池实现高吞吐量数据读取 - **Kerberos 安全认证**: 完整支持 HBase 和 HDFS 的 Kerberos 认证 - **Parquet 高效存储**: 支持 Snappy/Zstd 压缩,按日期分区存储 - **数据质量保障**: 内置数据清洗、验证和质量评分机制 - **水平可扩展**: 支持动态调整并行度,适应不同集群规模 - **行级更新覆盖**: 基于 RowKey 的幂等写入,确保数据一致性 - **生产级监控**: 完善的日志、指标和检查点机制 ## 📋 系统要求 - **Java**: 11 或以上版本 - **Apache Flink**: 1.17.x - **Apache HBase**: 2.x - **Apache Hadoop**: 3.x - **Maven**: 3.6+ (用于构建) ## 🏗️ 项目结构 ``` flink-hbase/ ├── pom.xml # Maven 项目配置 ├── README.md # 项目文档 ├── src/ │ ├── main/ │ │ ├── java/com/flink/hbase/ │ │ │ ├── User.java # 用户数据模型 │ │ │ ├── HBaseAsyncSource.java # HBase 异步数据源 │ │ │ ├── UserMapper.java # 数据转换器 │ │ │ ├── ParquetFileSink.java # Parquet 文件输出 │ │ │ ├── JobLauncher.java # 主启动类 │ │ │ ├── DimensionData.java # 维度数据模型 │ │ │ ├── JdbcDimensionSource.java # JDBC 维度数据源 │ │ │ ├── EnrichedUser.java # 丰富化用户模型 │ │ │ ├── UserEnrichmentFunction.java # 广播流处理函数 │ │ │ └── BroadcastJobLauncher.java # 广播流作业启动器 │ │ └── resources/ │ │ ├── application.conf # 应用配置文件 │ │ └── log4j2.xml # 日志配置 │ └── test/ # 测试代码 ├── scripts/ # 部署和运行脚本 └── docs/ # 项目文档 ├── data-lake-formats-guide.md # 数据湖格式指南 ├── quick-start-examples.md # 快速开始示例 ├── configuration-comparison.md # 配置参数对比 └── broadcast-stream-example.md # 广播流使用示例 ``` ## 🔧 快速开始 ### 1. 项目构建 ```bash # 克隆项目 git clone cd flink-hbase # Maven 编译打包 mvn clean package -DskipTests # 生成的 JAR 文件 ls target/flink-hbase-etl-1.0-SNAPSHOT.jar ``` ### 2. 配置文件准备 #### Kerberos 配置文件 确保以下文件可用: - `krb5.conf` - Kerberos 配置文件 - `user.keytab` - 用户认证密钥文件 - `core-site.xml`, `hdfs-site.xml`, `hbase-site.xml` - Hadoop/HBase 配置 #### 应用参数配置 编辑 `src/main/resources/application.conf` 或通过命令行参数指定: ```hocon # 基本配置 hbase.table.name = "user" output.path = "/data/warehouse/user/" # Kerberos 配置 security.kerberos.enabled = true security.kerberos.login.principal = "user@REALM.COM" security.kerberos.login.keytab = "/path/to/user.keytab" # HBase 连接 hbase.zookeeper.quorum = "zk1,zk2,zk3" hbase.zookeeper.port = 2181 ``` ### 3. 本地测试运行 #### HBase ETL 作业 ```bash # 本地模式运行(使用模拟数据) java -cp target/flink-hbase-etl-1.0-SNAPSHOT.jar \ com.flink.hbase.JobLauncher \ --hbase.table.name user \ --output.path /tmp/output \ --source.type mock \ --mock.data.size 100 \ --parallelism 2 ``` #### 广播流数据丰富化作业 ```bash # 广播流丰富化作业(需要 MySQL 数据库) java -cp target/flink-hbase-etl-1.0-SNAPSHOT.jar \ com.flink.hbase.BroadcastJobLauncher \ --jdbc.url "jdbc:mysql://localhost:3306/flink_demo?useSSL=false&serverTimezone=UTC" \ --jdbc.username root \ --jdbc.password your_password \ --dimension.table.name user_dimensions \ --user.source.type mock \ --output.type print \ --parallelism 2 ``` ## 🚀 生产部署 ### YARN 集群模式 #### 1. 准备 Kerberos 文件 ```bash # 上传 Kerberos 相关文件到 HDFS hdfs dfs -put krb5.conf /user/flink/conf/ hdfs dfs -put user.keytab /user/flink/conf/ hdfs dfs -put core-site.xml /user/flink/conf/ hdfs dfs -put hdfs-site.xml /user/flink/conf/ hdfs dfs -put hbase-site.xml /user/flink/conf/ ``` #### 2. 提交作业 ```bash # YARN Session 模式 export HADOOP_CONF_DIR=/etc/hadoop/conf export HBASE_CONF_DIR=/etc/hbase/conf flink run \ --target yarn-per-job \ --jobmanager-memory 2048m \ --taskmanager-memory 4096m \ --parallelism 8 \ --files "hdfs:///user/flink/conf/krb5.conf,hdfs:///user/flink/conf/user.keytab,hdfs:///user/flink/conf/core-site.xml,hdfs:///user/flink/conf/hdfs-site.xml,hdfs:///user/flink/conf/hbase-site.xml" \ target/flink-hbase-etl-1.0-SNAPSHOT.jar \ --job.name "HBase-ETL-Production" \ --hbase.table.name user \ --output.path /data/warehouse/user/ \ --source.type file \ --source.file.path /data/input/user_ids.txt \ --security.kerberos.enabled true \ --security.kerberos.login.principal user@REALM.COM \ --security.kerberos.login.keytab ./user.keytab \ --hbase.zookeeper.quorum zk1,zk2,zk3 \ --async.pool.size 20 \ --sink.file.size.mb 128 \ --sink.compression.codec SNAPPY ``` #### 3. 批量提交脚本 创建 `scripts/submit-job.sh`: ```bash #!/bin/bash # 参数检查 if [ $# -lt 2 ]; then echo "使用方法: $0 [additional_params...]" exit 1 fi TABLE_NAME=$1 OUTPUT_PATH=$2 shift 2 # 环境变量 export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/etc/hadoop/conf} export HBASE_CONF_DIR=${HBASE_CONF_DIR:-/etc/hbase/conf} export FLINK_HOME=${FLINK_HOME:-/opt/flink} # Kerberos 文件路径 KRB5_CONF="hdfs:///user/flink/conf/krb5.conf" KEYTAB_FILE="hdfs:///user/flink/conf/user.keytab" HADOOP_CONFIGS="hdfs:///user/flink/conf/core-site.xml,hdfs:///user/flink/conf/hdfs-site.xml,hdfs:///user/flink/conf/hbase-site.xml" # 提交作业 ${FLINK_HOME}/bin/flink run \ --target yarn-per-job \ --jobmanager-memory 2048m \ --taskmanager-memory 4096m \ --parallelism 8 \ --files "${KRB5_CONF},${KEYTAB_FILE},${HADOOP_CONFIGS}" \ target/flink-hbase-etl-1.0-SNAPSHOT.jar \ --hbase.table.name ${TABLE_NAME} \ --output.path ${OUTPUT_PATH} \ --security.kerberos.enabled true \ --security.kerberos.login.principal user@REALM.COM \ --security.kerberos.login.keytab ./user.keytab \ --hbase.zookeeper.quorum zk1,zk2,zk3 \ "$@" echo "作业提交完成: ${TABLE_NAME} -> ${OUTPUT_PATH}" ``` ### Standalone 集群模式 ```bash # 启动 Flink 集群 $FLINK_HOME/bin/start-cluster.sh # 提交作业 $FLINK_HOME/bin/flink run \ --parallelism 4 \ target/flink-hbase-etl-1.0-SNAPSHOT.jar \ --hbase.table.name user \ --output.path /data/warehouse/user/ ``` ## ⚙️ 参数配置 ### 必需参数 | 参数 | 说明 | 示例值 | |------|------|--------| | `--hbase.table.name` | HBase 表名 | `user` | | `--output.path` | HDFS 输出路径 | `/data/warehouse/user/` | ### 性能调优参数 | 参数 | 说明 | 默认值 | 建议值 | |------|------|--------|--------| | `--parallelism` | 整体并行度 | `4` | `8-16` | | `--async.pool.size` | 异步查询线程池大小 | `10` | `20-50` | | `--async.timeout.ms` | 异步查询超时时间 | `5000` | `3000-10000` | | `--sink.file.size.mb` | 输出文件大小限制 | `128` | `64-256` | | `--checkpoint.interval` | 检查点间隔 | `60000` | `30000-300000` | ### Kerberos 安全参数 | 参数 | 说明 | 示例值 | |------|------|--------| | `--security.kerberos.enabled` | 启用 Kerberos | `true` | | `--security.kerberos.login.principal` | 用户主体 | `user@REALM.COM` | | `--security.kerberos.login.keytab` | Keytab 文件路径 | `./user.keytab` | ### HBase 连接参数 | 参数 | 说明 | 示例值 | |------|------|--------| | `--hbase.zookeeper.quorum` | ZooKeeper 地址 | `zk1,zk2,zk3` | | `--hbase.zookeeper.port` | ZooKeeper 端口 | `2181` | | `--hbase.client.retries` | 重试次数 | `3` | ## 📊 监控和运维 ### 性能指标监控 作业运行时关键指标: 1. **吞吐量指标** - Records/second:每秒处理记录数 - Bytes/second:每秒处理字节数 2. **延迟指标** - HBase 查询平均延迟 - End-to-end 处理延迟 3. **资源使用** - CPU 使用率 - 内存使用率 - 网络 I/O ### 数据质量监控 ```bash # 检查输出文件 hdfs dfs -ls /data/warehouse/user/dt=2023-12-01/ # 验证 Parquet 文件 parquet-tools head hdfs://namenode:8020/data/warehouse/user/dt=2023-12-01/user-data-0-0.parquet # 统计记录数 spark-shell --packages org.apache.spark:spark-sql_2.12:3.4.0 scala> spark.read.parquet("/data/warehouse/user/dt=2023-12-01").count() ``` ### 故障排查 #### 常见问题 1. **Kerberos 认证失败** ```bash # 检查 Kerberos 票据 klist -kt user.keytab kinit -kt user.keytab user@REALM.COM ``` 2. **HBase 连接超时** ```bash # 测试 HBase 连接 hbase shell > status > list ``` 3. **HDFS 写入权限问题** ```bash # 检查 HDFS 权限 hdfs dfs -ls /data/warehouse/ hdfs dfs -chmod 755 /data/warehouse/user/ ``` #### 日志分析 ```bash # 查看应用日志 tail -f logs/app.log # 查看错误日志 tail -f logs/flink-hbase-etl-error.log # 在 YARN 中查看日志 yarn logs -applicationId application_xxxx_xxxx ``` ## 🔧 性能调优指南 ### 读取端优化 1. **增加异步线程池大小** ```bash --async.pool.size 50 ``` 2. **调整 HBase 客户端缓存** ```bash --hbase.client.scanner.caching 2000 ``` 3. **优化连接数** ```bash --hbase.connection.pool.size 20 ``` ### 写入端优化 1. **调整文件滚动策略** ```bash --sink.file.size.mb 256 --sink.rollover.interval.ms 300000 ``` 2. **选择合适的压缩格式** ```bash --sink.compression.codec ZSTD # 更高压缩比 --sink.compression.codec SNAPPY # 更快压缩速度 ``` 3. **增加 Sink 并行度** ```bash --sink.parallelism 16 ``` ### 内存优化 ```bash # JobManager 内存配置 --jobmanager-memory 4096m # TaskManager 内存配置 --taskmanager-memory 8192m # 网络缓冲区配置 -Dtaskmanager.network.memory.max=2gb -Dtaskmanager.memory.managed.size=4gb ``` ## 🧪 测试验证 ### 单元测试 ```bash # 运行单元测试 mvn test # 运行特定测试类 mvn test -Dtest=UserMapperTest ``` ### 集成测试 ```bash # 使用 Docker 启动 HBase docker-compose up -d hbase # 运行集成测试 mvn verify -Pintegration-test ``` ### 性能测试 ```bash # 生成测试数据 ./scripts/generate-test-data.sh 1000000 # 运行性能测试 ./scripts/performance-test.sh user /tmp/perf-output ``` ## 📝 版本历史 - **v1.0.0** (2023-12-01) - 初始版本发布 - 支持 HBase 异步读取 - 支持 Parquet 格式输出 - 完整 Kerberos 认证支持 ## 🤝 贡献指南 1. Fork 项目 2. 创建特性分支 (`git checkout -b feature/amazing-feature`) 3. 提交更改 (`git commit -m 'Add amazing feature'`) 4. 推送到分支 (`git push origin feature/amazing-feature`) 5. 创建 Pull Request ## 📄 许可证 本项目采用 Apache License 2.0 许可证 - 查看 [LICENSE](LICENSE) 文件了解详情。 ## 📞 技术支持 - 提交 Issue: [GitHub Issues](https://github.com/your-org/flink-hbase/issues) - 文档网站: [项目文档](https://your-org.github.io/flink-hbase) - 邮件联系: support@yourorg.com --- **⚡ 立即开始您的大数据 ETL 之旅!**