编程环境参见前面的 storm 的开发,还是使用 maven 项目。
由于使用的是 kafka 0.11.0.0版本,所以,必须在 pom.xml中加入下面一行代码:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>0.11.0.0</version>
</dependency>
开发内容,分为 Producer API、Consumer API 、Streams API、Connect API 和 AdminClient API。 首先看 Producer API 的例子。 一、参数定义 Producer 是信息的产生者,将产生的信息发送到集群服务中, 因此,我们需要对一些参数做出定义。特别是连接服务器的一些参数。我们这里给出一个参数类KafkaProperties。
package wangxn.mr; public class KafkaProperties { public static final String TOPIC = "my-topic"; public static final String KAFKA_SERVER_URL = "mymac"; public static final int KAFKA_SERVER_PORT = 9092; public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024; public static final int CONNECTION_TIMEOUT = 100000; public static final String TOPIC2 = "topic2"; public static final String TOPIC3 = "topic3"; public static final String CLIENT_ID = "SimpleConsumerDemoClient"; private KafkaProperties() {} }
使用 java 的属性类。
Properties props=new Properties();
props.put("bootstrap.servers","mymac:9092");
props.put("acks","all");
props.put("retries", 0);
props.put("batch.size",16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("client.id","Demo1Producer");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
还有2个参数需要定义,就是 "key.serializer" 和"value.serializer",指定参数的序列化。props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
二、定义 Producer 包含一个缓冲空间池,其中保存了尚未传输到服务器的记录,以及一个后台的输入/输出线程,该线程负责将这些记录转换为请求并将其发送到集群。在使用后关闭该生产者的失败将会泄漏这些资源。 还有一个异步发送的方法 send()Producer<String, String> producer=new KafkaProducer<String, String>(props); for (int i=0;i<100;i++) { producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i),Integer.toString(i))); //new DemoCallBack(System.currentTimeMillis(), i, Integer.toString(i) ) ); System.out.println("Sent message: (" + i + ", " + Integer.toString(i) + ")");} producer.close(); 这样可以将信息发送到集群中。 为了表示信息发动的中间过程,所以使用了 system.out 来输出。当然最终可以使用 kafka-console-consumer.sh 来查看
三、完成的程序实例
pom.xml 一定要包含 maven 依赖啊。
package wangxn.testProducer; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; //Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value pairs. public class RunProducer { public static void main(String[] args) { // TODO Auto-generated method stub Properties props=new Properties(); props.put("bootstrap.servers", "mymac:9092"); props.put("acks","all"); props.put("retries", 0); props.put("batch.size",16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("client.id", "Demo1Producer"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer=new KafkaProducer<String, String>(props); for (int i=0;i<100;i++) { producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i),Integer.toString(i))); //new DemoCallBack(System.currentTimeMillis(), i, Integer.toString(i) ) ); System.out.println("Sent message: (" + i + ", " + Integer.toString(i) + ")");} producer.close(); } } 四、运行 1、运行 直接当做一个 java APP 运行即可。或者 import 出一个可运行的 jar 文件 1)开发环境运行 2)编译后在命令行下运行 >kafka-run-class.sh -cp myy.jar wangxn.testproducer.RunProducer 或者直接使用 java >java -cp myy.jar wangxn..testproducer.RunProducer 2、检验 检查输入的信息,使用:kafka-console-consumer.sh --zookeeper mymac:2181 --topic my-topic --from-beginning