A look at Flink’s ParallelIteratorInputFormat


Preface

This article takes a look at Flink’s ParallelIteratorInputFormat.

Example

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Long> dataSet = env.generateSequence(15,106)
                .setParallelism(3);
        dataSet.print();
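  • Here, the generateSequence method of ExecutionEnvironment is used; internally it creates a ParallelIteratorInputFormat backed by a NumberSequenceIterator. Both bounds are inclusive, so the data set contains the 92 values from 15 to 106, read by three parallel source instances.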

ParallelIteratorInputFormat

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java

/**
 * An input format that generates data in parallel through a {@link SplittableIterator}.
 */
@PublicEvolving
public class ParallelIteratorInputFormat<T> extends GenericInputFormat<T> {

    private static final long serialVersionUID = 1L;

    private final SplittableIterator<T> source;

    private transient Iterator<T> splitIterator;

    public ParallelIteratorInputFormat(SplittableIterator<T> iterator) {
        this.source = iterator;
    }

    @Override
    public void open(GenericInputSplit split) throws IOException {
        super.open(split);

        this.splitIterator = this.source.getSplit(split.getSplitNumber(), split.getTotalNumberOfSplits());
    }

    @Override
    public boolean reachedEnd() {
        return !this.splitIterator.hasNext();
    }

    @Override
    public T nextRecord(T reuse) {
        return this.splitIterator.next();
    }
}
  • ParallelIteratorInputFormat extends GenericInputFormat. In open(), it asks the SplittableIterator for the partition that matches this split's number and the total number of splits; reachedEnd() and nextRecord() then simply delegate to that per-split iterator. GenericInputFormat has four other subclasses, namely CRowValuesInputFormat, CollectionInputFormat, IteratorInputFormat and ValuesInputFormat, and all four implement the NonParallelInput interface.
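To see how open, reachedEnd and nextRecord cooperate, here is a minimal pure-Java sketch with no Flink dependency. ParallelIteratorFormatSketch is a hypothetical stand-in for ParallelIteratorInputFormat in which the SplittableIterator is modelled as a function (splitNumber, totalSplits) -> Iterator, and ReadLoopDemo.readAllSplits assumes a simplified read loop (open, then nextRecord until reachedEnd) that runs the splits one after another, whereas Flink runs each split in its own parallel task.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.LongStream;

// Hypothetical stand-in for ParallelIteratorInputFormat. The "source" plays the role of
// SplittableIterator.getSplit(splitNumber, totalSplits).
class ParallelIteratorFormatSketch<T> {
    private final BiFunction<Integer, Integer, Iterator<T>> source;
    private Iterator<T> splitIterator;

    ParallelIteratorFormatSketch(BiFunction<Integer, Integer, Iterator<T>> source) {
        this.source = source;
    }

    // Like open(GenericInputSplit): fetch only this split's partition of the data.
    void open(int splitNumber, int totalSplits) {
        this.splitIterator = source.apply(splitNumber, totalSplits);
    }

    boolean reachedEnd() {
        return !splitIterator.hasNext();
    }

    T nextRecord() {
        return splitIterator.next();
    }
}

class ReadLoopDemo {
    // Emulates each source instance: open its split, then pull records until reachedEnd().
    static <T> List<List<T>> readAllSplits(ParallelIteratorFormatSketch<T> format, int totalSplits) {
        List<List<T>> perSplit = new ArrayList<>();
        for (int i = 0; i < totalSplits; i++) {
            format.open(i, totalSplits);
            List<T> records = new ArrayList<>();
            while (!format.reachedEnd()) {
                records.add(format.nextRecord());
            }
            perSplit.add(records);
        }
        return perSplit;
    }

    // Hypothetical partitioning of 0..n-1: split i gets every value v with v % totalSplits == i.
    static Iterator<Long> moduloSplit(long n, int splitNumber, int totalSplits) {
        return LongStream.range(0, n).filter(v -> v % totalSplits == splitNumber).boxed().iterator();
    }

    public static void main(String[] args) {
        ParallelIteratorFormatSketch<Long> format =
            new ParallelIteratorFormatSketch<Long>((i, total) -> moduloSplit(10, i, total));
        System.out.println(readAllSplits(format, 3)); // [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
    }
}
```

Each split sees only its own disjoint slice. The modulo partitioning in moduloSplit is purely illustrative and is not how NumberSequenceIterator divides a range.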

NonParallelInput

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/NonParallelInput.java

/**
 * This interface acts as a marker for input formats for inputs which cannot be split.
 * Data sources with a non-parallel input formats are always executed with a parallelism
 * of one.
 * 
 * @see InputFormat
 */
@Public
public interface NonParallelInput {
}
  • This interface declares no methods; it is purely a marker indicating that an InputFormat cannot be split, so a data source using it always runs with a parallelism of one.

GenericInputFormat.createInputSplits

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/GenericInputFormat.java

    @Override
    public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
        if (numSplits < 1) {
            throw new IllegalArgumentException("Number of input splits has to be at least 1.");
        }

        numSplits = (this instanceof NonParallelInput) ? 1 : numSplits;
        GenericInputSplit[] splits = new GenericInputSplit[numSplits];
        for (int i = 0; i < splits.length; i++) {
            splits[i] = new GenericInputSplit(i, numSplits);
        }
        return splits;
    }
  • The createInputSplits method of GenericInputFormat validates numSplits: if it is less than 1, an IllegalArgumentException is thrown. If the current InputFormat implements the NonParallelInput interface, numSplits is reset to 1. It then creates numSplits GenericInputSplit objects, each carrying its own split number and the total number of splits.
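The effect of the marker interface can be reproduced in plain Java. All class names below (NonParallelMarker, SplitSketch, GenericFormatSketch and its two subclasses) are hypothetical stand-ins for NonParallelInput, GenericInputSplit and GenericInputFormat; createInputSplits copies the validation and the instanceof check from the listing above.

```java
import java.util.Arrays;

// Hypothetical marker, standing in for NonParallelInput: no methods, only a type tag.
interface NonParallelMarker {}

// Minimal stand-in for GenericInputSplit: (splitNumber, totalNumberOfSplits).
final class SplitSketch {
    final int splitNumber;
    final int totalNumberOfSplits;

    SplitSketch(int splitNumber, int totalNumberOfSplits) {
        this.splitNumber = splitNumber;
        this.totalNumberOfSplits = totalNumberOfSplits;
    }

    @Override
    public String toString() {
        return splitNumber + "/" + totalNumberOfSplits;
    }
}

class GenericFormatSketch {
    // Same logic as GenericInputFormat.createInputSplits.
    SplitSketch[] createInputSplits(int numSplits) {
        if (numSplits < 1) {
            throw new IllegalArgumentException("Number of input splits has to be at least 1.");
        }
        // A format tagged with the marker interface always gets exactly one split.
        numSplits = (this instanceof NonParallelMarker) ? 1 : numSplits;
        SplitSketch[] splits = new SplitSketch[numSplits];
        for (int i = 0; i < splits.length; i++) {
            splits[i] = new SplitSketch(i, numSplits);
        }
        return splits;
    }
}

class ParallelFormatSketch extends GenericFormatSketch {}

class SingleFormatSketch extends GenericFormatSketch implements NonParallelMarker {}

class CreateSplitsDemo {
    public static void main(String[] args) {
        System.out.println(Arrays.toString(new ParallelFormatSketch().createInputSplits(3))); // [0/3, 1/3, 2/3]
        System.out.println(Arrays.toString(new SingleFormatSketch().createInputSplits(3)));   // [0/1]
    }
}
```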

ExecutionEnvironment.fromParallelCollection

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java

    /**
     * Creates a new data set that contains elements in the iterator. The iterator is splittable, allowing the
     * framework to create a parallel data source that returns the elements in the iterator.
     *
     * <p>Because the iterator will remain unmodified until the actual execution happens, the type of data
     * returned by the iterator must be given explicitly in the form of the type class (this is due to the
     * fact that the Java compiler erases the generic type information).
     *
     * @param iterator The iterator that produces the elements of the data set.
     * @param type The class of the data produced by the iterator. Must not be a generic class.
     * @return A DataSet representing the elements in the iterator.
     *
     * @see #fromParallelCollection(SplittableIterator, TypeInformation)
     */
    public <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, Class<X> type) {
        return fromParallelCollection(iterator, TypeExtractor.getForClass(type));
    }

    /**
     * Creates a new data set that contains elements in the iterator. The iterator is splittable, allowing the
     * framework to create a parallel data source that returns the elements in the iterator.
     *
     * <p>Because the iterator will remain unmodified until the actual execution happens, the type of data
     * returned by the iterator must be given explicitly in the form of the type information.
     * This method is useful for cases where the type is generic. In that case, the type class
     * (as given in {@link #fromParallelCollection(SplittableIterator, Class)} does not supply all type information.
     *
     * @param iterator The iterator that produces the elements of the data set.
     * @param type The TypeInformation for the produced data set.
     * @return A DataSet representing the elements in the iterator.
     *
     * @see #fromParallelCollection(SplittableIterator, Class)
     */
    public <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, TypeInformation<X> type) {
        return fromParallelCollection(iterator, type, Utils.getCallLocationName());
    }

    // private helper for passing different call location names
    private <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, TypeInformation<X> type, String callLocationName) {
        return new DataSource<>(this, new ParallelIteratorInputFormat<>(iterator), type, callLocationName);
    }

    /**
     * Creates a new data set that contains a sequence of numbers. The data set will be created in parallel,
     * so there is no guarantee about the order of the elements.
     *
     * @param from The number to start at (inclusive).
     * @param to The number to stop at (inclusive).
     * @return A DataSet, containing all number in the {@code [from, to]} interval.
     */
    public DataSource<Long> generateSequence(long from, long to) {
        return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, Utils.getCallLocationName());
    }
  • The fromParallelCollection method of ExecutionEnvironment wraps the given SplittableIterator in a ParallelIteratorInputFormat and returns it as a DataSource. The generateSequence method also goes through fromParallelCollection, passing a NumberSequenceIterator (a subclass of SplittableIterator) together with BasicTypeInfo.LONG_TYPE_INFO.
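The Javadoc of both fromParallelCollection overloads justifies the explicit type argument with Java's type erasure. The following pure-Java sketch (a hypothetical ErasureDemo class, no Flink types) illustrates that point: two iterators with different element types share one runtime class, so the element type has to be supplied separately, which is the role of the Class<X> parameter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;

class ErasureDemo {
    // Only the iterator's runtime class is observable; its element type X is erased.
    static <X> Class<?> runtimeClassOf(Iterator<X> iterator) {
        return iterator.getClass();
    }

    // Mirrors the shape of fromParallelCollection(iterator, Class<X>): the caller names the
    // element class explicitly. The iterator itself is not inspected here.
    static <X> String elementTypeName(Iterator<X> iterator, Class<X> type) {
        return type.getName();
    }

    public static void main(String[] args) {
        Iterator<Long> longs = new ArrayList<>(Arrays.asList(1L, 2L)).iterator();
        Iterator<String> strings = new ArrayList<>(Arrays.asList("a", "b")).iterator();

        // Same runtime class for both: the Long and String type arguments are gone at runtime.
        System.out.println(runtimeClassOf(longs) == runtimeClassOf(strings)); // true

        System.out.println(elementTypeName(longs, Long.class)); // java.lang.Long

        // For an element type like List<String> there is no List<String>.class literal; List.class
        // is the raw type and loses the String argument, which is the case the
        // TypeInformation overload is meant for.
    }
}
```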

SplittableIterator

flink-core-1.6.2-sources.jar!/org/apache/flink/util/SplittableIterator.java

/**
 * Abstract base class for iterators that can split themselves into multiple disjoint
 * iterators. The union of these iterators returns the original iterator values.
 *
 * @param <T> The type of elements returned by the iterator.
 */
@Public
public abstract class SplittableIterator<T> implements Iterator<T>, Serializable {

    private static final long serialVersionUID = 200377674313072307L;

    /**
     * Splits this iterator into a number disjoint iterators.
     * The union of these iterators returns the original iterator values.
     *
     * @param numPartitions The number of iterators to split into.
     * @return An array with the split iterators.
     */
    public abstract Iterator<T>[] split(int numPartitions);

    /**
     * Splits this iterator into <i>n</i> partitions and returns the <i>i-th</i> partition
     * out of those.
     *
     * @param num The partition to return (<i>i</i>).
     * @param numPartitions The number of partitions to split into (<i>n</i>).
     * @return The iterator for the partition.
     */
    public Iterator<T> getSplit(int num, int numPartitions) {
        if (numPartitions < 1 || num < 0 || num >= numPartitions) {
            throw new IllegalArgumentException();
        }

        return split(numPartitions)[num];
    }

    /**
     * The maximum number of splits into which this iterator can be split up.
     *
     * @return The maximum number of splits into which this iterator can be split up.
     */
    public abstract int getMaximumNumberOfSplits();
}
  • SplittableIterator is an abstract class that declares the abstract methods split and getMaximumNumberOfSplits; getSplit is implemented on top of split by splitting into numPartitions iterators and returning the num-th one. It has two implementation classes, LongValueSequenceIterator and NumberSequenceIterator. Let’s look at NumberSequenceIterator here.
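The contract can be illustrated with a small in-memory implementation. SplittableSketch is a simplified, hypothetical mirror of SplittableIterator (no Serializable, package-private methods), and ListSplittableIterator is a hypothetical subclass that divides the remaining list elements into contiguous chunks; getSplit is copied from the listing above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Simplified mirror of SplittableIterator's shape.
abstract class SplittableSketch<T> implements Iterator<T> {
    abstract Iterator<T>[] split(int numPartitions);

    abstract int getMaximumNumberOfSplits();

    // Same contract as SplittableIterator.getSplit: validate, split into n parts, return the num-th.
    Iterator<T> getSplit(int num, int numPartitions) {
        if (numPartitions < 1 || num < 0 || num >= numPartitions) {
            throw new IllegalArgumentException();
        }
        return split(numPartitions)[num];
    }
}

// Hypothetical implementation: the remaining elements are cut into contiguous, disjoint chunks
// whose concatenation is the original order.
class ListSplittableIterator<T> extends SplittableSketch<T> {
    private final List<T> items;
    private int pos = 0;

    ListSplittableIterator(List<T> items) {
        this.items = items;
    }

    @Override
    public boolean hasNext() {
        return pos < items.size();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return items.get(pos++);
    }

    @Override
    @SuppressWarnings("unchecked")
    Iterator<T>[] split(int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("The number of partitions must be at least 1.");
        }
        List<T> rest = items.subList(pos, items.size());
        int base = rest.size() / numPartitions;
        int extra = rest.size() % numPartitions; // the first `extra` chunks get one more element
        Iterator<T>[] parts = new Iterator[numPartitions];
        int start = 0;
        for (int i = 0; i < numPartitions; i++) {
            int len = base + (i < extra ? 1 : 0);
            parts[i] = new ListSplittableIterator<>(new ArrayList<>(rest.subList(start, start + len)));
            start += len;
        }
        return parts;
    }

    @Override
    int getMaximumNumberOfSplits() {
        return items.size() - pos; // at most one element per split
    }
}

class SplittableDemo {
    public static void main(String[] args) {
        ListSplittableIterator<String> source = new ListSplittableIterator<>(Arrays.asList("a", "b", "c", "d", "e"));
        for (int i = 0; i < 2; i++) {
            List<String> part = new ArrayList<>();
            source.getSplit(i, 2).forEachRemaining(part::add);
            System.out.println(i + " -> " + part); // 0 -> [a, b, c], then 1 -> [d, e]
        }
    }
}
```

Because split in this sketch never advances pos, calling getSplit repeatedly on the same object yields consistent partitions, and the concatenation of all parts equals the remaining elements, in line with the "union of these iterators returns the original iterator values" requirement in the Javadoc.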

NumberSequenceIterator

flink-core-1.6.2-sources.jar!/org/apache/flink/util/NumberSequenceIterator.java

/**
 * The {@code NumberSequenceIterator} is an iterator that returns a sequence of numbers (as {@code Long})s.
 * The iterator is splittable (as defined by {@link SplittableIterator}, i.e., it can be divided into multiple
 * iterators that each return a subsequence of the number sequence.
 */
@Public
public class NumberSequenceIterator extends SplittableIterator<Long> {

    private static final long serialVersionUID = 1L;

    /** The last number returned by the iterator. */
    private final long to;

    /** The next number to be returned. */
    private long current;


    /**
     * Creates a new splittable iterator, returning the range [from, to].
     * Both boundaries of the interval are inclusive.
     *
     * @param from The first number returned by the iterator.
     * @param to The last number returned by the iterator.
     */
    public NumberSequenceIterator(long from, long to) {
        if (from > to) {
            throw new IllegalArgumentException("The 'to' value must not be smaller than the 'from' value.");
        }

        this.current = from;
        this.to = to;
    }


    @Override
    public boolean hasNext() {
        return current <= to;
    }

    @Override
    public Long next() {
        if (current <= to) {
            return current++;
        } else {
            throw new NoSuchElementException();
        }
    }

    @Override
    public NumberSequenceIterator[] split(int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("The number of partitions must be at least 1.");
        }

        if (numPartitions == 1) {
            return new NumberSequenceIterator[] { new NumberSequenceIterator(current, to) };
        }

        // here, numPartitions >= 2 !!!

        long elementsPerSplit;

        if (to - current + 1 >= 0) {
            elementsPerSplit = (to - current + 1) / numPartitions;
        }
        else {
            // long overflow of the range.
            // we compute based on half the distance, to prevent the overflow.
            // in most cases it holds that: current < 0 and to > 0, except for: to == 0 and current == Long.MIN_VALUE
            // the later needs a special case
            final long halfDiff; // must be positive

            if (current == Long.MIN_VALUE) {
                // this means to >= 0
                halfDiff = (Long.MAX_VALUE / 2 + 1) + to / 2;
            } else {
                long posFrom = -current;
                if (posFrom > to) {
                    halfDiff = to + ((posFrom - to) / 2);
                } else {
                    halfDiff = posFrom + ((to - posFrom) / 2);
                }
            }
            elementsPerSplit = halfDiff / numPartitions * 2;
        }

        if (elementsPerSplit < Long.MAX_VALUE) {
            // figure out how many get one in addition
            long numWithExtra = -(elementsPerSplit * numPartitions) + to - current + 1;

            // based on rounding errors, we may have lost one)
            if (numWithExtra > numPartitions) {
                elementsPerSplit++;
                numWithExtra -= numPartitions;

                if (numWithExtra > numPartitions) {
                    throw new RuntimeException("Bug in splitting logic. To much rounding loss.");
                }
            }

            NumberSequenceIterator[] iters = new NumberSequenceIterator[numPartitions];
            long curr = current;
            int i = 0;
            for (; i < numWithExtra; i++) {
                long next = curr + elementsPerSplit + 1;
                iters[i] = new NumberSequenceIterator(curr, next - 1);
                curr = next;
            }
            for (; i < numPartitions; i++) {
                long next = curr + elementsPerSplit;
                iters[i] = new NumberSequenceIterator(curr, next - 1, true);
                curr = next;
            }

            return iters;
        }
        else {
            // this can only be the case when there are two partitions
            if (numPartitions != 2) {
                throw new RuntimeException("Bug in splitting logic.");
            }

            return new NumberSequenceIterator[] {
                new NumberSequenceIterator(current, current + elementsPerSplit),
                new NumberSequenceIterator(current + elementsPerSplit, to)
            };
        }
    }

    @Override
    public int getMaximumNumberOfSplits() {
        if (to >= Integer.MAX_VALUE || current <= Integer.MIN_VALUE || to - current + 1 >= Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        }
        else {
            return (int) (to - current + 1);
        }
    }

    //......
}
  • The NumberSequenceIterator constructor takes two parameters, from and to, both inclusive, and rejects from > to with an IllegalArgumentException. Internally it keeps a current value, initialized to from, which next() returns and then increments.
  • The split method first computes elementsPerSplit from numPartitions. When to - current + 1 >= 0, i.e. the size of the remaining range does not overflow a long, the formula is (to - current + 1) / numPartitions; otherwise it is derived from half of the distance to avoid the overflow.
  • It then computes numWithExtra. Because elementsPerSplit comes from integer division, giving every partition exactly elementsPerSplit elements can leave a remainder; numWithExtra is that remainder, i.e. the number of partitions that must take one extra element. If numWithExtra exceeds numPartitions (which can only happen after the rounding in the overflow branch), elementsPerSplit is incremented by 1 and numPartitions is subtracted from numWithExtra.
  • Finally, the first numWithExtra partitions each receive elementsPerSplit + 1 elements, which spreads the remainder one element at a time over the leading partitions; each partition from index numWithExtra to numPartitions - 1 receives exactly elementsPerSplit elements, so its upper bound is from + elementsPerSplit - 1.
  • getMaximumNumberOfSplits returns Integer.MAX_VALUE if to >= Integer.MAX_VALUE, current <= Integer.MIN_VALUE, or to - current + 1 >= Integer.MAX_VALUE; otherwise it returns (int) (to - current + 1), the number of remaining elements.
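The non-overflow path of split and the getMaximumNumberOfSplits check can be traced in plain Java. SequenceSplitSketch is a hypothetical helper that returns each partition as an inclusive {from, to} pair instead of a NumberSequenceIterator, and it deliberately omits the overflow branch (to - current + 1 < 0) and the two-partition fallback. Assuming the example's parallelism of 3 translates into three splits, it reproduces the ranges each source instance would read for generateSequence(15, 106).

```java
import java.util.Arrays;

// Hypothetical helper tracing NumberSequenceIterator.split for ranges whose size fits in a long.
class SequenceSplitSketch {

    // Returns one inclusive {from, to} pair per partition; a pair with from > to is an empty partition.
    static long[][] split(long current, long to, int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("The number of partitions must be at least 1.");
        }
        long size = to - current + 1;
        if (size < 0) {
            // With current <= to + 1, a negative size can only come from long overflow.
            // The original handles it with a half-distance computation; omitted here.
            throw new UnsupportedOperationException("range size overflows long; not covered by this sketch");
        }
        long elementsPerSplit = size / numPartitions;
        // Remainder of the integer division: how many partitions get one extra element.
        // Without the overflow branch it is always < numPartitions, so no rounding correction is needed.
        long numWithExtra = size - elementsPerSplit * numPartitions;

        long[][] ranges = new long[numPartitions][];
        long curr = current;
        int i = 0;
        for (; i < numWithExtra; i++) {      // leading partitions: elementsPerSplit + 1 elements
            long next = curr + elementsPerSplit + 1;
            ranges[i] = new long[] {curr, next - 1};
            curr = next;
        }
        for (; i < numPartitions; i++) {     // remaining partitions: exactly elementsPerSplit elements
            long next = curr + elementsPerSplit;
            ranges[i] = new long[] {curr, next - 1};
            curr = next;
        }
        return ranges;
    }

    // Same conditions as NumberSequenceIterator.getMaximumNumberOfSplits.
    static int maxSplits(long current, long to) {
        if (to >= Integer.MAX_VALUE || current <= Integer.MIN_VALUE || to - current + 1 >= Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        }
        return (int) (to - current + 1);
    }

    public static void main(String[] args) {
        for (long[] range : split(15, 106, 3)) {
            System.out.println(Arrays.toString(range)); // prints [15, 45], then [46, 76], then [77, 106]
        }
        System.out.println(maxSplits(15, 106)); // 92
    }
}
```

For the example, 92 elements over 3 partitions gives elementsPerSplit = 30 and numWithExtra = 2, so the first two partitions hold 31 values each and the last one holds 30.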

Summary

  • GenericInputFormat has five subclasses: besides ParallelIteratorInputFormat, these are CRowValuesInputFormat, CollectionInputFormat, IteratorInputFormat and ValuesInputFormat. The latter four have one thing in common: they all implement the NonParallelInput interface.
  • GenericInputFormat.createInputSplits validates numSplits (it must be at least 1) and forces it to 1 when the format implements NonParallelInput.
  • NumberSequenceIterator is an implementation of SplittableIterator. ExecutionEnvironment.fromParallelCollection creates a ParallelIteratorInputFormat for a SplittableIterator, and generateSequence calls it with a NumberSequenceIterator. NumberSequenceIterator.split first computes elementsPerSplit and then numWithExtra, gives each of the first numWithExtra partitions one extra element, and assigns exactly elementsPerSplit elements to each remaining partition.
