参考:
JeffreyZhou的博客园
《Hadoop权威指南》第四版
0 倒排索引(Inverted Index)
前面我们运行过WordCount例子,得到的单词计数结果,如果输入3篇文档,得到的结果是这3个文档所有的单词总数计数,得到如下这样
但是,如果我想知道“hello”这个单词在各个文档中的计数情况呢?也就是最后得到的结果是: 理解一下,上面的结果,是根据文档来查单词的频率,下面是根据单词来查在文档中出现的频率,所以称为倒排索引(Inverted Index)。 那么,这个结果又是咋形成的呢?
4.1 输入输出过程
首先是map过程,输入的是文本,一条条的行记录,输出呢?应该包含:单词,所在文档编号,单词数。那么第一个问题来了,map的输入是Key-value,这有三个参数,谁是key是,谁是value呢?不够分啊。分析一下,数量是需要累计的,所以单词数肯定在value里,单词在key里,文档编号呢?这个参数不能进行累加等操作,不同文件内的相同单词也不能累加,所以它应该放在key中。所以这就是一个复合键,value则是默认的数量1。map后的输出应该是这样:
keyvalue
Hello;T11world;T11Hello;T11Bye;T31……
combine过程,此时的combine的输入就应该是刚才map定义的复合键类型了,在此时将上述的key-value格式进行一轮合并,这个输出应该不改变数据类型,照样传到下一环节,这一轮的输出应该是:
keyvalue
Hello;T12world;T11Bye;T33……
注: 此处与参考教程中有点不同,上面的按照combine的原理进行推理的,但按照源代码,其输出应该是:
keyvalue
Hello;T1T1:2world;T1T1:1Bye;T3T3:3……
reduce过程,此时只需要按照相同的key(此处为复合键中的单词),将不同map的value结果进行合并处理,就可以得到最终结果:
keyvalue
HelloT1:2;T2:1worldT1:1;T2:2ByeT2:1;T3:3……
那么各个环节的数据格式变换也看到了,接下来就用代码来实现各个环节吧。
有一点需要说明:以下程序中有些代码已经`deprecated`,现在java语法已经有更好的实现方法,但本例中还是照抄过来,学习其思路和框架后,再进行修改。
在学习中,不用纠结于具体的语法,而且其逻辑思路。
4.2 map类
前面说到了,这个key是复合的,所以常用的几种基本类型已经满足不了我了,先来设置一个复合键MyType.class。
public static class Mytype implements WritableComparable<MyType> {
public MyType() {}
private String word
;
public void setWord(String word
) {this.word
= word
;}
public String
getWord() {return word
;}
private String filePath
;
public void setFile(String filePath
) {this.filePath
= filePath
;}
public String
getFile() {return filePath
;}
@Override
public void write(DataOutput out
) throws IOException
{
out
.writeUTF(word
);
out
.writeUTF(filePath
);
}
@Override
public void readFile(DataInput in
) throws IOException
{
word
= in
.readUTF();
filePath
= in
.readUTF();
}
@Override
public int compareTo(MYtype arg0
) {
if (word
!= aeg0
.word
) {
return word
.compareTo(arg0
.word
);
return filePath
.compareTo(arg0
.filePath
);
}
}
然后,再来写map函数:
public static class InvertedIndexMapper extends Mapper<Object, Text, MyType, Text> {
public void map(Object key
, Text value
, Context context
)
throws IOException
, InterruptedException
{
FileSplit split
= (FileSplit
)context
.getInputSplit();
StringTokenizer itr
= new StringTonizer(value
.toString());
while(itr
.hasMoreTokens()) {
MyType key
= new MyType();
key
.setWord(itr
.nextToken());
key
.setFile(split
.getPath().toUri().getPath().replace("/user/hadoop/input/",""));
context
.write(key
,new Text("1"));
}
}
}
4.3 Combine类
public static class InvertedIndexCombiner extends Reducer<MyType,Text,MyType,Text> {
public void reduce(MyType key
, Text values
, Context context
)
throws IOException
,InterruptException
{
int sum
= 0;
for (Text value
: values
) {
sum
+= Integer
.parseInt(value
.toString());
}
context
.write(key
,new Text(key
.getFile()+":"+sum
));
}
}
4.4 Reduce类
public static class InvertedIndexReducer extends Reducer<MyType, Text, Text, Text> {
public void reduce(MyType key
, Iterable
<Text> values
, Context context
)
throws IOException
,InterruptionException
{
Text result
= new Text();
String fileList
= new String();
for (Text value
: values
) {
fileList
+= value
.toString() + ";";
}
result
.set(fileList
);
context
.write(new Text(key
.getWord()),result
);
}
}
4.5 Job配置
public static void main(String
[] args
) throws IOException
{
Configuration conf
= new Configuration();
job
= Job
.getInstance(conf
,"MyInvertedIndex");
job
.setJarByClass(MyInvertedIndex
.class);
job
.setMapperClass(InvertedIndexMapper
.class);
job
.setMapOutputKeyClass(MyType
.class);
job
.setMapOutputValueClass(Text
.class);
job
.setCombinerClass(InvertedIndexCombiner
.class);
job
.setReducerClass(InvertedIndexReducer
.class);
job
.setOutputKeyClass(Text
.class);
job
.setOutputValueClass(Text
.class);
FileInputFormat
.addInputPath(job
,new Path("input"));
FileOutputFormat
.addOutputPath(job
,new Path("output"));
job
.waitForCompletion(true);
}
4.x 后记
为什么自定义的Combine类中,reduce方法传入的参数是(Iterable
values),上一环节map的输出明明是
new Text("1")。。。再接着看Reduce环节的reduce方法,发现里面也是
Iterable<Text> values,想明白了,可能这中间还有一个操作,将上一环节传来的序列化Text(value)变为可迭代数据。