A look at storm’s reportError

Preface

This article mainly studies storm’s reportError

IErrorReporter

storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/IErrorReporter.java

public interface IErrorReporter {
    void reportError(Throwable error);
}
  • ISpoutOutputCollector, IOutputCollector, and IBasicOutputCollector all extend the IErrorReporter interface, so every collector type exposes reportError; the sketch below illustrates this shared contract.
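
Since all three collector interfaces share IErrorReporter, code that only needs to report errors can be written against that interface alone. The helper below is a minimal sketch (ErrorReporting and reportIfFatal are hypothetical names, not part of storm) and works with SpoutOutputCollector, OutputCollector, and BasicOutputCollector alike.

import org.apache.storm.task.IErrorReporter;

// hypothetical helper: accepts any collector via the IErrorReporter interface
public final class ErrorReporting {
    private ErrorReporting() {
    }

    public static void reportIfFatal(IErrorReporter reporter, Throwable t) {
        // forward only unexpected failures; the reporter decides where they end up
        if (t instanceof RuntimeException || t instanceof Error) {
            reporter.reportError(t);
        }
    }
}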

ISpoutOutputCollector

storm-core/1.2.2/storm-core-1.2.2-sources.jar!/org/apache/storm/spout/ISpoutOutputCollector.java

public interface ISpoutOutputCollector extends IErrorReporter{
    /**
        Returns the task ids that received the tuples.
    */
    List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
    void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
    long getPendingCount();
}
  • The implementation classes of ISpoutOutputCollector include SpoutOutputCollector, SpoutOutputCollectorImpl, etc.

IOutputCollector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java

public interface IOutputCollector extends IErrorReporter {
    /**
     * Returns the task ids that received the tuples.
     */
    List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);

    void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);

    void ack(Tuple input);

    void fail(Tuple input);

    void resetTimeout(Tuple input);

    void flush();
}
  • The implementation classes of IOutputCollector include OutputCollector, BoltOutputCollectorImpl, etc.

IBasicOutputCollector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java

public interface IBasicOutputCollector extends IErrorReporter {
    List<Integer> emit(String streamId, List<Object> tuple);

    void emitDirect(int taskId, String streamId, List<Object> tuple);

    void resetTimeout(Tuple tuple);
}
  • The implementation class of IBasicOutputCollector is BasicOutputCollector.

reportError

SpoutOutputCollectorImpl.reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java

    @Override
    public void reportError(Throwable error) {
        executor.getErrorReportingMetrics().incrReportedErrorCount();
        executor.getReportError().report(error);
    }

BoltOutputCollectorImpl.reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java

    @Override
    public void reportError(Throwable error) {
        executor.getErrorReportingMetrics().incrReportedErrorCount();
        executor.getReportError().report(error);
    }

You can see that the reportError methods of SpoutOutputCollectorImpl and BoltOutputCollectorImpl both call executor.getReportError().report(error).

ReportError.report

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/error/ReportError.java

public class ReportError implements IReportError {

    private static final Logger LOG = LoggerFactory.getLogger(ReportError.class);

    private final Map<String, Object> topoConf;
    private final IStormClusterState stormClusterState;
    private final String stormId;
    private final String componentId;
    private final WorkerTopologyContext workerTopologyContext;

    private int maxPerInterval;
    private int errorIntervalSecs;
    private AtomicInteger intervalStartTime;
    private AtomicInteger intervalErrors;

    public ReportError(Map<String, Object> topoConf, IStormClusterState stormClusterState, String stormId, String componentId,
                       WorkerTopologyContext workerTopologyContext) {
        this.topoConf = topoConf;
        this.stormClusterState = stormClusterState;
        this.stormId = stormId;
        this.componentId = componentId;
        this.workerTopologyContext = workerTopologyContext;
        this.errorIntervalSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS));
        this.maxPerInterval = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL));
        this.intervalStartTime = new AtomicInteger(Time.currentTimeSecs());
        this.intervalErrors = new AtomicInteger(0);
    }

    @Override
    public void report(Throwable error) {
        LOG.error("Error", error);
        if (Time.deltaSecs(intervalStartTime.get()) > errorIntervalSecs) {
            intervalErrors.set(0);
            intervalStartTime.set(Time.currentTimeSecs());
        }
        if (intervalErrors.incrementAndGet() <= maxPerInterval) {
            try {
                stormClusterState.reportError(stormId, componentId, Utils.hostname(),
                                              workerTopologyContext.getThisWorkerPort().longValue(), error);
            } catch (UnknownHostException e) {
                throw Utils.wrapInRuntime(e);
            }

        }
    }
}
  • The report method first checks whether the current error interval has expired and needs to be reset, then checks whether the number of errors in this interval exceeds the per-interval maximum. If not, stormClusterState.reportError is called to write the error to the state storage, such as zk. The two throttle parameters come from the topology configuration, as sketched below.
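
The constructor reads the throttle settings from the topology configuration, so they can be tuned per topology. A minimal sketch follows (the class name and the values 10 and 5 are illustrative only, not recommendations):

import org.apache.storm.Config;

public class ErrorThrottleConfigSketch {
    public static void main(String[] args) {
        Config conf = new Config();
        // length of one error-reporting interval, in seconds
        conf.put(Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS, 10);
        // max errors written to the cluster state per interval;
        // errors beyond this are still logged by the worker but not reported
        conf.put(Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL, 5);
        // pass conf to StormSubmitter.submitTopology(...) as usual
        System.out.println(conf);
    }
}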

StormClusterStateImpl.reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java

    @Override
    public void reportError(String stormId, String componentId, String node, Long port, Throwable error) {
        String path = ClusterUtils.errorPath(stormId, componentId);
        String lastErrorPath = ClusterUtils.lastErrorPath(stormId, componentId);
        ErrorInfo errorInfo = new ErrorInfo(ClusterUtils.stringifyError(error), Time.currentTimeSecs());
        errorInfo.set_host(node);
        errorInfo.set_port(port.intValue());
        byte[] serData = Utils.serialize(errorInfo);
        stateStorage.mkdirs(path, defaultAcls);
        stateStorage.create_sequential(path + ClusterUtils.ZK_SEPERATOR + "e", serData, defaultAcls);
        stateStorage.set_data(lastErrorPath, serData, defaultAcls);
        List<String> childrens = stateStorage.get_children(path, false);

        Collections.sort(childrens, new Comparator<String>() {
            public int compare(String arg0, String arg1) {
                return Long.compare(Long.parseLong(arg0.substring(1)), Long.parseLong(arg1.substring(1)));
            }
        });

        while (childrens.size() > 10) {
            String znodePath = path + ClusterUtils.ZK_SEPERATOR + childrens.remove(0);
            try {
                stateStorage.delete_node(znodePath);
            } catch (Exception e) {
                if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
                    // if the node is already deleted, do nothing
                    LOG.warn("Could not find the znode: {}", znodePath);
                } else {
                    throw e;
                }
            }
        }
    }
  • Here, ClusterUtils.errorPath(stormId, componentId) gives the directory that errors are written to, and ClusterUtils.lastErrorPath(stormId, componentId) gives the path where the last error is written.
  • Since zk is not suitable for storing large amounts of data, the children of the error path are checked: if there are more than 10, the excess nodes are deleted. The nodes are sorted in ascending order by the numeric part of the node name (substring(1)), and the oldest ones are removed one by one.

ClusterUtils.errorPath

storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java

    public static final String ZK_SEPERATOR = "/";

    public static final String ERRORS_ROOT = "errors";

    public static final String ERRORS_SUBTREE = ZK_SEPERATOR + ERRORS_ROOT;

    public static String errorPath(String stormId, String componentId) {
        try {
            return errorStormRoot(stormId) + ZK_SEPERATOR + URLEncoder.encode(componentId, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public static String lastErrorPath(String stormId, String componentId) {
        return errorPath(stormId, componentId) + "-last-error";
    }

    public static String errorStormRoot(String stormId) {
        return ERRORS_SUBTREE + ZK_SEPERATOR + stormId;
    }
  • The errorPath is /errors/{stormId}/{componentId}; under this directory an EPHEMERAL_SEQUENTIAL node whose name begins with e is created for each reported error. Errors are appended to this directory, and if there are more than 10 child nodes the excess ones are deleted.
  • The lastErrorPath is /errors/{stormId}/{componentId}-last-error, which stores the last error of that component. A small sketch of computing these paths follows.
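
These paths are relative to storm’s zk root (e.g. /storm in the zkCli listing below). A minimal sketch of computing them directly via ClusterUtils (the stormId and componentId values are taken from that listing, purely for illustration):

import org.apache.storm.cluster.ClusterUtils;

public class ErrorPathSketch {
    public static void main(String[] args) {
        String stormId = "reportErrorDemo-1-1540260375";
        String componentId = "print";
        // prints /errors/reportErrorDemo-1-1540260375/print
        System.out.println(ClusterUtils.errorPath(stormId, componentId));
        // prints /errors/reportErrorDemo-1-1540260375/print-last-error
        System.out.println(ClusterUtils.lastErrorPath(stormId, componentId));
    }
}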

ZkCli view

[zk: localhost:2181(CONNECTED) 21] ls /storm/errors
[DRPCStateQuery-1-1540185943, reportErrorDemo-1-1540260375]
[zk: localhost:2181(CONNECTED) 22] ls /storm/errors/reportErrorDemo-1-1540260375
[print, print-last-error]
[zk: localhost:2181(CONNECTED) 23] ls /storm/errors/reportErrorDemo-1-1540260375/print
[e0000000291, e0000000290, e0000000295, e0000000294, e0000000293, e0000000292, e0000000299, e0000000298, e0000000297, e0000000296]
[zk: localhost:2181(CONNECTED) 24] ls /storm/errors/reportErrorDemo-1-1540260375/print/e0000000299
[]
[zk: localhost:2181(CONNECTED) 25] ls /storm/errors/reportErrorDemo-1-1540260375/print-last-error
[]

storm-ui

curl -i http://192.168.99.100:8080/api/v1/topology/reportErrorDemo-1-1540260375?sys=false
  • storm-ui requests the above interface to obtain topology-related data; each spout or bolt entry includes a lastError field showing the latest error information.

StormApiResource

storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java

    /**
     * /api/v1/topology -> topo.
     */
    @GET
    @Path("/topology/{id}")
    @AuthNimbusOp(value = "getTopology", needsTopoId = true)
    @Produces("application/json")
    public Response getTopology(@PathParam("id") String id,
                                @DefaultValue(":all-time") @QueryParam("window") String window,
                                @QueryParam("sys") boolean sys,
                                @QueryParam(callbackParameterName) String callback) throws TException {
        topologyPageRequestMeter.mark();
        try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(config)) {
            return UIHelpers.makeStandardResponse(
                    UIHelpers.getTopologySummary(
                            nimbusClient.getClient().getTopologyPageInfo(id, window, sys),
                            window, config,
                            servletRequest.getRemoteUser()
                    ),
                    callback
            );
        }
    }
  • nimbusClient.getClient().getTopologyPageInfo(id, window, sys) is called here.

Nimbus.getTopologyPageInfo

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    @Override
    public TopologyPageInfo getTopologyPageInfo(String topoId, String window, boolean includeSys)
        throws NotAliveException, AuthorizationException, TException {
        try {
            getTopologyPageInfoCalls.mark();
            CommonTopoInfo common = getCommonTopoInfo(topoId, "getTopologyPageInfo");
            String topoName = common.topoName;
            IStormClusterState state = stormClusterState;
            int launchTimeSecs = common.launchTimeSecs;
            Assignment assignment = common.assignment;
            Map<List<Integer>, Map<String, Object>> beats = common.beats;
            Map<Integer, String> taskToComp = common.taskToComponent;
            StormTopology topology = common.topology;
            Map<String, Object> topoConf = Utils.merge(conf, common.topoConf);
            StormBase base = common.base;
            if (base == null) {
                throw new WrappedNotAliveException(topoId);
            }
            Map<WorkerSlot, WorkerResources> workerToResources = getWorkerResourcesForTopology(topoId);
            List<WorkerSummary> workerSummaries = null;
            Map<List<Long>, List<Object>> exec2NodePort = new HashMap<>();
            if (assignment != null) {
                Map<List<Long>, NodeInfo> execToNodeInfo = assignment.get_executor_node_port();
                Map<String, String> nodeToHost = assignment.get_node_host();
                for (Entry<List<Long>, NodeInfo> entry : execToNodeInfo.entrySet()) {
                    NodeInfo ni = entry.getValue();
                    List<Object> nodePort = Arrays.asList(ni.get_node(), ni.get_port_iterator().next());
                    exec2NodePort.put(entry.getKey(), nodePort);
                }

                workerSummaries = StatsUtil.aggWorkerStats(topoId,
                                                           topoName,
                                                           taskToComp,
                                                           beats,
                                                           exec2NodePort,
                                                           nodeToHost,
                                                           workerToResources,
                                                           includeSys,
                                                           true); //this is the topology page, so we know the user is authorized
            }

            TopologyPageInfo topoPageInfo = StatsUtil.aggTopoExecsStats(topoId,
                                                                        exec2NodePort,
                                                                        taskToComp,
                                                                        beats,
                                                                        topology,
                                                                        window,
                                                                        includeSys,
                                                                        state);

            //......
            return topoPageInfo;
        } catch (Exception e) {
            LOG.warn("Get topo page info exception. (topology id='{}')", topoId, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }
  • StatsUtil.aggTopoExecsStats is called here to get TopologyPageInfo.

StatsUtil.aggTopoExecsStats

storm-2.0.0/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java

    /**
     * aggregate topo executors stats.
     *
     * @param topologyId     topology id
     * @param exec2nodePort  executor -> host+port
     * @param task2component task -> component
     * @param beats          executor[start, end] -> executor heartbeat
     * @param topology       storm topology
     * @param window         the window to be aggregated
     * @param includeSys     whether to include system streams
     * @param clusterState   cluster state
     * @return TopologyPageInfo thrift structure
     */
    public static TopologyPageInfo aggTopoExecsStats(
        String topologyId, Map exec2nodePort, Map task2component, Map<List<Integer>, Map<String, Object>> beats,
        StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState) {
        List<Map<String, Object>> beatList = extractDataFromHb(exec2nodePort, task2component, beats, includeSys, topology);
        Map<String, Object> topoStats = aggregateTopoStats(window, includeSys, beatList);
        return postAggregateTopoStats(task2component, exec2nodePort, topoStats, topologyId, clusterState);
    }
  • The StatsUtil.aggTopoExecsStats method ultimately calls the postAggregateTopoStats method.

StatsUtil.postAggregateTopoStats

storm-2.0.0/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java

    private static TopologyPageInfo postAggregateTopoStats(Map task2comp, Map exec2nodePort, Map<String, Object> accData,
                                                          String topologyId, IStormClusterState clusterState) {
        TopologyPageInfo ret = new TopologyPageInfo(topologyId);

        ret.set_num_tasks(task2comp.size());
        ret.set_num_workers(((Set) accData.get(WORKERS_SET)).size());
        ret.set_num_executors(exec2nodePort != null ? exec2nodePort.size() : 0);

        Map bolt2stats = ClientStatsUtil.getMapByKey(accData, BOLT_TO_STATS);
        Map<String, ComponentAggregateStats> aggBolt2stats = new HashMap<>();
        for (Object o : bolt2stats.entrySet()) {
            Map.Entry e = (Map.Entry) o;
            Map m = (Map) e.getValue();
            long executed = getByKeyOr0(m, EXECUTED).longValue();
            if (executed > 0) {
                double execLatencyTotal = getByKeyOr0(m, EXEC_LAT_TOTAL).doubleValue();
                m.put(EXEC_LATENCY, execLatencyTotal / executed);

                double procLatencyTotal = getByKeyOr0(m, PROC_LAT_TOTAL).doubleValue();
                m.put(PROC_LATENCY, procLatencyTotal / executed);
            }
            m.remove(EXEC_LAT_TOTAL);
            m.remove(PROC_LAT_TOTAL);
            String id = (String) e.getKey();
            m.put("last-error", getLastError(clusterState, topologyId, id));

            aggBolt2stats.put(id, thriftifyBoltAggStats(m));
        }

        //......

        return ret;
    }

    private static ErrorInfo getLastError(IStormClusterState stormClusterState, String stormId, String compId) {
        return stormClusterState.lastError(stormId, compId);
    }
  • Here the last-error entry is added via getLastError and then converted into a thrift object by thriftifyBoltAggStats.
  • getLastError calls stormClusterState.lastError(stormId, compId) to fetch the last error.

UIHelpers.getTopologySummary

storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java

    /**
     * getTopologySummary.
     * @param topologyPageInfo topologyPageInfo
     * @param window window
     * @param config config
     * @param remoteUser remoteUser
     * @return getTopologySummary
     */
    public static Map<String, Object> getTopologySummary(TopologyPageInfo topologyPageInfo,
                                                         String window, Map<String, Object> config, String remoteUser) {
        Map<String, Object> result = new HashMap();
        Map<String, Object> topologyConf = (Map<String, Object>) JSONValue.parse(topologyPageInfo.get_topology_conf());
        long messageTimeout = (long) topologyConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
        Map<String, Object> unpackedTopologyPageInfo =
                unpackTopologyInfo(topologyPageInfo, window, config);
        result.putAll(unpackedTopologyPageInfo);
        result.put("user", remoteUser);
        result.put("window", window);
        result.put("windowHint", getWindowHint(window));
        result.put("msgTimeout", messageTimeout);
        result.put("configuration", topologyConf);
        result.put("visualizationTable", new ArrayList());
        result.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
        return result;
    }
  • After obtaining TopologyPageInfo, UIHelpers.getTopologySummary unpacks it.

UIHelpers.unpackTopologyInfo

storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java

    /**
     * unpackTopologyInfo.
     * @param topologyPageInfo topologyPageInfo
     * @param window window
     * @param config config
     * @return unpackTopologyInfo
     */
    private static Map<String,Object> unpackTopologyInfo(TopologyPageInfo topologyPageInfo, String window, Map<String,Object> config) {
        Map<String, Object> result = new HashMap();
        result.put("id", topologyPageInfo.get_id());
        //......

        Map<String, ComponentAggregateStats> spouts = topologyPageInfo.get_id_to_spout_agg_stats();
        List<Map> spoutStats = new ArrayList();

        for (Map.Entry<String, ComponentAggregateStats> spoutEntry : spouts.entrySet()) {
            spoutStats.add(getTopologySpoutAggStatsMap(spoutEntry.getValue(), spoutEntry.getKey()));
        }
        result.put("spouts", spoutStats);

        Map<String, ComponentAggregateStats> bolts = topologyPageInfo.get_id_to_bolt_agg_stats();
        List<Map> boltStats = new ArrayList();

        for (Map.Entry<String, ComponentAggregateStats> boltEntry : bolts.entrySet()) {
            boltStats.add(getTopologyBoltAggStatsMap(boltEntry.getValue(), boltEntry.getKey()));
        }
        result.put("bolts", boltStats);

        //......
        result.put("samplingPct", samplingPct);
        result.put("replicationCount", topologyPageInfo.get_replication_count());
        result.put("topologyVersion", topologyPageInfo.get_topology_version());
        result.put("stormVersion", topologyPageInfo.get_storm_version());
        return result;
    }

    /**
     * getTopologySpoutAggStatsMap.
     * @param componentAggregateStats componentAggregateStats
     * @param spoutId spoutId
     * @return getTopologySpoutAggStatsMap
     */
    private static Map<String, Object> getTopologySpoutAggStatsMap(ComponentAggregateStats componentAggregateStats,
                                                                   String spoutId) {
        Map<String, Object> result = new HashMap();
        CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
        result.putAll(getCommonAggStatsMap(commonStats));
        result.put("spoutId", spoutId);
        result.put("encodedSpoutId", URLEncoder.encode(spoutId));
        SpoutAggregateStats spoutAggregateStats = componentAggregateStats.get_specific_stats().get_spout();
        result.put("completeLatency", spoutAggregateStats.get_complete_latency_ms());
        ErrorInfo lastError = componentAggregateStats.get_last_error();
        result.put("lastError", Objects.isNull(lastError) ?  "" : getTruncatedErrorString(lastError.get_error()));
        return result;
    }

    /**
     * getTopologyBoltAggStatsMap.
     * @param componentAggregateStats componentAggregateStats
     * @param boltId boltId
     * @return getTopologyBoltAggStatsMap
     */
    private static Map<String, Object> getTopologyBoltAggStatsMap(ComponentAggregateStats componentAggregateStats,
                                                                  String boltId) {
        Map<String, Object> result = new HashMap();
        CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
        result.putAll(getCommonAggStatsMap(commonStats));
        result.put("boltId", boltId);
        result.put("encodedBoltId", URLEncoder.encode(boltId));
        BoltAggregateStats boltAggregateStats = componentAggregateStats.get_specific_stats().get_bolt();
        result.put("capacity", StatsUtil.floatStr(boltAggregateStats.get_capacity()));
        result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms()));
        result.put("executed", boltAggregateStats.get_executed());
        result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms()));
        ErrorInfo lastError = componentAggregateStats.get_last_error();
        result.put("lastError", Objects.isNull(lastError) ?  "" : getTruncatedErrorString(lastError.get_error()));
        return result;
    }

    /**
     * getTruncatedErrorString.
     * @param errorString errorString
     * @return getTruncatedErrorString
     */
    private static String getTruncatedErrorString(String errorString) {
        return errorString.substring(0, Math.min(errorString.length(), 200));
    }
  • Note that getTopologySpoutAggStatsMap is called here for spout and getTopologyBoltAggStatsMap is called for bolt.
  • Both methods apply getTruncatedErrorString to lastError, truncating the error string to at most 200 characters (substring(0, 200)).

crash log

2018-10-23 02:53:28.118 o.a.s.util Thread-10-print-executor[7 7] [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.tuple.TupleImpl.getInteger(TupleImpl.java:116) ~[storm-core-1.2.2.jar:1.2.2]
    at com.example.demo.error.ErrorPrintBolt.execute(ErrorPrintBolt.java:26) ~[stormjar.jar:?]
    at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
    ... 6 more
2018-10-23 02:53:28.129 o.a.s.d.executor Thread-10-print-executor[7 7] [ERROR]
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.tuple.TupleImpl.getInteger(TupleImpl.java:116) ~[storm-core-1.2.2.jar:1.2.2]
    at com.example.demo.error.ErrorPrintBolt.execute(ErrorPrintBolt.java:26) ~[stormjar.jar:?]
    at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
    ... 6 more
2018-10-23 02:53:28.175 o.a.s.util Thread-10-print-executor[7 7] [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.worker$fn__11404$fn__11405.invoke(worker.clj:792) [storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$mk_executor_data$fn__10612$fn__10613.invoke(executor.clj:281) [storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:494) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
2018-10-23 02:53:28.176 o.a.s.d.worker Thread-41 [INFO] Shutting down worker reportErrorDemo-2-1540263136 f9856902-cfe9-45c7-b675-93a29d3d3d36 6700
2018-10-23 02:53:28.177 o.a.s.d.worker Thread-41 [INFO] Terminating messaging context
2018-10-23 02:53:28.177 o.a.s.d.worker Thread-41 [INFO] Shutting down executors
2018-10-23 02:53:28.177 o.a.s.d.executor Thread-41 [INFO] Shutting down executor spout:[8 8]
2018-10-23 02:53:28.182 o.a.s.util Thread-3-disruptor-executor[8 8]-send-queue [INFO] Async loop interrupted!
2018-10-23 02:53:28.186 o.a.s.util Thread-4-spout-executor[8 8] [INFO] Async loop interrupted!
2018-10-23 02:53:28.188 o.a.s.d.executor Thread-41 [INFO] Shut down executor spout:[8 8]
2018-10-23 02:53:28.188 o.a.s.d.executor Thread-41 [INFO] Shutting down executor spout:[12 12]
2018-10-23 02:53:28.189 o.a.s.util Thread-5-disruptor-executor[12 12]-send-queue [INFO] Async loop interrupted!
2018-10-23 02:53:28.190 o.a.s.util Thread-6-spout-executor[12 12] [INFO] Async loop interrupted!
2018-10-23 02:53:28.190 o.a.s.d.executor Thread-41 [INFO] Shut down executor spout:[12 12]
2018-10-23 02:53:28.190 o.a.s.d.executor Thread-41 [INFO] Shutting down executor count:[2 2]
2018-10-23 02:53:28.191 o.a.s.util Thread-7-disruptor-executor[2 2]-send-queue [INFO] Async loop interrupted!
2018-10-23 02:53:28.193 o.a.s.util Thread-8-count-executor[2 2] [INFO] Async loop interrupted!
2018-10-23 02:53:28.194 o.a.s.d.executor Thread-41 [INFO] Shut down executor count:[2 2]
2018-10-23 02:53:28.194 o.a.s.d.executor Thread-41 [INFO] Shutting down executor print:[7 7]
2018-10-23 02:53:28.196 o.a.s.util Thread-9-disruptor-executor[7 7]-send-queue [INFO] Async loop interrupted!

Summary

  • If an exception is thrown from a spout or bolt method, the whole worker dies, and the exception is automatically recorded to zk; the price is that the worker keeps dying and being restarted.
  • reportError can be used in combination with try/catch, so that when an exception occurs the worker does not die but the error information is still recorded, as in the sketch after this list. However, for a given component of a topology only the last 10 errors are kept; these are stored in EPHEMERAL_SEQUENTIAL nodes and disappear when the worker dies, while lastError uses a PERSISTENT node. Both are deleted when the topology is killed.
  • storm-ui shows the lastError information of each component; the displayed error string is truncated to at most 200 characters.
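
To illustrate the second point, here is a minimal sketch of a bolt that catches the exception and reports it instead of letting it escape execute (the class name and the integer field access are hypothetical, loosely modeled on the ErrorPrintBolt in the crash log, not the actual demo code):

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class SafePrintBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        try {
            // getInteger(0) throws ClassCastException if the field is actually a String,
            // which is exactly the kind of error shown in the crash log above
            Integer value = input.getInteger(0);
            System.out.println("value=" + value);
        } catch (Exception e) {
            // the worker stays alive; the error is throttled by ReportError, written to zk,
            // and shows up as lastError in storm-ui
            collector.reportError(e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output streams
    }
}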
