Kafka in Practice


1. Introduction

Kafka is a distributed message caching system, designed as a distributed message queue for log processing. Log data is large in volume but has relatively low reliability requirements; it mainly consists of user behavior (logins, page views, clicks, shares, likes) and system operation logs (CPU, memory, disk, network, system and process state). Many message queue services provide reliable delivery guarantees and default to immediate consumption, which does not suit offline processing. Reliable delivery is not essential for logs, so Kafka trades some reliability for performance; by running as a distributed cluster it lets messages accumulate in the system, which allows it to support both offline and online log processing.

2. Architecture

- Every server in a Kafka cluster is called a broker.

- Kafka has two kinds of clients: producers (message senders) and consumers (message receivers). Clients connect to brokers over TCP.

- Messages from different business systems are kept apart by topic, and every topic is partitioned to spread the read/write load.

- Each partition can have multiple replicas to guard against data loss.

- Any update to the data in a partition must go through the leader among that partition's replicas.

- Consumers can be organized into groups. Within one group, each message is delivered to only a single member, so the members split the work; separate groups each receive the complete message stream independently.

For example, if the topic order_info holds 100 messages with ids 0-99 and one group has two consumers, one member might consume messages 0-49 while the other consumes 50-99.

- When consuming a topic, a consumer can specify the starting offset (see the sketch below).
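A minimal sketch of how the starting position is chosen with the old high-level consumer API used later in this article (the ZooKeeper address and group name here are placeholders):

Properties props = new Properties();
props.put("zookeeper.connect", "10.10.75.15:2181"); // placeholder ZooKeeper address
props.put("group.id", "demo-group");                // placeholder group name
// "smallest" starts from the earliest retained offset;
// "largest" (the default) reads only newly arriving messages
props.put("auto.offset.reset", "smallest");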

3. Installation

1. Version

kafka_2.10-0.8.1.1.tgz

2. Cluster layout

        Public address      Internal address    broker.id
kafka1  10.10.75.15:9092    192.168.3.15:9092   0
kafka2  10.10.75.16:9092    192.168.3.16:9092   1
kafka3  10.10.75.18:9092    192.168.3.18:9092   2

3. Installation steps

Start by deploying on host 15.

(1) Unpack the archive on the server:

ubuntu@jx-vm04:/usr/local/wangpei/cloud/kafka$ tar -zxvf kafka_2.10-0.8.1.1.tgz

(2) Edit the configuration.

Change to the config directory:

ubuntu@jx-vm04:/usr/local/wangpei/cloud/kafka/kafka_2.10-0.8.1.1/config$ pwd

/usr/local/wangpei/cloud/kafka/kafka_2.10-0.8.1.1/config

Edit the server startup parameters in server.properties:

Configure the ZooKeeper cluster (which means a ZooKeeper cluster must already be deployed). A single ZooKeeper node also works; set zookeeper.connect according to the actual ZK nodes, and with a single node just list that one. After installing ZooKeeper, start it as follows; once it starts normally the node is ready to use:

zookeeper/zookeeper-3.4.6/bin$ sudo ./zkServer.sh start
JMX enabled by default
Using config: /usr/local/wangpei/cloud/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper … STARTED
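Putting the pieces together, a minimal server.properties for broker 0 on host 15 might look like this (a sketch assembled from the values used elsewhere in this article; all other parameters keep their shipped defaults):

broker.id=0
port=9092
# externally reachable address; see the troubleshooting note in section 6
advertised.host.name=10.10.75.15
log.dirs=/usr/local/wangpei/cloud/kafka/kafka_2.10-0.8.1.1/kafka-logs
zookeeper.connect=192.168.3.15:2181,192.168.3.16:2181,192.168.3.18:2181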

(3) Start the Kafka service:

Start the broker on host 15:

ubuntu@jx-vm04:/usr/local/wangpei/cloud/kafka/kafka_2.10-0.8.1.1/bin$ sudo ./kafka-server-start.sh ../config/server.properties

This starts Kafka in the foreground: as soon as the command is terminated, Kafka stops. Normally the background form is used so the broker keeps running.

Background start:

kafka_2.10-0.8.1.1/bin$ sudo ./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &

Startup complete.

The console log can be observed during startup:

[2017-06-26 16:14:00,972] INFO Initiating client connection, connectString=192.168.3.15:2181,192.168.3.16:2181,192.168.3.18:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@4e561369 (org.apache.zookeeper.ZooKeeper)
[2017-06-26 16:14:00,995] INFO Opening socket connection to server /192.168.3.18:2181 (org.apache.zookeeper.ClientCnxn)
[2017-06-26 16:14:11,010] INFO Socket connection established to 192.168.3.18/192.168.3.18:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2017-06-26 16:14:11,068] INFO Session establishment complete on server 192.168.3.18/192.168.3.18:2181, sessionid = 0x35ce36cf8a00000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)

This shows Kafka polling and connecting to the ZooKeeper cluster nodes during initialization.
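After backgrounding, a quick way to confirm the broker is still up (an assumed verification step, not from the original write-up) is jps, the JDK tool that lists running JVM processes; the broker appears under its main class name Kafka (the PIDs below are illustrative):

kafka_2.10-0.8.1.1/bin$ jps
12345 Kafka
12346 Jps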

(4) Deploy to servers 16 and 18 in the same way:

Copy the installation to the other two servers (the command below copies to 18; repeat for 16):

ubuntu@jx-vm04:/usr/local/wangpei/cloud/kafka$ sudo scp -r kafka_2.10-0.8.1.1 ubuntu@10.10.75.18:/usr/local/wangpei/

Edit server.properties:

On host 16:

broker.id=1

# root directory for all kafka znodes
zookeeper.connect=192.168.3.15:2181,192.168.3.16:2181,192.168.3.18:2181

Change log.dirs to match the new install path (the copy landed under /usr/local/wangpei rather than /usr/local/wangpei/cloud/kafka):

# before
log.dirs=/usr/local/wangpei/cloud/kafka/kafka_2.10-0.8.1.1/kafka-logs
# after
log.dirs=/usr/local/wangpei/kafka_2.10-0.8.1.1/kafka-logs

On host 18:

broker.id=2

zookeeper.connect=192.168.3.15:2181,192.168.3.16:2181,192.168.3.18:2181

Change log.dirs the same way:

# before
log.dirs=/usr/local/wangpei/cloud/kafka/kafka_2.10-0.8.1.1/kafka-logs
# after
log.dirs=/usr/local/wangpei/kafka_2.10-0.8.1.1/kafka-logs

Start the brokers on 16 and 18 as in step (3) to complete the cluster startup.
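To confirm that all three brokers registered with ZooKeeper, the zkCli.sh shell that ships with ZooKeeper can be used (an assumed verification step; Kafka registers each broker under /brokers/ids):

zookeeper/zookeeper-3.4.6/bin$ ./zkCli.sh -server 192.168.3.15:2181
[zk: 192.168.3.15:2181(CONNECTED) 0] ls /brokers/ids
[0, 1, 2]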

(5) Create a topic on the Kafka cluster:

Topic creation command:

sudo ./kafka-topics.sh --create --zookeeper 10.10.75.15:2181 --replication-factor 3 --partitions 1 --topic Order

replication-factor specifies on how many brokers the topic's data is kept; with replication-factor set to 3, it is stored on 3 brokers.

The cluster currently has 3 nodes, so a replication factor of 3 succeeds.

With only a single Kafka broker, use the following command instead; otherwise the command above fails with an error that there are not enough brokers for the requested replication factor.

kafka/kafka_2.10-0.8.1.1/bin$sudo ./kafka-topics.sh --create --zookeeper 10.10.75.15:2181 --replication-factor 1 --partitions 1 --topic Order
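The result can be checked with the --describe option of the same tool, which lists the partition leader, replicas, and in-sync replicas for the topic:

kafka/kafka_2.10-0.8.1.1/bin$ sudo ./kafka-topics.sh --describe --zookeeper 10.10.75.15:2181 --topic Order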

(6) Use a producer to write messages into a topic (the Java client is shown in section 4; a quick console test is sketched below).
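For a quick smoke test, the console producer bundled with Kafka can be used; it reads messages from stdin after starting:

kafka/kafka_2.10-0.8.1.1/bin$ sudo ./kafka-console-producer.sh --broker-list 10.10.75.15:9092 --topic Order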

(7) Use a consumer to read messages from a topic (the Java client is shown in section 5; the console counterpart is sketched below).
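The bundled console consumer is the counterpart; --from-beginning replays the topic from the earliest retained offset:

kafka/kafka_2.10-0.8.1.1/bin$ sudo ./kafka-console-consumer.sh --zookeeper 10.10.75.15:2181 --topic Order --from-beginning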

4. Sending messages

package kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Kafka message queue sender (producer).
 * Created with goDemo by wangpei on 17/2/8.
 */
public class ProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Cluster form:
        // props.put("zk.connect", "zookeeperServer1:2181,zookeeperServer2:2181,zookeeperServer3:2181");
        // props.put("metadata.broker.list", "zookeeperServer1:9092,zookeeperServer2:9092,zookeeperServer3:9092");
        // Single-node test:
        props.put("zookeeper.connect", "10.10.75.15:2181");
        // Broker list used to fetch topic metadata
        props.put("metadata.broker.list", "10.10.75.15:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        // Send business messages: in practice these might come from a file,
        // an in-memory store, or a socket; here a loop generates them.
        for (int i = 1; i <= 100; i++) {
            Thread.sleep(500);
            producer.send(new KeyedMessage<String, String>("order",
                    "This is message no. " + i + " sent to Kafka!"));
        }
    }
}

Running this main method sends a message to the broker every 500 ms; when the loop completes, the method exits and the messages sit in the queue.

5. Receiving messages

The consumer can also be started first, listening for and receiving messages from the Kafka queue.

package kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * Receives messages from the Kafka message queue (consumer).
 * Created with goDemo by wangpei on 17/2/8.
 */
public class ConsumerDemo {
    private static final String topic = "order";
    private static final Integer threads = 1;

    public static void main(String[] args) {
        Properties props = new Properties();
        // Cluster form:
        // props.put("zookeeper.connect", "zookeeperServer1:2181,zookeeperServer2:2181,zookeeperServer3:2181");
        // Single-node test:
        props.put("zookeeper.connect", "10.10.75.15:2181");
        props.put("group.id", "1111");
        // Start from the earliest retained offset instead of only new messages
        props.put("auto.offset.reset", "smallest");
        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
        // One stream (thread) for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, threads);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // Blocks and prints each message as it arrives
                    for (MessageAndMetadata<byte[], byte[]> mm : kafkaStream) {
                        String msg = new String(mm.message());
                        System.out.println(msg);
                    }
                }
            }).start();
        }
    }
}

Check reception: while ProducerDemo is running, the consumer console should print each message as it arrives.

6. Troubleshooting notes

1. Starting the producer fails with ERROR Failed to send requests for topics order with correlation ids in [0,8]:

[2017-07-10 11:07:06,059] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(order)] from broker [ArrayBuffer(id:0,host:10.10.75.15,port:9092)] failed (kafka.producer.async.DefaultEventHandler:97)
[2017-07-10 11:07:06,168] ERROR fetching topic metadata for topics [Set(order)] from broker [ArrayBuffer(id:0,host:10.10.75.15,port:9092)] failed (kafka.utils.Utils$:106)
kafka.common.KafkaException: fetching topic metadata for topics [Set(order)] from broker [ArrayBuffer(id:0,host:10.10.75.15,port:9092)] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
    at kafka.utils.Utils$.swallow(Utils.scala:172)
    at kafka.utils.Logging$class.swallowError(Logging.scala:106)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at kafka.ProducerDemo.main(ProducerDemo.java:35)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    ... 14 more
[2017-07-10 11:07:06,170] ERROR Failed to send requests for topics order with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler:97)

Cause: a parameter in server.properties needs to be set:

#advertised.host.name=<hostname routable by clients>
advertised.host.name=10.10.75.15

If left unconfigured, advertised.host.name defaults to localhost, which only works when the producer runs on the broker machine itself. For clients on other machines, it must be set to an IP address those clients can reach (take care not to use an internal address that outside clients cannot access).
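After changing server.properties the broker has to be restarted for the setting to take effect; a sketch using the stop script shipped in bin/:

kafka_2.10-0.8.1.1/bin$ sudo ./kafka-server-stop.sh
kafka_2.10-0.8.1.1/bin$ sudo ./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &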

It was also verified that in server.properties:

zookeeper.connect=192.168.3.15:2182
zookeeper.connect=10.10.75.15:2182

either form, internal or external address, works fine.

