Kafka入门教程 (四) Spring-kafka的使用和原理

Create Topics

定义一个KafkaAdmin的bean,可以自动检测集群中是否存在topic,不存在则创建
@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
            StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {
    return new NewTopic("foo", 10, (short) 2);
}

@Bean
public NewTopic topic2() {
    return new NewTopic("bar", 10, (short) 2);
}

Send Message

使用kafkaTemplate即可
@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See <https://kafka.apache.org/documentation/#producerconfigs> for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

Receive Message:Consumer

接收消息(Consumer)有两种方式,一种是使用@KafkaListener注解的方式,一种是实现MessageListener<K, V>接口。

容易混淆的几个类

//接口,封装原生KafkaConsumer,一个container封装一个consumer
interface MessageListenerContainer;
//单线程container实现,只启动一个consumer
class KafkaMessageListenerContainer implemets MessageListenerContainer;
//多线程container实现,负责创建多个KafkaMessageListenerContainer
class ConcurrentMessageListenerContainer implemets MessageListenerContainer;

//接口,工厂模式,container工厂,负责创建container,当使用@KafkaListener时需要提供
interface KafkaListenerContainerFactory<C extends MessageListenerContainer>;
//container工厂的唯一实现,且参数为多线程container,如果需要单线程,setConsurrency(null)即可,这也是默认参数
class KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>>

实现接口(非注解模式)

需要提供:
  • MessageListenerContainer
前者是consumer的业务实现(消息处理方法),后者是spring-kafka封装的对象,负责创建consumer

MessageListenerContainer

负责创建MessageListener,有两种MessageListenerContainer
  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer
前者是单线程消费使用;后者是多线程消费使用,通过代理的方式创建多个消费者。二者构造函数也比较相似
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties,
                    TopicPartitionInitialOffset... topicPartitions)
  • ConsumerFactory:包含了Consumer的配置信息
  • ContainerProperties:构造函数设置监听的topics,ContainerProps.setMessageListener方法设置MessageListener
  • TopicPartitionInitialOffset:可以指定要消费的Topic/Partition和对应的offset,也可以在ContainerProperties中指定,指定后属于低级消费
多线程下,使用ConcurrentMessageListenerContainer::setConcurrency(3),就会创建3个KafkaMessageListenerContainer。 一个单线程的示例如下
Configuration
@EnableKafka
public class KafkaConfig {

	@Bean
	public KafkaMessageListenerContainer<String,String> kafkaConatiner(){
		ContainerProperties properties=new ContainerProperties(topic) //can use TopicPartitionInitialOffset constructor to use low level consumer
		KafkaMessageListenerContainer<String,String> container=new KafkaMessageListenerContainer<String,String>(consumerFactory(),properties)
		container.getContainerProperties().setMessageListener("your listener")
		container.getContainerProperties().setAckMode(//ack mode,参见官网说明)
	}
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,servers)
        //put all configs in map
        ...
        return props;
    }
}
注意,高级消费要设置partition.assignment.strategy为RoundRobinAssignor,保证线程的平均分配(提醒:一个partition中,对于一个consumer group只能有一个线程消费,所以不存在多线程并发的问题,这也是Kafka高吞吐量的一个保证)如果不设置,可能会导致很多线程空闲。同时,线程多于partition*topic,Spring会自动的减少线程数来降低消耗。设置方法为
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor")

MessageListener<K, V>

这里有多种不同的接口,如下:
public interface MessageListener<K, V> { 1

    void onMessage(ConsumerRecord<K, V> data);

}

public interface AcknowledgingMessageListener<K, V> { 2

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);

}

public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 3

    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);

}

public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 4

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}

public interface BatchMessageListener<K, V> { 5

    void onMessage(List<ConsumerRecord<K, V>> data);

}

public interface BatchAcknowledgingMessageListener<K, V> { 6

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);

}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 7

    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);

}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 8

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}
总结一下就是:
 
根据Container中kafka的配置选用适当的MessageListener即可。 注意
  1. Consumer是线程不安全的,只能在调用他的线程中使用其方法
  1. 使用ack时, prop的map中要设置auto commit 为false,同时要在ContainerProperties::setAckMode中设置为MANNUAL或者MANNUAL_IMMEDIATE

@KafkaListener

首先按照上文的介绍创建consumerFactory(),再创建KafkaListenerContainerFactory,示例如下
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);// set to null to use single thread
        factory.setBatchListener(true);// enable this when use batch listener
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        ...
        return props;
    }
}
直接添加注解到方法上
@Component
public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }
}
注意
  1. 默认需要创建一个名为**“kafkaListenerContainerFactory”的bean**,如果名称不一致,则需要在@KafkaListener中指定ContainerFactory
  1. 以上是多线程下的配置,非多线程设置concurrency为null,setConcurrency(null)
  1. 如果@KafkaListener只指定topic,则属于高级消费;指定了@TopicPartition则属于低级消费,要注意***设置线程分配策略***
  1. 被注解的方法参数如果是List,要记得设置batchListener为true
  1. 同样注意ackmode

Commit Offset

默认自动提交 配置中设为false,需要设置ackmode ContainerProperties.setAckMode(AbstractMessageListenerContainer.AckMode ackMode)
  • RECORD - 处理完一条记录后提交
  • BATCH - 处理完poll的一批数据后提交.
  • TIME - 处理完poll的一批数据后并且距离上次提交超过了设置的ackTime
  • COUNT - 处理完poll的一批数据后并且距离上次提交处理的记录数超过了设置的ackCount
  • COUNT_TIME - TIME和COUNT中任意一条满足即提交.
  • MANUAL - 手动调用Acknowledgment.acknowledge()后,并且处理完poll的这批数据后提交
  • MANUAL_IMMEDIATE - 手动调用Acknowledgment.acknowledge()后立即提交

