Flume-1.7.0的安装和配置

xiaoxiao2021-02-28  59

Flume是什么

  Flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。但随着 FLume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了 Flume-728,对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,重构后的版本统称为 Flume NG(next generation);改动的另一原因是将 Flume 纳入 apache 旗下,cloudera Flume 改名为 Apache Flume。


Flume的特点

  Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。   Flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。


Flume的可靠性

  当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送),Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送),Besteffort(数据发送到接收方后,不会进行确认)。


Flume的可恢复性:

  还是靠Channel。推荐使用FileChannel,事件持久化在本地文件系统里(性能较差)。   Flume的一些核心概念: Agent:使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。 Client:生产数据,运行在一个独立的线程。 Source:从Client收集数据,传递给Channel。 Sink:从Channel收集数据,运行在一个独立线程。 Channel:连接 sources 和 sinks ,这个有点像一个队列。 Events:可以是日志记录、 avro 对象等。

准备

1.搭建好的Hadoop分布式集群2.apache-flume-1.7.0-bin.tar.gz

安装

#tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /usr/local/hadoop #cd /usr/local/hadoop #mv apache-flume-1.7.0-bin flume-1.7.0

配置

1.修改flume-env.sh #cd /usr/local/hadoop/flume-1.7.0/conf #mv flume-env.sh.template flume-env.sh #vim flume-env.sh

添加Java环境变量

export JAVA_HOME=/usr/local/jvm/jdk1.8.0_144 2.配置flume环境变量 #vim /etc/profile

添加如下记录

FLUME_HOME=/usr/local/hadoop/flume-1.7.0 export PATH=$FLUME_HOME/bin:$PATH export FLUME_CONF_DIR=$FLUME_HOME/conf

使配置文件生效

#source /etc/profile

验证

[root@Slave1 ~]# flume-ng version Flume 1.7.0 Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707 Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016 From source with checksum 0d21b3ffdc55a07e1d08875872c00523 [root@Slave1 ~]#

如上则表示安装配置成功

Flume案例

1.编写配置文件 #vim logger.conf # Name the components on this agent agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1 # Describe/configure the source agent1.sources.source1.type = netcat agent1.sources.source1.bind = Slave1 agent1.sources.source1.port = 44444 # Describe the sink agent1.sinks.sink1.type = logger # Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.capacity = 1000 //默认该通道中最大可以存储的event数量 agent1.channels.channel1.transactionCapacity = 100 //每次最大可以从source中拿到或者送到sink中的event数量 # Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1 //注意这里的channels和channel 2.测试

启动

[root@Slave1 flume-1.7.0]# bin/flume-ng agent --conf conf --conf-file conf/logger.conf --name agent1 -Dflume.root.logger=INFO,console 参数 agent:启动一个agent --conf:指定配置文件,加载一些默认的参数 --conf-file:指定采集方案路径 --name:启动的agent的名字 -Dflume.root.logger=INFO,console(实际生产中用不到)

产生信息

[root@Slave2 conf]# telnet Slave1 44444 Trying 192.168.142.131... Connected to Slave1. Escape character is '^]'. luchao OK abcdefg OK hello world OK ABCDEFGHIJKLMNOPQRSTUVWXYZ OK

接收信息

2017-09-01 03:15:20,001 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6C 75 63 68 61 6F 0D luchao. } 2017-09-01 03:15:26,551 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 61 62 63 64 65 66 67 0D abcdefg. } 2017-09-01 03:15:36,170 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 0D hello world. } 2017-09-01 03:16:14,197 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 41 42 43 44 45 46 47 48 49 4A 4B 4C 4D 4E 4F 50 ABCDEFGHIJKLMNOP }

注:“ABCDEFGHIJKLMNOPQRSTUVWXYZ”没有接收完全的原因是netcat默认接收的长度限制。

Over

转载请注明原文地址: https://www.6miu.com/read-53161.html

最新回复(0)