Kafka Streams error-log alerting example


KafkaAppender

log4j-core-2.7-sources.jar!/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java

public void append(final LogEvent event) {
        if (event.getLoggerName().startsWith("org.apache.kafka")) {
            LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
        } else {
            try {
                final Layout<? extends Serializable> layout = getLayout();
                byte[] data;
                if (layout != null) {
                    if (layout instanceof SerializedLayout) {
                        final byte[] header = layout.getHeader();
                        final byte[] body = layout.toByteArray(event);
                        data = new byte[header.length + body.length];
                        System.arraycopy(header, 0, data, 0, header.length);
                        System.arraycopy(body, 0, data, header.length, body.length);
                    } else {
                        data = layout.toByteArray(event);
                    }
                } else {
                    data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8);
                }
                manager.send(data);
            } catch (final Exception e) {
                LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
                throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
            }
        }
    }

If JsonLayout is used, execution takes the data = layout.toByteArray(event); branch. That toByteArray call resolves to
log4j-core-2.7-sources.jar!/org/apache/logging/log4j/core/layout/AbstractStringLayout.java

    /**
     * Formats the Log Event as a byte array.
     *
     * @param event The Log Event.
     * @return The formatted event as a byte array.
     */
    @Override
    public byte[] toByteArray(final LogEvent event) {
        return getBytes(toSerializable(event));
    }
    protected byte[] getBytes(final String s) {
        if (useCustomEncoding) { // rely on branch prediction to eliminate this check if false
            return StringEncoder.encodeSingleByteChars(s);
        }
        try { // LOG4J2-935: String.getBytes(String) gives better performance
            return s.getBytes(charsetName);
        } catch (final UnsupportedEncodingException e) {
            return s.getBytes(charset);
        }
    }

toSerializable, in turn, resolves to

log4j-core-2.7-sources.jar!/org/apache/logging/log4j/core/layout/AbstractJacksonLayout.java

    @Override
    public String toSerializable(final LogEvent event) {
        final StringBuilderWriter writer = new StringBuilderWriter();
        try {
            toSerializable(event, writer);
            return writer.toString();
        } catch (final IOException e) {
            // Should this be an ISE or IAE?
            LOGGER.error(e);
            return Strings.EMPTY;
        }
    }
    public void toSerializable(final LogEvent event, final Writer writer)
            throws JsonGenerationException, JsonMappingException, IOException {
        objectWriter.writeValue(writer, convertMutableToLog4jEvent(event));
        writer.write(eol);
        markEvent();
    }
    private static LogEvent convertMutableToLog4jEvent(final LogEvent event) {
        // TODO Jackson-based layouts have certain filters set up for Log4jLogEvent.
        // TODO Need to set up the same filters for MutableLogEvent but don't know how...
        // This is a workaround.
        return event instanceof MutableLogEvent
                ? ((MutableLogEvent) event).createMemento()
                : event;
    }
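
The call chain quoted above (toByteArray, then toSerializable, then getBytes) can be condensed into a small stdlib-only sketch. It is not log4j2 code: LayoutChainSketch, SimpleEvent, escape and the "\n" line terminator are all hypothetical stand-ins chosen for illustration, and the hand-rolled JSON covers only two fields, whereas the real layout delegates to Jackson.

```java
import java.nio.charset.StandardCharsets;

// Simplified, hypothetical stand-in for the layout call chain; not log4j2 code.
final class LayoutChainSketch {

    // Hypothetical event with only two fields (a real LogEvent has many more).
    static final class SimpleEvent {
        final String level;
        final String message;

        SimpleEvent(String level, String message) {
            this.level = level;
            this.message = message;
        }
    }

    // Assumed line terminator; the real layout decides its own eol value.
    static final String EOL = "\n";

    // Escape backslashes and double quotes so simple text stays valid JSON.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Plays the role of toSerializable(event): render the event, then append eol.
    static String toSerializable(SimpleEvent event) {
        StringBuilder writer = new StringBuilder();
        writer.append("{\"level\":\"").append(escape(event.level))
              .append("\",\"message\":\"").append(escape(event.message)).append("\"}");
        writer.append(EOL);
        return writer.toString();
    }

    // Plays the role of toByteArray(event): encode the rendered string as UTF-8.
    static byte[] toByteArray(SimpleEvent event) {
        return toSerializable(event).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] data = toByteArray(new SimpleEvent("ERROR", "/ by zero"));
        // These bytes correspond to what the appender passes to manager.send(data).
        System.out.print(new String(data, StandardCharsets.UTF_8));
    }
}
```

The point of the sketch is only the ordering: the event is first rendered to a string with a trailing line terminator, and the string is then encoded to the byte array that is sent to Kafka.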

JSON example

{
  "timeMillis": 1508045813249,
  "thread": "http-nio-8080-exec-1",
  "level": "ERROR",
  "loggerName": "com.example.demo.controller.ErrorController",
  "message": "/ by zero",
  "thrown": {
    "commonElementCount": 0,
    "localizedMessage": "/ by zero",
    "message": "/ by zero",
    "name": "java.lang.ArithmeticException",
    "extendedStackTrace": [
      {
        "class": "com.example.demo.controller.ErrorController",
        "method": "logError",
        "file": "ErrorController.java",
        "line": 20,
        "exact": true,
        "location": "classes/",
        "version": "?"
      },
      {
        "class": "sun.reflect.NativeMethodAccessorImpl",
        "method": "invoke0",
        "file": "NativeMethodAccessorImpl.java",
        "line": -2,
        "exact": false,
        "location": "?",
        "version": "1.8.0_71"
      },
      {
        "class": "sun.reflect.NativeMethodAccessorImpl",
        "method": "invoke",
        "file": "NativeMethodAccessorImpl.java",
        "line": 62,
        "exact": false,
        "location": "?",
        "version": "1.8.0_71"
      },
      {
        "class": "sun.reflect.DelegatingMethodAccessorImpl",
        "method": "invoke",
        "file": "DelegatingMethodAccessorImpl.java",
        "line": 43,
        "exact": false,
        "location": "?",
        "version": "1.8.0_71"
      },
      {
        "class": "java.lang.reflect.Method",
        "method": "invoke",
        "file": "Method.java",
        "line": 497,
        "exact": false,
        "location": "?",
        "version": "1.8.0_71"
      },
      {
        "class": "org.springframework.web.method.support.InvocableHandlerMethod",
        "method": "doInvoke",
        "file": "InvocableHandlerMethod.java",
        "line": 205,
        "exact": true,
        "location": "spring-web-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.springframework.web.method.support.InvocableHandlerMethod",
        "method": "invokeForRequest",
        "file": "InvocableHandlerMethod.java",
        "line": 133,
        "exact": true,
        "location": "spring-web-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod",
        "method": "invokeAndHandle",
        "file": "ServletInvocableHandlerMethod.java",
        "line": 97,
        "exact": true,
        "location": "spring-webmvc-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter",
        "method": "invokeHandlerMethod",
        "file": "RequestMappingHandlerAdapter.java",
        "line": 827,
        "exact": true,
        "location": "spring-webmvc-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter",
        "method": "handleInternal",
        "file": "RequestMappingHandlerAdapter.java",
        "line": 738,
        "exact": true,
        "location": "spring-webmvc-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter",
        "method": "handle",
        "file": "AbstractHandlerMethodAdapter.java",
        "line": 85,
        "exact": true,
        "location": "spring-webmvc-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.springframework.web.servlet.DispatcherServlet",
        "method": "doDispatch",
        "file": "DispatcherServlet.java",
        "line": 967,
        "exact": true,
        "location": "spring-webmvc-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.springframework.web.servlet.DispatcherServlet",
        "method": "doService",
        "file": "DispatcherServlet.java",
        "line": 901,
        "exact": true,
        "location": "spring-webmvc-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.springframework.web.servlet.FrameworkServlet",
        "method": "processRequest",
        "file": "FrameworkServlet.java",
        "line": 970,
        "exact": true,
        "location": "spring-webmvc-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.springframework.web.servlet.FrameworkServlet",
        "method": "doGet",
        "file": "FrameworkServlet.java",
        "line": 861,
        "exact": true,
        "location": "spring-webmvc-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "javax.servlet.http.HttpServlet",
        "method": "service",
        "file": "HttpServlet.java",
        "line": 635,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.springframework.web.servlet.FrameworkServlet",
        "method": "service",
        "file": "FrameworkServlet.java",
        "line": 846,
        "exact": true,
        "location": "spring-webmvc-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "javax.servlet.http.HttpServlet",
        "method": "service",
        "file": "HttpServlet.java",
        "line": 742,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.catalina.core.ApplicationFilterChain",
        "method": "internalDoFilter",
        "file": "ApplicationFilterChain.java",
        "line": 231,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.catalina.core.ApplicationFilterChain",
        "method": "doFilter",
        "file": "ApplicationFilterChain.java",
        "line": 166,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.tomcat.websocket.server.WsFilter",
        "method": "doFilter",
        "file": "WsFilter.java",
        "line": 52,
        "exact": true,
        "location": "tomcat-embed-websocket-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.catalina.core.ApplicationFilterChain",
        "method": "internalDoFilter",
        "file": "ApplicationFilterChain.java",
        "line": 193,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.catalina.core.ApplicationFilterChain",
        "method": "doFilter",
        "file": "ApplicationFilterChain.java",
        "line": 166,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.springframework.boot.web.filter.ApplicationContextHeaderFilter",
        "method": "doFilterInternal",
        "file": "ApplicationContextHeaderFilter.java",
        "line": 55,
        "exact": true,
        "location": "spring-boot-1.5.7.RELEASE.jar",
        "version": "1.5.7.RELEASE"
      },
      {
        "class": "org.springframework.web.filter.OncePerRequestFilter",
        "method": "doFilter",
        "file": "OncePerRequestFilter.java",
        "line": 107,
        "exact": true,
        "location": "spring-web-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.apache.catalina.core.ApplicationFilterChain",
        "method": "internalDoFilter",
        "file": "ApplicationFilterChain.java",
        "line": 193,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.catalina.core.ApplicationFilterChain",
        "method": "doFilter",
        "file": "ApplicationFilterChain.java",
        "line": 166,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.springframework.boot.actuate.trace.WebRequestTraceFilter",
        "method": "doFilterInternal",
        "file": "WebRequestTraceFilter.java",
        "line": 110,
        "exact": true,
        "location": "spring-boot-actuator-1.5.7.RELEASE.jar",
        "version": "1.5.7.RELEASE"
      },
      {
        "class": "org.springframework.web.filter.OncePerRequestFilter",
        "method": "doFilter",
        "file": "OncePerRequestFilter.java",
        "line": 107,
        "exact": true,
        "location": "spring-web-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.apache.catalina.core.ApplicationFilterChain",
        "method": "internalDoFilter",
        "file": "ApplicationFilterChain.java",
        "line": 193,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.catalina.core.ApplicationFilterChain",
        "method": "doFilter",
        "file": "ApplicationFilterChain.java",
        "line": 166,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.springframework.web.filter.RequestContextFilter",
        "method": "doFilterInternal",
        "file": "RequestContextFilter.java",
        "line": 99,
        "exact": true,
        "location": "spring-web-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.springframework.web.filter.OncePerRequestFilter",
        "method": "doFilter",
        "file": "OncePerRequestFilter.java",
        "line": 107,
        "exact": true,
        "location": "spring-web-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.apache.catalina.core.ApplicationFilterChain",
        "method": "internalDoFilter",
        "file": "ApplicationFilterChain.java",
        "line": 193,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.catalina.core.ApplicationFilterChain",
        "method": "doFilter",
        "file": "ApplicationFilterChain.java",
        "line": 166,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.springframework.web.filter.HttpPutFormContentFilter",
        "method": "doFilterInternal",
        "file": "HttpPutFormContentFilter.java",
        "line": 108,
        "exact": true,
        "location": "spring-web-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.springframework.web.filter.OncePerRequestFilter",
        "method": "doFilter",
        "file": "OncePerRequestFilter.java",
        "line": 107,
        "exact": true,
        "location": "spring-web-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.apache.catalina.core.ApplicationFilterChain",
        "method": "internalDoFilter",
        "file": "ApplicationFilterChain.java",
        "line": 193,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.catalina.core.ApplicationFilterChain",
        "method": "doFilter",
        "file": "ApplicationFilterChain.java",
        "line": 166,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.springframework.web.filter.HiddenHttpMethodFilter",
        "method": "doFilterInternal",
        "file": "HiddenHttpMethodFilter.java",
        "line": 81,
        "exact": true,
        "location": "spring-web-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.springframework.web.filter.OncePerRequestFilter",
        "method": "doFilter",
        "file": "OncePerRequestFilter.java",
        "line": 107,
        "exact": true,
        "location": "spring-web-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.apache.catalina.core.ApplicationFilterChain",
        "method": "internalDoFilter",
        "file": "ApplicationFilterChain.java",
        "line": 193,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.catalina.core.ApplicationFilterChain",
        "method": "doFilter",
        "file": "ApplicationFilterChain.java",
        "line": 166,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.springframework.web.filter.CharacterEncodingFilter",
        "method": "doFilterInternal",
        "file": "CharacterEncodingFilter.java",
        "line": 197,
        "exact": true,
        "location": "spring-web-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.springframework.web.filter.OncePerRequestFilter",
        "method": "doFilter",
        "file": "OncePerRequestFilter.java",
        "line": 107,
        "exact": true,
        "location": "spring-web-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.apache.catalina.core.ApplicationFilterChain",
        "method": "internalDoFilter",
        "file": "ApplicationFilterChain.java",
        "line": 193,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.catalina.core.ApplicationFilterChain",
        "method": "doFilter",
        "file": "ApplicationFilterChain.java",
        "line": 166,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.springframework.boot.actuate.autoconfigure.MetricsFilter",
        "method": "doFilterInternal",
        "file": "MetricsFilter.java",
        "line": 106,
        "exact": true,
        "location": "spring-boot-actuator-1.5.7.RELEASE.jar",
        "version": "1.5.7.RELEASE"
      },
      {
        "class": "org.springframework.web.filter.OncePerRequestFilter",
        "method": "doFilter",
        "file": "OncePerRequestFilter.java",
        "line": 107,
        "exact": true,
        "location": "spring-web-4.3.11.RELEASE.jar",
        "version": "4.3.11.RELEASE"
      },
      {
        "class": "org.apache.catalina.core.ApplicationFilterChain",
        "method": "internalDoFilter",
        "file": "ApplicationFilterChain.java",
        "line": 193,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.catalina.core.ApplicationFilterChain",
        "method": "doFilter",
        "file": "ApplicationFilterChain.java",
        "line": 166,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.catalina.core.StandardWrapperValve",
        "method": "invoke",
        "file": "StandardWrapperValve.java",
        "line": 198,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.catalina.core.StandardContextValve",
        "method": "invoke",
        "file": "StandardContextValve.java",
        "line": 96,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.catalina.authenticator.AuthenticatorBase",
        "method": "invoke",
        "file": "AuthenticatorBase.java",
        "line": 478,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.catalina.core.StandardHostValve",
        "method": "invoke",
        "file": "StandardHostValve.java",
        "line": 140,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.catalina.valves.ErrorReportValve",
        "method": "invoke",
        "file": "ErrorReportValve.java",
        "line": 80,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.catalina.core.StandardEngineValve",
        "method": "invoke",
        "file": "StandardEngineValve.java",
        "line": 87,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.catalina.connector.CoyoteAdapter",
        "method": "service",
        "file": "CoyoteAdapter.java",
        "line": 342,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.coyote.http11.Http11Processor",
        "method": "service",
        "file": "Http11Processor.java",
        "line": 799,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.coyote.AbstractProcessorLight",
        "method": "process",
        "file": "AbstractProcessorLight.java",
        "line": 66,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.coyote.AbstractProtocol$ConnectionHandler",
        "method": "process",
        "file": "AbstractProtocol.java",
        "line": 868,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.tomcat.util.net.NioEndpoint$SocketProcessor",
        "method": "doRun",
        "file": "NioEndpoint.java",
        "line": 1457,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "org.apache.tomcat.util.net.SocketProcessorBase",
        "method": "run",
        "file": "SocketProcessorBase.java",
        "line": 49,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "java.util.concurrent.ThreadPoolExecutor",
        "method": "runWorker",
        "file": "ThreadPoolExecutor.java",
        "line": 1142,
        "exact": true,
        "location": "?",
        "version": "1.8.0_71"
      },
      {
        "class": "java.util.concurrent.ThreadPoolExecutor$Worker",
        "method": "run",
        "file": "ThreadPoolExecutor.java",
        "line": 617,
        "exact": true,
        "location": "?",
        "version": "1.8.0_71"
      },
      {
        "class": "org.apache.tomcat.util.threads.TaskThread$WrappingRunnable",
        "method": "run",
        "file": "TaskThread.java",
        "line": 61,
        "exact": true,
        "location": "tomcat-embed-core-8.5.20.jar",
        "version": "8.5.20"
      },
      {
        "class": "java.lang.Thread",
        "method": "run",
        "file": "Thread.java",
        "line": 745,
        "exact": true,
        "location": "?",
        "version": "1.8.0_71"
      }
    ]
  },
  "endOfBatch": false,
  "loggerFqcn": "org.apache.logging.slf4j.Log4jLogger",
  "threadId": 26,
  "threadPriority": 5
}

SerializedLayout or PatternLayout can also be configured, but neither is as portable as JSON. SerializedLayout is based on JDK serialization, was deprecated in log4j 2.9, and is not recommended. PatternLayout output is tied to the pattern each application defines, so if different applications define different patterns, the parsing program has to be modified accordingly.
If the JSON text is too long to transmit, you can compress it, or extend log4j with a custom layout based on pb (Protocol Buffers).
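
The compression option mentioned above could look roughly like the following, using only the JDK's java.util.zip classes. This is a minimal sketch: JsonGzip and its methods are hypothetical helpers, not part of log4j2 or Kafka, and it assumes the producer compresses the JSON string before sending and the consumer decompresses it before parsing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper for illustration; not part of log4j2 or Kafka.
final class JsonGzip {

    // Compress the UTF-8 bytes of a JSON string with GZIP.
    static byte[] compress(String json) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(buffer)) {
            gzip.write(json.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Restore the original JSON string, e.g. on the consumer side.
    static String decompress(byte[] compressed) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] chunk = new byte[4096];
            int read;
            while ((read = gzip.read(chunk)) != -1) {
                buffer.write(chunk, 0, read);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Repetitive stack-frame JSON, similar in shape to the event shown earlier.
        StringBuilder json = new StringBuilder("[");
        for (int i = 0; i < 50; i++) {
            if (i > 0) {
                json.append(',');
            }
            json.append("{\"class\":\"org.apache.catalina.core.ApplicationFilterChain\",")
                .append("\"method\":\"doFilter\",\"line\":166}");
        }
        json.append(']');

        byte[] raw = json.toString().getBytes(StandardCharsets.UTF_8);
        byte[] packed = compress(json.toString());
        System.out.println("raw bytes: " + raw.length + ", gzip bytes: " + packed.length);
        System.out.println("round trip ok: " + decompress(packed).equals(json.toString()));
    }
}
```

Stack traces repeat the same class and jar names many times, so they tend to compress well. Kafka producers also accept a compression.type setting, which may make hand-written compression unnecessary; whether that fits depends on how the appender is configured.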

Parsing JSON

On the Kafka consumer side, parsing the JSON with ObjectMapper, for example

Log4jLogEvent logEvent = mapper.readValue(value, Log4jLogEvent.class);

fails with the following error:

com.fasterxml.jackson.databind.JsonMappingException: Can not find a deserializer for non-concrete Collection type [collection type; class org.apache.logging.log4j.ThreadContext$ContextStack, contains [simple type, class java.lang.String]]
 at [Source: {"timeMillis":1508047984078,"thread":"http-nio-8080-exec-6","level":"ERROR","loggerName":"com.example.demo.controller.ErrorController","message":"/ by zero","thrown":{"commonElementCount":0,"localizedMessage":"/ by zero","message":"/ by zero","name":"java.lang.ArithmeticException","extendedStackTrace":[{"class":"com.example.demo.controller.ErrorController","method":"logError","file":"ErrorController.java","line":20,"exact":true,"location":"classes/","version":"?"},{"class":"sun.reflect.NativeMethodAccessorImpl","method":"invoke0","file":"NativeMethodAccessorImpl.java","line":-2,"exact":false,"location":"?","version":"1.8.0_71"},{"class":"sun.reflect.NativeMethodAccessorImpl","method":"invoke","file":"NativeMethodAccessorImpl.java","line":62,"exact":false,"location":"?","version":"1.8.0_71"},{"class":"sun.reflect.DelegatingMethodAccessorImpl","method":"invoke","file":"DelegatingMethodAccessorImpl.java","line":43,"exact":false,"location":"?","version":"1.8.0_71"},{"class":"java.lang.reflect.Method","method":"invoke","file":"Method.java","line":497,"exact":false,"location":"?","version":"1.8.0_71"},{"class":"org.springframework.web.method.support.InvocableHandlerMethod","method":"doInvoke","file":"InvocableHandlerMethod.java","line":205,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.method.support.InvocableHandlerMethod","method":"invokeForRequest","file":"InvocableHandlerMethod.java","line":133,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod","method":"invokeAndHandle","file":"ServletInvocableHandlerMethod.java","line":97,"exact":true,"location":"spring-webmvc-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter","method":"invokeHandlerMethod","file":"RequestMappingHandle
rAdapter.java","line":827,"exact":true,"location":"spring-webmvc-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter","method":"handleInternal","file":"RequestMappingHandlerAdapter.java","line":738,"exact":true,"location":"spring-webmvc-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter","method":"handle","file":"AbstractHandlerMethodAdapter.java","line":85,"exact":true,"location":"spring-webmvc-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.servlet.DispatcherServlet","method":"doDispatch","file":"DispatcherServlet.java","line":967,"exact":true,"location":"spring-webmvc-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.servlet.DispatcherServlet","method":"doService","file":"DispatcherServlet.java","line":901,"exact":true,"location":"spring-webmvc-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.servlet.FrameworkServlet","method":"processRequest","file":"FrameworkServlet.java","line":970,"exact":true,"location":"spring-webmvc-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.servlet.FrameworkServlet","method":"doGet","file":"FrameworkServlet.java","line":861,"exact":true,"location":"spring-webmvc-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"javax.servlet.http.HttpServlet","method":"service","file":"HttpServlet.java","line":635,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.springframework.web.servlet.FrameworkServlet","method":"service","file":"FrameworkServlet.java","line":846,"exact":true,"location":"spring-webmvc-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"javax.servlet.http.HttpServlet","method":"service","file":"HttpServlet.java","line":742,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","vers
ion":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"internalDoFilter","file":"ApplicationFilterChain.java","line":231,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"doFilter","file":"ApplicationFilterChain.java","line":166,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.tomcat.websocket.server.WsFilter","method":"doFilter","file":"WsFilter.java","line":52,"exact":true,"location":"tomcat-embed-websocket-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"internalDoFilter","file":"ApplicationFilterChain.java","line":193,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"doFilter","file":"ApplicationFilterChain.java","line":166,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.springframework.boot.web.filter.ApplicationContextHeaderFilter","method":"doFilterInternal","file":"ApplicationContextHeaderFilter.java","line":55,"exact":true,"location":"spring-boot-1.5.7.RELEASE.jar","version":"1.5.7.RELEASE"},{"class":"org.springframework.web.filter.OncePerRequestFilter","method":"doFilter","file":"OncePerRequestFilter.java","line":107,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"internalDoFilter","file":"ApplicationFilterChain.java","line":193,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"doFilter","file":"ApplicationFilterChain.java","line":166,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.springframework.boot.actuate.trace.WebRequestTraceFilter","method":"doFilterInternal","fil
e":"WebRequestTraceFilter.java","line":110,"exact":true,"location":"spring-boot-actuator-1.5.7.RELEASE.jar","version":"1.5.7.RELEASE"},{"class":"org.springframework.web.filter.OncePerRequestFilter","method":"doFilter","file":"OncePerRequestFilter.java","line":107,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"internalDoFilter","file":"ApplicationFilterChain.java","line":193,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"doFilter","file":"ApplicationFilterChain.java","line":166,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.springframework.web.filter.RequestContextFilter","method":"doFilterInternal","file":"RequestContextFilter.java","line":99,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.filter.OncePerRequestFilter","method":"doFilter","file":"OncePerRequestFilter.java","line":107,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"internalDoFilter","file":"ApplicationFilterChain.java","line":193,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"doFilter","file":"ApplicationFilterChain.java","line":166,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.springframework.web.filter.HttpPutFormContentFilter","method":"doFilterInternal","file":"HttpPutFormContentFilter.java","line":108,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.filter.OncePerRequestFilter","method":"doFilter","file":"OncePerRequestFilter.java","line":107,"exact":true,"location":"spring-web-4
.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"internalDoFilter","file":"ApplicationFilterChain.java","line":193,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"doFilter","file":"ApplicationFilterChain.java","line":166,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.springframework.web.filter.HiddenHttpMethodFilter","method":"doFilterInternal","file":"HiddenHttpMethodFilter.java","line":81,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.filter.OncePerRequestFilter","method":"doFilter","file":"OncePerRequestFilter.java","line":107,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"internalDoFilter","file":"ApplicationFilterChain.java","line":193,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"doFilter","file":"ApplicationFilterChain.java","line":166,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.springframework.web.filter.CharacterEncodingFilter","method":"doFilterInternal","file":"CharacterEncodingFilter.java","line":197,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.filter.OncePerRequestFilter","method":"doFilter","file":"OncePerRequestFilter.java","line":107,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"internalDoFilter","file":"ApplicationFilterChain.java","line":193,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.Applica
tionFilterChain","method":"doFilter","file":"ApplicationFilterChain.java","line":166,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.springframework.boot.actuate.autoconfigure.MetricsFilter","method":"doFilterInternal","file":"MetricsFilter.java","line":106,"exact":true,"location":"spring-boot-actuator-1.5.7.RELEASE.jar","version":"1.5.7.RELEASE"},{"class":"org.springframework.web.filter.OncePerRequestFilter","method":"doFilter","file":"OncePerRequestFilter.java","line":107,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"internalDoFilter","file":"ApplicationFilterChain.java","line":193,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"doFilter","file":"ApplicationFilterChain.java","line":166,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.StandardWrapperValve","method":"invoke","file":"StandardWrapperValve.java","line":198,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.StandardContextValve","method":"invoke","file":"StandardContextValve.java","line":96,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.authenticator.AuthenticatorBase","method":"invoke","file":"AuthenticatorBase.java","line":478,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.StandardHostValve","method":"invoke","file":"StandardHostValve.java","line":140,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.valves.ErrorReportValve","method":"invoke","file":"ErrorReportValve.java","line":80,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.a
pache.catalina.core.StandardEngineValve","method":"invoke","file":"StandardEngineValve.java","line":87,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.connector.CoyoteAdapter","method":"service","file":"CoyoteAdapter.java","line":342,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.coyote.http11.Http11Processor","method":"service","file":"Http11Processor.java","line":799,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.coyote.AbstractProcessorLight","method":"process","file":"AbstractProcessorLight.java","line":66,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.coyote.AbstractProtocol$ConnectionHandler","method":"process","file":"AbstractProtocol.java","line":868,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.tomcat.util.net.NioEndpoint$SocketProcessor","method":"doRun","file":"NioEndpoint.java","line":1457,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.tomcat.util.net.SocketProcessorBase","method":"run","file":"SocketProcessorBase.java","line":49,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","file":"ThreadPoolExecutor.java","line":1142,"exact":true,"location":"?","version":"1.8.0_71"},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","file":"ThreadPoolExecutor.java","line":617,"exact":true,"location":"?","version":"1.8.0_71"},{"class":"org.apache.tomcat.util.threads.TaskThread$WrappingRunnable","method":"run","file":"TaskThread.java","line":61,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"java.lang.Thread","method":"run","file":"Thread.java","line":745,"exact":true,"location":"?","version":"1.8.0_71"}]},"endOfBatc
h":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":31,"threadPriority":5,"source":{"class":"com.example.demo.controller.ErrorController","method":"logError","file":"ErrorController.java","line":22}}
; line: 1, column: 1]
    at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:305)
    at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:268)
    at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:244)
    at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:142)
    at com.fasterxml.jackson.databind.DeserializationContext.findNonContextualValueDeserializer(DeserializationContext.java:466)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.resolve(BeanDeserializerBase.java:479)
    at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:293)
    at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:244)
    at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:142)
    at com.fasterxml.jackson.databind.DeserializationContext.findRootValueDeserializer(DeserializationContext.java:476)
    at com.fasterxml.jackson.databind.ObjectMapper._findRootDeserializer(ObjectMapper.java:3915)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3810)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2858)
    at com.example.demo.KafkaLog4jDemoApplicationTests.lambda$consumeErrorLog$0(KafkaLog4jDemoApplicationTests.java:54)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at com.example.demo.KafkaLog4jDemoApplicationTests.consumeErrorLog(KafkaLog4jDemoApplicationTests.java:47)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.IllegalArgumentException: Can not find a deserializer for non-concrete Collection type [collection type; class org.apache.logging.log4j.ThreadContext$ContextStack, contains [simple type, class java.lang.String]]
    at com.fasterxml.jackson.databind.deser.BasicDeserializerFactory.createCollectionDeserializer(BasicDeserializerFactory.java:1021)
    at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:391)
    at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:349)
    at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:264)
    ... 46 more

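As the "Caused by" line shows, Jackson cannot bind this JSON back onto log4j's own event classes, because ThreadContext$ContextStack is a non-concrete collection type with no deserializer. It is therefore simpler to define a custom bean that mirrors the JSON, or to parse the message as a generic JSONObject.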

Log4jLogEventJson

public class Log4jLogEventJson {

    private long timeMillis;

    private String thread;

    private String level;

    private String loggerName;

    private String message;

    private Thrown thrown;

    static class Thrown {
        private int commonElementCount;

        private String localizedMessage;

        private String message;

        private String name;

        @JSONField(name = "extendedStackTrace")
        private List<ExtendedStackTrace> extendedStackTraces;

        public int getCommonElementCount() {
            return commonElementCount;
        }

        public void setCommonElementCount(int commonElementCount) {
            this.commonElementCount = commonElementCount;
        }

        public String getLocalizedMessage() {
            return localizedMessage;
        }

        public void setLocalizedMessage(String localizedMessage) {
            this.localizedMessage = localizedMessage;
        }

        public String getMessage() {
            return message;
        }

        public void setMessage(String message) {
            this.message = message;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public List<ExtendedStackTrace> getExtendedStackTraces() {
            return extendedStackTraces;
        }

        public void setExtendedStackTraces(List<ExtendedStackTrace> extendedStackTraces) {
            this.extendedStackTraces = extendedStackTraces;
        }
    }

    // In the event JSON the following fields sit at the top level, next to "thrown",
    // so they belong to the event bean itself rather than to Thrown.
    private boolean endOfBatch;

    private String loggerFqcn;

    private JSONObject contextMap;

    private int threadId;

    private int threadPriority;

    private Source source;

    public boolean isEndOfBatch() {
        return endOfBatch;
    }

    public void setEndOfBatch(boolean endOfBatch) {
        this.endOfBatch = endOfBatch;
    }

    public String getLoggerFqcn() {
        return loggerFqcn;
    }

    public void setLoggerFqcn(String loggerFqcn) {
        this.loggerFqcn = loggerFqcn;
    }

    public JSONObject getContextMap() {
        return contextMap;
    }

    public void setContextMap(JSONObject contextMap) {
        this.contextMap = contextMap;
    }

    public int getThreadId() {
        return threadId;
    }

    public void setThreadId(int threadId) {
        this.threadId = threadId;
    }

    public int getThreadPriority() {
        return threadPriority;
    }

    public void setThreadPriority(int threadPriority) {
        this.threadPriority = threadPriority;
    }

    public Source getSource() {
        return source;
    }

    public void setSource(Source source) {
        this.source = source;
    }

    static class ExtendedStackTrace {

        @JSONField(name ="class")
        private String clz;

        private String method;

        private String file;

        private int line;

        private boolean exact;

        private String location;

        private String version;

        public String getClz() {
            return clz;
        }

        public void setClz(String clz) {
            this.clz = clz;
        }

        public String getMethod() {
            return method;
        }

        public void setMethod(String method) {
            this.method = method;
        }

        public String getFile() {
            return file;
        }

        public void setFile(String file) {
            this.file = file;
        }

        public int getLine() {
            return line;
        }

        public void setLine(int line) {
            this.line = line;
        }

        public boolean isExact() {
            return exact;
        }

        public void setExact(boolean exact) {
            this.exact = exact;
        }

        public String getLocation() {
            return location;
        }

        public void setLocation(String location) {
            this.location = location;
        }

        public String getVersion() {
            return version;
        }

        public void setVersion(String version) {
            this.version = version;
        }
    }

    static class Source {
        @JSONField(name = "class")
        private String clz;

        private String method;

        private String file;

        private int line;

        public String getClz() {
            return clz;
        }

        public void setClz(String clz) {
            this.clz = clz;
        }

        public String getMethod() {
            return method;
        }

        public void setMethod(String method) {
            this.method = method;
        }

        public String getFile() {
            return file;
        }

        public void setFile(String file) {
            this.file = file;
        }

        public int getLine() {
            return line;
        }

        public void setLine(int line) {
            this.line = line;
        }
    }

    public String stacktrace(){
        if(thrown == null){
            return "";
        }
        StringBuilder builder = new StringBuilder();
        builder.append(thrown.getName());
        builder.append(": ");
        builder.append(thrown.getMessage());
        builder.append("\n");
        if(thrown.getExtendedStackTraces() == null){
            return builder.toString();
        }
        thrown.getExtendedStackTraces().forEach(e -> {
            builder.append(" at ");
            builder.append(e.getClz());
            builder.append(".");
            builder.append(e.getMethod());
            builder.append("(");
            builder.append(e.getFile());
            builder.append(":");
            builder.append(e.getLine());
            builder.append(")");
            if(e.isExact()){
                builder.append(" [");
            }else{
                builder.append(" ~[");
            }
            builder.append(e.getLocation());
            builder.append(":");
            builder.append(e.getVersion());
            builder.append("]");
            builder.append("\n");
        });
        return builder.toString();
    }

    public long getTimeMillis() {
        return timeMillis;
    }

    public void setTimeMillis(long timeMillis) {
        this.timeMillis = timeMillis;
    }

    public String getThread() {
        return thread;
    }

    public void setThread(String thread) {
        this.thread = thread;
    }

    public String getLevel() {
        return level;
    }

    public void setLevel(String level) {
        this.level = level;
    }

    public String getLoggerName() {
        return loggerName;
    }

    public void setLoggerName(String loggerName) {
        this.loggerName = loggerName;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Thrown getThrown() {
        return thrown;
    }

    public void setThrown(Thrown thrown) {
        this.thrown = thrown;
    }

    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }
}

Stream aggregation

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source = builder.stream("error-log");
        KStream<String, Log4jLogEventJson> beanStream = source.map(new KeyValueMapper<String, String, KeyValue<String, Log4jLogEventJson>>() {
            @Override
            public KeyValue<String, Log4jLogEventJson> apply(String key, String value) {
                Log4jLogEventJson bean = JSON.parseObject(value,Log4jLogEventJson.class);
                return new KeyValue<>(bean.getLoggerName(), bean);
            }
        });

        GenericJsonSerde<ErrorLogStats> statsGenericJsonSerde = new GenericJsonSerde<>(ErrorLogStats.class);
        GenericJsonSerde<Log4jLogEventJson> log4jLogEventJsonGenericJsonSerde = new GenericJsonSerde<>(Log4jLogEventJson.class);

        beanStream.groupByKey(Serdes.String(),log4jLogEventJsonGenericJsonSerde)
                .aggregate(ErrorLogStats::new,
                        (key, value, stats) -> {
                            System.out.println("key:"+key.getClass());
                            System.out.println("value:"+value.getClass());
                            return stats.add(value);
                        },
                        TimeWindows.of(10000).advanceBy(10000),
                        statsGenericJsonSerde,
                        "aggregate")
                .foreach((k,v) -> {
                    LOGGER.info("key:{},start:{},end:{},value:{}",k.key(),k.window().start(),k.window().end(),v.errors.size());
                    // Raise the alarm here.
                });
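
With TimeWindows.of(10000).advanceBy(10000) the window size equals the advance, so the windows tumble: every record falls into exactly one 10-second bucket. The following is only a minimal plain-Java sketch of that bucketing idea, not Kafka Streams code. The class TumblingWindowSketch, its methods, and the sample timestamps are hypothetical, and it assumes non-negative timestamps with windows aligned to the epoch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams: mimics how a 10 s tumbling
// window assigns each record timestamp to exactly one [start, start + size) bucket.
final class TumblingWindowSketch {

    // Same size as the TimeWindows.of(10000) call in the stream code.
    static final long WINDOW_SIZE_MS = 10_000L;

    // Start of the window containing the timestamp (assumes timestampMs >= 0).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Counts events per "loggerName@windowStart" key, in first-seen order.
    static Map<String, Integer> countPerWindow(String[] loggerNames, long[] timestampsMs) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (int i = 0; i < loggerNames.length; i++) {
            String key = loggerNames[i] + "@" + windowStart(timestampsMs[i]);
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample data: four errors inside the same 10 s window.
        String[] loggers = {"DemoController", "DemoController", "ErrorController", "DemoController"};
        long[] times = {1508060661000L, 1508060663500L, 1508060664000L, 1508060669999L};
        // prints {DemoController@1508060660000=3, ErrorController@1508060660000=1}
        System.out.println(countPerWindow(loggers, times));
    }
}
```

In the real topology the aggregate step keeps this per-key, per-window state in the "aggregate" state store, and ErrorLogStats plays the role of the counter.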

Sample log output

2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:767)  - stream-thread [StreamThread-1] Committing all tasks because the commit interval 10000ms has elapsed
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 0_1
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 1_0
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 0_2
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 1_1
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 1_2
key:class java.lang.String
value:class com.example.demo.Log4jLogEventJson
key:class java.lang.String
value:class com.example.demo.Log4jLogEventJson
key:class java.lang.String
value:class com.example.demo.Log4jLogEventJson
key:class java.lang.String
value:class com.example.demo.Log4jLogEventJson
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:767)  - stream-thread [StreamThread-1] Committing all tasks because the commit interval 10000ms has elapsed
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 0_1
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 1_0
2017-10-15 17:44:29 [StreamThread-1] (ErrorLogStreamTest.java:90)  - key:com.example.demo.controller.DemoController,start:1508060660000,end:1508060670000,value:3
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 0_2
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 1_1
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 1_2
2017-10-15 17:44:29 [StreamThread-1] (ErrorLogStreamTest.java:90)  - key:com.example.demo.controller.ErrorController,start:1508060660000,end:1508060670000,value:1
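
The per-window counts in this output (3 errors for DemoController, 1 for ErrorController) are what an alarm rule would act on. The sketch below shows one possible rule, assuming a fixed threshold per window. The class ErrorAlarmSketch, the method loggersToAlarm, and the threshold of 3 are hypothetical and not part of the original code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical alarm rule: flag every logger whose error count within one
// window reaches a fixed threshold.
final class ErrorAlarmSketch {

    static List<String> loggersToAlarm(Map<String, Integer> errorsPerLogger, int threshold) {
        List<String> alarms = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : errorsPerLogger.entrySet()) {
            if (entry.getValue() >= threshold) {
                alarms.add(entry.getKey());
            }
        }
        return alarms;
    }

    public static void main(String[] args) {
        // Counts copied from the window [1508060660000, 1508060670000) in the log output.
        Map<String, Integer> window = new LinkedHashMap<>();
        window.put("com.example.demo.controller.DemoController", 3);
        window.put("com.example.demo.controller.ErrorController", 1);
        // Assumed threshold of 3 errors per window.
        // prints [com.example.demo.controller.DemoController]
        System.out.println(loggersToAlarm(window, 3));
    }
}
```

A check like this could sit inside the foreach callback of the stream, with the notification channel (mail, chat, pager) left to the application.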

Summary

Now that Kafka ships with Kafka Streams, I feel the technology stack can be trimmed considerably. For example, stream processing can run directly on Kafka without having to learn Storm or Spark. How to run a Kafka Streams application in a distributed setup is something I will look into later.

doc