Talk about flink JobManager’s High Availability


Preface

This article takes a look at the High Availability (HA) of Flink's JobManager.

Configuration

flink-conf.yaml

high-availability: zookeeper
high-availability.zookeeper.quorum: zookeeper:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster
high-availability.storageDir: file:///share
  • high-availability selects the HA mode; the usual values are NONE or zookeeper (the name of a factory class is also accepted, as HighAvailabilityMode shows).
  • high-availability.zookeeper.quorum lists the ZooKeeper peers (host:port, comma-separated).
  • high-availability.zookeeper.path.root is the root node path under which Flink stores its data in ZooKeeper.
  • high-availability.cluster-id is the node name of the current cluster, located below the root node.
  • high-availability.storageDir is the storage path where JobManager metadata is persisted.
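To make the required keys concrete, here is a minimal Java sketch that reads lines in the format shown above and reports which ZooKeeper HA keys are missing. HaConfigCheck and its methods are hypothetical, not Flink API. The parsing (drop everything after '#', split on ': ') is a simplification, and treating the quorum and storageDir as mandatory in ZooKeeper mode is this sketch's assumption, based on both options having no default value.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: parses flink-conf.yaml style lines
// and checks which ZooKeeper HA keys are missing.
class HaConfigCheck {

    // Keys this sketch treats as mandatory when high-availability is "zookeeper".
    static final String[] REQUIRED_ZK_KEYS = {
        "high-availability.zookeeper.quorum",
        "high-availability.storageDir"
    };

    /** Parses "key: value" lines, treating everything after '#' as a comment. */
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : lines) {
            String content = line.split("#", 2)[0].trim();
            if (content.isEmpty()) {
                continue; // blank or comment-only line
            }
            // Split on the first ": " so values such as "zookeeper:2181" stay intact.
            String[] kv = content.split(": ", 2);
            if (kv.length == 2) {
                conf.put(kv[0].trim(), kv[1].trim());
            }
        }
        return conf;
    }

    /** Returns the required keys that are absent; empty when HA is not ZooKeeper. */
    static List<String> missingZooKeeperKeys(Map<String, String> conf) {
        List<String> missing = new ArrayList<>();
        String mode = conf.getOrDefault("high-availability", "NONE");
        if (!mode.equalsIgnoreCase("zookeeper")) {
            return missing;
        }
        for (String key : REQUIRED_ZK_KEYS) {
            if (!conf.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

Note that the trailing "# important: customize per cluster" comment is dropped, so the cluster-id value parses as /cluster_one.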

Masters file

localhost:8081
localhost:8082
  • The masters file lists the JobManagers, one host:webUIPort entry per line; in a standalone HA setup a JobManager is started on each listed host.
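A small parser for entries like the ones above helps show the expected format. MastersFile is illustrative only and not part of Flink; it assumes every non-empty line is host:port and applies the same 0 < port < 65536 bound that getJobManagerAddress uses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not Flink code): parses conf/masters entries of the form host:webUIPort.
class MastersFile {

    static final class Master {
        final String host;
        final int webUiPort;

        Master(String host, int webUiPort) {
            this.host = host;
            this.webUiPort = webUiPort;
        }
    }

    static List<Master> parse(List<String> lines) {
        List<Master> masters = new ArrayList<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty()) {
                continue;
            }
            // Use the last ':' so the host part may itself contain colons.
            int colon = line.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected host:port but got '" + line + "'");
            }
            int port = Integer.parseInt(line.substring(colon + 1));
            if (port <= 0 || port >= 65536) {
                throw new IllegalArgumentException("Port out of range in '" + line + "'");
            }
            masters.add(new Master(line.substring(0, colon), port));
        }
        return masters;
    }
}
```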

HighAvailabilityMode

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java

public enum HighAvailabilityMode {
    NONE(false),
    ZOOKEEPER(true),
    FACTORY_CLASS(true);

    private final boolean haActive;

    HighAvailabilityMode(boolean haActive) {
        this.haActive = haActive;
    }

    /**
     * Return the configured {@link HighAvailabilityMode}.
     *
     * @param config The config to parse
     * @return Configured recovery mode or {@link HighAvailabilityMode#NONE} if not
     * configured.
     */
    public static HighAvailabilityMode fromConfig(Configuration config) {
        String haMode = config.getValue(HighAvailabilityOptions.HA_MODE);

        if (haMode == null) {
            return HighAvailabilityMode.NONE;
        } else if (haMode.equalsIgnoreCase(ConfigConstants.DEFAULT_RECOVERY_MODE)) {
            // Map old default to new default
            return HighAvailabilityMode.NONE;
        } else {
            try {
                return HighAvailabilityMode.valueOf(haMode.toUpperCase());
            } catch (IllegalArgumentException e) {
                return FACTORY_CLASS;
            }
        }
    }

    /**
     * Returns true if the defined recovery mode supports high availability.
     *
     * @param configuration Configuration which contains the recovery mode
     * @return true if high availability is supported by the recovery mode, otherwise false
     */
    public static boolean isHighAvailabilityModeActivated(Configuration configuration) {
        HighAvailabilityMode mode = fromConfig(configuration);
        return mode.haActive;
    }
}
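  • HighAvailabilityMode has three values: NONE, ZOOKEEPER and FACTORY_CLASS. Each carries a haActive flag that tells whether the mode provides high availability (false for NONE, true for the other two). fromConfig returns NONE when no value or the legacy default recovery mode is configured, maps known names case-insensitively to the matching enum value, and treats any other value as the name of a factory class (FACTORY_CLASS).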
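The decision logic of fromConfig can be reproduced without any Flink dependency. In this sketch HaModeSketch is a hypothetical name, the raw configured string replaces the Configuration argument, and the legacy default "standalone" is an assumption about the value of ConfigConstants.DEFAULT_RECOVERY_MODE.

```java
import java.util.Locale;

// Dependency-free sketch mirroring HighAvailabilityMode.fromConfig.
enum HaModeSketch {
    NONE(false),
    ZOOKEEPER(true),
    FACTORY_CLASS(true);

    // Assumption: the legacy default recovery mode string is "standalone".
    static final String LEGACY_DEFAULT = "standalone";

    final boolean haActive;

    HaModeSketch(boolean haActive) {
        this.haActive = haActive;
    }

    static HaModeSketch fromValue(String haMode) {
        if (haMode == null || haMode.equalsIgnoreCase(LEGACY_DEFAULT)) {
            return NONE;
        }
        try {
            return valueOf(haMode.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Anything that is not an enum name is treated as a factory class name.
            return FACTORY_CLASS;
        }
    }
}
```

So a value like com.example.MyHaFactory (a made-up class name) ends up as FACTORY_CLASS, which is why a typo such as "zookeper" is not rejected here but only fails later, when the "factory class" cannot be loaded.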

HighAvailabilityOptions

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/HighAvailabilityOptions.java

@PublicEvolving
@ConfigGroups(groups = {
    @ConfigGroup(name = "HighAvailabilityZookeeper", keyPrefix = "high-availability.zookeeper")
})
public class HighAvailabilityOptions {

    // ------------------------------------------------------------------------
    //  Required High Availability Options
    // ------------------------------------------------------------------------

    /**
     * Defines high-availability mode used for the cluster execution.
     * A value of "NONE" signals no highly available setup.
     * To enable high-availability, set this mode to "ZOOKEEPER".
     * Can also be set to FQN of HighAvailability factory class.
     */
    @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
    public static final ConfigOption<String> HA_MODE =
            key("high-availability")
            .defaultValue("NONE")
            .withDeprecatedKeys("recovery.mode")
            .withDescription("Defines high-availability mode used for the cluster execution." +
                " To enable high-availability, set this mode to \"ZOOKEEPER\" or specify FQN of factory class.");

    /**
     * The ID of the Flink cluster, used to separate multiple Flink clusters
     * Needs to be set for standalone clusters, is automatically inferred in YARN and Mesos.
     */
    public static final ConfigOption<String> HA_CLUSTER_ID =
            key("high-availability.cluster-id")
            .defaultValue("/default")
            .withDeprecatedKeys("high-availability.zookeeper.path.namespace", "recovery.zookeeper.path.namespace")
            .withDescription("The ID of the Flink cluster, used to separate multiple Flink clusters from each other." +
                " Needs to be set for standalone clusters but is automatically inferred in YARN and Mesos.");

    /**
     * File system path (URI) where Flink persists metadata in high-availability setups.
     */
    @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
    public static final ConfigOption<String> HA_STORAGE_PATH =
            key("high-availability.storageDir")
            .noDefaultValue()
            .withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir")
            .withDescription("File system path (URI) where Flink persists metadata in high-availability setups.");

    // ------------------------------------------------------------------------
    //  Recovery Options
    // ------------------------------------------------------------------------

    /**
     * Optional port (range) used by the job manager in high-availability mode.
     */
    public static final ConfigOption<String> HA_JOB_MANAGER_PORT_RANGE =
            key("high-availability.jobmanager.port")
            .defaultValue("0")
            .withDeprecatedKeys("recovery.jobmanager.port")
            .withDescription("Optional port (range) used by the job manager in high-availability mode.");

    /**
     * The time before a JobManager after a fail over recovers the current jobs.
     */
    public static final ConfigOption<String> HA_JOB_DELAY =
            key("high-availability.job.delay")
            .noDefaultValue()
            .withDeprecatedKeys("recovery.job.delay")
            .withDescription("The time before a JobManager after a fail over recovers the current jobs.");

    // ------------------------------------------------------------------------
    //  ZooKeeper Options
    // ------------------------------------------------------------------------

    /**
     * The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
     */
    public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
            key("high-availability.zookeeper.quorum")
            .noDefaultValue()
            .withDeprecatedKeys("recovery.zookeeper.quorum")
            .withDescription("The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.");

    /**
     * The root path under which Flink stores its entries in ZooKeeper.
     */
    public static final ConfigOption<String> HA_ZOOKEEPER_ROOT =
            key("high-availability.zookeeper.path.root")
            .defaultValue("/flink")
            .withDeprecatedKeys("recovery.zookeeper.path.root")
            .withDescription("The root path under which Flink stores its entries in ZooKeeper.");

    public static final ConfigOption<String> HA_ZOOKEEPER_LATCH_PATH =
            key("high-availability.zookeeper.path.latch")
            .defaultValue("/leaderlatch")
            .withDeprecatedKeys("recovery.zookeeper.path.latch")
            .withDescription("Defines the znode of the leader latch which is used to elect the leader.");

    /** ZooKeeper root path (ZNode) for job graphs. */
    public static final ConfigOption<String> HA_ZOOKEEPER_JOBGRAPHS_PATH =
            key("high-availability.zookeeper.path.jobgraphs")
            .defaultValue("/jobgraphs")
            .withDeprecatedKeys("recovery.zookeeper.path.jobgraphs")
            .withDescription("ZooKeeper root path (ZNode) for job graphs");

    public static final ConfigOption<String> HA_ZOOKEEPER_LEADER_PATH =
            key("high-availability.zookeeper.path.leader")
            .defaultValue("/leader")
            .withDeprecatedKeys("recovery.zookeeper.path.leader")
            .withDescription("Defines the znode of the leader which contains the URL to the leader and the current" +
                " leader session ID.");

    /** ZooKeeper root path (ZNode) for completed checkpoints. */
    public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINTS_PATH =
            key("high-availability.zookeeper.path.checkpoints")
            .defaultValue("/checkpoints")
            .withDeprecatedKeys("recovery.zookeeper.path.checkpoints")
            .withDescription("ZooKeeper root path (ZNode) for completed checkpoints.");

    /** ZooKeeper root path (ZNode) for checkpoint counters. */
    public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
            key("high-availability.zookeeper.path.checkpoint-counter")
            .defaultValue("/checkpoint-counter")
            .withDeprecatedKeys("recovery.zookeeper.path.checkpoint-counter")
            .withDescription("ZooKeeper root path (ZNode) for checkpoint counters.");

    /** ZooKeeper root path (ZNode) for Mesos workers. */
    @PublicEvolving
    public static final ConfigOption<String> HA_ZOOKEEPER_MESOS_WORKERS_PATH =
            key("high-availability.zookeeper.path.mesos-workers")
            .defaultValue("/mesos-workers")
            .withDeprecatedKeys("recovery.zookeeper.path.mesos-workers")
            .withDescription(Description.builder()
                .text("The ZooKeeper root path for persisting the Mesos worker information.")
                .build());

    // ------------------------------------------------------------------------
    //  ZooKeeper Client Settings
    // ------------------------------------------------------------------------

    public static final ConfigOption<Integer> ZOOKEEPER_SESSION_TIMEOUT =
            key("high-availability.zookeeper.client.session-timeout")
            .defaultValue(60000)
            .withDeprecatedKeys("recovery.zookeeper.client.session-timeout")
            .withDescription("Defines the session timeout for the ZooKeeper session in ms.");

    public static final ConfigOption<Integer> ZOOKEEPER_CONNECTION_TIMEOUT =
            key("high-availability.zookeeper.client.connection-timeout")
            .defaultValue(15000)
            .withDeprecatedKeys("recovery.zookeeper.client.connection-timeout")
            .withDescription("Defines the connection timeout for ZooKeeper in ms.");

    public static final ConfigOption<Integer> ZOOKEEPER_RETRY_WAIT =
            key("high-availability.zookeeper.client.retry-wait")
            .defaultValue(5000)
            .withDeprecatedKeys("recovery.zookeeper.client.retry-wait")
            .withDescription("Defines the pause between consecutive retries in ms.");

    public static final ConfigOption<Integer> ZOOKEEPER_MAX_RETRY_ATTEMPTS =
            key("high-availability.zookeeper.client.max-retry-attempts")
            .defaultValue(3)
            .withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts")
            .withDescription("Defines the number of connection retries before the client gives up.");

    public static final ConfigOption<String> ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH =
            key("high-availability.zookeeper.path.running-registry")
            .defaultValue("/running_job_registry/");

    public static final ConfigOption<String> ZOOKEEPER_CLIENT_ACL =
            key("high-availability.zookeeper.client.acl")
            .defaultValue("open")
            .withDescription("Defines the ACL (open|creator) to be configured on ZK node. The configuration value can be" +
                " set to “creator” if the ZooKeeper server configuration has the “authProvider” property mapped to use" +
                " SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos).");

    // ------------------------------------------------------------------------

    /** Not intended to be instantiated. */
    private HighAvailabilityOptions() {}
}
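  • HighAvailabilityOptions defines the configuration options prefixed with high-availability; the ZooKeeper-specific ones (quorum, znode paths, client settings) share the high-availability.zookeeper prefix. Most options also keep their legacy recovery.* names as deprecated keys.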
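Most options above pair a current key with legacy keys via withDeprecatedKeys. The sketch below models that lookup with a hypothetical OptionSketch class: the current key is consulted first, then the deprecated keys in declaration order, then the default. This is a simplified, string-only reading of how Flink's Configuration resolves deprecated keys, not its actual implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for ConfigOption<String>.
final class OptionSketch {
    final String key;
    final String defaultValue; // null models noDefaultValue()
    final List<String> deprecatedKeys;

    OptionSketch(String key, String defaultValue, String... deprecatedKeys) {
        this.key = key;
        this.defaultValue = defaultValue;
        this.deprecatedKeys = Arrays.asList(deprecatedKeys);
    }

    /** Current key wins, then deprecated keys in declaration order, then the default. */
    String resolve(Map<String, String> conf) {
        if (conf.containsKey(key)) {
            return conf.get(key);
        }
        for (String oldKey : deprecatedKeys) {
            if (conf.containsKey(oldKey)) {
                return conf.get(oldKey);
            }
        }
        return defaultValue;
    }
}
```

For example, an old configuration that still sets recovery.zookeeper.path.root keeps working, but as soon as high-availability.zookeeper.path.root is also set, the new key takes precedence.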

HighAvailabilityServicesUtils

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java

public class HighAvailabilityServicesUtils {

    public static HighAvailabilityServices createAvailableOrEmbeddedServices(
        Configuration config,
        Executor executor) throws Exception {
        HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(config);

        switch (highAvailabilityMode) {
            case NONE:
                return new EmbeddedHaServices(executor);

            case ZOOKEEPER:
                BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);

                return new ZooKeeperHaServices(
                    ZooKeeperUtils.startCuratorFramework(config),
                    executor,
                    config,
                    blobStoreService);

            case FACTORY_CLASS:
                return createCustomHAServices(config, executor);

            default:
                throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
        }
    }

    public static HighAvailabilityServices createHighAvailabilityServices(
        Configuration configuration,
        Executor executor,
        AddressResolution addressResolution) throws Exception {

        HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);

        switch (highAvailabilityMode) {
            case NONE:
                final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);

                final String jobManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                    hostnamePort.f0,
                    hostnamePort.f1,
                    JobMaster.JOB_MANAGER_NAME,
                    addressResolution,
                    configuration);
                final String resourceManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                    hostnamePort.f0,
                    hostnamePort.f1,
                    ResourceManager.RESOURCE_MANAGER_NAME,
                    addressResolution,
                    configuration);
                final String dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                    hostnamePort.f0,
                    hostnamePort.f1,
                    Dispatcher.DISPATCHER_NAME,
                    addressResolution,
                    configuration);

                final String address = checkNotNull(configuration.getString(RestOptions.ADDRESS),
                    "%s must be set",
                    RestOptions.ADDRESS.key());
                final int port = configuration.getInteger(RestOptions.PORT);
                final boolean enableSSL = SSLUtils.isRestSSLEnabled(configuration);
                final String protocol = enableSSL ? "https://" : "http://";

                return new StandaloneHaServices(
                    resourceManagerRpcUrl,
                    dispatcherRpcUrl,
                    jobManagerRpcUrl,
                    String.format("%s%s:%s", protocol, address, port));
            case ZOOKEEPER:
                BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);

                return new ZooKeeperHaServices(
                    ZooKeeperUtils.startCuratorFramework(configuration),
                    executor,
                    configuration,
                    blobStoreService);

            case FACTORY_CLASS:
                return createCustomHAServices(configuration, executor);

            default:
                throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
        }
    }

    /**
     * Returns the JobManager's hostname and port extracted from the given
     * {@link Configuration}.
     *
     * @param configuration Configuration to extract the JobManager's address from
     * @return The JobManager's hostname and port
     * @throws ConfigurationException if the JobManager's address cannot be extracted from the configuration
     */
    public static Tuple2<String, Integer> getJobManagerAddress(Configuration configuration) throws ConfigurationException {

        final String hostname = configuration.getString(JobManagerOptions.ADDRESS);
        final int port = configuration.getInteger(JobManagerOptions.PORT);

        if (hostname == null) {
            throw new ConfigurationException("Config parameter '" + JobManagerOptions.ADDRESS +
                "' is missing (hostname/address of JobManager to connect to).");
        }

        if (port <= 0 || port >= 65536) {
            throw new ConfigurationException("Invalid value for '" + JobManagerOptions.PORT +
                "' (port of the JobManager actor system) : " + port +
                ".  it must be greater than 0 and less than 65536.");
        }

        return Tuple2.of(hostname, port);
    }

    private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws FlinkException {
        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        final String haServicesClassName = config.getString(HighAvailabilityOptions.HA_MODE);

        final HighAvailabilityServicesFactory highAvailabilityServicesFactory;

        try {
            highAvailabilityServicesFactory = InstantiationUtil.instantiate(
                haServicesClassName,
                HighAvailabilityServicesFactory.class,
                classLoader);
        } catch (Exception e) {
            throw new FlinkException(
                String.format(
                    "Could not instantiate the HighAvailabilityServicesFactory '%s'. Please make sure that this class is on your class path.",
                    haServicesClassName),
                e);
        }

        try {
            return highAvailabilityServicesFactory.createHAServices(config, executor);
        } catch (Exception e) {
            throw new FlinkException(
                String.format(
                    "Could not create the ha services from the instantiated HighAvailabilityServicesFactory %s.",
                    haServicesClassName),
                e);
        }
    }

    /**
     * Enum specifying whether address resolution should be tried or not when creating the
     * {@link HighAvailabilityServices}.
     */
    public enum AddressResolution {
        TRY_ADDRESS_RESOLUTION,
        NO_ADDRESS_RESOLUTION
    }
}
  • HighAvailabilityServicesUtils provides static methods for creating HighAvailabilityServices: createAvailableOrEmbeddedServices, createHighAvailabilityServices and the private createCustomHAServices.
  • createAvailableOrEmbeddedServices is mainly used by FlinkMiniCluster and returns EmbeddedHaServices when highAvailabilityMode is NONE. createHighAvailabilityServices is mainly used by ClusterEntrypoint and creates StandaloneHaServices when highAvailabilityMode is NONE. Both create ZooKeeperHaServices for ZOOKEEPER and call createCustomHAServices for FACTORY_CLASS, which instantiates the HighAvailabilityServicesFactory named by the high-availability option and asks it to create the services.
  • HighAvailabilityServicesUtils also provides the static method getJobManagerAddress, which reads the JobManager's hostname and port from the configuration and rejects a missing hostname or a port outside 1 to 65535.
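createCustomHAServices boils down to loading a class by name and calling a factory method on it. The sketch below shows that pattern in plain Java. ServicesFactory, DemoServicesFactory and CustomHaLoader are hypothetical stand-ins for HighAvailabilityServicesFactory and its callers, and the class loader is a parameter here (Flink uses the thread context class loader) so the caller decides where the class is looked up.

```java
// Hypothetical stand-in for HighAvailabilityServicesFactory; returns a String for brevity.
interface ServicesFactory {
    String createServices(String config);
}

// Example implementation whose class name a user would put into the "high-availability" option.
class DemoServicesFactory implements ServicesFactory {
    public DemoServicesFactory() {}

    @Override
    public String createServices(String config) {
        return "demo-services(" + config + ")";
    }
}

class CustomHaLoader {
    /** Loads className, checks that it implements ServicesFactory and calls its no-arg constructor. */
    static ServicesFactory instantiate(String className, ClassLoader loader) {
        try {
            Class<? extends ServicesFactory> clazz =
                Class.forName(className, true, loader).asSubclass(ServicesFactory.class);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            // Covers a missing class, a wrong type and a missing or failing constructor.
            throw new IllegalStateException(
                "Could not instantiate the factory '" + className + "'. Is it on the class path?", e);
        }
    }
}
```

For example, CustomHaLoader.instantiate(DemoServicesFactory.class.getName(), CustomHaLoader.class.getClassLoader()).createServices("conf") returns "demo-services(conf)". Keeping instantiation and creation as two steps matches the two separate try blocks in createCustomHAServices, so the error message can say which step failed.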

HighAvailabilityServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java

/**
 * The HighAvailabilityServices give access to all services needed for a highly-available
 * setup. In particular, the services provide access to highly available storage and
 * registries, as well as distributed counters and leader election.
 * 
 * <ul>
 *     <li>ResourceManager leader election and leader retrieval</li>
 *     <li>JobManager leader election and leader retrieval</li>
 *     <li>Persistence for checkpoint metadata</li>
 *     <li>Registering the latest completed checkpoint(s)</li>
 *     <li>Persistence for the BLOB store</li>
 *     <li>Registry that marks a job's status</li>
 *     <li>Naming of RPC endpoints</li>
 * </ul>
 */
public interface HighAvailabilityServices extends AutoCloseable {

    // ------------------------------------------------------------------------
    //  Constants
    // ------------------------------------------------------------------------

    /**
     * This UUID should be used when no proper leader election happens, but a simple
     * pre-configured leader is used. That is for example the case in non-highly-available
     * standalone setups.
     */
    UUID DEFAULT_LEADER_ID = new UUID(0, 0);

    /**
     * This JobID should be used to identify the old JobManager when using the
     * {@link HighAvailabilityServices}. With the new mode every JobMaster will have a
     * distinct JobID assigned.
     */
    JobID DEFAULT_JOB_ID = new JobID(0L, 0L);

    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------

    /**
     * Gets the leader retriever for the cluster's resource manager.
     */
    LeaderRetrievalService getResourceManagerLeaderRetriever();

    /**
     * Gets the leader retriever for the dispatcher. This leader retrieval service
     * is not always accessible.
     */
    LeaderRetrievalService getDispatcherLeaderRetriever();

    /**
     * Gets the leader retriever for the job JobMaster which is responsible for the given job
     *
     * @param jobID The identifier of the job.
     * @return Leader retrieval service to retrieve the job manager for the given job
     * @deprecated This method should only be used by the legacy code where the JobManager acts as the master.
     */
    @Deprecated
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);

    /**
     * Gets the leader retriever for the job JobMaster which is responsible for the given job
     *
     * @param jobID The identifier of the job.
     * @param defaultJobManagerAddress JobManager address which will be returned by
     *                              a static leader retrieval service.
     * @return Leader retrieval service to retrieve the job manager for the given job
     */
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);

    LeaderRetrievalService getWebMonitorLeaderRetriever();

    /**
     * Gets the leader election service for the cluster's resource manager.
     *
     * @return Leader election service for the resource manager leader election
     */
    LeaderElectionService getResourceManagerLeaderElectionService();

    /**
     * Gets the leader election service for the cluster's dispatcher.
     *
     * @return Leader election service for the dispatcher leader election
     */
    LeaderElectionService getDispatcherLeaderElectionService();

    /**
     * Gets the leader election service for the given job.
     *
     * @param jobID The identifier of the job running the election.
     * @return Leader election service for the job manager leader election
     */
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

    LeaderElectionService getWebMonitorLeaderElectionService();

    /**
     * Gets the checkpoint recovery factory for the job manager
     *
     * @return Checkpoint recovery factory
     */
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();

    /**
     * Gets the submitted job graph store for the job manager
     *
     * @return Submitted job graph store
     * @throws Exception if the submitted job graph store could not be created
     */
    SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;

    /**
     * Gets the registry that holds information about whether jobs are currently running.
     *
     * @return Running job registry to retrieve running jobs
     */
    RunningJobsRegistry getRunningJobsRegistry() throws Exception;

    /**
     * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
     *
     * @return Blob store
     * @throws IOException if the blob store could not be created
     */
    BlobStore createBlobStore() throws IOException;

    // ------------------------------------------------------------------------
    //  Shutdown and Cleanup
    // ------------------------------------------------------------------------

    /**
     * Closes the high availability services, releasing all resources.
     * 
     * <p>This method <b>does not delete or clean up</b> any data stored in external stores
     * (file systems, ZooKeeper, etc). Another instance of the high availability
     * services will be able to recover the job.
     * 
     * <p>If an exception occurs during closing services, this method will attempt to
     * continue closing other services and report exceptions only after all services
     * have been attempted to be closed.
     *
     * @throws Exception Thrown, if an exception occurred while closing these services.
     */
    @Override
    void close() throws Exception;

    /**
     * Closes the high availability services (releasing all resources) and deletes
     * all data stored by these services in external stores.
     * 
     * <p>After this method was called, the any job or session that was managed by
     * these high availability services will be unrecoverable.
     * 
     * <p>If an exception occurs during cleanup, this method will attempt to
     * continue the cleanup and report exceptions only after all cleanup steps have
     * been attempted.
     * 
     * @throws Exception Thrown, if an exception occurred while closing these services
     *                   or cleaning up data stored by them.
     */
    void closeAndCleanupAllData() throws Exception;
}
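  • HighAvailabilityServices defines getters for the services a highly available setup needs: leader retrieval and leader election for the ResourceManager, Dispatcher, JobManager and web monitor, plus the CheckpointRecoveryFactory, SubmittedJobGraphStore, RunningJobsRegistry and BlobStore. close() releases resources but keeps the stored data so another instance can recover the jobs, while closeAndCleanupAllData() also deletes that data.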
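The close() contract documented in the interface (try every component, report failures only after all were attempted) can be sketched in plain Java. CloseAll is a hypothetical helper; keeping the first exception and attaching later ones as suppressed is one reasonable way to honor that contract, not necessarily what each Flink implementation does.

```java
import java.util.List;

// Hypothetical helper: closes every component even if some of them fail.
final class CloseAll {
    static void closeAll(List<? extends AutoCloseable> components) throws Exception {
        Exception first = null;
        for (AutoCloseable c : components) {
            try {
                c.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;               // remember the first failure
                } else {
                    first.addSuppressed(e);  // keep later failures visible
                }
            }
        }
        if (first != null) {
            throw first;                     // report only after everything was attempted
        }
    }
}
```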

ZooKeeperHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java

/**
 * An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper.
 * The services store data in ZooKeeper's nodes as illustrated by the following tree structure:
 * 
 * <pre>
 * /flink
 *      +/cluster_id_1/resource_manager_lock
 *      |            |
 *      |            +/job-id-1/job_manager_lock
 *      |            |         /checkpoints/latest
 *      |            |                     /latest-1
 *      |            |                     /latest-2
 *      |            |
 *      |            +/job-id-2/job_manager_lock
 *      |      
 *      +/cluster_id_2/resource_manager_lock
 *                   |
 *                   +/job-id-1/job_manager_lock
 *                            |/checkpoints/latest
 *                            |            /latest-1
 *                            |/persisted_job_graph
 * </pre>
 * 
 * <p>The root path "/flink" is configurable via the option {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
 * This makes sure Flink stores its data under specific subtrees in ZooKeeper, for example to
 * accommodate specific permission.
 * 
 * <p>The "cluster_id" part identifies the data stored for a specific Flink "cluster". 
 * This "cluster" can be either a standalone or containerized Flink cluster, or it can be job
 * on a framework like YARN or Mesos (in a "per-job-cluster" mode).
 * 
 * <p>In case of a "per-job-cluster" on YARN or Mesos, the cluster-id is generated and configured
 * automatically by the client or dispatcher that submits the Job to YARN or Mesos.
 * 
 * <p>In the case of a standalone cluster, that cluster-id needs to be configured via
 * {@link HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same cluster id will join the same
 * cluster and participate in the execution of the same set of jobs.
 */
public class ZooKeeperHaServices implements HighAvailabilityServices {

    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperHaServices.class);

    private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";

    private static final String DISPATCHER_LEADER_PATH = "/dispatcher_lock";

    private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";

    private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock";

    // ------------------------------------------------------------------------
    
    
    /** The ZooKeeper client to use */
    private final CuratorFramework client;

    /** The executor to run ZooKeeper callbacks on */
    private final Executor executor;

    /** The runtime configuration */
    private final Configuration configuration;

    /** The zookeeper based running jobs registry */
    private final RunningJobsRegistry runningJobsRegistry;

    /** Store for arbitrary blobs */
    private final BlobStoreService blobStoreService;

    public ZooKeeperHaServices(
            CuratorFramework client,
            Executor executor,
            Configuration configuration,
            BlobStoreService blobStoreService) {
        this.client = checkNotNull(client);
        this.executor = checkNotNull(executor);
        this.configuration = checkNotNull(configuration);
        this.runningJobsRegistry = new ZooKeeperRunningJobsRegistry(client, configuration);

        this.blobStoreService = checkNotNull(blobStoreService);
    }

    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------

    @Override
    public LeaderRetrievalService getResourceManagerLeaderRetriever() {
        return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
    }

    @Override
    public LeaderRetrievalService getDispatcherLeaderRetriever() {
        return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DISPATCHER_LEADER_PATH);
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
        return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
        return getJobManagerLeaderRetriever(jobID);
    }

    @Override
    public LeaderRetrievalService getWebMonitorLeaderRetriever() {
        return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, REST_SERVER_LEADER_PATH);
    }

    @Override
    public LeaderElectionService getResourceManagerLeaderElectionService() {
        return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
    }

    @Override
    public LeaderElectionService getDispatcherLeaderElectionService() {
        return ZooKeeperUtils.createLeaderElectionService(client, configuration, DISPATCHER_LEADER_PATH);
    }

    @Override
    public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
        return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
    }

    @Override
    public LeaderElectionService getWebMonitorLeaderElectionService() {
        return ZooKeeperUtils.createLeaderElectionService(client, configuration, REST_SERVER_LEADER_PATH);
    }

    @Override
    public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
        return new ZooKeeperCheckpointRecoveryFactory(client, configuration, executor);
    }

    @Override
    public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
        return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration);
    }

    @Override
    public RunningJobsRegistry getRunningJobsRegistry() {
        return runningJobsRegistry;
    }

    @Override
    public BlobStore createBlobStore() throws IOException {
        return blobStoreService;
    }

    // ------------------------------------------------------------------------
    //  Shutdown
    // ------------------------------------------------------------------------

    @Override
    public void close() throws Exception {
        Throwable exception = null;

        try {
            blobStoreService.close();
        } catch (Throwable t) {
            exception = t;
        }

        internalClose();

        if (exception != null) {
            ExceptionUtils.rethrowException(exception, "Could not properly close the ZooKeeperHaServices.");
        }
    }

    @Override
    public void closeAndCleanupAllData() throws Exception {
        LOG.info("Close and clean up all data for ZooKeeperHaServices.");

        Throwable exception = null;

        try {
            blobStoreService.closeAndCleanupAllData();
        } catch (Throwable t) {
            exception = t;
        }

        internalClose();

        if (exception != null) {
            ExceptionUtils.rethrowException(exception, "Could not properly close and clean up all data of ZooKeeperHaServices.");
        }
    }

    /**
     * Closes components which don't distinguish between close and closeAndCleanupAllData
     */
    private void internalClose() {
        client.close();
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    private static String getPathForJobManager(final JobID jobID) {
        return "/" + jobID + JOB_MANAGER_LEADER_PATH;
    }
}
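  • ZooKeeperHaServices implements the HighAvailabilityServices interface. It creates each required service through the corresponding create method of ZooKeeperUtils, for example ZooKeeperUtils.createLeaderRetrievalService, ZooKeeperUtils.createLeaderElectionService and ZooKeeperUtils.createSubmittedJobGraphs. Each leader election or retrieval service is bound to its own path: RESOURCE_MANAGER_LEADER_PATH, DISPATCHER_LEADER_PATH, REST_SERVER_LEADER_PATH, or a per-job path built by getPathForJobManager.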
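The following sketch shows how the per-component leader paths might combine with the configuration at the top of this article. It is minimal and self-contained and does not use any Flink classes. The class HaPathSketch is hypothetical. The constant values (/resource_manager_lock, /dispatcher_lock, /job_manager_lock, /rest_server_lock) and the /leader sub-path are assumptions based on the Flink 1.7 source, so verify them against your version.

```java
/**
 * Hypothetical helper that mimics how ZooKeeperHaServices gives every
 * component its own leader path. The constant values are ASSUMPTIONS
 * modeled on the Flink 1.7 source, not an exported Flink API.
 */
class HaPathSketch {

    // Assumed values of the path constants referenced in ZooKeeperHaServices.
    static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";
    static final String DISPATCHER_LEADER_PATH = "/dispatcher_lock";
    static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
    static final String REST_SERVER_LEADER_PATH = "/rest_server_lock";

    // Assumed: all leader nodes live under a "/leader" sub-path.
    static final String LEADER_PATH = "/leader";

    private final String namespace;

    HaPathSketch(String root, String clusterId) {
        // root   -> high-availability.zookeeper.path.root
        // clusterId -> high-availability.cluster-id
        this.namespace = root + clusterId;
    }

    /** Same shape as ZooKeeperHaServices.getPathForJobManager. */
    static String getPathForJobManager(String jobId) {
        return "/" + jobId + JOB_MANAGER_LEADER_PATH;
    }

    /** Full znode path a component would use for election and retrieval. */
    String leaderNode(String componentPath) {
        return namespace + LEADER_PATH + componentPath;
    }

    public static void main(String[] args) {
        HaPathSketch paths = new HaPathSketch("/flink", "/cluster_one");
        System.out.println(paths.leaderNode(DISPATCHER_LEADER_PATH));
        // A JobID is rendered as 32 hex digits; all zeros here for illustration.
        System.out.println(paths.leaderNode(getPathForJobManager("00000000000000000000000000000000")));
    }
}
```

With root /flink and cluster-id /cluster_one, the sketch prints /flink/cluster_one/leader/dispatcher_lock and /flink/cluster_one/leader/00000000000000000000000000000000/job_manager_lock. Every path includes the cluster-id, so two clusters that share one ZooKeeper ensemble keep separate leaders. This is why the configuration comment says to customize the cluster-id per cluster.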

JobClient.submitJob

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/client/JobClient.java

public class JobClient {

    private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);

    //......

    /**
     * Submits a job to a Flink cluster (non-blocking) and returns a JobListeningContext which can be
     * passed to {@code awaitJobResult} to get the result of the submission.
     * @return JobListeningContext which may be used to retrieve the JobExecutionResult via
     *             {@code awaitJobResult(JobListeningContext context)}.
     */
    public static JobListeningContext submitJob(
            ActorSystem actorSystem,
            Configuration config,
            HighAvailabilityServices highAvailabilityServices,
            JobGraph jobGraph,
            FiniteDuration timeout,
            boolean sysoutLogUpdates,
            ClassLoader classLoader) {

        checkNotNull(actorSystem, "The actorSystem must not be null.");
        checkNotNull(highAvailabilityServices, "The high availability services must not be null.");
        checkNotNull(jobGraph, "The jobGraph must not be null.");
        checkNotNull(timeout, "The timeout must not be null.");

        // for this job, we create a proxy JobClientActor that deals with all communication with
        // the JobManager. It forwards the job submission, checks the success/failure responses, logs
        // update messages, watches for disconnect between client and JobManager, ...

        Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
            highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
            timeout,
            sysoutLogUpdates,
            config);

        ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);

        Future<Object> submissionFuture = Patterns.ask(
                jobClientActor,
                new JobClientMessages.SubmitJobAndWait(jobGraph),
                new Timeout(AkkaUtils.INF_TIMEOUT()));

        return new JobListeningContext(
            jobGraph.getJobID(),
            submissionFuture,
            jobClientActor,
            timeout,
            classLoader,
            highAvailabilityServices);
    }

    //......
}
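  • For example, the JobClient.submitJob method calls highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID) to obtain a LeaderRetrievalService. The JobSubmissionClientActor then uses this service to find the address of the current JobManager leader and submits the job to it.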
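The retrieval side follows a listener pattern. The service watches ZooKeeper and pushes every leader change to a callback, and the client always talks to the most recently reported address. The sketch below simulates this flow without ZooKeeper or Akka. LeaderListener, InMemoryLeaderRetrieval and SubmittingClient are hypothetical stand-ins; LeaderListener is only loosely modeled on Flink's LeaderRetrievalListener. The addresses are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical stand-in for a leader-change callback.
interface LeaderListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

// Hypothetical in-memory replacement for a ZooKeeper-backed retrieval service:
// it forwards every leader change to the registered listener.
class InMemoryLeaderRetrieval {
    private LeaderListener listener;

    void start(LeaderListener listener) {
        this.listener = listener;
    }

    // Simulates ZooKeeper reporting that a JobManager won the election.
    void publish(String address, UUID sessionId) {
        if (listener != null) {
            listener.notifyLeaderAddress(address, sessionId);
        }
    }
}

// Hypothetical client: queues submissions until a leader is known and
// otherwise sends them to the last address reported by the retrieval service.
class SubmittingClient implements LeaderListener {
    private String leaderAddress;
    private final List<String> pending = new ArrayList<>();
    final List<String> sent = new ArrayList<>();

    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
        this.leaderAddress = leaderAddress;
        // Flush anything submitted while no leader was known.
        for (String job : pending) {
            send(job);
        }
        pending.clear();
    }

    void submit(String jobName) {
        if (leaderAddress == null) {
            pending.add(jobName);
        } else {
            send(jobName);
        }
    }

    private void send(String jobName) {
        sent.add(jobName + "@" + leaderAddress);
    }
}

class LeaderRetrievalDemo {
    public static void main(String[] args) {
        InMemoryLeaderRetrieval retrieval = new InMemoryLeaderRetrieval();
        SubmittingClient client = new SubmittingClient();
        retrieval.start(client);

        client.submit("wordcount");                                   // no leader yet: queued
        retrieval.publish("jobmanager-1:6123", UUID.randomUUID());    // leader elected: queue flushed
        retrieval.publish("jobmanager-2:6123", UUID.randomUUID());    // simulated failover
        client.submit("pageview");                                    // goes to the new leader

        System.out.println(client.sent);
    }
}
```

Running main prints [wordcount@jobmanager-1:6123, pageview@jobmanager-2:6123]. The first job waits until a leader is known. The second job goes to the new leader after the simulated failover.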

Summary

  • HighAvailabilityMode has three values: NONE, ZOOKEEPER and FACTORY_CLASS. Each value carries a haActive attribute that indicates whether high availability is enabled; it is false only for NONE. HighAvailabilityOptions defines the configuration options prefixed with high-availability, including the ZooKeeper-specific high-availability.zookeeper.* options.
  • HighAvailabilityServicesUtils provides static methods for creating HighAvailabilityServices, such as createAvailableOrEmbeddedServices, createHighAvailabilityServices and createCustomHAServices. createAvailableOrEmbeddedServices is mainly used by FlinkMiniCluster, while createHighAvailabilityServices is mainly used by ClusterEntrypoint. The latter creates StandaloneHaServices when the HighAvailabilityMode is NONE and ZooKeeperHaServices when it is ZOOKEEPER, and it delegates to createCustomHAServices when it is FACTORY_CLASS.
  • HighAvailabilityServices defines getter methods for the various services that high availability requires. ZooKeeperHaServices implements this interface and creates those services through ZooKeeperUtils methods such as createLeaderRetrievalService, createLeaderElectionService and createSubmittedJobGraphs. Callers use these services through the interface. For example, JobClient.submitJob calls getJobManagerLeaderRetriever to locate the JobManager leader before submitting the job.
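The mode parsing and factory dispatch described in this summary can be condensed into one sketch. HaMode and HaServicesFactorySketch are hypothetical simplifications that return descriptive strings instead of building real services. Two behaviours are assumptions taken from the Flink 1.7 source rather than from the code shown in this article: the legacy default value "standalone" maps to NONE, and an unrecognised value is treated as the name of a custom factory class (FACTORY_CLASS).

```java
import java.util.Locale;
import java.util.Map;

// Simplified, hypothetical copy of the HighAvailabilityMode idea.
enum HaMode {
    NONE(false),
    ZOOKEEPER(true),
    FACTORY_CLASS(true);

    final boolean haActive;

    HaMode(boolean haActive) {
        this.haActive = haActive;
    }

    // Sketch of fromConfig. ASSUMPTIONS: "standalone" is the legacy default,
    // and unknown values are interpreted as a factory class name.
    static HaMode parse(String value) {
        if (value == null || value.equalsIgnoreCase("standalone")) {
            return NONE;
        }
        try {
            return valueOf(value.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return FACTORY_CLASS;
        }
    }
}

class HaServicesFactorySketch {

    // Hypothetical dispatch mirroring createHighAvailabilityServices;
    // returns a description instead of constructing real Flink services.
    static String createHaServices(Map<String, String> config) {
        HaMode mode = HaMode.parse(config.get("high-availability"));
        switch (mode) {
            case NONE:
                return "StandaloneHaServices";
            case ZOOKEEPER:
                return "ZooKeeperHaServices(quorum=" + config.get("high-availability.zookeeper.quorum") + ")";
            case FACTORY_CLASS:
                return "custom services from " + config.get("high-availability");
            default:
                throw new IllegalStateException("Unknown mode " + mode);
        }
    }

    public static void main(String[] args) {
        System.out.println(createHaServices(Map.of()));
        System.out.println(createHaServices(Map.of(
                "high-availability", "zookeeper",
                "high-availability.zookeeper.quorum", "zookeeper:2181")));
        System.out.println(createHaServices(Map.of("high-availability", "com.example.MyHaFactory")));
    }
}
```

The sketch keeps haActive on the enum, as Flink does. Code that only needs to know whether HA is on can then check mode.haActive without switching over every value.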

doc