# spark_3

**Repository Path**: cch-bigdata/spark_3

## Basic Information

- **Project Name**: spark_3
- **Description**: Homework for the "Spark internals and source code analysis" chapter
- **Primary Language**: Scala
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2021-09-21
- **Last Updated**: 2021-09-21

## Categories & Tags

**Categories**: Uncategorized
**Tags**: None

## README

# spark_3

#### Introduction

Homework for the "Spark internals and source code analysis" chapter.

## Output

```text
join-rdd3:====================
rdd1:====================
None
16
ArrayBuffer((4,user1), (5,user2), (2,user3))
ArrayBuffer((7,user4), (6,user5), (0,user6))
ArrayBuffer((1,user7), (5,user8), (7,user9))
ArrayBuffer((8,user10), (7,user11), (9,user12))
ArrayBuffer((4,user13), (1,user14), (5,user15))
ArrayBuffer((5,user16), (6,user17), (8,user18))
ArrayBuffer((7,user19), (5,user20), (9,user21))
ArrayBuffer((7,user22), (4,user23), (2,user24))
ArrayBuffer((0,user25), (9,user26), (9,user27))
ArrayBuffer((0,user28), (2,user29), (4,user30))
ArrayBuffer((6,user31), (1,user32), (8,user33))
ArrayBuffer((2,user34), (4,user35), (7,user36))
ArrayBuffer((0,user37), (6,user38), (8,user39))
ArrayBuffer((3,user40), (7,user41), (6,user42))
ArrayBuffer((1,user43), (9,user44), (3,user45))
ArrayBuffer((5,user46), (5,user47), (8,user48), (6,user49))
rdd2:====================
None
16
ArrayBuffer()
ArrayBuffer((0,BJ))
ArrayBuffer()
ArrayBuffer((1,SH))
ArrayBuffer((2,GZ))
ArrayBuffer()
ArrayBuffer((3,SZ))
ArrayBuffer((4,TJ))
ArrayBuffer()
ArrayBuffer((5,CQ))
ArrayBuffer()
ArrayBuffer((6,HZ))
ArrayBuffer((7,NJ))
ArrayBuffer()
ArrayBuffer((8,WH))
ArrayBuffer((0,CD))
join-rdd4:====================
rdd1p:====================
Some(org.apache.spark.HashPartitioner@3)
3
ArrayBuffer((6,user5), (0,user6), (9,user12), (6,user17), (9,user21), (0,user25), (9,user26), (9,user27), (0,user28), (6,user31), (0,user37), (6,user38), (3,user40), (6,user42), (9,user44), (3,user45), (6,user49))
ArrayBuffer((4,user1), (7,user4), (1,user7), (7,user9), (7,user11), (4,user13), (1,user14), (7,user19), (7,user22), (4,user23), (4,user30), (1,user32), (4,user35), (7,user36), (7,user41), (1,user43))
ArrayBuffer((5,user2), (2,user3), (5,user8), (8,user10), (5,user15), (5,user16), (8,user18), (5,user20), (2,user24), (2,user29), (8,user33), (2,user34), (8,user39), (5,user46), (5,user47), (8,user48))
rdd2p:====================
Some(org.apache.spark.HashPartitioner@3)
3
ArrayBuffer((0,BJ), (3,SZ), (6,HZ), (0,CD))
ArrayBuffer((1,SH), (4,TJ), (7,NJ))
ArrayBuffer((2,GZ), (5,CQ), (8,WH))
```

The printout shows that rdd1 and rdd2 have no partitioner of their own. Although each is given 16 partitions by default, identical keys do not land in the same partition. Therefore rdd1.join(rdd2) has to move the original data between partitions: a record in any partition of rdd1 or rdd2 may end up in any partition of rdd3. This operation is a wide dependency.

For rdd4, however, rdd1 and rdd2 are hash-partitioned first. The resulting rdd1p and rdd2p both carry a partitioner, have the same number of partitions, and keep identical keys in the same partition. During the join the data of rdd1p and rdd2p does not need to move around: each partition flows straight into the corresponding partition of rdd4, so this operation is a narrow dependency.
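The README does not reproduce the driver program that produced the output above. The following is a minimal sketch of what such a job could look like, under stated assumptions: the object name `JoinDependencyDemo` and the helper `printPartitions` are hypothetical, the keys are random (so the exact values will differ between runs), and the 16 default partitions in the printout presumably come from running `local[*]` on a 16-core machine.

```scala
import scala.util.Random

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object JoinDependencyDemo {

  // Print the partitioner, the partition count, and the contents of every partition.
  def printPartitions(name: String, rdd: RDD[(Int, String)]): Unit = {
    println(s"$name:====================")
    println(rdd.partitioner)                                       // Some(HashPartitioner@n) or None
    println(rdd.getNumPartitions)
    rdd.glom().collect().foreach(part => println(part.toBuffer))   // one ArrayBuffer per partition
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("join-deps"))

    // 49 users with random keys in [0, 9], and 10 city codes keyed 0..8 (plus a second key 0).
    val rdd1 = sc.makeRDD((1 to 49).map(i => (Random.nextInt(10), s"user$i")))
    val rdd2 = sc.makeRDD(Seq((0, "BJ"), (1, "SH"), (2, "GZ"), (3, "SZ"), (4, "TJ"),
      (5, "CQ"), (6, "HZ"), (7, "NJ"), (8, "WH"), (0, "CD")))

    // Neither side has a partitioner, so this join must shuffle both inputs (wide dependency).
    val rdd3 = rdd1.join(rdd2)
    println("join-rdd3:====================")
    printPartitions("rdd1", rdd1)
    printPartitions("rdd2", rdd2)

    // Pre-partition both sides with the same HashPartitioner(3); identical keys are now
    // co-located, so the subsequent join is a narrow dependency.
    val rdd1p = rdd1.partitionBy(new HashPartitioner(3))
    val rdd2p = rdd2.partitionBy(new HashPartitioner(3))
    val rdd4 = rdd1p.join(rdd2p)
    println("join-rdd4:====================")
    printPartitions("rdd1p", rdd1p)
    printPartitions("rdd2p", rdd2p)

    println(rdd3.toDebugString)   // lineage of the shuffled join
    println(rdd4.toDebugString)   // lineage of the co-partitioned join

    sc.stop()
  }
}
```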
## Source code

```scala
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
  join(other, defaultPartitioner(self, other))
}

def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  val rdds = (Seq(rdd) ++ others)
  // Check whether any of the input RDDs already has a partitioner set
  val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))

  // If at least one RDD has a partitioner, keep the RDD whose partitioner has the most partitions
  val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
    Some(hasPartitioner.maxBy(_.partitions.length))
  } else {
    None
  }

  // If spark.default.parallelism is set, use it as the default number of partitions;
  // otherwise use the largest partition count among the input RDDs
  val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
    rdd.context.defaultParallelism
  } else {
    rdds.map(_.partitions.length).max
  }

  // If the existing max partitioner is an eligible one, or its partitions number is larger
  // than the default number of partitions, use the existing partitioner.
  // In other words: if one of the input RDDs already has a partitioner, and that partitioner
  // is either "eligible" or has more partitions than the default, reuse it;
  // otherwise fall back to a HashPartitioner with the default number of partitions
  if (hasMaxPartitioner.nonEmpty &&
      (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
        defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
    hasMaxPartitioner.get.partitioner.get
  } else {
    new HashPartitioner(defaultNumPartitions)
  }
}

private def isEligiblePartitioner(
    hasMaxPartitioner: RDD[_],
    rdds: Seq[RDD[_]]): Boolean = {
  val maxPartitions = rdds.map(_.partitions.length).max
  log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
}
```

```scala
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}

def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  // partitioner is the default partitioner chosen by defaultPartitioner above;
  // what matters here is mainly its number of partitions
  val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
  cg.mapValues { case Array(vs, w1s) =>
    (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
  }
}
```

```scala
override def getDependencies: Seq[Dependency[_]] = {
  rdds.map { rdd: RDD[_] =>
    // Check whether each parent RDD of the join is already partitioned by the chosen
    // partitioner (same partitioner, hence same number of partitions); if so the
    // dependency is narrow (OneToOne), otherwise it is a shuffle (wide) dependency
    if (rdd.partitioner == Some(part)) {
      logDebug("Adding one-to-one dependency with " + rdd)
      new OneToOneDependency(rdd)
    } else {
      logDebug("Adding shuffle dependency with " + rdd)
      new ShuffleDependency[K, Any, CoGroupCombiner](
        rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
    }
  }
}
```

Why does rdd3 print a narrow dependency even though the join is actually wide? Because the dependency printed on rdd3 compares two RDDs that have both already been assigned the default partitioner (the RDD produced by cogroup and the RDD built on top of it), and at that point the two partitioners are equal. To see the real relationship, print the dependencies of the RDD that cogroup produces inside rdd3: its parents rdd1 and rdd2 have no partitioner at all, let alone the chosen HashPartitioner, so the check `rdd.partitioner == Some(part)` is false and a wide (shuffle) dependency is printed.
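To check this empirically, the sketch below (not part of the original homework; `printLineage` is a hypothetical helper, meant to be pasted into spark-shell with `rdd3` and `rdd4` defined as in the earlier example) walks the full lineage and labels every dependency, which makes the ShuffleDependency hidden beneath rdd3's flatMapValues/mapValues layers visible.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{OneToOneDependency, ShuffleDependency}

// Recursively walk an RDD's lineage and label each dependency as wide or narrow.
def printLineage(rdd: RDD[_], indent: String = ""): Unit = {
  println(s"$indent${rdd.getClass.getSimpleName} (partitioner = ${rdd.partitioner})")
  rdd.dependencies.foreach { dep =>
    val kind = dep match {
      case _: ShuffleDependency[_, _, _] => "ShuffleDependency (wide)"
      case _: OneToOneDependency[_]      => "OneToOneDependency (narrow)"
      case other                         => other.getClass.getSimpleName
    }
    println(s"$indent  +- $kind")
    printLineage(dep.rdd, indent + "     ")
  }
}

// rdd3: the join result and the mapValues layer below it report OneToOne, but the
// CoGroupedRDD two levels down depends on rdd1/rdd2 through a ShuffleDependency.
printLineage(rdd3)

// rdd4: the CoGroupedRDD's parents rdd1p/rdd2p already use HashPartitioner(3), so
// rdd.partitioner == Some(part) holds and the join itself adds only OneToOne
// dependencies; the only shuffles in the lineage are the earlier partitionBy calls.
printLineage(rdd4)
```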