Talk about storm’s CustomStreamGrouping

  storm

Order

This article mainly studies storm’s CustomStreamGrouping

CustomStreamGrouping

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/CustomStreamGrouping.java

public interface CustomStreamGrouping extends Serializable {

    /**
     * Tells the stream grouping at runtime the tasks in the target bolt. This information should be used in chooseTasks to determine the
     * target tasks.
     *
     * It also tells the grouping the metadata on the stream this grouping will be used on.
     */
    void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);

    /**
     * This function implements a custom stream grouping. It takes in as input the number of tasks in the target bolt in prepare and returns
     * the tasks to send the tuples to.
     *
     * @param values the values to group on
     */
    List<Integer> chooseTasks(int taskId, List<Object> values);
}
  • Here are the prepare and chooseTasks methods defined.
  • FieldsGrouper, GlobalGrouper, NoneGrouper, AllGrouper, basiclaoadawarechustomstreamgrouping are defined in GrouperFactory.
  • In addition, ShuffleGrouping, PartialKeyGrouping, LoadAwareShuffleGrouping are also defined in the org.apache.storm.grouping package.

FieldsGrouper

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java

    public static class FieldsGrouper implements CustomStreamGrouping {

        private Fields outFields;
        private List<List<Integer>> targetTasks;
        private Fields groupFields;
        private int numTasks;

        public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
            this.outFields = outFields;
            this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));

        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = new ArrayList<List<Integer>>();
            for (Integer targetTask : targetTasks) {
                this.targetTasks.add(Collections.singletonList(targetTask));
            }
            this.numTasks = targetTasks.size();
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            int targetTaskIndex = TupleUtils.chooseTaskIndex(outFields.select(groupFields, values), numTasks);
            return targetTasks.get(targetTaskIndex);
        }

    }
  • Select task subscript for values with fields selected through TupleUtils.chooseTaskIndex; ChooseTaskIndex uses Arrays.deepHashCode to get the hash value and then modulo numTask downward.

GlobalGrouper

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java

    public static class GlobalGrouper implements CustomStreamGrouping {

        private List<Integer> targetTasks;

        public GlobalGrouper() {
        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = targetTasks;
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            if (targetTasks.isEmpty()) {
                return null;
            }
            // It's possible for target to have multiple tasks if it reads multiple sources
            return Collections.singletonList(targetTasks.get(0));
        }
    }
  • The first task, targetTasks.get(0), is fixed here.

NoneGrouper

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java

    public static class NoneGrouper implements CustomStreamGrouping {

        private final Random random;
        private List<Integer> targetTasks;
        private int numTasks;

        public NoneGrouper() {
            random = new Random();
        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = targetTasks;
            this.numTasks = targetTasks.size();
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            int index = random.nextInt(numTasks);
            return Collections.singletonList(targetTasks.get(index));
        }
    }
  • Nextint (numtasks)

AllGrouper

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java

    public static class AllGrouper implements CustomStreamGrouping {

        private List<Integer> targetTasks;

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = targetTasks;
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            return targetTasks;
        }
    }
  • All targetTasks are returned here.

ShuffleGrouping

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java

public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
    private ArrayList<List<Integer>> choices;
    private AtomicInteger current;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        choices = new ArrayList<List<Integer>>(targetTasks.size());
        for (Integer i : targetTasks) {
            choices.add(Arrays.asList(i));
        }
        current = new AtomicInteger(0);
        Collections.shuffle(choices, new Random());
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        int rightNow;
        int size = choices.size();
        while (true) {
            rightNow = current.incrementAndGet();
            if (rightNow < size) {
                return choices.get(rightNow);
            } else if (rightNow == size) {
                current.set(0);
                return choices.get(0);
            }
        } // race condition with another thread, and we lost. try again
    }
}
  • Arraylist < list < integer > > choices are randomized when prepare here
  • Use current.incrementAndGet () to achieve the effect of round robbin. reset to return the first when it exceeds size, and return the index value after incr if it does not exceed

PartialKeyGrouping

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
    private static final long serialVersionUID = -1672360572274911808L;
    private List<Integer> targetTasks;
    private Fields fields = null;
    private Fields outFields = null;

    private AssignmentCreator assignmentCreator;
    private TargetSelector targetSelector;

    public PartialKeyGrouping() {
        this(null);
    }

    public PartialKeyGrouping(Fields fields) {
        this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) {
        this(fields, assignmentCreator, new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) {
        this.fields = fields;
        this.assignmentCreator = assignmentCreator;
        this.targetSelector = targetSelector;
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        if (this.fields != null) {
            this.outFields = context.getComponentOutputFields(stream);
        }
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> boltIds = new ArrayList<>(1);
        if (values.size() > 0) {
            final byte[] rawKeyBytes = getKeyBytes(values);

            final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes);
            final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey);

            boltIds.add(selectedTask);
        }
        return boltIds;
    }
    //......
}
  • Here, the RandomTwoTaskAssignmentCreator is used to select the two taskId and then the one that is used less frequently.

LoadAwareCustomStreamGrouping

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java

public interface LoadAwareCustomStreamGrouping extends CustomStreamGrouping {
    void refreshLoad(LoadMapping loadMapping);
}
  • Inherited the CustomStreamGrouping interface, and then newly defined the refreshLoad method to refresh the load, where the load is mainly the load of executor’s receiveQueue (qMetrics.population() / qMetrics.capacity())
  • LoadAwareCustomStreamGrouping has several implementation classes, including BasicLoadAwareCustomStreamGrouping and LoadAwareShuffleGrouping.

Summary

  • Storm’s CustomStreamGrouping interface defines the choosetasks method, which is used to select Tasks to process tuples.
  • ShuffleGrouping is similar to round robbin, FieldsGrouper uses Arrays.deepHashCode to get hash value according to the selected field value and then modulizes numTask downward, GlobalGrouper returns taskId with index of 0, and NoneGrouper returns randomly. AllGrouper returns all taskids without filtering, PartialKeyGrouping uses the hash value of key as seed, uses Random function to calculate the subscripts of two taskids, and then selects the task that is used less frequently.
  • LoadAware’s grouping includes basicLoadAwareCustomStreamGrouping and LoadAwareShuffleGrouping, which both implement the loadawarectomstream grouping interface, which defines the refreshLoad method to dynamically refresh the load. the load here is mainly the load of executor’s receiveQueue (qMetrics.population() / qMetrics.capacity())

doc