Versions used: Flume apache-flume-1.7.0-bin.tar, Hadoop hadoop-2.7.3, Kafka kafka_2.11-0.10.2.1, ZooKeeper zookeeper-3.4.6.

I recently set up Flume and Kafka. There are plenty of introductions to each online, so here I integrate Flume NG with both Kafka and HDFS. Flume acts as the collection and transport layer: it lands a copy of the data in HDFS as a backup, while Kafka serves as the message broker feeding Spark Streaming. For example, when collecting a website's logs, Flume can monitor a log file (or a directory); whenever new log lines appear, Flume persists them to HDFS and also hands them to Kafka, where they can be distributed, processed, and consumed by real-time computation. For this walkthrough I prepared five servers; to make the layout easier to follow I drew a diagram. It is not pretty, but it gets the idea across.

Configuration files:

flume-kafka-hdfs-client.properties
# set agent name
agent.sources = r1
agent.channels = c_kafka c_hdfs
agent.sinks = s_kafka_k1 s_kafka_k2 s_kafka_k3 s_hdfs_k1 s_hdfs_k2

# set groups (the agent is named "agent", so the sink groups must be
# defined under "agent.", not "agent1." as in the original post)
agent.sinkgroups = g_kafka g_hdfs

# set sources
agent.sources.r1.channels = c_kafka c_hdfs
agent.sources.r1.type = exec
agent.sources.r1.command = tail -F /root/logs/a.txt
agent.sources.r1.inputCharset = UTF-8

# set kafka channel
agent.channels.c_kafka.type = memory
agent.channels.c_kafka.capacity = 1000
agent.channels.c_kafka.transactionCapacity = 100

# set hdfs channel
agent.channels.c_hdfs.type = memory
agent.channels.c_hdfs.capacity = 1000
agent.channels.c_hdfs.transactionCapacity = 100

# set kafka sink1
agent.sinks.s_kafka_k1.channel = c_kafka
agent.sinks.s_kafka_k1.type = avro
agent.sinks.s_kafka_k1.hostname = 192.168.183.103
agent.sinks.s_kafka_k1.port = 52021

# set kafka sink2
agent.sinks.s_kafka_k2.channel = c_kafka
agent.sinks.s_kafka_k2.type = avro
agent.sinks.s_kafka_k2.hostname = 192.168.183.104
agent.sinks.s_kafka_k2.port = 52021

# set kafka sink3
agent.sinks.s_kafka_k3.channel = c_kafka
agent.sinks.s_kafka_k3.type = avro
agent.sinks.s_kafka_k3.hostname = 192.168.183.105
agent.sinks.s_kafka_k3.port = 52021

# set hdfs sink1
agent.sinks.s_hdfs_k1.channel = c_hdfs
agent.sinks.s_hdfs_k1.type = avro
agent.sinks.s_hdfs_k1.hostname = 192.168.183.102
agent.sinks.s_hdfs_k1.port = 52020

# set hdfs sink2 (the original repeated the s_hdfs_k1 keys here, which
# would silently overwrite sink1; it must configure s_hdfs_k2)
agent.sinks.s_hdfs_k2.channel = c_hdfs
agent.sinks.s_hdfs_k2.type = avro
agent.sinks.s_hdfs_k2.hostname = 192.168.183.103
agent.sinks.s_hdfs_k2.port = 52020

# set sink groups
agent.sinkgroups.g_kafka.sinks = s_kafka_k1 s_kafka_k2 s_kafka_k3
agent.sinkgroups.g_hdfs.sinks = s_hdfs_k1 s_hdfs_k2

# failover for the kafka tier (higher priority wins, so s_kafka_k3 is primary)
agent.sinkgroups.g_kafka.processor.type = failover
agent.sinkgroups.g_kafka.processor.priority.s_kafka_k1 = 1
agent.sinkgroups.g_kafka.processor.priority.s_kafka_k2 = 10
agent.sinkgroups.g_kafka.processor.priority.s_kafka_k3 = 100
agent.sinkgroups.g_kafka.processor.maxpenalty = 10000

# failover for the hdfs tier (the priority key must name s_hdfs_k2;
# the original mistakenly wrote s_kafka_k2)
agent.sinkgroups.g_hdfs.processor.type = failover
agent.sinkgroups.g_hdfs.processor.priority.s_hdfs_k1 = 1
agent.sinkgroups.g_hdfs.processor.priority.s_hdfs_k2 = 10
agent.sinkgroups.g_hdfs.processor.maxpenalty = 10000
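One caveat about this client config: memory channels lose any buffered events if the agent crashes. If delivery guarantees matter more than throughput, a channel can be switched to a durable file channel instead. A minimal sketch for the HDFS-side channel, with hypothetical checkpoint and data directories:

# durable alternative to the memory channel (paths are examples)
agent.channels.c_hdfs.type = file
agent.channels.c_hdfs.checkpointDir = /root/flume/checkpoint/c_hdfs
agent.channels.c_hdfs.dataDirs = /root/flume/data/c_hdfs
agent.channels.c_hdfs.capacity = 100000
agent.channels.c_hdfs.transactionCapacity = 100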
flume-hdfs-server1.properties

#set Agent name
hdfs1.sources = r1
hdfs1.channels = c1
hdfs1.sinks = k1

#set channel
hdfs1.channels.c1.type = memory
hdfs1.channels.c1.capacity = 1000
hdfs1.channels.c1.transactionCapacity = 100

# set sources
hdfs1.sources.r1.type = avro
hdfs1.sources.r1.bind = 192.168.183.102
hdfs1.sources.r1.port = 52020
hdfs1.sources.r1.channels = c1

#set sink to hdfs
hdfs1.sinks.k1.type = hdfs
hdfs1.sinks.k1.hdfs.path = hdfs://192.168.183.101:9000/flume/logs/%Y/%m/%d
hdfs1.sinks.k1.hdfs.fileType = DataStream
hdfs1.sinks.k1.hdfs.writeFormat = TEXT
hdfs1.sinks.k1.custom.encoding = UTF-8
hdfs1.sinks.k1.channel = c1
hdfs1.sinks.k1.hdfs.filePrefix = %Y-%m-%d
hdfs1.sinks.k1.hdfs.fileSuffix = .txt
hdfs1.sinks.k1.hdfs.rollInterval = 60
hdfs1.sinks.k1.hdfs.rollSize = 1024
hdfs1.sinks.k1.hdfs.rollCount = 0
hdfs1.sinks.k1.hdfs.idleTimeout = 60
hdfs1.sinks.k1.hdfs.useLocalTimeStamp = true
flume-hdfs-server2.properties

#set Agent name
hdfs2.sources = r1
hdfs2.channels = c1
hdfs2.sinks = k1

#set channel
hdfs2.channels.c1.type = memory
hdfs2.channels.c1.capacity = 1000
hdfs2.channels.c1.transactionCapacity = 100

# set sources
hdfs2.sources.r1.type = avro
hdfs2.sources.r1.bind = 192.168.183.103
hdfs2.sources.r1.port = 52020
hdfs2.sources.r1.channels = c1

#set sink to hdfs
hdfs2.sinks.k1.type = hdfs
hdfs2.sinks.k1.hdfs.path = hdfs://192.168.183.101:9000/flume/logs/%Y/%m/%d
hdfs2.sinks.k1.hdfs.fileType = DataStream
hdfs2.sinks.k1.hdfs.writeFormat = TEXT
hdfs2.sinks.k1.custom.encoding = UTF-8
hdfs2.sinks.k1.channel = c1
hdfs2.sinks.k1.hdfs.filePrefix = %Y-%m-%d
hdfs2.sinks.k1.hdfs.fileSuffix = .txt
hdfs2.sinks.k1.hdfs.rollInterval = 60
hdfs2.sinks.k1.hdfs.rollSize = 1024
hdfs2.sinks.k1.hdfs.rollCount = 0
hdfs2.sinks.k1.hdfs.idleTimeout = 60
hdfs2.sinks.k1.hdfs.useLocalTimeStamp = true
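With rollInterval=60 and filePrefix=%Y-%m-%d, finished files land under a date-partitioned directory roughly once a minute (in-progress files carry a .tmp suffix). A quick way to inspect the output; the date below is just an example:

hdfs dfs -ls /flume/logs/2017/06/15
hdfs dfs -cat /flume/logs/2017/06/15/2017-06-15.*.txt | tail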
flume-kafka-server1.properties

#set kafka1 name
kafka1.sources = r1
kafka1.channels = c1
kafka1.sinks = k1

#set channel
kafka1.channels.c1.type = memory
kafka1.channels.c1.capacity = 10000
kafka1.channels.c1.transactionCapacity = 1000

# set sources
kafka1.sources.r1.type = avro
kafka1.sources.r1.bind = 192.168.183.103
kafka1.sources.r1.port = 52021
kafka1.sources.r1.channels = c1

# set sink to kafka
kafka1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# the 0.8-era producer properties below (metadata.broker.list, partition.key,
# partitioner.class, serializer.class, request.required.acks, max.message.size,
# producer.type, zkconnect) are kept from the original post, but the Flume 1.7
# Kafka sink ignores them; it only uses kafka.bootstrap.servers and kafka.topic
kafka1.sinks.k1.metadata.broker.list = node3:9092,node4:9092,node5:9092
kafka1.sinks.k1.kafka.bootstrap.servers = node3:9092,node4:9092,node5:9092
kafka1.sinks.k1.partition.key = 0
kafka1.sinks.k1.partitioner.class = org.apache.flume.plugins.SinglePartition
kafka1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
kafka1.sinks.k1.request.required.acks = 0
kafka1.sinks.k1.max.message.size = 1000000
kafka1.sinks.k1.producer.type = sync
kafka1.sinks.k1.custom.encoding = UTF-8
#kafka1.sinks.k1.custom.topic.name = test
kafka1.sinks.k1.kafka.topic = test
kafka1.sinks.k1.channel = c1
kafka1.sinks.k1.zkconnect = node1:2181,node2:2181,node3:2181,node4:2181,node5:2181
flume-kafka-server2.properties

#set kafka2 name
kafka2.sources = r1
kafka2.channels = c1
kafka2.sinks = k1

#set channel
kafka2.channels.c1.type = memory
kafka2.channels.c1.capacity = 10000
kafka2.channels.c1.transactionCapacity = 1000

# set sources
kafka2.sources.r1.type = avro
kafka2.sources.r1.bind = 192.168.183.104
kafka2.sources.r1.port = 52021
kafka2.sources.r1.channels = c1

# set sink to kafka (the 0.8-era properties below are ignored by the
# Flume 1.7 Kafka sink, as noted for kafka1 above)
kafka2.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
kafka2.sinks.k1.metadata.broker.list = node3:9092,node4:9092,node5:9092
kafka2.sinks.k1.kafka.bootstrap.servers = node3:9092,node4:9092,node5:9092
kafka2.sinks.k1.partition.key = 0
kafka2.sinks.k1.partitioner.class = org.apache.flume.plugins.SinglePartition
kafka2.sinks.k1.serializer.class = kafka.serializer.StringEncoder
kafka2.sinks.k1.request.required.acks = 0
kafka2.sinks.k1.max.message.size = 1000000
kafka2.sinks.k1.producer.type = sync
kafka2.sinks.k1.custom.encoding = UTF-8
#kafka2.sinks.k1.custom.topic.name = test
kafka2.sinks.k1.kafka.topic = test
kafka2.sinks.k1.channel = c1
kafka2.sinks.k1.zkconnect = node1:2181,node2:2181,node3:2181,node4:2181,node5:2181
flume-kafka-server3.properties

#set kafka3 name
kafka3.sources = r1
kafka3.channels = c1
kafka3.sinks = k1

#set channel
kafka3.channels.c1.type = memory
kafka3.channels.c1.capacity = 10000
kafka3.channels.c1.transactionCapacity = 1000

# set sources
kafka3.sources.r1.type = avro
kafka3.sources.r1.bind = 192.168.183.105
kafka3.sources.r1.port = 52021
kafka3.sources.r1.channels = c1

# set sink to kafka (the 0.8-era properties below are ignored by the
# Flume 1.7 Kafka sink, as noted for kafka1 above)
kafka3.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
kafka3.sinks.k1.metadata.broker.list = node3:9092,node4:9092,node5:9092
kafka3.sinks.k1.kafka.bootstrap.servers = node3:9092,node4:9092,node5:9092
kafka3.sinks.k1.partition.key = 0
kafka3.sinks.k1.partitioner.class = org.apache.flume.plugins.SinglePartition
kafka3.sinks.k1.serializer.class = kafka.serializer.StringEncoder
kafka3.sinks.k1.request.required.acks = 0
kafka3.sinks.k1.max.message.size = 1000000
kafka3.sinks.k1.producer.type = sync
kafka3.sinks.k1.custom.encoding = UTF-8
#kafka3.sinks.k1.custom.topic.name = test
kafka3.sinks.k1.kafka.topic = test
kafka3.sinks.k1.channel = c1
kafka3.sinks.k1.zkconnect = node1:2181,node2:2181,node3:2181,node4:2181,node5:2181

Startup order:
1. Start ZooKeeper.
2. Start HDFS.
3. Start Kafka.
4. Start the Flume server agents first.
5. Then start the Flume client agent.
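One step worth adding between 3 and 4: the sinks write to the test topic, so it should exist before the agents start (unless the brokers have topic auto-creation enabled). A sketch using the Kafka 0.10 CLI; the partition and replication counts are example values:

kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 \
  --replication-factor 2 --partitions 3 --topic test
kafka-topics.sh --describe --zookeeper node1:2181 --topic test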
Startup commands, server agents first:

1. On node2:

flume-ng agent --conf conf --conf-file /root/myInstall/flume-1.7.0-bin/properties/flume-hdfs-server1.properties --name hdfs1 -Dflume.root.logger=INFO,console > /root/myInstall/flume-1.7.0-bin/logs/flume-hdfs-server1.log 2>&1 &

2. On node3:

flume-ng agent --conf conf --conf-file /root/myInstall/flume-1.7.0-bin/properties/flume-hdfs-server2.properties --name hdfs2 -Dflume.root.logger=INFO,console > /root/myInstall/flume-1.7.0-bin/logs/flume-hdfs-server2.log 2>&1 &

flume-ng agent --conf conf --conf-file /root/myInstall/flume-1.7.0-bin/properties/flume-kafka-server1.properties --name kafka1 -Dflume.root.logger=INFO,console > /root/myInstall/flume-1.7.0-bin/logs/flume-kafka-server1.log 2>&1 &

3. On node4:

flume-ng agent --conf conf --conf-file /root/myInstall/flume-1.7.0-bin/properties/flume-kafka-server2.properties --name kafka2 -Dflume.root.logger=INFO,console > /root/myInstall/flume-1.7.0-bin/logs/flume-kafka-server2.log 2>&1 &

4. On node5:

flume-ng agent --conf conf --conf-file /root/myInstall/flume-1.7.0-bin/properties/flume-kafka-server3.properties --name kafka3 -Dflume.root.logger=INFO,console > /root/myInstall/flume-1.7.0-bin/logs/flume-kafka-server3.log 2>&1 &
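A quick sanity check after launching: each Flume agent runs as org.apache.flume.node.Application, and the redirected log files should show the Avro sources binding to their ports. For example, on node3:

jps -l | grep org.apache.flume.node.Application   # one entry per running agent
tail -n 50 /root/myInstall/flume-1.7.0-bin/logs/flume-kafka-server1.log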
Then the client agent:

1. On node1:

flume-ng agent --conf conf --conf-file /root/myInstall/flume-1.7.0-bin/properties/flume-kafka-hdfs-client.properties --name agent -Dflume.root.logger=INFO,console > /root/myInstall/flume-1.7.0-bin/logs/flume-kafka-hdfs-client.log 2>&1 &

2. On node2:

flume-ng agent --conf conf --conf-file /root/myInstall/flume-1.7.0-bin/properties/flume-kafka-hdfs-client.properties --name agent -Dflume.root.logger=INFO,console > /root/myInstall/flume-1.7.0-bin/logs/flume-kafka-hdfs-client.log 2>&1 &

That's it: the integration of Flume with HDFS and Kafka is complete! A quick end-to-end smoke test is sketched below.
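With everything running, the pipeline can be exercised end to end, assuming the tailed file, topic, and paths configured above:

# on node1: append a line to the file the exec source is tailing
echo "hello flume $(date +%s)" >> /root/logs/a.txt

# on any broker node: the line should arrive on the Kafka topic
kafka-console-consumer.sh --bootstrap-server node3:9092 --topic test --from-beginning

# and, after a roll (up to ~60 s), the same line should appear in HDFS
hdfs dfs -ls /flume/logs/$(date +%Y/%m/%d)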
