Talk about flink’s Execution Plan Visualization

  flink

Order

This article mainly studies flink’s Execution Plan Visualization

Example

Code

    @Test
    public void testExecutionPlan(){
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String,Integer>> dataStream = env.fromElements(WORDS)
                .flatMap(new WordCountTest.Tokenizer())
                .keyBy(0)
                .sum(1);
        dataStream.print();
        System.out.println(env.getExecutionPlan());
    }

json

{
  "nodes": [
    {
      "id": 1,
      "type": "Source: Collection Source",
      "pact": "Data Source",
      "contents": "Source: Collection Source",
      "parallelism": 1
    },
    {
      "id": 2,
      "type": "Flat Map",
      "pact": "Operator",
      "contents": "Flat Map",
      "parallelism": 4,
      "predecessors": [
        {
          "id": 1,
          "ship_strategy": "REBALANCE",
          "side": "second"
        }
      ]
    },
    {
      "id": 4,
      "type": "Keyed Aggregation",
      "pact": "Operator",
      "contents": "Keyed Aggregation",
      "parallelism": 4,
      "predecessors": [
        {
          "id": 2,
          "ship_strategy": "HASH",
          "side": "second"
        }
      ]
    },
    {
      "id": 5,
      "type": "Sink: Print to Std. Out",
      "pact": "Data Sink",
      "contents": "Sink: Print to Std. Out",
      "parallelism": 4,
      "predecessors": [
        {
          "id": 4,
          "ship_strategy": "FORWARD",
          "side": "second"
        }
      ]
    }
  ]
}

Visualization

Openflink plan visualizerEnter json above into the text box and click Draw to visualize as follows:
图片描述

StreamExecutionEnvironment.getExecutionPlan

flink-streaming-java_2.11-1.7.1-sources.jar! /org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java

@Public
public abstract class StreamExecutionEnvironment {
    //......

    /**
     * Creates the plan with which the system will execute the program, and
     * returns it as a String using a JSON representation of the execution data
     * flow graph. Note that this needs to be called, before the plan is
     * executed.
     *
     * @return The execution plan of the program, as a JSON String.
     */
    public String getExecutionPlan() {
        return getStreamGraph().getStreamingPlanAsJSON();
    }

    /**
     * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
     *
     * @return The streamgraph representing the transformations
     */
    @Internal
    public StreamGraph getStreamGraph() {
        if (transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        return StreamGraphGenerator.generate(this, transformations);
    }

    //......
}
  • The getExecutionPlan method of StreamExecutionEnvironment called the getStreamGraph method; The getStreamGraph method uses StreamGraphGenerator.generate to generate streamgraph; After that, call streamgraph.getstreamingplanasjson to get the execution plan in json format.

StreamGraph.getStreamingPlanAsJSON

flink-streaming-java_2.11-1.7.1-sources.jar! /org/apache/flink/streaming/api/graph/StreamGraph.java

@Internal
public class StreamGraph extends StreamingPlan {

    private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);

    private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;

    private final StreamExecutionEnvironment environment;
    private final ExecutionConfig executionConfig;
    private final CheckpointConfig checkpointConfig;

    private boolean chaining;

    private Map<Integer, StreamNode> streamNodes;
    private Set<Integer> sources;
    private Set<Integer> sinks;
    private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
    private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
    private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtualPartitionNodes;

    protected Map<Integer, String> vertexIDtoBrokerID;
    protected Map<Integer, Long> vertexIDtoLoopTimeout;
    private StateBackend stateBackend;
    private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;

    //......

    public String getStreamingPlanAsJSON() {
        try {
            return new JSONGenerator(this).getJSON();
        }
        catch (Exception e) {
            throw new RuntimeException("JSON plan creation failed", e);
        }
    }

    //......
}
  • The getStreamingPlanAsjson method of StreamGraph uses JSONGenerator to serialize itself and returns execution plan in JSON format.

Summary

  • Flink offersflink plan visualizerIs used for visualization of execution plan, which receives execution plan in json form.
  • The getExecutionPlan method of StreamExecutionEnvironment called the getStreamGraph method; The getStreamGraph method uses StreamGraphGenerator.generate to generate StreamGraph.
  • The getStreamingPlanAsjson method of StreamGraph uses JSONGenerator to serialize itself and returns execution plan in JSON format.

doc