Kafka0.8 Producer Exception Handling



This article briefly analyzes exception handling of java producer in Kafka version kafka0.8.2.2.



Kafka’s java producer is sent asynchronously, mainly in several steps:

  • Append to RecordAccumulator
  • Sender takes out RecordBatch from RecordAccumulator and sends it to client for sending.
  • NetworkClient deals with broker and sends RecordBatch out.

This involves several steps of exceptions. When appending, exceptions will be thrown, while ApiException will be put into callback, and other exceptions will be thrown directly (Callback is only the layer dealing with RecordAccumulator.)

The run method in sender captures log directly.

When dealing specifically with network, the request fails (Network Link Fails or broker Returns Exception) will be re-queued based on the number of retries.


kafka-clients-! /org/apache/kafka/clients/producer/KafkaProducer.java

public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) {
        try {
            // first make sure the metadata for the topic is available
            waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer");
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer");
            ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue);
            int partition = partitioner.partition(serializedRecord, metadata.fetch());
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
            TopicPartition tp = new TopicPartition(record.topic(), partition);
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            return result.future;
            // Handling exceptions and record the errors;
            // For API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            throw new KafkaException(e);
        } catch (KafkaException e) {
            throw e;

Handling exceptions and record the errors; For API exceptions return them in the future, for other exceptions throw directly.


kafka-clients-! /org/apache/kafka/clients/producer/internals/Sender.java

  • Run method for thread
     * The main run loop for the sender thread
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {
            try {
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
            try {
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);


        log.debug("Shutdown of Kafka producer I/O thread has completed.");
  • Run(long) method
     * Run a single iteration of sending
     * @param now The current POSIX time in milliseconds
    public void run(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (result.unknownLeadersExist)

        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));

        // create produce requests
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        List<ClientRequest> requests = createProduceRequests(batches, now);

        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);
        for (ClientResponse response : responses) {
            if (response.wasDisconnected())
                handleDisconnect(response, now);
                handleResponse(response, now);


     * Complete or retry the given batch of records.
     * @param batch The record batch
     * @param error The error (or null if none)
     * @param baseOffset The base offset assigned to the records if successful
     * @param correlationId The correlation id for the request
     * @param now The current POSIX time stamp in milliseconds
    private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long now) {
        if (error != Errors.NONE && canRetry(batch, error)) {
            // retry
            log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                     this.retries - batch.attempts - 1,
            this.accumulator.reenqueue(batch, now);
            this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
        } else {
            // tell the user the result of their request
            batch.done(baseOffset, error.exception());
            if (error != Errors.NONE)
                this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
        if (error.exception() instanceof InvalidMetadataException)

For the case with error, reenqueue the entire batch.
HandleDisconnect and handleResponse both call this method.

  • handleDisconnect
private void handleDisconnect(ClientResponse response, long now) {
        log.trace("Cancelled request {} due to node {} being disconnected", response, response.request().request().destination());
        int correlation = response.request().request().header().correlationId();
        Map<TopicPartition, RecordBatch> responseBatches = (Map<TopicPartition, RecordBatch>) response.request().attachment();
        for (RecordBatch batch : responseBatches.values())
            completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, now);
  • handleResponse
     * Handle a produce response
    private void handleResponse(ClientResponse response, long now) {
        int correlationId = response.request().request().header().correlationId();
        log.trace("Received produce response from node {} with correlation id {}",
        Map<TopicPartition, RecordBatch> batches = (Map<TopicPartition, RecordBatch>) response.request().attachment();
        // if we have a response, parse it
        if (response.hasResponse()) {
            ProduceResponse produceResponse = new ProduceResponse(response.responseBody());
            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                TopicPartition tp = entry.getKey();
                ProduceResponse.PartitionResponse partResp = entry.getValue();
                Errors error = Errors.forCode(partResp.errorCode);
                RecordBatch batch = batches.get(tp);
                completeBatch(batch, error, partResp.baseOffset, correlationId, now);
            this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
        } else {
            // this is the acks = 0 case, just complete all requests
            for (RecordBatch batch : batches.values())
                completeBatch(batch, Errors.NONE, -1L, correlationId, now);


kafka-clients-! /org/apache/kafka/clients/NetworkClient.java

public List<ClientResponse> poll(List<ClientRequest> requests, long timeout, long now) {
        List<NetworkSend> sends = new ArrayList<NetworkSend>();

        for (int i = 0; i < requests.size(); i++) {
            ClientRequest request = requests.get(i);
            int nodeId = request.request().destination();
            if (!isSendable(nodeId))
                throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");


        // should we update our metadata?
        long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
        long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
        long waitForMetadataFetch = (this.metadataFetchInProgress ? Integer.MAX_VALUE : 0);
        // if there is no node available to connect, back off refreshing metadata
        long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch);
        if (!this.metadataFetchInProgress && metadataTimeout == 0)
            maybeUpdateMetadata(sends, now);

        // do the I/O
        try {
            this.selector.poll(Math.min(timeout, metadataTimeout), sends);
        } catch (IOException e) {
            log.error("Unexpected error during I/O in producer network thread", e);

        List<ClientResponse> responses = new ArrayList<ClientResponse>();
        handleCompletedSends(responses, now);
        handleCompletedReceives(responses, now);
        handleDisconnections(responses, now);

        return responses;