Custom spring kafka consumer thread pool

  kafka

Order

This article describes how to customize spring kafka’s consumer thread pool.

KafkaMessageListenerContainer

spring-kafka-1.2.3.RELEASE-sources.jar! /org/springframework/kafka/listener/KafkaMessageListenerContainer.java

protected void doStart() {
        if (isRunning()) {
            return;
        }
        ContainerProperties containerProperties = getContainerProperties();

        if (!this.consumerFactory.isAutoCommit()) {
            AckMode ackMode = containerProperties.getAckMode();
            if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
                Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
            }
            if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
                    && containerProperties.getAckTime() == 0) {
                containerProperties.setAckTime(5000);
            }
        }

        Object messageListener = containerProperties.getMessageListener();
        Assert.state(messageListener != null, "A MessageListener is required");
        if (messageListener instanceof GenericAcknowledgingMessageListener) {
            this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener;
        }
        else if (messageListener instanceof GenericMessageListener) {
            this.listener = (GenericMessageListener<?>) messageListener;
        }
        else {
            throw new IllegalStateException("messageListener must be 'MessageListener' "
                    + "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
        }
        if (containerProperties.getConsumerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-C-");
            containerProperties.setConsumerTaskExecutor(consumerExecutor);
        }
        if (containerProperties.getListenerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-L-");
            containerProperties.setListenerTaskExecutor(listenerExecutor);
        }
        this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
        setRunning(true);
        this.listenerConsumerFuture = containerProperties
                .getConsumerTaskExecutor()
                .submitListenable(this.listenerConsumer);
    }

There are two thread pools involved, one is ConsumerTaskExecutor and the other is ListenerTaskExecutor, which are all configured in containerProperties.
If there is no default configuration, SimpleAsyncTaskExecutor with “-C-” and “-L-” will be created respectively.
ConsumerTaskExecutor used to go to poll kafka messages
The method used by Listener to call the @KafkaListener annotation

custom

The advantage of customizing executor and hosting it in spring container is that it can follow the life cycle of the container and gracefully close the thread pool before the container is destroyed.

Extension of concurrentkaffiskalistenerfactory

Spring-kafka-1.2.3.release-sources.jar can be rewritten! /org/springframework/kafka/config/concurrence tkafkalisteneccontainer.java’s initializeContainer method, and then set it up.

public class CustomConcurrentKafkaListenerContainerFactory<K,V> extends ConcurrentKafkaListenerContainerFactory<K,V> {

    /**
     * The executor for threads that poll the consumer.
     */
    private AsyncListenableTaskExecutor consumerTaskExecutor;

    /**
     * The executor for threads that invoke the listener.
     */
    private AsyncListenableTaskExecutor listenerTaskExecutor;

    public CustomConcurrentKafkaListenerContainerFactory(AsyncListenableTaskExecutor consumerTaskExecutor, AsyncListenableTaskExecutor listenerTaskExecutor) {
        this.consumerTaskExecutor = consumerTaskExecutor;
        this.listenerTaskExecutor = listenerTaskExecutor;
    }

    @Override
    protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance) {
        super.initializeContainer(instance);
        instance.getContainerProperties().setConsumerTaskExecutor(consumerTaskExecutor);
        instance.getContainerProperties().setListenerTaskExecutor(listenerTaskExecutor);
    }
}

Set up

Apply the custom kafkaListenerContainerFactory and replace it with your own extended concurrentkafkalistenercontainerfactory.

@Configuration
@AutoConfigureBefore(KafkaAutoConfiguration.class)
public class KafkaExecutorConfig {

    @Bean(name = "consumerTaskExecutor")
    public ThreadPoolTaskExecutor consumerTaskExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("my-C-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }

    @Bean(name = "listenerTaskExecutor")
    public ThreadPoolTaskExecutor listenerTaskExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("my-L-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }

    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new CustomConcurrentKafkaListenerContainerFactory<Object, Object>(consumerTaskExecutor(),listenerTaskExecutor());
        configurer.configure(factory, kafkaConsumerFactory);
        return factory;
    }
}

This is the end of the story.