// NOTE(review): annotated excerpt of Hadoop's MapTask.runNewMapper — the "...." lines
// mark code elided from the original source; this snippet is not compilable as-is.
// Purpose: wire up the new-API map side of a MapTask — instantiate the user's Mapper
// and InputFormat via reflection, recover this task's InputSplit, build the record
// reader/writer, then drive the map phase with mapper.run(mapperContext).
private void runNewMapper(JobConf job, org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex splitIndex, TaskUmbilicalProtocol umbilical, Task.TaskReporter reporter)
throws IOException, ClassNotFoundException, InterruptedException
{
.......................
//Instantiate the Mapper via reflection; defaults to org.apache.hadoop.mapreduce.Mapper, overridden via job.setMapperClass(...)
mapper = (Mapper)ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
//Instantiate the InputFormat via reflection; defaults to org.apache.hadoop.mapreduce.lib.input.TextInputFormat, overridden via job.setInputFormatClass(...)
InputFormat inputFormat = (InputFormat)ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
//Recover the InputSplit this MapTask should process. The split carries the file location,
//starting read offset, total length to read, etc. The MapTask can thus be scheduled on the
//node holding the data — MapReduce's "move computation to the data" principle.
split = (InputSplit)getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());
//Combine the split and inputFormat into a RecordReader used to read the split's records
input = new NewTrackingRecordReader(split,inputFormat, reporter, taskContext);
//Choose the output collector based on the reduce count (default 1, set via job.setNumReduceTasks):
//with zero reducers the mapper writes directly to the final output; otherwise output is
//collected (and partitioned/sorted) for the shuffle to the reducers.
if(job.getNumReduceTasks() == 0)
output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
else
output = new NewOutputCollector(taskContext, job, umbilical, reporter);//key point — analyzed in detail later
........................
input.initialize(split, mapperContext);//key point — analyzed in detail later
mapper.run(mapperContext);
........................
}
// NOTE(review): stray text "sss" — almost certainly accidental; remove before publishing.