spark从入门到放弃五十:Spark Streaming(10)实时黑名单过滤

xiaoxiao2021-02-28  14

文章地址:http://www.haha174.top/article/details/254946 transform 操作,应用在DStream 上时,可以用于执行任意的RDD 到RDD 转换的操作。他可以用于实现,DStream api 所没有提供的操作。比如说,DStream api 中并没有提供一个DStream 中的每个batch ,与一个特定的RDD 进行join 操作。但是我们自己就可以使用transform 操作来实现该功能。 DStream.join() 只能join 其他的DStream ,在DStream 每个 batch 的RDD 计算出来之后,会去跟其他的DStream 的RDD 行join 下面给出java 示例和注释:

public class TransFormBlackList { public static void main(String[] args) throws InterruptedException { System.setProperty("HADOOP_USER_NAME", "root"); SparkConf conf=new SparkConf().setMaster("local[2]").setAppName("TransFormBlackList"); JavaSparkContext sc=new JavaSparkContext(conf); JavaStreamingContext jssc=new JavaStreamingContext(sc, Durations.seconds(5)); // 用户对我们的网站上面的广告可以进行点击 //点击之后,要进行实时的计费,点一下,算一次钱 //但是对于某些无良商家刷广告的人,那么我们就有一个黑名单 //只要是黑名单中点击的都过滤掉 // 日志格式 date username // 先做一份模拟的黑名单 List<Tuple2<String,Boolean>> blackListData=new ArrayList<>(); blackListData.add(new Tuple2<>("tom",true)); final JavaPairRDD<String,Boolean> blackListRDD=sc.parallelizePairs(blackListData); JavaReceiverInputDStream<String> adsClickLog=jssc.socketTextStream("www.codeguoj.cn",9999); // 首先需要转换一个格式(date,date username) //以便于后面 于黑名单RDD 进行join JavaPairDStream<String,String> userAdsClickDStream=adsClickLog.mapToPair(new PairFunction<String, String, String>() { @Override public Tuple2<String, String> call(String s) throws Exception { return new Tuple2<>(s.split(" ")[1],s); } }); //执行transform 操作实时进行黑名单过滤 JavaDStream<String> validAdfClickLogDStream= userAdsClickDStream.transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() { private static final long serialVersionUID=1L; @Override public JavaRDD<String> call(JavaPairRDD<String, String> v1) throws Exception { // 并不是每个用户都存在给名单中所以使用做外链接 JavaPairRDD<String, Tuple2<String, org.apache.spark.api.java.Optional<Boolean>>> joindRDD= v1.leftOuterJoin(blackListRDD); //这里得到的是用户的点击日志 和在黑名单的状态 JavaPairRDD<String, Tuple2<String, org.apache.spark.api.java.Optional<Boolean>>> filterRDD=joindRDD.filter(new Function<Tuple2<String, Tuple2<String, org.apache.spark.api.java.Optional<Boolean>>>, Boolean>() { private static final long serialVersionUID=1L; @Override public Boolean call(Tuple2<String, Tuple2<String, org.apache.spark.api.java.Optional<Boolean>>> v1) throws Exception { if(v1._2._2().isPresent()&&v1._2._2().get()){ return false; } return true; } }); JavaRDD<String> validAdfClickRDD=filterRDD.map(new Function<Tuple2<String, Tuple2<String, org.apache.spark.api.java.Optional<Boolean>>>, String>() { @Override public String call(Tuple2<String, Tuple2<String, org.apache.spark.api.java.Optional<Boolean>>> v1) throws Exception { return v1._2._1; } }); return validAdfClickRDD; } }); validAdfClickLogDStream.print(); jssc.start(); jssc.awaitTermination(); jssc.stop(); } }

下面给出scala 示例和注释

object TransFormBlackList { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("updateStateByKeyWorldCount").setMaster("local[2]") val jssc = new StreamingContext(conf, Durations.seconds(5)) //首先创建输入DStream 代表一个数据源 (比如kafka ,socket) 来持续不断的实时数据流 //调用JavaStreamingContext 的socketTextStream 方法可以创建一个数据源为socket 的网路端口的数据源 val blackList=Array(("tom",true)) val blackListRDD=jssc.sparkContext.parallelize(blackList,5); val lines = jssc.socketTextStream("www.codeguoj.cn", 9999) val userAdsClickLogDStream=lines.map(adfLog=>(adfLog.split(" ")(1),adfLog)) val validAdfClickRDD=userAdsClickLogDStream.transform(userAdsClickLogRDD=>{ val joinedRDD=userAdsClickLogRDD.leftOuterJoin(blackListRDD) val fliterRDD=joinedRDD.filter(tupe=>{ if(tupe._2._2.getOrElse(false)){ false }else{ true } }) val clickedLogRDD=fliterRDD.map(tupe=>tupe._2._1) clickedLogRDD }) validAdfClickRDD.print() jssc.start() jssc.awaitTermination() } }

欢迎关注,更多福利

转载请注明原文地址: https://www.6miu.com/read-2800033.html

最新回复(0)