Scala处理文件

xiaoxiao2021-02-28  70

import java.io.File
import java.util.concurrent.ConcurrentHashMap

import com.alibaba.fastjson.JSONObject

import scala.collection.{JavaConversions, mutable}
import scala.io.Source
import scala.reflect.io.{Directory, Path}
import scala.util.matching.Regex

/**
 * Collects partition-leader information from `*.log` files found in [[dir]].
 *
 * Every log file is expected to contain a timestamp line followed by lines of
 * the form `Topic: <name> Partition: <n> Leader: <id> ...`. For each
 * `<name>-<n>` key the leader id seen in a file is stored in [[cmaps]] under
 * that file's timestamp.
 *
 * @param dir directory that is scanned for log files; defaults to the
 *            location the original script used
 */
class KafkaAnalysis(val dir: String = "D:\\securedownload") {

  /** Matches file names ending in `.log`. */
  val fileregex: Regex = ".*\\.log$".r

  /** Result map: `"<topic>-<partition>"` -> JSON object of `timestamp -> leader id`. */
  val cmaps: ConcurrentHashMap[String, JSONObject] = new ConcurrentHashMap()

  // Captures topic, partition and leader. The partition group is `\d+` so that
  // multi-digit partitions (10, 11, ...) are not truncated to their first digit.
  private val lineRegex: Regex =
    ".*Topic:\\s*(\\S+)\\s*Partition:\\s*(\\d+).*Leader:\\s*(\\d+)\\s*.*".r

  // Captures a `yyyy-M-d H:m:s` timestamp that is followed by more text.
  // Any four-digit year is accepted instead of only 2017.
  private val timeRegex: Regex =
    "\\s*(\\d{4}-\\d+-\\d+\\s\\d+:\\d+:\\d+)\\s+.*".r

  /**
   * Lists the entries of [[dir]] that are regular files and whose name
   * matches [[fileregex]].
   *
   * @return an iterator over the matching files
   */
  def listFiles(): Iterator[File] = {
    val root = Directory(Path.jfile2path(new File(dir)))
    root
      .walkFilter(path => path.isFile && fileregex.findFirstIn(path.name).isDefined)
      .map(_.jfile)
  }

  /**
   * Parses every log file in parallel and merges the results into [[cmaps]].
   * Each file is closed as soon as it has been processed.
   */
  def compact(): Unit =
    listFiles().toList.par.foreach { file =>
      val source = Source.fromFile(file)
      // Close the underlying reader even if parsing throws.
      try fetchParams(source.getLines())
      finally source.close()
    }

  /**
   * Reads the lines of one log file and records the leader of every
   * topic-partition under the file's timestamp.
   *
   * If several timestamp lines are present the last one wins; if none is
   * present nothing is recorded. If a topic-partition occurs more than once,
   * the last leader seen wins.
   *
   * @param iter the lines of a single log file
   */
  def fetchParams(iter: Iterator[String]): Unit = {
    // Local accumulation only; nothing escapes until the whole file is read.
    val leaders = new mutable.HashMap[String, String]()
    var time: Option[String] = None

    iter.foreach { line =>
      lineRegex.findFirstMatchIn(line) match {
        case Some(m) =>
          leaders.put(s"${m.group(1)}-${m.group(2)}", m.group(3))
        case None =>
          timeRegex.findFirstMatchIn(line).foreach(m => time = Some(m.group(1)))
      }
    }

    time.foreach { ts =>
      leaders.foreach { case (key, leader) => record(key, ts, leader) }
    }
  }

  /** Stores `leader` under `time` in the JSON object registered for `key`. */
  private def record(key: String, time: String, leader: String): Unit = {
    val fresh = new JSONObject()
    // putIfAbsent is atomic: exactly one JSONObject per key is ever published,
    // even when several files are processed concurrently.
    val json = Option(cmaps.putIfAbsent(key, fresh)).getOrElse(fresh)
    // The JSON object is shared between worker threads, so guard the mutation.
    json.synchronized {
      json.put(time, leader)
      ()
    }
  }
}

object KafkaAnalysis {

  /**
   * Runs the analysis and prints, for every topic-partition (keys in
   * descending order), how many timestamps each leader id was recorded for.
   *
   * @param args optional: `args(0)` overrides the directory to scan
   */
  def main(args: Array[String]): Unit = {
    val leo = args.headOption.fold(new KafkaAnalysis())(d => new KafkaAnalysis(d))
    leo.compact()

    JavaConversions.mapAsScalaMap(leo.cmaps).toList.sortWith(_._1 > _._1).foreach {
      case (k, v) =>
        print(k + " ")
        JavaConversions.asScalaIterator(v.values().iterator()).toList
          .groupBy(identity)
          .foreach { case (gk, gc) =>
            print(s"$gk:")
            print(gc.size)
            print(" ")
          }
        println()
    }
  }
}

文件数据示例(程序按行匹配:原始日志中时间戳行和每条 Topic/Partition 记录应各占一行,此处的换行在转载时已丢失):

