Talk about an abnormal increase in Kafka consumer offset lag


Preface

This article analyzes an anomaly in which a Kafka consumer's offset lag kept growing.

Checking consumer progress

Group       Topic       Pid  Offset   logSize  Lag     Owner
demo-group  demo-topic  0    9678273  9858394  180121  xxx-service-dpqpc-1510557406684-e2171bd6-0
demo-group  demo-topic  1    9689443  9873522  184079  xxx-service-dpqpc-1510557406684-e2171bd6-1
demo-group  demo-topic  2    9676875  9855874  178999  xxx-service-q7vch-1510557399475-b1d7d22c-0
demo-group  demo-topic  3    9683393  9864518  181125  xxx-service-q7vch-1510557399475-b1d7d22c-1

The consumers' offsets are far behind logSize: the lag on every partition is around 180,000, well over 100,000.
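The Lag column is simply logSize minus the group's committed offset for each partition. For partition 0 this is 9858394 - 9678273 = 180121. Here is a minimal Java sketch of that arithmetic, plus the kind of threshold check a lag monitor would run. It assumes offset and logSize have already been fetched; here they are copied from the table above. The class name and the 10,000 threshold are illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ConsumerLag {

    /** Lag of one partition: messages written but not yet consumed by the group. */
    static long lag(long offset, long logSize) {
        return logSize - offset;
    }

    /** Partitions (by index) whose lag exceeds the threshold, with their lag. */
    static Map<Integer, Long> laggingPartitions(long[] offsets, long[] logSizes, long threshold) {
        Map<Integer, Long> result = new LinkedHashMap<>();
        for (int pid = 0; pid < offsets.length; pid++) {
            long l = lag(offsets[pid], logSizes[pid]);
            if (l > threshold) {
                result.put(pid, l);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Offset and logSize for partitions 0..3, copied from the abnormal table.
        long[] offsets = {9678273L, 9689443L, 9676875L, 9683393L};
        long[] logSizes = {9858394L, 9873522L, 9855874L, 9864518L};
        // 10,000 is an arbitrary example threshold, not a recommendation.
        System.out.println(laggingPartitions(offsets, logSizes, 10_000L));
        // prints {0=180121, 1=184079, 2=178999, 3=181125}
    }
}
```

With the values from the normal table, the same check returns an empty map, because the lag there is 0 or 1.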

Normal situation

Group       Topic       Pid  Offset   logSize  Lag     Owner
demo-group  demo-topic  0    9860587  9860587  0       demo-group_tomcat2-1512984437115-fc1ee57b-0
demo-group  demo-topic  1    9875814  9875814  0       demo-group_tomcat2-1512984437115-fc1ee57b-0
demo-group  demo-topic  2    9858213  9858214  1       demo-group_tomcat2-1512984437115-fc1ee57b-1
demo-group  demo-topic  3    9866744  9866744  0       demo-group_tomcat2-1512984437115-fc1ee57b-2

Normally the lag is this small: 0 or 1.

Viewing the topic's partitions

    Topic:demo-topic    PartitionCount:4    ReplicationFactor:2 Configs:
    Topic: demo-topic   Partition: 0    Leader: 3   Replicas: 3,4   Isr: 4,3
    Topic: demo-topic   Partition: 1    Leader: 4   Replicas: 4,1   Isr: 1,4
    Topic: demo-topic   Partition: 2    Leader: 1   Replicas: 1,2   Isr: 1,2
    Topic: demo-topic   Partition: 3    Leader: 2   Replicas: 2,3   Isr: 2,3

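The topic has 4 partitions, and the Owner column shows 4 consumer threads (two instances with two streams each), one per partition, so the assignment itself is normal.
The problem is therefore either that the consumers are consuming too slowly, or that consumption has failed.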
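The way 4 partitions are spread across consumer threads can be checked with a small sketch of the range strategy. As far as I know, range is the default partition.assignment.strategy of the ZooKeeper-based high-level consumer in 0.8.2. This is my own reconstruction, not Kafka's code, and the shortened thread ids are stand-ins for the full Owner values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RangeAssignment {

    /**
     * Range assignment for one topic: sort the consumer thread ids, give each
     * thread partitions/threads consecutive partitions, and give the first
     * (partitions % threads) threads one extra partition.
     */
    static Map<String, List<Integer>> assign(int partitionCount, List<String> consumerThreadIds) {
        List<String> consumers = new ArrayList<>(consumerThreadIds);
        Collections.sort(consumers);
        int perConsumer = partitionCount / consumers.size();
        int withExtra = partitionCount % consumers.size();

        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (int pos = 0; pos < consumers.size(); pos++) {
            int start = perConsumer * pos + Math.min(pos, withExtra);
            int count = perConsumer + (pos < withExtra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                partitions.add(p);
            }
            // With more threads than partitions, trailing threads get an empty list.
            assignment.put(consumers.get(pos), partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Abnormal table: two instances (dpqpc, q7vch) with two streams each.
        System.out.println(assign(4, Arrays.asList("q7vch-1", "q7vch-0", "dpqpc-1", "dpqpc-0")));
        // Normal table: one instance (tomcat2) with three streams.
        System.out.println(assign(4, Arrays.asList("tomcat2-0", "tomcat2-1", "tomcat2-2")));
    }
}
```

The two lines printed are {dpqpc-0=[0], dpqpc-1=[1], q7vch-0=[2], q7vch-1=[3]} and {tomcat2-0=[0, 1], tomcat2-1=[2], tomcat2-2=[3]}. These match the Owner columns of both tables, including partitions 0 and 1 sharing thread -0 in the normal case. The sketch also shows why adding threads beyond the partition count would not speed anything up: the extra threads receive no partitions.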

Investigation

jstack -l pid

2017-12-27 04:06:23
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.66-b17 mixed mode):

"Attach Listener" #12286 daemon prio=9 os_prio=0 tid=0x00007f2920001000 nid=0x3087 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"ConsumerFetcherThread-xxx-service-dpqpc-1510557406684-e2171bd6-0-3" #9263 prio=5 os_prio=0 tid=0x00007f287400d800 nid=0x2440 waiting on condition [0x00007f285e6eb000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007048874b0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"ConsumerFetcherThread-xxx-service-dpqpc-1510557406684-e2171bd6-0-4" #9262 prio=5 os_prio=0 tid=0x00007f28740c2800 nid=0x243f waiting on condition [0x00007f291950d000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007048086d8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"xxx-service-dpqpc-1510557406684-e2171bd6-leader-finder-thread" #9261 prio=5 os_prio=0 tid=0x0000000002302800 nid=0x243e waiting on condition [0x00007f28bd1df000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000703d06518> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:61)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"consume-2" #62 prio=5 os_prio=0 tid=0x00007f28f8e86000 nid=0x51 waiting on condition [0x00007f28bd3e1000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000070440cd38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"consume-1" #61 prio=5 os_prio=0 tid=0x00007f28f8e84800 nid=0x50 waiting on condition [0x00007f28bd4e2000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000070440cd38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"xxx-service-dpqpc-1510557406684-e2171bd6_watcher_executor" #59 prio=5 os_prio=0 tid=0x00007f28fb685800 nid=0x4e waiting on condition [0x00007f28bd8e4000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007048878d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:544)

The consume-1 and consume-2 threads above are the business threads that consume from Kafka. Both are idle, parked in ThreadPoolExecutor.getTask() waiting for new work.

Error log

2017-12-16 12:53:34.257  INFO 7 --- [0.2.84.118:2181] k.consumer.ZookeeperConsumerConnector    : [xxx-service-q7vch-1510557399475-b1d7d22c], begin rebalancing consumer xxx-service-q7vch-1510557399475-b1d7d22c try #1
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] kafka.consumer.ConsumerFetcherManager    : [ConsumerFetcherManager-1510557399586] Stopping leader finder thread
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] kafka.consumer.ConsumerFetcherManager    : [ConsumerFetcherManager-1510557399586] Stopping all fetchers
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] kafka.consumer.ConsumerFetcherManager    : [ConsumerFetcherManager-1510557399586] All connections stopped
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] k.consumer.ZookeeperConsumerConnector    : [xxx-service-q7vch-1510557399475-b1d7d22c], Cleared all relevant queues for this fetcher
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] k.consumer.ZookeeperConsumerConnector    : [xxx-service-q7vch-1510557399475-b1d7d22c], Cleared the data chunks in all the consumer message iterators
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] k.consumer.ZookeeperConsumerConnector    : [xxx-service-q7vch-1510557399475-b1d7d22c], Committing all offsets after clearing the fetcher queues

The logs show almost no sign of messages being consumed, yet the lag is this large.

At first I looked at the exception log, found the entries above, and then took the jstack dump above. The ConsumerFetcherThread threads were stuck in PartitionTopicInfo.enqueue, which made me suspect a deadlock, or blocking caused by the rebalance. (My first jstack run had omitted -l, so the lock information was missing.) Searching online, I found a post titled "ConsumerFetcherThread deadlock?" that described a similar problem. However, it dates from 2014, and our Kafka version, 0.8.2.2, should already include the fix. Then I saw this reply:

The fetchers are blocked on the queue since it is full, is your consumer
iterator stopped and hence not getting more data from it?

This made me suspect that our own business thread had thrown an uncaught exception and died, leaving nothing to consume. After I restarted the program, the log filled up with consumed messages. Let's compare with a fresh jstack dump:

"ConsumerFetcherThread-xxx-376jt-1514353818187-b37be1c0-0-3" #81 prio=5 os_prio=0 tid=0x00007fe39c004000 nid=0x63 waiting on condition [0x00007fe3931f4000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007822ac4e0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"ConsumerFetcherThread-xxx-376jt-1514353818187-b37be1c0-0-4" #80 prio=5 os_prio=0 tid=0x00007fe39c003000 nid=0x62 waiting on condition [0x00007fe3926ea000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007821c9a68> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"xxx-376jt-1514353818187-b37be1c0-leader-finder-thread" #79 prio=5 os_prio=0 tid=0x0000000001f5a000 nid=0x61 waiting on condition [0x00007fe3920e7000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000782154c30> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:61)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"consume-2" #62 prio=5 os_prio=0 tid=0x00007fe48da13800 nid=0x51 runnable [0x00007fe392ff1000]
   java.lang.Thread.State: RUNNABLE
    //......
    at org.springframework.data.mongodb.core.MongoTemplate.executeFindMultiInternal(MongoTemplate.java:1948)
    at org.springframework.data.mongodb.core.MongoTemplate.doFind(MongoTemplate.java:1768)
    at org.springframework.data.mongodb.core.MongoTemplate.doFind(MongoTemplate.java:1751)
    at org.springframework.data.mongodb.core.MongoTemplate.find(MongoTemplate.java:625)
    at org.springframework.data.mongodb.core.MongoTemplate.findOne(MongoTemplate.java:590)
    at org.springframework.data.mongodb.core.MongoTemplate.findOne(MongoTemplate.java:582)
    at com.xxx.consumer.KafkaStreamProcessor.process(KafkaStreamProcessor.java:37)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:115)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"consume-1" #61 prio=5 os_prio=0 tid=0x00007fe48e310000 nid=0x50 runnable [0x00007fe3930f2000]
   java.lang.Thread.State: RUNNABLE
    //...
    at org.springframework.data.mongodb.core.MongoTemplate$12.doInCollection(MongoTemplate.java:1157)
    at org.springframework.data.mongodb.core.MongoTemplate$12.doInCollection(MongoTemplate.java:1137)
    at org.springframework.data.mongodb.core.MongoTemplate.execute(MongoTemplate.java:463)
    at org.springframework.data.mongodb.core.MongoTemplate.doUpdate(MongoTemplate.java:1137)
    at org.springframework.data.mongodb.core.MongoTemplate.upsert(MongoTemplate.java:1099)
    at com.xxx.consumer.KafkaStreamProcessor.process(KafkaStreamProcessor.java:37)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:115)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"xxx-376jt-1514353818187-b37be1c0_watcher_executor" #59 prio=5 os_prio=0 tid=0x00007fe48fe7c000 nid=0x4e waiting on condition [0x00007fe3934f5000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000782155248> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:544)

The comparison shows that even after the restart, the ConsumerFetcherThread threads I had suspected are still blocked in PartitionTopicInfo.enqueue. That state is therefore probably normal: a fetcher simply waits whenever its queue is full.
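The fetcher stacks can be reproduced with a bounded LinkedBlockingQueue that nobody drains. This sketch starts a stand-in "fetcher" that keeps calling put(), waits until it parks, and then inspects it through ThreadMXBean, which exposes the thread state and stack that jstack prints. The capacity of 2 and the thread name are arbitrary. In the real consumer, the bound is the per-stream chunk queue, controlled, to my understanding, by queued.max.message.chunks.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BlockedFetcherDemo {

    /** Starts a daemon stand-in "fetcher" that keeps putting chunks into the queue. */
    static Thread startFetcher(BlockingQueue<Integer> queue) {
        Thread fetcher = new Thread(() -> {
            try {
                for (int chunk = 0; ; chunk++) {
                    queue.put(chunk); // parks here once the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "fake-fetcher");
        fetcher.setDaemon(true);
        fetcher.start();
        return fetcher;
    }

    /** Spins until the fetcher is parked and the queue is full, as in the dumps. */
    static void awaitBlocked(Thread fetcher, BlockingQueue<Integer> queue) {
        while (fetcher.getState() != Thread.State.WAITING || queue.remainingCapacity() != 0) {
            Thread.yield();
        }
    }

    /** True if the thread's current stack contains LinkedBlockingQueue.put. */
    static boolean isParkedInPut(Thread t) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        ThreadInfo info = mx.getThreadInfo(t.getId(), Integer.MAX_VALUE);
        if (info == null) {
            return false; // thread already terminated
        }
        for (StackTraceElement frame : info.getStackTrace()) {
            if (frame.getClassName().equals(LinkedBlockingQueue.class.getName())
                    && frame.getMethodName().equals("put")) {
                return true;
            }
        }
        return false;
    }

    /** True if the JVM detects any deadlocked threads. */
    static boolean anyDeadlock() {
        return ManagementFactory.getThreadMXBean().findDeadlockedThreads() != null;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2); // arbitrary small bound
        Thread fetcher = startFetcher(queue);
        awaitBlocked(fetcher, queue);
        System.out.println("state=" + fetcher.getState()
                + " parkedInPut=" + isParkedInPut(fetcher)
                + " deadlock=" + anyDeadlock());
    }
}
```

It prints state=WAITING parkedInPut=true deadlock=false. A thread parked in put() on a full queue is waiting for space, not for a lock held by another thread, so the JVM does not report a deadlock. As soon as a consumer takes an element, the fetcher moves on.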

Comparing consume-1 and consume-2 across the two dumps revealed the problem. In the faulty process, their stacks contain no business method at all; they are idle in ThreadPoolExecutor.getTask(). After the restart, they are running KafkaStreamProcessor.process. The cause became clear: an exception in the business code had not been caught.

Business method

The original business method was roughly as follows:

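import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.springframework.scheduling.annotation.Async;

@Async
public void process(KafkaStream<byte[], byte[]> stream){
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    // The real business logic ran inside this loop (shown here as a println).
    // Nothing catches its exceptions: the first one exits the loop and the
    // method, and this stream is never read again.
    while (it.hasNext()) {
        System.out.println(Thread.currentThread().getName()+":"+new String(it.next().message()));
    }
}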

One question remained. If the @Async thread had died from the exception, the thread pool should have created a replacement thread with a higher number and a new id. Yet experiments showed that the thread id did not change after the exception was thrown.

spring-core-4.3.13.RELEASE-sources.jar!/org/springframework/util/CustomizableThreadCreator.java

public class CustomizableThreadCreator implements Serializable {
    private final AtomicInteger threadCount = new AtomicInteger(0);
    /**
     * Template method for the creation of a new {@link Thread}.
     * <p>The default implementation creates a new Thread for the given
     * {@link Runnable}, applying an appropriate thread name.
     * @param runnable the Runnable to execute
     * @see #nextThreadName()
     */
    public Thread createThread(Runnable runnable) {
        Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName());
        thread.setPriority(getThreadPriority());
        thread.setDaemon(isDaemon());
        return thread;
    }

    /**
     * Return the thread name to use for a newly created {@link Thread}.
     * <p>The default implementation returns the specified thread name prefix
     * with an increasing thread count appended: e.g. "SimpleAsyncTaskExecutor-0".
     * @see #getThreadNamePrefix()
     */
    protected String nextThreadName() {
        return getThreadNamePrefix() + this.threadCount.incrementAndGet();
    }
    //...
}

threadCount is only ever incremented, never decremented. So if a thread died abnormally, its replacement should get a higher number in its name: consume-3, for example, rather than reusing consume-1.

/Library/Java/JavaVirtualMachines/jdk1.8.0_71.jdk/Contents/Home/src.zip!/java/util/concurrent/ThreadPoolExecutor.java

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }    

Debugging showed that completedAbruptly is false, meaning no exception escaped the task into runWorker. Isn't that a contradiction? Then I remembered that Spring intercepts @Async methods, and everything fell into place.

AsyncExecutionInterceptor

spring-aop-4.3.13.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionInterceptor.java

    @Override
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }

        Callable<Object> task = new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    Object result = invocation.proceed();
                    if (result instanceof Future) {
                        return ((Future<?>) result).get();
                    }
                }
                catch (ExecutionException ex) {
                    handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
                }
                catch (Throwable ex) {
                    handleError(ex, userDeclaredMethod, invocation.getArguments());
                }
                return null;
            }
        };

        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }

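Methods annotated with @Async are intercepted by AsyncExecutionInterceptor. It wraps the invocation in a Callable that catches every Throwable and hands it to handleError, so the exception never reaches the thread pool. As a result, runWorker sees the task complete normally and completedAbruptly stays false. The worker thread survives with the same id and returns to the pool to wait for its next task. The consuming loop, however, has already exited, so nobody reads that KafkaStream any more. This matches the idle consume-1 and consume-2 stacks in the first dump.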
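The reasoning above can be checked without Spring using a small java.util.concurrent experiment. This is a sketch: the single-thread pool, the consume-N naming and the never-decremented counter are illustrative assumptions that mimic the thread names in the dumps. When a task throws through execute(), the exception escapes runWorker, so the worker dies and the pool creates a replacement with the next number. When the same task goes through submit(), FutureTask stores the exception in the Future, much as AsyncExecutionInterceptor's Callable catches it, so the same thread carries on.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class WorkerReplacementDemo {

    // Reports which thread runs a task.
    private static final Callable<String> WHO_RUNS = () -> Thread.currentThread().getName();

    // A failing task, standing in for the business code that threw.
    private static final Runnable FAILING = () -> {
        throw new IllegalStateException("boom");
    };

    /** One-thread pool naming threads consume-1, consume-2, ... from a counter that never goes down. */
    static ThreadPoolExecutor newPool() {
        AtomicInteger count = new AtomicInteger(0);
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "consume-" + count.incrementAndGet());
            t.setUncaughtExceptionHandler((thread, ex) -> { }); // keep the demo output quiet
            return t;
        };
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), factory);
    }

    /** execute(): the exception escapes runWorker, so the worker dies and is replaced. */
    static String threadAfterFailingExecute() throws Exception {
        ThreadPoolExecutor pool = newPool();
        try {
            pool.execute(FAILING);
            return pool.submit(WHO_RUNS).get();
        } finally {
            pool.shutdown();
        }
    }

    /** submit(): FutureTask catches the exception, so the same worker keeps running. */
    static String threadAfterFailingSubmit() throws Exception {
        ThreadPoolExecutor pool = newPool();
        try {
            try {
                pool.submit(FAILING).get();
            } catch (ExecutionException expected) {
                // The exception was stored in the Future; runWorker never saw it.
            }
            return pool.submit(WHO_RUNS).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("after execute(): " + threadAfterFailingExecute()); // consume-2
        System.out.println("after submit():  " + threadAfterFailingSubmit());  // consume-1
    }
}
```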

Summary

  • When consuming data from Kafka, monitor the offset lag continuously to confirm that consumption keeps up.
  • The thread that iterates over a KafkaStream must catch exceptions; otherwise the first exception ends the loop and consumption stops.
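Applied to the process() method shown earlier, the second point means putting the try/catch inside the while loop, around the handling of each message. This sketch uses a plain Iterator<String> in place of ConsumerIterator so that it runs without Kafka. The handler and the "bad" messages are hypothetical. Only RuntimeException is caught, so an Error would still end the loop. Whether a failed message is logged, retried or parked elsewhere is a business decision.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Consumer;

class SafeConsumeLoop {

    /** Shape of the original process(): the first exception ends the loop. */
    static int consumeUnsafe(Iterator<String> it, Consumer<String> handler) {
        int handled = 0;
        while (it.hasNext()) {
            handler.accept(it.next());
            handled++;
        }
        return handled;
    }

    /** Catches per message, so one bad message does not stop the stream. */
    static int consumeSafely(Iterator<String> it, Consumer<String> handler) {
        int handled = 0;
        while (it.hasNext()) {
            String message = it.next();
            try {
                handler.accept(message);
                handled++;
            } catch (RuntimeException e) {
                // Placeholder: log, retry or park the message as the business requires.
                System.err.println("failed to process " + message + ": " + e.getMessage());
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        // Hypothetical business logic that rejects some messages.
        Consumer<String> handler = m -> {
            if (m.startsWith("bad")) {
                throw new IllegalArgumentException("cannot parse");
            }
            System.out.println(Thread.currentThread().getName() + ":" + m);
        };

        Iterator<String> unsafe = Arrays.asList("m1", "bad", "m3").iterator();
        try {
            consumeUnsafe(unsafe, handler);
        } catch (IllegalArgumentException e) {
            System.out.println("unsafe loop stopped, messages left unread: " + unsafe.hasNext());
        }

        Iterator<String> safe = Arrays.asList("m1", "bad", "m3").iterator();
        System.out.println("safe loop handled " + consumeSafely(safe, handler) + " of 3 messages");
    }
}
```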
