spark读取kafka后写入redis

xiaoxiao2021-02-28  17

package com.prince.demo.test import com.typesafe.config.ConfigFactory import org.apache.kafka.common.serialization.StringDeserializer import org.apache.log4j.{Level, Logger} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe import org.apache.spark.streaming.kafka010.KafkaUtils import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.sql.SparkSession import redis.clients.jedis.Jedis /** * Created by prince on 2017/9/13. */ object SparkStreamingWriteRedis { Logger.getLogger("org").setLevel(Level.WARN) def main(args: Array[String]): Unit = { val spark = SparkSession.builder.appName("SparkStreamingWriteRedis").master("local[*]").getOrCreate() val sparkContext = spark.sparkContext val ssc = new StreamingContext(sparkContext, Seconds(1)) implicit val conf = ConfigFactory.load val kafkaParams = Map[String, Object]( "bootstrap.servers" -> conf.getString("kafka.brokers"), "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> conf.getString("kafka.group"), "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean)) val topic = conf.getString("kafka.topics") val topics = Array(topic) val stream = KafkaUtils .createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)) val input = stream.flatMap(line => { Some(line.value.toString) }) input.foreachRDD(rdd => { rdd.foreachPartition(part => { val jedis = new Jedis("192.168.1.97", 6379, 3000) jedis.auth("123456") part.foreach(x => { jedis.lpush("test_key", x) jedis.close() }) }) }) ssc.start() ssc.awaitTermination() } }
转载请注明原文地址: https://www.6miu.com/read-2650165.html

最新回复(0)