Talking about Flink's JDBCOutputFormat


Preface

This article mainly studies Flink's JDBCOutputFormat.

JDBCOutputFormat

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java

/**
 * OutputFormat to write Rows into a JDBC database.
 * The OutputFormat has to be configured using the supplied OutputFormatBuilder.
 *
 * @see Row
 * @see DriverManager
 */
public class JDBCOutputFormat extends RichOutputFormat<Row> {
    private static final long serialVersionUID = 1L;
    static final int DEFAULT_BATCH_INTERVAL = 5000;

    private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);

    private String username;
    private String password;
    private String drivername;
    private String dbURL;
    private String query;
    private int batchInterval = DEFAULT_BATCH_INTERVAL;

    private Connection dbConn;
    private PreparedStatement upload;

    private int batchCount = 0;

    private int[] typesArray;

    public JDBCOutputFormat() {
    }

    @Override
    public void configure(Configuration parameters) {
    }

    /**
     * Connects to the target database and initializes the prepared statement.
     *
     * @param taskNumber The number of the parallel instance.
     * @throws IOException Thrown, if the output could not be opened due to an
     * I/O problem.
     */
    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        try {
            establishConnection();
            upload = dbConn.prepareStatement(query);
        } catch (SQLException sqe) {
            throw new IllegalArgumentException("open() failed.", sqe);
        } catch (ClassNotFoundException cnfe) {
            throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
        }
    }

    private void establishConnection() throws SQLException, ClassNotFoundException {
        Class.forName(drivername);
        if (username == null) {
            dbConn = DriverManager.getConnection(dbURL);
        } else {
            dbConn = DriverManager.getConnection(dbURL, username, password);
        }
    }

    /**
     * Adds a record to the prepared statement.
     *
     * <p>When this method is called, the output format is guaranteed to be opened.
     *
     * <p>WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to
     * insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null))
     *
     * @param row The records to add to the output.
     * @see PreparedStatement
     * @throws IOException Thrown, if the records could not be added due to an I/O problem.
     */
    @Override
    public void writeRecord(Row row) throws IOException {

        if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
            LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
        }
        try {

            if (typesArray == null) {
                // no types provided
                for (int index = 0; index < row.getArity(); index++) {
                    LOG.warn("Unknown column type for column {}. Best effort approach to set its value: {}.", index + 1, row.getField(index));
                    upload.setObject(index + 1, row.getField(index));
                }
            } else {
                // types provided
                for (int index = 0; index < row.getArity(); index++) {

                    if (row.getField(index) == null) {
                        upload.setNull(index + 1, typesArray[index]);
                    } else {
                        // casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
                        switch (typesArray[index]) {
                            case java.sql.Types.NULL:
                                upload.setNull(index + 1, typesArray[index]);
                                break;
                            case java.sql.Types.BOOLEAN:
                            case java.sql.Types.BIT:
                                upload.setBoolean(index + 1, (boolean) row.getField(index));
                                break;
                            case java.sql.Types.CHAR:
                            case java.sql.Types.NCHAR:
                            case java.sql.Types.VARCHAR:
                            case java.sql.Types.LONGVARCHAR:
                            case java.sql.Types.LONGNVARCHAR:
                                upload.setString(index + 1, (String) row.getField(index));
                                break;
                            case java.sql.Types.TINYINT:
                                upload.setByte(index + 1, (byte) row.getField(index));
                                break;
                            case java.sql.Types.SMALLINT:
                                upload.setShort(index + 1, (short) row.getField(index));
                                break;
                            case java.sql.Types.INTEGER:
                                upload.setInt(index + 1, (int) row.getField(index));
                                break;
                            case java.sql.Types.BIGINT:
                                upload.setLong(index + 1, (long) row.getField(index));
                                break;
                            case java.sql.Types.REAL:
                                upload.setFloat(index + 1, (float) row.getField(index));
                                break;
                            case java.sql.Types.FLOAT:
                            case java.sql.Types.DOUBLE:
                                upload.setDouble(index + 1, (double) row.getField(index));
                                break;
                            case java.sql.Types.DECIMAL:
                            case java.sql.Types.NUMERIC:
                                upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.getField(index));
                                break;
                            case java.sql.Types.DATE:
                                upload.setDate(index + 1, (java.sql.Date) row.getField(index));
                                break;
                            case java.sql.Types.TIME:
                                upload.setTime(index + 1, (java.sql.Time) row.getField(index));
                                break;
                            case java.sql.Types.TIMESTAMP:
                                upload.setTimestamp(index + 1, (java.sql.Timestamp) row.getField(index));
                                break;
                            case java.sql.Types.BINARY:
                            case java.sql.Types.VARBINARY:
                            case java.sql.Types.LONGVARBINARY:
                                upload.setBytes(index + 1, (byte[]) row.getField(index));
                                break;
                            default:
                                upload.setObject(index + 1, row.getField(index));
                                LOG.warn("Unmanaged sql type ({}) for column {}. Best effort approach to set its value: {}.",
                                    typesArray[index], index + 1, row.getField(index));
                                // case java.sql.Types.SQLXML
                                // case java.sql.Types.ARRAY:
                                // case java.sql.Types.JAVA_OBJECT:
                                // case java.sql.Types.BLOB:
                                // case java.sql.Types.CLOB:
                                // case java.sql.Types.NCLOB:
                                // case java.sql.Types.DATALINK:
                                // case java.sql.Types.DISTINCT:
                                // case java.sql.Types.OTHER:
                                // case java.sql.Types.REF:
                                // case java.sql.Types.ROWID:
                                // case java.sql.Types.STRUCT:
                        }
                    }
                }
            }
            upload.addBatch();
            batchCount++;
        } catch (SQLException e) {
            throw new RuntimeException("Preparation of JDBC statement failed.", e);
        }

        if (batchCount >= batchInterval) {
            // execute batch
            flush();
        }
    }

    void flush() {
        try {
            upload.executeBatch();
            batchCount = 0;
        } catch (SQLException e) {
            throw new RuntimeException("Execution of JDBC statement failed.", e);
        }
    }

    int[] getTypesArray() {
        return typesArray;
    }

    /**
     * Executes prepared statement and closes all resources of this instance.
     *
     * @throws IOException Thrown, if the input could not be closed properly.
     */
    @Override
    public void close() throws IOException {
        if (upload != null) {
            flush();
            // close the connection
            try {
                upload.close();
            } catch (SQLException e) {
                LOG.info("JDBC statement could not be closed: " + e.getMessage());
            } finally {
                upload = null;
            }
        }

        if (dbConn != null) {
            try {
                dbConn.close();
            } catch (SQLException se) {
                LOG.info("JDBC connection could not be closed: " + se.getMessage());
            } finally {
                dbConn = null;
            }
        }
    }

    public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
        return new JDBCOutputFormatBuilder();
    }

    //......
}
  • JDBCOutputFormat extends RichOutputFormat, with org.apache.flink.types.Row as the type parameter
  • open() calls establishConnection to load the driver and initialize dbConn, then calls dbConn.prepareStatement(query) to obtain upload (a PreparedStatement)
  • writeRecord first checks whether typesArray was provided. If not, setObject is used to set each value on a best-effort basis; if it was, each field is converted according to its declared type. The various types in java.sql.Types are supported here
  • writeRecord then performs an addBatch; once batchCount reaches batchInterval (default 5000), flush() runs executeBatch and resets batchCount. To ensure that records which never reach batchInterval are still submitted, flush is executed once more in close(), after which the PreparedStatement and Connection are closed
  • JDBCOutputFormat provides a JDBCOutputFormatBuilder for conveniently constructing a JDBCOutputFormat, as in the sketch below
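
Below is a minimal sketch of driving JDBCOutputFormat by hand; in a real job Flink invokes these lifecycle methods itself. The driver class, JDBC URL, and books table are assumptions for illustration (the table is assumed to already exist):

import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.types.Row;

public class JDBCOutputFormatDemo {
    public static void main(String[] args) throws Exception {
        JDBCOutputFormat format = JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")   // assumed driver
                .setDBUrl("jdbc:derby:memory:demo;create=true")          // assumed URL
                .setQuery("INSERT INTO books (id, title) VALUES (?, ?)") // assumed table
                .setSqlTypes(new int[]{java.sql.Types.INTEGER, java.sql.Types.VARCHAR})
                .setBatchInterval(2)                                     // tiny batch for demonstration
                .finish();

        format.open(0, 1);                       // loads the driver, connects, prepares the statement
        format.writeRecord(Row.of(1, "flink"));  // buffered via addBatch, batchCount = 1
        format.writeRecord(Row.of(2, "jdbc"));   // batchCount reaches batchInterval, triggers flush()
        format.close();                          // flushes any remainder, closes statement and connection
    }
}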

Row

flink-core-1.7.0-sources.jar!/org/apache/flink/types/Row.java

/**
 * A Row can have arbitrary number of fields and contain a set of fields, which may all be
 * different types. The fields in Row can be null. Due to Row is not strongly typed, Flink's
 * type extraction mechanism can't extract correct field types. So that users should manually
 * tell Flink the type information via creating a {@link RowTypeInfo}.
 *
 * <p>
 * The fields in the Row can be accessed by position (zero-based) {@link #getField(int)}. And can
 * set fields by {@link #setField(int, Object)}.
 * <p>
 * Row is in principle serializable. However, it may contain non-serializable fields,
 * in which case serialization will fail.
 *
 */
@PublicEvolving
public class Row implements Serializable {

    private static final long serialVersionUID = 1L;

    /** The array to store actual values. */
    private final Object[] fields;

    /**
     * Create a new Row instance.
     * @param arity The number of fields in the Row
     */
    public Row(int arity) {
        this.fields = new Object[arity];
    }

    /**
     * Get the number of fields in the Row.
     * @return The number of fields in the Row.
     */
    public int getArity() {
        return fields.length;
    }

    /**
     * Gets the field at the specified position.
     * @param pos The position of the field, 0-based.
     * @return The field at the specified position.
     * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
     */
    public Object getField(int pos) {
        return fields[pos];
    }

    /**
     * Sets the field at the specified position.
     *
     * @param pos The position of the field, 0-based.
     * @param value The value to be assigned to the field at the specified position.
     * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
     */
    public void setField(int pos, Object value) {
        fields[pos] = value;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(StringUtils.arrayAwareToString(fields[i]));
        }
        return sb.toString();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        Row row = (Row) o;

        return Arrays.deepEquals(fields, row.fields);
    }

    @Override
    public int hashCode() {
        return Arrays.deepHashCode(fields);
    }

    /**
     * Creates a new Row and assigns the given values to the Row's fields.
     * This is more convenient than using the constructor.
     *
     * <p>For example:
     *
     * <pre>
 *     Row.of("hello", true, 1L);
     * </pre>
     * instead of
     * <pre>
     *     Row row = new Row(3);
     *     row.setField(0, "hello");
     *     row.setField(1, true);
     *     row.setField(2, 1L);
     * </pre>
     *
     */
    public static Row of(Object... values) {
        Row row = new Row(values.length);
        for (int i = 0; i < values.length; i++) {
            row.setField(i, values[i]);
        }
        return row;
    }

    /**
     * Creates a new Row which copied from another row.
     * This method does not perform a deep copy.
     *
     * @param row The row being copied.
     * @return The cloned new Row
     */
    public static Row copy(Row row) {
        final Row newRow = new Row(row.fields.length);
        System.arraycopy(row.fields, 0, newRow.fields, 0, row.fields.length);
        return newRow;
    }

    /**
     * Creates a new Row with projected fields from another row.
     * This method does not perform a deep copy.
     *
     * @param fields fields to be projected
     * @return the new projected Row
     */
    public static Row project(Row row, int[] fields) {
        final Row newRow = new Row(fields.length);
        for (int i = 0; i < fields.length; i++) {
            newRow.fields[i] = row.fields[fields[i]];
        }
        return newRow;
    }
}
  • Row is the record type of JDBCOutputFormat's writeRecord. It stores field values in an Object[] array and also provides static helpers such as of, copy, and project (see the sketch below)
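
A few lines showing how these helpers behave, following the source above:

import org.apache.flink.types.Row;

Row row = Row.of("hello", true, 1L);                // same as new Row(3) plus three setField calls
int arity = row.getArity();                         // 3
Object last = row.getField(2);                      // 1L
Row projected = Row.project(row, new int[]{2, 0});  // fields reordered; toString() gives "1,hello"
Row shallow = Row.copy(row);                        // copies the Object[] but not the field objects
String text = row.toString();                       // "hello,true,1"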

JDBCOutputFormatBuilder

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java

    /**
     * Builder for a {@link JDBCOutputFormat}.
     */
    public static class JDBCOutputFormatBuilder {
        private final JDBCOutputFormat format;

        protected JDBCOutputFormatBuilder() {
            this.format = new JDBCOutputFormat();
        }

        public JDBCOutputFormatBuilder setUsername(String username) {
            format.username = username;
            return this;
        }

        public JDBCOutputFormatBuilder setPassword(String password) {
            format.password = password;
            return this;
        }

        public JDBCOutputFormatBuilder setDrivername(String drivername) {
            format.drivername = drivername;
            return this;
        }

        public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
            format.dbURL = dbURL;
            return this;
        }

        public JDBCOutputFormatBuilder setQuery(String query) {
            format.query = query;
            return this;
        }

        public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
            format.batchInterval = batchInterval;
            return this;
        }

        public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) {
            format.typesArray = typesArray;
            return this;
        }

        /**
         * Finalizes the configuration and checks validity.
         *
         * @return Configured JDBCOutputFormat
         */
        public JDBCOutputFormat finish() {
            if (format.username == null) {
                LOG.info("Username was not supplied.");
            }
            if (format.password == null) {
                LOG.info("Password was not supplied.");
            }
            if (format.dbURL == null) {
                throw new IllegalArgumentException("No database URL supplied.");
            }
            if (format.query == null) {
                throw new IllegalArgumentException("No query supplied.");
            }
            if (format.drivername == null) {
                throw new IllegalArgumentException("No driver supplied.");
            }

            return format;
        }
    }
  • JDBCOutputFormatBuilder provides builder methods for username, password, dbURL, query, drivername, batchInterval, and typesArray; finish() merely logs when credentials are missing but rejects a missing dbURL, query, or drivername, as sketched below
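
A quick sketch of that validation, based on the finish() code above (the URL and table are again illustrative):

try {
    JDBCOutputFormat.buildJDBCOutputFormat()
            .setDBUrl("jdbc:derby:memory:demo")                      // assumed URL
            .setQuery("INSERT INTO books (id, title) VALUES (?, ?)") // assumed table
            .finish();                                               // drivername was never set
} catch (IllegalArgumentException e) {
    // e.getMessage() is "No driver supplied."
}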

JDBCAppendTableSink

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java

/**
 * An at-least-once Table sink for JDBC.
 *
 * <p>The mechanisms of Flink guarantees delivering messages at-least-once to this sink (if
 * checkpointing is enabled). However, one common use case is to run idempotent queries
 * (e.g., <code>REPLACE</code> or <code>INSERT OVERWRITE</code>) to upsert into the database and
 * achieve exactly-once semantic.</p>
 */
public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {

    private final JDBCOutputFormat outputFormat;

    private String[] fieldNames;
    private TypeInformation[] fieldTypes;

    JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
        this.outputFormat = outputFormat;
    }

    public static JDBCAppendTableSinkBuilder builder() {
        return new JDBCAppendTableSinkBuilder();
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        dataStream
                .addSink(new JDBCSinkFunction(outputFormat))
                .name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
    }

    @Override
    public void emitDataSet(DataSet<Row> dataSet) {
        dataSet.output(outputFormat);
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }

    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        int[] types = outputFormat.getTypesArray();

        String sinkSchema =
            String.join(", ", IntStream.of(types).mapToObj(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
        String tableSchema =
            String.join(", ", Stream.of(fieldTypes).map(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
        String msg = String.format("Schema of output table is incompatible with JDBCAppendTableSink schema. " +
            "Table schema: [%s], sink schema: [%s]", tableSchema, sinkSchema);

        Preconditions.checkArgument(fieldTypes.length == types.length, msg);
        for (int i = 0; i < types.length; ++i) {
            Preconditions.checkArgument(
                JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
                msg);
        }

        JDBCAppendTableSink copy;
        try {
            copy = new JDBCAppendTableSink(InstantiationUtil.clone(outputFormat));
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }

        copy.fieldNames = fieldNames;
        copy.fieldTypes = fieldTypes;
        return copy;
    }

    @VisibleForTesting
    JDBCOutputFormat getOutputFormat() {
        return outputFormat;
    }
}
  • JDBCOutputFormat is used by JDBCAppendTableSink, which implements the AppendStreamTableSink and BatchTableSink interfaces
  • Its emitDataStream method adds a JDBCSinkFunction as the sink of the DataStream; its emitDataSet method sets the JDBCOutputFormat as the output of the DataSet
  • AppendStreamTableSink and BatchTableSink both extend TableSink, so JDBCAppendTableSink implements the getOutputType, getFieldNames, getFieldTypes, and configure methods; configure checks that the given field types match the SQL types of the JDBCOutputFormat and then creates a new JDBCAppendTableSink from a clone of that JDBCOutputFormat (a builder sketch follows)
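
For completeness, a sketch of constructing the sink via its builder; this assumes the JDBCAppendTableSinkBuilder API from the same flink-jdbc module (setDrivername, setDBUrl, setQuery, setParameterTypes, build), and the connection details are again illustrative:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;

JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
        .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")   // assumed driver
        .setDBUrl("jdbc:derby:memory:ebookshop")                 // assumed URL
        .setQuery("INSERT INTO books (id, title) VALUES (?, ?)") // assumed table
        .setParameterTypes(Types.INT, Types.STRING)              // fills the format's typesArray
        .build();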

JDBCSinkFunction

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java

class JDBCSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
    final JDBCOutputFormat outputFormat;

    JDBCSinkFunction(JDBCOutputFormat outputFormat) {
        this.outputFormat = outputFormat;
    }

    @Override
    public void invoke(Row value) throws Exception {
        outputFormat.writeRecord(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        outputFormat.flush();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RuntimeContext ctx = getRuntimeContext();
        outputFormat.setRuntimeContext(ctx);
        outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
    }

    @Override
    public void close() throws Exception {
        outputFormat.close();
        super.close();
    }
}
  • JDBCSinkFunction extends RichSinkFunction and also implements the CheckpointedFunction interface. Its invoke method delegates to JDBCOutputFormat.writeRecord, while snapshotState calls JDBCOutputFormat.flush so that buffered records are submitted on every checkpoint (see the note below)
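
Note that snapshotState only fires when checkpointing is enabled, so a job that wants flushes beyond the batchInterval threshold has to turn checkpointing on; a short sketch with an assumed interval:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000); // assumed 10s interval; flush() then runs at each checkpoint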

Summary

  • JDBCOutputFormat extends RichOutputFormat. In open() it calls establishConnection to load the driver and initialize dbConn, then calls dbConn.prepareStatement(query) to obtain upload (a PreparedStatement). writeRecord performs an addBatch; once batchCount reaches batchInterval (default 5000), flush() runs executeBatch and resets batchCount. To ensure that records which never reach batchInterval are still submitted, flush is executed once more in close(), after which the PreparedStatement and Connection are closed.
  • Row is the record type of JDBCOutputFormat's writeRecord; it stores field values in an Object[] array.
  • JDBCOutputFormatBuilder provides builder methods for username, password, dbURL, query, drivername, batchInterval, and typesArray.
  • JDBCOutputFormat is used by JDBCAppendTableSink; its emitDataStream method adds a JDBCSinkFunction as the sink of the DataStream, while its emitDataSet method sets the JDBCOutputFormat as the output of the DataSet.
  • JDBCSinkFunction extends RichSinkFunction and implements the CheckpointedFunction interface. Its invoke method delegates to JDBCOutputFormat.writeRecord, while snapshotState calls JDBCOutputFormat.flush so that buffered records are submitted on every checkpoint.
