RDD-Partitioner

xiaoxiao2021-02-28  5

概要

Spark RDD主要由Dependency、Partition、Partitioner组成,这篇介绍最后一部分Partitioner。Partition记录了数据split的逻辑,Dependency记录的是transformation操作过程中Partition的演化,Partitioner是shuffle过程中key重分区时的策略,即计算key决定k-v属于哪个分区。

Partitioner作用

Partitioner是在shuffle阶段起作用,无论对于mapreduce还是spark,shuffle都是重中之重,因为shuffle的性能直接影响着整个程序,先了解下shuffle:详细探究Spark的shuffle实现,图片也来自此博客(侵删),shuffle涉及到网络开销及可能导致的数据倾斜问题,是调优关注的重点。  如上图,在shuffle中,map阶段处理结果使用ShuffleWriter,根据Partitioner逻辑写到不同bucket中,不同的bucket后续被不同的reducer使用,源码如下: 上图是HashShuffleWriter中的write方法实现,图中圈出了Partitioner发挥作用的代码,shuffle的实现又分为HashShuffleManager和SortShuffleManager,会在后续shuffle阶段详解。 

至此,我们对Partitioner有了初步的了解,其在shuffle阶段发挥作用,依据Partitioner的逻辑计算key,得出对应的k-v属于哪个分区。

Partitioner定义

抽象类Partitioner,定义了两个抽象方法numPartitions和getPartition,如上图所示 Partitioner的伴生对象定义了defaultPartitioner方法,实现了类似cogroup这类操作,如何从父RDD中选择Partitioner,逻辑如图中所示,Partitioner实现有两个,如下:

HashPartitioner numPartitions方法返回传入的分区数,getPartition方法,使用key的hashCode值对分区数取模得到PartitionId,结合第一幅图,写入到对应的bucket中。java中的hashmap和mapreduce的HashPartitioner也类似,细节处不同,如hashmap不是取模,是通过&实现的。RangePartitioner RangePartitioner的实现较HashPartitioner复杂,单独整理,查看这里。

Partitioner使用

使用Partitioner必须满足两个前提,1、RDD是k-v形式,如RDD[(K, V)],2、有shuffle操作。常见的触发shuffle的操作有: 1.combineByKey(groupByKey, reduceByKey , aggregateByKey) 2. sortByKey 3. join(leftOuterJoin, rightOuterJoin, fullOuterJoin) 4. cogroup 5. repartition(coalesce(shuffle=true)) 6. groupWith 7. repartitionAndSortWithinPartitions

自定义Partitioner

上面我们已给出了Partitioner的定义,只需根据需求实现两个抽象方法,下面的例子我们以key的长度进行分区

来源: http://blog.csdn.net/u011564172/article/details/54667057   class CustomPartitioner(partitions: Int) extends Partitioner { def numPartitions: Int = partitions def getPartition(key: Any): Int = { val k = key.asInstanceOf[String] return k.length() & (partitions -1) } } 调整Partitioner  val data = sc.parallelize(List("a c", "a b", "bb c", "bb d", "c d"), 2) data.flatMap(_.split(" ")).map((_, 1)) .partitionBy(new CustomPartitioner(2)).reduceByKey(_ + _) .collect() 使用partitionBy调整Partitioner,传入我们自定义的Partitioner,通过web UI查看执行细节 

从上图可以看出,长度为1的8条数据经过k.length() & (partitions -1)处理写到Partition 1中(1 & (2-1) ==1),长度为2的两条数据写入到Partition 0中。

总结

简要介绍了Partitioner的作用,通过源码了解Partitioner的两个实现类,并举了使用自定义Partitioner的例子。最为重要的,Partitioner要发挥作用必须满足的两个前提:RDD[(K, V)和shuffle操作。此外,自定义Partitioner也是解决数据倾斜问题的手段之一。

转载请注明原文地址: https://www.6miu.com/read-2350042.html

最新回复(0)