1.1.1 kafka的shell命令的使用
1、创建一个topic bin/kafka-topics.sh –zookeeper zk01:2181 –replication-factor 1 –partitions 1 –create –topic test
2、查看当前集群服务器中的所有topic bin/kafka-topics.sh –list –zookeeper zk01:2181
3、删除topic bin/kafka-topics.sh –delete –zookeeper zk01:2181 –topic test 需要在server.properties配置文件中设置delete.topic.enable=true否则是标记删除或者直接重启
4、通过shell指令发送消息(生产者) bin/kafka-console-producer.sh –broker-list kafka01:9092 –topic test
5、通过shell消费消息(消费者) bin/kafka-console-consumer.sh –zookeeper zk01:2181 –from-beginning –topic test
6、查看消费位置 bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand –zookeeper kafka01:2181 –group wsz:user –describe
kafka-0.9.0.0之后kafka.tools.ConsumerOffsetChecker换成了kafka.admin.ConsumerGroupCommand
7、查看某个Toic的详情
1.1.2 kafka的Java API 1、maven中的pom.xml配置文件
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.1</version> </dependency>2、producer,Java端生产者生产的消息到指定的topic中必须在server.properties中设置advertised.listeners=PLAINTEXT://kafka01:9092 多个节点上的配置文件都需要配置
public class KafkaProducerSimple { public static final String TOPIC = "test"; public static void main(String []args) { Properties properties = new Properties(); // properties.put("serializer.class", "org.apache.kafka.common.serialization.ByteArraySerializer"); properties.put("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer"); properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("bootstrap.servers","kafka01:9092,kafka02:9092,kafka03:9092"); properties.put("retries" , "1"); properties.put("acks" , "1"); // 创建producer Producer<Integer,String> producer = new KafkaProducer<Integer, String>(properties); // 发送数据 int messageNo = 1; while(true) { String messageStr = new String("Message_" + messageNo++); System.out.println("message="+messageStr); // 调用producer的send方法发送数据 producer.send(new ProducerRecord<Integer, String>(TOPIC , messageNo , messageStr)); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }3、consumer消费者
public class KafkaConsumerSimple{ public static final String TOPIC = "test"; public static void main(String[] args) { // 设置配置文件 Properties properties = new Properties(); // 配置kafka集群列表,可以写全,也可以只一个 properties.put("bootstrap.servers","kafka01:9092,kafka02:9092,kafka03:9092"); // StringDeserializer.class.getCanonicalName() properties.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("key.deserializer" , "org.apache.kafka.common.serialization.IntegerDeserializer"); // 消费者组 properties.put("group.id" , "wsz:user"); // zookeeper会话超时时间 properties.put("zookeeper.session.timeout.ms" , 6000); // 消息同步到follower上的时间 properties.put("zookeeper.sync.time.ms",500); // 客户端消息自动提交时间 properties.put("auto.commit.interval.ms" , 1000); KafkaConsumer kafkaConsumer = new KafkaConsumer(properties); kafkaConsumer.subscribe(Arrays.asList(TOPIC)); while (true) { // 200表示多长时间拉取一次数据 ConsumerRecords<Integer, String> records = kafkaConsumer.poll(200); for (ConsumerRecord<Integer, String> record : records) { System.out.println("topic: "+record.topic() + " key: " + record.key() + " value: " + record.value() + " partition: "+ record.partition()); } } } }