日志采集框架Flume是一个分布式(distributed)、高可靠(reliable)和高可用的(available )海量日志(efficiently )采集、聚合和传输的系统,Flume可以采集文件,socket数据包等各种形式源数据,又可以将采集到的数据输出到HDFS、hbase、hive、kafka等众多外部存储系统中。
Flume分布式系统中最核心的角色是agent,flume采集系统就是由一个个agent所连接起来形成
每一个agent相当于一个数据传递员,内部有三个组件
Source: Source是数据的收集端,负责将数据捕获后进行特殊的格式化,将数据封装到事件(event)里,然后将事件推入Channel中。 常用Source类型 HTTP Source: 读取syslog 数据,产生Event,支持UDP和TCP两种协议 Spooling Directory Source: 监控指定目录内数据变更Channel: 主要提供一个队列的功能,对source提供中的数据进行简单的缓存。 Channel是中转Event的一个临时存储,保存由Source组件传递过来的Event。(Channel连接Source和Sink的组件,可以将它看做一个数据的缓冲区(数据队列),它可以将事件暂存到内存中也可以持久化到本地磁盘上, 直到Sink处理完该事件。介绍两个较为常用的Channel, MemoryChannel和FileChannel(MemoryChannel可以实现高速的吞吐, 但是无法保证数据完整性;MemoryRecoverChannel在官方文档的建议上已经建义使用FileChannel来替换。 常用channel 类型 Memory Channel Event数据存储在内存中 JDBC Channel Event数据存储在持久化存储中,当前flume channel内置支持Derby File Channel Event 数数据存储在磁盘文件中Sink: 取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。可靠性
Channels提供了Flume可靠性保障,默认Channels的保障模式为Memory Channel,MemoryChannel就是内存,将所有的数据存放在里面,但是它本身存在缺陷,如果断电数据将会丢失。那怎么解决这个问题呢? Channels还有另外一种模式,就是基于磁盘的Channels,基于磁盘的队列确保当出现断电时数据不丢失,但是在这里我们必须明确Memory的性能是比磁盘高的。
Agent和Channel之间的数据传输是事务性的,就是在传输数据的过程中如果出现了故障,失败的数据会回滚和重试,不会丢失。事务就是保证我们的源到目标整体是完整的,要么一起成功,要么一起失败。
相同的任务可以配置多个Agent。比如,两个agent完成一个数据采集作业,如果一个agent失败,则上游的agent会失败切换到另一个。
下载解压: http://flume.apache.org/FlumeUserGuide.html 从官网下载 apache-flume-1.6.0-bin.tar.gz, 解压 tar -zxvf apache-flume-1.6.0-bin.tar.gz
进入flume的目录,修改conf下的flume-env.sh,在里面配置JAVA_HOME,复制一份 flume-env.sh.template 并改名 flume-env.sh vi flume-env.sh 加上一句 export JAVA_HOME=/usr/local/jdk7/(你自己的安装位置)
修改flume-conf配置文件,conf目录下修改flume-conf.properties.template文件,复制并改名为 flume-conf.properties
采集方案配置:从网络端口接收数据,下沉到logger。
# example.conf: A single-node Flume configuration # Name the components on this agent #给那三个组件取个名字 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source #类型, 从网络端口接收数据,在本机启动, 所以localhost, type=spoolDir采集目录源,目录里有就采 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory #下沉的时候是一批一批的, 下沉的时候是一个个eventChannel参数解释: #capacity:默认该通道中最大的可以存储的event数量 #trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1在flume的安装目录运行如下命令: bin/flume-ng agent –conf conf –conf-file conf/netcat-logger.conf –name a1 -Dflume.root.logger=INFO,console 注意: -Dflume a1 可以看做是flume服务的名称 netcat-logger.conf 配置文件
监听文件夹配置
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source #监听目录,spoolDir指定目录, fileHeader要不要给文件夹前坠名 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /home/hadoop/flumespool a1.sources.r1.fileHeader = true # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1启动命令: bin/flume-ng agent – conf conf/ – conf-file conf/netcat-logger.conf –name a1 –Dflume.root.logger=INFO,console
http://flume.apache.org/FlumeUserGuide.html,
需求:把A服务器上的日志采集到B服务器上(web服务器上的日志采集到hadoop集群服务器上)
需要在:需要在web服务器上搭建一个agent,在hadoop集群上搭建一个agent
模拟测试: 参数如下
web服务器: source:exec , sink:avro(夸节点sink) , channel memo
hadoop集群服务器:source:avro , sink:logger , channel memo
flume-web.properties 配置文件:
exec_memmory_avro.sources = exec_source exec_memmory_avro.sinks = avro_sink exec_memmory_avro.channels = memory_channel exec_memmory_avro.sources.exec_source.type = exec exec_memmory_avro.sources.exec_source.command = tail -F /hadoopData/exe.log exec_memmory_avro.sinks.avro_sink.type = avro exec_memmory_avro.sinks.avro_sink.hostname = 192.168.126.32 exec_memmory_avro.sinks.avro_sink.port = 3033 exec_memmory_avro.channels.memory_channel.type = memory exec_memmory_avro.channels.memory_channel.capacity = 1000 exec_memmory_avro.channels.memory_channel.transactionCapacity = 100 exec_memmory_avro.sources.exec_source.channels = memory_channel exec_memmory_avro.sinks.avro_sink.channel = memory_channelflume-hadoop-cluster.properties配置文件:
avro_memory_logger.sources = avro_source avro_memory_logger.sinks = logger_sink avro_memory_logger.channels = memory_chanel avro_memory_logger.sources.avro_source.type = avro avro_memory_logger.sources.avro_source.bind =192.168.126.31 avro_memory_logger.sources.avro_source.port = 3033 avro_memory_logger.sinks.logger_sink.type = logger avro_memory_logger.channels.memory_chanel.type = memory avro_memory_logger.channels.memory_chanel.capacity = 1000 avro_memory_logger.channels.memory_chanel.transactionCapacity = 100 avro_memory_logger.sources.avro_source.channels = memory_chanel avro_memory_logger.sinks.logger_sink.channel = memory_chanel