A look at Flink’s jobstore configuration

Preface

This article examines Flink’s jobstore configuration, based on the Flink 1.7.2 source code.

JobManagerOptions

flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java

@PublicEvolving
public class JobManagerOptions {
    //......

    /**
     * The job store cache size in bytes which is used to keep completed
     * jobs in memory.
     */
    public static final ConfigOption<Long> JOB_STORE_CACHE_SIZE =
        key("jobstore.cache-size")
        .defaultValue(50L * 1024L * 1024L)
        .withDescription("The job store cache size in bytes which is used to keep completed jobs in memory.");

    /**
     * The time in seconds after which a completed job expires and is purged from the job store.
     */
    public static final ConfigOption<Long> JOB_STORE_EXPIRATION_TIME =
        key("jobstore.expiration-time")
        .defaultValue(60L * 60L)
        .withDescription("The time in seconds after which a completed job expires and is purged from the job store.");

    //......
}
  • jobstore.cache-size defaults to 50 MB (50 * 1024 * 1024 bytes); jobstore.expiration-time defaults to 1 hour (60 * 60 seconds).
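
The two default expressions can be checked with a small stand-alone sketch. It is plain Java and does not depend on Flink; the class name JobStoreDefaults and its constants are hypothetical and only mirror the expressions quoted above.

```java
// Stand-alone sketch: the class and constant names are hypothetical,
// only the two arithmetic expressions come from JobManagerOptions.
final class JobStoreDefaults {

    // Same expression as the default of jobstore.cache-size (bytes).
    static final long CACHE_SIZE_BYTES = 50L * 1024L * 1024L;

    // Same expression as the default of jobstore.expiration-time (seconds).
    static final long EXPIRATION_SECONDS = 60L * 60L;

    public static void main(String[] args) {
        // Print the values as key/value pairs, as they would appear in a configuration file.
        System.out.println("jobstore.cache-size: " + CACHE_SIZE_BYTES);
        System.out.println("jobstore.expiration-time: " + EXPIRATION_SECONDS);
    }
}
```

Both options take raw numbers, so overriding them in the Flink configuration means writing the size in bytes and the expiration in seconds.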

SessionClusterEntrypoint

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java

public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {

    public SessionClusterEntrypoint(Configuration configuration) {
        super(configuration);
    }

    @Override
    protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
            Configuration configuration,
            ScheduledExecutor scheduledExecutor) throws IOException {
        final File tmpDir = new File(ConfigurationUtils.parseTempDirectories(configuration)[0]);

        final Time expirationTime =  Time.seconds(configuration.getLong(JobManagerOptions.JOB_STORE_EXPIRATION_TIME));
        final long maximumCacheSizeBytes = configuration.getLong(JobManagerOptions.JOB_STORE_CACHE_SIZE);

        return new FileArchivedExecutionGraphStore(
            tmpDir,
            expirationTime,
            maximumCacheSizeBytes,
            scheduledExecutor,
            Ticker.systemTicker());
    }
}
  • SessionClusterEntrypoint’s createSerializableExecutionGraphStore method reads the JobManagerOptions.JOB_STORE_EXPIRATION_TIME and JobManagerOptions.JOB_STORE_CACHE_SIZE options, converts the expiration time from seconds to a Time value, and then creates a FileArchivedExecutionGraphStore rooted in the first configured temp directory.
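
The lookup-with-default and the seconds-to-duration conversion described above can be illustrated without Flink. The following is a minimal sketch under stated assumptions: a plain Map<String, String> stands in for Flink’s Configuration, java.time.Duration stands in for Flink’s Time, and the class JobStoreSettings is hypothetical.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; it only mimics the "read option, fall back to default" step.
final class JobStoreSettings {

    final Duration expirationTime;
    final long maximumCacheSizeBytes;

    private JobStoreSettings(Duration expirationTime, long maximumCacheSizeBytes) {
        this.expirationTime = expirationTime;
        this.maximumCacheSizeBytes = maximumCacheSizeBytes;
    }

    // Resolve both jobstore options from a key/value map, using the 1.7.2 defaults when a key is absent.
    static JobStoreSettings fromMap(Map<String, String> conf) {
        long seconds = Long.parseLong(conf.getOrDefault("jobstore.expiration-time", "3600"));
        long bytes = Long.parseLong(conf.getOrDefault("jobstore.cache-size", "52428800"));
        // The expiration option is expressed in seconds, so it is converted explicitly.
        return new JobStoreSettings(Duration.ofSeconds(seconds), bytes);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("jobstore.expiration-time", "120"); // override: keep completed jobs for two minutes
        JobStoreSettings settings = JobStoreSettings.fromMap(conf);
        System.out.println(settings.expirationTime.toMillis() + " ms, "
            + settings.maximumCacheSizeBytes + " bytes");
    }
}
```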

FileArchivedExecutionGraphStore

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java

public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore {

    private static final Logger LOG = LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class);

    private final File storageDir;

    private final Cache<JobID, JobDetails> jobDetailsCache;

    private final LoadingCache<JobID, ArchivedExecutionGraph> archivedExecutionGraphCache;

    private final ScheduledFuture<?> cleanupFuture;

    private final Thread shutdownHook;

    private int numFinishedJobs;

    private int numFailedJobs;

    private int numCanceledJobs;

    public FileArchivedExecutionGraphStore(
            File rootDir,
            Time expirationTime,
            long maximumCacheSizeBytes,
            ScheduledExecutor scheduledExecutor,
            Ticker ticker) throws IOException {

        final File storageDirectory = initExecutionGraphStorageDirectory(rootDir);

        LOG.info(
            "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.",
            FileArchivedExecutionGraphStore.class.getSimpleName(),
            storageDirectory,
            expirationTime.toMilliseconds(),
            maximumCacheSizeBytes);

        this.storageDir = Preconditions.checkNotNull(storageDirectory);
        Preconditions.checkArgument(
            storageDirectory.exists() && storageDirectory.isDirectory(),
            "The storage directory must exist and be a directory.");
        this.jobDetailsCache = CacheBuilder.newBuilder()
            .expireAfterWrite(expirationTime.toMilliseconds(), TimeUnit.MILLISECONDS)
            .removalListener(
                (RemovalListener<JobID, JobDetails>) notification -> deleteExecutionGraphFile(notification.getKey()))
            .ticker(ticker)
            .build();

        this.archivedExecutionGraphCache = CacheBuilder.newBuilder()
            .maximumWeight(maximumCacheSizeBytes)
            .weigher(this::calculateSize)
            .build(new CacheLoader<JobID, ArchivedExecutionGraph>() {
                @Override
                public ArchivedExecutionGraph load(JobID jobId) throws Exception {
                    return loadExecutionGraph(jobId);
                }});

        this.cleanupFuture = scheduledExecutor.scheduleWithFixedDelay(
            jobDetailsCache::cleanUp,
            expirationTime.toMilliseconds(),
            expirationTime.toMilliseconds(),
            TimeUnit.MILLISECONDS);

        this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG);

        this.numFinishedJobs = 0;
        this.numFailedJobs = 0;
        this.numCanceledJobs = 0;
    }

    @Override
    public int size() {
        return Math.toIntExact(jobDetailsCache.size());
    }

    @Override
    @Nullable
    public ArchivedExecutionGraph get(JobID jobId) {
        try {
            return archivedExecutionGraphCache.get(jobId);
        } catch (ExecutionException e) {
            LOG.debug("Could not load archived execution graph for job id {}.", jobId, e);
            return null;
        }
    }

    @Override
    public void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
        final JobStatus jobStatus = archivedExecutionGraph.getState();
        final JobID jobId = archivedExecutionGraph.getJobID();
        final String jobName = archivedExecutionGraph.getJobName();

        Preconditions.checkArgument(
            jobStatus.isGloballyTerminalState(),
            "The job " + jobName + '(' + jobId +
                ") is not in a globally terminal state. Instead it is in state " + jobStatus + '.');

        switch (jobStatus) {
            case FINISHED:
                numFinishedJobs++;
                break;
            case CANCELED:
                numCanceledJobs++;
                break;
            case FAILED:
                numFailedJobs++;
                break;
            default:
                throw new IllegalStateException("The job " + jobName + '(' +
                    jobId + ") should have been in a globally terminal state. " +
                    "Instead it was in state " + jobStatus + '.');
        }

        // write the ArchivedExecutionGraph to disk
        storeArchivedExecutionGraph(archivedExecutionGraph);

        final JobDetails detailsForJob = WebMonitorUtils.createDetailsForJob(archivedExecutionGraph);

        jobDetailsCache.put(jobId, detailsForJob);
        archivedExecutionGraphCache.put(jobId, archivedExecutionGraph);
    }

    @Override
    public JobsOverview getStoredJobsOverview() {
        return new JobsOverview(0, numFinishedJobs, numCanceledJobs, numFailedJobs);
    }

    @Override
    public Collection<JobDetails> getAvailableJobDetails() {
        return jobDetailsCache.asMap().values();
    }

    @Nullable
    @Override
    public JobDetails getAvailableJobDetails(JobID jobId) {
        return jobDetailsCache.getIfPresent(jobId);
    }

    @Override
    public void close() throws IOException {
        cleanupFuture.cancel(false);

        jobDetailsCache.invalidateAll();

        // clean up the storage directory
        FileUtils.deleteFileOrDirectory(storageDir);

        // Remove shutdown hook to prevent resource leaks
        ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
    }

    // --------------------------------------------------------------
    // Internal methods
    // --------------------------------------------------------------

    private int calculateSize(JobID jobId, ArchivedExecutionGraph serializableExecutionGraph) {
        final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);

        if (archivedExecutionGraphFile.exists()) {
            return Math.toIntExact(archivedExecutionGraphFile.length());
        } else {
            LOG.debug("Could not find archived execution graph file for {}. Estimating the size instead.", jobId);
            return serializableExecutionGraph.getAllVertices().size() * 1000 +
                serializableExecutionGraph.getAccumulatorsSerialized().size() * 1000;
        }
    }

    private ArchivedExecutionGraph loadExecutionGraph(JobID jobId) throws IOException, ClassNotFoundException {
        final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);

        if (archivedExecutionGraphFile.exists()) {
            try (FileInputStream fileInputStream = new FileInputStream(archivedExecutionGraphFile)) {
                return InstantiationUtil.deserializeObject(fileInputStream, getClass().getClassLoader());
            }
        } else {
            throw new FileNotFoundException("Could not find file for archived execution graph " + jobId +
                ". This indicates that the file either has been deleted or never written.");
        }
    }

    private void storeArchivedExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
        final File archivedExecutionGraphFile = getExecutionGraphFile(archivedExecutionGraph.getJobID());

        try (FileOutputStream fileOutputStream = new FileOutputStream(archivedExecutionGraphFile)) {
            InstantiationUtil.serializeObject(fileOutputStream, archivedExecutionGraph);
        }
    }

    private File getExecutionGraphFile(JobID jobId) {
        return new File(storageDir, jobId.toString());
    }

    private void deleteExecutionGraphFile(JobID jobId) {
        Preconditions.checkNotNull(jobId);

        final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);

        try {
            FileUtils.deleteFileOrDirectory(archivedExecutionGraphFile);
        } catch (IOException e) {
            LOG.debug("Could not delete file {}.", archivedExecutionGraphFile, e);
        }

        archivedExecutionGraphCache.invalidate(jobId);
        jobDetailsCache.invalidate(jobId);
    }

    private static File initExecutionGraphStorageDirectory(File tmpDir) throws IOException {
        final int maxAttempts = 10;

        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            final File storageDirectory = new File(tmpDir, "executionGraphStore-" + UUID.randomUUID());

            if (storageDirectory.mkdir()) {
                return storageDirectory;
            }
        }

        throw new IOException("Could not create executionGraphStorage directory in " + tmpDir + '.');
    }

    // --------------------------------------------------------------
    // Testing methods
    // --------------------------------------------------------------

    @VisibleForTesting
    File getStorageDir() {
        return storageDir;
    }

    @VisibleForTesting
    LoadingCache<JobID, ArchivedExecutionGraph> getArchivedExecutionGraphCache() {
        return archivedExecutionGraphCache;
    }
}
  • FileArchivedExecutionGraphStore implements the ArchivedExecutionGraphStore interface. Its constructor uses Guava’s CacheBuilder to create two caches: jobDetailsCache and archivedExecutionGraphCache.
  • jobDetailsCache expires entries after expirationTime, i.e. the jobstore.expiration-time setting, and its removal listener deletes the corresponding execution graph file from disk. archivedExecutionGraphCache is bounded by a maximumWeight of maximumCacheSizeBytes, i.e. the jobstore.cache-size setting; each entry is weighed by the size of its file on disk, and entries that are no longer in memory are reloaded from that file.
  • FileArchivedExecutionGraphStore also schedules a recurring task that calls jobDetailsCache’s cleanUp method every expirationTime, so that expired entries are actually removed.
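
The interplay of expire-after-write, the removal listener, and the periodic cleanUp call can be sketched in plain Java. This is not Guava and not Flink code: ExpiringStoreSketch is a hypothetical, simplified stand-in, with an injectable clock playing the role of the Ticker and a Consumer playing the role of the removal listener that deletes the file.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical, simplified stand-in for an expire-after-write cache with a removal listener.
final class ExpiringStoreSketch<K, V> {

    // A value together with the time at which it was written.
    private static final class Timed<T> {
        final T value;
        final long writtenAtMillis;

        Timed(T value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final Map<K, Timed<V>> entries = new LinkedHashMap<>();
    private final long expirationMillis;
    private final LongSupplier clockMillis;
    private final Consumer<K> removalListener;

    ExpiringStoreSketch(long expirationMillis, LongSupplier clockMillis, Consumer<K> removalListener) {
        this.expirationMillis = expirationMillis;
        this.clockMillis = clockMillis;
        this.removalListener = removalListener;
    }

    void put(K key, V value) {
        entries.put(key, new Timed<>(value, clockMillis.getAsLong()));
    }

    // Expired entries are hidden from readers even before cleanUp has removed them.
    V getIfPresent(K key) {
        Timed<V> timed = entries.get(key);
        return (timed == null || isExpired(timed)) ? null : timed.value;
    }

    // Counterpart of the scheduled cleanUp call: drop expired entries and notify the listener.
    void cleanUp() {
        Iterator<Map.Entry<K, Timed<V>>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Timed<V>> entry = it.next();
            if (isExpired(entry.getValue())) {
                K key = entry.getKey();
                it.remove();
                removalListener.accept(key);
            }
        }
    }

    int size() {
        return entries.size();
    }

    private boolean isExpired(Timed<V> timed) {
        return clockMillis.getAsLong() - timed.writtenAtMillis >= expirationMillis;
    }

    public static void main(String[] args) {
        long[] now = {0L};                        // manually advanced clock
        List<String> deleted = new ArrayList<>(); // records what the "file deletion" listener saw
        ExpiringStoreSketch<String, String> store =
            new ExpiringStoreSketch<>(3_600_000L, () -> now[0], deleted::add);

        store.put("job-a", "FINISHED");
        now[0] = 3_600_000L;                      // one hour later
        store.cleanUp();
        System.out.println("entries left: " + store.size() + ", deleted: " + deleted);
    }
}
```

The explicit cleanUp step matters because a Guava cache performs its maintenance as a side effect of other cache operations rather than on a background thread. On an idle cluster, expired job files would otherwise stay on disk until the next access, which is presumably why the store schedules the call itself.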

Summary

  • Flink’s jobstore has two configuration options: jobstore.cache-size, which defaults to 50 MB, and jobstore.expiration-time, which defaults to 1 hour.
  • SessionClusterEntrypoint’s createSerializableExecutionGraphStore method reads the JobManagerOptions.JOB_STORE_EXPIRATION_TIME and JobManagerOptions.JOB_STORE_CACHE_SIZE options and then creates a FileArchivedExecutionGraphStore.
  • FileArchivedExecutionGraphStore implements the ArchivedExecutionGraphStore interface. Its constructor uses Guava’s CacheBuilder to create jobDetailsCache and archivedExecutionGraphCache. jobDetailsCache expires entries after expirationTime, i.e. the jobstore.expiration-time setting. archivedExecutionGraphCache has a maximumWeight of maximumCacheSizeBytes, i.e. the jobstore.cache-size setting. The store also schedules a recurring task that calls jobDetailsCache’s cleanUp method every expirationTime to remove expired entries.
