Order
This article briefly introduces some attribute configurations of Spring-Cloud-Stream-Binder-Kafka.
maven
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>1.0.3.RELEASE</version>
</dependency>
Stream attribute
spring-cloud-stream-1.0.3.RELEASE-sources.jar! /org/springframework/cloud/stream/config/ChannelBindingServiceProperties.java
spring:
cloud:
stream:
instanceIndex: 0 ##支持环境变量INSTANCE_INDEX
## The instance index of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka
instanceCount: 1 ## The number of deployed instances of an application. Must be set for partitioning and if using Kafka.
## used to partition data across different consumers.
Topic can be logically considered a queue. Each consumption must specify its topic, which can be simply understood as indicating which queue to put this message into. In order to enable the throughput of Kafka to expand horizontally, topic is physically divided into one or more partitions, each partition physically corresponds to a folder, under which all messages and index files of this partition are stored. The PARTITION naming rule is topic name+ordered sequence number. The first PAR TITION sequence number starts from 0, and the maximum value of sequence number is the number of partitions minus 1.
Messages in the same partition can only be consumed by one consumer in the same group.
When the number of consumers exceeds the number of partition, the surplus consumers are idle.
When the number of consumers is less than or equal to the number of partitions, there will be a situation where multiple partitions correspond to one consumer, and individual consumers will consume more than others.
InstanceCount is mainly used by consumers and is generally less than or equal to the number of partitions in topic. It is mainly used for consumer partitions.
Bingdings property
spring-cloud-stream-1.0.3.RELEASE-sources.jar! /org/springframework/cloud/stream/config/BindingProperties.java
spring:
cloud:
stream:
bindings:
output:
destination: event-demo
content-type: text/plain
#group: test ##consumer属性
#producer:
#consumer:
producer
spring-cloud-stream-1.0.3.RELEASE-sources.jar! /org/springframework/cloud/stream/binder/ProducerProperties.java
spring:
cloud:
stream:
bindings:
output:
destination: event-demo
content-type: text/plain
producer:
partitionCount: 1
headerMode
partitionKeyExtractorClass: org.springframework.cloud.stream.partitioning.CustomPartitionKeyExtractorClass
partitionSelectorClass: org.springframework.cloud.stream.partitioning.CustomPartitionSelectorClass
headerMode: raw
- Kafka producer extended properties
spring-cloud-stream-binder-kafka-1.0.3.RELEASE-sources.jar! /org/springframework/cloud/stream/binder/kafka/KafkaProducerProperties.java
spring:
cloud:
stream:
bindings:
output:
destination: event-demo
content-type: text/plain
producer:
bufferSize: 16384
maxRequestSize: 1048576
sync: true
batchTimeout: 0
consumer
spring-cloud-stream-1.0.3.RELEASE-sources.jar! /org/springframework/cloud/stream/binder/ConsumerProperties.java
spring:
cloud:
stream:
bindings:
input:
destination: event-demo
content-type: text/plain
consumer:
concurrency: 1 ## The concurrency of the inbound consumer.
partitioned: false ## Whether the consumer receives data from a partitioned producer.Default: false.
headerMode: raw
- Kafka consumer extended attribute
spring-cloud-stream-binder-kafka-1.0.3.RELEASE-sources.jar! /org/springframework/cloud/stream/binder/kafka/KafkaConsumerProperties.java
spring:
cloud:
stream:
bindings:
input:
destination: event-demo
content-type: text/plain
consumer:
autoCommitOffset: false
resetOffsets: true
startOffset: earliest
enableDlq: false
recoveryInterval: 5000
Native apis
ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);
ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, consumerCount);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
.createMessageStreams(topicCountMap);
Here the head topicCountMap tells Kafka how many threads we will use in Consumer to consume the topic. The key of topicCountMap is topic name, and value is the number of threads for the topiccountmap.
Summary
On the whole, spring cloud stream has abstracted a part of itself, but one of the hard wounds is the unfriendly Spring. Cloud. Stream. InstanceIndex, which makes the service instance stateful. It is more troublesome to deploy based on docker, and it is not as good as the direct native api. If there are not many partition or if each consumer has strong performance, then at least two are deployed and the configuration is acceptable.