kafka在spark中集群的配置

xiaoxiao2021-02-28  60

1.安装zk集群,必须先启动zk,然后才能启动kafka 2.config/server.properties 添加zk地址:zookeeper.connect=node-1:2181,node-2:2181,node-3:2181 修改broker.id(唯一的):broker.id=0 # 各自的id不一样 修改最后一行的ip为各自本机的ip

server.properties:

#broker的全局唯一编号,不能重复 broker.id=0 #用来监听链接的端口,producer或consumer将在此端口建立连接 port=9092 #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘IO的线程数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接受套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #kafka运行日志存放的路径 log.dirs=/export/logs/kafka #topic在当前broker上的分片个数 num.partitions=2 #用来恢复和清理data下数据的线程数量 num.recovery.threads.per.data.dir=1 #segment文件保留的最长时间,超时将被删除 log.retention.hours=168 #滚动生成新的segment文件的最大时间 log.roll.hours=168 #日志文件中每个segment的大小,默认为1G log.segment.bytes=1073741824 #周期性检查文件大小的时间 log.retention.check.interval.ms=300000 #日志清理是否打开 log.cleaner.enable=true #broker需要使用zookeeper保存meta数据 zookeeper.connect=master:2181,work1:2181,work2:2181 #zookeeper链接超时时间 zookeeper.connection.timeout.ms=6000 #partion buffer中,消息的条数达到阈值,将触发flush到磁盘 log.flush.interval.messages=10000 #消息buffer的时间,达到阈值,将触发flush到磁盘 log.flush.interval.ms=3000 #删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除 delete.topic.enable=true #此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessful 错误! host.name=guang1

3.启动 /bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 & (把文件信息写入到/dev/null文件中,以后台方式启动)并且每个broker都要分别启动 4.创建topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test 5.列出所有topic bin/kafka-topics.sh --list --zookeeper localhost:2181 6.向topic中写入数据 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 7.消费数据 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 8.查看指定topic的详情 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
转载请注明原文地址: https://www.6miu.com/read-2613837.html

最新回复(0)