Flume常用场景

xiaoxiao2021-02-28  63

1 使用正则表达式即按照时间戳保存

Source:Spooling Directory Sink:hdfs Channel:memory 配置文件 a1.sources=r1 a1.sinks=k1 a1.channels=c1 a1.sources.r1.type=spooldir a1.sources.r1.spoolDir=/home/hadoop/data/spool_data a1.sources.r1.ignorePattern=^(.)*\\.txt //忽略本地目录下以.txt结尾的文件 a1.channels.c1.type=memory a1.channels.c1.capacity=10000 a1.channels.c1.tarasactionCapacity=10000 a1.sinks.k1.type=hdfs a1.sinks.k1.hdfs.path=hdfs://hadoop:9000/flume/logs/%Y%m%d%H%M //%Y%m%d%H%M代表年月日分钟 a1.sinks.k1.hdfs.useLocalTimeStamp=true //设置了%Y%m%d%H%M就需要打开一个时间戳,默认为false a1.sinks.k1.hdfs.batchSize=10 a1.sinks.k1.hdfs.fileType=CompressedStream //文件格为CompressedStream要配置一个hdfs.codeC值,即压缩格式 a1.sinks.k1.hdfs.codeC=bzip2 //采用bzip2格式,文件名会加.bzip格式 a1.sinks.k1.hdfs.writeFormat=Text a1.sinks.k1.hdfs.filePrefix=page-views //文件名前缀为page-views a1.sinks.k1.hdfs.rollInterval=0 //每隔多长时间,临时文件滚动成目标文件 a1.sinks.k1.hdfs.rollSize=10485760 //当临时文件达到该大小时,滚动成目标文件 a1.sinks.k1.hdfs.rollCount=10000 // 当events数据达到该数量时候,将临时文件滚动成目标文件 a1.sinks.k1.channel=c1 a1.sources.r1.channels=c1 执行命令 创建监测目录 [hadoop@hadoop data]$ pwd /home/hadoop/data [hadoop@hadoop data]$ touch spool_data 创建输出目录 [hadoop@hadoop data]$ hdfs dfs -mkdir /flume/logs/ 执行命令 ./flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/test4.conf -Dflume.root.logger=INFO,console cp ../page_views.dat 1 cp ../page_views.dat 2 cp ../page_views.dat 3 cp ../input.txt 结果 [hadoop@hadoop ~]$ hdfs dfs -ls /flume/logs Found 3 items drwxr-xr-x - hadoop supergroup 0 2018-04-22 00:53 /flume/logs/201804220052 drwxr-xr-x - hadoop supergroup 0 2018-04-22 00:53 /flume/logs/201804220053 drwxr-xr-x - hadoop supergroup 0 2018-04-22 00:54 /flume/logs/201804220054

注意:配置文件大家不要写错了哦。

2 一台机器向另一台机器的文件传输(Avro)

A机器:avro-client B机器:avro-source ==> channel(memory) ==>sink(logger) A机器向B机器传输日志 - 配置文件

B机器的agent文件: a1.sources=r1 a1.sinks=k1 a1.channels=c1 a1.sources.r1.type=avro a1.sources.r1.bind=0.0.0.0 a1.sources.r1.port=44444 a1.channels.c1.type=memory a1.sinks.k1.type=logger a1.sinks.k1.channel=c1 a1.sources.r1.channels=c1 执行命令 ./flume-ng agent \ --name a1 \ --conf $FLUME_HOME/conf \ --conf-file $FLUME_HOME/conf/avro.conf \ -Dflume.root.logger=INFO,console 在a机器上执行: ./flume-ng avro-client --host 0.0.0.0 --port 44444 --filename /home/hadoop/data/input.txt

注意:这种方式只能传一次,完了就会中断。这种方式在生产上肯定是不行的,那该如何是好呢,下面我们介绍另外一种方式。

3 A机器到另B机器的文件传输(avro),不中断

