Kafka Tutorial (Part 4): Using Spring-Kafka and How It Works
Create Topics
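Define a KafkaAdmin bean. At startup it checks whether the topics declared as NewTopic beans exist in the cluster and creates any that are missing.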
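```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        // Broker list; with an embedded test broker, use its broker addresses instead
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaAdmin(configs);
    }

    // Topic "foo": 10 partitions, replication factor 2
    @Bean
    public NewTopic topic1() {
        return new NewTopic("foo", 10, (short) 2);
    }

    // Topic "bar": 10 partitions, replication factor 2
    @Bean
    public NewTopic topic2() {
        return new NewTopic("bar", 10, (short) 2);
    }
}
```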
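The "create if missing" behavior boils down to a set difference between the declared topics and the topics already on the cluster. The JDK-only sketch below models only that idea; TopicSpec and TopicBootstrap are hypothetical names, not Spring's NewTopic or KafkaAdmin. It also encodes a Kafka rule relevant to the `(short) 2` above: a topic's replication factor cannot exceed the number of brokers, so these topics need a cluster with at least two brokers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a declared topic: name, partition count, replication factor.
final class TopicSpec {
    final String name;
    final int partitions;
    final short replicationFactor;

    TopicSpec(String name, int partitions, short replicationFactor) {
        this.name = name;
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
    }
}

final class TopicBootstrap {

    // Returns the declared topics that do not exist on the cluster yet.
    static List<TopicSpec> missing(List<TopicSpec> declared, Set<String> existing) {
        List<TopicSpec> result = new ArrayList<>();
        for (TopicSpec topic : declared) {
            if (!existing.contains(topic.name)) {
                result.add(topic);
            }
        }
        return result;
    }

    // Kafka rejects a replication factor larger than the number of brokers.
    static void validate(TopicSpec topic, int brokerCount) {
        if (topic.replicationFactor > brokerCount) {
            throw new IllegalArgumentException("topic " + topic.name + ": replication factor "
                    + topic.replicationFactor + " exceeds broker count " + brokerCount);
        }
    }
}
```

With "foo" already present, only "bar" would be created; on a single-broker cluster both declarations would be rejected.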
Send Message
Sending is done through a KafkaTemplate:
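```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Keys are Integer, so the key serializer must be IntegerSerializer
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // See <https://kafka.apache.org/documentation/#producerconfigs> for more properties
        return props;
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```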
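The key serializer has to match the template's key type: KafkaTemplate<Integer, String> sends Integer keys, and a StringSerializer handed an Integer fails with a serialization error at send time. As a JDK-only illustration of what ends up on the wire, the sketch below reproduces the byte layouts of Kafka's IntegerSerializer (4 bytes, big-endian) and StringSerializer (UTF-8 by default). WireFormat and its methods are hypothetical names; the real classes are used through the config above, not called directly.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper mirroring the byte layouts of Kafka's IntegerSerializer
// and StringSerializer; it does not depend on the Kafka client library.
final class WireFormat {

    static byte[] intKey(Integer key) {
        if (key == null) {
            return null; // both Kafka serializers map null to null
        }
        // ByteBuffer is big-endian by default, like IntegerSerializer
        return ByteBuffer.allocate(Integer.BYTES).putInt(key).array();
    }

    static byte[] stringValue(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Inverse of intKey; an integer key is always exactly 4 bytes.
    static int readIntKey(byte[] bytes) {
        if (bytes.length != Integer.BYTES) {
            throw new IllegalArgumentException("expected 4 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getInt();
    }
}
```

For example, the key 1 becomes the four bytes 0, 0, 0, 1, while the value "ab" becomes 97, 98.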
Receive Message: Consumer
There are two ways to receive messages: annotate a method with @KafkaListener, or implement the MessageListener<K, V> interface.
Classes that are easy to confuse
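```java
// Outline only: simplified declarations, not compilable code

// Interface wrapping the native KafkaConsumer
interface MessageListenerContainer;

// Single-threaded implementation: starts exactly one consumer
class KafkaMessageListenerContainer<K, V> implements MessageListenerContainer;

// Multi-threaded implementation: creates several KafkaMessageListenerContainer instances
class ConcurrentMessageListenerContainer<K, V> implements MessageListenerContainer;

// Factory interface that creates containers; required when using @KafkaListener
interface KafkaListenerContainerFactory<C extends MessageListenerContainer>;

// The factory implementation Spring-Kafka provides; it always produces the multi-threaded container.
// For a single thread call setConcurrency(null), which is also the default.
class ConcurrentKafkaListenerContainerFactory<K, V>
        implements KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>>;
```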
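Structurally, the two containers form a composite: the concurrent container owns several single-consumer containers and starts them one by one. The JDK-only toy model below shows just that structure. ToyContainer, ToySingleContainer and ToyConcurrentContainer are hypothetical names; the real classes additionally create KafkaConsumer instances, poll, and dispatch records. Child names follow the "consumer-" + i fallback that the doStart() source later in this article uses when no bean name is set.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the container hierarchy; all names are hypothetical.
interface ToyContainer {
    void start();
    boolean isRunning();
}

// Plays the role of KafkaMessageListenerContainer: one container, one consumer.
final class ToySingleContainer implements ToyContainer {
    final String name;
    private boolean running;

    ToySingleContainer(String name) {
        this.name = name;
    }

    @Override
    public void start() {
        running = true; // the real container would create a consumer and start polling here
    }

    @Override
    public boolean isRunning() {
        return running;
    }
}

// Plays the role of ConcurrentMessageListenerContainer: creates `concurrency` children.
final class ToyConcurrentContainer implements ToyContainer {
    private final int concurrency;
    final List<ToySingleContainer> children = new ArrayList<>();

    ToyConcurrentContainer(int concurrency) {
        this.concurrency = concurrency;
    }

    @Override
    public void start() {
        for (int i = 0; i < concurrency; i++) {
            ToySingleContainer child = new ToySingleContainer("consumer-" + i);
            child.start();
            children.add(child);
        }
    }

    @Override
    public boolean isRunning() {
        // Running once started and every child container is running
        return !children.isEmpty() && children.stream().allMatch(ToySingleContainer::isRunning);
    }
}
```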
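Implementing the interface (without annotations)
You need to provide:
- MessageListener
- MessageListenerContainer
The former is the consumer's business logic (the message-handling method); the latter is an object provided by Spring-Kafka that creates the consumer.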
MessageListenerContainer
It creates and runs the consumer and dispatches the received records to the MessageListener. There are two kinds of MessageListenerContainer:
- KafkaMessageListenerContainer
- ConcurrentMessageListenerContainer
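The former is used for single-threaded consumption; the latter for multi-threaded consumption, which it achieves by delegating to several single-threaded containers, one consumer each. The constructors are similar; here are those of KafkaMessageListenerContainer: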
```java
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
        ContainerProperties containerProperties)

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
        ContainerProperties containerProperties,
        TopicPartitionInitialOffset... topicPartitions)
```
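- ConsumerFactory: holds the consumer configuration
- ContainerProperties: its constructor sets the topics to listen to; ContainerProperties.setMessageListener sets the MessageListener
- TopicPartitionInitialOffset: specifies the topics/partitions to consume and their initial offsets (this can also be done in ContainerProperties); specifying partitions this way makes it low-level consumption
For multi-threaded consumption, ConcurrentMessageListenerContainer.setConcurrency(3) creates 3 KafkaMessageListenerContainer instances.
A single-threaded example: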
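```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
// Spring-Kafka 2.0/2.1 package; from 2.2 on: org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.listener.config.ContainerProperties;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public KafkaMessageListenerContainer<String, String> kafkaContainer() {
        // Passing TopicPartitionInitialOffset... instead of a topic gives low-level consumption
        ContainerProperties properties = new ContainerProperties("foo");
        KafkaMessageListenerContainer<String, String> container =
                new KafkaMessageListenerContainer<>(consumerFactory(), properties);
        // Your listener: the business logic applied to each record
        container.getContainerProperties().setMessageListener(
                (MessageListener<String, String>) record -> System.out.println(record.value()));
        // Ack mode: see the Commit Offset section and the official documentation
        container.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
        return container;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group"); // example group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Let the container commit offsets according to its ack mode
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // put all other configs in the map ...
        return props;
    }
}
```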
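Note: for high-level consumption (subscribe), set partition.assignment.strategy to RoundRobinAssignor so that partitions are spread evenly across the threads. (Within one consumer group, a partition is consumed by at most one thread, so records of the same partition are never processed concurrently; this is also one of the things that make Kafka's high throughput possible.) With the default RangeAssignor each topic is split separately, so when a container listens to several topics many threads may end up idle. Threads beyond the total number of partitions never receive any data either; Spring reduces the thread count automatically only when partitions are assigned explicitly (see doStart below). The strategy is set like this: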
```java
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.RoundRobinAssignor");
```
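To see why the strategy matters, the JDK-only sketch below simulates both strategies on a simplified model: every consumer thread subscribes to every topic, consumers are already sorted, and topics are visited in the map's iteration order (pass a sorted map to mimic Kafka, which orders topic-partitions by topic name). It follows the documented behavior of RangeAssignor (each topic split on its own, earlier consumers receive the remainder) and RoundRobinAssignor (all topic-partitions dealt out in turn), but it is not Kafka's code; AssignmentDemo is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified simulation of Kafka's range and round-robin assignment ideas.
final class AssignmentDemo {

    // Range idea: split each topic independently; the first `extra` consumers get one more.
    static List<List<String>> range(Map<String, Integer> topics, int consumers) {
        List<List<String>> result = emptyAssignment(consumers);
        for (Map.Entry<String, Integer> e : topics.entrySet()) {
            int partitions = e.getValue();
            int perConsumer = partitions / consumers;
            int extra = partitions % consumers;
            int next = 0;
            for (int c = 0; c < consumers; c++) {
                int count = perConsumer + (c < extra ? 1 : 0);
                for (int k = 0; k < count; k++) {
                    result.get(c).add(e.getKey() + "-" + next++);
                }
            }
        }
        return result;
    }

    // Round-robin idea: one combined list of topic-partitions, dealt out consumer by consumer.
    static List<List<String>> roundRobin(Map<String, Integer> topics, int consumers) {
        List<List<String>> result = emptyAssignment(consumers);
        int c = 0;
        for (Map.Entry<String, Integer> e : topics.entrySet()) {
            for (int p = 0; p < e.getValue(); p++) {
                result.get(c).add(e.getKey() + "-" + p);
                c = (c + 1) % consumers;
            }
        }
        return result;
    }

    private static List<List<String>> emptyAssignment(int consumers) {
        List<List<String>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            result.add(new ArrayList<>());
        }
        return result;
    }
}
```

With three single-partition topics and three threads, the range split hands all three partitions to the first thread and leaves the other two idle, while round robin gives each thread one partition.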
MessageListener<K, V>
There are several listener interfaces:
```java
public interface MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data);
}

public interface AcknowledgingMessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}

public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

public interface BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data);
}

public interface BatchAcknowledgingMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
```
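In short:
Pick the MessageListener variant that matches the container's Kafka configuration: single records or batches, manual acknowledgment or not, and whether the listener needs direct access to the Consumer.
Notes
- The Consumer is not thread-safe; only call its methods from the thread that invoked the listener.
- When using acknowledgments, set enable.auto.commit to false in the consumer config map, and set the ack mode to MANUAL or MANUAL_IMMEDIATE via ContainerProperties.setAckMode.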
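The eight interfaces are exactly the combinations of three independent choices: batch vs. single record, manual acknowledgment or not, and whether the Consumer is passed in. The hypothetical JDK-only helper below just spells out that naming scheme; its comments restate the container settings each choice implies according to the notes in this article.

```java
// Hypothetical helper: maps three yes/no choices to the listener interface name
// listed above. It only encodes the naming scheme and does not touch Spring.
final class ListenerChooser {

    static String choose(boolean batch, boolean manualAck, boolean needsConsumer) {
        StringBuilder name = new StringBuilder();
        if (batch) {
            name.append("Batch");         // receives the List of records from one poll
        }
        if (manualAck) {
            name.append("Acknowledging"); // needs auto commit off and a MANUAL* ack mode
        }
        if (needsConsumer) {
            name.append("ConsumerAware"); // receives the non-thread-safe Consumer
        }
        return name.append("MessageListener").toString();
    }
}
```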
@KafkaListener
First create consumerFactory() as described above, then create a KafkaListenerContainerFactory, for example:
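```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);      // set to null to use a single thread
        factory.setBatchListener(true); // enable only when the listener takes a List (batch listener)
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // ...
        return props;
    }
}
```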
Then put the annotation directly on the listener method:
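```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        // ...
    }
}
```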
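Notes
- By default a **bean named "kafkaListenerContainerFactory"** is required; if your factory bean has a different name, specify it with the containerFactory attribute of @KafkaListener.
- The configuration above is multi-threaded; for a single thread, set concurrency to null with setConcurrency(null).
- If @KafkaListener specifies only topics, consumption is high-level; if it specifies @TopicPartition, it is low-level. Either way, pay attention to ***the thread assignment strategy***.
- If the annotated method takes a List parameter, remember to set batchListener to true.
- Likewise, pay attention to the ack mode.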
Commit Offset
By default offsets are committed automatically.
If auto commit is set to false in the consumer config, set an ack mode:
ContainerProperties.setAckMode(AbstractMessageListenerContainer.AckMode ackMode)
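- RECORD - commit after each record has been processed.
- BATCH - commit after all records returned by one poll() have been processed.
- TIME - commit after the poll batch has been processed, provided more than ackTime has elapsed since the last commit.
- COUNT - commit after the poll batch has been processed, provided at least ackCount records have been processed since the last commit.
- COUNT_TIME - commit as soon as either the TIME or the COUNT condition is met.
- MANUAL - the listener calls Acknowledgment.acknowledge(); the commit happens once the whole poll batch has been processed.
- MANUAL_IMMEDIATE - commit immediately when the listener calls Acknowledgment.acknowledge().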
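The TIME, COUNT and COUNT_TIME rules are easy to confuse, so the JDK-only sketch below models just the decision made after each poll batch. AckRule and CommitPolicy are hypothetical names, and this is an illustration of the rules as listed above, not Spring-Kafka's implementation (which, for example, also commits on shutdown and rebalance).

```java
// Simplified model of the TIME, COUNT and COUNT_TIME commit rules.
enum AckRule { TIME, COUNT, COUNT_TIME }

final class CommitPolicy {
    private final AckRule rule;
    private final int ackCount;   // record threshold for COUNT / COUNT_TIME
    private final long ackTimeMs; // time threshold for TIME / COUNT_TIME
    private int recordsSinceCommit;
    private long lastCommitMs;

    CommitPolicy(AckRule rule, int ackCount, long ackTimeMs, long nowMs) {
        this.rule = rule;
        this.ackCount = ackCount;
        this.ackTimeMs = ackTimeMs;
        this.lastCommitMs = nowMs;
    }

    // Called after all records of one poll() have been processed;
    // returns true if the offsets would be committed at this point.
    boolean afterBatch(int batchSize, long nowMs) {
        recordsSinceCommit += batchSize;
        boolean countReached = recordsSinceCommit >= ackCount;
        boolean timeElapsed = nowMs - lastCommitMs > ackTimeMs;
        boolean commit;
        switch (rule) {
            case TIME:
                commit = timeElapsed;
                break;
            case COUNT:
                commit = countReached;
                break;
            default: // COUNT_TIME: whichever condition comes first
                commit = countReached || timeElapsed;
                break;
        }
        if (commit) {
            recordsSinceCommit = 0;
            lastCommitMs = nowMs;
        }
        return commit;
    }
}
```

With ackCount = 100 and ackTime = 5000 ms under COUNT_TIME, a batch of 40 records after 1 s does not commit, a further 70 records do (110 in total), and after that a small batch arriving 6 s later commits because of the time rule.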
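Hints
Multiple consumer configurations in one application?
- Interface approach: define another KafkaMessageListenerContainer with the new configuration and bind a new MessageListener to it.
- @KafkaListener approach: define another kafkaListenerContainerFactory with the new configuration (mainly ContainerProperties), then add a new method annotated with @KafkaListener that names this factory as its containerFactory.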
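Which one to choose?
- Both support low-level consumption and manual offset commits.
- With annotations it is awkward to bind topic, partition and similar information when these are only known at runtime.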
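Other
- @KafkaListener can also be placed on a class; methods annotated with @KafkaHandler then handle messages of different payload types.
- An annotated method can use @Header parameters to access more of the record's metadata.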
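What Spring does
- KafkaListenerContainerFactory creates the ConcurrentMessageListenerContainer (only when annotations are used).
- When the ConcurrentMessageListenerContainer<K, V> starts, its doStart method (the multi-threaded path) creates one KafkaMessageListenerContainer per thread and, if partitions were specified, hands each container a subset of them. The number of containers equals the configured concurrency; with explicitly specified partitions it is reduced to min(concurrency, number of partitions).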
```java
protected void doStart() {
    // Only start if not already running
    if (!isRunning()) {
        checkTopics();
        ContainerProperties containerProperties = getContainerProperties();
        TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
        // Explicit topicPartitions (low-level consumption): if concurrency exceeds
        // the number of partitions, reduce it to the number of partitions
        if (topicPartitions != null && this.concurrency > topicPartitions.length) {
            this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
                    + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                    + topicPartitions.length);
            this.concurrency = topicPartitions.length;
        }
        // Set the running flag
        setRunning(true);
        // Create one KafkaMessageListenerContainer per thread
        for (int i = 0; i < this.concurrency; i++) {
            KafkaMessageListenerContainer<K, V> container;
            // High-level consumption
            if (topicPartitions == null) {
                container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
                        containerProperties);
            }
            // Low-level consumption: partitions are distributed by partitionSubset
            else {
                container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
                        containerProperties, partitionSubset(containerProperties, i));
            }
            String beanName = getBeanName();
            container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
            if (getApplicationEventPublisher() != null) {
                container.setApplicationEventPublisher(getApplicationEventPublisher());
            }
            container.setClientIdSuffix("-" + i);
            container.setGenericErrorHandler(getGenericErrorHandler());
            container.setAfterRollbackProcessor(getAfterRollbackProcessor());
            // Call the KafkaMessageListenerContainer's start method
            container.start();
            this.containers.add(container);
        }
    }
}
```
```java
// Returns the partitions the i-th thread (container) should consume
private TopicPartitionInitialOffset[] partitionSubset(ContainerProperties containerProperties, int i) {
    TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
    // One thread: it gets all partitions
    if (this.concurrency == 1) {
        return topicPartitions;
    }
    else {
        int numPartitions = topicPartitions.length;
        // As many threads as partitions: thread i gets partition i
        if (numPartitions == this.concurrency) {
            return new TopicPartitionInitialOffset[] { topicPartitions[i] };
        }
        else {
            // Partitions per thread (integer division)
            int perContainer = numPartitions / this.concurrency;
            TopicPartitionInitialOffset[] subset;
            // The last thread gets every partition from i * perContainer onward.
            // For low-level consumption Spring's split is therefore not very even,
            // so setting the thread count equal to the partition count is recommended
            if (i == this.concurrency - 1) {
                subset = Arrays.copyOfRange(topicPartitions, i * perContainer, topicPartitions.length);
            }
            // Every other thread gets perContainer consecutive partitions
            else {
                subset = Arrays.copyOfRange(topicPartitions, i * perContainer, (i + 1) * perContainer);
            }
            return subset;
        }
    }
}
```
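- Each child's start method (inherited from AbstractMessageListenerContainer) calls KafkaMessageListenerContainer.doStart(), which initializes the container and creates a ListenerConsumer. Below is part of the ListenerConsumer constructor: it creates the Consumer and then, depending on whether partitions were assigned in the previous step, calls subscribe or assign on it.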
```java
final Consumer<K, V> consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
        this.consumerGroupId, this.containerProperties.getClientId(),
        KafkaMessageListenerContainer.this.clientIdSuffix);
this.consumer = consumer;
ConsumerRebalanceListener rebalanceListener = createRebalanceListener(consumer);
// No topic partitions specified: call subscribe (high-level API)
if (KafkaMessageListenerContainer.this.topicPartitions == null) {
    if (this.containerProperties.getTopicPattern() != null) {
        consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
    }
    else {
        consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
    }
}
else {
    // Topic partitions specified: call assign (low-level API)
    List<TopicPartitionInitialOffset> topicPartitions =
            Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
    this.definedPartitions = new HashMap<>(topicPartitions.size());
    for (TopicPartitionInitialOffset topicPartition : topicPartitions) {
        this.definedPartitions.put(topicPartition.topicPartition(),
                new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent(),
                        topicPartition.getPosition()));
    }
    consumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
}
```
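The low-level partition split performed by doStart() and partitionSubset() can be reproduced with plain ints, which makes its unevenness easy to check. The JDK-only sketch below follows those excerpts step by step; PartitionSplit is a hypothetical name, and partitions are modeled as int instead of TopicPartitionInitialOffset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-alone re-implementation of the split logic from doStart()/partitionSubset().
final class PartitionSplit {

    // doStart(): with explicit partitions, concurrency never exceeds the partition count.
    static int effectiveConcurrency(int concurrency, int numPartitions) {
        return Math.min(concurrency, numPartitions);
    }

    // partitionSubset(): consecutive slices; the last thread takes the remainder.
    static int[] subset(int[] partitions, int concurrency, int i) {
        if (concurrency == 1) {
            return partitions;
        }
        int numPartitions = partitions.length;
        if (numPartitions == concurrency) {
            return new int[] { partitions[i] };
        }
        int perContainer = numPartitions / concurrency;
        if (i == concurrency - 1) {
            return Arrays.copyOfRange(partitions, i * perContainer, numPartitions);
        }
        return Arrays.copyOfRange(partitions, i * perContainer, (i + 1) * perContainer);
    }

    // Returns the partitions each thread ends up with.
    static List<int[]> assignAll(int[] partitions, int requestedConcurrency) {
        int concurrency = effectiveConcurrency(requestedConcurrency, partitions.length);
        List<int[]> result = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            result.add(subset(partitions, concurrency, i));
        }
        return result;
    }
}
```

Ten partitions on four threads yield slices of 2, 2, 2 and 4 partitions, while three partitions with a requested concurrency of five yield three threads with one partition each. This is why matching the thread count to the partition count is recommended.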