spring-cloud-stream-binder-kafka property configuration


Preface

This article briefly introduces some of the configuration properties of spring-cloud-stream-binder-kafka.

maven

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    <version>1.0.3.RELEASE</version>
</dependency>

Stream properties

spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/config/ChannelBindingServiceProperties.java

spring:
  cloud:
    stream:
      instanceIndex: 0 ## can also be set through the INSTANCE_INDEX environment variable
                       ## The instance index of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka.
      instanceCount: 1 ## The number of deployed instances of an application. Must be set for partitioning and if using Kafka;
                       ## used to partition data across different consumers.

A topic can be thought of logically as a queue. Every message must specify its topic, which simply states which queue the message is put into. To let Kafka's throughput scale horizontally, a topic is physically divided into one or more partitions. Each partition corresponds to a folder on disk that stores all of that partition's messages and index files. A partition folder is named after the topic followed by the partition's ordered sequence number (for example, event-demo-0). Numbering starts at 0, so the highest sequence number is the number of partitions minus 1.

Within one consumer group, the messages of a given partition can be consumed by only one consumer.

When there are more consumers than partitions, the surplus consumers are idle.

When there are fewer consumers than partitions, some consumers are assigned several partitions, and if the partitions do not divide evenly, some consumers will consume more than others. When the counts are equal, each consumer gets exactly one partition.
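The three rules above can be made concrete with a small, JDK-only Java sketch of range-style assignment for a single topic, modeled on Kafka's range assignment strategy: partitions are laid out in order and split into contiguous ranges, and when the division is uneven the first consumers get one extra partition. The class and method names are hypothetical; this is an illustration, not Kafka's own code.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeAssignmentSketch {

    /**
     * Splits partitions 0..numPartitions-1 into contiguous ranges, one list per consumer.
     * An empty list means that consumer is idle.
     */
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        if (numConsumers < 1) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        List<List<Integer>> result = new ArrayList<>();
        int perConsumer = numPartitions / numConsumers; // base share for every consumer
        int extra = numPartitions % numConsumers;       // the first `extra` consumers get one more
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            int count = perConsumer + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 5)); // [[0], [1], [2], [], []]  more consumers than partitions
        System.out.println(assign(5, 2)); // [[0, 1, 2], [3, 4]]      fewer consumers, uneven load
        System.out.println(assign(4, 4)); // [[0], [1], [2], [3]]     one partition each
    }
}
```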

instanceCount is used mainly on the consumer side, for partitioned consumption, and is generally set to a value less than or equal to the number of partitions in the topic.
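The relationship between instanceIndex, instanceCount and the partition count can be illustrated with a simplified static model, assumed here for illustration: the instance with a given instanceIndex owns every partition whose number modulo instanceCount equals that index. This is a hypothetical sketch, not the binder's code; it only shows why every deployed instance needs its own distinct instanceIndex and why instances beyond the partition count receive nothing.

```java
import java.util.ArrayList;
import java.util.List;

public class StaticPartitionSketch {

    /**
     * Hypothetical model: instance `instanceIndex` (0..instanceCount-1) owns each
     * partition p for which p % instanceCount == instanceIndex.
     */
    static List<Integer> ownedPartitions(int partitionCount, int instanceIndex, int instanceCount) {
        if (instanceCount < 1 || instanceIndex < 0 || instanceIndex >= instanceCount) {
            throw new IllegalArgumentException(
                    "instanceIndex must be in [0, instanceCount-1], got " + instanceIndex);
        }
        List<Integer> owned = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            if (p % instanceCount == instanceIndex) {
                owned.add(p);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // 4 partitions, 3 instances: instance 0 -> [0, 3], 1 -> [1], 2 -> [2]
        for (int i = 0; i < 3; i++) {
            System.out.println("instance " + i + " -> " + ownedPartitions(4, i, 3));
        }
        // 2 partitions, 3 instances: instance 2 owns nothing and sits idle
        System.out.println("instance 2 -> " + ownedPartitions(2, 2, 3));
    }
}
```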

Bindings properties

spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/config/BindingProperties.java

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: event-demo
          content-type: text/plain
          #group: test  ## consumer property
          #producer:
          #consumer:

producer

spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/ProducerProperties.java

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: event-demo
          content-type: text/plain
          producer:
            partitionCount: 1
            partitionKeyExtractorClass: org.springframework.cloud.stream.partitioning.CustomPartitionKeyExtractorClass
            partitionSelectorClass: org.springframework.cloud.stream.partitioning.CustomPartitionSelectorClass
            headerMode: raw
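partitionKeyExtractorClass and partitionSelectorClass plug into a two-step flow: a key is extracted from the outgoing message, the key is turned into an int, and the result is folded into the range 0..partitionCount-1. The JDK-only sketch below models that flow. The interfaces are hypothetical stand-ins that mirror the role of Spring Cloud Stream's PartitionKeyExtractorStrategy and PartitionSelectorStrategy, the message is modeled as a plain Map, and the hashCode-based default plus the final Math.abs(x % partitionCount) step are assumptions about the default behavior rather than the framework's code.

```java
import java.util.Map;

public class PartitionSelectionSketch {

    /** Hypothetical stand-in for a partition key extractor: derives a key from a message. */
    interface KeyExtractor {
        Object extractKey(Map<String, Object> message);
    }

    /** Hypothetical stand-in for a partition selector: maps a key to an int. */
    interface Selector {
        int selectPartition(Object key, int partitionCount);
    }

    /** Assumed default behavior: use the key's hash code. */
    static final Selector HASH_SELECTOR = (key, partitionCount) -> key.hashCode();

    static int determinePartition(Map<String, Object> message, KeyExtractor extractor,
                                  Selector selector, int partitionCount) {
        Object key = extractor.extractKey(message);
        int selected = selector.selectPartition(key, partitionCount);
        // Fold any int, including a negative one, into 0..partitionCount-1.
        return Math.abs(selected % partitionCount);
    }

    public static void main(String[] args) {
        KeyExtractor byUserId = message -> message.get("userId"); // hypothetical field name
        Map<String, Object> login = Map.of("userId", "alice", "payload", "login");
        Map<String, Object> logout = Map.of("userId", "alice", "payload", "logout");
        int p1 = determinePartition(login, byUserId, HASH_SELECTOR, 3);
        int p2 = determinePartition(logout, byUserId, HASH_SELECTOR, 3);
        System.out.println(p1 == p2); // true: the same key always lands on the same partition
    }
}
```

Keying by a business identifier is what keeps related messages in order: they all land in one partition, which only one consumer in a group reads.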
  • Kafka producer extended properties
    spring-cloud-stream-binder-kafka-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/kafka/KafkaProducerProperties.java
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: event-demo
          content-type: text/plain
      kafka:
        bindings:
          output:
            producer:
              bufferSize: 16384
              maxRequestSize: 1048576
              sync: true
              batchTimeout: 0
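bufferSize and batchTimeout trade latency for throughput: the producer accumulates data until about bufferSize bytes are buffered or batchTimeout milliseconds have passed, and with batchTimeout: 0 it does not wait at all. The class below is a deliberately simplified, hypothetical model of that trade-off (the real Kafka producer batches per partition and is considerably more involved). Time is passed in explicitly so the behavior is deterministic.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BatchingSketch {
    private final int bufferSize;      // flush once this many bytes are buffered...
    private final long batchTimeoutMs; // ...or once the oldest buffered record is this old
    private final List<String> batch = new ArrayList<>();
    private final List<List<String>> sent = new ArrayList<>();
    private int bufferedBytes = 0;
    private long batchStartMs = 0;

    BatchingSketch(int bufferSize, long batchTimeoutMs) {
        this.bufferSize = bufferSize;
        this.batchTimeoutMs = batchTimeoutMs;
    }

    /** Buffers a record that arrives at time nowMs, then flushes if either limit is reached. */
    void send(String record, long nowMs) {
        if (batch.isEmpty()) {
            batchStartMs = nowMs;
        }
        batch.add(record);
        bufferedBytes += record.getBytes(StandardCharsets.UTF_8).length;
        maybeFlush(nowMs);
    }

    /** Called periodically; flushes a pending batch whose timeout has expired. */
    void tick(long nowMs) {
        maybeFlush(nowMs);
    }

    private void maybeFlush(long nowMs) {
        if (batch.isEmpty()) {
            return;
        }
        boolean full = bufferedBytes >= bufferSize;
        boolean expired = nowMs - batchStartMs >= batchTimeoutMs;
        if (full || expired) {
            sent.add(new ArrayList<>(batch)); // "send" the whole batch in one request
            batch.clear();
            bufferedBytes = 0;
        }
    }

    List<List<String>> sentBatches() {
        return sent;
    }

    public static void main(String[] args) {
        BatchingSketch lingering = new BatchingSketch(10, 100);
        lingering.send("abcd", 0);
        lingering.send("efgh", 10);
        lingering.send("ijkl", 20); // 12 bytes >= 10: flushed because the buffer is full
        lingering.send("x", 30);
        lingering.tick(130);        // 100 ms after "x" arrived: flushed because the timeout expired
        System.out.println(lingering.sentBatches()); // [[abcd, efgh, ijkl], [x]]

        BatchingSketch immediate = new BatchingSketch(16384, 0); // like batchTimeout: 0
        immediate.send("a", 0);
        immediate.send("b", 1);
        System.out.println(immediate.sentBatches()); // [[a], [b]]
    }
}
```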

consumer

spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/ConsumerProperties.java

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: event-demo
          content-type: text/plain
          consumer:
            concurrency: 1 ## The concurrency of the inbound consumer.
            partitioned: false ## Whether the consumer receives data from a partitioned producer. Default: false.
            headerMode: raw
  • Kafka consumer extended properties
    spring-cloud-stream-binder-kafka-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/kafka/KafkaConsumerProperties.java
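spring:
  cloud:
    stream:
      bindings:
        input:
          destination: event-demo
          content-type: text/plain
      kafka:
        bindings:
          input:
            consumer:
              autoCommitOffset: false ## whether to commit offsets automatically once a message is processed
              resetOffsets: true      ## whether to reset the consumer's offsets to startOffset
              startOffset: earliest   ## start offset for new groups, or when resetOffsets is true: earliest or latest
              enableDlq: false        ## whether to send messages that fail processing to a dead-letter topic
              recoveryInterval: 5000  ## interval between connection recovery attempts, in milliseconds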
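resetOffsets and startOffset together decide where a consumer begins reading. The hypothetical sketch below encodes the rule as the property descriptions state it: a group that has never committed an offset, or any group when resetOffsets is true, starts from startOffset (earliest or latest); otherwise it resumes from its committed offset. The enum, method and parameter names are illustrative only.

```java
import java.util.OptionalLong;

public class StartOffsetSketch {

    /** Illustrative counterpart of the startOffset values earliest and latest. */
    enum StartOffset { EARLIEST, LATEST }

    /**
     * Hypothetical rule: if resetOffsets is true, or the group has no committed offset,
     * start from startOffset; otherwise resume from the committed offset.
     */
    static long resolve(OptionalLong committed, boolean resetOffsets, StartOffset startOffset,
                        long earliestAvailable, long latestAvailable) {
        if (resetOffsets || !committed.isPresent()) {
            return startOffset == StartOffset.EARLIEST ? earliestAvailable : latestAvailable;
        }
        return committed.getAsLong();
    }

    public static void main(String[] args) {
        // A partition holding offsets 0..99, so the next offset to be written is 100.
        System.out.println(resolve(OptionalLong.of(42), false, StartOffset.EARLIEST, 0, 100)); // 42
        System.out.println(resolve(OptionalLong.empty(), false, StartOffset.EARLIEST, 0, 100)); // 0
        System.out.println(resolve(OptionalLong.of(42), true, StartOffset.LATEST, 0, 100));    // 100
    }
}
```

With resetOffsets: true as configured above, every restart replays from startOffset, which is convenient in development but usually not what a production group wants.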

Native API

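// Old ZooKeeper-based high-level consumer; props must set at least zookeeper.connect and group.id
kafka.consumer.ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);
kafka.javaapi.consumer.ConsumerConnector consumerConnector =
        kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
// key: topic name, value: number of streams (one per consumer thread) for that topic
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, consumerCount);
Map<String, List<kafka.consumer.KafkaStream<byte[], byte[]>>> consumerMap =
        consumerConnector.createMessageStreams(topicCountMap);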

Here, topicCountMap tells Kafka how many threads the consumer will use to consume each topic: the key is the topic name, and the value is the number of threads for that topic.
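Because each KafkaStream returned by createMessageStreams is meant to be drained by its own thread, consumerCount effectively sizes a thread pool. The JDK-only sketch below models that thread-per-stream pattern without the Kafka client: hypothetical in-memory BlockingQueues stand in for the streams, and a fixed pool with one thread per stream drains them. Unlike a real stream iterator, which blocks waiting for new data, each worker here stops when its queue is empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class StreamPerThreadSketch {

    /** Drains each "stream" on its own pool thread; returns the total number of messages consumed. */
    static int consumeAll(List<BlockingQueue<String>> streams) {
        if (streams.isEmpty()) {
            return 0;
        }
        ExecutorService pool = Executors.newFixedThreadPool(streams.size()); // one thread per stream
        AtomicInteger consumed = new AtomicInteger();
        for (BlockingQueue<String> stream : streams) {
            pool.submit(() -> {
                // Stand-in for iterating a KafkaStream: process until the queue is empty.
                while (stream.poll() != null) {
                    consumed.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        int consumerCount = 3; // plays the role of the topicCountMap value
        List<BlockingQueue<String>> streams = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();
            queue.add("message-" + i + "-a");
            queue.add("message-" + i + "-b");
            streams.add(queue);
        }
        System.out.println(consumeAll(streams)); // 6
    }
}
```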

Summary

Overall, Spring Cloud Stream abstracts away part of the work, but one serious weakness is the unfriendly spring.cloud.stream.instanceIndex property, which makes each service instance stateful. This makes Docker-based deployment more troublesome, and in this respect it is less convenient than using the native API directly. If there are not many partitions, or each consumer is powerful enough, deploying at least two instances and configuring each one by hand is still acceptable.

doc