Talk about flink’s MetricQueryServiceGateway

  flink

Order

This article mainly studies flink’s MetricQueryServiceGateway.

MetricQueryServiceGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java

public interface MetricQueryServiceGateway {

    CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout);

    String getAddress();
}
  • MetricQueryServiceGateway defines two methods, one is queryMetrics and the other is getAddress; ; It has an implementation class AkkaQueryServiceGateway

AkkaQueryServiceGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java

public class AkkaQueryServiceGateway implements MetricQueryServiceGateway {

    private final ActorRef queryServiceActorRef;

    public AkkaQueryServiceGateway(ActorRef queryServiceActorRef) {
        this.queryServiceActorRef = Preconditions.checkNotNull(queryServiceActorRef);
    }

    @Override
    public CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout) {
        return FutureUtils.toJava(
            Patterns.ask(queryServiceActorRef, MetricQueryService.getCreateDump(), timeout.toMilliseconds())
                .mapTo(ClassTag$.MODULE$.apply(MetricDumpSerialization.MetricSerializationResult.class))
        );
    }

    @Override
    public String getAddress() {
        return queryServiceActorRef.path().toString();
    }
}
  • AkkaQueryServiceGateway implements the MetricQueryServiceGateway interface, and its constructor requires that queryServiceActorRef; be passed in; The message type of queryMetrics method ask is MetricQueryService.CreateDump; ; The getAddress method returns queryServiceActorRef.path ()

MetricQueryService

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java

