kafka的内部实现、安装和使用

xiaoxiao2021-02-28  13

原理解析

producer创建一个topic时,可以指定该topic为几个partition(默认是1,配置num.partitions),然后会把partition分配到每个broker上,分配的算法是:a个broker,第b个partition分配到b%a的broker上,可以指定有每个partition有几分副本Replication,副本的分配策略为:第c个副本存储在第(b+c)%a的broker上。一个partition在每个broker上是一个文件夹,文件夹中文件的命名方式为:topic名称+有序序号。每个partition中文件是一个个的segment,segment file由.index和.log文件组成。两个文件的命名规则是,上一个segmentfile的最后一个offset。这样,可以快速的删除old文件。

producer往kafka里push数据,会自动的push到所有的分区上,消息是否push成功有几种情况:1,接收到partition的ack就算成功,2全部副本都写成功才算成功;数据可以存储多久,默认是两天;producer的数据会先存到缓存中,等大小或时间达到阈值时,flush到磁盘,consumer只能读到磁盘中的数据。

consumer从kafka里poll数据,poll到一定配置大小的数据放到内存中处理。每个group里的consumer共同消费全部的消息,不同group里的数据不能消费同样的数据,即每个group消费一组数据;consumer的数量和partition的数量相等时消费的效率最高。

这样,kafka可以横向的扩充broker数量和partitions;数据顺序写入磁盘;producer和consumer异步

kafka安装

上传到focuson1,解压,配置server.properties.然后考到focuson2、focuson3.

3个broker,broker.id,分别为0,1,2.其余的见如下配置,

vi config/server.properties broker.id=0 num.partitions=3 zookeeper.connect=focuson1:2181,focuson2:2181,focuson3:2181  启动,在三个节点分别执行:

nohup ./bin/kafka-server-start.sh config/server.properties & 创建一个topic并查看所有的topic,可指定该topic的分区,副本数量 [root@focuson1 kafka_2.11-1.1.0]# bin/kafka-topics.sh --create --zookeeper focuson1:2181 --replication-factor 2 --partitions 3 --topic focuson_test1 WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both. Created topic "focuson_test1". [root@focuson1 kafka_2.11-1.1.0]# bin/kafka-topics.sh --list --zookeeper focuson2:2181 focuson_test1   使用shell启动该topic的producer,可在里面写入数据。

bin/kafka-console-producer.sh --broker-list focuson1:9092,focuson2:9092,focuson3:9092 --topic focuson_test1 查看各个broker的数据分区情况,以及副本存储情况 [root@focuson1 kafka-logs]# ll total 28 -rw-r--r-- 1 root root 4 May 12 08:31 cleaner-offset-checkpoint drwxr-xr-x 2 root root 4096 May 12 08:32 focuson_test1-0 drwxr-xr-x 2 root root 4096 May 12 08:32 focuson_test1-1 -rw-r--r-- 1 root root 4 May 12 08:48 log-start-offset-checkpoint -rw-r--r-- 1 root root 54 May 12 08:09 meta.properties -rw-r--r-- 1 root root 40 May 12 08:48 recovery-point-offset-checkpoint -rw-r--r-- 1 root root 40 May 12 08:49 replication-offset-checkpoint [root@focuson1 kafka-logs]# ll focuson_test1-0 total 8 -rw-r--r-- 1 root root 10485760 May 12 08:24 00000000000000000000.index -rw-r--r-- 1 root root 162 May 12 08:33 00000000000000000000.log -rw-r--r-- 1 root root 10485756 May 12 08:24 00000000000000000000.timeindex -rw-r--r-- 1 root root 8 May 12 08:32 leader-epoch-checkpoint [root@focuson2 kafka-logs]# ll total 24 -rw-r--r-- 1 root root    0 May 12 08:11 cleaner-offset-checkpoint drwxr-xr-x 2 root root 4096 May 12 08:32 focuson_test1-1 drwxr-xr-x 2 root root 4096 May 12 08:32 focuson_test1-2 -rw-r--r-- 1 root root    4 May 12 08:33 log-start-offset-checkpoint -rw-r--r-- 1 root root   54 May 12 08:11 meta.properties -rw-r--r-- 1 root root   40 May 12 08:33 recovery-point-offset-checkpoint -rw-r--r-- 1 root root   40 May 12 08:33 replication-offset-checkpoint [root@focuson3 kafka-logs]# ll total 24 -rw-r--r-- 1 root root    0 May 12 08:12 cleaner-offset-checkpoint drwxr-xr-x 2 root root 4096 May 12 08:32 focuson_test1-0 drwxr-xr-x 2 root root 4096 May 12 08:32 focuson_test1-2 -rw-r--r-- 1 root root    4 May 12 08:45 log-start-offset-checkpoint -rw-r--r-- 1 root root   54 May 12 08:12 meta.properties -rw-r--r-- 1 root root   40 May 12 08:45 recovery-point-offset-checkpoint -rw-r--r-- 1 root root   40 May 12 08:46 replication-offset-checkpoint

zookeeper与kafka

broker 会在zookeeper上注册临时节点,当该broker临时节点断开时,能监控到变化,可以是producer等进行负载均衡

topic。zookeeper上会有每个topic的partition的分布情况,当broker启动或添加时,会到topic下把自己加入到对应partition的isr(In-Sync Replicas的缩写,表示副本同步队列)列表。同样的,当broker退出时,也会触发zookeeper更新其对应topic分区的isr列表,并决定是否需要做消费者的负载均衡。

kafka consumer 0.9之后不需要使用zookeeper,但zookeeper-based模式仍然支持。主要实现是注册消费者到zookeeper下,记录每次的offset等信息,并对每个consumer进行监控等。
转载请注明原文地址: https://www.6miu.com/read-2050187.html

最新回复(0)