Hadoop 的 Hello World（WordCount）：什么都不说了，直接上代码。
reducer
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
protected void reduce(Text arg0, Iterable<IntWritable> arg1,Context arg2)throws IOException, InterruptedException {
int sum =
0;
for(IntWritable i: arg1){
sum =
sum + i.get();
}
arg2.write(arg0,
new IntWritable(
sum));
}
}
mapper
import java
.io.IOException
import org
.apache.hadoop.io.IntWritable
import org
.apache.hadoop.io.LongWritable
import org
.apache.hadoop.io.Text
import org
.apache.hadoop.mapreduce.Mapper
import org
.apache.hadoop.util.StringUtils
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, IntWritable>
.Context context)
throws IOException, InterruptedException {
String[] words = StringUtils
.split(value
.toString(),
' ')
for(String w :words){
context
.write(new Text(w), new IntWritable(
1))
}
}
}
驱动类（RunJob，提交作业）
import org
.apache.hadoop.conf.Configuration
import org
.apache.hadoop.fs.FileSystem
import org
.apache.hadoop.fs.Path
import org
.apache.hadoop.io.IntWritable
import org
.apache.hadoop.io.Text
import org
.apache.hadoop.mapreduce.Job
import org
.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org
.apache.hadoop.mapreduce.lib.output.FileOutputFormat
public class RunJob {
public static void main(String[] args) {
Configuration config = new Configuration()
// config
.set(
"fs.defaultFS",
"hdfs://192.168.137.11:8020")
// config
.set(
"yarn.resourcemanager.hostname",
"192.168.137.10")
config
.set(
"mapred.jar",
"D:\\wc.jar")
try {
FileSystem fs = FileSystem
.get(config)
Job job = Job
.getInstance(config)
job
.setJarByClass(RunJob
.class)
job
.setJobName(
"wc")
job
.setMapperClass(WordCountMapper
.class)
job
.setReducerClass(WordCountReducer
.class)
job
.setMapOutputKeyClass(Text
.class)
job
.setMapOutputValueClass(IntWritable
.class)
FileInputFormat
.addInputPath(job, new Path(
"/usr/input/"))
Path outpath = new Path(
"/usr/output/wc")
if(fs
.exists(outpath)){
fs
.delete(outpath, true)
}
FileOutputFormat
.setOutputPath(job, outpath)
boolean f = job
.waitForCompletion(true)
if(f){
System
.out.println(
"job sucessful!")
}
} catch (Exception e) {
e
.printStackTrace()
}
}
}