大数据IMF传奇行动绝密课程第104-114课:Spark Streaming电商广告点击综合案例

xiaoxiao2021-02-28  253

Spark Streaming电商广告点击综合案例

需求分析和技术架构

广告点击系统实时分析 广告来自于网站或者移动App等,广告需要设定在具体的广告位,当用户点击广告的时候,一般都会通过ajax或Socket往后台发送日志数据,在这里我们要基于Spark Streaming做实时在线统计。那么数据就需要放进消息系统(Kafka)中,我们的Spark Streaming应用程序就会去Kafka中Pull数据过来进行计算和消费,并把计算后的数据放入到持久化系统中(MySQL)。 广告点击系统实时分析的意义:因为可以在线实时地看见广告的投放效果,就为广告的更大规模的投入和调整打下了坚实的基础,从而为公司带来最大化的经济回报。 核心需求: 1、实时黑名单动态过滤出有效的用户广告点击行为:因为黑名单用户可能随时出现,所以需要动态更新; 2、在线计算广告点击流量; 3、Top3热门广告; 4、每个广告流量趋势; 5、广告点击用户的区域分布分析; 6、最近一分钟的广告点击量; 7、整个广告点击Spark Streaming处理程序7*24小时运行;

数据格式: 时间、用户、广告、城市等

技术细节: 在线计算用户点击的次数分析,屏蔽IP等; 使用updateStateByKey或者mapWithState进行不同地区广告点击排名的计算; Spark Streaming+Spark SQL+Spark Core等综合分析数据; 使用Window类型的操作; 高可用和性能调优等等; 流量趋势,一般会结合DB等; Spark Core

// ===== File 1: MockAdClickedStat.java =====
package com.tom.spark.SparkApps.sparkstreaming;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Mock data generator: a Kafka producer that emits one synthetic ad-click record
 * every 50 ms.
 *
 * <p>Record layout (tab separated): timestamp, ip, userID, adID, province, city.
 */
public class MockAdClickedStat {

    /** Topic the click records are published to; the streaming job must subscribe to the same one. */
    private static final String TOPIC = "AdClicked";

    /**
     * Starts a background thread that publishes random click records until it is interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final Random random = new Random();
        final String[] provinces = new String[]{"Guangdong", "Zhejiang", "Jiangsu", "Fujian"};
        final Map<String, String[]> cities = new HashMap<String, String[]>();
        cities.put("Guangdong", new String[]{"Guangzhou", "Shenzhen", "Dongguan"});
        cities.put("Zhejiang", new String[]{"Hangzhou", "Wenzhou", "Ningbo"});
        cities.put("Jiangsu", new String[]{"Nanjing", "Suzhou", "Wuxi"});
        cities.put("Fujian", new String[]{"Fuzhou", "Xiamen", "Sanming"});

        final String[] ips = new String[] {
            "192.168.112.240", "192.168.112.239", "192.168.112.245", "192.168.112.246",
            "192.168.112.247", "192.168.112.248", "192.168.112.249", "192.168.112.250",
            "192.168.112.251", "192.168.112.252", "192.168.112.253", "192.168.112.254",
        };

        // Basic Kafka producer configuration.
        Properties kafkaConf = new Properties();
        kafkaConf.put("serializer.class", "kafka.serializer.StringEncoder");
        // Fixed: the key was misspelled "metadeta.broker.list", so no broker list was configured.
        kafkaConf.put("metadata.broker.list", "Master:9092,Worker1:9092,Worker2:9092");
        ProducerConfig producerConfig = new ProducerConfig(kafkaConf);
        final Producer<Integer, String> producer = new Producer<Integer, String>(producerConfig);

        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    long timestamp = new Date().getTime();
                    // Bounds come from the arrays themselves so the sample data can be edited safely.
                    String ip = ips[random.nextInt(ips.length)]; // a free public IP database could be used instead
                    int userID = random.nextInt(10000);
                    int adID = random.nextInt(100);
                    String province = provinces[random.nextInt(provinces.length)];
                    String[] provinceCities = cities.get(province);
                    String city = provinceCities[random.nextInt(provinceCities.length)];

                    String clickedAd = timestamp + "\t" + ip + "\t" + userID + "\t" + adID + "\t"
                            + province + "\t" + city;
                    producer.send(new KeyedMessage<Integer, String>(TOPIC, clickedAd));

                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop producing instead of swallowing the signal.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
                producer.close();
            }
        }).start();
    }
}

// ===== File 2: AdClickedStreamingStats.java =====
package com.tom.spark.SparkApps.sparkstreaming;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.KafkaUtils;

import com.google.common.base.Optional;

import scala.Tuple2;

/**
 * Spark Streaming job that consumes ad-click records from Kafka, drops clicks from
 * blacklisted users, and persists several aggregates to MySQL through {@link JDBCWrapper}:
 * per-user clicks, new blacklist entries, cumulative ad click totals, a per-province
 * Top-5 and a 30-minute click trend.
 *
 * <p>Input record layout (tab separated): timestamp, ip, userID, adID, province, city.
 */
public class AdClickedStreamingStats {

