Kafka是一个分布式的发布-订阅式的消息系统,可以作为 DStream 的高级数据源,本部分以单击统计为例介绍 Spark Streaming 程序从 kafka 中消费数据,包括两部分(基于 Kafka Receiver 方式,基于Kafka Direct方式)。
1.2 先学课程
Hadoop 入门进阶课程:https://www.shiyanlou.com/courses/237
Kafka 快速上手教程:https://www.shiyanlou.com/teacher/courses/785
Spark Streaming 是 Spark 的核心组件之一,为 Spark 提供了可拓展、高吞吐、容错的流计算能力。如下图所示,Spark Streaming 可整合多种输入数据源,如 Kafka、Flume、HDFS等,经处理后的数据可存储至文件系统、数据库,或显示在仪表盘里。
Spark Streaming 最主要的抽象是 DStream(Discretized Stream,离散化数据流),表示连续不断的数据流。在内部实现上,Spark Streaming 的输入数据按照时间片(如1秒)分成一段一段的 DStream,每一段数据转换为 Spark 中的 RDD,并且对 DStream 的操作都最终转变为对相应的 RDD 的操作。例如,下图展示了进行单词统计时,每个时间片的数据(存储句子的 RDD)经 flatMap 操作,生成了存储单词的 RDD。整个流式计算可根据业务的需求对这些中间的结果进一步处理,或者存储到外部设备中。
我们已经在实验楼环境里下载并配置启动 hadoop-2.6.1 所需的文件,免除您配置文件的麻烦,您可以在 /opt 找到,只需格式化并启动 hadoop 进程即可。
双击打开桌面上的 Xfce 终端,用 sudo 命令切换到 hadoop 用户,hadoop 用户密码为 hadoop,用 cd 命令进入 /opt目录。
$ su hadoop $ cd /opt/
在 /opt 目录下格式化 hadoop。
$ hadoop-2.6.1/bin/hdfs namenode -format
在 /opt 目录下启动 hadoop 进程。
$ hadoop-2.6.1/sbin/start-all.sh
用 jps 查看 hadoop 进程是否启动。
在 /opt 目录下,用 hadoop 用户通过 wget 命令下载,并用 tar 解压。
$ sudo wget http://labfile.oss.aliyuncs.com/courses/785/kafka_2.10-0.10.0.0.tgz $ sudo tar -zxf kafka_2.10-0.10.0.0.tgz
分别启动 zookeeper,kafka。
#权限不足,授权 $ sudo chmod 777 -R kafka_2.10-0.10.0.0 $ cd kafka_2.10-0.10.0.0 #启动zookeeper $ bin/zookeeper-server-start.sh config//zookeeper.properties & #启动kafka $ bin/kafka-server-start.sh config/server.properties &
用 jps命令查看进程。
用 kafka-topics.sh 脚本创建主题。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wc1
注意:本节课实验是沿用上节课的 scala IDE 环境,pom.xml 不需要修改,需要的 spark-streaming-kafka_2.10 jar 依赖已经添加在里面。
选中 cn.com.syl.spark 包 -> 用快捷键 Ctrl+N ->搜索 class -> 选中 java class -> Next
输入类名 -> Finish
KafkaReceiverSpark.java 代码如下:
package cn.com.syl.spark; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import scala.Tuple2; public class KafkaReceiverSpark { public static void main(String[] args) { SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName("KafkaReceiverSpark"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(6)); // 使用KafkaUtils.createStream()方法,创建 Kafka 的输入数据流 Map<String, Integer> topicThreadMap = new HashMap<String, Integer>(); topicThreadMap.put("wc1", 1); JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream( jsc, "localhost:2181", "DefaultConsumerGroup", topicThreadMap); // wordcount code JavaDStream<String> words = lines.flatMap( new FlatMapFunction<Tuple2<String,String>, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(Tuple2<String, String> tuple) throws Exception { return Arrays.asList(tuple._2.split(" ")); } }); JavaPairDStream<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey( new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordCounts.print(); jsc.start(); jsc.awaitTermination(); jsc.close(); } }启动 Spark Streaming。
打开 Xfce 终端启动 kafka Producer。
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wc1 #输入任意
快速切换到scala IDE Console 控制台,屏幕上会显示程序运行的相关信息,并会每隔6秒钟刷新一次信息,大量信息中会包含如下重要信息,默认只显示前十条:
同样地,您也可以再另外开启 consume 终端。
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic wc1
至此基于 Kafka Receiver 方式整哈Spark Streaming 顺利完成。实验结束后,要关闭各个终端,只要切换到该终端窗口,然后按键盘的 Ctrl+C 组合键,就可以结束程序运行。
关于 基于 Kafka Direct 方式,只需要新建一个类 KafkaDirectSpark,具体代码如下:
package cn.com.syl.spark; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import kafka.serializer.StringDecoder; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaPairInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import scala.Tuple2; public class KafkaDirectSpark{ public static void main(String[] args) { SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName("KafkaDirectSpark"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(6)); // 创建map,添加参数 Map<String, String> kafkaParams = new HashMap<String, String>(); kafkaParams.put("metadata.broker.list", "localhost:9092"); // 创建一个集合set,添加读取的topic Set<String> topics = new HashSet<String>(); topics.add("wc1"); // 创建输入DStream JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream( jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); // 单词统计 JavaDStream<String> words = lines.flatMap( new FlatMapFunction<Tuple2<String,String>, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(Tuple2<String, String> tuple) throws Exception { return Arrays.asList(tuple._2.split(" ")); } }); JavaPairDStream<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey( new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordCounts.print(); jssc.start(); jssc.awaitTermination(); jssc.close(); } }执行方式和上述基于 Kafka Receiver 方式一模一样,在此就不演示了,请您完成。
补充知识:
假定您是在 windows 平台写的代码 ,对于上面的代码,您完全可以用打 jar 包的方式运行,具体参考上节 Streaming 整合 Flume
本节课主要介绍了 Spark Streaming 与 Kafka 的整合的两种方式,并就 Windows 平台如何打 jar 包提交到远程服务器进行讲解,希望学完本节课,能帮助您理解 Spark Streaming,并能很快上手。