Talk about storm’s PartialKeyGrouping


Preface

This article mainly studies Storm's PartialKeyGrouping.

Example

    @Test
    public void testPartialKeyGrouping() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        String spoutId = "wordGenerator";
        String counterId = "counter";
        String aggId = "aggregator";
        String intermediateRankerId = "intermediateRanker";
        String totalRankerId = "finalRanker";
        int TOP_N = 5;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(spoutId, new TestWordSpout(), 5);
        //NOTE use partialKeyGrouping instead of fieldsGrouping to spread the load more evenly across the countBolt tasks
        builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId, new Fields("word"));
        builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj"));
        builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId, new Fields("obj"));
        builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
        submitRemote(builder);
    }
  • It is worth noting that because the word-count bolt uses partialKeyGrouping, the same word is no longer pinned to a single task; RollingCountAggBolt therefore has to merge the partial counts downstream via fieldsGrouping, as sketched below.
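
A minimal sketch of what that merge step could look like. This is not the actual storm-starter RollingCountAggBolt; the class name and the assumption that the counter emits (obj, count) tuples are illustrative. It keeps the latest partial count reported by each upstream counter task and emits the sum per key.

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical merge bolt, shown only to illustrate why a fieldsGrouping merge is needed
public class PartialCountMergeBolt extends BaseBasicBolt {
    // key -> (source task id -> latest partial count reported by that task)
    private final Map<Object, Map<Integer, Long>> partialCounts = new HashMap<>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        Object key = tuple.getValue(0);
        Long count = tuple.getLong(1);
        // remember the latest partial count from this upstream task
        partialCounts.computeIfAbsent(key, k -> new HashMap<>()).put(tuple.getSourceTask(), count);
        // emit the sum of the partial counts from all upstream tasks for this key
        long total = partialCounts.get(key).values().stream().mapToLong(Long::longValue).sum();
        collector.emit(new Values(key, total));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("obj", "count"));
    }
}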

PartialKeyGrouping (Version 1.2.2)

storm-core-1.2.2-sources.jar! /org/apache/storm/grouping/PartialKeyGrouping.java

public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
    private static final long serialVersionUID = -447379837314000353L;
    private List<Integer> targetTasks;
    private long[] targetTaskStats;
    private HashFunction h1 = Hashing.murmur3_128(13);
    private HashFunction h2 = Hashing.murmur3_128(17);
    private Fields fields = null;
    private Fields outFields = null;

    public PartialKeyGrouping() {
        //Empty
    }

    public PartialKeyGrouping(Fields fields) {
        this.fields = fields;
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        targetTaskStats = new long[this.targetTasks.size()];
        if (this.fields != null) {
            this.outFields = context.getComponentOutputFields(stream);
        }
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> boltIds = new ArrayList<>(1);
        if (values.size() > 0) {
            byte[] raw;
            if (fields != null) {
                List<Object> selectedFields = outFields.select(fields, values);
                ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
                for (Object o: selectedFields) {
                    if (o instanceof List) {
                        out.putInt(Arrays.deepHashCode(((List)o).toArray()));
                    } else if (o instanceof Object[]) {
                        out.putInt(Arrays.deepHashCode((Object[])o));
                    } else if (o instanceof byte[]) {
                        out.putInt(Arrays.hashCode((byte[]) o));
                    } else if (o instanceof short[]) {
                        out.putInt(Arrays.hashCode((short[]) o));
                    } else if (o instanceof int[]) {
                        out.putInt(Arrays.hashCode((int[]) o));
                    } else if (o instanceof long[]) {
                        out.putInt(Arrays.hashCode((long[]) o));
                    } else if (o instanceof char[]) {
                        out.putInt(Arrays.hashCode((char[]) o));
                    } else if (o instanceof float[]) {
                        out.putInt(Arrays.hashCode((float[]) o));
                    } else if (o instanceof double[]) {
                        out.putInt(Arrays.hashCode((double[]) o));
                    } else if (o instanceof boolean[]) {
                        out.putInt(Arrays.hashCode((boolean[]) o));
                    } else if (o != null) {
                        out.putInt(o.hashCode());
                    } else {
                      out.putInt(0);
                    }
                }
                raw = out.array();
            } else {
                raw = values.get(0).toString().getBytes(); // assume key is the first field
            }
            int firstChoice = (int) (Math.abs(h1.hashBytes(raw).asLong()) % this.targetTasks.size());
            int secondChoice = (int) (Math.abs(h2.hashBytes(raw).asLong()) % this.targetTasks.size());
            int selected = targetTaskStats[firstChoice] > targetTaskStats[secondChoice] ? secondChoice : firstChoice;
            boltIds.add(targetTasks.get(selected));
            targetTaskStats[selected]++;
        }
        return boltIds;
    }
}
  • As you can see, PartialKeyGrouping is a CustomStreamGrouping. In prepare, the long[] targetTaskStats array is initialized to track how many tuples each target task has received.
  • If no fields are specified, PartialKeyGrouping falls back to the first field of the tuple (values.get(0)) as the key.
  • Two HashFunction instances are built with Guava's Hashing.murmur3_128 (seeds 13 and 17); the absolute value of each hash modulo targetTasks.size() yields two candidate task indices.
  • Based on targetTaskStats, the less frequently used of the two candidate taskIds is chosen, and its count in targetTaskStats is incremented after selection (see the sketch after this list).
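
The selection step can be reproduced outside Storm. The following standalone sketch (the task count of 4 and the sample words are arbitrary) shows the "power of two choices" idea the 1.2.2 code implements: hash the key twice, then route to whichever of the two candidate tasks has received fewer tuples so far.

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

public class TwoChoicesDemo {
    public static void main(String[] args) {
        HashFunction h1 = Hashing.murmur3_128(13);
        HashFunction h2 = Hashing.murmur3_128(17);
        int numTasks = 4;
        long[] taskStats = new long[numTasks];

        for (String word : new String[]{"storm", "storm", "storm", "kafka", "flink"}) {
            byte[] raw = word.getBytes();
            // two hashes of the same key give two candidate task indices
            int firstChoice = (int) (Math.abs(h1.hashBytes(raw).asLong()) % numTasks);
            int secondChoice = (int) (Math.abs(h2.hashBytes(raw).asLong()) % numTasks);
            // send to whichever of the two candidates has received fewer tuples so far
            int selected = taskStats[firstChoice] > taskStats[secondChoice] ? secondChoice : firstChoice;
            taskStats[selected]++;
            System.out.println(word + " -> task index " + selected);
        }
    }
}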

PartialKeyGrouping (Version 2.0.0)

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

/**
 * A variation on FieldGrouping. This grouping operates on a partitioning of the incoming tuples (like a FieldGrouping), but it can send
 * Tuples from a given partition to multiple downstream tasks.
 *
 * Given a total pool of target tasks, this grouping will always send Tuples with a given key to one member of a subset of those tasks. Each
 * key is assigned a subset of tasks. Each tuple is then sent to one task from that subset.
 *
 * Notes: - the default TaskSelector ensures each task gets as close to a balanced number of Tuples as possible - the default
 * AssignmentCreator hashes the key and produces an assignment of two tasks
 */
public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
    private static final long serialVersionUID = -1672360572274911808L;
    private List<Integer> targetTasks;
    private Fields fields = null;
    private Fields outFields = null;

    private AssignmentCreator assignmentCreator;
    private TargetSelector targetSelector;

    public PartialKeyGrouping() {
        this(null);
    }

    public PartialKeyGrouping(Fields fields) {
        this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) {
        this(fields, assignmentCreator, new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) {
        this.fields = fields;
        this.assignmentCreator = assignmentCreator;
        this.targetSelector = targetSelector;
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        if (this.fields != null) {
            this.outFields = context.getComponentOutputFields(stream);
        }
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> boltIds = new ArrayList<>(1);
        if (values.size() > 0) {
            final byte[] rawKeyBytes = getKeyBytes(values);

            final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes);
            final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey);

            boltIds.add(selectedTask);
        }
        return boltIds;
    }

    /**
     * Extract the key from the input Tuple.
     */
    private byte[] getKeyBytes(List<Object> values) {
        byte[] raw;
        if (fields != null) {
            List<Object> selectedFields = outFields.select(fields, values);
            ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
            for (Object o : selectedFields) {
                if (o instanceof List) {
                    out.putInt(Arrays.deepHashCode(((List) o).toArray()));
                } else if (o instanceof Object[]) {
                    out.putInt(Arrays.deepHashCode((Object[]) o));
                } else if (o instanceof byte[]) {
                    out.putInt(Arrays.hashCode((byte[]) o));
                } else if (o instanceof short[]) {
                    out.putInt(Arrays.hashCode((short[]) o));
                } else if (o instanceof int[]) {
                    out.putInt(Arrays.hashCode((int[]) o));
                } else if (o instanceof long[]) {
                    out.putInt(Arrays.hashCode((long[]) o));
                } else if (o instanceof char[]) {
                    out.putInt(Arrays.hashCode((char[]) o));
                } else if (o instanceof float[]) {
                    out.putInt(Arrays.hashCode((float[]) o));
                } else if (o instanceof double[]) {
                    out.putInt(Arrays.hashCode((double[]) o));
                } else if (o instanceof boolean[]) {
                    out.putInt(Arrays.hashCode((boolean[]) o));
                } else if (o != null) {
                    out.putInt(o.hashCode());
                } else {
                    out.putInt(0);
                }
            }
            raw = out.array();
        } else {
            raw = values.get(0).toString().getBytes(); // assume key is the first field
        }
        return raw;
    }

    //......
}
  • Version 2.0.0 encapsulates the assignment and selection logic into RandomTwoTaskAssignmentCreator and BalancedTargetSelector (the wiring sketch below shows how they can be supplied explicitly).
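
Purely as an illustration, the 2.0.0 constructors can be wired in explicitly through customGrouping; this is equivalent to calling partialKeyGrouping with the defaults, and the component names simply reuse the ones from the earlier test.

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("wordGenerator", new TestWordSpout(), 5);
    // explicitly supply the default AssignmentCreator and TargetSelector
    builder.setBolt("counter", new RollingCountBolt(9, 3), 4)
           .customGrouping("wordGenerator",
                   new PartialKeyGrouping(
                           new Fields("word"),
                           new PartialKeyGrouping.RandomTwoTaskAssignmentCreator(),
                           new PartialKeyGrouping.BalancedTargetSelector()));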

RandomTwoTaskAssignmentCreator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

    /**
     * This interface is responsible for choosing a subset of the target tasks to use for a given key.
     *
     * NOTE: whatever scheme you use to create the assignment should be deterministic. This may be executed on multiple Storm Workers, thus
     * each of them needs to come up with the same assignment for a given key.
     */
    public interface AssignmentCreator extends Serializable {
        int[] createAssignment(List<Integer> targetTasks, byte[] key);
    }

    /*========== Implementations ==========*/

    /**
     * This implementation of AssignmentCreator chooses two arbitrary tasks.
     */
    public static class RandomTwoTaskAssignmentCreator implements AssignmentCreator {
        /**
         * Creates a two task assignment by selecting random tasks.
         */
        public int[] createAssignment(List<Integer> tasks, byte[] key) {
            // It is necessary that this produce a deterministic assignment based on the key, so seed the Random from the key
            final long seedForRandom = Arrays.hashCode(key);
            final Random random = new Random(seedForRandom);
            final int choice1 = random.nextInt(tasks.size());
            int choice2 = random.nextInt(tasks.size());
            // ensure that choice1 and choice2 are not the same task
            choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2;
            return new int[]{ tasks.get(choice1), tasks.get(choice2) };
        }
    }
  • Version 2.0.0 no longer uses the Hashing.murmur3_128 hash functions from the Guava library; instead it uses the key's hash as the seed of a java.util.Random and draws the two candidate task indices from it. Note that two taskIds are returned here, which differs from fieldsGrouping, so that the downstream selector can balance load between them.
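
A standalone sketch (the task list {11, 12, 13, 14} and the key are arbitrary) that mirrors the createAssignment logic and shows why seeding Random with the key's hash makes the assignment deterministic across workers:

import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class DeterministicAssignmentDemo {
    static int[] createAssignment(List<Integer> tasks, byte[] key) {
        final long seedForRandom = Arrays.hashCode(key);  // same key -> same seed
        final Random random = new Random(seedForRandom);  // -> same pseudo-random sequence
        final int choice1 = random.nextInt(tasks.size());
        int choice2 = random.nextInt(tasks.size());
        // ensure the two candidates are distinct tasks
        choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2;
        return new int[]{tasks.get(choice1), tasks.get(choice2)};
    }

    public static void main(String[] args) {
        List<Integer> tasks = Arrays.asList(11, 12, 13, 14);
        System.out.println(Arrays.toString(createAssignment(tasks, "storm".getBytes())));
        System.out.println(Arrays.toString(createAssignment(tasks, "storm".getBytes()))); // identical pair
    }
}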

BalancedTargetSelector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

    /**
     * This interface chooses one element from a task assignment to send a specific Tuple to.
     */
    public interface TargetSelector extends Serializable {
        Integer chooseTask(int[] assignedTasks);
    }

    /**
     * A basic implementation of target selection. This strategy chooses the task within the assignment that has received the fewest Tuples
     * overall from this instance of the grouping.
     */
    public static class BalancedTargetSelector implements TargetSelector {
        private Map<Integer, Long> targetTaskStats = Maps.newHashMap();

        /**
         * Chooses one of the incoming tasks and selects the one that has been selected the fewest times so far.
         */
        public Integer chooseTask(int[] assignedTasks) {
            Integer taskIdWithMinLoad = null;
            Long minTaskLoad = Long.MAX_VALUE;

            for (Integer currentTaskId : assignedTasks) {
                final Long currentTaskLoad = targetTaskStats.getOrDefault(currentTaskId, 0L);
                if (currentTaskLoad < minTaskLoad) {
                    minTaskLoad = currentTaskLoad;
                    taskIdWithMinLoad = currentTaskId;
                }
            }

            targetTaskStats.put(taskIdWithMinLoad, targetTaskStats.getOrDefault(taskIdWithMinLoad, 0L) + 1);
            return taskIdWithMinLoad;
        }
    }
  • BalancedTargetSelector looks up targetTaskStats for each candidate taskId, returns the one with the smallest load (taskIdWithMinLoad), and then increments that task's count.
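
A small usage sketch of the selector in isolation (task ids 11 and 14 are arbitrary): given a fixed two-task assignment it alternates between the two, because it always picks the candidate it has selected fewer times so far.

import org.apache.storm.grouping.PartialKeyGrouping;

public class BalancedSelectorDemo {
    public static void main(String[] args) {
        PartialKeyGrouping.BalancedTargetSelector selector =
                new PartialKeyGrouping.BalancedTargetSelector();
        int[] assignment = {11, 14}; // a fixed two-task assignment for some key
        // picks whichever task it has chosen fewer times so far, so the calls alternate
        System.out.println(selector.chooseTask(assignment)); // 11
        System.out.println(selector.chooseTask(assignment)); // 14
        System.out.println(selector.chooseTask(assignment)); // 11
    }
}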

FieldsGrouper

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java

    public static class FieldsGrouper implements CustomStreamGrouping {

        private Fields outFields;
        private List<List<Integer>> targetTasks;
        private Fields groupFields;
        private int numTasks;

        public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
            this.outFields = outFields;
            this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));

        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = new ArrayList<List<Integer>>();
            for (Integer targetTask : targetTasks) {
                this.targetTasks.add(Collections.singletonList(targetTask));
            }
            this.numTasks = targetTasks.size();
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            int targetTaskIndex = TupleUtils.chooseTaskIndex(outFields.select(groupFields, values), numTasks);
            return targetTasks.get(targetTaskIndex);
        }

    }
  • Here you can see that FieldsGrouper's chooseTasks method uses TupleUtils.chooseTaskIndex to pick the task index.

TupleUtils.chooseTaskIndex

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/TupleUtils.java

    public static <T> int chooseTaskIndex(List<T> keys, int numTasks) {
        return Math.floorMod(listHashCode(keys), numTasks);
    }

    private static <T> int listHashCode(List<T> alist) {
        if (alist == null) {
            return 1;
        } else {
            return Arrays.deepHashCode(alist.toArray());
        }
    }
  • The keys are first hashed via listHashCode, then Math.floorMod with numTasks maps the hash onto a task index.
  • listHashCode calls Arrays.deepHashCode(alist.toArray()) to compute the hash value (see the sketch after this list).
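
A standalone sketch (the task count of 4 and the sample key are arbitrary) that reproduces this computation and shows why plain fieldsGrouping pins a hot key to a single task:

import java.util.Arrays;
import java.util.List;

public class FieldsGroupingIndexDemo {
    // same computation as TupleUtils.chooseTaskIndex / listHashCode
    static <T> int chooseTaskIndex(List<T> keys, int numTasks) {
        return Math.floorMod(Arrays.deepHashCode(keys.toArray()), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        // the same key always maps to the same task index, which is why a hot key skews one task
        System.out.println(chooseTaskIndex(Arrays.asList("storm"), numTasks));
        System.out.println(chooseTaskIndex(Arrays.asList("storm"), numTasks));
    }
}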

Summary

  • Storm's PartialKeyGrouping exists to relieve the skewed load on bolt tasks that fieldsGrouping causes when some keys are much hotter than others.
  • fieldsGrouping selects the task index by hashing the selected fields and taking the result modulo the number of tasks, so a given key always lands on the same task.
  • In version 1.2.2, PartialKeyGrouping computes two hashes of the key with Guava's Hashing.murmur3_128 and takes the absolute value of each modulo the number of tasks to obtain two candidate task indices; in version 2.0.0, the key's hash is used as the seed of a Random that draws the two candidate task indices. Unlike fieldsGrouping, two candidates are returned so the grouping can balance load between them: it tracks how often each taskId has been chosen, picks the less used of the two candidates, and updates the count after each selection.
  • As noted in the example, because the word-count bolt uses partialKeyGrouping the same word is no longer pinned to a single task, so RollingCountAggBolt has to merge the partial counts downstream via fieldsGrouping.
