package
com.prince.demo.test
import
com.typesafe.config.ConfigFactory
import org
.apache.kafka.common.serialization.StringDeserializer
import org
.apache.log4j.{Level, Logger}
import org
.apache.spark.streaming.{Seconds, StreamingContext}
import org
.apache.spark.streaming.kafka010
.ConsumerStrategies.Subscribe
import org
.apache.spark.streaming.kafka010
.KafkaUtils
import org
.apache.spark.streaming.kafka010
.LocationStrategies.PreferConsistent
import org
.apache.spark.sql.SparkSession
import redis
.clients.jedis.Jedis
/**
 * Spark Streaming job that reads string records from Kafka and pushes every
 * record value onto the Redis list `test_key`.
 *
 * The Kafka brokers, consumer group and topic are read from the Typesafe
 * config keys `kafka.brokers`, `kafka.group` and `kafka.topics`.
 */
object SparkStreamingWriteRedis {
  // Only WARN and above for every logger under the "org" namespace.
  Logger.getLogger("org").setLevel(Level.WARN)

  /**
   * Builds a local Spark session, subscribes to the configured Kafka topic
   * and writes each received value to Redis until the streaming context
   * terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("SparkStreamingWriteRedis")
      .master("local[*]")
      .getOrCreate()
    val sparkContext = spark.sparkContext
    // Micro-batch interval of one second.
    val ssc = new StreamingContext(sparkContext, Seconds(1))

    val conf = ConfigFactory.load

    // NOTE(review): auto-commit is disabled and this job never commits
    // offsets itself, so after a restart consumption presumably resumes from
    // "latest" (auto.offset.reset) - confirm this is intended.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> conf.getString("kafka.brokers"),
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> conf.getString("kafka.group"),
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    // The whole value of `kafka.topics` is used as a single topic name.
    val topic = conf.getString("kafka.topics")
    val topics = Array(topic)

    val stream = KafkaUtils.createDirectStream(
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))

    // Keep only the record payload; key and Kafka metadata are dropped.
    val input = stream.map(record => record.value)

    input.foreachRDD { rdd =>
      rdd.foreachPartition { part =>
        // One connection per partition, created where the partition is
        // processed, and reused for every record in that partition.
        // NOTE(review): host, port, timeout and password are hard-coded;
        // consider moving them to configuration.
        val jedis = new Jedis("192.168.1.97", 6379, 3000)
        try {
          jedis.auth("123456")
          part.foreach(x => jedis.lpush("test_key", x))
        } finally {
          // Close exactly once per partition - also for empty partitions and
          // when auth/lpush throws - instead of after every single record.
          jedis.close()
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}