Flume + Kafka: Producing Logs to Specified Kafka Partitions


Requirement

Integrate Flume with Kafka so that logs of different levels are produced to different partitions of a Kafka topic.

Solution

Locate KafkaSink in the Flume source code and add the required partitioning logic to it:

// Excerpt from KafkaSink.process()
Integer partitionId = null;
try {
  ProducerRecord<String, byte[]> record;

  if (staticPartitionId != null) {
    partitionId = staticPartitionId;
  }
  // Allow a specified header to override a static ID
  if (partitionHeader != null) {
    String headerVal = event.getHeaders().get(partitionHeader);
    if (headerVal != null) {
      partitionId = Integer.parseInt(headerVal);
    }
  }

  /****** The only lines I actually added: pick the partition id from the log level ******/
  String type = headers.get("type");  // headers = event.getHeaders(), set by the interceptor
  if ("INFO".equals(type)) {          // constant-first equals avoids an NPE when the header is missing
    partitionId = 0;
  } else if ("DEBUG".equals(type)) {
    partitionId = 1;
  } else if ("WARN".equals(type)) {
    partitionId = 2;
  } else if ("ERROR".equals(type)) {
    partitionId = 3;
  }
  /***************************************************************************************/

  if (partitionId != null) {
    record = new ProducerRecord<String, byte[]>(eventTopic, partitionId, eventKey,
        serializeEvent(event, useAvroEventFormat));
  } else {
    record = new ProducerRecord<String, byte[]>(eventTopic, eventKey,
        serializeEvent(event, useAvroEventFormat));
  }
  kafkaFutures.add(producer.send(record, new SinkCallback(startTime)));
} catch (NumberFormatException ex) {
  throw new EventDeliveryException("Non integer partition id specified", ex);
} catch (Exception ex) {
  // N.B. The producer.send() method throws all sorts of RuntimeExceptions
  // Catching Exception here to wrap them neatly in an EventDeliveryException
  // which is what our consumers will expect
  throw new EventDeliveryException("Could not send event", ex);
}
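The mapping above relies on a "type" header carrying the log level, which must be set by an interceptor upstream. The conf below references a custom class, org.my.flume.myinterceptor.multinterceptor, whose source the post does not include. Here is a minimal sketch of what such an interceptor might look like; the class name LogLevelInterceptor is my own, the level pattern is hardcoded for brevity rather than read from the conf's regex property, and the body is assumed to be in the platform charset:

import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class LogLevelInterceptor implements Interceptor {
  private static final Pattern LEVEL = Pattern.compile("(INFO|DEBUG|WARN|ERROR)");

  @Override
  public void initialize() { }

  @Override
  public Event intercept(Event event) {
    // Extract the log level from the event body and store it in the "type" header,
    // which the modified KafkaSink reads to choose a partition.
    String body = new String(event.getBody());
    Matcher m = LEVEL.matcher(body);
    if (m.find()) {
      event.getHeaders().put("type", m.group(1));
    }
    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event e : events) {
      intercept(e);
    }
    return events;
  }

  @Override
  public void close() { }

  public static class Builder implements Interceptor.Builder {
    @Override
    public Interceptor build() {
      return new LogLevelInterceptor();
    }

    @Override
    public void configure(Context context) { }
  }
}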

Dependencies added to pom.xml. The official source has since been updated to 1.7, which supports Kafka 0.10.0 (1.6 supports 0.8). To use a lower-version dependency, build against the corresponding lower-version source.

<dependency>
    <groupId>org.apache.flume.flume-ng-sinks</groupId>
    <artifactId>flume-ng-kafka-sink</artifactId>
    <version>1.7.0</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.0.0</version>
    <scope>runtime</scope>
</dependency>

The Flume configuration file:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /tmp/logs
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i2
a1.sources.r1.interceptors.i2.type = org.my.flume.myinterceptor.multinterceptor$Builder
a1.sources.r1.interceptors.i2.regex = (INFO|DEBUG|WARN|ERROR)
a1.sources.r1.interceptors.i2.serializers = s2
a1.sources.r1.interceptors.i2.serializers.s2.name = type

# Describe the sink
a1.sinks.k1.type = test.KafkaSink
a1.sinks.k1.brokerList = 172.17.11.120:9092,172.17.11.117:9092,172.17.11.118:9092
a1.sinks.k1.topic = TOPIC-20160504-1200
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
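Note that the topic must be created with at least four partitions for the level-to-partition mapping to work (e.g. kafka-topics.sh with --partitions 4). To verify the routing, a consumer can be pinned to a single partition. Below is a minimal sketch against the Kafka 0.10.0 consumer API; the topic name and broker address come from the conf above, while the class name PartitionCheck and the group id are my own, and the value is assumed to be the raw event body (useAvroEventFormat off):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PartitionCheck {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "172.17.11.120:9092");
    props.put("group.id", "partition-check");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
    // Partition 3 should only ever contain ERROR lines.
    TopicPartition errors = new TopicPartition("TOPIC-20160504-1200", 3);
    consumer.assign(Arrays.asList(errors));
    consumer.seekToBeginning(Arrays.asList(errors));

    ConsumerRecords<String, byte[]> records = consumer.poll(5000);
    for (ConsumerRecord<String, byte[]> r : records) {
      System.out.println("partition=" + r.partition() + " value=" + new String(r.value()));
    }
    consumer.close();
  }
}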

Start Flume

# flume-ng agent -c conf -f conf/my.conf -n a1 -Dflume.root.logger=INFO,console
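To exercise the pipeline, drop a file whose lines contain the level strings (for example, a line like "2016-05-04 12:00:00 ERROR something failed") into /tmp/logs: the spooling directory source picks it up, the interceptor tags each event with its level in the "type" header, and the modified sink routes the event to the matching partition.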
Please credit the original source when reposting: https://www.6miu.com/read-11867.html
