Flume Practical Examples (A Fairly Complete Collection)


Contents

Case 1: Avro

Case 2: Spool

Case 3: Exec

Case 4: Syslogtcp

Case 5: JSONHandler

Case 6: Hadoop sink

Case 7: File Roll Sink

Case 8: Replicating Channel Selector

Case 9: Multiplexing Channel Selector

Case 10: Failover Sink Processor

Case 11: Load balancing Sink Processor

Case 12: Hbase sink


 

Official website: http://flume.apache.org/

Apache 1.6.0 download: http://www.apache.org/dyn/closer.cgi/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz

 

Case 1: Avro

The avro-client tool can send a given file to Flume; the Avro source receives it over the Avro RPC mechanism.

 

a) Create the agent configuration file

[hadoop@h71 conf]$ vi avro.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 192.168.8.71
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

b) Start flume agent a1
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c conf/ -f conf/avro.conf -n a1 -Dflume.root.logger=INFO,console

c) Create the test file
[hadoop@h71 ~]$ touch log.00
[hadoop@h71 ~]$ echo "hello world" > /home/hadoop/log.00

d) Send the file with avro-client
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng avro-client -c conf/ -H h71 -p 4141 -F /home/hadoop/log.00

e) Because the sink is a logger sink, the data is written to the log. In the console you can see the following output; note the last line:

12/12/13 02:00:34 INFO source.AvroSource: Avro source r1 started.
12/12/13 02:03:42 INFO ipc.NettyServer: [id: 0x060035f0, /192.168.8.71:56184 => /192.168.8.71:4141] OPEN
12/12/13 02:03:42 INFO ipc.NettyServer: [id: 0x060035f0, /192.168.8.71:56184 => /192.168.8.71:4141] BOUND: /192.168.8.71:4141
12/12/13 02:03:42 INFO ipc.NettyServer: [id: 0x060035f0, /192.168.8.71:56184 => /192.168.8.71:4141] CONNECTED: /192.168.8.71:56184
12/12/13 02:03:42 INFO ipc.NettyServer: [id: 0x060035f0, /192.168.8.71:56184 :> /192.168.8.71:4141] DISCONNECTED
12/12/13 02:03:42 INFO ipc.NettyServer: [id: 0x060035f0, /192.168.8.71:56184 :> /192.168.8.71:4141] UNBOUND
12/12/13 02:03:42 INFO ipc.NettyServer: [id: 0x060035f0, /192.168.8.71:56184 :> /192.168.8.71:4141] CLOSED
12/12/13 02:03:42 INFO ipc.NettyServer: Connection to /192.168.8.71:56184 disconnected.
12/12/13 02:03:44 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64    hello world }

 

Note: when the startup command ends with -Dflume.root.logger=INFO,console, the log output is printed to the console.

Without the -Dflume.root.logger=INFO,console argument, Flume creates a logs directory under its home directory and writes a flume.log file there. If you open flume.log you will find the content is essentially what would have been printed to the console with that argument (the format differs slightly, e.g. how timestamps are rendered; that is controlled by the log4j configuration).

Looking at Flume's default log4j.properties file (in the conf directory under your Flume installation) you will find these lines:

#flume.root.logger=DEBUG,console
flume.root.logger=INFO,LOGFILE
flume.log.dir=./logs
flume.log.file=flume.log
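If you want to change how timestamps (or anything else) are rendered in flume.log, that is indeed done in log4j.properties. As a rough sketch (the LOGFILE appender name comes from the default file above, but the exact layout lines shipped with your version may differ), you could set a log4j 1.x PatternLayout with an explicit date format:

# Hypothetical tweak: render flume.log timestamps like the console output
log4j.appender.LOGFILE.layout = org.apache.log4j.PatternLayout
log4j.appender.LOGFILE.layout.ConversionPattern = %d{yy/MM/dd HH:mm:ss} %-5p %c{2}: %m%n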

Case 2: Spool

The spooldir source watches a configured directory for new files and reads out the data they contain. Two caveats:
     1) Files copied into the spool directory must not be opened and edited afterwards.
     2) The spool directory must not contain subdirectories.
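A few optional spooldir settings are also worth knowing; the sketch below shows commonly used ones with their default values per the Flume 1.6 user guide:

a1.sources.r1.fileSuffix = .COMPLETED    # suffix appended to fully-ingested files
a1.sources.r1.deletePolicy = never       # or "immediate" to delete files once ingested
a1.sources.r1.ignorePattern = ^$         # regex of file names to skip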

 

a) Create the agent configuration file

[hadoop@h71 conf]$ vi spool.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/logs
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ mkdir logs

b) Start flume agent a1
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c conf/ -f conf/spool.conf -n a1 -Dflume.root.logger=INFO,console

c) Add a file to the /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/logs directory
[hadoop@h71 logs]$ echo "spool test1" > /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/logs/spool_text.log

d) In the console you can see the following:

12/12/13 02:19:50 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
12/12/13 02:20:23 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
12/12/13 02:20:23 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/logs/spool_text.log to /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/logs/spool_text.log.COMPLETED
12/12/13 02:20:23 INFO sink.LoggerSink: Event: { headers:{file=/home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/logs/spool_text.log} body: 73 70 6F 6F 6C 20 74 65 73 74 31    spool test1 }

Case 3: Exec

The exec source runs a given command and takes its output as the data source. If you use the tail command, the file must contain enough data for output to appear.
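Also note that the exec source offers no delivery guarantee: if the agent dies while events are in flight, they are lost. If the command itself may exit, there are optional settings to restart it; a sketch (restart defaults to false; the other values shown are the defaults):

a1.sources.r1.restart = true             # re-run the command if it exits
a1.sources.r1.restartThrottle = 10000    # milliseconds to wait before restarting
a1.sources.r1.batchSize = 20             # events per batch written to the channel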

 

a) Create the agent configuration file

[hadoop@h71 conf]$ vi exec_tail.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /home/hadoop/log_exec_tail

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

b) Start flume agent a1
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c conf/ -f conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console

c) Generate enough content in the file
[hadoop@h71 ~]$ for i in {1..100};do echo "exec tail$i" >> /home/hadoop/log_exec_tail;echo $i;sleep 0.1;done

d) In the console you can see the following:

12/12/13 02:33:06 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
12/12/13 02:34:00 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 31    exec tail1 }
12/12/13 02:34:00 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 32    exec tail2 }
12/12/13 02:34:00 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 33    exec tail3 }
12/12/13 02:34:00 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 34    exec tail4 }
12/12/13 02:34:00 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 35    exec tail5 }
....
....
....
12/12/13 02:34:09 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 38    exec tail98 }
12/12/13 02:34:09 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 39    exec tail99 }
12/12/13 02:34:09 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 31 30 30    exec tail100 }

Case 4: Syslogtcp

The syslogtcp source listens on a TCP port for syslog data.

 

a) Create the agent configuration file

[hadoop@h71 conf]$ vi syslog_tcp.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = 192.168.8.71
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

b) Start flume agent a1
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c conf/ -f /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console

c) Produce a test syslog message
[hadoop@h71 ~]$ echo "hello idoall.org syslog" | nc 192.168.8.71 5140

d) In the console you can see the following:

12/12/13 02:40:51 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
12/12/13 02:40:51 INFO node.Application: Starting Sink k1
12/12/13 02:40:51 INFO node.Application: Starting Source r1
12/12/13 02:40:51 INFO source.SyslogTcpSource: Syslog TCP Source starting...
12/12/13 02:42:09 WARN source.SyslogUtils: Event created from Invalid Syslog data.
12/12/13 02:42:09 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67    hello idoall.org }
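The "Event created from Invalid Syslog data" warning appears because the line piped through nc is not a well-formed syslog message, so the source keeps the whole line as the event body and marks flume.syslog.status=Invalid in the headers. To avoid the warning you could send an RFC 3164-style message with a priority field, along these lines (the exact timestamp/tag format the parser accepts may vary by version):

[hadoop@h71 ~]$ echo "<13>Dec 13 02:42:09 h71 test: hello idoall.org syslog" | nc 192.168.8.71 5140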

Case 5: JSONHandler

a) Create the agent configuration file
[hadoop@h71 conf]$ vi post_json.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 8888
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

b) Start flume agent a1
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c conf/ -f /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf/post_json.conf -n a1 -Dflume.root.logger=INFO,console

c) Generate a JSON-formatted POST request
[hadoop@h71 ~]$ curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "idoall.org_body"}]' http://192.168.8.71:8888

d) In the console you can see the following:
12/12/13 02:48:25 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
12/12/13 02:49:35 INFO sink.LoggerSink: Event: { headers:{b=b1, a=a1} body: 69 64 6F 61 6C 6C 2E 6F 72 67 5F 62 6F 64 79    idoall.org_body }

Case 6: Hadoop sink

For installing and deploying hadoop-2.6.0, see http://blog.csdn.net/m0_37739193/article/details/71222673

 

a) Create the agent configuration file

[hadoop@h71 conf]$ vi hdfs_sink.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = 192.168.8.71
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://192.168.8.71:9000/user/flume/syslogtcp
a1.sinks.k1.hdfs.filePrefix = Syslog
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
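By default the HDFS sink rolls its output file every 30 seconds (hdfs.rollInterval = 30, with hdfs.rollSize = 1024 and hdfs.rollCount = 10 as the other default triggers), which is why in the log below the .tmp file is closed and renamed about 30 seconds after it is created. A sketch of roll settings you could add to the config above (the values here are illustrative, not the defaults):

# Roll a new file every 10 minutes, or at 128 MB, never by event count
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0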

b) Start flume agent a1
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c conf/ -f /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf/hdfs_sink.conf -n a1 -Dflume.root.logger=INFO,console

c) Produce a test syslog message
[hadoop@h71 ~]$ echo "hello idoall flume -> hadoop testing one" | nc 192.168.8.71 5140

d) In the console you can see the following:

12/12/13 03:00:57 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
12/12/13 03:00:57 INFO node.Application: Starting Source r1
12/12/13 03:00:58 INFO source.SyslogTcpSource: Syslog TCP Source starting...
12/12/13 03:01:01 WARN source.SyslogUtils: Event created from Invalid Syslog data.
12/12/13 03:01:02 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
12/12/13 03:01:02 INFO hdfs.BucketWriter: Creating hdfs://192.168.8.71:9000/user/flume/syslogtcp/Syslog.1355338862051.tmp
12/12/13 03:01:02 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/12/13 03:01:33 INFO hdfs.BucketWriter: Closing hdfs://192.168.8.71:9000/user/flume/syslogtcp/Syslog.1355338862051.tmp
12/12/13 03:01:33 INFO hdfs.BucketWriter: Renaming hdfs://192.168.8.71:9000/user/flume/syslogtcp/Syslog.1355338862051.tmp to hdfs://192.168.8.71:9000/user/flume/syslogtcp/Syslog.1355338862051
12/12/13 03:01:33 INFO hdfs.HDFSEventSink: Writer callback called.

e) Open another window and check that the file was created on hadoop
[hadoop@h71 ~]$ hadoop fs -lsr /user/flume/syslogtcp
lsr: DEPRECATED: Please use 'ls -R' instead.
-rw-r--r--   2 hadoop supergroup        155 2012-12-13 03:01 /user/flume/syslogtcp/Syslog.1355338862051
[hadoop@h71 ~]$ hadoop fs -cat /user/flume/syslogtcp/Syslog.1355338862051
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable^ ;>Gv$hello idoall flume -> hadoop testing one
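The leading "SEQ!org.apache.hadoop.io.LongWritable..." bytes show that the sink wrote a Hadoop SequenceFile, which is the HDFS sink's default (hdfs.fileType = SequenceFile). If you would rather see plain text in HDFS, the relevant settings are sketched below (added to hdfs_sink.conf):

a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text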

Case 7: File Roll Sink

a) Create the agent configuration file
[hadoop@h71 conf]$ vi file_roll.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5555
a1.sources.r1.host = 192.168.8.71
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/logs

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ mkdir logs

b) Start flume agent a1
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c conf/ -f conf/file_roll.conf -n a1 -Dflume.root.logger=INFO,console

c) Produce some test log messages
[hadoop@h71 logs]$ echo "hello idoall.org syslog" | nc 192.168.8.71 5555
[hadoop@h71 logs]$ echo "hello idoall.org syslog 2" | nc 192.168.8.71 5555

In the console you can see the following:

12/12/13 03:10:33 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
12/12/13 03:10:33 INFO node.Application: Starting Source r1
12/12/13 03:10:33 INFO sink.RollingFileSink: RollingFileSink k1 started.
12/12/13 03:10:34 INFO source.SyslogTcpSource: Syslog TCP Source starting...
12/12/13 03:11:38 WARN source.SyslogUtils: Event created from Invalid Syslog data.
12/12/13 03:12:44 WARN source.SyslogUtils: Event created from Invalid Syslog data.

d) Check whether files were generated under /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/logs. By default a new file is started every 30 seconds (input received during a 30-second window goes into that window's file; if no input arrives during the window, an empty file is still created).
[hadoop@h71 logs]$ ll
total 1

 

-rw-rw-r-- 1 hadoop hadoop 50 Dec 13 03:19 1355339980196-1

 

(Since it starts a new file every 30 seconds by default, this produces a lot of small files. To avoid that, add the parameter a1.sinks.k1.sink.rollInterval with the number of seconds between new files; setting it to 0 disables time-based rolling, so only a single file is produced, which is what I did here.)

[hadoop@h71 logs]$ cat 1355339980196-1
hello idoall.org syslog
hello idoall.org syslog 2

Flume supports fan-out flows from one source to multiple channels. There are two modes: replicating and multiplexing.

Case 8: Replicating Channel Selector

In replicating mode, the data from the front-end source is copied into multiple channels, and every channel receives the same data.
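One detail: with the replicating selector, a failure to write to any of the source's channels fails the whole put. Channels can be marked optional so that write failures on them are ignored; a sketch:

a1.sources.r1.selector.type = replicating
a1.sources.r1.selector.optional = c2    # failures writing to c2 will not fail the event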

This exercise uses two machines, h71 and h72.

a) On h71, create the replicating_Channel_Selector configuration file

[hadoop@h71 conf]$ vi replicating_Channel_Selector.conf

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = 192.168.8.71
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.8.71
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 192.168.8.72
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

b) On h71, create the replicating_Channel_Selector_avro configuration file
[hadoop@h71 conf]$ vi replicating_Channel_Selector_avro.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 192.168.8.71
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

c) Copy replicating_Channel_Selector_avro.conf from h71 to h72
[hadoop@h71 conf]$ scp replicating_Channel_Selector_avro.conf h72:/home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf/
On h72, change the IP in replicating_Channel_Selector_avro.conf to:
a1.sources.r1.bind = 192.168.8.72

d) Open 3 windows and start the flume agents on h71 and h72
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c conf/ -f conf/replicating_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
[hadoop@h72 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf/ -f conf/replicating_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c conf/ -f conf/replicating_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console

e) Then, from either h71 or h72, produce a test syslog message
[hadoop@h71 conf]$ echo "hello idoall.org syslog" | nc 192.168.8.71 5140

f) The following appears in the sink windows on both h71 and h72, which shows the event was replicated to both:
On h71:

12/12/13 06:36:01 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
12/12/13 06:36:01 INFO source.AvroSource: Avro source r1 started.
12/12/13 06:36:46 INFO ipc.NettyServer: [id: 0x54f65bf3, /192.168.8.71:43038 => /192.168.8.71:5555] OPEN
12/12/13 06:36:46 INFO ipc.NettyServer: [id: 0x54f65bf3, /192.168.8.71:43038 => /192.168.8.71:5555] BOUND: /192.168.8.71:5555
12/12/13 06:36:46 INFO ipc.NettyServer: [id: 0x54f65bf3, /192.168.8.71:43038 => /192.168.8.71:5555] CONNECTED: /192.168.8.71:43038
12/12/13 06:36:47 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67    hello idoall.org }

On h72:

2012-12-13 06:31:28,547 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started
2012-12-13 06:31:28,549 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:253)] Avro source r1 started.
2012-12-13 06:31:38,500 (New I/O server boss #3) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x0a5fa6e0, /192.168.8.71:49630 => /192.168.8.72:5555] OPEN
2012-12-13 06:31:38,501 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x0a5fa6e0, /192.168.8.71:49630 => /192.168.8.72:5555] BOUND: /192.168.8.72:5555
2012-12-13 06:31:38,501 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x0a5fa6e0, /192.168.8.71:49630 => /192.168.8.72:5555] CONNECTED: /192.168.8.71:49630
2012-12-13 06:33:18,375 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67    hello idoall.org }

 

A conjecture at the time: I remember that earlier I had not set selector.type to replicating at all; I simply configured one source, two channels, and two sinks in a single file, with the channels and sinks paired one to one, and it still worked with exactly the replicating behavior. So I guessed that when this property is unset the default is replicating, and that you only need to set it explicitly (to multiplexing) if you want multiplexing. This matches the Flume user guide, which documents replicating as the default selector type.
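In other words, a minimal fan-out like the sketch below (component names illustrative) should behave identically to the explicit replicating config above:

a1.sources = r1
a1.channels = c1 c2
a1.sources.r1.channels = c1 c2    # no selector.type set, so replicating is used by default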

 

Case 9: Multiplexing Channel Selector

In multiplexing mode, the selector routes each event to a channel based on the value of a header.

a) On h71, create the Multiplexing_Channel_Selector configuration file

[hadoop@h71 conf]$ vi Multiplexing_Channel_Selector.conf

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
# The channel sets mapped to different values may overlap.
# The default may contain any number of channels.
a1.sources.r1.selector.mapping.baidu = c1
a1.sources.r1.selector.mapping.ali = c2
a1.sources.r1.selector.default = c1

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.8.71
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 192.168.8.72
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

b) On h71, create the Multiplexing_Channel_Selector_avro configuration file
[hadoop@h71 conf]$ vi Multiplexing_Channel_Selector_avro.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 192.168.8.71
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

c) Copy Multiplexing_Channel_Selector_avro.conf to h72
[hadoop@h71 conf]$ scp Multiplexing_Channel_Selector_avro.conf h72:/home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf/
On h72, change the IP in Multiplexing_Channel_Selector_avro.conf to:
a1.sources.r1.bind = 192.168.8.72

d) Open 3 windows and start the flume agents on h71 and h72 (three windows were enough in my test)
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c conf/ -f conf/Multiplexing_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
[hadoop@h72 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf/ -f conf/Multiplexing_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c conf/ -f conf/Multiplexing_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console

e) Then, from either h71 or h72, send test events over HTTP
[hadoop@h71 conf]$ curl -X POST -d '[{ "headers" :{"type" : "baidu"},"body" : "idoall_TEST1"}]' http://192.168.8.71:5140 && curl -X POST -d '[{ "headers" :{"type" : "ali"},"body" : "idoall_TEST2"}]' http://192.168.8.71:5140 && curl -X POST -d '[{ "headers" :{"type" : "qq"},"body" : "idoall_TEST3"}]' http://192.168.8.71:5140

f) In the sink window on h71 you can see the following:

12/12/13 08:12:23 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
12/12/13 08:12:23 INFO source.AvroSource: Avro source r1 started.
12/12/13 08:13:08 INFO ipc.NettyServer: [id: 0x7c761258, /192.168.8.71:52767 => /192.168.8.71:5555] OPEN
12/12/13 08:13:08 INFO ipc.NettyServer: [id: 0x7c761258, /192.168.8.71:52767 => /192.168.8.71:5555] BOUND: /192.168.8.71:5555
12/12/13 08:13:08 INFO ipc.NettyServer: [id: 0x7c761258, /192.168.8.71:52767 => /192.168.8.71:5555] CONNECTED: /192.168.8.71:52767
12/12/13 08:15:33 INFO sink.LoggerSink: Event: { headers:{type=baidu} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 31    idoall_TEST1 }
12/12/13 08:15:33 INFO sink.LoggerSink: Event: { headers:{type=qq} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 33    idoall_TEST3 }

g) In the sink window on h72 you can see the following:

2012-12-13 08:09:18,316 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started
2012-12-13 08:09:18,317 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:253)] Avro source r1 started.
2012-12-13 08:09:40,430 (New I/O server boss #3) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xcb673fb5, /192.168.8.71:46032 => /192.168.8.72:5555] OPEN
2012-12-13 08:09:40,432 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xcb673fb5, /192.168.8.71:46032 => /192.168.8.72:5555] BOUND: /192.168.8.72:5555
2012-12-13 08:09:40,432 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xcb673fb5, /192.168.8.71:46032 => /192.168.8.72:5555] CONNECTED: /192.168.8.71:46032
2012-12-13 08:12:05,774 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{type=ali} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 32    idoall_TEST2 }

As you can see, events are routed to different channels according to the value of the header.

 

Case 10: Failover Sink Processor

With failover, events are always sent to a single sink; only when that sink becomes unavailable are they automatically sent to the next sink.

 

a) On h71, create the Flume_Sink_Processors configuration file

[hadoop@h71 conf]$ vi Flume_Sink_Processors.conf

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# The key to configuring failover: a sink group is required
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# The processor type is failover
a1.sinkgroups.g1.processor.type = failover
# Priorities: a higher number means higher priority; each sink must have a distinct priority
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# Set to 10 seconds; adjust faster or slower to suit your situation
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.8.71
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 192.168.8.72
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

b) On h71, create the Flume_Sink_Processors_avro configuration file
[hadoop@h71 conf]$ vi Flume_Sink_Processors_avro.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 192.168.8.71
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

c) Copy Flume_Sink_Processors_avro.conf to h72
[hadoop@h71 conf]$ scp Flume_Sink_Processors_avro.conf h72:/home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf/
(Note: the IP must be changed on h72; this differs slightly from the blog posts circulating online.) Change it to:
a1.sources.r1.bind = 192.168.8.72
Otherwise, when h72 runs
bin/flume-ng agent -c /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf/ -f conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
it fails with this error:

2012-12-13 05:51:57,248 (lifecycleSupervisor-1-4) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start EventDrivenSourceRunner: { source:Avro source r1: { bindAddress: 192.168.8.71, port: 5555 } } - Exception follows.
org.jboss.netty.channel.ChannelException: Failed to bind to: /192.168.8.71:5555
        at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
        at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
        at org.apache.flume.source.AvroSource.start(AvroSource.java:236)
        at org.apache.flume.source.EventDrivenSourceRunner.start(EventDrivenSourceRunner.java:44)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
Caused by: java.net.BindException: Cannot assign requested address
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:444)
        at sun.nio.ch.Net.bind(Net.java:436)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
        at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
        at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
        ... 3 more

At first, starting flume on h72 reported an error:
[hadoop@h72 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c . -f conf/avro.conf -n a1 -Dflume.root.logger=INFO,console

Info: Including Hive libraries found via () for Hive access
+ exec /usr/jdk1.7.0_25/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin:/home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application -f conf/avro.conf -n a1
log4j:WARN No appenders could be found for logger (org.apache.flume.lifecycle.LifecycleSupervisor).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

 

Possible cause 1: searching online suggested that the conf location given with -c was wrong, so the command was changed to:

 

[hadoop@h72 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf/  -f conf/avro.conf -n a1 -Dflume.root.logger=INFO,console

Possible cause 2: it turned out I had not added the hadoop environment variables to .bash_profile on h72; after adding them it worked:

HADOOP_HOME=/home/hadoop/hadoop-2.6.0-cdh5.5.2
HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
PATH=$HADOOP_HOME/bin:$PATH
export HADOOP_HOME HADOOP_CONF_DIR PATH

Then reload the profile: [hadoop@h72 ~]$ source .bash_profile

My earlier puzzlement: -c is supposed to take the conf directory, as either a relative or an absolute path. But after adding the hadoop environment variables to .bash_profile, why does -c followed by . also work from Flume's home directory? Doesn't . mean the current directory, i.e. /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin, rather than /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf?

An answer from 2020-03-06: later I installed flume on a server without a Hadoop cluster, and in the apache-flume-1.8.0-bin directory the -c argument had to be conf/ and could not be a dot, otherwise the same error appeared. My guess is that the log4j warning just means no log4j.properties was found on the classpath built from -c; when the Hadoop environment variables are set, the flume-ng script also pulls in Hadoop's classpath, which can supply a log4j configuration, and that is probably why the dot worked on the cluster machine.

d) Open 3 windows and start the flume agents on h71 and h72
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c . -f conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
[hadoop@h72 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf/ -f conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console

At first this command kept failing with: Caused by: java.net.BindException: Address already in use. So I ran:
[hadoop@h71 conf]$ jps
4350 Jps
3762 SecondaryNameNode
25747 Main
3571 NameNode
3904 ResourceManager
3061 Application
4561 HMaster
4485 HQuorumPeer
3722 Application
(There were two Application processes; I killed the first one with kill -9, and then the command above worked.)
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c . -f conf/Flume_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console

e) Then, from either h71 or h72, produce a test log message
[hadoop@h71 conf]$ echo "idoall.org test1 failover" | nc 192.168.8.71 5140

f) Because h72 has the higher priority, the following appears in the sink window on h72, and nothing appears on h71:

2012-12-13 06:04:42,892 (New I/O server boss #3) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xcdd2cc86, /192.168.8.71:37143 => /192.168.8.72:5555] OPEN
2012-12-13 06:04:42,892 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xcdd2cc86, /192.168.8.71:37143 => /192.168.8.72:5555] BOUND: /192.168.8.72:5555
2012-12-13 06:04:42,892 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xcdd2cc86, /192.168.8.71:37143 => /192.168.8.72:5555] CONNECTED: /192.168.8.71:37143
2012-12-13 06:04:52,000 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 30    idoall.org test0 }

g) Now stop the sink on h72 (ctrl+c) and send another test message:
[hadoop@h71 conf]$ echo "idoall.org test2 failover" | nc 192.168.8.71 5140

h) In the sink window on h71 you can see both test messages arrive:

12/12/13 06:08:10 INFO ipc.NettyServer: [id: 0x45a46286, /192.168.8.71:55655 => /192.168.8.71:5555] OPEN
12/12/13 06:08:10 INFO ipc.NettyServer: [id: 0x45a46286, /192.168.8.71:55655 => /192.168.8.71:5555] BOUND: /192.168.8.71:5555
12/12/13 06:08:10 INFO ipc.NettyServer: [id: 0x45a46286, /192.168.8.71:55655 => /192.168.8.71:5555] CONNECTED: /192.168.8.71:55655
12/12/13 06:16:13 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 30    idoall.org test1 }
12/12/13 06:16:13 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32    idoall.org test2 }

i) Restart the sink on h72:
[hadoop@h72 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf/ -f conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console

j) Send two more test messages:
[hadoop@h71 conf]$ echo "idoall.org test3 failover" | nc 192.168.8.71 5140 && echo "idoall.org test4 failover" | nc 192.168.8.71 5140

k) In the sink window on h72 you can see the following; because of the priorities, log messages land on h72 again:

2012-12-13 06:14:14,191 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started
2012-12-13 06:14:14,192 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:253)] Avro source r1 started.
2012-12-13 06:14:18,934 (New I/O server boss #3) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x45dd9ffb, /192.168.8.71:57973 => /192.168.8.72:5555] OPEN
2012-12-13 06:14:18,936 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x45dd9ffb, /192.168.8.71:57973 => /192.168.8.72:5555] BOUND: /192.168.8.72:5555
2012-12-13 06:14:18,936 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x45dd9ffb, /192.168.8.71:57973 => /192.168.8.72:5555] CONNECTED: /192.168.8.71:57973
2012-12-13 06:14:22,935 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32    idoall.org test2 }
2012-12-13 06:16:07,028 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33    idoall.org test3 }
2012-12-13 06:16:07,028 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34    idoall.org test4 }

Case 11: Load balancing Sink Processor

Unlike failover, the load_balance processor has two selection policies: round_robin and random, with round_robin as the default. In both cases, if the selected sink is unavailable, the processor automatically tries the next available sink.
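Switching the example below from round-robin to random selection would be a one-line change in the sink group, sketched here:

a1.sinkgroups.g1.processor.selector = random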

 

a) On h71, create the Load_balancing_Sink_Processors configuration file

[hadoop@h71 conf]$ vi Load_balancing_Sink_Processors.conf

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# The key to configuring load balancing: a sink group is required
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.8.71
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = 192.168.8.72
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

b) On h71, create the Load_balancing_Sink_Processors_avro configuration file
[hadoop@h71 conf]$ vi Load_balancing_Sink_Processors_avro.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 192.168.8.71
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

c) Copy Load_balancing_Sink_Processors_avro.conf to h72
[hadoop@h71 conf]$ scp Load_balancing_Sink_Processors_avro.conf h72:/home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf/
On h72, change the IP in Load_balancing_Sink_Processors_avro.conf to:
a1.sources.r1.bind = 192.168.8.72

d) Open 3 windows and start the flume agents on h71 and h72
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c . -f conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
[hadoop@h72 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf/ -f conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c . -f conf/Load_balancing_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console

e) Then, from either h71 or h72, produce test log messages one line at a time; if you send them too quickly they tend to land on one machine
[hadoop@h72 conf]$ echo "idoall.org test1" | nc 192.168.8.71 5140
[hadoop@h72 conf]$ echo "idoall.org test2" | nc 192.168.8.71 5140
[hadoop@h71 conf]$ echo "idoall.org test3" | nc 192.168.8.71 5140
[hadoop@h71 conf]$ echo "idoall.org test4" | nc 192.168.8.71 5140

f) In the sink window on h71 you can see the following:
14/08/10 15:35:29 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32    idoall.org test2 }
14/08/10 15:35:33 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34    idoall.org test4 }

g) In the sink window on h72 you can see the following:
14/08/10 15:35:27 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31    idoall.org test1 }
14/08/10 15:35:29 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33    idoall.org test3 }

This shows that round-robin mode is working.

Case 12: Hbase sink

a) Before testing, start hbase.

b) Then copy the following jars into Flume's lib directory:
[hadoop@h71 lib]$ cp protobuf-java-2.5.0.jar /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/lib/
[hadoop@h71 lib]$ cp hbase-protocol-1.0.0-cdh5.5.2.jar /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/lib/
[hadoop@h71 lib]$ cp hbase-client-1.0.0-cdh5.5.2.jar /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/lib/
[hadoop@h71 lib]$ cp hbase-common-1.0.0-cdh5.5.2.jar /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/lib/
[hadoop@h71 lib]$ cp hbase-server-1.0.0-cdh5.5.2.jar /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/lib/
[hadoop@h71 lib]$ cp hbase-hadoop2-compat-1.0.0-cdh5.5.2.jar /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/lib/
[hadoop@h71 lib]$ cp hbase-hadoop-compat-1.0.0-cdh5.5.2.jar /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/lib/
[hadoop@h71 lib]$ cp htrace-core-3.2.0-incubating.jar /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/lib/
(The blog I followed copied htrace-core-2.04.jar, but my installation only has htrace-core-3.2.0-incubating.jar.)
(Alternatively, just copy all the jars under hbase-1.0.0-cdh5.5.2/lib into Flume's lib directory.)

c) Make sure the test_idoall_org table exists in hbase; if not, create it:
hbase(main):002:0> create 'test_idoall_org','uid','name'
0 row(s) in 0.6730 seconds

d) On h71, create the hbase_simple configuration file
[hadoop@h71 conf]$ vi hbase_simple.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = 192.168.8.71
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hbase
a1.sinks.k1.table = test_idoall_org
a1.sinks.k1.columnFamily = name
a1.sinks.k1.column = idoall
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
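A note on the serializer: by default, RegexHbaseEventSerializer matches the whole event body with the regex (.*) and writes it to a single column named payload, which is why the scan in step g) shows column=name:payload. The a1.sinks.k1.column = idoall line above is not a property this serializer reads; to control the column name you would instead set something like this sketch:

a1.sinks.k1.serializer.regex = (.*)
a1.sinks.k1.serializer.colNames = idoall    # one column name per regex capture group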

e) Start the flume agent
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c . -f /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf/hbase_simple.conf -n a1 -Dflume.root.logger=INFO,console

f) Produce a test syslog message
[hadoop@h71 conf]$ echo "hello idoall.org from flume" | nc 192.168.8.71 5140

g) Log into hbase and confirm the new data has been inserted
hbase(main):006:0> scan 'test_idoall_org'
ROW                                COLUMN+CELL
 1355359597472-jBAezVDqVh-0        column=name:payload, timestamp=1355359392694, value=hello idoall.org from flume

 

1 row(s) in 0.0160 seconds

 

 

Reference: http://www.jb51.net/article/53542.htm (with some modifications and additions on top of that article)

