# obs-flink-plugins **Repository Path**: HuaweiCloudDeveloper/obs-flink-plugins ## Basic Information - **Project Name**: obs-flink-plugins - **Description**: Flink是一个分布式的数据处理引擎,用于处理有界和无界的流式数据。Flink定义了文件系统抽象,OBS服务实现了Flink的文件系统抽象,使得OBS可以作为flink StateBackend和数据读写的载体。 - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 1 - **Created**: 2022-09-27 - **Last Updated**: 2024-01-15 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README --- title: Flink对接OBS date: 2023-01-30T19:09:43Z lastmod: 2023-02-01T09:37:38Z --- # Flink对接OBS #### 概述 Flink是一个分布式的数据处理引擎,用于处理有界和无界的流式数据。Flink定义了文件系统抽象,OBS服务实现了Flink的文件系统抽象,使得OBS可以作为flink StateBackend和数据读写的载体。 本文将以Flink 1.16.0版本为基础,介绍Flink对接OBS的整个步骤,并提供样例程序。 #### 注意事项 * flink-obs-fs-hadoop目前仅支持OBS并行文件系统。 * 不推荐state状态数据存储在OBS上。 * 为了减少日志输出,在/opt/flink-1.16.0/conf/log4j.properties文件中增加配置: ``` logger.obs.name=com.obs logger.obs.level=ERROR ``` * flink-obs-fs-hadoop的实现基于flink的plugin加载机制(flink从1.9开始引入),flink-obs-fs-hadoop必须通过flink的plugin机制进行加载,即将flink-obs-fs-hadoop放入/opt/flink-1.16.0/plugins/obs-fs-hadoop目录下。 #### 对接步骤 ##### 前提 1. 若要正常运行flink,必须提前安装好Java JDK。执行以下命令,若如下图所示,返回Java版本号信息,说明Java JDK安装成功。注意,具体返回内容因JavaJDK版本号不同有所差异。 ```bash java -version ``` ![image](assets/image-20230130192220-sxv2hkh.png) 2. 已经开通华为云obs服务,并获取obs服务的Access Key,Secret Key和endPoint信息。 ##### 一、安装flink 若已经安装flink,可跳过此环节。下面以flink-1.16.0为例,介绍整个安装过程。 1. 下载flink-1.16.0-bin-scala_2.12.tgz,并解压到/opt/flink-1.16.0目录。 ```bash $ tar -zxvf flink-1.16.0-bin-scala_2.12.tgz ``` 2. 修改环境变量 a. 打开环境变量 ```bash $ sudo vi /etc/profile ``` b. 添加环境变量 ```bash export FLINK_HOME=/opt/flink-1.12.1 export PATH=$FLINK_HOME/bin:$PATH ``` c. 使环境变量在当前环境中生效 ```bash $ source /etc/profile ``` ##### 二、安装flink-obs-fs-hadoop插件 1. 在Github下载flink-obs-fs-hadoop:[下载地址](https://github.com/huaweicloud/obsa-hdfs/tree/master/flink-release)。说明: * flink-obs-fs-{flinkversion}-hadoop-${}${version}.jar版本规则:flinkversion为对应的flink版本号,version为flink-obs-fs-hadoop版本号。 * 若没有匹配版本的jar包,可自行修改flink-obs-fs-hadoop目录下pom文件中的flink版本重新编译生成。详情见[编译指南](https://github.com/huaweicloud/obsa-hdfs/tree/flink-obs)。 2. 在/opt/flink-1.16.0/plugins目录下创建obs-fs-hadoop目录,并将上述jar放入此目录。 ##### 三、修改程序并验证 1. 在/opt/flink-1.16.0/conf/flink-conf.yaml文件中设置如下参数,说明如表1所示。 表1 flink对接obs所需参数说明 | 参数 | 说明 | | ----------------- | ---------------------------------------------------------------------------------------------------------------- | | fs.obs.impl | obs文件系统实现类,默认为org.apache.hadoop.fs.obs.OBSFileSystem | | fs.obs.access.key | 已开通obs对应华为账号的Access Key。[获取访问密钥(AK/SK)](https://support.huaweicloud.com/qs-obs/obs_qs_0005.html) | | fs.obs.secret.key | 已开通obs对应华为账号的Secret Key。[获取访问密钥(AK/SK)](https://support.huaweicloud.com/qs-obs/obs_qs_0005.html) | | fs.obs.endpoint | OBS终端节点名,各区域的终端节点详情请参见[地区和终端节点](https://developer.huaweicloud.com/endpoint)。 | | fs.obs.buffer.dir | 写数据到OBS时需要的本地临时目录,是绝对路径,flink程序需具备此目录读写权限 | ```bash fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem fs.obs.access.key: xxx fs.obs.secret.key: xxx fs.obs.endpoint: xxx fs.obs.buffer.dir: /data/buf #写数据到OBS时需要的本地临时目录,flink程序需具备此目录读写权限 ``` 2. 修改flink程序,适配obs。 * 以下代码适用于flink 1.16.0版本。示例中的obs-bucket需要替换为已开通obs服务的桶名,下同。 a. Checkpoint设置为OBS中的路径。示例: ```java env.setStateBackend(new HashMapStateBackend()); env.getCheckpointConfig().setCheckpointStorage("obs://obs-bucket/test/checkpoint"); ``` b. FileSink设置为OBS中的路径。示例: ``` FileSink sink= FileSink.forRowFormat(new Path("obs://obs-bucket/test/sink"),new SimpleStringEncoder("UTF-8")). withBucketAssigner(new BasePathBucketAssigner()). withRollingPolicy(DefaultRollingPolicy.builder(). withMaxPartSize(10*1024).build()). withBucketCheckInterval(1000L).build(); ``` * 以下代码适用于flink 1.12.1版本。 a. StateBackend设置为OBS中的路径。 示例: ``` env.setStateBackend(new mFsStateBackend("obs://obs-bucket/test/checkpoint")); ``` b. FileSink设置为OBS中的路径。 示例: ```java final StreamingFileSink sink = StreamingFileSink .forRowFormat(new Path("obs://obs-bucket/test/data"),new SimpleStringEncoder("UTF-8")) .withBucketAssigner(new BasePathBucketAssigner()) .withRollingPolicy(rollingPolicy) .withBucketCheckInterval(1000L) .build() ``` 3. 将程序打包,上传到/opt/flink-1.16.0/examples/路径下。 4. 在/opt/flink-1.16.0/路径下,通过如下指令启动flink主进程。 ```bash $ ./bin/start-cluster.sh ``` 主进程启动后,通过如下指令启动flink任务进程。 ```bash $ ./bin/flink run ./examples/xxxx.jar ``` 5. 登录华为云控制台,查看obs数据情况,如果成功创建文件夹,且内含计算结果数据,则说明flink对接obs插件已经生效。![image](assets/image-20230131202455-igp9cgu.png) ##### 四、异常场景问题定位 若测试过程出现问题,可以通过以下两个方法来定位问题。 1. 在/opt/flink-1.16.0/log路径下,查看各项日志,是否有异常信息。 flink-root-standalonesession-0-ecs-a066.log:flink主进程日志。 flink-root-taskexecutor-0-ecs-a066.log:flink任务相关日志。 ![image](assets/image-20230131203004-94yky9u.png) 2. 通过IDEA debug模式本地联调。 a. 修改启动配置,打开启动配置修改界面后,依次点击“Modify options”,“Add dependencies with "provided" scope to classpath”。将Flink相关依赖 ![image](assets/image-20230131203349-ccxb67f.png) b. 在src/resources/路径下,新增名为core-site.xml的配置文件,文件内容如下: ```bash fs.obs.access.key xxx fs.obs.secret.key xxx fs.obs.endpoint xxx fs.obs.buffer.dir D:\\tmp ``` c. 开启联调。 ‍