MapReduce之数据库操作

xiaoxiao2021-02-28  75

package com.uplooking.bigdata.mr.format.in; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; /**  * 将数据库表中的数据迁移到hdfs上面,这里以mysql为例操作  * 迁移bigdata_db中的表person,将其迁移到hdfs的目录/input/mr/db  */ public class DbInputFormatApp {     public static void main(String[] args) throws Exception {         if(args == null || args.length < 2) {             System.err.println("Parameter Errors! Usage: <tblName outputpath>");             System.exit(-1);         }         String tblName = args[0];         Path outputPath = new Path(args[1]);         Configuration conf = new Configuration();         //设置mr要连接的数据库         /**          * Sets the DB access related fields in the {@link Configuration}.          * conf             the configuration          * driverClass      JDBC Driver class name          * dbUrl            JDBC DB access URL.          * userName         DB access username          * passwd           DB access passwd          */         DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://master:3306/bigdata_db", "root", "root");         Job job = Job.getInstance(conf, DbInputFormatApp.class.getSimpleName());         job.setJarByClass(DbInputFormatApp.class);         //设置map         job.setInputFormatClass(DBInputFormat.class);         /**          * Initializes the map-part of the job with the appropriate input settings.          *          * job              The map-reduce job          * inputClass       the class object implementing DBWritable, which is the          *                  Java object holding tuple fields.          * tableName        The table to read data from          * conditions       The condition which to select data with, eg. '(updated > 20070101 AND length > 0)'          * orderBy          the fieldNames in the orderBy clause.          * fieldNames       The field names in the table          */         String fieldNames[] = {"pid", "pname", "page", "pgender"};         DBInputFormat.setInput(job, PersonDBWritable.class, tblName, null, null, fieldNames);         job.setMapperClass(DbInputMapper.class);         job.setOutputKeyClass(Text.class);         job.setOutputValueClass(NullWritable.class);         //不需要设置reducer         outputPath.getFileSystem(conf).delete(outputPath, true);         FileOutputFormat.setOutputPath(job, outputPath);         job.setOutputFormatClass(TextOutputFormat.class);         job.setNumReduceTasks(0);         job.waitForCompletion(true);     }     static class DbInputMapper extends Mapper<LongWritable, PersonDBWritable, Text, NullWritable> {         @Override         protected void map(LongWritable k1, PersonDBWritable v1, Context context) throws IOException, InterruptedException {             context.write(new Text(v1.toString()), NullWritable.get());         }     }     /*      * 还需要reducer吗?      * 我们这里只是对数据进行原样输出,所以不需要进行reducer操作      */ } class PersonDBWritable implements Writable, DBWritable {     private int pid;     private String pname;     private int page;     private String pgender;     public PersonDBWritable(int pid, String pname, int page, String pgender) {         this.pid = pid;         this.pname = pname;         this.page = page;         this.pgender = pgender;     }     public PersonDBWritable() {     }     public void write(DataOutput out) throws IOException {         out.writeInt(this.pid);         out.writeUTF(this.pname);         out.writeInt(this.page);         out.writeUTF(this.pgender);     }     public void readFields(DataInput in) throws IOException {         this.pid = in.readInt();         this.pname = in.readUTF();         this.page = in.readInt();         this.pgender = in.readUTF();     }     public void write(PreparedStatement ps) throws SQLException {         ps.setInt(1, this.pid);         ps.setString(2, this.pname);         ps.setInt(3, this.page);         ps.setString(4, this.pgender);     }     public void readFields(ResultSet rs) throws SQLException {         this.pid = rs.getInt("pid");         this.pname = rs.getString("pname");         this.page = rs.getInt("page");         this.pgender = rs.getString("pgender");     }     @Override     public String toString() {         return pid + "\t" + pname + '\t' + page + "\t" + pgender;     }     public int getPid() {         return pid;     }     public void setPid(int pid) {         this.pid = pid;     }     public String getPname() {         return pname;     }     public void setPname(String pname) {         this.pname = pname;     }     public int getPage() {         return page;     }     public void setPage(int page) {         this.page = page;     }     public String getPgender() {         return pgender;     }     public void setPgender(String pgender) {         this.pgender = pgender;     } }
转载请注明原文地址: https://www.6miu.com/read-69794.html

最新回复(0)