配置文件 定义A机器的agent: a1.sources=r1 a1.sinks=k1 a1.channels=c1 a1.sources.r1.type=exec a1.sources.r1.command=tail -F /home/hadoop/data/data.log a1.channels.c1.type=memory a1.sinks.k1.type=avro a1.sinks.k1.bind=0.0.0.0 //与B机器像对应 a1.sinks.k1.port=44444 //与B机器相对应 a1.sinks.k1.channel=c1 a1.sources.r1.channels=c1 定义B机器的agent: b1.sources=r1 b1.sinks=k1 b1.channels=c1 b1.sources.r1.type=avro a1.sources.r1.bind = 0.0.0.0 //与A机器像对应 a1.sources.r1.port = 44444 //与A机器像对应 b1.channels.c1.type=memory b1.sinks.k1.type=logger b1.sinks.k1.channel=c1 b1.sources.r1.channels=c1 执行命令 b机器执行: ./flume-ng agent \ --name b1 \ --conf $FLUME_HOME/conf \ --conf-file $FLUME_HOME/conf/avro_source.conf \ -Dflume.root.logger=INFO,console 然后在A机器上执行: ./flume-ng agent \ --name a1 \ --conf $FLUME_HOME/conf \ --conf-file $FLUME_HOME/conf/avro_sink.conf \ -Dflume.root.logger=INFO,console [hadoop@hadoop data]$ echo aaa > data.log [hadoop@hadoop data]$ echo 112121 > data.log A机器目标目录下文件内容会被打印在B机器的控制台上

4 收集windows产生的日志:

官方网址,我们可以借助于flume的log4j.appender来实现

配置文件 定义agent文件: b1.sources=r1 b1.sinks=k1 b1.channels=c1 b1.sources.r1.type=avro b1.sources.r1.bind=0.0.0.0 b1.sources.r1.port=44444 b1.channels.c1.type=memory b1.sinks.k1.type=logger b1.sinks.k1.channel=c1 b1.sources.r1.channels=c1 导入依赖 <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.5</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flume.flume-ng-clients/flume-ng-log4jappender --> <dependency> <groupId>org.apache.flume.flume-ng-clients</groupId> <artifactId>flume-ng-log4jappender</artifactId> <version>1.6.0</version> </dependency> 创建log4j.properties 此时在项目工程中新建一个resources,选择resources,单击搜索框左边,选择Modules,找到resources,将其选择Mask as Resources。 在resources下新建一个log4j.properties,将log4j数据导入到里面。 //log4j.properties添加: log4j.rootCategory=INFO, console, flume log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender log4j.appender.flume.Hostname = hadoop log4j.appender.flume.Port = 44444 log4j.appender.flume.UnsafeMode = true 启动命令 ./flume-ng agent \ --name b1 \ --conf $FLUME_HOME/conf \ --conf-file $FLUME_HOME/conf/avro_source.conf \ -Dflume.root.logger=INFO,console 运行主函数查看结果

提示:这时候运行可能会出现如下错误

log4j:ERROR Could not find value for key log4j.appender.flume.layout 18/05/03 11:24:45 WARN NettyAvroRpcClient: Using default maxIOWorkers log4j:ERROR RPC client creation failed! NettyAvroRpcClient { host: hadoop, port: 44444 }: RPC connection error 18/05/03 11:24:46 INFO Flume_LogApp: current value is:0 log4j:ERROR Cannot Append to Appender! Appender either closed or not setup correctly! 18/05/03 11:24:47 INFO Flume_LogApp: current value is:1 log4j:ERROR Cannot Append to Appender! Appender either closed or not setup correctly! 18/05/03 11:24:48 INFO Flume_LogApp: current value is:2 log4j:ERROR Cannot Append to Appender! Appender either closed or not setup correctly!

错误原因: rpc通信失败,可能是本地没有配置ip和主机名的映射关系,修改: log4j.appender.flume.Hostname = hadoop ,把hadoop主机名修改为ip即可。

转载请注明原文地址: https://www.6miu.com/read-2625853.html

最新回复(0)