Talk about sentinel’s StatisticSlot

  sentinel

Order

This article mainly studies sentinel’s StatisticSlot.

StatisticSlot

com/alibaba/csp/sentinel/slots/statistic/StatisticSlot.java

public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
        throws Throwable {

        try {
            fireEntry(context, resourceWrapper, node, count, args);
            node.increaseThreadNum();
            node.addPassRequest();

            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest();
            }

        } catch (BlockException e) {
            context.getCurEntry().setError(e);

            // Add block count.
            node.increaseBlockedQps();
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockedQps();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseBlockedQps();
            }

            throw e;
        } catch (Throwable e) {
            context.getCurEntry().setError(e);

            // Should not happen
            node.increaseExceptionQps();
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseExceptionQps();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseExceptionQps();
            }
            throw e;
        }
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        DefaultNode node = (DefaultNode)context.getCurNode();

        if (context.getCurEntry().getError() == null) {
            long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
            if (rt > Constants.TIME_DROP_VALVE) {
                rt = Constants.TIME_DROP_VALVE;
            }

            node.rt(rt);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().rt(rt);
            }

            node.decreaseThreadNum();

            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().decreaseThreadNum();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.rt(rt);
                Constants.ENTRY_NODE.decreaseThreadNum();
            }
        } else {
            // error may happen
            // node.rt(-2);
        }

        fireExit(context, resourceWrapper, count);
    }

}
  • Entry and exit methods have been rewritten.
  • Entry is to call node’s increaseThreadNum and addPassRequest; when it passes successfully. If not, increaseBlockedQps; is called; Exception, then call increaseExceptionQps
  • The exit method called node’s decreaseThreadNum and finally triggered the real exit operation.

StatisticNode

com/alibaba/csp/sentinel/node/StatisticNode.java

public class StatisticNode implements Node {

    private transient Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.sampleCount,
        IntervalProperty.INTERVAL);

    private transient Metric rollingCounterInMinute = new ArrayMetric(1000, 2 * 60);

    private AtomicInteger curThreadNum = new AtomicInteger(0);

    private long lastFetchTime = -1;

    //......

    @Override
    public int curThreadNum() {
        return curThreadNum.get();
    }

    @Override
    public void addPassRequest() {
        rollingCounterInSecond.addPass();
        rollingCounterInMinute.addPass();
    }

    @Override
    public void increaseBlockedQps() {
        rollingCounterInSecond.addBlock();
        rollingCounterInMinute.addBlock();
    }

    @Override
    public void increaseExceptionQps() {
        rollingCounterInSecond.addException();
        rollingCounterInMinute.addException();

    }

    @Override
    public void increaseThreadNum() {
        curThreadNum.incrementAndGet();
    }

    @Override
    public void decreaseThreadNum() {
        curThreadNum.decrementAndGet();
    }

}
  • Statisticnode has a curThreadNum, which records the number of calling threads for each Node to facilitate flow control later on.

DefaultController

com/alibaba/csp/sentinel/slots/block/flow/controller/DefaultController.java

public class DefaultController implements Controller {

    double count = 0;
    int grade = 0;

    public DefaultController(double count, int grade) {
        super();
        this.count = count;
        this.grade = grade;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) {
            return false;
        }

        return true;
    }

    private int avgUsedTokens(Node node) {
        if (node == null) {
            return -1;
        }
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)node.passQps();
    }

}
  • The canPass method here uses node’s relevant statistics to judge the current limit.
  • CurThreadNum is used if it is FLOW_GRADE_THREAD, otherwise passQps statistics are used.

DefaultSlotsChainBuilder

com/alibaba/csp/sentinel/slots/DefaultSlotsChainBuilder.java

public class DefaultSlotsChainBuilder implements SlotsChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());

        return chain;
    }

}

StatisticSlot precedes SystemSlot, SystemSlot, AuthoritySlot, FlowSlot, but its increment statistics are performed after the fireEntry action, which is equivalent to after intercepted by aop. Node’s curThreadNum can also be understood as the number of request threads after passing the current limit condition.

Summary

  • StatisticSlot is similar to after interception, and statistics will not be carried out until all subsequent slot conditions are passed.
  • Statistics maintains relevant data such as curThreadNum, rollingCounterInSecond, rollingCounterInMinute through StatisticNode.
  • DefaultController controls by default according to the specified current limiting conditions. If it is FLOW_GRADE_THREAD, curThreadNum is used, otherwise passQps statistics is used to judge

doc