Spark Streaming 实时监控一个HDFS的文件夹,当新的文件进来(名字不能重复),将对新文件进行处理。

xiaoxiao2021-02-28  60

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming job that monitors an HDFS directory and word-counts every
 * NEW file that appears in it. File names must not repeat: `textFileStream`
 * only picks up files newly moved/created in the directory after the job
 * starts; files already present are ignored.
 *
 * Created by csw on 2017/7/4.
 */
object HDFSDemo {

  /**
   * Entry point.
   *
   * @param args optional overrides, preserving the original hard-coded defaults:
   *             args(0) = directory to monitor (default "hdfs://master:9000/csw/tmp2/test/"),
   *             args(1) = batch interval in seconds (default 10).
   */
  def main(args: Array[String]): Unit = {
    // Silence Spark's verbose INFO logging so the per-batch output stays readable.
    Logger.getLogger("org").setLevel(Level.WARN)

    // Generalized: take the monitored directory and batch interval from the
    // command line when provided; fall back to the original constants.
    val inputDir: String = if (args.length > 0) args(0) else "hdfs://master:9000/csw/tmp2/test/"
    val batchSeconds: Long = if (args.length > 1) args(1).toLong else 10L

    val config = new SparkConf().setAppName("Spark shell")
    val ssc = new StreamingContext(config, Seconds(batchSeconds))

    // Each batch's DStream contains the lines of files newly added to inputDir.
    val lines = ssc.textFileStream(inputDir)

    // Classic word count over each batch interval.
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordCounts: DStream[(String, Int)] = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    // Block the driver until the streaming job is stopped externally.
    ssc.awaitTermination()
  }
}

// Below: how to monitor a local Linux directory instead of HDFS (note the file:// scheme).

// Same API, but pointed at a local filesystem path — Spark Streaming will then
// watch this Linux directory for newly arriving files instead of an HDFS folder.
val lines = ssc.textFileStream("file:///csw/tmp/test2")

转载请注明原文地址: https://www.6miu.com/read-63172.html

最新回复(0)