    /** Kafka topic to consume; must match the topic used by {@code MockAdClickedStat}. */
    private static final String TOPIC = "AdClicked";
    /** Separator between the fields of a raw Kafka record. */
    private static final String FIELD_SEPARATOR = "\t";
    /** Separator used when several fields are concatenated into one aggregation key. */
    private static final String KEY_SEPARATOR = "_";
    /** A user whose stored click count for one ad exceeds this value is blacklisted. */
    private static final long BLACKLIST_CLICK_THRESHOLD = 50L;

    /**
     * Recovers the streaming context from the checkpoint directory, or creates a new one,
     * and runs it until termination.
     *
     * <p>Driver recovery additionally requires running the driver in cluster mode and
     * submitting with {@code --supervise}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Fixed: a standalone master is addressed with spark://, not hdfs://.
        final SparkConf conf = new SparkConf()
                .setAppName("SparkStreamingOnKafkaDirect")
                .setMaster("spark://Master:7077");
        final String checkpointDirectory = "hdfs://Master:9000/library/SparkStreaming/CheckPoint_Data";

        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {
                return createContext(checkpointDirectory, conf);
            }
        };

        JavaStreamingContext javassc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);

        javassc.start();
        javassc.awaitTermination();
        javassc.close();
    }

    /**
     * Creates a fresh streaming context with a 10 second batch interval and wires the
     * complete DStream graph into it.
     *
     * <p>The graph must be defined here (inside the factory path) rather than after
     * {@code getOrCreate}: a context restored from a checkpoint already contains its
     * graph and does not accept new streams.
     */
    private static JavaStreamingContext createContext(String checkpointDirectory, SparkConf conf) {
        // If this line is not printed, the context was restored from the checkpoint.
        System.out.println("Creating new context");

        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));
        ssc.checkpoint(checkpointDirectory);
        buildPipeline(ssc);
        return ssc;
    }

    /** Defines the Kafka input stream and every derived stream / output operation. */
    private static void buildPipeline(JavaStreamingContext ssc) {
        Map<String, String> kafkaParameters = new HashMap<String, String>();
        kafkaParameters.put("metadata.broker.list", "Master:9092,Worker1:9092,Worker2:9092");

        Set<String> topics = new HashSet<String>();
        topics.add(TOPIC);

        JavaPairInputDStream<String, String> adClickedStreaming = KafkaUtils.createDirectStream(
                ssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                kafkaParameters, topics);

        JavaPairDStream<String, String> filteredClicks = filterBlacklistedUsers(adClickedStreaming);

        JavaPairDStream<String, Long> validClicks = countValidClicksPerBatch(filteredClicks);
        persistUserClicks(validClicks);
        persistNewBlacklistEntries(validClicks);

        JavaPairDStream<String, Long> adClickTotals = accumulateAdClickTotals(filteredClicks);
        persistAdClickTotals(adClickTotals);
        persistProvinceTopN(adClickTotals);

        persistAdClickTrend(filteredClicks);
    }

    /** Joins the given parts with {@link #KEY_SEPARATOR} to form an aggregation key. */
    private static String buildKey(String... parts) {
        StringBuilder key = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                key.append(KEY_SEPARATOR);
            }
            key.append(parts[i]);
        }
        return key.toString();
    }

    /** Reduce function that adds two counts. */
    private static final class SumLongs implements Function2<Long, Long, Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long call(Long v1, Long v2) throws Exception {
            return v1 + v2;
        }
    }

    /**
     * Removes every click whose userID is present in {@code blacklisttable}.
     *
     * <p>The blacklist is re-read from the database for each batch (inside {@code transformToPair})
     * because it can change at any time. Each batch RDD is keyed by userID, left-outer-joined
     * with the blacklist, and only records without a blacklist match are kept. The result keeps
     * the original Kafka (key, value) shape so later stages can parse the raw record.
     */
    private static JavaPairDStream<String, String> filterBlacklistedUsers(
            JavaPairDStream<String, String> adClickedStreaming) {
        return adClickedStreaming.transformToPair(
                new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
            @Override
            public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
                // The table only stores userIDs; pair each with 'true' so it can be joined.
                final List<Tuple2<String, Boolean>> blackListTuples =
                        new ArrayList<Tuple2<String, Boolean>>();
                JDBCWrapper.getJDBCInstance().doQuery("SELECT * FROM blacklisttable", null,
                        new ExecuteCallBack() {
                    @Override
                    public void resultCallBack(ResultSet result) throws Exception {
                        while (result.next()) {
                            blackListTuples.add(new Tuple2<String, Boolean>(result.getString(1), true));
                        }
                    }
                });

                JavaSparkContext jsc = new JavaSparkContext(rdd.context());
                JavaPairRDD<String, Boolean> blackListRDD = jsc.parallelizePairs(blackListTuples);

                // Re-key the batch by userID (third field) so it can be joined with the blacklist.
                JavaPairRDD<String, Tuple2<String, String>> clicksByUser = rdd.mapToPair(
                        new PairFunction<Tuple2<String, String>, String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, Tuple2<String, String>> call(Tuple2<String, String> t)
                            throws Exception {
                        String userID = t._2.split(FIELD_SEPARATOR)[2];
                        return new Tuple2<String, Tuple2<String, String>>(userID, t);
                    }
                });

                JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> joined =
                        clicksByUser.leftOuterJoin(blackListRDD);

                return joined.filter(
                        new Function<Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>>, Boolean>() {
                    @Override
                    public Boolean call(
                            Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> tuple)
                            throws Exception {
                        Optional<Boolean> blacklisted = tuple._2._2;
                        return !(blacklisted.isPresent() && blacklisted.get());
                    }
                }).mapToPair(
                        new PairFunction<Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>>, String, String>() {
                    @Override
                    public Tuple2<String, String> call(
                            Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> t)
                            throws Exception {
                        return t._2._1;
                    }
                });
            }
        });
    }

    /**
     * Counts identical click records within each batch and keeps only those seen at most once.
     *
     * <p>The key is {@code timestamp_ip_userID_adID_province_city}. A record repeated inside a
     * single batch is treated as an invalid click and dropped; a production system would use
     * a longer horizon (hour/day) or a trained model instead.
     */
    private static JavaPairDStream<String, Long> countValidClicksPerBatch(
            JavaPairDStream<String, String> filteredClicks) {
        JavaPairDStream<String, Long> ones = filteredClicks.mapToPair(
                new PairFunction<Tuple2<String, String>, String, Long>() {
            @Override
            public Tuple2<String, Long> call(Tuple2<String, String> t) throws Exception {
                String[] f = t._2.split(FIELD_SEPARATOR);
                return new Tuple2<String, Long>(buildKey(f[0], f[1], f[2], f[3], f[4], f[5]), 1L);
            }
        });

        JavaPairDStream<String, Long> clicksPerRecord = ones.reduceByKey(new SumLongs());

        return clicksPerRecord.filter(new Function<Tuple2<String, Long>, Boolean>() {
            @Override
            public Boolean call(Tuple2<String, Long> v1) throws Exception {
                return v1._2 <= 1;
            }
        });
    }

    /**
     * Writes the per-batch user clicks to table {@code adclicked}
     * (columns: timestamp, ip, userID, adID, province, city, clickedCount).
     *
     * <p>For every record the existing row is looked up by (timestamp, userID, adID); existing
     * rows get the batch count added, missing rows are inserted.
     */
    private static void persistUserClicks(JavaPairDStream<String, Long> validClicks) {
        validClicks.foreachRDD(new Function<JavaPairRDD<String, Long>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Long>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {
                        List<UserAdClicked> userAdClickedList = new ArrayList<UserAdClicked>();
                        while (partition.hasNext()) {
                            Tuple2<String, Long> record = partition.next();
                            // Fixed: the key is joined with "_", so it must be split with "_" (was "\t").
                            String[] splited = record._1.split(KEY_SEPARATOR);
                            UserAdClicked userClicked = new UserAdClicked();
                            userClicked.setTimestamp(splited[0]);
                            userClicked.setIp(splited[1]);
                            userClicked.setUserID(splited[2]);
                            userClicked.setAdID(splited[3]);
                            userClicked.setProvince(splited[4]);
                            userClicked.setCity(splited[5]);
                            userClicked.setClickedCount(record._2);
                            userAdClickedList.add(userClicked);
                        }

                        final List<UserAdClicked> inserting = new ArrayList<UserAdClicked>();
                        final List<UserAdClicked> updating = new ArrayList<UserAdClicked>();
                        JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();

                        for (final UserAdClicked clicked : userAdClickedList) {
                            jdbcWrapper.doQuery(
                                    "SELECT clickedCount FROM adclicked WHERE"
                                            + " timestamp =? AND userID = ? AND adID = ?",
                                    new Object[]{clicked.getTimestamp(), clicked.getUserID(), clicked.getAdID()},
                                    new ExecuteCallBack() {
                                @Override
                                public void resultCallBack(ResultSet result) throws Exception {
                                    if (result.next()) {
                                        // Accumulate on top of what is already stored.
                                        clicked.setClickedCount(result.getLong(1) + clicked.getClickedCount());
                                        updating.add(clicked);
                                    } else {
                                        inserting.add(clicked);
                                    }
                                }
                            });
                        }

                        List<Object[]> insertParametersList = new ArrayList<Object[]>();
                        for (UserAdClicked insertRecord : inserting) {
                            insertParametersList.add(new Object[] {
                                insertRecord.getTimestamp(),
                                insertRecord.getIp(),
                                insertRecord.getUserID(),
                                insertRecord.getAdID(),
                                insertRecord.getProvince(),
                                insertRecord.getCity(),
                                insertRecord.getClickedCount()
                            });
                        }
                        jdbcWrapper.doBatch("INSERT INTO adclicked VALUES(?, ?, ?, ?, ?, ?, ?)",
                                insertParametersList);

                        // Fixed: clickedCount is the first placeholder of the UPDATE, so it is bound first.
                        List<Object[]> updateParametersList = new ArrayList<Object[]>();
                        for (UserAdClicked updateRecord : updating) {
                            updateParametersList.add(new Object[] {
                                updateRecord.getClickedCount(),
                                updateRecord.getTimestamp(),
                                updateRecord.getIp(),
                                updateRecord.getUserID(),
                                updateRecord.getAdID(),
                                updateRecord.getProvince(),
                                updateRecord.getCity()
                            });
                        }
                        jdbcWrapper.doBatch(
                                "UPDATE adclicked SET clickedCount = ? WHERE"
                                        + " timestamp =? AND ip = ? AND userID = ? AND adID = ? "
                                        + "AND province = ? AND city = ?",
                                updateParametersList);
                    }
                });
                return null;
            }
        });
    }

    /**
     * Detects users whose stored click count for an ad exceeds
     * {@link #BLACKLIST_CLICK_THRESHOLD} and appends their userIDs to {@code blacklisttable}.
     *
     * <p>Fixed: the total used to be the hard-coded value 81, which blacklisted every user;
     * it is now read from {@code adclicked} for the record's (timestamp, userID, adID).
     * The userIDs are de-duplicated across the whole RDD before being written.
     */
    private static void persistNewBlacklistEntries(JavaPairDStream<String, Long> validClicks) {
        JavaPairDStream<String, Long> blackListBasedOnHistory = validClicks.filter(
                new Function<Tuple2<String, Long>, Boolean>() {
            @Override
            public Boolean call(Tuple2<String, Long> v1) throws Exception {
                // Key layout: timestamp, ip, userID, adID, province, city.
                String[] splited = v1._1.split(KEY_SEPARATOR);
                String date = splited[0];
                String userID = splited[2];
                String adID = splited[3];

                final long[] clickedCountTotalToday = new long[1];
                JDBCWrapper.getJDBCInstance().doQuery(
                        "SELECT SUM(clickedCount) FROM adclicked WHERE"
                                + " timestamp = ? AND userID = ? AND adID = ?",
                        new Object[]{date, userID, adID},
                        new ExecuteCallBack() {
                    @Override
                    public void resultCallBack(ResultSet result) throws Exception {
                        if (result.next()) {
                            // getLong yields 0 when SUM is NULL (no matching rows).
                            clickedCountTotalToday[0] = result.getLong(1);
                        }
                    }
                });
                return clickedCountTotalToday[0] > BLACKLIST_CLICK_THRESHOLD;
            }
        });

        JavaDStream<String> blackListUserIDs = blackListBasedOnHistory.map(
                new Function<Tuple2<String, Long>, String>() {
            @Override
            public String call(Tuple2<String, Long> v1) throws Exception {
                return v1._1.split(KEY_SEPARATOR)[2];
            }
        });

        // distinct() on the whole RDD removes duplicates both within and across partitions.
        JavaDStream<String> uniqueBlackListUserIDs = blackListUserIDs.transform(
                new Function<JavaRDD<String>, JavaRDD<String>>() {
            @Override
            public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
                return rdd.distinct();
            }
        });

        uniqueBlackListUserIDs.foreachRDD(new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> rdd) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
                    @Override
                    public void call(Iterator<String> t) throws Exception {
                        List<Object[]> blackList = new ArrayList<Object[]>();
                        while (t.hasNext()) {
                            blackList.add(new Object[]{t.next()});
                        }
                        JDBCWrapper.getJDBCInstance().doBatch(
                                "INSERT INTO blacklisttable values (?)", blackList);
                    }
                });
                return null;
            }
        });
    }

    /**
     * Maintains, via {@code updateStateByKey}, the cumulative click count per
     * {@code timestamp_adID_province_city} key across all batches.
     *
     * <p>Fixed: the key previously also contained ip and userID, which did not match the
     * (timestamp, adID, province, city) layout the consumers of this stream parse.
     */
    private static JavaPairDStream<String, Long> accumulateAdClickTotals(
            JavaPairDStream<String, String> filteredClicks) {
        return filteredClicks.mapToPair(new PairFunction<Tuple2<String, String>, String, Long>() {
            @Override
            public Tuple2<String, Long> call(Tuple2<String, String> t) throws Exception {
                String[] f = t._2.split(FIELD_SEPARATOR);
                // f[0]=timestamp, f[3]=adID, f[4]=province, f[5]=city
                return new Tuple2<String, Long>(buildKey(f[0], f[3], f[4], f[5]), 1L);
            }
        }).updateStateByKey(new Function2<List<Long>, Optional<Long>, Optional<Long>>() {
            @Override
            public Optional<Long> call(List<Long> v1, Optional<Long> v2) throws Exception {
                // v1: counts of this key in the current batch; v2: total accumulated so far.
                Long clickedTotalHistory = 0L;
                if (v2.isPresent()) {
                    clickedTotalHistory = v2.get();
                }
                for (Long one : v1) {
                    clickedTotalHistory += one;
                }
                return Optional.of(clickedTotalHistory);
            }
        });
    }

    /**
     * Writes the cumulative totals to table {@code adclickedcount}
     * (columns: timestamp, adID, province, city, clickedCount).
     *
     * <p>Fixed: the streaming state already holds the running total, so that value is what
     * gets inserted or written over an existing row (it used to be replaced by 1 or by the
     * stale database value).
     */
    private static void persistAdClickTotals(JavaPairDStream<String, Long> adClickTotals) {
        adClickTotals.foreachRDD(new Function<JavaPairRDD<String, Long>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Long>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {
                        List<AdClicked> adClickedList = new ArrayList<AdClicked>();
                        while (partition.hasNext()) {
                            Tuple2<String, Long> record = partition.next();
                            String[] splited = record._1.split(KEY_SEPARATOR);
                            AdClicked adClicked = new AdClicked();
                            adClicked.setTimestamp(splited[0]);
                            adClicked.setAdID(splited[1]);
                            adClicked.setProvince(splited[2]);
                            adClicked.setCity(splited[3]);
                            adClicked.setClickedCount(record._2);
                            adClickedList.add(adClicked);
                        }

                        final List<AdClicked> inserting = new ArrayList<AdClicked>();
                        final List<AdClicked> updating = new ArrayList<AdClicked>();
                        JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();

                        for (final AdClicked clicked : adClickedList) {
                            jdbcWrapper.doQuery(
                                    "SELECT clickedCount FROM adclickedcount WHERE"
                                            + " timestamp = ? AND adID = ? AND province = ? AND city = ?",
                                    new Object[]{clicked.getTimestamp(), clicked.getAdID(),
                                        clicked.getProvince(), clicked.getCity()},
                                    new ExecuteCallBack() {
                                @Override
                                public void resultCallBack(ResultSet result) throws Exception {
                                    if (result.next()) {
                                        updating.add(clicked);
                                    } else {
                                        inserting.add(clicked);
                                    }
                                }
                            });
                        }

                        List<Object[]> insertParametersList = new ArrayList<Object[]>();
                        for (AdClicked insertRecord : inserting) {
                            insertParametersList.add(new Object[] {
                                insertRecord.getTimestamp(),
                                insertRecord.getAdID(),
                                insertRecord.getProvince(),
                                insertRecord.getCity(),
                                insertRecord.getClickedCount()
                            });
                        }
                        jdbcWrapper.doBatch("INSERT INTO adclickedcount VALUES(?, ?, ?, ?, ?)",
                                insertParametersList);

                        List<Object[]> updateParametersList = new ArrayList<Object[]>();
                        for (AdClicked updateRecord : updating) {
                            updateParametersList.add(new Object[] {
                                updateRecord.getClickedCount(),
                                updateRecord.getTimestamp(),
                                updateRecord.getAdID(),
                                updateRecord.getProvince(),
                                updateRecord.getCity()
                            });
                        }
                        jdbcWrapper.doBatch(
                                "UPDATE adclickedcount SET clickedCount = ? WHERE"
                                        + " timestamp =? AND adID = ? AND province = ? AND city = ?",
                                updateParametersList);
                    }
                });
                return null;
            }
        });
    }

    /**
     * Computes the five most clicked ads per province with Spark SQL and replaces the
     * matching rows of table {@code adprovincetopn}
     * (columns: timestamp, adID, province, clickedCount).
     *
     * <p>{@code transform} is used because the ranking operates directly on each batch RDD.
     * NOTE(review): the window function partitions by province only, while rows are deleted
     * per (timestamp, province); confirm whether the ranking should also partition by timestamp.
     */
    private static void persistProvinceTopN(JavaPairDStream<String, Long> adClickTotals) {
        adClickTotals.transform(new Function<JavaPairRDD<String, Long>, JavaRDD<Row>>() {
            @Override
            public JavaRDD<Row> call(JavaPairRDD<String, Long> rdd) throws Exception {
                JavaRDD<Row> rowRDD = rdd.mapToPair(new PairFunction<Tuple2<String, Long>, String, Long>() {
                    @Override
                    public Tuple2<String, Long> call(Tuple2<String, Long> t) throws Exception {
                        // Incoming key: timestamp_adID_province_city; drop the city.
                        String[] splited = t._1.split(KEY_SEPARATOR);
                        return new Tuple2<String, Long>(
                                buildKey(splited[0], splited[1], splited[2]), t._2);
                    }
                }).reduceByKey(new SumLongs()).map(new Function<Tuple2<String, Long>, Row>() {
                    @Override
                    public Row call(Tuple2<String, Long> v1) throws Exception {
                        // Fixed: the reduced key has three fields, so adID/province are [1]/[2] (were [3]/[4]).
                        String[] splited = v1._1.split(KEY_SEPARATOR);
                        return RowFactory.create(splited[0], splited[1], splited[2], v1._2);
                    }
                });

                StructType structType = DataTypes.createStructType(Arrays.asList(
                        DataTypes.createStructField("timestamp", DataTypes.StringType, true),
                        DataTypes.createStructField("adID", DataTypes.StringType, true),
                        DataTypes.createStructField("province", DataTypes.StringType, true),
                        DataTypes.createStructField("clickedCount", DataTypes.LongType, true)
                ));

                HiveContext hiveContext = new HiveContext(rdd.context());
                DataFrame df = hiveContext.createDataFrame(rowRDD, structType);
                df.registerTempTable("topNTableSource");

                // Fixed: removed the stray comma before FROM and the "clickeCount" typo.
                DataFrame result = hiveContext.sql(
                        "SELECT timestamp, adID, province, clickedCount FROM"
                                + " (SELECT timestamp, adID, province, clickedCount, "
                                + "ROW_NUMBER() OVER(PARTITION BY province ORDER BY clickedCount DESC) rank "
                                + "FROM topNTableSource) subquery "
                                + "WHERE rank <= 5");
                return result.toJavaRDD();
            }
        }).foreachRDD(new Function<JavaRDD<Row>, Void>() {
            @Override
            public Void call(JavaRDD<Row> rdd) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<Row>>() {
                    @Override
                    public void call(Iterator<Row> t) throws Exception {
                        List<AdProvinceTopN> adProvinceTopN = new ArrayList<AdProvinceTopN>();
                        while (t.hasNext()) {
                            Row row = t.next();
                            AdProvinceTopN item = new AdProvinceTopN();
                            item.setTimestamp(row.getString(0));
                            item.setAdID(row.getString(1));
                            item.setProvince(row.getString(2));
                            item.setClickedCount(row.getLong(3));
                            adProvinceTopN.add(item);
                        }

                        JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();

                        // Delete the previous ranking once per distinct (timestamp, province) pair.
                        Set<String> timestampProvincePairs = new HashSet<String>();
                        for (AdProvinceTopN item : adProvinceTopN) {
                            timestampProvincePairs.add(buildKey(item.getTimestamp(), item.getProvince()));
                        }
                        List<Object[]> deleteParametersList = new ArrayList<Object[]>();
                        for (String deleteRecord : timestampProvincePairs) {
                            String[] splited = deleteRecord.split(KEY_SEPARATOR);
                            deleteParametersList.add(new Object[]{splited[0], splited[1]});
                        }
                        jdbcWrapper.doBatch(
                                "DELETE FROM adprovincetopn WHERE timestamp = ? AND province = ?",
                                deleteParametersList);

                        // Fixed: values are bound in table column order (clickedCount used to come first).
                        List<Object[]> insertParametersList = new ArrayList<Object[]>();
                        for (AdProvinceTopN insertRecord : adProvinceTopN) {
                            insertParametersList.add(new Object[] {
                                insertRecord.getTimestamp(),
                                insertRecord.getAdID(),
                                insertRecord.getProvince(),
                                insertRecord.getClickedCount()
                            });
                        }
                        jdbcWrapper.doBatch("INSERT INTO adprovincetopn VALUES (?, ?, ?, ?)",
                                insertParametersList);
                    }
                });
                return null;
            }
        });
    }

    /**
     * Computes the click trend per {@code time_adID} key over the last 30 minutes and stores
     * it in table {@code adclickedtrend} (columns: date, hour, minute, adID, clickedCount).
     *
     * <p>Fixed: the slide interval was 5 milliseconds, which is not a multiple of the
     * 10 second batch interval as windowed streams require; it is now 5 minutes.
     * TODO: derive date / hour / minute from the timestamp; the raw timestamp is stored in
     * all three columns for now.
     */
    private static void persistAdClickTrend(JavaPairDStream<String, String> filteredClicks) {
        filteredClicks.mapToPair(new PairFunction<Tuple2<String, String>, String, Long>() {
            @Override
            public Tuple2<String, Long> call(Tuple2<String, String> t) throws Exception {
                String[] splited = t._2.split(FIELD_SEPARATOR);
                String time = splited[0];
                String adID = splited[3];
                return new Tuple2<String, Long>(buildKey(time, adID), 1L);
            }
        }).reduceByKeyAndWindow(new SumLongs(), new Function2<Long, Long, Long>() {
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                // Inverse function: subtract the counts of batches leaving the window.
                return v1 - v2;
            }
        }, Durations.minutes(30), Durations.minutes(5)).foreachRDD(
                new Function<JavaPairRDD<String, Long>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Long>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {
                        List<AdTrendStat> adTrend = new ArrayList<AdTrendStat>();
                        while (partition.hasNext()) {
                            Tuple2<String, Long> record = partition.next();
                            String[] splited = record._1.split(KEY_SEPARATOR);
                            String time = splited[0];
                            String adID = splited[1];

                            AdTrendStat adTrendStat = new AdTrendStat();
                            adTrendStat.setAdID(adID);
                            adTrendStat.setClickedCount(record._2);
                            adTrendStat.set_date(time);   // TODO: extract year-month-day
                            adTrendStat.set_hour(time);   // TODO: extract hour
                            adTrendStat.set_minute(time); // TODO: extract minute
                            adTrend.add(adTrendStat);
                        }

                        final List<AdTrendStat> inserting = new ArrayList<AdTrendStat>();
                        final List<AdTrendStat> updating = new ArrayList<AdTrendStat>();
                        JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();

                        for (final AdTrendStat trend : adTrend) {
                            jdbcWrapper.doQuery(
                                    "SELECT clickedCount FROM adclickedtrend WHERE"
                                            + " date =? AND hour = ? AND minute = ? AND AdID = ?",
                                    new Object[]{trend.get_date(), trend.get_hour(),
                                        trend.get_minute(), trend.getAdID()},
                                    new ExecuteCallBack() {
                                @Override
                                public void resultCallBack(ResultSet result) throws Exception {
                                    if (result.next()) {
                                        updating.add(trend);
                                    } else {
                                        inserting.add(trend);
                                    }
                                }
                            });
                        }

                        List<Object[]> insertParametersList = new ArrayList<Object[]>();
                        for (AdTrendStat insertRecord : inserting) {
                            insertParametersList.add(new Object[] {
                                insertRecord.get_date(),
                                insertRecord.get_hour(),
                                insertRecord.get_minute(),
                                insertRecord.getAdID(),
                                insertRecord.getClickedCount()
                            });
                        }
                        jdbcWrapper.doBatch("INSERT INTO adclickedtrend VALUES(?, ?, ?, ?, ?)",
                                insertParametersList);

                        List<Object[]> updateParametersList = new ArrayList<Object[]>();
                        for (AdTrendStat updateRecord : updating) {
                            updateParametersList.add(new Object[] {
                                updateRecord.getClickedCount(),
                                updateRecord.get_date(),
                                updateRecord.get_hour(),
                                updateRecord.get_minute(),
                                updateRecord.getAdID()
                            });
                        }
                        jdbcWrapper.doBatch(
                                "UPDATE adclickedtrend SET clickedCount = ? WHERE"
                                        + " date =? AND hour = ? AND minute = ? AND AdID = ?",
                                updateParametersList);
                    }
                });
                return null;
            }
        });
    }
}

