Talk about Storm’s IEventLogger

This article mainly studies Storm’s IEventLogger.

IEventLogger

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/IEventLogger.java

/**
 * EventLogger interface for logging the event info to a sink like log file or db for inspecting the events via UI for debugging.
 */
public interface IEventLogger {

    void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context);

    /**
     * This method would be invoked when the {@link EventLoggerBolt} receives a tuple from the spouts or bolts that has event logging
     * enabled.
     *
     * @param e the event
     */
    void log(EventInfo e);

    void close();

    /**
     * A wrapper for the fields that we would log.
     */
    class EventInfo {
        private long ts;
        private String component;
        private int task;
        private Object messageId;
        private List<Object> values;

        public EventInfo(long ts, String component, int task, Object messageId, List<Object> values) {
            this.ts = ts;
            this.component = component;
            this.task = task;
            this.messageId = messageId;
            this.values = values;
        }

        public long getTs() {
            return ts;
        }

        public String getComponent() {
            return component;
        }

        public int getTask() {
            return task;
        }

        public Object getMessageId() {
            return messageId;
        }

        public List<Object> getValues() {
            return values;
        }

        /**
         * Returns a default formatted string with fields separated by ","
         *
         * @return a default formatted string with fields separated by ","
         */
        @Override
        public String toString() {
            return new Date(ts).toString() + "," + component + "," + String.valueOf(task) + ","
                   + (messageId == null ? "" : messageId.toString()) + "," + values.toString();
        }
    }
}
  • IEventLogger defines the prepare, log, and close methods, as well as the EventInfo wrapper class that carries the fields to be logged.
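The CSV-style line that EventInfo.toString() assembles can be reproduced in a standalone sketch. EventInfoFormat below is a hypothetical helper, not a Storm class; it only mirrors the field order and null handling of the toString above:

```java
import java.util.Date;
import java.util.List;

// Standalone sketch of how IEventLogger.EventInfo#toString builds its line.
// EventInfoFormat is a made-up helper for illustration, not part of Storm.
public class EventInfoFormat {
    public static String format(long ts, String component, int task,
                                Object messageId, List<Object> values) {
        // Same field order as EventInfo#toString:
        // timestamp, component, task, message id (blank when null), values
        return new Date(ts).toString() + "," + component + "," + task + ","
               + (messageId == null ? "" : messageId.toString()) + "," + values.toString();
    }

    public static void main(String[] args) {
        System.out.println(format(System.currentTimeMillis(), "word-spout", 3, "msg-1",
                                  List.of("hello", "world")));
    }
}
```

Note that the human-readable Date prefix makes the line easy to eyeball in the UI, but the format is fixed; subclasses of FileBasedEventLogger can override buildLogMessage to change it.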

FileBasedEventLogger

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java

public class FileBasedEventLogger implements IEventLogger {
    private static final Logger LOG = LoggerFactory.getLogger(FileBasedEventLogger.class);

    private static final int FLUSH_INTERVAL_MILLIS = 1000;

    private Path eventLogPath;
    private BufferedWriter eventLogWriter;
    private ScheduledExecutorService flushScheduler;
    private volatile boolean dirty = false;

    private void initLogWriter(Path logFilePath) {
        try {
            LOG.info("logFilePath {}", logFilePath);
            eventLogPath = logFilePath;
            eventLogWriter = Files.newBufferedWriter(eventLogPath, StandardCharsets.UTF_8, StandardOpenOption.CREATE,
                                                     StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            LOG.error("Error setting up FileBasedEventLogger.", e);
            throw new RuntimeException(e);
        }
    }


    private void setUpFlushTask() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("event-logger-flush-%d")
            .setDaemon(true)
            .build();

        flushScheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    if (dirty) {
                        eventLogWriter.flush();
                        dirty = false;
                    }
                } catch (IOException ex) {
                    LOG.error("Error flushing " + eventLogPath, ex);
                    throw new RuntimeException(ex);
                }
            }
        };

        flushScheduler.scheduleAtFixedRate(runnable, FLUSH_INTERVAL_MILLIS, FLUSH_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
    }


    @Override
    public void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context) {
        String stormId = context.getStormId();
        int port = context.getThisWorkerPort();

        /*
         * Include the topology name & worker port in the file name so that
         * multiple event loggers can log independently.
         */
        String workersArtifactRoot = ConfigUtils.workerArtifactsRoot(conf, stormId, port);

        Path path = Paths.get(workersArtifactRoot, "events.log");
        File dir = path.toFile().getParentFile();
        if (!dir.exists()) {
            dir.mkdirs();
        }
        initLogWriter(path);
        setUpFlushTask();
    }

    @Override
    public void log(EventInfo event) {
        try {
            //TODO: file rotation
            eventLogWriter.write(buildLogMessage(event));
            eventLogWriter.newLine();
            dirty = true;
        } catch (IOException ex) {
            LOG.error("Error logging event {}", event, ex);
            throw new RuntimeException(ex);
        }
    }

    protected String buildLogMessage(EventInfo event) {
        return event.toString();
    }

    @Override
    public void close() {
        try {
            eventLogWriter.close();

        } catch (IOException ex) {
            LOG.error("Error closing event log.", ex);
        }

        closeFlushScheduler();
    }

    private void closeFlushScheduler() {
        if (flushScheduler != null) {
            flushScheduler.shutdown();
            try {
                if (!flushScheduler.awaitTermination(2, TimeUnit.SECONDS)) {
                    flushScheduler.shutdownNow();
                }
            } catch (InterruptedException ie) {
                // (Re-)Cancel if current thread also interrupted
                flushScheduler.shutdownNow();
                // Preserve interrupt status
                Thread.currentThread().interrupt();
            }
        }
    }
}
  • FileBasedEventLogger is the default implementation of IEventLogger. It starts a scheduled task that flushes the buffered events to disk every FLUSH_INTERVAL_MILLIS (1000 ms), but only when the dirty flag indicates new data has been written
  • The log file is events.log under the workersArtifactRoot directory
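The write-then-flush scheme can be sketched as a self-contained class. PeriodicFlushWriter is a made-up name; Storm's version additionally uses Guava's ThreadFactoryBuilder, replaced here with a plain ThreadFactory lambda:

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.concurrent.*;

// Simplified, standalone version of FileBasedEventLogger's scheme: writes only
// mark a volatile dirty flag, and a daemon thread flushes the buffer at a
// fixed interval when there is actually something to flush.
public class PeriodicFlushWriter implements AutoCloseable {
    private final BufferedWriter writer;
    private final ScheduledExecutorService scheduler;
    private volatile boolean dirty = false;

    public PeriodicFlushWriter(Path path, long flushIntervalMillis) throws IOException {
        writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "flush-thread");
            t.setDaemon(true); // do not keep the JVM alive just for flushing
            return t;
        });
        scheduler.scheduleAtFixedRate(this::flushIfDirty,
                flushIntervalMillis, flushIntervalMillis, TimeUnit.MILLISECONDS);
    }

    public synchronized void log(String line) throws IOException {
        writer.write(line);
        writer.newLine();
        dirty = true; // picked up later by the scheduled task
    }

    synchronized void flushIfDirty() {
        try {
            if (dirty) {
                writer.flush();
                dirty = false;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public synchronized void close() throws IOException {
        scheduler.shutdown();
        writer.close();
    }

    public static void main(String[] args) throws Exception {
        Path path = Files.createTempFile("events", ".log");
        try (PeriodicFlushWriter w = new PeriodicFlushWriter(path, 1000)) {
            w.log("demo event");
            w.flushIfDirty(); // force a flush instead of waiting for the scheduler
        }
        System.out.println(Files.readAllLines(path));
    }
}
```

The trade-off is the same as in FileBasedEventLogger: each log call stays cheap because it never forces an fsync, at the cost of events being up to one flush interval late on disk.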

StormCommon.addEventLogger

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java

    public static void addEventLogger(Map<String, Object> conf, StormTopology topology) {
        Integer numExecutors = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
                                                   ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
        if (numExecutors == null || numExecutors == 0) {
            return;
        }
        HashMap<String, Object> componentConf = new HashMap<>();
        componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
        componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
        Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(
            eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);

        for (Object component : allComponents(topology).values()) {
            ComponentCommon common = getComponentCommon(component);
            common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields()));
        }
        topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt);
    }

    public static List<String> eventLoggerBoltFields() {
        return Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID,
                             EventLoggerBolt.FIELD_TS, EventLoggerBolt.FIELD_VALUES);
    }

    public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
        Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
        Set<String> allIds = new HashSet<String>();
        allIds.addAll(topology.get_bolts().keySet());
        allIds.addAll(topology.get_spouts().keySet());

        for (String id : allIds) {
            inputs.put(Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID),
                       Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
        }
        return inputs;
    }
  • numExecutors is read from Config.TOPOLOGY_EVENTLOGGER_EXECUTORS; if that is unset, the value of Config.TOPOLOGY_WORKERS is used. The default is 0, i.e. the event logger is disabled
  • Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS is also read here and used as the bolt’s Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS
  • addEventLogger creates the EventLoggerBolt. Via Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID) and fieldsGrouping on "component-id", the bolt takes every spout and bolt as input and thus receives all of their tuples; the fields are EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID, EventLoggerBolt.FIELD_TS, EventLoggerBolt.FIELD_VALUES. At the same time, an output stream named EVENTLOGGER_STREAM_ID is declared on each spout and bolt, so that their data can flow to EventLoggerBolt
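From the user side, the early return at the top of addEventLogger means EventLoggerBolt is only wired into the topology when the executor count resolves to a value greater than 0. A minimal configuration fragment, assuming the standard org.apache.storm.Config API:

```java
import org.apache.storm.Config;

// Topology configuration sketch: with the default of 0, addEventLogger returns
// early and no EventLoggerBolt is added to the topology.
Config conf = new Config();
conf.setNumEventLoggers(1); // sets Config.TOPOLOGY_EVENTLOGGER_EXECUTORS
```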

EventLoggerBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/EventLoggerBolt.java

public class EventLoggerBolt implements IBolt {

    /*
     The below field declarations are also used in common.clj to define the event logger output fields
      */
    public static final String FIELD_TS = "ts";
    public static final String FIELD_VALUES = "values";
    public static final String FIELD_COMPONENT_ID = "component-id";
    public static final String FIELD_MESSAGE_ID = "message-id";
    private static final Logger LOG = LoggerFactory.getLogger(EventLoggerBolt.class);
    private List<IEventLogger> eventLoggers;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        LOG.info("EventLoggerBolt prepare called");

        eventLoggers = new ArrayList<>();
        List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) topoConf.get(Config.TOPOLOGY_EVENT_LOGGER_REGISTER);
        if (registerInfo != null && !registerInfo.isEmpty()) {
            initializeEventLoggers(topoConf, context, registerInfo);
        } else {
            initializeDefaultEventLogger(topoConf, context);
        }
    }

    @Override
    public void execute(Tuple input) {
        LOG.debug("** EventLoggerBolt got tuple from sourceComponent {}, with values {}", input.getSourceComponent(), input.getValues());

        Object msgId = input.getValueByField(FIELD_MESSAGE_ID);
        EventInfo eventInfo = new EventInfo(input.getLongByField(FIELD_TS), input.getSourceComponent(),
                                            input.getSourceTask(), msgId, (List<Object>) input.getValueByField(FIELD_VALUES));

        for (IEventLogger eventLogger : eventLoggers) {
            eventLogger.log(eventInfo);
        }
    }

    @Override
    public void cleanup() {
        for (IEventLogger eventLogger : eventLoggers) {
            eventLogger.close();
        }
    }

    private void initializeEventLoggers(Map<String, Object> topoConf, TopologyContext context, List<Map<String, Object>> registerInfo) {
        for (Map<String, Object> info : registerInfo) {
            String className = (String) info.get(TOPOLOGY_EVENT_LOGGER_CLASS);
            Map<String, Object> arguments = (Map<String, Object>) info.get(TOPOLOGY_EVENT_LOGGER_ARGUMENTS);

            IEventLogger eventLogger;
            try {
                eventLogger = (IEventLogger) Class.forName(className).newInstance();
            } catch (Exception e) {
                throw new RuntimeException("Could not instantiate a class listed in config under section "
                                           + Config.TOPOLOGY_EVENT_LOGGER_REGISTER + " with fully qualified name " + className, e);
            }

            eventLogger.prepare(topoConf, arguments, context);
            eventLoggers.add(eventLogger);
        }
    }

    private void initializeDefaultEventLogger(Map<String, Object> topoConf, TopologyContext context) {
        FileBasedEventLogger eventLogger = new FileBasedEventLogger();
        eventLogger.prepare(topoConf, null, context);
        eventLoggers.add(eventLogger);
    }
}
  • In prepare, EventLoggerBolt reads the Config.TOPOLOGY_EVENT_LOGGER_REGISTER entries from topoConf. If registerInfo is empty, the default FileBasedEventLogger is used; otherwise the event loggers listed in registerInfo are instantiated and initialized
  • The execute method iterates over the eventLoggers and calls log on each one
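The reflective loading that initializeEventLoggers performs can be illustrated standalone. EventSink and StdoutSink below are hypothetical stand-ins for IEventLogger and its implementations, not Storm classes:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone illustration of EventLoggerBolt#initializeEventLoggers: each
// registered class name is instantiated via its no-arg constructor and
// collected into a list. EventSink/StdoutSink are made-up stand-ins.
public class ReflectiveLoading {
    public interface EventSink {
        void log(String event);
    }

    public static class StdoutSink implements EventSink {
        @Override
        public void log(String event) {
            System.out.println(event);
        }
    }

    public static List<EventSink> loadSinks(List<String> classNames) {
        List<EventSink> sinks = new ArrayList<>();
        for (String className : classNames) {
            try {
                // Same pattern as Class.forName(className).newInstance() in the bolt
                sinks.add((EventSink) Class.forName(className)
                        .getDeclaredConstructor().newInstance());
            } catch (Exception e) {
                throw new RuntimeException("Could not instantiate " + className, e);
            }
        }
        return sinks;
    }

    public static void main(String[] args) {
        List<EventSink> sinks = loadSinks(List.of("ReflectiveLoading$StdoutSink"));
        sinks.get(0).log("tuple seen");
    }
}
```

This is also why a custom IEventLogger must have a public no-arg constructor: the bolt only receives its fully qualified class name from the configuration.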

Summary

  • To enable the event logger, set Config.TOPOLOGY_EVENTLOGGER_EXECUTORS to a value greater than 0 (conf.setNumEventLoggers); it is 0 by default, i.e. disabled. With the event logger enabled, you can click debug on a spout or bolt in the UI and then open the events link to view tuple data during debugging
  • After setting Config.TOPOLOGY_EVENTLOGGER_EXECUTORS to a value greater than 0, if you do not set Config.TOPOLOGY_EVENT_LOGGER_REGISTER yourself, FileBasedEventLogger is enabled by default; once debug is turned on for a spout or bolt, EventInfo records are written to events.log under the workersArtifactRoot directory
  • If Config.TOPOLOGY_EVENT_LOGGER_REGISTER is set (conf.registerEventLogger), EventLoggerBolt initializes the event loggers from that configuration, and the default FileBasedEventLogger is not initialized. When adding the event logger, StormCommon adds a declareStream to every spout and bolt that outputs to EVENTLOGGER_STREAM_ID; at the same time EventLoggerBolt takes all spouts and bolts as inputs, via Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID) and fieldsGrouping on "component-id". The fields of the tuples entering EventLoggerBolt are EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID, EventLoggerBolt.FIELD_TS, EventLoggerBolt.FIELD_VALUES
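Putting the summary together, a configuration sketch for registering an explicit logger (untested against a live cluster; registerEventLogger appends to Config.TOPOLOGY_EVENT_LOGGER_REGISTER):

```java
import org.apache.storm.Config;
import org.apache.storm.metric.FileBasedEventLogger;

// Once TOPOLOGY_EVENT_LOGGER_REGISTER is non-empty, the default
// FileBasedEventLogger is no longer added automatically, so register it
// explicitly if you still want the events.log file alongside your own logger.
Config conf = new Config();
conf.setNumEventLoggers(1);                           // Config.TOPOLOGY_EVENTLOGGER_EXECUTORS
conf.registerEventLogger(FileBasedEventLogger.class); // Config.TOPOLOGY_EVENT_LOGGER_REGISTER
```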
