Let's install Kafka in cluster mode on three machines:

192.168.131.128, 192.168.131.130 and 192.168.131.131

A Kafka cluster depends on ZooKeeper (zk), so ZooKeeper has to be installed first.
Download the Kafka package kafka_2.11-1.1.0.tgz.
Extract it under /usr/local/ and rename the extracted directory to kafka.
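Go into Kafka's config directory:
It contains a ZooKeeper configuration file (zookeeper.properties) for the ZooKeeper bundled with Kafka. If you have not installed ZooKeeper separately, you can use the bundled one; it is configured the same way as a standalone installation.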
We assume ZooKeeper is already installed, so we only edit server.properties. The main settings are:
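```properties
# Must be unique on each broker
broker.id=0
# Change to the IP of the host this broker runs on
listeners=PLAINTEXT://192.168.131.128:9092
# Change to the IP of the host this broker runs on
# (deprecated; only used when listeners/advertised.listeners are not set)
advertised.host.name=192.168.131.128
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Create this directory in advance and make sure the user running Kafka can write to it
log.dirs=/usr/local/kafka/log
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# Change to the host:port list of your ZooKeeper servers
zookeeper.connect=192.168.131.128:2181,192.168.131.130:2181,192.168.131.131:2181
zookeeper.connection.timeout.ms=6000
delete.topic.enable=true
auto.create.topics.enable=false
```

The settings that need to be changed are marked with comments. Each comment sits on its own line because in a .properties file `#` only starts a comment at the beginning of a line; text written after a value becomes part of the value.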
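The three brokers share this file except for the per-host lines. As a minimal sketch, the Java program below prints those per-host lines for each machine; the class and method names are hypothetical, and assigning broker IDs 0, 1, 2 to the hosts in the order listed is an assumption. It also uses `java.util.Properties` to show what happens to an inline comment.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class BrokerConfigSketch {
    // Hypothetical helper: the server.properties entries that differ per broker.
    static Properties overridesFor(int brokerId, String hostIp) {
        Properties p = new Properties();
        p.setProperty("broker.id", Integer.toString(brokerId));
        p.setProperty("listeners", "PLAINTEXT://" + hostIp + ":9092");
        p.setProperty("advertised.host.name", hostIp);
        return p;
    }

    // Parses properties text with java.util.Properties and returns one value.
    static String loadValue(String text, String key) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p.getProperty(key);
    }

    public static void main(String[] args) {
        // Assumption: broker IDs 0, 1, 2 are assigned to the hosts in this order.
        String[] hosts = {"192.168.131.128", "192.168.131.130", "192.168.131.131"};
        for (int id = 0; id < hosts.length; id++) {
            Properties p = overridesFor(id, hosts[id]);
            System.out.println("# " + hosts[id]);
            for (String key : new String[] {"broker.id", "listeners", "advertised.host.name"}) {
                System.out.println(key + "=" + p.getProperty(key));
            }
        }
        // An inline comment is not stripped: this prints "0 # unique per broker".
        System.out.println(loadValue("broker.id=0 # unique per broker", "broker.id"));
    }
}
```

A value such as `0 # unique per broker` is not a valid integer, which is why the broker would refuse a `broker.id` line written that way.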
Then copy the Kafka directory to the other two machines:
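```shell
scp -r kafka hadoop@hadoopslaver1:/usr/local
scp -r kafka hadoop@hadoopslaver2:/usr/local
```

After copying, edit config/server.properties on each machine so that broker.id, listeners and advertised.host.name match that host.

Next, start the cluster. First make sure ZooKeeper is running; if you have not installed it, you can use the ZooKeeper bundled with Kafka: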
```shell
bin/zookeeper-server-start.sh config/zookeeper.properties &
```

Then start Kafka:
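```shell
bin/kafka-server-start.sh -daemon config/server.properties
```

Run this on all three machines. The -daemon option already runs the broker in the background, so no trailing & is needed. If no errors appear in logs/server.log, the broker has started successfully.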
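Because -daemon sends output to log files, "no errors" is easy to misjudge. One extra check, sketched here using only the JDK (the class name is hypothetical), is to confirm from a client machine that each broker accepts TCP connections on port 9092. An open port only shows that the listener is up; it does not prove that the brokers joined the same cluster.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPortCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        String[] brokers = {"192.168.131.128", "192.168.131.130", "192.168.131.131"};
        for (String host : brokers) {
            System.out.println(host + ":9092 reachable = " + isReachable(host, 9092, 2000));
        }
    }
}
```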
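Next, let's run some tests. Because server.properties sets auto.create.topics.enable=false, the page_visits topic used by the test programs has to be created first, for example with `bin/kafka-topics.sh --create --zookeeper 192.168.131.128:2181 --replication-factor 3 --partitions 3 --topic page_visits` (the partition and replica counts are only an example).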
Consumer:
```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Named ConsumerDemo so it does not clash with the imported Consumer interface.
public class ConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Any reachable broker is enough to bootstrap; listing all three adds redundancy.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.131.128:9092,192.168.131.130:9092,192.168.131.131:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Commit offsets automatically every second.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        // auto.offset.reset defaults to "latest", so start this consumer before the producer.
        consumer.subscribe(Arrays.asList("page_visits"));
        while (true) {
            // Wait up to 100 ms for new records.
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
```

Producer:
```java
import java.util.Date;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Named ProducerDemo so it does not clash with the imported Producer interface.
public class ProducerDemo {
    public static void main(String[] args) {
        long events = 1; // number of messages to send
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.131.128:9092,192.168.131.130:9092,192.168.131.131:9092");
        props.put("acks", "all"); // wait for all in-sync replicas to acknowledge
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Optional: use the custom partitioner defined in the next section
        props.put("partitioner.class", "com.rickiyang.service.Partitioner");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            // Value is "<timestamp>,www.example.com,<ip>"; the IP is also used as the key.
            String msg = runtime + ",www.example.com," + ip;
            ProducerRecord<String, String> data = new ProducerRecord<>("page_visits", ip, msg);
            producer.send(data, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("The offset of the record we just sent is: " + metadata.offset());
                    }
                }
            });
        }
        // close() sends any buffered records before returning.
        producer.close();
    }
}
```

Custom partitioning strategy:
```java
package com.rickiyang.service;

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

// Keeps the name Partitioner to match partitioner.class in the producer config,
// so the Kafka interface is referenced by its fully qualified name to avoid a clash.
public class Partitioner implements org.apache.kafka.clients.producer.Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed.
    }

    // Routes a record by the last octet of its IP-address key, modulo the partition count.
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int partition = 0;
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (key == null) {
            return partition; // records without a key go to partition 0
        }
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(stringKey.substring(offset + 1)) % numPartitions;
        }
        return partition;
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}
```
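The routing rule of the custom Partitioner can be checked without a running cluster. This minimal sketch copies the same computation into a plain function; the class and method names are hypothetical, and the partition count of 3 in main is an assumption. With a single-partition topic every record lands in partition 0, since any number modulo 1 is 0.

```java
public class IpPartitionSketch {
    // Same rule as the Partitioner above: last octet of the IP key modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        if (key == null) {
            return 0;
        }
        int dot = key.lastIndexOf('.');
        if (dot > 0) {
            return Integer.parseInt(key.substring(dot + 1)) % numPartitions;
        }
        return 0;
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumption: the topic has 3 partitions
        for (String ip : new String[] {"192.168.2.7", "192.168.2.9", "192.168.2.254"}) {
            System.out.println(ip + " -> partition " + partitionFor(ip, numPartitions));
        }
    }
}
```

Running main maps 192.168.2.7 to partition 1, 192.168.2.9 to partition 0 and 192.168.2.254 to partition 2.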
Let's run it:

[Screenshot: Producer output]

[Screenshot: Consumer output]
The consumer receives the messages sent by the producer.