[Case 7] Flux OOM example

Preface

This article examines a scenario in which Flux produces an OutOfMemoryError (OOM).

FluxSink.OverflowStrategy

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxSink.java

    /**
     * Enumeration for backpressure handling.
     */
    enum OverflowStrategy {
        /**
         * Completely ignore downstream backpressure requests.
         * <p>
         * This may yield {@link IllegalStateException} when queues get full downstream.
         */
        IGNORE,
        /**
         * Signal an {@link IllegalStateException} when the downstream can't keep up
         */
        ERROR,
        /**
         * Drop the incoming signal if the downstream is not ready to receive it.
         */
        DROP,
        /**
         * Downstream will get only the latest signals from upstream.
         */
        LATEST,
        /**
         * Buffer all signals if the downstream can't keep up.
         * <p>
         * Warning! This does unbounded buffering and may lead to {@link OutOfMemoryError}.
         */
        BUFFER
    }

As the Javadoc states, BUFFER buffers into an unbounded queue, so it can end in an OutOfMemoryError when the downstream cannot keep up.
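To make the difference in memory behavior concrete, here is a minimal plain-JDK sketch of what each strategy retains when a producer emits more items than the downstream has requested. It is not Reactor's implementation: the class `OverflowSketch`, the method `pendingAfter`, and the parameters `emitted` and `demand` are hypothetical names chosen for illustration, and IGNORE is left out because its effect depends on the downstream operator.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK illustration only; this is NOT how Reactor implements the strategies.
final class OverflowSketch {

    enum Strategy { ERROR, DROP, LATEST, BUFFER }

    /**
     * Emits the values 1..emitted to a downstream that has requested only
     * `demand` items, and returns what is still held in memory afterwards.
     */
    static List<Integer> pendingAfter(Strategy strategy, int emitted, int demand) {
        List<Integer> pending = new ArrayList<>();
        for (int value = 1; value <= emitted; value++) {
            if (value <= demand) {
                continue; // requested by downstream, handed over immediately
            }
            switch (strategy) {
                case ERROR:
                    throw new IllegalStateException("downstream can't keep up at value " + value);
                case DROP:
                    break; // the value is discarded
                case LATEST:
                    pending.clear(); // only the most recent value is kept
                    pending.add(value);
                    break;
                case BUFFER:
                    pending.add(value); // grows without any bound
                    break;
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        System.out.println(pendingAfter(Strategy.BUFFER, 1000, 3).size()); // prints 997
        System.out.println(pendingAfter(Strategy.LATEST, 1000, 3));        // prints [1000]
        System.out.println(pendingAfter(Strategy.DROP, 1000, 3).size());   // prints 0
    }
}
```

Only the BUFFER branch grows with the number of emitted items, which is why it is the one strategy whose Javadoc carries the OutOfMemoryError warning.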

Example

    @Test
    public void testFluxOOM() throws InterruptedException {
        final Flux<Integer> flux = Flux.<Integer> create(fluxSink -> {
            //NOTE sink:class reactor.core.publisher.FluxCreate$SerializedSink
            LOGGER.info("sink:{}",fluxSink.getClass());
            while (true) {
                fluxSink.next(ThreadLocalRandom.current().nextInt());
            }
        }, FluxSink.OverflowStrategy.BUFFER)
                .publishOn(Schedulers.elastic(),Integer.MAX_VALUE); //NOTE test OOM

        //NOTE flux:class reactor.core.publisher.FluxCreate,prefetch:-1
        LOGGER.info("flux:{},prefetch:{}",flux.getClass(),flux.getPrefetch());

        flux.subscribe(e -> {
            LOGGER.info("subscribe:{}",e);
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        });

        TimeUnit.MINUTES.sleep(20);
    }

JVM parameters

-Xmx2160K -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -XX:+PrintGCDetails -Xloggc:/tmp/gc.log
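With a heap limit this small, it can be worth confirming what limit the JVM actually applies, since the effective value can differ from the requested one. A minimal sketch follows; the class name `HeapLimitCheck` is hypothetical, and only the standard `Runtime.maxMemory()` call is used.

```java
// Prints the maximum heap size the running JVM will attempt to use.
final class HeapLimitCheck {

    /** Returns the JVM's maximum heap size in kilobytes. */
    static long maxHeapKilobytes() {
        return Runtime.getRuntime().maxMemory() / 1024;
    }

    public static void main(String[] args) {
        System.out.println("max heap (KB): " + maxHeapKilobytes());
    }
}
```

Running it with the same parameters as the test shows the limit the example is really working against.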

Note that publishOn is used here with its prefetch parameter set to Integer.MAX_VALUE (the default is 256) in order to reproduce the OOM caused by the unbounded queue.

Output

java.lang.OutOfMemoryError: GC overhead limit exceeded
Dumping heap to /tmp/java_pid5295.hprof ...
Heap dump file created [6410067 bytes in 0.149 secs]
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.jar.Manifest$FastInputStream.<init>(Manifest.java:332)
    at java.util.jar.Manifest$FastInputStream.<init>(Manifest.java:327)
    at java.util.jar.Manifest.read(Manifest.java:195)
    at java.util.jar.Manifest.<init>(Manifest.java:69)
    at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
    at java.util.jar.JarFile.getManifest(JarFile.java:180)
    at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main"

Process finished with exit code 137

Heap dump

MAT analysis shows that reactor.util.concurrent.SpscLinkedArrayQueue retains a large amount of unreleased data, and that the queue is held by FluxCreate$BufferAsyncSink:

    static final class BufferAsyncSink<T> extends BaseSink<T> {

        final Queue<T> queue;

        Throwable error;
        volatile boolean done;

        volatile int wip;
        @SuppressWarnings("rawtypes")
        static final AtomicIntegerFieldUpdater<BufferAsyncSink> WIP =
                AtomicIntegerFieldUpdater.newUpdater(BufferAsyncSink.class, "wip");

        BufferAsyncSink(CoreSubscriber<? super T> actual, int capacityHint) {
            super(actual);
            this.queue = Queues.<T>unbounded(capacityHint).get();
        }
        //......
    }    

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/util/concurrent/Queues.java

    /**
     * Returns an unbounded, linked-array-based Queue. Integer.max sized link will
     * return the default {@link #SMALL_BUFFER_SIZE} size.
     * @param linkSize the link size
     * @param <T> the reified {@link Queue} generic type
     * @return an unbounded {@link Queue} {@link Supplier}
     */
    @SuppressWarnings("unchecked")
    public static <T> Supplier<Queue<T>> unbounded(int linkSize) {
        if (linkSize == XS_BUFFER_SIZE) {
            return XS_UNBOUNDED;
        }
        else if (linkSize == Integer.MAX_VALUE || linkSize == SMALL_BUFFER_SIZE) {
            return unbounded();
        }
        return  () -> new SpscLinkedArrayQueue<>(linkSize);
    }

As the code shows, the unbounded method of Queues returns a supplier of SpscLinkedArrayQueue, an unbounded queue, and this queue is what buffers the data.
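The idea behind a linked-array queue can be sketched as follows: fixed-size array chunks are chained together, so `offer` never rejects an element and memory grows by one chunk of `linkSize` slots at a time. This is a simplified, single-threaded illustration under that assumption, not Reactor's lock-free SpscLinkedArrayQueue; the class `LinkedArrayQueueSketch` and its `chunkCount` method are hypothetical.

```java
// Simplified, single-threaded illustration; NOT Reactor's SpscLinkedArrayQueue.
final class LinkedArrayQueueSketch<T> {

    /** One fixed-size array segment plus a link to the next segment. */
    private static final class Chunk {
        final Object[] slots;
        Chunk next;

        Chunk(int size) {
            slots = new Object[size];
        }
    }

    private final int linkSize;
    private Chunk head;         // chunk currently being read
    private Chunk tail;         // chunk currently being written
    private int readIndex;      // next slot to read in head
    private int writeIndex;     // next slot to write in tail
    private int chunkCount = 1; // number of chunks currently linked
    private int size;

    LinkedArrayQueueSketch(int linkSize) {
        if (linkSize < 1) {
            throw new IllegalArgumentException("linkSize must be positive");
        }
        this.linkSize = linkSize;
        this.head = this.tail = new Chunk(linkSize);
    }

    /** Always succeeds: when the tail chunk is full, a new chunk is linked in. */
    boolean offer(T value) {
        if (writeIndex == linkSize) {
            Chunk fresh = new Chunk(linkSize);
            tail.next = fresh;
            tail = fresh;
            writeIndex = 0;
            chunkCount++;
        }
        tail.slots[writeIndex++] = value;
        size++;
        return true;
    }

    /** Returns the oldest element, or null when the queue is empty. */
    @SuppressWarnings("unchecked")
    T poll() {
        if (size == 0) {
            return null;
        }
        if (readIndex == linkSize) {
            head = head.next; // the fully consumed chunk becomes garbage
            readIndex = 0;
            chunkCount--;
        }
        T value = (T) head.slots[readIndex];
        head.slots[readIndex++] = null; // clear the slot so the element can be collected
        size--;
        return value;
    }

    int size() {
        return size;
    }

    int chunkCount() {
        return chunkCount;
    }
}
```

Because nothing in `offer` ever refuses an element, a producer that outpaces the consumer keeps adding chunks until the heap is exhausted, which matches what the heap dump shows.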

Summary

When using Flux, keep the OOM risk in mind. Reactor's library takes considerable care to avoid the problem, and API calls in ordinary scenarios appear to be safe. Take extra care when customizing parameters yourself: this example deliberately sets prefetch to Integer.MAX_VALUE in publishOn in order to cause the OOM.
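For contrast with the unbounded case, the following plain-JDK sketch shows the general effect of a bounded buffer: a slow consumer slows the producer down instead of growing the heap. It is not Reactor code and does not model publishOn; the class `BoundedHandoffSketch`, the method `maxQueueSize`, and the chosen capacity are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Plain-JDK illustration of a bounded hand-off; this is not Reactor code.
final class BoundedHandoffSketch {

    /**
     * Pushes `total` integers through a queue of the given capacity to a slower
     * consumer thread and returns the largest queue size the producer observed.
     */
    static int maxQueueSize(int total, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    queue.take();
                    if (i % 100 == 0) {
                        Thread.sleep(1); // simulate a subscriber slower than the producer
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        int maxObserved = 0;
        try {
            for (int i = 0; i < total; i++) {
                queue.put(i); // blocks while the queue is full, throttling the producer
                maxObserved = Math.max(maxObserved, queue.size());
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while handing off items", e);
        }
        return maxObserved;
    }

    public static void main(String[] args) {
        System.out.println("largest queue size seen: " + maxQueueSize(10_000, 8));
    }
}
```

However many items are produced, the number held in memory at any moment never exceeds the capacity, which is the property the Integer.MAX_VALUE prefetch in the example gives up.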