flume的安装和使用

xiaoxiao2021-02-28  50

flume的安装及配置

flume概述:

Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。Flume可以采集文件,socket数据包等各种形式源数据,又可以将采集到的数据输出到HDFShbase、hive、kafka等众多外部存储系统中一般的采集需求,通过对flume的简单配置即可实现Flume针对特殊场景也具备良好的自定义扩展能力,因此,flume可以适用于大部分的日常数据采集场景

flume的运行机制:

1、Flume分布式系统中最核心的角色是agent,flume采集系统就是由一个个agent所连接起来形成每一个agent相当于一个数据传递员(Source 到 Channel 到 Sink之间传递数据的形式是Event事件,Event事件是一个数据流单元),内部有三个组件: Source:采集源,用于跟数据源对接,以获取数据Sink:下沉地,采集数据的传送目的,用于往下一级agent传递数据或者往最终存储系统传递数据Channel:angent内部的数据传输通道,用于从source将数据传递到sink

flume的安装与部署

1、Flume的安装非常简单,只需要解压即可,当然,前提是已有hadoop环境 上传安装包到数据源所在节点上 然后解压 tar -zxvf apache-flume-1.6.0-bin.tar.gz 然后进入flume的目录,修改conf下的flume-env.sh ,在里面配置JAVA_HOME2、根据数据采集的需求配置采集方案,描述在配置文件中(文件名可任意自定义)3、指定采集方案配置文件,在相应的节点上启动flume agent
1、先在flume的conf目录下新建一个文件

vi netcat-logger.conf

# 定义这个agent中各组件的名字 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 描述和配置source组件:r1的采集类型为 netcat 采集的主机是本机 端口是44444 #如果想采集其他的主机上bind那写0.0.0.0绑定本机的所有ip。相当于与那个主机开了一个tcp socket通信 服务器是绑定的这台主机,主机端口是44444 客户端主机只需要telnet min1 44444即可 #flume的netcat source会自动创建一个socket Server,只需将数据发送到此socket,flume的netcat source 就能获取数据。 配置bind是绑定服务器主机和端口可以是localhost min1 0.0.0.0 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # 描述和配置sink组件:k1 a1.sinks.k1.type = logger # 描述和配置channel组件,此处使用是内存缓存的方式 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 描述和配置source channel sink之间的连接关系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
2、启动agent去采集数据
bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console

-c conf 指定flume自身的配置文件所在目录

-f conf/netcat-logger.con 指定我们所描述的采集方案

-n a1 指定我们这个agent的名字 -Dflume.root.logger=INFO,console 这里是传给打log日志的参数,如果sink不是logger则不要写这个参数

3、测试:

输入 telnet localhost 44444 telnet就是往屏幕上打字然后flume去采集 可以看到数据已经打到屏幕上了 如果没有telnet则需要安装 telnet的客户端和服务端: - 1.rpm -qa | grep telnet 查看有没有安装telnet的rpm包如果没有则: - 2.sudo yum install telnet #这是安装客户端 - 3.sudo yum install telnet-server #这是安装服务端 - 4.重启xinetd服务 sudo service xinetd restart

总结:
flume的关键是配置source channel sink其中source里面的type是指定采集的类型是什么采集的类型有很多种 有telnet exec 等等。bind是指定采集的主机 port是端口sink是采集到的数据放在哪里去 下沉地可以是hdfs logger hive等等。注意启动flume的命令。

问题:source里面bind min2没用出错: org.apache.flume.FlumeException: java.net.BindException: Cannot assign requested address 但是绑定所有主机则有用为什么? 原因是 安装flume的是服务端 里面绑定的一定是本机地址 可以是localhost或min1


flume进阶

1、spooldir-logger

#定义三大组件的名称 agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1 # 配置source组件 agent1.sources.source1.type = spooldir agent1.sources.source1.spoolDir = /home/hadoop/logs/ agent1.sources.source1.fileHeader = false # Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.keep-alive = 120 agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600 #配置sink agent1.sinks.sink1.type = logger # Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1

source为spooldir跟踪一个目录/home/hadoop/logs这个目录必须存在 采集到之后logs文件夹中的文件名将带有.complete后缀(这个.complete后缀可以在配置文件中修改) logs里面的文件名不能重复

2、tail-hdfs

采集数据到hdfs source是exex

# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source #exec 指的是命令 a1.sources.r1.type = exec #F根据文件名追踪, f根据文件的nodeid追踪 a1.sources.r1.command = tail -F /home/hadoop/logs/test.log # 配置sink a1.sinks.k1.type = hdfs #指定下沉到hdfs的目录, flume帮我们做目录的替换 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/ #文件的命名, 前缀 a1.sinks.k1.hdfs.filePrefix = events- #10 分钟就改目录 这是目录滚动的配置 a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute #下面sink的3个配置都是文件的滚动设置 #文件滚动之前的等待时间(秒) a1.sinks.k1.hdfs.rollInterval = 3 #文件滚动的大小限制(bytes) a1.sinks.k1.hdfs.rollSize = 500 #写入多少个event数据后滚动文件(事件个数) a1.sinks.k1.hdfs.rollCount = 20 #5个事件从channel就往里面sink地点(hdfs)里写入 a1.sinks.k1.hdfs.batchSize = 5 #用本地时间格式化目录 a1.sinks.k1.hdfs.useLocalTimeStamp = true #下沉后, 生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本 a1.sinks.k1.hdfs.fileType = DataStream # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

启动命令:

bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1 1.在/home/hadoop/logs/test.log文件创建这个文件2.在test.log下产生数据 while true do echo 111111 >> /home/hadoop/logs/test.log sleep 0.5 done 3.启动flume启动hdfs 可以看到hdfs中产生的数据

3.多级agent的串联:

1.有两个配置文件,第一个配置文件tail-avro.conf配置的是从exec往avro上发:2.第二个配置文件是avro-logger.conf 配置的是从avro往logger上发。3.在min2上启动 avro-logger.conf 在 min1上启动tail-avro.conf 这样就实现了多级agent的串联。

tail-avro.conf

# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /home/hadoop/logs/test.log # Describe the sink # 绑定的不是本机, 是另外一台机器的服务地址, sink端的avro是一个发送端avro的客户端 # 将数据发送到min2上的4141端口 a1.sinks = k1 a1.sinks.k1.type = avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = min2 a1.sinks.k1.port = 4141 a1.sinks.k1.batch-size = 2 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

avro-logger.conf

# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source #source中的avro组件是接收者服务端, 绑定本机 a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 #绑定本机所有ip a1.sources.r1.port = 4141 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

启动flume:先启动服务端后启动客户端 - min2上:

bin/flume-ng agent -c conf -f conf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console min1 上 bin/flume-ng agent -c conf -f conf/tail-avro.conf -n a1 # 往/home/hadoop/logs/test.log里打数据 while true do echo 111111 >> /home/hadoop/logs/test.log sleep 0.5 done

总结: 在source端的绑定的主机和端口都是服务端、服务端只能绑定本机ip 在sink绑定的主机和端口都是客户端 sink是数据的发送者 注意agent的名字一定不能写错

转载请注明原文地址: https://www.6miu.com/read-1450311.html

最新回复(0)