1. Kafka Producer

Since version 0.9, the Kafka client has been implemented in Java. The producer, KafkaProducer, is a thread-safe object, so we recommend using it as a singleton shared by multiple threads.
```java
package com.kafka.singleton;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class KafkaProducerSingleton {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(KafkaProducerSingleton.class);

    private static KafkaProducer<String, String> kafkaProducer;

    private Random random = new Random();

    private String topic;

    private int retry;

    private KafkaProducerSingleton() {
    }

    /**
     * Lazy-holder idiom: the JVM guarantees the instance is created exactly
     * once, when LazyHandler is first loaded.
     *
     * @author tanjie
     */
    private static class LazyHandler {
        private static final KafkaProducerSingleton instance = new KafkaProducerSingleton();
    }

    /**
     * Singleton access. KafkaProducer is thread-safe, so one instance can be
     * shared by multiple threads.
     */
    public static KafkaProducerSingleton getInstance() {
        return LazyHandler.instance;
    }

    /**
     * Initializes the underlying KafkaProducer. Synchronized so that two
     * threads cannot both observe a null kafkaProducer and create it twice.
     *
     * @param topic target topic
     * @param retry number of retries after a failed send
     */
    public synchronized void init(String topic, int retry) {
        this.topic = topic;
        this.retry = retry;
        if (null == kafkaProducer) {
            Properties props = new Properties();
            InputStream inStream = null;
            try {
                inStream = this.getClass().getClassLoader()
                        .getResourceAsStream("kafka.properties");
                props.load(inStream);
                kafkaProducer = new KafkaProducer<String, String>(props);
            } catch (IOException e) {
                LOGGER.error("Failed to initialize kafkaProducer: " + e.getMessage(), e);
            } finally {
                if (null != inStream) {
                    try {
                        inStream.close();
                    } catch (IOException e) {
                        LOGGER.error("Failed to close kafka.properties stream: " + e.getMessage(), e);
                    }
                }
            }
        }
    }

    /**
     * Sends a message through kafkaProducer.
     *
     * @param message the message value
     */
    public void sendKafkaMessage(final String message) {
        /*
         * Partitioning rules:
         * 1. If a partition is specified, the message goes to that partition.
         * 2. If both a partition and a key are specified, the message still
         *    goes to the specified partition; the key plays no role in routing.
         * 3. If neither a partition nor a key is specified, the message is
         *    distributed across the topic's partitions.
         * 4. If only a key is specified, the partition is chosen by hash(key).
         */
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                topic, random.nextInt(3), "", message);
        // send() is asynchronous: it appends the record to a buffer and
        // returns immediately, which lets the producer batch messages for
        // efficiency. KafkaProducer is thread-safe, so a single instance can
        // serve all sending threads.
        kafkaProducer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata,
                    Exception exception) {
                if (null != exception) {
                    LOGGER.error("Failed to send kafka message: "
                            + exception.getMessage(), exception);
                    retryKafkaMessage(message);
                }
            }
        });
    }

    /**
     * Resends a message after a failed send, at most `retry` times.
     *
     * @param retryMessage the message to resend
     */
    private void retryKafkaMessage(final String retryMessage) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                topic, random.nextInt(3), "", retryMessage);
        for (int i = 1; i <= retry; i++) {
            try {
                kafkaProducer.send(record);
                return;
            } catch (Exception e) {
                // Log and fall through to the next loop iteration. (The
                // original code recursed into this method here, which could
                // retry without bound.)
                LOGGER.error("Failed to send kafka message (attempt " + i
                        + "): " + e.getMessage(), e);
            }
        }
    }

    /**
     * Closes the producer and releases its resources.
     */
    public void close() {
        if (null != kafkaProducer) {
            kafkaProducer.close();
        }
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public int getRetry() {
        return retry;
    }

    public void setRetry(int retry) {
        this.retry = retry;
    }
}
```
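For reference (this snippet is not from the original post), the routing rules in the comment above map directly onto the ProducerRecord constructor overloads; the topic name and values below are placeholders. Note that sendKafkaMessage always pins each record to a random partition in [0, 3), which assumes the topic has at least three partitions.

```java
// Explicit partition: always goes to partition 0; the key is ignored for routing.
new ProducerRecord<String, String>("test_find", 0, "ignoredKey", "value");

// Key only: the default partitioner picks the partition by hash(key).
new ProducerRecord<String, String>("test_find", "orderId-42", "value");

// Neither partition nor key: messages are spread across the topic's partitions.
new ProducerRecord<String, String>("test_find", "value");
```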
HandlerProducer
```java
package com.travelsky.kafka.singleton;

import com.kafka.singleton.KafkaProducerSingleton;

public class HandlerProducer implements Runnable {

    private String message;

    public HandlerProducer(String message) {
        this.message = message;
    }

    @Override
    public void run() {
        KafkaProducerSingleton kafkaProducerSingleton = KafkaProducerSingleton
                .getInstance();
        kafkaProducerSingleton.init("test_find", 3);
        System.out.println("Current thread: " + Thread.currentThread().getName()
                + ", Kafka instance: " + kafkaProducerSingleton);
        kafkaProducerSingleton.sendKafkaMessage("message" + message);
    }
}
```
kafka.properties
```properties
bootstrap.servers=master:9092,slave1:9092,slave2:9092
acks=1
retries=0
batch.size=1000
compression.type=gzip
#buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
```
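A brief note on these settings (explanation added here, not from the original post): acks=1 means only the partition leader must acknowledge a write before it counts as successful, and retries=0 disables the producer client's built-in resend, which is presumably why KafkaProducerSingleton implements its own retry loop. batch.size is measured in bytes, and leaving buffer.memory commented out keeps the default send-buffer size.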
2. Kafka Consumer

```java
package com.kafka.consumer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class Kafka_Consumer {

    /**
     * Unlike the producer, KafkaConsumer is NOT thread-safe.
     */
    private final KafkaConsumer<String, String> consumer;

    private ExecutorService executorService;

    public Kafka_Consumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip,port");
        props.put("group.id", "group");
        // Disable auto-commit; offsets are committed manually after processing.
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test_find"));
    }

    public void execute() {
        executorService = Executors.newFixedThreadPool(3);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            // poll() never returns null; only hand off non-empty batches.
            if (records.count() > 0) {
                executorService.submit(new ConsumerThread(records, consumer));
            }
        }
    }

    public void shutdown() {
        try {
            if (consumer != null) {
                consumer.close();
            }
            if (executorService != null) {
                executorService.shutdown();
                if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timeout");
                }
            }
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
    }
}
```
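One caveat with this design: because KafkaConsumer is not thread-safe, worker threads calling commitSync() while the polling thread is inside poll() can trigger a ConcurrentModificationException. A common alternative is one consumer per thread, so every poll and commit stays on a single thread. The sketch below illustrates that layout under the original post's settings; the class name is hypothetical and the properties are assumed to mirror the Kafka_Consumer constructor.

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// A minimal sketch, assuming one KafkaConsumer per thread: each thread
// polls, processes, and commits on its own consumer, so the consumer is
// never touched from two threads.
public final class SingleThreadedConsumer implements Runnable {

    private final KafkaConsumer<String, String> consumer;

    public SingleThreadedConsumer(Properties props) {
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test_find"));
    }

    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value()); // process the record
            }
            consumer.commitSync(); // commit from the same thread that polled
        }
    }
}
```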
ConsumerThread

```java
package com.kafka.consumer;

import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * With multiple consumers and multiple worker threads, the per-partition
 * ordering of message processing is hard to guarantee.
 *
 * @author tanjie
 */
public final class ConsumerThread implements Runnable {

    private ConsumerRecords<String, String> records;

    private KafkaConsumer<String, String> consumer;

    public ConsumerThread(ConsumerRecords<String, String> records,
            KafkaConsumer<String, String> consumer) {
        this.records = records;
        this.consumer = consumer;
    }

    @Override
    public void run() {
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records
                    .records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                System.out.println("Current thread: " + Thread.currentThread()
                        + ", offset: " + record.offset()
                        + ", topic: " + record.topic()
                        + ", partition: " + record.partition()
                        + ", message: " + record.value());
            }
            // Commit the consumed offset manually, and only after the
            // messages have been processed successfully.
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1)
                    .offset();
            consumer.commitSync(Collections.singletonMap(partition,
                    new OffsetAndMetadata(lastOffset + 1)));
        }
    }
}
```
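Note that the committed value is lastOffset + 1: a commit records the position of the next message to fetch, not the last message processed. Since the commit happens only after processing, a crash between processing and commit replays those messages on restart, giving at-least-once delivery.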
Main method

```java
public static void main(String[] args) {
    Kafka_Consumer kafka_Consumer = new Kafka_Consumer();
    try {
        // Note: execute() blocks in its while(true) poll loop, so the
        // sleep and shutdown below only run if that loop exits (e.g. on an
        // exception). Running execute() on a separate thread would make
        // the 20-second timeout take effect as intended.
        kafka_Consumer.execute();
        Thread.sleep(20000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        kafka_Consumer.shutdown();
    }
}
```
3. Running Results

Start the consumer first, then the producer. The output looks like this.

Consumer:
```
Current thread: Thread[pool-1-thread-1,5,main], offset: 44, topic: test_find, partition: 1, message: message:1
Current thread: Thread[pool-1-thread-2,5,main], offset: 45, topic: test_find, partition: 1, message: message:2
Current thread: Thread[pool-1-thread-1,5,main], offset: 46, topic: test_find, partition: 1, message: message:3
Current thread: Thread[pool-1-thread-1,5,main], offset: 39, topic: test_find, partition: 0, message: message:4
Current thread: Thread[pool-1-thread-2,5,main], offset: 47, topic: test_find, partition: 1, message: message:5
Current thread: Thread[pool-1-thread-3,5,main], offset: 40, topic: test_find, partition: 0, message: message:6
Current thread: Thread[pool-1-thread-2,5,main], offset: 37, topic: test_find, partition: 2, message: message:7
Current thread: Thread[pool-1-thread-2,5,main], offset: 38, topic: test_find, partition: 2, message: message:8
Current thread: Thread[pool-1-thread-1,5,main], offset: 48, topic: test_find, partition: 1, message: message:9
Current thread: Thread[pool-1-thread-2,5,main], offset: 39, topic: test_find, partition: 2, message: message:10
```

Producer:
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.travelsky.kafka.singleton.HandlerProducer;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml" })
public class Kafka生产_多线程单实例 {

    @Test
    public void testSendMessageSingleton() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        for (int i = 1; i <= 10; i++) {
            Thread.sleep(1000);
            executor.submit(new HandlerProducer(":" + i));
        }
        // Wait for the submitted tasks to finish so the asynchronous sends
        // have a chance to flush before the JUnit JVM exits.
        executor.shutdown();
        executor.awaitTermination(30, TimeUnit.SECONDS);
    }
}
```
Every thread gets the same KafkaProducerSingleton instance:

```
Current thread: pool-1-thread-1, Kafka instance: com.kafka.singleton.KafkaProducerSingleton@15eb475
Current thread: pool-1-thread-2, Kafka instance: com.kafka.singleton.KafkaProducerSingleton@15eb475
Current thread: pool-1-thread-3, Kafka instance: com.kafka.singleton.KafkaProducerSingleton@15eb475
Current thread: pool-1-thread-1, Kafka instance: com.kafka.singleton.KafkaProducerSingleton@15eb475
Current thread: pool-1-thread-2, Kafka instance: com.kafka.singleton.KafkaProducerSingleton@15eb475
Current thread: pool-1-thread-3, Kafka instance: com.kafka.singleton.KafkaProducerSingleton@15eb475
Current thread: pool-1-thread-1, Kafka instance: com.kafka.singleton.KafkaProducerSingleton@15eb475
Current thread: pool-1-thread-2, Kafka instance: com.kafka.singleton.KafkaProducerSingleton@15eb475
Current thread: pool-1-thread-3, Kafka instance: com.kafka.singleton.KafkaProducerSingleton@15eb475
Current thread: pool-1-thread-1, Kafka instance: com.kafka.singleton.KafkaProducerSingleton@15eb475
```
Copyright notice: this is an original post by the author and may not be reproduced without permission. https://blog.csdn.net/charry_a/article/details/79621324