Hint提示

一个程序有多种consumer配置?

  1. 扩展接口的方法,只需要定义新的KafkaMessageListenerContainer实现新配置,绑定新的MessageListener即可。
  1. @KafkaListener注解的方法:定义新的kafkaListenerContainerFactory实现新配置(主要是ContainerProperties),定义新的方法,添加@KafkaListener注解并指定刚创建的ContainerFactory即可。

选择用哪个?

  1. 都支持低级消费,都支持手动提交offset
  1. 注解不能方便的绑定topic,paitition等信息(当这些由运行时决定时)

其它

  1. @kafkaListener注解可以添加到类上,方法上加@KafkaHandler可以不同方法处理不同类型的消息
  1. 被注解的方法可以通过@Header获取更多record的信息

spring做了什么

  1. KafkaListenerContainerFactory创建ConcurrentMessageListenerContainer(仅使用注解时调用)
  1. ConcurrentMessageListenerContainer<K, V> 初始化,调用doStart方法(仅多线程时调用),根据线程数创建KafkaMessageListenerContainer(数量等于max[线程数,partition数]),并分配partition
    1. protected void doStart() {
      		//不在运行时执行
      		if (!isRunning()) {
      			checkTopics();
      			ContainerProperties containerProperties = getContainerProperties();
      			TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
      			//指定了topicPartitions,检查currency是否大于partition数,是则修正为partition数(低级消费)
      			if (topicPartitions != null
      					&& this.concurrency > topicPartitions.length) {
      				this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
      						+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
      						+ topicPartitions.length);
      				this.concurrency = topicPartitions.length;
      			}
      			//设置标志位
      			setRunning(true);
      
      			//循环创建KafkaMessageListenerContainer
      			for (int i = 0; i < this.concurrency; i++) {
      				KafkaMessageListenerContainer<K, V> container;
      				//高级消费
      				if (topicPartitions == null) {
      					container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
      							containerProperties);
      				}
      				//低级消费,通过partitionSubset方法分配
      				else {
      					container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
      							containerProperties, partitionSubset(containerProperties, i));
      				}
      				String beanName = getBeanName();
      				container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
      				if (getApplicationEventPublisher() != null) {
      					container.setApplicationEventPublisher(getApplicationEventPublisher());
      				}
      				container.setClientIdSuffix("-" + i);
      				container.setGenericErrorHandler(getGenericErrorHandler());
      				container.setAfterRollbackProcessor(getAfterRollbackProcessor());
      				//调用KafkaMessageListenerContainer的start方法
      				container.start();
      				this.containers.add(container);
      			}
      		}
      	}
      
      //获取当前线程要处理的partitions
      private TopicPartitionInitialOffset[] partitionSubset(ContainerProperties containerProperties, int i) {
      		TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
      		//一个线程,直接返回所有的
      		if (this.concurrency == 1) {
      			return topicPartitions;
      		}
      		else {
      			int numPartitions = topicPartitions.length;
      			//线程数等于partition数,直接返回对应的下标的partition
      			if (numPartitions == this.concurrency) {
      				return new TopicPartitionInitialOffset[] { topicPartitions[i] };
      			}
      			else {
      			//获得每个线程平均partition数(整数)
      				int perContainer = numPartitions / this.concurrency;
      				TopicPartitionInitialOffset[] subset;
      				//最后一个线程获得i * perContainer之后所有的partition
      				//低阶消费,spring的分配不是很均匀的分配方式,所以建议设置线程数=partitions数
      				if (i == this.concurrency - 1) {
      					subset = Arrays.copyOfRange(topicPartitions, i * perContainer, topicPartitions.length);
      				}
      				//其余每个线程获得perContainer个连续partition
      				else {
      					subset = Arrays.copyOfRange(topicPartitions, i * perContainer, (i + 1) * perContainer);
      				}
      				return subset;
      			}
      		}
      	}
      
  1. 调用AbstractMessageListenerContainer的start方法,该方法调用KafkaMessageListenerContainer的doStart()方法,该方法初始化container,创建ListenerConsumer,以下是ListenerConsumer构造方法的一部分。(根据上一步是否分配partition,调用subscribe或assign创建Consumer对象)
    1. final Consumer<K, V> consumer =
      					KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
      							this.consumerGroupId,
      							this.containerProperties.getClientId(),
      							KafkaMessageListenerContainer.this.clientIdSuffix);
      			this.consumer = consumer;
      
      			ConsumerRebalanceListener rebalanceListener = createRebalanceListener(consumer);
      
      			//未指定topic partition,调用subscribe,高级api
      			if (KafkaMessageListenerContainer.this.topicPartitions == null) {
      				if (this.containerProperties.getTopicPattern() != null) {
      					consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
      				}
      				else {					consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
      				}
      			}
      			else {
      			//指定topic partition,调用assign,低级api
      				List<TopicPartitionInitialOffset> topicPartitions =
      						Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
      				this.definedPartitions = new HashMap<>(topicPartitions.size());
      				for (TopicPartitionInitialOffset topicPartition : topicPartitions) {
      					this.definedPartitions.put(topicPartition.topicPartition(),
      							new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent(),
      									topicPartition.getPosition()));
      				}
      				consumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
      			}
      

© Song 2015 - 2023