Converting TextFile to SequenceFile for Hive


For business reasons the data had to be imported into Hive as SequenceFile, but the upstream files delivered by Flume were plain TextFile.

So a Hadoop MapReduce job is used to convert the TextFile input into SequenceFile, which can then be imported into Hive.

The code is as follows:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Converts text files into sequence files.
 * @author gongmf
 * email: 1376818286@qq.com
 */
public class TextToSequencefile {

    public static class ReaderMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            if (value == null) {
                return;
            }
            String str = value.toString();
            // Business-specific trimming of the first 14 characters; remove if not needed.
            if (str == null || str.length() < 14) {
                return;
            }
            str = str.substring(14);
            context.write(key, new Text(str));
        }
    }

    // Left over from the WordCount template; unused because the job below runs map-only.
    public static class WriterReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        // section 1: parse generic options, leaving <input path> <output path>
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: TextToSequencefile <in> <out>");
            System.exit(2);
        }
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "TextToSequencefile");
        job.setJarByClass(TextToSequencefile.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        // Whether and how to compress the output; NONE keeps it uncompressed.
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.NONE);

        // section 2: TextInputFormat supplies LongWritable offsets as keys, Text lines as values
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // section 3: map-only job, so no combiner or reducer is set
        job.setMapperClass(ReaderMapper.class);
        job.setNumReduceTasks(0);

        // section 4
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // section 5
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
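After packaging, the job is launched with the standard invocation hadoop jar <jarfile> TextToSequencefile <in> <out>. To sanity-check the result before loading it into Hive, a small reader can dump the first few key/value pairs of a generated part file. The sketch below is not part of the original post: the class name SequenceFileDump is my own, it takes the file path as its only argument, and it assumes a Hadoop 2.x classpath.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

/** Prints the first few key/value pairs of a SequenceFile, e.g. <out>/part-m-00000. */
public class SequenceFileDump {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]);
        SequenceFile.Reader reader =
                new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
        try {
            // Instantiate key/value objects of whatever classes the file header declares
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            int shown = 0;
            while (reader.next(key, value) && shown++ < 10) {
                System.out.println(key + "\t" + value);
            }
        } finally {
            reader.close();
        }
    }
}

The keys you will see are the LongWritable byte offsets that TextInputFormat assigned; Hive ignores them when the file is read, so they do no harm.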

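For the final import step, the Hive table only has to be declared STORED AS SEQUENCEFILE; Hive reads each Text value of the SequenceFile as one row and discards the keys. Below is a minimal sketch over the HiveServer2 JDBC driver; the connection URL, credentials, the table name flume_log, its single-column schema, and the '/path/to/mr/output' placeholder are all assumptions to adapt to your cluster. The same two statements can equally be run in the Hive CLI or Beeline.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

/** Creates a SequenceFile-backed Hive table and loads the MR job's output into it. */
public class LoadIntoHive {

    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // HiveServer2 address and credentials are placeholders -- adjust for your cluster
        try (Connection conn =
                     DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "");
             Statement stmt = conn.createStatement()) {
            // One STRING column per line; Hive maps each SequenceFile value to one row
            stmt.execute("CREATE TABLE IF NOT EXISTS flume_log (line STRING) "
                    + "STORED AS SEQUENCEFILE");
            // LOAD DATA INPATH moves the MR output files into the table's warehouse directory
            stmt.execute("LOAD DATA INPATH '/path/to/mr/output' INTO TABLE flume_log");
        }
    }
}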
