Talk about flink’s ActorGateway.

  flink

Order

This article mainly studies flink’s ActorGateway

ActorGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java

public interface ActorGateway extends Serializable {

    /**
     * Sends a message asynchronously and returns its response. The response to the message is
     * returned as a future.
     *
     * @param message Message to be sent
     * @param timeout Timeout until the Future is completed with an AskTimeoutException
     * @return Future which contains the response to the sent message
     */
    Future<Object> ask(Object message, FiniteDuration timeout);

    /**
     * Sends a message asynchronously without a result.
     *
     * @param message Message to be sent
     */
    void tell(Object message);

    /**
     * Sends a message asynchronously without a result with sender being the sender.
     *
     * @param message Message to be sent
     * @param sender Sender of the message
     */
    void tell(Object message, ActorGateway sender);

    /**
     * Forwards a message. For the receiver of this message it looks as if sender has sent the
     * message.
     *
     * @param message Message to be sent
     * @param sender Sender of the forwarded message
     */
    void forward(Object message, ActorGateway sender);

    /**
     * Retries to send asynchronously a message up to numberRetries times. The response to this
     * message is returned as a future. The message is re-sent if the number of retries is not yet
     * exceeded and if an exception occurred while sending it.
     *
     * @param message Message to be sent
     * @param numberRetries Number of times to retry sending the message
     * @param timeout Timeout for each sending attempt
     * @param executionContext ExecutionContext which is used to send the message multiple times
     * @return Future of the response to the sent message
     */
    Future<Object> retry(
            Object message,
            int numberRetries,
            FiniteDuration timeout,
            ExecutionContext executionContext);

    /**
     * Returns the path of the remote instance.
     *
     * @return Path of the remote instance.
     */
    String path();

    /**
     * Returns the underlying actor with which is communicated
     *
     * @return ActorRef of the target actor
     */
    ActorRef actor();

    /**
     * Returns the leaderSessionID associated with the remote actor or null.
     *
     * @return Leader session ID if its associated with this gateway, otherwise null
     */
    UUID leaderSessionID();
}
  • The actorGateway interface defines ask, tell, forward, retry, path, ACTOR, leaderSessionID methods; It has an implementation class AkkaActorGateway

AkkaActorGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java

public class AkkaActorGateway implements ActorGateway, Serializable {

    private static final long serialVersionUID = 42L;

    // ActorRef of the remote instance
    private final ActorRef actor;

    // Associated leader session ID, which is used for RequiresLeaderSessionID messages
    private final UUID leaderSessionID;

    // Decorator for messages
    private final MessageDecorator decorator;

    public AkkaActorGateway(ActorRef actor, UUID leaderSessionID) {
        this.actor = Preconditions.checkNotNull(actor);
        this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID);
        // we want to wrap RequiresLeaderSessionID messages in a LeaderSessionMessage
        this.decorator = new LeaderSessionMessageDecorator(leaderSessionID);
    }

    /**
     * Sends a message asynchronously and returns its response. The response to the message is
     * returned as a future.
     *
     * @param message Message to be sent
     * @param timeout Timeout until the Future is completed with an AskTimeoutException
     * @return Future which contains the response to the sent message
     */
    @Override
    public Future<Object> ask(Object message, FiniteDuration timeout) {
        Object newMessage = decorator.decorate(message);
        return Patterns.ask(actor, newMessage, new Timeout(timeout));
    }

    /**
     * Sends a message asynchronously without a result.
     *
     * @param message Message to be sent
     */
    @Override
    public void tell(Object message) {
        Object newMessage = decorator.decorate(message);
        actor.tell(newMessage, ActorRef.noSender());
    }

    /**
     * Sends a message asynchronously without a result with sender being the sender.
     *
     * @param message Message to be sent
     * @param sender Sender of the message
     */
    @Override
    public void tell(Object message, ActorGateway sender) {
        Object newMessage = decorator.decorate(message);
        actor.tell(newMessage, sender.actor());
    }

    /**
     * Forwards a message. For the receiver of this message it looks as if sender has sent the
     * message.
     *
     * @param message Message to be sent
     * @param sender Sender of the forwarded message
     */
    @Override
    public void forward(Object message, ActorGateway sender) {
        Object newMessage = decorator.decorate(message);
        actor.tell(newMessage, sender.actor());
    }

    /**
     * Retries to send asynchronously a message up to numberRetries times. The response to this
     * message is returned as a future. The message is re-sent if the number of retries is not yet
     * exceeded and if an exception occurred while sending it.
     *
     * @param message Message to be sent
     * @param numberRetries Number of times to retry sending the message
     * @param timeout Timeout for each sending attempt
     * @param executionContext ExecutionContext which is used to send the message multiple times
     * @return Future of the response to the sent message
     */
    @Override
    public Future<Object> retry(
            Object message,
            int numberRetries,
            FiniteDuration timeout,
            ExecutionContext executionContext) {

        Object newMessage = decorator.decorate(message);

        return AkkaUtils.retry(
            actor,
            newMessage,
            numberRetries,
            executionContext,
            timeout);
    }

    /**
     * Returns the ActorPath of the remote instance.
     *
     * @return ActorPath of the remote instance.
     */
    @Override
    public String path() {
        return actor.path().toString();
    }

    /**
     * Returns {@link ActorRef} of the target actor
     *
     * @return ActorRef of the target actor
     */
    @Override
    public ActorRef actor() {
        return actor;
    }

    @Override
    public UUID leaderSessionID() {
        return leaderSessionID;
    }

    @Override
    public String toString() {
        return String.format("AkkaActorGateway(%s, %s)", actor.path(), leaderSessionID);
    }
}
  • AkkaActorGateway implements the ActorGateway interface, and its constructor requires the input of ActorRef and leadsessionid. at the same time, it creates LeaderSessionMessageDecorator; based on leadsessionid. Ask, tell, forward, and retry methods all call the leader sessionmessagederector. decorate method to wrap the message parameter first, and then call the corresponding method of ActorRef

