Scala中的aggregateByKey()函数

xiaoxiao2021-02-28  6

一.Scala中的aggregateByKey()函数

1.先看源码:

/** * Aggregate the values of each key, using given combine functions and a neutral "zero value". * This function can return a different result type, U, than the type of the values in this RDD, * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, * as in scala.TraversableOnce. The former operation is used for merging values within a * partition, and the latter is used for merging values between partitions. To avoid memory * allocation, both of these functions are allowed to modify and return their first argument * instead of creating a new U. */ def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = self.withScope { aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp) }

2.翻译: 对每个键进行聚合操作,使用给出的聚合函数和一个中性的“0”。aggregate()()函数能够返回一个不同的值类型—U,而不是RDD中的值类型-V。【注,这里的v指的是key-value形式中的value】因此,我们需要一种操作来合并V(value)作为U,同时需要另一种操作两两合并U(的value)。在scala中,TranversableOnce。第一种操作是用来将同一个分区中的值合并,第二种操作是将不同分区中的值合并。为了避免内存分配,这两种操作均被允许修改,并且返回他们的第一个参数,而非创建一个新的结果类型U。 3.样例1:

scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3) rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24 scala> val rdd1 = rdd.aggregateByKey(0)(math.max(_,_),_+_) rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[3] at aggregateByKey at <console>:26 scala> rdd1.foreach(println) (2,3) (3,8) (1,7)

样例2:

scala> val pairRDD = sc.parallelize(Array(("cat", 2), ("cat", 5),("mouse", 4), ("dog", 6)),2) pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:24 scala> val resultPair = pairRDD.aggregateByKey(0)(_+_,_+_) resultPair: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at aggregateByKey at <console>:26 scala> resultPair.foreach(println) (mouse,4) (dog,6) (cat,7)

4.解释: 样例1: 这里有一个rdd,里面存放的是List,而List中存放的是map,即键值对,在这里的键值都是Int型的数字。

分区内的操作是对相同的键的map取max,如果不是相同键,则保留。这里一个有三个分区,恰好每个分区存放两个map。在进行math.max(_,_)操作之后,其实就变成了这个样子:分区1:(1,3),分区2:(1,4)(2,3);分区3:(3,8)再进行第二轮操作—分区间的操作:对不同分区的相同键的值进行合并。然后得到的结果即是:(1,7)(2,3)(3,8)

样例2:

转载请注明原文地址: https://www.6miu.com/read-2350172.html

最新回复(0)