A Look at Flink's Parallel Execution


Preface

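This article takes a look at parallel execution in Flink: the levels at which parallelism can be set, and where each setting ends up in the Flink 1.7.1 source code.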

Example

Operator Level

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
  • Operators, data sources, and data sinks can all call the setParallelism() method to set their own parallelism.

Execution Environment Level

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();

env.execute("Word Count Example");
  • Calling setParallelism on the ExecutionEnvironment sets the default parallelism for all of its operators, data sources, and data sinks. If an operator, data source, or data sink sets its own parallelism, that setting overrides the value from the ExecutionEnvironment.

Client Level

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

Or, programmatically from Java:

try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}
  • With the CLI client, parallelism is specified with the -p option on the command line. When calling from Java/Scala, it is passed as a parameter of Client.run.

System Level

# The parallelism used for programs that did not specify and other parallelism.

parallelism.default: 1
  • You can specify the system-level default parallelism for all execution environments through the parallelism.default configuration item in flink-conf.yaml.
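Taken together, the four levels behave like a precedence chain in which the most specific setting wins. The sketch below is a simplified plain-Java model of that rule, not Flink code. ParallelismLevels, resolve and the NOT_SET sentinel (-1, meaning "this level specified nothing") are hypothetical names. In Flink itself, the resolution is spread across several classes rather than done in one method.

```java
// Simplified model of the four parallelism levels; not a Flink API.
class ParallelismLevels {

    // Hypothetical sentinel: "this level did not specify a parallelism".
    static final int NOT_SET = -1;

    /**
     * Returns the effective parallelism of one operator, checking the levels from the most
     * specific (operator) to the least specific (system default from flink-conf.yaml).
     */
    static int resolve(int operatorLevel, int environmentLevel, int clientLevel, int systemLevel) {
        if (operatorLevel > 0) {
            return operatorLevel;      // setParallelism() on the operator, source or sink
        }
        if (environmentLevel > 0) {
            return environmentLevel;   // env.setParallelism(...)
        }
        if (clientLevel > 0) {
            return clientLevel;        // ./bin/flink run -p N
        }
        return systemLevel;            // parallelism.default in flink-conf.yaml
    }

    public static void main(String[] args) {
        System.out.println(resolve(5, 3, 10, 1));                   // 5: operator setting wins
        System.out.println(resolve(NOT_SET, 3, 10, 1));             // 3: environment default
        System.out.println(resolve(NOT_SET, NOT_SET, 10, 1));       // 10: client -p
        System.out.println(resolve(NOT_SET, NOT_SET, NOT_SET, 1));  // 1: system default
    }
}
```

The real code paths discussed below, such as ClusterClient only filling in a missing plan default, are what make this ordering hold in practice.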

ExecutionEnvironment

flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java

@Public
public abstract class ExecutionEnvironment {
    //......

    private final ExecutionConfig config = new ExecutionConfig();

    /**
     * Sets the parallelism for operations executed through this environment.
     * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
     * x parallel instances.
     *
     * <p>This method overrides the default parallelism for this environment.
     * The {@link LocalEnvironment} uses by default a value equal to the number of hardware
     * contexts (CPU cores / threads). When executing the program via the command line client
     * from a JAR file, the default parallelism is the one configured for that setup.
     *
     * @param parallelism The parallelism
     */
    public void setParallelism(int parallelism) {
        config.setParallelism(parallelism);
    }

    @Internal
    public Plan createProgramPlan(String jobName, boolean clearSinks) {
        if (this.sinks.isEmpty()) {
            if (wasExecuted) {
                throw new RuntimeException("No new data sinks have been defined since the " +
                        "last execution. The last execution refers to the latest call to " +
                        "'execute()', 'count()', 'collect()', or 'print()'.");
            } else {
                throw new RuntimeException("No data sinks have been created yet. " +
                        "A program needs at least one sink that consumes data. " +
                        "Examples are writing the data set or printing it.");
            }
        }

        if (jobName == null) {
            jobName = getDefaultName();
        }

        OperatorTranslation translator = new OperatorTranslation();
        Plan plan = translator.translateToPlan(this.sinks, jobName);

        if (getParallelism() > 0) {
            plan.setDefaultParallelism(getParallelism());
        }
        plan.setExecutionConfig(getConfig());

        // Check plan for GenericTypeInfo's and register the types at the serializers.
        if (!config.isAutoTypeRegistrationDisabled()) {
            plan.accept(new Visitor<org.apache.flink.api.common.operators.Operator<?>>() {

                private final Set<Class<?>> registeredTypes = new HashSet<>();
                private final Set<org.apache.flink.api.common.operators.Operator<?>> visitedOperators = new HashSet<>();

                @Override
                public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
                    if (!visitedOperators.add(visitable)) {
                        return false;
                    }
                    OperatorInformation<?> opInfo = visitable.getOperatorInfo();
                    Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, registeredTypes);
                    return true;
                }

                @Override
                public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {}
            });
        }

        try {
            registerCachedFilesWithPlan(plan);
        } catch (Exception e) {
            throw new RuntimeException("Error while registering cached files: " + e.getMessage(), e);
        }

        // clear all the sinks such that the next execution does not redo everything
        if (clearSinks) {
            this.sinks.clear();
            wasExecuted = true;
        }

        // All types are registered now. Print information.
        int registeredTypes = config.getRegisteredKryoTypes().size() +
                config.getRegisteredPojoTypes().size() +
                config.getRegisteredTypesWithKryoSerializerClasses().size() +
                config.getRegisteredTypesWithKryoSerializers().size();
        int defaultKryoSerializers = config.getDefaultKryoSerializers().size() +
                config.getDefaultKryoSerializerClasses().size();
        LOG.info("The job has {} registered types and {} default Kryo serializers", registeredTypes, defaultKryoSerializers);

        if (config.isForceKryoEnabled() && config.isForceAvroEnabled()) {
            LOG.warn("In the ExecutionConfig, both Avro and Kryo are enforced. Using Kryo serializer");
        }
        if (config.isForceKryoEnabled()) {
            LOG.info("Using KryoSerializer for serializing POJOs");
        }
        if (config.isForceAvroEnabled()) {
            LOG.info("Using AvroSerializer for serializing POJOs");
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Registered Kryo types: {}", config.getRegisteredKryoTypes().toString());
            LOG.debug("Registered Kryo with Serializers types: {}", config.getRegisteredTypesWithKryoSerializers().entrySet().toString());
            LOG.debug("Registered Kryo with Serializer Classes types: {}", config.getRegisteredTypesWithKryoSerializerClasses().entrySet().toString());
            LOG.debug("Registered Kryo default Serializers: {}", config.getDefaultKryoSerializers().entrySet().toString());
            LOG.debug("Registered Kryo default Serializers Classes {}", config.getDefaultKryoSerializerClasses().entrySet().toString());
            LOG.debug("Registered POJO types: {}", config.getRegisteredPojoTypes().toString());

            // print information about static code analysis
            LOG.debug("Static code analysis mode: {}", config.getCodeAnalysisMode());
        }

        return plan;
    }

    //......
}
  • ExecutionEnvironment's setParallelism method stores the parallelism in its ExecutionConfig. After creating the Plan, createProgramPlan reads the parallelism back from the ExecutionConfig and, if it is greater than 0, sets it as the Plan's defaultParallelism.
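The two steps in the bullet above can be modeled in a few lines of plain Java. EnvToPlanModel, Config and Plan are hypothetical stand-ins for ExecutionEnvironment, ExecutionConfig and Plan. The sketch assumes that -1 means "not set" for both the config and the plan. Only the parallelism-related logic is kept.

```java
// Plain-Java model of ExecutionEnvironment.setParallelism + createProgramPlan; not Flink's classes.
class EnvToPlanModel {

    /** Stand-in for ExecutionConfig: only holds the parallelism (-1 = not set, an assumption here). */
    static class Config {
        int parallelism = -1;
    }

    /** Stand-in for Plan: only holds the default parallelism (-1 = not set, an assumption here). */
    static class Plan {
        int defaultParallelism = -1;
    }

    private final Config config = new Config();

    /** Like ExecutionEnvironment.setParallelism: just records the value in the config. */
    void setParallelism(int parallelism) {
        config.parallelism = parallelism;
    }

    /** Like the relevant part of createProgramPlan: copy the value only if it is positive. */
    Plan createProgramPlan() {
        Plan plan = new Plan();
        if (config.parallelism > 0) {
            plan.defaultParallelism = config.parallelism;
        }
        return plan;
    }

    public static void main(String[] args) {
        EnvToPlanModel env = new EnvToPlanModel();
        System.out.println(env.createProgramPlan().defaultParallelism); // -1: nothing copied
        env.setParallelism(3);
        System.out.println(env.createProgramPlan().defaultParallelism); // 3
    }
}
```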

LocalEnvironment

flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/LocalEnvironment.java

@Public
public class LocalEnvironment extends ExecutionEnvironment {

    //......

    public JobExecutionResult execute(String jobName) throws Exception {
        if (executor == null) {
            startNewSession();
        }

        Plan p = createProgramPlan(jobName);

        // Session management is disabled, revert this commit to enable
        //p.setJobId(jobID);
        //p.setSessionTimeout(sessionTimeout);

        JobExecutionResult result = executor.executePlan(p);

        this.lastJobExecutionResult = result;
        return result;
    }

    //......
}
  • LocalEnvironment's execute method creates the Plan and passes it to LocalExecutor's executePlan method.

LocalExecutor

flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/LocalExecutor.java

public class LocalExecutor extends PlanExecutor {
    
    //......

    @Override
    public JobExecutionResult executePlan(Plan plan) throws Exception {
        if (plan == null) {
            throw new IllegalArgumentException("The plan may not be null.");
        }

        synchronized (this.lock) {

            // check if we start a session dedicated for this execution
            final boolean shutDownAtEnd;

            if (jobExecutorService == null) {
                shutDownAtEnd = true;

                // configure the number of local slots equal to the parallelism of the local plan
                if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
                    int maxParallelism = plan.getMaximumParallelism();
                    if (maxParallelism > 0) {
                        this.taskManagerNumSlots = maxParallelism;
                    }
                }

                // start the cluster for us
                start();
            }
            else {
                // we use the existing session
                shutDownAtEnd = false;
            }

            try {
                // TODO: Set job's default parallelism to max number of slots
                final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
                final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
                plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);

                Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
                OptimizedPlan op = pc.compile(plan);

                JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
                JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());

                return jobExecutorService.executeJobBlocking(jobGraph);
            }
            finally {
                if (shutDownAtEnd) {
                    stop();
                }
            }
        }
    }

    //......
}
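  • LocalExecutor's executePlan method then sets the plan's defaultParallelism to slotsPerTaskManager * numTaskManagers. This call is unconditional, so it replaces whatever default createProgramPlan set. When executePlan starts its own local cluster and the slot count is still at its default, the number of slots per TaskManager is first set to the plan's maximum parallelism.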
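The slot sizing in LocalExecutor.executePlan can be sketched as two small functions. This is a plain-Java model, not Flink code. LocalSlotSizing and the UNSET sentinel are hypothetical (UNSET stands in for DEFAULT_TASK_MANAGER_NUM_SLOTS, whose real value is not shown above). The sketch also assumes the job executor configuration does not override the slot or TaskManager counts. It only covers the branch where no session is running yet.

```java
// Plain-Java model of LocalExecutor's slot sizing; names and the UNSET sentinel are hypothetical.
class LocalSlotSizing {

    // Stands in for DEFAULT_TASK_MANAGER_NUM_SLOTS ("no slot count configured").
    static final int UNSET = -1;

    /** "configure the number of local slots equal to the parallelism of the local plan". */
    static int slotsPerTaskManager(int configuredSlots, int planMaxParallelism) {
        if (configuredSlots == UNSET && planMaxParallelism > 0) {
            return planMaxParallelism;
        }
        return configuredSlots;
    }

    /** plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers), applied unconditionally. */
    static int defaultParallelism(int slotsPerTaskManager, int numTaskManagers) {
        return slotsPerTaskManager * numTaskManagers;
    }

    public static void main(String[] args) {
        int slots = slotsPerTaskManager(UNSET, 4);          // no slots configured, plan max is 4
        System.out.println(slots);                          // 4
        System.out.println(defaultParallelism(slots, 1));   // 4 with one local TaskManager
        System.out.println(defaultParallelism(2, 3));       // 6 with 2 slots on each of 3 TaskManagers
    }
}
```

Because the plan's maximum parallelism includes its default, a local job whose environment set parallelism 3 gets 3 slots and ends up with a default of 3 again on a single TaskManager.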

RemoteEnvironment

flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/RemoteEnvironment.java

@Public
public class RemoteEnvironment extends ExecutionEnvironment {

    //......

    public JobExecutionResult execute(String jobName) throws Exception {
        PlanExecutor executor = getExecutor();

        Plan p = createProgramPlan(jobName);

        // Session management is disabled, revert this commit to enable
        //p.setJobId(jobID);
        //p.setSessionTimeout(sessionTimeout);

        JobExecutionResult result = executor.executePlan(p);

        this.lastJobExecutionResult = result;
        return result;
    }

    //......
}
  • RemoteEnvironment's execute method likewise creates the Plan and passes it to RemoteExecutor's executePlan method.

RemoteExecutor

flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/RemoteExecutor.java

public class RemoteExecutor extends PlanExecutor {

    private final Object lock = new Object();

    private final List<URL> jarFiles;

    private final List<URL> globalClasspaths;

    private final Configuration clientConfiguration;

    private ClusterClient<?> client;

    //......

    @Override
    public JobExecutionResult executePlan(Plan plan) throws Exception {
        if (plan == null) {
            throw new IllegalArgumentException("The plan may not be null.");
        }

        JobWithJars p = new JobWithJars(plan, this.jarFiles, this.globalClasspaths);
        return executePlanWithJars(p);
    }

    public JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception {
        if (program == null) {
            throw new IllegalArgumentException("The job may not be null.");
        }

        synchronized (this.lock) {
            // check if we start a session dedicated for this execution
            final boolean shutDownAtEnd;

            if (client == null) {
                shutDownAtEnd = true;
                // start the executor for us
                start();
            }
            else {
                // we use the existing session
                shutDownAtEnd = false;
            }

            try {
                return client.run(program, defaultParallelism).getJobExecutionResult();
            }
            finally {
                if (shutDownAtEnd) {
                    stop();
                }
            }
        }
    }

    //......
}
  • RemoteExecutor's executePlan wraps the Plan in a JobWithJars and calls executePlanWithJars. That method calls ClusterClient's run method and passes defaultParallelism as the parallelism argument.
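LocalExecutor.executePlan and RemoteExecutor.executePlanWithJars share the same session pattern. If nothing is running yet, they start a session dedicated to this execution and stop it in a finally block. Otherwise they reuse the existing session. The plain-Java sketch below isolates that pattern. SessionExecutor, its counters and the "ran ..." result are hypothetical. The running flag stands in for the client / jobExecutorService null checks.

```java
// Plain-Java model of the "dedicated session" pattern; SessionExecutor is hypothetical.
class SessionExecutor {

    private final Object lock = new Object();
    private boolean running;   // stands in for 'client != null' / 'jobExecutorService != null'
    int starts;                // counters so the behaviour can be observed
    int stops;

    void start() { running = true; starts++; }

    void stop() { running = false; stops++; }

    /** Runs one job, starting and stopping a session only if none was already running. */
    String execute(String job) {
        synchronized (lock) {
            final boolean shutDownAtEnd;
            if (!running) {
                shutDownAtEnd = true;   // start a session dedicated to this execution
                start();
            } else {
                shutDownAtEnd = false;  // reuse the existing session
            }
            try {
                return "ran " + job;
            } finally {
                if (shutDownAtEnd) {
                    stop();
                }
            }
        }
    }

    public static void main(String[] args) {
        SessionExecutor perJob = new SessionExecutor();
        perJob.execute("job-1");
        System.out.println(perJob.starts + " start(s), " + perJob.stops + " stop(s)"); // 1 start(s), 1 stop(s)

        SessionExecutor shared = new SessionExecutor();
        shared.start();                 // a long-lived session already exists
        shared.execute("job-2");
        System.out.println(shared.starts + " start(s), " + shared.stops + " stop(s)"); // 1 start(s), 0 stop(s)
    }
}
```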

ClusterClient

flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/program/ClusterClient.java

public abstract class ClusterClient<T> {
    //......

    public JobSubmissionResult run(JobWithJars program, int parallelism) throws ProgramInvocationException {
        return run(program, parallelism, SavepointRestoreSettings.none());
    }

    public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
            throws CompilerException, ProgramInvocationException {
        ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
        if (classLoader == null) {
            throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
        }

        OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
        return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
    }

    private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism)
            throws CompilerException, ProgramInvocationException {
        return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
    }

    public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
        Logger log = LoggerFactory.getLogger(ClusterClient.class);

        if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
            log.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
            p.setDefaultParallelism(parallelism);
        }
        log.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());

        return compiler.compile(p);
    }

    //......
}
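  • In ClusterClient's run method, the parallelism argument is applied to the Plan only when parallelism > 0 and p.getDefaultParallelism() <= 0. It therefore never overrides a default parallelism already set on the Plan, for example one copied from ExecutionEnvironment.setParallelism.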
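The condition in getOptimizedPlan is what makes a client-side -p lose against ExecutionEnvironment.setParallelism. A minimal plain-Java model of the rule follows. ClientParallelismRule and applyClientParallelism are hypothetical names. The plan default is passed as a plain int, with -1 meaning "not set".

```java
// Plain-Java model of ClusterClient.getOptimizedPlan's parallelism rule; not Flink's API.
class ClientParallelismRule {

    /** Returns the plan's default parallelism after the client-level value has been considered. */
    static int applyClientParallelism(int planDefaultParallelism, int clientParallelism) {
        // Mirrors: if (parallelism > 0 && p.getDefaultParallelism() <= 0) p.setDefaultParallelism(parallelism);
        if (clientParallelism > 0 && planDefaultParallelism <= 0) {
            return clientParallelism;
        }
        return planDefaultParallelism;
    }

    public static void main(String[] args) {
        // env.setParallelism(3) was copied into the plan, so "-p 10" does not override it.
        System.out.println(applyClientParallelism(3, 10));   // 3
        // The environment set nothing (-1), so "-p 10" fills the gap.
        System.out.println(applyClientParallelism(-1, 10));  // 10
        // A non-positive client value leaves the plan untouched.
        System.out.println(applyClientParallelism(-1, -1));  // -1
    }
}
```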

DataStreamSource

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/DataStreamSource.java

@Public
public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {

    boolean isParallel;

    public DataStreamSource(StreamExecutionEnvironment environment,
            TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
            boolean isParallel, String sourceName) {
        super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));

        this.isParallel = isParallel;
        if (!isParallel) {
            setParallelism(1);
        }
    }

    public DataStreamSource(SingleOutputStreamOperator<T> operator) {
        super(operator.environment, operator.getTransformation());
        this.isParallel = true;
    }

    @Override
    public DataStreamSource<T> setParallelism(int parallelism) {
        if (parallelism != 1 && !isParallel) {
            throw new IllegalArgumentException("Source: " + transformation.getId() + " is not a parallel source");
        } else {
            super.setParallelism(parallelism);
            return this;
        }
    }
}
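  • DataStreamSource extends SingleOutputStreamOperator and overrides setParallelism. A non-parallel source is pinned to parallelism 1 in its constructor, and setting any other value throws an IllegalArgumentException. Otherwise the call is delegated to setParallelism of the parent class SingleOutputStreamOperator.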
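The source-specific rules can be modeled without Flink as follows. SourceModel and its fields are hypothetical stand-ins for DataStreamSource and its SourceTransformation. In this sketch the parallelism is stored directly instead of being delegated to a parent class.

```java
// Plain-Java model of DataStreamSource's parallelism rules; names are hypothetical.
final class SourceModel {

    private final boolean isParallel;
    private int parallelism;

    SourceModel(boolean isParallel, int environmentParallelism) {
        this.isParallel = isParallel;
        this.parallelism = environmentParallelism;  // like SourceTransformation(..., environment.getParallelism())
        if (!isParallel) {
            setParallelism(1);                       // non-parallel sources are pinned to 1
        }
    }

    SourceModel setParallelism(int parallelism) {
        if (parallelism != 1 && !isParallel) {
            throw new IllegalArgumentException("Source is not a parallel source");
        }
        this.parallelism = parallelism;
        return this;
    }

    int getParallelism() {
        return parallelism;
    }

    public static void main(String[] args) {
        System.out.println(new SourceModel(false, 4).getParallelism());                  // 1
        System.out.println(new SourceModel(true, 4).getParallelism());                   // 4
        System.out.println(new SourceModel(true, 4).setParallelism(8).getParallelism()); // 8
        try {
            new SourceModel(false, 4).setParallelism(2);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```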

SingleOutputStreamOperator

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {
    //......

    /**
     * Sets the parallelism for this operator.
     *
     * @param parallelism
     *            The parallelism for this operator.
     * @return The operator with set parallelism.
     */
    public SingleOutputStreamOperator<T> setParallelism(int parallelism) {
        Preconditions.checkArgument(canBeParallel() || parallelism == 1,
                "The parallelism of non parallel operator must be 1.");

        transformation.setParallelism(parallelism);

        return this;
    }

    //......
}
  • SingleOutputStreamOperator's setParallelism checks that a non-parallel operator is only given parallelism 1, then applies the value to its StreamTransformation.

DataStreamSink

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/DataStreamSink.java

@Public
public class DataStreamSink<T> {

    private final SinkTransformation<T> transformation;

    //......

    /**
     * Sets the parallelism for this sink. The degree must be higher than zero.
     *
     * @param parallelism The parallelism for this sink.
     * @return The sink with set parallelism.
     */
    public DataStreamSink<T> setParallelism(int parallelism) {
        transformation.setParallelism(parallelism);
        return this;
    }

    //......
}
  • DataStreamSink provides its own setParallelism method, which writes the value directly to its SinkTransformation.
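Both SingleOutputStreamOperator and DataStreamSink are thin handles: setParallelism only validates the value (operators) or passes it straight through (sinks) to a transformation object. The plain-Java sketch below models that delegation. Transformation, OperatorHandle and SinkHandle are hypothetical stand-ins for StreamTransformation, SingleOutputStreamOperator and DataStreamSink.

```java
// Plain-Java model: operator and sink handles forward setParallelism to a transformation.
class TransformationModel {

    static class Transformation {
        private int parallelism;
        Transformation(int parallelism) { this.parallelism = parallelism; }
        void setParallelism(int parallelism) { this.parallelism = parallelism; }
        int getParallelism() { return parallelism; }
    }

    static class OperatorHandle {
        final Transformation transformation;
        private final boolean canBeParallel;

        OperatorHandle(Transformation transformation, boolean canBeParallel) {
            this.transformation = transformation;
            this.canBeParallel = canBeParallel;
        }

        OperatorHandle setParallelism(int parallelism) {
            // Same condition as Preconditions.checkArgument(canBeParallel() || parallelism == 1, ...)
            if (!(canBeParallel || parallelism == 1)) {
                throw new IllegalArgumentException("The parallelism of non parallel operator must be 1.");
            }
            transformation.setParallelism(parallelism);
            return this;
        }
    }

    static class SinkHandle {
        final Transformation transformation;

        SinkHandle(Transformation transformation) { this.transformation = transformation; }

        SinkHandle setParallelism(int parallelism) {
            transformation.setParallelism(parallelism);  // no extra check, as in DataStreamSink
            return this;
        }
    }

    public static void main(String[] args) {
        OperatorHandle sum = new OperatorHandle(new Transformation(3), true).setParallelism(5);
        SinkHandle print = new SinkHandle(new Transformation(3)).setParallelism(1);
        System.out.println(sum.transformation.getParallelism());   // 5
        System.out.println(print.transformation.getParallelism()); // 1
    }
}
```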

Summary

  • Flink lets you set parallelism at several levels: Operator Level, Execution Environment Level, Client Level, and System Level.
  • The system-level default parallelism for all execution environments is specified with the parallelism.default configuration item in flink-conf.yaml. Calling setParallelism on an ExecutionEnvironment sets the default parallelism for its operators, data sources, and data sinks. Any operator, data source, or data sink that sets its own parallelism overrides the value from the ExecutionEnvironment.
  • ExecutionEnvironment's setParallelism method stores the parallelism in its ExecutionConfig, and createProgramPlan copies it to the Plan as the default parallelism. With the CLI client, parallelism is given with -p on the command line or, from Java/Scala, as a parameter of Client.run. For both LocalEnvironment and RemoteEnvironment, the parallelism ends up as the Plan's default parallelism. LocalExecutor sets it directly; ClusterClient sets it only when the Plan has no default yet.
  • DataStreamSource extends SingleOutputStreamOperator and overrides setParallelism, which eventually calls setParallelism of the parent class SingleOutputStreamOperator. SingleOutputStreamOperator's setParallelism is ultimately applied to the StreamTransformation.
  • DataStreamSink provides its own setParallelism method, which ultimately acts on the SinkTransformation.
