[case45] Talking about ProcessingGuarantee of storm-kafka-client

  storm

Preface

This article mainly studies the ProcessingGuarantee of storm-kafka-client.

ProcessingGuarantee

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpoutConfig.java

    /**
     * This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
     * i.e. when the offset can be committed to Kafka. The default value is AT_LEAST_ONCE.
     * The commit interval is controlled by {@link KafkaSpoutConfig#getOffsetsCommitPeriodMs() }, if the mode commits on an interval.
     * NO_GUARANTEE may be removed in a later release without warning, we're still evaluating whether it makes sense to keep.
     */
    @InterfaceStability.Unstable
    public enum ProcessingGuarantee {
        /**
         * An offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If a tuple fails or
         * times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits synchronously on the defined
         * interval.
         */
        AT_LEAST_ONCE,
        /**
         * Every offset will be synchronously committed to Kafka right after being polled but before being emitted to the downstream
         * components of the topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by
         * ensuring the spout won't retry tuples that fail or time out after the commit to Kafka has been done
         */
        AT_MOST_ONCE,
        /**
         * The polled offsets are ready to commit immediately after being polled. The offsets are committed periodically, i.e. a message may
         * be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but allows the
         * spout to control when commits occur. Commits asynchronously on the defined interval.
         */
        NO_GUARANTEE,
    }
  • Unlike the older storm-kafka, storm-kafka-client introduces the ProcessingGuarantee enum, which makes the overall code much clearer (a configuration sketch follows this list).
  • ProcessingGuarantee.AT_LEAST_ONCE is the mode with acking enabled; similar to the Kafka client's auto commit, it commits periodically at the configured interval, but only offsets whose tuples have been acked, and synchronously.
  • ProcessingGuarantee.AT_MOST_ONCE commits synchronously right after polling, before the tuples are emitted (the commit interval is ignored), so each message is processed at most once.
  • ProcessingGuarantee.NO_GUARANTEE also disregards acks, but like AT_LEAST_ONCE it commits periodically at the configured interval; the difference is that the commit is asynchronous and covers whatever has been polled.
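
For context, here is a minimal configuration sketch showing where the guarantee is selected, assuming the 1.2.x builder API; the broker address, topic name and commit period are illustrative assumptions, not values from this article:

    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;

    public class GuaranteeConfigSketch {
        public static KafkaSpout<String, String> buildSpout() {
            KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig
                .builder("localhost:9092", "demo-topic")                   // hypothetical broker/topic
                .setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE) // the default mode
                .setOffsetCommitPeriodMs(10_000)                           // interval used by AT_LEAST_ONCE / NO_GUARANTEE
                .build();
            return new KafkaSpout<>(conf);
        }
    }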

KafkaSpout.open

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

public class KafkaSpout<K, V> extends BaseRichSpout {

    //Initial delay for the commit and subscription refresh timers
    public static final long TIMER_DELAY_MS = 500;

    // timer == null only if the processing guarantee is at-most-once
    private transient Timer commitTimer;

    // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
    // or after a consumer rebalance, or during close/deactivate. Always empty if processing guarantee is none or at-most-once.
    private transient Map<TopicPartition, OffsetManager> offsetManagers;

    // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
    private transient Map<TopicPartition, List<ConsumerRecord<K, V>>> waitingToEmit;

    //......

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.context = context;

        // Spout internals
        this.collector = collector;

        // Offset management
        firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();

        // Retries management
        retryService = kafkaSpoutConfig.getRetryService();

        tupleListener = kafkaSpoutConfig.getTupleListener();

        if (kafkaSpoutConfig.getProcessingGuarantee() != KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
            // In at-most-once mode the offsets are committed after every poll, and not periodically as controlled by the timer
            commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
        }
        refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);

        offsetManagers = new HashMap<>();
        emitted = new HashSet<>();
        waitingToEmit = new HashMap<>();
        commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());

        tupleListener.open(conf, context);
        if (canRegisterMetrics()) {
            registerMetric();
        }

        LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
    }

    //......

}
  • In open, if the guarantee is not ProcessingGuarantee.AT_MOST_ONCE, the commitTimer is initialized with period kafkaSpoutConfig.getOffsetsCommitPeriodMs() (default 30000 ms); the refreshSubscriptionTimer is always created, with period kafkaSpoutConfig.getPartitionRefreshPeriodMs() (default 2000 ms).

Timer.isExpiredResetOnTrue

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/internal/Timer.java

public class Timer {
    private final long delay;
    private final long period;
    private final TimeUnit timeUnit;
    private final long periodNanos;
    private long start;

    //......

    /**
     * Checks if a call to this method occurs later than {@code period} since the timer was initiated or reset. If that is the
     * case the method returns true, otherwise it returns false. Each time this method returns true, the counter is reset
     * (re-initiated) and a new cycle will start.
     *
     * @return true if the time elapsed since the last call returning true is greater than {@code period}. Returns false
     * otherwise.
     */
    public boolean isExpiredResetOnTrue() {
        final boolean expired = Time.nanoTime() - start >= periodNanos;
        if (expired) {
            start = Time.nanoTime();
        }
        return expired;
    }
}
  • Timer has one important method, isExpiredResetOnTrue, which checks whether the period has elapsed and resets the cycle when it has; nextTuple calls it to decide when to commit offsets and when to refresh the subscription (a standalone sketch of the pattern follows).
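
The same check-and-reset pattern is easy to reproduce outside Storm; here is a minimal standalone sketch of the idea (not the internal Timer class itself):

    import java.util.concurrent.TimeUnit;

    // Standalone sketch of the Timer.isExpiredResetOnTrue() pattern.
    public class IntervalGate {
        private final long periodNanos;
        private long start;

        public IntervalGate(long period, TimeUnit unit) {
            this.periodNanos = unit.toNanos(period);
            this.start = System.nanoTime();
        }

        // Returns true at most once per period; resets the cycle on true.
        public boolean isExpiredResetOnTrue() {
            boolean expired = System.nanoTime() - start >= periodNanos;
            if (expired) {
                start = System.nanoTime();
            }
            return expired;
        }

        public static void main(String[] args) throws InterruptedException {
            IntervalGate gate = new IntervalGate(200, TimeUnit.MILLISECONDS);
            for (int i = 0; i < 10; i++) {
                System.out.println(i + " expired=" + gate.isExpiredResetOnTrue());
                Thread.sleep(50); // roughly every 4th check should report true
            }
        }
    }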

KafkaSpout.nextTuple

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

    // ======== Next Tuple =======
    @Override
    public void nextTuple() {
        try {
            if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
                kafkaSpoutConfig.getSubscription().refreshAssignment();
            }

            if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
                if (isAtLeastOnceProcessing()) {
                    commitOffsetsForAckedTuples(kafkaConsumer.assignment());
                } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
                        createFetchedOffsetsMetadata(kafkaConsumer.assignment());
                    kafkaConsumer.commitAsync(offsetsToCommit, null);
                    LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
                }
            }

            PollablePartitionsInfo pollablePartitionsInfo = getPollablePartitionsInfo();
            if (pollablePartitionsInfo.shouldPoll()) {
                try {
                    setWaitingToEmit(pollKafkaBroker(pollablePartitionsInfo));
                } catch (RetriableException e) {
                    LOG.error("Failed to poll from kafka.", e);
                }
            }

            emitIfWaitingNotEmitted();
        } catch (InterruptException e) {
            throwKafkaConsumerInterruptedException();
        }
    }
  • nextTuple first checks whether the subscription needs refreshing, then calls commitTimer.isExpiredResetOnTrue() to decide whether offsets should be committed.
  • If the ProcessingGuarantee is NO_GUARANTEE, createFetchedOffsetsMetadata builds the offset/partition map to commit, and kafkaConsumer.commitAsync submits it asynchronously (the two commit styles are sketched after this list).
  • If the ProcessingGuarantee is AT_LEAST_ONCE, the commit goes through commitOffsetsForAckedTuples instead.
  • After the commit handling, getPollablePartitionsInfo returns the PollablePartitionsInfo; if shouldPoll() is true, pollKafkaBroker pulls data, and setWaitingToEmit puts the polled records into waitingToEmit.
  • Finally, emitIfWaitingNotEmitted emits or retries one waiting tuple if there is any; if nothing is waiting, it simply returns until the next nextTuple call.
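
For reference, the two commit styles map onto the standard kafka-clients API like this; the partition and offset values are hypothetical:

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CommitStyles {
        static void commitBoth(KafkaConsumer<String, String> consumer) {
            Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
                new TopicPartition("demo-topic", 0), new OffsetAndMetadata(42L)); // hypothetical

            // AT_LEAST_ONCE path: blocks until the broker confirms the commit.
            consumer.commitSync(offsets);

            // NO_GUARANTEE path: returns immediately; a null callback means "fire and forget".
            consumer.commitAsync(offsets, null);
        }
    }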

createFetchedOffsetsMetadata

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

    private Map<TopicPartition, OffsetAndMetadata> createFetchedOffsetsMetadata(Set<TopicPartition> assignedPartitions) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (TopicPartition tp : assignedPartitions) {
            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp), commitMetadataManager.getCommitMetadata()));
        }
        return offsetsToCommit;
    }
  • For each partition in kafkaConsumer.assignment(), kafkaConsumer.position(tp) supplies the offset of the next record to fetch, and commitMetadataManager.getCommitMetadata() supplies the commit metadata JSON string attached to each OffsetAndMetadata.

commitOffsetsForAckedTuples

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

    private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
        // Find offsets that are ready to be committed for every assigned topic partition
        final Map<TopicPartition, OffsetManager> assignedOffsetManagers = new HashMap<>();
        for (Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
            if (assignedPartitions.contains(entry.getKey())) {
                assignedOffsetManagers.put(entry.getKey(), entry.getValue());
            }
        }

        final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetManager> tpOffset : assignedOffsetManagers.entrySet()) {
            final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadataManager.getCommitMetadata());
            if (nextCommitOffset != null) {
                nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
            }
        }

        // Commit offsets that are ready to be committed for every topic partition
        if (!nextCommitOffsets.isEmpty()) {
            kafkaConsumer.commitSync(nextCommitOffsets);
            LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);
            // Instead of iterating again, it would be possible to commit and update the state for each TopicPartition
            // in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop
            for (Map.Entry<TopicPartition, OffsetAndMetadata> tpOffset : nextCommitOffsets.entrySet()) {
                //Update the OffsetManager for each committed partition, and update numUncommittedOffsets
                final TopicPartition tp = tpOffset.getKey();
                long position = kafkaConsumer.position(tp);
                long committedOffset = tpOffset.getValue().offset();
                if (position < committedOffset) {
                    /*
                     * The position is behind the committed offset. This can happen in some cases, e.g. if a message failed, lots of (more
                     * than max.poll.records) later messages were acked, and the failed message then gets acked. The consumer may only be
                     * part way through "catching up" to where it was when it went back to retry the failed tuple. Skip the consumer forward
                     * to the committed offset and drop the current waiting to emit list, since it'll likely contain committed offsets.
                     */
                    LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]",
                        position, committedOffset);
                    kafkaConsumer.seek(tp, committedOffset);
                    List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmit.get(tp);
                    if (waitingToEmitForTp != null) {
                        //Discard the pending records that are already committed
                        List<ConsumerRecord<K, V>> filteredRecords = new ArrayList<>();
                        for (ConsumerRecord<K, V> record : waitingToEmitForTp) {
                            if (record.offset() >= committedOffset) {
                                filteredRecords.add(record);
                            }
                        }
                        waitingToEmit.put(tp, filteredRecords);
                    }
                }

                final OffsetManager offsetManager = assignedOffsetManagers.get(tp);
                offsetManager.commit(tpOffset.getValue());
                LOG.debug("[{}] uncommitted offsets for partition [{}] after commit", offsetManager.getNumUncommittedOffsets(), tp);
            }
        } else {
            LOG.trace("No offsets to commit. {}", this);
        }
    }
  • First, the offsetManagers map yields the partition and msgId information for acked tuples waiting to be committed; for AT_MOST_ONCE and NO_GUARANTEE this map is always empty.
  • Then, for each partition, OffsetManager.findNextCommitOffset uses the commit metadata to determine the next offset that can be committed for this batch of acked messages.
  • Finally, kafkaConsumer.commitSync commits the offsets synchronously, and the committed state of the local OffsetManager is updated. (A simplified sketch of the contiguity rule behind findNextCommitOffset follows.)
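
The key idea is that only a contiguous run of acked offsets starting at the committed offset can be committed. A simplified standalone sketch of that rule, assuming a plain TreeSet of acked offsets rather than the real OffsetManager state:

    import java.util.TreeSet;

    public class NextCommitSketch {
        // Returns the offset to commit (one past the highest contiguous acked offset),
        // or -1 if the first uncommitted offset has not been acked yet.
        static long findNextCommitOffset(long committedOffset, TreeSet<Long> ackedOffsets) {
            long next = committedOffset;
            while (ackedOffsets.contains(next)) {
                next++;
            }
            return next > committedOffset ? next : -1;
        }

        public static void main(String[] args) {
            TreeSet<Long> acked = new TreeSet<>();
            acked.add(5L); acked.add(6L); acked.add(7L); acked.add(9L); // 8 still pending
            // Offsets 5-7 are done, so commit 8; offset 9 must wait for 8 to be acked.
            System.out.println(findNextCommitOffset(5L, acked)); // prints 8
        }
    }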

getPollablePartitionsInfo

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

   private PollablePartitionsInfo getPollablePartitionsInfo() {
        if (isWaitingToEmit()) {
            LOG.debug("Not polling. Tuples waiting to be emitted.");
            return new PollablePartitionsInfo(Collections.<TopicPartition>emptySet(), Collections.<TopicPartition, Long>emptyMap());
        }

        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        if (!isAtLeastOnceProcessing()) {
            return new PollablePartitionsInfo(assignment, Collections.<TopicPartition, Long>emptyMap());
        }

        Map<TopicPartition, Long> earliestRetriableOffsets = retryService.earliestRetriableOffsets();
        Set<TopicPartition> pollablePartitions = new HashSet<>();
        final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
        for (TopicPartition tp : assignment) {
            OffsetManager offsetManager = offsetManagers.get(tp);
            int numUncommittedOffsets = offsetManager.getNumUncommittedOffsets();
            if (numUncommittedOffsets < maxUncommittedOffsets) {
                //Allow poll if the partition is not at the maxUncommittedOffsets limit
                pollablePartitions.add(tp);
            } else {
                long offsetAtLimit = offsetManager.getNthUncommittedOffsetAfterCommittedOffset(maxUncommittedOffsets);
                Long earliestRetriableOffset = earliestRetriableOffsets.get(tp);
                if (earliestRetriableOffset != null && earliestRetriableOffset <= offsetAtLimit) {
                    //Allow poll if there are retriable tuples within the maxUncommittedOffsets limit
                    pollablePartitions.add(tp);
                } else {
                    LOG.debug("Not polling on partition [{}]. It has [{}] uncommitted offsets, which exceeds the limit of [{}]. ", tp,
                        numUncommittedOffsets, maxUncommittedOffsets);
                }
            }
        }
        return new PollablePartitionsInfo(pollablePartitions, earliestRetriableOffsets);
    }
  • When the guarantee is not ProcessingGuarantee.AT_LEAST_ONCE, the method simply returns kafkaConsumer.assignment() as the pollable set.
  • For AT_LEAST_ONCE, retryService.earliestRetriableOffsets() supplies the earliest failed offset per partition.
  • The maxUncommittedOffsets parameter then gates polling: a partition is pollable if numUncommittedOffsets < maxUncommittedOffsets; at or over the limit, it is pollable only if its earliestRetriableOffset is at or below getNthUncommittedOffsetAfterCommittedOffset(maxUncommittedOffsets), i.e. the retriable tuple falls within the uncommitted-offsets limit (see the condensed sketch below).
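
A condensed sketch of the per-partition decision, with hypothetical numbers:

    public class PollableCheck {
        // Simplified version of the decision made per partition in getPollablePartitionsInfo().
        static boolean isPollable(int numUncommitted, int maxUncommitted,
                                  Long earliestRetriable, long offsetAtLimit) {
            if (numUncommitted < maxUncommitted) {
                return true; // still below the limit
            }
            // At or over the limit: only poll if a retriable offset falls within the limit.
            return earliestRetriable != null && earliestRetriable <= offsetAtLimit;
        }

        public static void main(String[] args) {
            // 10 uncommitted offsets with a limit of 10, but a failed tuple at offset 104
            // sits within the first 10 uncommitted offsets (limit offset 109), so poll.
            System.out.println(isPollable(10, 10, 104L, 109L)); // true
            System.out.println(isPollable(10, 10, null, 109L)); // false
        }
    }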

pollKafkaBroker

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

    // ======== poll =========
    private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
        doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets);
        Set<TopicPartition> pausedPartitions = new HashSet<>(kafkaConsumer.assignment());
        Iterator<TopicPartition> pausedIter = pausedPartitions.iterator();
        while (pausedIter.hasNext()) {
            if (pollablePartitionsInfo.pollablePartitions.contains(pausedIter.next())) {
                pausedIter.remove();
            }
        }
        try {
            kafkaConsumer.pause(pausedPartitions);
            final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
            ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
            final int numPolledRecords = consumerRecords.count();
            LOG.debug("Polled [{}] records from Kafka",
                numPolledRecords);
            if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
                //Commit polled records immediately to ensure delivery is at-most-once.
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
                    createFetchedOffsetsMetadata(kafkaConsumer.assignment());
                kafkaConsumer.commitSync(offsetsToCommit);
                LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
            }
            return consumerRecords;
        } finally {
            kafkaConsumer.resume(pausedPartitions);
        }
    }

    private void doSeekRetriableTopicPartitions(Map<TopicPartition, Long> pollableEarliestRetriableOffsets) {
        for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : pollableEarliestRetriableOffsets.entrySet()) {
            //Seek directly to the earliest retriable message for each retriable topic partition
            kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
        }
    }

    private void ackRetriableOffsetsIfCompactedAway(Map<TopicPartition, Long> earliestRetriableOffsets,
        ConsumerRecords<K, V> consumerRecords) {
        for (Entry<TopicPartition, Long> entry : earliestRetriableOffsets.entrySet()) {
            TopicPartition tp = entry.getKey();
            List<ConsumerRecord<K, V>> records = consumerRecords.records(tp);
            if (!records.isEmpty()) {
                ConsumerRecord<K, V> record = records.get(0);
                long seekOffset = entry.getValue();
                long earliestReceivedOffset = record.offset();
                if (seekOffset < earliestReceivedOffset) {
                    //Since we asked for tuples starting at seekOffset, some retriable records must have been compacted away.
                    //Ack up to the first offset received if the record is not already acked or currently in the topology
                    for (long i = seekOffset; i < earliestReceivedOffset; i++) {
                        KafkaSpoutMessageId msgId = retryService.getMessageId(new ConsumerRecord<>(tp.topic(), tp.partition(), i, null, null));
                        if (!offsetManagers.get(tp).contains(msgId) && !emitted.contains(msgId)) {
                            LOG.debug("Record at offset [{}] appears to have been compacted away from topic [{}], marking as acked", i, tp);
                            retryService.remove(msgId);
                            emitted.add(msgId);
                            ack(msgId);
                        }
                    }
                }
            }
        }
    }
  • If pollablePartitionsInfo.pollablePartitions is not empty, pollKafkaBroker is called to pull messages.
  • pollKafkaBroker first calls doSeekRetriableTopicPartitions, which seeks each retriable partition back to its earliest offset awaiting retry.
  • Before polling, the partitions that fail the maxUncommittedOffsets (and related) checks are paused; then kafkaConsumer.poll runs, and if the guarantee is ProcessingGuarantee.AT_MOST_ONCE, kafkaConsumer.commitSync commits the polled offsets synchronously before the records are returned (and later put into waitingToEmit); finally the paused partitions are resumed. Pausing prevents the poll from fetching messages for partitions that do not meet the commit criteria (a pause/resume sketch follows this list).
  • Note that pollablePartitionsInfo comes from getPollablePartitionsInfo(), which filters kafkaConsumer.assignment() against offsetManagers, maxUncommittedOffsets and related parameters, so pollablePartitionsInfo.pollablePartitions is a subset of kafkaConsumer.assignment(). The pausedPartitions are obtained by removing pollablePartitions from kafkaConsumer.assignment(), i.e. they are exactly the partitions that getPollablePartitionsInfo() rejected; they are paused before the poll and resumed afterwards, so this poll does not fetch from them.
  • One more step after the poll is ackRetriableOffsetsIfCompactedAway, which acks retriable messages whose offsets have been compacted away from the topic.
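
The pause/poll/resume dance uses the standard consumer API; a minimal sketch (the pollable set and poll timeout are hypothetical):

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PausedPollSketch {
        static ConsumerRecords<String, String> pollOnly(
                KafkaConsumer<String, String> consumer, Set<TopicPartition> pollable) {
            // Everything assigned but not pollable gets paused for this poll.
            Set<TopicPartition> paused = new HashSet<>(consumer.assignment());
            paused.removeAll(pollable);
            try {
                consumer.pause(paused);
                return consumer.poll(200); // hypothetical timeout in ms
            } finally {
                consumer.resume(paused); // always resume, even if poll throws
            }
        }
    }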

emitIfWaitingNotEmitted

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

    private void emitIfWaitingNotEmitted() {
        Iterator<List<ConsumerRecord<K, V>>> waitingToEmitIter = waitingToEmit.values().iterator();
        outerLoop:
        while (waitingToEmitIter.hasNext()) {
            List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmitIter.next();
            while (!waitingToEmitForTp.isEmpty()) {
                final boolean emittedTuple = emitOrRetryTuple(waitingToEmitForTp.remove(0));
                if (emittedTuple) {
                    break outerLoop;
                }
            }
            waitingToEmitIter.remove();
        }
    }
  • emitIfWaitingNotEmitted checks whether waitingToEmit holds any records; if so, it takes records off the per-partition lists and calls emitOrRetryTuple, stopping after the first tuple that is actually emitted (one tuple per nextTuple call); if nothing is waiting, the method simply returns.

emitOrRetryTuple

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

    /**
     * Creates a tuple from the kafka record and emits it if it was never emitted or it is ready to be retried.
     *
     * @param record to be emitted
     * @return true if tuple was emitted. False if tuple has been acked or has been emitted and is pending ack or fail
     */
    private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
        final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        final KafkaSpoutMessageId msgId = retryService.getMessageId(record);

        if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) {   // has been acked
            LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
        } else if (emitted.contains(msgId)) {   // has been emitted and it is pending ack or fail
            LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
        } else {
            final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
            if (isAtLeastOnceProcessing()
                && committedOffset != null 
                && committedOffset.offset() > record.offset()
                && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, Collections.unmodifiableMap(offsetManagers))) {
                // Ensures that after a topology with this id is started, the consumer fetch
                // position never falls behind the committed offset (STORM-2844)
                throw new IllegalStateException("Attempting to emit a message that has already been committed."
                    + " This should never occur when using the at-least-once processing guarantee.");
            }

            final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
            if (isEmitTuple(tuple)) {
                final boolean isScheduled = retryService.isScheduled(msgId);
                // not scheduled <=> never failed (i.e. never emitted), or scheduled and ready to be retried
                if (!isScheduled || retryService.isReady(msgId)) {
                    final String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;

                    if (!isAtLeastOnceProcessing()) {
                        if (kafkaSpoutConfig.isTupleTrackingEnforced()) {
                            collector.emit(stream, tuple, msgId);
                            LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
                        } else {
                            collector.emit(stream, tuple);
                            LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
                        }
                    } else {
                        emitted.add(msgId);
                        offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
                        if (isScheduled) {  // Was scheduled for retry and re-emitted, so remove from schedule.
                            retryService.remove(msgId);
                        }
                        collector.emit(stream, tuple, msgId);
                        tupleListener.onEmit(tuple, msgId);
                        LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
                    }
                    return true;
                }
            } else {
                /*if a null tuple is not configured to be emitted, it should be marked as emitted and acked immediately
                * to allow its offset to be commited to Kafka*/
                LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record);
                if (isAtLeastOnceProcessing()) {
                    msgId.setNullTuple(true);
                    offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
                    ack(msgId);
                }
            }
        }
        return false;
    }
  • emitOrRetryTuple is the core of the whole nextTuple; it covers both the emit path and the retry path.
  • Because failed messages are pulled again via the seek method, each record is re-checked against offsetManagers (acked, waiting for commit) and emitted (emitted, pending ack or fail); only if it appears in neither is it emitted or retried.
  • When emitting, retryService.isScheduled(msgId) first determines whether this is a failure retry; the emit proceeds if it is not a retry, or if it is a retry whose backoff delay has expired (retryService.isReady(msgId)).
  • For the ProcessingGuarantee.AT_LEAST_ONCE type, the spout maintains emitted and offsetManagers, performs the emit anchored with the msgId, and calls back tupleListener.onEmit(tuple, msgId); for the other guarantees it is just a plain collector.emit (anchored only if tuple tracking is enforced). The tuple itself is produced by the configured translator; see the sketch after this list.
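
The tuple passed to collector.emit comes from kafkaSpoutConfig.getTranslator().apply(record). A sketch of configuring a custom translator, assuming the two-argument setRecordTranslator(Func, Fields) builder overload; broker, topic and field names are illustrative:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    public class TranslatorSketch {
        public static KafkaSpoutConfig<String, String> build() {
            return KafkaSpoutConfig.builder("localhost:9092", "demo-topic") // hypothetical
                // The translator produces the tuple emitted by emitOrRetryTuple;
                // returning null here would trigger the null-tuple branch shown above.
                .setRecordTranslator(
                    (ConsumerRecord<String, String> r) -> new Values(r.topic(), r.offset(), r.value()),
                    new Fields("topic", "offset", "value"))
                .build();
        }
    }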

KafkaSpout.ack

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

   // ======== Ack =======
    @Override
    public void ack(Object messageId) {
        if (!isAtLeastOnceProcessing()) {
            return;
        }

        // Only need to keep track of acked tuples if commits to Kafka are controlled by
        // tuple acks, which happens only for at-least-once processing semantics
        final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;

        if (msgId.isNullTuple()) {
            //a null tuple should be added to the ack list since by definition is a direct ack
            offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
            LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
            tupleListener.onAck(msgId);
            return;
        }

        if (!emitted.contains(msgId)) {
            LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
                        + "came from a topic-partition that this consumer group instance is no longer tracking "
                        + "due to rebalance/partition reassignment. No action taken.", msgId);
        } else {
            Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
                        + " This should never occur barring errors in the RetryService implementation or the spout code.");
            offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
            emitted.remove(msgId);
        }
        tupleListener.onAck(msgId);
    }
  • In ack, if the guarantee is not ProcessingGuarantee.AT_LEAST_ONCE, the method returns immediately.
  • Otherwise, the acked msgId is added to the corresponding OffsetManager in the offsetManagers map, waiting to be committed in nextTuple, and is removed from emitted.
  • There is also an emitted check: an ack for a msgId the spout is no longer tracking is ignored; this is usually caused by a rebalance or partition reassignment.

KafkaSpout.fail

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java

   // ======== Fail =======
    @Override
    public void fail(Object messageId) {
        if (!isAtLeastOnceProcessing()) {
            return;
        }
        // Only need to keep track of failed tuples if commits to Kafka are controlled by
        // tuple acks, which happens only for at-least-once processing semantics
        final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
        if (!emitted.contains(msgId)) {
            LOG.debug("Received fail for tuple this spout is no longer tracking."
                + " Partitions may have been reassigned. Ignoring message [{}]", msgId);
            return;
        }
        Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being failed."
            + " This should never occur barring errors in the RetryService implementation or the spout code.");

        msgId.incrementNumFails();

        if (!retryService.schedule(msgId)) {
            LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
            // this tuple should be removed from emitted only inside the ack() method. This is to ensure
            // that the OffsetManager for that TopicPartition is updated and allows commit progression
            tupleListener.onMaxRetryReached(msgId);
            ack(msgId);
        } else {
            tupleListener.onRetry(msgId);
            emitted.remove(msgId);
        }
    }
  • fail likewise returns immediately if the guarantee is not ProcessingGuarantee.AT_LEAST_ONCE.
  • It then checks whether the msgId is still in emitted, returning immediately if not; this is usually caused by partition reassignment.
  • Otherwise it calls retryService.schedule(msgId): if scheduling fails (the maximum number of retries has been reached), it triggers tupleListener.onMaxRetryReached and then acks the message; if scheduling succeeds, it calls the tupleListener.onRetry callback and removes the msgId from emitted.

KafkaSpoutRetryExponentialBackoff.schedule

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java

    private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();

    //This class assumes that there is at most one retry schedule per message id in this set at a time.
    private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);

    /**
     * Comparator ordering by timestamp 
     */
    private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
        @Override
        public int compare(RetrySchedule entry1, RetrySchedule entry2) {
            int result = Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
            
            if(result == 0) {
                //TreeSet uses compareTo instead of equals() for the Set contract
                //Ensure that we can save two retry schedules with the same timestamp
                result = entry1.hashCode() - entry2.hashCode();
            }
            return result;
        }
    }

    @Override
    public boolean schedule(KafkaSpoutMessageId msgId) {
        if (msgId.numFails() > maxRetries) {
            LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
            return false;
        } else {
            //Remove existing schedule for the message id
            remove(msgId);
            final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId));
            retrySchedules.add(retrySchedule);
            toRetryMsgs.add(msgId);
            LOG.debug("Scheduled. {}", retrySchedule);
            LOG.trace("Current state {}", retrySchedules);
            return true;
        }
    }

    @Override
    public Map<TopicPartition, Long> earliestRetriableOffsets() {
        final Map<TopicPartition, Long> tpToEarliestRetriableOffset = new HashMap<>();
        final long currentTimeNanos = Time.nanoTime();
        for (RetrySchedule retrySchedule : retrySchedules) {
            if (retrySchedule.retry(currentTimeNanos)) {
                final KafkaSpoutMessageId msgId = retrySchedule.msgId;
                final TopicPartition tpForMessage = new TopicPartition(msgId.topic(), msgId.partition());
                final Long currentLowestOffset = tpToEarliestRetriableOffset.get(tpForMessage);
                if(currentLowestOffset != null) {
                    tpToEarliestRetriableOffset.put(tpForMessage, Math.min(currentLowestOffset, msgId.offset()));
                } else {
                    tpToEarliestRetriableOffset.put(tpForMessage, msgId.offset());
                }
            } else {
                break;  // Stop searching as soon as passed current time
            }
        }
        LOG.debug("Topic partitions with entries ready to be retried [{}] ", tpToEarliestRetriableOffset);
        return tpToEarliestRetriableOffset;
    }

    @Override
    public boolean isReady(KafkaSpoutMessageId msgId) {
        boolean retry = false;
        if (isScheduled(msgId)) {
            final long currentTimeNanos = Time.nanoTime();
            for (RetrySchedule retrySchedule : retrySchedules) {
                if (retrySchedule.retry(currentTimeNanos)) {
                    if (retrySchedule.msgId.equals(msgId)) {
                        retry = true;
                        LOG.debug("Found entry to retry {}", retrySchedule);
                        break; //Stop searching if the message is known to be ready for retry
                    }
                } else {
                    LOG.debug("Entry to retry not found {}", retrySchedule);
                    break;  // Stop searching as soon as passed current time
                }
            }
        }
        return retry;
    }
  • schedule first checks whether the failure count exceeds maxRetries; if so, it returns false, meaning the message is no longer scheduled, and KafkaSpout.fail then calls back tupleListener.onMaxRetryReached and acks the message so it is not processed again.
  • If maxRetries is not exceeded, a RetrySchedule entry is created and added to retrySchedules, a TreeSet ordered by RetryEntryTimeStampComparator, i.e. by nextRetryTimeNanos, with hashCode as a tiebreaker.
  • Both earliestRetriableOffsets and isReady walk this retrySchedules set. (A retry-policy configuration sketch follows.)
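
The retry policy is pluggable on the builder; a configuration sketch using KafkaSpoutRetryExponentialBackoff, where all interval values are illustrative assumptions:

    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryService;

    public class RetryConfigSketch {
        public static KafkaSpoutConfig<String, String> build() {
            // Roughly: exponential backoff starting at initialDelay, capped at maxDelay;
            // schedule() returns false once a tuple has failed more than maxRetries times.
            KafkaSpoutRetryService retry = new KafkaSpoutRetryExponentialBackoff(
                TimeInterval.milliSeconds(500), // initial delay
                TimeInterval.milliSeconds(200), // delay period (exponential step)
                5,                              // maxRetries
                TimeInterval.seconds(10));      // cap on the computed delay

            return KafkaSpoutConfig.builder("localhost:9092", "demo-topic") // hypothetical
                .setRetry(retry)
                .build();
        }
    }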

Summary

  • storm-kafka-client mainly targets Kafka 0.10 and above. It introduces the ProcessingGuarantee enum, which has three values:

    • AT_LEAST_ONCE is the mode with acking enabled; similar to the Kafka client's auto commit, it commits periodically at the configured interval. It maintains emitted (emitted but not yet acked), offsetManagers (acked but not yet committed) and the retrySchedules of failed tuples awaiting retry.
    • AT_MOST_ONCE commits synchronously right after poll (ignoring the interval configuration), so each message is processed at most once.
    • NO_GUARANTEE also disregards acks, but like AT_LEAST_ONCE it commits periodically at the configured interval (both rely on commitTimer); the difference is that its commit is asynchronous.
  • ProcessingGuarantee.AT_LEAST_ONCE combines the spout's ack mechanism with offset management: it maintains emitted (emitted but not yet acked); its fail method hands msgIds to the retryService for retry (NO_GUARANTEE has no such step); and like NO_GUARANTEE it relies on commitTimer to commit offsets on the interval, the difference being that it uses commitSync, i.e. a synchronous commit, and commits only acked offsets. NO_GUARANTEE commits asynchronously, and the offsets it commits are based on the consumer's poll position, regardless of whether the tuples were acked in Storm.
  • ProcessingGuarantee.AT_MOST_ONCE commits inside the pollKafkaBroker method: right after kafkaConsumer.poll, kafkaConsumer.commitSync is called to commit synchronously. It does not depend on commitTimer, i.e. it does not commit offsets on an interval.
  • When ProcessingGuarantee.NO_GUARANTEE decides in nextTuple that a commit is due, it calls kafkaConsumer.commitAsync to commit asynchronously; like AT_LEAST_ONCE it relies on commitTimer and commits on the interval, but asynchronously, whereas AT_LEAST_ONCE commits synchronously.
  • The nextTuple() method calls kafkaConsumer.poll to retrieve messages, puts them into waitingToEmit, and then calls emitIfWaitingNotEmitted to emit (via emitOrRetryTuple) or return with nothing to do. Since pollKafkaBroker seeks each retriable partition back to its earliest failed offset and pulls again from there, the polled batch can contain messages that were already handled, so emitOrRetryTuple must check offsetManagers (acked, waiting for commit) and emitted (emitted, pending ack) to decide whether collector.emit is needed. For the AT_LEAST_ONCE type, emitting also maintains offsetManagers, emitted and the retry state, and calls back tupleListener.onEmit; for the other types it is just the emit. (A minimal end-to-end wiring sketch follows.)
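
Finally, a minimal end-to-end wiring sketch under the assumptions above (local cluster, hypothetical names; with AT_LEAST_ONCE, a downstream bolt must ack each tuple for commits to progress):

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
    import org.apache.storm.topology.TopologyBuilder;

    public class DemoTopology {
        public static void main(String[] args) throws Exception {
            KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig
                .builder("localhost:9092", "demo-topic") // hypothetical broker/topic
                .setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE)
                .build();

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafka-spout", new KafkaSpout<>(conf), 1);
            // Attach acking bolts here, e.g.:
            // builder.setBolt("print-bolt", new PrintBolt()).shuffleGrouping("kafka-spout");

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka-spout-demo", new Config(), builder.createTopology());
            Thread.sleep(60_000); // let the topology run for a minute
            cluster.shutdown();
        }
    }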
