A Look at RocketMQ's FileAppender

  mq

Preface

This article examines RocketMQ's FileAppender.

WriterAppender

org/apache/rocketmq/logging/inner/LoggingBuilder.java

   /**
    * Appender that formats each event with the configured layout and writes the
    * result to a {@code QuietWriter}.
    */
   public static class WriterAppender extends Appender {

        /** When true, the writer is flushed after every appended event (see {@code shouldFlush}). */
        protected boolean immediateFlush = true;

        /** Charset name handed to {@code createWriter}; when null the no-charset constructor is used. */
        protected String encoding;

        /** Destination writer; null until one is installed and again after {@code reset()}. */
        protected QuietWriter qw;

        public WriterAppender() {
        }

        /** Sets whether the writer is flushed after each appended event. */
        public void setImmediateFlush(boolean value) {
            this.immediateFlush = value;
        }

        /** Returns the current immediate-flush setting. */
        public boolean getImmediateFlush() {
            return this.immediateFlush;
        }

        /** No options need activating for this appender; intentionally empty. */
        public void activateOptions() {
        }

        /**
         * Writes the event via {@code subAppend}, but only when
         * {@code checkEntryConditions()} reports the appender is usable.
         */
        public void append(LoggingEvent event) {
            if (checkEntryConditions()) {
                subAppend(event);
            }
        }

        /**
         * Verifies that the appender is open and has both a writer and a layout.
         *
         * @return true when an event may be written; false after a warning or
         *         error has been reported for the first failing condition
         */
        protected boolean checkEntryConditions() {
            if (this.closed) {
                SysLogger.warn("Not allowed to write to a closed appender.");
                return false;
            }

            // The missing writer is reported in preference to the missing layout.
            String problem = null;
            if (this.qw == null) {
                problem = "No output stream or file set for the appender named ["
                    + name + "].";
            } else if (this.layout == null) {
                problem = "No layout set for the appender named [" + name + "].";
            }

            if (problem != null) {
                handleError(problem);
                return false;
            }
            return true;
        }

        /**
         * Marks the appender closed, writes the layout footer and releases the
         * writer. Calling it again once closed does nothing.
         */
        public synchronized void close() {
            if (!this.closed) {
                this.closed = true;
                writeFooter();
                reset();
            }
        }

        /** Closes the current writer, if any, reporting a failure through {@code handleError}. */
        protected void closeWriter() {
            if (qw == null) {
                return;
            }
            try {
                qw.close();
            } catch (IOException e) {
                handleError("Could not close " + qw, e, CODE_CLOSE_FAILURE);
            }
        }

        /**
         * Wraps the stream in an {@code OutputStreamWriter}.
         *
         * @param os the stream to wrap
         * @return a writer using {@code getEncoding()} when that is non-null and
         *         accepted; otherwise a writer built without an explicit charset
         */
        protected OutputStreamWriter createWriter(OutputStream os) {
            String charsetName = getEncoding();
            if (charsetName != null) {
                try {
                    return new OutputStreamWriter(os, charsetName);
                } catch (IOException e) {
                    // Fall through to the no-charset constructor below.
                    SysLogger.warn("Error initializing output writer.");
                    SysLogger.warn("Unsupported encoding?");
                }
            }
            return new OutputStreamWriter(os);
        }

        /** Returns the configured charset name, possibly null. */
        public String getEncoding() {
            return this.encoding;
        }

        /** Sets the charset name used the next time a writer is created. */
        public void setEncoding(String value) {
            this.encoding = value;
        }

        /**
         * Replaces the current writer: the old one is closed via {@code reset()},
         * the new one is wrapped in a {@code QuietWriter}, and the layout header
         * is written to it.
         */
        public synchronized void setWriter(Writer writer) {
            reset();
            this.qw = new QuietWriter(writer, this);
            writeHeader();
        }

        /**
         * Writes the formatted event, followed by its throwable lines when the
         * layout reports that it ignores throwables, then flushes if required.
         */
        protected void subAppend(LoggingEvent event) {
            this.qw.write(this.layout.format(event));

            // The throwable text is only fetched when the layout does not render it itself.
            String[] traceLines = layout.ignoresThrowable() ? event.getThrowableStr() : null;
            if (traceLines != null) {
                for (int i = 0; i < traceLines.length; i++) {
                    this.qw.write(traceLines[i]);
                    this.qw.write(LINE_SEP);
                }
            }

            if (shouldFlush(event)) {
                this.qw.flush();
            }
        }

        /** Closes the writer and clears the reference to it. */
        protected void reset() {
            closeWriter();
            this.qw = null;
        }

        /** Writes and flushes the layout footer when layout, footer and writer are all present. */
        protected void writeFooter() {
            if (layout == null) {
                return;
            }
            String footer = layout.getFooter();
            if (footer == null || this.qw == null) {
                return;
            }
            this.qw.write(footer);
            this.qw.flush();
        }

        /** Writes the layout header (without flushing) when layout, header and writer are all present. */
        protected void writeHeader() {
            if (layout == null) {
                return;
            }
            String header = layout.getHeader();
            if (header == null || this.qw == null) {
                return;
            }
            this.qw.write(header);
        }

        /**
         * Decides whether the writer is flushed after an event.
         *
         * @return true only for a non-null event while {@code immediateFlush} is set
         */
        protected boolean shouldFlush(final LoggingEvent event) {
            if (event == null) {
                return false;
            }
            return immediateFlush;
        }
    }
  • WriterAppender is a class that extends Appender; it defines writeHeader, writeFooter, append and other methods.
  • The append method takes a LoggingEvent and, once checkEntryConditions passes, delegates to subAppend.
  • The subAppend method uses the layout to format the event and writes the result to qw, writes the throwable lines when layout.ignoresThrowable() is true, and then flushes qw if shouldFlush returns true.

