码说MapReduce

xiaoxiao2021-02-28  20

MapReduce框架作为Hadoop发展初期的核心计算框架,为大数据处理技术飞速演进提供了基石。在Hadoop生态圈中,MapReduce框架由于其成熟稳定的性能,仍然是离线批处理技术的主力。以我们的北京移动大数据集群为例,Hive、SparkSQL是支撑探索性数据查询的主要工具,其简单易懂的SQL语句查询,可以使具备基础数据库管理能力的人员轻松上手,完美地支撑了实时数据查询需求。

在我最初使用Java写MapReduce程序之前,总有一个疑问:既然可以用SQL这么通俗易懂的语句直接操作数据,而且不需要过多了解MapReduce执行过程,为什么还要费力地用Java垒代码,去了解MapReduce的底层执行过程。什么样的应用场景需要我们来开发MapReduce呢?

首先,SQL非常适用于处理结构化数据,对于非结构化数据以及需要特殊函数处理的数据比如文本数据,SQL则会力不从心。举一个小例子,从海量文本数据中提取各种字符编码并翻译为中文,过程中还涉及自动识别是utf-8还是ANSI亦或是其他编码格式,这个需求用MapReduce程序实现起来更为合理;另外,在处理业务逻辑较为复杂的任务时,使用SQL很难实现,其执行效率方面也很难满足业务需求。举例来说,我们需要将业务日志中的域名识别为相应的互联网应用,现实操作中需要分多种情况使用多重判断进行规则匹配,并剔除钓鱼网站和fake url,使用SQL很难实现业务逻辑。再例如,使用SQL进行多表join并叠加复杂的数学运算时,其效率也很难满足业务需求。

在我们的机器学习工具开发过程中,为了使用原有数据建立特征向量,我们需要对原有表结构进行转化,需要迭代原始数据生成具有较多特征值的特征向量。原始数据量为13亿条,共13.2GB,我们尝试使用Hive SQL进行实现,经过测试,任务执行时间过长无法满足需求。而使用MapReduce编写两个Job实现业务逻辑,同时使用哈希算法优化字符串查询效率,最终处理时长为15分钟。应对这些复杂情况,使用MapReduce编程可以使我们获得更多对程序实现的控制和方法选择,通过底层算法优化实现效率提升。

基于不同的业务场景,结合不同工具特点,我们采用SQL脚本和MapReduce开发程序结合的策略,使日常数据处理任务在效率上得到了很好地满足。在我们平台中,MapReduce程序承担了如关键字提取、应用匹配和标签规则运算等近30%的日常数据处理任务。

总之,我们在实际应用中依据灵活性和效率来选择是否自己开发程序。

概览MapReduce

认识MapReduce先从架构入手,在此我们一图以蔽之:

图 1

现在广泛使用的MapReduce v2基于YARN架构,其角色包括Resource Manager(RM)、NodeManager(NM)、Application Master(AM)。RM由Master主机承担,主要负责任务调度和资源调配,NM和AM由各工作节点Slave承担,负责任务的处理和资源读写,其计算单位抽象为container。MapReduce的计算流程可以抽象为Splitting、Mapping、Shuffling、Reducing阶段,其中shuffling包括了Grouping、Sorting、Partitioning过程。以WordCount为例,如下图:

图 2

在掌握了MapReduce架构和原理的基础上,从代码的角度认识MapReduce才是程序员的正确打开方式。

开发MapReduce

MapReduce程序中,Map和Reduce逻辑功能分别通过扩展Mapper类和Reducer类实现。具体在实现过程中,我们在主类中将Mapper和Reducer类扩展并作为内部类调用,最后通过main函数定义输入输出以及Job配置,从而作为程序主入口。

Map实现

Mapper类扩展需要实现map方法,如下:

private static class MyMapper extends Mapper<NullWritable, Writable, IntWritable, Text> { @Override protected void map( NullWritable key, Writable value, Mapper<NullWritable, Writable, IntWritable, Text>.Context context) throws IOException, InterruptedException { } }

根据需求可以扩展setup、cleanup和自定义方法等,扩展Mapper类时需要声明键值对类型,如 Mapper< NullWritable,Writable,IntWritable,Text >,依次分别为输入输出< key,value >类型,其中< NullWritable,Writable >是orc文件格式输入< key,value >类型。

需要强调的是,MapReduce中所有输入输出字段类型都必须实现Writable或者WritableComparable类型,这是因为MapReduce中磁盘读写和节点数据传输过程涉及到数据的序列化和反序列化,需要通过这两类来实现。经常用到的IntWritable、LongWritable、Text等都是实现自WritableComparable类,如果需要,我们也可以扩展这两类实现自定义数据类型。例如,在通过MapReduce实现两表和多表Join的过程中,我通过实现WritableComparable类自定义Map输出的key字段类型,来实现对于Grouping和Sorting阶段不同比较字段的控制。

