Talk about Storm’s LoggingClusterMetricsConsumer


Preface

This article mainly studies storm’s LoggingClusterMetricsConsumer

LoggingClusterMetricsConsumer

storm-2.0.0/storm-server/src/main/java/org/apache/storm/metric/LoggingClusterMetricsConsumer.java

public class LoggingClusterMetricsConsumer implements IClusterMetricsConsumer {
    public static final Logger LOG = LoggerFactory.getLogger(LoggingClusterMetricsConsumer.class);

    static private String padding = "                       ";

    @Override
    public void prepare(Object registrationArgument) {
    }

    @Override
    public void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints) {
        StringBuilder sb = new StringBuilder();
        String header = String.format("%d\t%15s\t%40s\t",
                                      clusterInfo.getTimestamp(), "<cluster>", "<cluster>");
        sb.append(header);
        logDataPoints(dataPoints, sb, header);
    }

    @Override
    public void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints) {
        StringBuilder sb = new StringBuilder();
        String header = String.format("%d\t%15s\t%40s\t",
                                      supervisorInfo.getTimestamp(),
                                      supervisorInfo.getSrcSupervisorHost(),
                                      supervisorInfo.getSrcSupervisorId());
        sb.append(header);
        for (DataPoint p : dataPoints) {
            sb.delete(header.length(), sb.length());
            sb.append(p.getName())
              .append(padding).delete(header.length() + 23, sb.length()).append("\t")
              .append(p.getValue());
            LOG.info(sb.toString());
        }
    }

    @Override
    public void cleanup() {
    }

    private void logDataPoints(Collection<DataPoint> dataPoints, StringBuilder sb, String header) {
        for (DataPoint p : dataPoints) {
            sb.delete(header.length(), sb.length());
            sb.append(p.getName())
              .append(padding).delete(header.length() + 23, sb.length()).append("\t")
              .append(p.getValue());
            LOG.info(sb.toString());
        }
    }
}
  • This is a cluster-level metrics consumer; it can only be configured in storm.yaml.
  • Its two handleDataPoints methods are callbacks, invoked through ClusterMetricsConsumerExecutor.
  • handleDataPoints here does nothing but format the data points and print them to the log file.
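The fixed-width column trick in handleDataPoints is easy to miss: it appends the metric name plus a padding string, then deletes everything past header.length() + 23 so the name column is always 23 characters wide. A minimal, self-contained sketch of that formatting (the header values are made up for the demo):

```java
import java.util.List;

// Minimal sketch of the fixed-width formatting used by
// LoggingClusterMetricsConsumer: append the metric name plus a padding
// string, then truncate back to a 23-character name column so the
// values line up.
public class PaddingDemo {
    private static final String PADDING = "                       "; // 23 spaces

    static String formatLine(String header, String name, Object value) {
        StringBuilder sb = new StringBuilder(header);
        sb.append(name)
          .append(PADDING)
          .delete(header.length() + 23, sb.length()) // cut the name column to 23 chars
          .append("\t")
          .append(value);
        return sb.toString();
    }

    public static void main(String[] args) {
        String header = String.format("%d\t%15s\t%40s\t", 1541490711L, "<cluster>", "<cluster>");
        for (String[] m : List.of(new String[]{"supervisors", "1"},
                                  new String[]{"slotsTotal", "4"})) {
            System.out.println(formatLine(header, m[0], m[1]));
        }
    }
}
```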

Storm.yaml configuration

## Cluster Metrics Consumers
storm.cluster.metrics.consumer.register:
   - class: "org.apache.storm.metric.LoggingClusterMetricsConsumer"
#   - class: "com.example.demo.metric.FixedLoggingClusterMetricsConsumer"
#     argument:
#       - endpoint: "metrics-collector.mycompany.org"
#
storm.cluster.metrics.consumer.publish.interval.secs: 5
  • Here, the consumer class is specified as LoggingClusterMetricsConsumer, and the publish interval is specified as 5 seconds.

cluster.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<configuration monitorInterval="60" shutdownHook="disable" packages="org.apache.logging.log4j.core,io.sentry.log4j2">
<properties>
    <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n</property>
    <property name="patternMetrics">%d %-8r %m%n</property>
</properties>
<appenders>
    <RollingFile name="A1" immediateFlush="false"
                 fileName="${sys:storm.log.dir}/${sys:logfile.name}"
                 filePattern="${sys:storm.log.dir}/${sys:logfile.name}.%i.gz">
        <PatternLayout>
            <pattern>${pattern}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <RollingFile name="WEB-ACCESS" immediateFlush="false"
                 fileName="${sys:storm.log.dir}/access-web-${sys:daemon.name}.log"
                 filePattern="${sys:storm.log.dir}/access-web-${sys:daemon.name}.log.%i.gz">
        <PatternLayout>
            <pattern>${pattern}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <RollingFile name="THRIFT-ACCESS" immediateFlush="false"
                 fileName="${sys:storm.log.dir}/access-${sys:logfile.name}"
                 filePattern="${sys:storm.log.dir}/access-${sys:logfile.name}.%i.gz">
    <PatternLayout>
        <pattern>${pattern}</pattern>
    </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <RollingFile name="METRICS"
                 fileName="${sys:storm.log.dir}/${sys:logfile.name}.metrics"
                 filePattern="${sys:storm.log.dir}/${sys:logfile.name}.metrics.%i.gz">
        <PatternLayout>
            <pattern>${patternMetrics}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="2 MB"/>
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" port="514"
            protocol="UDP" appName="[${sys:daemon.name}]" mdcId="mdc" includeMDC="true"
            facility="LOCAL5" enterpriseNumber="18060" newLine="true" exceptionPattern="%rEx{full}"
            messageId="[${sys:user.name}:S0]" id="storm" immediateFlush="true" immediateFail="true"/>
</appenders>
<loggers>

    <Logger name="org.apache.storm.logging.filters.AccessLoggingFilter" level="info" additivity="false">
        <AppenderRef ref="WEB-ACCESS"/>
        <AppenderRef ref="syslog"/>
    </Logger>
    <Logger name="org.apache.storm.logging.ThriftAccessLogger" level="info" additivity="false">
        <AppenderRef ref="THRIFT-ACCESS"/>
        <AppenderRef ref="syslog"/>
    </Logger>
    <Logger name="org.apache.storm.metric.LoggingClusterMetricsConsumer" level="info" additivity="false">
        <appender-ref ref="METRICS"/>
    </Logger>
    <root level="info"> <!-- We log everything -->
        <appender-ref ref="A1"/>
        <appender-ref ref="syslog"/>
        <appender-ref ref="Sentry" level="ERROR" />
    </root>
</loggers>
</configuration>
  • cluster.xml holds the logging configuration for these metrics. A dedicated METRICS appender is used, a RollingFile whose file name is ${sys:storm.log.dir}/${sys:logfile.name}.metrics. For example, nimbus’s default logfile.name is nimbus.log and supervisor’s is supervisor.log, so the files written are nimbus.log.metrics and supervisor.log.metrics.

Log output example

2018-11-06 07:51:51,488 18628    1541490711           <cluster>                                <cluster>        supervisors             1
2018-11-06 07:51:51,488 18628    1541490711           <cluster>                                <cluster>        topologies              0
2018-11-06 07:51:51,489 18629    1541490711           <cluster>                                <cluster>        slotsTotal              4
2018-11-06 07:51:51,489 18629    1541490711           <cluster>                                <cluster>        slotsUsed               0
2018-11-06 07:51:51,489 18629    1541490711           <cluster>                                <cluster>        slotsFree               4
2018-11-06 07:51:51,489 18629    1541490711           <cluster>                                <cluster>        executorsTotal          0
2018-11-06 07:51:51,489 18629    1541490711           <cluster>                                <cluster>        tasksTotal              0
2018-11-06 07:51:51,496 18636    1541490711      192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29        slotsTotal              4
2018-11-06 07:51:51,497 18637    1541490711      192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29        slotsUsed               0
2018-11-06 07:51:51,497 18637    1541490711      192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29        totalMem                3072.0
2018-11-06 07:51:51,497 18637    1541490711      192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29        totalCpu                400.0
2018-11-06 07:51:51,498 18638    1541490711      192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29        usedMem                 0.0
2018-11-06 07:51:51,498 18638    1541490711      192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29        usedCpu                 0.0

ClusterMetricsConsumerExecutor

storm-2.0.0/storm-server/src/main/java/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java

public class ClusterMetricsConsumerExecutor {
    public static final Logger LOG = LoggerFactory.getLogger(ClusterMetricsConsumerExecutor.class);
    private static final String ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED =
        "Preparation of Cluster Metrics Consumer failed. " +
        "Please check your configuration and/or corresponding systems and relaunch Nimbus. " +
        "Skipping handle metrics.";

    private IClusterMetricsConsumer metricsConsumer;
    private String consumerClassName;
    private Object registrationArgument;

    public ClusterMetricsConsumerExecutor(String consumerClassName, Object registrationArgument) {
        this.consumerClassName = consumerClassName;
        this.registrationArgument = registrationArgument;
    }

    public void prepare() {
        try {
            metricsConsumer = (IClusterMetricsConsumer) Class.forName(consumerClassName).newInstance();
            metricsConsumer.prepare(registrationArgument);
        } catch (Exception e) {
            LOG.error("Could not instantiate or prepare Cluster Metrics Consumer with fully qualified name " +
                      consumerClassName, e);

            if (metricsConsumer != null) {
                metricsConsumer.cleanup();
            }
            metricsConsumer = null;
        }
    }

    public void handleDataPoints(final IClusterMetricsConsumer.ClusterInfo clusterInfo, final Collection<DataPoint> dataPoints) {
        if (metricsConsumer == null) {
            LOG.error(ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED);
            return;
        }

        try {
            metricsConsumer.handleDataPoints(clusterInfo, dataPoints);
        } catch (Throwable e) {
            LOG.error("Error while handling cluster data points, consumer class: " + consumerClassName, e);
        }
    }

    public void handleDataPoints(final IClusterMetricsConsumer.SupervisorInfo supervisorInfo, final Collection<DataPoint> dataPoints) {
        if (metricsConsumer == null) {
            LOG.error(ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED);
            return;
        }

        try {
            metricsConsumer.handleDataPoints(supervisorInfo, dataPoints);
        } catch (Throwable e) {
            LOG.error("Error while handling cluster data points, consumer class: " + consumerClassName, e);
        }
    }

    public void cleanup() {
        if (metricsConsumer != null) {
            metricsConsumer.cleanup();
        }
    }
}
  • When prepare() is called, ClusterMetricsConsumerExecutor instantiates the IClusterMetricsConsumer implementation named by consumerClassName, then calls metricsConsumer.prepare(registrationArgument) to do any setup.
  • The handleDataPoints methods of ClusterMetricsConsumerExecutor simply delegate to the metricsConsumer’s handleDataPoints.
  • There are two handleDataPoints overloads; both take a dataPoints collection, but the other parameter differs: one receives a ClusterInfo, the other a SupervisorInfo, for cluster-level and supervisor-level metrics respectively.
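The prepare() step boils down to a reflection pattern: resolve the configured class name, instantiate it via its no-arg constructor, pass it the registration argument, and fall back to null on any failure so later calls can short-circuit. A simplified, runnable sketch (MetricsConsumer and LoggingConsumer below are stand-ins, not the real Storm types):

```java
// Simplified sketch of the reflection pattern in
// ClusterMetricsConsumerExecutor.prepare(). MetricsConsumer and
// LoggingConsumer stand in for IClusterMetricsConsumer and its
// implementations.
interface MetricsConsumer {
    void prepare(Object registrationArgument);
}

class LoggingConsumer implements MetricsConsumer {
    Object registrationArgument;

    @Override
    public void prepare(Object registrationArgument) {
        this.registrationArgument = registrationArgument;
    }
}

public class ReflectionDemo {
    // Mirrors prepare(): on any failure the consumer stays null, so the
    // handleDataPoints calls can log an error and return early.
    static MetricsConsumer load(String className, Object registrationArgument) {
        try {
            MetricsConsumer consumer =
                (MetricsConsumer) Class.forName(className).getDeclaredConstructor().newInstance();
            consumer.prepare(registrationArgument);
            return consumer;
        } catch (Exception e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(load("LoggingConsumer", "endpoint=example.org") != null); // true
        System.out.println(load("no.such.Consumer", null) != null);                  // false
    }
}
```

The actual Storm code uses the older Class.newInstance() form; getDeclaredConstructor().newInstance() is the non-deprecated equivalent.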

Nimbus.launchServer

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    public void launchServer() throws Exception {
        try {
            BlobStore store = blobStore;
            IStormClusterState state = stormClusterState;
            NimbusInfo hpi = nimbusHostPortInfo;

            LOG.info("Starting Nimbus with conf {}", ConfigUtils.maskPasswords(conf));
            validator.prepare(conf);

            //......

            timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
                                    () -> {
                                        try {
                                            if (isLeader()) {
                                                sendClusterMetricsToExecutors();
                                            }
                                        } catch (Exception e) {
                                            throw new RuntimeException(e);
                                        }
                                    });
            
            timer.scheduleRecurring(5, 5, clusterMetricSet);
        } catch (Exception e) {
            if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
                throw e;
            }

            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
                throw e;
            }
            LOG.error("Error on initialization of nimbus", e);
            Utils.exitProcess(13, "Error on initialization of nimbus");
        }
    }

    private boolean isLeader() throws Exception {
        return leaderElector.isLeader();
    }
  • Nimbus’s launchServer method registers a recurring task that, if the current node is the leader, calls sendClusterMetricsToExecutors to push the relevant metrics to the metrics consumers.
  • The interval of this task is DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS (storm.cluster.metrics.consumer.publish.interval.secs), which defaults to 60 in defaults.yaml and can be overridden in storm.yaml.
  • Besides pushing metrics to the consumers, launchServer also schedules the ClusterSummaryMetricSet task to run every 5 seconds.
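The interval resolution amounts to a plain config lookup with a default. The helper below is illustrative only, standing in for the ObjectReader.getInt(conf.get(...)) call in launchServer; it is not Storm’s actual config code:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of how the publish interval is resolved: a value
// set in storm.yaml overrides the defaults.yaml value of 60 seconds.
public class IntervalDemo {
    static final String KEY = "storm.cluster.metrics.consumer.publish.interval.secs";

    static int publishIntervalSecs(Map<String, Object> conf) {
        Object v = conf.get(KEY);
        return v == null ? 60 : ((Number) v).intValue(); // 60 is the defaults.yaml value
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(publishIntervalSecs(conf)); // 60: defaults.yaml
        conf.put(KEY, 5);
        System.out.println(publishIntervalSecs(conf)); // 5: storm.yaml override
    }
}
```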

Nimbus.sendClusterMetricsToExecutors

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    private void sendClusterMetricsToExecutors() throws Exception {
        ClusterInfo clusterInfo = mkClusterInfo();
        ClusterSummary clusterSummary = getClusterInfoImpl();
        List<DataPoint> clusterMetrics = extractClusterMetrics(clusterSummary);
        Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> supervisorMetrics = extractSupervisorMetrics(clusterSummary);
        for (ClusterMetricsConsumerExecutor consumerExecutor : clusterConsumerExceutors) {
            consumerExecutor.handleDataPoints(clusterInfo, clusterMetrics);
            for (Entry<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> entry : supervisorMetrics.entrySet()) {
                consumerExecutor.handleDataPoints(entry.getKey(), entry.getValue());
            }
        }
    }
  • Nimbus’s sendClusterMetricsToExecutors method extracts the metrics via extractClusterMetrics and extractSupervisorMetrics, then calls consumerExecutor.handleDataPoints to hand over the data.
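The fan-out shape of that loop can be sketched as follows: every registered consumer receives the cluster-level data points once, then one call per supervisor. The Consumer interface below is a simplified stand-in for ClusterMetricsConsumerExecutor:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the fan-out in sendClusterMetricsToExecutors.
interface Consumer {
    void handle(String source, List<String> dataPoints);
}

public class FanOutDemo {
    static int fanOut(List<Consumer> consumers,
                      List<String> clusterMetrics,
                      Map<String, List<String>> supervisorMetrics) {
        int calls = 0;
        for (Consumer c : consumers) {
            c.handle("<cluster>", clusterMetrics); // cluster-level points, once per consumer
            calls++;
            for (Map.Entry<String, List<String>> e : supervisorMetrics.entrySet()) {
                c.handle(e.getKey(), e.getValue()); // one call per supervisor
                calls++;
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        Map<String, List<String>> supervisors = new LinkedHashMap<>();
        supervisors.put("supervisor-1", List.of("slotsTotal=4"));
        supervisors.put("supervisor-2", List.of("slotsTotal=4"));
        List<Consumer> consumers = List.<Consumer>of((source, dp) -> {});
        // 1 consumer x (1 cluster call + 2 supervisor calls) = 3
        System.out.println(fanOut(consumers, List.of("supervisors=2"), supervisors));
    }
}
```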

ClusterSummaryMetricSet

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    private class ClusterSummaryMetricSet implements Runnable {
        private static final int CACHING_WINDOW = 5;
        
        private final ClusterSummaryMetrics clusterSummaryMetrics = new ClusterSummaryMetrics();
        
        private final Function<String, Histogram> registerHistogram = (name) -> {
            //This histogram reflects the data distribution across only one ClusterSummary, i.e.,
            // data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment.
            // Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update
            final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
            clusterSummaryMetrics.put(name, histogram);
            return histogram;
        };
        private volatile boolean active = false;

        //NImbus metrics distribution
        private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs");

        //Supervisor metrics distribution
        private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
        private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers");
        private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers");
        private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem");
        private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu");
        private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem");
        private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu");

        //Topology metrics distribution
        private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks");
        private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors");
        private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers");
        private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs");
        private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count");
        private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap");
        private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap");
        private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu");
        private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
        private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
        private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
        private final StormMetricsRegistry metricsRegistry;

        /**
         * Constructor to put all items in ClusterSummary in MetricSet as a metric.
         * All metrics are derived from a cached ClusterSummary object,
         * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters.
         * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than
         * reporting interval to avoid outdated reporting.
         */
        ClusterSummaryMetricSet(StormMetricsRegistry metricsRegistry) {
            this.metricsRegistry = metricsRegistry;
            //Break the code if out of sync to thrift protocol
            assert ClusterSummary._Fields.values().length == 3
                && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
                && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
                && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;

            final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
                @Override
                protected ClusterSummary loadValue() {
                    try {
                        ClusterSummary newSummary = getClusterInfoImpl();
                        LOG.debug("The new summary is {}", newSummary);
                        /*
                         * Update histograms based on the new summary. Most common implementation of Reporter reports Gauges before
                         * Histograms. Because DerivativeGauge will trigger cache refresh upon reporter's query, histogram will also be
                         * updated before query
                         */
                        updateHistogram(newSummary);
                        return newSummary;
                    } catch (Exception e) {
                        LOG.warn("Get cluster info exception.", e);
                        throw new RuntimeException(e);
                    }
                }
            };

            clusterSummaryMetrics.put("cluster:num-nimbus-leaders", new DerivativeGauge<ClusterSummary, Long>(cachedSummary) {
                @Override
                protected Long transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_nimbuses().stream()
                            .filter(NimbusSummary::is_isLeader)
                            .count();
                }
            });
            clusterSummaryMetrics.put("cluster:num-nimbuses", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_nimbuses_size();
                }
            });
            clusterSummaryMetrics.put("cluster:num-supervisors", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors_size();
                }
            });
            clusterSummaryMetrics.put("cluster:num-topologies", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_topologies_size();
                }
            });
            clusterSummaryMetrics.put("cluster:num-total-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream()
                            .mapToInt(SupervisorSummary::get_num_workers)
                            .sum();
                }
            });
            clusterSummaryMetrics.put("cluster:num-total-used-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream()
                            .mapToInt(SupervisorSummary::get_num_used_workers)
                            .sum();
                }
            });
            clusterSummaryMetrics.put("cluster:total-fragmented-memory-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
                @Override
                protected Double transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream()
                            //Filtered negative value
                            .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_mem(), 0))
                            .sum();
                }
            });
            clusterSummaryMetrics.put("cluster:total-fragmented-cpu-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
                @Override
                protected Double transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream()
                            //Filtered negative value
                            .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_cpu(), 0))
                            .sum();
                }
            });
        }

        private void updateHistogram(ClusterSummary newSummary) {
            for (NimbusSummary nimbusSummary : newSummary.get_nimbuses()) {
                nimbusUptime.update(nimbusSummary.get_uptime_secs());
            }
            for (SupervisorSummary summary : newSummary.get_supervisors()) {
                supervisorsUptime.update(summary.get_uptime_secs());
                supervisorsNumWorkers.update(summary.get_num_workers());
                supervisorsNumUsedWorkers.update(summary.get_num_used_workers());
                supervisorsUsedMem.update(Math.round(summary.get_used_mem()));
                supervisorsUsedCpu.update(Math.round(summary.get_used_cpu()));
                supervisorsFragmentedMem.update(Math.round(summary.get_fragmented_mem()));
                supervisorsFragmentedCpu.update(Math.round(summary.get_fragmented_cpu()));
            }
            for (TopologySummary summary : newSummary.get_topologies()) {
                topologiesNumTasks.update(summary.get_num_tasks());
                topologiesNumExecutors.update(summary.get_num_executors());
                topologiesNumWorker.update(summary.get_num_workers());
                topologiesUptime.update(summary.get_uptime_secs());
                topologiesReplicationCount.update(summary.get_replication_count());
                topologiesRequestedMemOnHeap.update(Math.round(summary.get_requested_memonheap()));
                topologiesRequestedMemOffHeap.update(Math.round(summary.get_requested_memoffheap()));
                topologiesRequestedCpu.update(Math.round(summary.get_requested_cpu()));
                topologiesAssignedMemOnHeap.update(Math.round(summary.get_assigned_memonheap()));
                topologiesAssignedMemOffHeap.update(Math.round(summary.get_assigned_memoffheap()));
                topologiesAssignedCpu.update(Math.round(summary.get_assigned_cpu()));
            }
        }

        void setActive(final boolean active) {
            if (this.active != active) {
                this.active = active;
                if (active) {
                    metricsRegistry.registerAll(clusterSummaryMetrics);
                } else {
                    metricsRegistry.removeAll(clusterSummaryMetrics);
                }
            }
        }

        @Override
        public void run() {
            try {
                setActive(isLeader());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
  • This task mainly calls setActive, which keeps checking whether the node’s leadership state has changed: when the node becomes the leader it registers clusterSummaryMetrics, and when it loses leadership it removes clusterSummaryMetrics.
  • clusterSummaryMetrics adds the gauges cluster:num-nimbus-leaders, cluster:num-nimbuses, cluster:num-supervisors, cluster:num-topologies, cluster:num-total-workers, cluster:num-total-used-workers, cluster:total-fragmented-memory-non-negative, and cluster:total-fragmented-cpu-non-negative.
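The caching pattern behind these gauges can be illustrated without the Dropwizard dependency: one expensive snapshot (getClusterInfoImpl in Storm) is cached for the window, and every derived value only transforms that cached snapshot. The classes below are simplified stand-ins for CachedGauge and DerivativeGauge, not the actual com.codahale.metrics types:

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hand-rolled analogue of the CachedGauge/DerivativeGauge pair used by
// ClusterSummaryMetricSet.
class CachedGaugeSketch<T> {
    private final long windowNanos;
    private final Supplier<T> loader;
    private long loadedAt;
    private T value;
    private int loads; // exposed so the demo can show cache hits

    CachedGaugeSketch(long windowSecs, Supplier<T> loader) {
        this.windowNanos = windowSecs * 1_000_000_000L;
        this.loader = loader;
    }

    synchronized T getValue() {
        long now = System.nanoTime();
        if (value == null || now - loadedAt > windowNanos) {
            value = loader.get(); // the expensive loadValue() analogue
            loadedAt = now;
            loads++;
        }
        return value;
    }

    synchronized int loadCount() {
        return loads;
    }
}

public class GaugeDemo {
    public static void main(String[] args) {
        // snapshot = {slotsTotal, slotsUsed}; cached for 5 seconds
        CachedGaugeSketch<int[]> summary = new CachedGaugeSketch<>(5, () -> new int[]{4, 0});
        Function<int[], Integer> slotsFree = s -> s[0] - s[1]; // DerivativeGauge-style transform
        slotsFree.apply(summary.getValue());
        slotsFree.apply(summary.getValue());
        System.out.println(summary.loadCount()); // 1: the second read hit the cache
    }
}
```

This is why the comment in the source notes that the reporter’s interval should be longer than CACHING_WINDOW: otherwise a report can be served from an outdated snapshot.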

Summary

  • LoggingClusterMetricsConsumer consumes cluster-level metrics and prints them to a log file; log4j2 reads its configuration from cluster.xml, and the files finally written are nimbus.log.metrics and supervisor.log.metrics. LoggingMetricsConsumer, in contrast, is the worker-level consumer of a topology; its log4j2 configuration comes from worker.xml, and the file finally written is worker.log.metrics.
  • Nimbus sets up a recurring task in launchServer: when the current node is the leader, it periodically sends metrics to the ClusterMetricsConsumerExecutors, which in turn call back LoggingClusterMetricsConsumer’s handleDataPoints to print the data to the log. LoggingMetricsConsumer works differently: the executor schedules a recurring metrics tick that triggers SpoutExecutor and BoltExecutor to send metrics to the MetricsConsumerBolt built into the topology, and MetricsConsumerBolt then calls back LoggingMetricsConsumer.handleDataPoints to consume the data and print it to the log.
  • handleDataPoints handles two kinds of info: ClusterInfo and SupervisorInfo. Note that the recurring task calls sendClusterMetricsToExecutors only on the leader node; nimbus and supervisors normally run on different nodes, so supervisor.log.metrics may be empty.
  • LoggingMetricsConsumer’s implementation relies on the legacy IMetric interface, while LoggingClusterMetricsConsumer does not depend on IMetric; it obtains its metrics from IStormClusterState.
  • Storm 1.2 introduced a new metrics system based on Dropwizard Metrics. The registerMetric methods of TopologyContext that return IMetric were marked deprecated in 1.2, so LoggingMetricsConsumer may need to be reworked in the future to obtain metrics from the metrics2-based MetricRegistry.
