Implementing an Inverted Index with MapReduce
What Is an Inverted Index
An inverted index grew out of the practical need to look up records by the value of an attribute. Each entry in such an index consists of an attribute value together with the addresses of all records that contain that value.
Put simply, an inverted index answers the question: given a word, which files does it appear in, and how often? This is just like searching on Baidu: you type a keyword, the search engine quickly locates the documents on its servers that contain it, and it ranks the results by frequency and other signals (such as page click-through votes). The inverted index plays a key role in that process.
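Before the MapReduce version, the idea can be sketched in plain Java with an in-memory map (class and method names here are my own, not part of the article's code):

```java
import java.util.HashMap;
import java.util.Map;

// A minimal in-memory sketch of an inverted index:
// for each word, record how often it appears in each document.
public class InvertedIndexSketch {
    // docs maps a document name to its text; the result maps
    // word -> (document name -> occurrence count).
    public static Map<String, Map<String, Integer>> build(Map<String, String> docs) {
        Map<String, Map<String, Integer>> index = new HashMap<>();
        for (Map.Entry<String, String> doc : docs.entrySet()) {
            for (String word : doc.getValue().split(" ")) {
                index.computeIfAbsent(word, w -> new HashMap<>())
                     .merge(doc.getKey(), 1, Integer::sum);
            }
        }
        return index;
    }

    public static void main(String[] args) {
        Map<String, String> docs = new HashMap<>();
        docs.put("a.txt", "hello tom hello kitty");
        docs.put("b.txt", "hello jerry");
        // "hello" appears twice in a.txt and once in b.txt.
        System.out.println(build(docs).get("hello"));
    }
}
```

MapReduce distributes exactly this computation: the Map, Combine, and Reduce phases below split the counting work across machines instead of one HashMap.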
Case Study
Suppose we have two documents, a.txt and b.txt, with the contents shown in the figure below:
We use an inverted index to find how many times each word appears in each document. The expected output is:

hello "a.txt->5 b.txt->3"
tom "a.txt->2 b.txt->1"
kitty "a.txt->1"
...
Code Walkthrough
The inverted index involves three phases: the Map phase, the Combine phase, and the Reduce phase. Without further ado, here is the code.
Mapper
First we write the Mapper. Its job is to take each line read from a file, such as

<0,"hello tom">

and emit output of the form:

context.write("hello->a.txt",1);
context.write("tom->a.txt",1);
context.write("hello->a.txt",1);
...
The code:

public static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {
    private Text k = new Text();
    private Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        // Find out which file this line came from.
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String path = inputSplit.getPath().toString();
        for (String w : words) {
            // Emit ("word->path", "1") for every word occurrence.
            k.set(w + "->" + path);
            v.set("1");
            context.write(k, v);
        }
    }
}
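The map step can be simulated in plain Java without a Hadoop cluster. This is a sketch of my own (the names `MapStepSketch` and `map` are not from the article), mirroring the Mapper's logic of splitting a line and emitting ("word->path", "1") pairs:

```java
import java.util.ArrayList;
import java.util.List;

// A plain-Java simulation of the Mapper above: split one input line
// and emit a ("word->path", "1") pair for every word occurrence.
public class MapStepSketch {
    public static List<String[]> map(String line, String path) {
        List<String[]> pairs = new ArrayList<>();
        for (String w : line.split(" ")) {
            pairs.add(new String[]{w + "->" + path, "1"});
        }
        return pairs;
    }

    public static void main(String[] args) {
        for (String[] kv : map("hello tom hello", "a.txt")) {
            System.out.println(kv[0] + "\t" + kv[1]);
        }
    }
}
```

Note that duplicates are emitted as-is ("hello->a.txt" appears twice here); summing them up is left to the Combine phase.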
Combiner
The Combine phase turns pairs such as

<"hello->a.txt",1>
<"hello->a.txt",1>

into

context.write("hello","a.txt->5");
context.write("hello","b.txt->3");
The code:

public static class IndexCombiner extends Reducer<Text, Text, Text, Text> {
    private Text k = new Text();
    private Text v = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Split "word->path" back into its two parts.
        String[] wordAndPath = key.toString().split("->");
        String word = wordAndPath[0];
        String path = wordAndPath[1];
        // Sum up the "1" values for this word in this file.
        int counter = 0;
        for (Text t : values) {
            counter += Integer.parseInt(t.toString());
        }
        // Re-key the pair as (word, "path->count") so the Reducer
        // groups all files for the same word together.
        k.set(word);
        v.set(path + "->" + counter);
        context.write(k, v);
    }
}
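The combine step, too, can be checked in isolation. A plain-Java sketch of the same logic (my own class name, not from the article): sum the "1" values for one "word->path" key and re-key the result.

```java
import java.util.Arrays;
import java.util.List;

// A plain-Java simulation of the Combiner above: given all "1" values
// for one "word->path" key, sum them and re-key as (word, "path->count").
public class CombineStepSketch {
    public static String[] combine(String key, List<String> values) {
        String[] wordAndPath = key.split("->");
        int counter = 0;
        for (String v : values) {
            counter += Integer.parseInt(v);
        }
        return new String[]{wordAndPath[0], wordAndPath[1] + "->" + counter};
    }

    public static void main(String[] args) {
        String[] out = combine("hello->a.txt", Arrays.asList("1", "1", "1"));
        System.out.println(out[0] + "\t" + out[1]);
    }
}
```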
Reducer
The Reduce phase turns

<"hello",{"a.txt->5","b.txt->3"}>
...

into

("hello","a.txt->5 b.txt->3")
...
The code:

public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
    private Text v = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Concatenate all "path->count" values for this word.
        String result = "";
        for (Text t : values) {
            result += t.toString() + "\t";
        }
        v.set(result);
        context.write(key, v);
    }
}
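The reduce step in plain Java, as a sketch of my own (this version trims the trailing tab that the Hadoop listing above leaves in place):

```java
import java.util.Arrays;
import java.util.List;

// A plain-Java simulation of the Reducer above: concatenate all
// "path->count" strings for one word into a tab-separated value.
public class ReduceStepSketch {
    public static String reduce(List<String> values) {
        StringBuilder result = new StringBuilder();
        for (String v : values) {
            result.append(v).append("\t");
        }
        // Drop the trailing tab for tidier output.
        return result.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println("hello\t" + reduce(Arrays.asList("a.txt->5", "b.txt->3")));
    }
}
```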
The main Method
Finally we write the main method, which wires the classes together (the usual boilerplate):

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(InverseIndex.class);

    job.setMapperClass(IndexMapper.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setCombinerClass(IndexCombiner.class);

    job.setReducerClass(IndexReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
}
Running the MapReduce Program
Export the project as a jar file, test.jar, and upload the input files to the /myfiles directory:

hadoop fs -put a.txt b.txt /myfiles

Then run the job:

hadoop jar /root/test.jar cn.zju.hadoop.mr.InverseIndex /myfiles /resultfiles

Thanks