The following is an annotated excerpt of MapTask.runNewMapper, showing how a map task wires together the Mapper, the InputFormat, the InputSplit, the RecordReader, and the output collector (elided passages are marked with "// ..."):

private void runNewMapper(JobConf job,
    org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex splitIndex,
    TaskUmbilicalProtocol umbilical,
    Task.TaskReporter reporter)
    throws IOException, ClassNotFoundException, InterruptedException {
  // ...
  // Obtain the Mapper via reflection. Defaults to org.apache.hadoop.mapreduce.Mapper;
  // set with job.setMapperClass().
  mapper = (Mapper) ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  // Obtain the InputFormat via reflection. Defaults to
  // org.apache.hadoop.mapreduce.lib.input.TextInputFormat; set with job.setInputFormatClass().
  InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  // Fetch the split this map task will process. The InputSplit holds the file location,
  // the starting read offset, the total length to read, and so on. Because the map task
  // can be scheduled on the node where the data resides, this embodies MapReduce's
  // "move the computation to the data" principle.
  split = (InputSplit) getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());
  // From the split and the InputFormat above, build a RecordReader that reads the split's records.
  input = new NewTrackingRecordReader(split, inputFormat, reporter, taskContext);
  // Choose the output collector (RecordWriter) based on the number of reducers,
  // which defaults to 1 and is set with job.setNumReduceTasks().
  if (job.getNumReduceTasks() == 0)
    output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
  else
    output = new NewOutputCollector(taskContext, job, umbilical, reporter); // key point to analyze
  // ...
  input.initialize(split, mapperContext); // key point to analyze
  mapper.run(mapperContext);
  // ...
}
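The closing call, mapper.run(mapperContext), is what actually pulls records from the RecordReader and hands them to the user's map() method. For reference, the default implementation in org.apache.hadoop.mapreduce.Mapper is essentially the following (paraphrased from the Hadoop 2.x source):

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    // Each nextKeyValue() call advances the underlying RecordReader through the split.
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
  } finally {
    cleanup(context);
  }
}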
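The values that runNewMapper reads back via reflection come from the job driver. Below is a minimal, hypothetical driver sketch (the WordCountDriver and WordCountMapper names and the argument paths are illustrative, not taken from the original post) showing the three settings the excerpt above refers to:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class WordCountDriver {

  // Hypothetical mapper for illustration: emits (word, 1) for every token in a line.
  public static class WordCountMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      for (String token : value.toString().split("\\s+")) {
        if (!token.isEmpty()) {
          word.set(token);
          context.write(word, ONE);
        }
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setJarByClass(WordCountDriver.class);

    // The three settings that runNewMapper later reads back:
    job.setMapperClass(WordCountMapper.class);      // taskContext.getMapperClass()
    job.setInputFormatClass(TextInputFormat.class); // taskContext.getInputFormatClass() (also the default)
    job.setNumReduceTasks(2);                       // job.getNumReduceTasks(); 0 would select NewDirectOutputCollector

    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}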
Please credit the original source when reposting: https://www.6miu.com/read-47236.html
