A Look at State in Storm Trident


Preface

This article examines how state works in Storm Trident.

StateType

storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/state/StateType.java

public enum StateType {
    NON_TRANSACTIONAL,
    TRANSACTIONAL,
    OPAQUE
}
  • There are three StateType values: NON_TRANSACTIONAL (non-transactional), TRANSACTIONAL (transactional) and OPAQUE (opaque transactional).
  • Correspondingly, there are three types of spout: non-transactional, transactional and opaque transactional.
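Which state type gives idempotent (exactly-once) updates depends on which spout type feeds it. The following is a minimal sketch, not part of Storm: the StateKind and SpoutKind enums and the isIdempotent method are hypothetical names that encode the rule stated in the Javadoc of Trident's State interface (transactional state is idempotent for transactional spouts; opaque state is idempotent for opaque or transactional spouts).

```java
// Hypothetical sketch: StateKind and SpoutKind mirror Trident's three state and
// spout types, but these names are local to this example, not Storm's API.
final class IdempotencyMatrix {
    enum StateKind { NON_TRANSACTIONAL, TRANSACTIONAL, OPAQUE }

    enum SpoutKind { NON_TRANSACTIONAL, TRANSACTIONAL, OPAQUE }

    // Encodes the rule from the State Javadoc:
    //  - transactional state is idempotent only with a transactional spout,
    //  - opaque state is idempotent with a transactional or an opaque spout,
    //  - non-transactional state is never idempotent.
    static boolean isIdempotent(StateKind state, SpoutKind spout) {
        switch (state) {
            case TRANSACTIONAL:
                return spout == SpoutKind.TRANSACTIONAL;
            case OPAQUE:
                return spout == SpoutKind.TRANSACTIONAL || spout == SpoutKind.OPAQUE;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        // Print the full state x spout matrix.
        for (StateKind state : StateKind.values()) {
            for (SpoutKind spout : SpoutKind.values()) {
                System.out.printf("%s state + %s spout: %s%n", state, spout,
                        isIdempotent(state, spout) ? "idempotent updates" : "not idempotent");
            }
        }
    }
}
```

Note the asymmetry: an opaque spout may change the contents of a replayed txid, so transactional state, which simply skips a stored txid, is not safe with it.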

State

storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/state/State.java

/**
 * There's 3 different kinds of state:
 *
 * 1. non-transactional: ignores commits, updates are permanent. no rollback. a cassandra incrementing state would be like this
 * 2. repeat-transactional: idempotent as long as all batches for a txid are identical
 * 3. opaque-transactional: the most general kind of state. updates are always done based on the previous version of the value if the
 *    current commit = latest stored commit. Idempotent even if the batch for a txid can change.
 *
 * repeat transactional is idempotent for transactional spouts
 * opaque transactional is idempotent for opaque or transactional spouts
 *
 * Trident should log warnings when state is idempotent but updates will not be idempotent because of spout
 */
// retrieving is encapsulated in Retrieval interface
public interface State {
    void beginCommit(Long txid); // can be null for things like partitionPersist occurring off a DRPC stream

    void commit(Long txid);
}
  • Non-transactional: commits are ignored, updates are permanent, and there is no rollback. A Cassandra incrementing state is of this type. The semantics are at-most-once or at-least-once.
  • Repeat-transactional (transactional for short): a batch must always have the same txid, whether or not it is replayed, and its tuples must not change. Each tuple belongs to exactly one batch, and batches never overlap. When updating state, a replay that encounters an already-stored txid can simply be skipped. Less state needs to be stored in the database, but fault tolerance is poor. Exactly-once semantics are guaranteed.
  • Opaque-transactional (opaque for short) is widely used and is more fault tolerant than transactional. It does not require a tuple to always land in the same batch/txid: a tuple that fails in one batch may be processed successfully in a later one, yet each tuple is still processed successfully in exactly one batch. OpaqueTridentKafkaSpout is an implementation of this type and can tolerate the loss of Kafka nodes. When updating state, a replay that encounters the same txid must recompute the value from prevValue and overwrite the current value. More space is needed to store state in the database, but fault tolerance is good, and exactly-once semantics are still guaranteed.
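To make the two replay rules concrete, here is a small worked example for a single counter key. TxValue and OpaqueCell are hypothetical, simplified stand-ins for what transactional and opaque state keep per key (a txid plus a value, versus a txid plus the previous and current value); they are not Storm's classes, and the applyTx/applyOpaque methods are illustrative names.

```java
// Hypothetical sketch: TxValue and OpaqueCell are simplified stand-ins for the
// per-key records kept by transactional and opaque state; here they hold a count.
final class StoredLayouts {
    // Transactional layout: txid of the last batch applied, and the value after it.
    static final class TxValue {
        final long txid;
        final long val;

        TxValue(long txid, long val) {
            this.txid = txid;
            this.val = val;
        }
    }

    // Opaque layout: txid of the last batch applied, the value before it (prev)
    // and the value after it (curr).
    static final class OpaqueCell {
        final long currTxid;
        final long prev;
        final long curr;

        OpaqueCell(long currTxid, long prev, long curr) {
            this.currTxid = currTxid;
            this.prev = prev;
            this.curr = curr;
        }
    }

    // Transactional rule: a stored txid means the identical batch was already applied, so skip it.
    static TxValue applyTx(TxValue stored, long txid, long delta) {
        if (stored.txid == txid) {
            return stored;
        }
        return new TxValue(txid, stored.val + delta);
    }

    // Opaque rule: a replay of the stored txid may carry different tuples, so it is
    // recomputed from prev; a newer txid builds on curr.
    static OpaqueCell applyOpaque(OpaqueCell stored, long txid, long delta) {
        if (txid < stored.currTxid) {
            throw new IllegalStateException("batch " + txid + " is older than stored batch " + stored.currTxid);
        }
        if (txid == stored.currTxid) {
            return new OpaqueCell(txid, stored.prev, stored.prev + delta);
        }
        return new OpaqueCell(txid, stored.curr, stored.curr + delta);
    }

    public static void main(String[] args) {
        // Transactional: txid 3 already applied (count 5); its identical replay is skipped.
        TxValue t = new TxValue(3, 5);
        t = applyTx(t, 3, 2); // still 5
        t = applyTx(t, 4, 2); // 7
        System.out.println("transactional: txid=" + t.txid + " val=" + t.val); // prints transactional: txid=4 val=7

        // Opaque: txid 3 added 2 to 3 (curr 5); the replay of txid 3 now carries only 1 tuple.
        OpaqueCell o = new OpaqueCell(3, 3, 5);
        o = applyOpaque(o, 3, 1); // prev 3, curr 4
        o = applyOpaque(o, 4, 2); // prev 4, curr 6
        System.out.println("opaque: txid=" + o.currTxid + " prev=" + o.prev + " curr=" + o.curr); // prints opaque: txid=4 prev=4 curr=6
    }
}
```

The extra prev field is exactly the "more space in the database" that opaque state pays for its tolerance of changing batch contents.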

MapState

storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/state/map/MapState.java

public interface MapState<T> extends ReadOnlyMapState<T> {
    List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);

    void multiPut(List<List<Object>> keys, List<T> vals);
}
  • MapState extends the ReadOnlyMapState interface, which in turn extends the State interface.
  • The sections below analyze several MapState implementations.

NonTransactionalMap

storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/state/map/NonTransactionalMap.java

public class NonTransactionalMap<T> implements MapState<T> {
    IBackingMap<T> _backing;

    protected NonTransactionalMap(IBackingMap<T> backing) {
        _backing = backing;
    }

    public static <T> MapState<T> build(IBackingMap<T> backing) {
        return new NonTransactionalMap<T>(backing);
    }

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        return _backing.multiGet(keys);
    }

    @Override
    public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
        List<T> curr = _backing.multiGet(keys);
        List<T> ret = new ArrayList<T>(curr.size());
        for (int i = 0; i < curr.size(); i++) {
            T currVal = curr.get(i);
            ValueUpdater<T> updater = updaters.get(i);
            ret.add(updater.update(currVal));
        }
        _backing.multiPut(keys, ret);
        return ret;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        _backing.multiPut(keys, vals);
    }

    @Override
    public void beginCommit(Long txid) {
    }

    @Override
    public void commit(Long txid) {
    }
}
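  • NonTransactionalMap wraps an IBackingMap; its beginCommit and commit methods do nothing.
  • multiUpdate reads the current values with multiGet, applies each ValueUpdater to build List<T> ret, and writes the result back with the IBackingMap's multiPut. No txid is consulted at any point.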
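Because no txid is recorded, a replayed batch is simply applied again. Below is a minimal sketch of that effect, assuming a HashMap in place of a real IBackingMap and java.util.function.UnaryOperator in place of ValueUpdater; NonTxSketch and add are hypothetical names, and keys are simplified to String instead of List<Object>.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical sketch of NonTransactionalMap-style multiUpdate over an in-memory map.
final class NonTxSketch {
    final Map<String, Long> backing = new HashMap<>(); // stands in for IBackingMap

    // Read current values, apply each updater, write everything back; no txid is involved.
    List<Long> multiUpdate(List<String> keys, List<UnaryOperator<Long>> updaters) {
        List<Long> ret = new ArrayList<>(keys.size());
        for (int i = 0; i < keys.size(); i++) {
            ret.add(updaters.get(i).apply(backing.get(keys.get(i))));
        }
        for (int i = 0; i < keys.size(); i++) {
            backing.put(keys.get(i), ret.get(i));
        }
        return ret;
    }

    // Counting updater: a missing key (null) counts as 0.
    static UnaryOperator<Long> add(long delta) {
        return stored -> (stored == null ? 0L : stored) + delta;
    }

    public static void main(String[] args) {
        NonTxSketch state = new NonTxSketch();
        List<String> keys = List.of("apple");
        state.multiUpdate(keys, List.of(add(2))); // batch txid 1
        state.multiUpdate(keys, List.of(add(2))); // batch txid 1 replayed after a failure
        System.out.println(state.backing.get("apple")); // prints 4: the replay was counted twice
    }
}
```

If the failure instead happened before the write and the batch was not replayed, the two tuples would be lost; this is the at-least-once versus at-most-once trade-off of non-transactional state.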

TransactionalMap

storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/state/map/TransactionalMap.java

public class TransactionalMap<T> implements MapState<T> {
    CachedBatchReadsMap<TransactionalValue> _backing;
    Long _currTx;

    protected TransactionalMap(IBackingMap<TransactionalValue> backing) {
        _backing = new CachedBatchReadsMap(backing);
    }

    public static <T> MapState<T> build(IBackingMap<TransactionalValue> backing) {
        return new TransactionalMap<T>(backing);
    }

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        List<CachedBatchReadsMap.RetVal<TransactionalValue>> vals = _backing.multiGet(keys);
        List<T> ret = new ArrayList<T>(vals.size());
        for (CachedBatchReadsMap.RetVal<TransactionalValue> retval : vals) {
            TransactionalValue v = retval.val;
            if (v != null) {
                ret.add((T) v.getVal());
            } else {
                ret.add(null);
            }
        }
        return ret;
    }

    @Override
    public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
        List<CachedBatchReadsMap.RetVal<TransactionalValue>> curr = _backing.multiGet(keys);
        List<TransactionalValue> newVals = new ArrayList<TransactionalValue>(curr.size());
        List<List<Object>> newKeys = new ArrayList();
        List<T> ret = new ArrayList<T>();
        for (int i = 0; i < curr.size(); i++) {
            CachedBatchReadsMap.RetVal<TransactionalValue> retval = curr.get(i);
            TransactionalValue<T> val = retval.val;
            ValueUpdater<T> updater = updaters.get(i);
            TransactionalValue<T> newVal;
            boolean changed = false;
            if (val == null) {
                newVal = new TransactionalValue<T>(_currTx, updater.update(null));
                changed = true;
            } else {
                if (_currTx != null && _currTx.equals(val.getTxid()) && !retval.cached) {
                    newVal = val;
                } else {
                    newVal = new TransactionalValue<T>(_currTx, updater.update(val.getVal()));
                    changed = true;
                }
            }
            ret.add(newVal.getVal());
            if (changed) {
                newVals.add(newVal);
                newKeys.add(keys.get(i));
            }
        }
        if (!newKeys.isEmpty()) {
            _backing.multiPut(newKeys, newVals);
        }
        return ret;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        List<TransactionalValue> newVals = new ArrayList<TransactionalValue>(vals.size());
        for (T val : vals) {
            newVals.add(new TransactionalValue<T>(_currTx, val));
        }
        _backing.multiPut(keys, newVals);
    }

    @Override
    public void beginCommit(Long txid) {
        _currTx = txid;
        _backing.reset();
    }

    @Override
    public void commit(Long txid) {
        _currTx = null;
        _backing.reset();
    }
}
  • TransactionalMap wraps the backing map in a CachedBatchReadsMap<TransactionalValue>, so every stored value is a TransactionalValue (a txid plus the value). beginCommit records the current txid and resets _backing; commit clears the txid and resets _backing again.
  • In multiUpdate, if a stored value already carries the current txid (_currTx) and !retval.cached holds (that is, the value was not written by multiPut earlier in this same batch), the update is skipped and newVal = val is used as-is. Otherwise the updater is applied and the result is stored with _currTx.
  • multiPut wraps each value in a TransactionalValue tagged with _currTx and passes the batch to CachedBatchReadsMap.multiPut(List<List<Object>> keys, List<T> vals), which writes the values through to the underlying IBackingMap and caches them for the rest of the batch.
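The skip rule, including the role of the cached flag, can be traced on a single counter key. This is a simplified sketch, not Storm's code: TxMapSketch, its HashMap backing, and the `written` set (standing in for CachedBatchReadsMap's per-batch cache) are assumptions for illustration, and UnaryOperator stands in for ValueUpdater.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.UnaryOperator;

// Hypothetical sketch of TransactionalMap's skip rule, one key at a time.
final class TxMapSketch {
    static final class TxValue {
        final Long txid;
        final long val;

        TxValue(Long txid, long val) {
            this.txid = txid;
            this.val = val;
        }
    }

    final Map<String, TxValue> backing = new HashMap<>(); // stands in for IBackingMap
    final Set<String> written = new HashSet<>();         // keys put during the current batch
    Long currTx;

    void beginCommit(Long txid) {
        currTx = txid;
        written.clear();
    }

    void commit(Long txid) {
        currTx = null;
        written.clear();
    }

    long update(String key, UnaryOperator<Long> updater) {
        TxValue stored = backing.get(key);
        // Same txid and not written earlier in this batch: an earlier attempt of this
        // batch already applied it, so skip.
        if (stored != null && currTx != null && currTx.equals(stored.txid) && !written.contains(key)) {
            return stored.val;
        }
        Long prev = (stored == null) ? null : Long.valueOf(stored.val);
        long newVal = updater.apply(prev);
        backing.put(key, new TxValue(currTx, newVal));
        written.add(key);
        return newVal;
    }

    public static void main(String[] args) {
        TxMapSketch m = new TxMapSketch();
        UnaryOperator<Long> plus2 = v -> (v == null ? 0L : v) + 2;

        m.beginCommit(1L);
        m.update("apple", plus2); // 2, stored with txid 1
        m.commit(1L);

        m.beginCommit(2L);
        m.update("apple", plus2); // 4, stored with txid 2; batch 2 then fails before committing

        m.beginCommit(2L);        // batch 2 is replayed with identical tuples
        m.update("apple", plus2); // stored txid matches: skipped, still 4
        m.commit(2L);

        System.out.println(m.backing.get("apple").val); // prints 4
    }
}
```

The `written` check keeps a second update to the same key within one batch from being mistaken for a replay: it sees the current txid but must still be applied.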

OpaqueMap

storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/state/map/OpaqueMap.java

public class OpaqueMap<T> implements MapState<T> {
    CachedBatchReadsMap<OpaqueValue> _backing;
    Long _currTx;

    protected OpaqueMap(IBackingMap<OpaqueValue> backing) {
        _backing = new CachedBatchReadsMap(backing);
    }

    public static <T> MapState<T> build(IBackingMap<OpaqueValue> backing) {
        return new OpaqueMap<T>(backing);
    }

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        List<CachedBatchReadsMap.RetVal<OpaqueValue>> curr = _backing.multiGet(keys);
        List<T> ret = new ArrayList<T>(curr.size());
        for (CachedBatchReadsMap.RetVal<OpaqueValue> retval : curr) {
            OpaqueValue val = retval.val;
            if (val != null) {
                if (retval.cached) {
                    ret.add((T) val.getCurr());
                } else {
                    ret.add((T) val.get(_currTx));
                }
            } else {
                ret.add(null);
            }
        }
        return ret;
    }

    @Override
    public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
        List<CachedBatchReadsMap.RetVal<OpaqueValue>> curr = _backing.multiGet(keys);
        List<OpaqueValue> newVals = new ArrayList<OpaqueValue>(curr.size());
        List<T> ret = new ArrayList<T>();
        for (int i = 0; i < curr.size(); i++) {
            CachedBatchReadsMap.RetVal<OpaqueValue> retval = curr.get(i);
            OpaqueValue<T> val = retval.val;
            ValueUpdater<T> updater = updaters.get(i);
            T prev;
            if (val == null) {
                prev = null;
            } else {
                if (retval.cached) {
                    prev = val.getCurr();
                } else {
                    prev = val.get(_currTx);
                }
            }
            T newVal = updater.update(prev);
            ret.add(newVal);
            OpaqueValue<T> newOpaqueVal;
            if (val == null) {
                newOpaqueVal = new OpaqueValue<T>(_currTx, newVal);
            } else {
                newOpaqueVal = val.update(_currTx, newVal);
            }
            newVals.add(newOpaqueVal);
        }
        _backing.multiPut(keys, newVals);
        return ret;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        List<ValueUpdater> updaters = new ArrayList<ValueUpdater>(vals.size());
        for (T val : vals) {
            updaters.add(new ReplaceUpdater<T>(val));
        }
        multiUpdate(keys, updaters);
    }

    @Override
    public void beginCommit(Long txid) {
        _currTx = txid;
        _backing.reset();
    }

    @Override
    public void commit(Long txid) {
        _currTx = null;
        _backing.reset();
    }

    static class ReplaceUpdater<T> implements ValueUpdater<T> {
        T _t;

        public ReplaceUpdater(T t) {
            _t = t;
        }

        @Override
        public T update(Object stored) {
            return _t;
        }
    }
}
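  • OpaqueMap wraps the backing map in a CachedBatchReadsMap<OpaqueValue>, so every stored value is an OpaqueValue. beginCommit records the current txid and resets _backing; commit clears the txid and resets _backing again, exactly as in TransactionalMap.
  • Unlike TransactionalMap, multiPut wraps each value in a ReplaceUpdater and delegates to multiUpdate, so the new value unconditionally overwrites the current one.
  • Also unlike TransactionalMap, multiUpdate never skips: it always computes newVal from a prev value. If the key was already written in this batch (retval.cached), prev is val.getCurr(); otherwise it is val.get(_currTx), which yields the value from before the stored batch when that batch has the same txid as the current one (a replay), and the latest value when the stored batch is older.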
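The following sketch traces a counter through OpaqueMap-style updates when an opaque spout replays txid 2 with different contents. OpaqueMapSketch and OpaqueCell are hypothetical, simplified stand-ins for OpaqueMap and OpaqueValue (txids are plain non-null longs, keys are Strings, and a `written` set stands in for CachedBatchReadsMap's cache); the branches follow the multiUpdate logic shown in the code above, but this is not Storm's implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.UnaryOperator;

// Hypothetical sketch of OpaqueMap's update rule, one key at a time.
final class OpaqueMapSketch {
    static final class OpaqueCell {
        final long currTxid; // last batch applied
        final Long prev;     // value before batch currTxid (null if the key was new)
        final long curr;     // value after batch currTxid

        OpaqueCell(long currTxid, Long prev, long curr) {
            this.currTxid = currTxid;
            this.prev = prev;
            this.curr = curr;
        }
    }

    final Map<String, OpaqueCell> backing = new HashMap<>(); // stands in for IBackingMap
    final Set<String> written = new HashSet<>();            // keys put during the current batch
    long currTx;

    void beginCommit(long txid) { currTx = txid; written.clear(); }

    void commit(long txid) { written.clear(); }

    // Counting updater: a missing value (null) counts as 0.
    static UnaryOperator<Long> add(long delta) {
        return v -> (v == null ? 0L : v) + delta;
    }

    long update(String key, UnaryOperator<Long> updater) {
        OpaqueCell cell = backing.get(key);
        Long base;        // value this update builds on
        Long prevToStore; // value before the current batch, kept for a possible replay
        if (cell == null) {
            base = null;
            prevToStore = null;
        } else if (written.contains(key)) {
            base = cell.curr;         // already updated in this batch: keep accumulating
            prevToStore = cell.prev;
        } else if (cell.currTxid < currTx) {
            base = cell.curr;         // first touch by a newer batch
            prevToStore = cell.curr;
        } else if (cell.currTxid == currTx) {
            base = cell.prev;         // replay: discard the earlier attempt's result
            prevToStore = cell.prev;
        } else {
            throw new IllegalStateException("batch " + currTx + " is behind stored batch " + cell.currTxid);
        }
        long newVal = updater.apply(base);
        backing.put(key, new OpaqueCell(currTx, prevToStore, newVal));
        written.add(key);
        return newVal;
    }

    public static void main(String[] args) {
        OpaqueMapSketch m = new OpaqueMapSketch();
        m.beginCommit(1L);
        m.update("apple", add(3)); // prev=null, curr=3
        m.commit(1L);

        m.beginCommit(2L);
        m.update("apple", add(2)); // prev=3, curr=5; then batch 2 fails

        m.beginCommit(2L);         // the opaque spout re-emits txid 2 with different contents
        m.update("apple", add(1)); // rebuilt from prev: prev=3, curr=4
        m.commit(2L);

        OpaqueCell c = m.backing.get("apple");
        System.out.println("prev=" + c.prev + " curr=" + c.curr); // prints prev=3 curr=4
    }
}
```

Had the replay been skipped TransactionalMap-style, curr would have stayed at 5, still counting the 2 tuples from the failed attempt even though an opaque spout may emit them again in a later batch.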

Summary

  • Trident updates state strictly in batch order: for example, the state update for the batch with txid 3 does not happen until the batch with txid 2 has been processed.
  • State comes in three types, namely non-transactional, transactional and opaque transactional, and spouts come in the same three types.

    • Non-transactional state cannot guarantee exactly-once; it provides at-least-once or at-most-once. For how its state is computed, see NonTransactionalMap: beginCommit and commit do nothing.
    • Transactional state guarantees exactly-once, but its requirements are strict: a replayed batch must keep the same txid and the same tuples, so fault tolerance is relatively poor. Its state computation, however, is simple (see TransactionalMap): when the stored txid equals the current one, the update is skipped.
    • Opaque transactional state also guarantees exactly-once. It allows a tuple that failed in one batch to be processed in another, so fault tolerance is good. The cost is that the previous value must be stored as well (see OpaqueMap): when the stored txid equals the current one, the new value is computed from the prev value and overwrites the current value.
  • Trident encapsulates the exactly-once state computation, so in practice it is enough to pass the appropriate StateFactory to persistentAggregate. A factory that supports several StateTypes can take the StateType as a parameter and build the corresponding kind of state. You can also write a custom StateFactory by implementing the StateFactory interface, a custom stateQuery function by extending BaseQueryFunction, and a custom updater by extending BaseStateUpdater and passing it to partitionPersist.
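The ordering rule from the summary (txid 3 waits for txid 2) can be illustrated with a small commit gate. This is a hypothetical sketch, not how Trident's coordinator is implemented: OrderedCommitter and onBatchFinished are illustrative names, and batches are assumed to finish processing in arbitrary order while their commits are released strictly by txid.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical sketch: batches may finish out of order, but commits happen in txid order.
final class OrderedCommitter {
    private final TreeSet<Long> finished = new TreeSet<>(); // finished but not yet committed
    private long nextToCommit;
    final List<Long> committed = new ArrayList<>();

    OrderedCommitter(long firstTxid) {
        this.nextToCommit = firstTxid;
    }

    // Record a finished batch, then commit every batch that is no longer blocked.
    void onBatchFinished(long txid) {
        finished.add(txid);
        while (finished.remove(nextToCommit)) { // true only if the next txid has finished
            committed.add(nextToCommit);        // a real state would run beginCommit/commit here
            nextToCommit++;
        }
    }

    public static void main(String[] args) {
        OrderedCommitter c = new OrderedCommitter(1);
        c.onBatchFinished(3); // blocked: txid 1 and 2 are not done
        c.onBatchFinished(1); // commits 1
        c.onBatchFinished(2); // commits 2, which unblocks 3
        System.out.println(c.committed); // prints [1, 2, 3]
    }
}
```

Committing in order is what lets transactional and opaque state compare only against the single stored txid: any stored txid is either the current batch (a replay) or an older one.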

doc