A look at Storm's OpaquePartitionedTridentSpoutExecutor

Preface

This article takes a look at Storm's OpaquePartitionedTridentSpoutExecutor.

TridentTopology.newStream

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java

    public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
        return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
    }
  • For a spout of type IOpaquePartitionedTridentSpout, TridentTopology.newStream wraps it in an OpaquePartitionedTridentSpoutExecutor. KafkaTridentSpoutOpaque implements the IOpaquePartitionedTridentSpout interface, so it takes this path.
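
The overload is a small adapter step: wrap the opaque spout, then delegate to the general newStream. The sketch below mirrors that shape with stripped-down, hypothetical stand-ins (OpaqueSpout, TridentSpout, OpaqueExecutor and MiniTopology are invented for illustration and are not Storm's API). As with any Java overload, the method is chosen from the argument's static type.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for IOpaquePartitionedTridentSpout.
interface OpaqueSpout {
    String name();
}

// Hypothetical stand-in for ITridentSpout, the type the general overload accepts.
interface TridentSpout {
    String describe();
}

// Hypothetical stand-in for OpaquePartitionedTridentSpoutExecutor:
// it adapts an OpaqueSpout to the TridentSpout interface.
class OpaqueExecutor implements TridentSpout {
    private final OpaqueSpout spout;

    OpaqueExecutor(OpaqueSpout spout) {
        this.spout = spout;
    }

    @Override
    public String describe() {
        return "OpaqueExecutor(" + spout.name() + ")";
    }
}

// Hypothetical stand-in for TridentTopology.
class MiniTopology {
    final List<String> streams = new ArrayList<>();

    // General overload: registers any TridentSpout under a transaction id.
    String newStream(String txId, TridentSpout spout) {
        String entry = txId + " -> " + spout.describe();
        streams.add(entry);
        return entry;
    }

    // Specialised overload: wrap first, then delegate to the general one.
    String newStream(String txId, OpaqueSpout spout) {
        return newStream(txId, new OpaqueExecutor(spout));
    }
}
```

Passing a variable declared as OpaqueSpout (for example `OpaqueSpout s = () -> "kafka";`) to `newStream("tx-1", s)` registers "tx-1 -> OpaqueExecutor(kafka)".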

TridentTopologyBuilder.buildTopology

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

    public StormTopology buildTopology(Map<String, Number> masterCoordResources) {
        TopologyBuilder builder = new TopologyBuilder();
        Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
        Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);

        Map<String, List<String>> batchesToCommitIds = new HashMap<>();
        Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<>();
        
        for(String id: _spouts.keySet()) {
            TransactionalSpoutComponent c = _spouts.get(id);
            if(c.spout instanceof IRichSpout) {
                
                //TODO: wrap this to set the stream name
                builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
            } else {
                String batchGroup = c.batchGroupId;
                if(!batchesToCommitIds.containsKey(batchGroup)) {
                    batchesToCommitIds.put(batchGroup, new ArrayList<String>());
                }
                batchesToCommitIds.get(batchGroup).add(c.commitStateId);

                if(!batchesToSpouts.containsKey(batchGroup)) {
                    batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>());
                }
                batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout);
                
                
                BoltDeclarer scd =
                      builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                
                for(Map<String, Object> m: c.componentConfs) {
                    scd.addConfigurations(m);
                }
                
                Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap();
                specs.put(c.batchGroupId, new CoordSpec());
                BoltDeclarer bd = builder.setBolt(id,
                        new TridentBoltExecutor(
                          new TridentSpoutExecutor(
                            c.commitStateId,
                            c.streamName,
                            ((ITridentSpout) c.spout)),
                            batchIdsForSpouts,
                            specs),
                        c.parallelism);
                bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
                bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                if(c.spout instanceof ICommitterTridentSpout) {
                    bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
                }
                for(Map<String, Object> m: c.componentConfs) {
                    bd.addConfigurations(m);
                }
            }
        }

        //......

        return builder.createTopology();
    }
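  • buildTopology wraps the IOpaquePartitionedTridentSpout (by now an OpaquePartitionedTridentSpoutExecutor) in a TridentSpoutExecutor, and then wraps that in a bolt using TridentBoltExecutor. It also declares a TridentSpoutCoordinator bolt for the spout; because OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout, the spout bolt additionally subscribes to the master coordinator's COMMIT_STREAM_ID.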
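
To make the nesting and the subscriptions concrete, here is a toy model of what the else-branch of buildTopology declares for one Trident spout. SpoutWiring is a hypothetical helper, and the component and stream labels are placeholder strings rather than the real IDs Storm generates; only the structure (two executor layers, plus a commit-stream subscription for ICommitterTridentSpout implementations) follows the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the else-branch in TridentTopologyBuilder.buildTopology.
// Component names and stream labels are illustrative placeholders.
class SpoutWiring {
    // Executor nesting, innermost first: the spout (already wrapped by
    // newStream), then TridentSpoutExecutor, then TridentBoltExecutor.
    static String wrap(String spoutExecutor) {
        return "TridentBoltExecutor(TridentSpoutExecutor(" + spoutExecutor + "))";
    }

    // Streams the spout bolt subscribes to (allGrouping in the real code).
    static List<String> subscriptions(String id, String batchGroup, boolean isCommitter) {
        List<String> subs = new ArrayList<>();
        subs.add("spoutCoordinator(" + id + ")/BATCH_STREAM_ID");
        subs.add("masterCoordinator(" + batchGroup + ")/SUCCESS_STREAM_ID");
        // Only ICommitterTridentSpout implementations also receive the commit stream.
        if (isCommitter) {
            subs.add("masterCoordinator(" + batchGroup + ")/COMMIT_STREAM_ID");
        }
        return subs;
    }
}
```

Since OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout, an opaque Kafka spout always takes the three-subscription path.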

OpaquePartitionedTridentSpoutExecutor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java

public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout<Object> {
    protected final Logger LOG = LoggerFactory.getLogger(OpaquePartitionedTridentSpoutExecutor.class);

    IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> _spout;
    
    //......

    public OpaquePartitionedTridentSpoutExecutor(IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> spout) {
        _spout = spout;
    }
    
    @Override
    public ITridentSpout.BatchCoordinator<Object> getCoordinator(String txStateId, Map conf, TopologyContext context) {
        return new Coordinator(conf, context);
    }

    @Override
    public ICommitterTridentSpout.Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new Emitter(txStateId, conf, context);
    }

    @Override
    public Fields getOutputFields() {
        return _spout.getOutputFields();
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return _spout.getComponentConfiguration();
    }
    
}
  • OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout: getCoordinator returns an ITridentSpout.BatchCoordinator and getEmitter returns an ICommitterTridentSpout.Emitter.
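
Implementing ICommitterTridentSpout is what gives the emitter its extra commit(TransactionAttempt) hook, which TridentSpoutExecutor invokes for tuples on the master coordinator's commit stream. The sketch below is a simplified model of the order one transaction goes through (batch, then commit, then success). BatchEmitter, CommitterEmitter, RecordingEmitter and TxDriver are hypothetical names, plain txids stand in for TransactionAttempt, and the pipelining of several transactions at once is ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for ITridentSpout.Emitter.
interface BatchEmitter {
    void emitBatch(long txid);
    void success(long txid);
}

// Hypothetical, simplified stand-in for ICommitterTridentSpout.Emitter: adds commit().
interface CommitterEmitter extends BatchEmitter {
    void commit(long txid);
}

// Records the calls it receives so the order can be inspected.
class RecordingEmitter implements CommitterEmitter {
    final List<String> calls = new ArrayList<>();

    public void emitBatch(long txid) {
        calls.add("emitBatch " + txid);
    }

    public void commit(long txid) {
        calls.add("commit " + txid);
    }

    public void success(long txid) {
        calls.add("success " + txid);
    }
}

// Hypothetical driver: the order in which one transaction's streams arrive.
class TxDriver {
    static void runTransaction(BatchEmitter emitter, long txid) {
        emitter.emitBatch(txid);                          // batch stream
        if (emitter instanceof CommitterEmitter) {
            ((CommitterEmitter) emitter).commit(txid);    // commit stream, committer spouts only
        }
        emitter.success(txid);                            // success stream
    }
}
```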

ITridentSpout.BatchCoordinator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java

    public class Coordinator implements ITridentSpout.BatchCoordinator<Object> {
        IOpaquePartitionedTridentSpout.Coordinator _coordinator;

        public Coordinator(Map conf, TopologyContext context) {
            _coordinator = _spout.getCoordinator(conf, context);
        }
        
        @Override
        public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
            LOG.debug("Initialize Transaction. [txid = {}], [prevMetadata = {}], [currMetadata = {}]", txid, prevMetadata, currMetadata);
            return _coordinator.getPartitionsForBatch();
        }


        @Override
        public void close() {
            LOG.debug("Closing");
            _coordinator.close();
            LOG.debug("Closed");
        }

        @Override
        public void success(long txid) {
            LOG.debug("Success [txid = {}]", txid);
        }

        @Override
        public boolean isReady(long txid) {
            boolean ready = _coordinator.isReady(txid);
            LOG.debug("[isReady = {}], [txid = {}]", ready, txid);
            return ready;
        }
    }
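  • This Coordinator wraps the spout's own _coordinator, of type IOpaquePartitionedTridentSpout.Coordinator, and mostly just adds debug logging: initializeTransaction returns _coordinator.getPartitionsForBatch() as the batch metadata, isReady and close are delegated, and success only logs.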
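
The wrapper is a plain decorator. A minimal sketch under stated assumptions: PartitionCoordinator and LoggingBatchCoordinator are hypothetical names, and an in-memory list of strings replaces SLF4J logging. It highlights the one behavior worth noticing, namely that initializeTransaction ignores prevMetadata and currMetadata and always returns the wrapped coordinator's current partition list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for IOpaquePartitionedTridentSpout.Coordinator.
interface PartitionCoordinator<M> {
    M getPartitionsForBatch();
    boolean isReady(long txid);
}

// Hypothetical decorator in the style of OpaquePartitionedTridentSpoutExecutor.Coordinator:
// every call is delegated; the only addition is logging.
class LoggingBatchCoordinator<M> {
    private final PartitionCoordinator<M> inner;
    final List<String> log = new ArrayList<>();

    LoggingBatchCoordinator(PartitionCoordinator<M> inner) {
        this.inner = inner;
    }

    // prevMetadata and currMetadata are not used; the batch metadata is
    // always the current partition list from the wrapped coordinator.
    M initializeTransaction(long txid, M prevMetadata, M currMetadata) {
        log.add("initializeTransaction txid=" + txid);
        return inner.getPartitionsForBatch();
    }

    boolean isReady(long txid) {
        boolean ready = inner.isReady(txid);
        log.add("isReady=" + ready + " txid=" + txid);
        return ready;
    }
}
```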

ICommitterTridentSpout.Emitter

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java

    public class Emitter implements ICommitterTridentSpout.Emitter {
        IOpaquePartitionedTridentSpout.Emitter<Object, ISpoutPartition, Object> _emitter;
        TransactionalState _state;
        TreeMap<Long, Map<String, Object>> _cachedMetas = new TreeMap<>();
        Map<String, EmitterPartitionState> _partitionStates = new HashMap<>();
        int _index;
        int _numTasks;

        public Emitter(String txStateId, Map conf, TopologyContext context) {
            _emitter = _spout.getEmitter(conf, context);
            _index = context.getThisTaskIndex();
            _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
            _state = TransactionalState.newUserState(conf, txStateId);
            LOG.debug("Created {}", this);
        }

        Object _savedCoordinatorMeta = null;
        boolean _changedMeta = false;

        @Override
        public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
            LOG.debug("Emitting Batch. [transaction = {}], [coordinatorMeta = {}], [collector = {}], [{}]",
                    tx, coordinatorMeta, collector, this);

            if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
                _partitionStates.clear();
                final List<ISpoutPartition> taskPartitions = _emitter.getPartitionsForTask(_index, _numTasks, coordinatorMeta);
                for (ISpoutPartition partition : taskPartitions) {
                    _partitionStates.put(partition.getId(), new EmitterPartitionState(new RotatingTransactionalState(_state, partition.getId()), partition));
                }

                // refresh all partitions for backwards compatibility with old spout
                _emitter.refreshPartitions(_emitter.getOrderedPartitions(coordinatorMeta));
                _savedCoordinatorMeta = coordinatorMeta;
                _changedMeta = true;
            }
            Map<String, Object> metas = new HashMap<>();
            _cachedMetas.put(tx.getTransactionId(), metas);

            Entry<Long, Map<String, Object>> entry = _cachedMetas.lowerEntry(tx.getTransactionId());
            Map<String, Object> prevCached;
            if(entry!=null) {
                prevCached = entry.getValue();
            } else {
                prevCached = new HashMap<>();
            }
            
            for(Entry<String, EmitterPartitionState> e: _partitionStates.entrySet()) {
                String id = e.getKey();
                EmitterPartitionState s = e.getValue();
                s.rotatingState.removeState(tx.getTransactionId());
                Object lastMeta = prevCached.get(id);
                if(lastMeta==null) lastMeta = s.rotatingState.getLastState();
                Object meta = _emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta);
                metas.put(id, meta);
            }
            LOG.debug("Emitted Batch. [transaction = {}], [coordinatorMeta = {}], [collector = {}], [{}]",
                    tx, coordinatorMeta, collector, this);
        }

        @Override
        public void success(TransactionAttempt tx) {
            for(EmitterPartitionState state: _partitionStates.values()) {
                state.rotatingState.cleanupBefore(tx.getTransactionId());
            }
            LOG.debug("Success transaction {}. [{}]", tx, this);
        }

        @Override
        public void commit(TransactionAttempt attempt) {
            LOG.debug("Committing transaction {}. [{}]", attempt, this);
            // this code here handles a case where a previous commit failed, and the partitions
            // changed since the last commit. This clears out any state for the removed partitions
            // for this txid.
            // we make sure only a single task ever does this. we're also guaranteed that
            // it's impossible for there to be another writer to the directory for that partition
            // because only a single commit can be happening at once. this is because in order for 
            // another attempt of the batch to commit, the batch phase must have succeeded in between.
            // hence, all tasks for the prior commit must have finished committing (whether successfully or not)
            if(_changedMeta && _index==0) {
                Set<String> validIds = new HashSet<>();
                for(ISpoutPartition p: _emitter.getOrderedPartitions(_savedCoordinatorMeta)) {
                    validIds.add(p.getId());
                }
                for(String existingPartition: _state.list("")) {
                    if(!validIds.contains(existingPartition)) {
                        RotatingTransactionalState s = new RotatingTransactionalState(_state, existingPartition);
                        s.removeState(attempt.getTransactionId());
                    }
                }
                _changedMeta = false;
            }
            
            Long txid = attempt.getTransactionId();
            Map<String, Object> metas = _cachedMetas.remove(txid);
            for(Entry<String, Object> entry: metas.entrySet()) {
                _partitionStates.get(entry.getKey()).rotatingState.overrideState(txid, entry.getValue());
            }
            LOG.debug("Exiting commit method for transaction {}. [{}]", attempt, this);
        }

        @Override
        public void close() {
            LOG.debug("Closing");
            _emitter.close();
            LOG.debug("Closed");
        }

        @Override
        public String toString() {
            return "Emitter{" +
                    ", _state=" + _state +
                    ", _cachedMetas=" + _cachedMetas +
                    ", _partitionStates=" + _partitionStates +
                    ", _index=" + _index +
                    ", _numTasks=" + _numTasks +
                    ", _savedCoordinatorMeta=" + _savedCoordinatorMeta +
                    ", _changedMeta=" + _changedMeta +
                    '}';
        }
    }

    static class EmitterPartitionState {
        public RotatingTransactionalState rotatingState;
        public ISpoutPartition partition;
        
        public EmitterPartitionState(RotatingTransactionalState s, ISpoutPartition p) {
            rotatingState = s;
            partition = p;
        }
    }
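  • This Emitter wraps the spout's IOpaquePartitionedTridentSpout.Emitter; _partitionStates maps each partition id to an EmitterPartitionState, which pairs the partition with a RotatingTransactionalState.
  • emitBatch first rebuilds _partitionStates via _emitter.getPartitionsForTask whenever the coordinator metadata has changed. It then takes prevCached, the metadata cached for the nearest earlier txid, and for each partition uses that entry as lastMeta, falling back to rotatingState.getLastState() when there is none. Finally it calls _emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta) and caches the returned metadata under the current txid.
  • success calls state.rotatingState.cleanupBefore(tx.getTransactionId()) to clear state recorded before this txid. commit moves the metadata cached for the txid out of _cachedMetas and writes it into each partition's rotatingState via overrideState; if the coordinator metadata changed since the last commit, the task with index 0 first removes this txid's state for partitions that no longer exist.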
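
The bookkeeping in emitBatch, commit and success boils down to two layers: an in-memory TreeMap of uncommitted per-txid metadata, and a committed per-partition store. The sketch below models a single partition with plain maps. MetaCache is a hypothetical name, the committed TreeMap stands in for RotatingTransactionalState (which in Storm persists through TransactionalState rather than living only in memory), and metadata is simplified to a Long "next offset".

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of the metadata bookkeeping in
// OpaquePartitionedTridentSpoutExecutor.Emitter, for a single partition.
// Metadata is reduced to a Long "next offset".
class MetaCache {
    // Uncommitted metadata per txid (plays the role of _cachedMetas).
    final TreeMap<Long, Long> cached = new TreeMap<>();
    // Committed metadata per txid (plays the role of RotatingTransactionalState).
    final TreeMap<Long, Long> committed = new TreeMap<>();

    // Same lookup order as emitBatch: nearest earlier uncommitted batch first,
    // then the last committed state.
    Long lastMetaFor(long txid) {
        committed.remove(txid);                  // like rotatingState.removeState(txid)
        Map.Entry<Long, Long> prev = cached.lowerEntry(txid);
        if (prev != null) {
            return prev.getValue();
        }
        return committed.isEmpty() ? null : committed.lastEntry().getValue(); // getLastState()
    }

    // emitBatch for one partition: start from lastMeta, pretend batchSize
    // records were emitted, and cache the new metadata under txid.
    long emit(long txid, long batchSize) {
        Long last = lastMetaFor(txid);
        long start = last == null ? 0L : last;
        cached.put(txid, start + batchSize);
        return start;
    }

    // commit: move the cached metadata into the committed store (overrideState).
    void commit(long txid) {
        Long meta = cached.remove(txid);
        if (meta != null) {
            committed.put(txid, meta);
        }
    }

    // success: drop committed entries older than txid (cleanupBefore).
    void success(long txid) {
        committed.headMap(txid).clear();
    }
}
```

In this model, emitting tx 2 before tx 1 commits resumes from tx 1's cached offset, and replaying tx 3 after the commits restarts from the last committed value even if the replayed batch ends up a different size, which is the latitude an opaque spout is allowed.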

KafkaTridentSpoutOpaque

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java

public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSpout<List<Map<String, Object>>,
        KafkaTridentSpoutTopicPartition, Map<String, Object>> {
    private static final long serialVersionUID = -8003272486566259640L;

    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);

    private final KafkaTridentSpoutManager<K, V> kafkaManager;

    public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> conf) {
        this(new KafkaTridentSpoutManager<>(conf));
    }
    
    public KafkaTridentSpoutOpaque(KafkaTridentSpoutManager<K, V> kafkaManager) {
        this.kafkaManager = kafkaManager;
        LOG.debug("Created {}", this.toString());
    }

    @Override
    public Emitter<List<Map<String, Object>>, KafkaTridentSpoutTopicPartition, Map<String, Object>> getEmitter(
            Map conf, TopologyContext context) {
        return new KafkaTridentSpoutEmitter<>(kafkaManager, context);
    }

    @Override
    public Coordinator<List<Map<String, Object>>> getCoordinator(Map conf, TopologyContext context) {
        return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager);
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        final Fields outputFields = kafkaManager.getFields();
        LOG.debug("OutputFields = {}", outputFields);
        return outputFields;
    }

    @Override
    public final String toString() {
        return super.toString() +
                "{kafkaManager=" + kafkaManager + '}';
    }
}
  • KafkaTridentSpoutOpaque's getCoordinator returns a KafkaTridentSpoutOpaqueCoordinator and its getEmitter returns a KafkaTridentSpoutEmitter; both are built from the same KafkaTridentSpoutManager.

KafkaTridentSpoutOpaqueCoordinator

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java

public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartitionedTridentSpout.Coordinator<List<Map<String, Object>>>,
        Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaqueCoordinator.class);

    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
    private final KafkaTridentSpoutManager<K,V> kafkaManager;

    public KafkaTridentSpoutOpaqueCoordinator(KafkaTridentSpoutManager<K, V> kafkaManager) {
        this.kafkaManager = kafkaManager;
        LOG.debug("Created {}", this.toString());
    }

    @Override
    public boolean isReady(long txid) {
        LOG.debug("isReady = true");
        return true;    // the "old" trident kafka spout always returns true, like this
    }

    @Override
    public List<Map<String, Object>> getPartitionsForBatch() {
        final ArrayList<TopicPartition> topicPartitions = new ArrayList<>(kafkaManager.getTopicPartitions());
        LOG.debug("TopicPartitions for batch {}", topicPartitions);
        List<Map<String, Object>> tps = new ArrayList<>();
        for(TopicPartition tp : topicPartitions) {
            tps.add(tpSerializer.toMap(tp));
        }
        return tps;
    }

    @Override
    public void close() {
        LOG.debug("Closed"); // the "old" trident kafka spout is no op like this
    }

    @Override
    public final String toString() {
        return super.toString() +
                "{kafkaManager=" + kafkaManager +
                '}';
    }
}
  • isReady always returns true here. getPartitionsForBatch converts the TopicPartitions from kafkaManager.getTopicPartitions() into a list of maps (via TopicPartitionSerializer.toMap), and this list serves as the coordinator metadata for the batch.
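
The round trip between a topic-partition and its map form can be sketched as follows. Tp is a minimal stand-in for Kafka's TopicPartition, TpMaps is a hypothetical helper rather than TopicPartitionSerializer itself, and the "topic"/"partition" key names are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Minimal, hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final class Tp {
    final String topic;
    final int partition;

    Tp(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Tp)) {
            return false;
        }
        Tp other = (Tp) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical serializer in the spirit of TopicPartitionSerializer;
// the map keys "topic" and "partition" are assumptions.
class TpMaps {
    static Map<String, Object> toMap(Tp tp) {
        Map<String, Object> m = new HashMap<>();
        m.put("topic", tp.topic);
        m.put("partition", tp.partition);
        return m;
    }

    static Tp fromMap(Map<String, Object> m) {
        // Read through Number so an Integer or a Long both work.
        return new Tp((String) m.get("topic"), ((Number) m.get("partition")).intValue());
    }

    // Shape of getPartitionsForBatch: one map per topic-partition.
    static List<Map<String, Object>> partitionsForBatch(List<Tp> tps) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Tp tp : tps) {
            out.add(toMap(tp));
        }
        return out;
    }
}
```

Reading the partition through Number rather than casting straight to Integer is a defensive choice: metadata that has been through a JSON store may come back with integers widened to Long.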

KafkaTridentSpoutEmitter

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java

public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTridentSpout.Emitter<
        List<Map<String, Object>>,
        KafkaTridentSpoutTopicPartition,
        Map<String, Object>>,
        Serializable {

    private static final long serialVersionUID = -7343927794834130435L;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);

    // Kafka
    private final KafkaConsumer<K, V> kafkaConsumer;

    // Bookkeeping
    private final KafkaTridentSpoutManager<K, V> kafkaManager;
    // set of topic-partitions for which first poll has already occurred, and the first polled txid
    private final Map<TopicPartition, Long> firstPollTransaction = new HashMap<>(); 

    // Declare some KafkaTridentSpoutManager references for convenience
    private final long pollTimeoutMs;
    private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
    private final RecordTranslator<K, V> translator;
    private final Timer refreshSubscriptionTimer;
    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();

    private TopologyContext topologyContext;

    /**
     * Create a new Kafka spout emitter.
     * @param kafkaManager The Kafka consumer manager to use
     * @param topologyContext The topology context
     * @param refreshSubscriptionTimer The timer for deciding when to recheck the subscription
     */
    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) {
        this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext);
        this.kafkaManager = kafkaManager;
        this.topologyContext = topologyContext;
        this.refreshSubscriptionTimer = refreshSubscriptionTimer;
        this.translator = kafkaManager.getKafkaSpoutConfig().getTranslator();

        final KafkaSpoutConfig<K, V> kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig();
        this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
        this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
        LOG.debug("Created {}", this.toString());
    }

    /**
     * Creates instance of this class with default 500 millisecond refresh subscription timer
     */
    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext) {
        this(kafkaManager, topologyContext, new Timer(500,
                kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS));
    }

    //......

    @Override
    public Map<String, Object> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
            KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {

        LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]",
                tx, currBatchPartition, lastBatch, collector);

        final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
        final Set<TopicPartition> assignments = kafkaConsumer.assignment();
        KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch);
        KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta;
        Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();

        if (assignments == null || !assignments.contains(currBatchPartition.getTopicPartition())) {
            LOG.warn("SKIPPING processing batch [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
                            "[collector = {}] because it is not part of the assignments {} of consumer instance [{}] " +
                            "of consumer group [{}]", tx, currBatchPartition, lastBatch, collector, assignments,
                    kafkaConsumer, kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
        } else {
            try {
                // pause other topic-partitions to only poll from current topic-partition
                pausedTopicPartitions = pauseTopicPartitions(currBatchTp);

                seek(currBatchTp, lastBatchMeta, tx.getTransactionId());

                // poll
                if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
                    kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment();
                }

                final ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeoutMs);
                LOG.debug("Polled [{}] records from Kafka.", records.count());

                if (!records.isEmpty()) {
                    emitTuples(collector, records);
                    // build new metadata
                    currentBatch = new KafkaTridentSpoutBatchMetadata(currBatchTp, records);
                }
            } finally {
                kafkaConsumer.resume(pausedTopicPartitions);
                LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
            }
            LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
                    "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);
        }

        return currentBatch == null ? null : currentBatch.toMap();
    }

    private void emitTuples(TridentCollector collector, ConsumerRecords<K, V> records) {
        for (ConsumerRecord<K, V> record : records) {
            final List<Object> tuple = translator.apply(record);
            collector.emit(tuple);
            LOG.debug("Emitted tuple {} for record [{}]", tuple, record);
        }
    }

    @Override
    public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionResponsibilities) {
        LOG.trace("Refreshing of topic-partitions handled by Kafka. " +
                "No action taken by this method for topic partitions {}", partitionResponsibilities);
    }

    /**
     * Computes ordered list of topic-partitions for this task taking into consideration that topic-partitions
     * for this task must be assigned to the Kafka consumer running on this task.
     *
     * @param allPartitionInfo list of all partitions as returned by {@link KafkaTridentSpoutOpaqueCoordinator}
     * @return ordered list of topic partitions for this task
     */
    @Override
    public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<Map<String, Object>> allPartitionInfo) {
        List<TopicPartition> allTopicPartitions = new ArrayList<>();
        for(Map<String, Object> map : allPartitionInfo) {
            allTopicPartitions.add(tpSerializer.fromMap(map));
        }
        final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(allTopicPartitions);
        LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ",
                allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
        return allPartitions;
    }

    @Override
    public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks,
        List<Map<String, Object>> allPartitionInfo) {
        final Set<TopicPartition> assignedTps = kafkaConsumer.assignment();
        LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps);
        final List<KafkaTridentSpoutTopicPartition> taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps);
        LOG.debug("Returning topic-partitions {} for task with index [{}]", taskTps, taskId);
        return taskTps;
    }

    @Override
    public void close() {
        kafkaConsumer.close();
        LOG.debug("Closed");
    }

    @Override
    public final String toString() {
        return super.toString() +
                "{kafkaManager=" + kafkaManager +
                '}';
    }
}
  • refreshSubscriptionTimer is created with a 500 ms initial delay and a period of kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), which defaults to 2000 ms.
  • Each call to emitPartitionBatch for an assigned partition checks refreshSubscriptionTimer.isExpiredResetOnTrue(); once the period has elapsed, it calls kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment() to refresh the assignment.
  • emitPartitionBatch first checks that the batch's partition is assigned to this consumer; if not, it logs a warning and returns the previous metadata unchanged. Otherwise it pauses all other topic-partitions so that only this one is polled, then calls seek, which positions kafkaConsumer according to firstPollOffsetStrategy and lastBatchMeta.
  • It then pulls data with kafkaConsumer.poll(pollTimeoutMs) and hands the records to emitTuples, which converts each record with translator and sends it with collector.emit. If any records arrived, a new KafkaTridentSpoutBatchMetadata is built from them; the paused partitions are resumed in the finally block.
  • refreshPartitions currently only writes a trace log. getOrderedPartitions deserializes each map in allPartitionInfo back into a TopicPartition and returns them as KafkaTridentSpoutTopicPartitions. getPartitionsForTask does not use allPartitionInfo at all: it converts kafkaConsumer.assignment() into KafkaTridentSpoutTopicPartitions, so a task handles exactly the partitions Kafka has assigned to its consumer.
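
The pause/seek/poll/resume sequence is what lets one consumer serve several Trident partitions one batch at a time. The sketch below replays that control flow against FakeConsumer, a hypothetical in-memory stand-in for KafkaConsumer with string partition names and integer offsets. The elided seek step is simplified to "resume after the last emitted offset, or start at 0", an assumption in place of the real firstPollOffsetStrategy handling, and the subscription-refresh timer is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory stand-in for a Kafka consumer: each partition is a list of records.
class FakeConsumer {
    final Map<String, List<String>> logs = new HashMap<>();
    final Set<String> assigned = new HashSet<>();
    final Set<String> paused = new HashSet<>();
    final Map<String, Integer> position = new HashMap<>();

    void seek(String tp, int offset) {
        position.put(tp, offset);
    }

    // Returns up to max records from every assigned, non-paused partition.
    Map<String, List<String>> poll(int max) {
        Map<String, List<String>> out = new HashMap<>();
        for (String tp : assigned) {
            if (paused.contains(tp)) {
                continue;                          // paused partitions are skipped
            }
            List<String> log = logs.getOrDefault(tp, new ArrayList<>());
            int from = position.getOrDefault(tp, 0);
            int to = Math.min(log.size(), from + max);
            if (from < to) {
                out.put(tp, new ArrayList<>(log.subList(from, to)));
                position.put(tp, to);              // next offset to read
            }
        }
        return out;
    }
}

// Hypothetical re-creation of the control flow in emitPartitionBatch.
// Returns the last emitted offset (the "batch metadata"), or lastOffset if nothing new.
class PartitionBatchEmitter {
    static Integer emitPartitionBatch(FakeConsumer c, String tp, Integer lastOffset,
                                      List<String> emitted, int max) {
        if (!c.assigned.contains(tp)) {
            return lastOffset;                     // not ours: skip, keep old metadata
        }
        Set<String> toPause = new HashSet<>(c.assigned);
        toPause.remove(tp);
        c.paused.addAll(toPause);                  // poll only the current partition
        Integer current = lastOffset;
        try {
            c.seek(tp, lastOffset == null ? 0 : lastOffset + 1);   // simplified seek
            List<String> records = c.poll(max).getOrDefault(tp, new ArrayList<>());
            if (!records.isEmpty()) {
                emitted.addAll(records);           // stands in for collector.emit per record
                current = c.position.get(tp) - 1;  // last offset of this batch
            }
        } finally {
            c.paused.removeAll(toPause);           // always resume what was paused
        }
        return current;
    }
}
```

With partitions t-0 (records a, b, c) and t-1 both assigned, two calls for t-0 with max = 2 emit a, b and then c, returning offsets 1 and 2, while t-1 is never polled; a call for an unassigned partition returns the old metadata untouched.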

Summary

  • storm-kafka-client provides KafkaTridentSpoutOpaque as Trident's Kafka spout (the older version is OpaqueTridentKafkaSpout in the storm-kafka library); it implements the IOpaquePartitionedTridentSpout interface.
  • For a spout of type IOpaquePartitionedTridentSpout, TridentTopology.newStream wraps it in an OpaquePartitionedTridentSpoutExecutor. buildTopology then wraps that in a TridentSpoutExecutor, which in turn is wrapped in a bolt by TridentBoltExecutor.
  • OpaquePartitionedTridentSpoutExecutor's getCoordinator returns an ITridentSpout.BatchCoordinator and its getEmitter returns an ICommitterTridentSpout.Emitter. These wrap, respectively, the KafkaTridentSpoutOpaqueCoordinator and the KafkaTridentSpoutEmitter returned by the original KafkaTridentSpoutOpaque. The coordinator wrapper mainly adds debug logging, while the emitter wrapper mainly adds access to transactional state: it caches batch metadata per txid, commits it to RotatingTransactionalState, and cleans up old entries.
