1. Create a file named spark.txt locally with the following contents:
hello tom hello jerry hello kitty hello world hello tom hi nihao ni ni hi hi hello hi hi hello
2. Upload spark.txt to the virtual machine with WinSCP.
3. Upload spark.txt to the Hadoop cluster:
hadoop fs -put spark.txt /spark.txt
4. Write the following code in Eclipse:
package cn.pbj.sparkWcount;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;

import scala.Tuple2;

public class WCCluster implements Serializable {

    public void execute(String inputPath, String outputFile) {
        // Do not hard-code setMaster("local") here: a master set in code takes
        // precedence over the --master flag of spark-submit, so the job would
        // quietly run on the driver alone instead of on the cluster.
        SparkConf conf = new SparkConf().setAppName("WordCount");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> input = sc.textFile(inputPath);
        JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
            // https://stackoverflow.com/questions/38880956/spark-2-0-0-arrays-aslist-not-working-incompatible-types/38881118#38881118
            // In 2.0, FlatMapFunction.call() returns an Iterator rather than Iterable.
            public Iterator<String> call(String x) {
                return Arrays.asList(x.split(" ")).iterator();
            }
        });

        JavaPairRDD<String, Integer> counts = words
            .mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String x) {
                    return new Tuple2<String, Integer>(x, 1);
                }
            })
            .reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer x, Integer y) throws Exception {
                    return x + y;
                }
            });

        counts.saveAsTextFile(outputFile);
        sc.stop();
    }

    public static void main(String[] args) {
        String inputFile = "hdfs://spark01:9000/spark.txt"; // input path
        String outputFile = "/root/wordCount/"; // output path; resolves against the
                                                // cluster's default FS, hdfs://spark01:9000
        WCCluster wc = new WCCluster();
        wc.execute(inputFile, outputFile);
    }
}

5. Package the project as WcCluster.jar.
6. Upload WcCluster.jar to the virtual machine (the command below expects it at /usr/local/).
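Optional: before submitting to the cluster, the same logic can be smoke-tested inside Eclipse against the local copy of spark.txt. This is where the setMaster("local") that many tutorials show actually belongs. A minimal sketch, assuming Java 8 for the lambdas; the class name WCLocalTest and the local file path are made up for illustration:

package cn.pbj.sparkWcount;

import java.util.Arrays;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class WCLocalTest {
    public static void main(String[] args) {
        // "local" runs everything inside this one JVM -- no cluster needed.
        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountLocal");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read the local file rather than HDFS (the path is an assumption).
        Map<String, Long> counts = sc.textFile("/root/spark.txt")
                .flatMap(x -> Arrays.asList(x.split(" ")).iterator()) // Spark 2.x: Iterator
                .countByValue(); // tiny input, so counting on the driver is fine

        counts.forEach((word, n) -> System.out.println(word + ": " + n));
        sc.stop();
    }
}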
7. From the spark/bin directory, run:
./spark-submit --class cn.pbj.sparkWcount.WCCluster --master spark://spark01:7077 /usr/local/WcCluster.jar
Here --class takes the fully qualified class name (package.ClassName), --master takes the cluster's master URL, and the last argument is the path to the jar file.
8. After the job runs, the log shows: INFO output.FileOutputCommitter: Saved output of task 'attempt_20170606235459_0001_m_000000_1' to hdfs://spark01:9000/root/wordCount/_temporary/0/task_20170606235459_0001_m_000000
The output can then be browsed in the HDFS web UI at spark01:50070 under /root/wordCount/.
Downloading the result file to the local machine gives:
(hi,5)
(tom,2)
(hello,7)
(jerry,1)
(ni,2)
(kitty,1)
(nihao,1)
(world,1)
Summary:
1. My Java fundamentals are only so-so, so I can only roughly follow the code; it was copied from GitHub, and I still need to improve.
2. The difference between Iterator and Iterable kept tripping me up. Everyone else's examples use Iterable, but that version simply would not compile in my Eclipse. The reason is the one quoted in the code above: since Spark 2.0, FlatMapFunction.call() returns an Iterator rather than an Iterable, so 1.x-era examples no longer fit. Compare the two:

Spark 2.x style (what I ended up using):

JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
    public Iterator<String> call(String x) {
        return Arrays.asList(x.split(" ")).iterator();
    }
});

Spark 1.x style (what most examples online still show):

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Iterable<String> call(String line) throws Exception {
        return Arrays.asList(line.split(" "));
    }
});
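To pin down the distinction itself, here is a minimal plain-JDK sketch (no Spark involved; the class name IterDemo is made up): an Iterable is something that can hand out a fresh Iterator on demand, while an Iterator is a one-shot cursor advanced with hasNext()/next().

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IterDemo {
    public static void main(String[] args) {
        // A List is an Iterable: it can produce a fresh Iterator on demand,
        // which is why a for-each loop over it works any number of times.
        List<String> words = Arrays.asList("hello", "tom", "hello");
        for (String w : words) System.out.println(w); // first pass
        for (String w : words) System.out.println(w); // second pass, works fine

        // An Iterator is a one-shot cursor over those elements:
        // once exhausted, it cannot be rewound.
        Iterator<String> it = words.iterator();
        while (it.hasNext()) {
            System.out.println(it.next());
        }
        System.out.println(it.hasNext()); // false: this iterator is used up
    }
}

Seen this way, the two flatMap signatures differ only in what they hand back to Spark: 1.x wanted the collection itself (an Iterable), while 2.x wants just the cursor, which is why appending .iterator() to the Arrays.asList(...) call is the whole fix.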