Hadoop Basics Tutorial - Chapter 10 HBase: The Hadoop Database (10.7 HBase Bulk Import)


Chapter 10 HBase: The Hadoop Database

10.7 HBase Bulk Import


10.7.1 Methods for Bulk Importing Data

A single row can be written to an HBase table with the HBase Shell's put command or the HBase API's Put class, but how do you import data in bulk when the volume is massive? There are generally three approaches: the ImportTsv tool, a custom MapReduce program, and a Hive external table. This section focuses on the first two, ImportTsv and MapReduce; Hive external tables are covered in Chapter 11.
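For comparison, a single-row insert from the HBase Shell looks like this (a minimal sketch; it assumes the 'mydata' table with column family 'info' that is created later in this section, and the row key '100001' and value are made up for illustration):

put 'mydata', '100001', 'info:data1', '12345'
get 'mydata', '100001'

Issuing one put per row like this quickly becomes impractical for millions of rows, which is why the bulk methods below matter.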

10.7.2 ImportTsv

(1) Introduction

ImportTsv is the MapReduce-based bulk-import tool that ships with HBase. With a single command it loads delimited data files from HDFS into an HBase table. By default it expects TSV files (tab-separated, '\t'), but a custom separator can be specified, so other formats work as well; in my own tests a comma-separated .txt file imports without problems. This makes ImportTsv very practical for importing large volumes of data.
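The general shape of the command is sketched below. These examples were not run on the cluster used in this walkthrough: the mydata.csv file name and the /tmp/mydata_hfiles directory are illustrative, while -Dimporttsv.separator and -Dimporttsv.bulk.output are the standard ImportTsv switches for a custom delimiter and for the two-step bulk-load mode (generate HFiles first, then hand them to the table).

# Custom separator (for example, a comma-separated file):
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
  -Dimporttsv.separator=, \
  -Dimporttsv.columns=HBASE_ROW_KEY,info:data1,info:data2,info:data3 \
  mydata input/mydata.csv

# Bulk-load variant: write HFiles instead of issuing Puts, then load them into the table.
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
  -Dimporttsv.bulk.output=/tmp/mydata_hfiles \
  -Dimporttsv.columns=HBASE_ROW_KEY,info:data1,info:data2,info:data3 \
  mydata input/mydata.txt
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/mydata_hfiles mydata

Without -Dimporttsv.bulk.output, ImportTsv writes to the table with ordinary Puts through the region servers, which is the mode used in the walkthrough below.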

(2) Prepare the Data

Generate the data:

[root@node1 ~]# cd data
[root@node1 data]# ls
books.txt  cite75_99.txt  ncdc.txt  rs.txt  temperature.txt  word1.txt
[root@node1 data]# vi gen.sh
[root@node1 data]# cat gen.sh
#!/bin/sh
for i in {1..100000};do
echo -e $i'\t'$RANDOM'\t'$RANDOM'\t'$RANDOM
done;
[root@node1 data]# sh gen.sh > mydata.txt
[root@node1 data]# tail -5 mydata.txt
99996   6512    21537   7475
99997   9544    22444   1030
99998   18157   11864   16616
99999   28231   10187   21657
100000  21188   27568   14994
[root@node1 data]#

Upload the generated data to HDFS:

[root@node1 data]# hdfs dfs -mkdir -p input
[root@node1 data]# hdfs dfs -put mydata.txt input
[root@node1 data]# hdfs dfs -ls input
Found 1 items
-rw-r--r--   3 root supergroup    2287354 2017-08-05 05:30 input/mydata.txt
[root@node1 data]#

(3) Create the Table

[root@node1 data]# hbase shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/hbase-1.2.6/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.2.6, rUnknown, Mon May 29 02:25:32 CDT 2017

hbase(main):001:0> create 'mydata','info'
0 row(s) in 1.8170 seconds

=> Hbase::Table - mydata
hbase(main):002:0> quit
[root@node1 data]#
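A freshly created table has only one region, so a large import initially lands on a single region server. As an optional tweak (a sketch, not part of the original walkthrough), the table can be pre-split at creation time; with row keys that are the strings '1' to '100000' sorted lexicographically, splitting on the leading digit spreads the load across five regions:

create 'mydata', 'info', SPLITS => ['2', '4', '6', '8']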

(4) Import the Data

Run the import command:

hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,info:data1,info:data2,info:data3 mydata input/mydata.txt

[root@node1 ~]# hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
> -Dimporttsv.columns=HBASE_ROW_KEY,info:data1,info:data2,info:data3 mydata input/mydata.txt
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/hbase-1.2.6/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2017-08-05 05:37:23,936 INFO [main] zookeeper.RecoverableZooKeeper: Process identifier=hconnection-0x292b08d6 connecting to ZooKeeper ensemble=node1:2181,node2:2181,node3:2181
2017-08-05 05:37:23,970 INFO [main] zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
2017-08-05 05:37:23,970 INFO [main] zookeeper.ZooKeeper: Client environment:host.name=node1
2017-08-05 05:37:23,971 INFO [main] zookeeper.ZooKeeper: Client environment:java.version=1.8.0_112
2017-08-05 05:37:23,971 INFO [main] zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
2017-08-05 05:37:23,971 INFO [main] zookeeper.ZooKeeper: Client environment:java.home=/opt/jdk1.8.0_112/jre
2017-08-05 05:37:23,971 INFO [main] zookeeper.ZooKeeper: Client environment:java.class.path=/opt/hbase-1.2.6/conf:/opt/jdk1.8.0_112/lib/tools.jar:/opt/hbase-1.2.6:/opt/hbase-
2017-08-05 05:37:23,979 INFO [main] zookeeper.ZooKeeper: Client environment:java.library.path=/opt/hadoop-2.7.3/lib/native
2017-08-05 05:37:23,979 INFO [main] zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp
2017-08-05 05:37:23,979 INFO [main] zookeeper.ZooKeeper: Client environment:java.compiler=<NA>
2017-08-05 05:37:23,979 INFO [main] zookeeper.ZooKeeper: Client environment:os.name=Linux
2017-08-05 05:37:23,979 INFO [main] zookeeper.ZooKeeper: Client environment:os.arch=amd64
2017-08-05 05:37:23,979 INFO [main] zookeeper.ZooKeeper: Client environment:os.version=3.10.0-514.el7.x86_64
2017-08-05 05:37:23,979 INFO [main] zookeeper.ZooKeeper: Client environment:user.name=root
2017-08-05 05:37:23,979 INFO [main] zookeeper.ZooKeeper: Client environment:user.home=/root
2017-08-05 05:37:23,979 INFO [main] zookeeper.ZooKeeper: Client environment:user.dir=/root
2017-08-05 05:37:23,981 INFO [main] zookeeper.ZooKeeper: Initiating client connection, connectString=node1:2181,node2:2181,node3:2181 sessionTimeout=90000 watcher=hconnection-0x292b08d60x0, quorum=node1:2181,node2:2181,node3:2181, baseZNode=/hbase
2017-08-05 05:37:24,030 INFO [main-SendThread(node1:2181)] zookeeper.ClientCnxn: Opening socket connection to server node1/192.168.80.131:2181. Will not attempt to authenticate using SASL (unknown error)
2017-08-05 05:37:24,055 INFO [main-SendThread(node1:2181)] zookeeper.ClientCnxn: Socket connection established to node1/192.168.80.131:2181, initiating session
2017-08-05 05:37:24,103 INFO [main-SendThread(node1:2181)] zookeeper.ClientCnxn: Session establishment complete on server node1/192.168.80.131:2181, sessionid = 0x15dafce38380006, negotiated timeout = 40000
2017-08-05 05:37:26,136 INFO [main] Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
2017-08-05 05:37:26,351 INFO [main] client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x15dafce38380006
2017-08-05 05:37:26,372 INFO [main] zookeeper.ZooKeeper: Session: 0x15dafce38380006 closed
2017-08-05 05:37:26,372 INFO [main-EventThread] zookeeper.ClientCnxn: EventThread shut down
2017-08-05 05:37:26,977 INFO [main] Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
2017-08-05 05:37:31,821 INFO [main] input.FileInputFormat: Total input paths to process : 1
2017-08-05 05:37:32,132 INFO [main] mapreduce.JobSubmitter: number of splits:1
2017-08-05 05:37:32,460 INFO [main] Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
2017-08-05 05:37:33,154 INFO [main] mapreduce.JobSubmitter: Submitting tokens for job: job_1501893260326_0001
2017-08-05 05:37:34,742 INFO [main] impl.YarnClientImpl: Submitted application application_1501893260326_0001
2017-08-05 05:37:34,945 INFO [main] mapreduce.Job: The url to track the job: http://node1:8088/proxy/application_1501893260326_0001/
2017-08-05 05:37:34,946 INFO [main] mapreduce.Job: Running job: job_1501893260326_0001
2017-08-05 05:37:56,389 INFO [main] mapreduce.Job: Job job_1501893260326_0001 running in uber mode : false
2017-08-05 05:37:56,390 INFO [main] mapreduce.Job:  map 0% reduce 0%
2017-08-05 05:38:17,452 INFO [main] mapreduce.Job:  map 15% reduce 0%
2017-08-05 05:38:20,567 INFO [main] mapreduce.Job:  map 46% reduce 0%
2017-08-05 05:38:23,686 INFO [main] mapreduce.Job:  map 71% reduce 0%
2017-08-05 05:38:26,787 INFO [main] mapreduce.Job:  map 100% reduce 0%
2017-08-05 05:38:27,826 INFO [main] mapreduce.Job: Job job_1501893260326_0001 completed successfully
2017-08-05 05:38:28,034 INFO [main] mapreduce.Job: Counters: 31
        File System Counters
                FILE: Number of bytes read=0
                FILE: Number of bytes written=150773
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=2287457
                HDFS: Number of bytes written=0
                HDFS: Number of read operations=2
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=0
        Job Counters
                Launched map tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=27098
                Total time spent by all reduces in occupied slots (ms)=0
                Total time spent by all map tasks (ms)=27098
                Total vcore-seconds taken by all map tasks=27098
                Total megabyte-seconds taken by all map tasks=27748352
        Map-Reduce Framework
                Map input records=100000
                Map output records=100000
                Input split bytes=103
                Spilled Records=0
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=946
                CPU time spent (ms)=12890
                Physical memory (bytes) snapshot=128389120
                Virtual memory (bytes) snapshot=2108084224
                Total committed heap usage (bytes)=30474240
        ImportTsv
                Bad Lines=0
        File Input Format Counters
                Bytes Read=2287354
        File Output Format Counters
                Bytes Written=0
[root@node1 ~]#
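Note the "ImportTsv Bad Lines=0" counter in the output above: input lines that cannot be parsed are skipped and counted there. If you would rather have the job fail on malformed input, my understanding is that the importtsv.skip.bad.lines switch controls this behaviour, for example:

hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
  -Dimporttsv.skip.bad.lines=false \
  -Dimporttsv.columns=HBASE_ROW_KEY,info:data1,info:data2,info:data3 \
  mydata input/mydata.txt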

(5) Check the Import Results

[root@node2 ~]# hbase shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/hbase-1.2.6/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.2.6, rUnknown, Mon May 29 02:25:32 CDT 2017

hbase(main):001:0> count 'mydata'
Current count: 1000, row: 10897
Current count: 2000, row: 11797
Current count: 3000, row: 12697
Current count: 4000, row: 13597
Current count: 5000, row: 14497
Current count: 6000, row: 15397
Current count: 7000, row: 16297
Current count: 8000, row: 17197
Current count: 9000, row: 18097
Current count: 10000, row: 18998
Current count: 11000, row: 19898
Current count: 12000, row: 20797
Current count: 13000, row: 21697
Current count: 14000, row: 22597
Current count: 15000, row: 23497
Current count: 16000, row: 24397
Current count: 17000, row: 25297
Current count: 18000, row: 26197
Current count: 19000, row: 27097
Current count: 20000, row: 27998
Current count: 21000, row: 28898
Current count: 22000, row: 29798
Current count: 23000, row: 30697
Current count: 24000, row: 31597
Current count: 25000, row: 32497
Current count: 26000, row: 33397
Current count: 27000, row: 34297
Current count: 28000, row: 35197
Current count: 29000, row: 36097
Current count: 30000, row: 36998
Current count: 31000, row: 37898
Current count: 32000, row: 38798
Current count: 33000, row: 39698
Current count: 34000, row: 40597
Current count: 35000, row: 41497
Current count: 36000, row: 42397
Current count: 37000, row: 43297
Current count: 38000, row: 44197
Current count: 39000, row: 45097
Current count: 40000, row: 45998
Current count: 41000, row: 46898
Current count: 42000, row: 47798
Current count: 43000, row: 48698
Current count: 44000, row: 49598
Current count: 45000, row: 50497
Current count: 46000, row: 51397
Current count: 47000, row: 52297
Current count: 48000, row: 53197
Current count: 49000, row: 54097
Current count: 50000, row: 54998
Current count: 51000, row: 55898
Current count: 52000, row: 56798
Current count: 53000, row: 57698
Current count: 54000, row: 58598
Current count: 55000, row: 59498
Current count: 56000, row: 60397
Current count: 57000, row: 61297
Current count: 58000, row: 62197
Current count: 59000, row: 63097
Current count: 60000, row: 63998
Current count: 61000, row: 64898
Current count: 62000, row: 65798
Current count: 63000, row: 66698
Current count: 64000, row: 67598
Current count: 65000, row: 68498
Current count: 66000, row: 69398
Current count: 67000, row: 70297
Current count: 68000, row: 71197
Current count: 69000, row: 72097
Current count: 70000, row: 72998
Current count: 71000, row: 73898
Current count: 72000, row: 74798
Current count: 73000, row: 75698
Current count: 74000, row: 76598
Current count: 75000, row: 77498
Current count: 76000, row: 78398
Current count: 77000, row: 79298
Current count: 78000, row: 80197
Current count: 79000, row: 81097
Current count: 80000, row: 81998
Current count: 81000, row: 82898
Current count: 82000, row: 83798
Current count: 83000, row: 84698
Current count: 84000, row: 85598
Current count: 85000, row: 86498
Current count: 86000, row: 87398
Current count: 87000, row: 88298
Current count: 88000, row: 89198
Current count: 89000, row: 90097
Current count: 90000, row: 90998
Current count: 91000, row: 91898
Current count: 92000, row: 92798
Current count: 93000, row: 93698
Current count: 94000, row: 94598
Current count: 95000, row: 95498
Current count: 96000, row: 96398
Current count: 97000, row: 97298
Current count: 98000, row: 98198
Current count: 99000, row: 99098
Current count: 100000, row: 99999
100000 row(s) in 14.6730 seconds

=> 100000
hbase(main):002:0> get 'mydata','1'
COLUMN                CELL
 info:data1           timestamp=1501925842893, value=14506
 info:data2           timestamp=1501925842893, value=10037
 info:data3           timestamp=1501925842893, value=10938
3 row(s) in 0.1180 seconds

hbase(main):003:0>

10.7.3 MapReduce

The same import can also be implemented as a custom MapReduce job: the mapper passes each input line through unchanged, and the reducer splits the line on tabs, uses the first field as the row key, and writes the remaining fields into the info column family through TableOutputFormat.

package hbaseDemo.dao;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class BatchImport {

    // The mapper simply forwards each line; all of the work happens in the reducer.
    public static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable> {
        @Override
        protected void reduce(LongWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text text : values) {
                final String[] fields = text.toString().split("\t");
                final Put put = new Put(Bytes.toBytes(fields[0])); // first column is the row key
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("data1"), Bytes.toBytes(fields[1]));
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("data2"), Bytes.toBytes(fields[2]));
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("data3"), Bytes.toBytes(fields[3]));
                context.write(NullWritable.get(), put);
            }
        }
    }

    /**
     * This kept failing earlier with "failed on connection exception: Connection refused: nb0:8020",
     * because the NameNode is not on 192.168.1.160 but on 192.168.1.161 and 192.168.1.162.
     */
    public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();
        conf.set("hbase.rootdir", "hdfs://cetc32/hbase");
        // Set the ZooKeeper quorum, using the IP addresses directly
        conf.set("hbase.zookeeper.quorum", "192.168.1.160,192.168.1.161,192.168.1.162");
        // Set the target HBase table (create it first in the shell: create 'mydata','info')
        conf.set(TableOutputFormat.OUTPUT_TABLE, "mydata");
        // Increase this value so HBase does not time out and abort
        conf.set("dfs.socket.timeout", "180000");
        // System.setProperty("HADOOP_USER_NAME", "root");
        // Set fs.defaultFS
        conf.set("fs.defaultFS", "hdfs://192.168.1.161:8020");
        // Set the YARN ResourceManager node
        conf.set("yarn.resourcemanager.hostname", "nb1");

        Job job = Job.getInstance(conf);
        job.setJobName("HBaseBatchImport");
        job.setJarByClass(BatchImport.class); // ship the job jar containing these classes to the cluster

        job.setMapperClass(BatchImportMapper.class);
        job.setReducerClass(BatchImportReducer.class);

        // Set the map output types; the reduce output is handled by TableOutputFormat
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        // No output path is set; instead, the output format is TableOutputFormat
        job.setOutputFormatClass(TableOutputFormat.class);

        FileInputFormat.setInputPaths(job, "hdfs://192.168.1.161:8020/user/root/input/mydata.txt");

        boolean flag = job.waitForCompletion(true);
        System.out.println(flag);
    }
}
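One way to build and launch this job from the command line (a sketch; the jar name is illustrative, and the HBase jars must be visible on the client classpath, here via the hbase classpath command):

# Package hbaseDemo.dao.BatchImport into a jar (name is illustrative), then:
export HADOOP_CLASSPATH=$(hbase classpath)
hadoop jar hbase-batch-import.jar hbaseDemo.dao.BatchImport

If the map or reduce tasks fail with ClassNotFoundException for HBase classes, the usual fixes are to build a fat jar or to ship the HBase dependency jars with the job (for example via TableMapReduceUtil.addDependencyJars).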
