A look at Flink's RestartStrategies


Preface

This article examines Flink's RestartStrategies.

RestartStrategies

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

@PublicEvolving
public class RestartStrategies {

    /**
     * Generates NoRestartStrategyConfiguration.
     *
     * @return NoRestartStrategyConfiguration
     */
    public static RestartStrategyConfiguration noRestart() {
        return new NoRestartStrategyConfiguration();
    }

    public static RestartStrategyConfiguration fallBackRestart() {
        return new FallbackRestartStrategyConfiguration();
    }

    /**
     * Generates a FixedDelayRestartStrategyConfiguration.
     *
     * @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy
     * @param delayBetweenAttempts Delay in-between restart attempts for the FixedDelayRestartStrategy
     * @return FixedDelayRestartStrategy
     */
    public static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, long delayBetweenAttempts) {
        return fixedDelayRestart(restartAttempts, Time.of(delayBetweenAttempts, TimeUnit.MILLISECONDS));
    }

    /**
     * Generates a FixedDelayRestartStrategyConfiguration.
     *
     * @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy
     * @param delayInterval Delay in-between restart attempts for the FixedDelayRestartStrategy
     * @return FixedDelayRestartStrategy
     */
    public static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, Time delayInterval) {
        return new FixedDelayRestartStrategyConfiguration(restartAttempts, delayInterval);
    }

    /**
     * Generates a FailureRateRestartStrategyConfiguration.
     *
     * @param failureRate Maximum number of restarts in given interval {@code failureInterval} before failing a job
     * @param failureInterval Time interval for failures
     * @param delayInterval Delay in-between restart attempts
     */
    public static FailureRateRestartStrategyConfiguration failureRateRestart(
            int failureRate, Time failureInterval, Time delayInterval) {
        return new FailureRateRestartStrategyConfiguration(failureRate, failureInterval, delayInterval);
    }

    //......
}
  • RestartStrategies provides the static factory methods noRestart, fallBackRestart, fixedDelayRestart, and failureRateRestart, each of which builds a RestartStrategyConfiguration.
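
The long overload of fixedDelayRestart treats its argument as milliseconds and delegates to the Time overload. The following is a minimal stand-alone sketch of that pattern, not Flink code: RestartFactorySketch, FixedDelayConfig, and the use of java.time.Duration in place of Flink's Time are assumptions made only for illustration.

```java
import java.time.Duration;

// Stand-in for the static-factory pattern used by RestartStrategies.
// All names here are hypothetical; Duration replaces Flink's Time.
final class RestartFactorySketch {

    static final class FixedDelayConfig {
        final int restartAttempts;
        final Duration delay;

        // Package-private constructor: callers go through the factory methods.
        FixedDelayConfig(int restartAttempts, Duration delay) {
            this.restartAttempts = restartAttempts;
            this.delay = delay;
        }
    }

    // The long overload interprets its argument as milliseconds and delegates.
    static FixedDelayConfig fixedDelayRestart(int restartAttempts, long delayMillis) {
        return fixedDelayRestart(restartAttempts, Duration.ofMillis(delayMillis));
    }

    static FixedDelayConfig fixedDelayRestart(int restartAttempts, Duration delay) {
        return new FixedDelayConfig(restartAttempts, delay);
    }

    public static void main(String[] args) {
        FixedDelayConfig a = fixedDelayRestart(3, 10_000L);
        FixedDelayConfig b = fixedDelayRestart(3, Duration.ofSeconds(10));
        // Both calls describe the same 10-second delay.
        System.out.println(a.delay.equals(b.delay));
    }
}
```

Passing a bare long is convenient but easy to misread, so the overload that takes an explicit time value is usually the clearer choice.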

RestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

    public abstract static class RestartStrategyConfiguration implements Serializable {
        private static final long serialVersionUID = 6285853591578313960L;

        private RestartStrategyConfiguration() {}

        /**
         * Returns a description which is shown in the web interface.
         *
         * @return Description of the restart strategy
         */
        public abstract String getDescription();
    }
  • RestartStrategyConfiguration is an abstract class that declares the abstract method getDescription. Because its constructor is private, it can only be subclassed inside RestartStrategies. It has four subclasses: NoRestartStrategyConfiguration, FixedDelayRestartStrategyConfiguration, FailureRateRestartStrategyConfiguration, and FallbackRestartStrategyConfiguration.

NoRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

    public static final class NoRestartStrategyConfiguration extends RestartStrategyConfiguration {
        private static final long serialVersionUID = -5894362702943349962L;

        @Override
        public String getDescription() {
            return "Restart deactivated.";
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            return o instanceof NoRestartStrategyConfiguration;
        }

        @Override
        public int hashCode() {
            return Objects.hash();
        }
    }
  • NoRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the "no restart" strategy.

FixedDelayRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

    public static final class FixedDelayRestartStrategyConfiguration extends RestartStrategyConfiguration {
        private static final long serialVersionUID = 4149870149673363190L;

        private final int restartAttempts;
        private final Time delayBetweenAttemptsInterval;

        FixedDelayRestartStrategyConfiguration(int restartAttempts, Time delayBetweenAttemptsInterval) {
            this.restartAttempts = restartAttempts;
            this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
        }

        public int getRestartAttempts() {
            return restartAttempts;
        }

        public Time getDelayBetweenAttemptsInterval() {
            return delayBetweenAttemptsInterval;
        }

        @Override
        public int hashCode() {
            int result = restartAttempts;
            result = 31 * result + (delayBetweenAttemptsInterval != null ? delayBetweenAttemptsInterval.hashCode() : 0);
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof FixedDelayRestartStrategyConfiguration) {
                FixedDelayRestartStrategyConfiguration other = (FixedDelayRestartStrategyConfiguration) obj;

                return restartAttempts == other.restartAttempts && delayBetweenAttemptsInterval.equals(other.delayBetweenAttemptsInterval);
            } else {
                return false;
            }
        }

        @Override
        public String getDescription() {
            return "Restart with fixed delay (" + delayBetweenAttemptsInterval + "). #"
                + restartAttempts + " restart attempts.";
        }
    }
  • FixedDelayRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the fixed-delay restart strategy. It has two fields: restartAttempts and delayBetweenAttemptsInterval.

FailureRateRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

    public static final class FailureRateRestartStrategyConfiguration extends RestartStrategyConfiguration {
        private static final long serialVersionUID = 1195028697539661739L;
        private final int maxFailureRate;

        private final Time failureInterval;
        private final Time delayBetweenAttemptsInterval;

        public FailureRateRestartStrategyConfiguration(int maxFailureRate, Time failureInterval, Time delayBetweenAttemptsInterval) {
            this.maxFailureRate = maxFailureRate;
            this.failureInterval = failureInterval;
            this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
        }

        public int getMaxFailureRate() {
            return maxFailureRate;
        }

        public Time getFailureInterval() {
            return failureInterval;
        }

        public Time getDelayBetweenAttemptsInterval() {
            return delayBetweenAttemptsInterval;
        }

        @Override
        public String getDescription() {
            return "Failure rate restart with maximum of " + maxFailureRate + " failures within interval " + failureInterval.toString()
                    + " and fixed delay " + delayBetweenAttemptsInterval.toString();
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            FailureRateRestartStrategyConfiguration that = (FailureRateRestartStrategyConfiguration) o;
            return maxFailureRate == that.maxFailureRate &&
                Objects.equals(failureInterval, that.failureInterval) &&
                Objects.equals(delayBetweenAttemptsInterval, that.delayBetweenAttemptsInterval);
        }

        @Override
        public int hashCode() {
            return Objects.hash(maxFailureRate, failureInterval, delayBetweenAttemptsInterval);
        }
    }
  • FailureRateRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the failure-rate restart strategy. It has three fields: maxFailureRate, failureInterval, and delayBetweenAttemptsInterval.

FallbackRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

    public static final class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration {
        private static final long serialVersionUID = -4441787204284085544L;

        @Override
        public String getDescription() {
            return "Cluster level default restart strategy";
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            return o instanceof FallbackRestartStrategyConfiguration;
        }

        @Override
        public int hashCode() {
            return Objects.hash();
        }
    }
  • FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the cluster-level default restart strategy.

RestartStrategyResolving

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java

public final class RestartStrategyResolving {

    /**
     * Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
     * The resolving strategy is as follows:
     * <ol>
     * <li>Strategy set within job graph.</li>
     * <li>Strategy set flink-conf.yaml on the server set, unless is set to {@link NoRestartStrategy} and checkpointing
     * is enabled.</li>
     * <li>If no strategy was set on client and server side and checkpointing was enabled then
     * {@link FixedDelayRestartStrategy} is used</li>
     * </ol>
     *
     * @param clientConfiguration restart configuration given within the job graph
     * @param serverStrategyFactory default server side strategy factory
     * @param isCheckpointingEnabled if checkpointing was enabled for the job
     * @return resolved strategy
     */
    public static RestartStrategy resolve(
            RestartStrategies.RestartStrategyConfiguration clientConfiguration,
            RestartStrategyFactory serverStrategyFactory,
            boolean isCheckpointingEnabled) {

        final RestartStrategy clientSideRestartStrategy =
            RestartStrategyFactory.createRestartStrategy(clientConfiguration);

        if (clientSideRestartStrategy != null) {
            return clientSideRestartStrategy;
        } else {
            if (serverStrategyFactory instanceof NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) {
                return ((NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) serverStrategyFactory)
                    .createRestartStrategy(isCheckpointingEnabled);
            } else {
                return serverStrategyFactory.createRestartStrategy();
            }
        }
    }

    private RestartStrategyResolving() {
    }
}
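  • RestartStrategyResolving provides a static method, resolve, which takes a RestartStrategies.RestartStrategyConfiguration and uses RestartStrategyFactory to create the RestartStrategy. A strategy configured on the client side takes precedence; otherwise the server-side factory is used, and a NoOrFixedIfCheckpointingEnabledRestartStrategyFactory additionally takes into account whether checkpointing is enabled.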
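
The precedence applied by resolve can be modelled in a few lines. This is a simplified stand-alone sketch, not Flink code: ResolveSketch, the string return values, and the Optional parameter are hypothetical. It assumes that a client configuration without an explicit strategy yields no client-side strategy (which the null check in the source suggests), and that NoOrFixedIfCheckpointingEnabledRestartStrategyFactory chooses no restart when checkpointing is disabled (inferred from its name and the Javadoc above).

```java
import java.util.Optional;

// Simplified model of the precedence in RestartStrategyResolving.resolve.
// Strings stand in for RestartStrategy objects; all names are hypothetical.
final class ResolveSketch {

    static String resolve(Optional<String> clientStrategy,
                          String serverStrategy,
                          boolean serverIsNoOrFixedIfCheckpointing,
                          boolean checkpointingEnabled) {
        // 1. A strategy set within the job graph always wins.
        if (clientStrategy.isPresent()) {
            return clientStrategy.get();
        }
        // 2. Nothing was set explicitly: decide based on checkpointing (assumed behavior).
        if (serverIsNoOrFixedIfCheckpointing) {
            return checkpointingEnabled ? "fixed-delay" : "none";
        }
        // 3. Otherwise use the strategy configured on the server side.
        return serverStrategy;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.of("failure-rate"), "none", false, true));
        System.out.println(resolve(Optional.empty(), "unused", true, true));
        System.out.println(resolve(Optional.empty(), "failure-rate", false, false));
    }
}
```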

RestartStrategy

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java

public interface RestartStrategy {

    /**
     * True if the restart strategy can be applied to restart the {@link ExecutionGraph}.
     *
     * @return true if restart is possible, otherwise false
     */
    boolean canRestart();

    /**
     * Called by the ExecutionGraph to eventually trigger a full recovery.
     * The recovery must be triggered on the given callback object, and may be delayed
     * with the help of the given scheduled executor.
     *
     * <p>The thread that calls this method is not supposed to block/sleep.
     *
     * @param restarter The hook to restart the ExecutionGraph
     * @param executor An scheduled executor to delay the restart
     */
    void restart(RestartCallback restarter, ScheduledExecutor executor);
}
  • RestartStrategy defines two methods, canRestart and restart. Its implementations include NoRestartStrategy, FixedDelayRestartStrategy, and FailureRateRestartStrategy.

NoRestartStrategy

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java

public class NoRestartStrategy implements RestartStrategy {

    @Override
    public boolean canRestart() {
        return false;
    }

    @Override
    public void restart(RestartCallback restarter, ScheduledExecutor executor) {
        throw new UnsupportedOperationException("NoRestartStrategy does not support restart.");
    }

    /**
     * Creates a NoRestartStrategyFactory instance.
     *
     * @param configuration Configuration object which is ignored
     * @return NoRestartStrategyFactory instance
     */
    public static NoRestartStrategyFactory createFactory(Configuration configuration) {
        return new NoRestartStrategyFactory();
    }

    @Override
    public String toString() {
        return "NoRestartStrategy";
    }

    public static class NoRestartStrategyFactory extends RestartStrategyFactory {

        private static final long serialVersionUID = -1809462525812787862L;

        @Override
        public RestartStrategy createRestartStrategy() {
            return new NoRestartStrategy();
        }
    }
}
  • NoRestartStrategy implements the RestartStrategy interface. Its canRestart method always returns false, and its restart method throws UnsupportedOperationException.
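
A restart method that throws is harmless as long as callers check canRestart first. The sketch below illustrates that calling contract. It is a stand-alone approximation, not how the ExecutionGraph is actually written: the Strategy interface, the onFailure helper, and the state names returned as strings are all hypothetical.

```java
// Hypothetical caller illustrating the canRestart()/restart() contract.
final class RestartContractSketch {

    // Reduced stand-in for RestartStrategy; the executor parameter is omitted.
    interface Strategy {
        boolean canRestart();
        void restart(Runnable recovery);
    }

    // Mirrors NoRestartStrategy: never restartable, restart() is unsupported.
    static final class NoRestart implements Strategy {
        @Override
        public boolean canRestart() {
            return false;
        }

        @Override
        public void restart(Runnable recovery) {
            throw new UnsupportedOperationException("restart is not supported");
        }
    }

    // Returns the state the hypothetical job ends up in after a failure.
    static String onFailure(Strategy strategy, Runnable recovery) {
        if (strategy.canRestart()) {
            strategy.restart(recovery);
            return "RESTARTING";
        }
        // restart() is never reached for a strategy that cannot restart.
        return "FAILED";
    }

    public static void main(String[] args) {
        System.out.println(onFailure(new NoRestart(), () -> { }));
    }
}
```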

FixedDelayRestartStrategy

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java

public class FixedDelayRestartStrategy implements RestartStrategy {

    private final int maxNumberRestartAttempts;
    private final long delayBetweenRestartAttempts;
    private int currentRestartAttempt;

    public FixedDelayRestartStrategy(
        int maxNumberRestartAttempts,
        long delayBetweenRestartAttempts) {

        Preconditions.checkArgument(maxNumberRestartAttempts >= 0, "Maximum number of restart attempts must be positive.");
        Preconditions.checkArgument(delayBetweenRestartAttempts >= 0, "Delay between restart attempts must be positive");

        this.maxNumberRestartAttempts = maxNumberRestartAttempts;
        this.delayBetweenRestartAttempts = delayBetweenRestartAttempts;
        currentRestartAttempt = 0;
    }

    public int getCurrentRestartAttempt() {
        return currentRestartAttempt;
    }

    @Override
    public boolean canRestart() {
        return currentRestartAttempt < maxNumberRestartAttempts;
    }

    @Override
    public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
        currentRestartAttempt++;

        executor.schedule(new Runnable() {
            @Override
            public void run() {
                restarter.triggerFullRecovery();
            }
        }, delayBetweenRestartAttempts, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates a FixedDelayRestartStrategy from the given Configuration.
     *
     * @param configuration Configuration containing the parameter values for the restart strategy
     * @return Initialized instance of FixedDelayRestartStrategy
     * @throws Exception
     */
    public static FixedDelayRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
        int maxAttempts = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);

        String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY);

        long delay;

        try {
            delay = Duration.apply(delayString).toMillis();
        } catch (NumberFormatException nfe) {
            throw new Exception("Invalid config value for " +
                    ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
                    ". Value must be a valid duration (such as '100 milli' or '10 s')");
        }

        return new FixedDelayRestartStrategyFactory(maxAttempts, delay);
    }

    @Override
    public String toString() {
        return "FixedDelayRestartStrategy(" +
                "maxNumberRestartAttempts=" + maxNumberRestartAttempts +
                ", delayBetweenRestartAttempts=" + delayBetweenRestartAttempts +
                ')';
    }

    public static class FixedDelayRestartStrategyFactory extends RestartStrategyFactory {

        private static final long serialVersionUID = 6642934067762271950L;

        private final int maxAttempts;
        private final long delay;

        public FixedDelayRestartStrategyFactory(int maxAttempts, long delay) {
            this.maxAttempts = maxAttempts;
            this.delay = delay;
        }

        @Override
        public RestartStrategy createRestartStrategy() {
            return new FixedDelayRestartStrategy(maxAttempts, delay);
        }
    }
}
  • FixedDelayRestartStrategy implements the RestartStrategy interface. Its canRestart method returns true while currentRestartAttempt is less than maxNumberRestartAttempts. The restart method increments currentRestartAttempt and then calls ScheduledExecutor.schedule, which runs RestartCallback.triggerFullRecovery() after a delay of delayBetweenRestartAttempts milliseconds. Although the precondition messages say "must be positive", the checks themselves accept zero for both arguments.
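
The attempt counting and the delayed callback can be reproduced with the JDK alone. The following is a minimal sketch under stated assumptions: FixedDelaySketch is a hypothetical class, a plain Runnable stands in for RestartCallback, and java.util.concurrent.ScheduledExecutorService stands in for Flink's ScheduledExecutor.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Stand-alone model of the fixed-delay bookkeeping; not Flink code.
final class FixedDelaySketch {

    private final int maxAttempts;
    private final long delayMillis;
    private int currentAttempt; // incremented on every restart() call

    FixedDelaySketch(int maxAttempts, long delayMillis) {
        if (maxAttempts < 0 || delayMillis < 0) {
            throw new IllegalArgumentException("arguments must not be negative");
        }
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    boolean canRestart() {
        return currentAttempt < maxAttempts;
    }

    // Counts the attempt, then schedules the recovery without blocking the caller.
    void restart(Runnable recovery, ScheduledExecutorService executor) {
        currentAttempt++;
        executor.schedule(recovery, delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        FixedDelaySketch strategy = new FixedDelaySketch(2, 50L);

        // Simulate repeated failures until the attempt budget is used up.
        while (strategy.canRestart()) {
            strategy.restart(() -> System.out.println("recovery triggered"), executor);
        }
        System.out.println("can restart again: " + strategy.canRestart());

        executor.shutdown(); // already scheduled delayed tasks still run by default
        executor.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

As in the quoted source, the counter is only ever incremented, so the limit applies to the lifetime of the strategy instance rather than to any time window.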

FailureRateRestartStrategy

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java

public class FailureRateRestartStrategy implements RestartStrategy {

    private final Time failuresInterval;
    private final Time delayInterval;
    private final int maxFailuresPerInterval;
    private final ArrayDeque<Long> restartTimestampsDeque;

    public FailureRateRestartStrategy(int maxFailuresPerInterval, Time failuresInterval, Time delayInterval) {
        Preconditions.checkNotNull(failuresInterval, "Failures interval cannot be null.");
        Preconditions.checkNotNull(delayInterval, "Delay interval cannot be null.");
        Preconditions.checkArgument(maxFailuresPerInterval > 0, "Maximum number of restart attempts per time unit must be greater than 0.");
        Preconditions.checkArgument(failuresInterval.getSize() > 0, "Failures interval must be greater than 0 ms.");
        Preconditions.checkArgument(delayInterval.getSize() >= 0, "Delay interval must be at least 0 ms.");

        this.failuresInterval = failuresInterval;
        this.delayInterval = delayInterval;
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.restartTimestampsDeque = new ArrayDeque<>(maxFailuresPerInterval);
    }

    @Override
    public boolean canRestart() {
        if (isRestartTimestampsQueueFull()) {
            Long now = System.currentTimeMillis();
            Long earliestFailure = restartTimestampsDeque.peek();

            return (now - earliestFailure) > failuresInterval.toMilliseconds();
        } else {
            return true;
        }
    }

    @Override
    public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
        if (isRestartTimestampsQueueFull()) {
            restartTimestampsDeque.remove();
        }
        restartTimestampsDeque.add(System.currentTimeMillis());

        executor.schedule(new Runnable() {
            @Override
            public void run() {
                restarter.triggerFullRecovery();
            }
        }, delayInterval.getSize(), delayInterval.getUnit());
    }

    private boolean isRestartTimestampsQueueFull() {
        return restartTimestampsDeque.size() >= maxFailuresPerInterval;
    }

    @Override
    public String toString() {
        return "FailureRateRestartStrategy(" +
            "failuresInterval=" + failuresInterval +
            "delayInterval=" + delayInterval +
            "maxFailuresPerInterval=" + maxFailuresPerInterval +
            ")";
    }

    public static FailureRateRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
        int maxFailuresPerInterval = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL, 1);
        String failuresIntervalString = configuration.getString(
                ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, Duration.apply(1, TimeUnit.MINUTES).toString()
        );
        String timeoutString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL);
        String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, timeoutString);

        Duration failuresInterval = Duration.apply(failuresIntervalString);
        Duration delay = Duration.apply(delayString);


        return new FailureRateRestartStrategyFactory(maxFailuresPerInterval, Time.milliseconds(failuresInterval.toMillis()), Time.milliseconds(delay.toMillis()));
    }

    public static class FailureRateRestartStrategyFactory extends RestartStrategyFactory {
        private static final long serialVersionUID = -373724639430960480L;

        private final int maxFailuresPerInterval;
        private final Time failuresInterval;
        private final Time delayInterval;

        public FailureRateRestartStrategyFactory(int maxFailuresPerInterval, Time failuresInterval, Time delayInterval) {
            this.maxFailuresPerInterval = maxFailuresPerInterval;
            this.failuresInterval = Preconditions.checkNotNull(failuresInterval);
            this.delayInterval = Preconditions.checkNotNull(delayInterval);
        }

        @Override
        public RestartStrategy createRestartStrategy() {
            return new FailureRateRestartStrategy(maxFailuresPerInterval, failuresInterval, delayInterval);
        }
    }
}
  • FailureRateRestartStrategy implements the RestartStrategy interface. Its canRestart method returns true while restartTimestampsDeque holds fewer than maxFailuresPerInterval entries. Once the deque is full, it returns true only if the time elapsed since earliestFailure is greater than failuresInterval. The restart method removes the oldest timestamp if the deque is full, adds the current time to restartTimestampsDeque, and then calls ScheduledExecutor.schedule to run RestartCallback.triggerFullRecovery() after delayInterval.
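
The sliding-window check is easier to follow with a controllable clock. The sketch below reproduces only the deque logic and leaves out the scheduling. It is not Flink code: FailureRateSketch, recordRestart, and the injected LongSupplier clock (used instead of System.currentTimeMillis()) are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.function.LongSupplier;

// Stand-alone model of the failure-rate window; not Flink code.
final class FailureRateSketch {

    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final LongSupplier clock; // hypothetical replacement for System.currentTimeMillis()
    private final ArrayDeque<Long> restartTimestamps;

    FailureRateSketch(int maxFailuresPerInterval, long intervalMillis, LongSupplier clock) {
        if (maxFailuresPerInterval <= 0 || intervalMillis <= 0) {
            throw new IllegalArgumentException("arguments must be greater than 0");
        }
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
        this.clock = clock;
        this.restartTimestamps = new ArrayDeque<>(maxFailuresPerInterval);
    }

    private boolean isFull() {
        return restartTimestamps.size() >= maxFailuresPerInterval;
    }

    // With a full window, a restart is allowed only if the oldest entry has aged out.
    boolean canRestart() {
        if (isFull()) {
            long earliest = restartTimestamps.peek();
            return clock.getAsLong() - earliest > intervalMillis;
        }
        return true;
    }

    // Bookkeeping half of restart(): drop the oldest entry if full, then record "now".
    void recordRestart() {
        if (isFull()) {
            restartTimestamps.remove();
        }
        restartTimestamps.add(clock.getAsLong());
    }

    public static void main(String[] args) {
        long[] now = {0L};
        FailureRateSketch strategy = new FailureRateSketch(2, 1000L, () -> now[0]);

        strategy.recordRestart();   // restart at t = 0
        now[0] = 100L;
        strategy.recordRestart();   // restart at t = 100, window is now full
        System.out.println(strategy.canRestart()); // false: 100 ms since the oldest restart

        now[0] = 1001L;
        System.out.println(strategy.canRestart()); // true: 1001 ms exceeds the 1000 ms interval
    }
}
```

Because the comparison is strict, a failure that arrives exactly failuresInterval after the oldest recorded restart is still refused.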

Summary

  • RestartStrategies provides the static factory methods noRestart, fallBackRestart, fixedDelayRestart, and failureRateRestart, each of which builds a RestartStrategyConfiguration.
  • RestartStrategyConfiguration is an abstract class that declares the abstract method getDescription. It has four subclasses: NoRestartStrategyConfiguration, FixedDelayRestartStrategyConfiguration, FailureRateRestartStrategyConfiguration, and FallbackRestartStrategyConfiguration.
  • RestartStrategyResolving provides a static method, resolve, which takes a RestartStrategies.RestartStrategyConfiguration and uses RestartStrategyFactory to create the RestartStrategy.
  • RestartStrategy defines two methods, canRestart and restart. Its implementations include NoRestartStrategy, FixedDelayRestartStrategy, and FailureRateRestartStrategy.
