A Look at Flink’s KvStateRegistryGateway

Preface

This article takes a look at Flink’s KvStateRegistryGateway.

KvStateRegistryGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/KvStateRegistryGateway.java

public interface KvStateRegistryGateway {

    /**
     * Notifies that queryable state has been registered.
     *
     * @param jobId    identifying the job for which to register a key value state
     * @param jobVertexId JobVertexID the KvState instance belongs to.
     * @param keyGroupRange Key group range the KvState instance belongs to.
     * @param registrationName Name under which the KvState has been registered.
     * @param kvStateId ID of the registered KvState instance.
     * @param kvStateServerAddress Server address where to find the KvState instance.
     * @return Future acknowledge if the key-value state has been registered
     */
    CompletableFuture<Acknowledge> notifyKvStateRegistered(
        final JobID jobId,
        final JobVertexID jobVertexId,
        final KeyGroupRange keyGroupRange,
        final String registrationName,
        final KvStateID kvStateId,
        final InetSocketAddress kvStateServerAddress);

    /**
     * Notifies that queryable state has been unregistered.
     *
     * @param jobId    identifying the job for which to unregister a key value state
     * @param jobVertexId JobVertexID the KvState instance belongs to.
     * @param keyGroupRange Key group index the KvState instance belongs to.
     * @param registrationName Name under which the KvState has been registered.
     * @return Future acknowledge if the key-value state has been unregistered
     */
    CompletableFuture<Acknowledge> notifyKvStateUnregistered(
        final JobID jobId,
        final JobVertexID jobVertexId,
        final KeyGroupRange keyGroupRange,
        final String registrationName);
}
  • The KvStateRegistryGateway interface defines two methods, notifyKvStateRegistered and notifyKvStateUnregistered. JobMaster implements both through JobMasterGateway, which extends KvStateRegistryGateway.

JobMaster

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {

    /** Default names for Flink's distributed components. */
    public static final String JOB_MANAGER_NAME = "jobmanager";
    public static final String ARCHIVE_NAME = "archive";

    // ------------------------------------------------------------------------

    private final JobMasterConfiguration jobMasterConfiguration;

    private final ResourceID resourceId;

    private final JobGraph jobGraph;

    private final Time rpcTimeout;

    private final HighAvailabilityServices highAvailabilityServices;

    private final BlobServer blobServer;

    private final JobManagerJobMetricGroupFactory jobMetricGroupFactory;

    private final HeartbeatManager<AccumulatorReport, Void> taskManagerHeartbeatManager;

    private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;

    private final ScheduledExecutorService scheduledExecutorService;

    private final OnCompletionActions jobCompletionActions;

    private final FatalErrorHandler fatalErrorHandler;

    private final ClassLoader userCodeLoader;

    private final SlotPool slotPool;

    private final SlotPoolGateway slotPoolGateway;

    private final RestartStrategy restartStrategy;

    // --------- BackPressure --------

    private final BackPressureStatsTracker backPressureStatsTracker;

    // --------- ResourceManager --------

    private final LeaderRetrievalService resourceManagerLeaderRetriever;

    // --------- TaskManagers --------

    private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;

    // -------- Mutable fields ---------

    private ExecutionGraph executionGraph;

    @Nullable
    private JobManagerJobStatusListener jobStatusListener;

    @Nullable
    private JobManagerJobMetricGroup jobManagerJobMetricGroup;

    @Nullable
    private String lastInternalSavepoint;

    @Nullable
    private ResourceManagerAddress resourceManagerAddress;

    @Nullable
    private ResourceManagerConnection resourceManagerConnection;

    @Nullable
    private EstablishedResourceManagerConnection establishedResourceManagerConnection;

    //......

    @Override
    public CompletableFuture<Acknowledge> notifyKvStateRegistered(
            final JobID jobId,
            final JobVertexID jobVertexId,
            final KeyGroupRange keyGroupRange,
            final String registrationName,
            final KvStateID kvStateId,
            final InetSocketAddress kvStateServerAddress) {
        if (jobGraph.getJobID().equals(jobId)) {
            if (log.isDebugEnabled()) {
                log.debug("Key value state registered for job {} under name {}.",
                    jobGraph.getJobID(), registrationName);
            }

            try {
                executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
                    jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);

                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (Exception e) {
                log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
                return FutureUtils.completedExceptionally(e);
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Notification about key-value state registration for unknown job {} received.", jobId);
            }
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
        }
    }

    @Override
    public CompletableFuture<Acknowledge> notifyKvStateUnregistered(
            JobID jobId,
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName) {
        if (jobGraph.getJobID().equals(jobId)) {
            if (log.isDebugEnabled()) {
                log.debug("Key value state unregistered for job {} under name {}.",
                    jobGraph.getJobID(), registrationName);
            }

            try {
                executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
                    jobVertexId, keyGroupRange, registrationName);

                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (Exception e) {
                log.error("Failed to notify KvStateRegistry about registration {}.", registrationName, e);
                return FutureUtils.completedExceptionally(e);
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Notification about key-value state deregistration for unknown job {} received.", jobId);
            }
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
        }
    }

    //......
}
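  • JobMaster’s notifyKvStateRegistered method first checks that the given jobId matches the ID of its own jobGraph, then delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered. The notifyKvStateUnregistered method does the same check and delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered. In both methods, a mismatched jobId yields a future completed exceptionally with FlinkJobNotFoundException, and any exception thrown by the registry is likewise returned through the future rather than thrown to the caller.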
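
The guard-then-delegate pattern described above can be sketched in isolation. This is a simplified model and not Flink's code: GatewaySketch, SimpleRegistry, the plain-string job IDs, and the "ACK" result are all hypothetical stand-ins chosen for illustration; only CompletableFuture comes from the JDK.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the registry: it only records names
// and rejects empty ones to simulate a registry failure.
class SimpleRegistry {
    private final Set<String> names = new HashSet<>();

    void register(String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("Empty registration name");
        }
        names.add(name);
    }
}

// Hypothetical stand-in for the JobMaster side of the gateway.
class GatewaySketch {
    private final String ownJobId;
    private final SimpleRegistry registry = new SimpleRegistry();

    GatewaySketch(String ownJobId) {
        this.ownJobId = ownJobId;
    }

    CompletableFuture<String> notifyRegistered(String jobId, String name) {
        if (!ownJobId.equals(jobId)) {
            // Unknown job: fail the future instead of throwing to the caller.
            return failed(new IllegalArgumentException("Unknown job " + jobId));
        }
        try {
            registry.register(name);
            return CompletableFuture.completedFuture("ACK");
        } catch (Exception e) {
            // Registry errors are also reported through the future.
            return failed(e);
        }
    }

    // Builds a future that is already completed with the given error.
    private static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(t);
        return future;
    }
}
```

Because every outcome travels through the returned future, a remote caller can treat a wrong job ID and a registry failure uniformly, for example by checking isCompletedExceptionally() or attaching a handler to the future.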

KvStateLocationRegistry

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java

public class KvStateLocationRegistry {

    /** JobID this coordinator belongs to. */
    private final JobID jobId;

    /** Job vertices for determining parallelism per key. */
    private final Map<JobVertexID, ExecutionJobVertex> jobVertices;

    /**
     * Location info keyed by registration name. The name needs to be unique
     * per JobID, i.e. two operators cannot register KvState with the same
     * name.
     */
    private final Map<String, KvStateLocation> lookupTable = new HashMap<>();

    /**
     * Creates the registry for the job.
     *
     * @param jobId       JobID this coordinator belongs to.
     * @param jobVertices Job vertices map of all vertices of this job.
     */
    public KvStateLocationRegistry(JobID jobId, Map<JobVertexID, ExecutionJobVertex> jobVertices) {
        this.jobId = Preconditions.checkNotNull(jobId, "JobID");
        this.jobVertices = Preconditions.checkNotNull(jobVertices, "Job vertices");
    }

    /**
     * Returns the {@link KvStateLocation} for the registered KvState instance
     * or <code>null</code> if no location information is available.
     *
     * @param registrationName Name under which the KvState instance is registered.
     * @return Location information or <code>null</code>.
     */
    public KvStateLocation getKvStateLocation(String registrationName) {
        return lookupTable.get(registrationName);
    }

    /**
     * Notifies the registry about a registered KvState instance.
     *
     * @param jobVertexId JobVertexID the KvState instance belongs to
     * @param keyGroupRange Key group range the KvState instance belongs to
     * @param registrationName Name under which the KvState has been registered
     * @param kvStateId ID of the registered KvState instance
     * @param kvStateServerAddress Server address where to find the KvState instance
     *
     * @throws IllegalArgumentException If JobVertexID does not belong to job
     * @throws IllegalArgumentException If state has been registered with same
     * name by another operator.
     * @throws IndexOutOfBoundsException If key group index is out of bounds.
     */
    public void notifyKvStateRegistered(
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName,
            KvStateID kvStateId,
            InetSocketAddress kvStateServerAddress) {

        KvStateLocation location = lookupTable.get(registrationName);

        if (location == null) {
            // First registration for this operator, create the location info
            ExecutionJobVertex vertex = jobVertices.get(jobVertexId);

            if (vertex != null) {
                int parallelism = vertex.getMaxParallelism();
                location = new KvStateLocation(jobId, jobVertexId, parallelism, registrationName);
                lookupTable.put(registrationName, location);
            } else {
                throw new IllegalArgumentException("Unknown JobVertexID " + jobVertexId);
            }
        }

        // Duplicated name if vertex IDs don't match
        if (!location.getJobVertexId().equals(jobVertexId)) {
            IllegalStateException duplicate = new IllegalStateException(
                    "Registration name clash. KvState with name '" + registrationName +
                            "' has already been registered by another operator (" +
                            location.getJobVertexId() + ").");

            ExecutionJobVertex vertex = jobVertices.get(jobVertexId);
            if (vertex != null) {
                vertex.fail(new SuppressRestartsException(duplicate));
            }

            throw duplicate;
        }
        location.registerKvState(keyGroupRange, kvStateId, kvStateServerAddress);
    }

    /**
     * Notifies the registry about an unregistered KvState instance.
     *
     * @param jobVertexId JobVertexID the KvState instance belongs to
     * @param keyGroupRange Key group index the KvState instance belongs to
     * @param registrationName Name under which the KvState has been registered
     * @throws IllegalArgumentException If another operator registered the state instance
     * @throws IllegalArgumentException If the registration name is not known
     */
    public void notifyKvStateUnregistered(
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName) {

        KvStateLocation location = lookupTable.get(registrationName);

        if (location != null) {
            // Duplicate name if vertex IDs don't match
            if (!location.getJobVertexId().equals(jobVertexId)) {
                throw new IllegalArgumentException("Another operator (" +
                        location.getJobVertexId() + ") registered the KvState " +
                        "under '" + registrationName + "'.");
            }

            location.unregisterKvState(keyGroupRange);

            if (location.getNumRegisteredKeyGroups() == 0) {
                lookupTable.remove(registrationName);
            }
        } else {
            throw new IllegalArgumentException("Unknown registration name '" +
                    registrationName + "'. " + "Probably registration/unregistration race.");
        }
    }

}
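  • The constructor of KvStateLocationRegistry takes a jobId and the jobVertices map. The class holds a lookupTable field that maps each registrationName to its KvStateLocation.
  • The notifyKvStateRegistered method looks up the KvStateLocation for the registrationName. If none exists, it creates one using the vertex's max parallelism and stores it in lookupTable (an unknown JobVertexID raises IllegalArgumentException). If the stored location belongs to a different JobVertexID, it fails the vertex with a SuppressRestartsException and throws IllegalStateException. Otherwise it finishes by calling location.registerKvState.
  • The notifyKvStateUnregistered method calls location.unregisterKvState when lookupTable contains the corresponding KvStateLocation, and removes the entry from lookupTable only once location.getNumRegisteredKeyGroups() returns 0. An unknown registration name, or one registered by a different JobVertexID, raises IllegalArgumentException.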
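
The lifecycle of a lookupTable entry can be illustrated with a small stand-alone model. This is only a sketch under simplifying assumptions: RegistrySketch and its nested Location class are hypothetical, vertex IDs are plain strings, and each call registers a single key group index rather than a KeyGroupRange as Flink does.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of a lookup table keyed by registration name.
class RegistrySketch {

    // Simplified location: owning vertex plus the set of registered key groups.
    private static final class Location {
        final String vertexId;
        final Set<Integer> keyGroups = new HashSet<>();

        Location(String vertexId) {
            this.vertexId = vertexId;
        }
    }

    private final Map<String, Location> lookupTable = new HashMap<>();

    void register(String vertexId, int keyGroup, String name) {
        // Create the location on the first registration under this name.
        Location location = lookupTable.computeIfAbsent(name, n -> new Location(vertexId));
        if (!location.vertexId.equals(vertexId)) {
            throw new IllegalStateException("Name clash on '" + name + "'");
        }
        location.keyGroups.add(keyGroup);
    }

    void unregister(String vertexId, int keyGroup, String name) {
        Location location = lookupTable.get(name);
        if (location == null) {
            throw new IllegalArgumentException("Unknown registration name '" + name + "'");
        }
        if (!location.vertexId.equals(vertexId)) {
            throw new IllegalArgumentException("Registered by another vertex: " + location.vertexId);
        }
        location.keyGroups.remove(keyGroup);
        // Drop the entry only once no key groups remain.
        if (location.keyGroups.isEmpty()) {
            lookupTable.remove(name);
        }
    }

    boolean contains(String name) {
        return lookupTable.containsKey(name);
    }
}
```

In this model, registering key groups 0 and 1 under the same name and then unregistering only key group 0 leaves the entry in place; it disappears after key group 1 is unregistered as well, which mirrors the getNumRegisteredKeyGroups() == 0 check in the listing.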

Summary

  • The KvStateRegistryGateway interface defines two methods, notifyKvStateRegistered and notifyKvStateUnregistered. JobMaster implements both.
  • JobMaster’s notifyKvStateRegistered method checks the jobId and then delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered. The notifyKvStateUnregistered method does the same and delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered.
  • The constructor of KvStateLocationRegistry takes a jobId and the jobVertices map, and the class holds a lookupTable field that maps each registrationName to its KvStateLocation. The notifyKvStateRegistered method creates a KvStateLocation and stores it in lookupTable when no entry exists for the registrationName, rejects a name already registered by another JobVertexID, and finally calls location.registerKvState. The notifyKvStateUnregistered method calls location.unregisterKvState when lookupTable contains the corresponding KvStateLocation, and removes the entry from lookupTable once no key groups remain registered.
