sparkStreaming+kafka+hbase实战练习一

xiaoxiao2021-02-28  95

               这个需求是是按照实际工作中的项目改写的一个例子。

               业务需求: 1. 实时统计某市银行流水交易总额

                                   2. 实时统计某市银行某地区的流水交易总额

                                   3. 每隔10s统一次每个地区近一年内的交易总额

                系统需求: 保证数据不丢失、系统可容错

              分析:这是一个实时流处理系统的常见应用,在网页点击分析中也可以看见类似的例子,kafka具有高可扩展性、容错性、高吞吐量、高并发、分布式等特性,常用于实时流中做消息传输,同时也起到了消息缓冲的作用,因此在这个例子中使用kafka对接外部系统做消息的传输。关于流式处理系统现在常用的有sparkStreaming、Storm,Storm是一个真正的流式处理系统,每次处理处理一条消息,SparkStreaming 看做一个伪流式处理系统,每次处理一个很小batch的数据,它基于SparkCore ,因此继承了sparkCore的很多特性,例如:任务执行优化、内存分布式计算、可容错、迭代式计算等,除了可类似于sparkCore RDD的算子外,还提供了状态函数与窗口函数,可对接kafka 、flume、文件、监听端口等,在我们这个例子中使用SparkStreaming 正好能满足我们的要求。Hbase是一个分布式的KV存储系统,能够快速读写数据,在实时流中Hbase用于存储源报文再好不过了。

     设计: 使用kafka对接外部系统,kafka客户端消息的传输有两种方式:sync与async,即同步方式与异步方式,同步方式表示没发送一条消息就提交一次,这种做法能够保证数据不丢失,异步方式是批量发送提交,当客户端的消息达到一定大小就将其提交给broker,这种做法性能高,为了保证数据的不丢失,采用同步方式提交,另外要求该topic的所有副本都收到消息才算成功;kafka与SparkStreaming对接提供了两种方式,一种是reciver,一种是direct,这两种方式都可以做到数据容错,但是reciver方式代价更大,需要预写日志、在内存序列化中保证两份(默认),而direct方式直接读取kafka中消息,使用消息offset手动提交,只有在处理成功的情况下才去读取下一条消息,因此在这里使用direct方式获取消息。我们要求系统能够7*24小时不间断的工作,那就是保证系统的容错性,即时系统出现故障,也能够即时恢复到最近的状态,sparkStreaming中提供了checkpoint 机制,可保存系统的元数据信息与数据信息,恢复时直接读取元数据信息与数据信息恢复到最近的状态。统计流水交易总额与某地区的流水交易总额使用updateStateByKey,每隔10s统一次每个地区近一年内的交易总额使用reduceByKeyAndWindow算子操作。对于将源始报文保存到hbase,直接使用hbase api。

   实现:

         启动服务:hdfs、yarn、hbase、kafka

         创建topic :  ./kafka-topics.sh  --zookeeper hadoop1:2181 --partitions 1 --replication-factor 1 --create --topic  topicName(只有一台服务器创建一个分区一个副本)

               涉及四个topic 1. 源报文:topic2  2. 交易总额:topic_all 

       创建hbase表:create 'trade','cf'

                 3 . 地区交易总额 :topic_regionsum 4. 每隔10s每个地区近一年内的交易总额 topic_regionsumOneYear

         

       创建maven java project项目,使用maven添加依赖:

            <dependencies>     <dependency>       <groupId>junit</groupId>       <artifactId>junit</artifactId>       <version>3.8.1</version>       <scope>test</scope>     </dependency>     <dependency>     <groupId>org.apache.spark</groupId>     <artifactId>spark-streaming_2.11</artifactId>     <version>2.2.0</version> </dependency> <dependency>     <groupId>org.apache.spark</groupId>     <artifactId>spark-sql_2.10</artifactId>     <version>2.2.0</version> </dependency>     <dependency>        <groupId>org.apache.spark</groupId>        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>        <version>2.2.0</version>     </dependency>     <dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>0.8.2.0</version> </dependency>    <dependency>     <groupId>org.apache.hbase</groupId>     <artifactId>hbase-client</artifactId>     <version>1.3.1</version>    </dependency>    <dependency>         <groupId>org.apache.hbase</groupId>         <artifactId>hbase-server</artifactId>         <version>1.3.1</version>     </dependency>     <dependency>       <groupId>jdk.tools</groupId>       <artifactId>jdk.tools</artifactId>       <version>1.6</version>     <scope>system</scope>        <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>       </dependency> <dependency>     <groupId>com.google.code.gson</groupId>     <artifactId>gson</artifactId>     <version>2.2.4</version> </dependency>   </dependencies>

        编码:

               public class Demo2 { private static String SPARK_MASTER="yarn";    //使用yarn作为资源框架 private static String SPARK_DEPLOY_MODE="client"; private static String SPARK_APP_NAME="sparkStreaming"; private static long SPARK_BATCH_SIZE_M=10000;  //批处理间隔10s private static long SPARK_WIN_LEN=120000;   //窗口大小 2min (方便演示) private static long SPARK_WIN_SLID=10000;    //窗口滑动间隔10s 

         //checkpoint的路径 private static String SPARK_CHECKPOINT_DIR="hdfs://hadoop1:9000/spark/streaming/checkpoint3"; private static String KAFKA_BOOTSTRAP_SERVERS="hadoop1:9092"; private static String KAFKA_KEY_DESERIALIZER="org.apache.kafka.common.serialization.StringDeserializer"; private static String KAFKA_VALUE_DESERIALIZER="org.apache.kafka.common.serialization.StringDeserializer"; private static String KAFKA_GROUP_ID="g1"; private static String KAFKA_AUTO_OFFSET_RESET="largest"; private static String KAFKA_ENABLE_AUTO_COMMIT="false"; private static String KAFKA_TOPIC="topic2"; private static Function0<JavaStreamingContext> createStreamingContext=new Function0<JavaStreamingContext>() { public JavaStreamingContext call() throws Exception { SparkConf conf=new SparkConf(); conf.set("spark.app.name", SPARK_APP_NAME);        conf.set("spark.master", SPARK_MASTER);        conf.set("spark.submit.deployMode", SPARK_DEPLOY_MODE);        final JavaSparkContext jsc=new JavaSparkContext(conf);        JavaStreamingContext jssc=new JavaStreamingContext(jsc,Duration.apply(SPARK_BATCH_SIZE_M));        //checkpoint        jssc.checkpoint(SPARK_CHECKPOINT_DIR);                //初始区域交易金额总和        List<Tuple2<String,Long>> initRegionSumData=         Arrays.asList(new Tuple2<String,Long>("A01", Long.valueOf(500000)),new Tuple2<String,Long>("A02", Long.valueOf(8000000)),         new Tuple2<String,Long>("A03", Long.valueOf(15000000)),new Tuple2<String,Long>("A04", Long.valueOf(6000000)));                //一定要为JavaPairRDD类型        JavaPairRDD<String,Long> initRegionSumRDD=jssc.sparkContext().parallelizePairs(initRegionSumData);       

             //初始交易总额        final long initAllSum=100000000;               List<Tuple2<String,Long>> initAllSumData=Arrays.asList(new Tuple2<String,Long>("all",initAllSum));                JavaPairRDD<String,Long> initAllSumRDD =jssc.sparkContext().parallelizePairs(initAllSumData);                Map<String,String> kafkaParams=new HashMap<String,String>();        kafkaParams.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);        kafkaParams.put("key.deserializer", KAFKA_KEY_DESERIALIZER);        kafkaParams.put("value.deserializer", KAFKA_VALUE_DESERIALIZER);        kafkaParams.put("group.id", KAFKA_GROUP_ID);        kafkaParams.put("auto.offset.reset", KAFKA_AUTO_OFFSET_RESET);        kafkaParams.put("enable.auto.commit", KAFKA_ENABLE_AUTO_COMMIT);        Set<String> topics=new HashSet<String>();        topics.add(KAFKA_TOPIC);               JavaPairInputDStream<String, String> kafkaS= KafkaUtils.createDirectStream(jssc, String.class,         String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);               //过虑key  从kafka中获取的消息 key对于我们说是没有用的      JavaDStream<String> jds= kafkaS.map(new Function<Tuple2<String,String>, String>() { public String call(Tuple2<String, String> v1) throws Exception { return v1._2(); }  });             //原始数据存入hbase中,源始报文使用tab制表符分割字段

           //格式  数据格式:交易流水号交易地区编码交易类型交易金额交易时间

           //例如: 00001A01T0120002017-08-01      jds.foreachRDD(new VoidFunction<JavaRDD<String>>() { public void call(JavaRDD<String> t) throws Exception {

//按照官网这里应该使用数据连接池,对每一个partition使用一个连接即可,后续优化 t.foreachPartition(new VoidFunction<Iterator<String>>() { public void call(Iterator<String> t) throws Exception {   List<Put> listPut=new ArrayList<Put>();    while(t.hasNext())    { String one=t.next(); byte[] f=Bytes.toBytes("cf"); String s[]=one.split(""); ImmutableBytesWritable key=new ImmutableBytesWritable(Bytes.toBytes(s[0])); Put p=new Put(Bytes.toBytes(s[0])); p.addColumn(f, Bytes.toBytes("tradeId"), Bytes.toBytes(s[0])); p.addColumn(f, Bytes.toBytes("tradeAreaId"), Bytes.toBytes(s[1])); p.addColumn(f, Bytes.toBytes("tradeType"), Bytes.toBytes(s[2])); p.addColumn(f, Bytes.toBytes("tradeMoney"), Bytes.toBytes(s[3])); p.addColumn(f, Bytes.toBytes("tradeTime"), Bytes.toBytes(s[4])); listPut.add(p);    }        Configuration conf=HBaseConfiguration.create();    conf.set("hbase.zookeeper.quorum", "hadoop1:2181");    HTable table=new HTable(conf, "trade");    table.put(listPut);                        table.flushCommits();                        table.close(); } }); } });           //返回<地区,交易金额>      JavaPairDStream<String, Long> jpds= jds.mapToPair(new PairFunction<String, String, Long>() { public Tuple2<String, Long> call(String t) throws Exception { String[] s=t.split(""); return new Tuple2(s[1],Long.valueOf(s[3])); } });            //多次使用  缓存起来      jpds.cache();             //装换为key相同的交易记录("all",金额)     JavaPairDStream<String, Long> jpdsMap= jpds.mapToPair(new PairFunction<Tuple2<String,Long>, String, Long>() { public Tuple2<String, Long> call(Tuple2<String, Long> t) throws Exception { // TODO Auto-generated method stub return new Tuple2("all",t._2()); } });           //计算总的交易金额,使用默认分区与初始值    JavaPairDStream<String,Long> allResult= jpdsMap.updateStateByKey(new Function2<List<Long>, Optional<Long>, Optional<Long>>() {                        //v1表示新进来的数据 v2表示当前的数据 public Optional<Long> call(List<Long> v1, Optional<Long> v2) throws Exception {                   Long news=0L; if(v2.isPresent()) { news=v2.get(); } for(Long i : v1) { news+=i; } return Optional.of(news); } },new HashPartitioner(jssc.sparkContext().defaultParallelism()), initAllSumRDD);            //打印    System.out.println("======总交易金额====");    allResult.print();        //发送到kafka中   sendMessageToKafka("topic_all",allResult);           //统计地区交易总和 使用initRegionSumRDD做为初始值   JavaPairDStream<String, Long> regionSum=jpds.updateStateByKey(new Function2<List<Long>, Optional<Long>, Optional<Long>>() { public Optional<Long> call(List<Long> v1, Optional<Long> v2) throws Exception { Long news=0L; if(v2.isPresent()) { news=v2.get(); } for(Long i : v1) { news+=i; } return Optional.of(news); } }, new HashPartitioner(jssc.sparkContext().defaultParallelism()), initRegionSumRDD);       //打印   System.out.println("======地区交易总金额====");   regionSum.print(1);  sendMessageToKafka("topic_regionsum",regionSum);          //统计各个地区两年内的交易总额 JavaPairDStream<String, Long> regionSumInTwoYears= jpds.reduceByKeyAndWindow(new Function2<Long, Long, Long>() { public Long call(Long v1, Long v2) throws Exception { // TODO Auto-generated method stub return v1+v2; } }, Duration.apply(SPARK_WIN_LEN), Duration.apply(SPARK_WIN_SLID));    System.out.println("======地区交易两年内总金额===="); regionSumInTwoYears.print();           sendMessageToKafka("topic_regionsumTwoYear",regionSumInTwoYears);   return jssc; } }; //将消息发送到kafka中   以json格式发送 private static void sendMessageToKafka(final String topic,JavaPairDStream<String, Long> ds) { ds.foreachRDD(new VoidFunction<JavaPairRDD<String,Long>>() { public void call(JavaPairRDD<String, Long> t) throws Exception { List<Tuple2<String, Long>> list= t.collect(); Map<String,String> message=new HashMap<String,String>();                 for(Tuple2<String, Long> tp: list)                 {                 message.put(tp._1(), String.valueOf(tp._2()));                 }                                  KafKaProducerUtil.sendMassage(topic,null,new Gson().toJson(message)); } }); } public static void main(String[] args) {         //这个做checkpoint 重启仍然能够回到最新一次的状态,还能保证数据不丢失  JavaStreamingContextjssc=JavaStreamingContext.getOrCreate(SPARK_CHECKPOINT_DIR,            createStreamingContext);  jssc.start();  try { jssc.awaitTermination(); } catch (InterruptedException e) { e.printStackTrace(); } } }

          

public class KafKaProducerUtil { private static String KAFKA_METADATA_BROKER_LIST="hadoop1:9092"; private static String KAFKA_KEY_SERIALIZER_CLASS="kafka.serializer.StringEncoder"; private static String KAFKA_SERIALIZER_CLASS="kafka.serializer.StringEncoder"; private static String KAFKA_REQUEST_REQUIRED_ACKS="1"; private static String KAFKA_COMPRESSION_CODEC="snappy"; private static String KAFKA_MESSAGE_SEND_MAX_RETRIES="2"; private static String KAFKA_RECONNECT_BACKOFF_MS="100"; private static String KAFKA_REQUEST_TIMEOUT_MS="10000"; private static String KAFKA_PRODUCER_TYPE="sync"; //同步处理 private static String KAFKA_QUEUE_BUFFERING_MAX_MS=""; //消息缓存最长时间 异步方式下配置 private static String KAFKA_QUEUE_BUFFERING_MAX_MESSAGES="";//缓存消息的最大值 private static String KAFKA_QUEUE_ENQUEUE_TIMEOUT_MS="";//消息进入队列的的等待时间 private static String KAFKA_BATCH_NUM_MESSAGES="";//批量消息条数 private static String KAFKA_SEND_BUFFER_BYTES="";//socket缓存大小 private static Producer<String, String> producer; private static Producer<String, String> getProducer() { if(producer==null) { synchronized (KafKaProducerUtil.class) { Properties pro=new Properties(); pro.setProperty("metadata.broker.list",KAFKA_METADATA_BROKER_LIST); pro.setProperty("request.required.acks", KAFKA_REQUEST_REQUIRED_ACKS); pro.setProperty("producer.type", KAFKA_PRODUCER_TYPE); pro.setProperty("serializer.class", KAFKA_SERIALIZER_CLASS); pro.setProperty("request.timeout.ms", KAFKA_REQUEST_TIMEOUT_MS); pro.setProperty("key.serializer.class", KAFKA_KEY_SERIALIZER_CLASS); pro.setProperty("compression.codec", KAFKA_COMPRESSION_CODEC); pro.setProperty("message.send.max.retries", KAFKA_MESSAGE_SEND_MAX_RETRIES); pro.setProperty("retry.backoff.ms", KAFKA_RECONNECT_BACKOFF_MS); ProducerConfig config=new ProducerConfig(pro); producer=new  Producer<String, String>(config); } } return producer; } public static void sendMassage(String topic,String key,String message) { Producer<String, String> producer=getProducer(); KeyedMessage<String, String> km=new KeyedMessage<String, String>(topic, key, message); producer.send(km); } public static void sendListMassage(String topic,Map<String,String> msg) { Producer<String, String> producer=getProducer(); List<KeyedMessage<String, String>> list=new ArrayList<KeyedMessage<String, String>>(); for(String k : msg.keySet()) {  KeyedMessage<String, String> km=new KeyedMessage<String, String>(topic, k, msg.get(k));  list.add(km); } producer.send(list); }

}

  编码完成之后打成jar包,上传到服务器。需要将hbase的相关jar包放到spark的jars目录下,另外需要kafka与sparkStreaming的集成jar包spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar,

 启动服务: sh spark-submit  --class sparkStreaming.sparkStreaming.Demo2 /bigdata/spark/javademo/sparkStreamingDemo3.jar 

生产者: ./kafka-console-producer.sh --broker-list hadoop1:9092 --topic topic2  (这里先用console)

消费者:./kafka-console-consumer.sh --zookeeper hadoop1:2181 --topic topic_all

              ./kafka-console-consumer.sh --zookeeper hadoop1:2181 --topic topic_regionsum

            ./kafka-console-consumer.sh --zookeeper hadoop1:2181 --topic topic_regionsumTwoYear

结果:

     [root@hadoop1 bin]# ./kafka-console-consumer.sh --zookeeper hadoop1:2181 --topic topic_all     {"all":"100000000"}

   [root@hadoop1 bin]#  ./kafka-console-consumer.sh --zookeeper hadoop1:2181 --topic topic_regionsum   {"A02":"8000000","A01":"500000","A04":"6000000","A03":"15000000"}

 [root@hadoop1 bin]#  ./kafka-console-consumer.sh --zookeeper hadoop1:2181 --topic topic_regionsumTwoYear  {}

当我们生产消息时:

  [root@hadoop1 bin]# ./kafka-console-producer.sh --broker-list hadoop1:9092 --topic topic2 [2017-08-05 01:53:17,575] WARN Property topic is not valid (kafka.utils.VerifiableProperties) 00001   A01     T01     2000    2017-08-01 12:00:00

[root@hadoop1 bin]# ./kafka-console-consumer.sh --zookeeper hadoop1:2181 --topic topic_all {"all":"100000000"} {"all":"100000000"} {"all":"100002000"} {"all":"100002000"}

[root@hadoop1 bin]#  ./kafka-console-consumer.sh --zookeeper hadoop1:2181 --topic topic_regionsum {"A02":"8000000","A01":"500000","A04":"6000000","A03":"15000000"} {"A02":"8000000","A01":"500000","A04":"6000000","A03":"15000000"} {"A02":"8000000","A01":"502000","A04":"6000000","A03":"15000000"} {"A02":"8000000","A01":"502000","A04":"6000000","A03":"15000000"}

[root@hadoop1 bin]#  ./kafka-console-consumer.sh --zookeeper hadoop1:2181 --topic topic_regionsumTwoYear {} {} {"A01":"2000"} {"A01":"2000"}

hbase 中记录:

  hbase(main):001:0> scan 'trade' ROW                                              COLUMN+CELL                                                                                                                                   00001                                           column=cf:tradeAreaId, timestamp=1501923210595, value=A01                                                                                     00001                                           column=cf:tradeId, timestamp=1501923210595, value=00001                                                                                       00001                                           column=cf:tradeMoney, timestamp=1501923210595, value=2000                                                                                     00001                                           column=cf:tradeTime, timestamp=1501923210595, value=2017-08-01 12:00:00                                                                       00001                                           column=cf:tradeType, timestamp=1501923210595, value=T01     

             

          

    到这里整个实例主要流程已经演示完毕,后续将继续完成kafka生产者部分与消费者部分,研究在sparkStreaming中直接使用SQL方式将数据插入到hbase,研究如果在对原始报文需要关联查询,使用hbase还是使用spark的广播变量。

    对于以上内容欢迎大脚拍砖~~~~~~~~~~~~~

转载请注明原文地址: https://www.6miu.com/read-22031.html

最新回复(0)