Kafka多线程生产消费

xiaoxiao2021-02-28  33

一、kafka生产者

       kafka目前在0.9版本后采用java版本实现,生产者KafkaProducer是线程安全对象,所以我们建议KafkaProducer采用单例模式,多个线程共享一个实例

    

[java]  view plain  copy package com.kafka.singleton;    import java.io.IOException;  import java.io.InputStream;  import java.util.Properties;  import java.util.Random;    import org.apache.kafka.clients.producer.Callback;  import org.apache.kafka.clients.producer.KafkaProducer;  import org.apache.kafka.clients.producer.ProducerRecord;  import org.apache.kafka.clients.producer.RecordMetadata;  import org.slf4j.Logger;  import org.slf4j.LoggerFactory;    public final class KafkaProducerSingleton {        private static final Logger LOGGER = LoggerFactory              .getLogger(KafkaProducerSingleton.class);        private static KafkaProducer<String, String> kafkaProducer;        private Random random = new Random();        private String topic;        private int retry;        private KafkaProducerSingleton() {        }              /**      * 静态内部类      *       * @author tanjie      *      */      private static class LazyHandler {            private static final KafkaProducerSingleton instance = new KafkaProducerSingleton();      }        /**      * 单例模式,kafkaProducer是线程安全的,可以多线程共享一个实例      *       * @return      */      public static final KafkaProducerSingleton getInstance() {          return LazyHandler.instance;      }        /**      * kafka生产者进行初始化      *       * @return KafkaProducer      */      public void init(String topic,int retry) {          this.topic = topic;          this.retry = retry;          if (null == kafkaProducer) {              Properties props = new Properties();              InputStream inStream = null;              try {                  inStream = this.getClass().getClassLoader()                          .getResourceAsStream("kafka.properties");                  props.load(inStream);                  kafkaProducer = new KafkaProducer<String, String>(props);              } catch (IOException e) {                  LOGGER.error("kafkaProducer初始化失败:" + e.getMessage(), e);              } finally {                  if (null != inStream) {                      try {                          inStream.close();                      } catch (IOException e) {                          LOGGER.error("kafkaProducer初始化失败:" + e.getMessage(), e);                      }                  }              }          }      }        /**      * 通过kafkaProducer发送消息      *       * @param topic      *            消息接收主题      * @param partitionNum      *            哪一个分区      * @param retry      *            重试次数      * @param message      *            具体消息值      */      public void sendKafkaMessage(final String message) {          /**          * 1、如果指定了某个分区,会只讲消息发到这个分区上 2、如果同时指定了某个分区和key,则也会将消息发送到指定分区上,key不起作用          * 3、如果没有指定分区和key,那么将会随机发送到topic的分区中 4、如果指定了key,那么将会以hash<key>的方式发送到分区中          */          ProducerRecord<String, String> record = new ProducerRecord<String, String>(                  topic, random.nextInt(3), "", message);          // send方法是异步的,添加消息到缓存区等待发送,并立即返回,这使生产者通过批量发送消息来提高效率          // kafka生产者是线程安全的,可以单实例发送消息          kafkaProducer.send(record, new Callback() {              public void onCompletion(RecordMetadata recordMetadata,                      Exception exception) {                  if (null != exception) {                      LOGGER.error("kafka发送消息失败:" + exception.getMessage(),                              exception);                      retryKakfaMessage(message);                  }              }          });      }        /**      * 当kafka消息发送失败后,重试      *       * @param retryMessage      */      private void retryKakfaMessage(final String retryMessage) {          ProducerRecord<String, String> record = new ProducerRecord<String, String>(                  topic, random.nextInt(3), "", retryMessage);          for (int i = 1; i <= retry; i++) {              try {                  kafkaProducer.send(record);                  return;              } catch (Exception e) {                  LOGGER.error("kafka发送消息失败:" + e.getMessage(), e);                  retryKakfaMessage(retryMessage);              }          }      }        /**      * kafka实例销毁      */      public void close() {          if (null != kafkaProducer) {              kafkaProducer.close();          }      }        public String getTopic() {          return topic;      }        public void setTopic(String topic) {          this.topic = topic;      }        public int getRetry() {          return retry;      }        public void setRetry(int retry) {          this.retry = retry;      }    }  

   HandlerProducer

  

[java]  view plain  copy package com.travelsky.kafka.singleton;      public class HandlerProducer implements Runnable {        private String message;        public HandlerProducer(String message) {          this.message = message;      }        @Override      public void run() {          KafkaProducerSingleton kafkaProducerSingleton = KafkaProducerSingleton                  .getInstance();          kafkaProducerSingleton.init("test_find",3);          System.out.println("当前线程:" + Thread.currentThread().getName()                  + ",获取的kafka实例:" + kafkaProducerSingleton);          kafkaProducerSingleton.sendKafkaMessage("发送消息" + message);      }    }  

    kafka.properties

[java]  view plain  copy bootstrap.servers=master:9092,slave1:9092,slave2:9092  acks=1  retries=0  batch.size=1000  compression.type=gzip  #buffer.memory=33554432  key.serializer=org.apache.kafka.common.serialization.StringSerializer  value.serializer=org.apache.kafka.common.serialization.StringSerializer  

  二、kafka消费者

[java]  view plain  copy package com.kafka.consumer;    import java.util.Arrays;  import java.util.Properties;  import java.util.concurrent.ExecutorService;  import java.util.concurrent.Executors;  import java.util.concurrent.TimeUnit;    import org.apache.kafka.clients.consumer.ConsumerRecords;  import org.apache.kafka.clients.consumer.KafkaConsumer;    public final class Kafka_Consumer {        /**      * kafka消费者不是线程安全的      */      private final KafkaConsumer<String, String> consumer;        private ExecutorService executorService;        public Kafka_Consumer() {          Properties props = new Properties();          props.put("bootstrap.servers",                  "ip,port");          props.put("group.id""group");          // 关闭自动提交          props.put("enable.auto.commit""false");          props.put("auto.commit.interval.ms""1000");          props.put("session.timeout.ms""30000");          props.put("key.deserializer",                  "org.apache.kafka.common.serialization.StringDeserializer");          props.put("value.deserializer",                  "org.apache.kafka.common.serialization.StringDeserializer");          consumer = new KafkaConsumer<String, String>(props);          consumer.subscribe(Arrays.asList("test_find"));      }        public void execute() {          executorService = Executors.newFixedThreadPool(3);          while (true) {              ConsumerRecords<String, String> records = consumer.poll(10);              if (null != records) {                  executorService.submit(new ConsumerThread(records, consumer));              }          }      }        public void shutdown() {          try {              if (consumer != null) {                  consumer.close();              }              if (executorService != null) {                  executorService.shutdown();              }              if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {                  System.out.println("Timeout");              }          } catch (InterruptedException ignored) {              Thread.currentThread().interrupt();          }      }    }  

    ConsumerThread

[java]  view plain  copy package com.kafka.consumer;    import java.util.Collections;  import java.util.List;    import org.apache.kafka.clients.consumer.ConsumerRecord;  import org.apache.kafka.clients.consumer.ConsumerRecords;  import org.apache.kafka.clients.consumer.KafkaConsumer;  import org.apache.kafka.clients.consumer.OffsetAndMetadata;  import org.apache.kafka.common.TopicPartition;    /**  * 多消费者,多个work线程,难保证分区消息消费的顺序性  *   * @author tanjie  *  */  public final class ConsumerThread implements Runnable {        private ConsumerRecords<String, String> records;        private KafkaConsumer<String, String> consumer;        public ConsumerThread(ConsumerRecords<String, String> records,              KafkaConsumer<String, String> consumer) {          this.records = records;          this.consumer = consumer;      }        @Override      public void run() {          for (TopicPartition partition : records.partitions()) {              List<ConsumerRecord<String, String>> partitionRecords = records                      .records(partition);              for (ConsumerRecord<String, String> record : partitionRecords) {                  System.out.println("当前线程:" + Thread.currentThread() + ","                          + "偏移量:" + record.offset() + "," + "主题:"                          + record.topic() + "," + "分区:" + record.partition()                          + "," + "获取的消息:" + record.value());              }              // 消费者自己手动提交消费的offest,确保消息正确处理后再提交              long lastOffset = partitionRecords.get(partitionRecords.size() - 1)                      .offset();              consumer.commitSync(Collections.singletonMap(partition,                      new OffsetAndMetadata(lastOffset + 1)));          }      }  }  

   Main方法

[java]  view plain  copy public static void main(String[] args) {          Kafka_Consumer kafka_Consumer = new Kafka_Consumer();          try {              kafka_Consumer.execute();              Thread.sleep(20000);          } catch (InterruptedException e) {              e.printStackTrace();          } finally {              kafka_Consumer.shutdown();          }      }  

 三、运行效果

     先起消费者,再起生产者,运行效果如下

     消费者:

[java]  view plain  copy 当前线程:Thread[pool-1-thread-1,5,main],偏移量:44,主题:test_find,分区:1,获取的消息:发送消息:1  当前线程:Thread[pool-1-thread-2,5,main],偏移量:45,主题:test_find,分区:1,获取的消息:发送消息:2  当前线程:Thread[pool-1-thread-1,5,main],偏移量:46,主题:test_find,分区:1,获取的消息:发送消息:3  当前线程:Thread[pool-1-thread-1,5,main],偏移量:39,主题:test_find,分区:0,获取的消息:发送消息:4  当前线程:Thread[pool-1-thread-2,5,main],偏移量:47,主题:test_find,分区:1,获取的消息:发送消息:5  当前线程:Thread[pool-1-thread-3,5,main],偏移量:40,主题:test_find,分区:0,获取的消息:发送消息:6  当前线程:Thread[pool-1-thread-2,5,main],偏移量:37,主题:test_find,分区:2,获取的消息:发送消息:7  当前线程:Thread[pool-1-thread-2,5,main],偏移量:38,主题:test_find,分区:2,获取的消息:发送消息:8  当前线程:Thread[pool-1-thread-1,5,main],偏移量:48,主题:test_find,分区:1,获取的消息:发送消息:9  当前线程:Thread[pool-1-thread-2,5,main],偏移量:39,主题:test_find,分区:2,获取的消息:发送消息:10  

    生产者:

  

[java]  view plain  copy import java.util.concurrent.ExecutorService;  import java.util.concurrent.Executors;    import org.junit.Test;  import org.junit.runner.RunWith;  import org.springframework.test.context.ContextConfiguration;  import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;    import com.travelsky.kafka.singleton.HandlerProducer;    @RunWith(SpringJUnit4ClassRunner.class)  @ContextConfiguration(locations = { "classpath:applicationContext.xml" })  public class Kafka生产_多线程单实例 {              @Test      public void testSendMessageSingleton() throws InterruptedException {          ExecutorService executor = Executors.newFixedThreadPool(3);          for (int i = 1; i <= 10; i++) {              Thread.sleep(1000);              executor.submit(new HandlerProducer(":" + i));          }      }    }  

   

[java]  view plain  copy 当前线程:pool-1-thread-1,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475  当前线程:pool-1-thread-2,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475  当前线程:pool-1-thread-3,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475  当前线程:pool-1-thread-1,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475  当前线程:pool-1-thread-2,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475  当前线程:pool-1-thread-3,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475  当前线程:pool-1-thread-1,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475  当前线程:pool-1-thread-2,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475  当前线程:pool-1-thread-3,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475  当前线程:pool-1-thread-1,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475  

 

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/charry_a/article/details/79621324
转载请注明原文地址: https://www.6miu.com/read-2623908.html

最新回复(0)