Spark算子[09]：aggregateByKey、aggregate详解

xiaoxiao2021-02-28  31

aggregateByKey

aggregateByKey的用法同combineByKey。先回顾combineByKey的三个参数：

createCombiner: V => C，mergeValue: (C, V) => C，mergeCombiners: (C, C) => C

aggregateByKey对应的三个参数为（区别在于用一个初始值 zeroValue 代替了 createCombiner 函数）：zeroValue: U，seqOp: (U, V) => U，combOp: (U, U) => U

/**
 * Excerpt from Spark's PairRDDFunctions: aggregateByKey, like combineByKey,
 * is implemented on top of combineByKeyWithClassTag. The zero value is used
 * to build the createCombiner step: cleanedSeqOp(createZero(), v).
 */
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  ...
  combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
    cleanedSeqOp, combOp, partitioner)
}

// Public overloads (bodies elided in the original article):
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

案例同combineByKey

Scala实战案例

/**
 * Per-student average score, rewritten from the combineByKey example using
 * aggregateByKey. NOTE(review): `studentDetailRdd` and `ScoreDetail` are
 * defined in code elided from this excerpt.
 */
def avgScore(): Unit = {
  // code omitted in the original article ...
  val avgscoreRdd = studentDetailRdd.aggregateByKey(
    // 1. zeroValue
    //    Difference from combineByKey: the literal (0.0f, 0) replaces the
    //    createCombiner function `x => (x.score, 1)`
    (0.0f,0) ,
    // 2. mergeValue: fold one record into the per-partition (sum, count) accumulator
    (acc: (Float, Int), x: ScoreDetail) => (acc._1 + x.score, acc._2 + 1),
    // 3. mergeCombiners: merge (sum, count) accumulators from different partitions
    (acc1: (Float, Int), acc2: (Float, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
  )
}

Java实战案例

/**
 * Per-student average score with the Java aggregateByKey API.
 * NOTE(review): `pairRDD` and `ScoreDetail003` are defined in code elided
 * from this excerpt.
 */
public static void avgScore() {
    /** code omitted in the original article */
    // 2. mergeValue: fold one ScoreDetail003 into the per-partition (sum, count) accumulator
    Function2<Tuple2<Float,Integer>,ScoreDetail003,Tuple2<Float,Integer>> mergeValue =
            new Function2<Tuple2<Float, Integer>, ScoreDetail003, Tuple2<Float, Integer>>() {
        @Override
        public Tuple2<Float, Integer> call(Tuple2<Float, Integer> v1, ScoreDetail003 v2) throws Exception {
            return new Tuple2<Float, Integer>(v1._1()+v2.score,v1._2()+1);
        }
    };
    // 3. mergeCombiners: merge (sum, count) accumulators from different partitions
    Function2<Tuple2<Float,Integer>,Tuple2<Float,Integer>,Tuple2<Float,Integer>> mergeCombiners =
            new Function2<Tuple2<Float, Integer>, Tuple2<Float, Integer>, Tuple2<Float, Integer>>() {
        @Override
        public Tuple2<Float, Integer> call(Tuple2<Float, Integer> v1, Tuple2<Float, Integer> v2) throws Exception {
            return new Tuple2<Float, Integer>(v1._1()+v2._1(),v1._2()+v2._2());
        }
    };
    // 4. aggregateByKey (2 partitions) then map each (sum, count) to the average.
    //    Difference from combineByKey: new Tuple2<Float,Integer>(0.0f,0) replaces createCombiner
    JavaPairRDD<String,Float> res = pairRDD
            .aggregateByKey(new Tuple2<Float,Integer>(0.0f,0), mergeValue, mergeCombiners, 2)
            .mapToPair(x -> new Tuple2<String, Float>(x._1(),x._2()._1()/x._2()._2()));
}

aggregate

/**
 * Excerpt from Spark's RDD: aggregate the elements of each partition, then
 * the results of all partitions, using the given combine functions and a
 * neutral "zero value".
 *
 * @param zeroValue the initial value for the accumulated result of each
 *                  partition for the `seqOp` operator, AND ALSO the initial
 *                  value for combining results from different partitions for
 *                  the `combOp` operator — this will typically be the neutral
 *                  element (e.g. `Nil` for list concatenation or `0` for
 *                  summation)
 * @param seqOp an operator used to accumulate results within a partition
 * @param combOp an associative operator used to combine results from
 *               different partitions
 */
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {}

Scala实战案例

/**
 * String demo of RDD.aggregate: with 2 partitions, seqOp prefixes each
 * partition's fold with the zero value "oo" and joins elements with "-";
 * combOp then joins the zero value and both partial results with ":".
 */
def aggregateOp(): Unit = {
  val sparkConf = new SparkConf().setAppName("aggregateOp").setMaster("local")
  val sparkContext = new SparkContext(sparkConf)
  val words = sparkContext.parallelize(List("aa", "bb", "cc"), 2)
  // Inline the fold functions instead of naming them as local defs.
  val result = words.aggregate("oo")(
    (acc, word) => acc + "-" + word,   // seqOp: within a partition
    (left, right) => left + ":" + right // combOp: across partitions
  )
  println(result)
}

/**
 * Numeric demo of RDD.aggregate. Prints 9: the zero value 2 is folded in by
 * seqOp inside the partition (2 + 5 = 7) and once more by combOp when the
 * partition results are combined (2 + 7 = 9) — i.e. the zero value
 * participates in both seqOp and combOp.
 */
def main(args: Array[String]) {
  val sparkConf = new SparkConf().setAppName("aggregateOp").setMaster("local")
  val sparkContext = new SparkContext(sparkConf)
  val numbers = sparkContext.parallelize(List(5))
  // Placeholder lambdas instead of named local defs; both ops are plain sums.
  val result = numbers.aggregate(2)(_ + _, _ + _)
  println(result) // 9
}

Java实战案例

public static void aggregateOp(){ SparkConf conf = new SparkConf().setMaster("local").setAppName("aggregateOp"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9),2); // 1、seqOp Function2<Integer,Integer,Integer> seqOp = new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return Math.max(v1,v2); } }; //2、combOp Function2<Integer,Integer,Integer> combOp = new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return Math.min(v1,v2); } }; //3、aggregate Integer res = rdd1.aggregate(0,seqOp,combOp); System.out.println(res); }