在kafka中创建一个topic –test2
./kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 1 --partitions 3 --topic test2使用shell产生数据
./kafka-console-producer.sh --broker-list hadoop1:9092 --topic test2pom.xml
<properties> <maven.compiler.source>1.7</maven.compiler.source> <maven.compiler.target>1.7</maven.compiler.target> <encoding>UTF-8</encoding> <scala.version>2.10.6</scala.version> <scala.compat.version>2.10</scala.compat.version> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.2</version> </dependency> </dependencies>WordCount.scala
import org.apache.spark.{HashPartitioner, SparkConf} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} object WordCount { val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => { iterator.flatMap{case(x,y,z)=> Some(y.sum + z.getOrElse(0)).map(n=>(x, n))} } def main(args: Array[String]) { //接收命令行中的参数 val Array(zkQuorum, groupId, topics, numThreads, hdfs) = Array("hadoop1:2181", "streaming", "test2", "3", "file:///C:\\Users\\XT\\Desktop\\test") //创建SparkConf并设置AppName val conf = new SparkConf().setAppName("UrlCount") //创建StreamingContext val ssc = new StreamingContext(conf, Seconds(2)) //设置检查点 ssc.checkpoint(hdfs) //设置topic信息 val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap //从Kafka中拉取数据创建DStream val lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2) val wc = lines.flatMap(_.split(" ")).map((_, 1)) val result = wc.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true) //将结果打印到控制台 result.print() ssc.start() ssc.awaitTermination() } }