Talking about Spring's Integration with Kafka


Preface

This article briefly sorts out the options for producing and consuming Kafka messages in Java applications.

Available class libraries

  • kafka client
  • spring for apache kafka
  • spring integration kafka
  • spring cloud stream binder kafka

In addition to the official Java client library, the Spring ecosystem provides several additional packages, which are briefly introduced here.

spring for apache kafka

This library integrates the Java-based Kafka client with Spring:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.2.2.RELEASE</version>
</dependency>

Integration with Spring Boot

Before Spring Boot 1.5 you had to write the Java configuration yourself; from version 1.5 on, auto-configuration is provided. For details, refer to the package org.springframework.boot.autoconfigure.kafka, which mainly includes

  • KafkaAutoConfiguration
    spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import(KafkaAnnotationDrivenConfiguration.class)
public class KafkaAutoConfiguration {

    private final KafkaProperties properties;

    public KafkaAutoConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(
            ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<Object, Object>(
                kafkaProducerFactory);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

    @Bean
    @ConditionalOnMissingBean(ProducerListener.class)
    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener<Object, Object>();
    }

    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<Object, Object>(
                this.properties.buildConsumerProperties());
    }

    @Bean
    @ConditionalOnMissingBean(ProducerFactory.class)
    public ProducerFactory<?, ?> kafkaProducerFactory() {
        return new DefaultKafkaProducerFactory<Object, Object>(
                this.properties.buildProducerProperties());
    }

}
  • KafkaAnnotationDrivenConfiguration
    spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java
@Configuration
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {

    private final KafkaProperties properties;

    KafkaAnnotationDrivenConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    @ConditionalOnMissingBean
    public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
        configurer.setKafkaProperties(this.properties);
        return configurer;
    }

    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        configurer.configure(factory, kafkaConsumerFactory);
        return factory;
    }

    @EnableKafka
    @ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    protected static class EnableKafkaConfiguration {

    }
}
  • ConcurrentKafkaListenerContainerFactoryConfigurer
    spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java
public class ConcurrentKafkaListenerContainerFactoryConfigurer {

    private KafkaProperties properties;

    /**
     * Set the {@link KafkaProperties} to use.
     * @param properties the properties
     */
    void setKafkaProperties(KafkaProperties properties) {
        this.properties = properties;
    }

    /**
     * Configure the specified Kafka listener container factory. The factory can be
     * further tuned and default settings can be overridden.
     * @param listenerContainerFactory the {@link ConcurrentKafkaListenerContainerFactory}
     * instance to configure
     * @param consumerFactory the {@link ConsumerFactory} to use
     */
    public void configure(
            ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
            ConsumerFactory<Object, Object> consumerFactory) {
        listenerContainerFactory.setConsumerFactory(consumerFactory);
        Listener container = this.properties.getListener();
        ContainerProperties containerProperties = listenerContainerFactory
                .getContainerProperties();
        if (container.getAckMode() != null) {
            containerProperties.setAckMode(container.getAckMode());
        }
        if (container.getAckCount() != null) {
            containerProperties.setAckCount(container.getAckCount());
        }
        if (container.getAckTime() != null) {
            containerProperties.setAckTime(container.getAckTime());
        }
        if (container.getPollTimeout() != null) {
            containerProperties.setPollTimeout(container.getPollTimeout());
        }
        if (container.getConcurrency() != null) {
            listenerContainerFactory.setConcurrency(container.getConcurrency());
        }
    }

}

Setting the listener concurrency creates multiple KafkaMessageListenerContainer instances, which is equivalent to a single application instance running multiple consumers.
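For illustration only (the class name is made up), a minimal sketch of a manually declared listener container factory with a concurrency of 3, which has the same effect as setting spring.kafka.listener.concurrency=3:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class ListenerConcurrencyConfig {

    // Overrides the auto-configured kafkaListenerContainerFactory; three listener
    // containers mean three consumers in this application instance.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3); // same effect as spring.kafka.listener.concurrency=3
        return factory;
    }
}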

With Spring Boot 1.5 or later, usage is therefore quite simple: inject a KafkaTemplate to send messages directly, and only a little configuration is needed to consume them.
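As an illustration only (the class and the topic name demo-topic are made up; spring.kafka.bootstrap-servers and spring.kafka.consumer.group-id are assumed to be set in application.properties), sending and consuming a message can look roughly like this:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class DemoMessaging {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public DemoMessaging(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Send a message to the (hypothetical) topic using the auto-configured KafkaTemplate.
    public void send(String payload) {
        kafkaTemplate.send("demo-topic", payload);
    }

    // Consume messages from the same topic; the listener container is created by the
    // auto-configured kafkaListenerContainerFactory shown above.
    @KafkaListener(topics = "demo-topic")
    public void onMessage(String payload) {
        System.out.println("received: " + payload);
    }
}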

spring integration kafka

Spring Integration is Spring's implementation of the Enterprise Integration Patterns, and Spring Integration Kafka provides inbound and outbound channel adapters built on top of Spring for Apache Kafka.
Starting from version 2.0, this project is a complete rewrite based on the new spring-kafka project, which uses the pure Java Producer and Consumer clients provided by Kafka 0.9.x.x and 0.10.x.x.

Here there is no auto-configuration, and Spring Integration concepts such as channels and adapters come into play, so the overall setup is more involved, as the consumer and producer configuration below shows.

Consumer configuration

    @Bean
    public KafkaMessageListenerContainer<String, String> container(
            ConsumerFactory<String, String> kafkaConsumerFactory) {
        return new KafkaMessageListenerContainer<>(kafkaConsumerFactory,
                new ContainerProperties(new TopicPartitionInitialOffset(topic, 0)));
    }

    /**
     * KAFKA consumer.
     */
    @Bean
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Channel adapter for message.
     */
    @Bean
    public KafkaMessageDrivenChannelAdapter<String, String> adapter(KafkaMessageListenerContainer<String, String> container) {
        KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
                new KafkaMessageDrivenChannelAdapter<>(container);
        kafkaMessageDrivenChannelAdapter.setOutputChannel(fromKafka());
        return kafkaMessageDrivenChannelAdapter;
    }

    /**
     * Channel for KAFKA message received.
     */
    @Bean
    public PollableChannel fromKafka() {
        return new QueueChannel();
    }

Producer configuration

    @Bean
    @ServiceActivator(inputChannel = "toKafka")
    public MessageHandler handler() throws Exception {
        KafkaProducerMessageHandler<String, String> handler =
                new KafkaProducerMessageHandler<>(kafkaTemplate());
        handler.setTopicExpression(new LiteralExpression(topic));
        handler.setMessageKeyExpression(new LiteralExpression(messageKey));
        return handler;
    }

    @Bean
    public ProducerFactory<String, String> kafkaProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(kafkaProducerFactory());
    }
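Note that the @ServiceActivator above references an input channel named "toKafka"; if that channel is not created automatically in your setup, it can be declared explicitly. A minimal sketch (an assumption, not part of the original configuration):

    // org.springframework.messaging.MessageChannel / org.springframework.integration.channel.DirectChannel
    @Bean
    public MessageChannel toKafka() {
        return new DirectChannel();
    }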

Sending and receiving messages

    @Autowired
    @Qualifier("fromKafka")
    private PollableChannel fromKafka;

    @Autowired
    @Qualifier("toKafka")
    MessageChannel toKafka;

    Message<?> msg = fromKafka.receive(10000L);
    toKafka.send(new GenericMessage<Object>(UUID.randomUUID().toString()));

spring cloud stream

Spring Cloud Stream builds on Spring Integration, adding a layer of processing and packaging for the Spring Cloud environment.
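As a rough illustration only (class name made up; this assumes the annotation-based programming model of Spring Cloud Stream 1.x/2.x with spring-cloud-stream-binder-kafka on the classpath), a consumer bound to a Kafka topic can look like this:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

// The "input" channel is bound to a Kafka topic via configuration,
// e.g. spring.cloud.stream.bindings.input.destination=demo-topic
@EnableBinding(Sink.class)
public class StreamConsumer {

    @StreamListener(Sink.INPUT)
    public void handle(String payload) {
        System.out.println("received: " + payload);
    }
}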

For details, see the Spring Cloud Stream Kafka example, as well as the spring-cloud-stream-binder-kafka property configuration.
