# spark-api **Repository Path**: githuawei/spark-api ## Basic Information - **Project Name**: spark-api - **Description**: spark-core/spark-sql/spark-streaming - **Primary Language**: Scala - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2022-02-17 - **Last Updated**: 2023-02-21 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README 1.RDD -弹性分布式数据集 -弹性 *存储 *计算和容错 *分区 -分布式 不同分区中的数据会分配给集群中的不同服务器节点进行计算 -数据集 和集合不一样,没有存放数据,存的是计算逻辑 2.RDD的五个特性 -一组分区 getPartitions -分区计算函数 compute -RDD之间的依赖 getDependence -分区器Partitioner(对于KV类型的RDD) -数据存储优先位置 getPreferedLocation 3.RDD创建的方式 -通过内存集合创建 -读取外部文件创建 -通过RDD的转换算子转换 4.创建RDD分区方式 -通过内存集合创建 *默认的分区方式 取决于分配给当前应用的CPU核数 *指定分区 def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = { (0 until numSlices).iterator.map { i => val start = ((i * length) / numSlices).toInt val end = (((i + 1) * length) / numSlices).toInt (start, end) } } -读取外部文件创建 *默认的分区方式 取决于分配给当前应用的CPU核数和2取最小值 *指定分区 >FileInputFormat getSplits 切片 >LineRecordReader next 读取 5.RDD的常用算子 -转换算子Transformation *转换算子执行完毕之后,会创建新的RDD,并不会马上执行计算 *map 对RDD中的元素进行一个个映射 *mapPartitions 以分区为单位,对RDD中的元素进行映射 *mapPartitionsWithIndex 以分区为单位,对RDD中的元素进行映射,并且带分区编号 *flatMap 对RDD中的元素进行扁平化处理 *glom 将RDD中每一个分区中的单个元素,转换为数组 *groupBy 按照一定的规则,对RDD中的元素进行分组 *filter 按照一定的规则,对RDD中的元素进行过滤 *sample >参数1:是否抽样放回 true放回 false不放回 >参数2 参数1 true 期望元素出现的次数 > 0 参数1 false 每一个元素出现的概率 [0,1] >参数3 随机算法的初始值(种子) >takeSample(行动算子) *distinct 去重 底层是通过map + reduceByKey完成去重操作 *改变分区 >coalesce 一般用于缩减分区,默认不执行shuffle >repartition 一般用于扩大分区,默认执行shuffle 底层调用的就是coalesce *sortBy 按照指定规则,对RDD中的元素进行排序,默认升序 *pipe 对于RDD中的每一个分区,都会执行pipe算子中指定的脚本 *union 合集 *intersection 交集 *subtract 差集 *zip 拉链 注意:必须要保证分区数以及每一个分区中元素的个数一致 *partitionBy 按照指定的分区器,通过key对RDD中的元素进行分区 默认分区器 HashPartitioner *reduceByKey 将相同的key放在一起,对Value进行聚合操作 *groupByKey 按照key对RDD中的元素进行分组 *aggregateByKey(zeroValue)(分区内计算规则,分区间计算规则) *foldByKey(zereValue)(内间计算规则) 是aggregateByKey的简化,区内和分区间计算规则相同 *combineByKey(对当前key的value进行转换,分区内计算规则,分区间计算规则) *几种聚合算子对比 >reduceByKey(_+_) combineByKeyWithClassTag[V]((v: V) => v, func, func) >aggregateByKey(zeroValue)(cleanedSeqOp,combOp) combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),cleanedSeqOp, combOp) >foldByKey combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),cleanedFunc, cleanedFunc) >combineByKey combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners) *sortByKey 按照RDD中的key对元素进行排序 *mapValues 只对RDD中的Value进行操作 *join&cogroup -行动算子Action *行动算子执行后,才会触发计算 *reduce 对RDD中的元素进行聚合 *collect.foreach和foreach >collect.foreach 将每一个Excutor中的数据收集到Driver,形成一个新的数组 .foreach不是一个算子,是集合的方法,是对数组中的元素进行遍历 >对RDD中的元素进行遍历 *count 获取RDD中元素的个数 *countByKey 获取RDD中每个key对应的元素个数 *first 获取RDD中第一个元素 *take 获取RDD中的前几个元素 *takeOrdered 获取排序后的RDD中的前几个元素 *aggregate&fold >aggregateByKey 处理kv类型的RDD,并且在进行分区间聚合的时候,初始值不参与运算 >fold 是aggregate的简化版 *save相关的算子 >saveAsTextFile >saveAsObjectFile >saveAsSequenceFile(只针对KV类型RDD) 6.序列化以及闭包检查 -为什么要序列化 因为在Spark程序中,算子相关的操作在Excutor上执行,算子之外的代码在Driver端执行, 在执行有些算子的时候,需要使用到Driver里面定义的数据,这就涉及到了跨进程或者跨节点之间的通讯, 所以要求传递给Excutor中的数组所属的类型必须实现Serializable接口 -如何判断是否实现了序列化接口 在作业job提交之前,其中有一行代码 val cleanF = sc.clean(f),用于进行闭包检查 之所以叫闭包检查,是因为在当前函数的内部访问了外部函数的变量,属于闭包的形式。 如果算子的参数是函数的形式,都会存在这种情况 -java和kryo序列化方式 7.查看RDD的血缘关系以及依赖关系 -血缘关系 toDebugString -依赖关系 >dependencies >窄依赖 父RDD一个分区中的数据,还是交给子RDD的一个分区处理 >宽依赖 父RDD一个分区中的数据,交给子RDD的多个分区处理 8.Spark的Job调度 -集群(Standalone|Yarn) *一个Spark集群可以同时运行多个Spark应用 -应用 *我们所编写的完成某些功能的程序 *一个应用可以并发的运行多个Job -Job *Job对应着我们应用中的行动算子,每次执行一个行动算子,都会提交一个Job *一个Job由多个Stage组成 -Stage *一个宽依赖做一次阶段的划分 *阶段的个数 = 宽依赖个数 + 1 *一个Stage由多个Task组成 -Task *每一个阶段最后一个RDD的分区数,就是当前阶段的Task个数 9.RDD的持久化 -cache 底层调用的就是persist,默认存储在内存中 -persist 可以通过参数指定存储级别 -checkpoint *可以当做缓存理解,存储在HDFS上,更稳定 *为了避免容错执行时间过长 -缓存不会切断血缘,但是检查点会切断血缘 10.数据的读取和保存 -textFile -sequenceFile -objectFile -Json 本质还是通过textFile读取文本,对读到的内容进行处理 -HDFS -MySQL map-------mapPartition foreach---foreachPartition 11.在Spark里面,有三大结构 -RDD 弹性分布式数据集 -累加器 分布式共享只写变量 -广播变量 分布式共享只读变量 ------------------------------------------------------------------------------------------------ 1.SparkSQL和Hive对比 -hive---引擎---MR -SparkSQL---引擎---spark 2.SparkSQL对数据集的抽象 -RDD -DataFrame *spark.read.不同类型的文件 *通过SQL风格操作df,需要创建临时视图 *通过DSL风格操作df,不需要创建临时视图 *DF->RDD rdd *RDD->DF toDF *在编程的时候,需要导入 import SparkSession对象名.implicits._ -DataSet(强类型) *RDD->DS toDS *DS->RDD rdd *DF->DS as[类型] *DS->DF toDF 3.用户自定义函数 -UDF 一进一出 -UDAF 多进一出 *弱类型 继承UserDefinedAggregateFunction *强类型 Aggregator[输入,缓存,输出] -UDTF 一进多出 4.文件的加载与数据保存 -spark.read.format("类型").load("路径") -df.writer.format("类型").save("路径") ---------------------------------------------------------------------------------- 1. SparkStreaming -是Spark的内置模块,主要用于实时计算 -微批次、流式实时计算框架 -离线和实时 数据处理的延迟 -批和流式 2. 架构图 3. 背压机制(采集线程与Executor) 4. 创建方式 -DStream 是 SparkStreaming对处理的数据集的一个抽象 -读取指定端口创建DS -通过RDD队列创建DS(了解) -通过定义Receiver读取指定数据源的数据创建DS -通过读取kafka数据源创建DS *spark-streaming-kafka-0.8(spark 2.3.0之后,就过时了。但外面很多公司都还在用) >Receiver DStream &默认情况下,offset维护在zk中 >Direct DStream &默认情况下,offset维护在checkpoint中,需要改变ssc的创建方式 &可以手动指定offset维护位置,为了保证数据的精准一致性,一般维护在有事务的存储上 *spark-streaming-kafka-0.10 > Direct DStream 5. DStream算子 -转换算子 *无状态的转换操作 transform map\flatMap... *有状态的转换操作 &updateStateByKey(Seq,Option) 第一个参数Seq:表示的是相同的key对于的value的集合 第二个参数Option:相同的key对应的状态(上一个采集周期计算的结果) &window -输出算子