A look at how Spring for Kafka wraps and integrates the consumer

Preface

This article analyzes how Spring for Kafka wraps and integrates the native Kafka client consumer.

Consumer factory

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

    protected KafkaConsumer<K, V> createKafkaConsumer(String clientIdSuffix) {
        if (!this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG) || clientIdSuffix == null) {
            return createKafkaConsumer();
        }
        else {
            Map<String, Object> modifiedClientIdConfigs = new HashMap<>(this.configs);
            modifiedClientIdConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG,
                    modifiedClientIdConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG) + clientIdSuffix);
            return createKafkaConsumer(modifiedClientIdConfigs);
        }
    }

    protected KafkaConsumer<K, V> createKafkaConsumer(Map<String, Object> configs) {
        return new KafkaConsumer<K, V>(configs, this.keyDeserializer, this.valueDeserializer);
    }
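
The client-id handling above can be tried out without a broker. The following is a minimal sketch that reproduces only the suffix logic on a plain map; the class name ClientIdSuffixSketch and the method configsFor are made up for illustration, and "client.id" is the string behind ConsumerConfig.CLIENT_ID_CONFIG.

```java
import java.util.HashMap;
import java.util.Map;

class ClientIdSuffixSketch {

    // Same key string as ConsumerConfig.CLIENT_ID_CONFIG in the Kafka client.
    static final String CLIENT_ID = "client.id";

    // Returns the configuration a consumer with the given suffix would be built from.
    static Map<String, Object> configsFor(Map<String, Object> base, String clientIdSuffix) {
        if (!base.containsKey(CLIENT_ID) || clientIdSuffix == null) {
            return base; // nothing to modify: the shared configuration is used as-is
        }
        Map<String, Object> modified = new HashMap<>(base); // copy, so the shared map stays untouched
        modified.put(CLIENT_ID, modified.get(CLIENT_ID) + clientIdSuffix);
        return modified;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put(CLIENT_ID, "orders");
        for (int i = 0; i < 3; i++) {
            System.out.println(configsFor(base, "-" + i).get(CLIENT_ID));
        }
    }
}
```

Copying the map before appending the suffix matters because every child container shares the same factory; mutating the shared configuration would make the suffixes accumulate.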

ConcurrentKafkaListenerContainerFactory

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/ConcurrentKafkaListenerContainerFactory.java

The main wrapper on the consumer side is ConcurrentKafkaListenerContainerFactory. The native KafkaConsumer is not thread-safe and cannot be used from several threads at once, so Spring adds a layer here: according to the configured spring.kafka.listener.concurrency, it creates multiple KafkaMessageListenerContainer instances that run concurrently.

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {

    private final ConsumerFactory<K, V> consumerFactory;

    private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();

    @Override
    protected void doStart() {
        if (!isRunning()) {
            ContainerProperties containerProperties = getContainerProperties();
            TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
            if (topicPartitions != null
                    && this.concurrency > topicPartitions.length) {
                this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
                        + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                        + topicPartitions.length);
                this.concurrency = topicPartitions.length;
            }
            setRunning(true);

            for (int i = 0; i < this.concurrency; i++) {
                KafkaMessageListenerContainer<K, V> container;
                if (topicPartitions == null) {
                    container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties);
                }
                else {
                    container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties,
                            partitionSubset(containerProperties, i));
                }
                if (getBeanName() != null) {
                    container.setBeanName(getBeanName() + "-" + i);
                }
                if (getApplicationEventPublisher() != null) {
                    container.setApplicationEventPublisher(getApplicationEventPublisher());
                }
                container.setClientIdSuffix("-" + i);
                container.start();
                this.containers.add(container);
            }
        }
    }
    //......
}    
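
To make the fan-out in doStart easier to follow, here is a minimal sketch of just the bookkeeping: the concurrency is capped at the number of explicitly assigned partitions, and each child gets a bean name and client-id suffix derived from its index. The Child class and the start method are hypothetical stand-ins, not the library's types, and no consumer is created.

```java
import java.util.ArrayList;
import java.util.List;

class ConcurrencyFanOutSketch {

    // Hypothetical stand-in for one KafkaMessageListenerContainer.
    static final class Child {
        final String beanName;
        final String clientIdSuffix;

        Child(String beanName, String clientIdSuffix) {
            this.beanName = beanName;
            this.clientIdSuffix = clientIdSuffix;
        }
    }

    // explicitPartitionCount == null models "topicPartitions == null" in the original code.
    static List<Child> start(String beanName, int concurrency, Integer explicitPartitionCount) {
        int effective = concurrency;
        if (explicitPartitionCount != null && concurrency > explicitPartitionCount) {
            effective = explicitPartitionCount; // more children than partitions would sit idle
        }
        List<Child> children = new ArrayList<>();
        for (int i = 0; i < effective; i++) {
            children.add(new Child(beanName + "-" + i, "-" + i));
        }
        return children;
    }

    public static void main(String[] args) {
        for (Child child : start("orders", 5, 3)) {
            System.out.println(child.beanName + " uses client id suffix " + child.clientIdSuffix);
        }
    }
}
```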

KafkaMessageListenerContainer

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {

    private final ConsumerFactory<K, V> consumerFactory;

    private final TopicPartitionInitialOffset[] topicPartitions;

    private ListenerConsumer listenerConsumer;

    private ListenableFuture<?> listenerConsumerFuture;

    private GenericMessageListener<?> listener;

    private GenericAcknowledgingMessageListener<?> acknowledgingMessageListener;

    private String clientIdSuffix;

    @Override
    protected void doStart() {
        if (isRunning()) {
            return;
        }
        ContainerProperties containerProperties = getContainerProperties();

        if (!this.consumerFactory.isAutoCommit()) {
            AckMode ackMode = containerProperties.getAckMode();
            if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
                Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
            }
            if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
                    && containerProperties.getAckTime() == 0) {
                containerProperties.setAckTime(5000);
            }
        }

        Object messageListener = containerProperties.getMessageListener();
        Assert.state(messageListener != null, "A MessageListener is required");
        if (messageListener instanceof GenericAcknowledgingMessageListener) {
            this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener;
        }
        else if (messageListener instanceof GenericMessageListener) {
            this.listener = (GenericMessageListener<?>) messageListener;
        }
        else {
            throw new IllegalStateException("messageListener must be 'MessageListener' "
                    + "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
        }
        if (containerProperties.getConsumerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-C-");
            containerProperties.setConsumerTaskExecutor(consumerExecutor);
        }
        if (containerProperties.getListenerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-L-");
            containerProperties.setListenerTaskExecutor(listenerExecutor);
        }
        this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
        setRunning(true);
        this.listenerConsumerFuture = containerProperties
                .getConsumerTaskExecutor()
                .submitListenable(this.listenerConsumer);
    }

    @Override
    protected void doStop(final Runnable callback) {
        if (isRunning()) {
            this.listenerConsumerFuture.addCallback(new ListenableFutureCallback<Object>() {

                @Override
                public void onFailure(Throwable e) {
                    KafkaMessageListenerContainer.this.logger.error("Error while stopping the container: ", e);
                    if (callback != null) {
                        callback.run();
                    }
                }

                @Override
                public void onSuccess(Object result) {
                    if (KafkaMessageListenerContainer.this.logger.isDebugEnabled()) {
                        KafkaMessageListenerContainer.this.logger
                                .debug(KafkaMessageListenerContainer.this + " stopped normally");
                    }
                    if (callback != null) {
                        callback.run();
                    }
                }
            });
            setRunning(false);
            this.listenerConsumer.consumer.wakeup();
        }
    }
    //......

}    

Each KafkaMessageListenerContainer creates its own ListenerConsumer, which in turn creates its own independent Kafka consumer. Each ListenerConsumer is submitted to a task executor, which is how concurrency is achieved.

Each ListenerConsumer has a recordsToProcess queue that holds the records returned by the native Kafka consumer’s poll. A ListenerInvoker thread then loops, polling recordsToProcess with a timeout, and hands the records it retrieves to the application method annotated with @KafkaListener.

        private final class ListenerInvoker implements SchedulingAwareRunnable {

            private final CountDownLatch exitLatch = new CountDownLatch(1);

            private volatile boolean active = true;

            private volatile Thread executingThread;

            ListenerInvoker() {
                super();
            }

            @Override
            public void run() {
                Assert.isTrue(this.active, "This instance is not active anymore");
                if (ListenerConsumer.this.theListener instanceof ConsumerSeekAware) {
                    ((ConsumerSeekAware) ListenerConsumer.this.theListener).registerSeekCallback(ListenerConsumer.this);
                }
                try {
                    this.executingThread = Thread.currentThread();
                    while (this.active) {
                        try {
                            ConsumerRecords<K, V> records = ListenerConsumer.this.recordsToProcess.poll(1,
                                    TimeUnit.SECONDS);
                            if (this.active) {
                                if (records != null) {
                                    invokeListener(records);
                                }
                                else {
                                    if (ListenerConsumer.this.logger.isTraceEnabled()) {
                                        ListenerConsumer.this.logger.trace("No records to process");
                                    }
                                }
                            }
                        }
                        catch (InterruptedException e) {
                            if (!this.active) {
                                Thread.currentThread().interrupt();
                            }
                            else {
                                ListenerConsumer.this.logger.debug("Interrupt ignored");
                            }
                        }
                    }
                }
                finally {
                    this.active = false;
                    this.exitLatch.countDown();
                }
            }

            @Override
            public boolean isLongLived() {
                return true;
            }

            private void stop() {
                if (ListenerConsumer.this.logger.isDebugEnabled()) {
                    ListenerConsumer.this.logger.debug("Stopping invoker");
                }
                this.active = false;
                try {
                    if (!this.exitLatch.await(getContainerProperties().getShutdownTimeout(), TimeUnit.MILLISECONDS)
                            && this.executingThread != null) {
                        if (ListenerConsumer.this.logger.isDebugEnabled()) {
                            ListenerConsumer.this.logger.debug("Interrupting invoker");
                        }
                        this.executingThread.interrupt();
                    }
                }
                catch (InterruptedException e) {
                    if (this.executingThread != null) {
                        this.executingThread.interrupt();
                    }
                    Thread.currentThread().interrupt();
                }
                if (ListenerConsumer.this.logger.isDebugEnabled()) {
                    ListenerConsumer.this.logger.debug("Invoker stopped");
                }
            }
        }

The invokeListener call here is what invokes the listener’s onMessage method.
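
The hand-off between the polling thread and the invoker thread can be sketched with nothing but java.util.concurrent. The example below is a simplified illustration under stated assumptions: batches are plain lists of strings rather than ConsumerRecords, the "listener" just collects what it receives, and the names QueueHandOffSketch, Invoker and deliver are invented for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueHandOffSketch {

    // Simplified counterpart of ListenerInvoker: drains the queue until stopped.
    static final class Invoker implements Runnable {
        private final BlockingQueue<List<String>> recordsToProcess;
        private final List<String> handled = new CopyOnWriteArrayList<>();
        private final CountDownLatch exitLatch = new CountDownLatch(1);
        private volatile boolean active = true;
        private volatile Thread executingThread;

        Invoker(BlockingQueue<List<String>> recordsToProcess) {
            this.recordsToProcess = recordsToProcess;
        }

        @Override
        public void run() {
            try {
                this.executingThread = Thread.currentThread();
                while (this.active) {
                    try {
                        // Timed poll, so the active flag is re-checked even when no records arrive.
                        List<String> batch = this.recordsToProcess.poll(100, TimeUnit.MILLISECONDS);
                        if (this.active && batch != null) {
                            this.handled.addAll(batch); // stands in for invokeListener(records)
                        }
                    } catch (InterruptedException e) {
                        if (!this.active) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            } finally {
                this.exitLatch.countDown();
            }
        }

        void stop(long timeoutMillis) throws InterruptedException {
            this.active = false;
            if (!this.exitLatch.await(timeoutMillis, TimeUnit.MILLISECONDS) && this.executingThread != null) {
                this.executingThread.interrupt(); // the loop did not exit in time
            }
        }
    }

    // Plays the role of the polling side: enqueue batches, wait for delivery, then stop the invoker.
    static List<String> deliver(List<List<String>> polls) {
        BlockingQueue<List<String>> queue = new LinkedBlockingQueue<>();
        Invoker invoker = new Invoker(queue);
        Thread thread = new Thread(invoker, "listener-invoker");
        thread.start();
        try {
            int expected = 0;
            for (List<String> batch : polls) {
                queue.put(batch);
                expected += batch.size();
            }
            long deadline = System.currentTimeMillis() + 5000;
            while (invoker.handled.size() < expected && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            invoker.stop(1000);
            thread.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(invoker.handled);
    }

    public static void main(String[] args) {
        List<List<String>> polls = new ArrayList<>();
        polls.add(java.util.Arrays.asList("r1", "r2"));
        polls.add(java.util.Arrays.asList("r3"));
        System.out.println(deliver(polls));
    }
}
```

The timed poll is the important design choice: a blocking take() would never notice that active has been set to false, so a stop request would have to rely on interruption alone.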

KafkaListener annotation

Let’s look at how a method annotated with @KafkaListener ends up wrapped into the listener that the ListenerInvoker class calls.

KafkaListenerAnnotationBeanPostProcessor

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

This class scans each bean for @KafkaListener annotations and registers what it finds with KafkaListenerEndpointRegistrar.

    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
            final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
            final List<Method> multiMethods = new ArrayList<Method>();
            Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    new MethodIntrospector.MetadataLookup<Set<KafkaListener>>() {

                        @Override
                        public Set<KafkaListener> inspect(Method method) {
                            Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                            return (!listenerMethods.isEmpty() ? listenerMethods : null);
                        }

                    });
            if (hasClassLevelListeners) {
                Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                        new ReflectionUtils.MethodFilter() {

                            @Override
                            public boolean matches(Method method) {
                                return AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null;
                            }

                        });
                multiMethods.addAll(methodsWithHandler);
            }
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(bean.getClass());
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass());
                }
            }
            else {
                // Non-empty set of methods
                for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (KafkaListener listener : entry.getValue()) {
                        processKafkaListener(listener, method, bean, beanName);
                    }
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                            + beanName + "': " + annotatedMethods);
                }
            }
            if (hasClassLevelListeners) {
                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
            }
        }
        return bean;
    }

    protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
        Method methodToUse = checkProxy(method, bean);
        MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<K, V>();
        endpoint.setMethod(methodToUse);
        endpoint.setBeanFactory(this.beanFactory);
        processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
    }

    protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean,
            Object adminTarget, String beanName) {
        endpoint.setBean(bean);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setId(getEndpointId(kafkaListener));
        endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
        endpoint.setTopics(resolveTopics(kafkaListener));
        endpoint.setTopicPattern(resolvePattern(kafkaListener));
        String group = kafkaListener.containerGroup();
        if (StringUtils.hasText(group)) {
            Object resolvedGroup = resolveExpression(group);
            if (resolvedGroup instanceof String) {
                endpoint.setGroup((String) resolvedGroup);
            }
        }

        KafkaListenerContainerFactory<?> factory = null;
        String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
        if (StringUtils.hasText(containerFactoryBeanName)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            try {
                factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
            }
            catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
                        + "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
                        + " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
            }
        }

        this.registrar.registerEndpoint(endpoint, factory);
    }
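
The scanning step is ordinary reflection over annotated methods. Below is a minimal sketch of the idea using only the JDK; the @Listen annotation, the Endpoint class and the scan method are hypothetical stand-ins for @KafkaListener, MethodKafkaListenerEndpoint and the post-processor, and Spring's proxy handling and expression resolution are left out.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

class AnnotationScanSketch {

    // Hypothetical stand-in for @KafkaListener; must be retained at runtime to be visible to reflection.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Listen {
        String id();
        String[] topics();
    }

    // Hypothetical stand-in for MethodKafkaListenerEndpoint: remembers the bean, the method and the metadata.
    static final class Endpoint {
        final String id;
        final String[] topics;
        final Object bean;
        final Method method;

        Endpoint(String id, String[] topics, Object bean, Method method) {
            this.id = id;
            this.topics = topics;
            this.bean = bean;
            this.method = method;
        }
    }

    // Collects one endpoint per annotated method declared on the bean's class.
    static List<Endpoint> scan(Object bean) {
        List<Endpoint> endpoints = new ArrayList<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Listen listen = method.getAnnotation(Listen.class);
            if (listen != null) {
                endpoints.add(new Endpoint(listen.id(), listen.topics(), bean, method));
            }
        }
        return endpoints;
    }

    // Example bean with one annotated and one plain method.
    static final class OrderHandlers {
        @Listen(id = "orders", topics = {"orders.created"})
        public void onOrder(String payload) {
        }

        public void notAListener(String payload) {
        }
    }

    public static void main(String[] args) {
        for (Endpoint endpoint : scan(new OrderHandlers())) {
            System.out.println(endpoint.id + " -> " + endpoint.method.getName());
        }
    }
}
```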

KafkaListenerEndpointRegistrar

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java

    /**
     * Register a new {@link KafkaListenerEndpoint} alongside the
     * {@link KafkaListenerContainerFactory} to use to create the underlying container.
     * <p>The {@code factory} may be {@code null} if the default factory has to be
     * used for that endpoint.
     * @param endpoint the {@link KafkaListenerEndpoint} instance to register.
     * @param factory the {@link KafkaListenerContainerFactory} to use.
     */
    public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
        Assert.notNull(endpoint, "Endpoint must be set");
        Assert.hasText(endpoint.getId(), "Endpoint id must be set");
        // Factory may be null, we defer the resolution right before actually creating the container
        KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
        synchronized (this.endpointDescriptors) {
            if (this.startImmediately) { // Register and start immediately
                this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                        resolveContainerFactory(descriptor), true);
            }
            else {
                this.endpointDescriptors.add(descriptor);
            }
        }
    }

Here, the KafkaListenerEndpoint is wrapped in a KafkaListenerEndpointDescriptor and added to the collection of descriptors named endpointDescriptors, unless startImmediately is already set, in which case it is registered with the registry right away.

KafkaListenerEndpointRegistrar

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java

public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {

    @Override
    public void afterPropertiesSet() {
        registerAllEndpoints();
    }

    protected void registerAllEndpoints() {
        synchronized (this.endpointDescriptors) {
            for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
                this.endpointRegistry.registerListenerContainer(
                        descriptor.endpoint, resolveContainerFactory(descriptor));
            }
            this.startImmediately = true;  // trigger immediate startup
        }
    }
}

This class implements the afterPropertiesSet method of the InitializingBean interface, which runs when the bean is initialized. At that point it walks endpointDescriptors and registers each endpoint with the registry, one by one.
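
The interplay between registerEndpoint and afterPropertiesSet is a small "defer, then switch to immediate" pattern. Here is a minimal sketch of it, assuming endpoints are represented by their ids only; DeferredRegistrarSketch and its registered list are invented names, with the list standing in for the endpoint registry.

```java
import java.util.ArrayList;
import java.util.List;

class DeferredRegistrarSketch {

    private final List<String> endpointDescriptors = new ArrayList<>(); // collected before initialization
    private final List<String> registered = new ArrayList<>();          // stands in for the endpoint registry
    private boolean startImmediately;

    void registerEndpoint(String endpointId) {
        synchronized (this.endpointDescriptors) {
            if (this.startImmediately) {
                this.registered.add(endpointId);          // late arrival: register right away
            } else {
                this.endpointDescriptors.add(endpointId); // early arrival: remember for later
            }
        }
    }

    // Called once by the container after all properties are set.
    void afterPropertiesSet() {
        synchronized (this.endpointDescriptors) {
            this.registered.addAll(this.endpointDescriptors);
            this.startImmediately = true; // from now on, registerEndpoint no longer defers
        }
    }

    List<String> registered() {
        synchronized (this.endpointDescriptors) {
            return new ArrayList<>(this.registered);
        }
    }

    public static void main(String[] args) {
        DeferredRegistrarSketch registrar = new DeferredRegistrarSketch();
        registrar.registerEndpoint("a");
        registrar.registerEndpoint("b");
        System.out.println("before init: " + registrar.registered());
        registrar.afterPropertiesSet();
        registrar.registerEndpoint("c");
        System.out.println("after init: " + registrar.registered());
    }
}
```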

KafkaListenerEndpointRegistry

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware,
        ApplicationListener<ContextRefreshedEvent> {

    protected final Log logger = LogFactory.getLog(getClass()); //NOSONAR

    private final Map<String, MessageListenerContainer> listenerContainers =
            new ConcurrentHashMap<String, MessageListenerContainer>();

    //......        
    public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
            boolean startImmediately) {
        Assert.notNull(endpoint, "Endpoint must not be null");
        Assert.notNull(factory, "Factory must not be null");

        String id = endpoint.getId();
        Assert.hasText(id, "Endpoint id must not be empty");
        synchronized (this.listenerContainers) {
            Assert.state(!this.listenerContainers.containsKey(id),
                    "Another endpoint is already registered with id '" + id + "'");
            MessageListenerContainer container = createListenerContainer(endpoint, factory);
            this.listenerContainers.put(id, container);
            if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
                List<MessageListenerContainer> containerGroup;
                if (this.applicationContext.containsBean(endpoint.getGroup())) {
                    containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
                }
                else {
                    containerGroup = new ArrayList<MessageListenerContainer>();
                    this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
                }
                containerGroup.add(container);
            }
            if (startImmediately) {
                startIfNecessary(container);
            }
        }
    }

    protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
            KafkaListenerContainerFactory<?> factory) {

        MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);

        if (listenerContainer instanceof InitializingBean) {
            try {
                ((InitializingBean) listenerContainer).afterPropertiesSet();
            }
            catch (Exception ex) {
                throw new BeanInitializationException("Failed to initialize message listener container", ex);
            }
        }

        int containerPhase = listenerContainer.getPhase();
        if (containerPhase < Integer.MAX_VALUE) {  // a custom phase value
            if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
                throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
                        this.phase + " vs " + containerPhase);
            }
            this.phase = listenerContainer.getPhase();
        }

        return listenerContainer;
    }

    @Override
    public void start() {
        for (MessageListenerContainer listenerContainer : getListenerContainers()) {
            startIfNecessary(listenerContainer);
        }
    }

    @Override
    public void stop() {
        for (MessageListenerContainer listenerContainer : getListenerContainers()) {
            listenerContainer.stop();
        }
    }
}        

On registration, the endpoint is converted into a MessageListenerContainer and stored in the listenerContainers map, keyed by the endpoint id.
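
The registry's core behaviour (unique ids, optional immediate start, bulk start later) can be sketched in a few lines. This is an illustration under simplifying assumptions: Container is a hypothetical stand-in for MessageListenerContainer, the factory step is reduced to a constructor call, and startIfNecessary only checks the running flag.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class EndpointRegistrySketch {

    // Hypothetical stand-in for a MessageListenerContainer.
    static final class Container {
        final String id;
        private volatile boolean running;

        Container(String id) {
            this.id = id;
        }

        void start() {
            this.running = true;
        }

        boolean isRunning() {
            return this.running;
        }
    }

    private final Map<String, Container> listenerContainers = new ConcurrentHashMap<>();

    Container register(String id, boolean startImmediately) {
        if (id == null || id.trim().isEmpty()) {
            throw new IllegalArgumentException("Endpoint id must not be empty");
        }
        synchronized (this.listenerContainers) {
            if (this.listenerContainers.containsKey(id)) {
                throw new IllegalStateException("Another endpoint is already registered with id '" + id + "'");
            }
            Container container = new Container(id); // in the original, the factory creates it
            this.listenerContainers.put(id, container);
            if (startImmediately) {
                startIfNecessary(container);
            }
            return container;
        }
    }

    // Lifecycle start: bring up everything that is not running yet.
    void start() {
        for (Container container : this.listenerContainers.values()) {
            startIfNecessary(container);
        }
    }

    private static void startIfNecessary(Container container) {
        if (!container.isRunning()) {
            container.start();
        }
    }

    public static void main(String[] args) {
        EndpointRegistrySketch registry = new EndpointRegistrySketch();
        Container orders = registry.register("orders", false);
        System.out.println("running after register: " + orders.isRunning());
        registry.start();
        System.out.println("running after start: " + orders.isRunning());
    }
}
```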

AbstractKafkaListenerContainerFactory

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

    public C createListenerContainer(KafkaListenerEndpoint endpoint) {
        C instance = createContainerInstance(endpoint);

        if (this.autoStartup != null) {
            instance.setAutoStartup(this.autoStartup);
        }
        if (this.phase != null) {
            instance.setPhase(this.phase);
        }
        if (this.applicationEventPublisher != null) {
            instance.setApplicationEventPublisher(this.applicationEventPublisher);
        }
        if (endpoint.getId() != null) {
            instance.setBeanName(endpoint.getId());
        }

        if (endpoint instanceof AbstractKafkaListenerEndpoint) {
            AbstractKafkaListenerEndpoint<K, V> aklEndpoint = (AbstractKafkaListenerEndpoint<K, V>) endpoint;
            if (this.recordFilterStrategy != null) {
                aklEndpoint.setRecordFilterStrategy(this.recordFilterStrategy);
            }
            if (this.ackDiscarded != null) {
                aklEndpoint.setAckDiscarded(this.ackDiscarded);
            }
            if (this.retryTemplate != null) {
                aklEndpoint.setRetryTemplate(this.retryTemplate);
            }
            if (this.recoveryCallback != null) {
                aklEndpoint.setRecoveryCallback(this.recoveryCallback);
            }
            if (this.batchListener != null) {
                aklEndpoint.setBatchListener(this.batchListener);
            }
        }

        endpoint.setupListenerContainer(instance, this.messageConverter);
        initializeContainer(instance);

        return instance;
    }

The key step here is the call to endpoint.setupListenerContainer, which delegates to the setupMessageListener method.

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

    private void setupMessageListener(MessageListenerContainer container, MessageConverter messageConverter) {
        Object messageListener = createMessageListener(container, messageConverter);
        Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener");
        if (this.retryTemplate != null) {
            if (messageListener instanceof AcknowledgingMessageListener) {
                messageListener = new RetryingAcknowledgingMessageListenerAdapter<>(
                        (AcknowledgingMessageListener<K, V>) messageListener, this.retryTemplate,
                        this.recoveryCallback);
            }
            else {
                messageListener = new RetryingMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
                        this.retryTemplate, (RecoveryCallback<Object>) this.recoveryCallback);
            }
        }
        if (this.recordFilterStrategy != null) {
            if (messageListener instanceof AcknowledgingMessageListener) {
                messageListener = new FilteringAcknowledgingMessageListenerAdapter<>(
                        (AcknowledgingMessageListener<K, V>) messageListener, this.recordFilterStrategy,
                        this.ackDiscarded);
            }
            else if (messageListener instanceof MessageListener) {
                messageListener = new FilteringMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
                        this.recordFilterStrategy);
            }
            else if (messageListener instanceof BatchAcknowledgingMessageListener) {
                messageListener = new FilteringBatchAcknowledgingMessageListenerAdapter<>(
                        (BatchAcknowledgingMessageListener<K, V>) messageListener, this.recordFilterStrategy,
                        this.ackDiscarded);
            }
            else if (messageListener instanceof BatchMessageListener) {
                messageListener = new FilteringBatchMessageListenerAdapter<>(
                        (BatchMessageListener<K, V>) messageListener, this.recordFilterStrategy);
            }
        }
        container.setupMessageListener(messageListener);
    }

This messageListener holds the bean carried by the original endpoint and the InvocableHandlerMethod converted from the method, and it is injected into the MessageListenerContainer (a ConcurrentMessageListenerContainer). As shown earlier, ConcurrentMessageListenerContainer then creates multiple KafkaMessageListenerContainer instances according to the configured spring.kafka.listener.concurrency.
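
The way setupMessageListener layers adapters around the listener is the decorator pattern: the retry adapter wraps the listener first, and the filter adapter wraps the result. The sketch below imitates that order with java.util.function types; retrying, filtering and demo are invented names, the retry loop is a deliberately naive substitute for RetryTemplate, and the predicate returns true for records to discard, mirroring RecordFilterStrategy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;

class ListenerDecorationSketch {

    // Naive retry decorator: re-invokes the delegate until it succeeds or attempts run out.
    static <T> Consumer<T> retrying(Consumer<T> delegate, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        return record -> {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    delegate.accept(record);
                    return;
                } catch (RuntimeException e) {
                    last = e;
                }
            }
            throw last;
        };
    }

    // Filter decorator: records matching shouldDiscard never reach the delegate.
    static <T> Consumer<T> filtering(Consumer<T> delegate, Predicate<T> shouldDiscard) {
        return record -> {
            if (!shouldDiscard.test(record)) {
                delegate.accept(record);
            }
        };
    }

    // Builds filter(retry(listener)) and feeds it three records.
    static List<String> demo() {
        List<String> handled = new ArrayList<>();
        Set<String> seen = new HashSet<>();
        // Fails the first time it sees each record, succeeds on the retry.
        Consumer<String> flaky = record -> {
            if (seen.add(record)) {
                throw new IllegalStateException("transient failure for " + record);
            }
            handled.add(record);
        };
        Consumer<String> withRetry = retrying(flaky, 3);
        Consumer<String> listener = filtering(withRetry, record -> record.startsWith("skip"));
        for (String record : Arrays.asList("a", "skip-b", "c")) {
            listener.accept(record);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the filter is the outermost layer, discarded records never enter the retry logic at all.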

MethodKafkaListenerEndpoint

The createMessageListener method wraps the bean and the method originally annotated with @KafkaListener, both held by the endpoint, into an InvocableHandlerMethod and injects it into a MessagingMessageListenerAdapter.

public class MethodKafkaListenerEndpoint<K, V> extends AbstractKafkaListenerEndpoint<K, V> {

    private Object bean;

    private Method method;

    private MessageHandlerMethodFactory messageHandlerMethodFactory;

    //......

    @Override
    protected MessagingMessageListenerAdapter<K, V> createMessageListener(MessageListenerContainer container,
            MessageConverter messageConverter) {
        Assert.state(this.messageHandlerMethodFactory != null,
                "Could not create message listener - MessageHandlerMethodFactory not set");
        MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance(messageConverter);
        messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
        return messageListener;
    }

    /**
     * Create a {@link HandlerAdapter} for this listener adapter.
     * @param messageListener the listener adapter.
     * @return the handler adapter.
     */
    protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter<K, V> messageListener) {
        InvocableHandlerMethod invocableHandlerMethod =
                this.messageHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod());
        return new HandlerAdapter(invocableHandlerMethod);
    }
}    

This class wraps the original bean and method as an InvocableHandlerMethod and injects it into the MessagingMessageListenerAdapter.
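
Stripped of argument resolution and message conversion, "bean plus method wrapped into an invocable handler" comes down to java.lang.reflect.Method.invoke. The following is a minimal sketch of that idea only; InvocableHandler, Adapter, adapterFor and Greeter are hypothetical names and do not reproduce the Spring classes.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

class HandlerMethodSketch {

    // Hypothetical stand-in for InvocableHandlerMethod: a bean and one of its methods.
    static final class InvocableHandler {
        private final Object bean;
        private final Method method;

        InvocableHandler(Object bean, Method method) {
            this.bean = bean;
            this.method = method;
        }

        Object invoke(Object payload) {
            try {
                return this.method.invoke(this.bean, payload);
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new IllegalStateException("Listener method could not be invoked", e);
            }
        }
    }

    // Hypothetical stand-in for the listener adapter: onMessage forwards to the handler.
    static final class Adapter {
        private InvocableHandler handlerMethod;

        void setHandlerMethod(InvocableHandler handlerMethod) {
            this.handlerMethod = handlerMethod;
        }

        Object onMessage(String value) {
            return this.handlerMethod.invoke(value);
        }
    }

    // Looks up a public method by name and wires it into an adapter.
    static Adapter adapterFor(Object bean, String methodName, Class<?>... parameterTypes) {
        try {
            Method method = bean.getClass().getMethod(methodName, parameterTypes);
            Adapter adapter = new Adapter();
            adapter.setHandlerMethod(new InvocableHandler(bean, method));
            return adapter;
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such listener method: " + methodName, e);
        }
    }

    // Example application bean.
    static final class Greeter {
        public String greet(String name) {
            return "hello " + name;
        }
    }

    public static void main(String[] args) {
        Adapter adapter = adapterFor(new Greeter(), "greet", String.class);
        System.out.println(adapter.onMessage("kafka"));
    }
}
```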

Summary

  • On the producer side, wrapping KafkaProducer in KafkaTemplate is relatively straightforward.
  • On the consumer side, Spring marks message-handling methods with annotations, so the wiring is more involved:

    • KafkaListenerAnnotationBeanPostProcessor first scans the beans and registers the endpoints it finds with KafkaListenerEndpointRegistrar.
    • KafkaListenerEndpointRegistrar then creates the MessageListenerContainer instances in afterPropertiesSet.
    • The messageListener holds the bean carried by the original endpoint and the InvocableHandlerMethod converted from the method.
    • ConcurrentMessageListenerContainer creates multiple KafkaMessageListenerContainer instances according to the configured spring.kafka.listener.concurrency.
    • Each KafkaMessageListenerContainer creates its own ListenerConsumer, which creates its own independent Kafka consumer. Each ListenerConsumer is submitted to a task executor, which is how concurrency is achieved.
    • Each ListenerConsumer has a recordsToProcess queue that holds the records returned by the native Kafka consumer’s poll.
    • A ListenerInvoker thread then loops, polling recordsToProcess with a timeout, and calls the messageListener’s onMessage method (that is, the method annotated with @KafkaListener).

ListenerConsumer is the key piece, including how offsets are committed; it will be covered in detail in a separate article.