最近公司的kafka集群出现了节点已经失效但是节点进程和端口都还在的情况,目前我们的系统监控只是做到了进程监控,即为整个集群的每台机群建立进程快照,如果进程(如NameNode、kakfa broker)丢失,则报警并立刻自动重启进程。但是这次的kafka事故进程和端口都还在,因此报警系统没有能够及时报警,因此对此次事故发生的过程和解决方式做详细的分析。 首先,我们一个同学使用kafka的过程中发现消息无法消费,因此进入进群进行如下检查: 进程和端口:我们的kafka的3个broker,进程和端口都在,正常使用kakfa-console-producer进行消息的生产,抛出异常 使用kakfa-console-consumer进行消息的消费,抛出异常 使用kafka-topics --describe进行topic的详细情况的分析,发现,partition 和 Isr(In-Sync Replication)竟然只剩下一台机器 我们知道,kafka在创建topic的时候会指定partition数量和replication数量,对于每一个partition,都会有一个broker作为leader broker,剩余的broker作为slave broker。我们猜想在我们的代码中生产的消息应该已经丢失。因此进行验证。在紧急重启了假死的两台broker以后,我们开始对消息丢失情况进行验证,令人惊讶的是,没有发生消息丢失。但是,为了以防万一,无论消息是否丢失,我们都必须找到足够的证据。
我们的topic属性是2个partition、2个replication组成,当我们发现从这个topic消费消息发生异常的时候,我们打印了这个topic的描述信息:
[appuser@hz-10-120-241-50 kafka]$ bin/kafka-topics.sh --describe --topic wuchang1 --zookeeper 10.120.241.50:2181 Topic:wuchang1 PartitionCount:2 ReplicationFactor:2 Configs: Topic: wuchang1 Partition: 0 Leader: 110 Replicas: 110,50 Isr: 110 Topic: wuchang1 Partition: 1 Leader: -1 Replicas: 50,82 Isr:这个信息其实是我们在测试环境复现出来的现场。我觉得,一个资深的软件工程师,非常注重对事故现场的复现,因为只有成功地复现问题,才能根本地解决问题。在正常情况下,3个broker工作正常,它的描述信息是这样的:
[appuser@hz-10-120-241-50 kafka]$ bin/kafka-topics.sh --describe --topic wuchang1 --zookeeper 10.120.241.50:2181 Topic:wuchang1 PartitionCount:2 ReplicationFactor:2 Configs: Topic: wuchang1 Partition: 0 Leader: 110 Replicas: 110,50 Isr: 110,50 Topic: wuchang1 Partition: 1 Leader: 50 Replicas: 50,82 Isr: 50,82为了让我们的kafka cluster能够容忍部分机器宕机,我们的生产环境和测试环境打开了leader 自动选举:auto.leader.rebalance.enable=true 这样,当任何一个TopicPartition的leader丢失,Controller会启动一个监控线程监控所有partition的Leader状态,如果发现某个Topic-Partition的leader丢失,则该线程会为该Leader启动重新选举,代码在KafkaController.scala中:
def onControllerFailover() { //省略 if (config.autoLeaderRebalanceEnable) { info("starting the partition rebalance scheduler") autoRebalanceScheduler.startup() autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance, 5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS) } deleteTopicManager.start() } else info("Controller has been shut down, aborting startup/failover") }方法checkAndTriggerPartitionRebalance()就是完成对所有Topic-Partition的leader检查,这个作为参数被定时调用,注意,Controller与Leader不同,Leader是针对某个Topic-Partition而言,而Controller是整个集群的Controller。 为了能够在自动leader选举打开的情况下让某个Topic-Partition失去leader,我们将这个topic的partition 1对应的两个replication 全部kill,这样,即使自动leader检查打开,由于partition-1 已经不存在任何一个活着的replication,因此无从选举出一个leader,此时,这个partition已经不再工作,partition-0也只有仅剩的一个broker来作为leader。 我们当时在测试环境复现问题的时候,在自动leader选举打开的情况下,只要某个partition的replication中有一个还活着(即ISR中还有任何一个broker),这个broker就会被自动选举为leader。这就是Kafka高可用性的一个体现。只有当一个topic的全部replication全部丢失,这个kafka的这个Topic-Partitioin才会变为不可用状态。 反过来看,如果我们的topic的replication-factor设置为2,那么,在自动leader rebalance打开的情况下,任何两台broker丢失,都不会对任何partition造成影响,除非这个Topic-Partition的三个replication全部挂掉。
现在现场已经复现,我们就来验证这种情况下消息是否丢失。
我们线上环境生产消息的代码来源于nginx lua插件,用来将nginx收的的用户访问信息发送到kafka:
topic = args["pbtype"] -- topic = "LivyRoomMsg" if (topic == "LivyRoomMsg") then msgJson = Convert_GjsWebLiveRoomWechatUserLogin(msgJson,args["messageSentTime"]) end local ok, err = bp:send(topic, nil, msgJson)从代码片段中我们可以看到,发送消息的时候,key为null。我们使用java代码,同样设置key为null,发送消息到我们测试环境的现场,的确消息未丢失,发送的所有消息都被打倒到了leader存在的partition上面了。 我们知道,Kafka通过key信息决定了消息发送到哪个broker,我们使用的是默认的Partitioner, Kafka默认的Partitioner是DefaultPartitioner,核心方法是partition():
/** * Compute the partition for the given record. * * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { //从可用的partitioner中选择一个 int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else {//从所有集群中选择一个 // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else {// 只要有key , 就按照key去确定 // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } /** * topicCounterMap维护了每个topic的一个计数器,这个计数器用来通过Round-Robin方式选择一个partition * @param topic * @return */ private int nextValue(String topic) { AtomicInteger counter = topicCounterMap.get(topic); if (null == counter) { counter = new AtomicInteger(new Random().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement(); }默认partitioner的消息分派逻辑是:
如果存在key,则通过Round-Robin的方式,从该topic的所有partition中选择一个分区,即,无论现在分区的状态如何,一旦key确定,对应的broker就确定了,到5;如果key为空,则到3;如果这个topic还存在可用的Partition(还存在leader的partition),则通过Round-Robin的方式,以这个topic递增的随机数作为种子,从这些可用的partition中选择一个partition,将消息发送到这个partition,否则到4;如果这个topic没有任何一个可用的Partition,则通过Round-Robin的方式,以这个topic递增的随机数作为种子,从所有partition中选择一个partition发送消息。很显然,如果选择的partition不可用,消息发送失败;退出DefaultPartition 使用 topicCounterMap来维护每个topic用来通过Round-Robin方式选择partition的序列号,key是所有topic的名字,value是一个整数计数器,每次进行一次选择则自增1,保证所有partition被依次使用到。
在我们使用bin/kafka-console-producer.sh的命令行工具生产消息的时候,其实最终也是调用了。本文中,我并不急于让大家知道问题原因,而希望逐步拨云见日,让大家从代码层面循序渐渐,逐步接近问题真想。这样做,不仅仅能够找到问题原因,更能够学到知识,而不仅仅是确认了或者解决了一个问题。
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx512M" fi exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"运行kakfa-console-producer.sh时,默认java进程堆内存大小为512M,当然,如果我们可以自行设置和修改。实际执行的,是ConsoleProducer类,显然,ConsoleProducer负责从命令行中读取我们输入的消息,然后生产到Kafka Server。看过$KAFKA_HOME/bin目录下面的脚本代码你就会知道,kafka的代码重用做得非常好,即使是脚本,也充分重用。$KAFKA_HOME/bin/kafka-run-class.sh是一个公共启动类,无论我们调用kakfa-console-producer.sh、kakfa-console-consumer.sh、kafka-topics.sh等等脚本,都是通过kafka-run-class.sh运行起来的,只需要告诉kafka-run-class.sh需要启动的java类以及额外的启动参数,kafka-run-class.sh就会运行这个java类,添加上这些额外的启动参数,以及一些共用的、必须的classpath。 因此,我们继续来看ConsoleProducer的实现,了解这个类的实现机制,直接关系到我们最常用的kafka-console-producer的命令的行为:
def main(args: Array[String]) { try { val config = new ProducerConfig(args) val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader] reader.init(System.in, getReaderProps(config)) val producer = if(config.useOldProducer) { new OldProducer(getOldProducerProps(config)) } else { //NewShinyProducer只是对org.apache.kafka.clients.producer.KafkaProducer //进行了简单封装,底层还是用org.apache.kafka.clients.producer.KafkaProducer发送消息 new NewShinyProducer(getNewProducerProps(config)) } //省略 var message: ProducerRecord[Array[Byte], Array[Byte]] = null do { //reader的默认实现类是LineMessageReader,一行一行读取用户在命令行中的输入 message = reader.readMessage() if (message != null) //在没有特殊指定消息的key的情况下,key为空 producer.send(message.topic, message.key, message.value) } while (message != null) } catch { //省略 } Exit.exit(0) }LineMessageReader是消息读取的实现类,用来读取我们在命令行中输入的Kafka消息:
class LineMessageReader extends MessageReader { var topic: String = null var reader: BufferedReader = null var parseKey = false var keySeparator = "\t" var ignoreError = false var lineNumber = 0 override def init(inputStream: InputStream, props: Properties) { topic = props.getProperty("topic") //如果需要指定key,则在kafka-console-producer中增加参数--property "parse.key=true",--property "key.separator=:" //用来告诉kafka是否使用key以及分割消息和key的分隔符 if (props.containsKey("parse.key")) parseKey = props.getProperty("parse.key").trim.equalsIgnoreCase("true") if (props.containsKey("key.separator")) keySeparator = props.getProperty("key.separator") if (props.containsKey("ignore.error"))//是否忽略错误 ignoreError = props.getProperty("ignore.error").trim.equalsIgnoreCase("true") reader = new BufferedReader(new InputStreamReader(inputStream)) } override def readMessage() = { lineNumber += 1 print(">") (reader.readLine(), parseKey) match { case (null, _) => null case (line, true) => line.indexOf(keySeparator) match { case -1 => //在用户输入的消息中没有找到keySeparator if (ignoreError) new ProducerRecord(topic, line.getBytes) else throw new KafkaException(s"No key found on line $lineNumber: $line") case n => //找到keySeparator定义的字符,则提取消息体和key,组装成为ProducerRecord对象 val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes new ProducerRecord(topic, line.substring(0, n).getBytes, value) } case (line, false) =>//用户没有打开parse.key功能,则设置key为null new ProducerRecord(topic, line.getBytes) } } }LineMessageReader的主要职责是读取命令行中用户的输入,然后使用KafkaProducer把消息发送出去。我们可以通过参数parse.key=true以及key.separator=:来告诉kakfa我们会显式指定key。绝大多数情况下,除非有特殊需求,我们都不会使用如此繁复冗长的参数。因此,实际上,我使用如下命令进行消息的生产时,效果和我在代码中使用KafkaProducer进行消息的生产并将key设置为null的效果一样,Kafka都会通过使用DefaultPartitioner来进行消息的分派,由于key为null,将选择任何一个活着的broker,因此,虽然我们Kafka的某个topic的部分partition的leader丢失,消息却不会丢失。
其实,我们在使用Kafka过程中,我们会以为我们使用比如一个每次递增1的key,可以实现消息分派的负载均衡,即消息会几乎均匀地分布到所有的partition上面去。但是其实这样做可能会造成消息的丢失,更好的做法,就是直接不指定key,此时Kafka会帮助我们在所有或者的broker中选择一个进行消息分派,不会造成消息丢失,同时负载均衡Kafka也帮我们完成了。 Kafka的key的使用是用来满足定制化的分派规则而不是消息均匀分派,比如: 1. 我们希望这个topic的所有消息打到同一个partition,这时候我们可以指定一个不变的任意的key,根据DefaultPartitioner的实现,消息会固定打到某个partition; 2. 我们希望根据消息的内容完全定制化地控制这个消息对应的partition,这时候我们需要自己实现一个Partitioner。如果我们看过DefaultPartitioner的实现,那么实现自己的定制化的Partitioner就太简单了。
总体来说本文的这些代码难度不是很大,但是对于我们理解Kafka的运行机制从而正确地、毫无误解地使用Kafka非常有帮助。相比我在博客中介绍的Hadoop、Yarn的调度求、资源管理器代码,这段代码非常容易理解,但是也可以从中看到Kafka代码的优雅和规范,良好的接口定义带来良好的可扩展性。