A look at the execute method of Flink's LocalEnvironment.


Overview

This article mainly studies the execute method of Flink's LocalEnvironment.

Example

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath)
                .pojoType(RecordDto.class, "playerName", "country", "year", "game", "gold", "silver", "bronze", "total");

        DataSet<Tuple2<String, Integer>> groupedByCountry = csvInput
                .flatMap(new FlatMapFunction<RecordDto, Tuple2<String, Integer>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void flatMap(RecordDto record, Collector<Tuple2<String, Integer>> out) throws Exception {

                        out.collect(new Tuple2<String, Integer>(record.getCountry(), 1));
                    }
                }).groupBy(0).sum(1);
        System.out.println("===groupedByCountry===");
        groupedByCountry.print();
  • Here a DataSet is read from a CSV file via readCsvFile/pojoType, then the flatMap, groupBy, and sum operations are applied, and finally print is called to output the result.
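
The snippet above references a RecordDto POJO that is not shown here. A minimal sketch of such a class (an assumption, not the article's actual code) that satisfies Flink's POJO rules for pojoType, i.e. a public no-arg constructor plus public fields or getter/setter pairs for every mapped column:

    // Minimal sketch of the assumed RecordDto POJO; the field names must
    // match the column names passed to pojoType(...)
    public class RecordDto {
        public String playerName;
        public String country;
        public Integer year;
        public String game;
        public Integer gold;
        public Integer silver;
        public Integer bronze;
        public Integer total;

        public RecordDto() {}

        // the flatMap in the example reads the country through a getter
        public String getCountry() {
            return country;
        }
    }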

DataSet.print

flink-java-1.6.2-sources.jar! /org/apache/flink/api/java/DataSet.java

    /**
     * Prints the elements in a DataSet to the standard output stream {@link System#out} of the JVM that calls
     * the print() method. For programs that are executed in a cluster, this method needs
     * to gather the contents of the DataSet back to the client, to print it there.
     *
     * <p>The string written for each element is defined by the {@link Object#toString()} method.
     *
     * <p>This method immediately triggers the program execution, similar to the
     * {@link #collect()} and {@link #count()} methods.
     *
     * @see #printToErr()
     * @see #printOnTaskManager(String)
     */
    public void print() throws Exception {
        List<T> elements = collect();
        for (T e: elements) {
            System.out.println(e);
        }
    }
  • The print method simply calls the collect method to obtain the results and then prints them one by one; as the javadoc notes, this immediately triggers program execution (see the sketch below).
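
Since print() calls collect(), which in turn calls execute(), printing is itself a sink operation that runs the program on the spot. A small sketch of what that implies (class name is illustrative):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class PrintTriggersExecution {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> data = env.fromElements("a", "b", "c");

            // print() is an eager sink: it calls collect(), which calls
            // env.execute(), so the job runs right here
            data.print();

            // calling env.execute() again at this point, without defining a
            // new sink first, would fail, since print() already ran the program
        }
    }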

DataSet.collect

flink-java-1.6.2-sources.jar! /org/apache/flink/api/java/DataSet.java

    /**
     * Convenience method to get the elements of a DataSet as a List.
     * As DataSet can contain a lot of data, this method should be used with caution.
     *
     * @return A List containing the elements of the DataSet
     */
    public List<T> collect() throws Exception {
        final String id = new AbstractID().toString();
        final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig());

        this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()");
        JobExecutionResult res = getExecutionEnvironment().execute();

        ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
        if (accResult != null) {
            try {
                return SerializedListAccumulator.deserializeList(accResult, serializer);
            } catch (ClassNotFoundException e) {
                throw new RuntimeException("Cannot find type class of collected data type.", e);
            } catch (IOException e) {
                throw new RuntimeException("Serialization error while deserializing collected data", e);
            }
        } else {
            throw new RuntimeException("The call to collect() could not retrieve the DataSet.");
        }
    }
  • getExecutionEnvironment().execute() is called here to obtain the JobExecutionResult; the ExecutionEnvironment here is the LocalEnvironment.
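
collect() ships its data back to the client through a serialized-list accumulator registered under a random ID (Utils.CollectHelper); the same accumulator machinery is available to user code. A minimal sketch of that pattern with a user-defined accumulator (class name and the "num-records" name are illustrative):

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.common.accumulators.IntCounter;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;
    import org.apache.flink.configuration.Configuration;

    public class AccumulatorExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            env.fromElements("a", "b", "c")
                .map(new RichMapFunction<String, String>() {
                    private final IntCounter counter = new IntCounter();

                    @Override
                    public void open(Configuration parameters) {
                        // register the accumulator under a well-known name
                        getRuntimeContext().addAccumulator("num-records", counter);
                    }

                    @Override
                    public String map(String value) {
                        counter.add(1);
                        return value;
                    }
                })
                .output(new DiscardingOutputFormat<String>());

            // execute() returns a JobExecutionResult carrying the accumulators,
            // which is exactly how collect() gets its data back to the client
            JobExecutionResult res = env.execute("accumulator demo");
            Integer count = res.getAccumulatorResult("num-records");
            System.out.println("records: " + count);
        }
    }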

ExecutionEnvironment.execute

flink-java-1.6.2-sources.jar! /org/apache/flink/api/java/ExecutionEnvironment.java

    /**
     * Triggers the program execution. The environment will execute all parts of the program that have
     * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
     * writing results (e.g. {@link DataSet#writeAsText(String)},
     * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
     * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
     *
     * <p>The program execution will be logged and displayed with a generated default name.
     *
     * @return The result of the job execution, containing elapsed time and accumulators.
     * @throws Exception Thrown, if the program executions fails.
     */
    public JobExecutionResult execute() throws Exception {
        return execute(getDefaultName());
    }

    /**
     * Gets a default job name, based on the timestamp when this method is invoked.
     *
     * @return A default job name.
     */
    private static String getDefaultName() {
        return "Flink Java Job at " + Calendar.getInstance().getTime();
    }

    /**
     * Triggers the program execution. The environment will execute all parts of the program that have
     * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
     * writing results (e.g. {@link DataSet#writeAsText(String)},
     * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
     * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
     *
     * <p>The program execution will be logged and displayed with the given job name.
     *
     * @return The result of the job execution, containing elapsed time and accumulators.
     * @throws Exception Thrown, if the program executions fails.
     */
    public abstract JobExecutionResult execute(String jobName) throws Exception;
  • The concrete execute(String jobName) abstract method is implemented by subclasses; here we mainly look at the execute method of LocalEnvironment.

LocalEnvironment.execute

flink-java-1.6.2-sources.jar! /org/apache/flink/api/java/LocalEnvironment.java

    @Override
    public JobExecutionResult execute(String jobName) throws Exception {
        if (executor == null) {
            startNewSession();
        }

        Plan p = createProgramPlan(jobName);

        // Session management is disabled, revert this commit to enable
        //p.setJobId(jobID);
        //p.setSessionTimeout(sessionTimeout);

        JobExecutionResult result = executor.executePlan(p);

        this.lastJobExecutionResult = result;
        return result;
    }

    @Override
    @PublicEvolving
    public void startNewSession() throws Exception {
        if (executor != null) {
            // we need to end the previous session
            executor.stop();
            // create also a new JobID
            jobID = JobID.generate();
        }

        // create a new local executor
        executor = PlanExecutor.createLocalExecutor(configuration);
        executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());

        // if we have a session, start the mini cluster eagerly to have it available across sessions
        if (getSessionTimeout() > 0) {
            executor.start();

            // also install the reaper that will shut it down eventually
            executorReaper = new ExecutorReaper(executor);
        }
    }
  • If the executor is null, startNewSession is called; startNewSession creates the executor via PlanExecutor.createLocalExecutor(configuration). If sessionTimeout is greater than 0, executor.start() is called eagerly, but the default value is 0, so that does not happen here.
  • After that, the Plan is created by the createProgramPlan method.
  • Finally, the JobExecutionResult is obtained through executor.executePlan(p) (a sketch of driving this path explicitly follows below).
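
For reference, inside an IDE getExecutionEnvironment() typically resolves to exactly this LocalEnvironment. A minimal sketch of creating one explicitly and supplying a job name instead of the timestamped default (class name and output path are illustrative):

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.LocalEnvironment;

    public class NamedLocalJob {
        public static void main(String[] args) throws Exception {
            // create the LocalEnvironment explicitly instead of relying on
            // getExecutionEnvironment() to pick it
            LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment();

            env.fromElements(1, 2, 3)
                .writeAsText("/tmp/local-env-demo");   // sink; path is illustrative

            // passing a name avoids the timestamp-based default job name
            env.execute("my local job");
        }
    }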

PlanExecutor.createLocalExecutor

flink-core-1.6.2-sources.jar! /org/apache/flink/api/common/PlanExecutor.java

    private static final String LOCAL_EXECUTOR_CLASS = "org.apache.flink.client.LocalExecutor";

    /**
     * Creates an executor that runs the plan locally in a multi-threaded environment.
     * 
     * @return A local executor.
     */
    public static PlanExecutor createLocalExecutor(Configuration configuration) {
        Class<? extends PlanExecutor> leClass = loadExecutorClass(LOCAL_EXECUTOR_CLASS);
        
        try {
            return leClass.getConstructor(Configuration.class).newInstance(configuration);
        }
        catch (Throwable t) {
            throw new RuntimeException("An error occurred while loading the local executor ("
                    + LOCAL_EXECUTOR_CLASS + ").", t);
        }
    }

    private static Class<? extends PlanExecutor> loadExecutorClass(String className) {
        try {
            Class<?> leClass = Class.forName(className);
            return leClass.asSubclass(PlanExecutor.class);
        }
        catch (ClassNotFoundException cnfe) {
            throw new RuntimeException("Could not load the executor class (" + className
                    + "). Do you have the 'flink-clients' project in your dependencies?");
        }
        catch (Throwable t) {
            throw new RuntimeException("An error occurred while loading the executor (" + className + ").", t);
        }
    }
  • The PlanExecutor.createLocalExecutor method instantiates org.apache.flink.client.LocalExecutor via reflection (see the sketch of this pattern below).
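
This reflective indirection lets flink-core avoid a compile-time dependency on flink-clients; only the class name crosses the module boundary. A generic sketch of the same pattern (the factory below is illustrative, not Flink API):

    // Generic sketch: instantiate an implementation by class name so the
    // calling module needs no compile-time dependency on the providing module
    public final class ReflectiveFactory {

        public static <T> T create(String className, Class<T> expectedType) {
            try {
                Class<?> clazz = Class.forName(className);
                return clazz.asSubclass(expectedType).getConstructor().newInstance();
            } catch (ClassNotFoundException e) {
                throw new RuntimeException("Class not on the classpath: " + className
                        + ". Is the providing module in your dependencies?", e);
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException("Could not instantiate " + className, e);
            }
        }
    }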

LocalExecutor.executePlan

flink-clients_2.11-1.6.2-sources.jar! /org/apache/flink/client/LocalExecutor.java

    /**
     * Executes the given program on a local runtime and waits for the job to finish.
     *
     * <p>If the executor has not been started before, this starts the executor and shuts it down
     * after the job finished. If the job runs in session mode, the executor is kept alive until
     * no more references to the executor exist.</p>
     *
     * @param plan The plan of the program to execute.
     * @return The net runtime of the program, in milliseconds.
     *
     * @throws Exception Thrown, if either the startup of the local execution context, or the execution
     *                   caused an exception.
     */
    @Override
    public JobExecutionResult executePlan(Plan plan) throws Exception {
        if (plan == null) {
            throw new IllegalArgumentException("The plan may not be null.");
        }

        synchronized (this.lock) {

            // check if we start a session dedicated for this execution
            final boolean shutDownAtEnd;

            if (jobExecutorService == null) {
                shutDownAtEnd = true;

                // configure the number of local slots equal to the parallelism of the local plan
                if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
                    int maxParallelism = plan.getMaximumParallelism();
                    if (maxParallelism > 0) {
                        this.taskManagerNumSlots = maxParallelism;
                    }
                }

                // start the cluster for us
                start();
            }
            else {
                // we use the existing session
                shutDownAtEnd = false;
            }

            try {
                // TODO: Set job's default parallelism to max number of slots
                final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
                final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
                plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);

                Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
                OptimizedPlan op = pc.compile(plan);

                JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
                JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());

                return jobExecutorService.executeJobBlocking(jobGraph);
            }
            finally {
                if (shutDownAtEnd) {
                    stop();
                }
            }
        }
    }
  • Here, when jobExecutorService is null, the start method is called to start the embedded cluster and create the jobExecutorService; if the number of task slots was left at its default, it is set to the plan's maximum parallelism.
  • After that, the Optimizer compiles the Plan into an OptimizedPlan, and a JobGraphGenerator builds the JobGraph from it via the compileJobGraph method.
  • Finally, jobExecutorService.executeJobBlocking(jobGraph) is called to execute the JobGraph and return the JobExecutionResult (a sketch of pinning the slot count yourself follows below).
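
Because the slot count is derived from the plan's maximum parallelism only when nothing was configured, you can also pin it up front. A minimal sketch, assuming the Configuration passed to createLocalEnvironment reaches the local executor as shown above (class name is illustrative):

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.TaskManagerOptions;

    public class ConfiguredLocalEnvironment {
        public static void main(String[] args) throws Exception {
            // pin the slot count of the embedded cluster instead of letting
            // LocalExecutor derive it from the plan's maximum parallelism
            Configuration conf = new Configuration();
            conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);

            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
            env.fromElements(1, 2, 3, 4)
                .map(i -> i * 2)
                .print();
        }
    }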

LocalExecutor.start

flink-clients_2.11-1.6.2-sources.jar! /org/apache/flink/client/LocalExecutor.java

    @Override
    public void start() throws Exception {
        synchronized (lock) {
            if (jobExecutorService == null) {
                // create the embedded runtime
                jobExecutorServiceConfiguration = createConfiguration();

                // start it up
                jobExecutorService = createJobExecutorService(jobExecutorServiceConfiguration);
            } else {
                throw new IllegalStateException("The local executor was already started.");
            }
        }
    }

    private Configuration createConfiguration() {
        Configuration newConfiguration = new Configuration();
        newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, getTaskManagerNumSlots());
        newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, isDefaultOverwriteFiles());

        newConfiguration.addAll(baseConfiguration);

        return newConfiguration;
    }

    private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception {
        final JobExecutorService newJobExecutorService;
        if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {

            if (!configuration.contains(RestOptions.PORT)) {
                configuration.setInteger(RestOptions.PORT, 0);
            }

            final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
                .setConfiguration(configuration)
                .setNumTaskManagers(
                    configuration.getInteger(
                        ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
                        ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
                .setRpcServiceSharing(RpcServiceSharing.SHARED)
                .setNumSlotsPerTaskManager(
                    configuration.getInteger(
                        TaskManagerOptions.NUM_TASK_SLOTS, 1))
                .build();

            final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
            miniCluster.start();

            configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());

            newJobExecutorService = miniCluster;
        } else {
            final LocalFlinkMiniCluster localFlinkMiniCluster = new LocalFlinkMiniCluster(configuration, true);
            localFlinkMiniCluster.start();

            newJobExecutorService = localFlinkMiniCluster;
        }

        return newJobExecutorService;
    }
  • The start method first builds a Configuration through createConfiguration, and then creates the JobExecutorService through createJobExecutorService.
  • createConfiguration mainly sets TaskManagerOptions.NUM_TASK_SLOTS and CoreOptions.FILESYTEM_DEFAULT_OVERRIDE.
  • The createJobExecutorService method creates a different JobExecutorService depending on configuration.getString(CoreOptions.MODE).
  • The default is CoreOptions.NEW_MODE, which first builds a MiniClusterConfiguration, then creates a MiniCluster (the JobExecutorService) and calls MiniCluster.start() before returning it.
  • Otherwise (legacy mode), a LocalFlinkMiniCluster is created as the JobExecutorService, and LocalFlinkMiniCluster.start() is called before returning it (a sketch of selecting the mode via configuration follows below).
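
Which branch runs is decided entirely by CoreOptions.MODE. A minimal sketch of forcing the legacy branch, assuming Flink 1.6's CoreOptions constants (class name is illustrative):

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.CoreOptions;

    public class LegacyModeLocalRun {
        public static void main(String[] args) throws Exception {
            // CoreOptions.MODE selects the embedded cluster: "new" (the 1.6
            // default) yields MiniCluster, "legacy" yields LocalFlinkMiniCluster
            Configuration conf = new Configuration();
            conf.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);

            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
            env.fromElements("x", "y").print();
        }
    }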

MiniCluster.executeJobBlocking

flink-runtime_2.11-1.6.2-sources.jar! /org/apache/flink/runtime/minicluster/MiniCluster.java

    /**
     * This method runs a job in blocking mode. The method returns only after the job
     * completed successfully, or after it failed terminally.
     *
     * @param job  The Flink job to execute
     * @return The result of the job execution
     *
     * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
     *         or if the job terminally failed.
     */
    @Override
    public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
        checkNotNull(job, "job is null");

        final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);

        final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
            (JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));

        final JobResult jobResult;

        try {
            jobResult = jobResultFuture.get();
        } catch (ExecutionException e) {
            throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e));
        }

        try {
            return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
        } catch (IOException | ClassNotFoundException e) {
            throw new JobExecutionException(job.getJobID(), e);
        }
    }
  • The MiniCluster.executeJobBlocking method first calls submitJob(job) to submit the JobGraph, which returns a CompletableFuture (submissionFuture).
  • That submissionFuture is chained via thenCompose to the requestJobResult method, yielding the jobResultFuture.
  • Finally, the JobExecutionResult is obtained by blocking on jobResultFuture.get() and converting the JobResult (a generic sketch of this future composition follows below).
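
The submit-then-request chain is plain CompletableFuture composition. A generic sketch of the same pattern, detached from Flink (all names illustrative):

    import java.util.concurrent.CompletableFuture;

    public class ComposeSketch {
        public static void main(String[] args) throws Exception {
            // stage one: "submit the job", yielding an id once it completes
            CompletableFuture<String> submission =
                    CompletableFuture.supplyAsync(() -> "job-42");

            // stage two only starts after stage one completes, mirroring
            // submitJob(...).thenCompose(ignored -> requestJobResult(...))
            CompletableFuture<String> result =
                    submission.thenCompose(jobId ->
                            CompletableFuture.supplyAsync(() -> "result of " + jobId));

            // get() blocks the caller, just like executeJobBlocking does
            System.out.println(result.get());
        }
    }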

Summary

  • The print method of DataSet calls the collect method, and collect calls getExecutionEnvironment().execute() to obtain the JobExecutionResult; the ExecutionEnvironment here is a LocalEnvironment.
  • The ExecutionEnvironment.execute method internally calls the abstract method execute(String jobName), which is implemented by subclasses; here that is LocalEnvironment.execute. It first creates the LocalExecutor via startNewSession (which calls PlanExecutor.createLocalExecutor), then builds the Plan via createProgramPlan, and finally calls LocalExecutor.executePlan to obtain the JobExecutionResult.
  • The executePlan method first checks jobExecutorService: if it is null, the start method is called to create it (depending on CoreOptions.MODE: under CoreOptions.NEW_MODE, the default, a MiniCluster is created; otherwise a LocalFlinkMiniCluster). The jobExecutorService created here is therefore a MiniCluster. After that, the Plan is converted into a JobGraph by the JobGraphGenerator; finally, jobExecutorService.executeJobBlocking(jobGraph) executes the JobGraph and returns the JobExecutionResult.
