Kafka 0.10 client use case


Preface

This article briefly shows how to use the Kafka 0.10 client to send and receive messages.

maven

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

If you are using log4j for logging, you may not need these exclusions.
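
The producer and consumer tests below reference a few fields of the test class (broker, topic, groupId, clientId) that are not shown in the snippets. A minimal sketch with placeholder values, which are assumptions to be adjusted to your environment:

    // Assumed test-class fields; the values below are placeholders only.
    private final String broker = "localhost:9092"; // Kafka bootstrap servers
    private final String topic = "test-topic";      // topic to produce to / consume from
    private final String groupId = "test-group";    // consumer group id
    private final String clientId = "test-client";  // consumer client id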

producer

    @Test
    public void send(){
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,broker);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 1000000; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, Integer.toString(i),
                    Integer.toString(i));
            producer.send(record, new Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if(metadata != null) {
                        System.out.printf("Send record partition:%d, offset:%d, keysize:%d, valuesize:%d %n",
                                metadata.partition(), metadata.offset(), metadata.serializedKeySize(),
                                metadata.serializedValueSize());
                    }
                    if(exception != null) {
                        exception.printStackTrace();
                    }
                }

            });
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        producer.close();
    }

consumer

    @Test
    public void receive(){
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try{
            consumer.subscribe(Arrays.asList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(10000);
                records.forEach(record -> {
                    System.out.printf("client : %s , topic: %s , partition: %d , offset = %d, key = %s, value = %s%n", clientId, record.topic(),
                            record.partition(), record.offset(), record.key(), record.value());
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            consumer.close();
        }
    }

As you can see, unlike the 0.8 client, a topicCountMap is no longer needed.
This client transparently handles the failure of Kafka brokers, and transparently adapts as topic partitions it fetches migrate within the cluster.
The consumer is not thread-safe.
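
Because the consumer is not thread-safe, wakeup() is the only method that may safely be called from another thread; it causes a blocked poll() to throw WakeupException (from org.apache.kafka.common.errors). A minimal shutdown sketch, assuming the props and topic from the consumer example above:

    // Sketch: stop the poll loop from another thread via wakeup().
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));
    // The shutdown hook runs in its own thread; wakeup() is the one call allowed there.
    Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10000);
            records.forEach(record -> System.out.println(record.value()));
        }
    } catch (WakeupException e) {
        // expected during shutdown, nothing to do
    } finally {
        consumer.close();
    }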

Consumer multithreading schemes

  • Run multiple consumer application instances; this is convenient when deploying with Docker and Kubernetes.
  • Run multiple KafkaConsumer instances inside a single application instance.
  • Run a single KafkaConsumer instance in a single application instance and consume messages with multiple threads / asynchronously.

Personally, I prefer the first scheme: run as many consumer application instances as there are partitions in the topic.
For topics with large throughput where processing needs to be sped up, add the third scheme on top (a sketch follows below).
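
A minimal sketch of the third scheme, assuming the props and topic from the consumer example above; the pool size and the handleRecord method are assumptions, and ExecutorService/Executors come from java.util.concurrent:

    // Scheme 3 sketch: one KafkaConsumer polls, a thread pool processes records.
    // Note: with enable.auto.commit=true, offsets may be committed before the
    // pool finishes processing, so in-flight records can be lost on a crash.
    ExecutorService pool = Executors.newFixedThreadPool(8); // pool size is an assumption
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10000);
            for (ConsumerRecord<String, String> record : records) {
                pool.submit(() -> handleRecord(record)); // handleRecord is hypothetical
            }
        }
    } finally {
        consumer.close();
        pool.shutdown();
    }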
