漫游Kafka实战篇之客户端编程实例

xiaoxiao2021-02-28  115

新版的Producer API提供了以下功能: 可以将多个消息缓存到本地队列里,然后异步的批量发送到broker,可以通过参数producer.type=async做到。缓存的大小可以通过一些参数指定:queue.time和batch.size。一个后台线程((kafka.producer.async.ProducerSendThread)从队列中取出数据并让kafka.producer.EventHandler将消息发送到broker,也可以通过参数event.handler定制handler,在producer端处理数据的不同的阶段注册处理器,比如可以对这一过程进行日志追踪,或进行一些监控。只需实现kafka.producer.async.CallbackHandler接口,并在callback.handler中配置。自己编写Encoder来序列化消息,只需实现下面这个接口。默认的Encoder是kafka.serializer.DefaultEncoder。 [java]  view plain  copy interface Encoder<T> {     public Message toMessage(T data);   }   提供了基于Zookeeper的broker自动感知能力,可以通过参数zk.connect实现。如果不使用Zookeeper,也可以使用broker.list参数指定一个静态的brokers列表,这样消息将被随机的发送到一个broker上,一旦选中的broker失败了,消息发送也就失败了。通过分区函数kafka.producer.Partitioner类对消息分区。 [java]  view plain  copy interface Partitioner<T> {      int partition(T key, int numPartitions);   }   分区函数有两个参数:key和可用的分区数量,从分区列表中选择一个分区并返回id。默认的分区策略是hash(key)%numPartitions.如果key是null,就随机的选择一个。可以通过参数partitioner.class定制分区函数。

新的api完整实例如下:

[java]  view plain  copy package com.cuicui.kafkademon;         import java.util.ArrayList;   import java.util.List;   import java.util.Properties;         import kafka.javaapi.producer.Producer;   import kafka.producer.KeyedMessage;   import kafka.producer.ProducerConfig;         /**   * @author <a href="mailto:leicui001@126.com">崔磊</a>   * @date 2015年11月4日 上午11:44:15   */   public class MyProducer {             public static void main(String[] args) throws InterruptedException {                 Properties props = new Properties();           props.put("serializer.class""kafka.serializer.StringEncoder");           props.put("metadata.broker.list", KafkaProperties.BROKER_CONNECT);           props.put("partitioner.class""com.cuicui.kafkademon.MyPartitioner");           props.put("request.required.acks""1");           ProducerConfig config = new ProducerConfig(props);           Producer<String, String> producer = new Producer<String, String>(config);                 // 单个发送           for (int i = 0; i <= 1000000; i++) {               KeyedMessage<String, String> message =                       new KeyedMessage<String, String>(KafkaProperties.TOPIC, i + """Message" + i);               producer.send(message);               Thread.sleep(5000);           }                 // 批量发送           List<KeyedMessage<String, String>> messages = new ArrayList<KeyedMessage<String, String>>(100);           for (int i = 0; i <= 10000; i++) {               KeyedMessage<String, String> message =                       new KeyedMessage<String, String>(KafkaProperties.TOPIC, i + """Message" + i);               messages.add(message);               if (i % 100 == 0) {                   producer.send(messages);                   messages.clear();               }           }           producer.send(messages);       }   }   下面这个是用到的分区函数:

[java]  view plain  copy import kafka.producer.Partitioner;   import kafka.utils.VerifiableProperties;         public class MyPartitioner implements Partitioner {       public MyPartitioner(VerifiableProperties props) {             }             /*       * @see kafka.producer.Partitioner#partition(java.lang.Object, int)       */       @Override       public int partition(Object key, int partitionCount) {           return Integer.valueOf((String) key) % partitionCount;       }   }  

KafKa Consumer APIs

Consumer API有两个级别。低级别的和一个指定的broker保持连接,并在接收完消息后关闭连接,这个级别是无状态的,每次读取消息都带着offset。

高级别的API隐藏了和brokers连接的细节,在不必关心服务端架构的情况下和服务端通信。还可以自己维护消费状态,并可以通过一些条件指定订阅特定的topic,比如白名单黑名单或者正则表达式。

低级别的API

[java]  view plain  copy package com.cuicui.kafkademon;         import java.nio.ByteBuffer;   import java.util.Collections;   import java.util.HashMap;   import java.util.List;   import java.util.Map;         import kafka.api.FetchRequest;   import kafka.api.FetchRequestBuilder;   import kafka.api.PartitionOffsetRequestInfo;   import kafka.cluster.Broker;   import kafka.common.TopicAndPartition;   import kafka.javaapi.FetchResponse;   import kafka.javaapi.OffsetRequest;   import kafka.javaapi.OffsetResponse;   import kafka.javaapi.PartitionMetadata;   import kafka.javaapi.TopicMetadata;   import kafka.javaapi.TopicMetadataRequest;   import kafka.javaapi.TopicMetadataResponse;   import kafka.javaapi.consumer.SimpleConsumer;   import kafka.javaapi.message.ByteBufferMessageSet;   import kafka.message.Message;   import kafka.message.MessageAndOffset;         /**   * offset自己维护 目标topic、partition均由自己分配   *    * @author <a href="mailto:leicui001@126.com">崔磊</a>   * @date 2015年11月4日 上午11:44:15   *   */   public class MySimpleConsumer {             public static void main(String[] args) {           new MySimpleConsumer().consume();       }             /**       * 消费消息       */       public void consume() {           int partition = 0;                 // 找到leader           Broker leaderBroker = findLeader(KafkaProperties.BROKER_CONNECT, KafkaProperties.TOPIC, partition);                 // 从leader消费           SimpleConsumer simpleConsumer =                   new SimpleConsumer(leaderBroker.host(), leaderBroker.port(), 2000010000"mySimpleConsumer");           long startOffet = 1;           int fetchSize = 1000;                 while (true) {               long offset = startOffet;               // 添加fetch指定目标tipic,分区,起始offset及fetchSize(字节),可以添加多个fetch               FetchRequest req =                       new FetchRequestBuilder().addFetch(KafkaProperties.TOPIC, 0, startOffet, fetchSize).build();                     // 拉取消息               FetchResponse fetchResponse = simpleConsumer.fetch(req);                     ByteBufferMessageSet messageSet = fetchResponse.messageSet(KafkaProperties.TOPIC, partition);               for (MessageAndOffset messageAndOffset : messageSet) {                   Message mess = messageAndOffset.message();                   ByteBuffer payload = mess.payload();                   byte[] bytes = new byte[payload.limit()];                   payload.get(bytes);                   String msg = new String(bytes);                         offset = messageAndOffset.offset();                   System.out.println("partition : " + 3 + ", offset : " + offset + "  mess : " + msg);               }               // 继续消费下一批               startOffet = offset + 1;           }       }             /**       * 找到制定分区的leader broker       *        * @param brokerHosts broker地址,格式为:“host1:port1,host2:port2,host3:port3”       * @param topic topic       * @param partition 分区       * @return       */       public Broker findLeader(String brokerHosts, String topic, int partition) {           Broker leader = findPartitionMetadata(brokerHosts, topic, partition).leader();           System.out.println(String.format("Leader tor topic %s, partition %d is %s:%d", topic, partition, leader.host(),                   leader.port()));           return leader;       }             /**       * 找到指定分区的元数据       *        * @param brokerHosts broker地址,格式为:“host1:port1,host2:port2,host3:port3”       * @param topic topic       * @param partition 分区       * @return 元数据       */       private PartitionMetadata findPartitionMetadata(String brokerHosts, String topic, int partition) {           PartitionMetadata returnMetaData = null;           for (String brokerHost : brokerHosts.split(",")) {               SimpleConsumer consumer = null;               String[] splits = brokerHost.split(":");               consumer = new SimpleConsumer(splits[0], Integer.valueOf(splits[1]), 10000064 * 1024"leaderLookup");               List<String> topics = Collections.singletonList(topic);               TopicMetadataRequest request = new TopicMetadataRequest(topics);               TopicMetadataResponse response = consumer.send(request);               List<TopicMetadata> topicMetadatas = response.topicsMetadata();               for (TopicMetadata topicMetadata : topicMetadatas) {                   for (PartitionMetadata PartitionMetadata : topicMetadata.partitionsMetadata()) {                       if (PartitionMetadata.partitionId() == partition) {                           returnMetaData = PartitionMetadata;                       }                   }               }               if (consumer != null)                   consumer.close();           }           return returnMetaData;       }             /**       * 根据时间戳找到某个客户端消费的offset       *        * @param consumer SimpleConsumer       * @param topic topic       * @param partition 分区       * @param clientID 客户端的ID       * @param whichTime 时间戳       * @return offset       */       public long getLastOffset(SimpleConsumer consumer, String topic, int partition, String clientID, long whichTime) {           TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);           Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =                   new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();           requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));           OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientID);           OffsetResponse response = consumer.getOffsetsBefore(request);           long[] offsets = response.offsets(topic, partition);           return offsets[0];       }   }   低级别的API是高级别API实现的基础,也是为了一些对维持消费状态有特殊需求的场景,比如Hadoop consumer这样的离线consumer。

高级别的API

[java]  view plain  copy package com.cuicui.kafkademon;      import java.util.HashMap;   import java.util.List;   import java.util.Map;   import java.util.Properties;      import kafka.consumer.Consumer;   import kafka.consumer.ConsumerConfig;   import kafka.consumer.ConsumerIterator;   import kafka.consumer.KafkaStream;   import kafka.javaapi.consumer.ConsumerConnector;   import kafka.message.MessageAndMetadata;         /**   * offset在zookeeper中记录,以group.id为key 分区和customer的对应关系由Kafka维护   *    * @author <a href="mailto:leicui001@126.com">崔磊</a>   * @date 2015年11月4日 上午11:44:15   */   public class MyHighLevelConsumer {          /**       * 该consumer所属的组ID       */       private String groupid;          /**       * 该consumer的ID       */       private String consumerid;          /**       * 每个topic开几个线程?       */       private int threadPerTopic;          public MyHighLevelConsumer(String groupid, String consumerid, int threadPerTopic) {           super();           this.groupid = groupid;           this.consumerid = consumerid;           this.threadPerTopic = threadPerTopic;       }          public void consume() {           Properties props = new Properties();           props.put("group.id", groupid);           props.put("consumer.id", consumerid);           props.put("zookeeper.connect", KafkaProperties.ZK_CONNECT);           props.put("zookeeper.session.timeout.ms""60000");           props.put("zookeeper.sync.time.ms""2000");           // props.put("auto.commit.interval.ms", "1000");              ConsumerConfig config = new ConsumerConfig(props);           ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);              Map<String, Integer> topicCountMap = new HashMap<String, Integer>();              // 设置每个topic开几个线程           topicCountMap.put(KafkaProperties.TOPIC, threadPerTopic);              // 获取stream           Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);              // 为每个stream启动一个线程消费消息           for (KafkaStream<byte[], byte[]> stream : streams.get(KafkaProperties.TOPIC)) {               new MyStreamThread(stream).start();           }       }          /**       * 每个consumer的内部线程       *        * @author cuilei05       *       */       private class MyStreamThread extends Thread {           private KafkaStream<byte[], byte[]> stream;              public MyStreamThread(KafkaStream<byte[], byte[]> stream) {               super();               this.stream = stream;           }              @Override           public void run() {               ConsumerIterator<byte[], byte[]> streamIterator = stream.iterator();                  // 逐条处理消息               while (streamIterator.hasNext()) {                   MessageAndMetadata<byte[], byte[]> message = streamIterator.next();                   String topic = message.topic();                   int partition = message.partition();                   long offset = message.offset();                   String key = new String(message.key());                   String msg = new String(message.message());                   // 在这里处理消息,这里仅简单的输出                   // 如果消息消费失败,可以将已上信息打印到日志中,活着发送到报警短信和邮件中,以便后续处理                   System.out.println("consumerid:" + consumerid + ", thread : " + Thread.currentThread().getName()                           + ", topic : " + topic + ", partition : " + partition + ", offset : " + offset + " , key : "                           + key + " , mess : " + msg);               }           }       }          public static void main(String[] args) {           String groupid = "myconsumergroup";           MyHighLevelConsumer consumer1 = new MyHighLevelConsumer(groupid, "myconsumer1"3);           MyHighLevelConsumer consumer2 = new MyHighLevelConsumer(groupid, "myconsumer2"3);              consumer1.consume();           consumer2.consume();       }   }   这个API围绕着由KafkaStream实现的迭代器展开,每个流代表一系列从一个或多个分区多和broker上汇聚来的消息,每个流由一个线程处理,所以客户端可以在创建的时候通过参数指定想要几个流。一个流是多个分区多个broker的合并,但是每个分区的消息只会流向一个流。

每调用一次createMessageStreams都会将consumer注册到topic上,这样consumer和brokers之间的负载均衡就会进行调整。API鼓励每次调用创建更多的topic流以减少这种调整。createMessageStreamsByFilter方法注册监听可以感知新的符合filter的tipic。

转载请注明原文地址: https://www.6miu.com/read-45007.html

最新回复(0)