Preface
This article analyzes how spring-kafka wraps and integrates the native Kafka client consumer.
Consumer factory
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java
protected KafkaConsumer<K, V> createKafkaConsumer(String clientIdSuffix) {
if (!this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG) || clientIdSuffix == null) {
return createKafkaConsumer();
}
else {
Map<String, Object> modifiedClientIdConfigs = new HashMap<>(this.configs);
modifiedClientIdConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG,
modifiedClientIdConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG) + clientIdSuffix);
return createKafkaConsumer(modifiedClientIdConfigs);
}
}
protected KafkaConsumer<K, V> createKafkaConsumer(Map<String, Object> configs) {
return new KafkaConsumer<K, V>(configs, this.keyDeserializer, this.valueDeserializer);
}
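As a point of reference, here is a minimal sketch (not taken from the article's source) of how such a factory is typically built and used; the broker address, group id, client id and the String deserializers are illustrative assumptions.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

public class ConsumerFactoryExample {

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // assumed group id
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "demo-client");            // assumed client id

        // Deserializers are passed explicitly, so they do not need to be in the config map.
        DefaultKafkaConsumerFactory<String, String> factory =
                new DefaultKafkaConsumerFactory<>(props,
                        new StringDeserializer(), new StringDeserializer());

        // Each concurrent container supplies a clientIdSuffix such as "-0"; the protected
        // createKafkaConsumer(clientIdSuffix) shown above appends it to client.id
        // when client.id is configured.
        Consumer<String, String> consumer = factory.createConsumer();
        consumer.close();
    }
}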
ConcurrentKafkaListenerContainerFactory
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/ConcurrentKafkaListenerContainerFactory.java
The main consumer-side wrapper is ConcurrentKafkaListenerContainerFactory. The native KafkaConsumer is not thread-safe and cannot be operated on concurrently, so Spring adds a layer here: according to the configured spring.kafka.listener.concurrency, multiple concurrent KafkaMessageListenerContainer instances are created.
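Before diving into the container source, here is a minimal sketch of the factory configuration that such a concurrency setting boils down to; the bean name and the value 3 are illustrative, and Spring Boot's auto-configuration does the equivalent when spring.kafka.listener.concurrency is set.
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Roughly what spring.kafka.listener.concurrency=3 boils down to:
        // doStart() below will create 3 KafkaMessageListenerContainer instances.
        factory.setConcurrency(3);
        return factory;
    }
}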
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
private final ConsumerFactory<K, V> consumerFactory;
private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();
@Override
protected void doStart() {
if (!isRunning()) {
ContainerProperties containerProperties = getContainerProperties();
TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
if (topicPartitions != null
&& this.concurrency > topicPartitions.length) {
this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
+ topicPartitions.length);
this.concurrency = topicPartitions.length;
}
setRunning(true);
for (int i = 0; i < this.concurrency; i++) {
KafkaMessageListenerContainer<K, V> container;
if (topicPartitions == null) {
container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties);
}
else {
container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties,
partitionSubset(containerProperties, i));
}
if (getBeanName() != null) {
container.setBeanName(getBeanName() + "-" + i);
}
if (getApplicationEventPublisher() != null) {
container.setApplicationEventPublisher(getApplicationEventPublisher());
}
container.setClientIdSuffix("-" + i);
container.start();
this.containers.add(container);
}
}
}
//......
}
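To make the capping logic in doStart() concrete, here is a minimal sketch (class and topic names assumed, spring-kafka 1.2.x package layout) where two explicitly assigned partitions reduce a requested concurrency of 4 down to 2.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.TopicPartitionInitialOffset;

public class ConcurrencyCapExample {

    public static ConcurrentMessageListenerContainer<String, String> build(
            ConsumerFactory<String, String> consumerFactory) {

        // Two explicitly assigned partitions of an assumed topic.
        ContainerProperties containerProperties = new ContainerProperties(
                new TopicPartitionInitialOffset("demo-topic", 0),
                new TopicPartitionInitialOffset("demo-topic", 1));
        containerProperties.setMessageListener(new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> record) {
                System.out.println(record.value());
            }
        });

        ConcurrentMessageListenerContainer<String, String> container =
                new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
        // doStart() above logs a warning and reduces this to 2, the number of partitions.
        container.setConcurrency(4);
        return container; // container.start() would then spin up the child containers
    }
}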
KafkaMessageListenerContainer
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
private final ConsumerFactory<K, V> consumerFactory;
private final TopicPartitionInitialOffset[] topicPartitions;
private ListenerConsumer listenerConsumer;
private ListenableFuture<?> listenerConsumerFuture;
private GenericMessageListener<?> listener;
private GenericAcknowledgingMessageListener<?> acknowledgingMessageListener;
private String clientIdSuffix;
@Override
protected void doStart() {
if (isRunning()) {
return;
}
ContainerProperties containerProperties = getContainerProperties();
if (!this.consumerFactory.isAutoCommit()) {
AckMode ackMode = containerProperties.getAckMode();
if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
}
if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
&& containerProperties.getAckTime() == 0) {
containerProperties.setAckTime(5000);
}
}
Object messageListener = containerProperties.getMessageListener();
Assert.state(messageListener != null, "A MessageListener is required");
if (messageListener instanceof GenericAcknowledgingMessageListener) {
this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener;
}
else if (messageListener instanceof GenericMessageListener) {
this.listener = (GenericMessageListener<?>) messageListener;
}
else {
throw new IllegalStateException("messageListener must be 'MessageListener' "
+ "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
}
if (containerProperties.getConsumerTaskExecutor() == null) {
SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
if (containerProperties.getListenerTaskExecutor() == null) {
SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-L-");
containerProperties.setListenerTaskExecutor(listenerExecutor);
}
this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
setRunning(true);
this.listenerConsumerFuture = containerProperties
.getConsumerTaskExecutor()
.submitListenable(this.listenerConsumer);
}
@Override
protected void doStop(final Runnable callback) {
if (isRunning()) {
this.listenerConsumerFuture.addCallback(new ListenableFutureCallback<Object>() {
@Override
public void onFailure(Throwable e) {
KafkaMessageListenerContainer.this.logger.error("Error while stopping the container: ", e);
if (callback != null) {
callback.run();
}
}
@Override
public void onSuccess(Object result) {
if (KafkaMessageListenerContainer.this.logger.isDebugEnabled()) {
KafkaMessageListenerContainer.this.logger
.debug(KafkaMessageListenerContainer.this + " stopped normally");
}
if (callback != null) {
callback.run();
}
}
});
setRunning(false);
this.listenerConsumer.consumer.wakeup();
}
}
//......
}
Each KafkaMessageListenerContainer creates its own ListenerConsumer, which in turn creates its own independent KafkaConsumer; each ListenerConsumer runs on the task executor, which is how concurrency is achieved.
Each ListenerConsumer has a recordsToProcess queue into which the records returned by the underlying KafkaConsumer's poll are placed; a ListenerInvoker thread then loops, waiting with a timeout to take records from recordsToProcess and hand them to the application's @KafkaListener-annotated method for execution.
private final class ListenerInvoker implements SchedulingAwareRunnable {
private final CountDownLatch exitLatch = new CountDownLatch(1);
private volatile boolean active = true;
private volatile Thread executingThread;
ListenerInvoker() {
super();
}
@Override
public void run() {
Assert.isTrue(this.active, "This instance is not active anymore");
if (ListenerConsumer.this.theListener instanceof ConsumerSeekAware) {
((ConsumerSeekAware) ListenerConsumer.this.theListener).registerSeekCallback(ListenerConsumer.this);
}
try {
this.executingThread = Thread.currentThread();
while (this.active) {
try {
ConsumerRecords<K, V> records = ListenerConsumer.this.recordsToProcess.poll(1,
TimeUnit.SECONDS);
if (this.active) {
if (records != null) {
invokeListener(records);
}
else {
if (ListenerConsumer.this.logger.isTraceEnabled()) {
ListenerConsumer.this.logger.trace("No records to process");
}
}
}
}
catch (InterruptedException e) {
if (!this.active) {
Thread.currentThread().interrupt();
}
else {
ListenerConsumer.this.logger.debug("Interrupt ignored");
}
}
}
}
finally {
this.active = false;
this.exitLatch.countDown();
}
}
@Override
public boolean isLongLived() {
return true;
}
private void stop() {
if (ListenerConsumer.this.logger.isDebugEnabled()) {
ListenerConsumer.this.logger.debug("Stopping invoker");
}
this.active = false;
try {
if (!this.exitLatch.await(getContainerProperties().getShutdownTimeout(), TimeUnit.MILLISECONDS)
&& this.executingThread != null) {
if (ListenerConsumer.this.logger.isDebugEnabled()) {
ListenerConsumer.this.logger.debug("Interrupting invoker");
}
this.executingThread.interrupt();
}
}
catch (InterruptedException e) {
if (this.executingThread != null) {
this.executingThread.interrupt();
}
Thread.currentThread().interrupt();
}
if (ListenerConsumer.this.logger.isDebugEnabled()) {
ListenerConsumer.this.logger.debug("Invoker stopped");
}
}
}
The invokeListener here ultimately calls the listener's onMessage method.
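For reference, a hand-written listener of the kind whose onMessage is invoked here would look like this sketch (class name and the printed fields are illustrative).
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

public class LoggingMessageListener implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        // Application-level handling of a single polled record.
        System.out.println("partition=" + record.partition()
                + ", offset=" + record.offset() + ", value=" + record.value());
    }
}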
@KafkaListener annotation
Let's look at how a method annotated with @KafkaListener ends up wrapped as the listener invoked in the ListenerInvoker class above.
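A typical annotated method of the kind we are tracing might look like the following sketch; the topic name and listener id are assumptions.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DemoKafkaListener {

    // The annotation is what KafkaListenerAnnotationBeanPostProcessor picks up below.
    @KafkaListener(id = "demo", topics = "demo-topic")
    public void listen(ConsumerRecord<String, String> record) {
        System.out.println("received value=" + record.value() + " at offset=" + record.offset());
    }
}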
KafkaListenerAnnotationBeanPostProcessor
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
This class scans beans for @KafkaListener annotations and then registers the endpoint information with KafkaListenerEndpointRegistrar.
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
Class<?> targetClass = AopUtils.getTargetClass(bean);
Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List<Method> multiMethods = new ArrayList<Method>();
Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
new MethodIntrospector.MetadataLookup<Set<KafkaListener>>() {
@Override
public Set<KafkaListener> inspect(Method method) {
Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
}
});
if (hasClassLevelListeners) {
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
new ReflectionUtils.MethodFilter() {
@Override
public boolean matches(Method method) {
return AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null;
}
});
multiMethods.addAll(methodsWithHandler);
}
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(bean.getClass());
if (this.logger.isTraceEnabled()) {
this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass());
}
}
else {
// Non-empty set of methods
for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
for (KafkaListener listener : entry.getValue()) {
processKafkaListener(listener, method, bean, beanName);
}
}
if (this.logger.isDebugEnabled()) {
this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
+ beanName + "': " + annotatedMethods);
}
}
if (hasClassLevelListeners) {
processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
}
}
return bean;
}
protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
Method methodToUse = checkProxy(method, bean);
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<K, V>();
endpoint.setMethod(methodToUse);
endpoint.setBeanFactory(this.beanFactory);
processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
}
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean,
Object adminTarget, String beanName) {
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setId(getEndpointId(kafkaListener));
endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
endpoint.setTopics(resolveTopics(kafkaListener));
endpoint.setTopicPattern(resolvePattern(kafkaListener));
String group = kafkaListener.containerGroup();
if (StringUtils.hasText(group)) {
Object resolvedGroup = resolveExpression(group);
if (resolvedGroup instanceof String) {
endpoint.setGroup((String) resolvedGroup);
}
}
KafkaListenerContainerFactory<?> factory = null;
String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
if (StringUtils.hasText(containerFactoryBeanName)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
try {
factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
+ "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
+ " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
}
}
this.registrar.registerEndpoint(endpoint, factory);
}
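As a sketch of the attributes resolved above: containerFactory is looked up as a bean name, and containerGroup causes the created container to be added to a List bean of that name. All names below are assumptions.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderListener {

    // containerFactory is resolved as a bean name in processListener() above;
    // containerGroup puts the created container into a List bean named "orderContainers".
    @KafkaListener(id = "orders", topics = "orders",
            containerFactory = "kafkaListenerContainerFactory",
            containerGroup = "orderContainers")
    public void onOrder(String payload) {
        System.out.println("order: " + payload);
    }
}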
KafkaListenerEndpointRegistrar
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java
/**
* Register a new {@link KafkaListenerEndpoint} alongside the
* {@link KafkaListenerContainerFactory} to use to create the underlying container.
* <p>The {@code factory} may be {@code null} if the default factory has to be
* used for that endpoint.
* @param endpoint the {@link KafkaListenerEndpoint} instance to register.
* @param factory the {@link KafkaListenerContainerFactory} to use.
*/
public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
Assert.notNull(endpoint, "Endpoint must be set");
Assert.hasText(endpoint.getId(), "Endpoint id must be set");
// Factory may be null, we defer the resolution right before actually creating the container
KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
synchronized (this.endpointDescriptors) {
if (this.startImmediately) { // Register and start immediately
this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
resolveContainerFactory(descriptor), true);
}
else {
this.endpointDescriptors.add(descriptor);
}
}
}
Here the KafkaListenerEndpoint is wrapped as a KafkaListenerEndpointDescriptor and added to the collection of descriptors named endpointDescriptors.
KafkaListenerEndpointRegistrar
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java
public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
@Override
public void afterPropertiesSet() {
registerAllEndpoints();
}
protected void registerAllEndpoints() {
synchronized (this.endpointDescriptors) {
for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
this.endpointRegistry.registerListenerContainer(
descriptor.endpoint, resolveContainerFactory(descriptor));
}
this.startImmediately = true; // trigger immediate startup
}
}
}
This class implements the InitializingBean interface's afterPropertiesSet method, which runs when the bean is initialized; at that point it walks endpointDescriptors and registers each endpoint one by one.
KafkaListenerEndpointRegistry
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java
public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware,
ApplicationListener<ContextRefreshedEvent> {
protected final Log logger = LogFactory.getLog(getClass()); //NOSONAR
private final Map<String, MessageListenerContainer> listenerContainers =
new ConcurrentHashMap<String, MessageListenerContainer>();
//......
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
boolean startImmediately) {
Assert.notNull(endpoint, "Endpoint must not be null");
Assert.notNull(factory, "Factory must not be null");
String id = endpoint.getId();
Assert.hasText(id, "Endpoint id must not be empty");
synchronized (this.listenerContainers) {
Assert.state(!this.listenerContainers.containsKey(id),
"Another endpoint is already registered with id '" + id + "'");
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
List<MessageListenerContainer> containerGroup;
if (this.applicationContext.containsBean(endpoint.getGroup())) {
containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
}
else {
containerGroup = new ArrayList<MessageListenerContainer>();
this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
}
containerGroup.add(container);
}
if (startImmediately) {
startIfNecessary(container);
}
}
}
protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
KafkaListenerContainerFactory<?> factory) {
MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
if (listenerContainer instanceof InitializingBean) {
try {
((InitializingBean) listenerContainer).afterPropertiesSet();
}
catch (Exception ex) {
throw new BeanInitializationException("Failed to initialize message listener container", ex);
}
}
int containerPhase = listenerContainer.getPhase();
if (containerPhase < Integer.MAX_VALUE) { // a custom phase value
if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
this.phase + " vs " + containerPhase);
}
this.phase = listenerContainer.getPhase();
}
return listenerContainer;
}
@Override
public void start() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
startIfNecessary(listenerContainer);
}
}
@Override
public void stop() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
listenerContainer.stop();
}
}
}
During registration, the endpoint is converted into a MessageListenerContainer and put into the listenerContainers map.
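Because the containers are keyed by endpoint id, application code can later fetch and control them through the registry. A minimal sketch follows; the endpoint id "myListener" is an assumption and would correspond to @KafkaListener(id = "myListener", ...).
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class ListenerLifecycleSwitch {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    public void pause() {
        // Looks up the container stored in the listenerContainers map under the endpoint id.
        MessageListenerContainer container = registry.getListenerContainer("myListener");
        if (container != null && container.isRunning()) {
            container.stop();
        }
    }

    public void resume() {
        MessageListenerContainer container = registry.getListenerContainer("myListener");
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }
}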
AbstractKafkaListenerContainerFactory
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
C instance = createContainerInstance(endpoint);
if (this.autoStartup != null) {
instance.setAutoStartup(this.autoStartup);
}
if (this.phase != null) {
instance.setPhase(this.phase);
}
if (this.applicationEventPublisher != null) {
instance.setApplicationEventPublisher(this.applicationEventPublisher);
}
if (endpoint.getId() != null) {
instance.setBeanName(endpoint.getId());
}
if (endpoint instanceof AbstractKafkaListenerEndpoint) {
AbstractKafkaListenerEndpoint<K, V> aklEndpoint = (AbstractKafkaListenerEndpoint<K, V>) endpoint;
if (this.recordFilterStrategy != null) {
aklEndpoint.setRecordFilterStrategy(this.recordFilterStrategy);
}
if (this.ackDiscarded != null) {
aklEndpoint.setAckDiscarded(this.ackDiscarded);
}
if (this.retryTemplate != null) {
aklEndpoint.setRetryTemplate(this.retryTemplate);
}
if (this.recoveryCallback != null) {
aklEndpoint.setRecoveryCallback(this.recoveryCallback);
}
if (this.batchListener != null) {
aklEndpoint.setBatchListener(this.batchListener);
}
}
endpoint.setupListenerContainer(instance, this.messageConverter);
initializeContainer(instance);
return instance;
}
The endpoint.setupListenerContainer call here ends up in the setupMessageListener method:
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java
private void setupMessageListener(MessageListenerContainer container, MessageConverter messageConverter) {
Object messageListener = createMessageListener(container, messageConverter);
Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener");
if (this.retryTemplate != null) {
if (messageListener instanceof AcknowledgingMessageListener) {
messageListener = new RetryingAcknowledgingMessageListenerAdapter<>(
(AcknowledgingMessageListener<K, V>) messageListener, this.retryTemplate,
this.recoveryCallback);
}
else {
messageListener = new RetryingMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
this.retryTemplate, (RecoveryCallback<Object>) this.recoveryCallback);
}
}
if (this.recordFilterStrategy != null) {
if (messageListener instanceof AcknowledgingMessageListener) {
messageListener = new FilteringAcknowledgingMessageListenerAdapter<>(
(AcknowledgingMessageListener<K, V>) messageListener, this.recordFilterStrategy,
this.ackDiscarded);
}
else if (messageListener instanceof MessageListener) {
messageListener = new FilteringMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
this.recordFilterStrategy);
}
else if (messageListener instanceof BatchAcknowledgingMessageListener) {
messageListener = new FilteringBatchAcknowledgingMessageListenerAdapter<>(
(BatchAcknowledgingMessageListener<K, V>) messageListener, this.recordFilterStrategy,
this.ackDiscarded);
}
else if (messageListener instanceof BatchMessageListener) {
messageListener = new FilteringBatchMessageListenerAdapter<>(
(BatchMessageListener<K, V>) messageListener, this.recordFilterStrategy);
}
}
container.setupMessageListener(messageListener);
}
This messageListener holds the bean carried by the original endpoint and the InvocableHandlerMethod converted from its method; it is then injected into the MessageListenerContainer (ConcurrentMessageListenerContainer), which, as shown at the beginning, creates multiple concurrent KafkaMessageListenerContainer instances according to the configured spring.kafka.listener.concurrency.
MethodKafkaListenerEndpoint
The createMessageListener method wraps the bean and the @KafkaListener-annotated method carried by the endpoint into an InvocableHandlerMethod and injects it into a MessagingMessageListenerAdapter:
public class MethodKafkaListenerEndpoint<K, V> extends AbstractKafkaListenerEndpoint<K, V> {
private Object bean;
private Method method;
private MessageHandlerMethodFactory messageHandlerMethodFactory;
//......
@Override
protected MessagingMessageListenerAdapter<K, V> createMessageListener(MessageListenerContainer container,
MessageConverter messageConverter) {
Assert.state(this.messageHandlerMethodFactory != null,
"Could not create message listener - MessageHandlerMethodFactory not set");
MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance(messageConverter);
messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
return messageListener;
}
/**
* Create a {@link HandlerAdapter} for this listener adapter.
* @param messageListener the listener adapter.
* @return the handler adapter.
*/
protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter<K, V> messageListener) {
InvocableHandlerMethod invocableHandlerMethod =
this.messageHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod());
return new HandlerAdapter(invocableHandlerMethod);
}
}
This class wraps the original bean and method into an InvocableHandlerMethod and injects it into the MessagingMessageListenerAdapter.
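Outside of spring-kafka, a minimal sketch of what wrapping a bean method as an InvocableHandlerMethod buys you, using spring-messaging's DefaultMessageHandlerMethodFactory directly; the class and method names are illustrative, and this only approximates what MessagingMessageListenerAdapter does after converting the record to a Message.
import java.lang.reflect.Method;

import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.messaging.support.MessageBuilder;

public class InvocableHandlerMethodExample {

    static class DemoListener {
        public void handle(String payload) {
            System.out.println("received: " + payload);
        }
    }

    public static void main(String[] args) throws Exception {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.afterPropertiesSet(); // initialize the default argument resolvers

        DemoListener bean = new DemoListener();
        Method method = DemoListener.class.getMethod("handle", String.class);

        // Wrap bean + method so they can be invoked generically with a Message payload.
        InvocableHandlerMethod handlerMethod =
                factory.createInvocableHandlerMethod(bean, method);

        handlerMethod.invoke(MessageBuilder.withPayload("hello").build());
    }
}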
Summary
- For producers, wrapping KafkaProducer into KafkaTemplate is relatively straightforward.
- For consumers, Spring lets the application declare its message-handling method through an annotation, so the wiring is a little more involved:
- KafkaListenerAnnotationBeanPostProcessor first scans the beans and registers the endpoints with KafkaListenerEndpointRegistrar.
- KafkaListenerEndpointRegistrar then creates the MessageListenerContainer in afterPropertiesSet.
- The messageListener contains the bean carried by the original endpoint and the InvocableHandlerMethod converted from its method.
- ConcurrentMessageListenerContainer creates multiple concurrent KafkaMessageListenerContainer instances according to the configured spring.kafka.listener.concurrency.
- Each KafkaMessageListenerContainer creates its own ListenerConsumer, which in turn creates its own independent KafkaConsumer; each ListenerConsumer runs on the task executor, which is how concurrency is achieved.
- Each ListenerConsumer has a recordsToProcess queue into which records returned by the underlying KafkaConsumer's poll are placed.
- A ListenerInvoker thread then loops, waiting with a timeout to take records from recordsToProcess, and calls the messageListener's onMessage method (that is, the @KafkaListener-annotated method).
ListenerConsumer is the key piece here, including how offsets are committed; that will be covered in detail another time.