复习总结01:Hadoop

xiaoxiao2021-02-28  8

一、关于设置hdfs ①获取hdfs文件系统 1 Configuration configuration = new Configuration(); 2 FileSystem fSystem = fSystem = FileSystem.get(new URI("hdfs://server:9000"), configuration, "hadoop"); 备注:抛出异常Exception;直接操作hdfs上的文件,需要输入用户名,而提交Job等不需要输入用户名。 或者Run Configuration 的Environment variables HADOOP_USER_NAME hadoop②单元测试 @before  标识的函数 A---->对某函数B进行单元测试前必须先执行的函数A @Test  被测试的函数 B 二、运行模式 /***********设定本地测试模式 输入输出文件,,在本地或者集群*************/ 1        Configuration conf = new Configuration(); 2        Job job = Job.getInstance(conf); 3 4        job.setJarByClass(WordCountDriver.class); 5        job.setMapperClass(WordCountMapper.class); 6        job.setReducerClass(WordCountReducer.class); 7 8        job.setMapOutputKeyClass(Text.class); 9        job.setMapOutputValueClass(IntWritable.class); 10 11        job.setOutputKeyClass(Text.class); 12        job.setOutputValueClass(IntWritable.class); 13 14        //输入输出的文件都在本地,适合来测试代码可靠性 15        FileInputFormat.setInputPaths(job, new Path("d:/a.txt"));   //执行本地模式 16        FileOutputFormat.setOutputPath(job, new Path("d:/a_out"));  //路径必须是新的 17        //将上两句设置为以下两句即可 实现:输入输出的文件都在集群,在本地运行,实际中大文件传输耗损!不能发挥云计算的优点 18        FileInputFormat.setInputPaths(job, new Path("hdfs://server:9000/a.txt")); 19        FileOutputFormat.setOutputPath(job, new Path("hdfs://server:9000/aaaa_out")); 20 21        boolean res = job.waitForCompletion(true); 22        System.exit(res ? 0 : 1); 备注:本地运行时,可以不 设置Run Configuration 的Environment variables HADOOP_USER_NAME hadoop 也能正常运行。 (即本地运行,用户可设置也可不设置) 只是文件的user name 是本机电脑的 user_name 不是hadoop /***********设定集群运行模式 输入输出文件都在集群*************/ 方式A 将程序打成 JAR 包,然后在集群的任意一个节点上用 hadoop 命令启动 推荐 $ hadoop jar wordcount.jar cn.itcast.bigdata.mrsimple.WordCountDriver inputpath outputpath 方式B 修改YarnRunner,并且将四个配置文件放在src下。在执行时也需要打包放到linux上。 只是把控制台的消息打印在了windows的IDEA上,实际意义不大,放弃! 故总结运行模式:①本地来测试代码逻辑;②打包提交到集群模式运行 三、Hadoop开发 MapReduce框架用到的Java API ①TestMapper类继承 Mapper <KEYIN, VALUEIN, KEYOUT, VALUEOUT>{} 1 KEYIN :  LongWritable   //KEYIN: 默认情况下是mr框架所读到的一行文本的起始偏移量。但是在hadoop中有自己的更精简的序列化接口LongWritable(extends WritableComparable) 2 VALUEIN :  Text    //读取的一行数据String 原因同上故用Text 3 KEYOUT   :  Text    //用户自定义输出的数据格式key   一般用Text满足序列化 4 VALUEOUT : IntWritable   //用户自定义输出的数据value   一般输出是个对象,算法中可能会用IntWritable(1); 实现方法A:protected void  map (LongWritable key, Text value, Context context) throw Exception{}    //map阶段的业务逻辑就写在自定义的map()方法中, maptask会对每一行输入数据调用一次自定义的map()方法。   context.write(new Text(key) ,new IntWritable(value)) 无参数时用nullWritable 实现方法B:protected void setup(Context context ,)throws Exception{}     //task启动的时候调用。 实现方法C:protected void cleanup(Context context ,)throws Exception{}  //task结束的时候调用。 ② TextReducer类继承 Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {}类 1 KEYIN, VALUEIN   //对应 mapper输出的KEYOUT,VALUEOUT类型对应 2 KEYOUT, VALUEOUT  //是自定义reduce逻辑处理结果的输出数据类型一般是<Text,Bean> 实现方法A:protected void  reduce(Text  key , Iterable<IntWritable>  values , Context  context throws  Exception {}类   //按照key分组,确定reduceTask的个数。每个reduceTask都会执行此自定义的reduce()方法,对values进行迭代   //如果key来传过来的是对象, 每个对象都是不一样的, 所以每个对象都调用一次reduce方法。 实现方法B:protected void setup(Context context ,)throws Exception{} //task启动的时候调用。 实现方法C:protected void cleanup(Context context ,)throws Exception{} //task结束的时候调用。 ③TextDriver类 执行main方法<完成九行设置,两行IO,两行等待结束> ……………………………………………………以上是标准MR程序的设计规范,以下补充一些常用的API………………………………………………………………………… 扩展API ①String[] values=value.toString().split("\t");//将读取的第一行数据,按照字符串里面内容分片. 数据表格:可将多重数据存在字符串中,然后再次利用split(",")方法取出各数据 判断读入的字符串是否非空         StringUtils.isNotEmpty(line) 检测字符串相同与否用       .equals(); 将字符串中的数字转变成int型         Integer.parseInt(fields[0]) ②StringBuffer:起到String没有的功能      stringBuffer.append(variable).append(","); ③Text, IntWritable,LongWritable.  new Text("A");   new  IntWritable(1);   new  LongWritable(); ④Iterable<T>接口,Iterator<T> iterator();用在reduce()方法直接写出结果的情况。context.write(values.iterator().next(), bean); 在处理切片时,通过文件名区别读入的切片 FileSplit inputSplit = (FileSplit) context.getInputSplit();//获得输入切片 String fileName = inputSplit.getPath().getName();//获得切片的源文件名 ⑥遍历values: for(String value : values); partitioner<key ,value>抽象类。继承后重写i nt getPartition(KEY key, VALUE valueint numPartitions);实现该方法,自定义某(k,v)对对应的reduceTask。 1 // 指定我们自定义的数据分区器 2 job.setPartitionerClass(ProvincePartitioner.class); 3 // 同时指定相应“分区”数量的reducetask 4 job.setNumReduceTasks(5); 构建ProvincePartitioner.class 继承Partitioner 实现字段匹配分类 1   public static HashMap<String, Integer> proviceDict = new HashMap<String, Integer>(); 2 3    static { 4        proviceDict.put("136", 0); 5        proviceDict.put("137", 1); 6        proviceDict.put("138", 2); 7        proviceDict.put("139", 3); 8   }     9 10    String prefix = key.toString().substring(0, 3);  //根据字段,区分ID 11    Integer provinceId = proviceDict.get(prefix);    //获取ID 12 13    return (provinceId == null) ? 4 : provinceId;    //返回ID 四、关于Map等集合:大数据处理中的(k,v)对,Java一定会用到Map。 Map用于保存具有映射关系的数据,因此Map集合里保存着两组值, K-V,Key不能重复。 元素的Key索引 从Map里取出该元素 Map里: 所有Key放在一起来看==>一个Set集合。Value放在一起==>一个List 。即 Map<Set , List>. Map接口下有HashMap、LinkedHashMap、SortedMap、TreeMap、EnumMap。 1 Map常用方法: 2 void clear();    //删除该Map对象的所有key-value对。 3 boolean containsKey(Object key);       //查询Map中是否包含指定的key,如果包含则返回true。 4 boolean containsValue(Obejct,value);        //查询Map中是否包含一个或多个Value,如果包含则返回true。 5 Set entrySet();返回Map中包含的key-value对所组成的Set集合,每个集合元素都是Map.Entry对象。 6 Object get(Object key);返回指定key所对应的value,如果此Map中不包含该key,则返回null。 7 boolean isEmpty();查询该Map是否为空。若空为true。 8 Set keySet();    //返回该Map中所有key组成的Set集合。 9 Object put(Object key,Object value);     //添加一个key-value对。如果有则覆盖。 10 void putAll(Map m);      //将m中的key-value对复制到本Map中。 11 Object remove(Object key);      //删除指定key所对应的key-value对,返回被删除key所关联的value,如果没有则返回null。 12 boolean remove(Object key,Object value);     //删除指定k-v对。成功删除则返回true,否则为false。 13 int size();       //返回该Map里的k-v对的个数。 14 Collection values();        //返回该Map里所有value组成的Collection。 15 16 Map内部类Entry,该类封装了k-v对,Entry包含三个方法: 17 Object getKey();返回该Entry里包含的key值。 18 Object getValue();返回该Entry里包含的value值。 19 Object setValue(V value);设置Entry里包含的value值,并返回新设置的value。 五、关于MapReduce编程的整理与总结 mapreduce在编程的时候,基本上一个固化的模式,没有太多可灵活改变的地方,除了以下几处: 1、输入数据接口:InputFormat   --->     FileInputFormat(文件类型数据读取的通用抽象类)  DBInputFormat (数据库数据读取的通用抽象类)    默认使用的实现类是: TextInputFormat     job.setInputFormatClass(TextInputFormat.class)    TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回     2、逻辑处理接口: Mapper      完全需要用户自己去实现其中  map()   setup()   clean()        3、map输出的结果在shuffle阶段会被partition以及sort,此处有两个接口可自定义: Partitioner 有默认实现 HashPartitioner,逻辑是  根据key和numReduces来返回一个分区号; key.hashCode()&Integer.MAXVALUE % numReduces。 通常情况下,用默认的这个HashPartitioner就可以,如果业务上有特别的需求,可以自定义 Comparable 当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,override其中的compareTo()方法 4、reduce端的数据分组比较接口 : Groupingcomparator  reduceTask拿到输入数据(一个partition的所有数据)后,首先需要对数据进行分组,其分组的默认原则是key相同,然后对每一组kv数据调用一次reduce()方法,并且将这一组kv中的第一个kv的key作为参数传给reduce的key,将这一组数据的value的迭代器传给reduce()的values参数。 利用上述这个机制,我们可以实现一个高效的分组取最大值的逻辑: 自定义一个bean对象用来封装我们的数据,然后改写其compareTo方法产生倒序排序的效果, 然后自定义一个Groupingcomparator,将bean对象的分组逻辑改成按照我们的业务分组id来分组(比如订单号) 这样,我们要取的最大值就是reduce()方法中传进来key 5、逻辑处理接口:Reducer 完全需要用户自己去实现其中  reduce()   setup()   clean()    6、输出数据接口: OutputFormat  ---> 有一系列子类  FileOutputformat  DBoutputFormat  ..... 默认实现类是TextOutputFormat,功能逻辑是:  将每一个KV对向目标文本文件中输出为一行 六、MR框架编程要点: 详见eclipse的Test模板包 手机号的流量数据分析: ①从hdfs上获取文件的手机号及其流量统计的数据 ②输出结果按照流量大小排序 ③输出结果按照省份分割数据到不同的文件 一、项目分析: ①对hdfs上的数据格式分析,确定(k,v)手机号来做key;上下行数据量构成对象做value。 ②构造对象Bean,成员变量及其方法。 ③编写Mapper和Reducer类及方法 1、完成Bean的对象构造 ①根据对象特点,完善其成员变量以及其get(),set()方法; ②因为要使用context.write(k,v)所以对象要实现Writable的接口;实现后即可以被写 ③实现String toString();方法。 备注:如果作为对象的模板学会 修改成员变量; 2、编写Mapper类(在TestMR类中写静态mapper类) ①:继承Mapper确定输入输出的数据格式; ②:读取一行内容,用空格"\t"或者某些标识切割一行数据,获取内容存储在字符串数组中; ③:数据解析后构造对象,写入context。 3、编写Reducer类(在TestMR类中写静态reducer类) ①确定输入输出kv对与Mapper输出的kv对格式相同; ②按照key完成Values的遍历,迭代等操作; ③输出结果文件,注意输出的对象的定义,最后封装对象输出。 4、编写Driver类(一般在TestMR类中写main方法实现) ①设定Job; ②完成一系列的设置,类,输入输出格式,文件路径; ③提交Job,检测。 5、提交Job,进行云计算 ①打包jar,sftp提交到host上; ②执行 hadoop jar Test.jar test.TestMR ③执行 hadoop fs -cat /output/part-r-00000 读取结果文件
转载请注明原文地址: https://www.6miu.com/read-1250134.html

最新回复(0)