使用MapReduce对Hadoop下的日志记录进行分析处理

xiaoxiao2025-07-10  14

一. 简介

MapReduce是一个高性能的批处理分布式计算框架,用于对海量数据进行并行分析和处理。与传统方法相比,MapReduce更倾向于用蛮力去解决问题,通过简单、粗暴、有效的方式处理海量数据:通过对数据的输入、拆分与组合(核心),将任务分配到多个节点服务器上进行分布式计算,这样可以有效地提高数据管理的安全性,同时也能够很好地访问被管理的数据。 MapReduce的核心就是map + shuffle + reduce:首先读取文件并进行分片,通过map得到文件的key-value映射关系,用作reduce的输入;在作为reduce的输入之前,要先对map输出的key进行一次shuffle,也就是按key分区、排序并分组;然后将排好序的key-value作为输入进行reduce操作。当然,一个MapReduce任务也可以没有reduce,只有map。 其实现在MapReduce在很多场景下已经被Spark取代了,不过作为对大数据的学习,还是要稍微了解一下,下面是我学习过程中看过和写过的例子。

二. Hadoop自带的WordCount

2.1. 创建一个Maven项目,目录结构如下:

2.2. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the MapReduce examples in this article. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>sun</groupId>
    <artifactId>hadoop-MapReduce</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Single place to change the version shared by all Hadoop artifacts below. -->
        <hadoopVersion>2.6.0</hadoopVersion>
    </properties>

    <dependencies>
        <!-- Hadoop and HBase dependencies: start -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-it</artifactId>
            <version>1.1.2</version>
        </dependency>
        <!-- Hadoop and HBase dependencies: end -->

        <!-- tools.jar taken from the local JDK installation (system scope). -->
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>

        <dependency>
            <groupId>org.jetbrains</groupId>
            <artifactId>annotations-java5</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>

2.3. WordCount.java

import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < otherArgs.length - 1; i++) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)])); System.exit(job.waitForCompletion(true) ? 
0 : 1); } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } this.result.set(sum); context.write(key, this.result); } } public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private static final IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { this.word.set(itr.nextToken()); context.write(this.word, one); } } } }

三. 我的例子:

3.1. 将WordCount的结果上传到HBase:

WordCountUpLoadToHBase.java:

import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableReducer; public class WordCountUpLoadToHBase extends Configured { public static class WCHBaseMapper extends Mapper<Object, Text, ImmutableBytesWritable, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key,Text value,Context context) throws IOException, InterruptedException{ StringTokenizer strs = new StringTokenizer(value.toString()); while(strs.hasMoreTokens()){ word.set(strs.nextToken()); context.write(new ImmutableBytesWritable(Bytes.toBytes(word.toString())), one); } } } public static class WCHBaseReducer extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable>{ public void reduce(ImmutableBytesWritable key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{ int sum = 0; for(IntWritable val:values){ sum += val.get(); } Put put = new Put(key.get()); put.add(Bytes.toBytes("content"),Bytes.toBytes("count"),Bytes.toBytes(sum+"")); context.write(key, put); } } @SuppressWarnings("all") public static void 
main(String[] args) throws MasterNotRunningException, ZooKeeperConnectionException, IOException, ClassNotFoundException, InterruptedException { // TODO Auto-generated method stub String tableName = "wordcount"; Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum","hadoop"); conf.set("hbase.zookeeper.property.clientPort","2181"); HBaseAdmin admin = new HBaseAdmin(conf); //如果表格存在就删除 if(admin.tableExists(tableName)){ admin.disableTable(tableName); admin.deleteTable(tableName); } HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); HColumnDescriptor columnDescriptor =new HColumnDescriptor("content"); tableDescriptor.addFamily(columnDescriptor); admin.createTable(tableDescriptor); Job job = new Job(conf,"upload to hbase"); job.setJarByClass(WordCountUpLoadToHBase.class); job.setMapperClass(WCHBaseMapper.class); TableMapReduceUtil.initTableReducerJob(tableName, WCHBaseReducer.class, job,null,null,null,null,false); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Put.class); FileInputFormat.addInputPaths(job, "hdfs://hadoop:9000/agentlog/*"); System.exit(job.waitForCompletion(true)?0:1); } }

3.2. 从HBase读取数据

MRReadFromHbase.java:

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Reads every row of the HBase table {@code wordcount} and writes
 * {@code (row key, concatenated values of family "content")} to HDFS.
 */
public class MRReadFromHbase extends Configured {

    /** Charset used for both the family name and the stored cell values. */
    private static final String UTF8 = "UTF-8";

    /**
     * Emits one record per row: the row key and the values of all columns in
     * family {@code content}, concatenated in column order.
     */
    public static class WCHBaseMapper extends TableMapper<Text, Text> {

        @Override
        public void map(ImmutableBytesWritable key, Result values, Context context)
                throws IOException, InterruptedException {
            Map<byte[], byte[]> columns = values.getFamilyMap("content".getBytes(UTF8));
            // Nothing to emit for a row that has no cells in the family.
            if (columns == null || columns.isEmpty()) {
                return;
            }
            StringBuilder joined = new StringBuilder();
            for (byte[] cell : columns.values()) {
                // Decode explicitly as UTF-8 rather than with the platform default charset.
                joined.append(new String(cell, UTF8));
            }
            // Written once per row, after all columns have been appended.
            context.write(new Text(key.get()), new Text(joined.toString()));
        }
    }

    /** Identity reducer: forwards every value unchanged under its key. */
    public static class WCHBaseReducer extends Reducer<Text, Text, Text, Text> {
        private final Text result = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                result.set(val);
                context.write(key, result);
            }
        }
    }

    /**
     * Configures the table scan, runs the job and exits with 0 on success, 1 on failure.
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String tableName = "wordcount";
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Job job = Job.getInstance(conf, "read from hbase to hdfs");
        job.setJarByClass(MRReadFromHbase.class);
        job.setReducerClass(WCHBaseReducer.class);
        TableMapReduceUtil.initTableMapperJob(tableName, new Scan(), WCHBaseMapper.class, Text.class, Text.class, job);
        // NOTE(review): host is "hadoop1" here but "hadoop" for ZooKeeper above - confirm this is intended.
        FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop1:9000/user/sun/hbase"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

3.3. 我的自定义格式的日志数据处理:

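日志以[format:1][user:AAA][interface:/bt/btCourse/get][date:2018/10/10]的格式存储。针对以[format:1]开头的记录,按用户(user)分组、按时间排序,统计每个用户在各个页面上累计的时间(秒)。注意:下面的代码实际读取的字段是user、time和html,并且time按yyyy-MM-dd HH:mm:ss格式解析,与上面示例中的字段名(interface、date)不一致,使用时需要让日志格式与代码中的字段名保持一致。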

UserLog.java:

import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.*; public class UserLog { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(UserLog.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); for (int i = 0; i < otherArgs.length - 1; i++) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)])); System.exit(job.waitForCompletion(true) ? 
0 : 1); } // map将输入中的value复制到输出数据的key上,并直接输出 public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> { private static Text line = new Text();// 每行数据 // 实现map函数 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { if (value.toString().startsWith("[format:1]")) { context.write(new Text(getParameter(value, "user") + "|" + getParameter(value, "time") + "|" + getParameter(value, "html")), new Text("")); } } } /* // reduce将输入中的key复制到输出数据的key上,并直接输出 public static class IntSumReducer extends Reducer<Text, Text, Text, Text> { // 实现reduce函数 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text val : values) { context.write(key, val); } } }*/ public static class IntSumReducer extends Reducer<Text, Text, Text, Text> { //定义treeMap来保持统计结果,由于treeMap是按key升序排列的,这里要人为指定Comparator以实现倒排 private TreeMap<String, String> treeMap = new TreeMap<String, String>(new Comparator<String>() { //@Override public int compare(String x, String y) { return x.compareTo(y); } }); //定义treeMap来保持统计结果,由于treeMap是按key升序排列的,这里要人为指定Comparator以实现倒排 private TreeMap<String, Long> treeMapResult = new TreeMap<String, Long>(new Comparator<String>() { //@Override public int compare(String x, String y) { return x.compareTo(y); } }); public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { //reduce后的结果放入treeMap,而不是向context中记入结果 treeMap.put(key.toString(), key.toString()); } protected void cleanup(Context context) throws IOException, InterruptedException { if (StringUtils.isBlank(context.getCurrentValue().toString())) { Iterator it = treeMap.entrySet().iterator(); while (it.hasNext()) { Map.Entry entry = (Map.Entry) it.next(); context.write(new Text(entry.getKey().toString()), new Text("0")); } }else{ String key = ""; String value = ""; //将treeMap中的结果,按value-key顺序写入contex中 Iterator it = treeMap.entrySet().iterator(); while 
(it.hasNext()) { Map.Entry entry = (Map.Entry) it.next(); String[] sp = entry.getKey().toString().split("\\|"); if (key.equals(sp[0])){ long time = getSecondDiff(value, sp[1]); if (!treeMapResult.containsKey(sp[0] + "|" + sp[2])) { treeMapResult.put(sp[0] + "|" + sp[2], time); } else { treeMapResult.put(sp[0] + "|" + sp[2], Long.parseLong(treeMapResult.get(sp[0] + "|" + sp[2]).toString()) + time); } }else{ treeMapResult.put(sp[0] + "|" + sp[2], Long.parseLong("0")); } key = sp[0]; value = sp[1]; } // 输出 Iterator iter = treeMapResult.entrySet().iterator(); while (iter.hasNext()) { Map.Entry entry = (Map.Entry) iter.next(); context.write(new Text(entry.getKey().toString()), new Text(entry.getValue().toString())); } } } } private static String getParameter(Text value, String param){ try { return value.toString().substring(value.toString().indexOf(param) + param.length() + 1, value.toString().indexOf("]", value.toString().indexOf(param) + param.length() + 1)); }catch(Exception e){ return ""; } } private static long getSecondDiff(String s1, String s2){ SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date d1 = null; Date d2 = null; try { d1 = format.parse(s1); d2 = format.parse(s2); //毫秒ms long diff = d2.getTime() - d1.getTime(); long diffSeconds = diff / 1000; return diffSeconds; }catch(Exception e){ return 0; } } }

四. 结果验证:

4.1. 将项目打成Jar包后,放到CentOS上的/home/hadoop/Downloads目录下。

4.2. 执行:

hadoop jar /home/hadoop/Downloads/hadoop-MapReduce-1.0-SNAPSHOT.jar WordCount /agentlog/ /user/sun/MapReduce/wordCountX

4.3. 查看结果:

hadoop fs -cat /user/sun/MapReduce/wordCountX/part-r-00000

 

 

转载请注明原文地址: https://www.6miu.com/read-5032845.html

最新回复(0)