MapReduce框架学习(4)——倒排索引程序实战

xiaoxiao2025-07-31  32

参考: JeffreyZhou的博客园 《Hadoop权威指南》第四版

0 倒排索引(Inverted Index)

前面我们运行过WordCount例子,得到的单词计数结果,如果输入3篇文档,得到的结果是这3个文档所有的单词总数计数,得到如下这样

但是,如果我想知道“hello”这个单词在各个文档中的计数情况呢?也就是最后得到的结果是: 理解一下,上面的结果,是根据文档来查单词的频率,下面是根据单词来查在文档中出现的频率,所以称为倒排索引(Inverted Index)。   那么,这个结果又是咋形成的呢?

4.1 输入输出过程

首先是map过程,输入的是文本,一条条的行记录,输出呢?应该包含:单词,所在文档编号,单词数。那么第一个问题来了,map的输入是Key-value,这有三个参数,谁是key是,谁是value呢?不够分啊。分析一下,数量是需要累计的,所以单词数肯定在value里,单词在key里,文档编号呢?这个参数不能进行累加等操作,不同文件内的相同单词也不能累加,所以它应该放在key中。所以这就是一个复合键,value则是默认的数量1。map后的输出应该是这样: keyvalueHello;T11world;T11Hello;T11Bye;T31…… combine过程,此时的combine的输入就应该是刚才map定义的复合键类型了,在此时将上述的key-value格式进行一轮合并,这个输出应该不改变数据类型,照样传到下一环节,这一轮的输出应该是: keyvalueHello;T12world;T11Bye;T33……

注: 此处与参考教程中有点不同,上面的按照combine的原理进行推理的,但按照源代码,其输出应该是:

keyvalueHello;T1T1:2world;T1T1:1Bye;T3T3:3…… reduce过程,此时只需要按照相同的key(此处为复合键中的单词),将不同map的value结果进行合并处理,就可以得到最终结果: keyvalueHelloT1:2;T2:1worldT1:1;T2:2ByeT2:1;T3:3……

那么各个环节的数据格式变换也看到了,接下来就用代码来实现各个环节吧。

有一点需要说明:以下程序中有些代码已经`deprecated`,现在java语法已经有更好的实现方法,但本例中还是照抄过来,学习其思路和框架后,再进行修改。 在学习中,不用纠结于具体的语法,而且其逻辑思路。

4.2 map类

前面说到了,这个key是复合的,所以常用的几种基本类型已经满足不了我了,先来设置一个复合键MyType.class。

public static class Mytype implements WritableComparable<MyType> { public MyType() {} // 单词 private String word; public void setWord(String word) {this.word = word;} public String getWord() {return word;} // 文档编号 private String filePath; public void setFile(String filePath) {this.filePath = filePath;} public String getFile() {return filePath;} // 序列化 @Override public void write(DataOutput out) throws IOException { out.writeUTF(word); out.writeUTF(filePath); } // @Override public void readFile(DataInput in) throws IOException { word = in.readUTF(); filePath = in.readUTF(); } // 比较器 @Override public int compareTo(MYtype arg0) { if (word != aeg0.word) { return word.compareTo(arg0.word); return filePath.compareTo(arg0.filePath); } }

然后,再来写map函数:

public static class InvertedIndexMapper extends Mapper<Object, Text, MyType, Text> { public void map(Object key, Text value, Context context) throws IOException, InterruptedException { FileSplit split = (FileSplit)context.getInputSplit(); StringTokenizer itr = new StringTonizer(value.toString()); while(itr.hasMoreTokens()) { MyType key = new MyType(); key.setWord(itr.nextToken()); key.setFile(split.getPath().toUri().getPath().replace("/user/hadoop/input/","")); context.write(key,new Text("1")); } } }

4.3 Combine类

public static class InvertedIndexCombiner extends Reducer<MyType,Text,MyType,Text> { public void reduce(MyType key, Text values, Context context) throws IOException,InterruptException { int sum = 0; for (Text value : values) { sum += Integer.parseInt(value.toString()); } context.write(key,new Text(key.getFile()+":"+sum)); } }

4.4 Reduce类

public static class InvertedIndexReducer extends Reducer<MyType, Text, Text, Text> { public void reduce(MyType key, Iterable<Text> values, Context context) throws IOException,InterruptionException { Text result = new Text(); String fileList = new String(); for (Text value : values) { fileList += value.toString() + ";"; } result.set(fileList); context.write(new Text(key.getWord()),result); } }

4.5 Job配置

public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); // System.out.println("url:"+conf.get("fs.defaultFS")) job = Job.getInstance(conf,"MyInvertedIndex"); job.setJarByClass(MyInvertedIndex.class); job.setMapperClass(InvertedIndexMapper.class); job.setMapOutputKeyClass(MyType.class); job.setMapOutputValueClass(Text.class); job.setCombinerClass(InvertedIndexCombiner.class); job.setReducerClass(InvertedIndexReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 检测输出目录output是否已存在,若存在,则删除 // Path path = new Path("output"); // FileSystem hdfs = new FileSystem.get(conf); // if (hdfs.exists(path)) // hdfs.delete(path,true); FileInputFormat.addInputPath(job,new Path("input")); FileOutputFormat.addOutputPath(job,new Path("output")); job.waitForCompletion(true); }

4.x 后记

为什么自定义的Combine类中,reduce方法传入的参数是(Iterable values),上一环节map的输出明明是 new Text("1")。。。再接着看Reduce环节的reduce方法,发现里面也是 Iterable<Text> values,想明白了,可能这中间还有一个操作,将上一环节传来的序列化Text(value)变为可迭代数据。
转载请注明原文地址: https://www.6miu.com/read-5034015.html

最新回复(0)