A look at Flink’s ScheduledExecutor

Preface

This article examines Flink’s ScheduledExecutor interface and its two implementations.

Executor

java.base/java/util/concurrent/Executor.java

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}
  • The JDK’s Executor interface defines a single method, execute, which takes a Runnable.
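
The Javadoc’s phrase "at the discretion of the Executor implementation" can be illustrated with a minimal sketch. DirectExecutor and runAndReportThread are hypothetical names introduced here for illustration, not JDK or Flink classes; this particular implementation chooses to run each command in the calling thread.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

class ExecutorSketch {

    /** Hypothetical executor that runs every command in the calling thread. */
    static final class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            command.run(); // throws NullPointerException if command is null
        }
    }

    /** Returns the name of the thread that ran the command (null if it has not run yet). */
    static String runAndReportThread(Executor executor) {
        AtomicReference<String> threadName = new AtomicReference<>();
        executor.execute(() -> threadName.set(Thread.currentThread().getName()));
        return threadName.get();
    }

    public static void main(String[] args) {
        String ranOn = runAndReportThread(new DirectExecutor());
        // With DirectExecutor the command runs on the caller's own thread.
        System.out.println(ranOn.equals(Thread.currentThread().getName())); // prints "true"
    }
}
```

A pooled implementation would satisfy the same interface while running the command on a different thread, which is why callers should not assume where execute runs their code.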

ScheduledExecutor

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java

public interface ScheduledExecutor extends Executor {

    /**
     * Executes the given command after the given delay.
     *
     * @param command the task to execute in the future
     * @param delay the time from now to delay the execution
     * @param unit the time unit of the delay parameter
     * @return a ScheduledFuture representing the completion of the scheduled task
     */
    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    /**
     * Executes the given callable after the given delay. The result of the callable is returned
     * as a {@link ScheduledFuture}.
     *
     * @param callable the callable to execute
     * @param delay the time from now to delay the execution
     * @param unit the time unit of the delay parameter
     * @param <V> result type of the callable
     * @return a ScheduledFuture which holds the future value of the given callable
     */
    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    /**
     * Executes the given command periodically. The first execution is started after the
     * {@code initialDelay}, the second execution is started after {@code initialDelay + period},
     * the third after {@code initialDelay + 2*period} and so on.
     * The task is executed until either an execution fails, or the returned {@link ScheduledFuture}
     * is cancelled.
     *
     * @param command the task to be executed periodically
     * @param initialDelay the time from now until the first execution is triggered
     * @param period the time after which the next execution is triggered
     * @param unit the time unit of the delay and period parameter
     * @return a ScheduledFuture representing the periodic task. This future never completes
     * unless an execution of the given task fails or if the future is cancelled
     */
    ScheduledFuture<?> scheduleAtFixedRate(
        Runnable command,
        long initialDelay,
        long period,
        TimeUnit unit);

    /**
     * Executed the given command repeatedly with the given delay between the end of an execution
     * and the start of the next execution.
     * The task is executed repeatedly until either an exception occurs or if the returned
     * {@link ScheduledFuture} is cancelled.
     *
     * @param command the task to execute repeatedly
     * @param initialDelay the time from now until the first execution is triggered
     * @param delay the time between the end of the current and the start of the next execution
     * @param unit the time unit of the initial delay and the delay parameter
     * @return a ScheduledFuture representing the repeatedly executed task. This future never
     * completes unless the execution of the given task fails or if the future is cancelled
     */
    ScheduledFuture<?> scheduleWithFixedDelay(
        Runnable command,
        long initialDelay,
        long delay,
        TimeUnit unit);
}
  • The ScheduledExecutor interface extends Executor and defines the schedule, scheduleAtFixedRate, and scheduleWithFixedDelay methods. schedule accepts either a Runnable or a Callable, and all of these methods return a ScheduledFuture. The interface has two implementations: ScheduledExecutorServiceAdapter and ActorSystemScheduledExecutorAdapter.
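
The sketch below shows roughly how a one-shot Callable and a periodic Runnable behave from the caller’s side. It uses the JDK’s ScheduledExecutorService as a stand-in, because it declares methods with the same signatures; that substitution is an assumption made so the example runs without Flink on the classpath. The class and method names are illustrative only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduleMethodsSketch {

    /** One-shot Callable: the returned ScheduledFuture carries the result. */
    static int delayedAnswer(ScheduledExecutorService scheduler) throws Exception {
        ScheduledFuture<Integer> future = scheduler.schedule(() -> 42, 10, TimeUnit.MILLISECONDS);
        return future.get(5, TimeUnit.SECONDS);
    }

    /** Periodic Runnable: the future never completes on its own, so it is cancelled explicitly. */
    static boolean runThenCancel(ScheduledExecutorService scheduler, int runs) throws Exception {
        CountDownLatch latch = new CountDownLatch(runs);
        ScheduledFuture<?> future =
            scheduler.scheduleWithFixedDelay(latch::countDown, 0, 5, TimeUnit.MILLISECONDS);
        boolean reached = latch.await(5, TimeUnit.SECONDS);
        future.cancel(false);
        return reached && future.isCancelled();
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            System.out.println(delayedAnswer(scheduler));    // the Callable's result
            System.out.println(runThenCancel(scheduler, 3)); // whether three runs happened before cancel
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```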

ScheduledExecutorServiceAdapter

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java

public class ScheduledExecutorServiceAdapter implements ScheduledExecutor {

    private final ScheduledExecutorService scheduledExecutorService;

    public ScheduledExecutorServiceAdapter(ScheduledExecutorService scheduledExecutorService) {
        this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return scheduledExecutorService.schedule(command, delay, unit);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return scheduledExecutorService.schedule(callable, delay, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        return scheduledExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return scheduledExecutorService.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }

    @Override
    public void execute(Runnable command) {
        scheduledExecutorService.execute(command);
    }
}
  • ScheduledExecutorServiceAdapter implements the ScheduledExecutor interface by delegating to the JDK’s ScheduledExecutorService: each method forwards to the corresponding schedule, scheduleAtFixedRate, scheduleWithFixedDelay, or execute method.
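
The delegation pattern can be sketched in a self-contained form. DelayedRunner is a hypothetical, cut-down stand-in for Flink’s ScheduledExecutor, and ServiceBackedRunner plays the role of the adapter; neither name exists in Flink. The point is only that callers depend on the narrow interface while the JDK service does the work.

```java
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class AdapterSketch {

    /** Hypothetical narrow interface standing in for Flink's ScheduledExecutor. */
    interface DelayedRunner extends Executor {
        ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
    }

    /** Forwards every call unchanged to the wrapped JDK service. */
    static final class ServiceBackedRunner implements DelayedRunner {
        private final ScheduledExecutorService service;

        ServiceBackedRunner(ScheduledExecutorService service) {
            this.service = Objects.requireNonNull(service);
        }

        @Override
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            return service.schedule(command, delay, unit);
        }

        @Override
        public void execute(Runnable command) {
            service.execute(command);
        }
    }

    /** Runs one immediate and one delayed command through the adapter and returns the log. */
    static String runThroughAdapter() throws Exception {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        try {
            DelayedRunner runner = new ServiceBackedRunner(service);
            StringBuffer log = new StringBuffer();
            runner.execute(() -> log.append("execute;"));
            // Waiting on the delayed task also guarantees the earlier one has finished,
            // because the service has a single worker thread.
            runner.schedule(() -> log.append("schedule;"), 10, TimeUnit.MILLISECONDS)
                .get(5, TimeUnit.SECONDS);
            return log.toString();
        } finally {
            service.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runThroughAdapter()); // prints "execute;schedule;"
    }
}
```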

ActorSystemScheduledExecutorAdapter

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java

public final class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor {

    private final ActorSystem actorSystem;

    public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem) {
        this.actorSystem = Preconditions.checkNotNull(actorSystem, "rpcService");
    }

    @Override
    @Nonnull
    public ScheduledFuture<?> schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(command, unit.toNanos(delay), 0L);

        Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);

        scheduledFutureTask.setCancellable(cancellable);

        return scheduledFutureTask;
    }

    @Override
    @Nonnull
    public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long delay, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<V> scheduledFutureTask = new ScheduledFutureTask<>(callable, unit.toNanos(delay), 0L);

        Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);

        scheduledFutureTask.setCancellable(cancellable);

        return scheduledFutureTask;
    }

    @Override
    @Nonnull
    public ScheduledFuture<?> scheduleAtFixedRate(@Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
            command,
            triggerTime(unit.toNanos(initialDelay)),
            unit.toNanos(period));

        Cancellable cancellable = actorSystem.scheduler().schedule(
            new FiniteDuration(initialDelay, unit),
            new FiniteDuration(period, unit),
            scheduledFutureTask,
            actorSystem.dispatcher());

        scheduledFutureTask.setCancellable(cancellable);

        return scheduledFutureTask;
    }

    @Override
    @Nonnull
    public ScheduledFuture<?> scheduleWithFixedDelay(@Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
            command,
            triggerTime(unit.toNanos(initialDelay)),
            unit.toNanos(-delay));

        Cancellable cancellable = internalSchedule(scheduledFutureTask, initialDelay, unit);

        scheduledFutureTask.setCancellable(cancellable);

        return scheduledFutureTask;
    }

    @Override
    public void execute(@Nonnull Runnable command) {
        actorSystem.dispatcher().execute(command);
    }

    private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) {
        return actorSystem.scheduler().scheduleOnce(
            new FiniteDuration(delay, unit),
            runnable,
            actorSystem.dispatcher());
    }

    private long now() {
        return System.nanoTime();
    }

    private long triggerTime(long delay) {
        return now() + delay;
    }

    private final class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {

        private long time;

        private final long period;

        private volatile Cancellable cancellable;

        ScheduledFutureTask(Callable<V> callable, long time, long period) {
            super(callable);
            this.time = time;
            this.period = period;
        }

        ScheduledFutureTask(Runnable runnable, long time, long period) {
            super(runnable, null);
            this.time = time;
            this.period = period;
        }

        public void setCancellable(Cancellable newCancellable) {
            this.cancellable = newCancellable;
        }

        @Override
        public void run() {
            if (!isPeriodic()) {
                super.run();
            } else if (runAndReset()){
                if (period > 0L) {
                    time += period;
                } else {
                    cancellable = internalSchedule(this, -period, TimeUnit.NANOSECONDS);

                    // check whether we have been cancelled concurrently
                    if (isCancelled()) {
                        cancellable.cancel();
                    } else {
                        time = triggerTime(-period);
                    }
                }
            }
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean result = super.cancel(mayInterruptIfRunning);

            return result && cancellable.cancel();
        }

        @Override
        public long getDelay(@Nonnull  TimeUnit unit) {
            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(@Nonnull Delayed o) {
            if (o == this) {
                return 0;
            }

            long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
            return (diff < 0L) ? -1 : (diff > 0L) ? 1 : 0;
        }

        @Override
        public boolean isPeriodic() {
            return period != 0L;
        }
    }
}
  • ActorSystemScheduledExecutorAdapter implements the ScheduledExecutor interface on top of an Akka ActorSystem. Its execute method calls actorSystem.dispatcher().execute.
  • The schedule and scheduleWithFixedDelay methods both call internalSchedule, which uses actorSystem.scheduler().scheduleOnce, but they construct their ScheduledFutureTask differently: schedule passes a period of 0, while scheduleWithFixedDelay passes unit.toNanos(-delay). The run method of ScheduledFutureTask checks the period. A period of 0 marks a one-shot task, which simply runs once. A negative period means that, after each successful run, internalSchedule is called again with -period as the delay, which produces the fixed-delay behavior.
  • The scheduleAtFixedRate method uses actorSystem.scheduler().schedule, so the Akka scheduler re-triggers the task itself. The period of its ScheduledFutureTask is unit.toNanos(period), a positive value, in contrast to the negated unit.toNanos(-delay) used by scheduleWithFixedDelay. For a positive period, run only advances time by period after a successful execution.
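
The fixed-delay trick described above, re-arming a one-shot timer at the end of each run and encoding the delay as a negative period, can be sketched without Akka. In this sketch the JDK’s ScheduledExecutorService.schedule stands in for Akka’s scheduleOnce (an assumption made so the code is self-contained), and RearmingTask and countRuns are hypothetical names; this is a simplified illustration, not Flink’s ScheduledFutureTask.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedDelaySketch {

    /** Hypothetical task that re-arms a one-shot timer after each successful run. */
    static final class RearmingTask implements Runnable {
        private final ScheduledExecutorService timer; // stand-in for Akka's scheduleOnce
        private final Runnable command;
        private final long period;                    // negative: fixed delay in nanoseconds
        private volatile boolean cancelled;
        private volatile ScheduledFuture<?> pending;

        RearmingTask(ScheduledExecutorService timer, Runnable command, long delayNanos) {
            this.timer = timer;
            this.command = command;
            this.period = -delayNanos; // same sign convention as unit.toNanos(-delay)
        }

        void start(long initialDelayNanos) {
            pending = timer.schedule(this, initialDelayNanos, TimeUnit.NANOSECONDS);
        }

        @Override
        public void run() {
            if (cancelled) {
                return;
            }
            try {
                command.run();
            } catch (RuntimeException e) {
                return; // a failed run ends the cycle
            }
            // The next run is armed only after this one finished: fixed delay, not fixed rate.
            ScheduledFuture<?> next = timer.schedule(this, -period, TimeUnit.NANOSECONDS);
            pending = next;
            if (cancelled) {
                next.cancel(false); // cancelled concurrently while re-arming
            }
        }

        void cancel() {
            cancelled = true;
            ScheduledFuture<?> p = pending;
            if (p != null) {
                p.cancel(false);
            }
        }
    }

    /** Lets the task fire at least three times, cancels it, and returns the run count. */
    static int countRuns() throws Exception {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicInteger runs = new AtomicInteger();
            CountDownLatch latch = new CountDownLatch(3);
            RearmingTask task = new RearmingTask(
                timer,
                () -> { runs.incrementAndGet(); latch.countDown(); },
                TimeUnit.MILLISECONDS.toNanos(5));
            task.start(0);
            latch.await(5, TimeUnit.SECONDS);
            task.cancel();
            return runs.get();
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(countRuns() >= 3);
    }
}
```

Because the next run is scheduled only after the current one returns, a slow command pushes later runs back by its own duration, which is the difference from a fixed-rate schedule.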

Summary

  • The ScheduledExecutor interface extends Executor and defines the schedule, scheduleAtFixedRate, and scheduleWithFixedDelay methods. schedule accepts either a Runnable or a Callable, and all of these methods return a ScheduledFuture. The interface has two implementations: ScheduledExecutorServiceAdapter and ActorSystemScheduledExecutorAdapter.
  • ScheduledExecutorServiceAdapter implements the interface by delegating to the schedule, scheduleAtFixedRate, scheduleWithFixedDelay, and execute methods of the JDK’s ScheduledExecutorService.
  • ActorSystemScheduledExecutorAdapter implements the interface on top of an Akka ActorSystem. execute calls actorSystem.dispatcher().execute. schedule and scheduleWithFixedDelay both go through internalSchedule, which uses actorSystem.scheduler().scheduleOnce; they differ in the period of their ScheduledFutureTask, which is 0 for schedule and unit.toNanos(-delay) for scheduleWithFixedDelay. When the period is negative, the run method calls internalSchedule again after each successful run, which yields fixed-delay scheduling. scheduleAtFixedRate instead uses actorSystem.scheduler().schedule, and the period of its ScheduledFutureTask is the positive unit.toNanos(period).
