Springboot2 reports metrics to statsd

  springboot

Order

This article mainly studies how springboot2 reports metrics to statsd

maven

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-statsd</artifactId>
        </dependency>

Configuration file

# Whether exporting of metrics to StatsD is enabled.
management.metrics.export.statsd.enabled=true
# StatsD line protocol to use. datalog or esty
management.metrics.export.statsd.flavor=etsy
# Host of the StatsD server to receive exported metrics.
management.metrics.export.statsd.host=192.168.99.100
# Port of the StatsD server to receive exported metrics.
management.metrics.export.statsd.port=8125
# Total length of a single payload should be kept within your network's MTU.
management.metrics.export.statsd.max-packet-length=1400
# How often gauges will be polled. When a gauge is polled, its value is recalculated and if the value has changed (or publishUnchangedMeters is true), it is sent to the StatsD server.
management.metrics.export.statsd.polling-frequency=10s
# Whether to send unchanged meters to the StatsD server.
management.metrics.export.statsd.publish-unchanged-meters=true
# Maximum size of the queue of items waiting to be sent to the StatsD server.
management.metrics.export.statsd.queue-size=2147483647

flavor

micrometer-registry-statsd-1.0.1-sources.jar! /io/micrometer/statsd/StatsdFlavor.java

public enum StatsdFlavor {
    /**
     * https://github.com/etsy/statsd/blob/master/docs/metric_types.md
     */
    ETSY,

    /**
     * https://docs.datadoghq.com/guides/dogstatsd/#datagram-format
     */
    DATADOG,

    /**
     * https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/
     *
     * For gauges to work as expected, you should set `delete_gauges = false` in your input options as documented here:
     * https://github.com/influxdata/telegraf/tree/master/plugins/inputs/statsd
     */
    TELEGRAF
}

There are several kinds of flavor. The default is DATALOG. ETSY is used here.

StatsdProperties

spring-boot-actuator-autoconfigure-2.0.0.RELEASE-sources.jar! /org/springframework/boot/actuate/autoconfigure/metrics/export/statsd/StatsdProperties.java

@ConfigurationProperties(prefix = "management.metrics.export.statsd")
public class StatsdProperties {

    /**
     * Whether exporting of metrics to StatsD is enabled.
     */
    private boolean enabled = true;

    /**
     * StatsD line protocol to use.
     */
    private StatsdFlavor flavor = StatsdFlavor.DATADOG;

    /**
     * Host of the StatsD server to receive exported metrics.
     */
    private String host = "localhost";

    /**
     * Port of the StatsD server to receive exported metrics.
     */
    private Integer port = 8125;

    /**
     * Total length of a single payload should be kept within your network's MTU.
     */
    private Integer maxPacketLength = 1400;

    /**
     * How often gauges will be polled. When a gauge is polled, its value is recalculated
     * and if the value has changed (or publishUnchangedMeters is true), it is sent to the
     * StatsD server.
     */
    private Duration pollingFrequency = Duration.ofSeconds(10);

    /**
     * Maximum size of the queue of items waiting to be sent to the StatsD server.
     */
    private Integer queueSize = Integer.MAX_VALUE;

    /**
     * Whether to send unchanged meters to the StatsD server.
     */
    private boolean publishUnchangedMeters = true;

    //......
}

Note that queueSize here defaults to infinity. However, a closer look at the source code does not seem to see the place to call.

Example

View heap

curl -i http://localhost:8080/actuator/metrics/jvm.memory.used?tag=area:heap

Return

{
  "name": "jvm.memory.used",
  "measurements": [
    {
      "statistic": "VALUE",
      "value": 442224240
    }
  ],
  "availableTags": [
    {
      "tag": "id",
      "values": [
        "PS Eden Space",
        "PS Old Gen",
        "PS Survivor Space"
      ]
    }
  ]
}

View eden

curl -i http://localhost:8080/actuator/metrics/jvm.memory.used?tag=area:heap&tag=id:PS Eden Space

Return

{
  "name": "jvm.memory.used",
  "measurements": [
    {
      "statistic": "VALUE",
      "value": 388454976
    }
  ],
  "availableTags": []
}

Grafana show

图片描述

Source code analysis

Tag to name

Since statsd of esty does not support tag, tag is converted to be part of name when outputting.

micrometer-registry-statsd-1.0.1-sources.jar! /io/micrometer/statsd/internal/FlavorStatsdLineBuilder.java

/**
 * A Statsd serializer for a particular {@link Meter} that formats the line in different
 * ways depending on the prevailing {@link StatsdFlavor}.
 *
 * @author Jon Schneider
 */
public class FlavorStatsdLineBuilder implements StatsdLineBuilder {
    private final Meter.Id id;
    private final StatsdFlavor flavor;
    private final HierarchicalNameMapper nameMapper;
    private final MeterRegistry.Config config;

