前一段时间,在项目中,领导要求实时查看来自各个省份的ip访问的详情,根据这一需求,通过flume/logstack实时采集nginx的日志到生产到kafka,再通过Spark实时消费分析保存到Redis/MySQL中,最后前端通过百度的echart图实时的显示出来。 首先,得有一份ip归属地的规则表,可以本地的文档,也可以是分布式的在多台机器上的(如hdfs)。 ip规则表部分如下:
1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302 1.0.8.0|1.0.15.255|16779264|16781311|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178 1.0.32.0|1.0.63.255|16785408|16793599|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178 1.1.0.0|1.1.0.255|16842752|16843007|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302 1.1.2.0|1.1.7.255|16843264|16844799|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302 1.1.8.0|1.1.63.255|16844800|16859135|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178 1.2.0.0|1.2.1.255|16908288|16908799|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302 1.2.2.0|1.2.2.255|16908800|16909055|亚洲|中国|北京|北京|海淀|北龙中网|110108|China|CN|116.29812|39.95931 1.2.4.0|1.2.4.255|16909312|16909567|亚洲|中国|北京|北京||中国互联网信息中心|110100|China|CN|116.405285|39.904989 1.2.5.0|1.2.7.255|16909568|16910335|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302 1.2.8.0|1.2.8.255|16910336|16910591|亚洲|中国|北京|北京||中国互联网信息中心|110100|China|CN|116.405285|39.904989 1.2.9.0|1.2.127.255|16910592|16941055|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178 1.3.0.0|1.3.255.255|16973824|17039359|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178 1.4.1.0|1.4.3.255|17039616|17040383|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302 1234567891011121314 1234567891011121314本地模式
import java.sql.{Date, PreparedStatement, Connection, DriverManager} import org.apache.spark.{SparkContext, SparkConf} /** * 计算ip从属地 * Created by tianjun on 2017/2/13. */ object IpLocation { def ip2Long(ip:String):Long = { val fragments = ip.split("[.]") var ipNum = 0L for(i <- 0 until fragments.length){ ipNum=fragments(i).toLong | ipNum << 8L } ipNum } def binarySearch(lines:Array[(String,String,String)],ip:Long): Int ={ var low =0 var high = lines.length-1 while (low<=high){ val middle = (low + high)/2 if((ip>=lines(middle)._1.toLong)&&(ip<=lines(middle)._2.toLong)){ return middle } if(ip<lines(middle)._1.toLong){ high=middle-1 }else{ low = middle +1 } } -1 } val data2MySql = (iterator:Iterator[(String,Int)])=>{ var conn:Connection = null var ps: PreparedStatement = null val sql = "INSERT INTO location_info(location,counts,access_date) values(?,?,?)" try { conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=utf-8", "root", "123") iterator.foreach(line => { ps = conn.prepareStatement(sql) ps.setString(1, line._1) ps.setInt(2, line._2) ps.setDate(3, new Date(System.currentTimeMillis())) ps.executeUpdate() }) } catch { case e: Exception => e.printStackTrace() } finally { if (ps != null) ps.close() if (conn != null) conn.close() } } def main (args: Array[String]){ //windows上报错才加的,在linxu上不需要 System.setProperty("hadoop.home.dir","C:\\tianjun\\winutil\\") val conf = new SparkConf().setMaster("local").setAppName("IpLocation") val sc = new SparkContext(conf) //加载ip属地规则(可以从多台数据获取) val ipRuelsRdd = sc.textFile("c://ip.txt").map(line=>{ val fields = line.split("\\|") val start_num = fields(2) val end_num = fields(3) val province = fields(6) (start_num,end_num,province) }) //全部的ip映射规则 val ipRulesArray = ipRuelsRdd.collect() //广播规则 val ipRulesBroadcast = sc.broadcast(ipRulesArray) //加载处理的数据 val ipsRDD = sc.textFile("c://log").map(line=>{ val fields = line.split("\\|") fields(1) }) val result = ipsRDD.map(ip =>{ val ipNum = ip2Long(ip) val index = binarySearch(ipRulesBroadcast.value,ipNum) val info = ipRulesBroadcast.value(index) //(ip的起始num,ip的结束num,省份) info }) //累加各个省市的结果 .map(t => (t._3,1)).reduceByKey(_+_) result.foreachPartition(data2MySql) // println(result.collect().toBuffer) sc.stop() } } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114可以看到,利用spark的算子来进行数据分析是非常容易的。 在spark官网可以看到spark对接kafka,数据库,等,是十分容易的。 再来看看本例子中的写到数据库的结果:
+----+----------+--------+---------------------+ | id | location | counts | access_date | +----+----------+--------+---------------------+ | 7 | 陕西 | 1824 | 2017-02-13 00:00:00 | | 8 | 河北 | 383 | 2017-02-13 00:00:00 | | 9 | 云南 | 126 | 2017-02-13 00:00:00 | | 10 | 重庆 | 868 | 2017-02-13 00:00:00 | | 11 | 北京 | 1535 | 2017-02-13 00:00:00 | +----+----------+--------+---------------------+ 123456789 123456789在本次的测试中,只截取了nginx日志里面的4700条左右的日志,这个文件大小约为1.9M左右。
