package com.uplooking.bigdata.mr.format.in; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; /** * 将数据库表中的数据迁移到hdfs上面,这里以mysql为例操作 * 迁移bigdata_db中的表person,将其迁移到hdfs的目录/input/mr/db */ public class DbInputFormatApp { public static void main(String[] args) throws Exception { if(args == null || args.length < 2) { System.err.println("Parameter Errors! Usage: <tblName outputpath>"); System.exit(-1); } String tblName = args[0]; Path outputPath = new Path(args[1]); Configuration conf = new Configuration(); //设置mr要连接的数据库 /** * Sets the DB access related fields in the {@link Configuration}. * conf the configuration * driverClass JDBC Driver class name * dbUrl JDBC DB access URL. * userName DB access username * passwd DB access passwd */ DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://master:3306/bigdata_db", "root", "root"); Job job = Job.getInstance(conf, DbInputFormatApp.class.getSimpleName()); job.setJarByClass(DbInputFormatApp.class); //设置map job.setInputFormatClass(DBInputFormat.class); /** * Initializes the map-part of the job with the appropriate input settings. * * job The map-reduce job * inputClass the class object implementing DBWritable, which is the * Java object holding tuple fields. 
* tableName The table to read data from * conditions The condition which to select data with, eg. '(updated > 20070101 AND length > 0)' * orderBy the fieldNames in the orderBy clause. * fieldNames The field names in the table */ String fieldNames[] = {"pid", "pname", "page", "pgender"}; DBInputFormat.setInput(job, PersonDBWritable.class, tblName, null, null, fieldNames); job.setMapperClass(DbInputMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //不需要设置reducer outputPath.getFileSystem(conf).delete(outputPath, true); FileOutputFormat.setOutputPath(job, outputPath); job.setOutputFormatClass(TextOutputFormat.class); job.setNumReduceTasks(0); job.waitForCompletion(true); } static class DbInputMapper extends Mapper<LongWritable, PersonDBWritable, Text, NullWritable> { @Override protected void map(LongWritable k1, PersonDBWritable v1, Context context) throws IOException, InterruptedException { context.write(new Text(v1.toString()), NullWritable.get()); } } /* * 还需要reducer吗? 
* 我们这里只是对数据进行原样输出,所以不需要进行reducer操作 */ } class PersonDBWritable implements Writable, DBWritable { private int pid; private String pname; private int page; private String pgender; public PersonDBWritable(int pid, String pname, int page, String pgender) { this.pid = pid; this.pname = pname; this.page = page; this.pgender = pgender; } public PersonDBWritable() { } public void write(DataOutput out) throws IOException { out.writeInt(this.pid); out.writeUTF(this.pname); out.writeInt(this.page); out.writeUTF(this.pgender); } public void readFields(DataInput in) throws IOException { this.pid = in.readInt(); this.pname = in.readUTF(); this.page = in.readInt(); this.pgender = in.readUTF(); } public void write(PreparedStatement ps) throws SQLException { ps.setInt(1, this.pid); ps.setString(2, this.pname); ps.setInt(3, this.page); ps.setString(4, this.pgender); } public void readFields(ResultSet rs) throws SQLException { this.pid = rs.getInt("pid"); this.pname = rs.getString("pname"); this.page = rs.getInt("page"); this.pgender = rs.getString("pgender"); } @Override public String toString() { return pid + "\t" + pname + '\t' + page + "\t" + pgender; } public int getPid() { return pid; } public void setPid(int pid) { this.pid = pid; } public String getPname() { return pname; } public void setPname(String pname) { this.pname = pname; } public int getPage() { return page; } public void setPage(int page) { this.page = page; } public String getPgender() { return pgender; } public void setPgender(String pgender) { this.pgender = pgender; } }
// 转载请注明原文地址 (Reprint notice — please credit the original source): https://www.6miu.com/read-69794.html