Talk about Flink’s HistoryServer


Preface

This article mainly studies Flink’s HistoryServer.

HistoryServer

flink-1.7.2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java

public class HistoryServer {

    private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class);

    private final Configuration config;

    private final String webAddress;
    private final int webPort;
    private final long webRefreshIntervalMillis;
    private final File webDir;

    private final HistoryServerArchiveFetcher archiveFetcher;

    @Nullable
    private final SSLHandlerFactory serverSSLFactory;
    private WebFrontendBootstrap netty;

    private final Object startupShutdownLock = new Object();
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    private final Thread shutdownHook;

    public static void main(String[] args) throws Exception {
        ParameterTool pt = ParameterTool.fromArgs(args);
        String configDir = pt.getRequired("configDir");

        LOG.info("Loading configuration from {}", configDir);
        final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir);

        try {
            FileSystem.initialize(flinkConfig);
        } catch (IOException e) {
            throw new Exception("Error while setting the default filesystem scheme from configuration.", e);
        }

        // run the history server
        SecurityUtils.install(new SecurityConfiguration(flinkConfig));

        try {
            SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    HistoryServer hs = new HistoryServer(flinkConfig);
                    hs.run();
                    return 0;
                }
            });
            System.exit(0);
        } catch (Throwable t) {
            final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("Failed to run HistoryServer.", strippedThrowable);
            strippedThrowable.printStackTrace();
            System.exit(1);
        }
    }

    public HistoryServer(Configuration config) throws IOException, FlinkException {
        this(config, new CountDownLatch(0));
    }

    public HistoryServer(Configuration config, CountDownLatch numFinishedPolls) throws IOException, FlinkException {
        Preconditions.checkNotNull(config);
        Preconditions.checkNotNull(numFinishedPolls);

        this.config = config;
        if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.isRestSSLEnabled(config)) {
            LOG.info("Enabling SSL for the history server.");
            try {
                this.serverSSLFactory = SSLUtils.createRestServerSSLEngineFactory(config);
            } catch (Exception e) {
                throw new IOException("Failed to initialize SSLContext for the history server.", e);
            }
        } else {
            this.serverSSLFactory = null;
        }

        webAddress = config.getString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS);
        webPort = config.getInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT);
        webRefreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_WEB_REFRESH_INTERVAL);

        String webDirectory = config.getString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR);
        if (webDirectory == null) {
            webDirectory = System.getProperty("java.io.tmpdir") + File.separator + "flink-web-history-" + UUID.randomUUID();
        }
        webDir = new File(webDirectory);

        String refreshDirectories = config.getString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS);
        if (refreshDirectories == null) {
            throw new FlinkException(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS + " was not configured.");
        }
        List<RefreshLocation> refreshDirs = new ArrayList<>();
        for (String refreshDirectory : refreshDirectories.split(",")) {
            try {
                Path refreshPath = WebMonitorUtils.validateAndNormalizeUri(new Path(refreshDirectory).toUri());
                FileSystem refreshFS = refreshPath.getFileSystem();
                refreshDirs.add(new RefreshLocation(refreshPath, refreshFS));
            } catch (Exception e) {
                // there's most likely something wrong with the path itself, so we ignore it from here on
                LOG.warn("Failed to create Path or FileSystem for directory '{}'. Directory will not be monitored.", refreshDirectory, e);
            }
        }

        if (refreshDirs.isEmpty()) {
            throw new FlinkException("Failed to validate any of the configured directories to monitor.");
        }

        long refreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL);
        archiveFetcher = new HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, numFinishedPolls);

        this.shutdownHook = ShutdownHookUtil.addShutdownHook(
            HistoryServer.this::stop,
            HistoryServer.class.getSimpleName(),
            LOG);
    }

    @VisibleForTesting
    int getWebPort() {
        return netty.getServerPort();
    }

    public void run() {
        try {
            start();
            new CountDownLatch(1).await();
        } catch (Exception e) {
            LOG.error("Failure while running HistoryServer.", e);
        } finally {
            stop();
        }
    }

    // ------------------------------------------------------------------------
    // Life-cycle
    // ------------------------------------------------------------------------

    void start() throws IOException, InterruptedException {
        synchronized (startupShutdownLock) {
            LOG.info("Starting history server.");

            Files.createDirectories(webDir.toPath());
            LOG.info("Using directory {} as local cache.", webDir);

            Router router = new Router();
            router.addGet("/:*", new HistoryServerStaticFileServerHandler(webDir));

            if (!webDir.exists() && !webDir.mkdirs()) {
                throw new IOException("Failed to create local directory " + webDir.getAbsoluteFile() + ".");
            }

            createDashboardConfigFile();

            archiveFetcher.start();

            netty = new WebFrontendBootstrap(router, LOG, webDir, serverSSLFactory, webAddress, webPort, config);
        }
    }

    void stop() {
        if (shutdownRequested.compareAndSet(false, true)) {
            synchronized (startupShutdownLock) {
                LOG.info("Stopping history server.");

                try {
                    netty.shutdown();
                } catch (Throwable t) {
                    LOG.warn("Error while shutting down WebFrontendBootstrap.", t);
                }

                archiveFetcher.stop();

                try {
                    LOG.info("Removing web dashboard root cache directory {}", webDir);
                    FileUtils.deleteDirectory(webDir);
                } catch (Throwable t) {
                    LOG.warn("Error while deleting web root directory {}", webDir, t);
                }

                LOG.info("Stopped history server.");

                // Remove shutdown hook to prevent resource leaks
                ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
            }
        }
    }

    // ------------------------------------------------------------------------
    // File generation
    // ------------------------------------------------------------------------

    static FileWriter createOrGetFile(File folder, String name) throws IOException {
        File file = new File(folder, name + ".json");
        if (!file.exists()) {
            Files.createFile(file.toPath());
        }
        FileWriter fr = new FileWriter(file);
        return fr;
    }

    private void createDashboardConfigFile() throws IOException {
        try (FileWriter fw = createOrGetFile(webDir, "config")) {
            fw.write(createConfigJson(DashboardConfiguration.from(webRefreshIntervalMillis, ZonedDateTime.now())));
            fw.flush();
        } catch (IOException ioe) {
            LOG.error("Failed to write config file.");
            throw ioe;
        }
    }

    private static String createConfigJson(DashboardConfiguration dashboardConfiguration) throws IOException {
        StringWriter writer = new StringWriter();
        JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);

        gen.writeStartObject();
        gen.writeNumberField(DashboardConfiguration.FIELD_NAME_REFRESH_INTERVAL, dashboardConfiguration.getRefreshInterval());
        gen.writeNumberField(DashboardConfiguration.FIELD_NAME_TIMEZONE_OFFSET, dashboardConfiguration.getTimeZoneOffset());
        gen.writeStringField(DashboardConfiguration.FIELD_NAME_TIMEZONE_NAME, dashboardConfiguration.getTimeZoneName());
        gen.writeStringField(DashboardConfiguration.FIELD_NAME_FLINK_VERSION, dashboardConfiguration.getFlinkVersion());
        gen.writeStringField(DashboardConfiguration.FIELD_NAME_FLINK_REVISION, dashboardConfiguration.getFlinkRevision());

        gen.writeEndObject();

        gen.close();

        return writer.toString();
    }

    /**
     * Container for the {@link Path} and {@link FileSystem} of a refresh directory.
     */
    static class RefreshLocation {
        private final Path path;
        private final FileSystem fs;

        private RefreshLocation(Path path, FileSystem fs) {
            this.path = path;
            this.fs = fs;
        }

        public Path getPath() {
            return path;
        }

        public FileSystem getFs() {
            return fs;
        }
    }
}
  • HistoryServer provides query functionality for finished jobs. The constructor reads historyserver.web.address, historyserver.web.port (default 8082), historyserver.web.refresh-interval (default 10 seconds), historyserver.web.tmpdir, historyserver.archive.fs.dir, and historyserver.archive.fs.refresh-interval (default 10 seconds), and then creates the HistoryServerArchiveFetcher
  • Its run method calls the start method, which creates the local web directory and the dashboard config file, starts the HistoryServerArchiveFetcher, and then creates the WebFrontendBootstrap
  • The constructor uses ShutdownHookUtil.addShutdownHook to register a shutdown hook that executes the stop method on shutdown. The stop method calls WebFrontendBootstrap’s shutdown method and HistoryServerArchiveFetcher’s stop method, then cleans up webDir and removes the shutdown hook. A launch sketch follows this list.
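
To make the configuration concrete, here is a minimal sketch of launching the HistoryServer programmatically with the options listed above. The archive path hdfs:///completed-jobs/ and the launcher class are made-up examples; in a real deployment these keys would normally be set in flink-conf.yaml and the server started via the historyserver.sh script.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.webmonitor.history.HistoryServer;

public class HistoryServerLauncher {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // directory that the JobManager archives finished jobs to (example path)
        config.setString("historyserver.archive.fs.dir", "hdfs:///completed-jobs/");
        // poll the archive directory every 10 seconds (the default)
        config.setLong("historyserver.archive.fs.refresh-interval", 10000L);
        // serve the web frontend on the default port
        config.setInteger("historyserver.web.port", 8082);

        HistoryServer hs = new HistoryServer(config);
        hs.run(); // blocks on an internal latch; stop() runs via the shutdown hook
    }
}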

HistoryServerArchiveFetcher

flink-1.7.2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java

class HistoryServerArchiveFetcher {

    private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);

    private static final JsonFactory jacksonFactory = new JsonFactory();
    private static final ObjectMapper mapper = new ObjectMapper();

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
        new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
    private final JobArchiveFetcherTask fetcherTask;
    private final long refreshIntervalMillis;

    HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
        this.refreshIntervalMillis = refreshIntervalMillis;
        this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir, numFinishedPolls);
        if (LOG.isInfoEnabled()) {
            for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
                LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath());
            }
        }
    }

    void start() {
        executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        executor.shutdown();

        try {
            if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException ignored) {
            executor.shutdownNow();
        }
    }

    /**
     * {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS} for
     * new job archives.
     */
    static class JobArchiveFetcherTask extends TimerTask {

        private final List<HistoryServer.RefreshLocation> refreshDirs;
        private final CountDownLatch numFinishedPolls;

        /** Cache of all available jobs identified by their id. */
        private final Set<String> cachedArchives;

        private final File webDir;
        private final File webJobDir;
        private final File webOverviewDir;

        private static final String JSON_FILE_ENDING = ".json";

        JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
            this.refreshDirs = checkNotNull(refreshDirs);
            this.numFinishedPolls = numFinishedPolls;
            this.cachedArchives = new HashSet<>();
            this.webDir = checkNotNull(webDir);
            this.webJobDir = new File(webDir, "jobs");
            webJobDir.mkdir();
            this.webOverviewDir = new File(webDir, "overviews");
            webOverviewDir.mkdir();
        }

        @Override
        public void run() {
            try {
                for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) {
                    Path refreshDir = refreshLocation.getPath();
                    FileSystem refreshFS = refreshLocation.getFs();

                    // contents of /:refreshDir
                    FileStatus[] jobArchives;
                    try {
                        jobArchives = refreshFS.listStatus(refreshDir);
                    } catch (IOException e) {
                        LOG.error("Failed to access job archive location for path {}.", refreshDir, e);
                        continue;
                    }
                    if (jobArchives == null) {
                        continue;
                    }
                    boolean updateOverview = false;
                    for (FileStatus jobArchive : jobArchives) {
                        Path jobArchivePath = jobArchive.getPath();
                        String jobID = jobArchivePath.getName();
                        try {
                            JobID.fromHexString(jobID);
                        } catch (IllegalArgumentException iae) {
                            LOG.debug("Archive directory {} contained file with unexpected name {}. Ignoring file.",
                                refreshDir, jobID, iae);
                            continue;
                        }
                        if (cachedArchives.add(jobID)) {
                            try {
                                for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchive.getPath())) {
                                    String path = archive.getPath();
                                    String json = archive.getJson();

                                    File target;
                                    if (path.equals(JobsOverviewHeaders.URL)) {
                                        target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
                                    } else if (path.equals("/joboverview")) { // legacy path
                                        json = convertLegacyJobOverview(json);
                                        target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
                                    } else {
                                        target = new File(webDir, path + JSON_FILE_ENDING);
                                    }

                                    java.nio.file.Path parent = target.getParentFile().toPath();

                                    try {
                                        Files.createDirectories(parent);
                                    } catch (FileAlreadyExistsException ignored) {
                                        // there may be left-over directories from the previous attempt
                                    }

                                    java.nio.file.Path targetPath = target.toPath();

                                    // We overwrite existing files since this may be another attempt at fetching this archive.
                                    // Existing files may be incomplete/corrupt.
                                    Files.deleteIfExists(targetPath);

                                    Files.createFile(target.toPath());
                                    try (FileWriter fw = new FileWriter(target)) {
                                        fw.write(json);
                                        fw.flush();
                                    }
                                }
                                updateOverview = true;
                            } catch (IOException e) {
                                LOG.error("Failure while fetching/processing job archive for job {}.", jobID, e);
                                // Make sure we attempt to fetch the archive again
                                cachedArchives.remove(jobID);
                                // Make sure we do not include this job in the overview
                                try {
                                    Files.delete(new File(webOverviewDir, jobID + JSON_FILE_ENDING).toPath());
                                } catch (IOException ioe) {
                                    LOG.debug("Could not delete file from overview directory.", ioe);
                                }

                                // Clean up job files we may have created
                                File jobDirectory = new File(webJobDir, jobID);
                                try {
                                    FileUtils.deleteDirectory(jobDirectory);
                                } catch (IOException ioe) {
                                    LOG.debug("Could not clean up job directory.", ioe);
                                }
                            }
                        }
                    }
                    if (updateOverview) {
                        updateJobOverview(webOverviewDir, webDir);
                    }
                }
            } catch (Exception e) {
                LOG.error("Critical failure while fetching/processing job archives.", e);
            }
            numFinishedPolls.countDown();
        }
    }

    private static String convertLegacyJobOverview(String legacyOverview) throws IOException {
        JsonNode root = mapper.readTree(legacyOverview);
        JsonNode finishedJobs = root.get("finished");
        JsonNode job = finishedJobs.get(0);

        JobID jobId = JobID.fromHexString(job.get("jid").asText());
        String name = job.get("name").asText();
        JobStatus state = JobStatus.valueOf(job.get("state").asText());

        long startTime = job.get("start-time").asLong();
        long endTime = job.get("end-time").asLong();
        long duration = job.get("duration").asLong();
        long lastMod = job.get("last-modification").asLong();

        JsonNode tasks = job.get("tasks");
        int numTasks = tasks.get("total").asInt();
        int pending = tasks.get("pending").asInt();
        int running = tasks.get("running").asInt();
        int finished = tasks.get("finished").asInt();
        int canceling = tasks.get("canceling").asInt();
        int canceled = tasks.get("canceled").asInt();
        int failed = tasks.get("failed").asInt();

        int[] tasksPerState = new int[ExecutionState.values().length];
        // pending is a mix of CREATED/SCHEDULED/DEPLOYING
        // to maintain the correct number of task states we have to pick one of them
        tasksPerState[ExecutionState.SCHEDULED.ordinal()] = pending;
        tasksPerState[ExecutionState.RUNNING.ordinal()] = running;
        tasksPerState[ExecutionState.FINISHED.ordinal()] = finished;
        tasksPerState[ExecutionState.CANCELING.ordinal()] = canceling;
        tasksPerState[ExecutionState.CANCELED.ordinal()] = canceled;
        tasksPerState[ExecutionState.FAILED.ordinal()] = failed;

        JobDetails jobDetails = new JobDetails(jobId, name, startTime, endTime, duration, state, lastMod, tasksPerState, numTasks);
        MultipleJobsDetails multipleJobsDetails = new MultipleJobsDetails(Collections.singleton(jobDetails));

        StringWriter sw = new StringWriter();
        mapper.writeValue(sw, multipleJobsDetails);
        return sw.toString();
    }

    /**
     * This method replicates the JSON response that would be given by the JobsOverviewHandler when
     * listing both running and finished jobs.
     *
     * <p>Every job archive contains a joboverview.json file containing the same structure. Since jobs are archived on
     * their own however the list of finished jobs only contains a single job.
     *
     * <p>For the display in the HistoryServer WebFrontend we have to combine these overviews.
     */
    private static void updateJobOverview(File webOverviewDir, File webDir) {
        try (JsonGenerator gen = jacksonFactory.createGenerator(HistoryServer.createOrGetFile(webDir, JobsOverviewHeaders.URL))) {
            File[] overviews = new File(webOverviewDir.getPath()).listFiles();
            if (overviews != null) {
                Collection<JobDetails> allJobs = new ArrayList<>(overviews.length);
                for (File overview : overviews) {
                    MultipleJobsDetails subJobs = mapper.readValue(overview, MultipleJobsDetails.class);
                    allJobs.addAll(subJobs.getJobs());
                }
                mapper.writeValue(gen, new MultipleJobsDetails(allJobs));
            }
        } catch (IOException ioe) {
            LOG.error("Failed to update job overview.", ioe);
        }
    }
}
  • HistoryServerArchiveFetcher polls the directories configured via historyserver.archive.fs.dir at intervals of historyserver.archive.fs.refresh-interval for new job archives; internally it creates a JobArchiveFetcherTask to perform this work
  • JobArchiveFetcherTask extends the JDK’s TimerTask. Its run method traverses refreshDirs, calls FileSystem.listStatus on each, then uses FsJobArchivist.getArchivedJsons to obtain the ArchivedJson entries and writes each one to a local file chosen according to its path
  • Entries whose path is /jobs/overview are written to the webDir/overviews/jobID.json file; entries with the legacy path /joboverview are first converted with convertLegacyJobOverview and then also written to webDir/overviews/jobID.json; all other paths are written to the webDir/path.json file (see the sketch after this list)
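
The routing of each ArchivedJson entry to a local file can be distilled into a small helper. The following is a simplified sketch of the logic in JobArchiveFetcherTask.run() shown above; ArchiveTargetResolver and resolveTarget are hypothetical names used only for illustration, and "/jobs/overview" is the value of JobsOverviewHeaders.URL.

import java.io.File;

class ArchiveTargetResolver {
    // maps an ArchivedJson path to the local file it is written to
    static File resolveTarget(File webDir, File webOverviewDir, String jobID, String path) {
        if (path.equals("/jobs/overview")) {
            // current overview entry (JobsOverviewHeaders.URL)
            return new File(webOverviewDir, jobID + ".json");
        } else if (path.equals("/joboverview")) {
            // legacy overview entry; the json is converted via convertLegacyJobOverview first
            return new File(webOverviewDir, jobID + ".json");
        } else {
            // e.g. /jobs/<jobid>/vertices becomes webDir/jobs/<jobid>/vertices.json
            return new File(webDir, path + ".json");
        }
    }
}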

WebFrontendBootstrap

flink-1.7.2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java

public class WebFrontendBootstrap {
    private final Router router;
    private final Logger log;
    private final File uploadDir;
    private final ServerBootstrap bootstrap;
    private final Channel serverChannel;
    private final String restAddress;

    public WebFrontendBootstrap(
            Router router,
            Logger log,
            File directory,
            @Nullable SSLHandlerFactory serverSSLFactory,
            String configuredAddress,
            int configuredPort,
            final Configuration config) throws InterruptedException, UnknownHostException {

        this.router = Preconditions.checkNotNull(router);
        this.log = Preconditions.checkNotNull(log);
        this.uploadDir = directory;

        ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) {
                RouterHandler handler = new RouterHandler(WebFrontendBootstrap.this.router, new HashMap<>());

                // SSL should be the first handler in the pipeline
                if (serverSSLFactory != null) {
                    ch.pipeline().addLast("ssl", serverSSLFactory.createNettySSLHandler());
                }

                ch.pipeline()
                    .addLast(new HttpServerCodec())
                    .addLast(new ChunkedWriteHandler())
                    .addLast(new HttpRequestHandler(uploadDir))
                    .addLast(handler.getName(), handler)
                    .addLast(new PipelineErrorHandler(WebFrontendBootstrap.this.log));
            }
        };

        NioEventLoopGroup bossGroup   = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        this.bootstrap = new ServerBootstrap();
        this.bootstrap
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(initializer);

        ChannelFuture ch;
        if (configuredAddress == null) {
            ch = this.bootstrap.bind(configuredPort);
        } else {
            ch = this.bootstrap.bind(configuredAddress, configuredPort);
        }
        this.serverChannel = ch.sync().channel();

        InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();

        InetAddress inetAddress = bindAddress.getAddress();
        final String address;

        if (inetAddress.isAnyLocalAddress()) {
            address = config.getString(JobManagerOptions.ADDRESS, InetAddress.getLocalHost().getHostName());
        } else {
            address = inetAddress.getHostAddress();
        }

        int port = bindAddress.getPort();

        this.log.info("Web frontend listening at {}" + ':' + "{}", address, port);

        final String protocol = serverSSLFactory != null ? "https://" : "http://";

        this.restAddress = protocol + address + ':' + port;
    }

    public ServerBootstrap getBootstrap() {
        return bootstrap;
    }

    public int getServerPort() {
        Channel server = this.serverChannel;
        if (server != null) {
            try {
                return ((InetSocketAddress) server.localAddress()).getPort();
            }
            catch (Exception e) {
                log.error("Cannot access local server port", e);
            }
        }

        return -1;
    }

    public String getRestAddress() {
        return restAddress;
    }

    public void shutdown() {
        if (this.serverChannel != null) {
            this.serverChannel.close().awaitUninterruptibly();
        }
        if (bootstrap != null) {
            if (bootstrap.group() != null) {
                bootstrap.group().shutdownGracefully();
            }
            if (bootstrap.childGroup() != null) {
                bootstrap.childGroup().shutdownGracefully();
            }
        }
    }
}
  • WebFrontendBootstrap starts an HTTP server using Netty; its pipeline includes HttpServerCodec, ChunkedWriteHandler, HttpRequestHandler, RouterHandler, and PipelineErrorHandler. The RouterHandler’s Router has a single GET route, which uses the HistoryServerStaticFileServerHandler to serve the HistoryServer’s static files; a minimal client sketch follows this list
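
Since the frontend simply serves the JSON files generated by the fetcher, any HTTP client can read them back. Below is a minimal client sketch, assuming the HistoryServer runs locally on the default port 8082 without SSL; for this request the static file handler resolves the path against webDir, i.e. the combined overview written by updateJobOverview.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class HistoryServerOverviewClient {
    public static void main(String[] args) throws Exception {
        // fetch the combined overview of all archived jobs
        URL url = new URL("http://localhost:8082/jobs/overview");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        } finally {
            conn.disconnect();
        }
    }
}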

Summary

  • HistoryServer provides query functionality for finished jobs; it consists of two main parts: HistoryServerArchiveFetcher and WebFrontendBootstrap. Its run method calls the start method, which starts the HistoryServerArchiveFetcher and then creates the WebFrontendBootstrap
  • HistoryServerArchiveFetcher polls the directories configured via historyserver.archive.fs.dir at intervals of historyserver.archive.fs.refresh-interval for new job archives; internally it creates a JobArchiveFetcherTask to perform this work. JobArchiveFetcherTask extends the JDK’s TimerTask; its run method traverses refreshDirs, calls FileSystem.listStatus, then uses FsJobArchivist.getArchivedJsons to obtain the ArchivedJson entries and writes each one to a local file chosen according to its path
  • WebFrontendBootstrap starts an HTTP server using Netty; its pipeline includes HttpServerCodec, ChunkedWriteHandler, HttpRequestHandler, RouterHandler, and PipelineErrorHandler. The RouterHandler’s Router has a single GET route, which uses the HistoryServerStaticFileServerHandler to serve the HistoryServer’s static files
