Flink's log.file configuration


Preface

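This article looks at Flink's log.file configuration and follows how the REST endpoint behind the web dashboard serves the JobManager and TaskManager log and stdout files.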

log4j.properties

flink-release-1.6.2/flink-dist/src/main/flink-bin/conf/log4j.properties

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
  • Here the log.file system property supplies the value of log4j.appender.file.file, i.e. the file that the FileAppender writes to.
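To see why setting a system property is enough, here is a simplified sketch of ${...} expansion in a properties value. It is not log4j's own code; it only assumes, as log4j 1.x does for its properties files, that placeholders are looked up in the Java system properties. PlaceholderSketch is a made-up name, and substituting an empty string for an unset key is a choice of this sketch.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Simplified illustration (not log4j's implementation) of expanding
 * "${log.file}" in a log4j.properties value from Java system properties.
 */
class PlaceholderSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces every ${key} with the property value, or "" if the key is unset. */
    static String substitute(String value, Properties props) {
        Matcher m = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String resolved = props.getProperty(m.group(1), "");
            // quoteReplacement keeps '$' or '\' in paths from being interpreted
            m.appendReplacement(out, Matcher.quoteReplacement(resolved));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Same effect as starting the JVM with -Dlog.file=/tmp/flink-jobmanager.log
        System.setProperty("log.file", "/tmp/flink-jobmanager.log");
        System.out.println(substitute("${log.file}", System.getProperties()));
    }
}
```

In practice the property is passed on the JVM command line with -Dlog.file=...; the main method simulates that with System.setProperty.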

MiniCluster

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/minicluster/MiniCluster.java

    /**
     * Starts the mini cluster, based on the configured properties.
     *
     * @throws Exception This method passes on any exception that occurs during the startup of
     *                   the mini cluster.
     */
    public void start() throws Exception {
        synchronized (lock) {
            checkState(!running, "FlinkMiniCluster is already running");

            LOG.info("Starting Flink Mini Cluster");
            LOG.debug("Using configuration {}", miniClusterConfiguration);

            final Configuration configuration = miniClusterConfiguration.getConfiguration();
            final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout();
            final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
            final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;

            try {
                initializeIOFormatClasses(configuration);

                LOG.info("Starting Metrics Registry");
                metricRegistry = createMetricRegistry(configuration);
                this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
                    metricRegistry,
                    "localhost");

                final RpcService jobManagerRpcService;
                final RpcService resourceManagerRpcService;
                final RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];

                // bring up all the RPC services
                LOG.info("Starting RPC Service(s)");

                // we always need the 'commonRpcService' for auxiliary calls
                commonRpcService = createRpcService(configuration, rpcTimeout, false, null);

                // TODO: Temporary hack until the metric query service is ported to the RpcEndpoint
                final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem();
                metricRegistry.startQueryService(actorSystem, null);

                if (useSingleRpcService) {
                    for (int i = 0; i < numTaskManagers; i++) {
                        taskManagerRpcServices[i] = commonRpcService;
                    }

                    jobManagerRpcService = commonRpcService;
                    resourceManagerRpcService = commonRpcService;

                    this.resourceManagerRpcService = null;
                    this.jobManagerRpcService = null;
                    this.taskManagerRpcServices = null;
                }
                else {
                    // start a new service per component, possibly with custom bind addresses
                    final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
                    final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
                    final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress();

                    jobManagerRpcService = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress);
                    resourceManagerRpcService = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress);

                    for (int i = 0; i < numTaskManagers; i++) {
                        taskManagerRpcServices[i] = createRpcService(
                                configuration, rpcTimeout, true, taskManagerBindAddress);
                    }

                    this.jobManagerRpcService = jobManagerRpcService;
                    this.taskManagerRpcServices = taskManagerRpcServices;
                    this.resourceManagerRpcService = resourceManagerRpcService;
                }

                // create the high-availability services
                LOG.info("Starting high-availability services");
                haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
                    configuration,
                    commonRpcService.getExecutor());

                blobServer = new BlobServer(configuration, haServices.createBlobStore());
                blobServer.start();

                heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

                // bring up the ResourceManager(s)
                LOG.info("Starting ResourceManger");
                resourceManagerRunner = startResourceManager(
                    configuration,
                    haServices,
                    heartbeatServices,
                    metricRegistry,
                    resourceManagerRpcService,
                    new ClusterInformation("localhost", blobServer.getPort()),
                    jobManagerMetricGroup);

                blobCacheService = new BlobCacheService(
                    configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
                );

                // bring up the TaskManager(s) for the mini cluster
                LOG.info("Starting {} TaskManger(s)", numTaskManagers);
                taskManagers = startTaskManagers(
                    configuration,
                    haServices,
                    heartbeatServices,
                    metricRegistry,
                    blobCacheService,
                    numTaskManagers,
                    taskManagerRpcServices);

                // starting the dispatcher rest endpoint
                LOG.info("Starting dispatcher rest endpoint.");

                dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                    jobManagerRpcService,
                    DispatcherGateway.class,
                    DispatcherId::fromUuid,
                    20,
                    Time.milliseconds(20L));
                final RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                    jobManagerRpcService,
                    ResourceManagerGateway.class,
                    ResourceManagerId::fromUuid,
                    20,
                    Time.milliseconds(20L));

                this.dispatcherRestEndpoint = new DispatcherRestEndpoint(
                    RestServerEndpointConfiguration.fromConfiguration(configuration),
                    dispatcherGatewayRetriever,
                    configuration,
                    RestHandlerConfiguration.fromConfiguration(configuration),
                    resourceManagerGatewayRetriever,
                    blobServer.getTransientBlobService(),
                    WebMonitorEndpoint.createExecutorService(
                        configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
                        configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                        "DispatcherRestEndpoint"),
                    new AkkaQueryServiceRetriever(
                        actorSystem,
                        Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
                    haServices.getWebMonitorLeaderElectionService(),
                    new ShutDownFatalErrorHandler());

                dispatcherRestEndpoint.start();

                restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl());

                // bring up the dispatcher that launches JobManagers when jobs submitted
                LOG.info("Starting job dispatcher(s) for JobManger");

                this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost");

                final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint);

                dispatcher = new StandaloneDispatcher(
                    jobManagerRpcService,
                    Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
                    configuration,
                    haServices,
                    resourceManagerRunner.getResourceManageGateway(),
                    blobServer,
                    heartbeatServices,
                    jobManagerMetricGroup,
                    metricRegistry.getMetricQueryServicePath(),
                    new MemoryArchivedExecutionGraphStore(),
                    Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
                    new ShutDownFatalErrorHandler(),
                    dispatcherRestEndpoint.getRestBaseUrl(),
                    historyServerArchivist);

                dispatcher.start();

                resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
                dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();

                resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
                dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
            }
            catch (Exception e) {
                // cleanup everything
                try {
                    close();
                } catch (Exception ee) {
                    e.addSuppressed(ee);
                }
                throw e;
            }

            // create a new termination future
            terminationFuture = new CompletableFuture<>();

            // now officially mark this as running
            running = true;

            LOG.info("Flink Mini Cluster started successfully");
        }
    }
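  • start() brings the components up in this order: metricRegistry, commonRpcService, jobManagerRpcService, resourceManagerRpcService, haServices, blobServer, heartbeatServices, resourceManagerRunner, blobCacheService, taskManagers, dispatcherGatewayRetriever, resourceManagerGatewayRetriever, dispatcherRestEndpoint, dispatcher, resourceManagerLeaderRetriever and dispatcherLeaderRetriever. If any step throws, close() is called and the exception is rethrown. The part relevant to log files is dispatcherRestEndpoint.start(), which is implemented in RestServerEndpoint.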
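The try/catch around the startup sequence follows a common pattern: if any step fails, what was started so far is shut down, and failures during shutdown are attached to the original exception with addSuppressed. Below is a generic sketch of that pattern. The Component interface and StartupSketch class are hypothetical, and unlike MiniCluster.close(), which closes every field that has been set, this version tracks started components explicitly and closes them in reverse start order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch of "start in order, clean up on failure". */
class StartupSketch {

    /** Stand-in for a cluster service such as an RPC service or BlobServer. */
    interface Component extends AutoCloseable {
        void start() throws Exception;
    }

    static void startAll(List<Component> components) throws Exception {
        Deque<Component> started = new ArrayDeque<>();
        try {
            for (Component c : components) {
                c.start();
                started.push(c); // only components whose start() returned are tracked
            }
        } catch (Exception e) {
            // close in reverse start order; keep the original failure as the primary one
            while (!started.isEmpty()) {
                try {
                    started.pop().close();
                } catch (Exception closeError) {
                    e.addSuppressed(closeError);
                }
            }
            throw e;
        }
    }
}
```

The component whose start() failed is not closed by this sketch; whether that is appropriate depends on whether a half-started component holds resources.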

RestServerEndpoint

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/RestServerEndpoint.java

    /**
     * Starts this REST server endpoint.
     *
     * @throws Exception if we cannot start the RestServerEndpoint
     */
    public final void start() throws Exception {
        synchronized (lock) {
            Preconditions.checkState(state == State.CREATED, "The RestServerEndpoint cannot be restarted.");

            log.info("Starting rest endpoint.");

            final Router router = new Router();
            final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();

            List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = initializeHandlers(restAddressFuture);

            /* sort the handlers such that they are ordered the following:
             * /jobs
             * /jobs/overview
             * /jobs/:jobid
             * /jobs/:jobid/config
             * /:*
             */
            Collections.sort(
                handlers,
                RestHandlerUrlComparator.INSTANCE);

            handlers.forEach(handler -> {
                log.debug("Register handler {} under {}@{}.", handler.f1, handler.f0.getHttpMethod(), handler.f0.getTargetRestEndpointURL());
                registerHandler(router, handler);
            });

            ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) {
                    RouterHandler handler = new RouterHandler(router, responseHeaders);

                    // SSL should be the first handler in the pipeline
                    if (sslEngineFactory != null) {
                        ch.pipeline().addLast("ssl",
                            new RedirectingSslHandler(restAddress, restAddressFuture, sslEngineFactory));
                    }

                    ch.pipeline()
                        .addLast(new HttpServerCodec())
                        .addLast(new FileUploadHandler(uploadDir))
                        .addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders))
                        .addLast(new ChunkedWriteHandler())
                        .addLast(handler.getName(), handler)
                        .addLast(new PipelineErrorHandler(log, responseHeaders));
                }
            };

            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("flink-rest-server-netty-boss"));
            NioEventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("flink-rest-server-netty-worker"));

            bootstrap = new ServerBootstrap();
            bootstrap
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(initializer);

            log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
            final ChannelFuture channel;
            if (restBindAddress == null) {
                channel = bootstrap.bind(restBindPort);
            } else {
                channel = bootstrap.bind(restBindAddress, restBindPort);
            }
            serverChannel = channel.syncUninterruptibly().channel();

            final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
            final String advertisedAddress;
            if (bindAddress.getAddress().isAnyLocalAddress()) {
                advertisedAddress = this.restAddress;
            } else {
                advertisedAddress = bindAddress.getAddress().getHostAddress();
            }
            final int port = bindAddress.getPort();

            log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);

            final String protocol;

            if (sslEngineFactory != null) {
                protocol = "https://";
            } else {
                protocol = "http://";
            }

            restBaseUrl = protocol + advertisedAddress + ':' + port;

            restAddressFuture.complete(restBaseUrl);

            state = State.RUNNING;

            startInternal();
        }
    }
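  • start() calls initializeHandlers to obtain the list of (RestHandlerSpecification, ChannelInboundHandler) pairs, sorts it with RestHandlerUrlComparator.INSTANCE and registers each handler with the Router. initializeHandlers itself is implemented by subclasses, here DispatcherRestEndpoint.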
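The comment in start() lists the order in which handlers end up registered: literal paths before parameterised ones, and /:* last. The comparator below is a simplified, hypothetical reconstruction of that ordering over plain URL strings; it is not Flink's RestHandlerUrlComparator. Segments are compared left to right, a literal segment sorts before a :param or * segment, and a path that is a prefix of another comes first.

```java
import java.util.Comparator;

/** Hypothetical comparator reproducing the order described in RestServerEndpoint.start(). */
class UrlOrderSketch implements Comparator<String> {

    @Override
    public int compare(String a, String b) {
        String[] sa = a.split("/");
        String[] sb = b.split("/");
        int n = Math.min(sa.length, sb.length);
        for (int i = 0; i < n; i++) {
            boolean wildA = isWildcard(sa[i]);
            boolean wildB = isWildcard(sb[i]);
            if (wildA != wildB) {
                return wildA ? 1 : -1; // literal segment before :param or *
            }
            int cmp = sa[i].compareTo(sb[i]);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(sa.length, sb.length); // shorter (prefix) path first
    }

    private static boolean isWildcard(String segment) {
        return segment.startsWith(":") || segment.equals("*");
    }
}
```

Sorting the five URLs from the comment with this comparator reproduces the listed order, keeping /jobs/overview ahead of /jobs/:jobid so the literal path is not shadowed by the parameterised one.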

DispatcherRestEndpoint

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java

    @Override
    protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
        List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = super.initializeHandlers(restAddressFuture);

        // Add the Dispatcher specific handlers

        final Time timeout = restConfiguration.getTimeout();

        JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(
            restAddressFuture,
            leaderRetriever,
            timeout,
            responseHeaders,
            executor,
            clusterConfiguration);

        if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) {
            try {
                webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
                    leaderRetriever,
                    restAddressFuture,
                    timeout,
                    responseHeaders,
                    uploadDir,
                    executor,
                    clusterConfiguration);

                // register extension handlers
                handlers.addAll(webSubmissionExtension.getHandlers());
            } catch (FlinkException e) {
                if (log.isDebugEnabled()) {
                    log.debug("Failed to load web based job submission extension.", e);
                } else {
                    log.info("Failed to load web based job submission extension. " +
                        "Probable reason: flink-runtime-web is not in the classpath.");
                }
            }
        } else {
            log.info("Web-based job submission is not enabled.");
        }

        handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));

        return handlers;
    }
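  • It first calls the parent class's initializeHandlers. The parent here is WebMonitorEndpoint, the direct superclass of DispatcherRestEndpoint, which in turn extends RestServerEndpoint. DispatcherRestEndpoint then adds the web submission handlers (only when WebOptions.SUBMIT_ENABLE is true and the extension can be loaded) and finally the JobSubmitHandler.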
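The handler list is therefore built up along the class hierarchy, each subclass calling super.initializeHandlers and appending its own entries. The sketch below models that chain with plain strings in place of Tuple2<RestHandlerSpecification, ChannelInboundHandler> pairs. All classes ending in Sketch are hypothetical, the handler names are labels only, and the failure path for loading the web submission extension is omitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the initializeHandlers chain; strings stand in for handlers. */
abstract class RestServerEndpointSketch {
    protected abstract List<String> initializeHandlers();
}

class WebMonitorEndpointSketch extends RestServerEndpointSketch {
    @Override
    protected List<String> initializeHandlers() {
        List<String> handlers = new ArrayList<>();
        // JobManager files, built from LogFileLocation.find(...)
        handlers.add("logFileHandler");
        handlers.add("stdoutFileHandler");
        // TaskManager files, fetched through the ResourceManager
        handlers.add("taskManagerLogFileHandler");
        handlers.add("taskManagerStdoutFileHandler");
        return handlers;
    }
}

class DispatcherRestEndpointSketch extends WebMonitorEndpointSketch {
    private final boolean submitEnabled; // stands in for WebOptions.SUBMIT_ENABLE

    DispatcherRestEndpointSketch(boolean submitEnabled) {
        this.submitEnabled = submitEnabled;
    }

    @Override
    protected List<String> initializeHandlers() {
        List<String> handlers = super.initializeHandlers(); // parent's handlers first
        if (submitEnabled) {
            handlers.add("webSubmissionExtensionHandlers");
        }
        handlers.add("jobSubmitHandler"); // always appended last
        return handlers;
    }
}
```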

WebMonitorEndpoint

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java

    @Override
    protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
        ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(30);

        final Time timeout = restConfiguration.getTimeout();

        //......

        // TODO: Remove once the Yarn proxy can forward all REST verbs
        handlers.add(Tuple2.of(YarnCancelJobTerminationHeaders.getInstance(), jobCancelTerminationHandler));
        handlers.add(Tuple2.of(YarnStopJobTerminationHeaders.getInstance(), jobStopTerminationHandler));

        handlers.add(Tuple2.of(shutdownHandler.getMessageHeaders(), shutdownHandler));

        //......

        // load the log and stdout file handler for the main cluster component
        final WebMonitorUtils.LogFileLocation logFileLocation = WebMonitorUtils.LogFileLocation.find(clusterConfiguration);

        final ChannelInboundHandler logFileHandler = createStaticFileHandler(
            restAddressFuture,
            timeout,
            logFileLocation.logFile);

        final ChannelInboundHandler stdoutFileHandler = createStaticFileHandler(
            restAddressFuture,
            timeout,
            logFileLocation.stdOutFile);

        handlers.add(Tuple2.of(LogFileHandlerSpecification.getInstance(), logFileHandler));
        handlers.add(Tuple2.of(StdoutFileHandlerSpecification.getInstance(), stdoutFileHandler));

        // TaskManager log and stdout file handler

        final Time cacheEntryDuration = Time.milliseconds(restConfiguration.getRefreshInterval());

        final TaskManagerLogFileHandler taskManagerLogFileHandler = new TaskManagerLogFileHandler(
            restAddressFuture,
            leaderRetriever,
            timeout,
            responseHeaders,
            TaskManagerLogFileHeaders.getInstance(),
            resourceManagerRetriever,
            transientBlobService,
            cacheEntryDuration);

        final TaskManagerStdoutFileHandler taskManagerStdoutFileHandler = new TaskManagerStdoutFileHandler(
            restAddressFuture,
            leaderRetriever,
            timeout,
            responseHeaders,
            TaskManagerStdoutFileHeaders.getInstance(),
            resourceManagerRetriever,
            transientBlobService,
            cacheEntryDuration);

        handlers.add(Tuple2.of(TaskManagerLogFileHeaders.getInstance(), taskManagerLogFileHandler));
        handlers.add(Tuple2.of(TaskManagerStdoutFileHeaders.getInstance(), taskManagerStdoutFileHandler));

        //......

    }

    @Nonnull
    private ChannelInboundHandler createStaticFileHandler(
            CompletableFuture<String> restAddressFuture,
            Time timeout,
            File fileToServe) {

        if (fileToServe == null) {
            return new ConstantTextHandler("(file unavailable)");
        } else {
            try {
                return new StaticFileServerHandler<>(
                    leaderRetriever,
                    restAddressFuture,
                    timeout,
                    fileToServe);
            } catch (IOException e) {
                log.info("Cannot load log file handler.", e);
                return new ConstantTextHandler("(log file unavailable)");
            }
        }
    }
  • It creates a series of ChannelInboundHandlers and adds them to handlers.
  • For the JobManager's files, it first calls WebMonitorUtils.LogFileLocation.find(clusterConfiguration) to build a LogFileLocation, then uses logFileLocation.logFile and logFileLocation.stdOutFile to create logFileHandler and stdoutFileHandler, which serve downloads of the log file and the stdout file respectively. If a file is null, createStaticFileHandler returns a ConstantTextHandler with the text "(file unavailable)" instead.
  • For the TaskManager's files, it creates TaskManagerLogFileHandler and TaskManagerStdoutFileHandler, which serve downloads of the TaskManager log and stdout files respectively.
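The fallback rules of createStaticFileHandler can be exercised in isolation. In this sketch handlers are represented by descriptive strings, an assumption made to keep it self-contained, and StaticFileHandlerSketch is a hypothetical name. A null file, which is what LogFileLocation.find yields when no usable path is found, produces the "(file unavailable)" text; an IOException, whose visible source in the listing is the getCanonicalFile() call in StaticFileServerHandler's constructor, produces "(log file unavailable)".

```java
import java.io.File;
import java.io.IOException;

/** Hypothetical model of createStaticFileHandler's fallbacks; strings stand in for handlers. */
class StaticFileHandlerSketch {

    static String createStaticFileHandler(File fileToServe) {
        if (fileToServe == null) {
            // no log location was determined, so serve a constant placeholder text
            return "ConstantTextHandler: (file unavailable)";
        }
        try {
            // mirrors the constructor's getCanonicalFile() call, which may throw IOException
            return "StaticFileServerHandler: " + fileToServe.getCanonicalFile().getPath();
        } catch (IOException e) {
            return "ConstantTextHandler: (log file unavailable)";
        }
    }
}
```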

JobManager FileHandler

WebMonitorUtils.LogFileLocation.find

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java

    /**
     * Singleton to hold the log and stdout file.
     */
    public static class LogFileLocation {

        public final File logFile;
        public final File stdOutFile;

        private LogFileLocation(File logFile, File stdOutFile) {
            this.logFile = logFile;
            this.stdOutFile = stdOutFile;
        }

        /**
         * Finds the Flink log directory using log.file Java property that is set during startup.
         */
        public static LogFileLocation find(Configuration config) {
            final String logEnv = "log.file";
            String logFilePath = System.getProperty(logEnv);

            if (logFilePath == null) {
                LOG.warn("Log file environment variable '{}' is not set.", logEnv);
                logFilePath = config.getString(WebOptions.LOG_PATH);
            }

            // not configured, cannot serve log files
            if (logFilePath == null || logFilePath.length() < 4) {
                LOG.warn("JobManager log files are unavailable in the web dashboard. " +
                    "Log file location not found in environment variable '{}' or configuration key '{}'.",
                    logEnv, WebOptions.LOG_PATH);
                return new LogFileLocation(null, null);
            }

            String outFilePath = logFilePath.substring(0, logFilePath.length() - 3).concat("out");

            LOG.info("Determined location of main cluster component log file: {}", logFilePath);
            LOG.info("Determined location of main cluster component stdout file: {}", outFilePath);

            return new LogFileLocation(resolveFileLocation(logFilePath), resolveFileLocation(outFilePath));
        }

        /**
         * Verify log file location.
         *
         * @param logFilePath Path to log file
         * @return File or null if not a valid log file
         */
        private static File resolveFileLocation(String logFilePath) {
            File logFile = new File(logFilePath);
            return (logFile.exists() && logFile.canRead()) ? logFile : null;
        }
    }
  • find first reads the log.file system property. If it is not set, it logs the warning "Log file environment variable 'log.file' is not set." and falls back to the WebOptions.LOG_PATH setting (key web.log.path, deprecated key jobmanager.web.log.path).
  • If neither source yields a path of at least 4 characters, it logs "JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (deprecated keys: [jobmanager.web.log.path])'." and returns a LogFileLocation whose logFile and stdOutFile are both null.
  • The 4-character minimum exists because outFilePath is built with logFilePath.substring(0, logFilePath.length() - 3).concat("out"), which replaces the last three characters (normally the log suffix) with out. Both logFilePath and outFilePath are then passed through resolveFileLocation, which returns null for a file that does not exist or cannot be read, and the results are wrapped in the returned LogFileLocation.
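The lookup order and the stdout path derivation can be reproduced with the standard library alone. In this sketch a Map<String, String> stands in for Flink's Configuration (an assumption), and LogFileLocationSketch is a hypothetical name; the logic follows find and resolveFileLocation above.

```java
import java.io.File;
import java.util.Map;

/** Hypothetical re-creation of LogFileLocation.find's path logic. */
class LogFileLocationSketch {

    /** System property "log.file" first, then "web.log.path"; null if unusable. */
    static String findLogFilePath(Map<String, String> config) {
        String logFilePath = System.getProperty("log.file");
        if (logFilePath == null) {
            logFilePath = config.get("web.log.path");
        }
        return (logFilePath == null || logFilePath.length() < 4) ? null : logFilePath;
    }

    /** Replaces the last three characters with "out", e.g. ".log" becomes ".out". */
    static String deriveStdoutPath(String logFilePath) {
        return logFilePath.substring(0, logFilePath.length() - 3).concat("out");
    }

    /** Same check as resolveFileLocation: the file must exist and be readable. */
    static File resolveFileLocation(String path) {
        File file = new File(path);
        return (file.exists() && file.canRead()) ? file : null;
    }
}
```

For example, deriveStdoutPath("/opt/flink/log/flink-jobmanager.log") gives "/opt/flink/log/flink-jobmanager.out". The derivation does not check the suffix, so a path ending in .txt would also map to .out.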

StaticFileServerHandler

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java

/**
 * Simple file server handler that serves requests to web frontend's static files, such as
 * HTML, CSS, or JS files.
 *
 * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
 * example.</p>
 */
@ChannelHandler.Sharable
public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectHandler<T> {

    /** Timezone in which this server answers its "if-modified" requests. */
    private static final TimeZone GMT_TIMEZONE = TimeZone.getTimeZone("GMT");

    /** Date format for HTTP. */
    public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";

    /** Be default, we allow files to be cached for 5 minutes. */
    private static final int HTTP_CACHE_SECONDS = 300;

    // ------------------------------------------------------------------------

    /** The path in which the static documents are. */
    private final File rootPath;

    public StaticFileServerHandler(
            GatewayRetriever<? extends T> retriever,
            CompletableFuture<String> localJobManagerAddressFuture,
            Time timeout,
            File rootPath) throws IOException {

        super(localJobManagerAddressFuture, retriever, timeout, Collections.emptyMap());

        this.rootPath = checkNotNull(rootPath).getCanonicalFile();
    }

    // ------------------------------------------------------------------------
    //  Responses to requests
    // ------------------------------------------------------------------------

    @Override
    protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, RoutedRequest routedRequest, T gateway) throws Exception {
        final HttpRequest request = routedRequest.getRequest();
        final String requestPath;

        // make sure we request the "index.html" in case there is a directory request
        if (routedRequest.getPath().endsWith("/")) {
            requestPath = routedRequest.getPath() + "index.html";
        }
        // in case the files being accessed are logs or stdout files, find appropriate paths.
        else if (routedRequest.getPath().equals("/jobmanager/log") || routedRequest.getPath().equals("/jobmanager/stdout")) {
            requestPath = "";
        } else {
            requestPath = routedRequest.getPath();
        }

        respondToRequest(channelHandlerContext, request, requestPath);
    }

    //......

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (ctx.channel().isActive()) {
            logger.error("Caught exception", cause);
            sendError(ctx, INTERNAL_SERVER_ERROR);
        }
    }
}
  • For /jobmanager/log and /jobmanager/stdout, respondAsLeader sets requestPath to an empty string before calling respondToRequest, which serves files relative to rootPath. Since rootPath is the log (or stdout) file itself, that file is what gets transferred.
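The path handling in respondAsLeader can be sketched as below. The body of respondToRequest is elided in the listing, so the resolution step new File(rootPath, requestPath) is an assumption here. With an empty child path, java.io.File resolves to the parent path itself, which is how the empty requestPath ends up pointing at the file passed in as rootPath. RequestPathSketch is a hypothetical name.

```java
import java.io.File;

/** Hypothetical sketch of StaticFileServerHandler's request path mapping. */
class RequestPathSketch {

    /** Same branches as respondAsLeader. */
    static String toRequestPath(String routedPath) {
        if (routedPath.endsWith("/")) {
            return routedPath + "index.html"; // directory request
        } else if (routedPath.equals("/jobmanager/log") || routedPath.equals("/jobmanager/stdout")) {
            return ""; // serve rootPath itself
        }
        return routedPath;
    }

    /** Assumed resolution step; an empty child leaves rootPath unchanged. */
    static File resolve(File rootPath, String routedPath) {
        return new File(rootPath, toRequestPath(routedPath));
    }
}
```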

TaskManager FileHandler

TaskManagerLogFileHandler

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogFileHandler.java

/**
 * Rest handler which serves the log files from {@link TaskExecutor}.
 */
public class TaskManagerLogFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> {

    public TaskManagerLogFileHandler(
            @Nonnull CompletableFuture<String> localAddressFuture,
            @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever,
            @Nonnull Time timeout,
            @Nonnull Map<String, String> responseHeaders,
            @Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders,
            @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
            @Nonnull TransientBlobService transientBlobService,
            @Nonnull Time cacheEntryDuration) {
        super(localAddressFuture, leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration);
    }

    @Override
    protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerResourceId) {
        return resourceManagerGateway.requestTaskManagerFileUpload(taskManagerResourceId, FileType.LOG, timeout);
    }
}
  • Its requestFileUpload calls ResourceManagerGateway.requestTaskManagerFileUpload with FileType.LOG.

TaskManagerStdoutFileHandler

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerStdoutFileHandler.java

/**
 * Rest handler which serves the stdout file of the {@link TaskExecutor}.
 */
public class TaskManagerStdoutFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> {

    public TaskManagerStdoutFileHandler(
            @Nonnull CompletableFuture<String> localAddressFuture,
            @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever,
            @Nonnull Time timeout,
            @Nonnull Map<String, String> responseHeaders,
            @Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders,
            @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
            @Nonnull TransientBlobService transientBlobService,
            @Nonnull Time cacheEntryDuration) {
        super(localAddressFuture, leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration);
    }

    @Override
    protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerResourceId) {
        return resourceManagerGateway.requestTaskManagerFileUpload(taskManagerResourceId, FileType.STDOUT, timeout);
    }
}
  • Its requestFileUpload calls ResourceManagerGateway.requestTaskManagerFileUpload with FileType.STDOUT.
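The two TaskManager handlers only override requestFileUpload and inherit everything else from AbstractTaskManagerFileHandler, so the sole difference between them is the FileType they request. A stripped-down, hypothetical model of that split: String keys replace TransientBlobKey, a one-method interface replaces ResourceManagerGateway, and the timeout parameter is dropped.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of the LOG/STDOUT handler pair. */
class TaskManagerFileHandlerSketch {

    enum FileType { LOG, STDOUT }

    /** Stand-in for ResourceManagerGateway.requestTaskManagerFileUpload. */
    interface ResourceManagerGatewaySketch {
        CompletableFuture<String> requestTaskManagerFileUpload(String taskManagerId, FileType fileType);
    }

    abstract static class AbstractFileHandler {
        // the only hook a concrete handler has to implement
        abstract CompletableFuture<String> requestFileUpload(ResourceManagerGatewaySketch rm, String taskManagerId);
    }

    static final class LogFileHandler extends AbstractFileHandler {
        @Override
        CompletableFuture<String> requestFileUpload(ResourceManagerGatewaySketch rm, String taskManagerId) {
            return rm.requestTaskManagerFileUpload(taskManagerId, FileType.LOG);
        }
    }

    static final class StdoutFileHandler extends AbstractFileHandler {
        @Override
        CompletableFuture<String> requestFileUpload(ResourceManagerGatewaySketch rm, String taskManagerId) {
            return rm.requestTaskManagerFileUpload(taskManagerId, FileType.STDOUT);
        }
    }
}
```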

ResourceManager.requestTaskManagerFileUpload

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/resourcemanager/ResourceManager.java

    @Override
    public CompletableFuture<TransientBlobKey> requestTaskManagerFileUpload(ResourceID taskManagerId, FileType fileType, Time timeout) {
        log.debug("Request file {} upload from TaskExecutor {}.", fileType, taskManagerId);

        final WorkerRegistration<WorkerType> taskExecutor = taskExecutors.get(taskManagerId);

        if (taskExecutor == null) {
            log.debug("Requested file {} upload from unregistered TaskExecutor {}.", fileType, taskManagerId);
            return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId));
        } else {
            return taskExecutor.getTaskExecutorGateway().requestFileUpload(fileType, timeout);
        }
    }
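  • ResourceManager.requestTaskManagerFileUpload looks up the registered TaskExecutor by its ResourceID and delegates to TaskExecutorGateway.requestFileUpload. For an unregistered TaskExecutor it returns a future completed exceptionally with UnknownTaskExecutorException.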
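The ResourceManager side is a registry lookup followed by delegation. A minimal sketch, assuming a map from TaskExecutor id to a function that performs the upload (a stand-in for WorkerRegistration and TaskExecutorGateway). The exception class is a local placeholder named after Flink's UnknownTaskExecutorException, not the real one, and file types are plain strings.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical sketch of requestTaskManagerFileUpload's lookup-and-delegate logic. */
class ResourceManagerLookupSketch {

    static final class UnknownTaskExecutorException extends Exception {
        UnknownTaskExecutorException(String id) {
            super("Unknown TaskExecutor " + id + '.');
        }
    }

    /** Registered TaskExecutors: id to a function that uploads a file of the given type. */
    private final Map<String, Function<String, CompletableFuture<String>>> taskExecutors = new ConcurrentHashMap<>();

    void register(String id, Function<String, CompletableFuture<String>> gateway) {
        taskExecutors.put(id, gateway);
    }

    CompletableFuture<String> requestTaskManagerFileUpload(String taskManagerId, String fileType) {
        Function<String, CompletableFuture<String>> gateway = taskExecutors.get(taskManagerId);
        if (gateway == null) {
            // unknown id: fail the future instead of throwing
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new UnknownTaskExecutorException(taskManagerId));
            return failed;
        }
        return gateway.apply(fileType); // delegate to the TaskExecutor
    }
}
```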

TaskExecutor.requestFileUpload

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskExecutor.java

    @Override
    public CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType, Time timeout) {
        log.debug("Request file {} upload.", fileType);

        final String filePath;

        switch (fileType) {
            case LOG:
                filePath = taskManagerConfiguration.getTaskManagerLogPath();
                break;
            case STDOUT:
                filePath = taskManagerConfiguration.getTaskManagerStdoutPath();
                break;
            default:
                filePath = null;
        }

        if (filePath != null && !filePath.isEmpty()) {
            final File file = new File(filePath);

            if (file.exists()) {
                final TransientBlobCache transientBlobService = blobCacheService.getTransientBlobService();
                final TransientBlobKey transientBlobKey;
                try (FileInputStream fileInputStream = new FileInputStream(file)) {
                    transientBlobKey = transientBlobService.putTransient(fileInputStream);
                } catch (IOException e) {
                    log.debug("Could not upload file {}.", fileType, e);
                    return FutureUtils.completedExceptionally(new FlinkException("Could not upload file " + fileType + '.', e));
                }

                return CompletableFuture.completedFuture(transientBlobKey);
            } else {
                log.debug("The file {} does not exist on the TaskExecutor {}.", fileType, getResourceID());
                return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " does not exist on the TaskExecutor."));
            }
        } else {
            log.debug("The file {} is unavailable on the TaskExecutor {}.", fileType, getResourceID());
            return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " is not available on the TaskExecutor."));
        }
    }
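  • TaskExecutor.requestFileUpload chooses filePath according to fileType. For LOG it uses taskManagerConfiguration.getTaskManagerLogPath(), and for STDOUT it uses taskManagerConfiguration.getTaskManagerStdoutPath(). If the file exists, it is uploaded to the TransientBlobCache via putTransient and the resulting TransientBlobKey is returned. If the path is unset or the file is missing, the future completes exceptionally with a FlinkException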
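The resolve-check-upload sequence can be reproduced in a small standalone sketch. The sketch assumes a hypothetical in-memory map (blobStore) in place of the TransientBlobCache and random UUID strings in place of TransientBlobKey. The class name TaskExecutorFileUploadSketch is also hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

enum FileType { LOG, STDOUT }

class TaskExecutorFileUploadSketch {
    private final String logPath;     // may be null, like getTaskManagerLogPath()
    private final String stdoutPath;  // may be null, like getTaskManagerStdoutPath()
    // Hypothetical in-memory stand-in for the TransientBlobCache: blob key -> file contents.
    final Map<String, byte[]> blobStore = new ConcurrentHashMap<>();

    TaskExecutorFileUploadSketch(String logPath, String stdoutPath) {
        this.logPath = logPath;
        this.stdoutPath = stdoutPath;
    }

    CompletableFuture<String> requestFileUpload(FileType fileType) {
        final String filePath;
        switch (fileType) {
            case LOG:
                filePath = logPath;
                break;
            case STDOUT:
                filePath = stdoutPath;
                break;
            default:
                filePath = null;
        }

        if (filePath == null || filePath.isEmpty()) {
            return failed("The file " + fileType + " is not available.");
        }
        File file = new File(filePath);
        if (!file.exists()) {
            return failed("The file " + fileType + " does not exist.");
        }
        try {
            // Copy the file into the blob store and return its key, like putTransient(...).
            String key = UUID.randomUUID().toString();
            blobStore.put(key, Files.readAllBytes(file.toPath()));
            return CompletableFuture.completedFuture(key);
        } catch (IOException e) {
            return failed("Could not upload file " + fileType + ": " + e.getMessage());
        }
    }

    private static CompletableFuture<String> failed(String message) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException(message));
        return future;
    }
}
```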

TaskManagerRunner.startTaskManager

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java

    public static TaskExecutor startTaskManager(
            Configuration configuration,
            ResourceID resourceID,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            BlobCacheService blobCacheService,
            boolean localCommunicationOnly,
            FatalErrorHandler fatalErrorHandler) throws Exception {

        checkNotNull(configuration);
        checkNotNull(resourceID);
        checkNotNull(rpcService);
        checkNotNull(highAvailabilityServices);

        LOG.info("Starting TaskManager with ResourceID: {}", resourceID);

        InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress());

        TaskManagerServicesConfiguration taskManagerServicesConfiguration =
            TaskManagerServicesConfiguration.fromConfiguration(
                configuration,
                remoteAddress,
                localCommunicationOnly);

        TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
            taskManagerServicesConfiguration,
            resourceID,
            rpcService.getExecutor(), // TODO replace this later with some dedicated executor for io.
            EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(),
            EnvironmentInformation.getMaxJvmHeapMemory());

        TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
            metricRegistry,
            taskManagerServices.getTaskManagerLocation(),
            taskManagerServices.getNetworkEnvironment());

        TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);

        return new TaskExecutor(
            rpcService,
            taskManagerConfiguration,
            highAvailabilityServices,
            taskManagerServices,
            heartbeatServices,
            taskManagerMetricGroup,
            blobCacheService,
            fatalErrorHandler);
    }
  • TaskManagerRunner.startTaskManager builds the TaskManagerConfiguration via TaskManagerConfiguration.fromConfiguration(configuration) and passes it to the TaskExecutor constructor

TaskManagerConfiguration.fromConfiguration

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java

    public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
        int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);

        if (numberSlots == -1) {
            numberSlots = 1;
        }

        //......

        final String taskManagerLogPath = configuration.getString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file"));
        final String taskManagerStdoutPath;

        if (taskManagerLogPath != null) {
            final int extension = taskManagerLogPath.lastIndexOf('.');

            if (extension > 0) {
                taskManagerStdoutPath = taskManagerLogPath.substring(0, extension) + ".out";
            } else {
                taskManagerStdoutPath = null;
            }
        } else {
            taskManagerStdoutPath = null;
        }

        return new TaskManagerConfiguration(
            numberSlots,
            tmpDirPaths,
            timeout,
            finiteRegistrationDuration,
            initialRegistrationPause,
            maxRegistrationPause,
            refusedRegistrationPause,
            configuration,
            exitOnOom,
            FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
            alwaysParentFirstLoaderPatterns,
            taskManagerLogPath,
            taskManagerStdoutPath);
    }
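  • TaskManagerConfiguration.fromConfiguration reads taskManagerLogPath from Flink’s Configuration under ConfigConstants.TASK_MANAGER_LOG_PATH_KEY (taskmanager.log.path). If the key is not set, it falls back to the system property log.file. If the resulting taskManagerLogPath is not null, taskManagerStdoutPath is derived by replacing everything after the last '.' with .out (for example, taskmanager.log becomes taskmanager.out). If there is no '.' after the first character, taskManagerStdoutPath is null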
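The path resolution above can be reproduced in isolation. This sketch models Flink’s Configuration as a plain Map. TaskManagerLogPaths and its method names are hypothetical, while the key string taskmanager.log.path and the log.file property come from the code above.

```java
import java.util.Map;

// Sketch of how the TaskManager log and stdout paths are resolved.
class TaskManagerLogPaths {
    // The key behind ConfigConstants.TASK_MANAGER_LOG_PATH_KEY.
    static final String TASK_MANAGER_LOG_PATH_KEY = "taskmanager.log.path";

    // The configured key wins; otherwise fall back to the log.file system property.
    static String resolveLogPath(Map<String, String> config) {
        String configured = config.get(TASK_MANAGER_LOG_PATH_KEY);
        return configured != null ? configured : System.getProperty("log.file");
    }

    // Replace everything after the last '.' with ".out"; null if there is no usable '.'.
    static String deriveStdoutPath(String logPath) {
        if (logPath == null) {
            return null;
        }
        int extension = logPath.lastIndexOf('.');
        return extension > 0 ? logPath.substring(0, extension) + ".out" : null;
    }
}
```

For example, a log.file of /opt/flink/log/flink-taskexecutor-0.log yields a stdout path of /opt/flink/log/flink-taskexecutor-0.out. Because lastIndexOf('.') is applied to the whole path, an extensionless file inside a directory whose name contains a dot (such as /var/log/flink.d/taskmanager) would be cut inside the directory name, giving /var/log/flink.out.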

Summary

  • Flink’s default log4j.properties configures a FileAppender whose output file is set from the system property log.file
  • When Flink’s MiniCluster starts, it creates a DispatcherRestEndpoint, whose start method calls initializeHandlers to set up a series of handlers. For the JobManager’s file handlers, it obtains a logFileLocation via WebMonitorUtils.LogFileLocation.find(clusterConfiguration). This method first reads the system property log.file and, if that is not set, falls back to the WebOptions.LOG_PATH (web.log.path) configuration. It then creates two StaticFileServerHandler instances, one for logFileLocation.logFile and one for logFileLocation.stdOutFile
  • For the TaskManager’s file handlers, TaskManagerLogFileHandler and TaskManagerStdoutFileHandler are created to serve downloads of the log and stdout files respectively. Both call ResourceManagerGateway.requestTaskManagerFileUpload internally and differ only in the FileType they pass (LOG vs. STDOUT). ResourceManager.requestTaskManagerFileUpload in turn completes the transfer through TaskExecutor.requestFileUpload. When TaskManagerRunner.startTaskManager creates the TaskExecutor, it builds a TaskManagerConfiguration that first reads ConfigConstants.TASK_MANAGER_LOG_PATH_KEY (taskmanager.log.path) and, if that is not set, falls back to the system property log.file
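The two lookups in this summary use opposite precedence. The JobManager checks the log.file system property before web.log.path, while the TaskManager checks taskmanager.log.path before log.file. The sketch below models the JobManager side with a Map in place of Flink’s Configuration, and the class name is hypothetical. Two details are assumptions based on WebMonitorUtils in Flink 1.6, whose source is not shown in this article: the minimum-length check, and the rule that forms the stdout path by replacing the trailing "log" with "out". Check both against your Flink version.

```java
import java.util.Map;

// Hypothetical, simplified model of the JobManager-side log file lookup.
class JobManagerLogFileLocationSketch {
    static final String WEB_LOG_PATH_KEY = "web.log.path"; // key of WebOptions.LOG_PATH

    final String logFile;     // null when no location could be determined
    final String stdOutFile;  // null when no location could be determined

    private JobManagerLogFileLocationSketch(String logFile, String stdOutFile) {
        this.logFile = logFile;
        this.stdOutFile = stdOutFile;
    }

    static JobManagerLogFileLocationSketch find(Map<String, String> config) {
        // 1. The log.file system property is checked first.
        String logFilePath = System.getProperty("log.file");
        // 2. Otherwise fall back to web.log.path.
        if (logFilePath == null) {
            logFilePath = config.get(WEB_LOG_PATH_KEY);
        }
        // Assumption: a missing or too-short path means there are no log files to serve.
        if (logFilePath == null || logFilePath.length() < 4) {
            return new JobManagerLogFileLocationSketch(null, null);
        }
        // Assumption: swap the trailing three characters ("log") for "out".
        String outFilePath = logFilePath.substring(0, logFilePath.length() - 3) + "out";
        return new JobManagerLogFileLocationSketch(logFilePath, outFilePath);
    }
}
```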

doc