A Look at Flink’s Queryable State



This article examines Flink’s Queryable State.



    @Test
    public void testValueStateForQuery() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .createRemoteEnvironment("", 8081, SubmitTest.JAR_FILE);
        env.addSource(new RandomTuple2Source())
                .keyBy(0) //key by first value of tuple
                .flatMap(new CountWindowAverage())
                .print();
        JobExecutionResult result = env.execute("testQueryableState");
        LOGGER.info("submit job result:{}",result); // LOGGER: SLF4J logger of the test class (assumed name)
    }
  • This job keys the stream by the first field of the tuple and then applies CountWindowAverage in a flatMap operation.


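import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private transient ValueState<Tuple2<Long, Long>> sum; // a tuple containing the count and the sum

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        Tuple2<Long, Long> currentSum = sum.value();
        if (currentSum == null) {
            currentSum = Tuple2.of(1L, input.f1); // first record for this key
        } else {
            currentSum.f0 += 1;
            currentSum.f1 += input.f1;
        }

        sum.update(currentSum); // write the new count and sum back to the keyed state

        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear(); // start a new window for this key
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
        descriptor.setQueryable("query-name"); // expose the state under this queryable name
        sum = getRuntimeContext().getState(descriptor);
    }
}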
  • CountWindowAverage declares its state as queryable by calling setQueryable("query-name") on the ValueStateDescriptor.
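The per-key logic can be traced without a cluster. The following is a minimal plain-Java sketch, not the Flink operator itself: a HashMap stands in for the keyed ValueState, and the class name CountWindowAverageSketch and its query method are hypothetical. It assumes the state is written back after every record and cleared once an average has been emitted, as in the Flink documentation's example of the same name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java trace of the count-window average; a HashMap replaces Flink's keyed state. */
final class CountWindowAverageSketch {

    // key -> {count, sum}; hypothetical stand-in for ValueState<Tuple2<Long, Long>>
    private final Map<Long, long[]> state = new HashMap<>();

    /** Processes one (key, value) record and returns the emitted {key, average} pairs (zero or one). */
    List<long[]> flatMap(long key, long value) {
        List<long[]> out = new ArrayList<>();
        long[] current = state.get(key);
        if (current == null) {
            current = new long[] {1L, value};   // first record for this key
        } else {
            current[0] += 1;                    // count
            current[1] += value;                // sum
        }
        state.put(key, current);                // write back (assumed: sum.update in Flink terms)

        if (current[0] >= 2) {
            out.add(new long[] {key, current[1] / current[0]}); // integer division on longs
            state.remove(key);                  // reset the window (assumed: sum.clear in Flink terms)
        }
        return out;
    }

    /** What a state query for this key would see right now; null means no state for the key. */
    long[] query(long key) {
        return state.get(key);
    }

    public static void main(String[] args) {
        CountWindowAverageSketch f = new CountWindowAverageSketch();
        System.out.println(f.flatMap(1L, 3L).size());     // 0: nothing emitted after one record
        System.out.println(f.query(1L)[1]);               // 3: sum visible to a query
        System.out.println(f.flatMap(1L, 5L).get(0)[1]);  // 4: average of 3 and 5
        System.out.println(f.query(1L) == null);          // true: state was cleared
    }
}
```

Under these assumptions a key only has state between the first and second record of each window, so a query that arrives right after an average was emitted finds nothing for that key.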


    @Test
    public void testQueryStateByJobId() throws InterruptedException, IOException {
        //get jobId from flink ui running job page
        JobID jobId = JobID.fromHexString("793edfa93f354aa0274f759cb13ce79e");
        long key = 1L;
        // host of a TaskManager and the port of its queryable state proxy
        QueryableStateClient client = new QueryableStateClient("", 9069);

        // the state descriptor of the state to be fetched.
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name, as in CountWindowAverage
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

        CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
                client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor);

        LOGGER.info("get kv state return future, waiting......"); // LOGGER: SLF4J logger of the test class (assumed name)
        // org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException: Queryable State Server : No state for the specified key/namespace.
        ValueState<Tuple2<Long, Long>> res = resultFuture.join();
        LOGGER.info("query result:{}",res.value());
    }
  • The state is queried here through a QueryableStateClient connection. The jobId can be read from the web UI after the job has been submitted and is then converted into a JobID object with JobID.fromHexString.
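Calling resultFuture.join() blocks until a reply arrives, and a failed query, such as the UnknownKeyOrNamespaceException noted in the comment, surfaces as an exception from join(). The following is a minimal sketch of a bounded wait using only the JDK's CompletableFuture. The class QueryResultWaitSketch and the helper awaitOrFallback are hypothetical names, and the futures in main are stand-ins for the one returned by client.getKvState.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class QueryResultWaitSketch {

    /**
     * Waits up to timeoutMillis for a query future. Returns fallback when the query
     * fails, does not finish in time, or the waiting thread is interrupted.
     */
    static <T> T awaitOrFallback(CompletableFuture<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The query itself failed; e.getCause() carries the reported reason.
            System.err.println("query failed: " + e.getCause());
            return fallback;
        } catch (TimeoutException e) {
            future.cancel(true); // give up on a reply that did not arrive in time
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return fallback;
        }
    }

    public static void main(String[] args) {
        // Stand-ins for the future returned by client.getKvState(...)
        CompletableFuture<String> ok = CompletableFuture.completedFuture("(1,3)");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(
                new IllegalStateException("No state for the specified key/namespace."));
        CompletableFuture<String> pending = new CompletableFuture<>();

        System.out.println(awaitOrFallback(ok, 100, "n/a"));      // (1,3)
        System.out.println(awaitOrFallback(failed, 100, "n/a"));  // n/a
        System.out.println(awaitOrFallback(pending, 100, "n/a")); // n/a after about 100 ms
    }
}
```

Whether a fallback value or a retry is the better reaction depends on the caller. The sketch only shows where the failure and the timeout become visible.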


  • Queryable State is still a beta feature and is not enabled by default in the Flink 1.7 release. To enable it, copy flink-queryable-state-runtime_2.11-1.7.0.jar (it ships in the distribution's opt/ directory) into the /opt/flink/lib/ directory. When the TaskManager starts, its log then contains a line beginning with Started Queryable State Proxy Server @ /, which confirms that the feature is enabled.
  • Queryable State involves three components. QueryableStateServer runs on every TaskManager and serves the state stored locally on it. QueryableStateClientProxy also runs on every TaskManager; it receives the client's query, fetches the state from the TaskManager that holds it, and returns the result to the client. QueryableStateClient usually runs outside the Flink cluster and submits the user's state queries.
  • Both QueryableStateServer and QueryableStateClientProxy have configurable ports, network-threads, and query-threads settings. query.server.ports defaults to 9067 for QueryableStateServer, and query.proxy.ports defaults to 9069 for QueryableStateClientProxy. The client must send its requests to the proxy port, which is why the query code above connects to 9069.
  • There are two ways to make state queryable. One is to call KeyedStream.asQueryableState, which turns the stream into a QueryableStateStream. The other is to call setQueryable on the StateDescriptor of a managed keyed state. The difference is that asQueryableState acts directly on the KeyedStream and behaves like a sink, so no further transformations can follow it, whereas declaring the state through setQueryable on the StateDescriptor is more flexible. Note that there is no queryable ListState sink.
  • Queryable State currently has several limitations. First, queryable state is bound to the life cycle of its task: it is discarded when the task finishes and can then no longer be queried, although queries after task completion may be supported in the future. Second, notifications about available KvState currently use a simple tell mechanism, which may later be replaced by an acknowledgement-based one. Third, query statistics are disabled by default and may be published through the metrics system in the future.