flume开发--自定义Sink

xiaoxiao2021-02-28  107

Flume可以通过自定义Sink的方式实现数据搜集并写入各种OLTP数据库,下面的例子是通过自定义Sink实现数据写入分布式K-V数据库Aerospike.

1. 自定义Sink代码如下
package kafka_sink.asd;

import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.flume.Channel;
import org.apache.flume.Constants;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.async.AsyncClient;
import com.aerospike.client.listener.RecordListener;
import com.aerospike.client.listener.WriteListener;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.async.AsyncClientPolicy;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.Host;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Flume sink that writes events into an Aerospike set.
 *
 * <p>Each event body is decoded as UTF-8 and must have the form
 * {@code key<TAB>value}; bodies that are empty or do not split into exactly
 * two tab-separated fields are skipped. For every accepted event the sink
 * reads the current record for {@code key}, optionally appends tags retained
 * from the existing record (see {@link #mergeRetainedTags}), and writes the
 * bin back.
 *
 * <p>The Aerospike calls are asynchronous, but {@link #process()} blocks until
 * the read/write chain of one event has finished before taking the next event.
 *
 * <p>Configuration keys read in {@link #configure(Context)}:
 * {@code asd_host1}, {@code asd_host2}, {@code asd_port}, {@code set_name},
 * {@code bin_name}, {@code batchSize}.
 */
public class Asdsink extends AbstractSink implements Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(Asdsink.class);

    public static final String TOPIC_HDR = "topic";
    public static final String KEY_HDR = "key";

    /**
     * Bin that is read from the existing record and written when tags are merged.
     * NOTE(review): this is hard-coded while the plain write path uses the
     * configured {@code bin_name}; confirm whether both should be the same bin.
     */
    private static final String MERGE_BIN_NAME = "mz_tag";

    /**
     * Tags of the existing record that survive an overwrite. The patterns are
     * applied one after another, so the appended tags are grouped by pattern.
     * Compiled once because Pattern instances are immutable and reusable.
     */
    private static final Pattern[] RETAINED_TAG_PATTERNS = {
        Pattern.compile("(101\\d{4})"),
        Pattern.compile("(102\\d{4})"),
        Pattern.compile("(103\\d{4})")
    };

    private String ASD_HOST1;
    private String ASD_HOST2;
    private int ASD_PORT;
    private String ASD_NAME_SPACE = "cm";
    private String SET_NAME;
    private String BIN_NAME;
    /** Maximum number of events taken from the channel in one transaction. */
    private int batchSize;
    private WritePolicy write_policy;
    private Policy policy;
    // Async read and write
    private AsyncClient asd_async_client;
    private AsyncClientPolicy async_client_policy;

    /** Guards {@link #completed}; private so it cannot clash with AbstractSink's own locking. */
    private final Object completionLock = new Object();
    /** True once the async read/write chain of the current event has finished. */
    private boolean completed;

    /**
     * Reads the sink settings from the Flume agent configuration.
     *
     * @param context configuration of this sink instance
     */
    @Override
    public void configure(Context context) {
        ASD_HOST1 = context.getString("asd_host1", "127.0.0.1");
        ASD_HOST2 = context.getString("asd_host2", "127.0.0.1");
        ASD_PORT = context.getInteger("asd_port", 3000);
        SET_NAME = context.getString("set_name", "xxx");
        BIN_NAME = context.getString("bin_name", "xxx");
        batchSize = context.getInteger("batchSize", 1000);

        LOG.info("ASD_HOST1:{}", ASD_HOST1);
        LOG.info("ASD_HOST2:{}", ASD_HOST2);
        LOG.info("ASD_PORT:{}", ASD_PORT);
        LOG.info("SET_NAME:{}", SET_NAME);
        LOG.info("BIN_NAME:{}", BIN_NAME);
        LOG.info("batchSize:{}", batchSize);
    }

    /** Opens the Aerospike connection and prepares the read/write policies. */
    @Override
    public void start() {
        // Both seed hosts use the configured port (previously hard-coded to 3000).
        Host[] hosts = new Host[] {
            new Host(ASD_HOST1, ASD_PORT),
            new Host(ASD_HOST2, ASD_PORT)
        };

        async_client_policy = new AsyncClientPolicy();
        async_client_policy.asyncMaxCommands = 300;
        async_client_policy.failIfNotConnected = true;
        asd_async_client = new AsyncClient(async_client_policy, hosts);

        policy = new Policy();
        policy.timeout = 20;
        write_policy = new WritePolicy();
        write_policy.timeout = 20;

        super.start();
    }

    /** Closes the Aerospike connection. Safe to call when start() never succeeded. */
    @Override
    public void stop() {
        if (asd_async_client != null) {
            asd_async_client.close();
            asd_async_client = null;
        }
        super.stop();
    }

    /**
     * Takes up to {@code batchSize} events from the channel inside one
     * transaction and stores each of them in Aerospike.
     *
     * @return {@code READY} when at least one event was taken and the
     *         transaction committed; {@code BACKOFF} when the channel was empty
     *         or the transaction was rolled back
     * @throws EventDeliveryException declared by the Sink contract; not thrown here
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status status;
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            int processedEvents = 0;
            for (; processedEvents < batchSize; processedEvents++) {
                Event event = ch.take();
                if (event == null) {
                    // Channel drained: stop instead of polling it batchSize times.
                    break;
                }
                storeEvent(event);
            }
            txn.commit();
            // Back off on an empty channel so the sink runner does not spin.
            status = processedEvents > 0 ? Status.READY : Status.BACKOFF;
        } catch (Throwable t) {
            status = Status.BACKOFF;
            txn.rollback();
            LOG.error("[ERROR][process]" + t.toString(), t);
            // Re-throw all Errors.
            if (t instanceof Error) {
                throw (Error) t;
            }
        } finally {
            // A transaction must always be closed, whatever happened above.
            txn.close();
        }
        return status;
    }

    /**
     * Parses one event and runs its read-then-write chain to completion.
     * Failures of the Aerospike call are logged and the event is dropped, so a
     * single bad record does not roll back the whole batch.
     */
    private void storeEvent(Event event) throws IOException {
        byte[] body = event.getBody();
        if (body == null) {
            return;
        }
        String line = new String(body, "UTF-8");
        if (line.length() == 0) {
            return;
        }
        String[] keyAndTag = line.split("\t");
        if (keyAndTag.length != 2) {
            LOG.debug("Skipping event that is not key<TAB>value: {}", line);
            return;
        }

        Key asKey = new Key(ASD_NAME_SPACE, SET_NAME, keyAndTag[0]);
        Bin adBin = new Bin(BIN_NAME, keyAndTag[1]);
        try {
            resetCompleted();
            asd_async_client.get(policy,
                new ReadHandler(asd_async_client, policy, write_policy, asKey, adBin), asKey);
            waitTillComplete();
        } catch (Exception e) {
            LOG.error("[ERROR][process]" + e.toString(), e);
        }
    }

    /**
     * Returns the bin to write for an incoming value: the incoming bin itself,
     * or, when the existing record carries retained tags, a bin whose value
     * is the incoming value followed by {@code ",tag"} for every retained tag.
     */
    private static Bin mergeRetainedTags(Record record, Bin bin) {
        if (record == null) {
            return bin;
        }
        String existing = record.getString(MERGE_BIN_NAME);
        if (existing == null || existing.length() == 0) {
            return bin;
        }
        StringBuilder tags = new StringBuilder();
        for (Pattern pattern : RETAINED_TAG_PATTERNS) {
            Matcher matcher = pattern.matcher(existing);
            while (matcher.find()) {
                tags.append(',').append(matcher.group(1));
            }
        }
        if (tags.length() == 0) {
            return bin;
        }
        return new Bin(MERGE_BIN_NAME, bin.value.toString() + tags);
    }

    /** Completion callback of the write that ends an event's chain. */
    private class WriteHandler implements WriteListener {
        private final AsyncClient client;
        private final WritePolicy policy;
        private final Key key;
        private final Bin bin;
        private int failCount = 0;

        public WriteHandler(AsyncClient client, WritePolicy policy, Key key, Bin bin) {
            this.client = client;
            this.policy = policy;
            this.key = key;
            this.bin = bin;
        }

        /** Write succeeded: release the thread blocked in process(). */
        @Override
        public void onSuccess(Key key) {
            notifyCompleted();
        }

        /**
         * Write failed: retry up to 2 more times when the cause is a socket
         * level error (ConnectException is an IOException), otherwise give up.
         */
        @Override
        public void onFailure(AerospikeException e) {
            if (++failCount <= 2 && e.getCause() instanceof IOException) {
                try {
                    client.put(policy, this, key, bin);
                    return; // the retry will call back into this handler
                } catch (Exception ex) {
                    // Fall through to the give-up path.
                    LOG.error(describeFailure(e), ex);
                }
            } else {
                LOG.error(describeFailure(e), e);
            }
            notifyCompleted();
        }

        private String describeFailure(AerospikeException e) {
            return String.format(
                "[ERROR][WriteHandler]Failed to put: namespace=%s set=%s key=%s bin_name=%s bin_value=%s exception=%s",
                key.namespace, key.setName, key.userKey, bin.name, bin.value.toString(), e.getMessage());
        }
    }

    /** Completion callback of the initial read; issues the follow-up write. */
    private class ReadHandler implements RecordListener {
        private final AsyncClient client;
        private final Policy policy;
        private final WritePolicy write_policy;
        private final Key key;
        private final Bin bin;
        private int failCount = 0;

        public ReadHandler(AsyncClient client, Policy policy, WritePolicy write_policy, Key key, Bin bin) {
            this.client = client;
            this.policy = policy;
            this.write_policy = write_policy;
            this.key = key;
            this.bin = bin;
        }

        /** Read succeeded (record may be null when the key is absent): now write. */
        @Override
        public void onSuccess(Key key, Record record) {
            try {
                Bin binToWrite = mergeRetainedTags(record, bin);
                client.put(write_policy,
                    new WriteHandler(client, write_policy, key, binToWrite), key, binToWrite);
            } catch (Exception e) {
                LOG.error(describeFailure(e), e);
                // No write is in flight, so nobody else would wake process().
                notifyCompleted();
            }
        }

        /**
         * Read failed: retry up to 2 more times when the cause is a socket
         * level error, otherwise give up (the event is then not written).
         */
        @Override
        public void onFailure(AerospikeException e) {
            if (++failCount <= 2 && e.getCause() instanceof IOException) {
                try {
                    client.get(policy, this, key);
                    return; // the retry will call back into this handler
                } catch (Exception ex) {
                    // Fall through to the give-up path.
                    LOG.error(describeFailure(e), ex);
                }
            } else {
                LOG.error(describeFailure(e), e);
            }
            notifyCompleted();
        }

        private String describeFailure(Exception e) {
            return String.format(
                "[ERROR][ReadHandler]Failed to get: namespace=%s set=%s key=%s exception=%s",
                key.namespace, key.setName, key.userKey, e.getMessage());
        }
    }

    /** Arms the completion flag before an async chain is started. */
    private void resetCompleted() {
        synchronized (completionLock) {
            completed = false;
        }
    }

    /**
     * Blocks until {@link #notifyCompleted()} is called. If the thread is
     * interrupted the interrupt flag is restored and the wait is abandoned.
     */
    private void waitTillComplete() {
        synchronized (completionLock) {
            while (!completed) {
                try {
                    completionLock.wait();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /** Marks the current async chain as finished and wakes the waiting thread. */
    private void notifyCompleted() {
        synchronized (completionLock) {
            completed = true;
            completionLock.notifyAll();
        }
    }
}
2. 用maven将自定义的Sink打包成jar包,maven xml配置文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <!-- Builds the custom Aerospike sink as a plain jar (dependencies are NOT bundled). -->
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.apache.flume.flume-ng-sinks</groupId>
  <artifactId>flume-ng-aerospike-master-sink</artifactId>
  <name>Flume Aerospike Sink</name>
  <version>1.0.0</version>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>1.5.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.5.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-configuration</artifactId>
      <version>1.5.2</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.6.1</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>

    <!--
      The sink uses com.aerospike.client.async.AsyncClient, AsyncClientPolicy and
      Policy.timeout. NOTE(review): the 4.x client line is understood to have
      reworked this async API, so the range is capped below 4.0.0 instead of
      being open-ended; confirm against the client's release notes.
    -->
    <dependency>
      <groupId>com.aerospike</groupId>
      <artifactId>aerospike-client</artifactId>
      <version>[3.0.0,4.0.0)</version>
    </dependency>
  </dependencies>
</project>
3. 将打包好的jar包放到flume的lib目录下(maven-jar-plugin不会把依赖打进jar,因此aerospike-client的jar也需要一并放入lib目录)
4. 在flume 的conf目录的配置文件中加入自定义Sink配置
# Custom Aerospike sink: "type" is the fully qualified class name of the sink.
a1.sinks.k1_1.type = kafka_sink.asd.Asdsink
# Two Aerospike seed hosts; both are contacted on asd_port.
a1.sinks.k1_1.asd_host1 = 127.0.0.1
a1.sinks.k1_1.asd_host2 = 192.168.0.1
a1.sinks.k1_1.asd_port = 3000
# Target set and bin for the written records.
a1.sinks.k1_1.set_name = test_set_name
a1.sinks.k1_1.bin_name = test_bin_name
# Maximum number of events taken per channel transaction.
# NOTE(review): presumably this must not exceed the channel's transactionCapacity - confirm for the channel in use.
a1.sinks.k1_1.batchSize = 10000

相关资料: 1. flume官网文档:http://flume.apache.org/FlumeDeveloperGuide.html 2. Apache Maven 入门篇(上):http://blog.csdn.net/yanshu2012/article/details/50722088 3. Apache Maven 入门篇(下):http://blog.csdn.net/yanshu2012/article/details/50722621

转载请注明原文地址: https://www.6miu.com/read-18044.html

最新回复(0)