Kafka入门教程 (三) Apache Kafka Java Client的使用

priority
Updated
Jun 18, 2021 05:26 AM
date
Dec 25, 2018
type
Post
URL
slug
kafka03
Created
Jun 15, 2021 02:02 PM
status
Published
tags
Kafka
summary
本文主要介绍kafka原生api的使用,关于kafka apache官方的文档页面只有简单的说明,不过所有的使用说明都在apache kafka java doc文档页面,每个类的文档都有详细的使用说明,源码中也有详细的注释。
原生api的使用比较简单,直接创建Consumer或者Producer对象即可。注意:KafkaConsumer不是线程安全的,不得在多个线程间共用同一个实例(唯一例外是可以从其他线程调用wakeup()),且最好声明为final变量
更多参考api docs

自动提交

// Auto-commit consumer: offsets are committed in the background every
// auto.commit.interval.ms. If the process crashes between a commit and
// processing, already-processed records may be delivered again.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    // poll(long) is deprecated since Kafka 2.0; use the Duration overload,
    // consistent with the multi-threaded example below.
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

手动提交

// Manual-commit consumer: offsets are committed only AFTER the batch has
// been persisted (insertIntoDb), giving at-least-once delivery — on a
// crash before commitSync() the batch is redelivered, never lost.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    // poll(long) is deprecated since Kafka 2.0; use the Duration overload.
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        // Persist first, then commit — this ordering is what makes the
        // pipeline at-least-once instead of at-most-once.
        insertIntoDb(buffer);
        consumer.commitSync();
        buffer.clear();
    }
}

多线程

/**
 * Runnable wrapper around a KafkaConsumer that supports a safe shutdown
 * from another thread.
 *
 * KafkaConsumer is NOT thread-safe: the only method that may be invoked
 * from a thread other than the one calling poll() is wakeup(), which is
 * exactly what shutdown() relies on.
 */
public class KafkaConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    // Parameterized type instead of the raw KafkaConsumer.
    private final KafkaConsumer<String, String> consumer;

    // BUG FIX: the original declared a blank final `consumer` field with no
    // constructor assigning it, which does not compile. Inject it here.
    public KafkaConsumerRunner(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                // Handle new records
            }
        } catch (WakeupException e) {
            // wakeup() aborts a blocking poll() by throwing WakeupException;
            // swallow it only when it came from an intentional shutdown.
            if (!closed.get()) throw e;
        } finally {
            // Always release sockets and leave the consumer group cleanly.
            consumer.close();
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}

低级消费

// Manual partition assignment ("low-level" consumption): assign() pins the
// consumer to explicit partitions, bypassing group coordination — no
// rebalancing and no automatic partition distribution, unlike subscribe().
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));

© Song 2015 - 2021