A look at the scan operation of Flink's TableEnvironment


Preface

This article examines the scan operation of Flink's TableEnvironment.

Example

// Scan a table registered directly with the TableEnvironment
val tab: Table = tableEnv.scan("tableName")

// Scan a table from a registered catalog: catalog name, database name, table name
val catalogTab: Table = tableEnv.scan("catalogName", "dbName", "tableName")
  • scan looks up the specified table in the schema. You can pass just the table name, or prefix it with catalogName and dbName to read the table from a specific catalog and database.
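The varargs path is split before any lookup happens, and the split is easy to see on its own. `TablePathSplit` is a hypothetical helper, not part of Flink. It applies the same `slice` logic that scanInternal uses in the source shown in the next section: the last element is the table name, and everything before it is the schema path (catalog, then db).

```scala
// Hypothetical helper (not a Flink API) mirroring the slicing in scanInternal.
object TablePathSplit {
  // Returns (schemaPath, tableName) for a path like ("catalogName", "dbName", "tableName").
  def split(tablePath: Array[String]): (Array[String], String) = {
    require(tablePath != null && tablePath.nonEmpty, "tablePath must not be null or empty.")
    val schemaPath = tablePath.slice(0, tablePath.length - 1) // all but the last element
    val tableName = tablePath(tablePath.length - 1)          // the last element
    (schemaPath, tableName)
  }

  def main(args: Array[String]): Unit = {
    val (p1, t1) = split(Array("tableName"))
    println(s"schemaPath=${p1.mkString("[", ", ", "]")} table=$t1")
    val (p2, t2) = split(Array("catalogName", "dbName", "tableName"))
    println(s"schemaPath=${p2.mkString("[", ", ", "]")} table=$t2")
  }
}
```

A single name yields an empty schema path, which is why such tables are resolved directly against rootSchema.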

TableEnvironment.scan

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/TableEnvironment.scala

abstract class TableEnvironment(val config: TableConfig) {

  private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(false, false)
  private val rootSchema: SchemaPlus = internalSchema.plus()

  //......

  @throws[TableException]
  @varargs
  def scan(tablePath: String*): Table = {
    scanInternal(tablePath.toArray) match {
      case Some(table) => table
      case None => throw new TableException(s"Table '${tablePath.mkString(".")}' was not found.")
    }
  }

  private[flink] def scanInternal(tablePath: Array[String]): Option[Table] = {
    require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.")
    val schemaPaths = tablePath.slice(0, tablePath.length - 1)
    val schema = getSchema(schemaPaths)
    if (schema != null) {
      val tableName = tablePath(tablePath.length - 1)
      val table = schema.getTable(tableName)
      if (table != null) {
        return Some(new Table(this, CatalogNode(tablePath, table.getRowType(typeFactory))))
      }
    }
    None
  }

  private def getSchema(schemaPath: Array[String]): SchemaPlus = {
    var schema = rootSchema
    for (schemaName <- schemaPath) {
      schema = schema.getSubSchema(schemaName)
      if (schema == null) {
        return schema
      }
    }
    schema
  }

  //......
}
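  • scan delegates to scanInternal. scanInternal treats every element of tablePath except the last as the schema path (the catalog and db names, if given) and passes that path to getSchema.
  • getSchema starts at rootSchema and descends one level per path element via SchemaPlus.getSubSchema, returning null as soon as a level is missing. If no catalog or db is specified, the path is empty and rootSchema itself is returned.
  • If a schema is found, scanInternal takes the last element of tablePath as the table name and calls SchemaPlus.getTable. A match is wrapped in a Table backed by a CatalogNode; otherwise scanInternal returns None and scan throws a TableException.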
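The whole lookup can be traced end to end, without a Flink dependency, using a minimal pure-Scala model. It rests on several assumptions:

- `ToySchema` is a hypothetical stand-in for Calcite's `SchemaPlus`. Only `getSubSchema` and `getTable` are modeled, and each table is represented by a row-type string.
- `IllegalStateException` stands in for Flink's `TableException`.
- `getSchema` uses `foldLeft` over `Option` instead of the early `return` in Flink's code, but it resolves the same path.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Calcite's SchemaPlus: sub-schemas plus tables.
final class ToySchema {
  private val subSchemas = mutable.Map.empty[String, ToySchema]
  private val tables = mutable.Map.empty[String, String] // table name -> row type

  def add(name: String): ToySchema = subSchemas.getOrElseUpdate(name, new ToySchema)
  def addTable(name: String, rowType: String): Unit = tables(name) = rowType

  // Like SchemaPlus.getSubSchema / getTable, these return null when absent.
  def getSubSchema(name: String): ToySchema = subSchemas.getOrElse(name, null)
  def getTable(name: String): String = tables.getOrElse(name, null)
}

object ScanSketch {
  val root = new ToySchema

  // Walk down one level per name; None as soon as a level is missing.
  // An empty path returns the root schema itself.
  def getSchema(schemaPath: Array[String]): Option[ToySchema] =
    schemaPath.foldLeft(Option(root)) { (acc, name) =>
      acc.flatMap(s => Option(s.getSubSchema(name)))
    }

  // All but the last element select the schema; the last element names the table.
  def scanInternal(tablePath: Array[String]): Option[String] = {
    require(tablePath != null && tablePath.nonEmpty, "tablePath must not be null or empty.")
    getSchema(tablePath.init).flatMap(s => Option(s.getTable(tablePath.last)))
  }

  // Throws when the table cannot be resolved, as scan does with TableException.
  def scan(tablePath: String*): String =
    scanInternal(tablePath.toArray).getOrElse(
      throw new IllegalStateException(s"Table '${tablePath.mkString(".")}' was not found."))

  def main(args: Array[String]): Unit = {
    root.addTable("orders", "(id INT, amount DOUBLE)")
    root.add("myCatalog").add("myDb").addTable("users", "(id INT, name STRING)")

    println(scan("orders"))                                    // resolved in root
    println(scan("myCatalog", "myDb", "users"))                // two sub-schema hops
    println(scanInternal(Array("myCatalog", "noDb", "users"))) // missing level
  }
}
```

Running `main` prints the two row types and then `None`, because the walk stops at the missing `noDb` level before getTable is ever called.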

Summary

  • The scan operation of TableEnvironment looks up a Table in the schema tree. You can pass just the table name, or additionally specify the catalog and db.
  • getSchema resolves the schema level by level with SchemaPlus.getSubSchema. When no catalog or db is given, it returns rootSchema.
  • getTable is then called on that schema with the last element of tablePath. If no table is found, scan throws a TableException.
