A look at Flink’s slot.idle.timeout configuration

Preface

This article takes a look at Flink’s slot.idle.timeout configuration.

JobManagerOptions

flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java

@PublicEvolving
public class JobManagerOptions {
    //......

    /**
     * The timeout in milliseconds for a idle slot in Slot Pool.
     */
    public static final ConfigOption<Long> SLOT_IDLE_TIMEOUT =
        key("slot.idle.timeout")
            // default matches heartbeat.timeout so that sticky allocation is not lost on timeouts for local recovery
            .defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue())
            .withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");

    //......
}
  • slot.idle.timeout defaults to HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue(), i.e. 50000L milliseconds (50 seconds).
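
One detail in the definition above is easy to miss: the default is taken from the default value of heartbeat.timeout, not from whatever heartbeat.timeout is configured to at runtime. The following is a minimal stand-alone sketch of that lookup behaviour. It uses java.util.Properties in place of Flink's Configuration class, and the class SlotIdleTimeoutLookup and its helper method are hypothetical names made up for illustration.

```java
import java.util.Properties;

final class SlotIdleTimeoutLookup {

    /** Default of heartbeat.timeout (50000L ms), which slot.idle.timeout reuses as its own default. */
    static final long HEARTBEAT_TIMEOUT_DEFAULT_MS = 50_000L;

    /**
     * Hypothetical helper: returns the configured slot.idle.timeout,
     * or the fixed shared default when the key is absent.
     */
    static long slotIdleTimeoutMillis(Properties conf) {
        String raw = conf.getProperty("slot.idle.timeout");
        return raw == null ? HEARTBEAT_TIMEOUT_DEFAULT_MS : Long.parseLong(raw.trim());
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        System.out.println(slotIdleTimeoutMillis(conf)); // 50000

        // Changing heartbeat.timeout does not move the slot idle timeout.
        conf.setProperty("heartbeat.timeout", "80000");
        System.out.println(slotIdleTimeoutMillis(conf)); // 50000

        // Only an explicit slot.idle.timeout entry overrides the default.
        conf.setProperty("slot.idle.timeout", "10000");
        System.out.println(slotIdleTimeoutMillis(conf)); // 10000
    }
}
```

In other words, if heartbeat.timeout is raised and the two values are meant to stay aligned, slot.idle.timeout has to be set explicitly as well.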

SlotPool

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java

public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedSlotActions {

    /** The interval (in milliseconds) in which the SlotPool writes its slot distribution on debug level. */
    private static final int STATUS_LOG_INTERVAL_MS = 60_000;

    private final JobID jobId;

    private final SchedulingStrategy schedulingStrategy;

    private final ProviderAndOwner providerAndOwner;

    /** All registered TaskManagers, slots will be accepted and used only if the resource is registered. */
    private final HashSet<ResourceID> registeredTaskManagers;

    /** The book-keeping of all allocated slots. */
    private final AllocatedSlots allocatedSlots;

    /** The book-keeping of all available slots. */
    private final AvailableSlots availableSlots;

    /** All pending requests waiting for slots. */
    private final DualKeyMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests;

    /** The requests that are waiting for the resource manager to be connected. */
    private final HashMap<SlotRequestId, PendingRequest> waitingForResourceManager;

    /** Timeout for external request calls (e.g. to the ResourceManager or the TaskExecutor). */
    private final Time rpcTimeout;

    /** Timeout for releasing idle slots. */
    private final Time idleSlotTimeout;

    private final Clock clock;

    /** Managers for the different slot sharing groups. */
    protected final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagers;

    /** the fencing token of the job manager. */
    private JobMasterId jobMasterId;

    /** The gateway to communicate with resource manager. */
    private ResourceManagerGateway resourceManagerGateway;

    private String jobManagerAddress;

    //......

    /**
     * Start the slot pool to accept RPC calls.
     *
     * @param jobMasterId The necessary leader id for running the job.
     * @param newJobManagerAddress for the slot requests which are sent to the resource manager
     */
    public void start(JobMasterId jobMasterId, String newJobManagerAddress) throws Exception {
        this.jobMasterId = checkNotNull(jobMasterId);
        this.jobManagerAddress = checkNotNull(newJobManagerAddress);

        // TODO - start should not throw an exception
        try {
            super.start();
        } catch (Exception e) {
            throw new RuntimeException("This should never happen", e);
        }

        scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);

        if (log.isDebugEnabled()) {
            scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Check the available slots, release the slot that is idle for a long time.
     */
    private void checkIdleSlot() {

        // The timestamp in SlotAndTimestamp is relative
        final long currentRelativeTimeMillis = clock.relativeTimeMillis();

        final List<AllocatedSlot> expiredSlots = new ArrayList<>(availableSlots.size());

        for (SlotAndTimestamp slotAndTimestamp : availableSlots.availableSlots.values()) {
            if (currentRelativeTimeMillis - slotAndTimestamp.timestamp > idleSlotTimeout.toMilliseconds()) {
                expiredSlots.add(slotAndTimestamp.slot);
            }
        }

        final FlinkException cause = new FlinkException("Releasing idle slot.");

        for (AllocatedSlot expiredSlot : expiredSlots) {
            final AllocationID allocationID = expiredSlot.getAllocationId();
            if (availableSlots.tryRemove(allocationID) != null) {

                log.info("Releasing idle slot [{}].", allocationID);
                final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
                    allocationID,
                    cause,
                    rpcTimeout);

                freeSlotFuture.whenCompleteAsync(
                    (Acknowledge ignored, Throwable throwable) -> {
                        if (throwable != null) {
                            if (registeredTaskManagers.contains(expiredSlot.getTaskManagerId())) {
                                log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. " +
                                    "Trying to fulfill a different slot request.", allocationID, expiredSlot.getTaskManagerId(),
                                    throwable);
                                tryFulfillSlotRequestOrMakeAvailable(expiredSlot);
                            } else {
                                log.debug("Releasing slot [{}] failed and owning TaskExecutor {} is no " +
                                    "longer registered. Discarding slot.", allocationID, expiredSlot.getTaskManagerId());
                            }
                        }
                    },
                    getMainThreadExecutor());
            }
        }

        scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
    }

    //......
}
  • SlotPool's start method calls scheduleRunAsync to run checkIdleSlot after a delay of idleSlotTimeout. checkIdleSlot walks the SlotAndTimestamp entries in availableSlots and collects into expiredSlots every slot for which the current relative time minus slotAndTimestamp.timestamp exceeds idleSlotTimeout. For each expired slot it calls availableSlots.tryRemove and, if the removal succeeds, TaskManagerGateway.freeSlot to release the slot. If freeSlot fails and the owning TaskManager is still registered, the slot is handed to tryFulfillSlotRequestOrMakeAvailable; otherwise it is discarded. Finally the method calls scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout) again to schedule the next check.
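
The expiry test in checkIdleSlot can be tried out in isolation. Below is a simplified sketch, not Flink's implementation: slots are represented by plain string ids, the clock is passed in as a parameter, and the class IdleSlotCheckSketch with its methods is a hypothetical stand-in for SlotPool and AvailableSlots. The freeSlot RPC and its failure handling are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class IdleSlotCheckSketch {

    /** Maps a (hypothetical) slot id to the relative time in ms at which it became available. */
    private final Map<String, Long> availableSlots = new LinkedHashMap<>();

    private final long idleSlotTimeoutMillis;

    IdleSlotCheckSketch(long idleSlotTimeoutMillis) {
        this.idleSlotTimeoutMillis = idleSlotTimeoutMillis;
    }

    void makeAvailable(String slotId, long nowMillis) {
        availableSlots.put(slotId, nowMillis);
    }

    int availableCount() {
        return availableSlots.size();
    }

    /** Removes and returns every slot that has been idle for strictly longer than the timeout. */
    List<String> releaseIdleSlots(long nowMillis) {
        // First pass: collect expired slots without modifying the map being iterated.
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : availableSlots.entrySet()) {
            if (nowMillis - entry.getValue() > idleSlotTimeoutMillis) {
                expired.add(entry.getKey());
            }
        }
        // Second pass: release only the slots that are still present (mirrors tryRemove != null).
        List<String> released = new ArrayList<>();
        for (String slotId : expired) {
            if (availableSlots.remove(slotId) != null) {
                released.add(slotId);
            }
        }
        return released;
    }

    public static void main(String[] args) {
        IdleSlotCheckSketch pool = new IdleSlotCheckSketch(50_000L);
        pool.makeAvailable("slot-a", 0L);
        pool.makeAvailable("slot-b", 30_000L);

        System.out.println(pool.releaseIdleSlots(50_000L)); // [] (exactly 50000 ms idle is not "greater than")
        System.out.println(pool.releaseIdleSlots(60_000L)); // [slot-a]
    }
}
```

Because the comparison is strict and the check itself only runs once every idleSlotTimeout, a slot that becomes idle just after one check is not yet expired at the next one. Under these assumptions a slot can therefore sit idle for almost twice the configured timeout before it is released.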

RpcEndpoint

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java

public abstract class RpcEndpoint implements RpcGateway {
    //......

    /**
     * Execute the runnable in the main thread of the underlying RPC endpoint, with
     * a delay of the given number of milliseconds.
     *
     * @param runnable Runnable to be executed
     * @param delay    The delay after which the runnable will be executed
     */
    protected void scheduleRunAsync(Runnable runnable, Time delay) {
        scheduleRunAsync(runnable, delay.getSize(), delay.getUnit());
    }

    /**
     * Execute the runnable in the main thread of the underlying RPC endpoint, with
     * a delay of the given number of milliseconds.
     *
     * @param runnable Runnable to be executed
     * @param delay    The delay after which the runnable will be executed
     */
    protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
        rpcServer.scheduleRunAsync(runnable, unit.toMillis(delay));
    }

    //......
}
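  • RpcEndpoint provides two scheduleRunAsync overloads. The Time-based one delegates to the (delay, unit) one, which converts the delay to milliseconds and finally calls rpcServer.scheduleRunAsync.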
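
The combination of a one-shot delayed call and a task that re-arms itself at the end of each run is what turns scheduleRunAsync into a periodic check. The sketch below reproduces that pattern with a plain ScheduledExecutorService standing in for the RPC endpoint's main thread. It is an illustration under that assumption, not how rpcServer is implemented, and SelfReschedulingSketch and runChecks are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SelfReschedulingSketch {

    /** Single thread standing in for the endpoint's main thread. */
    private final ScheduledExecutorService mainThread = Executors.newSingleThreadScheduledExecutor();

    /** Same shape as the overload above: the (delay, unit) pair is normalized to milliseconds. */
    void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
        mainThread.schedule(runnable, unit.toMillis(delay), TimeUnit.MILLISECONDS);
    }

    /**
     * Runs a check `rounds` times, `periodMillis` apart. The check re-arms itself
     * at the end of each run. Returns the number of executions, or -1 on timeout or interruption.
     */
    static int runChecks(int rounds, long periodMillis) {
        SelfReschedulingSketch endpoint = new SelfReschedulingSketch();
        AtomicInteger executed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(rounds);

        // One-element array so the lambda can refer to itself.
        Runnable[] check = new Runnable[1];
        check[0] = () -> {
            executed.incrementAndGet();
            done.countDown();
            if (done.getCount() > 0) {
                // Schedule the next round, like the last line of checkIdleSlot.
                endpoint.scheduleRunAsync(check[0], periodMillis, TimeUnit.MILLISECONDS);
            }
        };

        // Initial arming, like the call in SlotPool.start.
        endpoint.scheduleRunAsync(check[0], periodMillis, TimeUnit.MILLISECONDS);
        try {
            return done.await(5, TimeUnit.SECONDS) ? executed.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            endpoint.mainThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runChecks(3, 10L)); // 3
    }
}
```

Because each run schedules the next one only after it has finished, two runs never overlap, and the interval is measured from the end of one check to the start of the next.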

Summary

  • slot.idle.timeout defaults to HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue(), i.e. 50000L milliseconds (50 seconds).
  • SlotPool's start method uses scheduleRunAsync to run checkIdleSlot after a delay of idleSlotTimeout. checkIdleSlot collects every available slot whose idle time (current relative time minus slotAndTimestamp.timestamp) exceeds idleSlotTimeout, removes each one with availableSlots.tryRemove, releases it with TaskManagerGateway.freeSlot, and then calls scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout) again to schedule the next check.
  • RpcEndpoint provides scheduleRunAsync, which converts the delay to milliseconds and ultimately calls rpcServer.scheduleRunAsync.
