A look at Flink's InputFormatSourceFunction

Preface

This article takes a look at Flink's InputFormatSourceFunction.

Example

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // WordIterator must implement both Iterator<String> and Serializable
        IteratorInputFormat<String> iteratorInputFormat = new IteratorInputFormat<>(new WordIterator());
        env
                // alternatively: TypeInformation.of(new TypeHint<String>() {})
                .createInput(iteratorInputFormat, TypeExtractor.createTypeInfo(String.class))
                .setParallelism(1)
                .print();
        // the job only runs once execute() is called
        env.execute();
  • Here an IteratorInputFormat is passed to env's createInput method, which creates the SourceFunction.
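
The example does not show WordIterator. The IteratorInputFormat constructor (listed later in this article) rejects iterators that are not Serializable, so a minimal sketch could look like the following. The class body and the word list are assumptions made for illustration, not the original code.

```java
import java.io.Serializable;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the WordIterator used above; the article does not show it.
class WordIterator implements Iterator<String>, Serializable {
    private static final long serialVersionUID = 1L;

    // Assumed sample data; any serializable state would do.
    private final String[] words = {"hello", "flink", "input", "format"};
    private int index = 0;

    @Override
    public boolean hasNext() {
        return index < words.length;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return words[index++];
    }
}
```

Because the iterator is shipped to the task in serialized form, all of its state (here the array and the index) has to be serializable as well.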

StreamExecutionEnvironment.createInput

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java

    @PublicEvolving
    public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo) {
        DataStreamSource<OUT> source;

        if (inputFormat instanceof FileInputFormat) {
            @SuppressWarnings("unchecked")
            FileInputFormat<OUT> format = (FileInputFormat<OUT>) inputFormat;

            source = createFileInput(format, typeInfo, "Custom File source",
                    FileProcessingMode.PROCESS_ONCE, -1);
        } else {
            source = createInput(inputFormat, typeInfo, "Custom Source");
        }
        return source;
    }

    private <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat,
                                                    TypeInformation<OUT> typeInfo,
                                                    String sourceName) {

        InputFormatSourceFunction<OUT> function = new InputFormatSourceFunction<>(inputFormat, typeInfo);
        return addSource(function, sourceName, typeInfo);
    }
  • createInput delegates to createFileInput when the inputFormat is a FileInputFormat; otherwise it creates an InputFormatSourceFunction and registers it via addSource.

InputFormatSourceFunction

/**
 * A {@link SourceFunction} that reads data using an {@link InputFormat}.
 */
@Internal
public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
    private static final long serialVersionUID = 1L;

    private TypeInformation<OUT> typeInfo;
    private transient TypeSerializer<OUT> serializer;

    private InputFormat<OUT, InputSplit> format;

    private transient InputSplitProvider provider;
    private transient Iterator<InputSplit> splitIterator;

    private volatile boolean isRunning = true;

    @SuppressWarnings("unchecked")
    public InputFormatSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
        this.format = (InputFormat<OUT, InputSplit>) format;
        this.typeInfo = typeInfo;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void open(Configuration parameters) throws Exception {
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();

        if (format instanceof RichInputFormat) {
            ((RichInputFormat) format).setRuntimeContext(context);
        }
        format.configure(parameters);

        provider = context.getInputSplitProvider();
        serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
        splitIterator = getInputSplits();
        isRunning = splitIterator.hasNext();
    }

    @Override
    public void run(SourceContext<OUT> ctx) throws Exception {
        try {

            Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
            if (isRunning && format instanceof RichInputFormat) {
                ((RichInputFormat) format).openInputFormat();
            }

            OUT nextElement = serializer.createInstance();
            while (isRunning) {
                format.open(splitIterator.next());

                // for each element we also check if cancel
                // was called by checking the isRunning flag

                while (isRunning && !format.reachedEnd()) {
                    nextElement = format.nextRecord(nextElement);
                    if (nextElement != null) {
                        ctx.collect(nextElement);
                    } else {
                        break;
                    }
                }
                format.close();
                completedSplitsCounter.inc();

                if (isRunning) {
                    isRunning = splitIterator.hasNext();
                }
            }
        } finally {
            format.close();
            if (format instanceof RichInputFormat) {
                ((RichInputFormat) format).closeInputFormat();
            }
            isRunning = false;
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void close() throws Exception {
        format.close();
        if (format instanceof RichInputFormat) {
            ((RichInputFormat) format).closeInputFormat();
        }
    }

    /**
     * Returns the {@code InputFormat}. This is only needed because we need to set the input
     * split assigner on the {@code StreamGraph}.
     */
    public InputFormat<OUT, InputSplit> getFormat() {
        return format;
    }

    private Iterator<InputSplit> getInputSplits() {

        return new Iterator<InputSplit>() {

            private InputSplit nextSplit;

            private boolean exhausted;

            @Override
            public boolean hasNext() {
                if (exhausted) {
                    return false;
                }

                if (nextSplit != null) {
                    return true;
                }

                final InputSplit split;
                try {
                    split = provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader());
                } catch (InputSplitProviderException e) {
                    throw new RuntimeException("Could not retrieve next input split.", e);
                }

                if (split != null) {
                    this.nextSplit = split;
                    return true;
                } else {
                    exhausted = true;
                    return false;
                }
            }

            @Override
            public InputSplit next() {
                if (this.nextSplit == null && !hasNext()) {
                    throw new NoSuchElementException();
                }

                final InputSplit tmp = this.nextSplit;
                this.nextSplit = null;
                return tmp;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
  • InputFormatSourceFunction is a SourceFunction that reads data using an InputFormat. It extends RichParallelSourceFunction and adds a constructor with two parameters: an InputFormat and a TypeInformation.
  • The getInputSplits method returns an Iterator<InputSplit> (splitIterator); its hasNext method fetches nextSplit by calling InputSplitProvider.getNextInputSplit.
  • The run method calls splitIterator.next() for each split, opens the InputSplit with the InputFormat, then calls format.nextRecord to read the records of that InputSplit one by one, and emits each record through the collect method of SourceContext.
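
The interplay between the look-ahead split iterator and the nested read loop can be sketched without any Flink dependency. In the model below a split is just a List<String>, a Supplier plays the role of InputSplitProvider (returning null when no splits are left), and a plain list stands in for ctx.collect. All names are hypothetical, and the isRunning cancellation flag is left out for brevity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Simplified, Flink-free model of the split loop in InputFormatSourceFunction.run().
class SplitLoopSketch {

    // Look-ahead iterator: hasNext() asks the provider and caches the answer;
    // a null answer means "no more splits" and is remembered in 'exhausted'.
    static <S> Iterator<S> lazySplits(Supplier<S> provider) {
        return new Iterator<S>() {
            private S nextSplit;
            private boolean exhausted;

            @Override
            public boolean hasNext() {
                if (exhausted) return false;
                if (nextSplit != null) return true;
                nextSplit = provider.get();
                if (nextSplit == null) exhausted = true;
                return nextSplit != null;
            }

            @Override
            public S next() {
                if (nextSplit == null && !hasNext()) throw new NoSuchElementException();
                S tmp = nextSplit;
                nextSplit = null;
                return tmp;
            }
        };
    }

    // Outer loop over splits, inner loop over the records of the current split.
    static List<String> runOnce(Supplier<List<String>> provider) {
        List<String> collected = new ArrayList<>(); // stands in for ctx.collect(...)
        Iterator<List<String>> splits = lazySplits(provider);
        while (splits.hasNext()) {
            for (String record : splits.next()) {
                collected.add(record);
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        Deque<List<String>> pending = new ArrayDeque<>();
        pending.add(Arrays.asList("a", "b"));
        pending.add(Arrays.asList("c"));
        // Deque.poll() returns null once empty, matching the provider contract.
        System.out.println(runOnce(pending::poll)); // prints [a, b, c]
    }
}
```

The point of the look-ahead design is that a split is only requested when the task is ready to process it, so splits are pulled on demand rather than assigned up front.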

InputSplitProvider

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java

/**
 * An input split provider can be successively queried to provide a series of {@link InputSplit} objects a
 * task is supposed to consume in the course of its execution.
 */
@Public
public interface InputSplitProvider {

    /**
     * Requests the next input split to be consumed by the calling task.
     *
     * @param userCodeClassLoader used to deserialize input splits
     * @return the next input split to be consumed by the calling task or <code>null</code> if the
     *         task shall not consume any further input splits.
     * @throws InputSplitProviderException if fetching the next input split fails
     */
    InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException;
}
  • The InputSplitProvider interface defines the getNextInputSplit method, which requests the next InputSplit and returns null when the task should consume no further splits. It has two implementation classes: RpcInputSplitProvider and TaskInputSplitProvider.

RpcInputSplitProvider

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java

public class RpcInputSplitProvider implements InputSplitProvider {
    private final JobMasterGateway jobMasterGateway;
    private final JobVertexID jobVertexID;
    private final ExecutionAttemptID executionAttemptID;
    private final Time timeout;

    public RpcInputSplitProvider(
            JobMasterGateway jobMasterGateway,
            JobVertexID jobVertexID,
            ExecutionAttemptID executionAttemptID,
            Time timeout) {
        this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
        this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
        this.executionAttemptID = Preconditions.checkNotNull(executionAttemptID);
        this.timeout = Preconditions.checkNotNull(timeout);
    }


    @Override
    public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
        Preconditions.checkNotNull(userCodeClassLoader);

        CompletableFuture<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
            jobVertexID,
            executionAttemptID);

        try {
            SerializedInputSplit serializedInputSplit = futureInputSplit.get(timeout.getSize(), timeout.getUnit());

            if (serializedInputSplit.isEmpty()) {
                return null;
            } else {
                return InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(), userCodeClassLoader);
            }
        } catch (Exception e) {
            throw new InputSplitProviderException("Requesting the next input split failed.", e);
        }
    }
}
  • RpcInputSplitProvider calls jobMasterGateway.requestNextInputSplit to get a SerializedInputSplit and deserializes it; an empty SerializedInputSplit yields null. (The InputSplitProvider in this example is an RpcInputSplitProvider.)
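
The pattern in getNextInputSplit (wait for serialized bytes with a timeout, treat an empty reply as "no more splits", otherwise deserialize) can be illustrated with plain JDK classes. This is only a sketch under assumptions: DemoSplit is a hypothetical type, a byte array stands in for SerializedInputSplit, and a plain ObjectInputStream replaces InstantiationUtil.deserializeObject, which additionally takes the user-code class loader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Flink-free sketch of the request/deserialize pattern used by RpcInputSplitProvider.
class SplitRoundTripSketch {

    // Hypothetical split type, used only for this illustration.
    static class DemoSplit implements Serializable {
        private static final long serialVersionUID = 1L;
        final int splitNumber;
        DemoSplit(int splitNumber) { this.splitNumber = splitNumber; }
    }

    // Serializes a value with standard Java serialization.
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Returns null when the reply carries no data, mirroring the InputSplitProvider contract.
    static DemoSplit nextSplit(CompletableFuture<byte[]> reply, long timeoutMillis) {
        try {
            byte[] data = reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
            if (data == null || data.length == 0) {
                return null;
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
                return (DemoSplit) in.readObject();
            }
        } catch (Exception e) {
            // The real class wraps failures in the checked InputSplitProviderException;
            // an unchecked exception is used here only to keep the sketch short.
            throw new IllegalStateException("Requesting the next input split failed.", e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<byte[]> reply = CompletableFuture.completedFuture(serialize(new DemoSplit(3)));
        System.out.println(nextSplit(reply, 1000).splitNumber); // prints 3
        System.out.println(nextSplit(CompletableFuture.completedFuture(new byte[0]), 1000)); // prints null
    }
}
```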

TaskInputSplitProvider

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java

/**
 * Implementation using {@link ActorGateway} to forward the messages.
 */
public class TaskInputSplitProvider implements InputSplitProvider {

    private final ActorGateway jobManager;
    
    private final JobID jobID;
    
    private final JobVertexID vertexID;

    private final ExecutionAttemptID executionID;

    private final FiniteDuration timeout;


    public TaskInputSplitProvider(
        ActorGateway jobManager,
        JobID jobID,
        JobVertexID vertexID,
        ExecutionAttemptID executionID,
        FiniteDuration timeout) {

        this.jobManager = Preconditions.checkNotNull(jobManager);
        this.jobID = Preconditions.checkNotNull(jobID);
        this.vertexID = Preconditions.checkNotNull(vertexID);
        this.executionID = Preconditions.checkNotNull(executionID);
        this.timeout = Preconditions.checkNotNull(timeout);
    }

    @Override
    public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
        Preconditions.checkNotNull(userCodeClassLoader);

        final Future<Object> response = jobManager.ask(
            new JobManagerMessages.RequestNextInputSplit(jobID, vertexID, executionID),
            timeout);

        final Object result;

        try {
            result = Await.result(response, timeout);
        } catch (Exception e) {
            throw new InputSplitProviderException("Did not receive next input split from JobManager.", e);
        }

        if(result instanceof JobManagerMessages.NextInputSplit){
            final JobManagerMessages.NextInputSplit nextInputSplit =
                (JobManagerMessages.NextInputSplit) result;

            byte[] serializedData = nextInputSplit.splitData();

            if(serializedData == null) {
                return null;
            } else {
                final Object deserialized;

                try {
                    deserialized = InstantiationUtil.deserializeObject(serializedData,
                        userCodeClassLoader);
                } catch (Exception e) {
                    throw new InputSplitProviderException("Could not deserialize the serialized input split.", e);
                }

                return (InputSplit) deserialized;
            }
        } else {
            throw new InputSplitProviderException("RequestNextInputSplit requires a response of type " +
                "NextInputSplit. Instead response is of type " + result.getClass() + '.');
        }

    }
}
  • TaskInputSplitProvider sends a JobManagerMessages.RequestNextInputSplit(jobID, vertexID, executionID) message via jobManager.ask(..., timeout) and deserializes the split data carried by the NextInputSplit reply; null split data means there are no more splits.

InputSplit

flink-core-1.6.2-sources.jar!/org/apache/flink/core/io/InputSplit.java

/**
 * This interface must be implemented by all kind of input splits that can be assigned to input formats.
 * 
 * <p>Input splits are transferred in serialized form via the messages, so they need to be serializable
 * as defined by {@link java.io.Serializable}.</p>
 */
@Public
public interface InputSplit extends Serializable {
    
    /**
     * Returns the number of this input split.
     * 
     * @return the number of this input split
     */
    int getSplitNumber();
}
  • InputSplit is the interface that all types of input splits must implement. It extends Serializable so that splits can be transferred in serialized form. getSplitNumber returns the number of the current split.
  • It has four implementation classes, two of which implement the interface directly: GenericInputSplit and LocatableInputSplit.
  • The other two are FileInputSplit, which extends LocatableInputSplit, and TimestampedFileInputSplit, which extends FileInputSplit.

GenericInputSplit

flink-core-1.6.2-sources.jar!/org/apache/flink/core/io/GenericInputSplit.java

/**
 * A generic input split that has only a partition number.
 */
@Public
public class GenericInputSplit implements InputSplit, java.io.Serializable {

    private static final long serialVersionUID = 1L;

    /** The number of this split. */
    private final int partitionNumber;

    /** The total number of partitions */
    private final int totalNumberOfPartitions;
    
    // --------------------------------------------------------------------------------------------

    /**
     * Creates a generic input split with the given split number.
     * 
     * @param partitionNumber The number of the split's partition.
     * @param totalNumberOfPartitions The total number of the splits (partitions).
     */
    public GenericInputSplit(int partitionNumber, int totalNumberOfPartitions) {
        this.partitionNumber = partitionNumber;
        this.totalNumberOfPartitions = totalNumberOfPartitions;
    }

    //......
    
    public String toString() {
        return "GenericSplit (" + this.partitionNumber + '/' + this.totalNumberOfPartitions + ')';
    }
}
  • GenericInputSplit is relatively simple, with only two attributes: partitionNumber and totalNumberOfPartitions. (The InputSplit in this example is a GenericInputSplit.)

LocatableInputSplit

flink-core-1.6.2-sources.jar!/org/apache/flink/core/io/LocatableInputSplit.java

/**
 * A locatable input split is an input split referring to input data which is located on one or more hosts.
 */
@Public
public class LocatableInputSplit implements InputSplit, java.io.Serializable {
    
    private static final long serialVersionUID = 1L;

    private static final String[] EMPTY_ARR = new String[0];
    
    /** The number of the split. */
    private final int splitNumber;

    /** The names of the hosts storing the data this input split refers to. */
    private final String[] hostnames;

    // --------------------------------------------------------------------------------------------
    
    /**
     * Creates a new locatable input split that refers to a multiple host as its data location.
     * 
     * @param splitNumber The number of the split
     * @param hostnames The names of the hosts storing the data this input split refers to.
     */
    public LocatableInputSplit(int splitNumber, String[] hostnames) {
        this.splitNumber = splitNumber;
        this.hostnames = hostnames == null ? EMPTY_ARR : hostnames;
    }

    /**
     * Creates a new locatable input split that refers to a single host as its data location.
     *
     * @param splitNumber The number of the split.
     * @param hostname The names of the host storing the data this input split refers to.
     */
    public LocatableInputSplit(int splitNumber, String hostname) {
        this.splitNumber = splitNumber;
        this.hostnames = hostname == null ? EMPTY_ARR : new String[] { hostname };
    }

    //......
    
    @Override
    public String toString() {
        return "Locatable Split (" + splitNumber + ") at " + Arrays.toString(this.hostnames);
    }
}
  • LocatableInputSplit is an InputSplit that carries location information. It has two attributes: splitNumber, and hostnames, the names of the hosts that store the data the split refers to.

IteratorInputFormat

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/IteratorInputFormat.java

/**
 * An input format that returns objects from an iterator.
 */
@PublicEvolving
public class IteratorInputFormat<T> extends GenericInputFormat<T> implements NonParallelInput {

    private static final long serialVersionUID = 1L;

    private Iterator<T> iterator; // input data as serializable iterator

    public IteratorInputFormat(Iterator<T> iterator) {
        if (!(iterator instanceof Serializable)) {
            throw new IllegalArgumentException("The data source iterator must be serializable.");
        }

        this.iterator = iterator;
    }

    @Override
    public boolean reachedEnd() {
        return !this.iterator.hasNext();
    }

    @Override
    public T nextRecord(T record) {
        return this.iterator.next();
    }
}
  • IteratorInputFormat wraps a serializable Iterator and implements the reachedEnd and nextRecord methods. It extends GenericInputFormat and implements NonParallelInput.

GenericInputFormat

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/GenericInputFormat.java

/**
 * Generic base class for all Rich inputs that are not based on files.
 */
@Public
public abstract class GenericInputFormat<OT> extends RichInputFormat<OT, GenericInputSplit> {

    private static final long serialVersionUID = 1L;
    
    /**
     * The partition of this split.
     */
    protected int partitionNumber;

    // --------------------------------------------------------------------------------------------
    
    @Override
    public void configure(Configuration parameters) {
        //    nothing by default
    }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
        // no statistics available, by default.
        return cachedStatistics;
    }

    @Override
    public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
        if (numSplits < 1) {
            throw new IllegalArgumentException("Number of input splits has to be at least 1.");
        }

        numSplits = (this instanceof NonParallelInput) ? 1 : numSplits;
        GenericInputSplit[] splits = new GenericInputSplit[numSplits];
        for (int i = 0; i < splits.length; i++) {
            splits[i] = new GenericInputSplit(i, numSplits);
        }
        return splits;
    }
    
    @Override
    public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) {
        return new DefaultInputSplitAssigner(splits);
    }

    // --------------------------------------------------------------------------------------------

    @Override
    public void open(GenericInputSplit split) throws IOException {
        this.partitionNumber = split.getSplitNumber();
    }

    @Override
    public void close() throws IOException {}
}
  • RpcInputSplitProvider calls JobMaster.requestNextInputSplit to get a SerializedInputSplit; the JobMaster in turn calls splitAssigner.getNextInputSplit(host, taskId), where the splitAssigner is a DefaultInputSplitAssigner obtained from vertex.getSplitAssigner().
  • The splitAssigner returned by vertex.getSplitAssigner() is created in the ExecutionJobVertex constructor via splitSource.getInputSplitAssigner(splitSource.createInputSplits(numTaskVertices)).
  • The splitSource here is the IteratorInputFormat; its createInputSplits method (which creates splits based on numTaskVertices) and its getInputSplitAssigner method are both inherited from the parent class GenericInputFormat.
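
How createInputSplits decides the number of splits can be modelled in a few lines of plain Java. The sketch below is not Flink code: it returns "partitionNumber/totalNumberOfPartitions" labels instead of GenericInputSplit objects, and the boolean parameter is a hypothetical replacement for the `this instanceof NonParallelInput` check.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free model of GenericInputFormat.createInputSplits(): one split per requested
// parallel task, collapsed to a single split when the format is marked non-parallel.
class CreateSplitsSketch {

    static List<String> createSplits(int numSplits, boolean nonParallel) {
        if (numSplits < 1) {
            throw new IllegalArgumentException("Number of input splits has to be at least 1.");
        }
        int n = nonParallel ? 1 : numSplits;
        List<String> splits = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            splits.add(i + "/" + n); // partition number / total number of partitions
        }
        return splits;
    }

    public static void main(String[] args) {
        System.out.println(createSplits(4, false)); // prints [0/4, 1/4, 2/4, 3/4]
        System.out.println(createSplits(4, true));  // prints [0/1]
    }
}
```

Since IteratorInputFormat implements NonParallelInput, the example in this article ends up with exactly one split regardless of numTaskVertices.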

DefaultInputSplitAssigner

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java

/**
 * This is the default implementation of the {@link InputSplitAssigner} interface. The default input split assigner
 * simply returns all input splits of an input vertex in the order they were originally computed.
 */
@Internal
public class DefaultInputSplitAssigner implements InputSplitAssigner {

    /** The logging object used to report information and errors. */
    private static final Logger LOG = LoggerFactory.getLogger(DefaultInputSplitAssigner.class);

    /** The list of all splits */
    private final List<InputSplit> splits = new ArrayList<InputSplit>();


    public DefaultInputSplitAssigner(InputSplit[] splits) {
        Collections.addAll(this.splits, splits);
    }
    
    public DefaultInputSplitAssigner(Collection<? extends InputSplit> splits) {
        this.splits.addAll(splits);
    }
    
    
    @Override
    public InputSplit getNextInputSplit(String host, int taskId) {
        InputSplit next = null;
        
        // keep the synchronized part short
        synchronized (this.splits) {
            if (this.splits.size() > 0) {
                next = this.splits.remove(this.splits.size() - 1);
            }
        }
        
        if (LOG.isDebugEnabled()) {
            if (next == null) {
                LOG.debug("No more input splits available");
            } else {
                LOG.debug("Assigning split " + next + " to " + host);
            }
        }
        return next;
    }
}
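  • DefaultInputSplitAssigner simply hands out the InputSplits one at a time, ignoring the host and taskId arguments: each call removes the last element of its internal list, and it returns null once the list is empty.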
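
The assignment behaviour can be reproduced with a small stand-alone model. The class below is a hypothetical simplification that stores split numbers instead of InputSplit objects; it keeps the two details visible in the listing above, namely the short synchronized section and the removal from the end of the list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Flink-free model of DefaultInputSplitAssigner: a shared list from which each caller
// removes the last element under a lock; null signals that no splits are left.
class AssignerSketch {
    private final List<Integer> splits = new ArrayList<>();

    AssignerSketch(Integer... splitNumbers) {
        splits.addAll(Arrays.asList(splitNumbers));
    }

    Integer getNextInputSplit() {
        synchronized (splits) {
            return splits.isEmpty() ? null : splits.remove(splits.size() - 1);
        }
    }

    public static void main(String[] args) {
        AssignerSketch assigner = new AssignerSketch(0, 1, 2);
        System.out.println(assigner.getNextInputSplit()); // prints 2
        System.out.println(assigner.getNextInputSplit()); // prints 1
        System.out.println(assigner.getNextInputSplit()); // prints 0
        System.out.println(assigner.getNextInputSplit()); // prints null
    }
}
```

Every split is handed out exactly once, which is what lets several parallel source tasks share one assigner without processing the same split twice.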

Summary

  • InputFormatSourceFunction is a SourceFunction that reads data using an InputFormat. It extends RichParallelSourceFunction and adds a constructor with two parameters: an InputFormat and a TypeInformation.
  • The IteratorInputFormat used in this example extends GenericInputFormat, which provides two important methods: createInputSplits (which creates splits based on numTaskVertices) and getInputSplitAssigner (which creates a DefaultInputSplitAssigner that hands out the created InputSplits one at a time).
  • The run method of InputFormatSourceFunction calls splitIterator.next() for each split, opens the InputSplit with the InputFormat, then calls format.nextRecord to read the records of that InputSplit one by one, and emits each record through the collect method of SourceContext.

The overall logic is this: GenericInputFormat provides a method to divide the input into InputSplits and also provides the InputSplitAssigner. InputFormatSourceFunction then iterates over the InputSplits assigned to its own task (requested through the InputSplitProvider, which is served by the InputSplitAssigner), reads the elements of each InputSplit one by one through the InputFormat, and emits them through the collect method of SourceContext.
