Spark from Beginner to Mastery, Lesson 10: Secondary Sort and Group Top-N with Spark Core


1. Secondary Sort

Sorting by two or more fields, where each later field breaks ties among the earlier ones, is called a secondary sort.
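For example, given this hypothetical tab-separated b.txt (the numbers are made up for illustration), sorting both fields ascending produces:

Input (b.txt):        After secondary sort:
3   5                 1   2
1   9                 1   9
3   2                 3   2
1   2                 3   5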

import org.apache.spark.{SparkConf, SparkContext}
import src.scala.SecondarySortKey

/**
 * Implement secondary sort with the Spark Core API:
 * 1) Define a custom sort key that implements Ordered and Serializable.
 * 2) Map each record to a pair: the key is the custom sort key, the value is the original line.
 * 3) Implement compare according to the business logic.
 * 4) Sort with sortByKey(true/false) using the custom key.
 * 5) Drop the key and keep the value.
 */
object SecondarySort {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("SecondarySort"))
    val rdd = sc.textFile("./datas/b.txt")
    rdd.map(x => {
      val splits = x.split("\t")
      // Return a tuple; sortByKey sorts by key, and the key's ordering is defined in the custom class.
      (new SecondarySortKey(splits(0).trim.toInt, splits(1).trim.toInt), x)
    }).sortByKey().map(x => x._2).collect().foreach(println)
    sc.stop()
  }
}

===============================================================

package src.scala

class SecondarySortKey(val first: Int, val second: Int)
  extends Ordered[SecondarySortKey] with Serializable {

  override def compare(that: SecondarySortKey): Int = {
    if (this.first != that.first) {
      // compareTo avoids the integer-overflow risk of comparing by plain subtraction.
      this.first.compareTo(that.first)
    } else {
      // Ties on the first field fall through to the second field.
      // With sortByKey(true) (ascending, the default), swap the operands here to sort this field descending.
      this.second.compareTo(that.second)
    }
  }
}
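Since Scala already provides a lexicographic Ordering for tuples, the custom key class can also be skipped entirely when all fields simply sort ascending. A minimal sketch, assuming the same tab-separated ./datas/b.txt layout (the object name TupleSecondarySort is made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}

object TupleSecondarySort {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("TupleSecondarySort"))
    val rdd = sc.textFile("./datas/b.txt")
    rdd.map { x =>
      val splits = x.split("\t")
      // (Int, Int) tuples already have a lexicographic Ordering, so sortByKey can use it directly.
      ((splits(0).trim.toInt, splits(1).trim.toInt), x)
    }.sortByKey().map(_._2).collect().foreach(println)
    sc.stop()
  }
}

The custom-key version is still the way to go when the fields need different sort directions (say, first ascending, second descending).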

2. Group Top-N

import org.apache.spark.{SparkConf, SparkContext}

object GroupTopN {
  def main(args: Array[String]) {
    val n = 4 // top 4
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("GroupTopN"))
    val rdd = sc.textFile("./datas/b.txt")
    val tupleRdd = rdd.map(line =>
      // The second field is compared numerically, so convert it to Int.
      (line.split("\t")(0), line.split("\t")(1).toInt))
    val groupRdd = tupleRdd.groupByKey()
    // For each group, sort the values in descending order and keep the top n.
    val valueTopNRdd = groupRdd.map(tuple => (tuple._1, tuple._2.toList.sortWith(_ > _).take(n)))
    // Sort the groups by key before printing.
    val keySortRdd = valueTopNRdd.sortByKey()
    keySortRdd.collect().foreach(println)
    sc.stop()
  }
}
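One caveat: groupByKey pulls every value for a key into memory before the top n are taken, which can be a problem for heavily skewed keys. aggregateByKey can instead carry only a bounded top-n list per key through the shuffle. A minimal sketch under the same input assumptions (the object name GroupTopNAgg is hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

object GroupTopNAgg {
  def main(args: Array[String]) {
    val n = 4
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("GroupTopNAgg"))
    val rdd = sc.textFile("./datas/b.txt")
    val tupleRdd = rdd.map(line => (line.split("\t")(0), line.split("\t")(1).toInt))
    // Keep at most n values per key within each partition, then merge the partial
    // top-n lists across partitions, so no key's full value list is ever materialized.
    val topN = tupleRdd.aggregateByKey(List.empty[Int])(
      (acc, v) => (v :: acc).sortWith(_ > _).take(n),
      (a, b) => (a ++ b).sortWith(_ > _).take(n)
    )
    topN.sortByKey().collect().foreach(println)
    sc.stop()
  }
}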