public class MetricQueryService extends UntypedActor {
    private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);

    public static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService";
    private static final String SIZE_EXCEEDED_LOG_TEMPLATE =  "{} will not be reported as the metric dump would exceed the maximum size of {} bytes.";

    private static final CharacterFilter FILTER = new CharacterFilter() {
        @Override
        public String filterCharacters(String input) {
            return replaceInvalidChars(input);
        }
    };

    private final MetricDumpSerializer serializer = new MetricDumpSerializer();

    private final Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
    private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
    private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
    private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();

    private final long messageSizeLimit;

    //......

    @Override
    public void onReceive(Object message) {
        try {
            if (message instanceof AddMetric) {
                AddMetric added = (AddMetric) message;

                String metricName = added.metricName;
                Metric metric = added.metric;
                AbstractMetricGroup group = added.group;

                QueryScopeInfo info = group.getQueryServiceMetricInfo(FILTER);

                if (metric instanceof Counter) {
                    counters.put((Counter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
                } else if (metric instanceof Gauge) {
                    gauges.put((Gauge<?>) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
                } else if (metric instanceof Histogram) {
                    histograms.put((Histogram) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
                } else if (metric instanceof Meter) {
                    meters.put((Meter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
                }
            } else if (message instanceof RemoveMetric) {
                Metric metric = (((RemoveMetric) message).metric);
                if (metric instanceof Counter) {
                    this.counters.remove(metric);
                } else if (metric instanceof Gauge) {
                    this.gauges.remove(metric);
                } else if (metric instanceof Histogram) {
                    this.histograms.remove(metric);
                } else if (metric instanceof Meter) {
                    this.meters.remove(metric);
                }
            } else if (message instanceof CreateDump) {
                MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);

                dump = enforceSizeLimit(dump);

                getSender().tell(dump, getSelf());
            } else {
                LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());
                getSender().tell(new Status.Failure(new IOException("MetricQueryServiceActor received an invalid message. " + message.toString())), getSelf());
            }
        } catch (Exception e) {
            LOG.warn("An exception occurred while processing a message.", e);
        }
    }

    public static Object getCreateDump() {
        return CreateDump.INSTANCE;
    }

    private static class CreateDump implements Serializable {
        private static final CreateDump INSTANCE = new CreateDump();
    }
    //......
}
  • MetricQueryService inherits UntypedActor. Its onReceive method determines the message type. If it is CreateDump, it calls MetricDUMP SERIALIZATION. MetricDUMP SERIALIZER. SERIALIZE (COUNTERS, GAUGES, Histograms, meters) method to serialize metrics to obtain metricdumpserialization. metricserializationresult, and then use getSender().tell(dump, getSelf ()) to return the data

MetricDumpSerialization

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java

public class MetricDumpSerialization {
    //......

    public static class MetricSerializationResult implements Serializable {

        private static final long serialVersionUID = 6928770855951536906L;

        public final byte[] serializedCounters;
        public final byte[] serializedGauges;
        public final byte[] serializedMeters;
        public final byte[] serializedHistograms;

        public final int numCounters;
        public final int numGauges;
        public final int numMeters;
        public final int numHistograms;

        public MetricSerializationResult(
            byte[] serializedCounters,
            byte[] serializedGauges,
            byte[] serializedMeters,
            byte[] serializedHistograms,
            int numCounters,
            int numGauges,
            int numMeters,
            int numHistograms) {

            Preconditions.checkNotNull(serializedCounters);
            Preconditions.checkNotNull(serializedGauges);
            Preconditions.checkNotNull(serializedMeters);
            Preconditions.checkNotNull(serializedHistograms);
            Preconditions.checkArgument(numCounters >= 0);
            Preconditions.checkArgument(numGauges >= 0);
            Preconditions.checkArgument(numMeters >= 0);
            Preconditions.checkArgument(numHistograms >= 0);
            this.serializedCounters = serializedCounters;
            this.serializedGauges = serializedGauges;
            this.serializedMeters = serializedMeters;
            this.serializedHistograms = serializedHistograms;
            this.numCounters = numCounters;
            this.numGauges = numGauges;
            this.numMeters = numMeters;
            this.numHistograms = numHistograms;
        }
    }

    public static class MetricDumpSerializer {

        private DataOutputSerializer countersBuffer = new DataOutputSerializer(1024 * 8);
        private DataOutputSerializer gaugesBuffer = new DataOutputSerializer(1024 * 8);
        private DataOutputSerializer metersBuffer = new DataOutputSerializer(1024 * 8);
        private DataOutputSerializer histogramsBuffer = new DataOutputSerializer(1024 * 8);

        public MetricSerializationResult serialize(
            Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
            Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
            Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
            Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {

            countersBuffer.clear();
            int numCounters = 0;
            for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
                try {
                    serializeCounter(countersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numCounters++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize counter.", e);
                }
            }

            gaugesBuffer.clear();
            int numGauges = 0;
            for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
                try {
                    serializeGauge(gaugesBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numGauges++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize gauge.", e);
                }
            }

            histogramsBuffer.clear();
            int numHistograms = 0;
            for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
                try {
                    serializeHistogram(histogramsBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numHistograms++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize histogram.", e);
                }
            }

            metersBuffer.clear();
            int numMeters = 0;
            for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
                try {
                    serializeMeter(metersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numMeters++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize meter.", e);
                }
            }

            return new MetricSerializationResult(
                countersBuffer.getCopyOfBuffer(),
                gaugesBuffer.getCopyOfBuffer(),
                metersBuffer.getCopyOfBuffer(),
                histogramsBuffer.getCopyOfBuffer(),
                numCounters,
                numGauges,
                numMeters,
                numHistograms);
        }

        public void close() {
            countersBuffer = null;
            gaugesBuffer = null;
            metersBuffer = null;
            histogramsBuffer = null;
        }
    }

    //......
}
  • There are several static classes for MetricDumpSerialization: MetricSerializationResult, MetricDumpSerializer, MetricDumpDeserializer; ; MetricDumpserializer provides the Serialize method to serialize the counters, gauges, histograms, meters indicators into Metropolis SerializationResult

Summary

  • MetricQueryServiceGateway defines two methods, one is queryMetrics and the other is getAddress; ; It has an implementation class AkkaQueryServiceGateway
  • AkkaQueryServiceGateway implements the MetricQueryServiceGateway interface, and its constructor requires that queryServiceActorRef; be passed in; The message type of queryMetrics method ask is MetricQueryService.CreateDump; ; The getAddress method returns queryServiceActorRef.path ()
  • MetricQueryService inherits UntypedActor. Its onReceive method determines the message type. If it is CreateDump, it calls MetricDUMP SERIALIZATION. MetricDUMP SERIALIZER. SERIALIZE (COUNTERS, GAUGES, Histograms, meters) method to serialize metrics to obtain metricdumpserialization. metricserializationresult, and then use getSender().tell(dump, getSelf ()) to return data; There are several static classes for MetricDumpSerialization: MetricSerializationResult, MetricDumpSerializer, MetricDumpDeserializer; ; MetricDumpserializer provides the Serialize method to serialize the counters, gauges, histograms, meters indicators into Metropolis SerializationResult

doc