setup方法在类调用起始阶段运行,可以实现初始阶段对于参数读取和变量赋值的操作。在app应用识别案例中,我们在setup阶段实现对于平台DPI文件的读取操作,以在之后的map阶段实现MapJoin操作,代码如下:

protected void setup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); //读取DPI文件HDFS存放路径 String ini=DefaultStringifier.load(conf, "ini", Text.class).toString(); FileSystem fs = FileSystem.get(conf); FSDataInputStream in = fs.open(new Path(ini)); BufferedReader bf = new BufferedReader(new InputStreamReader(in)); String str = null; while ((str = bf.readLine()) != null) { String[] rules = str.split("\\|\\|"); String host = rules[5]; String appname = rules[1] DPIMap.put(host,appname); } if (bf != null) { bf.close(); } }

其中DPIMap是需要在主类中定义的HashMap变量,在map阶段将使用HashMap实现快速查找。

map方法是实现Mapper类的核心方法,map阶段主要逻辑都需要在map方法中实现。map方法参数定义包括输入< key,value >和上下文对象context声明。Context对象负责在MapReduce执行过程中平台配置和Job配置的传递。Job执行过程中,写入的业务逻辑会对每一条数据进行操作,并将中间结果< key,value >值通过context对象写入后台进行之后的shuffle和reduce操作。

例如我需要将业务数据中的host字段与DPI数据的host字段进行等值连接,统计出使用app的次数。我们可以在map方法中实现如下:

