5.7 Integrating HBase with MapReduce


This example reads text from HDFS and writes the word counts into an HBase table. Note that both the Hadoop and the HBase jars must be on the project's classpath, and the target table has to exist before the job is submitted (a table-creation sketch follows the listings below).

WordCountMapper:

package com.laoxiao.mr.hbase;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Defines the input and output types of the map task.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * map() is called once per line of input: the key is the byte offset
     * of the line within the file, the value is the line's content.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = value.toString().split(" ");
        for (int i = 0; i < words.length; i++) {
            String w = words[i];
            Text outkey = new Text(w);
            IntWritable outvalue = new IntWritable(1);
            context.write(outkey, outvalue);
        }
    }
}

WordCountReducer:

package com.laoxiao.mr.hbase;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WordCountReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    /*
     * reduce() is called once per key group. The word becomes the row key,
     * and its count is stored in the cf:count column.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i : values) {
            sum = sum + i.get();
        }
        Put put = new Put(key.toString().getBytes());
        // put.add(family, qualifier, value) is the pre-1.0 HBase client API;
        // newer clients use put.addColumn(...) instead.
        put.add("cf".getBytes(), "count".getBytes(), (sum + "").getBytes());
        // TableOutputFormat ignores the output key, so null is fine here.
        context.write(null, put);
    }
}

RunJob:

package com.laoxiao.mr.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class RunJob {

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://node1:9000");
        config.set("yarn.resourcemanager.hostname", "node1");
        // config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
        // ZooKeeper quorum of the HBase cluster
        config.set("hbase.zookeeper.quorum", "node1");

        Job job = Job.getInstance(config);
        job.setJobName("word count");
        job.setJarByClass(RunJob.class);    // entry class of the job

        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        String tableName = "wc";
        // When running locally, the addDependencyJars parameter must be set to false;
        // when submitting to the cluster, keep it true (the short form below is enough).
        // TableMapReduceUtil.initTableReducerJob(tableName, WordCountReducer.class, job);  // cluster mode
        TableMapReduceUtil.initTableReducerJob(tableName, WordCountReducer.class, job,
                null, null, null, null, false);

        Path inputPath = new Path("/usr/input/wc.txt");
        FileInputFormat.addInputPath(job, inputPath);

        boolean f = job.waitForCompletion(true);
        if (f) {
            System.out.println("Job completed successfully!");
        }
    }
}
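The job assumes that the wc table with column family cf already exists; it can be created in the HBase shell with create 'wc', 'cf', or programmatically. The listing below is a minimal sketch of the programmatic route, assuming the same pre-1.0 HBase client API as the reducer above; the class name CreateWcTable is only for illustration.

CreateWcTable:

package com.laoxiao.mr.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateWcTable {

    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        // Same ZooKeeper quorum as used by the MapReduce job.
        config.set("hbase.zookeeper.quorum", "node1");

        HBaseAdmin admin = new HBaseAdmin(config);
        try {
            if (!admin.tableExists("wc")) {
                HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("wc"));
                // Column family written by WordCountReducer.
                desc.addFamily(new HColumnDescriptor("cf"));
                admin.createTable(desc);
            }
        } finally {
            admin.close();
        }
    }
}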
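After the job finishes, the results can be checked with scan 'wc' in the HBase shell. For completeness, here is a minimal Java sketch that reads the counts back with the same pre-1.0 client API; the class name ScanWcTable is illustrative only.

ScanWcTable:

package com.laoxiao.mr.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanWcTable {

    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "node1");

        HTable table = new HTable(config, "wc");
        try {
            Scan scan = new Scan();
            // Only the cf:count column written by the reducer is needed.
            scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"));
            ResultScanner scanner = table.getScanner(scan);
            for (Result r : scanner) {
                String word = Bytes.toString(r.getRow());
                String count = Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("count")));
                System.out.println(word + "\t" + count);
            }
            scanner.close();
        } finally {
            table.close();
        }
    }
}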
