RDD算子

xiaoxiao2022-06-11  95

transform算子

//定义一个内置数组 val arr = Array(1,2,3,4,5) //将数组转化为rdd val rdd1 = sparkContext.parallelize(arr) 1.map rdd1.map(x=>x*2) rdd1.map(x=>(x,x)) 2.filter //保留奇数 rdd1.filter(x=>if(x%2 == 1) true else false) 3.flatmap val arr2 = Array("hello a","hello b","hello c") rdd2.flatmap(line=>line.split(" ")) 4.mapPartitions val rdd1 = sparkContext.parallelize(arr , 3 ) //分区数设置为3,默认是2 rdd1.mapPartitions(x:Iterator[Int]=>{ //每个分区执行一次这些代码 val newlist:List[Int] = x.toList.map(y=>y*y) newlist.toIterator }) 5.mapPartitionsWithIndex rdd3.mapPartitionsWithIndex((index:Int,data:Iterator[Int])=>{ println("执行操作的分区编号是"+index) val newlist:List[Int] = data.toList.map(y=>y*y) newlist.toIterator }) 6.smaple //是否放回,采样比例,种子数 rdd1.sample(true,0.1,0) 7.groupbykey//根据key整合 reducebykey//根据key,把value进行计算,相同的key进行求和 rdd1.reducebykey((x:Int,y:Int)=> x+y) //就是把value依次相加得到和 区别参考:https://blog.csdn.net/do_yourself_go_on/article/details/76033252 8.sortbykey 排序 //true升序,false降序 rdd.sortbykey(true/false) sortby 9.aggregatebykey groupbykey+aggregate aggregate 针对单个元素的rdd aggregatebykey 针对key-value形式 参数 aggregatebykey(1)(2,3) 分区执行的,执行之后每个区进行合并 1)初始值 2)迭代操作,拿rdd中的每个元素和初始值进行合并 3)分区合并逻辑 aggregate求平均值 //第一个参数可以是元祖,第一个是sum,第二个是个数 //第二个参数,后面的int合并到前面的元祖中(int,int) //第三个参数,相当于不同分区的元祖进行合并 val res = aggregate(0,0)((u:(Int,Int),x:Int)=>(u._1+x,u._2+1), (x,y)=>((x._1+y._1),(x._2+y._2))) val avg = res._1.toDouble/res._2

这几个算子, 核心就要弄明白combineByKey, 其他三个都是调用它. 上文主要也是从combingByKey传入的三个函数的角度在分析.

而在实际运用中, 最先要考虑的应该是类型. combingByKey和aggregateByKey输入跟输出的类型可以不一致, 而foldByKey和reduceByKey不行. 类型确定后再根据自己的业务选择最简洁的算子.

转载请注明原文地址: https://www.6miu.com/read-4932097.html

最新回复(0)