Druid in practice: batch-importing data from local files and from HDFS


程序员小新人学习 2018-06-25 11:13:26

This post shows how to batch-load data with the indexing service: how to write the task file and how to point it at local files or at HDFS. Much of this is not spelled out in the documentation, which makes the configuration difficult.

First set up and start the following nodes: coordinator, historical, overlord, and middleManager.

Prerequisites: MySQL (http://my.oschina.net/u/2460844/blog/637334 describes the MySQL setup), an HDFS cluster, and ZooKeeper (a single-node instance is enough).
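
If the MySQL metadata store is not prepared yet, a minimal sketch of creating it looks like the following. The host, database name, user, and password are taken from the _common configuration below (MySQL 5.x GRANT syntax); adjust them to your own environment:

# run on any host that can reach the MySQL server (assumed 10.70.27.12, matching the _common config)
mysql -h 10.70.27.12 -u root -p -e "
  CREATE DATABASE IF NOT EXISTS druid DEFAULT CHARACTER SET utf8;
  GRANT ALL PRIVILEGES ON druid.* TO 'fool'@'%' IDENTIFIED BY 'fool';
  FLUSH PRIVILEGES;"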

1. _common configuration:


druid.extensions.loadList=["mysql-metadata-storage","druid-hdfs-storage"]
druid.startup.logging.logProperties=true
druid.zk.service.host=10.70.27.8:2181,10.70.27.10:2181,10.70.27.12:2181
druid.zk.paths.base=/druid
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://10.70.27.12:3306/druid
druid.metadata.storage.connector.user=fool
druid.metadata.storage.connector.password=fool
druid.storage.type=hdfs
druid.storage.storageDirectory=hdfs://10.70.27.3:9000/data/druid/segments
druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=/data/druid/indexing-logs
druid.monitoring.monitors=["io.druid.java.util.metrics.JvmMonitor"]
druid.emitter=logging
druid.emitter.logging.logLevel=info
druid.indexing.doubleStorage=double

2. coordinator configuration:


druid.host=druid01
druid.port=8081
druid.service=coordinator
druid.coordinator.startDelay=PT5M

3. historical configuration:


druid.host=druid02
druid.port=8082
druid.service=druid/historical
druid.historical.cache.useCache=true
druid.historical.cache.populateCache=true
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=3
druid.server.http.numThreads=5
druid.server.maxSize=300000000000
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize": 300000000000}]
druid.monitoring.monitors=["io.druid.server.metrics.HistoricalMetricsMonitor", "com.metamx.metrics.JvmMonitor"]

4. overlord configuration:


druid.host=druid03
druid.port=8090
druid.service=overlord
druid.indexer.autoscale.doAutoscale=true
druid.indexer.autoscale.strategy=ec2
druid.indexer.autoscale.workerIdleTimeout=PT90m
druid.indexer.autoscale.terminatePeriod=PT5M
druid.indexer.autoscale.workerVersion=0
druid.indexer.logs.type=local
druid.indexer.logs.directory=/tmp/druid/indexlog
druid.indexer.runner.type=remote
druid.indexer.runner.minWorkerVersion=0
# Store all task state in the metadata storage
druid.indexer.storage.type=metadata
#druid.indexer.fork.property.druid.processing.numThreads=1
#druid.indexer.fork.property.druid.computation.buffer.size=100000000

5. middleManager configuration:


druid.host=druid04
druid.port=8091
druid.service=druid/middlemanager
druid.indexer.logs.type=local
druid.indexer.logs.directory=/tmp/druid/indexlog
druid.indexer.fork.property.druid.processing.numThreads=5
druid.indexer.fork.property.druid.computation.buffer.size=100000000
# Resources for peons
druid.indexer.runner.javaOpts=-server -Xmx3g
druid.indexer.task.baseTaskDir=/tmp/persistent/task/

6. Start each node. If a node fails to start, it is most likely a memory problem; adjust the JVM options as needed.
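
For reference, a rough sketch of the startup commands on a druid-0.8.x style layout; the exact config directory names, classpath, and heap sizes are assumptions and may differ in your installation:

# run in the Druid install directory on the corresponding host (druid01..druid04)
java -Xmx1g -cp "config/_common:config/coordinator:lib/*"   io.druid.cli.Main server coordinator
java -Xmx2g -cp "config/_common:config/historical:lib/*"    io.druid.cli.Main server historical
java -Xmx1g -cp "config/_common:config/overlord:lib/*"      io.druid.cli.Main server overlord
java -Xmx1g -cp "config/_common:config/middleManager:lib/*" io.druid.cli.Main server middleManager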

7. The data to import: wikipedia_data.csv and wikipedia_data.json.

--- wikipedia_data.json:


{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}{"timestamp": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330}{"timestamp": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111}{"timestamp": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900}{"timestamp": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "cancer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9}

--- wikipedia_data.csv:


2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Francisco, 57, 200, -143
2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Francisc, 57, 200, -143
2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Francis, 57, 200, -143
2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Franci, 57, 200, -143
2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Franc, 57, 200, -143
2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Fran, 57, 200, -143
2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Fra, 57, 200, -143
2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Fr, 57, 200, -143
2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San F, 57, 200, -143
2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, Sa , 57, 200, -143

Note: when importing from local disk, the data files must be placed on the middleManager node; otherwise the task will not be able to find them after it is submitted.

When importing from HDFS, simply put the files into the HDFS filesystem first. In this setup the overlord node is druid03 (substitute your own hostname or IP).

8. Importing local data into Druid

On any node that can reach druid03, create a JSON index task.

8.1 Importing a JSON file stored locally. The task JSON is shown below.

8.1.1 First save wikipedia_data.json into the Druid directory on the middleManager node (e.g. /root/druid-0.8.3).

vi wikipedia_index_local_json_task.json, with the following content:


{"type" : "index_hadoop","spec" : {"dataSchema" : {"dataSource" : "wikipedia","parser" : {"type" : "string","parseSpec" : {"format" : "json","timestampSpec" : {"column" : "timestamp","format" : "auto"},"dimensionsSpec" : {"dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],"dimensionExclusions" : [],"spatialDimensions" : []}}},"metricsSpec" : [{"type" : "count","name" : "count"},{"type" : "doubleSum","name" : "added","fieldName" : "added"},{"type" : "doubleSum","name" : "deleted","fieldName" : "deleted"},{"type" : "doubleSum","name" : "delta","fieldName" : "delta"}],"granularitySpec" : {"type" : "uniform","segmentGranularity" : "DAY","queryGranularity" : "NONE","intervals" : [ "2013-08-31/2013-09-01" ]}},"ioConfig": {"type": "index","firehose": {"type": "local","baseDir": "./","filter": "wikipedia_data.json"}},"tuningConfig": {"type": "index","targetPartitionSize": 0,"rowFlushBoundary": 0}}}

8.1.2 Submit the task. As noted above, the overlord node runs on druid03, so the task must be POSTed to druid03:

# curl -X 'POST' -H 'Content-Type:application/json' -d @wikipedia_index_local_json_task.json druid03:8090/druid/indexer/v1/task
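
The POST returns the task id as JSON. The task's status can also be polled from the overlord API; the task id below is a placeholder to be replaced with the id returned by the submission:

# query the overlord for the status of a submitted task
curl http://druid03:8090/druid/indexer/v1/task/<taskId>/status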

The overlord node's log also shows how the task is doing; output like the following indicates success:


2016-03-29T17:35:11,385 INFO [forking-task-runner-1] io.druid.indexing.overlord.ForkingTaskRunner - Logging task index_hadoop_NN_2016-03-29T17:35:11.510+08:00 output to: /tmp/persistent/task/index_hadoop_NN_2016-03-29T17:35:11.510+08:00/log
2016-03-29T17:42:15,263 INFO [forking-task-runner-1] io.druid.indexing.overlord.ForkingTaskRunner - Process exited with status[0] for task: index_hadoop_NN_2016-03-29T17:35:11.510+08:00
2016-03-29T17:42:15,265 INFO [forking-task-runner-1] io.druid.indexing.common.tasklogs.FileTaskLogs - Wrote task log to: /tmp/druid/indexlog/index_hadoop_NN_2016-03-29T17:35:11.510+08:00.log
2016-03-29T17:42:15,267 INFO [forking-task-runner-1] io.druid.indexing.overlord.ForkingTaskRunner - Removing task directory: /tmp/persistent/task/index_hadoop_NN_2016-03-29T17:35:11.510+08:00
2016-03-29T17:42:15,284 INFO [WorkerTaskMonitor-1] io.druid.indexing.worker.WorkerTaskMonitor - Job's finished. Completed [index_hadoop_NN_2016-03-29T17:35:11.510+08:00] with status [SUCCESS]
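
After the task succeeds, the segments are handed off to the historical node. One quick check is to ask the coordinator (druid01:8081 in this setup) which datasources it knows about; "wikipedia" should appear once the segments are loaded:

# list the datasources known to the coordinator
curl http://druid01:8081/druid/coordinator/v1/datasources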

8.2 A sample task file for importing CSV data from local disk.

wikipedia_data.csv must first be saved into the Druid directory on the middleManager node (e.g. /root/druid-0.8.3).

8.2.1 The task file wikipedia_index_local_csv_task.json has the following content:


{"type": "index","spec": {"dataSchema": {"dataSource": "wikipedia","parser": {"type": "string", "parseSpec":{"format" : "csv","timestampSpec" :{"column" : "timestamp"},"columns" : ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"],"dimensionsSpec" :{"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]}}},"metricsSpec": [{"type": "count","name": "count"},{"type": "doubleSum","name": "added","fieldName": "added"},{"type": "doubleSum","name": "deleted","fieldName": "deleted"},{"type": "doubleSum","name": "delta","fieldName": "delta"}],"granularitySpec": {"type": "uniform","segmentGranularity": "DAY","queryGranularity": "NONE","intervals": ["2013-08-31/2013-09-01"]}},"ioConfig": {"type": "index","firehose": {"type": "local","baseDir": "./","filter": "wikipedia_data.csv"}},"tuningConfig": {"type": "index","targetPartitionSize": 0,"rowFlushBoundary": 0}}}

8.2.2 Submit the task. As before, the overlord runs on druid03, so submit it there:

# curl -X 'POST' -H 'Content-Type:application/json' -d @wikipedia_index_local_csv_task.json druid03:8090/druid/indexer/v1/task
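
As before, progress can be followed from the overlord. For example, the running and completed task lists can be fetched directly from the overlord API:

# overview of task state on the overlord
curl http://druid03:8090/druid/indexer/v1/runningTasks
curl http://druid03:8090/druid/indexer/v1/completeTasks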

The following sections show how to import CSV and JSON files from HDFS.

9. Importing data from HDFS into Druid

9.1 Importing a JSON file from HDFS.

First put wikipedia_data.json into HDFS, note the directory, and reference that path in the task file. The HDFS path must include the namenode's hostname or IP.

Here vm1.cci stands in for the namenode address. Pay close attention to the differences from the local-import task files; they determine whether the import succeeds.
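
A sketch of staging the files, assuming the /tmp/druid/datasource directory used in the task specs below:

# copy the sample files into HDFS (destination directory taken from the task specs below)
hdfs dfs -mkdir -p /tmp/druid/datasource
hdfs dfs -put wikipedia_data.json /tmp/druid/datasource/
hdfs dfs -put wikipedia_data.csv  /tmp/druid/datasource/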

Then, exactly as with the local import, submit the task to the overlord.

The task.json file looks like this:


{"type" : "index_hadoop","spec" : {"dataSchema" : {"dataSource" : "wikipedia","parser" : {"type" : "string","parseSpec" : {"format" : "json","timestampSpec" : {"column" : "timestamp","format" : "auto"},"dimensionsSpec" : {"dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],"dimensionExclusions" : [],"spatialDimensions" : []}}},"metricsSpec" : [{"type" : "count","name" : "count"},{"type" : "doubleSum","name" : "added","fieldName" : "added"},{"type" : "doubleSum","name" : "deleted","fieldName" : "deleted"},{"type" : "doubleSum","name" : "delta","fieldName" : "delta"}],"granularitySpec" : {"type" : "uniform","segmentGranularity" : "DAY","queryGranularity" : "NONE","intervals" : [ "2013-08-31/2013-09-01" ]}},"ioConfig" : {"type" : "hadoop","inputSpec" : {"type" : "static","paths" : "hdfs://vm1.cci/tmp/druid/datasource/wikipedia_data.json"}},"tuningConfig" : {"type": "hadoop"}}}

9.2 Importing a CSV file from HDFS.

The task.json file looks like this:


{"type": "index","spec": {"dataSchema": {"dataSource": "wikipedia","parser": {"type": "string", "parseSpec":{"format" : "csv","timestampSpec" :{"column" : "timestamp"},"columns" : ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"],"dimensionsSpec" :{"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]}}},"metricsSpec": [{"type": "count","name": "count"},{"type": "doubleSum","name": "added","fieldName": "added"},{"type": "doubleSum","name": "deleted","fieldName": "deleted"},{"type": "doubleSum","name": "delta","fieldName": "delta"}],"granularitySpec": {"type": "uniform","segmentGranularity": "DAY","queryGranularity": "NONE","intervals": ["2013-08-31/2013-09-01"]}},"ioConfig" : {"type" : "hadoop","inputSpec" : {"type" : "static","paths" : "hdfs://vm1.cci/tmp/druid/datasource/wikipedia_data.csv"}},"tuningConfig" : {"type": "hadoop"}}}

Summary: druid.io has a huge number of configuration options, and an oversight anywhere can make a task fail.

Four examples are given here; the differences between them are worth studying in detail, and beginners will almost inevitably stumble over them.

Note: if the Hadoop version used by your Druid extensions differs from the Hadoop version on the target cluster, you must use the Hadoop version bundled with Druid, otherwise the Hadoop MapReduce job will not start. The bundled version is selected via the two settings shown below; the Druid build used here bundles Hadoop 2.7.3.

"tuningConfig" : {

"type" : "hadoop",

"partitionsSpec" : {

"type" : "hashed",

"targetPartitionSize" : 5000000

},

"jobProperties" : {

"mapreduce.job.classloader":"true"

}

}

},

"hadoopDependencyCoordinates": [

"org.apache.hadoop:hadoop-client:2.7.3"

]
