vi netcat-logger.conf
# 定义这个agent中各组件的名字 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 描述和配置source组件:r1的采集类型为 netcat 采集的主机是本机 端口是44444 #如果想采集其他的主机上bind那写0.0.0.0绑定本机的所有ip。相当于与那个主机开了一个tcp socket通信 服务器是绑定的这台主机,主机端口是44444 客户端主机只需要telnet min1 44444即可 #flume的netcat source会自动创建一个socket Server,只需将数据发送到此socket,flume的netcat source 就能获取数据。 配置bind是绑定服务器主机和端口可以是localhost min1 0.0.0.0 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # 描述和配置sink组件:k1 a1.sinks.k1.type = logger # 描述和配置channel组件,此处使用是内存缓存的方式 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 描述和配置source channel sink之间的连接关系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1-c conf 指定flume自身的配置文件所在目录
-f conf/netcat-logger.con 指定我们所描述的采集方案
-n a1 指定我们这个agent的名字 -Dflume.root.logger=INFO,console 这里是传给打log日志的参数,如果sink不是logger则不要写这个参数
输入 telnet localhost 44444 telnet就是往屏幕上打字然后flume去采集 可以看到数据已经打到屏幕上了 如果没有telnet则需要安装 telnet的客户端和服务端: - 1.rpm -qa | grep telnet 查看有没有安装telnet的rpm包如果没有则: - 2.sudo yum install telnet #这是安装客户端 - 3.sudo yum install telnet-server #这是安装服务端 - 4.重启xinetd服务 sudo service xinetd restart
问题:source里面bind min2没用出错: org.apache.flume.FlumeException: java.net.BindException: Cannot assign requested address 但是绑定所有主机则有用为什么? 原因是 安装flume的是服务端 里面绑定的一定是本机地址 可以是localhost或min1
source为spooldir跟踪一个目录/home/hadoop/logs这个目录必须存在 采集到之后logs文件夹中的文件名将带有.complete后缀(这个.complete后缀可以在配置文件中修改) logs里面的文件名不能重复
采集数据到hdfs source是exex
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source #exec 指的是命令 a1.sources.r1.type = exec #F根据文件名追踪, f根据文件的nodeid追踪 a1.sources.r1.command = tail -F /home/hadoop/logs/test.log # 配置sink a1.sinks.k1.type = hdfs #指定下沉到hdfs的目录, flume帮我们做目录的替换 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/ #文件的命名, 前缀 a1.sinks.k1.hdfs.filePrefix = events- #10 分钟就改目录 这是目录滚动的配置 a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute #下面sink的3个配置都是文件的滚动设置 #文件滚动之前的等待时间(秒) a1.sinks.k1.hdfs.rollInterval = 3 #文件滚动的大小限制(bytes) a1.sinks.k1.hdfs.rollSize = 500 #写入多少个event数据后滚动文件(事件个数) a1.sinks.k1.hdfs.rollCount = 20 #5个事件从channel就往里面sink地点(hdfs)里写入 a1.sinks.k1.hdfs.batchSize = 5 #用本地时间格式化目录 a1.sinks.k1.hdfs.useLocalTimeStamp = true #下沉后, 生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本 a1.sinks.k1.hdfs.fileType = DataStream # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1启动命令:
bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1 1.在/home/hadoop/logs/test.log文件创建这个文件2.在test.log下产生数据 while true do echo 111111 >> /home/hadoop/logs/test.log sleep 0.5 done 3.启动flume启动hdfs 可以看到hdfs中产生的数据启动flume:先启动服务端后启动客户端 - min2上:
bin/flume-ng agent -c conf -f conf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console min1 上 bin/flume-ng agent -c conf -f conf/tail-avro.conf -n a1 # 往/home/hadoop/logs/test.log里打数据 while true do echo 111111 >> /home/hadoop/logs/test.log sleep 0.5 done总结: 在source端的绑定的主机和端口都是服务端、服务端只能绑定本机ip 在sink绑定的主机和端口都是客户端 sink是数据的发送者 注意agent的名字一定不能写错