大数据生态系统基础:Apache Kafka基础(四):最新kafka编程入门:Stream API

xiaoxiao2021-02-28  117

    数据传输的事务定义通常有以下三种级别:

最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输。最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的。当发布消息时,Kafka有一个“committed”的概念,一旦消息被提交了,只要消息被写入的分区的所在的副本broker是活动的,数据就不会丢失。  consumer可以先读取消息,然后将offset写入日志文件中,然后再处理消息。这存在一种可能就是在存储offset后还没处理消息就crash了,新的consumer继续从这个offset处理,那么就会有些消息永远不会被处理,这就是上面说的“最多一次”。consumer可以先读取消息,处理消息,最后记录offset,当然如果在记录offset之前就crash了,新的consumer会重复的消费一些消息,这就是上面说的“最少一次”。“精确一次”可以通过将提交分为两个阶段来解决:保存了offset后提交一次,消息处理成功之后再提交一次。但是还有个更简单的做法:将消息的offset和消息被处理后的结果保存在一起。比如用Hadoop ETL处理消息时,将处理后的结果和offset同时保存在HDFS中,这样就能保证消息和offser同时被处理了。 一、流 Stream API 的作用        流 API 提供一个主题到另一个主题的转换。 也就是从一个输入主题到一个输出主题的流数据转换。        它的主要类就是 : Class KafkaStreams        可以通过使用TopologyBuilder来定义一个处理器的DAG拓扑结构,或者使用提供高级DSL来定义转换的KStreamBuilder来指定计算逻辑。       一个KafkaStreams可以包含一个或多个在configs中指定的线程,用于处理工作。可以通过相同的 APP ID 来协调其它的实例,不管是在同一进程中,还是在这台机器上的其他进程上,还是在远程机器上,都可以作为一个流处理 APP。这些实例将根据输入主题分区的分配来划分工作,从而使所有分区都被消耗。如果添加或失败实例,所有(剩余)实例将重新平衡分区分配,以平衡处理负载,并确保所有输入主题分区都被处理。         在内部,一个流实例包含了一个正常的 kafkaProducer 和 kafkaConsumer 实例,他们用做读输入和写输出。如下简单的例子:         Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");//定义 App ID props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //连接 kafka 集群 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); StreamsConfig config = new StreamsConfig(props); KStreamBuilder builder = new KStreamBuilder(); builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic"); KafkaStreams streams = new KafkaStreams(builder, config); streams.start(); 二、WordCountDemo 例子         1、说明        

 */

/**

 * 用 high-level KStream DSL演示怎么完成 WordCount程序

 * 从一个输入的 txt 文件中,计算一个简单单词出现的频率

 *

 * 在本例中,输入流从一个名为“streams-file-input”的主题中读取,

 * 其中消息的值表示文本行;并且,直方图输出被写入到主题“streams-wordcount-output”,

 * 其中每个记录都是单个单词的更新计数。

 *

 * 在运行本例子钱,你一定要创建一个输入主题和输出主题 (e.g. kafka-topics.sh--create ...),

 * 并写一些数据到输入主题中,(e.g. kafka-console-producer.sh). 

 * 否则的话,你在输出主题中什么都看不到.

 */

       2、准备数据

创建数据文件:file-input.txt

1 >echo-e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt 或者在 Windows: 1 2 3 >echoall streams lead to kafka> file-input.txt >echohello kafka streams>> file-input.txt >echo|set/p=joinkafka summit>> file-input.txt

   建立主题: stream-file-input

  

1 2 3 4 5 > bin /kafka-topics .sh --create \      --zookeeper localhost:2181 \      --replication-factor 1 \      --partitions 1 \      --topic streams- file -input            将数据文件 file-input.txt 输入到主题 streams-file-input

 3、开始处理数据

1 >kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo 4、检查结果 1 2 3 4 5 6 7 8 > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \     --topic streams-wordcount-output \     --from-beginning \     --formatter kafka.tools.DefaultMessageFormatter \     --property print.key=true\     --property print.value=true\     --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \     --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer 输出结果为: 1 2 3 4 5 6 7 8 all     1 lead    1 to      1 hello   1 streams 2 join   1 kafka   3 summit  1 三、源程序       注意:在 pom.xml 增添以下一行代码:

<dependency>

    <groupId>org.apache.kafka</groupId>

    <artifactId>kafka-streams</artifactId>

    <version>0.11.0.0</version>

</dependency>

下面是源程序 package wangxn.testkafka; import java.util.Arrays; import java.util.Locale; import java.util.Properties; import java.util.concurrent.CountDownLatch; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.ValueMapper; /** * @author wangxinnian * */ /** * 用 high-level KStream DSL演示怎么完成 WordCount程序 * 从一个输入的 txt 文件中,计算一个简单单词出现的频率 * * 在本例中,输入流从一个名为“streams-file-input”的主题中读取, * 其中消息的值表示文本行;并且,直方图输出被写入到主题“streams-wordcount-output”, * 其中每个记录都是单个单词的更新计数。 * * 在运行本例子钱,你一定要创建一个输入主题和输出主题 (e.g. kafka-topics.sh --create ...), * 并写一些数据到输入主题中,(e.g. kafka-console-producer.sh). * 否则的话,你在输出主题中什么都看不到. */ public class WordCountDemo { /** * @param args */ public static void main(String[] args) throws Exception { // TODO Auto-generated method stub Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "mymac:9092"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); // 设置偏移复位到最早,这样我们就可以用相同的预加载数据重新运行演示代码 // 注意,重新运行 demo, 你需要用 偏移量复位工具(offset reset tool): // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KStreamBuilder builder = new KStreamBuilder(); KStream<String, String> source = builder.stream("streams-wordcount-input"); KTable<String, Long> counts=source.flatMapValues(new ValueMapper<String, Iterable<String>>() { public Iterable<String> apply(String value) { return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")); } }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() { public KeyValue<String, String> apply(String key, String value) { return new KeyValue<String,String>(value, value); } }).groupByKey().count("Counts"); // need to override value serde to Long type counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output"); final KafkaStreams streams = new KafkaStreams(builder, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { Exit.exit(1); } Exit.exit(0); } }

转载请注明原文地址: https://www.6miu.com/read-44509.html

最新回复(0)