A Look at Flink’s CsvTableSource

  flink

Preface

This article takes a closer look at Flink’s CsvTableSource.

TableSource

flink-table_2.11-1.7.1-sources.jar! /org/apache/flink/table/sources/TableSource.scala

/**
  * Base contract of a table source producing records of type `T`.
  *
  * Implementations expose the type of the produced records, the schema of the resulting
  * table, and a human-readable description of the source.
  */
trait TableSource[T] {

  /**
    * Type information of the records produced by this [[TableSource]].
    * Fields of this type are matched to the table schema by name.
    *
    * @return The type of the returned [[DataSet]] or [[DataStream]].
    */
  def getReturnType: TypeInformation[T]

  /**
    * Schema of the table produced by this source.
    *
    * @return The [[TableSchema]] of the produced table.
    */
  def getTableSchema: TableSchema

  /**
    * Describes this table source.
    *
    * The default implementation delegates to [[TableConnectorUtil.generateRuntimeName]],
    * passing the runtime class of this source and the field names of its table schema.
    *
    * @return A String explaining the [[TableSource]].
    */
  def explainSource(): String = {
    val schemaFieldNames = getTableSchema.getFieldNames
    TableConnectorUtil.generateRuntimeName(getClass, schemaFieldNames)
  }
}
  • TableSource defines three methods: getReturnType, getTableSchema, and explainSource.

BatchTableSource

flink-table_2.11-1.7.1-sources.jar! /org/apache/flink/table/sources/BatchTableSource.scala

/**
  * A [[TableSource]] that additionally exposes the table's data as a [[DataSet]].
  *
  * @tparam T Type of the records in the produced [[DataSet]].
  */
trait BatchTableSource[T] extends TableSource[T] {

  /**
    * Returns the data of the table as a [[DataSet]].
    *
    * NOTE: This method is for internal use only for defining a [[TableSource]].
    *       Do not use it in Table API programs.
    *
    * @param execEnv The [[ExecutionEnvironment]] supplied by the caller.
    * @return The table's data as a [[DataSet]] of `T`.
    */
  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}
  • BatchTableSource extends TableSource and adds the getDataSet method.

StreamTableSource

flink-table_2.11-1.7.1-sources.jar! /org/apache/flink/table/sources/StreamTableSource.scala

/**
  * A [[TableSource]] that additionally exposes the table's data as a [[DataStream]].
  *
  * @tparam T Type of the records in the produced [[DataStream]].
  */
trait StreamTableSource[T] extends TableSource[T] {

  /**
    * Returns the data of the table as a [[DataStream]].
    *
    * NOTE: This method is for internal use only for defining a [[TableSource]].
    *       Do not use it in Table API programs.
    *
    * @param execEnv The [[StreamExecutionEnvironment]] supplied by the caller.
    * @return The table's data as a [[DataStream]] of `T`.
    */
  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
  • StreamTableSource extends TableSource and adds the getDataStream method.

CsvTableSource

flink-table_2.11-1.7.1-sources.jar! /org/apache/flink/table/sources/CsvTableSource.scala

/**
  * A [[BatchTableSource]] and [[StreamTableSource]] that reads [[Row]]s from a CSV file
  * through a [[RowCsvInputFormat]], and supports field projection via
  * [[ProjectableTableSource]].
  *
  * The primary constructor is private; instances are created through the public auxiliary
  * constructors (which select all fields) or through [[projectFields]].
  *
  * @param path            Path of the CSV file; wrapped in a [[Path]] for the input format.
  * @param fieldNames      Names of all fields of the table (the full, unprojected schema).
  * @param fieldTypes      Types of all fields of the table; must have the same length as
  *                        `fieldNames`.
  * @param selectedFields  Indices (into `fieldNames` / `fieldTypes`) of the fields that are
  *                        actually read and returned.
  * @param fieldDelim      Field delimiter handed to the [[RowCsvInputFormat]].
  * @param rowDelim        Row (line) delimiter handed to the [[RowCsvInputFormat]].
  * @param quoteCharacter  If non-null, quoted string parsing is enabled with this character.
  * @param ignoreFirstLine Forwarded to `setSkipFirstLineAsHeader` of the input format.
  * @param ignoreComments  If non-null, set as the comment prefix of the input format.
  * @param lenient         Forwarded to `setLenient` of the input format.
  * @throws TableException if `fieldNames` and `fieldTypes` differ in length.
  */
class CsvTableSource private (
    private val path: String,
    private val fieldNames: Array[String],
    private val fieldTypes: Array[TypeInformation[_]],
    private val selectedFields: Array[Int],
    private val fieldDelim: String,
    private val rowDelim: String,
    private val quoteCharacter: Character,
    private val ignoreFirstLine: Boolean,
    private val ignoreComments: String,
    private val lenient: Boolean)
  extends BatchTableSource[Row]
  with StreamTableSource[Row]
  with ProjectableTableSource[Row] {

  /**
    * Creates a source that returns all fields of the CSV file.
    *
    * Every parameter after `fieldTypes` is optional; see the class documentation for
    * the meaning of each parameter.
    */
  def this(
    path: String,
    fieldNames: Array[String],
    fieldTypes: Array[TypeInformation[_]],
    fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
    rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
    quoteCharacter: Character = null,
    ignoreFirstLine: Boolean = false,
    ignoreComments: String = null,
    lenient: Boolean = false) = {

    this(
      path,
      fieldNames,
      fieldTypes,
      fieldTypes.indices.toArray, // initially, all fields are returned
      fieldDelim,
      rowDelim,
      quoteCharacter,
      ignoreFirstLine,
      ignoreComments,
      lenient)

  }

  /**
    * Creates a source that returns all fields of the CSV file, using the default
    * delimiters of [[CsvInputFormat]], no quote character, no comment prefix,
    * and with header skipping and lenient parsing disabled.
    */
  def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) = {
    this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
      CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
  }

