Talk about spring for kafka’s Packaging and Integration of producer

  kafka

Order

This article mainly analyzes the encapsulation and integration of spring for apache kafka to the native kafka client producer.

Producer factory

spring-kafka-1.2.3.RELEASE-sources.jar! /org/springframework/kafka/core/DefaultKafkaProducerFactory.java

public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>, Lifecycle, DisposableBean {
    @Override
    public void destroy() throws Exception { //NOSONAR
        CloseSafeProducer<K, V> producer = this.producer;
        this.producer = null;
        if (producer != null) {
            producer.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
        }
    }


    @Override
    public void start() {
        this.running = true;
    }


    @Override
    public void stop() {
        try {
            destroy();
        }
        catch (Exception e) {
            logger.error("Exception while stopping producer", e);
        }
    }


    @Override
    public boolean isRunning() {
        return this.running;
    }

    @Override
    public Producer<K, V> createProducer() {
        if (this.producer == null) {
            synchronized (this) {
                if (this.producer == null) {
                    this.producer = new CloseSafeProducer<K, V>(createKafkaProducer());
                }
            }
        }
        return this.producer;
    }
}

The first step in integrating spring is to integrate into the spring container hosting, and then start and destroy normally following the life cycle of the spring container. CloseSafeProducer was created here, and its actual operation was entrusted to kafka producer.

KafkaTemplate

spring-kafka-1.2.3.RELEASE-sources.jar! /org/springframework/kafka/core/KafkaTemplate.java
The following interfaces are implemented

public interface KafkaOperations<K, V> {

    /**
     * Send the data to the default topic with no key or partition.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> sendDefault(V data);

    /**
     * Send the data to the default topic with the provided key and no partition.
     * @param key the key.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);

    /**
     * Send the data to the default topic with the provided key and partition.
     * @param partition the partition.
     * @param key the key.
     * @param data the data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> sendDefault(int partition, K key, V data);

    /**
     * Send the data to the provided topic with no key or partition.
     * @param topic the topic.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> send(String topic, V data);

    /**
     * Send the data to the provided topic with the provided key and no partition.
     * @param topic the topic.
     * @param key the key.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);

    /**
     * Send the data to the provided topic with the provided partition and no key.
     * @param topic the topic.
     * @param partition the partition.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data);

    /**
     * Send the data to the provided topic with the provided key and partition.
     * @param topic the topic.
     * @param partition the partition.
     * @param key the key.
     * @param data the data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> send(String topic, int partition, K key, V data);

    /**
     * Send a message with routing information in message headers. The message payload
     * may be converted before sending.
     * @param message the message to send.
     * @return a Future for the {@link SendResult}.
     * @see org.springframework.kafka.support.KafkaHeaders#TOPIC
     * @see org.springframework.kafka.support.KafkaHeaders#PARTITION_ID
     * @see org.springframework.kafka.support.KafkaHeaders#MESSAGE_KEY
     */
    ListenableFuture<SendResult<K, V>> send(Message<?> message);

    /**
     * See {@link Producer#partitionsFor(String)}.
     * @param topic the topic.
     * @return the partition info.
     * @since 1.1
     */
    List<PartitionInfo> partitionsFor(String topic);

    /**
     * See {@link Producer#metrics()}.
     * @return the metrics.
     * @since 1.1
     */
    Map<MetricName, ? extends Metric> metrics();

    /**
     * Execute some arbitrary operation(s) on the producer and return the result.
     * @param callback the callback.
     * @param <T> the result type.
     * @return the result.
     * @since 1.1
     */
    <T> T execute(ProducerCallback<K, V, T> callback);

    /**
     * Flush the producer.
     */
    void flush();

    /**
     * A callback for executing arbitrary operations on the {@link Producer}.
     * @param <K> the key type.
     * @param <V> the value type.
     * @param <T> the return type.
     * @since 1.1
     */
    interface ProducerCallback<K, V, T> {

        T doInKafka(Producer<K, V> producer);

    }

}

The main send method is as follows, which is where spring’s main packaging for producer is:

/**
     * Send the producer record.
     * @param producerRecord the producer record.
     * @return a Future for the {@link RecordMetadata}.
     */
    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        final Producer<K, V> producer = getTheProducer();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sending: " + producerRecord);
        }
        final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
        producer.send(producerRecord, new Callback() {

            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                try {
                    if (exception == null) {
                        future.set(new SendResult<>(producerRecord, metadata));
                        if (KafkaTemplate.this.producerListener != null
                                && KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
                            KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(),
                                    producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
                        }
                    }
                    else {
                        future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
                        if (KafkaTemplate.this.producerListener != null) {
                            KafkaTemplate.this.producerListener.onError(producerRecord.topic(),
                                    producerRecord.partition(),
                                    producerRecord.key(),
                                    producerRecord.value(),
                                    exception);
                        }
                    }
                }
                finally {
                    producer.close();
                }
            }

        });
        if (this.autoFlush) {
            flush();
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sent: " + producerRecord);
        }
        return future;
    }

Don’t be misled by CloseSafeProducer’s close method, it is an empty method.

  • Spring returns SettableListenableFuture with SendResult inside after wrapping the send method one layer.
  • Then the exception is also wrapped once, which is KafkaException defined by spring.
  • Listener supported, synchronous call
  • Built-in MessagingMessageConverter