protected void map( NullWritable key, Writable value, Mapper<NullWritable, Writable, Text, IntWritable>.Context context) throws IOException, InterruptedException { String appname = new String(); //读入orc格式数据; OrcStruct struct = (OrcStruct)value; TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(SCHEMA); StructObjectInspector inspector = (StructObjectInspector) OrcStruct.createObjectInspector(typeInfo); //读入host字段; try{ host = inspector.getStructFieldData(struct, inspector.getStructFieldRef("host")).toString(); }catch(Exception e){ host = ""; } if ((appname = DpiList.get(host)) != null){ context.write(new Text(appname),new IntWritable(1)); } }

在此默认输入数据为ORC格式,代码中涉及对ORC文件读取方法。

Reduce实现

同Mapper类类似,扩展Reducer类需要实现reduce方法。继续以统计app次数为例,Reducer类扩展实现为:

private static class MyReducer extends Reducer<Text, IntWritable, NullWritable, Writable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { OrcSerde orcSerde = new OrcSerde(); //写orc格式文件操作; Writable row; int sum = 0; StructObjectInspector inspector = (StructObjectInspector) ObjectInspectorFactory .getReflectionObjectInspector(MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); for(IntWritable val:values){ sum = sum + val.get(); } String[] result = {key.toString, Integer.toString(sum)}; row = orcSerde.serialize(new MyRow(result), inspector); context.write(NullWritable.get(), row); } }

其中reduce方法实现的逻辑为对依据key值group之后的value值集合进行加和,并写入HDFS。

在reduce方法中,接收到的value集合通过Iterable接口实现,我们可以通过iterator对象提供的API实现对value值集合的遍历。Reduce的输出我们最终写为ORC格式。

程序主入口main()方法

通过在主类中定义main()方法作为程序的入口,我们需要在此完成对程序参数传递、输入输出配置和HDFS平台配置声明等工作,以app应用识别为例,代码如下:

public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException { String inputPath = args[0]; String outputPath = args[1]; String ini = args[2]; Configuration conf = new Configuration(); //向Mapper传递DPI文件位置; DefaultStringifier.store(conf,ini,"ini"); Job job = new Job(conf); //设置任务队列; conf.set("mapreduce.job.queuename", "background"); job.setJarByClass(StrMatching_dpi_orc.class); //设置reduce数量; job.setNumReduceTasks(40); //定义输入输出文件类型; job.setInputFormatClass(OrcNewInputFormat.class); job.setOutputFormatClass(OrcNewOutputFormat.class); //配置输入输出文件路径; FileInputFormat.addInputPath(job, new Path(inputPath)); FileSystem fs = FileSystem.get(conf); if (fs.exists(new Path(outputPath))) { fs.delete(new Path(outputPath), true); } FileOutputFormat.setOutputPath(job, new Path(outputPath)); //设置Map输出的<key,value>类型; job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //设置最终输出结果<key,value>类型; job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Writable.class); //声明Mapper类和Reducer类; job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); //执行任务,结束后自动退出; System.exit(job.waitForCompletion(true) ? 0 : 1); }

此例main()方法主要完成了对输入输出类型和路径的配置、任务执行队列和资源配置的定义。main()方法主要完成对程序接口的定义和资源调配,以上代码展示了一个最基本main()方法的定义。如果任务需要,我们还可以完成诸如自定义Group Comparator、Sort Comparator、Partitoner等对象的定义,并在main()方法中声明,作为MapReduce程序的comparator。

在我们平台的日常任务中,我们放弃使用占用空间较大的Text和Sequence文件格式,完全使用ORC文件格式作为数据存储格式。这样可以实现自定义MapReduce程序与Hive平台的无缝结合,更重要的是,可以为平台节省十倍的存储空间。

ORC存储方法

ORC File是Optimized Row Columnar (ORC) file的简称,它基于RCFile格式进行了优化。ORC文件格式的设计初衷是为了提高Hive数据读写以及数据处理能力,由于其实现了一定的数据压缩,可以占用更小的数据存储。

我们使用ORC格式作为MapReduce和Hive工具的统一存储格式,可以节省平台大量的存储空间,同时也实现了MapReduce程序与Hive的更好结合。

经过我们平台日常任务的实测积累,ORC文件格式可以为Hive提供稳定快速的数据读写,并且与Text文件存储相比,可以节省十倍的存储空间,可以大幅提升平台数据存储和处理能力。对于MapReduce程序读写ORC文件,无法像未压缩的Text文件一样直接读写,还需要做关于表数据结构声明等工作。

读ORC文件

仍然以app应用识别为例,主类中需要定义变量SCHEMA,声明读入表结构:

private static final String SCHEMA = "struct<ID:string,Name:string, time:string,fst_uri:string,host:string>";

读取ORC文件格式的代码如下:

OrcStruct struct = (OrcStruct)value; TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(SCHEMA); StructObjectInspector inspector = (StructObjectInspector) OrcStruct.createObjectInspector(typeInfo); //读入host字段; try{ host = inspector.getStructFieldData(struct, inspector.getStructFieldRef("host")).toString(); }catch(Exception e){ host = ""; }

首先,需要将读入的value值强制类型转换为OrcStruct,然后根据表结构实例化StructObjectInspector对象为inspector,最后使用StructObjectInspector类提供的API对字段进行读取。

写ORC文件

与读入过程相对应,写ORC文件代码如下:

OrcSerde orcSerde = new OrcSerde(); Writable row; int sum = 0; StructObjectInspector inspector = (StructObjectInspector) ObjectInspectorFactory .getReflectionObjectInspector(MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); for(IntWritable val:values){ sum = sum + val.get(); } String[] result = {key.toString, Integer.toString(sum)}; row = orcSerde.serialize(new MyRow(result), inspector); context.write(NullWritable.get(), row);

我们需要根据自定义的数据类型MyRow类实例化StructObjectInspector为inspector,然后使用OrcSerde对象将最终计算结果进行序列化并写入HDFS。其中MyRow类是通过扩展Writable类,对输出数据类型进行了定义,在类中完成了对输出表结构字段的定义和赋值,代码如下:

public class MyRow implements Writable { String appname; int cnt; MyRow(String[] val){ this.appname = val[0]; this.cnt = Integer.parseInt(val[1]); } @Override public void readFields(DataInput arg0) throws IOException { throw new UnsupportedOperationException("no write"); } @Override public void write(DataOutput arg0) throws IOException { throw new UnsupportedOperationException("no read"); } }

在上面的章节中,我们介绍了MapReduce开发在北京移动大数据平台上的应用背景和部分应用案例。尽管MapReduce由于处理机制中大量的磁盘读写带来了数据处理效率的瓶颈,但在日常离线数据处理任务中由于其成熟稳定的性能,MapReduce仍然扮演着十分重要的角色。

随着技术的不断发展,诸如Spark等更加快速的计算引擎也将逐步取代MapReduce的地位,我们也在一步步尝试替换和优化我们的应用场景,也欢迎大家多提意见和建议,谢谢!

作者简介:孙昊,毕业于Auburn Univeristy,获得EE工程硕士学位,专攻信息论与信息安全,现就职于北京移动网运中心。熟悉信息论与数理统计,善用Java、Scala语言编程,熟悉C、C++、Python语言以及H5、Javascript等网络编程语言,精通MapReduce编程框架,熟悉Storm、Spark Streaming大数据实时处理技术。2014年底加入北京移动eBDA大数据分析团队,担任团队大数据软件开发工程师,参与完成了搜索关键字产品、互联网流量视图、应用匹配产品、用户标签体系产品等大数据项目。eBDA大数据分析团队是一支扎根于北京移动网运中心,致力于提升数据价值、改善用户体验,取之于民用之于民的有理想有能力的团队。

责编:郭芮,编辑,关注大数据领域,寻求报道或者投稿请发邮件guorui@csdn.net。

转载请注明原文地址: https://www.6miu.com/read-2400039.html

最新回复(0)