Spring for kafka automatic configuration and configuration properties

  kafka

Order

This article mainly lists some auto config and attribute configurations of spring for apache kafka.

maven

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.2.3.RELEASE</version>
</dependency>

This version uses Kafka Client version 0.10.2.1
Spring retry used is version 1.1.3.RELEASE

Several Key Configuration Classes

  • KafkaAutoConfiguration
    spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar! /org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import(KafkaAnnotationDrivenConfiguration.class)
public class KafkaAutoConfiguration {

    private final KafkaProperties properties;

    public KafkaAutoConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(
            ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<Object, Object>(
                kafkaProducerFactory);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

    @Bean
    @ConditionalOnMissingBean(ProducerListener.class)
    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener<Object, Object>();
    }

    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<Object, Object>(
                this.properties.buildConsumerProperties());
    }

    @Bean
    @ConditionalOnMissingBean(ProducerFactory.class)
    public ProducerFactory<?, ?> kafkaProducerFactory() {
        return new DefaultKafkaProducerFactory<Object, Object>(
                this.properties.buildProducerProperties());
    }

}
  • KafkaAnnotationDrivenConfiguration
    spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar! /org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java
@Configuration
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {

    private final KafkaProperties properties;

    KafkaAnnotationDrivenConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    @ConditionalOnMissingBean
    public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
        configurer.setKafkaProperties(this.properties);
        return configurer;
    }

    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        configurer.configure(factory, kafkaConsumerFactory);
        return factory;
    }

    @EnableKafka
    @ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    protected static class EnableKafkaConfiguration {

    }
}

Configure attributes

spring-boot-autoconfigure-1.5.7.RELEASE.jar! /META-INF/spring-configuration-metadata.json

Public allocation

    {
      "name": "spring.kafka.bootstrap-servers",
      "type": "java.util.List<java.lang.String>",
      "description": "Comma-delimited list of host:port pairs to use for establishing the initial\n connection to the Kafka cluster.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties"
    },
    {
      "name": "spring.kafka.client-id",
      "type": "java.lang.String",
      "description": "Id to pass to the server when making requests; used for server-side logging.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties"
    },
        {
      "name": "spring.kafka.ssl.key-password",
      "type": "java.lang.String",
      "description": "Password of the private key in the key store file.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
    },
    {
      "name": "spring.kafka.ssl.keystore-location",
      "type": "org.springframework.core.io.Resource",
      "description": "Location of the key store file.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
    },
    {
      "name": "spring.kafka.ssl.keystore-password",
      "type": "java.lang.String",
      "description": "Store password for the key store file.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
    },
    {
      "name": "spring.kafka.ssl.truststore-location",
      "type": "org.springframework.core.io.Resource",
      "description": "Location of the trust store file.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
    },
    {
      "name": "spring.kafka.ssl.truststore-password",
      "type": "java.lang.String",
      "description": "Store password for the trust store file.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
    },
    {
      "name": "spring.kafka.template.default-topic",
      "type": "java.lang.String",
      "description": "Default topic to which messages will be sent.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Template"
    }

Consumer configuration properties

    {
      "name": "spring.kafka.consumer.auto-commit-interval",
      "type": "java.lang.Integer",
      "description": "Frequency in milliseconds that the consumer offsets are auto-committed to Kafka\n if 'enable.auto.commit' true.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.auto-offset-reset",
      "type": "java.lang.String",
      "description": "What to do when there is no initial offset in Kafka or if the current offset\n does not exist any more on the server.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.bootstrap-servers",
      "type": "java.util.List<java.lang.String>",
      "description": "Comma-delimited list of host:port pairs to use for establishing the initial\n connection to the Kafka cluster.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.client-id",
      "type": "java.lang.String",
      "description": "Id to pass to the server when making requests; used for server-side logging.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.enable-auto-commit",
      "type": "java.lang.Boolean",
      "description": "If true the consumer's offset will be periodically committed in the background.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.fetch-max-wait",
      "type": "java.lang.Integer",
      "description": "Maximum amount of time in milliseconds the server will block before answering\n the fetch request if there isn't sufficient data to immediately satisfy the\n requirement given by \"fetch.min.bytes\".",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.fetch-min-size",
      "type": "java.lang.Integer",
      "description": "Minimum amount of data the server should return for a fetch request in bytes.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.group-id",
      "type": "java.lang.String",
      "description": "Unique string that identifies the consumer group this consumer belongs to.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.heartbeat-interval",
      "type": "java.lang.Integer",
      "description": "Expected time in milliseconds between heartbeats to the consumer coordinator.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.key-deserializer",
      "type": "java.lang.Class<?>",
      "description": "Deserializer class for keys.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.max-poll-records",
      "type": "java.lang.Integer",
      "description": "Maximum number of records returned in a single call to poll().",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.value-deserializer",
      "type": "java.lang.Class<?>",
      "description": "Deserializer class for values.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.listener.ack-count",
      "type": "java.lang.Integer",
      "description": "Number of records between offset commits when ackMode is \"COUNT\" or\n \"COUNT_TIME\".",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
    },
    {
      "name": "spring.kafka.listener.ack-mode",
      "type": "org.springframework.kafka.listener.AbstractMessageListenerContainer$AckMode",
      "description": "Listener AckMode; see the spring-kafka documentation.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
    },
    {
      "name": "spring.kafka.listener.ack-time",
      "type": "java.lang.Long",
      "description": "Time in milliseconds between offset commits when ackMode is \"TIME\" or\n \"COUNT_TIME\".",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
    },
    {
      "name": "spring.kafka.listener.concurrency",
      "type": "java.lang.Integer",
      "description": "Number of threads to run in the listener containers.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
    },
    {
      "name": "spring.kafka.listener.poll-timeout",
      "type": "java.lang.Long",
      "description": "Timeout in milliseconds to use when polling the consumer.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
    }

Producer configuration

{
      "name": "spring.kafka.producer.acks",
      "type": "java.lang.String",
      "description": "Number of acknowledgments the producer requires the leader to have received\n before considering a request complete.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.batch-size",
      "type": "java.lang.Integer",
      "description": "Number of records to batch before sending.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.bootstrap-servers",
      "type": "java.util.List<java.lang.String>",
      "description": "Comma-delimited list of host:port pairs to use for establishing the initial\n connection to the Kafka cluster.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.buffer-memory",
      "type": "java.lang.Long",
      "description": "Total bytes of memory the producer can use to buffer records waiting to be sent\n to the server.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.client-id",
      "type": "java.lang.String",
      "description": "Id to pass to the server when making requests; used for server-side logging.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.compression-type",
      "type": "java.lang.String",
      "description": "Compression type for all data generated by the producer.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.key-serializer",
      "type": "java.lang.Class<?>",
      "description": "Serializer class for keys.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.retries",
      "type": "java.lang.Integer",
      "description": "When greater than zero, enables retrying of failed sends.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.value-serializer",
      "type": "java.lang.Class<?>",
      "description": "Serializer class for values.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.properties",
      "type": "java.util.Map<java.lang.String,java.lang.String>",
      "description": "Additional properties used to configure the client.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties"
    }