Getting Started with Kafka (4): Using Spring-kafka and How It Works
Create Topics
Define a KafkaAdmin bean; it automatically checks whether each declared topic exists in the cluster and creates it if not.
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return new NewTopic("foo", 10, (short) 2);
}
@Bean
public NewTopic topic2() {
return new NewTopic("bar", 10, (short) 2);
}
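If a topic needs non-default broker settings, NewTopic also accepts per-topic configs. A sketch (the topic name "baz" and the retention value are illustrative assumptions; Map.of requires Java 9+):

```java
@Bean
public NewTopic topic3() {
    // 10 partitions, replication factor 2, plus a per-topic broker override
    return new NewTopic("baz", 10, (short) 2)
            .configs(Map.of("retention.ms", "604800000")); // 7 days, illustrative
}
```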
Send Message
Just use KafkaTemplate:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See <https://kafka.apache.org/documentation/#producerconfigs> for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
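With the template wired up, sending is a single call. A sketch, assuming the pre-3.0 ListenableFuture-based send() API and an existing topic "foo":

```java
// fire-and-forget: key 1, value "hello"
kafkaTemplate.send("foo", 1, "hello");

// or attach a callback to observe the outcome (pre-3.0 ListenableFuture API)
kafkaTemplate.send("foo", 1, "hello").addCallback(
        result -> System.out.println("sent to " + result.getRecordMetadata()),
        ex -> System.err.println("send failed: " + ex.getMessage()));
```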
Receive Message: Consumer
There are two ways to receive messages: annotate a method with @KafkaListener, or implement the MessageListener<K, V> interface.
Easily confused classes
//interface; wraps a native KafkaConsumer: one container wraps one consumer
interface MessageListenerContainer;
//single-threaded container implementation; starts exactly one consumer
class KafkaMessageListenerContainer implements MessageListenerContainer;
//multi-threaded container implementation; creates multiple KafkaMessageListenerContainers
class ConcurrentMessageListenerContainer implements MessageListenerContainer;
//interface; a container factory (factory pattern), which must be provided when using @KafkaListener
interface KafkaListenerContainerFactory<C extends MessageListenerContainer>;
//the only concrete factory implementation, parameterized with the multi-threaded container; for a single thread call setConcurrency(null), which is also the default
class ConcurrentKafkaListenerContainerFactory implements KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>>
Implementing the interface (non-annotation approach)
You need to provide:
- MessageListener
- MessageListenerContainer
The former is your business logic (the message-handling method); the latter is the spring-kafka wrapper object responsible for creating the consumer.
MessageListenerContainer
Hosts the MessageListener. There are two kinds of MessageListenerContainer:
- KafkaMessageListenerContainer
- ConcurrentMessageListenerContainer
The former is for single-threaded consumption; the latter is for multi-threaded consumption and works by delegating to multiple single-threaded containers. Their constructors are quite similar:
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties,
TopicPartitionInitialOffset... topicPartitions)
- ConsumerFactory: holds the consumer configuration
- ContainerProperties: the constructor sets the topics to listen to; ContainerProperties::setMessageListener sets the MessageListener
- TopicPartitionInitialOffset: pins the topic/partitions to consume and their offsets; it can also be set via ContainerProperties. Once specified, you are doing low-level (assign-based) consumption
In the multi-threaded case, calling ConcurrentMessageListenerContainer::setConcurrency(3) creates 3 KafkaMessageListenerContainers.
A single-threaded example:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public KafkaMessageListenerContainer<String, String> kafkaContainer() {
//pass TopicPartitionInitialOffset(s) to the constructor instead for low-level consumption
ContainerProperties properties = new ContainerProperties(topic);
properties.setMessageListener((MessageListener<String, String>) record -> {
//handle the record here
});
properties.setAckMode(AbstractMessageListenerContainer.AckMode.BATCH); //pick an ack mode, see the Commit Offset section
return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
//put all other consumer configs in the map
...
return props;
}
}
Note: for high-level (subscribe-based) consumption, set partition.assignment.strategy to RoundRobinAssignor so that partitions are spread evenly across threads. (Reminder: within a consumer group, each partition is consumed by at most one thread, so there is no concurrent access within a partition; this is one of the guarantees behind Kafka's high throughput.) Without this setting, many threads may end up idle. Also, if there are more threads than total partitions across the subscribed topics, Spring automatically reduces the thread count to save resources. The setting:
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
MessageListener<K, V>
There are several different listener interfaces:
public interface MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data);
}
public interface AcknowledgingMessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
public interface BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data);
}
public interface BatchAcknowledgingMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
To summarize:
Pick the MessageListener variant that matches the container's Kafka configuration.
Note
- Consumer is not thread-safe; only call its methods from the thread that owns it
- When using manual acks, set auto commit to false in the consumer properties map, and set ContainerProperties::setAckMode to MANUAL or MANUAL_IMMEDIATE
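For example, wiring a manual-ack listener into ContainerProperties might look like this sketch (assumes enable.auto.commit=false and AckMode.MANUAL per the note above; process() is a hypothetical handler):

```java
containerProperties.setMessageListener(
        (AcknowledgingMessageListener<Integer, String>) (record, ack) -> {
            process(record.value()); // process() is a hypothetical handler
            ack.acknowledge();       // commits per the configured AckMode
        });
```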
@KafkaListener
First create consumerFactory() as described above, then create a KafkaListenerContainerFactory, for example:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);// set to null to use single thread
factory.setBatchListener(true);// enable this when use batch listener
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
...
return props;
}
}
Then simply annotate a method:
@Component
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
Note
- By default a bean named **"kafkaListenerContainerFactory"** is expected; if yours has a different name, specify the containerFactory in @KafkaListener
- The configuration above is multi-threaded; for a single thread, call setConcurrency(null)
- If @KafkaListener specifies only topics, you get high-level (subscribe) consumption; specifying @TopicPartition gives low-level (assign) consumption, so remember to ***set the partition assignment strategy***
- If the annotated method takes a List, remember to set batchListener to true
- Likewise, mind the ack mode
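As an example of the batch case, a sketch assuming the factory above with setBatchListener(true) (class name, id, and topic are illustrative):

```java
@Component
public class BatchListener {
    @KafkaListener(id = "batchFoo", topics = "myTopic")
    public void listen(List<ConsumerRecord<Integer, String>> records) {
        // one poll()'s worth of records arrives as a single list
        records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
    }
}
```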
Commit Offset
Offsets are auto-committed by default.
If enable.auto.commit is set to false in the consumer config, you must choose an ack mode:
ContainerProperties.setAckMode(AbstractMessageListenerContainer.AckMode ackMode)
- RECORD - commit after each record has been processed
- BATCH - commit after the batch returned by poll() has been processed
- TIME - commit after a poll()'s batch has been processed and more than ackTime has elapsed since the last commit
- COUNT - commit after a poll()'s batch has been processed and more than ackCount records have been processed since the last commit
- COUNT_TIME - commit when either the TIME or the COUNT condition is met
- MANUAL - commit after Acknowledgment.acknowledge() has been called and the poll()'s batch has been processed
- MANUAL_IMMEDIATE - commit immediately when Acknowledgment.acknowledge() is called
Hints
Multiple consumer configurations in one application?
- Interface approach: define an additional KafkaMessageListenerContainer carrying the new configuration and bind a new MessageListener to it.
- @KafkaListener approach: define an additional kafkaListenerContainerFactory carrying the new configuration (mainly ContainerProperties), then write a new method annotated with @KafkaListener and point it at that containerFactory.
Which one to choose?
- Both support low-level consumption and manual offset commits
- The annotation cannot conveniently bind topics, partitions, etc. when these are only known at runtime
Miscellaneous
- @KafkaListener can also be placed on a class; methods annotated with @KafkaHandler then handle different message types
- Annotated methods can use @Header to access more of the record's metadata
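A sketch combining both points (class name, topic, and the header choice are illustrative; RECEIVED_PARTITION_ID is the header constant in the spring-kafka versions this article targets):

```java
@Component
@KafkaListener(id = "multi", topics = "myTopic")
public class MultiTypeListener {

    @KafkaHandler
    public void handleString(String message,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        // chosen when the deserialized payload is a String
        System.out.println("from partition " + partition + ": " + message);
    }

    @KafkaHandler(isDefault = true)
    public void handleOther(Object message) {
        // fallback for any other payload type
    }
}
```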
What Spring does
- KafkaListenerContainerFactory creates a ConcurrentMessageListenerContainer (only when the annotation is used)
- ConcurrentMessageListenerContainer<K, V> initializes and calls doStart() (multi-threaded case only), creating one KafkaMessageListenerContainer per thread (when specific partitions are given, concurrency is capped at the partition count) and distributing partitions among them
protected void doStart() {
//do nothing if already running
if (!isRunning()) {
checkTopics();
ContainerProperties containerProperties = getContainerProperties();
TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
//topicPartitions were specified: if concurrency exceeds the partition count, cap it at the partition count (low-level consumption)
if (topicPartitions != null
&& this.concurrency > topicPartitions.length) {
this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
+ topicPartitions.length);
this.concurrency = topicPartitions.length;
}
//mark as running
setRunning(true);
//create the KafkaMessageListenerContainers in a loop
for (int i = 0; i < this.concurrency; i++) {
KafkaMessageListenerContainer<K, V> container;
//high-level consumption
if (topicPartitions == null) {
container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
containerProperties);
}
//low-level consumption: partitions distributed via partitionSubset
else {
container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
containerProperties, partitionSubset(containerProperties, i));
}
String beanName = getBeanName();
container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
if (getApplicationEventPublisher() != null) {
container.setApplicationEventPublisher(getApplicationEventPublisher());
}
container.setClientIdSuffix("-" + i);
container.setGenericErrorHandler(getGenericErrorHandler());
container.setAfterRollbackProcessor(getAfterRollbackProcessor());
//call each KafkaMessageListenerContainer's start()
container.start();
this.containers.add(container);
}
}
}
//compute the partitions handled by container i
private TopicPartitionInitialOffset[] partitionSubset(ContainerProperties containerProperties, int i) {
TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
//a single container: return everything
if (this.concurrency == 1) {
return topicPartitions;
}
else {
int numPartitions = topicPartitions.length;
//one container per partition: return the partition at the matching index
if (numPartitions == this.concurrency) {
return new TopicPartitionInitialOffset[] { topicPartitions[i] };
}
else {
//integer average number of partitions per container
int perContainer = numPartitions / this.concurrency;
TopicPartitionInitialOffset[] subset;
//the last container takes everything from i * perContainer to the end
//for low-level consumption Spring's distribution is not perfectly even, so it is advisable to set the thread count equal to the partition count
if (i == this.concurrency - 1) {
subset = Arrays.copyOfRange(topicPartitions, i * perContainer, topicPartitions.length);
}
//every other container takes perContainer consecutive partitions
else {
subset = Arrays.copyOfRange(topicPartitions, i * perContainer, (i + 1) * perContainer);
}
return subset;
}
}
}
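The slicing arithmetic above can be checked in isolation with plain arrays (a standalone sketch, not spring-kafka code):

```java
import java.util.Arrays;

public class PartitionSubsetDemo {
    // mirrors partitionSubset: container i of `concurrency` gets a contiguous
    // slice of the partitions; the last container also takes the remainder
    static int[] subset(int[] partitions, int concurrency, int i) {
        if (concurrency == 1) return partitions;
        if (partitions.length == concurrency) return new int[] { partitions[i] };
        int per = partitions.length / concurrency;
        return (i == concurrency - 1)
                ? Arrays.copyOfRange(partitions, i * per, partitions.length)
                : Arrays.copyOfRange(partitions, i * per, (i + 1) * per);
    }

    public static void main(String[] args) {
        int[] parts = {0, 1, 2, 3, 4, 5, 6};
        // 3 containers over 7 partitions: slices of 2, 2, and 3
        System.out.println(Arrays.toString(subset(parts, 3, 0))); // [0, 1]
        System.out.println(Arrays.toString(subset(parts, 3, 2))); // [4, 5, 6]
    }
}
```

The uneven last slice is exactly why the article recommends setting the thread count equal to the partition count for low-level consumption.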
- AbstractMessageListenerContainer's start() method is then called; it delegates to KafkaMessageListenerContainer's doStart(), which initializes the container and creates a ListenerConsumer. Below is part of the ListenerConsumer constructor. (Depending on whether partitions were assigned in the previous step, the Consumer is created with subscribe or assign.)
final Consumer<K, V> consumer =
KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
this.consumerGroupId,
this.containerProperties.getClientId(),
KafkaMessageListenerContainer.this.clientIdSuffix);
this.consumer = consumer;
ConsumerRebalanceListener rebalanceListener = createRebalanceListener(consumer);
//no topic partitions specified: call subscribe (high-level API)
if (KafkaMessageListenerContainer.this.topicPartitions == null) {
if (this.containerProperties.getTopicPattern() != null) {
consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
}
else {
consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
}
}
else {
//topic partitions specified: call assign (low-level API)
List<TopicPartitionInitialOffset> topicPartitions =
Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
this.definedPartitions = new HashMap<>(topicPartitions.size());
for (TopicPartitionInitialOffset topicPartition : topicPartitions) {
this.definedPartitions.put(topicPartition.topicPartition(),
new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent(),
topicPartition.getPosition()));
}
consumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
}