This article briefly introduces some of the configuration properties of Spring-Cloud-Stream-Binder-Kafka.
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    <version>1.0.3.RELEASE</version>
</dependency>
spring:
  cloud:
    stream:
      ## The instance index of the application: a number from 0 to instanceCount-1.
      ## Used for partitioning and with Kafka. Can also be set via the INSTANCE_INDEX environment variable.
      instanceIndex: 0
      ## The number of deployed instances of an application. Must be set for partitioning and if using Kafka.
      ## Used to partition data across different consumers.
      instanceCount: 1
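The comments above say the index may come from the INSTANCE_INDEX environment variable and must lie between 0 and instanceCount-1. A minimal Java sketch of that check, assuming a default of 0 when the variable is unset; `InstanceIndexCheck` and `resolveInstanceIndex` are hypothetical names, not binder code:

```java
import java.util.Map;

// HYPOTHETICAL helper: resolves instanceIndex from an environment map
// (INSTANCE_INDEX, defaulting to 0) and validates it against instanceCount.
final class InstanceIndexCheck {

    static int resolveInstanceIndex(Map<String, String> env, int instanceCount) {
        if (instanceCount < 1) {
            throw new IllegalArgumentException("instanceCount must be >= 1");
        }
        String raw = env.getOrDefault("INSTANCE_INDEX", "0");
        int index = Integer.parseInt(raw.trim());
        // A valid index is a number from 0 to instanceCount - 1.
        if (index < 0 || index >= instanceCount) {
            throw new IllegalArgumentException(
                "instanceIndex " + index + " is outside 0.." + (instanceCount - 1));
        }
        return index;
    }

    public static void main(String[] args) {
        // Reads the real process environment; prints 0 if INSTANCE_INDEX is unset.
        System.out.println("instanceIndex = " + resolveInstanceIndex(System.getenv(), 3));
    }
}
```

Passing the environment in as a map keeps the check testable without touching real environment variables.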
A topic can logically be thought of as a queue. Each message must specify its topic, which simply indicates which queue the message is put into. To let Kafka's throughput scale horizontally, a topic is physically divided into one or more partitions. Each partition corresponds to a directory on disk that holds all of that partition's messages and index files. A partition directory is named with the topic name plus an ordered sequence number: the first partition is numbered 0, and the largest number is the partition count minus 1.
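A small Java illustration of the naming rule, assuming Kafka's usual `<topic>-<n>` directory form; `PartitionDirs` is a hypothetical class used only to list the names, not something Kafka exposes:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: builds the per-partition directory names for a topic,
// numbered from 0 to partitionCount - 1.
final class PartitionDirs {

    static List<String> partitionDirectories(String topic, int partitionCount) {
        if (partitionCount < 1) {
            throw new IllegalArgumentException("a topic has at least one partition");
        }
        List<String> dirs = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            dirs.add(topic + "-" + p); // topic name + sequence number
        }
        return dirs;
    }

    public static void main(String[] args) {
        System.out.println(partitionDirectories("event-demo", 3));
    }
}
```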
Within one consumer group, each partition is consumed by only one consumer.
When there are more consumers than partitions, the surplus consumers are idle.
When there are fewer consumers than partitions, some consumers are assigned several partitions; if the partition count is not a multiple of the consumer count, some consumers will handle more partitions than others.
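The three rules above can be seen in a simplified, hypothetical model of splitting one topic's partitions among the consumers of a single group. It hands out contiguous blocks, with the first `partitions % consumers` consumers getting one extra, in the spirit of a range-style assignment; it is a sketch, not Kafka's assignor code:

```java
import java.util.ArrayList;
import java.util.List;

// HYPOTHETICAL model of assigning one topic's partitions within a consumer group.
final class GroupAssignment {

    static List<List<Integer>> assign(int partitions, int consumers) {
        if (consumers < 1) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        List<List<Integer>> result = new ArrayList<>();
        int perConsumer = partitions / consumers;
        int extra = partitions % consumers;
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int count = perConsumer + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++); // each partition goes to exactly one consumer
            }
            result.add(owned); // an empty list means this consumer is idle
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 5)); // more consumers than partitions
        System.out.println(assign(5, 2)); // fewer consumers than partitions
    }
}
```

With 3 partitions and 5 consumers, two consumers receive nothing; with 5 partitions and 2 consumers, the first consumer handles three partitions and the second handles two.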
instanceCount is mainly relevant to consumers. It should generally be less than or equal to the number of partitions in the topic, because it is used to divide the topic's partitions among the consumer instances.
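A Java sketch of how instanceIndex and instanceCount could split partitions across deployed instances. It assumes the rule that partition `p` belongs to the instance where `p % instanceCount == instanceIndex`, which is our reading of the 1.x Kafka binder when `partitioned` is true; check the binder source for your version. `InstancePartitions` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;

// ASSUMED rule: partition p is consumed by the instance with
// p % instanceCount == instanceIndex.
final class InstancePartitions {

    static List<Integer> partitionsFor(int instanceIndex, int instanceCount, int partitionCount) {
        if (instanceIndex < 0 || instanceIndex >= instanceCount) {
            throw new IllegalArgumentException("instanceIndex must be in 0..instanceCount-1");
        }
        List<Integer> owned = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            if (p % instanceCount == instanceIndex) {
                owned.add(p);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        int instanceCount = 2;
        int partitionCount = 5;
        for (int i = 0; i < instanceCount; i++) {
            System.out.println("instance " + i + " -> " + partitionsFor(i, instanceCount, partitionCount));
        }
    }
}
```

Under this rule, an instanceCount larger than the partition count leaves the higher-indexed instances with no partitions, which is why instanceCount is usually kept at or below the partition count.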
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: event-demo
          content-type: text/plain
          #group: test   ## consumer property
          #producer:
          #consumer:
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: event-demo
          content-type: text/plain
          producer:
            partitionCount: 1
            partitionKeyExtractorClass: org.springframework.cloud.stream.partitioning.CustomPartitionKeyExtractorClass
            partitionSelectorClass: org.springframework.cloud.stream.partitioning.CustomPartitionSelectorClass
            headerMode: raw
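The two classes named by partitionKeyExtractorClass and partitionSelectorClass first extract a key from each message, then map that key to a partition number. Spring Cloud Stream's own interfaces for this, `PartitionKeyExtractorStrategy` and `PartitionSelectorStrategy`, take a `Message<?>`; the self-contained sketch below uses local stand-in interfaces and a plain header map instead, and the "userId" header is a hypothetical example key. The selector uses a non-negative hash modulo, similar in spirit to the default `key.hashCode() % partitionCount`:

```java
import java.util.Map;

// HYPOTHETICAL stand-ins for the key-extractor and partition-selector strategies.
interface KeyExtractorSketch {
    Object extractKey(Map<String, Object> headers);
}

interface PartitionSelectorSketch {
    int selectPartition(Object key, int partitionCount);
}

final class PartitioningDemo {

    // Example extractor: partition by a hypothetical "userId" header.
    static final KeyExtractorSketch BY_USER = headers -> headers.get("userId");

    // floorMod keeps the result in 0..partitionCount-1 even for negative hash codes.
    static final PartitionSelectorSketch BY_HASH =
        (key, partitionCount) -> Math.floorMod(key.hashCode(), partitionCount);

    static int route(Map<String, Object> headers, int partitionCount) {
        Object key = BY_USER.extractKey(headers);
        return BY_HASH.selectPartition(key, partitionCount);
    }

    public static void main(String[] args) {
        System.out.println(route(Map.of("userId", "alice"), 4));
        System.out.println(route(Map.of("userId", "bob"), 4));
    }
}
```

Messages with the same key always land in the same partition. With `partitionCount: 1`, as in the configuration above, every key maps to partition 0.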
- Kafka producer extended properties
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: event-demo
          content-type: text/plain
      kafka:
        bindings:
          output:
            producer:
              bufferSize: 16384
              maxRequestSize: 1048576
              sync: true
              batchTimeout: 0
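For comparison with the native API, the Kafka-specific producer settings roughly line up with standard producer configs. The mapping below is an assumption based on the property descriptions (bufferSize to batch.size, maxRequestSize to max.request.size, batchTimeout to linger.ms), not taken from the binder source; `NativeProducerProps` is a hypothetical name:

```java
import java.util.Properties;

// ASSUMED mapping of the binder's Kafka producer settings to native producer configs.
final class NativeProducerProps {

    static Properties fromBinderSettings(int bufferSize, int maxRequestSize, long batchTimeoutMs) {
        Properties props = new Properties();
        props.put("batch.size", String.valueOf(bufferSize));           // bufferSize
        props.put("max.request.size", String.valueOf(maxRequestSize)); // maxRequestSize
        props.put("linger.ms", String.valueOf(batchTimeoutMs));        // batchTimeout
        // sync: true has no single config key; with the native client you would
        // block on the Future returned by send(...) instead.
        return props;
    }

    public static void main(String[] args) {
        Properties props = fromBinderSettings(16384, 1048576, 0);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```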
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: event-demo
          content-type: text/plain
          consumer:
            concurrency: 1      ## The concurrency of the inbound consumer.
            partitioned: false  ## Whether the consumer receives data from a partitioned producer. Default: false.
            headerMode: raw
- Kafka consumer extended properties
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: event-demo
          content-type: text/plain
      kafka:
        bindings:
          input:
            consumer:
              autoCommitOffset: false
              resetOffsets: true
              startOffset: earliest
              enableDlq: false
              recoveryInterval: 5000
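resetOffsets and startOffset together decide where a consumer begins reading a partition. A hypothetical Java model of that decision follows: a committed offset is used unless resetOffsets is true, otherwise startOffset picks the beginning or the end of the log. With autoCommitOffset set to false, the committed value only advances when the application acknowledges messages itself. `StartOffset` and `resolve` are illustrative names, not binder code:

```java
import java.util.OptionalLong;

// HYPOTHETICAL model of choosing a partition's starting offset.
final class StartOffset {

    enum Start { EARLIEST, LATEST }

    static long resolve(OptionalLong committed, boolean resetOffsets, Start startOffset,
                        long logStart, long logEnd) {
        if (committed.isPresent() && !resetOffsets) {
            return committed.getAsLong(); // normal restart: resume where the group left off
        }
        // New group, or resetOffsets: true -> use startOffset.
        return startOffset == Start.EARLIEST ? logStart : logEnd;
    }

    public static void main(String[] args) {
        // resetOffsets: true, startOffset: earliest -> the partition is re-read from the start.
        System.out.println(resolve(OptionalLong.of(120), true, Start.EARLIEST, 0, 200));
    }
}
```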
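import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

// props (with zookeeper.connect and group.id), topic and consumerCount are defined elsewhere
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
// key: topic name, value: number of streams (consumer threads) for that topic
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, consumerCount);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumerConnector.createMessageStreams(topicCountMap);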
Here topicCountMap tells Kafka how many threads the consumer will use to consume each topic: the key is the topic name, and the value is the number of threads (streams) for that topic.
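Each stream returned by createMessageStreams is normally handed to its own thread. A sketch of that pattern, assuming each stream can be treated as an `Iterable` (so it runs without a Kafka dependency); a real stream would block waiting for new messages rather than end. `StreamWorkers` and `consumeAll` are hypothetical names:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// HYPOTHETICAL sketch: one worker thread per stream, all sharing one handler.
final class StreamWorkers {

    static <T> void consumeAll(List<? extends Iterable<T>> streams, Consumer<T> handler)
            throws InterruptedException {
        if (streams.isEmpty()) {
            return;
        }
        ExecutorService pool = Executors.newFixedThreadPool(streams.size());
        for (Iterable<T> stream : streams) {
            pool.submit(() -> {
                for (T message : stream) {
                    handler.accept(message); // handler must be thread-safe
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        List<List<String>> streams = List.of(List.of("a", "b"), List.of("c"));
        consumeAll(streams, msg -> System.out.println(Thread.currentThread().getName() + " " + msg));
    }
}
```

Sizing the pool to the number of streams mirrors the value placed in topicCountMap: one thread per requested stream.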
On the whole, Spring Cloud Stream abstracts away part of the Kafka API, but one of its weak points is the awkward spring.cloud.stream.instanceIndex property, which makes each service instance stateful. This makes Docker-based deployment more troublesome, and in this respect it is less convenient than using the native API directly. If the topic has only a few partitions, or each consumer is powerful enough that only a few instances are needed (at least two), configuring each instance's index by hand is still acceptable.