kafka

xiaoxiao2021-02-28  92

kafka概论:

kafka高并发问题解决:横向扩大缓冲,发送端和接收端之间加消息队列特点: Persistent messagingHigh throughputDistributedMultiple client supportRealtime 应用场景: 数据管道流应用 LinkedIn (www.linkedin.com): Apache Kafka is used at LinkedIn for the streaming of activity data and operational metrics. This data powers various products such as LinkedIn news feed and LinkedIn Today in addition to offline analytics systems such as Hadoop. • DataSift (www.datasift.com/): At DataSift, Kafka is used as a collector for monitoring events and as a tracker of users' consumption of data streams in real time. • Twitter (www.twitter.com/): Twitter uses Kafka as a part of its Storm— a stream-processing infrastructure. • Foursquare (www.foursquare.com/): Kafka powers online-to-online and online-to-offline messaging at Foursquare. It is used to integrate Foursquare monitoring and production systems with Foursquare, Hadoop-based offline infrastructures. • Square (www.squareup.com/): Square uses Kafka as a bus to move all system events through Square's various datacenters. This includes metrics, logs, custom events, and so on. On the consumer side, it outputs into Splunk, Graphite, or Esper-like real-time alerting. kafka的主要功能: publish & subscribe:出版和订阅存储数据: 分布式有副本的容错的处理流数据 消息队列: 缓冲作用:解决高并发解耦作用:完成两个独立程序对接

集群安装:

依赖于zookeeper: 运行时每个节点上都需要启动zookeeper zkServer.sh start 进入到zk 的bin 目录下执行命名, 其中, 后面的日志路径根据实际情况: nohup ./zkServer.sh start >> /logs/zookeeper.file 2>&1 & 2、参数说明: nohup your_command > /dev/null 2>&1 & 1)、nohup:表示所属终端关闭后,进程不会死掉; 2)、> /dev/null :标准输出重定向到 /dev/null (a dummy device that does not record any output). 3)、2>&1 :标准出错重定向到标准输出,也到/dev/null 4)、最后的& :后台任务 配置环境变量,每个节点上都要有: #kafka export KAFKA_HOME=/opt/kafka/kafka_2.11-0.10.1.0 export PATH=$PATH:$KAFKA_HOME/bin 配置server.properties: # The id of the broker. This must be set to a unique integer for each broker. broker.id=0 注释:master节点broker.id=0 salver1 broker.id=1 salver2 broker.id=2 # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=master:2181,slaver1:2181,slaver2:2181 把kafka文件拷贝到另外两个节点上: scp -r kafka root@slaver1: /opt/ scp -r kafka root@slaver2: /opt/ broker的运行依赖于配置文件server.propertiesbroker是一个逻辑概念

Kafka运行:

kafka即时生产者消费者运行:

先启动zookeeper: 运行时每个节点上都需要启动zookeeper zkServer.sh start 在启动kafka: 每个节点都要启动: kafka-server-start.sh server.properties kafka后台运行: kafka-server-start.sh $KAFKA_HOME/config/server.properties >/usr/tmp/kafka.log & kafka创建表指令: 示例:kafka-topics.sh --zookeeper master:2181,slaver1:2181,slaver2:2181 --create --replication-factor 1 --partitions 1 --topic first_topic kafka两个独立程序相互对接发送数据生产者和接收数据消费者: 发送数据生产者:console-producer.sh kafka-console-producer.sh --broker-list slaver1:9092,slaver2:9092,master:9092 --topic first_topic 接收数据消费者:console-consumer.sh kafka-console-consumer.sh --bootstrap-server slaver2:9092 --topic first_topic

分布式架构原理:

