大数据生态系统基础:Apache Kafka基础(二):最新kafka编程入门:Producer API

xiaoxiao2021-02-28  99

      编程环境参见前面的 storm 的开发,还是使用 maven 项目。

     由于使用的是 kafka  0.11.0.0版本,所以,必须在 pom.xml中加入下面一行代码:

   

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka_2.12</artifactId>

<version>0.11.0.0</version>

</dependency>

    开发内容,分为 Producer API、Consumer API 、Streams API、Connect API 和 AdminClient API。     首先看 Producer API 的例子。     一、参数定义       Producer 是信息的产生者,将产生的信息发送到集群服务中, 因此,我们需要对一些参数做出定义。特别是连接服务器的一些参数。我们这里给出一个参数类KafkaProperties。

package wangxn.mr; public class KafkaProperties { public static final String TOPIC = "my-topic"; public static final String KAFKA_SERVER_URL = "mymac"; public static final int KAFKA_SERVER_PORT = 9092; public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024; public static final int CONNECTION_TIMEOUT = 100000; public static final String TOPIC2 = "topic2"; public static final String TOPIC3 = "topic3"; public static final String CLIENT_ID = "SimpleConsumerDemoClient"; private KafkaProperties() {} }

        使用 java 的属性类。

                Properties props=new Properties();

props.put("bootstrap.servers","mymac:9092");

props.put("acks","all");

props.put("retries", 0);

props.put("batch.size",16384);

props.put("linger.ms", 1);

props.put("buffer.memory", 33554432);

props.put("client.id","Demo1Producer");

props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");

        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

    还有2个参数需要定义,就是 "key.serializer" 和"value.serializer",指定参数的序列化。

         props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");

        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

二、定义 Producer         包含一个缓冲空间池,其中保存了尚未传输到服务器的记录,以及一个后台的输入/输出线程,该线程负责将这些记录转换为请求并将其发送到集群。在使用后关闭该生产者的失败将会泄漏这些资源。        还有一个异步发送的方法 send()

Producer<String, String> producer=new KafkaProducer<String, String>(props); for (int i=0;i<100;i++) { producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i),Integer.toString(i))); //new DemoCallBack(System.currentTimeMillis(), i, Integer.toString(i) ) ); System.out.println("Sent message: (" + i + ", " + Integer.toString(i) + ")");} producer.close();         这样可以将信息发送到集群中。 为了表示信息发动的中间过程,所以使用了 system.out 来输出。当然最终可以使用 kafka-console-consumer.sh 来查看

  

三、完成的程序实例

    pom.xml 一定要包含 maven 依赖啊。

     

package wangxn.testProducer; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; //Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value pairs. public class RunProducer { public static void main(String[] args) { // TODO Auto-generated method stub Properties props=new Properties(); props.put("bootstrap.servers", "mymac:9092"); props.put("acks","all"); props.put("retries", 0); props.put("batch.size",16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("client.id", "Demo1Producer"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer=new KafkaProducer<String, String>(props); for (int i=0;i<100;i++) { producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i),Integer.toString(i))); //new DemoCallBack(System.currentTimeMillis(), i, Integer.toString(i) ) ); System.out.println("Sent message: (" + i + ", " + Integer.toString(i) + ")");} producer.close(); } }

四、运行      1、运行        直接当做一个 java APP 运行即可。或者 import 出一个可运行的 jar 文件        1)开发环境运行               2)编译后在命令行下运行      >kafka-run-class.sh -cp myy.jar wangxn.testproducer.RunProducer        或者直接使用 java     >java -cp myy.jar wangxn..testproducer.RunProducer 2、检验       检查输入的信息,使用:kafka-console-consumer.sh --zookeeper mymac:2181 --topic my-topic --from-beginning
转载请注明原文地址: https://www.6miu.com/read-19290.html

最新回复(0)