三种方法实现Spark计算WordCount

xiaoxiao2021-02-28  13

1.spark-shell

val lines = sc.textFile("hdfs://spark1:9000/spark.txt") val words = lines.flatMap(line => line.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.foreach(wordcount => println(wordcount._1 + " appeared " + wordcount._2 + " times"))

2.Java实现方式

package cn.itcast.hadoop.mr; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /* * 1.分析具体的业力逻辑,确定输入输出数据样式 * 2.自定义一个类,这个类要继承import org.apache.hadoop.mapreduce.Mapper; * 重写map方法,实现具体业务逻辑,将新的kv输出 * 3.自定义一个类,这个类要继承import org.apache.hadoop.mapreduce.Reducer; * 重写reduce,实现具体业务逻辑 * 4.将自定义的mapper和reducer通过job对象组装起来 */ public class WordCount { public static class WCMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //接收数据V1 String line = value.toString(); //切分数据 String[] wordsStrings = line.split(" "); //循环 for (String w: wordsStrings) { //出现一次,记一个一,输出 context.write(new Text(w), new LongWritable(1)); } } } public static class WCReducer extends Reducer { @Override protected void reduce(Text key, Iterable v2s, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub //接收数据 //Text k3 = k2; //定义一个计算器 long counter = 0; //循环v2s for (LongWritable i : v2s) { counter += i.get(); } //输出 context.write(key, new LongWritable(counter)); } } public static void main(String[] args) throws Exception { // 构建Job对象 Job job = Job.getInstance(new Configuration()); // 注意:main方法所在的类 job.setJarByClass(WordCount.class); // 设置Mapper相关属性 job.setMapperClass(WCMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); // 设置Reducer相关属性 job.setReducerClass(WCReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setCombinerClass(WCReducer.class); // 提交任务 job.waitForCompletion(true); } } { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //接收数据V1 String line = value.toString(); //切分数据 String[] wordsStrings = line.split(" "); //循环 for (String w: wordsStrings) { //出现一次,记一个一,输出 context.write(new Text(w), new LongWritable(1)); } } } public static class WCReducer extends Reducer { @Override protected void reduce(Text key, Iterable v2s, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub //接收数据 //Text k3 = k2; //定义一个计算器 long counter = 0; //循环v2s for (LongWritable i : v2s) { counter += i.get(); } //输出 context.write(key, new LongWritable(counter)); } } public static void main(String[] args) throws Exception { // 构建Job对象 Job job = Job.getInstance(new Configuration()); // 注意:main方法所在的类 job.setJarByClass(WordCount.class); // 设置Mapper相关属性 job.setMapperClass(WCMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); // 设置Reducer相关属性 job.setReducerClass(WCReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setCombinerClass(WCReducer.class); // 提交任务 job.waitForCompletion(true); } }

最后,需要使用spark submit提交到spark集群中进行运行,执行脚本如下:

/usr/local/spark/bin/spark-submit \ --class cn.spark.study.core.CopyOfWordCountCluster \ --num-executors 3 \ --driver-memory 100m \ --executor-memory 100m \ --executor-cores 3 \ /usr/local/spark-study/java/spark-study-java-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

3.Scala for eclipse

package cn.spark.study.core import org.apache.spark.SparkConf import org.apache.spark.SparkContext object WordCount { def main(args: Array[String]) { val conf = new SparkConf() .setAppName("WordCount"); val sc = new SparkContext(conf) val lines = sc.textFile("hdfs://spark1:9000/spark.txt", 1); val words = lines.flatMap { line => line.split(" ")} val pairs = words.map {word => (word, 1)} val wordCount = pairs.reduceByKey(_ + _) wordCount.foreach(wordCount => println(wordCount._1 + " appeared " + wordCount._2 + " times")) } }

最后,需要使用spark submit提交到spark集群中进行运行,执行脚本如下:

/usr/local/spark/bin/spark-submit \ --class cn.spark.study.core.WordCount \ --num-executors 3 \ --driver-memory 100m \ --executor-memory 100m \ --executor-cores 3 \ /usr/local/spark-study/scala/wordcount.jar \ ~

运行结果如下:


文章最后,给大家推荐一些受欢迎的技术博客链接:

Hadoop相关技术博客链接Spark 核心技术链接JAVA相关的深度技术博客链接超全干货--Flink思维导图,花了3周左右编写、校对深入JAVA 的JVM核心原理解决线上各种故障【附案例】请谈谈你对volatile的理解?--最近小李子与面试官的一场“硬核较量”聊聊RPC通信,经常被问到的一道面试题。源码+笔记,包懂

 


欢迎扫描下方的二维码或 搜索 公众号“10点进修”,我们会有更多、且及时的资料推送给您,欢迎多多交流!

                                           

       

 

转载请注明原文地址: https://www.6miu.com/read-750155.html

最新回复(0)