    private final Function<NamingConvention, String> datadogTagString;
    private final Function<NamingConvention, String> telegrafTagString;

    public FlavorStatsdLineBuilder(Meter.Id id, StatsdFlavor flavor, HierarchicalNameMapper nameMapper, MeterRegistry.Config config) {
        this.id = id;
        this.flavor = flavor;
        this.nameMapper = nameMapper;
        this.config = config;

        // service:payroll,region:us-west
        this.datadogTagString = memoize(convention ->
                id.getTags().iterator().hasNext() ?
                        id.getConventionTags(convention).stream()
                                .map(t -> t.getKey() + ":" + t.getValue())
                                .collect(Collectors.joining(","))
                        : null
        );

        // service=payroll,region=us-west
        this.telegrafTagString = memoize(convention ->
                id.getTags().iterator().hasNext() ?
                        id.getConventionTags(convention).stream()
                                .map(t -> t.getKey() + "=" + t.getValue())
                                .collect(Collectors.joining(","))
                        : null
        );
    }

    @Override
    public String count(long amount, Statistic stat) {
        return line(Long.toString(amount), stat, "c");
    }

    @Override
    public String gauge(double amount, Statistic stat) {
        return line(DoubleFormat.decimalOrNan(amount), stat, "g");
    }

    @Override
    public String histogram(double amount) {
        return line(DoubleFormat.decimalOrNan(amount), null, "h");
    }

    @Override
    public String timing(double timeMs) {
        return line(DoubleFormat.decimalOrNan(timeMs), null, "ms");
    }

    private String line(String amount, @Nullable Statistic stat, String type) {
        switch (flavor) {
            case ETSY:
                return metricName(stat) + ":" + amount + "|" + type;
            case DATADOG:
                return metricName(stat) + ":" + amount + "|" + type + tags(stat, datadogTagString.apply(config.namingConvention()),":", "|#");
            case TELEGRAF:
            default:
                return metricName(stat) + tags(stat, telegrafTagString.apply(config.namingConvention()),"=", ",") + ":" + amount + "|" + type;
        }
    }

    private String tags(@Nullable Statistic stat, String otherTags, String keyValueSeparator, String preamble) {
        String tags = of(stat == null ? null : "statistic" + keyValueSeparator + stat.getTagValueRepresentation(), otherTags)
                .filter(Objects::nonNull)
                .collect(Collectors.joining(","));

        if(!tags.isEmpty())
            tags = preamble + tags;
        return tags;
    }

    private String metricName(@Nullable Statistic stat) {
        switch (flavor) {
            case ETSY:
                return nameMapper.toHierarchicalName(stat != null ? id.withTag(stat) : id, config.namingConvention());
            case DATADOG:
            case TELEGRAF:
            default:
                return config.namingConvention().name(id.getName(), id.getType(), id.getBaseUnit());
        }
    }
}

Focus on tags Method

StatsdMetricsExportAutoConfiguration

spring-boot-actuator-autoconfigure-2.0.0.RELEASE-sources.jar! /org/springframework/boot/actuate/autoconfigure/metrics/export/statsd/StatsdMetricsExportAutoConfiguration.java

@Configuration
@AutoConfigureBefore({ CompositeMeterRegistryAutoConfiguration.class,
        SimpleMetricsExportAutoConfiguration.class })
@AutoConfigureAfter(MetricsAutoConfiguration.class)
@ConditionalOnBean(Clock.class)
@ConditionalOnClass(StatsdMeterRegistry.class)
@ConditionalOnProperty(prefix = "management.metrics.export.statsd", name = "enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(StatsdProperties.class)
public class StatsdMetricsExportAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean(StatsdConfig.class)
    public StatsdConfig statsdConfig(StatsdProperties statsdProperties) {
        return new StatsdPropertiesConfigAdapter(statsdProperties);
    }

    @Bean
    @ConditionalOnMissingBean
    public StatsdMeterRegistry statsdMeterRegistry(StatsdConfig statsdConfig,
            HierarchicalNameMapper hierarchicalNameMapper, Clock clock) {
        return new StatsdMeterRegistry(statsdConfig, hierarchicalNameMapper, clock);
    }

    @Bean
    @ConditionalOnMissingBean
    public HierarchicalNameMapper hierarchicalNameMapper() {
        return HierarchicalNameMapper.DEFAULT;
    }

    @Bean
    public StatsdMetrics statsdMetrics() {
        return new StatsdMetrics();
    }

}

Note that statsdProperties is adapted to statsdConfig here using StatsdPropertiesConfigAdapter.
StatsdMeterRegistry was also created here.

StatsdMeterRegistry

micrometer-registry-statsd-1.0.1-sources.jar! /io/micrometer/statsd/StatsdMeterRegistry.java

