Transactions are an important concept in Flume: they are what guarantees that data stays available. A Flume transaction is not the same as a database transaction; for example, rolling back a Flume transaction may produce duplicate data. What Flume guarantees is that every event is delivered at least once, and it is this guarantee that ensures no data is lost.
This post analyzes transactions along a concrete data flow: the configured pipeline is taildir + kafkachannel, followed by kafkachannel + hdfsSink.
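For reference, an agent wired this way might be configured roughly as below. This is only a minimal sketch: the agent/component names, paths, brokers and topic are placeholders of mine, not taken from an actual setup.

```properties
# Hypothetical agent: taildir source -> kafka channel -> hdfs sink
a1.sources  = r1
a1.channels = c1
a1.sinks    = k1

a1.sources.r1.type          = TAILDIR
a1.sources.r1.positionFile  = /var/flume/taildir_position.json
a1.sources.r1.filegroups    = f1
a1.sources.r1.filegroups.f1 = /var/log/app/.*log
a1.sources.r1.channels      = c1

a1.channels.c1.type                    = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = broker1:9092,broker2:9092
a1.channels.c1.kafka.topic             = flume-channel
a1.channels.c1.parseAsFlumeEvent       = true

a1.sinks.k1.type      = hdfs
a1.sinks.k1.hdfs.path = hdfs://namenode:8020/flume/events
a1.sinks.k1.channel   = c1
```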
The kafkachannel maintains two kinds of transactions: the put transaction and the take transaction.
Personal site: http://bigdatadecode.club/flume事务解析.html
The kafkachannel put transaction is driven by the taildir source, so let's follow the put transaction through the code.
The entry point of taildir is TaildirSource.process:

```java
public Status process() {
  Status status = Status.READY;
  try {
    existingInodes.clear();
    existingInodes.addAll(reader.updateTailFiles());
    for (long inode : existingInodes) {
      TailFile tf = reader.getTailFiles().get(inode);
      // decide whether this file needs to be tailed:
      // was it modified after the last recorded tail time,
      // or is the recorded position larger than the file length?
      if (tf.needTail()) {
        tailFileProcess(tf, true);
      }
    }
    closeTailFiles();
    ...
  } catch (Throwable t) {
    ...
  }
  return status;
}
```

A file needs to be tailed when its modification time is newer than the last recorded tail time, or when the recorded position is larger than the file length (in which case tailing restarts from offset 0).
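Condensed into a single predicate, the rule just described would look roughly like this (a sketch of mine with the inputs passed in explicitly, not the actual TailFile code):

```java
// Hypothetical helper mirroring the needTail rule described above.
static boolean needTail(long lastModified, long lastTailTime,
                        long recordedPos, long fileLength) {
  // tail again if the file changed after the last tail,
  // or if the recorded position no longer fits the file (then tail from offset 0)
  return lastModified > lastTailTime || recordedPos > fileLength;
}
```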
The tailing of a file itself happens in tailFileProcess:

```java
private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
    throws IOException, InterruptedException {
  while (true) {
    reader.setCurrentFile(tf);
    // read up to batchSize events from the file
    List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
    if (events.isEmpty()) {
      break;
    }
    sourceCounter.addToEventReceivedCount(events.size());
    sourceCounter.incrementAppendBatchReceivedCount();
    try {
      // this is where the transaction happens
      getChannelProcessor().processEventBatch(events);
      reader.commit();
    } catch (ChannelException ex) {
      logger.warn("The channel is full or unexpected failure. " +
          "The source will try again after " + retryInterval + " ms");
      TimeUnit.MILLISECONDS.sleep(retryInterval);
      retryInterval = retryInterval << 1;
      retryInterval = Math.min(retryInterval, maxRetryInterval);
      continue;
    }
    retryInterval = 1000;
    sourceCounter.addToEventAcceptedCount(events.size());
    sourceCounter.incrementAppendBatchAcceptedCount();
    // we only leave the current file once we have caught up with the writer?
    // does that mean other files may never get a chance to be tailed??
    // this looks like a bug
    if (events.size() < batchSize) {
      break;
    }
  }
}
```

The bug is this: when a file group contains several files that are being written to, and one of them is written fast enough that every read returns a full batchSize of events, the other files never get a chance to be read. I have reported this to the community as FLUME-3101.
Now let's see how the transaction is actually implemented. getChannelProcessor().processEventBatch(events) leads to ChannelProcessor.processEventBatch:
```java
public void processEventBatch(List<Event> events) {
  ...
  // map each event to its channels
  for (Event event : events) {
    ...
  }
  // Process required channels
  for (Channel reqChannel : reqChannelQueue.keySet()) {
    // get the transaction of this channel
    Transaction tx = reqChannel.getTransaction();
    Preconditions.checkNotNull(tx, "Transaction object must not be null");
    try {
      // start the transaction
      tx.begin();
      // process the batch: events are first buffered in memory,
      // then commit() writes them to kafka in one go
      List<Event> batch = reqChannelQueue.get(reqChannel);
      for (Event event : batch) {
        reqChannel.put(event);
      }
      // commit the transaction, which also ends it
      tx.commit();
    } catch (Throwable t) {
      // on any error, roll the transaction back
      tx.rollback();
      if (t instanceof Error) {
        LOG.error("Error while writing to required channel: " + reqChannel, t);
        throw (Error) t;
      } else if (t instanceof ChannelException) {
        throw (ChannelException) t;
      } else {
        throw new ChannelException("Unable to put batch on required " +
            "channel: " + reqChannel, t);
      }
    } finally {
      if (tx != null) {
        tx.close();
      }
    }
  }
  // Process optional channels
  for (Channel optChannel : optChannelQueue.keySet()) {
    ...
  }
}
```

It first obtains a Transaction from each channel bound to the source, calls begin() to start the transaction, processes the batch, and then calls commit() when the data has been handled. If an error occurs while processing, it is caught and the transaction is rolled back with rollback().
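Stripped of the channel bookkeeping, the put side boils down to the canonical channel-transaction idiom; in the sketch below, channel and events are assumed to already be in scope:

```java
// Generic put-side transaction pattern (sketch; variable names are placeholders).
Transaction tx = channel.getTransaction();
tx.begin();
try {
  for (Event event : events) {
    channel.put(event);        // stage the event inside the transaction
  }
  tx.commit();                 // make the whole batch visible / durable
} catch (Throwable t) {
  tx.rollback();               // discard the whole batch on any failure
  throw new ChannelException("Unable to put batch", t);
} finally {
  tx.close();
}
```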
Let's start with the data handling. reqChannel.put(event) puts the event into the channel's in-memory buffer. Although this looks like a method on the channel, the channel's put() is only a thin wrapper around Transaction.put(), and the real work of Transaction.put() is done by the channel-specific Transaction.doPut(). The call chain is reqChannel.put(event) -> BasicChannelSemantics.put -> BasicTransactionSemantics.put -> BasicTransactionSemantics.doPut, where doPut is an abstract method whose concrete implementation lives in each channel's Transaction.
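The two base classes implement this hand-off with a thread-local transaction plus the template-method pattern. Roughly (a simplified sketch of mine, with state checks and error handling trimmed):

```java
// Sketch of the delegation; the real classes carry more state checks.

// BasicChannelSemantics keeps the current thread's transaction in a ThreadLocal:
public void put(Event event) throws ChannelException {
  BasicTransactionSemantics transaction = currentTransaction.get();
  Preconditions.checkState(transaction != null, "No transaction exists for this thread");
  transaction.put(event);
}

// BasicTransactionSemantics.put() checks that the transaction is OPEN and owned
// by the calling thread, then hands off to the channel-specific hook:
protected void put(Event event) {
  ...
  doPut(event);
}

// Each concrete channel (KafkaChannel, MemoryChannel, FileChannel, ...) overrides:
protected abstract void doPut(Event event) throws InterruptedException;
```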
The channel used here is the kafkaChannel, whose doPut looks like this:

```java
protected void doPut(Event event) throws InterruptedException {
  // transaction type: PUT or TAKE
  type = TransactionType.PUT;
  ...
  Integer partitionId = null;
  try {
    if (staticPartitionId != null) {
      partitionId = staticPartitionId;
    }
    // Allow a specified header to override a static ID
    if (partitionHeader != null) {
      String headerVal = event.getHeaders().get(partitionHeader);
      if (headerVal != null) {
        partitionId = Integer.parseInt(headerVal);
      }
    }
    // wrap the event in a ProducerRecord and append it to producerRecords,
    // where it waits until commit() writes it to kafka
    if (partitionId != null) {
      producerRecords.get().add(
          new ProducerRecord<String, byte[]>(topic.get(), partitionId, key,
              serializeValue(event, parseAsFlumeEvent)));
    } else {
      producerRecords.get().add(
          new ProducerRecord<String, byte[]>(topic.get(), key,
              serializeValue(event, parseAsFlumeEvent)));
    }
  } catch (NumberFormatException e) {
    throw new ChannelException("Non integer partition id specified", e);
  } catch (Exception e) {
    throw new ChannelException("Error while serializing event", e);
  }
}
```

doPut first records the transaction type and then buffers the event in memory. If nothing goes wrong along the way, commit() is later called to push the buffered events to kafka.
Now for commit. The call path of commit mirrors that of put; the concrete implementation is doCommit in KafkaChannel's KafkaTransaction:
```java
protected void doCommit() throws InterruptedException {
  if (type.equals(TransactionType.NONE)) {
    return;
  }
  // check which kind of transaction we are committing;
  // here we analyze the PUT branch first
  if (type.equals(TransactionType.PUT)) {
    if (!kafkaFutures.isPresent()) {
      kafkaFutures = Optional.of(new LinkedList<Future<RecordMetadata>>());
    }
    try {
      long batchSize = producerRecords.get().size();
      long startTime = System.nanoTime();
      int index = 0;
      for (ProducerRecord<String, byte[]> record : producerRecords.get()) {
        index++;
        // a single producer instance is shared across threads (as the Kafka docs
        // recommend, though this can be tuned to your own situation):
        // "The producer is thread safe and sharing a single producer instance
        //  across threads will generally be faster than having multiple instances."
        kafkaFutures.get().add(producer.send(record, new ChannelCallback(index, startTime)));
      }
      // prevents linger.ms from being a problem:
      // force the records buffered in the RecordAccumulator to be sent
      producer.flush();
      // wait until every record has actually been sent to kafka
      for (Future<RecordMetadata> future : kafkaFutures.get()) {
        future.get();
      }
      long endTime = System.nanoTime();
      counter.addToKafkaEventSendTimer((endTime - startTime) / (1000 * 1000));
      counter.addToEventPutSuccessCount(batchSize);
      producerRecords.get().clear();
      kafkaFutures.get().clear();
    } catch (Exception ex) {
      logger.warn("Sending events to Kafka failed", ex);
      throw new ChannelException("Commit failed as send to Kafka failed", ex);
    }
  } else {
    ...
  }
}
```

Both PUT and TAKE transactions are committed in doCommit, and everything here goes through the plain Kafka Java API. Note that all threads share a single producer instance, and that sending events to kafka is effectively synchronous, because future.get() is called to wait for every send to complete. producer.flush() is also called: if linger.ms is configured, records may otherwise sit in the accumulator waiting to be batched, and flush() forces whatever is queued to be sent to kafka.
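Isolated from the channel code, this send-then-wait idiom looks roughly like this (a sketch; the broker and topic names are placeholders, and payloads is assumed to be the byte[] messages of the current batch):

```java
// Synchronous batch send: send asynchronously, then flush and wait for every ack.
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

List<Future<RecordMetadata>> futures = new ArrayList<>();
for (byte[] payload : payloads) {
  futures.add(producer.send(new ProducerRecord<>("flume-channel", payload)));
}
producer.flush();                          // don't let linger.ms hold records back
for (Future<RecordMetadata> f : futures) {
  f.get();                                 // block until the broker has acked this record
}
```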
Whether the error happens in doPut or in doCommit, the transaction gets rolled back. Rollback is implemented in doRollback:
```java
protected void doRollback() throws InterruptedException {
  if (type.equals(TransactionType.NONE)) {
    return;
  }
  if (type.equals(TransactionType.PUT)) {
    // on a PUT failure, simply clear the in-memory buffers
    // (note: the number of rollbacks is not counted here)
    producerRecords.get().clear();
    kafkaFutures.get().clear();
  } else {
    ...
  }
}
```

*From the analysis above, the kafkachannel buffers events in memory via doPut and ships them to kafka in doCommit, so the put transaction only ends once the events have been written to kafka. The memorychannel, by contrast, writes events into an in-memory putList in doPut and then moves them from the putList into its queue in doCommit; the transaction ends as soon as the queue write succeeds. So when the goal is to write data into kafka, the kafkachannel is more efficient than the memorychannel, and more importantly it keeps the write to kafka transactional.*
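For comparison, the put side of the memorychannel works roughly like this. This is a heavily simplified sketch of mine that omits capacity/byte-size accounting, counters and the take side; it is not the actual MemoryChannel source:

```java
// MemoryTransaction stages puts locally; doCommit publishes them into the channel queue.
protected void doPut(Event event) throws InterruptedException {
  putList.offer(event);              // stage the event inside the transaction
}

protected void doCommit() throws InterruptedException {
  synchronized (queueLock) {
    Event e;
    while ((e = putList.poll()) != null) {
      queue.offer(e);                // publish staged events; the transaction ends here
    }
  }
}

protected void doRollback() {
  putList.clear();                   // staged events are simply dropped
}
```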
Now let's look at the take transaction.
The take transaction of the kafkachannel is driven by the sink, in this case the hdfsSink. Let's walk through the take-side code.
The sink used here is HDFSEventSink, whose process method looks like this:
```java
// not thread safe
public Status process() throws EventDeliveryException {
  // get the channel bound to this sink
  Channel channel = getChannel();
  // obtain a Transaction from the channel
  Transaction transaction = channel.getTransaction();
  List<BucketWriter> writers = Lists.newArrayList();
  // start the transaction
  transaction.begin();
  try {
    int txnEventCount = 0;
    for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
      // take one event from the channel
      Event event = channel.take();
      if (event == null) {
        break;
      }
      ...
      synchronized (sfWritersLock) {
        bucketWriter = sfWriters.get(lookupPath);
        // we haven't seen this file yet, so open it and cache the handle
        if (bucketWriter == null) {
          hdfsWriter = writerFactory.getWriter(fileType);
          bucketWriter = initializeBucketWriter(realPath, realName,
              lookupPath, hdfsWriter, closeCallback);
          sfWriters.put(lookupPath, bucketWriter);
        }
      }
      // track the buckets getting written in this transaction;
      // the events taken within one transaction may come from different topic
      // partitions, so several file handles may be open at the same time
      if (!writers.contains(bucketWriter)) {
        writers.add(bucketWriter);
      }
      // Write the data to HDFS
      try {
        bucketWriter.append(event);
      } catch (BucketClosedException ex) {
        ...
      }
    }
    ...
    // flush all pending buckets before committing the transaction
    for (BucketWriter bucketWriter : writers) {
      bucketWriter.flush();
    }
    // commit the transaction
    transaction.commit();
    ...
  } catch (IOException eIO) {
    // roll the transaction back on error
    transaction.rollback();
    LOG.warn("HDFS IO error", eIO);
    return Status.BACKOFF;
  } catch (Throwable th) {
    transaction.rollback();
    LOG.error("process failed", th);
    if (th instanceof Error) {
      throw (Error) th;
    } else {
      throw new EventDeliveryException(th);
    }
  } finally {
    transaction.close();
  }
}
```

The sink's process() first obtains a Transaction from its channel, calls begin() to start the transaction, and then starts handling data. For each event it calls channel.take(), which eventually lands in KafkaTransaction.doTake:
```java
protected Event doTake() throws InterruptedException {
  // transaction type
  type = TransactionType.TAKE;
  try {
    // channelUUID is final, so does one kafkachannel instance have only one consumer?
    if (!(consumerAndRecords.get().uuid.equals(channelUUID))) {
      logger.info("UUID mismatch, creating new consumer");
      decommissionConsumerAndRecords(consumerAndRecords.get());
      consumerAndRecords.remove();
    }
  } catch (Exception ex) {
    logger.warn("Error while shutting down consumer", ex);
  }
  if (!events.isPresent()) {
    events = Optional.of(new LinkedList<Event>());
  }
  Event e;
  // Give the channel a chance to commit if there has been a rebalance
  if (rebalanceFlag.get()) {
    logger.debug("Returning null event after Consumer rebalance.");
    return null;
  }
  if (!consumerAndRecords.get().failedEvents.isEmpty()) {
    e = consumerAndRecords.get().failedEvents.removeFirst();
  } else {
    if (logger.isTraceEnabled()) {
      logger.trace("Assignment during take: {}",
          consumerAndRecords.get().consumer.assignment().toString());
    }
    try {
      long startTime = System.nanoTime();
      if (!consumerAndRecords.get().recordIterator.hasNext()) {
        consumerAndRecords.get().poll();
      }
      if (consumerAndRecords.get().recordIterator.hasNext()) {
        ConsumerRecord<String, byte[]> record = consumerAndRecords.get().recordIterator.next();
        e = deserializeValue(record.value(), parseAsFlumeEvent);
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        OffsetAndMetadata oam = new OffsetAndMetadata(record.offset() + 1, batchUUID);
        consumerAndRecords.get().saveOffsets(tp, oam);
        // Add the key to the header
        if (record.key() != null) {
          e.getHeaders().put(KEY_HEADER, record.key());
        }
        long endTime = System.nanoTime();
        counter.addToKafkaEventGetTimer((endTime - startTime) / (1000 * 1000));
        if (logger.isDebugEnabled()) {
          logger.debug("{} processed output from partition {} offset {}",
              new Object[] {getName(), record.partition(), record.offset()});
        }
      } else {
        return null;
      }
    } catch (Exception ex) {
      logger.warn("Error while getting events from Kafka. This is usually caused by " +
          "trying to read a non-flume event. Ensure the setting for " +
          "parseAsFlumeEvent is correct", ex);
      throw new ChannelException("Error while getting events from Kafka", ex);
    }
  }
  eventTaken = true;
  events.get().add(e);
  return e;
}
```

doTake is essentially consuming kafka with a consumer. Ideally one consumer would consume one partition of each topic, but here the consumer is tied to channelUUID, and channelUUID is final; does that mean a kafkachannel instance only ever has one consumer? The consumption logic is: the consumer pulls a batch into local memory with poll(), the sink then takes the records one by one, the pending offset is advanced by one for each record taken, and once the in-memory batch is exhausted poll() is called again. After the sink has an event, it routes it to the appropriate bucketWriter based on the event's metadata; once batchSize events have been taken, all bucketWriters are flushed, and only after a successful flush is the transaction committed.
commit() again ends up in doCommit; this time we look at the TAKE branch:
```java
protected void doCommit() throws InterruptedException {
  logger.trace("Starting commit");
  if (type.equals(TransactionType.NONE)) {
    return;
  }
  if (type.equals(TransactionType.PUT)) {
    ...
  } else {
    // event taken ensures that we have collected events in this transaction
    // before committing
    if (consumerAndRecords.get().failedEvents.isEmpty() && eventTaken) {
      logger.trace("About to commit batch");
      long startTime = System.nanoTime();
      // commit the offsets
      consumerAndRecords.get().commitOffsets();
      long endTime = System.nanoTime();
      counter.addToKafkaCommitTimer((endTime - startTime) / (1000 * 1000));
      if (logger.isDebugEnabled()) {
        logger.debug(consumerAndRecords.get().getCommittedOffsetsString());
      }
    }
    int takes = events.get().size();
    if (takes > 0) {
      counter.addToEventTakeSuccessCount(takes);
      events.get().clear();
    }
  }
}
```

The offsets are committed manually here, via the kafka consumer API consumer.commitSync(offsets).
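Outside of the channel, the same consume-then-manually-commit pattern looks roughly like this (a sketch; the broker, group and topic names are placeholders):

```java
// Manual offset management: poll, process, remember offset+1 per partition, commitSync.
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");
props.put("group.id", "flume-channel-group");
props.put("enable.auto.commit", "false");   // offsets are committed by hand
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("flume-channel"));

Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new HashMap<>();
ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
  // ... hand the record to the sink ...
  // remember the next offset to read for this partition
  pendingOffsets.put(new TopicPartition(record.topic(), record.partition()),
      new OffsetAndMetadata(record.offset() + 1));
}
consumer.commitSync(pendingOffsets);        // the take-side "transaction commit"
```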
If an exception occurs during the commit or during the flush, the transaction is rolled back:

```java
protected void doRollback() throws InterruptedException {
  if (type.equals(TransactionType.NONE)) {
    return;
  }
  if (type.equals(TransactionType.PUT)) {
    ...
  } else {
    // count the rolled-back events
    counter.addToRollbackCounter(events.get().size());
    // move the in-memory events into failedEvents so the next take retries them
    consumerAndRecords.get().failedEvents.addAll(events.get());
    events.get().clear();
  }
}
```

Flume's transactions are what guarantee that data is not lost; they are one of the key concepts in Flume.
Are the HdfsSink and the kafkachannel consumer both single-threaded? With one consumer per kafkachannel instance, and the sink taking records from the consumer and handing them out to different bucketWriters, can we say that consuming is single-threaded while the data processing is multi-threaded?