This article shows how to operate HBase with MapReduce jobs written in Java:
- importing files from HDFS into HBase with MapReduce
- backing up data from HBase to HDFS
- exporting data from HBase to MySQL
Creating the project
First, create a Maven project in your IDE. The full pom file is shown below.
pom file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cfl</groupId>
    <artifactId>mapreduce_hbase_demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.6</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Note: watch out for version compatibility between Hadoop and HBase!
Log tracing
Create a log4j.properties file:
#OFF,systemOut,logFile,logDailyFile,logRollingFile,logMail,logDB,ALL
log4j.rootLogger=ALL,systemOut
log4j.appender.systemOut=org.apache.log4j.ConsoleAppender
log4j.appender.systemOut.layout=org.apache.log4j.PatternLayout
log4j.appender.systemOut.layout.ConversionPattern=[%-5p][%-22d{yyyy/MM/dd HH:mm:ssS}][%l]%n%m%n
log4j.appender.systemOut.Threshold=INFO
log4j.appender.systemOut.ImmediateFlush=TRUE
log4j.appender.systemOut.Target=System.out
Next, put the following Hadoop configuration files into the project:
- core-site.xml
- hdfs-site.xml
- mapred-site.xml
- yarn-site.xml
- slaves

as well as the HBase configuration files:
- hbase-site.xml
- regionservers
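With these files on the classpath, HBaseConfiguration.create() picks up the ZooKeeper quorum and the other cluster settings automatically. Before writing any jobs, a minimal sketch like the following can be used to verify that the project can reach the cluster (this helper class is illustrative and not part of the original project):

package com.cfl.mapreduce.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

/**
 * Sanity check: with hbase-site.xml on the classpath, HBaseConfiguration.create()
 * already knows how to reach the cluster. Illustrative helper, not used by the jobs below.
 */
public class CheckHBaseConnection {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create(); // reads hbase-site.xml from the classpath
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            // listing the table names is enough to prove connectivity
            for (TableName name : admin.listTableNames()) {
                System.out.println(name.getNameAsString());
            }
        }
    }
}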
Exporting data from HBase to HDFS
package com.cfl.mapreduce.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * Operating HBase with MapReduce: export data from HBase to HDFS.
 */
public class ImpHDFSFromHBase extends Configured implements Tool {

    public static class MyTableMapper extends TableMapper<NullWritable, Text> {
        private Text text = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            String name = null;
            String num = null;
            String fee = null;
            // collect the name, num and fee columns of the current row
            for (Cell cell : value.listCells()) {
                if (Bytes.toString(CellUtil.cloneQualifier(cell)).equals("name")) {
                    name = Bytes.toString(CellUtil.cloneValue(cell));
                }
                if (Bytes.toString(CellUtil.cloneQualifier(cell)).equals("num")) {
                    num = Bytes.toString(CellUtil.cloneValue(cell));
                }
                if (Bytes.toString(CellUtil.cloneQualifier(cell)).equals("fee")) {
                    fee = Bytes.toString(CellUtil.cloneValue(cell));
                }
            }
            text.set(name + " " + num + " " + fee);
            context.write(NullWritable.get(), text);
        }
    }

    public static class MyReduce extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(NullWritable.get(), value);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        // local path of the job jar, used when submitting from the IDE
        cfg.set("mapred.jar",
                "E:\\code\\workspace_idea\\hadoopproject\\hadoop_mapreduce_demo\\target\\hadoop_mapreduce_demo-1.0-SNAPSHOT.jar");
        Job job = Job.getInstance(cfg, "从HBase备份免费课程到HDFS中");
        job.setJarByClass(ImpHDFSFromHBase.class);

        // only export the rows whose info:fee column equals "免费"
        Scan scan = new Scan();
        Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("fee"),
                CompareFilter.CompareOp.EQUAL, Bytes.toBytes("免费"));
        scan.setFilter(filter);

        // args[0] is the HBase table to read, args[1] is the HDFS output directory
        TableMapReduceUtil.initTableMapperJob(args[0], scan, MyTableMapper.class,
                NullWritable.class, Text.class, job);
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(ToolRunner.run(new ImpHDFSFromHBase(), args));
    }
}
Importing files from HDFS into HBase
First, a data file is needed on HDFS, here at /user/hadoop/input. As the mapper below expects, each line contains three whitespace-separated fields: a name, a num value and a fee value, with the first field also used as the row key.
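If the data file only exists locally, a small helper like the following can copy it up to /user/hadoop/input first. This is only a sketch, assuming the Hadoop configuration files are already on the classpath; the class is not part of the original project:

package com.cfl.mapreduce.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Helper sketch: copy a local, whitespace-separated data file (name num fee per line)
 * to HDFS so the import job below can read it. The local path is passed as args[0].
 */
public class UploadCourseFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // picks up core-site.xml / hdfs-site.xml from the classpath
        try (FileSystem fs = FileSystem.get(conf)) {
            fs.copyFromLocalFile(new Path(args[0]), new Path("/user/hadoop/input"));
        }
    }
}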
package com.cfl.mapreduce.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * Operating HBase with MapReduce: read a file from HDFS and store it in HBase.
 */
public class ImpHBaseFormHDFS extends Configured implements Tool {

    /**
     * LongWritable            byte offset of a line in the file
     * Text                    content of that line
     * ImmutableBytesWritable  the row key
     * Put                     one row of data to write
     */
    public static class HDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        private ImmutableBytesWritable rowkey = new ImmutableBytesWritable();
        private byte[] info = Bytes.toBytes("info");
        private byte[] name = Bytes.toBytes("name");
        private byte[] num = Bytes.toBytes("num");
        private byte[] fee = Bytes.toBytes("fee");

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strings = value.toString().split("\\s+");
            if (strings.length == 3) {
                // the first field is used both as the row key and as the info:name column
                rowkey.set(Bytes.toBytes(strings[0]));
                Put put = new Put(Bytes.toBytes(strings[0]));
                put.addColumn(info, name, Bytes.toBytes(strings[0]));
                put.addColumn(info, num, Bytes.toBytes(strings[1]));
                put.addColumn(info, fee, Bytes.toBytes(strings[2]));
                context.write(rowkey, put);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        // local path of the job jar, used when submitting from the IDE
        cfg.set("mapred.jar",
                "E:\\code\\workspace_idea\\hadoopproject\\hadoop_mapreduce_demo\\target\\hadoop_mapreduce_demo-1.0-SNAPSHOT.jar");
        Job job = Job.getInstance(cfg, "导入课程到HBase中");
        job.setJarByClass(ImpHBaseFormHDFS.class);

        job.setMapperClass(HDFSMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        // args[0] is the HDFS input path, args[1] is the target HBase table
        FileInputFormat.addInputPath(job, new Path(args[0]));
        TableMapReduceUtil.initTableReducerJob(args[1], null, job);
        job.setNumReduceTasks(1);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int n = ToolRunner.run(new ImpHBaseFormHDFS(), args);
        System.out.println(n);
    }
}
Before running, add program arguments to the project's run configuration: args[0] is the HDFS input path (/user/hadoop/input in this example) and args[1] is the name of the target HBase table.
Exporting data from HBase to MySQL
package com.cfl.mapreduce.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class HBaseToMySql extends Configured implements Tool {

    /**
     * Ship a third-party jar (e.g. the MySQL driver) with the job by adding it to tmpjars.
     */
    public static void addTmpJar(String jarPath, Configuration conf) throws IOException {
        System.setProperty("path.separator", ":");
        FileSystem fs = FileSystem.getLocal(conf);
        String newJarPath = new Path(jarPath).makeQualified(fs).toString();
        String tmpjars = conf.get("tmpjars");
        if (tmpjars == null || tmpjars.length() == 0) {
            conf.set("tmpjars", newJarPath);
        } else {
            conf.set("tmpjars", tmpjars + "," + newJarPath);
        }
    }

    public static class ReadMap extends TableMapper<NullWritable, Text> {
        private Text sql = new Text();

        // get the value of a column in the info family
        private String getValue(String qualifier, Result result) {
            return Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes(qualifier)));
        }

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            String name = getValue("name", value);
            String numStr = getValue("num", value);
            String pay = getValue("fee", value);
            int num = Integer.parseInt(numStr);
            String str = "insert into tb_course(name,num,pay) values('" + name + "'," + num + ",'" + pay + "')";
            sql.set(str);
            context.write(NullWritable.get(), sql);
        }
    }

    public static class WriteReduce extends Reducer<NullWritable, Text, NullWritable, NullWritable> {
        private Connection conn = null;
        private Statement st = null;

        // open the MySQL connection
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            try {
                Class.forName("com.mysql.jdbc.Driver");
                conn = DriverManager.getConnection("jdbc:mysql://192.168.19.95:3306/kgc", "root", "root");
                st = conn.createStatement();
            } catch (SQLException e) {
                throw new InterruptedException(e.getMessage());
            } catch (ClassNotFoundException e) {
                throw new InterruptedException(e.getMessage());
            }
        }

        // write no output; execute each incoming value as an insert statement
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text v : values) {
                try {
                    st.executeUpdate(v.toString());
                } catch (SQLException e) {
                    throw new InterruptedException(e.getMessage());
                }
            }
        }

        // close the connection
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            try {
                if (st != null) {
                    st.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration cfg = getConf();
        // args[0] is the local path of the MySQL driver jar to ship with the job
        addTmpJar(args[0], cfg);
        cfg.set("mapreduce.job.jar",
                "E:\\code\\workspace_idea\\hadoopproject\\hadoop_mapreduce_demo\\target\\hadoop_mapreduce_demo-1.0-SNAPSHOT.jar");
        Job job = Job.getInstance(cfg, "从 HBase 将收费课程导入到MySQL DB");
        job.setJarByClass(HBaseToMySql.class);

        // select the courses whose fee contains "K币"
        Scan scan = new Scan();
        //Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("fee"), CompareFilter.CompareOp.EQUAL, new RegexStringComparator("K币"));
        Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("fee"),
                CompareFilter.CompareOp.EQUAL, Bytes.toBytes("免费"));
        scan.setFilter(filter);

        // args[1] is the HBase table to read, args[2] is the HDFS output directory
        TableMapReduceUtil.initTableMapperJob(args[1], scan, ReadMap.class,
                NullWritable.class, Text.class, job);
        job.setReducerClass(WriteReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(ToolRunner.run(new HBaseToMySql(), args));
    }
}
Exporting the HBase data to MySQL requires a third-party jar (the MySQL driver). Above, a separate method addTmpJar() is used to add that jar, because submitting a Windows-style path directly causes an error: the Linux side cannot resolve a Windows path. If you need several third-party jars, simply call addTmpJar() once per jar. Besides this approach, you can also submit third-party jars, such as the MySQL driver jar, with the -libjars option, as in the run() method below. Note: when a third-party jar is submitted via -libjars, it is not passed to the program as an argument; Hadoop consumes it before your own arguments are parsed.
public int run(String[] args) throws Exception {
    Configuration cfg = getConf();
    cfg.set("mapreduce.job.jar",
            "E:\\code\\workspace_idea\\hadoopproject\\hadoop_mapreduce_demo\\target\\hadoop_mapreduce_demo-1.0-SNAPSHOT.jar");
    Job job = Job.getInstance(cfg, "从 HBase 将收费课程导入到MySQL DB");
    job.setJarByClass(HBaseToMySql.class);

    // select the courses whose fee contains "K币"
    Scan scan = new Scan();
    //Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("fee"), CompareFilter.CompareOp.EQUAL, new RegexStringComparator("K币"));
    Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("fee"),
            CompareFilter.CompareOp.EQUAL, Bytes.toBytes("免费"));
    scan.setFilter(filter);

    // with -libjars the driver jar is no longer an argument: args[0] is the table, args[1] the output directory
    TableMapReduceUtil.initTableMapperJob(args[0], scan, ReadMap.class,
            NullWritable.class, Text.class, job);
    job.setReducerClass(WriteReduce.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
}
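One design note on the reducer: the mapper builds INSERT statements by string concatenation, which breaks as soon as a course name contains a quote character and is generally unsafe. A variant worth considering, sketched below as an assumption rather than part of the original code, keeps the same tb_course(name,num,pay) table but has the mapper emit the raw fields (here assumed tab-separated) and lets the reducer use a PreparedStatement. The class is meant to slot into HBaseToMySql, so it reuses the imports already present there and fully qualifies the extra java.sql types:

// Sketch of a PreparedStatement-based reducer; assumes the mapper emits
// tab-separated "name\tnum\tpay" values instead of full SQL strings.
public static class PreparedWriteReduce extends Reducer<NullWritable, Text, NullWritable, NullWritable> {
    private java.sql.Connection conn;
    private java.sql.PreparedStatement ps;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = java.sql.DriverManager.getConnection("jdbc:mysql://192.168.19.95:3306/kgc", "root", "root");
            ps = conn.prepareStatement("insert into tb_course(name,num,pay) values(?,?,?)");
        } catch (Exception e) {
            throw new InterruptedException(e.getMessage());
        }
    }

    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text v : values) {
            String[] fields = v.toString().split("\t");
            try {
                ps.setString(1, fields[0]);                // name
                ps.setInt(2, Integer.parseInt(fields[1])); // num
                ps.setString(3, fields[2]);                // pay
                ps.executeUpdate();
            } catch (java.sql.SQLException e) {
                throw new InterruptedException(e.getMessage());
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        try {
            if (ps != null) { ps.close(); }
            if (conn != null) { conn.close(); }
        } catch (java.sql.SQLException e) {
            e.printStackTrace();
        }
    }
}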