public class StatsdMeterRegistry extends MeterRegistry {
    //......
    private StatsdMeterRegistry(StatsdConfig config,
                                HierarchicalNameMapper nameMapper,
                                NamingConvention namingConvention,
                                Clock clock,
                                @Nullable Function<Meter.Id, StatsdLineBuilder> lineBuilderFunction,
                                @Nullable Consumer<String> lineSink) {
        super(clock);

        this.statsdConfig = config;
        this.nameMapper = nameMapper;
        this.lineBuilderFunction = lineBuilderFunction;
        this.lineSink = lineSink;
        config().namingConvention(namingConvention);

        UnicastProcessor<String> processor = UnicastProcessor.create(Queues.<String>unboundedMultiproducer().get());

        try {
            Class.forName("ch.qos.logback.classic.turbo.TurboFilter", false, getClass().getClassLoader());
            this.publisher = new LogbackMetricsSuppressingUnicastProcessor(processor);
        } catch (ClassNotFoundException e) {
            this.publisher = processor;
        }

        if (lineSink != null) {
            publisher.subscribe(new Subscriber<String>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(String line) {
                    if (started.get()) {
                        lineSink.accept(line);
                    }
                }

                @Override
                public void onError(Throwable t) {
                }

                @Override
                public void onComplete() {
                    meterPoller.dispose();
                }
            });

            // now that we're connected, start polling gauges and other pollable meter types
            meterPoller.replace(Flux.interval(statsdConfig.pollingFrequency())
                    .doOnEach(n -> pollableMeters.forEach(StatsdPollable::poll))
                    .subscribe());
        }

        if (config.enabled())
            start();
    }

    public void start() {
        if (started.compareAndSet(false, true) && lineSink == null) {
            UdpClient.create(statsdConfig.host(), statsdConfig.port())
                    .newHandler((in, out) -> out
                            .options(NettyPipeline.SendOptions::flushOnEach)
                            .sendString(publisher)
                            .neverComplete()
                    )
                    .subscribe(client -> {
                        this.udpClient.replace(client);

                        // now that we're connected, start polling gauges and other pollable meter types
                        meterPoller.replace(Flux.interval(statsdConfig.pollingFrequency())
                                .doOnEach(n -> pollableMeters.forEach(StatsdPollable::poll))
                                .subscribe());
                    });
        }
    }

    public void stop() {
        if (started.compareAndSet(true, false)) {
            udpClient.dispose();
            meterPoller.dispose();
        }
    }

    @Override
    public void close() {
        stop();
        super.close();
    }
    //......
}

1. it can be seen that the bottom layer is UdpClient using reactor, processor uses UnicastProcessor and uses unbounded MpscLinkedQueue.
2. I haven’t seen the function of queueSize set in the configuration file for half a day.
3. The specific data transfer is processed through this processor. UdpClient subscribes to the processor here, and then each metrics generates data to the processor.

such as
micrometer-registry-statsd-1.0.1-sources.jar! /io/micrometer/statsd/StatsdGauge.java

public class StatsdGauge<T> extends AbstractMeter implements Gauge, StatsdPollable {
    private final StatsdLineBuilder lineBuilder;
    private final Subscriber<String> publisher;

    private final WeakReference<T> ref;
    private final ToDoubleFunction<T> value;
    private final AtomicReference<Double> lastValue = new AtomicReference<>(Double.NaN);
    private final boolean alwaysPublish;

    StatsdGauge(Id id, StatsdLineBuilder lineBuilder, Subscriber<String> publisher, @Nullable T obj, ToDoubleFunction<T> value, boolean alwaysPublish) {
        super(id);
        this.lineBuilder = lineBuilder;
        this.publisher = publisher;
        this.ref = new WeakReference<>(obj);
        this.value = value;
        this.alwaysPublish = alwaysPublish;
    }

    @Override
    public double value() {
        T obj = ref.get();
        return obj != null ? value.applyAsDouble(ref.get()) : 0;
    }

    @Override
    public void poll() {
        double val = value();
        if (alwaysPublish || lastValue.getAndSet(val) != val) {
            publisher.onNext(lineBuilder.gauge(val));
        }
    }

    @SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
    @Override
    public boolean equals(Object o) {
        return MeterEquivalence.equals(this, o);
    }

    @Override
    public int hashCode() {
        return MeterEquivalence.hashCode(this);
    }
}

You can see the poll method here going to publisheronNext data

Summary

Springboot2 currently does not seem to support specifying prefix (Version 1.x of spring.metrics.export.statsd.prefix has been marked as obsolete in version 2), which will make it impossible to distinguish which service indicator (It is estimated that the future version will support it. If the version 2.0.0 is to support it, you can also write some code extensions to support prefix.)。

doc