Talk about Flink’s RichParallelSourceFunction

Preface

This article mainly studies Flink’s RichParallelSourceFunction.

RichParallelSourceFunction

flink-streaming-java_2.11-1.6.2-sources.jar! /org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java

/**
 * Base class for implementing a parallel data source. Upon execution, the runtime will
 * execute as many parallel instances of this function as configured parallelism
 * of the source.
 *
 * <p>The data source has access to context information (such as the number of parallel
 * instances of the source, and which parallel instance the current instance is)
 * via {@link #getRuntimeContext()}. It also provides additional life-cycle methods
 * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.</p>
 *
 * @param <OUT> The type of the records produced by this source.
 */
@Public
public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
        implements ParallelSourceFunction<OUT> {

    private static final long serialVersionUID = 1L;
}
  • RichParallelSourceFunction extends AbstractRichFunction and also implements the ParallelSourceFunction interface; a minimal subclass is sketched below.
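
Here is a minimal sketch of a custom parallel source, assuming hypothetical names (CountingSource and the emitted Long values are illustrative, not part of the Flink sources quoted above); it shows the two methods a subclass must implement (run and cancel) next to the open life-cycle hook inherited from AbstractRichFunction:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class CountingSource extends RichParallelSourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // inherited from AbstractRichFunction; initialize per-instance resources here
        super.open(parameters);
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long value = 0;
        while (running) {
            // emit under the checkpoint lock so checkpoints see a consistent state
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(value++);
            }
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}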

ParallelSourceFunction

flink-streaming-java_2.11-1.6.2-sources.jar! /org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java

/**
 * A stream data source that is executed in parallel. Upon execution, the runtime will
 * execute as many parallel instances of this function as configured parallelism
 * of the source.
 *
 * <p>This interface acts only as a marker to tell the system that this source may
 * be executed in parallel. When different parallel instances are required to perform
 * different tasks, use the {@link RichParallelSourceFunction} to get access to the runtime
 * context, which reveals information like the number of parallel tasks, and which parallel
 * task the current instance is.
 *
 * @param <OUT> The type of the records produced by this source.
 */
@Public
public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
}
  • ParallelSourceFunction extends the SourceFunction interface. It defines no additional methods; the interface name alone acts as a marker expressing the intent that this stream data source may be executed in parallel (see the snippet below).
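
The practical effect of the marker shows up when setting the source parallelism. A minimal sketch (CountingSource is the hypothetical parallel source from above; NonParallelSource stands for a hypothetical source implementing only SourceFunction):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MarkerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // allowed: the source implements ParallelSourceFunction
        env.addSource(new CountingSource()).setParallelism(4).print();

        // a plain SourceFunction is limited to parallelism 1; asking for more
        // fails at graph construction time:
        // env.addSource(new NonParallelSource()).setParallelism(4);

        env.execute("marker demo");
    }
}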

AbstractRichFunction

flink-core-1.6.2-sources.jar! /org/apache/flink/api/common/functions/AbstractRichFunction.java

/**
 * An abstract stub implementation for rich user-defined functions.
 * Rich functions have additional methods for initialization ({@link #open(Configuration)}) and
 * teardown ({@link #close()}), as well as access to their runtime execution context via
 * {@link #getRuntimeContext()}.
 */
@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {

    private static final long serialVersionUID = 1L;

    // --------------------------------------------------------------------------------------------
    //  Runtime context access
    // --------------------------------------------------------------------------------------------

    private transient RuntimeContext runtimeContext;

    @Override
    public void setRuntimeContext(RuntimeContext t) {
        this.runtimeContext = t;
    }

    @Override
    public RuntimeContext getRuntimeContext() {
        if (this.runtimeContext != null) {
            return this.runtimeContext;
        } else {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
    }

    @Override
    public IterationRuntimeContext getIterationRuntimeContext() {
        if (this.runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        } else if (this.runtimeContext instanceof IterationRuntimeContext) {
            return (IterationRuntimeContext) this.runtimeContext;
        } else {
            throw new IllegalStateException("This stub is not part of an iteration step function.");
        }
    }

    // --------------------------------------------------------------------------------------------
    //  Default life cycle methods
    // --------------------------------------------------------------------------------------------

    @Override
    public void open(Configuration parameters) throws Exception {}

    @Override
    public void close() throws Exception {}
}
  • AbstractRichFunction mainly implements the setRuntimeContext, getRuntimeContext, and getIterationRuntimeContext methods of the RichFunction interface. The open and close methods are empty no-op defaults; a sketch of overriding them follows.
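
The same life-cycle hooks are available to every rich function, not only sources. A sketch with a hypothetical RichMapFunction subclass that uses open/close around per-instance state:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class TaggingMapper extends RichMapFunction<String, String> {

    private transient String tag; // stands in for a heavier resource (connection, cache, ...)

    @Override
    public void open(Configuration parameters) {
        // called once per parallel instance before the first map() call
        tag = "subtask-" + getRuntimeContext().getIndexOfThisSubtask();
    }

    @Override
    public String map(String value) {
        return tag + ": " + value;
    }

    @Override
    public void close() {
        // called once per parallel instance during shutdown
        tag = null;
    }
}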

RuntimeContext

flink-core-1.6.2-sources.jar! /org/apache/flink/api/common/functions/RuntimeContext.java

/**
 * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
 * of the function will have a context through which it can access static contextual information (such as
 * the current parallelism) and other constructs like accumulators and broadcast variables.
 *
 * <p>A function can, during runtime, obtain the RuntimeContext via a call to
 * {@link AbstractRichFunction#getRuntimeContext()}.
 */
@Public
public interface RuntimeContext {

    /**
     * Returns the name of the task in which the UDF runs, as assigned during plan construction.
     *
     * @return The name of the task in which the UDF runs.
     */
    String getTaskName();

    /**
     * Returns the metric group for this parallel subtask.
     *
     * @return The metric group for this parallel subtask.
     */
    @PublicEvolving
    MetricGroup getMetricGroup();

    /**
     * Gets the parallelism with which the parallel task runs.
     *
     * @return The parallelism with which the parallel task runs.
     */
    int getNumberOfParallelSubtasks();

    /**
     * Gets the number of max-parallelism with which the parallel task runs.
     *
     * @return The max-parallelism with which the parallel task runs.
     */
    @PublicEvolving
    int getMaxNumberOfParallelSubtasks();

    /**
     * Gets the number of this parallel subtask. The numbering starts from 0 and goes up to
     * parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}).
     *
     * @return The index of the parallel subtask.
     */
    int getIndexOfThisSubtask();

    /**
     * Gets the attempt number of this parallel subtask. First attempt is numbered 0.
     *
     * @return Attempt number of the subtask.
     */
    int getAttemptNumber();

    /**
     * Returns the name of the task, appended with the subtask indicator, such as "MyTask (3/6)",
     * where 3 would be ({@link #getIndexOfThisSubtask()} + 1), and 6 would be
     * {@link #getNumberOfParallelSubtasks()}.
     *
     * @return The name of the task, with subtask indicator.
     */
    String getTaskNameWithSubtasks();

    /**
     * Returns the {@link org.apache.flink.api.common.ExecutionConfig} for the currently executing
     * job.
     */
    ExecutionConfig getExecutionConfig();

    //.......
}
  • RuntimeContext defines many methods. Here we look at getNumberOfParallelSubtasks, which returns the parallelism of the current task, and getIndexOfThisSubtask, which returns the index of the current parallel subtask. With this information one can develop a ParallelSourceFunction whose parallel instances do not emit duplicate data, as sketched below.
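
A sketch of such a source, assuming hypothetical names (PartitionedSource, the 0..99 element range): each parallel instance uses the subtask index to emit a disjoint slice of the data.

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class PartitionedSource extends RichParallelSourceFunction<Integer> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        int index = getRuntimeContext().getIndexOfThisSubtask();

        // round-robin assignment: subtask k emits elements k, k + p, k + 2p, ...
        for (int i = index; i < 100 && running; i += parallelism) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(i);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}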

JobMaster.startJobExecution

flink-runtime_2.11-1.6.2-sources.jar! /org/apache/flink/runtime/jobmaster/JobMaster.java

    private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
        validateRunsInMainThread();

        checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

        if (Objects.equals(getFencingToken(), newJobMasterId)) {
            log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

            return Acknowledge.get();
        }

        setNewFencingToken(newJobMasterId);

        startJobMasterServices();

        log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID());

        resetAndScheduleExecutionGraph();

        return Acknowledge.get();
    }

    private void resetAndScheduleExecutionGraph() throws Exception {
        validateRunsInMainThread();

        final CompletableFuture<Void> executionGraphAssignedFuture;

        if (executionGraph.getState() == JobStatus.CREATED) {
            executionGraphAssignedFuture = CompletableFuture.completedFuture(null);
        } else {
            suspendAndClearExecutionGraphFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
            final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
            final ExecutionGraph newExecutionGraph = createAndRestoreExecutionGraph(newJobManagerJobMetricGroup);

            executionGraphAssignedFuture = executionGraph.getTerminationFuture().handleAsync(
                (JobStatus ignored, Throwable throwable) -> {
                    assignExecutionGraph(newExecutionGraph, newJobManagerJobMetricGroup);
                    return null;
                },
                getMainThreadExecutor());
        }

        executionGraphAssignedFuture.thenRun(this::scheduleExecutionGraph);
    }

    private void scheduleExecutionGraph() {
        checkState(jobStatusListener == null);
        // register self as job status change listener
        jobStatusListener = new JobManagerJobStatusListener();
        executionGraph.registerJobStatusListener(jobStatusListener);

        try {
            executionGraph.scheduleForExecution();
        }
        catch (Throwable t) {
            executionGraph.failGlobal(t);
        }
    }
  • startJobExecution calls the resetAndScheduleExecutionGraph method here; resetAndScheduleExecutionGraph chains the scheduleExecutionGraph method onto the executionGraphAssignedFuture, and scheduleExecutionGraph then calls executionGraph.scheduleForExecution() to schedule the execution.

ExecutionGraph.scheduleForExecution

flink-runtime_2.11-1.6.2-sources.jar! /org/apache/flink/runtime/executiongraph/ExecutionGraph.java

    public void scheduleForExecution() throws JobException {

        final long currentGlobalModVersion = globalModVersion;

        if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {

            final CompletableFuture<Void> newSchedulingFuture;

            switch (scheduleMode) {

                case LAZY_FROM_SOURCES:
                    newSchedulingFuture = scheduleLazy(slotProvider);
                    break;

                case EAGER:
                    newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
                    break;

                default:
                    throw new JobException("Schedule mode is invalid.");
            }

            if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
                schedulingFuture = newSchedulingFuture;

                newSchedulingFuture.whenCompleteAsync(
                    (Void ignored, Throwable throwable) -> {
                        if (throwable != null && !(throwable instanceof CancellationException)) {
                            // only fail if the scheduling future was not canceled
                            failGlobal(ExceptionUtils.stripCompletionException(throwable));
                        }
                    },
                    futureExecutor);
            } else {
                newSchedulingFuture.cancel(false);
            }
        }
        else {
            throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
        }
    }
  • Streaming jobs use the EAGER schedule mode, so the scheduleEager method is called here.

ExecutionGraph.scheduleEager

flink-runtime_2.11-1.6.2-sources.jar! /org/apache/flink/runtime/executiongraph/ExecutionGraph.java

    /**
     * @param slotProvider  The resource provider from which the slots are allocated
     * @param timeout       The maximum time that the deployment may take, before a
     *                      TimeoutException is thrown.
     * @return Future which is completed once the {@link ExecutionGraph} has been scheduled.
     * The future can also be completed exceptionally if an error happened.
     */
    private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) {
        checkState(state == JobStatus.RUNNING, "job is not running currently");

        // Important: reserve all the space we need up front.
        // that way we do not have any operation that can fail between allocating the slots
        // and adding them to the list. If we had a failure in between there, that would
        // cause the slots to get lost
        final boolean queued = allowQueuedScheduling;

        // collecting all the slots may resize and fail in that operation without slots getting lost
        final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());

        // allocate the slots (obtain all their futures
        for (ExecutionJobVertex ejv : getVerticesTopologically()) {
            // these calls are not blocking, they only return futures
            Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
                slotProvider,
                queued,
                LocationPreferenceConstraint.ALL,
                allocationTimeout);

            allAllocationFutures.addAll(allocationFutures);
        }

        // this future is complete once all slot futures are complete.
        // the future fails once one slot future fails.
        final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

        final CompletableFuture<Void> currentSchedulingFuture = allAllocationsFuture
            .thenAccept(
                (Collection<Execution> executionsToDeploy) -> {
                    for (Execution execution : executionsToDeploy) {
                        try {
                            execution.deploy();
                        } catch (Throwable t) {
                            throw new CompletionException(
                                new FlinkException(
                                    String.format("Could not deploy execution %s.", execution),
                                    t));
                        }
                    }
                })
            // Generate a more specific failure message for the eager scheduling
            .exceptionally(
                (Throwable throwable) -> {
                    final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
                    final Throwable resultThrowable;

                    if (strippedThrowable instanceof TimeoutException) {
                        int numTotal = allAllocationsFuture.getNumFuturesTotal();
                        int numComplete = allAllocationsFuture.getNumFuturesCompleted();
                        String message = "Could not allocate all requires slots within timeout of " +
                            timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete;

                        resultThrowable = new NoResourceAvailableException(message);
                    } else {
                        resultThrowable = strippedThrowable;
                    }

                    throw new CompletionException(resultThrowable);
                });

        return currentSchedulingFuture;
    }
  • The scheduleEager method first calls getVerticesTopologically to obtain the ExecutionJobVertex instances
  • It then calls ExecutionJobVertex.allocateResourcesForAll to allocate resources, obtaining a Collection<CompletableFuture<Execution>>
  • Finally it waits on these futures via FutureUtils.combineAll(allAllocationFutures) and then calls Execution.deploy() one by one for deployment; the pattern is sketched below
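
Conceptually, the "allocate all slots first, then deploy each execution" flow resembles this plain-JDK sketch (Flink's ConjunctFuture additionally tracks counts such as getNumFuturesCompleted; the string values here are stand-ins):

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CombineAllSketch {
    public static void main(String[] args) {
        // stand-ins for the per-vertex slot allocation futures
        List<CompletableFuture<String>> allocations = Arrays.asList(
            CompletableFuture.completedFuture("execution-1"),
            CompletableFuture.completedFuture("execution-2"));

        // completes once all allocations complete; fails if any allocation fails
        CompletableFuture
            .allOf(allocations.toArray(new CompletableFuture[0]))
            .thenRun(() -> allocations.forEach(
                f -> System.out.println("deploying " + f.join())));
    }
}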

ExecutionJobVertex.allocateResourcesForAll

flink-runtime_2.11-1.6.2-sources.jar! /org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java

    /**
     * Acquires a slot for all the execution vertices of this ExecutionJobVertex. The method returns
     * pairs of the slots and execution attempts, to ease correlation between vertices and execution
     * attempts.
     *
     * <p>If this method throws an exception, it makes sure to release all so far requested slots.
     *
     * @param resourceProvider The resource provider from whom the slots are requested.
     * @param queued if the allocation can be queued
     * @param locationPreferenceConstraint constraint for the location preferences
     * @param allocationTimeout timeout for allocating the individual slots
     */
    public Collection<CompletableFuture<Execution>> allocateResourcesForAll(
            SlotProvider resourceProvider,
            boolean queued,
            LocationPreferenceConstraint locationPreferenceConstraint,
            Time allocationTimeout) {
        final ExecutionVertex[] vertices = this.taskVertices;
        final CompletableFuture<Execution>[] slots = new CompletableFuture[vertices.length];

        // try to acquire a slot future for each execution.
        // we store the execution with the future just to be on the safe side
        for (int i = 0; i < vertices.length; i++) {
            // allocate the next slot (future)
            final Execution exec = vertices[i].getCurrentExecutionAttempt();
            final CompletableFuture<Execution> allocationFuture = exec.allocateAndAssignSlotForExecution(
                resourceProvider,
                queued,
                locationPreferenceConstraint,
                allocationTimeout);
            slots[i] = allocationFuture;
        }

        // all good, we acquired all slots
        return Arrays.asList(slots);
    }
  • Here, exec.allocateAndAssignSlotForExecution is called one by one for each element of the taskVertices of the ExecutionJobVertex. It can be seen that the overall parallelism is determined by taskVertices.

Execution.deploy

flink-runtime_2.11-1.6.2-sources.jar! /org/apache/flink/runtime/executiongraph/Execution.java

    /**
     * Deploys the execution to the previously assigned resource.
     *
     * @throws JobException if the execution cannot be deployed to the assigned resource
     */
    public void deploy() throws JobException {
        final LogicalSlot slot  = assignedResource;

        checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");

        //......

        try {

            //......

            final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
                attemptId,
                slot,
                taskRestore,
                attemptNumber);

            // null taskRestore to let it be GC'ed
            taskRestore = null;

            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout);

            submitResultFuture.whenCompleteAsync(
                (ack, failure) -> {
                    // only respond to the failure case
                    if (failure != null) {
                        if (failure instanceof TimeoutException) {
                            String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

                            markFailed(new Exception(
                                "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                                    + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
                        } else {
                            markFailed(failure);
                        }
                    }
                },
                executor);
        }
        catch (Throwable t) {
            markFailed(t);
            ExceptionUtils.rethrow(t);
        }
    }
  • Execution.deploy creates a TaskDeploymentDescriptor and then submits the deployment through taskManagerGateway.submitTask; this triggers the TaskExecutor to submit the task, which eventually runs the Task's run method.

ExecutionJobVertex

flink-runtime_2.11-1.6.2-sources.jar! /org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java

    private final ExecutionVertex[] taskVertices;

    public ExecutionJobVertex(
            ExecutionGraph graph,
            JobVertex jobVertex,
            int defaultParallelism,
            Time timeout,
            long initialGlobalModVersion,
            long createTimestamp) throws JobException {

        if (graph == null || jobVertex == null) {
            throw new NullPointerException();
        }

        this.graph = graph;
        this.jobVertex = jobVertex;

        int vertexParallelism = jobVertex.getParallelism();
        int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;

        final int configuredMaxParallelism = jobVertex.getMaxParallelism();

        this.maxParallelismConfigured = (VALUE_NOT_SET != configuredMaxParallelism);

        // if no max parallelism was configured by the user, we calculate and set a default
        setMaxParallelismInternal(maxParallelismConfigured ?
                configuredMaxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(numTaskVertices));

        // verify that our parallelism is not higher than the maximum parallelism
        if (numTaskVertices > maxParallelism) {
            throw new JobException(
                String.format("Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.",
                    jobVertex.getName(),
                    numTaskVertices,
                    maxParallelism));
        }

        this.parallelism = numTaskVertices;

        this.serializedTaskInformation = null;

        this.taskVertices = new ExecutionVertex[numTaskVertices];
        //......

        // create all task vertices
        for (int i = 0; i < numTaskVertices; i++) {
            ExecutionVertex vertex = new ExecutionVertex(
                    this,
                    i,
                    producedDataSets,
                    timeout,
                    initialGlobalModVersion,
                    createTimestamp,
                    maxPriorAttemptsHistoryLength);

            this.taskVertices[i] = vertex;
        }

        //......
    }
  • taskVertices is an ExecutionVertex[] whose size is determined by numTaskVertices
  • ExecutionJobVertex first checks whether jobVertex.getParallelism() is greater than 0 (it generally is); if so, jobVertex.getParallelism() becomes numTaskVertices; otherwise defaultParallelism is used (when ExecutionJobVertex is created in the attachJobGraph method of ExecutionGraph, the defaultParallelism passed in is 1)
  • It then creates one ExecutionVertex per index up to numTaskVertices and stores them in the taskVertices array
  • The parallelism of jobVertex is set by StreamingJobGraphGenerator in the createJobVertex method according to streamNode.getParallelism() (if the value of streamNode.getParallelism() is greater than 0)
  • If the parallelism of the streamNode is not set explicitly, it defaults to the parallelism of the StreamExecutionEnvironment (for details see the constructors of DataStreamSource, the DataStream.transform method, and DataStreamSink; DataStreamSource resets the parallelism to 1 for sources that are not parallel); for a LocalEnvironment it defaults to Runtime.getRuntime().availableProcessors(). A configuration example follows below.
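
How these defaults interact can be seen in a small job; a sketch assuming the hypothetical PartitionedSource from above:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismConfigDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // job-wide default; a LocalEnvironment would otherwise fall back to
        // Runtime.getRuntime().availableProcessors()
        env.setParallelism(4);

        env.addSource(new PartitionedSource())
            .setParallelism(2)  // sets the streamNode parallelism of the source
            .print();           // the sink keeps the environment default of 4

        env.execute("parallelism demo");
    }
}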

Summary

  • RichParallelSourceFunction implements the ParallelSourceFunction interface and extends AbstractRichFunction. AbstractRichFunction mainly implements the setRuntimeContext, getRuntimeContext, and getIterationRuntimeContext methods of the RichFunction interface. RuntimeContext defines the getNumberOfParallelSubtasks method (returns the parallelism of the current task) and the getIndexOfThisSubtask method (returns the index of the current parallel subtask), which make it convenient to develop a ParallelSourceFunction that can be executed in parallel without its instances emitting duplicate data.
  • JobMaster calls executionGraph.scheduleForExecution() during startJobExecution to schedule the job; along the way, resources are allocated through ExecutionJobVertex.allocateResourcesForAll to obtain a Collection<CompletableFuture<Execution>>, and then Execution.deploy() is called one by one for deployment. Execution.deploy creates a TaskDeploymentDescriptor and submits the deployment through taskManagerGateway.submitTask; this triggers the TaskExecutor to submit the task, which eventually runs the Task's run method.
  • ExecutionJobVertex.allocateResourcesForAll calls exec.allocateAndAssignSlotForExecution one by one based on the taskVertices of the ExecutionJobVertex, so the overall parallelism is determined by taskVertices. taskVertices is initialized in the ExecutionJobVertex constructor: if jobVertex.getParallelism() is greater than 0, that value is used; otherwise defaultParallelism (1) is used. The parallelism of jobVertex is set by StreamingJobGraphGenerator in the createJobVertex method according to streamNode.getParallelism() (if that value is greater than 0); if the user does not set it, it defaults to the parallelism of the StreamExecutionEnvironment, which for a LocalEnvironment defaults to Runtime.getRuntime().availableProcessors().
