Talk about flink's JDBCAppendTableSink


Preface

This article mainly studies flink’s JDBCAppendTableSink.

Example

JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
  .setDBUrl("jdbc:derby:memory:ebookshop")
  .setQuery("INSERT INTO books (id) VALUES (?)")
  .setParameterTypes(INT_TYPE_INFO)
  .build();

tableEnv.registerTableSink(
  "jdbcOutputTable",
  // specify table schema
  new String[]{"id"},
  new TypeInformation[]{Types.INT},
  sink);

Table table = ...
table.insertInto("jdbcOutputTable");
  • We use tableEnv.registerTableSink to register the JDBCAppendTableSink, and then use table.insertInto to write data to the sink; a fuller end-to-end sketch follows below.
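
For context, here is a minimal end-to-end sketch built around the example above. The ebookshop Derby URL, the books table, and the id field come from the snippet above; the DataStream source and the environment setup are assumptions for illustration, based on the Flink 1.7 Java Table API.

// the usual flink imports plus a static import of BasicTypeInfo.INT_TYPE_INFO are assumed
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// hypothetical source stream of book ids, just to have something to write
DataStream<Integer> ids = env.fromElements(1001, 1002, 1003);
Table table = tableEnv.fromDataStream(ids, "id");

JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
  .setDBUrl("jdbc:derby:memory:ebookshop")
  .setQuery("INSERT INTO books (id) VALUES (?)")
  .setParameterTypes(INT_TYPE_INFO)
  .build();

tableEnv.registerTableSink(
  "jdbcOutputTable",
  new String[]{"id"},
  new TypeInformation[]{Types.INT},
  sink);

table.insertInto("jdbcOutputTable");
env.execute();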

JDBCAppendTableSink

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java

public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {

    private final JDBCOutputFormat outputFormat;

    private String[] fieldNames;
    private TypeInformation[] fieldTypes;

    JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
        this.outputFormat = outputFormat;
    }

    public static JDBCAppendTableSinkBuilder builder() {
        return new JDBCAppendTableSinkBuilder();
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        dataStream
                .addSink(new JDBCSinkFunction(outputFormat))
                .name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
    }

    @Override
    public void emitDataSet(DataSet<Row> dataSet) {
        dataSet.output(outputFormat);
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }

    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        int[] types = outputFormat.getTypesArray();

        String sinkSchema =
            String.join(", ", IntStream.of(types).mapToObj(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
        String tableSchema =
            String.join(", ", Stream.of(fieldTypes).map(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
        String msg = String.format("Schema of output table is incompatible with JDBCAppendTableSink schema. " +
            "Table schema: [%s], sink schema: [%s]", tableSchema, sinkSchema);

        Preconditions.checkArgument(fieldTypes.length == types.length, msg);
        for (int i = 0; i < types.length; ++i) {
            Preconditions.checkArgument(
                JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
                msg);
        }

        JDBCAppendTableSink copy;
        try {
            copy = new JDBCAppendTableSink(InstantiationUtil.clone(outputFormat));
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }

        copy.fieldNames = fieldNames;
        copy.fieldTypes = fieldTypes;
        return copy;
    }

    @VisibleForTesting
    JDBCOutputFormat getOutputFormat() {
        return outputFormat;
    }
}
  • JDBCAppendTableSink implements the emitDataStream method of the AppendStreamTableSink interface and the emitDataSet method of the BatchTableSink interface. Both the AppendStreamTableSink interface and the BatchTableSink interface inherit from the TableSink interface, which defines the getOutputType, getFieldNames, getFieldTypes, and configure methods.
  • The emitDataStream method wraps the JDBCOutputFormat in a JDBCSinkFunction and adds it to the DataStream as a sink; the emitDataSet method outputs the DataSet directly via the dataSet's output method using the JDBCOutputFormat (a rough sketch of JDBCSinkFunction follows after this list).
  • JDBCAppendTableSink provides a static builder method that creates a JDBCAppendTableSinkBuilder, which can be used to build the JDBCAppendTableSink.
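
To make the streaming path concrete, below is a rough sketch of what JDBCSinkFunction does, paraphrased rather than quoted from the flink-jdbc 1.7 sources: it is a RichSinkFunction that hands every Row to the JDBCOutputFormat and flushes the pending batch whenever a checkpoint is taken.

class JDBCSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {

    final JDBCOutputFormat outputFormat;

    JDBCSinkFunction(JDBCOutputFormat outputFormat) {
        this.outputFormat = outputFormat;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RuntimeContext ctx = getRuntimeContext();
        outputFormat.setRuntimeContext(ctx);
        // opens the JDBC connection and prepares the statement
        outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(Row value) throws Exception {
        // buffers the row; JDBCOutputFormat executes the batch once batchSize rows are buffered
        outputFormat.writeRecord(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // flush whatever is buffered on checkpoint, which is where at-least-once comes from
        outputFormat.flush();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // nothing to restore
    }

    @Override
    public void close() throws Exception {
        outputFormat.close();
        super.close();
    }
}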

JDBCAppendTableSinkBuilder

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java

public class JDBCAppendTableSinkBuilder {
    private String username;
    private String password;
    private String driverName;
    private String dbURL;
    private String query;
    private int batchSize = DEFAULT_BATCH_INTERVAL;
    private int[] parameterTypes;

    /**
     * Specify the username of the JDBC connection.
     * @param username the username of the JDBC connection.
     */
    public JDBCAppendTableSinkBuilder setUsername(String username) {
        this.username = username;
        return this;
    }

    /**
     * Specify the password of the JDBC connection.
     * @param password the password of the JDBC connection.
     */
    public JDBCAppendTableSinkBuilder setPassword(String password) {
        this.password = password;
        return this;
    }

    /**
     * Specify the name of the JDBC driver that will be used.
     * @param drivername the name of the JDBC driver.
     */
    public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
        this.driverName = drivername;
        return this;
    }

    /**
     * Specify the URL of the JDBC database.
     * @param dbURL the URL of the database, whose format is specified by the
     *              corresponding JDBC driver.
     */
    public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
        this.dbURL = dbURL;
        return this;
    }

    /**
     * Specify the query that the sink will execute. Usually user can specify
     * INSERT, REPLACE or UPDATE to push the data to the database.
     * @param query The query to be executed by the sink.
     * @see org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.JDBCOutputFormatBuilder#setQuery(String)
     */
    public JDBCAppendTableSinkBuilder setQuery(String query) {
        this.query = query;
        return this;
    }

    /**
     * Specify the size of the batch. By default the sink will batch the query
     * to improve the performance
     * @param batchSize the size of batch
     */
    public JDBCAppendTableSinkBuilder setBatchSize(int batchSize) {
        this.batchSize = batchSize;
        return this;
    }

    /**
     * Specify the type of the rows that the sink will be accepting.
     * @param types the type of each field
     */
    public JDBCAppendTableSinkBuilder setParameterTypes(TypeInformation<?>... types) {
        int[] ty = new int[types.length];
        for (int i = 0; i < types.length; ++i) {
            ty[i] = JDBCTypeUtil.typeInformationToSqlType(types[i]);
        }
        this.parameterTypes = ty;
        return this;
    }

    /**
     * Specify the type of the rows that the sink will be accepting.
     * @param types the type of each field defined by {@see java.sql.Types}.
     */
    public JDBCAppendTableSinkBuilder setParameterTypes(int... types) {
        this.parameterTypes = types;
        return this;
    }

    /**
     * Finalizes the configuration and checks validity.
     *
     * @return Configured JDBCOutputFormat
     */
    public JDBCAppendTableSink build() {
        Preconditions.checkNotNull(parameterTypes,
            "Types of the query parameters are not specified." +
            " Please specify types using the setParameterTypes() method.");

        JDBCOutputFormat format = JDBCOutputFormat.buildJDBCOutputFormat()
            .setUsername(username)
            .setPassword(password)
            .setDBUrl(dbURL)
            .setQuery(query)
            .setDrivername(driverName)
            .setBatchInterval(batchSize)
            .setSqlTypes(parameterTypes)
            .finish();

        return new JDBCAppendTableSink(format);
    }
}
  • JDBCAppendTableSinkBuilder provides the setUsername, setPassword, setDrivername, setDBUrl, setQuery, setBatchSize, and setParameterTypes methods to set the attributes used to construct the JDBCOutputFormat. Finally, the build method uses that JDBCOutputFormat to create the JDBCAppendTableSink; a fuller builder example follows below.
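
As a quick illustration of the full builder surface, here is a sketch that sets every option; the MySQL driver, URL, credentials, and the orders table are made-up values used only for this example.

JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
  .setDrivername("com.mysql.jdbc.Driver")            // hypothetical MySQL setup
  .setDBUrl("jdbc:mysql://localhost:3306/demo")
  .setUsername("demo_user")
  .setPassword("demo_pass")
  .setQuery("INSERT INTO orders (id, amount) VALUES (?, ?)")
  .setParameterTypes(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO)
  .setBatchSize(500)                                  // rows buffered before the batch statement is executed
  .build();

The int[] overload of setParameterTypes takes java.sql.Types constants directly, skipping the TypeInformation-to-SQL-type conversion shown above.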

Summary

  • JDBCAppendTableSink provides at-least-once semantics when checkpointing is enabled. If you want exactly-once semantics, you need to use an idempotent query such as REPLACE or INSERT OVERWRITE (see the sketch after this list). JDBCAppendTableSink implements the emitDataStream method of the AppendStreamTableSink interface and the emitDataSet method of the BatchTableSink interface.
  • Both the AppendStreamTableSink interface and the BatchTableSink interface inherit from the TableSink interface, which defines the getOutputType, getFieldNames, getFieldTypes, and configure methods. The emitDataStream method wraps the JDBCOutputFormat in a JDBCSinkFunction and adds it to the DataStream as a sink; the emitDataSet method outputs the DataSet directly via its output method. JDBCAppendTableSink provides a static builder method that creates a JDBCAppendTableSinkBuilder, which can be used to build the JDBCAppendTableSink.
  • JDBCAppendTableSinkBuilder provides the setUsername, setPassword, setDrivername, setDBUrl, setQuery, setBatchSize, and setParameterTypes methods to set the attributes used to construct the JDBCOutputFormat. Finally, the build method uses that JDBCOutputFormat to create the JDBCAppendTableSink.
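
Since replaying after a failure simply re-executes the configured query for the same rows, that query has to be idempotent for the end result to be exactly-once. A sketch of what this could look like is below; the MySQL-style REPLACE INTO syntax and a books table with a primary key on id are assumptions for illustration.

// On replay, the same id overwrites the existing row instead of inserting a duplicate,
// so re-executing the query does not change the final state of the table.
JDBCAppendTableSink idempotentSink = JDBCAppendTableSink.builder()
  .setDrivername("com.mysql.jdbc.Driver")
  .setDBUrl("jdbc:mysql://localhost:3306/demo")
  .setQuery("REPLACE INTO books (id, title) VALUES (?, ?)")   // needs a primary or unique key on id
  .setParameterTypes(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
  .build();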

doc