Flume可以通过自定义Sink的方式实现数据收集并写入各种OLTP数据库,下面的例子是通过自定义Sink实现数据写入分布式K-V数据库Aerospike.
1. 自定义Sink代码如下
[java]
view plain
copy
package kafka_sink.asd; import java.io.IOException; import java.net.ConnectException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.flume.Channel; import org.apache.flume.Constants; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import com.aerospike.client.AerospikeException; import com.aerospike.client.Bin; import com.aerospike.client.Key; import com.aerospike.client.Record; import com.aerospike.client.async.AsyncClient; import com.aerospike.client.listener.RecordListener; import com.aerospike.client.listener.WriteListener; import com.aerospike.client.policy.WritePolicy; import com.aerospike.client.async.AsyncClientPolicy; import com.aerospike.client.policy.Policy; import com.aerospike.client.Host; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.regex.Matcher; import java.util.regex.Pattern; public class Asdsink extends AbstractSink implements Configurable { public static final String TOPIC_HDR = "topic"; public static final String KEY_HDR = "key"; private String ASD_HOST1; private String ASD_HOST2; private int ASD_PORT; private String ASD_NAME_SPACE = "cm"; private String MZ_SET_NAME; private String MZ_BIN_NAME; private int batchSize; private WritePolicy write_policy; private Policy policy; private AsyncClient asd_async_client; private AsyncClientPolicy async_client_policy; private boolean completed; @Override public void configure(Context context) { ASD_HOST1 = context.getString("asd_host1", "127.0.0.1"); ASD_HOST2 = context.getString("asd_host2", "127.0.0.1"); ASD_PORT = context.getInteger("asd_port",3000); SET_NAME = context.getString("set_name", "xxx"); BIN_NAME = context.getString("bin_name", "xxx"); batchSize = context.getInteger("batchSize",1000); 
System.out.printf("ASD_HOST1:%s\n",ASD_HOST1); System.out.printf("ASD_HOST2:%s\n",ASD_HOST2); System.out.printf("ASD_PORT:%d\n",ASD_PORT); System.out.printf("SET_NAME:%s\n",SET_NAME); System.out.printf("BIN_NAME:%s\n",BIN_NAME); System.out.printf("batchSize:%d\n",batchSize); } @Override public void start() { Host[] hosts = new Host[] {new Host(ASD_HOST1, 3000), new Host(ASD_HOST2, 3000)}; async_client_policy = new AsyncClientPolicy(); async_client_policy.asyncMaxCommands = 300; async_client_policy.failIfNotConnected = true; asd_async_client = new AsyncClient(async_client_policy, hosts); policy = new Policy(); policy.timeout = 20; write_policy = new WritePolicy(); write_policy.timeout = 20; } @Override public void stop () { asd_async_client.close(); } @Override public Status process() throws EventDeliveryException { Status status = null; Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { long processedEvent = 0; for (; processedEvent < batchSize; processedEvent++) { Event event = ch.take(); byte[] eventBody; if(event != null) { eventBody = event.getBody(); String line= new String(eventBody,"UTF-8"); if (line.length() > 0 ) { String[] key_tag = line.split("\t"); if(key_tag.length == 2){ String tmp_key = key_tag[0]; String tmp_tag = key_tag[1]; Key as_key = new Key(ASD_NAME_SPACE, SET_NAME, tmp_key); Bin ad_bin = new Bin(BIN_NAME, tmp_tag); try{ completed = false; asd_async_client.get(policy,new ReadHandler(asd_async_client,policy,write_policy, as_key, ad_bin), as_key); waitTillComplete(); } catch (Throwable t) { System.out.println("[ERROR][process]"+ t.toString()); } } } } } status = Status.READY; txn.commit(); } catch (Throwable t) { txn.rollback(); status = Status.BACKOFF; if (t instanceof Error) { System.out.println("[ERROR][process]"+ t.toString()); throw (Error)t; } } txn.close(); return status; } private class WriteHandler implements WriteListener { private final AsyncClient client; private final WritePolicy policy; private 
final Key key; private final Bin bin; private int failCount = 0; public WriteHandler(AsyncClient client, WritePolicy policy, Key key, Bin bin) { this.client = client; this.policy = policy; this.key = key; this.bin = bin; } public void onSuccess(Key key) { try { } catch (Exception e) { System.out.printf("[ERROR][WriteHandler]Failed to put: namespace=%s set=%s key=%s exception=%s\n",key.namespace, key.setName, key.userKey, e.getMessage()); } notifyCompleted(); } public void onFailure(AerospikeException e) { if (++failCount <= 2) { Throwable t = e.getCause(); if (t != null && (t instanceof ConnectException || t instanceof IOException)) { try { client.put(policy, this, key, bin); return; } catch (Exception ex) { System.out.printf("[ERROR][WriteHandler]Failed to put: namespace=%s set=%s key=%s bin_name=% bin_value=%s exception=%s\n",key.namespace, key.setName, key.userKey,bin.name,bin.value.toString(), e.getMessage()); } } } notifyCompleted(); } } private class ReadHandler implements RecordListener { private final AsyncClient client; private final Policy policy; private final WritePolicy write_policy; private final Key key; private final Bin bin; private int failCount = 0; public ReadHandler(AsyncClient client, Policy policy,WritePolicy write_policy, Key key, Bin bin) { this.client = client; this.policy = policy; this.write_policy = write_policy; this.key = key; this.bin = bin; } public void onSuccess(Key key, Record record) { try { if(record != null) { String str = record.getString("mz_tag"); if(str != null && str.length() > 0) { Pattern p101 = Pattern.compile("(101\\d{4})"); Pattern p102 = Pattern.compile("(102\\d{4})"); Pattern p103 = Pattern.compile("(103\\d{4})"); String tags=""; Matcher m101 = p101.matcher(str); while (m701.find()) { tags += ("," + m701.group(1)); } Matcher m102 = p102.matcher(str); while (m102.find()) { tags += ( "," + m102.group(1)); } Matcher m103 = p103.matcher(str); while (m103.find()) { tags += ( "," + m103.group(1)); } if(tags.length() > 0) 
{ String value_new = ( bin.value.toString() + tags); Bin new_bin = new Bin("mz_tag", value_new); client.put(write_policy,new WriteHandler(client,write_policy, key, new_bin), key,new_bin); } else { client.put(write_policy,new WriteHandler(client,write_policy, key, bin), key,bin); } } else { client.put(write_policy,new WriteHandler(client,write_policy, key, bin), key,bin); } } else { client.put(write_policy,new WriteHandler(client,write_policy, key, bin), key,bin); } } catch (Exception e) { System.out.printf("[ERROR][ReadHandler]Failed to get: namespace=%s set=%s key=%s exception=%s\n",key.namespace, key.setName, key.userKey, e.getMessage()); } } public void onFailure(AerospikeException e) { if (++failCount <= 2) { Throwable t = e.getCause(); if (t != null && (t instanceof ConnectException || t instanceof IOException)) { try { client.get(policy, this, key); return; } catch (Exception ex) { System.out.printf("[ERROR][ReadHandler]Failed to get: namespace=%s set=%s key=%s exception=%s\n",key.namespace, key.setName, key.userKey, e.getMessage()); } } } notifyCompleted(); } } private synchronized void waitTillComplete() { while (! completed) { try { super.wait(); } catch (InterruptedException ie) { } } } private synchronized void notifyCompleted() { completed = true; super.notify(); } }
2. 用maven将自定义的Sink打包成jar包,maven xml配置文件如下
[html]
view plain
copy
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.apache.flume.flume-ng-sinks</groupId> <artifactId>flume-ng-aerospike-master-sink</artifactId> <name>Flume Aerospike Sink</name> <version>1.0.0</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-sdk</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-configuration</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> <scope>test</scope> </dependency> <dependency> <groupId>com.aerospike</groupId> <artifactId>aerospike-client</artifactId> <version>[3.0.0,)</version> </dependency> </dependencies> </project>
3. 将打包好的jar 包放到flume lib目录下
4. 在flume 的conf目录的配置文件中加入自定义Sink配置
[plain]
view plain
copy
a1.sinks.k1_1.type = kafka_sink.asd.Asdsink a1.sinks.k1_1.asd_host1 = 127.0.0.1 a1.sinks.k1_1.asd_host2 = 192.168.0.1 a1.sinks.k1_1.asd_port = 3000 a1.sinks.k1_1.set_name = test_set_name a1.sinks.k1_1.bin_name = test_bin_name a1.sinks.k1_1.batchSize = 10000
相关资料: 1. flume官网文档:http://flume.apache.org/FlumeDeveloperGuide.html 2. Apache Maven 入门篇(上):http://blog.csdn.net/yanshu2012/article/details/50722088 3. Apache Maven 入门篇(下):http://blog.csdn.net/yanshu2012/article/details/50722621