# SparkStreamingDemo **Repository Path**: mylovextt/SparkStreamingDemo ## Basic Information - **Project Name**: SparkStreamingDemo - **Description**: spark流式计算:1、主要采用structured streaming2、配置文件的方式完成数据的来源、处理、存储3、主要以SQL的方式进行数据处理 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 4 - **Created**: 2020-11-04 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # SparkStreamingDemo [toc] ## 介绍 spark流式计算: 1、主要采用structured streaming 2、配置文件的方式完成数据的来源、处理、存储 3、主要以SQL的方式进行数据处理 Structured Streaming ### 概述 SSF_LRS是一个从HDFS或Kafka读取流,进行规则过滤和数据处理并最终将处理结果存入HDFS\Kafka的流式数据处理方案。 ### 特性 1. 数据过滤规则可配置 2. 针对匹配的规则可配置数据处理操作 3. 面向接口编程规则过滤可自行扩展 4. 采用structured streaming编程方式进行书写 ## 软件架构 软件架构说明 ## 安装教程 1. xxxx 2. xxxx 3. xxxx ## 使用说明 ### 快速开始 来自HDFS或者Kafka的每条数据要以规定的分割符分割,并且每条数据占据一行,在使用的时候根据数据格式不同对[my.properties](https://gitee.com/happymorty/SparkStreamingDemo/blob/master/src/main/resources/my.properties)文件中data.separator做相应的配置 - 以“|”分割数据 ```c 17836479865|阿珍|北京 13436479866|阿强|上海 ``` - 以空格分割数据 ``` 17836479865 阿珍 北京 13436479866 阿强 上海 ``` 下载[Jar包](https://note.youdao.com/)与相应的配置文件,选择所需的配置文件进行配置(参考配置文件部分说明),检查配置文件无误后上传,根据需要在不同的应用[程序入口](#jump01)(main方法)的启动spark应用程序。 ```js spark-submit \ --class cn.com.bonc.app.StructuredStreaming...XXX... --master local[4] \ --deploy-mode client \ --driver-cores 1 \ --driver-memory 512M \ --num-executors 3 \ --executor-cores 3 \ --executor-memory 512M \ --files /...your path.../my.properties,/...your path.../rule.json,/...your path.../action.json \ /...your path.../SparkStreamingDemo-1.0-SNAPSHOT-uber.jar ``` 为了方便的运行和使用SSF_LRS内置类了多个的程序入口,以满足不同的需求。 程序入口 | 需求 ---|--- StructuredStreamingHDFS|读取来自HDFS的数据,经过外部配置文件的过滤操作 StructuredStreamingJoinHDFS | 读取来自HDFS的数据,经过外部配置文件的过滤操作之后,关联HDFS外部数据执行SQL StructuredStreamingJoinRedis | 读取来自HDFS的数据,经过外部配置文件的过滤操作之后,关联Redis外部数据执行SQL ### 基本配置文件([my.properties](https://gitee.com/happymorty/SparkStreamingDemo/blob/master/src/main/resources/my.properties)) 程序运行锁必须的基本配置,包括Spark的配置、Kafka配置、HDFS配置、Redis配置等。 ### 配置数据过滤文件([rule.json](https://gitee.com/happymorty/SparkStreamingDemo/blob/master/src/main/resources/rule.json)) rule.json可配置多条数据匹配规则,通常每条规则又由多个列上的简单规则组成。当数据命中一条规则后可以选择是否继续匹配进行数据过滤,但是需要**注意**的是多条规则的执行顺序是按照在配置文件中的顺序从上到下顺序执行的。下面是个简单的例子。 ```json [ { "sceneID": "sce01", "rules": [ { "columnIndex": "0", "operator": "LK", "value": "^1(3[0-2]|5[256]|8[56])\\d{8}$" }, { "columnIndex": "3", "operator": "NE", "value": "Impossible Equivalent Strings" } ], "logicalExpr":"C0 && C3", "hasNextRule":false, } ] ``` 上面的过滤规则将会匹配0列为手机号并且3列字符的值与 "Impossible Equivalent Strings"不相等的数据。 属性 | 含义 ---|--- sceneID | 本条规则的唯一ID rules |各个列上的简单规则集合 columnIndex| 列序号,第一列表示为0 operator|支持的操作符包括:
LK:正则表达式匹配
EQ:等于,判断字符串相等
NE:不等于,判断字符串不相等
GT:大于,将String类型的转为Double进行值比较
LT:小于,同上转换为Double比较
GE:大于等于,同上转换比较 此列>=value
LE:小于等于 value|用于比较所设定的值 logicalExpr|表达式,指定列规则之间的关系,使用C(0..5..6..9)指定列,与列序号对应 hasNextRule|当数据符合本条规则后是否继续匹配后续规则 ### 配置数据操作文件([action.json](https://gitee.com/happymorty/SparkStreamingDemo/blob/master/src/main/resources/action.json)) action.json一般与rule.json成对出现,这意味着每条规则都有与之对应的行为操作。它们的配置形式极为相似,使用scenelID来产生对应关系。当然也不是必须存在相对应的ID来进行数据的操作,应该根据具体实际情况来进行配置。下面是一个简单的例子。 ``` [ { "sceneID": "sce01", "act": [ { "operator": "DPR" } ] } ] ``` 上面的行为,与之前所配置的规则产生对应关系,通过sceneID可以判断与之对应的规则是哪一条。当数据符合规则后,将会删除本条数据。 属性 | 含义 ---|--- sceneID | 本条行为的唯一ID,与规则对应 act |各个列上的简单行为集合 columnIndex| 列序号,第一列表示为0 operator|支持的操作符包括:
REP(替换) 将指定的列替换为新的字符串
ADD(增加) 在指定列数据的基础上追加内容,默认在列后追加字符串,若需要在特定位置增加则使用'?'表示原始数据
例如,原始数据为123 在开头增加"SA" 在value中表示为: SA?
DLC(清空列) 将指定的列数据置空,但是不会删除此列
DPR(删除行) 将符合规则的行数据抹除 value|用于处理所设定的值 ### 列映射配置文件([column-mapping.properties](https://gitee.com/happymorty/SparkStreamingDemo/blob/master/src/main/resources/column-mapping.properties)) 将单列数据按照指定分割符分割后转变为多列,在配置文件中需要指定列序号以及此列的列名。你可以只选择某些列进行映射,从而方便使用Sql操作数据。下面是一个简单的例子。 ``` #列转换:将单列数据转换为多列,从0开始计数作为第一列 #包语盈|37152319110501168X|山东济宁市金乡县王丕镇|18166541879|baoyuying@msn.com|6224228625456113315 #example: 0=userName 2=address 3=phoneId 5=unionId ``` 1. 2. xxxx 3. xxxx ## 参与贡献 1. Fork 本仓库 2. 新建 Feat_xxx 分支 3. 提交代码 4. 新建 Pull Request ## 码云特技 1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md 2. 码云官方博客 [blog.gitee.com](https://blog.gitee.com) 3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解码云上的优秀开源项目 4. [GVP](https://gitee.com/gvp) 全称是码云最有价值开源项目,是码云综合评定出的优秀开源项目 5. 码云官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help) 6. 码云封面人物是一档用来展示码云会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/)