1.1.1 kafka的安装过程 1、官网下载http://kafka.apache.org/downloads
2、上传安装包到Linux中的/export/servers/目录下,进入到该目录下 执行命令:tar -zxvf kafka_2.12-0.10.2.1.tgz -C /export/servers/
3、进入到/export/servers/目录下,建立软连接 执行命令:ln -s kafka_2.12-0.10.2.1/ kafka
4、配置profile文件 执行命令:vim /etc/profile 在该文件中加入下面代码
#set kafka env export KAFKA_HOME=/export/servers/kafka export PATH=$PATH:${KAFKA_HOME}/bin执行命令:source /etc/profile 让配置文件有效
5、进入到${KAFKA_HOME}/config目录下配置相关文件
producer.properties配置文件:
# 指定kafka节点列表,用于获取metadata,不必全部指定 metadata.broker.list=kafka01:9092,kafka02:9092 # 指定分区处理类,默认kafka.producer.DefaultPartitioner,表通过key哈希对应分区 # partitioner.class=kafka.producer.DefaultPartitioner # 是否压缩,默认0表示不压缩,1表示GZIP压缩,2表示snappy压缩,压缩后消息会有头来指明消息压缩类型, # 故消费者消费消息解压是透明的无需指定。 compression.codec=none # 指定序列化处理类 serializer.class=kafka.serializer.DefaultEncoder # 如果要压缩消息,这里指定那些topic要压缩消息,默认empty,表示不压缩 # compressed.topics= # 设置发送数据是否需要服务端的反馈,有三个值分别是0,1,-1 # 0:producer不会等待broker发送ack # 1:当leader接收到消息之后会发送ack # -1:当所有的follower都同步消息成功之后发送ack request.required.acks=0 # 在向producer发送ack之前,broker允许等待最大时间,如果超时,broker将会向producer发送一个error ack, # 意味着一次消息因为某种原因未能成功(比如follower不能同步成功) request.timeout.ms=10000 # 同步还是异步发送消息,默认“sync”表示同步,“async”表示异步,异步可以提高发送吞吐量 # 也意味着消息将会在本地buffer中,并适合批量发送,但是也可能导致未发送过去的数据丢失 producer.type=sync # 在async模式下,当message被缓存的时间超过次值后,将会批量的发送broker,默认为5000ms # 此值和batch.num.messages协同工作 queue.buffering.max.ms=5000 # 在async模式下,producer端允许buffer最大消息量 # 无论如何producer都无法尽快的将消息发送给broker,从而导致消息大量的在producer端大量沉积 # 此时,如果消费的条数达到阀值,将会导致producer端堵塞或者消息被抛弃,默认为10000 queue.buffering.max.messages=20000 # 如果是异步,指定每次批量发送数据量,默认为200 batch.num.messages=500 # 当消息在producer端沉积的条数达到“queue.buffering.max.messages”后, # 堵塞一定时间后,队列仍然没有enqueue(producer仍然没有发送出任何消息) # 此时producer可以继续堵塞或者将消息抛弃,此timeout值用于控制“堵塞”的时间 # -1:无堵塞超时限制,消息不会别抛弃 # 0:立即清空队列,将消息抛弃 queue.enqueue.timeout.ms=-1server.properties配置文件:
# broker的全局唯一编号,不能重复 broker.id=0 # 用来监听连接的端口,producer或consumer将此端口建立连接 port=9092 # 处理网络请求的线程数量 num.network.threads=3 # 用来处理磁盘IO的线程数量 num.io.threads=8 # 发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 # 接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 # 请求套接字的缓冲区大小 socket.request.buffer.bytes=102400 # kafka运行日志的存放的路径 log.dirs=/export/servers/logs/kafka # topic在当前broker上的分片个数 num.partitions=2 # 用来恢复和清理data下数据的线程数量 num.recovery.threads.per.data.dir=1 # segment文件保留的最长时间,超时将被删除 log.retention.hours=168 # 滚动生成新的segment文件的最大时间 log.roll.hours=168 # 日志文件每个segment文件的大小,默认为1G log.segment.bytes=1073741824 # 周期性检查文件大小的时间 log.retention.check.interval.ms=300000 # 日志清理是否打开 log.cleaner.enable=true # broker需要zookeeper保存元数据信息 zookeeper.connect=zk01:2181,zk01:2181,zk01:2181 # zookeeper连接超时时间 zookeeper.connection.timeout.ms=6000 # partition buffer中,消息的条数达到阀值,将触发flush到磁盘 log.flush.interval.message=10000 # 删除topic需要server.properties配置文件中配置delete.topic.enable=true否则只能标记删除 delete.topic.enable=true # 解决可以使用Java客户端产生消息到topic中(多个配置文件都需要配置) advertised.listeners=PLAINTEXT://kafka01:9092 # 此处的host.name为本机IP(重要),如果不改,则客户端抛出:producer connection to localhost:9092 ussuccessful 错误! host.name=kafka01consumer.propseties配置文件:
# zookeeper连接服务器地址 zookeeper.connect=zk01:2181,zk02:2181,zk03:2181 # zookeeper的session过期时间,默认5000ms,用于检测消费者是否挂掉 zookeeper.session.timeout.ms=5000 # 当前消费者挂掉,其他消费者要等待指定时间才能检测到并且触发重新负载均衡 zookeeper.connetion.timeout.ms=10000 # 指定多久消费者更新offset到zookeeper中, # 注意offset更新时基于time而不是每次获得消息, # 一旦更新zookeeper发生异常并重启,将可能拿到已经拿过的消息 zookeeper.sync.time.ms=2000 # 指定消费组wsz.user可以任意,最好是有意义 group.id=wsz.user # 当consumer消费一定消息之后,将会自动向zookeeper提交offset信息 # 注意offset信息并不是每消费一次就向zk提交一次,而是在本地保存(内存),并定期提交,默认为true auto.commit.enable=true # 自定更新时间,默认60*1000 auto.commit.interval.ms=1000 # 当前consumer的标识,可以设定,也可以由系统生成,主要是用来跟踪消息消费情况,便于观察 # consumer.id= # 消费客户端编号,用于区分不同的客户端,默认客户端程序自动产生 # client.id= # 最大去多少块缓存得到消费者 queued.max.message.chunks=50 # 当有新的consumer加入到group中时,将会rebalance,此后将会有partitions的消费端迁移到新的consumer上 # 如果一个consumer获得了某个partitions的消费权限,那么他将会向zk注册"Partition Owner registry"节点信息 # 但是有可能此时旧的的consumer尚未释放该节点,此值用于控制,注册节点的重试次数 rebalance.max.retrils=5 # 获取消息的最大尺寸,broker不会像consumer输出大于此值的消息chunk,每次fetch将得到多条消息,此值为总大小。 # 提示此值,将会消耗跟多的consumer端的内存 fetch.min.bytes=6553600 # 当消费者的尺寸不足时,server堵塞的时间,如果超时,消息将立即发送给consumer fetch.wait.max.ms=5000 socket.receive.buffer.bytes=6553600 # 如果zookeeper没有offset值或者offset超出范围,那么就给个初始的offset,分别表示 # smallest:当前最小的offset # largest:当前最大的offset # anything:抛异常 # 默认largest auto.offset.reset=smallest6、修改host文件(所有服务器都需要修改,如果上中的配置文件使用的是IP地址,这里就不需要修改了,下面的hostname也不需要修改了)
192.168.1.110 storm01 zk01 kafka01 192.168.1.111 storm02 zk02 kafka02 192.168.1.112 storm03 zk03 kafka03 127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 localhost localhost.localdomain localhost6 localhost6.localdomain67、 修改hostname配置文件(所有服务器多需要修改,修改为对应的主机名) 执行命令:vim /etc/sysconfig/network 文件内容:
NETWORKING=yes HOSTNAME=kafka018、分发安装好的kafka到其他机器上 使用指令:scp -r /export/servers/kafka_2.12-0.10.2.1/ kafka02:/export/servers/
9、分发好了在其他机器上建立软连接 执行命令:ln -s kafka_2.12-0.10.2.1/ kafka
10、配置profile文件 执行命令:vim /etc/profile 在文件中加入内容:
#set kafka env export KAFKA_HOME=/export/servers/kafka export PATH=$PATH:${KAFKA_HOME}/bin使配置文件生效:source /etc/profile
11、修改其他机器上的server.properties配置文件 使用配置文件中的id和其他机器上不同即可 host.name是此机器上的机器名,修改为对应的机器名即可。
12、重启所有节点
1.1.2 启动kafka集群
1、启动kafka集群先要启动zookeeper集群,zookeeper的安装可以查看该文章 http://blog.csdn.net/qq_27385301/article/details/66481908
2、启动zookeeper后,在各个节点上的${KAFKA_HOME}目录下执行 bin/kafka-server-start.sh config/server.properties &