Log4j2 Output to kafka

  kafka, log4j2, springboot

maven

<!-- log4j2 kafka appender -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.1</version>
            <exclusions> <!-- exclude掉过时的log依赖 -->
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.4</version>
        </dependency>
        <!-- log4j2 async -->
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.3.0</version>
        </dependency>

log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="warn" name="MyApp" packages="">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT" ignoreExceptions="false">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] (%F:%L)  - %m%n" />
        </Console>
        <Kafka name="KafkaAppender" topic="error-log">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] (%F:%L)  - %m%n" />
            <Property name="bootstrap.servers">localhost:9092</Property>
            <Property name="timeout.ms">10000</Property>
        </Kafka>
    </Appenders>
    <Loggers>
        <AsyncLogger name="async">
            <AppenderRef ref="KafkaAppender" />
        </AsyncLogger>
        <Root level="info">
            <AppenderRef ref="Console" />
            <AppenderRef ref="KafkaAppender" level="error" />
        </Root>
    </Loggers>
</Configuration>

test

@Test
    public void consumeErrorLog(){
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("auto.offset.reset","smallest");
        props.put("group.id", "testgroup6");
        props.put("enable.auto.commit", "true");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "2000"); // 从200修改成2000 太短有rebalance错误
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConfig consumerConfig =  new kafka.consumer.ConsumerConfig(props);
        ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        int localConsumerCount = 1;
        topicCountMap.put(TOPIC, localConsumerCount);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
                .createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);
        streams.stream().forEach(stream -> {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println(new String(it.next().message()));
            }
        });
    }

docs

How to use Kafka to collect and store these Log4j logs in real time? One solution is to use other components (such as Flume or a self-developed program) to monitor these log files in real time and then send them to Kafka. However, another convenient scheme is to use Log4jAppender provided by Kafka, and make corresponding configuration in Log4j configuration file, thus completing the real-time transmission of Log4j generated logs to Kafka.