/**
 * Process-wide JDBC helper backed by a fixed pool of ten MySQL connections.
 *
 * <p>Connections are handed out from a blocking queue and returned after each call.
 * SQL errors are printed and swallowed (best effort), matching the original behaviour.
 */
class JDBCWrapper {

    /** volatile so the lazily created instance is safely published to other threads. */
    private static volatile JDBCWrapper jdbcInstance = null;
    private static LinkedBlockingQueue<Connection> dbConnectionPool = new LinkedBlockingQueue<Connection>();

    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /** Returns the shared instance, creating it (and its connections) on first use. */
    public static JDBCWrapper getJDBCInstance() {
        if (jdbcInstance == null) {
            synchronized (JDBCWrapper.class) {
                if (jdbcInstance == null) {
                    jdbcInstance = new JDBCWrapper();
                }
            }
        }
        return jdbcInstance;
    }

    private JDBCWrapper() {
        for (int i = 0; i < 10; i++) {
            try {
                Connection conn = DriverManager.getConnection(
                        "jdbc:mysql://Master:3306/sparkstreaming", "root", "root");
                dbConnectionPool.offer(conn);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Takes a connection from the pool, blocking until one is available.
     *
     * @return a pooled connection; callers inside this class return it after use
     * @throws IllegalStateException if the thread is interrupted while waiting
     */
    public Connection getConnection() {
        try {
            // take() blocks without the sleep-and-poll loop used previously.
            return dbConnectionPool.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a pooled JDBC connection", e);
        }
    }

    /**
     * Executes {@code sqlText} once per parameter array as a single transaction.
     *
     * @param sqlText    parameterised SQL statement
     * @param paramsList one array of positional parameters per execution
     * @return the per-statement update counts, or {@code null} if a SQL error occurred
     */
    public int[] doBatch(String sqlText, List<Object[]> paramsList) {
        Connection conn = getConnection();
        PreparedStatement preparedStatement = null;
        int[] result = null;
        try {
            conn.setAutoCommit(false);
            preparedStatement = conn.prepareStatement(sqlText);
            for (Object[] parameters : paramsList) {
                for (int i = 0; i < parameters.length; i++) {
                    preparedStatement.setObject(i + 1, parameters[i]);
                }
                preparedStatement.addBatch();
            }
            result = preparedStatement.executeBatch();
            conn.commit();
        } catch (SQLException e) {
            e.printStackTrace();
            try {
                conn.rollback();
            } catch (SQLException rollbackFailure) {
                rollbackFailure.printStackTrace();
            }
        } finally {
            closeQuietly(preparedStatement);
            try {
                // Restore auto-commit so later queries on this pooled connection do not run
                // inside a transaction that is never committed.
                conn.setAutoCommit(true);
            } catch (SQLException e) {
                e.printStackTrace();
            }
            dbConnectionPool.offer(conn);
        }
        return result;
    }

    /**
     * Runs a query and hands the open result set to {@code callback}.
     *
     * @param sqlText    parameterised SQL query
     * @param paramsList positional parameters; {@code null} means the query has none
     * @param callback   receives the result set; exceptions it throws are printed and swallowed
     */
    public void doQuery(String sqlText, Object[] paramsList, ExecuteCallBack callback) {
        Connection conn = getConnection();
        PreparedStatement preparedStatement = null;
        ResultSet result = null;
        try {
            preparedStatement = conn.prepareStatement(sqlText);
            // Fixed: a null parameter array (used for parameterless queries) caused an NPE.
            if (paramsList != null) {
                for (int i = 0; i < paramsList.length; i++) {
                    preparedStatement.setObject(i + 1, paramsList[i]);
                }
            }
            result = preparedStatement.executeQuery();
            try {
                callback.resultCallBack(result);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            closeQuietly(result);
            closeQuietly(preparedStatement);
            dbConnectionPool.offer(conn);
        }
    }

    /** Closes the statement if it is non-null, printing any SQL error. */
    private static void closeQuietly(PreparedStatement statement) {
        if (statement != null) {
            try {
                statement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    /** Closes the result set if it is non-null, printing any SQL error. */
    private static void closeQuietly(ResultSet resultSet) {
        if (resultSet != null) {
            try {
                resultSet.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

/** Callback that consumes the result set of {@link JDBCWrapper#doQuery}. */
interface ExecuteCallBack {
    void resultCallBack(ResultSet result) throws Exception;
}

/** One row of table {@code adclicked}: a user's clicks on an ad. */
class UserAdClicked {
    private String timestamp;
    private String ip;
    private String userID;
    private String adID;
    private String province;
    private String city;
    private Long clickedCount;

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getUserID() {
        return userID;
    }

    public void setUserID(String userID) {
        this.userID = userID;
    }

    public String getAdID() {
        return adID;
    }

    public void setAdID(String adID) {
        this.adID = adID;
    }

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public Long getClickedCount() {
        return clickedCount;
    }

    public void setClickedCount(Long clickedCount) {
        this.clickedCount = clickedCount;
    }
}

/** One row of table {@code adclickedcount}: cumulative clicks of an ad in a city. */
class AdClicked {
    private String timestamp;
    private String adID;
    private String province;
    private String city;
    private Long clickedCount;

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    public String getAdID() {
        return adID;
    }

    public void setAdID(String adID) {
        this.adID = adID;
    }

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public Long getClickedCount() {
        return clickedCount;
    }

    public void setClickedCount(Long clickedCount) {
        this.clickedCount = clickedCount;
    }
}

/** One row of table {@code adprovincetopn}: a ranked ad within a province. */
class AdProvinceTopN {
    private String timestamp;
    private String adID;
    private String province;
    private Long clickedCount;

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    public String getAdID() {
        return adID;
    }

    public void setAdID(String adID) {
        this.adID = adID;
    }

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public Long getClickedCount() {
        return clickedCount;
    }

    public void setClickedCount(Long clickedCount) {
        this.clickedCount = clickedCount;
    }
}

/** One row of table {@code adclickedtrend}: clicks of an ad at a point in time. */
class AdTrendStat {
    private String _date;
    private String _hour;
    private String _minute;
    private String adID;
    private Long clickedCount;

    public String get_date() {
        return _date;
    }

    public void set_date(String _date) {
        this._date = _date;
    }

    public String get_hour() {
        return _hour;
    }

    public void set_hour(String _hour) {
        this._hour = _hour;
    }

    public String get_minute() {
        return _minute;
    }

    public void set_minute(String _minute) {
        this._minute = _minute;
    }

    public String getAdID() {
        return adID;
    }

    public void setAdID(String adID) {
        this.adID = adID;
    }

    public Long getClickedCount() {
        return clickedCount;
    }

    public void setClickedCount(Long clickedCount) {
        this.clickedCount = clickedCount;
    }
}

/** Holder for a previously stored trend count. */
class AdTrendCountHistory {
    private Long clickedCountHistoryLong;

    public Long getClickedCountHistoryLong() {
        return clickedCountHistoryLong;
    }

    public void setClickedCountHistoryLong(Long clickedCountHistoryLong) {
        this.clickedCountHistoryLong = clickedCountHistoryLong;
    }
}
转载请注明原文地址: https://www.6miu.com/read-18742.html

最新回复(0)