Talk about Storm's LoggingMetricsConsumer



This article mainly studies Storm's LoggingMetricsConsumer.



public class LoggingMetricsConsumer implements IMetricsConsumer {
    public static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsConsumer.class);
    static private String padding = "                       ";

    @Override
    public void prepare(Map<String, Object> topoConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
    }

    @Override
    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        StringBuilder sb = new StringBuilder();
        String header = String.format("%d\t%15s:%-4d\t%3d:%-11s\t",
                                      taskInfo.timestamp,
                                      taskInfo.srcWorkerHost, taskInfo.srcWorkerPort,
                                      taskInfo.srcTaskId, taskInfo.srcComponentId);
        sb.append(header);
        for (DataPoint p : dataPoints) {
            sb.delete(header.length(), sb.length());
            sb.append(p.name)
              .append(padding).delete(header.length() + 23, sb.length()).append("\t")
              .append(p.value);
            LOG.info(sb.toString());
        }
    }

    @Override
    public void cleanup() {
    }
}
  • LoggingMetricsConsumer implements the IMetricsConsumer interface and prints taskInfo and dataPoints to the log in its handleDataPoints method. Which log it ends up in depends on Storm's log4j2 configuration.


<?xml version="1.0" encoding="UTF-8"?>
<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->
<configuration monitorInterval="60" shutdownHook="disable">
<properties>
    <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n</property>
    <property name="patternNoTime">%msg%n</property>
    <property name="patternMetrics">%d %-8r %m%n</property>
</properties>
<appenders>
    <RollingFile name="A1"
            fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}"
            filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.%i.gz">
        <PatternLayout>
            <pattern>${pattern}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <RollingFile name="STDOUT"
            fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out"
            filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out.%i.gz">
        <PatternLayout>
            <pattern>${patternNoTime}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="4"/>
    </RollingFile>
    <RollingFile name="STDERR"
            fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err"
            filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err.%i.gz">
        <PatternLayout>
            <pattern>${patternNoTime}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="4"/>
    </RollingFile>
    <RollingFile name="METRICS"
            fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics"
            filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics.%i.gz">
        <PatternLayout>
            <pattern>${patternMetrics}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="2 MB"/>
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" port="514"
        protocol="UDP" appName="[${sys:storm.id}:${sys:worker.port}]" mdcId="mdc" includeMDC="true"
        facility="LOCAL5" enterpriseNumber="18060" newLine="true" exceptionPattern="%rEx{full}"
        messageId="[${sys:user.name}:${sys:logging.sensitivity}]" id="storm" immediateFail="true" immediateFlush="true"/>
</appenders>
<loggers>
    <root level="info"> <!-- We log everything -->
        <appender-ref ref="A1"/>
        <appender-ref ref="syslog"/>
    </root>
    <Logger name="org.apache.storm.metric.LoggingMetricsConsumer" level="info" additivity="false">
        <appender-ref ref="METRICS"/>
    </Logger>
    <Logger name="STDERR" level="INFO">
        <appender-ref ref="STDERR"/>
        <appender-ref ref="syslog"/>
    </Logger>
    <Logger name="STDOUT" level="INFO">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="syslog"/>
    </Logger>
</loggers>
</configuration>
  • Take worker.xml as an example. The logger named org.apache.storm.metric.LoggingMetricsConsumer is configured at info level with additivity set to false.
  • The METRICS appender writes to ${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics, for example workers-artifacts/tickedemo-1-1541070680/6700/worker.log.metrics.
  • METRICS is a RollingFile appender whose SizeBasedTriggeringPolicy is 2 MB.


Topology configuration

conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
  • You can register LoggingMetricsConsumer in conf when the topology is submitted. This configuration applies only to the workers of that topology; that is, its metrics data is written into the topology's worker.log.metrics file.

storm.yaml configuration

topology.metrics.consumer.register:
  - class: "org.apache.storm.metric.LoggingMetricsConsumer"
    max.retain.metric.tuples: 100
    parallelism.hint: 1
  - class: "org.apache.storm.metric.HttpForwardingMetricsConsumer"
    parallelism.hint: 1
    argument: ""
  • The storm.yaml configuration applies to all topologies. Note that the key here is topology.metrics.consumer.register, which is topology level; the metrics data is written into the worker.log.metrics file.
  • At cluster level the key is storm.cluster.metrics.consumer.register, which can only be set in storm.yaml. When enabled, metrics data is written into the nimbus.log.metrics and supervisor.log.metrics files.
  • The log4j configuration parameter used to start nimbus and the supervisor is -Dlog4j.configurationFile=/apache-storm/log4j2/cluster.xml, while workers are started with -Dlog4j.configurationFile=/apache-storm/log4j2/worker.xml; the log file of each component is nimbus.log, supervisor.log, and worker.log respectively.
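As a sketch of the cluster-level registration described above (assuming the LoggingClusterMetricsConsumer class shipped with Storm and the publish-interval key from defaults.yaml), a storm.yaml entry might look like:

```yaml
# Cluster-level metrics consumer (nimbus/supervisor); storm.yaml only.
storm.cluster.metrics.consumer.register:
  - class: "org.apache.storm.metric.LoggingClusterMetricsConsumer"
# How often cluster metrics are published, in seconds (60 is the assumed default).
storm.cluster.metrics.consumer.publish.interval.secs: 60
```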



public class MetricsConsumerBolt implements IBolt {
    public static final Logger LOG = LoggerFactory.getLogger(MetricsConsumerBolt.class);
    private final int _maxRetainMetricTuples;
    private final Predicate<IMetricsConsumer.DataPoint> _filterPredicate;
    private final DataPointExpander _expander;
    private final BlockingQueue<MetricsTask> _taskQueue;
    IMetricsConsumer _metricsConsumer;
    String _consumerClassName;
    OutputCollector _collector;
    Object _registrationArgument;
    private Thread _taskExecuteThread;
    private volatile boolean _running = true;

    public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples,
                               Predicate<IMetricsConsumer.DataPoint> filterPredicate, DataPointExpander expander) {

        _consumerClassName = consumerClassName;
        _registrationArgument = registrationArgument;
        _maxRetainMetricTuples = maxRetainMetricTuples;
        _filterPredicate = filterPredicate;
        _expander = expander;

        if (_maxRetainMetricTuples > 0) {
            _taskQueue = new LinkedBlockingDeque<>(_maxRetainMetricTuples);
        } else {
            _taskQueue = new LinkedBlockingDeque<>();
        }
    }

    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        try {
            _metricsConsumer = (IMetricsConsumer) Class.forName(_consumerClassName).newInstance();
        } catch (Exception e) {
            throw new RuntimeException("Could not instantiate a class listed in config under section " +
                                       Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e);
        }
        _metricsConsumer.prepare(topoConf, _registrationArgument, context, collector);
        _collector = collector;
        _taskExecuteThread = new Thread(new MetricsHandlerRunnable());
        _taskExecuteThread.setDaemon(true);
        _taskExecuteThread.start();
    }

    public void execute(Tuple input) {
        IMetricsConsumer.TaskInfo taskInfo = (IMetricsConsumer.TaskInfo) input.getValue(0);
        Collection<IMetricsConsumer.DataPoint> dataPoints = (Collection) input.getValue(1);
        Collection<IMetricsConsumer.DataPoint> expandedDataPoints = _expander.expandDataPoints(dataPoints);
        List<IMetricsConsumer.DataPoint> filteredDataPoints = getFilteredDataPoints(expandedDataPoints);
        MetricsTask metricsTask = new MetricsTask(taskInfo, filteredDataPoints);

        while (!_taskQueue.offer(metricsTask)) {
            _taskQueue.poll();
        }

        _collector.ack(input);
    }

    private List<IMetricsConsumer.DataPoint> getFilteredDataPoints(Collection<IMetricsConsumer.DataPoint> dataPoints) {
        return Lists.newArrayList(Iterables.filter(dataPoints, _filterPredicate));
    }

    public void cleanup() {
        _running = false;
        _taskExecuteThread.interrupt();
    }

    static class MetricsTask {
        private IMetricsConsumer.TaskInfo taskInfo;
        private Collection<IMetricsConsumer.DataPoint> dataPoints;

        public MetricsTask(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints) {
            this.taskInfo = taskInfo;
            this.dataPoints = dataPoints;
        }

        public IMetricsConsumer.TaskInfo getTaskInfo() {
            return taskInfo;
        }

        public Collection<IMetricsConsumer.DataPoint> getDataPoints() {
            return dataPoints;
        }
    }

    class MetricsHandlerRunnable implements Runnable {

        public void run() {
            while (_running) {
                try {
                    MetricsTask task = _taskQueue.take();
                    _metricsConsumer.handleDataPoints(task.getTaskInfo(), task.getDataPoints());
                } catch (InterruptedException e) {
                } catch (Throwable t) {
                    LOG.error("Exception occurred during handle metrics", t);
                }
            }
        }
    }
}
  • MetricsConsumerBolt creates _taskQueue in its constructor: if _maxRetainMetricTuples is greater than 0 a bounded queue is created, otherwise an unbounded one. The value comes from max.retain.metric.tuples under topology.metrics.consumer.register and defaults to 100 when not set.
  • MetricsConsumerBolt starts the MetricsHandlerRunnable thread in prepare. The thread takes a MetricsTask from _taskQueue and then calls _metricsConsumer.handleDataPoints(task.getTaskInfo(), task.getDataPoints()).
  • The execute method of MetricsConsumerBolt adds data to _taskQueue when it receives a tuple; if the offer fails because the queue is full, it polls off the oldest element and offers again.
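The drop-oldest behavior of execute can be reproduced with plain java.util.concurrent. This is a self-contained sketch, not Storm code; DropOldestQueueDemo and offerDroppingOldest are illustrative names:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class DropOldestQueueDemo {
    // Mirrors the loop in MetricsConsumerBolt.execute: when the bounded
    // queue is full, drop the oldest entry (poll) and retry the offer.
    static <T> void offerDroppingOldest(BlockingQueue<T> queue, T item) {
        while (!queue.offer(item)) {
            queue.poll();
        }
    }

    public static void main(String[] args) {
        // Capacity 2, as if max.retain.metric.tuples were set to 2.
        BlockingQueue<Integer> queue = new LinkedBlockingDeque<>(2);
        offerDroppingOldest(queue, 1);
        offerDroppingOldest(queue, 2);
        offerDroppingOldest(queue, 3); // queue is full: 1 is dropped, 3 is kept
        System.out.println(queue);     // [2, 3]
    }
}
```

The consequence is that metrics are silently discarded under backpressure, which is why a too-small max.retain.metric.tuples can lose data points.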



    protected StormTopology systemTopologyImpl(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {

        StormTopology ret = topology.deepCopy();
        addAcker(topoConf, ret);
        if (hasEventLoggers(topoConf)) {
            addEventLogger(topoConf, ret);
        }
        addMetricComponents(topoConf, ret);
        addSystemComponents(topoConf, ret);

        addMetricStreams(ret);
        addSystemStreams(ret);

        return ret;
    }

    public static void addMetricComponents(Map<String, Object> conf, StormTopology topology) {
        Map<String, Bolt> metricsConsumerBolts = metricsConsumerBoltSpecs(conf, topology);
        for (Map.Entry<String, Bolt> entry : metricsConsumerBolts.entrySet()) {
            topology.put_to_bolts(entry.getKey(), entry.getValue());
        }
    }

    public static void addMetricStreams(StormTopology topology) {
        for (Object component : allComponents(topology).values()) {
            ComponentCommon common = getComponentCommon(component);
            StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("task-info", "data-points"));
            common.put_to_streams(Constants.METRICS_STREAM_ID, streamInfo);
        }
    }

    public static Map<String, Bolt> metricsConsumerBoltSpecs(Map<String, Object> conf, StormTopology topology) {
        Map<String, Bolt> metricsConsumerBolts = new HashMap<>();

        Set<String> componentIdsEmitMetrics = new HashSet<>();
        componentIdsEmitMetrics.addAll(allComponents(topology).keySet());
        componentIdsEmitMetrics.add(Constants.SYSTEM_COMPONENT_ID);

        Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
        for (String componentId : componentIdsEmitMetrics) {
            inputs.put(Utils.getGlobalStreamId(componentId, Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping());
        }

        List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) conf.get(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER);
        if (registerInfo != null) {
            Map<String, Integer> classOccurrencesMap = new HashMap<String, Integer>();
            for (Map<String, Object> info : registerInfo) {
                String className = (String) info.get(TOPOLOGY_METRICS_CONSUMER_CLASS);
                Object argument = info.get(TOPOLOGY_METRICS_CONSUMER_ARGUMENT);
                Integer maxRetainMetricTuples = ObjectReader.getInt(info.get(
                    TOPOLOGY_METRICS_CONSUMER_MAX_RETAIN_METRIC_TUPLES), 100);
                Integer phintNum = ObjectReader.getInt(info.get(TOPOLOGY_METRICS_CONSUMER_PARALLELISM_HINT), 1);
                Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
                metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
                List<String> whitelist = (List<String>) info.get(
                    TOPOLOGY_METRICS_CONSUMER_WHITELIST);
                List<String> blacklist = (List<String>) info.get(
                    TOPOLOGY_METRICS_CONSUMER_BLACKLIST);
                FilterByMetricName filterPredicate = new FilterByMetricName(whitelist, blacklist);
                Boolean expandMapType = ObjectReader.getBoolean(info.get(
                    TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE), false);
                String metricNameSeparator = ObjectReader.getString(info.get(
                    TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR), ".");
                DataPointExpander expander = new DataPointExpander(expandMapType, metricNameSeparator);
                MetricsConsumerBolt boltInstance = new MetricsConsumerBolt(className, argument,
                                                                           maxRetainMetricTuples, filterPredicate, expander);
                Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs,
                                                                               boltInstance, null, phintNum, metricsConsumerConf);

                String id = className;
                if (classOccurrencesMap.containsKey(className)) {
                    // e.g. ["a", "b", "a"] => ["a", "b", "a#2"]
                    int occurrenceNum = classOccurrencesMap.get(className) + 1;
                    classOccurrencesMap.put(className, occurrenceNum);
                    id = Constants.METRICS_COMPONENT_ID_PREFIX + className + "#" + occurrenceNum;
                } else {
                    id = Constants.METRICS_COMPONENT_ID_PREFIX + className;
                    classOccurrencesMap.put(className, 1);
                }
                metricsConsumerBolts.put(id, metricsConsumerBolt);
            }
        }
        return metricsConsumerBolts;
    }
  • StormCommon adds several system components when creating the system topology in systemTopologyImpl; addMetricComponents and addMetricStreams are called here.
  • addMetricComponents creates the MetricsConsumerBolts based on conf and wires the Constants.METRICS_STREAM_ID stream of every component to them as input via shuffle grouping.
  • addMetricStreams configures each component with an output stream named Constants.METRICS_STREAM_ID whose fields are Arrays.asList("task-info", "data-points").
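The component-id deduplication inside metricsConsumerBoltSpecs can be isolated as follows. This is a sketch with illustrative names, and the "__metrics" string is an assumed value for Constants.METRICS_COMPONENT_ID_PREFIX:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MetricsComponentIdDemo {
    // Assumed value of Constants.METRICS_COMPONENT_ID_PREFIX.
    static final String PREFIX = "__metrics";

    // Mirrors the id-generation loop in metricsConsumerBoltSpecs:
    // ["a", "b", "a"] => ["__metricsa", "__metricsb", "__metricsa#2"]
    static List<String> componentIds(List<String> classNames) {
        Map<String, Integer> occurrences = new HashMap<>();
        List<String> ids = new ArrayList<>();
        for (String className : classNames) {
            String id;
            if (occurrences.containsKey(className)) {
                int occurrenceNum = occurrences.get(className) + 1;
                occurrences.put(className, occurrenceNum);
                id = PREFIX + className + "#" + occurrenceNum;
            } else {
                occurrences.put(className, 1);
                id = PREFIX + className;
            }
            ids.add(id);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(componentIds(Arrays.asList("a", "b", "a")));
    }
}
```

This is why the same consumer class can be registered several times (e.g. with different arguments) without the bolt ids colliding.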



    protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry;

    protected void setupMetrics() {
        for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) {
            StormTimer timerTask = workerData.getUserTimer();
            timerTask.scheduleRecurring(interval, interval,
                                        () -> {
                                            TupleImpl tuple =
                                                new TupleImpl(workerTopologyContext, new Values(interval), Constants.SYSTEM_COMPONENT_ID,
                                                              (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID);
                                            AddressedTuple metricsTickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
                                            try {
                                                receiveQueue.publish(metricsTickTuple);
                                                receiveQueue.flush();  // avoid buffering
                                            } catch (InterruptedException e) {
                                                LOG.warn("Thread interrupted when publishing metrics. Setting interrupt flag.");
                                                Thread.currentThread().interrupt();
                                            }
                                        });
        }
    }

    public Map<Integer, Map<Integer, Map<String, IMetric>>> getIntervalToTaskToMetricToRegistry() {
        return intervalToTaskToMetricToRegistry;
    }
  • The Executor sets up a recurring task in the setupMetrics method and broadcasts (BROADCAST_DEST) a metricsTickTuple on METRICS_TICK_STREAM_ID.
  • This is driven by intervalToTaskToMetricToRegistry, whose key is the interval.
  • intervalToTaskToMetricToRegistry is initialized in the Executor constructor: intervalToTaskToMetricToRegistry = new HashMap<>().



    private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
        Map<String, Object> conf = workerData.getConf();
        return new TopologyContext(
            topology, workerData.getTopologyConf(), workerData.getTaskToComponent(),
            workerData.getComponentToSortedTasks(), workerData.getComponentToStreamToFields(),
            // This is updated by the Worker and the topology has shared access to it
            workerData.getBlobToLastKnownVersion(), workerData.getTopologyId(),
            ConfigUtils.supervisorStormResourcesPath(
                ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())),
            ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()),
            taskId,
            workerData.getPort(), workerData.getLocalTaskIds(),
            workerData.getDefaultSharedResources(), workerData.getUserSharedResources(),
            executor.getSharedExecutorData(), executor.getIntervalToTaskToMetricToRegistry(),
            executor.getOpenOrPrepareWasCalled());
    }
  • The mkTopologyContext method passes executor.getIntervalToTaskToMetricToRegistry() when creating the TopologyContext.



public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
    private Integer _taskId;
    private Map<String, Object> _taskData = new HashMap<>();
    private List<ITaskHook> _hooks = new ArrayList<>();
    private Map<String, Object> _executorData;
    private Map<Integer, Map<Integer, Map<String, IMetric>>> _registeredMetrics;

    public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
        if (_openOrPrepareWasCalled.get()) {
            throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " +
                                       "IBolt::prepare() or ISpout::open() method.");
        }

        if (metric == null) {
            throw new IllegalArgumentException("Cannot register a null metric");
        }

        if (timeBucketSizeInSecs <= 0) {
            throw new IllegalArgumentException("TopologyContext.registerMetric can only be called with timeBucketSizeInSecs " +
                                               "greater than or equal to 1 second.");
        }

        if (getRegisteredMetricByName(name) != null) {
            throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
        }

        Map<Integer, Map<Integer, Map<String, IMetric>>> m1 = _registeredMetrics;
        if (!m1.containsKey(timeBucketSizeInSecs)) {
            m1.put(timeBucketSizeInSecs, new HashMap<Integer, Map<String, IMetric>>());
        }

        Map<Integer, Map<String, IMetric>> m2 = m1.get(timeBucketSizeInSecs);
        if (!m2.containsKey(_taskId)) {
            m2.put(_taskId, new HashMap<String, IMetric>());
        }

        Map<String, IMetric> m3 = m2.get(_taskId);
        if (m3.containsKey(name)) {
            throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
        } else {
            m3.put(name, metric);
        }

        return metric;
    }
}
  • The Executor's intervalToTaskToMetricToRegistry is ultimately passed to the TopologyContext's _registeredMetrics.
  • The registerMetric method adds an entry to _registeredMetrics, keyed by timeBucketSizeInSecs.
  • For built-in metrics, timeBucketSizeInSecs is read from Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS (topology.builtin.metrics.bucket.size.secs), which defaults to 60 in defaults.yaml; that is, the Executor emits a metricsTickTuple every 60 seconds, with streamId Constants.METRICS_TICK_STREAM_ID.
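The nested-map bookkeeping of registerMetric can be sketched without Storm types (IMetric replaced by Object; MetricRegistryDemo and register are illustrative names):

```java
import java.util.HashMap;
import java.util.Map;

public class MetricRegistryDemo {
    // Mirrors TopologyContext.registerMetric's bookkeeping:
    // timeBucketSizeInSecs -> taskId -> metric name -> metric object.
    static Object register(Map<Integer, Map<Integer, Map<String, Object>>> registry,
                           int timeBucketSizeInSecs, int taskId, String name, Object metric) {
        Map<String, Object> nameToMetric = registry
            .computeIfAbsent(timeBucketSizeInSecs, k -> new HashMap<>())
            .computeIfAbsent(taskId, k -> new HashMap<>());
        if (nameToMetric.containsKey(name)) {
            throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
        }
        nameToMetric.put(name, metric);
        return metric;
    }
}
```

Registering the same name twice for the same task throws, matching registerMetric's behavior; different buckets or tasks keep separate name spaces.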



    public void metricsTick(Task task, TupleImpl tuple) {
        try {
            Integer interval = tuple.getInteger(0);
            int taskId = task.getTaskId();
            Map<Integer, Map<String, IMetric>> taskToMetricToRegistry = intervalToTaskToMetricToRegistry.get(interval);
            Map<String, IMetric> nameToRegistry = null;
            if (taskToMetricToRegistry != null) {
                nameToRegistry = taskToMetricToRegistry.get(taskId);
            }
            if (nameToRegistry != null) {
                IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
                    hostname, workerTopologyContext.getThisWorkerPort(),
                    componentId, taskId, Time.currentTimeSecs(), interval);
                List<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<>();
                for (Map.Entry<String, IMetric> entry : nameToRegistry.entrySet()) {
                    IMetric metric = entry.getValue();
                    Object value = metric.getValueAndReset();
                    if (value != null) {
                        IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), value);
                        dataPoints.add(dataPoint);
                    }
                }
                if (!dataPoints.isEmpty()) {
                    task.sendUnanchored(Constants.METRICS_STREAM_ID,
                                        new Values(taskInfo, dataPoints), executorTransfer, pendingEmits);
                }
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }
  • SpoutExecutor and BoltExecutor call the parent class's Executor.metricsTick method from tupleActionFn when they receive a tuple whose streamId is Constants.METRICS_TICK_STREAM_ID.
  • metricsTick uses task.sendUnanchored(Constants.METRICS_STREAM_ID, new Values(taskInfo, dataPoints), executorTransfer, pendingEmits) to send the data to Constants.METRICS_STREAM_ID; the values are taskInfo and dataPoints, and the dataPoints are read from the TopologyContext's _registeredMetrics (this is the older metrics API, not V2).
  • After receiving the data, MetricsConsumerBolt puts it into the _taskQueue queue; meanwhile the MetricsHandlerRunnable thread blocks taking from _taskQueue and calls back _metricsConsumer.handleDataPoints to consume the data.
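The getValueAndReset contract that metricsTick relies on can be illustrated with a minimal counter in the spirit of Storm's CountMetric (a sketch, not the actual class):

```java
// Each tick, metricsTick calls getValueAndReset() on every registered metric:
// the value accumulated since the last tick is reported and the metric resets.
public class CountMetricDemo {
    private long value = 0;

    public void incr() {
        value += 1;
    }

    public Object getValueAndReset() {
        long ret = value;
        value = 0;
        return ret;
    }
}
```

Note that metricsTick skips metrics whose getValueAndReset() returns null, so a metric implementation can opt out of a tick by returning null instead of a value.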


  • LoggingMetricsConsumer is provided in Storm's metric package (the old metrics API), not in metrics2. Nimbus and the supervisor use -Dlog4j.configurationFile=/apache-storm/log4j2/cluster.xml; workers use -Dlog4j.configurationFile=/apache-storm/log4j2/worker.xml. The log file of each component is nimbus.log, supervisor.log, and worker.log respectively.
  • Storm adds system components when building the topology, including the MetricsConsumerBolt and the metric streams. The Executor calls setupMetrics in its init method and emits a metricsTickTuple on a schedule; when tupleActionFn receives a metricsTickTuple, SpoutExecutor and BoltExecutor call the metricsTick method, which sends the produced data to Constants.METRICS_STREAM_ID. MetricsConsumerBolt then receives the data and calls back _metricsConsumer.handleDataPoints to consume it.
  • Two parameters are worth noting. One is max.retain.metric.tuples, used by MetricsConsumerBolt and configured under topology.metrics.consumer.register, defaulting to 100; it is the size of MetricsConsumerBolt's _taskQueue, and 0 means unbounded. The other is the interval of built-in metrics, read from Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS (topology.builtin.metrics.bucket.size.secs), which defaults to 60, i.e. one metricsTickTuple every 60 seconds.