Spark Accumulator示例代码

xiaoxiao2021-02-28  21

package examples; import org.apache.spark.Accumulator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import java.util.Arrays; public class AccumulatorExample { static final String FILE_PATH = "F:/tmp/spark/hello.txt"; public static void main(String[] args) { SparkConf conf = new SparkConf() .setMaster("local[3]") .setAppName("AccumulatorExample"); JavaSparkContext ctx = new JavaSparkContext(conf); //从驱动器程序读取文件 JavaRDD<String> lines = ctx.textFile(FILE_PATH); //定义一个Accumulator对象, 初始值为0 final Accumulator<Integer> blankLines = ctx.accumulator(0); JavaRDD rdd = lines.flatMap( new FlatMapFunction<String, String>() { public Iterable<String> call(String line) throws Exception { if(line.equals("")){ blankLines.add(1); } return Arrays.asList(line.split(" ")); } } ); rdd.saveAsTextFile("out.txt"); System.out.println(blankLines.value()); } }
转载请注明原文地址: https://www.6miu.com/read-2632224.html

最新回复(0)