Preface
This article shows how to send and receive messages with the Kafka 0.10 client.
maven
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
If your project logs through log4j, these exclusions are not needed.
producer
// Uses org.apache.kafka.clients.producer.* and org.apache.kafka.common.serialization.StringSerializer.
// broker and topic are fields of the enclosing test class.
@Test
public void send() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
    props.put(ProducerConfig.ACKS_CONFIG, "all");             // wait for all in-sync replicas
    props.put(ProducerConfig.RETRIES_CONFIG, 3);              // retry failed sends up to 3 times
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);       // batch size in bytes
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);            // wait up to 1 ms to fill a batch
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32 MB send buffer
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    Producer<String, String> producer = new KafkaProducer<String, String>(props);
    for (int i = 0; i < 1000000; i++) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,
                Integer.toString(i), Integer.toString(i));
        // send() is asynchronous; the callback runs once the broker has responded.
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (metadata != null) {
                    System.out.printf("Send record partition:%d, offset:%d, keysize:%d, valuesize:%d %n",
                            metadata.partition(), metadata.offset(), metadata.serializedKeySize(),
                            metadata.serializedValueSize());
                }
                if (exception != null) {
                    exception.printStackTrace();
                }
            }
        });
        // Throttle to one record per second.
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    producer.close();
}
consumer
// Uses org.apache.kafka.clients.consumer.* and org.apache.kafka.common.serialization.StringDeserializer.
// broker, groupId, clientId and topic are fields of the enclosing test class.
@Test
public void receive() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");      // commit offsets automatically
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // at most once per second
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");   // no committed offset: start from the beginning
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    try {
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            // Block for up to 10 seconds waiting for new records.
            ConsumerRecords<String, String> records = consumer.poll(10000);
            records.forEach(record -> {
                System.out.printf("client : %s , topic: %s , partition: %d , offset = %d, key = %s, value = %s%n",
                        clientId, record.topic(), record.partition(), record.offset(),
                        record.key(), record.value());
            });
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        consumer.close();
    }
}
As you can see, unlike the 0.8 consumer API, there is no longer any need to build a topicCountMap.
This client transparently handles the failure of Kafka brokers, and transparently adapts as topic partitions it fetches migrate within the cluster.
The consumer is not thread-safe.
Consumer multithreading schemes
- Start multiple consumer application instances. This is convenient when deploying with Docker and Kubernetes.
- Run multiple KafkaConsumer instances inside a single application instance.
- Run a single KafkaConsumer instance in a single application instance and process the messages with multiple threads or asynchronously.
Personally, I prefer the first scheme, running as many consumer application instances as the topic has partitions.
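To see why the instance count is matched to the partition count, here is a minimal sketch of how the partitions of one topic might be divided among the consumers in a group. It is a simplified model of range-style assignment, not the client's actual code; the class name RangeAssignmentSketch and the method partitionsFor are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class RangeAssignmentSketch {

    /**
     * Returns the partitions that consumer number consumerIndex (0-based) would own
     * when numPartitions partitions are split into contiguous ranges among
     * numConsumers consumers. Hypothetical helper, for illustration only.
     */
    static List<Integer> partitionsFor(int consumerIndex, int numPartitions, int numConsumers) {
        int base = numPartitions / numConsumers;   // every consumer gets at least this many
        int extra = numPartitions % numConsumers;  // the first `extra` consumers get one more
        int start = base * consumerIndex + Math.min(consumerIndex, extra);
        int length = base + (consumerIndex < extra ? 1 : 0);
        List<Integer> owned = new ArrayList<>();
        for (int p = start; p < start + length; p++) {
            owned.add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        int numPartitions = 4; // assumed topic size for the demo
        for (int numConsumers : new int[] {2, 4, 6}) {
            System.out.println(numConsumers + " consumers:");
            for (int c = 0; c < numConsumers; c++) {
                System.out.println("  consumer-" + c + " -> "
                        + partitionsFor(c, numPartitions, numConsumers));
            }
        }
    }
}
```

Under this model, a topic with 4 partitions keeps 4 instances busy with one partition each, while a fifth or sixth instance in the same group would own nothing and sit idle.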
For topics with high throughput where message processing needs to be sped up, add the third scheme on top.
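The third scheme can be sketched roughly as follows: one thread owns the consumer and does all the polling, and each polled batch is handed to a worker pool. This is only an outline under stated assumptions. A plain list of strings stands in for the records returned by one poll, and the names PollAndDispatchSketch, handle and processBatch are hypothetical, so the sketch runs without a broker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PollAndDispatchSketch {

    /** Hypothetical record handler; real code would run the slow business logic here. */
    static String handle(String value) {
        return value.toUpperCase();
    }

    /**
     * Submits every record of one polled batch to the worker pool and blocks until
     * all of them are handled. Results come back in the order of the batch.
     * Only the calling thread would touch the consumer; workers see plain values.
     */
    static List<String> processBatch(List<String> batch, ExecutorService workers) {
        List<Future<String>> pending = new ArrayList<>();
        for (String value : batch) {
            Callable<String> task = () -> handle(value);
            pending.add(workers.submit(task));
        }
        List<String> results = new ArrayList<>();
        for (Future<String> future : pending) {
            try {
                // Wait here, so the next poll only happens after this batch is done.
                results.add(future.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a worker", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("record handler failed", e.getCause());
            }
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        try {
            // Stand-in for the records returned by a single consumer.poll(...) call.
            List<String> batch = Arrays.asList("a", "b", "c");
            System.out.println(processBatch(batch, workers));
        } finally {
            workers.shutdown();
        }
    }
}
```

Waiting for the whole batch before polling again keeps the consumer confined to one thread and means an automatic offset commit never runs ahead of unfinished work. The trade-off is that one slow record delays the next poll.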