A look at Flink's slot.request.timeout configuration


Preface

This article examines Flink's slot.request.timeout configuration and how the SlotManager uses it.

JobManagerOptions

flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java

@PublicEvolving
public class JobManagerOptions {
    //......

    /**
     * The timeout in milliseconds for requesting a slot from Slot Pool.
     */
    public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT =
        key("slot.request.timeout")
        .defaultValue(5L * 60L * 1000L)
        .withDescription("The timeout in milliseconds for requesting a slot from Slot Pool.");

    //......
}
  • slot.request.timeout is specified in milliseconds and defaults to 5 minutes (5 * 60 * 1000 ms).

SlotManagerConfiguration

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java

public class SlotManagerConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(SlotManagerConfiguration.class);

    private final Time taskManagerRequestTimeout;
    private final Time slotRequestTimeout;
    private final Time taskManagerTimeout;

    public SlotManagerConfiguration(
            Time taskManagerRequestTimeout,
            Time slotRequestTimeout,
            Time taskManagerTimeout) {
        this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
        this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
        this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
    }

    public Time getTaskManagerRequestTimeout() {
        return taskManagerRequestTimeout;
    }

    public Time getSlotRequestTimeout() {
        return slotRequestTimeout;
    }

    public Time getTaskManagerTimeout() {
        return taskManagerTimeout;
    }

    public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
        final String strTimeout = configuration.getString(AkkaOptions.ASK_TIMEOUT);
        final Time rpcTimeout;

        try {
            rpcTimeout = Time.milliseconds(Duration.apply(strTimeout).toMillis());
        } catch (NumberFormatException e) {
            throw new ConfigurationException("Could not parse the resource manager's timeout " +
                "value " + AkkaOptions.ASK_TIMEOUT + '.', e);
        }

        final Time slotRequestTimeout = getSlotRequestTimeout(configuration);
        final Time taskManagerTimeout = Time.milliseconds(
                configuration.getLong(ResourceManagerOptions.TASK_MANAGER_TIMEOUT));

        return new SlotManagerConfiguration(rpcTimeout, slotRequestTimeout, taskManagerTimeout);
    }

    private static Time getSlotRequestTimeout(final Configuration configuration) {
        final long slotRequestTimeoutMs;
        if (configuration.contains(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT)) {
            LOGGER.warn("Config key {} is deprecated; use {} instead.",
                ResourceManagerOptions.SLOT_REQUEST_TIMEOUT,
                JobManagerOptions.SLOT_REQUEST_TIMEOUT);
            slotRequestTimeoutMs = configuration.getLong(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT);
        } else {
            slotRequestTimeoutMs = configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT);
        }
        return Time.milliseconds(slotRequestTimeoutMs);
    }
}
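  • The getSlotRequestTimeout method of SlotManagerConfiguration reads JobManagerOptions.SLOT_REQUEST_TIMEOUT from the configuration. If the deprecated ResourceManagerOptions.SLOT_REQUEST_TIMEOUT key is present, its value is used instead and a deprecation warning is logged.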
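
The fallback in getSlotRequestTimeout can be sketched without any Flink dependency. This is only an illustration under stated assumptions: the configuration is modelled as a plain Map, the class and method names are made up for this example, and DEPRECATED_KEY is a placeholder string rather than the real key behind ResourceManagerOptions.SLOT_REQUEST_TIMEOUT. Only the new key and its default come from the listing above.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for the key lookup in getSlotRequestTimeout (hypothetical class). */
final class SlotRequestTimeoutLookup {

    // Key and default as declared in JobManagerOptions.SLOT_REQUEST_TIMEOUT.
    static final String NEW_KEY = "slot.request.timeout";
    static final long DEFAULT_TIMEOUT_MS = 5L * 60L * 1000L;

    // Assumption: placeholder for the deprecated ResourceManagerOptions key, not the real string.
    static final String DEPRECATED_KEY = "deprecated.slot.request.timeout";

    /** Returns the effective timeout in milliseconds. */
    static long resolveTimeoutMs(Map<String, Long> config) {
        if (config.containsKey(DEPRECATED_KEY)) {
            // The deprecated key takes precedence when it is explicitly set.
            System.err.println("Config key " + DEPRECATED_KEY
                + " is deprecated; use " + NEW_KEY + " instead.");
            return config.get(DEPRECATED_KEY);
        }
        // Otherwise use the new key, falling back to the 5 minute default.
        return config.getOrDefault(NEW_KEY, DEFAULT_TIMEOUT_MS);
    }

    public static void main(String[] args) {
        Map<String, Long> config = new HashMap<>();
        System.out.println(resolveTimeoutMs(config)); // prints 300000

        config.put(NEW_KEY, 60_000L);
        System.out.println(resolveTimeoutMs(config)); // prints 60000

        config.put(DEPRECATED_KEY, 10_000L);
        System.out.println(resolveTimeoutMs(config)); // prints 10000
    }
}
```

Note that in this sketch, as in the listing, a value under the deprecated key overrides one under the new key when both are set.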

SlotManager

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java

public class SlotManager implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);

    /** Scheduled executor for timeouts. */
    private final ScheduledExecutor scheduledExecutor;

    /** Timeout for slot requests to the task manager. */
    private final Time taskManagerRequestTimeout;

    /** Timeout after which an allocation is discarded. */
    private final Time slotRequestTimeout;

    /** Timeout after which an unused TaskManager is released. */
    private final Time taskManagerTimeout;

    /** Map for all registered slots. */
    private final HashMap<SlotID, TaskManagerSlot> slots;

    /** Index of all currently free slots. */
    private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;

    /** All currently registered task managers. */
    private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;

    /** Map of fulfilled and active allocations for request deduplication purposes. */
    private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;

    /** Map of pending/unfulfilled slot allocation requests. */
    private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;

    private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots;

    /** ResourceManager's id. */
    private ResourceManagerId resourceManagerId;

    /** Executor for future callbacks which have to be "synchronized". */
    private Executor mainThreadExecutor;

    /** Callbacks for resource (de-)allocations. */
    private ResourceActions resourceActions;

    private ScheduledFuture<?> taskManagerTimeoutCheck;

    private ScheduledFuture<?> slotRequestTimeoutCheck;

    /** True iff the component has been started. */
    private boolean started;

    public SlotManager(
            ScheduledExecutor scheduledExecutor,
            Time taskManagerRequestTimeout,
            Time slotRequestTimeout,
            Time taskManagerTimeout) {
        this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
        this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
        this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
        this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);

        slots = new HashMap<>(16);
        freeSlots = new LinkedHashMap<>(16);
        taskManagerRegistrations = new HashMap<>(4);
        fulfilledSlotRequests = new HashMap<>(16);
        pendingSlotRequests = new HashMap<>(16);
        pendingSlots = new HashMap<>(16);

        resourceManagerId = null;
        resourceActions = null;
        mainThreadExecutor = null;
        taskManagerTimeoutCheck = null;
        slotRequestTimeoutCheck = null;

        started = false;
    }

    public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
        LOG.info("Starting the SlotManager.");

        this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
        mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
        resourceActions = Preconditions.checkNotNull(newResourceActions);

        started = true;

        taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
            () -> mainThreadExecutor.execute(
                () -> checkTaskManagerTimeouts()),
            0L,
            taskManagerTimeout.toMilliseconds(),
            TimeUnit.MILLISECONDS);

        slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
            () -> mainThreadExecutor.execute(
                () -> checkSlotRequestTimeouts()),
            0L,
            slotRequestTimeout.toMilliseconds(),
            TimeUnit.MILLISECONDS);
    }

    /**
     * Suspends the component. This clears the internal state of the slot manager.
     */
    public void suspend() {
        LOG.info("Suspending the SlotManager.");

        // stop the timeout checks for the TaskManagers and the SlotRequests
        if (taskManagerTimeoutCheck != null) {
            taskManagerTimeoutCheck.cancel(false);
            taskManagerTimeoutCheck = null;
        }

        if (slotRequestTimeoutCheck != null) {
            slotRequestTimeoutCheck.cancel(false);
            slotRequestTimeoutCheck = null;
        }

        for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
            cancelPendingSlotRequest(pendingSlotRequest);
        }

        pendingSlotRequests.clear();

        ArrayList<InstanceID> registeredTaskManagers = new ArrayList<>(taskManagerRegistrations.keySet());

        for (InstanceID registeredTaskManager : registeredTaskManagers) {
            unregisterTaskManager(registeredTaskManager);
        }

        resourceManagerId = null;
        resourceActions = null;
        started = false;
    }

    public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException {
        checkInit();

        if (checkDuplicateRequest(slotRequest.getAllocationId())) {
            LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());

            return false;
        } else {
            PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);

            pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);

            try {
                internalRequestSlot(pendingSlotRequest);
            } catch (ResourceManagerException e) {
                // requesting the slot failed --> remove pending slot request
                pendingSlotRequests.remove(slotRequest.getAllocationId());

                throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
            }

            return true;
        }
    }

    private void checkSlotRequestTimeouts() {
        if (!pendingSlotRequests.isEmpty()) {
            long currentTime = System.currentTimeMillis();

            Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator();

            while (slotRequestIterator.hasNext()) {
                PendingSlotRequest slotRequest = slotRequestIterator.next().getValue();

                if (currentTime - slotRequest.getCreationTimestamp() >= slotRequestTimeout.toMilliseconds()) {
                    slotRequestIterator.remove();

                    if (slotRequest.isAssigned()) {
                        cancelPendingSlotRequest(slotRequest);
                    }

                    resourceActions.notifyAllocationFailure(
                        slotRequest.getJobId(),
                        slotRequest.getAllocationId(),
                        new TimeoutException("The allocation could not be fulfilled in time."));
                }
            }
        }
    }

    //......

}
  • The SlotManager constructor receives the slotRequestTimeout parameter, and the class maintains a pendingSlotRequests map. The start method schedules slotRequestTimeoutCheck, which runs checkSlotRequestTimeouts immediately and then once every slotRequestTimeout interval. The suspend method cancels the scheduled checks, cancels every pending slot request, and then clears the pendingSlotRequests map.
  • The registerSlotRequest method first calls checkDuplicateRequest to detect duplicates. If the request is not a duplicate, it is added to pendingSlotRequests and internalRequestSlot is called to allocate a slot. If that call throws a ResourceManagerException, the request is removed from pendingSlotRequests again and a SlotManagerException is thrown.
  • The checkSlotRequestTimeouts method iterates over pendingSlotRequests and compares the current time with slotRequest.getCreationTimestamp(). If the difference is greater than or equal to slotRequestTimeout, the request is removed from pendingSlotRequests, cancelled if it has already been assigned, and reported through resourceActions.notifyAllocationFailure with a TimeoutException.
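
The registration and timeout sweep described above can be condensed into a small, dependency-free sketch. It is not the Flink implementation: the class name, the String allocation ids, and passing the clock value in as a parameter are all assumptions made to keep the example deterministic. Slot allocation, cancellation, and the failure callback are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Minimal model of pendingSlotRequests plus the periodic timeout sweep (hypothetical class). */
final class SlotRequestTimeoutSweep {

    /** Pending requests: allocation id -> creation timestamp in ms. */
    private final Map<String, Long> pending = new LinkedHashMap<>();
    private final long timeoutMs;

    SlotRequestTimeoutSweep(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Registers a request; returns false for a duplicate allocation id. */
    boolean register(String allocationId, long creationTimeMs) {
        return pending.putIfAbsent(allocationId, creationTimeMs) == null;
    }

    /** Removes and returns every request whose age at nowMs is >= the timeout. */
    List<String> sweep(long nowMs) {
        List<String> timedOut = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() >= timeoutMs) {
                String allocationId = entry.getKey(); // read before removing the entry
                it.remove();
                timedOut.add(allocationId);
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        SlotRequestTimeoutSweep manager = new SlotRequestTimeoutSweep(100L);
        System.out.println(manager.register("a", 1L)); // prints true
        System.out.println(manager.register("a", 5L)); // prints false (duplicate)

        // Sweeps run once per timeout interval, mirroring the scheduling in start().
        System.out.println(manager.sweep(100L)); // prints [] (age is 99 ms)
        System.out.println(manager.sweep(200L)); // prints [a] (age is 199 ms)
    }
}
```

Because the sweep interval equals the timeout itself, a request created just after one sweep is still too young at the next sweep and only fails at the one after that. Under this reading of start(), a pending request may therefore wait up to almost twice slotRequestTimeout before notifyAllocationFailure is triggered.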

Summary

  • The getSlotRequestTimeout method of SlotManagerConfiguration reads JobManagerOptions.SLOT_REQUEST_TIMEOUT from the configuration, preferring the deprecated ResourceManagerOptions.SLOT_REQUEST_TIMEOUT key when it is set. The timeout defaults to 5 minutes.
  • The SlotManager constructor receives the slotRequestTimeout parameter, and the class maintains a pendingSlotRequests map. The start method schedules slotRequestTimeoutCheck, which runs checkSlotRequestTimeouts once every slotRequestTimeout interval. The suspend method cancels the scheduled checks, cancels every pending slot request, and then clears the pendingSlotRequests map.
  • The registerSlotRequest method first calls checkDuplicateRequest. If the request is not a duplicate, it is added to pendingSlotRequests and internalRequestSlot is called to allocate a slot; on failure the request is removed from pendingSlotRequests and a SlotManagerException is thrown. The checkSlotRequestTimeouts method iterates over pendingSlotRequests and compares the current time with slotRequest.getCreationTimestamp(). Once the difference reaches slotRequestTimeout, the request is removed from pendingSlotRequests, cancelled if it has already been assigned, and reported through resourceActions.notifyAllocationFailure.
