PageRank是用于解决网页重要性排序的关键技术之一,其基于网页之间链接关系构建一个有向图结构,实现各个网页级别的划分。一个网页的PageRank值(后面简称PR值),取决于其他网页对该网页的贡献和,以公式形式表示为,其中U表示所有网页指向网页b的网页集合,L(a)表示网页a的出度,d表示用户浏览一个网页的随机概率,用于解决网页关系间的陷阱问题。根据公式递归计算,各网页的PR值将最终趋于稳定。可以发现,该算法的执行实质是一个概率矩阵的迭代乘法运算。
由于Hadoop与Spark对于PageRank算法的实现过程不同,这里分别对Hadoop与Spark算法输入文件进行说明。 对于Hadoop输入文件,每行的数据信息包含网页ID、网页初始PR值1.0以及该网页链接的其他网页ID,以制表符隔开,如
A 1 B,C B 1 C C 1 A,D D 1 B,E E 1 A对于Spark输入文件,以网页ID以及该网页链接的每一个网页ID,作为单独一行保存,如
A B A C B C C A C D D B D E E A为了完成后续的迭代计算,map过程需要将链接关系图和对其他网页的贡献值分别传递给reduce端。 reduce过程根据key将最终计算的PR值与链接关系图合并输出,用于下次迭代的map端。 测试以10次为收敛标准迭代进行,具体代码实现如下:
package org.hadoop.test; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class PageRank { private static final double d = 0.85; public static class PRMapper extends Mapper<Object, Text, Text, Text>{ Text link_key = new Text(); Text pr_value = new Text(); Text id_key = new Text(); Text link_value = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException{ StringTokenizer line = new StringTokenizer(value.toString()); String id = line.nextToken(); double pr = Double.parseDouble(line.nextToken()); String elements = line.nextToken(); //为链接的网页组作标记 link_value.set("@"+elements); String[] links = elements.split(","); int count = links.length; for (String link : links){ String prValue = String.valueOf(pr/count); link_key.set(link); pr_value.set(prValue); //传递所贡献的pr值 context.write(link_key, pr_value); } id_key.set(id); //传递拓扑图 context.write(id_key, link_value); } } public static class PRReducer extends Reducer<Text, Text, Text, Text>{ Text result = new Text(); public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ double rank = 0; String pages = ""; for (Text value : values){ String tmp = value.toString(); if (tmp.startsWith("@")){ pages = "\t"+tmp.substring(tmp.indexOf("@")+1); continue; } rank += Double.parseDouble(tmp); } rank = 1-d+d*rank; result.set(rank+pages); context.write(key, result); } } public static void main(String[] args) throws Exception{ if (args.length != 2){ System.err.println("Usage: <in> <out>"); System.exit(2); } Configuration conf = new Configuration(); Job job1 = new Job(conf, "PageRank_tmp"); job1.setJarByClass(PageRank.class); job1.setMapperClass(PRMapper.class); job1.setReducerClass(PRReducer.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job1, new Path(args[0])); FileOutputFormat.setOutputPath(job1, new Path(args[1]+"_tmp/output1")); job1.waitForCompletion(true); for (int i=0;i<8;i++){ String inpath = args[1]+"_tmp/output"+String.valueOf(i+1); String outpath = args[1]+"_tmp/output"+String.valueOf(i+2); Job job2 = new Job(conf, "PageRank_tmp"); job2.setJarByClass(PageRank.class); job2.setMapperClass(PRMapper.class); job2.setReducerClass(PRReducer.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job2, new Path(inpath)); FileOutputFormat.setOutputPath(job2, new Path(outpath)); job2.waitForCompletion(true); } String inpath = args[1]+"_tmp/output9"; String outpath = args[1]; Job job3 = new Job(conf, "PageRank"); job3.setJarByClass(PageRank.class); job3.setMapperClass(PRMapper.class); job3.setReducerClass(PRReducer.class); job3.setOutputKeyClass(Text.class); job3.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job3, new Path(inpath)); FileOutputFormat.setOutputPath(job3, new Path(outpath)); job3.waitForCompletion(true); } }Spark实现过程将该网页的链接网页RDD与PR值RDD合并为一个RDD执行算子操作,并将链接关系图缓存于内存,便于后续迭代计算。 具体代码实现如下:
import org.apache.spark.{SparkConf, SparkContext} /** * Created by rose on 16-4-26. */ object PageRank { def main(args:Array[String]): Unit ={ if(args.length != 2){ println("Usage: <in> <out>") return } val conf = new SparkConf().setAppName("PageRank") val sc = new SparkContext(conf) val lines = sc.textFile(args(0)) val links = lines.map(line=>{ val parts = line.split("\\s") (parts(0), parts(1)) }).distinct().groupByKey().cache() var ranks = links.mapValues(v => 1.00) for(i <- 1 to 10){ val contrib = links.join(ranks).values.flatMap { case (urls, rank) => { val size = urls.size urls.map(url => (url, rank/size)) } } ranks = contrib.reduceByKey(_+_).mapValues(0.15+0.85*_) } ranks.saveAsTextFile(args(1)) } }1)上传本地文件到HDFS目录下 在HDFS上创建输入文件夹
$hadoop fs -mkdir -p pageRank/input上传本地测试文件到集群的input目录下
$hadoop fs -put ~/file* pageRank/input查看集群文件目录
$hadoop fs -ls pageRank/input2)运行程序 将矩阵乘法算法程序PageRank打包为后缀名为jar的压缩文件PageRank.jar,进入到压缩文件所在文件夹(这里以一个file输入文件和一个output输出文件夹为例说明)。 Hadoop程序运行如下命令执行
$hadoop jar ~/hadoop/PageRank.jar org.hadoop.test.PageRank pageRank/input/file pageRank/hadoop/outputSpark程序运行如下命令执行
$spark-submit --master yanr-client --class PageRank ~/spark/PageRank.jar hdfs://master:9000/pageRank/input/file hdfs://master:9000/pageRank/spark/output3)查看运行结果 查看Hadoop执行结果
$hadoop fs -ls pageRank/hadoop/output查看Spark执行结果
$hadoop fs -ls pageRank/spark/output