[case41] A look at Storm’s GraphiteStormReporter

Preface

This article looks at Storm’s GraphiteStormReporter.

GraphiteStormReporter

storm-core-1.2.2-sources.jar!/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java

public class GraphiteStormReporter extends ScheduledStormReporter {
    private final static Logger LOG = LoggerFactory.getLogger(GraphiteStormReporter.class);

    public static final String GRAPHITE_PREFIXED_WITH = "graphite.prefixed.with";
    public static final String GRAPHITE_HOST = "graphite.host";
    public static final String GRAPHITE_PORT = "graphite.port";
    public static final String GRAPHITE_TRANSPORT = "graphite.transport";

    @Override
    public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) {
        LOG.debug("Preparing...");
        GraphiteReporter.Builder builder = GraphiteReporter.forRegistry(metricsRegistry);

        TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf);
        if (durationUnit != null) {
            builder.convertDurationsTo(durationUnit);
        }

        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
        if (rateUnit != null) {
            builder.convertRatesTo(rateUnit);
        }

        StormMetricsFilter filter = getMetricsFilter(reporterConf);
        if(filter != null){
            builder.filter(filter);
        }
        String prefix = getMetricsPrefixedWith(reporterConf);
        if (prefix != null) {
            builder.prefixedWith(prefix);
        }

        //defaults to 10
        reportingPeriod = getReportPeriod(reporterConf);

        //defaults to seconds
        reportingPeriodUnit = getReportPeriodUnit(reporterConf);

        // Not exposed:
        // * withClock(Clock)

        String host = getMetricsTargetHost(reporterConf);
        Integer port = getMetricsTargetPort(reporterConf);
        String transport = getMetricsTargetTransport(reporterConf);
        GraphiteSender sender = null;
        if (transport.equalsIgnoreCase("udp")) {
            sender = new GraphiteUDP(host, port);
        } else {
            sender = new Graphite(host, port);
        }
        reporter = builder.build(sender);
    }

    private static String getMetricsPrefixedWith(Map reporterConf) {
        return Utils.getString(reporterConf.get(GRAPHITE_PREFIXED_WITH), null);
    }

    private static String getMetricsTargetHost(Map reporterConf) {
        return Utils.getString(reporterConf.get(GRAPHITE_HOST), null);
    }

    private static Integer getMetricsTargetPort(Map reporterConf) {
        return Utils.getInt(reporterConf.get(GRAPHITE_PORT), null);
    }

    private static String getMetricsTargetTransport(Map reporterConf) {
        return Utils.getString(reporterConf.get(GRAPHITE_TRANSPORT), "tcp");
    }
}
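  • GraphiteStormReporter extends ScheduledStormReporter and implements the prepare method.
  • prepare reads the reporter configuration and creates a com.codahale.metrics.graphite.GraphiteSender (GraphiteUDP when graphite.transport is "udp", otherwise the TCP-based Graphite), then builds a com.codahale.metrics.graphite.GraphiteReporter from it.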
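
To make the option handling in prepare easier to follow, here is a minimal stand-alone sketch that uses only the JDK. It is not Storm code: the class GraphiteTargetSketch, its Target holder, and the resolve method are hypothetical names introduced for illustration, and the value conversion is simplified compared to Storm's Utils.getString and Utils.getInt. Only the three key names and the "tcp" default are taken from the listing above.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-alone sketch of the option handling in prepare(); not Storm code. */
final class GraphiteTargetSketch {
    // Key names copied from the constants in the listing above.
    static final String GRAPHITE_HOST = "graphite.host";
    static final String GRAPHITE_PORT = "graphite.port";
    static final String GRAPHITE_TRANSPORT = "graphite.transport";

    /** Hypothetical holder for the resolved settings. */
    static final class Target {
        final String host;   // null when graphite.host is not set
        final Integer port;  // null when graphite.port is not set
        final boolean udp;   // true -> GraphiteUDP, false -> Graphite (TCP)

        Target(String host, Integer port, boolean udp) {
            this.host = host;
            this.port = port;
            this.udp = udp;
        }
    }

    /** Simplified stand-in for the getMetricsTarget* helpers. */
    static Target resolve(Map<String, Object> reporterConf) {
        Object host = reporterConf.get(GRAPHITE_HOST);
        Object port = reporterConf.get(GRAPHITE_PORT);
        Object transport = reporterConf.get(GRAPHITE_TRANSPORT);
        // Only the transport has a default ("tcp"); host and port fall back to null.
        String transportName = transport == null ? "tcp" : transport.toString();
        return new Target(
                host == null ? null : host.toString(),
                port == null ? null : Integer.valueOf(port.toString()),
                transportName.equalsIgnoreCase("udp"));
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(GRAPHITE_HOST, "localhost");
        conf.put(GRAPHITE_PORT, 2003);
        Target t = resolve(conf);
        System.out.println(t.host + ":" + t.port + " udp=" + t.udp);
    }
}
```

One thing the sketch makes visible: host and port have no default. In the real prepare, port is an Integer, so if graphite.port is missing and the sender constructor takes a primitive int (which I believe is the case for Graphite and GraphiteUDP), unboxing the null value would fail with a NullPointerException. Setting both graphite.host and graphite.port explicitly avoids that.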

ScheduledStormReporter

storm-core-1.2.2-sources.jar!/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java

public abstract class ScheduledStormReporter implements StormReporter{
    private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class);
    protected ScheduledReporter reporter;
    protected long reportingPeriod;
    protected TimeUnit reportingPeriodUnit;

    @Override
    public void start() {
        if (reporter != null) {
            LOG.debug("Starting...");
            reporter.start(reportingPeriod, reportingPeriodUnit);
        } else {
            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
        }
    }

    @Override
    public void stop() {
        if (reporter != null) {
            LOG.debug("Stopping...");
            reporter.stop();
        } else {
            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
        }
    }


    public static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) {
        TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS);
        return unit == null ? TimeUnit.SECONDS : unit;
    }

    private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) {
        String rateUnitString = Utils.getString(reporterConf.get(configName), null);
        if (rateUnitString != null) {
            return TimeUnit.valueOf(rateUnitString);
        }
        return null;
    }

    public static long getReportPeriod(Map reporterConf) {
        return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue();
    }

    public static StormMetricsFilter getMetricsFilter(Map reporterConf){
        StormMetricsFilter filter = null;
        Map<String, Object> filterConf = (Map)reporterConf.get("filter");
        if(filterConf != null) {
            String clazz = (String) filterConf.get("class");
            if (clazz != null) {
                filter = Utils.newInstance(clazz);
                filter.prepare(filterConf);
            }
        }
        return filter;
    }
}
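  • ScheduledStormReporter encapsulates the reporter’s life cycle: start calls reporter.start with the configured period and unit, and stop calls reporter.stop. Both throw an IllegalStateException if prepare has not set the reporter.
  • The reporting period defaults to 10 and its unit to TimeUnit.SECONDS when they are not configured.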
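
The defaults and the start/stop guard in the listing above can be sketched with the JDK alone. This is an illustration under assumptions, not Storm code: ScheduledReporterSketch, readPeriod, and readUnit are hypothetical names, a boolean flag stands in for the Dropwizard ScheduledReporter, and the key strings "report.period" and "report.period.units" are assumed values for the REPORT_PERIOD and REPORT_PERIOD_UNITS constants, whose definitions are not part of the excerpt.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/** Stand-alone sketch of the defaults and the lifecycle guard; not Storm code. */
final class ScheduledReporterSketch {
    // Assumed key names; the listing only shows the constant identifiers.
    static final String REPORT_PERIOD = "report.period";
    static final String REPORT_PERIOD_UNITS = "report.period.units";

    private boolean prepared; // stands in for "reporter != null"
    private boolean running;
    long period;
    TimeUnit unit;

    /** Falls back to 10 when no period is configured. */
    static long readPeriod(Map<String, Object> conf) {
        Object v = conf.get(REPORT_PERIOD);
        return v == null ? 10L : Long.parseLong(v.toString());
    }

    /** Falls back to SECONDS when no unit is configured. */
    static TimeUnit readUnit(Map<String, Object> conf) {
        Object v = conf.get(REPORT_PERIOD_UNITS);
        // TimeUnit.valueOf is case-sensitive: "SECONDS" works, "seconds" throws.
        return v == null ? TimeUnit.SECONDS : TimeUnit.valueOf(v.toString());
    }

    void prepare(Map<String, Object> conf) {
        period = readPeriod(conf);
        unit = readUnit(conf);
        prepared = true;
    }

    void start() {
        if (!prepared) {
            throw new IllegalStateException("Attempt to start without preparing");
        }
        running = true; // the real class calls reporter.start(period, unit) here
    }

    void stop() {
        if (!prepared) {
            throw new IllegalStateException("Attempt to stop without preparing");
        }
        running = false; // the real class calls reporter.stop() here
    }

    boolean isRunning() {
        return running;
    }

    public static void main(String[] args) {
        ScheduledReporterSketch sketch = new ScheduledReporterSketch();
        sketch.prepare(new HashMap<String, Object>());
        sketch.start();
        System.out.println("every " + sketch.period + " " + sketch.unit);
        sketch.stop();
    }
}
```

Because the unit string goes through TimeUnit.valueOf, it has to match the enum constant exactly (for example SECONDS or MINUTES). A lowercase value raises an IllegalArgumentException, and the listing above does not catch it.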

Summary

  • Starting with version 1.2, Storm provides a new metrics system, metrics2, which is based on Dropwizard Metrics.
  • Console Reporter, CSV Reporter, Ganglia Reporter, Graphite Reporter, and JMX Reporter are provided by default.