MessageDecorator

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/messages/MessageDecorator.java

public interface MessageDecorator extends java.io.Serializable {

    /**
     * Decorates a message
     *
     * @param message Message to decorate
     * @return Decorated message
     */
    Object decorate(Object message);
}
  • The MessageDecorator interface defines the decorate method to wrap message, and it has an implementation class of LeaderSessionMessageDecorator.

LeaderSessionMessageDecorator

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/messages/LeaderSessionMessageDecorator.java

public class LeaderSessionMessageDecorator implements MessageDecorator {

    private static final long serialVersionUID = 5359618147408392706L;
    
    /** Leader session ID with which the RequiresLeaderSessionID messages will be decorated */
    private final UUID leaderSessionID;

    /**
     * Sets the leader session ID with which the messages will be decorated.
     *
     * @param leaderSessionID Leader session ID to be used for decoration
     */
    public LeaderSessionMessageDecorator(UUID leaderSessionID) {
        this.leaderSessionID = leaderSessionID;
    }

    @Override
    public Object decorate(Object message) {
        if (message instanceof RequiresLeaderSessionID) {
            return new JobManagerMessages.LeaderSessionMessage(leaderSessionID, message);
        } else {
            return message;
        }
    }
}
  • LeaderSessionMessagedecorate implements the MessageDecorator interface. If the Decorator method determines that the message of type RequiresLeaderSessi onID, it returns JobManagerMessages. LeaderSessionMessage; otherwise, it returns the original MessageMessage.

JobManagerMessages.LeaderSessionMessage

flink-1.7.2/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala

object JobManagerMessages {

  /** Wrapper class for leader session messages. Leader session messages implement the
    * [[RequiresLeaderSessionID]] interface and have to be wrapped in a [[LeaderSessionMessage]],
    * which also contains the current leader session ID.
    *
    * @param leaderSessionID Current leader session ID
    * @param message [[RequiresLeaderSessionID]] message to be wrapped in a [[LeaderSessionMessage]]
    */
  case class LeaderSessionMessage(leaderSessionID: UUID, message: Any)

  //......
}
  • JobManagermessages. LeaderSessionMessage is a case class with two attributes, leaderSessionID and Message respectively

Summary

  • The actorGateway interface defines ask, tell, forward, retry, path, ACTOR, leaderSessionID methods; It has an implementation class AkkaActorGateway
  • AkkaActorGateway implements the ActorGateway interface, and its constructor requires the input of ActorRef and leadsessionid. at the same time, it creates LeaderSessionMessageDecorator; based on leadsessionid. Ask, tell, forward, and retry methods all call the leader sessionmessagederector. decorate method to wrap the message parameter first, and then call the corresponding method of ActorRef
  • The MessageDecorator interface defines the decorate method to wrap message, which has an implementation class of LeaderSessionMessageDecorator; ; LeaderSessionmessagedecorate implements the message; Decorator interface. If the Decorator method determines that the Messageis of type RequiresLeaderSessionID, it returns JobManagerMessages. LeaderSessionMessage; otherwise, it returns the original Message; JobManagermessages. LeaderSessionMessage is a case class with two attributes, leaderSessionID and Message respectively

doc