flume自定义source

xiaoxiao2021-02-27  137

package  me;   import  java.nio.charset.Charset; import  java.util.HashMap; import  java.util.Random;   import  org.apache.flume.Context; import  org.apache.flume.EventDeliveryException; import  org.apache.flume.PollableSource; import  org.apache.flume.conf.Configurable; import  org.apache.flume.event.EventBuilder; import  org.apache.flume.source.AbstractSource;   public  class  MySource  extends  AbstractSource  implements  Configurable, PollableSource {        @Override      public  long  getBackOffSleepIncrement() {          // TODO Auto-generated method stub          return  0 ;      }        @Override      public  long  getMaxBackOffSleepInterval() {          // TODO Auto-generated method stub          return  0 ;      }        @Override      public  Status process()  throws  EventDeliveryException {          try  {              while  ( true ) {                  int  max =  20 ;                  int  min =  10 ;                  Random random =  new  Random();                    int  s = random.nextInt(max) % (max - min +  1 ) + min;                  HashMap<String, String> header =  new  HashMap<String, String>();                  header.put( "id" , Integer.toString(s));                  this .getChannelProcessor()                          .processEvent(EventBuilder.withBody(Integer.toString(s), Charset.forName( "UTF-8" ), header));                    Thread.sleep( 1000 );              }          }   catch  (InterruptedException e) {              e.printStackTrace();          }          return  null ;      }        @Override      public  void  configure(Context arg0) {                } }

 

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 agent.channels = ch- 1 agent.sources = src- 1 agent.sinks = sk1   agent.sources.src- 1 .type = me.MySource   agent.channels.ch- 1 .type = memory agent.channels.ch- 1 .capacity =  10000 agent.channels.ch- 1 .transactionCapacity =  10000 agent.channels.ch- 1 .byteCapacityBufferPercentage =  20 agent.channels.ch- 1 .byteCapacity =  800000   agent.sinks.sk1.type = logger    agent.sinks.sk1.channel = ch- 1 agent.sources.src- 1 .channels=ch- 1

 

1 2 lihudeMacBook-Pro:~ SunAndLi$ cd hadoop- 2.7 . 2 /flume/ lihudeMacBook-Pro:flume SunAndLi$  bin/flume-ng agent -c conf -f conf/mysql-source --name agent -Dflume.root.logger=INFO,console

 

转载请注明原文地址: https://www.6miu.com/read-16511.html

最新回复(0)