A look at Flink’s RestClusterClientConfiguration

Preface

This article examines Flink’s RestClusterClientConfiguration and how RestClusterClient uses it.

RestClusterClientConfiguration

flink-release-1.7.2/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java

public final class RestClusterClientConfiguration {

    private final RestClientConfiguration restClientConfiguration;

    private final long awaitLeaderTimeout;

    private final int retryMaxAttempts;

    private final long retryDelay;

    private RestClusterClientConfiguration(
            final RestClientConfiguration endpointConfiguration,
            final long awaitLeaderTimeout,
            final int retryMaxAttempts,
            final long retryDelay) {
        checkArgument(awaitLeaderTimeout >= 0, "awaitLeaderTimeout must be equal to or greater than 0");
        checkArgument(retryMaxAttempts >= 0, "retryMaxAttempts must be equal to or greater than 0");
        checkArgument(retryDelay >= 0, "retryDelay must be equal to or greater than 0");

        this.restClientConfiguration = Preconditions.checkNotNull(endpointConfiguration);
        this.awaitLeaderTimeout = awaitLeaderTimeout;
        this.retryMaxAttempts = retryMaxAttempts;
        this.retryDelay = retryDelay;
    }

    public RestClientConfiguration getRestClientConfiguration() {
        return restClientConfiguration;
    }

    /**
     * @see RestOptions#AWAIT_LEADER_TIMEOUT
     */
    public long getAwaitLeaderTimeout() {
        return awaitLeaderTimeout;
    }

    /**
     * @see RestOptions#RETRY_MAX_ATTEMPTS
     */
    public int getRetryMaxAttempts() {
        return retryMaxAttempts;
    }

    /**
     * @see RestOptions#RETRY_DELAY
     */
    public long getRetryDelay() {
        return retryDelay;
    }

    public static RestClusterClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
        RestClientConfiguration restClientConfiguration = RestClientConfiguration.fromConfiguration(config);

        final long awaitLeaderTimeout = config.getLong(RestOptions.AWAIT_LEADER_TIMEOUT);
        final int retryMaxAttempts = config.getInteger(RestOptions.RETRY_MAX_ATTEMPTS);
        final long retryDelay = config.getLong(RestOptions.RETRY_DELAY);

        return new RestClusterClientConfiguration(restClientConfiguration, awaitLeaderTimeout, retryMaxAttempts, retryDelay);
    }
}
  • Besides the wrapped RestClientConfiguration, RestClusterClientConfiguration has three attributes: awaitLeaderTimeout, retryMaxAttempts, and retryDelay. awaitLeaderTimeout reads the rest.await-leader-timeout option, which defaults to 30 seconds (30000 ms); retryMaxAttempts reads the rest.retry.max-attempts option, which defaults to 20; retryDelay reads the rest.retry.delay option, which defaults to 3 seconds (3000 ms). The constructor rejects negative values for all three.
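To make the defaults and the validation concrete, here is a minimal JDK-only sketch of the same read-with-default-then-validate pattern. It is not Flink code: the class RestSettingsSketch and its fromMap method are hypothetical names, and it reads from a plain Map instead of Flink's Configuration. The timeout and delay are taken to be in milliseconds, as the TimeUnit.MILLISECONDS and Time.milliseconds calls in RestClusterClient suggest.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the three client-side REST settings; not part of Flink. */
final class RestSettingsSketch {
    final long awaitLeaderTimeoutMs;
    final int retryMaxAttempts;
    final long retryDelayMs;

    private RestSettingsSketch(long awaitLeaderTimeoutMs, int retryMaxAttempts, long retryDelayMs) {
        // Same rule as the constructor quoted above: no negative values.
        requireNonNegative(awaitLeaderTimeoutMs, "awaitLeaderTimeout");
        requireNonNegative(retryMaxAttempts, "retryMaxAttempts");
        requireNonNegative(retryDelayMs, "retryDelay");
        this.awaitLeaderTimeoutMs = awaitLeaderTimeoutMs;
        this.retryMaxAttempts = retryMaxAttempts;
        this.retryDelayMs = retryDelayMs;
    }

    private static void requireNonNegative(long value, String name) {
        if (value < 0) {
            throw new IllegalArgumentException(name + " must be equal to or greater than 0");
        }
    }

    /** Reads the three keys from a plain map, falling back to the defaults quoted in the article. */
    static RestSettingsSketch fromMap(Map<String, String> config) {
        long awaitLeaderTimeout = Long.parseLong(config.getOrDefault("rest.await-leader-timeout", "30000"));
        int retryMaxAttempts = Integer.parseInt(config.getOrDefault("rest.retry.max-attempts", "20"));
        long retryDelay = Long.parseLong(config.getOrDefault("rest.retry.delay", "3000"));
        return new RestSettingsSketch(awaitLeaderTimeout, retryMaxAttempts, retryDelay);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("rest.retry.max-attempts", "5"); // override one key, keep the other defaults
        RestSettingsSketch settings = fromMap(config);
        // prints "30000 5 3000"
        System.out.println(settings.awaitLeaderTimeoutMs + " " + settings.retryMaxAttempts + " " + settings.retryDelayMs);
    }
}
```

Only the overridden key changes; a negative value for any of the three keys fails fast with an IllegalArgumentException, mirroring the checkArgument calls in the real class.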

RestClusterClient

flink-release-1.7.2/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java

public class RestClusterClient<T> extends ClusterClient<T> implements NewClusterClient {

    private final RestClusterClientConfiguration restClusterClientConfiguration;

    private final RestClient restClient;

    private final ExecutorService executorService = Executors.newFixedThreadPool(4, new ExecutorThreadFactory("Flink-RestClusterClient-IO"));

    private final WaitStrategy waitStrategy;

    private final T clusterId;

    private final LeaderRetrievalService webMonitorRetrievalService;

    private final LeaderRetrievalService dispatcherRetrievalService;

    private final LeaderRetriever webMonitorLeaderRetriever = new LeaderRetriever();

    private final LeaderRetriever dispatcherLeaderRetriever = new LeaderRetriever();

    /** ExecutorService to run operations that can be retried on exceptions. */
    private ScheduledExecutorService retryExecutorService;

    //......

    private <C> CompletableFuture<C> retry(
            CheckedSupplier<CompletableFuture<C>> operation,
            Predicate<Throwable> retryPredicate) {
        return FutureUtils.retryWithDelay(
            CheckedSupplier.unchecked(operation),
            restClusterClientConfiguration.getRetryMaxAttempts(),
            Time.milliseconds(restClusterClientConfiguration.getRetryDelay()),
            retryPredicate,
            new ScheduledExecutorServiceAdapter(retryExecutorService));
    }

    @VisibleForTesting
    CompletableFuture<URL> getWebMonitorBaseUrl() {
        return FutureUtils.orTimeout(
                webMonitorLeaderRetriever.getLeaderFuture(),
                restClusterClientConfiguration.getAwaitLeaderTimeout(),
                TimeUnit.MILLISECONDS)
            .thenApplyAsync(leaderAddressSessionId -> {
                final String url = leaderAddressSessionId.f0;
                try {
                    return new URL(url);
                } catch (MalformedURLException e) {
                    throw new IllegalArgumentException("Could not parse URL from " + url, e);
                }
            }, executorService);
    }

    //......
}
  • RestClusterClient’s constructor builds restClusterClientConfiguration from the configuration via the RestClusterClientConfiguration.fromConfiguration method.
  • The retry method delegates to FutureUtils.retryWithDelay: the retries argument is restClusterClientConfiguration.getRetryMaxAttempts(), and the retryDelay argument is Time.milliseconds(restClusterClientConfiguration.getRetryDelay()).
  • The getWebMonitorBaseUrl method delegates to FutureUtils.orTimeout: the timeout argument is restClusterClientConfiguration.getAwaitLeaderTimeout(), interpreted in milliseconds.
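The two patterns these methods rely on, retrying with a fixed delay and failing a future after a timeout, can be sketched with the JDK alone. The sketch below is an illustration under stated assumptions, not Flink's FutureUtils: RetrySketch, attempt, and the flaky supplier are hypothetical names, and the assumption that `retries` counts additional attempts after the first call is mine.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical JDK-only sketch of retry-with-delay and timeout; not Flink's FutureUtils. */
final class RetrySketch {

    /** Runs the operation once, then up to `retries` more times, waiting delayMs between attempts. */
    static <T> CompletableFuture<T> retryWithDelay(
            Supplier<CompletableFuture<T>> operation, int retries, long delayMs,
            Predicate<Throwable> retryPredicate, ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, delayMs, retryPredicate, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation, int retriesLeft, long delayMs,
            Predicate<Throwable> retryPredicate, ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        CompletableFuture<T> current;
        try {
            current = operation.get();
        } catch (RuntimeException e) {
            // Treat a synchronous failure like a failed future.
            current = new CompletableFuture<>();
            current.completeExceptionally(e);
        }
        current.whenComplete((value, error) -> {
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause() : error;
            if (cause == null) {
                result.complete(value);
            } else if (retriesLeft > 0 && retryPredicate.test(cause)) {
                // Schedule the next attempt after the configured delay.
                scheduler.schedule(
                    () -> attempt(operation, retriesLeft - 1, delayMs, retryPredicate, scheduler, result),
                    delayMs, TimeUnit.MILLISECONDS);
            } else {
                result.completeExceptionally(cause);
            }
        });
    }

    /** Fails the future with a TimeoutException if it is still pending after timeoutMs. */
    static <T> CompletableFuture<T> orTimeout(
            CompletableFuture<T> future, long timeoutMs, ScheduledExecutorService scheduler) {
        ScheduledFuture<?> timer = scheduler.schedule(() -> {
            future.completeExceptionally(new TimeoutException("Timed out after " + timeoutMs + " ms"));
        }, timeoutMs, TimeUnit.MILLISECONDS);
        // Cancel the timer once the future completes by any means.
        future.whenComplete((value, error) -> timer.cancel(false));
        return future;
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Fails on the first two calls, then succeeds on the third.
        Supplier<CompletableFuture<String>> flaky = () -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (calls.incrementAndGet() < 3) {
                f.completeExceptionally(new IllegalStateException("not ready"));
            } else {
                f.complete("ok");
            }
            return f;
        };
        String value = orTimeout(
            retryWithDelay(flaky, 20, 10L, t -> t instanceof IllegalStateException, scheduler),
            30_000L, scheduler).join();
        System.out.println(value + " after " + calls.get() + " calls"); // prints "ok after 3 calls"
        scheduler.shutdownNow();
    }
}
```

In this sketch the retry count and delay play the roles of getRetryMaxAttempts() and getRetryDelay(), and the timeout plays the role of getAwaitLeaderTimeout(). With the default values, a request that keeps failing with a retryable error would spend up to 20 × 3 s = 60 s in delays alone before giving up.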

Summary

  • Besides the wrapped RestClientConfiguration, RestClusterClientConfiguration has three attributes: awaitLeaderTimeout, retryMaxAttempts, and retryDelay.
  • awaitLeaderTimeout reads the rest.await-leader-timeout option, which defaults to 30 seconds; retryMaxAttempts reads the rest.retry.max-attempts option, which defaults to 20; retryDelay reads the rest.retry.delay option, which defaults to 3 seconds.
  • RestClusterClient’s retry method delegates to FutureUtils.retryWithDelay, passing restClusterClientConfiguration.getRetryMaxAttempts() as the retries argument and Time.milliseconds(restClusterClientConfiguration.getRetryDelay()) as the retryDelay argument. The getWebMonitorBaseUrl method delegates to FutureUtils.orTimeout, passing restClusterClientConfiguration.getAwaitLeaderTimeout() as the timeout.
