A look at Flink's TableFactory

Preface

This article takes a look at Flink's TableFactory.

Example

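import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

class MySystemTableSourceFactory implements StreamTableSourceFactory<Row> {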

  @Override
  public Map<String, String> requiredContext() {
    Map<String, String> context = new HashMap<>();
    context.put("update-mode", "append");
    context.put("connector.type", "my-system");
    return context;
  }

  @Override
  public List<String> supportedProperties() {
    List<String> list = new ArrayList<>();
    list.add("connector.debug");
    return list;
  }

  @Override
  public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
    boolean isDebug = Boolean.valueOf(properties.get("connector.debug"));

    // additional validation of the passed properties can also happen here

    return new MySystemAppendTableSource(isDebug);
  }
}

public class MySystemConnector extends ConnectorDescriptor {

  public final boolean isDebug;

  public MySystemConnector(boolean isDebug) {
    super("my-system", 1, false);
    this.isDebug = isDebug;
  }

  @Override
  protected Map<String, String> toConnectorProperties() {
    Map<String, String> properties = new HashMap<>();
    properties.put("connector.debug", Boolean.toString(isDebug));
    return properties;
  }
}
  • This example defines MySystemTableSourceFactory. Its requiredContext is update-mode=append and connector.type=my-system, its supportedProperties contains connector.debug, and its createStreamTableSource method creates a MySystemAppendTableSource. MySystemConnector extends ConnectorDescriptor; its constructor sets connector.type to my-system, connector.property-version to 1, and formatNeeded to false, and its toConnectorProperties method supplies the value of connector.debug.
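
The comment in createStreamTableSource notes that further validation of the passed properties can happen there. Boolean.valueOf treats every string other than "true" (ignoring case), including a missing key, as false, so a typo such as "yes" would go unnoticed. The following is a minimal sketch of a stricter check in plain Java; DebugFlagSketch and readDebugFlag are hypothetical names and are not part of Flink.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper, not part of Flink: strict parsing of "connector.debug".
final class DebugFlagSketch {

  static final String CONNECTOR_DEBUG = "connector.debug";

  // Returns false when the key is absent, the parsed flag when the value is
  // "true" or "false" (case-insensitive), and rejects any other value.
  static boolean readDebugFlag(Map<String, String> properties) {
    String raw = properties.get(CONNECTOR_DEBUG);
    if (raw == null) {
      return false;
    }
    String value = raw.trim().toLowerCase(Locale.ROOT);
    if (value.equals("true")) {
      return true;
    }
    if (value.equals("false")) {
      return false;
    }
    throw new IllegalArgumentException(
        "Invalid value for '" + CONNECTOR_DEBUG + "': " + raw);
  }

  public static void main(String[] args) {
    Map<String, String> properties = new HashMap<>();
    properties.put(CONNECTOR_DEBUG, "true");
    System.out.println(readDebugFlag(properties));
  }
}
```

A factory could call such a helper in place of Boolean.valueOf; whether a missing key should default to false is a design choice assumed here.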

TableFactory

flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactory.java

@PublicEvolving
public interface TableFactory {

    Map<String, String> requiredContext();

    List<String> supportedProperties();
}
  • TableFactory defines two methods, requiredContext and supportedProperties. requiredContext is used to match a factory, and supportedProperties lists the properties the factory supports. If properties that the factory does not support are passed in, an exception is thrown.
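
To make the role of supportedProperties concrete, here is a minimal sketch in plain Java that reports which given keys a factory would not accept. It mirrors the lower-casing and the "*" wildcard-prefix handling visible in the TableFactoryService source quoted later in this article, but SupportedPropertiesSketch and unsupportedKeys are hypothetical names, and the sketch assumes that required-context keys have already been removed from the given keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical helper, not Flink API: checks given keys against a
// factory's supportedProperties() list.
final class SupportedPropertiesSketch {

  static List<String> unsupportedKeys(List<String> supportedProperties, List<String> givenKeys) {
    List<String> exactKeys = new ArrayList<>();
    List<String> wildcardPrefixes = new ArrayList<>();
    for (String property : supportedProperties) {
      String key = property.toLowerCase(Locale.ROOT);
      exactKeys.add(key);
      if (key.endsWith("*")) {
        // "format.*" becomes the prefix "format."
        wildcardPrefixes.add(key.substring(0, key.length() - 1));
      }
    }

    List<String> unsupported = new ArrayList<>();
    for (String key : givenKeys) {
      boolean supported = exactKeys.contains(key);
      for (String prefix : wildcardPrefixes) {
        supported = supported || key.startsWith(prefix);
      }
      if (!supported) {
        unsupported.add(key);
      }
    }
    return unsupported;
  }

  public static void main(String[] args) {
    List<String> supported = Arrays.asList("connector.debug", "format.*");
    List<String> given = Arrays.asList("connector.debug", "format.type", "connector.path");
    System.out.println(unsupportedKeys(supported, given));
  }
}
```

In this sketch a non-empty result corresponds to the case in which Flink would refuse the factory.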

BatchTableSourceFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/BatchTableSourceFactory.scala

trait BatchTableSourceFactory[T] extends TableFactory {

  def createBatchTableSource(properties: util.Map[String, String]): BatchTableSource[T]
}
  • BatchTableSourceFactory extends TableFactory and defines the createBatchTableSource method.

BatchTableSinkFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/BatchTableSinkFactory.scala

trait BatchTableSinkFactory[T] extends TableFactory {

  def createBatchTableSink(properties: util.Map[String, String]): BatchTableSink[T]
}
  • BatchTableSinkFactory extends TableFactory and defines the createBatchTableSink method.

StreamTableSourceFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/StreamTableSourceFactory.scala

trait StreamTableSourceFactory[T] extends TableFactory {

  def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[T]
}
  • StreamTableSourceFactory extends TableFactory and defines the createStreamTableSource method.

StreamTableSinkFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/StreamTableSinkFactory.scala

trait StreamTableSinkFactory[T] extends TableFactory {

  def createStreamTableSink(properties: util.Map[String, String]): StreamTableSink[T]
}
  • StreamTableSinkFactory extends TableFactory and defines the createStreamTableSink method.

ConnectorDescriptor

flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/descriptors/ConnectorDescriptor.java

@PublicEvolving
public abstract class ConnectorDescriptor extends DescriptorBase implements Descriptor {

    private String type;

    private int version;

    private boolean formatNeeded;

    /**
     * Constructs a {@link ConnectorDescriptor}.
     *
     * @param type string that identifies this connector
     * @param version property version for backwards compatibility
     * @param formatNeeded flag for basic validation of a needed format descriptor
     */
    public ConnectorDescriptor(String type, int version, boolean formatNeeded) {
        this.type = type;
        this.version = version;
        this.formatNeeded = formatNeeded;
    }

    @Override
    public final Map<String, String> toProperties() {
        final DescriptorProperties properties = new DescriptorProperties();
        properties.putString(CONNECTOR_TYPE, type);
        properties.putLong(CONNECTOR_PROPERTY_VERSION, version);
        properties.putProperties(toConnectorProperties());
        return properties.asMap();
    }

    /**
     * Returns if this connector requires a format descriptor.
     */
    protected final boolean isFormatNeeded() {
        return formatNeeded;
    }

    /**
     * Converts this descriptor into a set of connector properties. Usually prefixed with
     * {@link FormatDescriptorValidator#FORMAT}.
     */
    protected abstract Map<String, String> toConnectorProperties();
}
  • ConnectorDescriptor extends DescriptorBase and implements the Descriptor interface. It overrides the toProperties method of the Descriptor interface, which sets the built-in properties CONNECTOR_TYPE and CONNECTOR_PROPERTY_VERSION and then merges in the properties that subclasses define through the abstract method toConnectorProperties.
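
The merge performed by toProperties can be illustrated with a simplified stand-in that uses a plain HashMap instead of DescriptorProperties. ConnectorDescriptorSketch and MySystemConnectorSketch are hypothetical names chosen for this sketch; the key strings connector.type and connector.property-version are the ones named in the example above, and the formatNeeded flag is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for ConnectorDescriptor; not the Flink class.
abstract class ConnectorDescriptorSketch {

  static final String CONNECTOR_TYPE = "connector.type";
  static final String CONNECTOR_PROPERTY_VERSION = "connector.property-version";

  private final String type;
  private final int version;

  ConnectorDescriptorSketch(String type, int version) {
    this.type = type;
    this.version = version;
  }

  // Built-in properties first, then whatever the subclass contributes.
  final Map<String, String> toProperties() {
    Map<String, String> properties = new HashMap<>();
    properties.put(CONNECTOR_TYPE, type);
    properties.put(CONNECTOR_PROPERTY_VERSION, Integer.toString(version));
    properties.putAll(toConnectorProperties());
    return properties;
  }

  protected abstract Map<String, String> toConnectorProperties();
}

// Counterpart of MySystemConnector from the example, built on the stand-in.
final class MySystemConnectorSketch extends ConnectorDescriptorSketch {

  private final boolean isDebug;

  MySystemConnectorSketch(boolean isDebug) {
    super("my-system", 1);
    this.isDebug = isDebug;
  }

  @Override
  protected Map<String, String> toConnectorProperties() {
    Map<String, String> properties = new HashMap<>();
    properties.put("connector.debug", Boolean.toString(isDebug));
    return properties;
  }

  public static void main(String[] args) {
    // Prints the three merged entries; HashMap iteration order is unspecified.
    System.out.println(new MySystemConnectorSketch(true).toProperties());
  }
}
```

Because toProperties is final, a subclass can only add entries through toConnectorProperties, which keeps the built-in keys under the control of the base class.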

TableFactoryUtil

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactoryUtil.scala

object TableFactoryUtil {

  /**
    * Returns a table source for a table environment.
    */
  def findAndCreateTableSource[T](
      tableEnvironment: TableEnvironment,
      descriptor: Descriptor)
    : TableSource[T] = {

    val javaMap = descriptor.toProperties

    tableEnvironment match {
      case _: BatchTableEnvironment =>
        TableFactoryService
          .find(classOf[BatchTableSourceFactory[T]], javaMap)
          .createBatchTableSource(javaMap)

      case _: StreamTableEnvironment =>
        TableFactoryService
          .find(classOf[StreamTableSourceFactory[T]], javaMap)
          .createStreamTableSource(javaMap)

      case e@_ =>
        throw new TableException(s"Unsupported table environment: ${e.getClass.getName}")
    }
  }

  /**
    * Returns a table sink for a table environment.
    */
  def findAndCreateTableSink[T](
      tableEnvironment: TableEnvironment,
      descriptor: Descriptor)
    : TableSink[T] = {

    val javaMap = descriptor.toProperties

    tableEnvironment match {
      case _: BatchTableEnvironment =>
        TableFactoryService
          .find(classOf[BatchTableSinkFactory[T]], javaMap)
          .createBatchTableSink(javaMap)

      case _: StreamTableEnvironment =>
        TableFactoryService
          .find(classOf[StreamTableSinkFactory[T]], javaMap)
          .createStreamTableSink(javaMap)

      case e@_ =>
        throw new TableException(s"Unsupported table environment: ${e.getClass.getName}")
    }
  }
}
  • TableFactoryUtil is a utility class used to create a TableSource or TableSink for a given TableEnvironment and Descriptor. Internally it passes descriptor.toProperties to TableFactoryService to find the matching TableFactory.
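
The dispatch on the environment type can be sketched in plain Java as follows. The types TableEnv, BatchEnv, StreamEnv and OtherEnv are hypothetical stand-ins for the Flink environments, and the method only returns the name of the factory interface that would be looked up, rather than creating a source.

```java
// Stand-in types only; this is not the Flink TableEnvironment hierarchy.
final class EnvironmentDispatchSketch {

  interface TableEnv {}

  static final class BatchEnv implements TableEnv {}

  static final class StreamEnv implements TableEnv {}

  static final class OtherEnv implements TableEnv {}

  // Chooses the source factory interface by environment type and fails
  // for any environment that is neither batch nor stream.
  static String sourceFactoryFor(TableEnv env) {
    if (env instanceof BatchEnv) {
      return "BatchTableSourceFactory";
    }
    if (env instanceof StreamEnv) {
      return "StreamTableSourceFactory";
    }
    throw new IllegalStateException(
        "Unsupported table environment: " + env.getClass().getName());
  }

  public static void main(String[] args) {
    System.out.println(sourceFactoryFor(new StreamEnv()));
  }
}
```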

TableFactoryService

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactoryService.scala

object TableFactoryService extends Logging {

  private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])

  /**
    * Finds a table factory of the given class and descriptor.
    *
    * @param factoryClass desired factory class
    * @param descriptor descriptor describing the factory configuration
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    Preconditions.checkNotNull(descriptor)

    findInternal(factoryClass, descriptor.toProperties, None)
  }

  /**
    * Finds a table factory of the given class, descriptor, and classloader.
    *
    * @param factoryClass desired factory class
    * @param descriptor descriptor describing the factory configuration
    * @param classLoader classloader for service loading
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
    Preconditions.checkNotNull(descriptor)
    Preconditions.checkNotNull(classLoader)

    findInternal(factoryClass, descriptor.toProperties, Some(classLoader))
  }

  /**
    * Finds a table factory of the given class and property map.
    *
    * @param factoryClass desired factory class
    * @param propertyMap properties that describe the factory configuration
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
    findInternal(factoryClass, propertyMap, None)
  }

  /**
    * Finds a table factory of the given class, property map, and classloader.
    *
    * @param factoryClass desired factory class
    * @param propertyMap properties that describe the factory configuration
    * @param classLoader classloader for service loading
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](
      factoryClass: Class[T],
      propertyMap: JMap[String, String],
      classLoader: ClassLoader)
    : T = {
    Preconditions.checkNotNull(classLoader)

    findInternal(factoryClass, propertyMap, Some(classLoader))
  }

  /**
    * Finds a table factory of the given class, property map, and classloader.
    *
    * @param factoryClass desired factory class
    * @param propertyMap properties that describe the factory configuration
    * @param classLoader classloader for service loading
    * @tparam T factory class type
    * @return the matching factory
    */
  private def findInternal[T](
      factoryClass: Class[T],
      propertyMap: JMap[String, String],
      classLoader: Option[ClassLoader])
    : T = {

    Preconditions.checkNotNull(factoryClass)
    Preconditions.checkNotNull(propertyMap)

    val properties = propertyMap.asScala.toMap

    val foundFactories = discoverFactories(classLoader)

    val classFactories = filterByFactoryClass(
      factoryClass,
      properties,
      foundFactories)

    val contextFactories = filterByContext(
      factoryClass,
      properties,
      foundFactories,
      classFactories)

    filterBySupportedProperties(
      factoryClass,
      properties,
      foundFactories,
      contextFactories)
  }

  /**
    * Searches for factories using Java service providers.
    *
    * @return all factories in the classpath
    */
  private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
    try {
      val iterator = classLoader match {
        case Some(customClassLoader) =>
          val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
          customLoader.iterator()
        case None =>
          defaultLoader.iterator()
      }
      iterator.asScala.toSeq
    } catch {
      case e: ServiceConfigurationError =>
        LOG.error("Could not load service provider for table factories.", e)
        throw new TableException("Could not load service provider for table factories.", e)
    }
  }

  /**
    * Filters factories with matching context by factory class.
    */
  private def filterByFactoryClass[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory])
    : Seq[TableFactory] = {

    val classFactories = foundFactories.filter(f => factoryClass.isAssignableFrom(f.getClass))
    if (classFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        s"No factory implements '${factoryClass.getCanonicalName}'.",
        factoryClass,
        foundFactories,
        properties)
    }
    classFactories
  }

  /**
    * Filters for factories with matching context.
    *
    * @return all matching factories
    */
  private def filterByContext[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory],
      classFactories: Seq[TableFactory])
    : Seq[TableFactory] = {

    val matchingFactories = classFactories.filter { factory =>
      val requestedContext = normalizeContext(factory)

      val plainContext = mutable.Map[String, String]()
      plainContext ++= requestedContext
      // we remove the version for now until we have the first backwards compatibility case
      // with the version we can provide mappings in case the format changes
      plainContext.remove(CONNECTOR_PROPERTY_VERSION)
      plainContext.remove(FORMAT_PROPERTY_VERSION)
      plainContext.remove(METADATA_PROPERTY_VERSION)
      plainContext.remove(STATISTICS_PROPERTY_VERSION)

      // check if required context is met
      plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)
    }

    if (matchingFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        "No context matches.",
        factoryClass,
        foundFactories,
        properties)
    }

    matchingFactories
  }

  /**
    * Prepares the properties of a context to be used for match operations.
    */
  private def normalizeContext(factory: TableFactory): Map[String, String] = {
    val requiredContextJava = factory.requiredContext()
    if (requiredContextJava == null) {
      throw new TableException(
        s"Required context of factory '${factory.getClass.getName}' must not be null.")
    }
    requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2)).toMap
  }

  /**
    * Filters the matching class factories by supported properties.
    */
  private def filterBySupportedProperties[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory],
      classFactories: Seq[TableFactory])
    : T = {

    val plainGivenKeys = mutable.ArrayBuffer[String]()
    properties.keys.foreach { k =>
      // replace arrays with wildcard
      val key = k.replaceAll(".\\d+", ".#")
      // ignore duplicates
      if (!plainGivenKeys.contains(key)) {
        plainGivenKeys += key
      }
    }
    var lastKey: Option[String] = None
    val supportedFactories = classFactories.filter { factory =>
      val requiredContextKeys = normalizeContext(factory).keySet
      val (supportedKeys, wildcards) = normalizeSupportedProperties(factory)
      // ignore context keys
      val givenContextFreeKeys = plainGivenKeys.filter(!requiredContextKeys.contains(_))
      // perform factory specific filtering of keys
      val givenFilteredKeys = filterSupportedPropertiesFactorySpecific(
        factory,
        givenContextFreeKeys)

      givenFilteredKeys.forall { k =>
        lastKey = Option(k)
        supportedKeys.contains(k) || wildcards.exists(k.startsWith)
      }
    }

    if (supportedFactories.isEmpty && classFactories.length == 1 && lastKey.isDefined) {
      // special case: when there is only one matching factory but the last property key
      // was incorrect
      val factory = classFactories.head
      val (supportedKeys, _) = normalizeSupportedProperties(factory)
      throw new NoMatchingTableFactoryException(
        s"""
          |The matching factory '${factory.getClass.getName}' doesn't support '${lastKey.get}'.
          |
          |Supported properties of this factory are:
          |${supportedKeys.sorted.mkString("\n")}""".stripMargin,
        factoryClass,
        foundFactories,
        properties)
    } else if (supportedFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        s"No factory supports all properties.",
        factoryClass,
        foundFactories,
        properties)
    } else if (supportedFactories.length > 1) {
      throw new AmbiguousTableFactoryException(
        supportedFactories,
        factoryClass,
        foundFactories,
        properties)
    }

    supportedFactories.head.asInstanceOf[T]
  }

  /**
    * Prepares the supported properties of a factory to be used for match operations.
    */
  private def normalizeSupportedProperties(factory: TableFactory): (Seq[String], Seq[String]) = {
    val supportedPropertiesJava = factory.supportedProperties()
    if (supportedPropertiesJava == null) {
      throw new TableException(
        s"Supported properties of factory '${factory.getClass.getName}' must not be null.")
    }
    val supportedKeys = supportedPropertiesJava.asScala.map(_.toLowerCase)

    // extract wildcard prefixes
    val wildcards = extractWildcardPrefixes(supportedKeys)

    (supportedKeys, wildcards)
  }

  /**
    * Converts the prefix of properties with wildcards (e.g., "format.*").
    */
  private def extractWildcardPrefixes(propertyKeys: Seq[String]): Seq[String] = {
    propertyKeys
      .filter(_.endsWith("*"))
      .map(s => s.substring(0, s.length - 1))
  }

  /**
    * Performs filtering for special cases (i.e. table format factories with schema derivation).
    */
  private def filterSupportedPropertiesFactorySpecific(
      factory: TableFactory,
      keys: Seq[String])
    : Seq[String] = factory match {

    case formatFactory: TableFormatFactory[_] =>
      val includeSchema = formatFactory.supportsSchemaDerivation()
      // ignore non-format (or schema) keys
      keys.filter { k =>
        if (includeSchema) {
          k.startsWith(SchemaValidator.SCHEMA + ".") ||
            k.startsWith(FormatDescriptorValidator.FORMAT + ".")
        } else {
          k.startsWith(FormatDescriptorValidator.FORMAT + ".")
        }
      }

    case _ =>
      keys
  }
}
  • TableFactoryService finds the matching TableFactory for a given factoryClass and Descriptor (or the property map from descriptor.toProperties). The main matching logic lies in three methods: filterByFactoryClass, filterByContext, and filterBySupportedProperties. filterByFactoryClass selects classFactories, the discovered factories that implement the specified factoryClass. filterByContext filters classFactories by comparing each factory's requiredContext with the given properties to obtain contextFactories. filterBySupportedProperties then filters contextFactories by each factory's supportedProperties and throws an exception unless exactly one factory remains.
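
The three stages can be condensed into a small model in plain Java. This is a sketch under stated assumptions, not the Flink implementation: Factory, SourceFactory and MySystemSourceFactory are hypothetical stand-ins, the three checks run per factory in a single loop (so the distinct error messages of the original are lost), version keys are assumed to end in ".property-version", and the index pattern escapes the dot so that only a literal dot followed by digits is rewritten.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Simplified model of the matching stages; none of these types are Flink classes.
final class FactoryMatchingSketch {

  interface Factory {
    Map<String, String> requiredContext();
    List<String> supportedProperties();
  }

  interface SourceFactory extends Factory {}

  // Hypothetical factory used only to exercise the sketch.
  static final class MySystemSourceFactory implements SourceFactory {
    @Override
    public Map<String, String> requiredContext() {
      Map<String, String> context = new HashMap<>();
      context.put("update-mode", "append");
      context.put("connector.type", "my-system");
      context.put("connector.property-version", "1");
      return context;
    }

    @Override
    public List<String> supportedProperties() {
      return Arrays.asList("connector.debug");
    }
  }

  // Runs the three checks per factory and expects exactly one survivor.
  static <T> T find(
      Class<T> factoryClass, Map<String, String> properties, List<? extends Factory> discovered) {
    List<Factory> matches = new ArrayList<>();
    for (Factory factory : discovered) {
      if (factoryClass.isAssignableFrom(factory.getClass()) // stage 1: factory class
          && contextMatches(factory, properties) // stage 2: required context
          && supportsAllKeys(factory, properties)) { // stage 3: supported properties
        matches.add(factory);
      }
    }
    if (matches.size() != 1) {
      throw new IllegalStateException("Expected one matching factory, found " + matches.size());
    }
    return factoryClass.cast(matches.get(0));
  }

  static boolean contextMatches(Factory factory, Map<String, String> properties) {
    for (Map.Entry<String, String> entry : factory.requiredContext().entrySet()) {
      String key = entry.getKey().toLowerCase(Locale.ROOT);
      // Assumption: every version key ends in ".property-version"; such keys are not compared.
      boolean versionKey = key.endsWith(".property-version");
      if (!versionKey && !entry.getValue().equals(properties.get(key))) {
        return false;
      }
    }
    return true;
  }

  static boolean supportsAllKeys(Factory factory, Map<String, String> properties) {
    for (String givenKey : properties.keySet()) {
      // Array indices become a wildcard, e.g. "schema.0.name" -> "schema.#.name".
      String key = givenKey.replaceAll("\\.\\d+", ".#");
      // Simplification: context keys are looked up as declared, without lower-casing.
      if (!factory.requiredContext().containsKey(key) && !isSupported(factory, key)) {
        return false;
      }
    }
    return true;
  }

  static boolean isSupported(Factory factory, String key) {
    for (String property : factory.supportedProperties()) {
      String supported = property.toLowerCase(Locale.ROOT);
      boolean wildcard = supported.endsWith("*")
          && key.startsWith(supported.substring(0, supported.length() - 1));
      if (supported.equals(key) || wildcard) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    Map<String, String> properties = new HashMap<>(new MySystemSourceFactory().requiredContext());
    properties.put("connector.debug", "true");
    List<Factory> discovered = new ArrayList<>();
    discovered.add(new MySystemSourceFactory());
    SourceFactory factory = find(SourceFactory.class, properties, discovered);
    System.out.println(factory.getClass().getSimpleName());
  }
}
```

Under this logic every given key has to appear either in the factory's required context or in its supported properties; a single extra key is enough to reject the factory.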

Summary

  • TableFactory defines two methods, requiredContext and supportedProperties. requiredContext is used to match a factory, and supportedProperties lists the properties the factory supports. If properties that the factory does not support are passed in, an exception is thrown.
  • BatchTableSourceFactory extends TableFactory and defines the createBatchTableSource method. BatchTableSinkFactory extends TableFactory and defines the createBatchTableSink method. StreamTableSourceFactory extends TableFactory and defines the createStreamTableSource method. StreamTableSinkFactory extends TableFactory and defines the createStreamTableSink method.
  • ConnectorDescriptor extends DescriptorBase and implements the Descriptor interface. It overrides the toProperties method of the Descriptor interface, which sets the built-in properties CONNECTOR_TYPE and CONNECTOR_PROPERTY_VERSION and then merges in the properties that subclasses define through the abstract method toConnectorProperties.
  • TableFactoryUtil is a utility class used to create a TableSource or TableSink for a given TableEnvironment and Descriptor. Internally it passes descriptor.toProperties to TableFactoryService to find the matching TableFactory.
  • TableFactoryService finds the matching TableFactory for a given factoryClass and Descriptor (or the property map from descriptor.toProperties). The main matching logic lies in three methods: filterByFactoryClass, filterByContext, and filterBySupportedProperties. filterByFactoryClass selects classFactories, the discovered factories that implement the specified factoryClass. filterByContext filters classFactories by comparing each factory's requiredContext with the given properties to obtain contextFactories. filterBySupportedProperties then filters contextFactories by each factory's supportedProperties and throws an exception unless exactly one factory remains.
