生产者将信息输入到集群中, 那么消费者就要能从集群中取出所需要的信息。
主要的类就是: KafkaConsumer<K,V>
一、原理
Kafka的一个分区的每一个记录保持一个数值偏移。这个偏移量作为该分区内记录的惟一标识符,并表示该分区中的使用者的位置。例如,处于位置5的消费者使用偏移量0到4的记录,然后将使用偏移量5来接收记录。
消费者api提供了覆盖各种消费用例的灵活性。
二、自动位移提交
以下是完成的程序案例
package wangxn.testConsumer; import java.util.Collections; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; public class autoConsumer { private static KafkaConsumer<String, String> consumer; public static void main(String[] args) { // TODO Auto-generated method stub Properties props = new Properties(); props.put("bootstrap.servers", "mymac:9092"); props.put("group.id", "DemoConsumer"); props.put("enable.auto.commit", "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put("auto.commit.interval.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //consumer.subscribe(Arrays.asList("foo", "bar")); consumer = new KafkaConsumer<>(props); while (true) { consumer.subscribe(Collections.singletonList("my-topic")); ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); //consumer.close(); } } } 其中,设置 enable.auto.commit 为 true, 就是偏移量可以按照一定的频率自动提交。 auto.commit.interval.ms 就是用来设置频率的。程序是不断运行的。运行结果就是:
producer 端发送消息:
$ kafka-console-producer.sh --broker-list mymac:9092 --topic my-topic
>dajia >ceshi >ok, quickly >
本程序将接收消息:
三、手动偏移量控制
这将设置 enable.auto.commit 为 false
用户不必依赖于使用者定期提交消耗的偏移量,还可以控制记录何时被视为消耗,从而提交他们的偏移量。当消息的消费与某些处理逻辑结合在一起时,这是很有用的,因此在完成处理之前,不应该将消息视为消费。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { insertIntoDb(buffer); consumer.commitSync(); buffer.clear(); } } 在本例中,我们将使用一批记录,并在内存中对它们进行批处理。当我们有足够的记录时,我们将把它们插入到一个数据库中。如果我们允许在前面的示例中使用补偿来进行自动提交,那么在Poll 中返回给用户之后,记录将被认为是消耗的。这样,我们的进程在批处理记录之后可能会失败,但是在它们被插入到数据库之前。 为了避免这种情况,我们将在将相应的记录插入到数据库之后手工提交补偿。这使我们精确地控制了记录被认为是消耗的时间。这就产生了相反的可能性:在插入到数据库之后,进程可能会失败,但是在提交之前(尽管可能只有几毫秒,但这是有可能的)。在这种情况下,接管消费的过程将消耗最后一次提交的偏移量,并将重复最后一批数据的插入。使用这种方式,Kafka提供了通常被称为“最少一次”的交付保证,因为每一次记录都可能被交付一次,但在失败案例中可能会被重复。
注意:使用自动偏移量提交也可以给您“至少一次”的交付,但是需求是您必须在任何后续调用之前,或者在关闭客户之前,使用从每个调用返回的所有数据(给 Poll)。如果您没有执行这两种操作,那么就有可能被提交的偏移量超出所消耗的位置,从而导致丢失的记录。使用手动补偿控制的优点是,当记录被认为是“消耗”时,您可以直接控制。
上面的例子是用 CommitSync 去标记所有已经提交的接收记录。在某些情况下,您可能希望通过显式地指定偏移量来更好地控制哪些记录被提交。在下面的例子中,我们在处理完每个分区中的记录之后,提交了偏移量。
try { while(running) { ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ": " + record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } finally { consumer.close(); }注意:提交的偏移量应该始终是应用程序将要读取的下一条消息的偏移量。因此,当调用 commitSync(offsets) 时,您应该将一个添加到处理的最后一条消息的偏移量。
四、手动分配 Partition
String topic = "foo"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1));
五、采用多线程来进行处理
下面是完整的程序,不解释。
package wangxn.testkafka; import kafka.utils.ShutdownableThread; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Collections; import java.util.Properties; public class myConsumer extends ShutdownableThread { private final KafkaConsumer<Integer, String> consumer; private final String topic; public myConsumer(String topic) { super("KafkaConsumerExample", false); Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "mymac:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<Integer,String>(props); this.topic = topic; } @Override public void doWork() { consumer.subscribe(Collections.singletonList(this.topic)); // System.err.println("consumer.subscribe!"); ConsumerRecords<Integer, String> records = consumer.poll(3000); for (ConsumerRecord<Integer, String> record : records) System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); } @Override public String name() { return null; } @Override public boolean isInterruptible() { return false; } }