- Every server in a Kafka cluster is called a broker.
- Kafka has two kinds of clients: producers (which publish messages) and consumers (which read them). Clients connect to brokers over TCP.
- Messages from different business systems are separated by topic, and each topic is split into partitions to spread the read/write load.
- Each partition can have multiple replicas to protect against data loss.
- All writes to a partition must go through that partition's leader replica.
- Consumers can be organized into groups. Within one group, each message of a topic is delivered to only one consumer, so two consumers A and B in the same group consuming the topic order_info never receive the same message. For example, if order_info holds 100 messages with ids 0-99, A might end up consuming ids 0-49 and B ids 50-99 (the actual split follows partition assignment). Different groups, by contrast, each receive the full message stream.
- When consuming a topic, a consumer can specify the starting offset (see the console-consumer example below).
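As a quick illustration of the last point, the console consumer that ships with Kafka can replay a topic from its earliest offset; a minimal sketch, assuming the ZooKeeper address used later in this deployment:
kafka_2.10-0.8.1.1/bin$ ./kafka-console-consumer.sh --zookeeper 10.10.75.15:2181 --topic order_info --from-beginning
Without --from-beginning, the consumer only sees messages produced after it starts.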
1. Version
kafka_2.10-0.8.1.1.tgz
2. Cluster deployment
Host     External address      Internal address       broker.id
kafka1   10.10.75.15:9092      192.168.3.15:9092      0
kafka2   10.10.75.16:9092      192.168.3.16:9092      1
kafka3   10.10.75.18:9092      192.168.3.18:9092      2

3. Installation steps
Start by deploying on host 15:
a. Unpack the tarball on the server:
ubuntu@jx-vm04:/usr/local/wangpei/cloud/kafka$ tar -zxvf kafka_2.10-0.8.1.1.tgz
b. Edit the configuration. Change into the config directory:
ubuntu@jx-vm04:/usr/local/wangpei/cloud/kafka/kafka_2.10-0.8.1.1/config$ pwd
/usr/local/wangpei/cloud/kafka/kafka_2.10-0.8.1.1/config
Then edit the broker startup parameters in server.properties:
Configure the ZooKeeper connection (which means the ZooKeeper cluster must already be deployed). A single-node ZooKeeper also works; set zookeeper.connect according to the actual ZK nodes, i.e. a single address for a single-node setup. To install one ZK node, start it as follows; if it starts cleanly, that node is ready to use:
zookeeper/zookeeper-3.4.6/bin$ sudo ./zkServer.sh start
JMX enabled by default
Using config: /usr/local/wangpei/cloud/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
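Putting the settings together, a minimal server.properties sketch for broker 0 on host 15, using the addresses from the cluster table above (advertised.host.name is explained in the troubleshooting section at the end; the real file contains many more tunables):
broker.id=0
port=9092
# address that remote clients use to reach this broker
advertised.host.name=10.10.75.15
# where this broker stores its log segments
log.dirs=/usr/local/wangpei/cloud/kafka/kafka_2.10-0.8.1.1/kafka-logs
# the ZooKeeper ensemble (a single address also works for a one-node ZK)
zookeeper.connect=192.168.3.15:2181,192.168.3.16:2181,192.168.3.18:2181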
4. Start the Kafka service:
Start the broker on host 15:
ubuntu@jx-vm04:/usr/local/wangpei/cloud/kafka/kafka_2.10-0.8.1.1/bin$ sudo ./kafka-server-start.sh ../config/server.properties
This starts the broker in the foreground: as soon as the command is interrupted, Kafka stops, so in practice the background start command is used to keep it running.
Background start command:
kafka_2.10-0.8.1.1/bin$ sudo ./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &
Startup is complete.
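To check that the broker actually came up (an extra verification step, not in the original notes), you can look for the Kafka JVM process or the listening port:
jps | grep Kafka                 # the broker runs as the kafka.Kafka main class
sudo netstat -tlnp | grep 9092   # the broker should be listening on its configured port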
During startup you can watch the console log:
[2017-06-26 16:14:00,972] INFO Initiating client connection, connectString=192.168.3.15:2181,192.168.3.16:2181,192.168.3.18:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@4e561369 (org.apache.zookeeper.ZooKeeper)
[2017-06-26 16:14:00,995] INFO Opening socket connection to server /192.168.3.18:2181 (org.apache.zookeeper.ClientCnxn)
[2017-06-26 16:14:11,010] INFO Socket connection established to 192.168.3.18/192.168.3.18:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2017-06-26 16:14:11,068] INFO Session establishment complete on server 192.168.3.18/192.168.3.18:2181, sessionid = 0x35ce36cf8a00000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
This shows Kafka connecting to the ZooKeeper cluster nodes during initialization.
5. Deploy to servers 16 and 18 in the same way:
Copy the installation to the other two servers (the same scp command is run for 10.10.75.16):
ubuntu@jx-vm04:/usr/local/wangpei/cloud/kafka$ sudo scp -r kafka_2.10-0.8.1.1 ubuntu@10.10.75.18:/usr/local/wangpei/
Then modify server.properties on each host.
On host 16:
broker.id=1
# root directory for all kafka znodes
zookeeper.connect=192.168.3.15:2181,192.168.3.16:2181,192.168.3.18:2181
# log.dirs changed from the host-15 path to the local install path:
#log.dirs=/usr/local/wangpei/cloud/kafka/kafka_2.10-0.8.1.1/kafka-logs
log.dirs=/usr/local/wangpei/kafka_2.10-0.8.1.1/kafka-logs
On host 18:
broker.id=2
zookeeper.connect=192.168.3.15:2181,192.168.3.16:2181,192.168.3.18:2181
# log.dirs changed from the host-15 path to the local install path:
#log.dirs=/usr/local/wangpei/cloud/kafka/kafka_2.10-0.8.1.1/kafka-logs
log.dirs=/usr/local/wangpei/kafka_2.10-0.8.1.1/kafka-logs
Start these brokers the same way; startup is now complete.
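Once all three brokers are running, one way to confirm that they registered with ZooKeeper (an extra check, not part of the original steps) is to list the /brokers/ids znode with the ZooKeeper CLI; each live broker shows up under its broker.id:
zookeeper-3.4.6/bin$ ./zkCli.sh -server 10.10.75.15:2181
[zk: 10.10.75.15:2181(CONNECTED) 0] ls /brokers/ids
[0, 1, 2]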
6. Create a topic on the Kafka cluster:
Topic creation command:
sudo ./kafka-topics.sh --create --zookeeper 10.10.75.15:2181 --replication-factor 3 --partitions 1 --topic Order
--replication-factor controls on how many brokers the topic's data is kept; with replication-factor set to 3, the data is stored on 3 brokers.
The cluster currently has 3 broker nodes, so creating the topic with replication-factor 3 succeeds.
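To verify the result, the same kafka-topics.sh tool can describe the topic just created (shown here for the Order topic; the output lists the partition, its leader, replicas, and in-sync replicas):
kafka/kafka_2.10-0.8.1.1/bin$ sudo ./kafka-topics.sh --describe --zookeeper 10.10.75.15:2181 --topic Order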
If you are running against a single broker (for example the single-node ZooKeeper test setup above with only one Kafka broker), create the topic with the command below instead; the command above would fail with an error that there are not enough nodes, because the requested replication factor exceeds the number of available brokers.
kafka/kafka_2.10-0.8.1.1/bin$ sudo ./kafka-topics.sh --create --zookeeper 10.10.75.15:2181 --replication-factor 1 --partitions 1 --topic Order

7. Use a producer to write messages to a topic
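The producer used in these notes (the ProducerDemo class referenced in the error log under troubleshooting below) was not included; the following is a minimal sketch against the Kafka 0.8 Java API of the Scala client, with the broker list assumed from the cluster table above:

package kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Send a batch of test messages to the "order" topic.
 */
public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // brokers used to bootstrap topic metadata (must be reachable by the client;
        // see the advertised.host.name note in the troubleshooting section)
        props.put("metadata.broker.list", "10.10.75.15:9092,10.10.75.16:9092,10.10.75.18:9092");
        // encode message payloads as strings
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        for (int i = 0; i < 100; i++) {
            // topic name matches the one ConsumerDemo below subscribes to
            producer.send(new KeyedMessage<String, String>("order", "order message " + i));
        }
        producer.close();
    }
}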
8. Use a consumer to read messages from a topic
You can also start the consumer first, so that it is already listening and receives messages from the Kafka message queue as they arrive.
package kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * Receive messages from the Kafka message queue.
 * Created with goDemo
 * Created by wangpei on 17/2/8.
 */
public class ConsumerDemo {
    private static final String topic = "order";
    private static final Integer threads = 1;

    public static void main(String[] args) {
        Properties props = new Properties();
        // full ZooKeeper cluster:
        // props.put("zookeeper.connect", "zookeeperServer1:2181,zookeeperServer2:2181,zookeeperServer3:2181");
        // test receiving messages through a single ZooKeeper node:
        props.put("zookeeper.connect", "10.10.75.15:2181");
        props.put("group.id", "1111");
        // start from the earliest available offset when the group has no committed offset yet
        props.put("auto.offset.reset", "smallest");

        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);

        // request one stream (consumer thread) for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, threads);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // one thread per stream, printing every message as it arrives
        for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (MessageAndMetadata<byte[], byte[]> mm : kafkaStream) {
                        String msg = new String(mm.message());
                        System.out.println(msg);
                    }
                }
            }).start();
        }
    }
}

Check what was received:
9. Troubleshooting
1. Starting the producer fails with ERROR Failed to send requests for topics order with correlation ids in [0,8]:
[2017-07-10 11:07:06,059] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(order)] from broker [ArrayBuffer(id:0,host:10.10.75.15,port:9092)] failed (kafka.producer.async.DefaultEventHandler:97)
[2017-07-10 11:07:06,168] ERROR fetching topic metadata for topics [Set(order)] from broker [ArrayBuffer(id:0,host:10.10.75.15,port:9092)] failed (kafka.utils.Utils$:106)
kafka.common.KafkaException: fetching topic metadata for topics [Set(order)] from broker [ArrayBuffer(id:0,host:10.10.75.15,port:9092)] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
        at kafka.utils.Utils$.swallow(Utils.scala:172)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.Utils$.swallowError(Utils.scala:45)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
        at kafka.producer.Producer.send(Producer.scala:77)
        at kafka.javaapi.producer.Producer.send(Producer.scala:33)
        at kafka.ProducerDemo.main(ProducerDemo.java:35)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
        ... 14 more
[2017-07-10 11:07:06,170] ERROR Failed to send requests for topics order with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler:97)
Cause: one parameter in server.properties needs to be set:
#advertised.host.name=<hostname routable by clients>
advertised.host.name=10.10.75.15
If left unset, advertised.host.name defaults to localhost, so the producer would have to run on the broker machine itself. For access from other machines, this parameter must be set to an IP address that external clients can reach (take care not to use an internal address that is unreachable from outside).
It was also verified that, in server.properties:
#zookeeper.connect=192.168.3.15:2182
zookeeper.connect=10.10.75.15:2182
works whether zookeeper.connect is set to the internal or the external address.