  // Names and types describe the same columns, so the arrays must line up one-to-one.
  if (fieldNames.length != fieldTypes.length) {
    throw new TableException("Number of field names and field types must be equal.")
  }

  // Types and names of the projected fields, in projection order.
  private val selectedFieldTypes = selectedFields.map(fieldTypes(_))
  private val selectedFieldNames = selectedFields.map(fieldNames(_))

  // Row type of the records actually produced (projected fields only).
  private val returnType: RowTypeInfo = new RowTypeInfo(selectedFieldTypes, selectedFieldNames)

  /**
    * Creates a [[DataSet]] of [[Row]]s by registering the CSV input format with the
    * given environment. The resulting operator is named after [[explainSource]].
    */
  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
    execEnv.createInput(createCsvInput(), returnType).name(explainSource())
  }

  /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */
  override def getReturnType: RowTypeInfo = returnType

  /**
    * Creates a [[DataStream]] of [[Row]]s by registering the CSV input format with the
    * given environment. The resulting operator is named after [[explainSource]].
    */
  override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = {
    streamExecEnv.createInput(createCsvInput(), returnType).name(explainSource())
  }

  /**
    * Returns the schema of the produced table.
    *
    * The schema is built from the full `fieldNames` / `fieldTypes`, not from the
    * projected fields.
    */
  override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)

  /**
    * Returns a copy of this [[TableSource]] that reads only the given fields.
    *
    * @param fields Indices of the fields to read. If empty, field 0 is selected instead.
    * @return A new [[CsvTableSource]] with the same settings and the new projection.
    */
  override def projectFields(fields: Array[Int]): CsvTableSource = {

    // An empty projection falls back to the first field.
    val projection = if (fields.isEmpty) Array(0) else fields

    new CsvTableSource(
      path,
      fieldNames,
      fieldTypes,
      projection,
      fieldDelim,
      rowDelim,
      quoteCharacter,
      ignoreFirstLine,
      ignoreComments,
      lenient)
  }

  /** Builds a [[RowCsvInputFormat]] configured with this source's settings. */
  private def createCsvInput(): RowCsvInputFormat = {
    val inputFormat = new RowCsvInputFormat(
      new Path(path),
      selectedFieldTypes,
      rowDelim,
      fieldDelim,
      selectedFields)

    inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
    inputFormat.setLenient(lenient)
    // Quote character and comment prefix are optional; null means "not configured".
    Option(quoteCharacter).foreach(quote => inputFormat.enableQuotedStringParsing(quote))
    Option(ignoreComments).foreach(prefix => inputFormat.setCommentPrefix(prefix))

    inputFormat
  }

  /**
    * Two sources are equal when they read the same file with the same format settings,
    * the same full schema and the same projection.
    *
    * Comparing only the projected row type is not sufficient: different `selectedFields`
    * over differently ordered schemas can yield the same row type while reading different
    * physical columns, so the full schema and the projection indices are compared as well.
    */
  override def equals(other: Any): Boolean = other match {
    case that: CsvTableSource =>
      returnType == that.returnType &&
        selectedFields.sameElements(that.selectedFields) &&
        fieldNames.sameElements(that.fieldNames) &&
        fieldTypes.sameElements(that.fieldTypes) &&
        path == that.path &&
        fieldDelim == that.fieldDelim &&
        rowDelim == that.rowDelim &&
        quoteCharacter == that.quoteCharacter &&
        ignoreFirstLine == that.ignoreFirstLine &&
        ignoreComments == that.ignoreComments &&
        lenient == that.lenient
    case _ => false
  }

  /**
    * Hash code derived from the projected row type, the path and the projection indices.
    * All three are compared by [[equals]], so equal sources have equal hash codes.
    */
  override def hashCode(): Int = {
    val typeAndPath = 31 * returnType.hashCode() + java.util.Objects.hashCode(path)
    31 * typeAndPath + java.util.Arrays.hashCode(selectedFields)
  }

  /** Describes this source, listing the names of the fields that are read. */
  override def explainSource(): String = {
    s"CsvTableSource(read fields: ${getReturnType.getFieldNames.mkString(", ")})"
  }
}
  • CsvTableSource implements both the BatchTableSource and StreamTableSource interfaces. The getDataSet method uses ExecutionEnvironment.createInput to create a DataSet; the getDataStream method uses StreamExecutionEnvironment.createInput to create a DataStream.
  • Both ExecutionEnvironment.createInput and StreamExecutionEnvironment.createInput receive a RowCsvInputFormat as their InputFormat; it is built by createCsvInput.
  • The TableSchema returned by getTableSchema is built from fieldNames and fieldTypes; the RowTypeInfo returned by getReturnType is built from selectedFieldTypes and selectedFieldNames; explainSource returns a string that begins with "CsvTableSource".

Summary

  • TableSource defines three methods: getReturnType, getTableSchema, and explainSource. BatchTableSource extends TableSource and adds the getDataSet method; StreamTableSource extends TableSource and adds the getDataStream method.
  • CsvTableSource implements both the BatchTableSource and StreamTableSource interfaces. The getDataSet method uses ExecutionEnvironment.createInput to create a DataSet; the getDataStream method uses StreamExecutionEnvironment.createInput to create a DataStream.
  • Both ExecutionEnvironment.createInput and StreamExecutionEnvironment.createInput receive a RowCsvInputFormat as their InputFormat; it is built by createCsvInput. The TableSchema returned by getTableSchema is built from fieldNames and fieldTypes; the RowTypeInfo returned by getReturnType is built from selectedFieldTypes and selectedFieldNames; explainSource returns a string that begins with "CsvTableSource".

doc