FileAppender

org/apache/rocketmq/logging/inner/LoggingBuilder.java

  public static class FileAppender extends WriterAppender {

        protected boolean fileAppend = true;

        protected String fileName = null;

        protected boolean bufferedIO = false;

        protected int bufferSize = 8 * 1024;

        public FileAppender() {
        }

        public FileAppender(Layout layout, String filename, boolean append)
            throws IOException {
            this.layout = layout;
            this.setFile(filename, append, false, bufferSize);
        }

        public void setFile(String file) {
            fileName = file.trim();
        }

        public boolean getAppend() {
            return fileAppend;
        }

        public String getFile() {
            return fileName;
        }

        public void activateOptions() {
            if (fileName != null) {
                try {
                    setFile(fileName, fileAppend, bufferedIO, bufferSize);
                } catch (IOException e) {
                    handleError("setFile(" + fileName + "," + fileAppend + ") call failed.",
                        e, CODE_FILE_OPEN_FAILURE);
                }
            } else {
                SysLogger.warn("File option not set for appender [" + name + "].");
                SysLogger.warn("Are you using FileAppender instead of ConsoleAppender?");
            }
        }

        protected void closeFile() {
            if (this.qw != null) {
                try {
                    this.qw.close();
                } catch (IOException e) {
                    if (e instanceof InterruptedIOException) {
                        Thread.currentThread().interrupt();
                    }
                    SysLogger.error("Could not close " + qw, e);
                }
            }
        }

        public boolean getBufferedIO() {
            return this.bufferedIO;
        }

        public int getBufferSize() {
            return this.bufferSize;
        }

        public void setAppend(boolean flag) {
            fileAppend = flag;
        }

        public void setBufferedIO(boolean bufferedIO) {
            this.bufferedIO = bufferedIO;
            if (bufferedIO) {
                immediateFlush = false;
            }
        }

        public void setBufferSize(int bufferSize) {
            this.bufferSize = bufferSize;
        }

        public synchronized void setFile(String fileName, boolean append, boolean bufferedIO, int bufferSize)
            throws IOException {
            SysLogger.debug("setFile called: " + fileName + ", " + append);

            if (bufferedIO) {
                setImmediateFlush(false);
            }

            reset();
            FileOutputStream ostream;
            try {
                ostream = new FileOutputStream(fileName, append);
            } catch (FileNotFoundException ex) {
                String parentName = new File(fileName).getParent();
                if (parentName != null) {
                    File parentDir = new File(parentName);
                    if (!parentDir.exists() && parentDir.mkdirs()) {
                        ostream = new FileOutputStream(fileName, append);
                    } else {
                        throw ex;
                    }
                } else {
                    throw ex;
                }
            }
            Writer fw = createWriter(ostream);
            if (bufferedIO) {
                fw = new BufferedWriter(fw, bufferSize);
            }
            this.setQWForFiles(fw);
            this.fileName = fileName;
            this.fileAppend = append;
            this.bufferedIO = bufferedIO;
            this.bufferSize = bufferSize;
            writeHeader();
            SysLogger.debug("setFile ended");
        }

        protected void setQWForFiles(Writer writer) {
            this.qw = new QuietWriter(writer, this);
        }

        protected void reset() {
            closeFile();
            this.fileName = null;
            super.reset();
        }
    }
  • FileAppender extends WriterAppender to write to a file. It defines a default bufferSize of 8 * 1024; if bufferedIO is turned on, the writer is wrapped in a BufferedWriter.
  • The setQWForFiles method creates a QuietWriter based on the specified writer.
  • After setting qw, the setFile method calls writeHeader immediately.

QuietWriter

org/apache/rocketmq/logging/inner/LoggingBuilder.java

    /**
     * FilterWriter whose {@code write(String)} and {@code flush()} never throw:
     * any exception is handed to the owning appender's {@code handleError}.
     */
    private static class QuietWriter extends FilterWriter {

        /** Appender that receives write and flush failures. */
        protected Appender appender;

        /**
         * @param writer   the writer to delegate to
         * @param appender the appender notified of failures
         */
        public QuietWriter(Writer writer, Appender appender) {
            super(writer);
            this.appender = appender;
        }

        /** Writes the string to the wrapped writer; a null string is ignored. */
        @Override
        public void write(String string) {
            if (string == null) {
                return;
            }
            try {
                out.write(string);
            } catch (Exception e) {
                String message = "Failed to write [" + string + "].";
                appender.handleError(message, e, Appender.CODE_WRITE_FAILURE);
            }
        }

        /** Flushes the wrapped writer, reporting rather than throwing any failure. */
        @Override
        public void flush() {
            try {
                out.flush();
            } catch (Exception e) {
                String message = "Failed to flush writer,";
                appender.handleError(message, e, Appender.CODE_FLUSH_FAILURE);
            }
        }
    }
  • QuietWriter extends the JDK's FilterWriter and overrides the write(String) and flush methods so that exceptions are passed to the appender's handleError instead of being thrown.
  • FilterWriter itself provides write(int c), write(char cbuf[], int off, int len) and write(String str, int off, int len), which forward to the wrapped writer.

Summary

RocketMQ's FileAppender extends WriterAppender: each LoggingEvent is formatted by the layout, written to the QuietWriter, and finally written to the file.

doc