Nutch-0.9源代码:Injector类

xiaoxiao2026-06-07  5

在对Nutch抓取工作流程分析中,已经简单地提及到了inject操作,如下所示: inject操作调用的是nutch的核心包之一crawl包中的类org.apache.nutch.crawl.Injector。它执行的结果是:crawldb数据库内容得到更新,包括URL及其状态。 inject操作主要作用可以从下面3方面来说明: (1) 将URL集合进行格式化和过滤,消除其中的非法URL,并设定URL状态(UNFETCHED),按照一定方法进行初始化分值; (2) 将URL进行合并,消除重复的URL入口; (3) 将URL及其状态、分值存入crawldb数据库,与原数据库中重复的则删除旧的,更换新的。 现在,根据上面的信息,可以进一步分析一下: 因为已经初始化了一个URL集合,那么这个集合中就存在多个URL,由此可以想到,如果由于输入错误可能导致错误的URL存在,所以inject操作需要对其进行检查核实,合法的才会去执行抓取,否则执行中发现是错误的会浪费CPU这宝贵资源。另外,为了防止重复抓取URL,需要设定一个标志位来标识该URL完成抓取与否。那么这些信息应该被存放到某个地方,以备下次启动抓取工作的时候读取,存放到哪里呢?当然是CrawlDB了,更新已经初始化的 CrawlDB实体的实例信息,对应于文件系统中的crawldb目录。 再考虑,如果本次抓取工作完成了,下次要启动了,同时也对应一个初始化的URL集合,那么这里面出现的URL可能在上次被抓取过,是否被抓取过,可以从CrawlDB中查看到详细的信息。对于重复的URL当然不希望再次被抓取(如果该URL对应的页面信息没有完全变更),应该忽略掉,这就涉及到了对抓取的页面去除重复的URL,这是应该做的,这也可以称为合并操作。 如果你了解MapReduce模型的话,现在已经能够想到,这里面可以实现MapReduce模型的,Nutch就实现了MapReduce模型 (实际上是在Hadoop中实现的,因为对于每个Map和Reduce实现类都分别实现了 org.apache.hadoop.mapred.Mapper接口与org.apache.hadoop.mapred.Reducer)。 Mapper实现对URL集合数据的映射,Reducer实现了对URL的合并操作。这里提及MapReduce模型,有助于对Injector类的两个静态内部类InjectMapper和InjectReducer理解。 org.apache.nutch.crawl.Injector类实现了org.apache.hadoop.util.ToolBase抽象类,如下所示: public class Injector extends ToolBase 而org.apache.hadoop.util.ToolBase抽象类又实现了org.apache.hadoop.util.Tool接口。如果你对Tool类了解,及其配置部署过Hadoop自带的WordCount工具的时候,就理解了,实现Tool接口的实现类可以通过 org.apache.hadoop.util.ToolRunner类来启动执行MapReduce任务的工具。 先看Injector类中如何实现Map的,InjectMapper类的实现如下所示:    /** 标准化初始化的URLs,并且过滤注入的URLs */ public static class InjectMapper implements Mapper { // 实现Mapper接口,就要实现该接口中定义的map函数     private URLNormalizers urlNormalizers; // URL标准化工具,可以实现URL的标准化     private float interval; // 设置抓取间隔时间     private float scoreInjected; // 设置注入URL对应页面的得分值     private JobConf jobConf; // 抓取工作配置实例     private URLFilters filters; // URL过滤器     private ScoringFilters scfilters; // 得分过滤器     private long curTime; // 设置注入时间     public void configure(JobConf job) { // 为一次抓取工作进行配置       this.jobConf = job;       urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_INJECT);       interval = jobConf.getFloat("db.default.fetch.interval", 30f);       filters = new URLFilters(jobConf);       scfilters = new ScoringFilters(jobConf);       scoreInjected = jobConf.getFloat("db.score.injected", 1.0f);       curTime = job.getLong("injector.current.time", System.currentTimeMillis());     }     public void close() {}     public void map(WritableComparable key, Writable val,                     OutputCollector output, Reporter reporter)       throws IOException { // map函数的实现是核心       Text value = (Text)val;       String url = value.toString();              // 从初始化URL集合中读取一行,一行是一个URL       // System.out.println("url: " +url);       try {         url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);       // 标准化URL         url = filters.filter(url);             // 过滤URL,去除不合法的URL       } catch (Exception e) {         if (LOG.isWarnEnabled()) { LOG.warn("Skipping " +url+":"+e); }         url = null;       }       if (url != null) {                          // 如果合法,则解析该URL         value.set(url);                           // 将合法的URL收集到Text value对象中          // 其中,org.apache.nutch.crawl.CrawlDatum类中定义了URL的各种可以设置的状态,可以在该类的对象中设置与URL相关的有用的信息,比如注入状态、抓取间隔时间,抓取时间、得分等等         CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED, interval);         datum.setFetchTime(curTime);         datum.setScore(scoreInjected);         try {           scfilters.injectedScore(value, datum);         } catch (ScoringFilterException e) {           if (LOG.isWarnEnabled()) {             LOG.warn("Cannot filter injected score for url " + url +                      ", using default (" + e.getMessage() + ")");           }           datum.setScore(scoreInjected);         }         output.collect(value, datum); // 收集key/value对,并输出结果       }     } } 从上面InjectMapper类的实现可以看出,其中包含的key应该是URL本身,而value则是与注入的URL对应的信息,比如URL当前状态信息(是否已经抓取过,或者不需要抓取)等等。 接着再看InjectReducer类,它实现了Reduce操作,代码如下所示:    /** 为一个URL合并多个新的入口. */ public static class InjectReducer implements Reducer {     public void configure(JobConf job) {}       public void close() {}     public void reduce(WritableComparable key, Iterator values,                        OutputCollector output, Reporter reporter)       throws IOException { // reduce函数的实现也是核心的       CrawlDatum old = null;       CrawlDatum injected = null;       while (values.hasNext()) { // 根据Map任务映射得到的迭代器,进行遍历得到的中间结果         CrawlDatum val = (CrawlDatum)values.next();         if (val.getStatus() == CrawlDatum.STATUS_INJECTED) { // 如果某个URL已经注入到CrawlDB           injected = val;           injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED); // 则设置这个URL对应的页面不用进行抓取         } else {           old = val; // 否则如果没有注入过,则需要对该URL对应的页面进行抓取         }       }       CrawlDatum res = null;       if (old != null) res = old; // 不要重写已经存在的value       else res = injected;       output.collect(key, res); // 收集key/value对;合并的最终结果是,使得将要注入到CrawlDB中URL没有重复的     } } 实现注入的操作是Injector类的inject()方法中,如下所示:    public void inject(Path crawlDb, Path urlDir) throws IOException {     if (LOG.isInfoEnabled()) {       LOG.info("Injector: starting");       LOG.info("Injector: crawlDb: " + crawlDb);       LOG.info("Injector: urlDir: " + urlDir);     }     Path tempDir =       new Path(getConf().get("mapred.temp.dir", ".") +                "/inject-temp-"+                Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); // 临时目录用来存放MapReduce工作中生成的中间结果数据的     // map text input file to a <url,CrawlDatum> file     if (LOG.isInfoEnabled()) {       LOG.info("Injector: Converting injected urls to crawl db entries.");     }     JobConf sortJob = new NutchJob(getConf());     sortJob.setJobName("inject " + urlDir);     sortJob.setInputPath(urlDir);     sortJob.setMapperClass(InjectMapper.class);     sortJob.setOutputPath(tempDir);     sortJob.setOutputFormat(SequenceFileOutputFormat.class);     sortJob.setOutputKeyClass(Text.class);     sortJob.setOutputValueClass(CrawlDatum.class);     sortJob.setLong("injector.current.time", System.currentTimeMillis());     JobClient.runJob(sortJob);     // merge with existing crawl db     if (LOG.isInfoEnabled()) {       LOG.info("Injector: Merging injected urls into crawl db.");     }     JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);     mergeJob.addInputPath(tempDir);     mergeJob.setReducerClass(InjectReducer.class);     JobClient.runJob(mergeJob);     CrawlDb.install(mergeJob, crawlDb);     // 删除临时文件     FileSystem fs = new JobClient(getConf()).getFs();     fs.delete(tempDir);     if (LOG.isInfoEnabled()) { LOG.info("Injector: done"); } } 该方法比较容易理解,在这里安装CrawlDB了,从而真正地将URLs注入到CrawlDB中了,更新crawldb目录中的数据。 要想执行inject操作,需要启动,这是在Injector类中的run()方法中启动的,如下所示:    public int run(String[] args) throws Exception {     if (args.length < 2) { // 根据命令行进行启动       System.err.println("Usage: Injector <crawldb> <url_dir>");       return -1;     }     try {       inject(new Path(args[0]), new Path(args[1])); // 调用inject()方法       return 0;     } catch (Exception e) {       LOG.fatal("Injector: " + StringUtils.stringifyException(e));       return -1;     } } 最后,就是在main主函数中:    public static void main(String[] args) throws Exception {     int res = new Injector().doMain(NutchConfiguration.create(), args);     System.exit(res); } 其中,doMain()方法是在org.apache.hadoop.util.ToolBase抽象类中实现的,就是通过ToolRunner工具启动工作,如下所示:    public final int doMain(Configuration conf, String[] args) throws Exception {     return ToolRunner.run(conf, this, args); } 启动inject操作,实际所做的工作就是对URL进行预处理,检查每个初始化URL的合法性,从而更新到CrawlDB中。待inject操作完成之后,就可以执行后继操作了——应该是生成抓取列表(generate操作)。

相关资源:敏捷开发V1.0.pptx
转载请注明原文地址: https://www.6miu.com/read-5049747.html

最新回复(0)