Spark 2.1.0 WordCount on a Cluster

xiaoxiao, 2021-02-27

1. Create a local file named spark.txt with the following content:

hello tom hello jerry hello kitty hello world hello tom hi nihao ni ni hi hi hello hi hi hello

2. Upload spark.txt to the virtual machine with WinSCP.

3. Upload spark.txt to the Hadoop cluster:

hadoop fs -put spark.txt /spark.txt

4. Write the following code in Eclipse:

package cn.pbj.sparkWcount;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;

import scala.Tuple2;

public class WCCluster implements Serializable {

    public void execute(String inputPath, String outputFile) {
        // Note: the original code called setMaster("local") here, but a master set
        // in code overrides the --master passed to spark-submit. For cluster
        // execution, leave the master to spark-submit.
        SparkConf conf = new SparkConf().setAppName("WordCount");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> input = sc.textFile(inputPath);

        // Since Spark 2.0, FlatMapFunction.call() returns an Iterator rather than
        // an Iterable. See:
        // https://stackoverflow.com/questions/38880956/spark-2-0-0-arrays-aslist-not-working-incompatible-types/38881118#38881118
        JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String x) {
                return Arrays.asList(x.split(" ")).iterator();
            }
        });

        JavaPairRDD<String, Integer> counts = words
            .mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String x) {
                    return new Tuple2<String, Integer>(x, 1);
                }
            })
            .reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer x, Integer y) throws Exception {
                    return x + y;
                }
            });

        counts.saveAsTextFile(outputFile);
        sc.stop();
    }

    public static void main(String[] args) {
        String inputFile = "hdfs://spark01:9000/spark.txt"; // input path
        String outputFile = "/root/wordCount/";             // output path
        WCCluster wc = new WCCluster();
        wc.execute(inputFile, outputFile);
    }
}

5. Package the project as WcCluster.jar.
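The flatMap step is the part the Spark 2.x API changed (see the comment in the code), and it can be exercised in plain Java without a cluster. A minimal sketch, using the same split-and-iterate body as the `call()` above (class name `FlatMapDemo` is just for illustration):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class FlatMapDemo {
    // Same body as call() in WCCluster: split a line, hand back an Iterator
    static Iterator<String> call(String x) {
        return Arrays.asList(x.split(" ")).iterator();
    }

    public static void main(String[] args) {
        List<String> words = new ArrayList<>();
        Iterator<String> it = call("hello tom hello jerry");
        while (it.hasNext()) {     // an Iterator is consumed with hasNext()/next()
            words.add(it.next());
        }
        System.out.println(words); // [hello, tom, hello, jerry]
    }
}
```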

6. Upload WcCluster.jar to the virtual machine.

7. From the spark/bin directory, run:

./spark-submit --class cn.pbj.sparkWcount.WCCluster --master spark://spark01:7077 /usr/local/WcCluster.jar

Here --class takes the fully qualified class name (package.ClassName), --master takes the master node URL, followed by the path to the jar file.

8. After the job runs, the log contains: INFO output.FileOutputCommitter: Saved output of task 'attempt_20170606235459_0001_m_000000_1' to hdfs://spark01:9000/root/wordCount/_temporary/0/task_20170606235459_0001_m_000000

The output can be browsed at spark01:50070 under /root/wordCount/.

Downloaded to a local file, the results look like this:

(hi,5) (tom,2) (hello,7) (jerry,1) (ni,2) (kitty,1) (nihao,1) (world,1)
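These counts can be checked on a single machine with plain Java, mimicking what mapToPair plus reduceByKey compute; the input line below is the content of spark.txt from step 1 (class name `LocalWordCount` is just for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class LocalWordCount {
    // Build (word, count) pairs the way reduceByKey aggregates them
    static Map<String, Integer> count(String line) {
        Map<String, Integer> counts = new HashMap<>();
        for (String w : line.split(" ")) {
            counts.merge(w, 1, Integer::sum); // same as reduceByKey((x, y) -> x + y)
        }
        return counts;
    }

    public static void main(String[] args) {
        String line = "hello tom hello jerry hello kitty hello world hello tom "
                    + "hi nihao ni ni hi hi hello hi hi hello";
        // Prints the same pairs as the job output, e.g. hello=7, hi=5, tom=2, ...
        // (HashMap iteration order is unspecified)
        System.out.println(count(line));
    }
}
```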

Summary:

1. My Java fundamentals are only average, so I can only roughly follow the code; it was copied from GitHub, and I still need to improve.

2. I still cannot work out the difference between Iterator and Iterable.

Everyone else seems to use Iterable, but it simply would not compile in my Eclipse.

JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
    public Iterator<String> call(String x) {
        return Arrays.asList(x.split(" ")).iterator();
    }
});

Contrast that with:

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Iterable<String> call(String line) throws Exception {
        return Arrays.asList(line.split(" "));
    }
});

I still cannot tell these two apart.
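The distinction the two snippets turn on is plain Java, independent of Spark: an Iterable is anything that can hand out iterators (it has an iterator() method, and every List is one), while an Iterator is a one-shot cursor with hasNext()/next(). Spark 2.0 changed FlatMapFunction.call() from returning the former to the latter, which is why the Iterable version no longer compiles against Spark 2.x. A small demonstration:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IterableVsIterator {
    public static void main(String[] args) {
        // A List is an Iterable: it can produce fresh Iterators any number of times.
        List<String> words = Arrays.asList("hello", "tom");

        // An Iterator is a single-pass cursor over those elements.
        Iterator<String> it = words.iterator();
        System.out.println(it.hasNext()); // true
        System.out.println(it.next());    // hello
        System.out.println(it.next());    // tom
        System.out.println(it.hasNext()); // false: this Iterator is now exhausted

        // The Iterable itself is not used up: asking again yields a new Iterator.
        Iterator<String> it2 = words.iterator();
        System.out.println(it2.next());   // hello
    }
}
```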

Please credit the original source when reposting: https://www.6miu.com/read-15338.html
