When a continuous stream of structured or unstructured big data flows into a distributed computing platform, gets persisted on large-scale distributed storage, and can then be queried with near-real-time SQL, you have a system many teams can only dream of.
Today we will walk through exactly such a framework and the process of building it.
Assume you already have a data collection engine or tool (that part is outside the scope of this post; your favorite search engine will help). Anything goes, as long as the data arrives as a stream and can be pushed into a message system such as Kafka.
Whether the data is structured or unstructured is also not the main topic today, but one thing is required: the data must be normalizable by some rule, e.g. space-delimited fields, CSV, or JSON. In this post we use Apache web server access logs as the reference data.
Something like this:
124.67.32.161 - - [10/Apr/2016:05:37:36 +0800] "GET /blog/app_backend.html HTTP/1.1" 200 26450

Once we have the data, we need to do some processing to separate out the business fields and shape them into a two-dimensional table with clean rows and columns, just like a table in a relational database. Spark's DataFrame takes care of this.
And just as with a relational database, the DataFrame has to be written somewhere. Here that somewhere is a Parquet file, which natively carries a schema. Excellent.
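To make these two steps concrete before we get to the full code at the end of the post, here is a minimal sketch of the "raw line, then DataFrame, then Parquet" path. It assumes Spark 1.x (SQLContext, DataFrame), an existing JavaRDD<String> of raw log lines named rawLines, and the ApacheAccessLog parser class listed in full below; the class name ParquetSinkSketch and the output path are illustrative only.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

public class ParquetSinkSketch {
    // Turn raw Apache log lines into a Parquet dataset whose schema is inferred
    // from the ApacheAccessLog bean (shown in full at the end of this post).
    public static void writeLogs(SQLContext sqlContext, JavaRDD<String> rawLines) {
        // Parse each line into a plain Java bean; lines that do not match the
        // pattern throw a RuntimeException and should be filtered out in real code.
        JavaRDD<ApacheAccessLog> logs = rawLines.map(ApacheAccessLog::parseFromLogLine);

        // Spark infers the column names and types from the bean's getters.
        DataFrame df = sqlContext.createDataFrame(logs, ApacheAccessLog.class);

        // Append, so each batch adds new files to the same Parquet location.
        df.write().mode(SaveMode.Append).parquet("/user/spark/apachelog.parquet");
    }
}

The streaming version further down does the same thing per micro-batch inside foreachRDD, and additionally partitions the output by ipAddress, method and responseCode.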
Our data has now been written to Parquet as a "two-dimensional table", so naturally it should come back out as a table as well, ideally exposed over JDBC/SQL so that analysts and other users can work with it easily.
That job goes to the Spark Thrift Server.
The decomposition above looks neat, so let's have a look at the "design framework", or blueprint.
Rather than explain it at length: here is the diagram, see for yourself.
The Scala code is as follows:
import sqlContext.implicits._

val parquetFile = sqlContext.read.parquet("/user/spark/apachelog.parquet")
parquetFile.registerTempTable("logs")

Next, we clone the table in Hive. You will notice that the table's LOCATION still points to the original path. That is exactly the intent: files newly written by the streaming job remain visible through the Hive model.
I still have the feeling that this approach is not quite right. If any Hive expert can point me to a better way to accomplish this, please do.
CREATE EXTERNAL TABLE apachelog LIKE logs STORED AS PARQUET LOCATION '/user/spark/apachelog.parquet';

Of course, you also have to start the Thrift Server on the cluster. The Spark Thrift Server actually exposes a HiveServer2 endpoint, so it can be accessed with an ordinary HiveServer2 JDBC driver.
The SQL client used in this post is SQuirreL SQL; for the JDBC driver configuration, once again, your favorite search engine will get you there.
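If you prefer to sanity-check the table from code rather than from a GUI, here is a minimal JDBC sketch. It assumes the Thrift Server is listening on the default HiveServer2 port 10000 of a host called hadoop1 and that the Hive JDBC driver (org.apache.hive.jdbc.HiveDriver) is on the classpath; host, user and the query itself are only examples, not values taken from this post.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ThriftServerQuery {
    public static void main(String[] args) throws Exception {
        // Spark Thrift Server speaks the HiveServer2 protocol, so the Hive JDBC driver is used.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection("jdbc:hive2://hadoop1:10000/default", "spark", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SELECT responseCode, COUNT(*) AS hits FROM apachelog GROUP BY responseCode")) {
            while (rs.next()) {
                // Column 1: HTTP response code, column 2: number of requests
                System.out.println("HTTP " + rs.getInt(1) + ": " + rs.getLong(2) + " requests");
            }
        }
    }
}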
The end result looks simple, but getting there was quite a challenge.
That completes this example. The complete code is on GitHub.
Source: https://blog.sectong.com/blog/spark_to_parquet.html
AppMain.java
package com.sectong.spark_to_parquet;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

/**
 * Run the program with:
 * spark-submit --class "com.sectong.spark_to_parquet.AppMain" --master yarn
 *     target/park_to_parquet-0.0.1-SNAPSHOT.jar
 *     --kafka_broker hadoop1:6667,hadoop2:6667 --kafka_topic apache
 *     --parquet_file /user/spark/ --slide_interval 30
 */
public class AppMain {

    public static final String WINDOW_LENGTH = "window_length";
    public static final String SLIDE_INTERVAL = "slide_interval";
    public static final String KAFKA_BROKER = "kafka_broker";
    public static final String KAFKA_TOPIC = "kafka_topic";
    public static final String PARQUET_FILE = "parquet_file";

    private static final Options THE_OPTIONS = createOptions();

    private static Options createOptions() {
        Options options = new Options();
        options.addOption(new Option(WINDOW_LENGTH, true, "The window length in seconds")); // window size
        options.addOption(new Option(SLIDE_INTERVAL, true, "The slide interval in seconds")); // batch interval
        options.addOption(new Option(KAFKA_BROKER, true, "The kafka broker list")); // Kafka brokers
        options.addOption(new Option(KAFKA_TOPIC, true, "The kafka topic")); // topic
        options.addOption(new Option(PARQUET_FILE, true, "The parquet file")); // Parquet output location
        return options;
    }

    public static void main(String[] args) throws IOException {
        Flags.setFromCommandLineArgs(THE_OPTIONS, args);

        // Initialize Spark conf and contexts
        SparkConf conf = new SparkConf().setAppName("A SECTONG Application: Apache Log Analysis with Spark");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext jssc = new JavaStreamingContext(sc, Flags.getInstance().getSlideInterval());
        SQLContext sqlContext = new SQLContext(sc);

        // Initialize Kafka parameters
        HashSet<String> topicsSet = new HashSet<String>(
                Arrays.asList(Flags.getInstance().getKafka_topic().split(",")));
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", Flags.getInstance().getKafka_broker());

        // Pull data from the Kafka stream
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class,
                String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);

        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            private static final long serialVersionUID = 5266880065425088203L;

            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        // Map each line; lines that fail to parse are dropped
        JavaDStream<ApacheAccessLog> accessLogsDStream = lines.flatMap(line -> {
            List<ApacheAccessLog> list = new ArrayList<>();
            try {
                list.add(ApacheAccessLog.parseFromLogLine(line));
                return list;
            } catch (RuntimeException e) {
                return list;
            }
        }).cache();

        accessLogsDStream.foreachRDD(rdd -> {
            // RDD to DataFrame
            DataFrame df = sqlContext.createDataFrame(rdd, ApacheAccessLog.class);
            // Write to the Parquet file
            df.write().partitionBy("ipAddress", "method", "responseCode").mode(SaveMode.Append)
                    .parquet(Flags.getInstance().getParquetFile());
            return null;
        });

        // Start the streaming job
        jssc.start(); // start the computation
        jssc.awaitTermination(); // wait for termination
    }
}

ApacheAccessLog.java
package com.sectong.spark_to_parquet;

import java.io.Serializable;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Parses an Apache access log line.
 */
public class ApacheAccessLog implements Serializable {

    private static final long serialVersionUID = 6681372116317508248L;

    private String ipAddress;
    private String clientIdentd;
    private String userID;
    private String dateTimeString;
    private String method;
    private String endpoint;
    private String protocol;
    private int responseCode;
    private long contentSize;

    private ApacheAccessLog(String ipAddress, String clientIdentd, String userID, String dateTime, String method,
            String endpoint, String protocol, String responseCode, String contentSize) {
        this.ipAddress = ipAddress;
        this.clientIdentd = clientIdentd;
        this.userID = userID;
        this.dateTimeString = dateTime;
        this.method = method;
        this.endpoint = endpoint;
        this.protocol = protocol;
        this.responseCode = Integer.parseInt(responseCode);
        if (contentSize.equals("-")) {
            this.contentSize = 0;
        } else {
            this.contentSize = Long.parseLong(contentSize);
        }
    }

    public String getIpAddress() { return ipAddress; }
    public String getClientIdentd() { return clientIdentd; }
    public String getUserID() { return userID; }
    public String getDateTimeString() { return dateTimeString; }
    public String getMethod() { return method; }
    public String getEndpoint() { return endpoint; }
    public String getProtocol() { return protocol; }
    public int getResponseCode() { return responseCode; }
    public long getContentSize() { return contentSize; }

    public void setIpAddress(String ipAddress) { this.ipAddress = ipAddress; }
    public void setClientIdentd(String clientIdentd) { this.clientIdentd = clientIdentd; }
    public void setUserID(String userID) { this.userID = userID; }
    public void setDateTimeString(String dateTimeString) { this.dateTimeString = dateTimeString; }
    public void setMethod(String method) { this.method = method; }
    public void setEndpoint(String endpoint) { this.endpoint = endpoint; }
    public void setProtocol(String protocol) { this.protocol = protocol; }
    public void setResponseCode(int responseCode) { this.responseCode = responseCode; }
    public void setContentSize(long contentSize) { this.contentSize = contentSize; }

    // Example Apache log line:
    // 127.0.0.1 - - [21/Jul/2014:9:55:27 -0800] "GET /home.html HTTP/1.1" 200 2048
    private static final String LOG_ENTRY_PATTERN =
            // 1:IP 2:client 3:user 4:date time 5:method 6:req 7:proto 8:respcode 9:size
            "(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\S+)";

    private static final Pattern PATTERN = Pattern.compile(LOG_ENTRY_PATTERN);

    public static ApacheAccessLog parseFromLogLine(String logline) {
        Matcher m = PATTERN.matcher(logline);
        if (!m.find()) {
            // logger.log(Level.ALL, "Cannot parse logline" + logline);
            throw new RuntimeException("Error parsing logline");
        } else {
            return new ApacheAccessLog(m.group(1), m.group(2), m.group(3), m.group(4), m.group(5), m.group(6),
                    m.group(7), m.group(8), m.group(9));
        }
    }
}

Flags.java
package com.sectong.spark_to_parquet;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.spark.streaming.Duration;

public class Flags {

    private static Flags THE_INSTANCE = new Flags();

    private Duration windowLength;
    private Duration slideInterval;
    private String kafka_broker;
    private String kafka_topic;
    private String parquet_file;

    private boolean initialized = false;

    private Flags() {
    }

    public Duration getWindowLength() { return windowLength; }
    public Duration getSlideInterval() { return slideInterval; }
    public String getKafka_broker() { return kafka_broker; }
    public String getKafka_topic() { return kafka_topic; }
    public String getParquetFile() { return parquet_file; }

    public static Flags getInstance() {
        if (!THE_INSTANCE.initialized) {
            throw new RuntimeException("Flags have not been initialized");
        }
        return THE_INSTANCE;
    }

    public static void setFromCommandLineArgs(Options options, String[] args) {
        CommandLineParser parser = new PosixParser();
        try {
            CommandLine cl = parser.parse(options, args);
            // Default values for the options
            THE_INSTANCE.windowLength = new Duration(
                    Integer.parseInt(cl.getOptionValue(AppMain.WINDOW_LENGTH, "30")) * 1000);
            THE_INSTANCE.slideInterval = new Duration(
                    Integer.parseInt(cl.getOptionValue(AppMain.SLIDE_INTERVAL, "5")) * 1000);
            THE_INSTANCE.kafka_broker = cl.getOptionValue(AppMain.KAFKA_BROKER, "kafka:9092");
            THE_INSTANCE.kafka_topic = cl.getOptionValue(AppMain.KAFKA_TOPIC, "apache");
            THE_INSTANCE.parquet_file = cl.getOptionValue(AppMain.PARQUET_FILE, "/user/spark/");
            THE_INSTANCE.initialized = true;
        } catch (ParseException e) {
            THE_INSTANCE.initialized = false;
            System.err.println("Parsing failed. Reason: " + e.getMessage());
        }
    }
}