hadoop自定义输入格式(InputFormat)

xiaoxiao2021-02-28  124

个人感觉如果没有能自己实现输入格式的话,其实对mapreduce的程序运行,是不能理解深刻的。实现目标:自定义输入格式从本地文本信息中统计单词出现个数。感觉很熟悉吧。第一步首先要实现抽象类InputFormat。里面有要两个实现的方法,得到分片信息,和得到记录阅读类(RecordReader)。下面是源代码 public abstract class InputFormat<K, V> { public abstract List<InputSplit> getSplits(JobContext context ) throws IOException, InterruptedException; public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException; }

接着来看自定义的输入类代码

public class TextInputFormat extends InputFormat<IntWritable, Text>{ @Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { // TODO Auto-generated method stub //这里为了方便直接以1,3条记录为一个分片,4,6记录为一个分片。将分片信息放在集合里面。 ArrayList<InputSplit> arrayList=new ArrayList<>(); TestSplit testSplit=new TestSplit(1, 3); TestSplit testSplit2=new TestSplit(4, 6); arrayList.add(testSplit); arrayList.add(testSplit2); return arrayList; } @Override public RecordReader<IntWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { // TODO Auto-generated method stub return new TestReader(); } }

3.首先来看得到分片信息,我们可以看到返回类型是inputsplit的list类型。很容易得到自定义分片,应该是inputsplit的子类。于是我们实现抽象类InputSplit.有两个要实现方法。getlength()得到分片大小。这里主要是后面会根据分片大小排序。getLocations()得到分片信息来自于哪一个节点的名称,如果返回null的话,这里是会出异常的。可以任意返回与分片数量相等的字符串数组。来看源代码

public class TestSplit extends InputSplit implements Writable{ private int start=0; private int end=0; //分片一定有一个空的构造函数,不然会报错、 public TestSplit() { // TODO Auto-generated constructor stub } //返回分片大小 @Override public long getLength() throws IOException, InterruptedException { // TODO Auto-generated method stub return end-start+1; } //返回位置信息 @Override public String[] getLocations() throws IOException, InterruptedException { // TODO Auto-generated method stub return new String[]{"hadoop1","hadoop2"}; } //序列化这个地方不用讲吧 @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeInt(start); out.writeInt(end); } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub this.start=in.readInt(); this.end=in.readInt(); } public TestSplit(int start, int end) { this.start = start; this.end = end; } public int getStart() { return start; } public void setStart(int start) { this.start = start; } public int getEnd() { return end; } public void setEnd(int end) { this.end = end; } }

4.再来看RecordReader类,这个类主要是将记录拆成key/value这种形式,我里以行数为key,以行内容为value。实现这个功能的主要是nextKeyValue()方法。这个方法只要还有记录没有拆分就返回true.这个地方先要讲一个如何得到记录,我写了一个工具类。可以读本地文件将每行信息保存到map中。下面来看源代码

public class TestReader extends RecordReader<IntWritable, Text>{ private TestSplit testsplit; private int start; //记录的开始 private int end; //记录的结束 private IntWritable key=new IntWritable(); //返回的key private Text value=new Text(); //返回的value private Map<Integer, String> map=new HashMap<>(); @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { // TODO Auto-generated method stub testsplit=(TestSplit) split; start=testsplit.getStart(); end=testsplit.getEnd(); map=TestText.getText(); //得到文件信息 } @Override public boolean nextKeyValue() throws IOException, InterruptedException { // TODO Auto-generated method stub if (start<=end) { key.set(start); //设置key和value value.set(map.get(start)); start++; return true; } else return false; } @Override public IntWritable getCurrentKey() throws IOException, InterruptedException { // TODO Auto-generated method stub return key; } @Override public Text getCurrentValue() throws IOException, InterruptedException { // TODO Auto-generated method stub return value; } //后面两个方法可以不用实现 @Override public float getProgress() throws IOException, InterruptedException { // TODO Auto-generated method stub return 0; } @Override public void close() throws IOException { // TODO Auto-generated method stub } } 工具类代码 public class TestText { public static Map<Integer, String> getText() { Map<Integer, String> map=new HashMap(); File file = new File("D:/Test.txt"); BufferedReader reader = null; try { reader = new BufferedReader(new FileReader(file)); String tempString = null; int line = 1; while ((tempString = reader.readLine()) != null) { map.put(line, tempString); line++; } reader.close(); } catch (IOException e) { e.printStackTrace(); } finally { if (reader != null) { try { reader.close(); } catch (IOException e1) { } } return map; } } }

5.Mapper类

public class TestMapper extends Mapper<IntWritable, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); @Override public void map(IntWritable key, Text value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub String line = value.toString(); StringTokenizer st = new StringTokenizer(line," "); while(st.hasMoreTokens()){ word.set(st.nextToken()); context.write(word,one); } } }

6.Reducer类

public class TestReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException { int sum = 0; for(IntWritable val:values) { sum += val.get(); } result.set(sum); context.write(key,result); } }

7.主类

public class WordCount { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "wordcount"); job.setJarByClass(WordCount.class); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(TestMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setReducerClass(TestReduce.class); FileOutputFormat.setOutputPath(job, new Path("01/")); //输出信息保存在本地项目目录下 System.exit(job.waitForCompletion(true)?0:1); } }

8.github项目地址https://github.com/iareuniqe/InputFormat

转载请注明原文地址: https://www.6miu.com/read-69154.html

最新回复(0)