Preface
This article mainly looks at Storm's topology.max.spout.pending (maxSpoutPending).
TOPOLOGY_MAX_SPOUT_PENDING
storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java
/**
* The maximum number of tuples that can be pending on a spout task at any given time. This config applies to individual tasks, not to
* spouts or topologies as a whole.
*
* A pending tuple is one that has been emitted from a spout but has not been acked or failed yet. Note that this config parameter has
* no effect for unreliable spouts that don't tag their tuples with a message id.
*/
@isInteger
@isPositiveNumber
public static final String TOPOLOGY_MAX_SPOUT_PENDING = "topology.max.spout.pending";
- topology.max.spout.pending sets the maximum number of tuples that a single spout task may have emitted but not yet acked or failed. The configuration only takes effect for reliable tuples, i.e. those emitted with a msgId (see the config sketch below).
- The default value of topology.max.spout.pending in defaults.yaml is null.
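For orientation, below is a minimal sketch (not from the Storm codebase; MyReliableSpout and MyBolt are hypothetical placeholders) of how topology.max.spout.pending is typically set when submitting a topology:

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class MaxSpoutPendingExample {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new MyReliableSpout(), 2);               // hypothetical spout
        builder.setBolt("bolt", new MyBolt(), 4).shuffleGrouping("spout"); // hypothetical bolt

        Config conf = new Config();
        // equivalent to conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1000)
        conf.setMaxSpoutPending(1000);

        StormSubmitter.submitTopology("max-spout-pending-demo", conf, builder.createTopology());
    }
}

Note that, as the SpoutExecutor code below shows, the configured per-task value is multiplied by the number of tasks in an executor, so the effective cap per executor can be larger than the configured number.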
SpoutExecutor
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
public void init(final ArrayList<Task> idToTask, int idToTaskBase) {
this.threadId = Thread.currentThread().getId();
executorTransfer.initLocalRecvQueues();
while (!stormActive.get()) {
Utils.sleep(100);
}
LOG.info("Opening spout {}:{}", componentId, taskIds);
this.idToTask = idToTask;
this.maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
//......
}
public Callable<Long> call() throws Exception {
init(idToTask, idToTaskBase);
return new Callable<Long>() {
final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount();
int recvqCheckSkips = 0;
int swIdleCount = 0; // counter for spout wait strategy
int bpIdleCount = 0; // counter for back pressure wait strategy
int rmspCount = 0;
@Override
public Long call() throws Exception {
int receiveCount = 0;
if (recvqCheckSkips++ == recvqCheckSkipCountMax) {
receiveCount = receiveQueue.consume(SpoutExecutor.this);
recvqCheckSkips = 0;
}
long currCount = emittedCount.get();
boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
boolean isActive = stormActive.get();
if (!isActive) {
inactiveExecute();
return 0L;
}
if (!lastActive.get()) {
lastActive.set(true);
activateSpouts();
}
boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
boolean noEmits = true;
long emptyStretch = 0;
if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
for (int j = 0; j < spouts.size(); j++) { // in critical path. don't use iterators.
spouts.get(j).nextTuple();
}
noEmits = (currCount == emittedCount.get());
if (noEmits) {
emptyEmitStreak.increment();
} else {
emptyStretch = emptyEmitStreak.get();
emptyEmitStreak.set(0);
}
}
if (reachedMaxSpoutPending) {
if (rmspCount == 0) {
LOG.debug("Reached max spout pending");
}
rmspCount++;
} else {
if (rmspCount > 0) {
LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount);
}
rmspCount = 0;
}
if (receiveCount > 1) {
// continue without idling
return 0L;
}
if (!pendingEmits.isEmpty()) { // then facing backpressure
backPressureWaitStrategy();
return 0L;
}
bpIdleCount = 0;
if (noEmits) {
spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch);
return 0L;
}
swIdleCount = 0;
return 0L;
}
private void backPressureWaitStrategy() throws InterruptedException {
long start = Time.currentTimeMillis();
if (bpIdleCount == 0) { // check avoids multiple log msgs when in a idle loop
LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
}
bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start);
}
private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) throws InterruptedException {
emptyEmitStreak.increment();
long start = Time.currentTimeMillis();
swIdleCount = spoutWaitStrategy.idle(swIdleCount);
if (reachedMaxSpoutPending) {
spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
} else {
if (emptyStretch > 0) {
LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
}
}
}
// returns true if pendingEmits is empty
private boolean tryFlushPendingEmits() {
for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
if (executorTransfer.tryTransfer(t, null)) {
pendingEmits.poll();
} else { // to avoid reordering of emits, stop at first failure
return false;
}
}
return true;
}
};
}
- Here Config.TOPOLOGY_MAX_SPOUT_PENDING is read from topoConf; if it is absent, 0 is used. The value is then multiplied by the number of tasks in the executor (idToTask.size()) to obtain maxSpoutPending.
- maxSpoutPending only influences the reachedMaxSpoutPending flag in the call method: nextTuple is invoked to emit data only when !reachedMaxSpoutPending && pendingEmitsIsEmpty.
- Note that reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending), i.e. it is driven by the size of the pending map (see the arithmetic sketch after this list).
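As a rough, standalone illustration of that arithmetic (plain Java, not Storm code; topoConf and the task count are stand-ins for the executor's real fields): with topology.max.spout.pending set to 500 and an executor running 2 spout tasks, the executor-level limit becomes 1000, and when the config is absent the limit is 0, which disables the check entirely.

import java.util.HashMap;
import java.util.Map;

public class MaxSpoutPendingMath {
    // Mirrors ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size()
    static int effectiveLimit(Map<String, Object> topoConf, int taskCount) {
        Object raw = topoConf.get("topology.max.spout.pending");
        int perTask = (raw == null) ? 0 : ((Number) raw).intValue();
        return perTask * taskCount;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(effectiveLimit(conf, 2));   // 0    -> maxSpoutPending check disabled
        conf.put("topology.max.spout.pending", 500);
        System.out.println(effectiveLimit(conf, 2));   // 1000 -> cap on this executor's pending map
    }
}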
SpoutOutputCollectorImpl
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
@Override
public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
try {
return sendSpoutMsg(streamId, tuple, messageId, null);
} catch (InterruptedException e) {
LOG.warn("Spout thread interrupted during emit().");
throw new RuntimeException(e);
}
}
@Override
public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
try {
sendSpoutMsg(streamId, tuple, messageId, taskId);
} catch (InterruptedException e) {
LOG.warn("Spout thread interrupted during emitDirect().");
throw new RuntimeException(e);
}
}
private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) throws
InterruptedException {
emittedCount.increment();
List<Integer> outTasks;
if (outTaskId != null) {
outTasks = taskData.getOutgoingTasks(outTaskId, stream, values);
} else {
outTasks = taskData.getOutgoingTasks(stream, values);
}
final boolean needAck = (messageId != null) && hasAckers;
final List<Long> ackSeq = needAck ? new ArrayList<>() : null;
final long rootId = needAck ? MessageId.generateId(random) : 0;
for (int i = 0; i < outTasks.size(); i++) { // perf critical path. don't use iterators.
Integer t = outTasks.get(i);
MessageId msgId;
if (needAck) {
long as = MessageId.generateId(random);
msgId = MessageId.makeRootId(rootId, as);
ackSeq.add(as);
} else {
msgId = MessageId.makeUnanchored();
}
final TupleImpl tuple =
new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), this.taskId, stream, msgId);
AddressedTuple adrTuple = new AddressedTuple(t, tuple);
executor.getExecutorTransfer().tryTransfer(adrTuple, executor.getPendingEmits());
}
if (isEventLoggers) {
taskData.sendToEventLogger(executor, values, executor.getComponentId(), messageId, random, executor.getPendingEmits());
}
if (needAck) {
boolean sample = executor.samplerCheck();
TupleInfo info = new TupleInfo();
info.setTaskId(this.taskId);
info.setStream(stream);
info.setMessageId(messageId);
if (isDebug) {
info.setValues(values);
}
if (sample) {
info.setTimestamp(System.currentTimeMillis());
}
pending.put(rootId, info);
List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits());
} else if (messageId != null) {
// Reusing TupleInfo object as we directly call executor.ackSpoutMsg() & are not sending msgs. perf critical
if (isDebug) {
if (spoutExecutorThdId != Thread.currentThread().getId()) {
throw new RuntimeException("Detected background thread emitting tuples for the spout. " +
"Spout Output Collector should only emit from the main spout executor thread.");
}
}
globalTupleInfo.clear();
globalTupleInfo.setStream(stream);
globalTupleInfo.setValues(values);
globalTupleInfo.setMessageId(messageId);
globalTupleInfo.setTimestamp(0);
globalTupleInfo.setId("0:");
Long timeDelta = 0L;
executor.ackSpoutMsg(executor, taskData, timeDelta, globalTupleInfo);
}
return outTasks;
}
- Both the emit and emitDirect methods of this collector delegate to sendSpoutMsg.
- sendSpoutMsg computes needAck = (messageId != null) && hasAckers; if there is no messageId or there are no ackers, needAck is false.
- When needAck is false, nothing is added to the pending map, so in SpoutExecutor reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending) never becomes true, because pending.size() >= maxSpoutPending never holds, and the maxSpoutPending mechanism is not triggered (see the spout sketch below).
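To make that concrete, here is a minimal sketch of a reliable spout (a hypothetical class, not from the Storm codebase) that emits with a messageId, so sendSpoutMsg takes the needAck branch and the tuple is recorded in the pending map. Emitting without a messageId, or disabling ackers with conf.setNumAckers(0), would bypass the mechanism entirely.

import java.util.Map;
import java.util.UUID;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class ReliableDemoSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // Passing a messageId makes the emit reliable: with ackers enabled, needAck is true,
        // the tuple enters the pending map, and maxSpoutPending can throttle further nextTuple calls.
        collector.emit(new Values("payload"), UUID.randomUUID().toString());
    }

    @Override
    public void ack(Object msgId) {
        // tuple fully processed; it is removed from the pending map
    }

    @Override
    public void fail(Object msgId) {
        // replay or drop as needed; it is also removed from the pending map
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}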
MasterBatchCoordinator
storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
_throttler = new WindowedTimeThrottler((Number) conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
for (String spoutId : _managedSpoutIds) {
_states.add(TransactionalState.newCoordinatorState(conf, spoutId));
}
_currTransaction = getStoredCurrTransaction();
_collector = collector;
Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
if (active == null) {
_maxTransactionActive = 1;
} else {
_maxTransactionActive = active.intValue();
}
_attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);
for (int i = 0; i < _spouts.size(); i++) {
String txId = _managedSpoutIds.get(i);
_coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));
}
LOG.debug("Opened {}", this);
}
private void sync() {
// note that sometimes the tuples active may be less than max_spout_pending, e.g.
// max_spout_pending = 3
// tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
// and there won't be a batch for tx 4 because there's max_spout_pending tx active
TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
if (maybeCommit != null && maybeCommit.status == AttemptStatus.PROCESSED) {
maybeCommit.status = AttemptStatus.COMMITTING;
_collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
}
if (_active) {
if (_activeTx.size() < _maxTransactionActive) {
Long curr = _currTransaction;
for (int i = 0; i < _maxTransactionActive; i++) {
if (!_activeTx.containsKey(curr) && isReady(curr)) {
// by using a monotonically increasing attempt id, downstream tasks
// can be memory efficient by clearing out state for old attempts
// as soon as they see a higher attempt id for a transaction
Integer attemptId = _attemptIds.get(curr);
if (attemptId == null) {
attemptId = 0;
} else {
attemptId++;
}
_attemptIds.put(curr, attemptId);
for (TransactionalState state : _states) {
state.setData(CURRENT_ATTEMPTS, _attemptIds);
}
TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
_activeTx.put(curr, newTransactionStatus);
_collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt,
newTransactionStatus, this);
_throttler.markEvent();
}
curr = nextTransactionId(curr);
}
}
}
}
- The open method of MasterBatchCoordinator reads Config.TOPOLOGY_MAX_SPOUT_PENDING from conf into _maxTransactionActive; if it is null, the value defaults to 1.
- sync only emits a new batch to BATCH_STREAM_ID when _activeTx.size() < _maxTransactionActive (see the Trident config sketch below).
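For completeness, a hedged sketch of how this plays out in a Trident topology: setting the value to 3 means MasterBatchCoordinator keeps at most 3 transactions in flight. The stream here uses the FixedBatchSpout test spout and does no downstream processing; only the configuration is the point.

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class TridentMaxPendingExample {
    public static void main(String[] args) throws Exception {
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
                new Values("the cow jumped over the moon"),
                new Values("how many apples can you eat"));
        spout.setCycle(true);

        TridentTopology topology = new TridentTopology();
        topology.newStream("spout1", spout); // downstream processing omitted for brevity

        Config conf = new Config();
        conf.setMaxSpoutPending(3); // MasterBatchCoordinator keeps at most 3 active transactions

        StormSubmitter.submitTopology("trident-max-pending-demo", conf, topology.build());
    }
}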
Summary
- Config.TOPOLOGY_MAX_SPOUT_PENDING (topology.max.spout.pending) defaults to null and only takes effect for spouts that emit reliable messages (with a msgId). For a normal spout, if SpoutOutputCollectorImpl determines that acking is not enabled, it adds nothing to the pending map, so reachedMaxSpoutPending stays false and the maxSpoutPending mechanism is never triggered. Trident spouts, by contrast, use TransactionAttempt.getTransactionId() as the batchId by default and are tracked per transaction.
- For a normal spout, the value is the maximum number of tuples waiting for ack; once it is reached, SpoutExecutor stops calling the spout's nextTuple to emit data.
- For a Trident spout, the value is the number of batches processed concurrently; the next batch can only proceed after these batches have succeeded or failed.