Talk about AsyncAppender of rocketmq

  mq

Order

This article mainly studies AsyncAppender of rocketmq.

AsyncAppender

org/apache/rocketmq/logging/inner/LoggingBuilder.java

   public static class AsyncAppender extends Appender implements Appender.AppenderPipeline {

        public static final int DEFAULT_BUFFER_SIZE = 128;

        private final List<LoggingEvent> buffer = new ArrayList<LoggingEvent>();

        private final Map<String, DiscardSummary> discardMap = new HashMap<String, DiscardSummary>();

        private int bufferSize = DEFAULT_BUFFER_SIZE;

        private final AppenderPipelineImpl appenderPipeline;

        private final Thread dispatcher;

        private boolean blocking = true;

        public AsyncAppender() {
            appenderPipeline = new AppenderPipelineImpl();

            dispatcher = new Thread(new Dispatcher(this, buffer, discardMap, appenderPipeline));

            dispatcher.setDaemon(true);

            dispatcher.setName("AsyncAppender-Dispatcher-" + dispatcher.getName());
            dispatcher.start();
        }

        public void addAppender(final Appender newAppender) {
            synchronized (appenderPipeline) {
                appenderPipeline.addAppender(newAppender);
            }
        }

        public void append(final LoggingEvent event) {
            if ((dispatcher == null) || !dispatcher.isAlive() || (bufferSize <= 0)) {
                synchronized (appenderPipeline) {
                    appenderPipeline.appendLoopOnAppenders(event);
                }

                return;
            }

            event.getThreadName();
            event.getRenderedMessage();

            synchronized (buffer) {
                while (true) {
                    int previousSize = buffer.size();

                    if (previousSize < bufferSize) {
                        buffer.add(event);

                        if (previousSize == 0) {
                            buffer.notifyAll();
                        }

                        break;
                    }

                    boolean discard = true;
                    if (blocking
                        && !Thread.interrupted()
                        && Thread.currentThread() != dispatcher) {
                        try {
                            buffer.wait();
                            discard = false;
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                    if (discard) {
                        String loggerName = event.getLoggerName();
                        DiscardSummary summary = discardMap.get(loggerName);

                        if (summary == null) {
                            summary = new DiscardSummary(event);
                            discardMap.put(loggerName, summary);
                        } else {
                            summary.add(event);
                        }

                        break;
                    }
                }
            }
        }

        public void close() {

            synchronized (buffer) {
                closed = true;
                buffer.notifyAll();
            }

            try {
                dispatcher.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                SysLogger.error(
                    "Got an InterruptedException while waiting for the "
                        + "dispatcher to finish.", e);
            }

            synchronized (appenderPipeline) {
                Enumeration iter = appenderPipeline.getAllAppenders();
                if (iter != null) {
                    while (iter.hasMoreElements()) {
                        Object next = iter.nextElement();
                        if (next instanceof Appender) {
                            ((Appender) next).close();
                        }
                    }
                }
            }
        }

        public Enumeration getAllAppenders() {
            synchronized (appenderPipeline) {
                return appenderPipeline.getAllAppenders();
            }
        }

        public Appender getAppender(final String name) {
            synchronized (appenderPipeline) {
                return appenderPipeline.getAppender(name);
            }
        }

        public boolean isAttached(final Appender appender) {
            synchronized (appenderPipeline) {
                return appenderPipeline.isAttached(appender);
            }
        }

        public void removeAllAppenders() {
            synchronized (appenderPipeline) {
                appenderPipeline.removeAllAppenders();
            }
        }

        public void removeAppender(final Appender appender) {
            synchronized (appenderPipeline) {
                appenderPipeline.removeAppender(appender);
            }
        }

        public void removeAppender(final String name) {
            synchronized (appenderPipeline) {
                appenderPipeline.removeAppender(name);
            }
        }

        public void setBufferSize(final int size) {
            if (size < 0) {
                throw new NegativeArraySizeException("size");
            }

            synchronized (buffer) {
                bufferSize = (size < 1) ? 1 : size;
                buffer.notifyAll();
            }
        }

        public int getBufferSize() {
            return bufferSize;
        }

        public void setBlocking(final boolean value) {
            synchronized (buffer) {
                blocking = value;
                buffer.notifyAll();
            }
        }

        public boolean getBlocking() {
            return blocking;
        }

    }
  • Initialize Dispatcher, the constructor calls Dispatcher’s start, and then the close method calls dispatcher.join ()
  • The append method will determine whether buffer is enough, add events to buffer if it is enough, and discard if it is not enough, and conduct DiscardSummary statistics at the same time.
  • Dispatcher takes the buffer consumption log and makes a real append.

DiscardSummary

       private final class DiscardSummary {

            private LoggingEvent maxEvent;

            private int count;

            public DiscardSummary(final LoggingEvent event) {
                maxEvent = event;
                count = 1;
            }

            public void add(final LoggingEvent event) {
                if (event.getLevel().toInt() > maxEvent.getLevel().toInt()) {
                    maxEvent = event;
                }
                count++;
            }

            public LoggingEvent createEvent() {
                String msg =
                    MessageFormat.format(
                        "Discarded {0} messages due to full event buffer including: {1}",
                        count, maxEvent.getMessage());

                return new LoggingEvent(
                    "AsyncAppender.DONT_REPORT_LOCATION",
                    Logger.getLogger(maxEvent.getLoggerName()),
                    maxEvent.getLevel(),
                    msg,
                    null);
            }
        }
  • Record the LoggingEvent and the number of times it was discarded.

Dispatcher

        private class Dispatcher implements Runnable {

            private final AsyncAppender parent;

            private final List<LoggingEvent> buffer;

            private final Map<String, DiscardSummary> discardMap;

            private final AppenderPipelineImpl appenderPipeline;

            public Dispatcher(
                final AsyncAppender parent, final List<LoggingEvent> buffer, final Map<String, DiscardSummary> discardMap,
                final AppenderPipelineImpl appenderPipeline) {

                this.parent = parent;
                this.buffer = buffer;
                this.appenderPipeline = appenderPipeline;
                this.discardMap = discardMap;
            }

            public void run() {
                boolean isActive = true;

                try {
                    while (isActive) {
                        LoggingEvent[] events = null;

                        synchronized (buffer) {
                            int bufferSize = buffer.size();
                            isActive = !parent.closed;

                            while ((bufferSize == 0) && isActive) {
                                buffer.wait();
                                bufferSize = buffer.size();
                                isActive = !parent.closed;
                            }

                            if (bufferSize > 0) {
                                events = new LoggingEvent[bufferSize + discardMap.size()];
                                buffer.toArray(events);

                                int index = bufferSize;
                                Collection<DiscardSummary> values = discardMap.values();
                                for (DiscardSummary value : values) {
                                    events[index++] = value.createEvent();
                                }

                                buffer.clear();
                                discardMap.clear();

                                buffer.notifyAll();
                            }
                        }
                        if (events != null) {
                            for (LoggingEvent event : events) {
                                synchronized (appenderPipeline) {
                                    appenderPipeline.appendLoopOnAppenders(event);
                                }
                            }
                        }
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        }
  • Lock the buffer here, then call buffer.toArray(events) to get the LoggingEvent array, and then loop through and call appenderpipeline.appendloops appenders (event)

AppenderPipelineImpl

    public static class AppenderPipelineImpl implements AppenderPipeline {


        protected Vector<Appender> appenderList;

        public void addAppender(Appender newAppender) {
            if (newAppender == null) {
                return;
            }

            if (appenderList == null) {
                appenderList = new Vector<Appender>(1);
            }
            if (!appenderList.contains(newAppender)) {
                appenderList.addElement(newAppender);
            }
        }

        public int appendLoopOnAppenders(LoggingEvent event) {
            int size = 0;
            Appender appender;

            if (appenderList != null) {
                size = appenderList.size();
                for (int i = 0; i < size; i++) {
                    appender = appenderList.elementAt(i);
                    appender.doAppend(event);
                }
            }
            return size;
        }

        public Enumeration getAllAppenders() {
            if (appenderList == null) {
                return null;
            } else {
                return appenderList.elements();
            }
        }

        public Appender getAppender(String name) {
            if (appenderList == null || name == null) {
                return null;
            }

            int size = appenderList.size();
            Appender appender;
            for (int i = 0; i < size; i++) {
                appender = appenderList.elementAt(i);
                if (name.equals(appender.getName())) {
                    return appender;
                }
            }
            return null;
        }

        public boolean isAttached(Appender appender) {
            if (appenderList == null || appender == null) {
                return false;
            }

            int size = appenderList.size();
            Appender a;
            for (int i = 0; i < size; i++) {
                a = appenderList.elementAt(i);
                if (a == appender) {
                    return true;
                }
            }
            return false;
        }

        public void removeAllAppenders() {
            if (appenderList != null) {
                int len = appenderList.size();
                for (int i = 0; i < len; i++) {
                    Appender a = appenderList.elementAt(i);
                    a.close();
                }
                appenderList.removeAllElements();
                appenderList = null;
            }
        }

        public void removeAppender(Appender appender) {
            if (appender == null || appenderList == null) {
                return;
            }
            appenderList.removeElement(appender);
        }

        public void removeAppender(String name) {
            if (name == null || appenderList == null) {
                return;
            }
            int size = appenderList.size();
            for (int i = 0; i < size; i++) {
                if (name.equals((appenderList.elementAt(i)).getName())) {
                    appenderList.removeElementAt(i);
                    break;
                }
            }
        }

    }
  • The appendLoopOnAppenders method here performs doAppend operations on appender one by one.

Summary

The AsyncAppender implementation of rocketmq is not very efficient, locking buffer in large quantities for appending and appendloops.

doc