zookeeper:集群协调管理配置工具。kafka的集群的形成和集群的基本配置信息,以及kafka的元数据信息,都保存在zookeeper中。kafka的broker连接信息也保存在zookeeper中。设计基础原理: broker:kafka的每一个服务节点都被称作broker,kafka集群就是就是若干个broker构成的一个服务整体。broker节点是一个逻辑上的概念(一个broker就是指一台机器上的一个进程),不是物理上的节点。topic:消息的分类名称即:数据的划分类别,如果把kafka认为一个数据的话,topic就是表的概念,它可以做数据隔离,类似redis上channl频道Partition:kafka的分布式方式,一个topic可以划分成多个partition,不同的partition负责不同的数据读写请求。Replication:副本用来容错,一个partition可以有多个副本,副本也有主次之分。kafka的容错手段,每一个partition有若干个replication,多个replication分别由不同的broker来进行负责。一个partition的多个replication中在某个时间点上只有一个是对外提供读写服务的,这个replication被称作leader负责对外读写操作,其余的replication被称作folower,folower主动同步leader的数据。Producer生产者:主要负责把数据发送至kafka,producer可以随意的和任意的broker来对接,消息可以均衡的随机的分配到一个topic的partition消息还可以在生产者中指定其所指向的partition。Consumer消费者:主要负责取出kafka中的数据进行消费处理,kafka对topic的消费可以是分布式的。consumer也可以连接任意的broker。Consumer group消费组:它是被存储在kafka服务端的一个名称,每个comsumer必须属于某个消费组,不同的消费组的消费者之间消费的数据是可以重复的,同一个消费组下的多个消费者之间,他们消费的数据是完全不会重复的,这点事kafka保证。每一个consumer都有所属的consumer group,每一个组都有一个名称,通过对consumer参数设置,就可以把consumer划分到某个组下面,每一个组下的多个consumer在消费同一个topic数据的时候,kafka机制能保证他们之间消费不会重复,不同组之间对一个topic进行消费时,数据时重复的生产者:写入物理文件: log文件index文件消费者: commits current offsets :提交当前偏移量 commit:指定的offsetcommit current offset :提交当前偏移量automatic commit :自动提交asynchronous commit :异步提交同步异步组合提交

图1:数据重复:

:图2:数据丢失:

指定offset消费kafka-consumer-groups.sh --bootstrap-server:服务器--list:列表、名单--topic:消息的分类名称kafka-consumer-offset-checker.sh 示例代码: kafka-consumer-offset-checker.sh --zookeeper slaver1:2181 --group abc --topic multitopic --zookeeper--topic:消息的分类名称--group:群、组、聚合--broker-info:broker的信息

Kafka基本指令:

topic管理:kafka-topic.sh --zookeeper<urls>:zookeeper的资源定位符即网址或链接--list:查看topic列表--describe:查看topic的详细信息--topic<topic>消息的分类名称--create创建 创建topic分区: kafka-topics.sh --zookeeper master:2181,slaver1:2181,slaver2:2181 --create --replication-factor 1 --partitions 1 --topic first_topic --replication-factor 副本数--partitions:分区--delete删除,设置之后删除的表不能恢复: 在:server.properties里面 #delete.topic.enable=true --alter:更改、修改--config<name=value>:配置kafka-console-producer.sh --broker-list:broker的列表--topic:消息分类名称kafka-console-consumer.sh --bootstrap-server:服务器--topic:消息分类名称--from-beginning:从头开始

kafkaApi:

producer:

kafkaProducer: 线程安全在一个应用中与其创建多个producer实例,不如在多个线程中使用一个producerproperties:java代码配置new一个properties kafkaProducer的配置bootstrap.serverskey.serizlizervalue.serializeracks:     0:producer不等待服务端的响应即认为消息发送成功    1:服务端成功把消息写入到parition的主replication之后就认为消息成功发送,反馈给producer    -1(all):服务端成功把 消息写入到相应partition的每一个replication之后才会认为消息成功发送并反馈给producerProducerRecord:生产者记载 topicpartition:分割keyvaluetimestamp:时间戳Partitioner:分区器 决定一条message应该该属于topic 的哪一个分区PartitionInfo:硬盘信息

consumer:

KfkaConsumer: subscribe():订阅poll():投票Properties:属性ConsumerRecords:消费者多个记录ConsumerRecord:消费者记录TopicPartitions:topic分割 构建数据管道可靠的数据传输

 

转载请注明原文地址: https://www.6miu.com/read-2600065.html

最新回复(0)