The configuration format in the official Flume documentation only works with Elasticsearch < 2.0. I'm running Flume 1.6 against ES 2.2, and that setup throws errors.
Solution:
Flume-NG Sink for Elasticsearch >= 2.0: https://github.com/lucidfrontier45/ElasticsearchSink2
Requirements: Flume-NG >= 1.6 Elasticsearch >= 2.0
JAR Build: build the standard jar with
$ ./gradlew build
or build a fat jar that bundles the Elasticsearch dependencies:
$ ./gradlew assembly
The jar is generated under build/libs.
Put the jar into Flume's lib directory, then remove guava-*.jar and jackson-core-*.jar from Flume's default lib dir; they are outdated, and Elasticsearch ships newer versions of both.
Also copy every jar from the Elasticsearch lib directory into Flume's lib directory.
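The jar shuffle above can be sketched as a small script. The install paths and the helper name `swap_jars` are my own placeholders, not anything from the sink's README; adjust them to your layout.

```shell
#!/bin/sh
# Sketch of the jar swap described above. The paths passed in are
# assumptions for your own Flume/Elasticsearch installs.
swap_jars() {
  flume_home=$1; es_home=$2; sink_jar_dir=$3

  # Drop Flume's bundled guava and jackson-core jars; they are outdated
  # and Elasticsearch ships newer versions that would otherwise conflict.
  rm -f "$flume_home"/lib/guava-*.jar "$flume_home"/lib/jackson-core-*.jar

  # Copy in the sink jar built by gradle, plus every jar from
  # Elasticsearch's own lib directory.
  cp "$sink_jar_dir"/*.jar "$flume_home"/lib/
  cp "$es_home"/lib/*.jar "$flume_home"/lib/
}

# Example invocation (paths are assumptions):
# swap_jars /opt/flume /opt/elasticsearch ./build/libs
```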
Now we can write the configuration file.
Create a new Flume .properties file:
vim flume-conf-kafka2es.properties
# ------------------- Data flow definition ----------------------
# Source name
flume2es_agent.sources = source_from_kafka
# Channel name; naming by channel type is recommended
flume2es_agent.channels = mem_channel
# Sink name; naming by destination is recommended
flume2es_agent.sinks = es_sink
#auto.commit.enable = true

#-------- KafkaSource configuration -----------------
# Source type
flume2es_agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flume2es_agent.sources.source_from_kafka.channels = mem_channel
flume2es_agent.sources.source_from_kafka.batchSize = 5000
# Kafka broker addresses
flume2es_agent.sources.source_from_kafka.kafka.bootstrap.servers = 192.168.2.xx:9092,192.168.2.xx:9092
# Kafka topic to consume
#flume2es_agent.sources.source_from_kafka.topic = itil_topic_4097
flume2es_agent.sources.source_from_kafka.kafka.topics = mtopic
# Kafka consumer group id
#flume2es_agent.sources.source_from_kafka.groupId = flume4097
flume2es_agent.sources.source_from_kafka.kafka.consumer.group.id = flumetest

#--------- ES Sink configuration ------------------
# Use the ElasticsearchSink2 class, not the built-in sink
#flume2es_agent.sinks.es_sink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
flume2es_agent.sinks.es_sink.type = com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink
# Channel the sink reads from (note: "channel", singular)
flume2es_agent.sinks.es_sink.channel = mem_channel
#flume2es_agent.sinks.es_sink.filePrefix = %{host}
flume2es_agent.sinks.es_sink.hostNames = cdh1,cdh2,cdh3,cdh4,cdh5,cdh6
flume2es_agent.sinks.es_sink.clusterName = elasticsearch
flume2es_agent.sinks.es_sink.indexName = items
flume2es_agent.sinks.es_sink.indexType = item
flume2es_agent.sinks.es_sink.serializer = com.frontier45.flume.sink.elasticsearch2.ElasticSearchDynamicSerializer
flume2es_agent.sinks.es_sink.indexNameBuilder = com.frontier45.flume.sink.elasticsearch2.TimeBasedIndexNameBuilder
# Number of events written to ES per batch
flume2es_agent.sinks.es_sink.batchSize = 500

#------- Memory channel configuration -------------------------
# Channel type
flume2es_agent.channels.mem_channel.type = memory
# Maximum number of events stored in the channel
flume2es_agent.channels.mem_channel.capacity = 100000
# Maximum number of events per transaction
flume2es_agent.channels.mem_channel.transactionCapacity = 10000

Run the agent:
>flume-ng agent -n flume2es_agent -f flume-conf-kafka2es.properties
Then open a console producer and send a few test messages:
>kafka-console-producer --broker-list cdh5:9092 --topic mtopic
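After sending a few messages, a quick query against Elasticsearch confirms the documents arrived. Since TimeBasedIndexNameBuilder appends a date suffix to indexName (e.g. items-yyyy-MM-dd), a wildcard pattern covers the daily indexes; the host below is one of this setup's ES nodes, and the default is only an assumption.

```shell
# Build a wildcard search URL for the date-suffixed "items-*" indexes.
# ES_HOST defaults to cdh1 here, which is an assumption from this cluster.
ES_HOST=${ES_HOST:-cdh1}
QUERY_URL="http://${ES_HOST}:9200/items-*/_search?q=*&size=5&pretty"
echo "$QUERY_URL"
# Then run it against the live cluster with:
#   curl -s "$QUERY_URL"
```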