2017-06-01 22:00:01 checkpoint the topics information Topic:A PartitionCount:20 ReplicationFactor:3 Configs:retention.ms=604800000 Topic: A Partition: 0 Leader: 11250 Replicas: 11250,11251,11252 Isr: 11251,11250,11252 Topic: A Partition: 1 Leader: 11251 Replicas: 11251,11252,11254 Isr: 11251,11252,11254 Topic: A Partition: 2 Leader: 11252 Replicas: 11252,11254,11255 Isr: 11255,11252,11254 Topic: A Partition: 3 Leader: 11254 Replicas: 11254,11255,11256 Isr: 11255,11256,11254 Topic: A Partition: 4 Leader: 11255 Replicas: 11255,11256,11257 Isr: 11257,11255,11256 Topic: A Partition: 5 Leader: 11256 Replicas: 11256,11257,11258 Isr: 11256,11258,11257 Topic: A Partition: 6 Leader: 11257 Replicas: 11257,11258,11247 Isr: 11257,11258,11247 Topic: A Partition: 7 Leader: 11258 Replicas: 11258,11247,11249 Isr: 11258,11247,11249 Topic: A Partition: 8 Leader: 11247 Replicas: 11247,11249,11250 Isr: 11250,11247,11249 Topic: A Partition: 9 Leader: 11249 Replicas: 11249,11250,11251 Isr: 11251,11250,11249 Topic: A Partition: 10 Leader: 11250 Replicas: 11250,11252,11254 Isr: 11250,11252,11254 Topic: A Partition: 11 Leader: 11251 Replicas: 11251,11254,11255 Isr: 11251,11255,11254 Topic: A Partition: 12 Leader: 11252 Replicas: 11252,11255,11256 Isr: 11255,11256,11252 Topic: A Partition: 13 Leader: 11254 Replicas: 11254,11256,11257 Isr: 11257,11256,11254 Topic: A Partition: 14 Leader: 11255 Replicas: 11255,11257,11258 Isr: 11257,11255,11258 Topic: A Partition: 15 Leader: 11256 Replicas: 11256,11258,11247 Isr: 11256,11258,11247 Topic: A Partition: 16 Leader: 11257 Replicas: 11257,11247,11249 Isr: 11257,11247,11249 Topic: A Partition: 17 Leader: 11258 Replicas: 11258,11249,11250 Isr: 11250,11258,11249 Topic: A Partition: 18 Leader: 11247 Replicas: 11247,11250,11251 Isr: 11251,11250,11247 Topic: A Partition: 19 Leader: 11249 Replicas: 11249,11251,11252 Isr: 11251,11252,11249 Topic:B PartitionCount:20 ReplicationFactor:3 Configs:retention.ms=604800000 Topic: B Partition: 0 Leader: 11249 
Replicas: 11249,11247,11250 Isr: 11250,11247,11249 Topic: B Partition: 1 Leader: 11250 Replicas: 11250,11249,11251 Isr: 11251,11250,11249 Topic: B Partition: 2 Leader: 11251 Replicas: 11251,11250,11252 Isr: 11251,11250,11252 Topic: B Partition: 3 Leader: 11252 Replicas: 11252,11251,11254 Isr: 11251,11252,11254 Topic: B Partition: 4 Leader: 11254 Replicas: 11254,11252,11255 Isr: 11255,11252,11254 Topic: B Partition: 5 Leader: 11255 Replicas: 11255,11254,11256 Isr: 11255,11256,11254 Topic: B Partition: 6 Leader: 11256 Replicas: 11256,11255,11257 Isr: 11255,11256,11257 Topic: B Partition: 7 Leader: 11257 Replicas: 11257,11256,11258 Isr: 11257,11256,11258 Topic: B Partition: 8 Leader: 11258 Replicas: 11258,11257,11247 Isr: 11257,11258,11247 Topic: B Partition: 9 Leader: 11247 Replicas: 11247,11258,11249 Isr: 11258,11247,11249 Topic: B Partition: 10 Leader: 11249 Replicas: 11249,11250,11251 Isr: 11251,11250,11249 Topic: B Partition: 11 Leader: 11250 Replicas: 11250,11251,11252 Isr: 11251,11250,11252 Topic: B Partition: 12 Leader: 11251 Replicas: 11251,11252,11254 Isr: 11251,11252,11254 Topic: B Partition: 13 Leader: 11252 Replicas: 11252,11254,11255 Isr: 11255,11252,11254 Topic: B Partition: 14 Leader: 11254 Replicas: 11254,11255,11256 Isr: 11255,11256,11254 Topic: B Partition: 15 Leader: 11255 Replicas: 11255,11256,11257 Isr: 11257,11255,11256 Topic: B Partition: 16 Leader: 11256 Replicas: 11256,11257,11258 Isr: 11256,11258,11257 Topic: B Partition: 17 Leader: 11257 Replicas: 11257,11258,11247 Isr: 11257,11258,11247 Topic: B Partition: 18 Leader: 11258 Replicas: 11258,11247,11249 Isr: 11258,11247,11249 Topic: B Partition: 19 Leader: 11247 Replicas: 11247,11249,11250 Isr: 11250,11247,11249

转载请注明原文地址: https://www.6miu.com/read-34945.html

最新回复(0)