A look at Flink JDBC’s ParameterValuesProvider

  flink

Preface

This article takes a look at Flink JDBC’s ParameterValuesProvider.

ParameterValuesProvider

flink-jdbc_2.11-1.8.0-sources.jar!/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java

/**
 * This interface is used by the {@link JDBCInputFormat} to compute the list of parallel query to run (i.e. splits).
 * Each query will be parameterized using a row of the matrix provided by each {@link ParameterValuesProvider}
 * implementation.
 */
public interface ParameterValuesProvider {

    /** Returns the necessary parameters array to use for query in parallel a table. */
    Serializable[][] getParameterValues();
}
  • The ParameterValuesProvider interface defines the getParameterValues method, which returns the parameter matrix used to query a table in parallel. Each row of the matrix parameterizes one query (one split), so a large SQL query can be divided into several segmented queries that are processed in parallel. It has two implementations: GenericParameterValuesProvider and NumericBetweenParametersProvider.
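To make the "one row per split" contract concrete, here is a minimal sketch of a custom provider. It assumes a hypothetical BOOKS table with a publish_year column queried as `SELECT * FROM BOOKS WHERE publish_year = ?`. The interface is redeclared locally so the example compiles without flink-jdbc, and YearParametersProvider is an illustrative name, not a Flink class.

```java
import java.io.Serializable;
import java.util.Arrays;

// Local copy of Flink's interface so this sketch compiles without flink-jdbc on the classpath.
interface ParameterValuesProvider {
    Serializable[][] getParameterValues();
}

// Hypothetical provider: one split per publication year for a query such as
//   SELECT * FROM BOOKS WHERE publish_year = ?
// (the BOOKS table and publish_year column are assumptions for illustration).
class YearParametersProvider implements ParameterValuesProvider {
    private final int fromYear;
    private final int toYear;

    YearParametersProvider(int fromYear, int toYear) {
        if (fromYear > toYear) {
            throw new IllegalArgumentException("fromYear must not exceed toYear");
        }
        this.fromYear = fromYear;
        this.toYear = toYear;
    }

    @Override
    public Serializable[][] getParameterValues() {
        Serializable[][] rows = new Serializable[toYear - fromYear + 1][];
        for (int i = 0; i < rows.length; i++) {
            // Row i binds the single '?' of the query for split i.
            rows[i] = new Integer[]{fromYear + i};
        }
        return rows;
    }
}

class YearParametersDemo {
    public static void main(String[] args) {
        Serializable[][] rows = new YearParametersProvider(2017, 2019).getParameterValues();
        // One row per split: [[2017], [2018], [2019]]
        System.out.println(Arrays.deepToString(rows));
    }
}
```

Integer values are used on purpose: JDBCInputFormat.open binds an Integer with setInt, so each split would run the query for exactly one year.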

GenericParameterValuesProvider

flink-jdbc_2.11-1.8.0-sources.jar!/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java

/**
 * This splits generator actually does nothing but wrapping the query parameters
 * computed by the user before creating the {@link JDBCInputFormat} instance.
 */
public class GenericParameterValuesProvider implements ParameterValuesProvider {

    private final Serializable[][] parameters;

    public GenericParameterValuesProvider(Serializable[][] parameters) {
        this.parameters = parameters;
    }

    @Override
    public Serializable[][] getParameterValues(){
        //do nothing...precomputed externally
        return parameters;
    }

}
  • GenericParameterValuesProvider does nothing beyond wrapping: its getParameterValues method simply returns the matrix passed to its constructor, which the user computes before creating the JDBCInputFormat.
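A typical use is a precomputed list of values for a query such as `SELECT * FROM books WHERE author = ?`, one value per split. The sketch below uses local stand-ins for the Flink types so it runs without flink-jdbc; authorParameters is a hypothetical helper and the author names are placeholders.

```java
import java.io.Serializable;
import java.util.Arrays;

// Local stand-ins for the Flink types, so the sketch runs without flink-jdbc.
interface ParameterValuesProvider {
    Serializable[][] getParameterValues();
}

class GenericParameterValuesProvider implements ParameterValuesProvider {
    private final Serializable[][] parameters;

    GenericParameterValuesProvider(Serializable[][] parameters) {
        this.parameters = parameters;
    }

    @Override
    public Serializable[][] getParameterValues() {
        return parameters; // precomputed by the caller, returned unchanged
    }
}

class GenericProviderDemo {
    // Hypothetical helper: builds the matrix for "SELECT * FROM books WHERE author = ?",
    // one author (one row, one split) per entry.
    static Serializable[][] authorParameters(String... authors) {
        Serializable[][] queryParameters = new String[authors.length][1];
        for (int i = 0; i < authors.length; i++) {
            queryParameters[i] = new String[]{authors[i]};
        }
        return queryParameters;
    }

    public static void main(String[] args) {
        ParameterValuesProvider provider =
                new GenericParameterValuesProvider(authorParameters("Kumar", "Tan Ah Teck"));
        // Prints [[Kumar], [Tan Ah Teck]]: two splits, each binding one author.
        System.out.println(Arrays.deepToString(provider.getParameterValues()));
    }
}
```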

NumericBetweenParametersProvider

flink-jdbc_2.11-1.8.0-sources.jar!/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java

/**
 * This query parameters generator is an helper class to parameterize from/to queries on a numeric column.
 * The generated array of from/to values will be equally sized to fetchSize (apart from the last one),
 * ranging from minVal up to maxVal.
 *
 * <p>For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE>, using a query like:
 * <PRE>
 *   SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
 * </PRE>
 *
 * <p>You can take advantage of this class to automatically generate the parameters of the BETWEEN clause,
 * based on the passed constructor parameters.
 *
 */
public class NumericBetweenParametersProvider implements ParameterValuesProvider {

    private final long fetchSize;
    private final long minVal;
    private final long maxVal;

    /**
     * NumericBetweenParametersProvider constructor.
     *
     * @param fetchSize the max distance between the produced from/to pairs
     * @param minVal the lower bound of the produced "from" values
     * @param maxVal the upper bound of the produced "to" values
     */
    public NumericBetweenParametersProvider(long fetchSize, long minVal, long maxVal) {
        checkArgument(fetchSize > 0, "Fetch size must be greater than 0.");
        checkArgument(minVal <= maxVal, "Min value cannot be greater than max value.");
        this.fetchSize = fetchSize;
        this.minVal = minVal;
        this.maxVal = maxVal;
    }

    @Override
    public Serializable[][] getParameterValues() {
        double maxElemCount = (maxVal - minVal) + 1;
        int numBatches = new Double(Math.ceil(maxElemCount / fetchSize)).intValue();
        Serializable[][] parameters = new Serializable[numBatches][2];
        int batchIndex = 0;
        for (long start = minVal; start <= maxVal; start += fetchSize, batchIndex++) {
            long end = start + fetchSize - 1;
            if (end > maxVal) {
                end = maxVal;
            }
            parameters[batchIndex] = new Long[]{start, end};
        }
        return parameters;
    }

}
  • NumericBetweenParametersProvider automatically generates the parameters of a range query on a numeric column such as the primary key (WHERE id BETWEEN ? AND ?). Its constructor takes fetchSize, minVal and maxVal; here fetchSize is the width of each range, not the JDBC fetch size. The getParameterValues method computes numBatches = ceil((maxVal - minVal + 1) / fetchSize) and then fills in one {start, end} pair per batch, with end = start + fetchSize - 1 capped at maxVal.
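A quick way to check the arithmetic is to run the same loop outside Flink. The sketch below re-implements the splitting logic of getParameterValues, returning long[] pairs instead of Long[] rows and computing the ceiling with integer arithmetic instead of Math.ceil; BetweenRanges is a hypothetical name. With fetchSize = 4 over ids 1..10 it yields three ranges, the last one shorter.

```java
import java.util.Arrays;

// Re-implements the splitting loop of NumericBetweenParametersProvider.getParameterValues()
// outside Flink, so its output can be inspected for concrete inputs.
class BetweenRanges {
    static long[][] split(long fetchSize, long minVal, long maxVal) {
        if (fetchSize <= 0 || minVal > maxVal) {
            throw new IllegalArgumentException("need fetchSize > 0 and minVal <= maxVal");
        }
        long count = maxVal - minVal + 1;
        // Same as Math.ceil(count / fetchSize), but in integer arithmetic.
        int numBatches = (int) ((count + fetchSize - 1) / fetchSize);
        long[][] ranges = new long[numBatches][];
        int batchIndex = 0;
        for (long start = minVal; start <= maxVal; start += fetchSize, batchIndex++) {
            long end = Math.min(start + fetchSize - 1, maxVal); // last batch may be shorter
            ranges[batchIndex] = new long[]{start, end};
        }
        return ranges;
    }

    public static void main(String[] args) {
        // fetchSize = 4 over ids 1..10 -> [[1, 4], [5, 8], [9, 10]]
        System.out.println(Arrays.deepToString(split(4, 1, 10)));
    }
}
```

Each pair becomes the two `?` values of one BETWEEN query, so the three splits together cover every id from 1 to 10 exactly once.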

JDBCInputFormat

flink-jdbc_2.11-1.8.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java

public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements ResultTypeQueryable<Row> {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);

    private String username;
    private String password;
    private String drivername;
    private String dbURL;
    private String queryTemplate;
    private int resultSetType;
    private int resultSetConcurrency;
    private RowTypeInfo rowTypeInfo;

    private transient Connection dbConn;
    private transient PreparedStatement statement;
    private transient ResultSet resultSet;
    private int fetchSize;

    private boolean hasNext;
    private Object[][] parameterValues;

    public JDBCInputFormat() {
    }

    @Override
    public RowTypeInfo getProducedType() {
        return rowTypeInfo;
    }

    @Override
    public void configure(Configuration parameters) {
        //do nothing here
    }

    @Override
    public void openInputFormat() {
        //called once per inputFormat (on open)
        try {
            Class.forName(drivername);
            if (username == null) {
                dbConn = DriverManager.getConnection(dbURL);
            } else {
                dbConn = DriverManager.getConnection(dbURL, username, password);
            }
            statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
            if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
                statement.setFetchSize(fetchSize);
            }
        } catch (SQLException se) {
            throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
        } catch (ClassNotFoundException cnfe) {
            throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
        }
    }

    @Override
    public void closeInputFormat() {
        //called once per inputFormat (on close)
        try {
            if (statement != null) {
                statement.close();
            }
        } catch (SQLException se) {
            LOG.info("Inputformat Statement couldn't be closed - " + se.getMessage());
        } finally {
            statement = null;
        }

        try {
            if (dbConn != null) {
                dbConn.close();
            }
        } catch (SQLException se) {
            LOG.info("Inputformat couldn't be closed - " + se.getMessage());
        } finally {
            dbConn = null;
        }

        parameterValues = null;
    }

    /**
     * Connects to the source database and executes the query in a <b>parallel
     * fashion</b> if
     * this {@link InputFormat} is built using a parameterized query (i.e. using
     * a {@link PreparedStatement})
     * and a proper {@link ParameterValuesProvider}, in a <b>non-parallel
     * fashion</b> otherwise.
     *
     * @param inputSplit which is ignored if this InputFormat is executed as a
     *        non-parallel source,
     *        a "hook" to the query parameters otherwise (using its
     *        <i>splitNumber</i>)
     * @throws IOException if there's an error during the execution of the query
     */
    @Override
    public void open(InputSplit inputSplit) throws IOException {
        try {
            if (inputSplit != null && parameterValues != null) {
                for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
                    Object param = parameterValues[inputSplit.getSplitNumber()][i];
                    if (param instanceof String) {
                        statement.setString(i + 1, (String) param);
                    } else if (param instanceof Long) {
                        statement.setLong(i + 1, (Long) param);
                    } else if (param instanceof Integer) {
                        statement.setInt(i + 1, (Integer) param);
                    } else if (param instanceof Double) {
                        statement.setDouble(i + 1, (Double) param);
                    } else if (param instanceof Boolean) {
                        statement.setBoolean(i + 1, (Boolean) param);
                    } else if (param instanceof Float) {
                        statement.setFloat(i + 1, (Float) param);
                    } else if (param instanceof BigDecimal) {
                        statement.setBigDecimal(i + 1, (BigDecimal) param);
                    } else if (param instanceof Byte) {
                        statement.setByte(i + 1, (Byte) param);
                    } else if (param instanceof Short) {
                        statement.setShort(i + 1, (Short) param);
                    } else if (param instanceof Date) {
                        statement.setDate(i + 1, (Date) param);
                    } else if (param instanceof Time) {
                        statement.setTime(i + 1, (Time) param);
                    } else if (param instanceof Timestamp) {
                        statement.setTimestamp(i + 1, (Timestamp) param);
                    } else if (param instanceof Array) {
                        statement.setArray(i + 1, (Array) param);
                    } else {
                        //extends with other types if needed
                        throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet).");
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
                }
            }
            resultSet = statement.executeQuery();
            hasNext = resultSet.next();
        } catch (SQLException se) {
            throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
        }
    }

    /**
     * Closes all resources used.
     *
     * @throws IOException Indicates that a resource could not be closed.
     */
    @Override
    public void close() throws IOException {
        if (resultSet == null) {
            return;
        }
        try {
            resultSet.close();
        } catch (SQLException se) {
            LOG.info("Inputformat ResultSet couldn't be closed - " + se.getMessage());
        }
    }

    /**
     * Checks whether all data has been read.
     *
     * @return boolean value indication whether all data has been read.
     * @throws IOException
     */
    @Override
    public boolean reachedEnd() throws IOException {
        return !hasNext;
    }

    /**
     * Stores the next resultSet row in a tuple.
     *
     * @param row row to be reused.
     * @return row containing next {@link Row}
     * @throws java.io.IOException
     */
    @Override
    public Row nextRecord(Row row) throws IOException {
        try {
            if (!hasNext) {
                return null;
            }
            for (int pos = 0; pos < row.getArity(); pos++) {
                row.setField(pos, resultSet.getObject(pos + 1));
            }
            //update hasNext after we've read the record
            hasNext = resultSet.next();
            return row;
        } catch (SQLException se) {
            throw new IOException("Couldn't read data - " + se.getMessage(), se);
        } catch (NullPointerException npe) {
            throw new IOException("Couldn't access resultSet", npe);
        }
    }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
        return cachedStatistics;
    }

    @Override
    public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
        if (parameterValues == null) {
            return new GenericInputSplit[]{new GenericInputSplit(0, 1)};
        }
        GenericInputSplit[] ret = new GenericInputSplit[parameterValues.length];
        for (int i = 0; i < ret.length; i++) {
            ret[i] = new GenericInputSplit(i, ret.length);
        }
        return ret;
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
        return new DefaultInputSplitAssigner(inputSplits);
    }

    @VisibleForTesting
    PreparedStatement getStatement() {
        return statement;
    }

    //......
}
  • JDBCInputFormat extends RichInputFormat and implements the ResultTypeQueryable interface.
  • The createInputSplits method creates one GenericInputSplit per row of parameterValues. If parameterValues is null, it returns a single split (partition 0 of 1), so the query runs without parameters in one split; the minNumSplits argument is not used.
  • The getInputSplitAssigner method creates a DefaultInputSplitAssigner from the InputSplit array; the getStatistics method simply returns its cachedStatistics argument.
  • The openInputFormat method, called once per input format instance, loads the JDBC driver, obtains the database connection and prepares the statement (calling setFetchSize when fetchSize is Integer.MIN_VALUE or positive); the closeInputFormat method closes the statement and the database connection.
  • The open method receives an inputSplit, takes the parameter row at index inputSplit.getSplitNumber() from parameterValues, binds each value to the statement with the setter matching its type, and then runs statement.executeQuery() to obtain the resultSet. The nextRecord method reads data by iterating over the resultSet, and the close method closes the resultSet.
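The link between splits and parameter rows can be sketched without a database. The class below (a hypothetical helper, not part of Flink) mirrors how many splits createInputSplits returns and which PreparedStatement setter the instanceof chain in open would choose for each value of a row; the real method calls the setter instead of returning its name.

```java
import java.math.BigDecimal;
import java.sql.Array;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;

// Mirrors two pieces of JDBCInputFormat without a database:
// how many splits createInputSplits() produces, and which PreparedStatement
// setter open() would pick for each parameter of a split's row.
class SplitBindingSketch {
    static int splitCount(Object[][] parameterValues) {
        // No parameters: a single split (0 of 1) that runs the query as-is.
        return parameterValues == null ? 1 : parameterValues.length;
    }

    // Returns the name of the setter open() would call; the real code calls it directly.
    static String setterFor(Object param) {
        if (param instanceof String) return "setString";
        if (param instanceof Long) return "setLong";
        if (param instanceof Integer) return "setInt";
        if (param instanceof Double) return "setDouble";
        if (param instanceof Boolean) return "setBoolean";
        if (param instanceof Float) return "setFloat";
        if (param instanceof BigDecimal) return "setBigDecimal";
        if (param instanceof Byte) return "setByte";
        if (param instanceof Short) return "setShort";
        if (param instanceof Date) return "setDate";
        if (param instanceof Time) return "setTime";
        if (param instanceof Timestamp) return "setTimestamp";
        if (param instanceof Array) return "setArray";
        throw new IllegalArgumentException("type " + param.getClass() + " is not handled");
    }

    public static void main(String[] args) {
        Object[][] params = {{1L, 4L}, {5L, 8L}, {9L, 10L}};
        System.out.println(splitCount(params)); // 3 splits
        System.out.println(splitCount(null));   // 1 split
        // Split 2 binds placeholders 1 and 2; JDBC parameter indexes start at 1.
        Object[] row = params[2];
        for (int i = 0; i < row.length; i++) {
            System.out.println(setterFor(row[i]) + "(" + (i + 1) + ", " + row[i] + ")");
        }
    }
}
```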

InputSplit

flink-core-1.8.0-sources.jar!/org/apache/flink/core/io/InputSplit.java

/**
 * This interface must be implemented by all kind of input splits that can be assigned to input formats.
 * 
 * <p>Input splits are transferred in serialized form via the messages, so they need to be serializable
 * as defined by {@link java.io.Serializable}.</p>
 */
@Public
public interface InputSplit extends Serializable {
    
    /**
     * Returns the number of this input split.
     * 
     * @return the number of this input split
     */
    int getSplitNumber();
}
  • The InputSplit interface defines the getSplitNumber method, which returns the number of this input split.

GenericInputSplit

flink-core-1.8.0-sources.jar!/org/apache/flink/core/io/GenericInputSplit.java

/**
 * A generic input split that has only a partition number.
 */
@Public
public class GenericInputSplit implements InputSplit, java.io.Serializable {

    private static final long serialVersionUID = 1L;

    /** The number of this split. */
    private final int partitionNumber;

    /** The total number of partitions */
    private final int totalNumberOfPartitions;
    
    // --------------------------------------------------------------------------------------------

    /**
     * Creates a generic input split with the given split number.
     * 
     * @param partitionNumber The number of the split's partition.
     * @param totalNumberOfPartitions The total number of the splits (partitions).
     */
    public GenericInputSplit(int partitionNumber, int totalNumberOfPartitions) {
        this.partitionNumber = partitionNumber;
        this.totalNumberOfPartitions = totalNumberOfPartitions;
    }

    // --------------------------------------------------------------------------------------------

    @Override
    public int getSplitNumber() {
        return this.partitionNumber;
    }
    
    public int getTotalNumberOfSplits() {
        return this.totalNumberOfPartitions;
    }
    
    // --------------------------------------------------------------------------------------------

    @Override
    public int hashCode() {
        return this.partitionNumber ^ this.totalNumberOfPartitions;
    }
    
    @Override
    public boolean equals(Object obj) {
        if (obj instanceof GenericInputSplit) {
            GenericInputSplit other = (GenericInputSplit) obj;
            return this.partitionNumber == other.partitionNumber &&
                    this.totalNumberOfPartitions == other.totalNumberOfPartitions;
        } else {
            return false;
        }
    }
    
    public String toString() {
        return "GenericSplit (" + this.partitionNumber + '/' + this.totalNumberOfPartitions + ')';
    }
}
  • GenericInputSplit implements the InputSplit interface, and its getSplitNumber method returns partitionNumber.
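Because a split carries only its number, JDBCInputFormat can use getSplitNumber() directly as the index into parameterValues. The sketch below is a local mirror of GenericInputSplit (SimpleSplit is a hypothetical name) plus the loop createInputSplits uses to number the splits.

```java
import java.io.Serializable;
import java.util.Arrays;

// Minimal local mirror of Flink's GenericInputSplit: a split is nothing more
// than (partitionNumber, totalNumberOfPartitions).
final class SimpleSplit implements Serializable {
    private static final long serialVersionUID = 1L;
    final int partitionNumber;
    final int totalNumberOfPartitions;

    SimpleSplit(int partitionNumber, int totalNumberOfPartitions) {
        this.partitionNumber = partitionNumber;
        this.totalNumberOfPartitions = totalNumberOfPartitions;
    }

    int getSplitNumber() {
        return partitionNumber;
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof SimpleSplit)) return false;
        SimpleSplit other = (SimpleSplit) obj;
        return partitionNumber == other.partitionNumber
                && totalNumberOfPartitions == other.totalNumberOfPartitions;
    }

    @Override
    public int hashCode() {
        return partitionNumber ^ totalNumberOfPartitions; // same formula as GenericInputSplit
    }

    @Override
    public String toString() {
        return "GenericSplit (" + partitionNumber + '/' + totalNumberOfPartitions + ')';
    }

    // The numbering JDBCInputFormat.createInputSplits applies to n parameter rows.
    static SimpleSplit[] create(int n) {
        SimpleSplit[] splits = new SimpleSplit[n];
        for (int i = 0; i < n; i++) {
            splits[i] = new SimpleSplit(i, n);
        }
        return splits;
    }

    public static void main(String[] args) {
        // [GenericSplit (0/3), GenericSplit (1/3), GenericSplit (2/3)]
        System.out.println(Arrays.toString(create(3)));
    }
}
```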

InputSplitAssigner

flink-core-1.8.0-sources.jar!/org/apache/flink/core/io/InputSplitAssigner.java

/**
 * An input split assigner distributes the {@link InputSplit}s among the instances on which a
 * data source exists.
 */
@PublicEvolving
public interface InputSplitAssigner {

    /**
     * Returns the next input split that shall be consumed. The consumer's host is passed as a parameter
     * to allow localized assignments.
     * 
     * @param host The host address of split requesting task.
     * @param taskId The id of the split requesting task.
     * @return the next input split to be consumed, or <code>null</code> if no more splits remain.
     */
    InputSplit getNextInputSplit(String host, int taskId);

}
  • The InputSplitAssigner interface defines the getNextInputSplit method, which takes the host and taskId of the requesting task and returns the next InputSplit to consume, or null when no splits remain.

DefaultInputSplitAssigner

flink-core-1.8.0-sources.jar!/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java

/**
 * This is the default implementation of the {@link InputSplitAssigner} interface. The default input split assigner
 * simply returns all input splits of an input vertex in the order they were originally computed.
 */
@Internal
public class DefaultInputSplitAssigner implements InputSplitAssigner {

    /** The logging object used to report information and errors. */
    private static final Logger LOG = LoggerFactory.getLogger(DefaultInputSplitAssigner.class);

    /** The list of all splits */
    private final List<InputSplit> splits = new ArrayList<InputSplit>();


    public DefaultInputSplitAssigner(InputSplit[] splits) {
        Collections.addAll(this.splits, splits);
    }
    
    public DefaultInputSplitAssigner(Collection<? extends InputSplit> splits) {
        this.splits.addAll(splits);
    }
    
    
    @Override
    public InputSplit getNextInputSplit(String host, int taskId) {
        InputSplit next = null;
        
        // keep the synchronized part short
        synchronized (this.splits) {
            if (this.splits.size() > 0) {
                next = this.splits.remove(this.splits.size() - 1);
            }
        }
        
        if (LOG.isDebugEnabled()) {
            if (next == null) {
                LOG.debug("No more input splits available");
            } else {
                LOG.debug("Assigning split " + next + " to " + host);
            }
        }
        return next;
    }
}
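  • DefaultInputSplitAssigner is the default implementation of InputSplitAssigner. Its getNextInputSplit method ignores host and taskId (apart from logging the host); inside a block synchronized on splits, it removes and returns the last element of the list. Splits are therefore handed out in reverse order of creation, even though the class Javadoc says they are returned in the order they were computed.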
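The effect of remove(size() - 1) is easy to see with a local re-creation of the assigner that uses Integer ids instead of InputSplit objects (LifoAssignerSketch is a hypothetical name). Splits created as 0, 1, 2 are handed out as 2, 1, 0, and null signals that nothing is left.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Local re-creation of DefaultInputSplitAssigner's core logic, using Integer split ids
// instead of InputSplit objects so it runs without Flink.
class LifoAssignerSketch {
    private final List<Integer> splits = new ArrayList<>();

    LifoAssignerSketch(Integer... splitIds) {
        Collections.addAll(splits, splitIds);
    }

    // host and taskId are accepted but not used for the choice, as in the Flink default assigner.
    Integer getNextInputSplit(String host, int taskId) {
        synchronized (splits) {
            // Removes the LAST element, so splits come out in reverse order.
            return splits.isEmpty() ? null : splits.remove(splits.size() - 1);
        }
    }

    public static void main(String[] args) {
        LifoAssignerSketch assigner = new LifoAssignerSketch(0, 1, 2);
        Integer next;
        while ((next = assigner.getNextInputSplit("localhost", 0)) != null) {
            System.out.print(next + " "); // prints 2 1 0
        }
        System.out.println();
    }
}
```

Since subtasks request splits whenever they finish one, the order mostly affects which parameter row runs first, not which subtask gets it.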

InputFormatSourceFunction

flink-streaming-java_2.11-1.8.0-sources.jar!/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java

@Internal
public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
    private static final long serialVersionUID = 1L;

    private TypeInformation<OUT> typeInfo;
    private transient TypeSerializer<OUT> serializer;

    private InputFormat<OUT, InputSplit> format;

    private transient InputSplitProvider provider;
    private transient Iterator<InputSplit> splitIterator;

    private volatile boolean isRunning = true;

    @SuppressWarnings("unchecked")
    public InputFormatSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
        this.format = (InputFormat<OUT, InputSplit>) format;
        this.typeInfo = typeInfo;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void open(Configuration parameters) throws Exception {
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();

        if (format instanceof RichInputFormat) {
            ((RichInputFormat) format).setRuntimeContext(context);
        }
        format.configure(parameters);

        provider = context.getInputSplitProvider();
        serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
        splitIterator = getInputSplits();
        isRunning = splitIterator.hasNext();
    }

    @Override
    public void run(SourceContext<OUT> ctx) throws Exception {
        try {

            Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
            if (isRunning && format instanceof RichInputFormat) {
                ((RichInputFormat) format).openInputFormat();
            }

            OUT nextElement = serializer.createInstance();
            while (isRunning) {
                format.open(splitIterator.next());

                // for each element we also check if cancel
                // was called by checking the isRunning flag

                while (isRunning && !format.reachedEnd()) {
                    nextElement = format.nextRecord(nextElement);
                    if (nextElement != null) {
                        ctx.collect(nextElement);
                    } else {
                        break;
                    }
                }
                format.close();
                completedSplitsCounter.inc();

                if (isRunning) {
                    isRunning = splitIterator.hasNext();
                }
            }
        } finally {
            format.close();
            if (format instanceof RichInputFormat) {
                ((RichInputFormat) format).closeInputFormat();
            }
            isRunning = false;
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void close() throws Exception {
        format.close();
        if (format instanceof RichInputFormat) {
            ((RichInputFormat) format).closeInputFormat();
        }
    }

    /**
     * Returns the {@code InputFormat}. This is only needed because we need to set the input
     * split assigner on the {@code StreamGraph}.
     */
    public InputFormat<OUT, InputSplit> getFormat() {
        return format;
    }

    private Iterator<InputSplit> getInputSplits() {

        return new Iterator<InputSplit>() {

            private InputSplit nextSplit;

            private boolean exhausted;

            @Override
            public boolean hasNext() {
                if (exhausted) {
                    return false;
                }

                if (nextSplit != null) {
                    return true;
                }

                final InputSplit split;
                try {
                    split = provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader());
                } catch (InputSplitProviderException e) {
                    throw new RuntimeException("Could not retrieve next input split.", e);
                }

                if (split != null) {
                    this.nextSplit = split;
                    return true;
                } else {
                    exhausted = true;
                    return false;
                }
            }

            @Override
            public InputSplit next() {
                if (this.nextSplit == null && !hasNext()) {
                    throw new NoSuchElementException();
                }

                final InputSplit tmp = this.nextSplit;
                this.nextSplit = null;
                return tmp;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
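  • The hasNext() method of the splitIterator in InputFormatSourceFunction calls provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader()) to fetch the next split; at runtime the provider is an RpcInputSplitProvider. Each parallel subtask thus pulls splits one at a time until the provider returns null, and run() opens, reads and closes the format once per split.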
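The iterator above is pull-based: it asks the provider for one split at a time, caches it until next() consumes it, and stops asking once it has seen null. A minimal sketch of the same pattern, with a java.util.function.Supplier standing in for InputSplitProvider (an assumption made so the code runs without Flink; PullingSplitIterator is a hypothetical name):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Sketch of the lazy iterator built by InputFormatSourceFunction.getInputSplits().
// A Supplier stands in for InputSplitProvider; it returns null when no splits remain.
class PullingSplitIterator<S> implements Iterator<S> {
    private final Supplier<S> provider;
    private S nextSplit;
    private boolean exhausted;

    PullingSplitIterator(Supplier<S> provider) {
        this.provider = provider;
    }

    @Override
    public boolean hasNext() {
        if (exhausted) return false;
        if (nextSplit != null) return true;  // already fetched, not yet consumed
        S split = provider.get();             // one request per split
        if (split == null) {
            exhausted = true;                 // never ask the provider again
            return false;
        }
        nextSplit = split;
        return true;
    }

    @Override
    public S next() {
        if (!hasNext()) throw new NoSuchElementException();
        S tmp = nextSplit;
        nextSplit = null;
        return tmp;
    }

    // Drains the iterator the way run() does: one open/read/close cycle per split.
    static <S> List<S> drain(Supplier<S> provider) {
        List<S> processed = new ArrayList<>();
        Iterator<S> it = new PullingSplitIterator<>(provider);
        while (it.hasNext()) {
            processed.add(it.next()); // in Flink: format.open(split), read records, format.close()
        }
        return processed;
    }

    public static void main(String[] args) {
        Deque<String> remaining = new ArrayDeque<>(Arrays.asList("split-2", "split-1", "split-0"));
        System.out.println(drain(remaining::poll)); // [split-2, split-1, split-0]
    }
}
```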

InputSplitProvider

flink-runtime_2.11-1.8.0-sources.jar!/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java

/**
 * An input split provider can be successively queried to provide a series of {@link InputSplit} objects a
 * task is supposed to consume in the course of its execution.
 */
@Public
public interface InputSplitProvider {

    /**
     * Requests the next input split to be consumed by the calling task.
     *
     * @param userCodeClassLoader used to deserialize input splits
     * @return the next input split to be consumed by the calling task or <code>null</code> if the
     *         task shall not consume any further input splits.
     * @throws InputSplitProviderException if fetching the next input split fails
     */
    InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException;
}
  • The InputSplitProvider interface defines the getNextInputSplit method, which a task calls repeatedly to obtain the next InputSplit it should process; it returns null when the task should not consume any further splits.

RpcInputSplitProvider

flink-runtime_2.11-1.8.0-sources.jar!/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java

public class RpcInputSplitProvider implements InputSplitProvider {
    private final JobMasterGateway jobMasterGateway;
    private final JobVertexID jobVertexID;
    private final ExecutionAttemptID executionAttemptID;
    private final Time timeout;

    public RpcInputSplitProvider(
            JobMasterGateway jobMasterGateway,
            JobVertexID jobVertexID,
            ExecutionAttemptID executionAttemptID,
            Time timeout) {
        this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
        this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
        this.executionAttemptID = Preconditions.checkNotNull(executionAttemptID);
        this.timeout = Preconditions.checkNotNull(timeout);
    }


    @Override
    public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
        Preconditions.checkNotNull(userCodeClassLoader);

        CompletableFuture<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
            jobVertexID,
            executionAttemptID);

        try {
            SerializedInputSplit serializedInputSplit = futureInputSplit.get(timeout.getSize(), timeout.getUnit());

            if (serializedInputSplit.isEmpty()) {
                return null;
            } else {
                return InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(), userCodeClassLoader);
            }
        } catch (Exception e) {
            throw new InputSplitProviderException("Requesting the next input split failed.", e);
        }
    }
}
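  • The getNextInputSplit method of RpcInputSplitProvider calls jobMasterGateway.requestNextInputSplit to ask the JobMaster for the next split, waits on the returned future for at most the configured timeout, and deserializes the result with the user-code class loader (returning null if the serialized split is empty). Any failure is wrapped in an InputSplitProviderException.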
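The request/response shape can be sketched with plain Java. Here ObjectOutputStream/ObjectInputStream stand in for Flink's InstantiationUtil, an already completed CompletableFuture stands in for the JobMaster RPC, and the default class loader is used instead of the user-code class loader; SplitRoundTrip and DemoSplit are hypothetical names. In this sketch a null split survives the round trip as null, which is how "no more splits" reaches the task.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Sketch of the request/response shape behind RpcInputSplitProvider.getNextInputSplit().
// Plain Java serialization stands in for InstantiationUtil, and a completed future
// stands in for the JobMaster RPC (both are assumptions, not Flink APIs).
class SplitRoundTrip {
    static final class DemoSplit implements Serializable {
        private static final long serialVersionUID = 1L;
        final int splitNumber;

        DemoSplit(int splitNumber) {
            this.splitNumber = splitNumber;
        }
    }

    // "JobMaster side": serialize whatever the assigner returned (possibly null).
    static CompletableFuture<byte[]> requestNextInputSplit(DemoSplit next) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(next);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return CompletableFuture.completedFuture(bytes.toByteArray());
    }

    // "Task side": wait with a timeout, then deserialize the split.
    static Object getNextInputSplit(CompletableFuture<byte[]> future) {
        try {
            byte[] data = future.get(10, TimeUnit.SECONDS);
            if (data.length == 0) {
                return null; // nothing was sent
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
                return in.readObject();
            }
        } catch (Exception e) {
            // RpcInputSplitProvider wraps every failure; an unchecked exception is used here.
            throw new IllegalStateException("Requesting the next input split failed.", e);
        }
    }

    public static void main(String[] args) {
        DemoSplit received = (DemoSplit) getNextInputSplit(requestNextInputSplit(new DemoSplit(7)));
        System.out.println(received.splitNumber); // 7
    }
}
```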

JobMaster.requestNextInputSplit

flink-runtime_2.11-1.8.0-sources.jar!/org/apache/flink/runtime/jobmaster/JobMaster.java

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
    //......

    @Override
    public CompletableFuture<SerializedInputSplit> requestNextInputSplit(
            final JobVertexID vertexID,
            final ExecutionAttemptID executionAttempt) {

        final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
        if (execution == null) {
            // can happen when JobManager had already unregistered this execution upon on task failure,
            // but TaskManager get some delay to aware of that situation
            if (log.isDebugEnabled()) {
                log.debug("Can not find Execution for attempt {}.", executionAttempt);
            }
            // but we should TaskManager be aware of this
            return FutureUtils.completedExceptionally(new Exception("Can not find Execution for attempt " + executionAttempt));
        }

        final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
        if (vertex == null) {
            log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
            return FutureUtils.completedExceptionally(new Exception("Cannot find execution vertex for vertex ID " + vertexID));
        }

        final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
        if (splitAssigner == null) {
            log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
            return FutureUtils.completedExceptionally(new Exception("No InputSplitAssigner for vertex ID " + vertexID));
        }

        final LogicalSlot slot = execution.getAssignedResource();
        final int taskId = execution.getVertex().getParallelSubtaskIndex();
        final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
        final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);

        if (log.isDebugEnabled()) {
            log.debug("Send next input split {}.", nextInputSplit);
        }

        try {
            final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
            return CompletableFuture.completedFuture(new SerializedInputSplit(serializedInputSplit));
        } catch (Exception ex) {
            log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
            IOException reason = new IOException("Could not serialize the next input split of class " +
                    nextInputSplit.getClass() + ".", ex);
            vertex.fail(reason);
            return FutureUtils.completedExceptionally(reason);
        }
    }

    //......
}
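  • JobMaster’s requestNextInputSplit method looks up the Execution, the ExecutionJobVertex and its InputSplitAssigner (returning an exceptionally completed future if any of them is missing), obtains nextInputSplit through splitAssigner.getNextInputSplit(host, taskId), serializes it, and returns it to the requesting RpcInputSplitProvider.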
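Errors on the JobMaster side are reported by completing the returned future exceptionally rather than by throwing. The sketch below models only that lookup-and-delegate shape: a map from vertex id to a (host, taskId) function stands in for the ExecutionGraph and InputSplitAssigner, and every name in it is hypothetical. On the task side the failure surfaces as an ExecutionException from get(), which RpcInputSplitProvider wraps in an InputSplitProviderException.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;

// Sketch of the lookup-and-delegate shape of JobMaster.requestNextInputSplit().
// A map from vertex id to a (host, taskId) -> split function stands in for
// ExecutionGraph + InputSplitAssigner; all names here are hypothetical.
class MasterSideSketch {
    private final Map<String, BiFunction<String, Integer, String>> assignersByVertex = new HashMap<>();

    void register(String vertexId, BiFunction<String, Integer, String> assigner) {
        assignersByVertex.put(vertexId, assigner);
    }

    CompletableFuture<String> requestNextInputSplit(String vertexId, String host, int taskId) {
        BiFunction<String, Integer, String> assigner = assignersByVertex.get(vertexId);
        if (assigner == null) {
            // Like FutureUtils.completedExceptionally(...): fail the future instead of throwing.
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new Exception("No InputSplitAssigner for vertex ID " + vertexId));
            return failed;
        }
        return CompletableFuture.completedFuture(assigner.apply(host, taskId));
    }

    // Task side: a failed future surfaces as ExecutionException from get().
    static String await(CompletableFuture<String> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        MasterSideSketch master = new MasterSideSketch();
        master.register("source", (host, taskId) -> "split for task " + taskId + " on " + host);
        System.out.println(await(master.requestNextInputSplit("source", "tm-1", 0)));
        System.out.println(await(master.requestNextInputSplit("unknown", "tm-1", 0)));
    }
}
```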

Summary

  • The ParameterValuesProvider interface defines the getParameterValues method, which returns the parameter matrix used to query a table in parallel: a large SQL query is divided into several segmented queries, one per row of the matrix. It has two implementations: GenericParameterValuesProvider and NumericBetweenParametersProvider.
  • GenericParameterValuesProvider does nothing beyond returning the matrix passed to its constructor. NumericBetweenParametersProvider automatically generates the parameters of a range query on a numeric column such as the primary key (WHERE id BETWEEN ? AND ?); its constructor takes fetchSize, minVal and maxVal, and getParameterValues computes numBatches from these values and then the {start, end} pair of each batch.
  • JDBCInputFormat extends RichInputFormat and implements ResultTypeQueryable. createInputSplits creates one GenericInputSplit per row of parameterValues, or a single split (totalNumberOfPartitions = 1) if parameterValues is null; getInputSplitAssigner creates a DefaultInputSplitAssigner from the InputSplit array; getStatistics returns its cachedStatistics argument. openInputFormat obtains the database connection and prepares the statement, and closeInputFormat closes both. open takes the parameter row selected by the split number, binds it to the statement and runs statement.executeQuery() to obtain the resultSet; nextRecord reads data by iterating over the resultSet, and close closes the resultSet.
  • The InputSplit interface defines getSplitNumber, which returns the number of the split; GenericInputSplit implements it by returning partitionNumber. The InputSplitAssigner interface defines getNextInputSplit(host, taskId), which returns the next InputSplit. DefaultInputSplitAssigner, its default implementation, removes the last element of splits inside a synchronized block and returns it.
  • The hasNext() method of the splitIterator in InputFormatSourceFunction calls provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader()) to obtain the next split, and the provider is an RpcInputSplitProvider. The InputSplitProvider interface defines getNextInputSplit, which each task calls to get the next InputSplit to process. RpcInputSplitProvider implements it by calling jobMasterGateway.requestNextInputSplit, i.e. by asking the JobMaster for the next split; JobMaster’s requestNextInputSplit obtains nextInputSplit through splitAssigner.getNextInputSplit(host, taskId) and returns it, serialized, to the requesting RpcInputSplitProvider.
