Operating HBase with Java


This article shows how to operate HBase by writing MapReduce jobs in Java:

- importing files from HDFS into HBase
- backing up data from HBase to HDFS
- exporting data from HBase to MySQL

Creating the project

First, create a Maven project in your IDE. The full pom.xml is shown below.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cfl</groupId>
    <artifactId>mapreduce_hbase_demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.6</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Pay close attention to version compatibility between Hadoop and HBase!

Logging

Create a log4j.properties file:

#OFF,systemOut,logFile,logDailyFile,logRollingFile,logMail,logDB,ALL
log4j.rootLogger=ALL,systemOut
log4j.appender.systemOut=org.apache.log4j.ConsoleAppender
log4j.appender.systemOut.layout=org.apache.log4j.PatternLayout
log4j.appender.systemOut.layout.ConversionPattern=[%-5p][%-22d{yyyy/MM/dd HH:mm:ssS}][%l]%n%m%n
log4j.appender.systemOut.Threshold=INFO
log4j.appender.systemOut.ImmediateFlush=TRUE
log4j.appender.systemOut.Target=System.out

Next, copy the following Hadoop configuration files into the project (on the classpath):

- core-site.xml
- hdfs-site.xml
- mapred-site.xml
- yarn-site.xml
- slaves

as well as the HBase configuration files:

- hbase-site.xml
- regionservers

The sketch after this list illustrates how these classpath resources are picked up.
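For reference, a minimal sketch of how these files come into play (only an illustration, not part of the jobs below; the class name ConfigCheck is just a throwaway name): a plain Configuration reads the Hadoop *-site.xml files from the classpath, and HBaseConfiguration.create() additionally merges in hbase-site.xml.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ConfigCheck {
    public static void main(String[] args) {
        // new Configuration() reads core-site.xml from the classpath;
        // the HDFS and MapReduce client classes pull in the other *-site.xml files.
        Configuration hadoopConf = new Configuration();
        System.out.println("fs.defaultFS = " + hadoopConf.get("fs.defaultFS"));

        // HBaseConfiguration.create() merges the Hadoop settings with hbase-site.xml
        Configuration hbaseConf = HBaseConfiguration.create();
        System.out.println("hbase.zookeeper.quorum = " + hbaseConf.get("hbase.zookeeper.quorum"));
    }
}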

Exporting data from HBase to HDFS

package com.cfl.mapreduce.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * MapReduce on HBase: write data from HBase out to HDFS.
 */
public class ImpHDFSFromHBase extends Configured implements Tool {

    public static class MyTableMapper extends TableMapper<NullWritable, Text> {
        private Text text = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            String name = null;
            String num = null;
            String fee = null;
            for (Cell cell : value.listCells()) {
                if (Bytes.toString(CellUtil.cloneQualifier(cell)).equals("name")) {
                    name = Bytes.toString(CellUtil.cloneValue(cell));
                }
                if (Bytes.toString(CellUtil.cloneQualifier(cell)).equals("num")) {
                    num = Bytes.toString(CellUtil.cloneValue(cell));
                }
                if (Bytes.toString(CellUtil.cloneQualifier(cell)).equals("fee")) {
                    fee = Bytes.toString(CellUtil.cloneValue(cell));
                }
            }
            text.set(name + " " + num + " " + fee);
            context.write(NullWritable.get(), text);
        }
    }

    public static class MyReduce extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(NullWritable.get(), value);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        cfg.set("mapred.jar", "E:\\code\\workspace_idea\\hadoopproject\\hadoop_mapreduce_demo\\target\\hadoop_mapreduce_demo-1.0-SNAPSHOT.jar");
        Job job = Job.getInstance(cfg, "从HBase备份免费课程到HDFS中");
        job.setJarByClass(ImpHDFSFromHBase.class);
        // query the free courses (info:fee equals "免费")
        Scan scan = new Scan();
        Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("fee"),
                CompareFilter.CompareOp.EQUAL, Bytes.toBytes("免费"));
        scan.setFilter(filter);
        // args[0]: source HBase table
        TableMapReduceUtil.initTableMapperJob(args[0], scan, MyTableMapper.class, NullWritable.class, Text.class, job);
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // args[1]: HDFS output directory
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // return 0 on success, 1 on failure
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(ToolRunner.run(new ImpHDFSFromHBase(), args));
    }
}
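One detail worth knowing about the SingleColumnValueFilter used above: by default, rows that have no info:fee column at all also pass the filter. If only rows that actually carry the column should be exported, the filter can be told to skip the rest. A minimal sketch using the same family and qualifier as above; the helper class ScanBuilder is just an illustration, not part of the job:

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanBuilder {
    /** Builds the same scan as run(), but also skips rows that have no info:fee column. */
    public static Scan freeCoursesScan() {
        SingleColumnValueFilter filter = new SingleColumnValueFilter(
                Bytes.toBytes("info"),
                Bytes.toBytes("fee"),
                CompareFilter.CompareOp.EQUAL,
                Bytes.toBytes("免费"));
        filter.setFilterIfMissing(true); // default is false: rows without the column would also be emitted
        Scan scan = new Scan();
        scan.setFilter(filter);
        return scan;
    }
}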

Importing files from HDFS into HBase

First, you need a data file on HDFS, for example under /user/hadoop/input. Each line holds three whitespace-separated fields: course name, number of students, and fee.

package com.cfl.mapreduce.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * MapReduce on HBase: read a file from HDFS and store it in HBase.
 */
public class ImpHBaseFormHDFS extends Configured implements Tool {

    /**
     * LongWritable            byte offset of the line in the file
     * Text                    content of one line
     * ImmutableBytesWritable  the row key
     * Put                     one row to write
     */
    public static class HDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        private ImmutableBytesWritable rowkey = new ImmutableBytesWritable(); // row key
        private byte[] info = Bytes.toBytes("info"); // column family
        private byte[] name = Bytes.toBytes("name"); // column: course name
        private byte[] num = Bytes.toBytes("num");   // column: number of students
        private byte[] fee = Bytes.toBytes("fee");   // column: fee

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strings = value.toString().split("\\s+"); // split on whitespace (one or more spaces)
            if (strings.length == 3) {
                rowkey.set(Bytes.toBytes(strings[0])); // use the course name as the row key
                Put put = new Put(Bytes.toBytes(strings[0]));
                put.addColumn(info, name, Bytes.toBytes(strings[0]));
                put.addColumn(info, num, Bytes.toBytes(strings[1]));
                put.addColumn(info, fee, Bytes.toBytes(strings[2]));
                context.write(rowkey, put);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // Configuration reads Hadoop's core-site.xml from the classpath
        Configuration cfg = new Configuration();
        // point at the job jar to submit
        cfg.set("mapred.jar", "E:\\code\\workspace_idea\\hadoopproject\\hadoop_mapreduce_demo\\target\\hadoop_mapreduce_demo-1.0-SNAPSHOT.jar");
        Job job = Job.getInstance(cfg, "导入课程到HBase中");
        job.setJarByClass(ImpHBaseFormHDFS.class);
        job.setMapperClass(HDFSMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // TableMapReduceUtil reads both the Hadoop and the HBase configuration files and merges them
        TableMapReduceUtil.initTableReducerJob(
                args[1], // output table
                null,    // reducer class
                job);
        job.setNumReduceTasks(1); // at least one, adjust as required
        // return 0 on success, 1 on failure
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int n = ToolRunner.run(new ImpHBaseFormHDFS(), args);
        System.out.println(n);
    }
}
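The import job assumes that the target table passed as args[1] already exists and has an info column family. If it does not, it can be created with the HBase Admin API. A minimal sketch, assuming a hypothetical table name course (use whatever name you pass as args[1]):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateCourseTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create(); // reads hbase-site.xml from the classpath
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName table = TableName.valueOf("course"); // hypothetical name, use your own
            if (!admin.tableExists(table)) {
                HTableDescriptor desc = new HTableDescriptor(table);
                desc.addFamily(new HColumnDescriptor("info")); // column family used by the mapper
                admin.createTable(desc);
            }
        }
    }
}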

Before running, supply the program arguments in the run configuration: the HDFS input path (args[0]) and the target HBase table name (args[1]).

Exporting data from HBase to MySQL

package com.cfl.mapreduce.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * MapReduce on HBase: export data from HBase to MySQL.
 * The map side scans HBase in parallel for the matching records.
 * The reduce side collects the map output, opens a single MySQL connection and stores the data
 * (connecting once per job is far cheaper than connecting from every map call).
 */
public class HBaseToMySql extends Configured implements Tool {

    // add a local jar to the "tmpjars" property so it is shipped to the cluster with the job
    public static void addTmpJar(String jarPath, Configuration conf) throws IOException {
        System.setProperty("path.separator", ":");
        FileSystem fs = FileSystem.getLocal(conf);
        String newJarPath = new Path(jarPath).makeQualified(fs).toString();
        String tmpjars = conf.get("tmpjars");
        if (tmpjars == null || tmpjars.length() == 0) {
            conf.set("tmpjars", newJarPath);
        } else {
            conf.set("tmpjars", tmpjars + "," + newJarPath);
        }
    }

    public static class ReadMap extends TableMapper<NullWritable, Text> {
        private Text sql = new Text();

        // read a column value from the info family
        private String getValue(String qualifier, Result result) {
            return Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes(qualifier)));
        }

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            String name = getValue("name", value);
            String numStr = getValue("num", value);
            String pay = getValue("fee", value);
            int num = Integer.parseInt(numStr);
            String str = "insert into tb_course(name,num,pay) values('" + name + "'," + num + ",'" + pay + "')";
            sql.set(str);
            context.write(NullWritable.get(), sql);
        }
    }

    public static class WriteReduce extends Reducer<NullWritable, Text, NullWritable, NullWritable> {
        private Connection conn = null;
        private Statement st = null;

        // open the MySQL connection
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            try {
                Class.forName("com.mysql.jdbc.Driver");
                conn = DriverManager.getConnection("jdbc:mysql://192.168.19.95:3306/kgc", "root", "root");
                st = conn.createStatement();
            } catch (SQLException e) {
                throw new InterruptedException(e.getMessage());
            } catch (ClassNotFoundException e) {
                throw new InterruptedException(e.getMessage());
            }
        }

        // write no output; just execute the INSERT statements
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text v : values) {
                try {
                    st.executeUpdate(v.toString());
                } catch (SQLException e) {
                    throw new InterruptedException(e.getMessage());
                }
            }
        }

        // close the connection
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            try {
                if (st != null) {
                    st.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration cfg = getConf();
        // args[0]: path of the third-party jar (MySQL driver) to ship with the job
        addTmpJar(args[0], cfg);
        cfg.set("mapreduce.job.jar", "E:\\code\\workspace_idea\\hadoopproject\\hadoop_mapreduce_demo\\target\\hadoop_mapreduce_demo-1.0-SNAPSHOT.jar");
        Job job = Job.getInstance(cfg, "从 HBase 将收费课程导入到MySQL DB");
        job.setJarByClass(HBaseToMySql.class);
        // the regex filter for courses whose fee contains "K币" is left commented out;
        // the active filter selects the rows where info:fee equals "免费"
        Scan scan = new Scan();
        //Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("fee"), CompareFilter.CompareOp.EQUAL, new RegexStringComparator("K币"));
        Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("fee"),
                CompareFilter.CompareOp.EQUAL, Bytes.toBytes("免费"));
        scan.setFilter(filter);
        // args[1]: source HBase table
        TableMapReduceUtil.initTableMapperJob(args[1], scan, ReadMap.class, NullWritable.class, Text.class, job);
        job.setReducerClass(WriteReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // args[2]: HDFS output directory (required by FileOutputFormat even though nothing is written)
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(ToolRunner.run(new HBaseToMySql(), args));
    }
}
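A note on WriteReduce: assembling INSERT statements by string concatenation works for this demo, but a PreparedStatement with batching avoids quoting problems and sends all rows in one round trip. Below is a rough alternative sketch, not the original code; it assumes the map side emits the three fields name, num and fee separated by whitespace instead of a full SQL string, and it reuses the same tb_course(name, num, pay) table and connection settings as above.

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Alternative reducer: values are expected as "name num fee" lines instead of full SQL strings.
public class BatchWriteReduce extends Reducer<NullWritable, Text, NullWritable, NullWritable> {
    private Connection conn;
    private PreparedStatement ps;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://192.168.19.95:3306/kgc", "root", "root");
            ps = conn.prepareStatement("insert into tb_course(name, num, pay) values (?, ?, ?)");
        } catch (ClassNotFoundException | SQLException e) {
            throw new IOException(e);
        }
    }

    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        try {
            for (Text v : values) {
                String[] f = v.toString().split("\\s+"); // name, num, fee
                ps.setString(1, f[0]);
                ps.setInt(2, Integer.parseInt(f[1]));
                ps.setString(3, f[2]);
                ps.addBatch();
            }
            ps.executeBatch(); // send all inserts in one round trip
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        try {
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }
}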

Because the job writes to MySQL, it needs a third-party jar at runtime (the MySQL JDBC driver). Above, a dedicated addTmpJar() method is used to add it, because submitting the Windows path directly causes an error: the Linux side cannot resolve a Windows path. If you need several third-party jars, simply call addTmpJar() once per jar, as sketched below. Apart from this approach, the jar can also be supplied with the -libjars option, for example for the MySQL driver jar. Note: a jar passed via -libjars is not handed to the program as an argument; Hadoop consumes it itself.
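For example, using the addTmpJar() helper defined in the class above (the jar paths here are hypothetical placeholders):

// Fragment for run(), before Job.getInstance(cfg, ...): each addTmpJar() call appends one more
// local jar to the "tmpjars" property so it is shipped to the cluster together with the job.
// The paths below are hypothetical placeholders.
addTmpJar("E:/libs/mysql-connector-java-5.1.40.jar", cfg); // MySQL JDBC driver
addTmpJar("E:/libs/other-dependency.jar", cfg);            // any further third-party jar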

public int run(String[] args) throws Exception {
    Configuration cfg = getConf();
    cfg.set("mapreduce.job.jar", "E:\\code\\workspace_idea\\hadoopproject\\hadoop_mapreduce_demo\\target\\hadoop_mapreduce_demo-1.0-SNAPSHOT.jar");
    Job job = Job.getInstance(cfg, "从 HBase 将收费课程导入到MySQL DB");
    job.setJarByClass(HBaseToMySql.class);
    // the regex filter for courses whose fee contains "K币" is left commented out;
    // the active filter selects the rows where info:fee equals "免费"
    Scan scan = new Scan();
    //Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("fee"), CompareFilter.CompareOp.EQUAL, new RegexStringComparator("K币"));
    Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("fee"),
            CompareFilter.CompareOp.EQUAL, Bytes.toBytes("免费"));
    scan.setFilter(filter);
    // the jar is no longer passed as an argument, so the table moves to args[0] and the output path to args[1]
    TableMapReduceUtil.initTableMapperJob(args[0], scan, ReadMap.class, NullWritable.class, Text.class, job);
    job.setReducerClass(WriteReduce.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
}
