FluxInterval: examples and analysis

Preface

This article examines how FluxInterval works.

FluxInterval

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxInterval.java

/**
 * Periodically emits an ever increasing long value either via a ScheduledExecutorService
 * or a custom async callback function
 * @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
 */
final class FluxInterval extends Flux<Long> {

    final Scheduler timedScheduler;
    
    final long initialDelay;
    
    final long period;
    
    final TimeUnit unit;

    FluxInterval(
            long initialDelay, 
            long period, 
            TimeUnit unit, 
            Scheduler timedScheduler) {
        if (period < 0L) {
            throw new IllegalArgumentException("period >= 0 required but it was " + period);
        }
        this.initialDelay = initialDelay;
        this.period = period;
        this.unit = Objects.requireNonNull(unit, "unit");
        this.timedScheduler = Objects.requireNonNull(timedScheduler, "timedScheduler");
    }
    
    @Override
    public void subscribe(CoreSubscriber<? super Long> actual) {
        Worker w = timedScheduler.createWorker();

        IntervalRunnable r = new IntervalRunnable(actual, w);

        actual.onSubscribe(r);

        try {
            w.schedulePeriodically(r, initialDelay, period, unit);
        }
        catch (RejectedExecutionException ree) {
            if (!r.cancelled) {
                actual.onError(Operators.onRejectedExecution(ree, r, null, null,
                        actual.currentContext()));
            }
        }
    }
}    

As the code shows, subscribe creates a Worker from the Scheduler, wraps the subscriber in an IntervalRunnable, and schedules that task periodically on the worker.

IntervalRunnable

    static final class IntervalRunnable implements Runnable, Subscription,
                                                   InnerProducer<Long> {
        final CoreSubscriber<? super Long> actual;
        
        final Worker worker;
        
        volatile long requested;
        static final AtomicLongFieldUpdater<IntervalRunnable> REQUESTED =
                AtomicLongFieldUpdater.newUpdater(IntervalRunnable.class, "requested");
        
        long count;
        
        volatile boolean cancelled;

        IntervalRunnable(CoreSubscriber<? super Long> actual, Worker worker) {
            this.actual = actual;
            this.worker = worker;
        }

        @Override
        public CoreSubscriber<? super Long> actual() {
            return actual;
        }

        @Override
        @Nullable
        public Object scanUnsafe(Attr key) {
            if (key == Attr.CANCELLED) return cancelled;

            return InnerProducer.super.scanUnsafe(key);
        }

        @Override
        public void run() {
            if (!cancelled) {
                if (requested != 0L) {
                    actual.onNext(count++);
                    if (requested != Long.MAX_VALUE) {
                        REQUESTED.decrementAndGet(this);
                    }
                } else {
                    cancel();
                    
                    actual.onError(Exceptions.failWithOverflow("Could not emit tick " + count + " due to lack of requests" +
                            " (interval doesn't support small downstream requests that replenish slower than the ticks)"));
                }
            }
        }
        
        @Override
        public void request(long n) {
            if (Operators.validate(n)) {
                Operators.addCap(REQUESTED, this, n);
            }
        }
        
        @Override
        public void cancel() {
            if (!cancelled) {
                cancelled = true;
                worker.dispose();
            }
        }
    }

The key here is the requested field. Each time run executes, it checks requested. If requested is 0, the task cancels itself (which disposes the worker) and signals an OverflowException through onError. Otherwise it emits the next value and decrements requested by one, unless requested is Long.MAX_VALUE, which means unbounded demand.
Each call to request(n) from the subscriber adds n to requested, capped at Long.MAX_VALUE.
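
The bookkeeping described above can be replayed without Reactor. The following is a minimal sketch using only JDK classes; the class RequestedModel and its method names are made up for illustration, and the real IntervalRunnable additionally disposes its worker and talks to a CoreSubscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Simplified model of IntervalRunnable's demand bookkeeping (illustrative, not Reactor code). */
public class RequestedModel {
    final AtomicLong requested = new AtomicLong();
    final List<String> signals = new ArrayList<>();
    long count;
    boolean cancelled;

    /** Mirrors request(n): add demand, capping at Long.MAX_VALUE on overflow. */
    void request(long n) {
        requested.accumulateAndGet(n, (cur, add) -> {
            long sum = cur + add;
            return sum < 0L ? Long.MAX_VALUE : sum;
        });
    }

    /** Mirrors run(): one execution of the periodic task. */
    void tick() {
        if (cancelled) {
            return;
        }
        if (requested.get() != 0L) {
            signals.add("onNext(" + count++ + ")");
            if (requested.get() != Long.MAX_VALUE) {
                requested.decrementAndGet();
            }
        } else {
            cancelled = true; // the real code also disposes the worker here
            signals.add("onError(overflow at tick " + count + ")");
        }
    }

    /** Requests `demand` elements once, then runs `ticks` executions of the task. */
    public static List<String> simulate(long demand, int ticks) {
        RequestedModel m = new RequestedModel();
        m.request(demand);
        for (int i = 0; i < ticks; i++) {
            m.tick();
        }
        return m.signals;
    }

    public static void main(String[] args) {
        // prints [onNext(0), onNext(1), onError(overflow at tick 2)]
        System.out.println(simulate(2, 5));
    }
}
```

With a demand of 2 and five ticks, the third tick finds requested at 0 and fails; the remaining ticks are ignored because the model is already cancelled.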

Example 1

    public static void main(String[] args) throws InterruptedException {
        Flux<Long> flux = Flux.interval(Duration.ofMillis(1))
                .doOnNext(e -> {
                    System.out.println(e);
                }).doOnError(e -> e.printStackTrace());

        System.out.println("begin to subscribe");
        flux.subscribe(e -> {
            System.out.println(e);
            try {
                TimeUnit.MINUTES.sleep(30);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        });
        TimeUnit.MINUTES.sleep(30);
    }

In this example requested is Long.MAX_VALUE, so demand never runs out. However, the subscriber's callback runs on the same thread that drives interval, so the sleep blocks that thread and no further ticks are emitted while it sleeps.
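
The same effect can be reproduced with a plain ScheduledExecutorService, which is what ultimately runs the periodic task in the stack trace shown later. This is a sketch under stated assumptions: BlockedTickDemo and its latches are hypothetical names, and a CountDownLatch stands in for the subscriber's 30-minute sleep so the demo finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Shows that a periodic task which blocks its own thread prevents later ticks from running. */
public class BlockedTickDemo {

    /** Returns how many ticks had started while the first tick was still blocked. */
    public static long ticksWhileBlocked() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicLong ticks = new AtomicLong();
        CountDownLatch firstTickStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            timer.scheduleAtFixedRate(() -> {
                ticks.incrementAndGet();
                firstTickStarted.countDown();
                try {
                    release.await(); // stands in for the subscriber's sleep
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, 0, 1, TimeUnit.MILLISECONDS);

            firstTickStarted.await();
            TimeUnit.MILLISECONDS.sleep(100); // about 100 periods elapse while the task is blocked
            return ticks.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            release.countDown();
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // prints "ticks while blocked: 1"
        System.out.println("ticks while blocked: " + ticksWhileBlocked());
    }
}
```

Although roughly 100 periods pass, only one tick runs, because a periodic task never executes concurrently with itself.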

Example 2

    public static void main(String[] args) throws InterruptedException {
        Flux<Long> flux = Flux.interval(Duration.ofMillis(1))
                .doOnNext(e -> {
                    System.out.println(e);
                })
                // NOTE: publishOn requests prefetch = 256 elements from upstream
                .publishOn(Schedulers.newElastic("publish-thread"))
                .doOnError(e -> e.printStackTrace());

        System.out.println("begin to subscribe");
        AtomicInteger count = new AtomicInteger(0);
        // NOTE: request is only triggered once there is a subscriber
        flux.subscribe(e -> {
            LOGGER.info("receive:{}",e);
            try {
                // NOTE: publishOn separates the subscriber thread from the interval thread
                if(count.get() == 0){
                    TimeUnit.MINUTES.sleep(2);
                }
                count.incrementAndGet();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        });
        TimeUnit.MINUTES.sleep(30);
    }

publishOn isolates the subscriber thread from the interval thread, so the subscriber's sleep no longer blocks interval.
publishOn also takes an implicit prefetch parameter, which defaults to Queues.SMALL_BUFFER_SIZE, i.e. Math.max(16, Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256"))).
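
To make the default concrete, here is a small JDK-only sketch of the expression Math.max(16, Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256"))). The class SmallBufferSize and the helper resolve are illustrative names, not part of Reactor; resolve takes the configured value as a parameter so the floor of 16 can be shown without touching system properties.

```java
/** Evaluates the default-prefetch expression outside Reactor (illustrative helper names). */
public class SmallBufferSize {

    /** Applies the same rule to an explicit setting; null means the property is unset. */
    public static int resolve(String configured) {
        return Math.max(16, Integer.parseInt(configured == null ? "256" : configured));
    }

    /** Reads the real system property of the current JVM. */
    public static int fromSystemProperty() {
        return resolve(System.getProperty("reactor.bufferSize.small"));
    }

    public static void main(String[] args) {
        System.out.println(resolve(null));   // prints 256
        System.out.println(resolve("8"));    // prints 16, values below 16 are raised to 16
        System.out.println(resolve("1024")); // prints 1024
    }
}
```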

    public final Flux<T> publishOn(Scheduler scheduler) {
        return publishOn(scheduler, Queues.SMALL_BUFFER_SIZE);
    }

    final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch, int lowTide) {
        if (this instanceof Callable) {
            if (this instanceof Fuseable.ScalarCallable) {
                @SuppressWarnings("unchecked")
                Fuseable.ScalarCallable<T> s = (Fuseable.ScalarCallable<T>) this;
                try {
                    return onAssembly(new FluxSubscribeOnValue<>(s.call(), scheduler));
                }
                catch (Exception e) {
                    //leave FluxSubscribeOnCallable defer exception call
                }
            }
            @SuppressWarnings("unchecked")
            Callable<T> c = (Callable<T>)this;
            return onAssembly(new FluxSubscribeOnCallable<>(c, scheduler));
        }

        return onAssembly(new FluxPublishOn<>(this, scheduler, delayError, prefetch, lowTide, Queues.get(prefetch)));
    }

Here Queues.get(prefetch) supplies the intermediate queue that buffers elements between the interval thread and the subscriber thread.

The final output of this example:

//......
21:06:03.108 [publish-thread-2] INFO com.example.demo.FluxTest - receive:254
21:06:03.108 [publish-thread-2] INFO com.example.demo.FluxTest - receive:255
reactor.core.Exceptions$OverflowException: Could not emit tick 256 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
    at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:121)
    at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
    at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

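publishOn's initial request is 256, the default prefetch. Because the subscriber sleeps for two minutes on the first element, that demand is not replenished in time: after emitting 256 elements (0 to 255), interval finds requested at 0 on tick 256, cancels its worker, and signals an OverflowException. Once the subscriber resumes, it consumes the 256 buffered elements, and the OverflowException follows immediately after.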
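
The sequence of events can be replayed sequentially, without threads or Reactor. This is a deliberately simplified model: PrefetchOverflowModel and replay are hypothetical names, the ArrayDeque stands in for the publishOn queue, and the two phases run one after the other rather than concurrently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Sequential replay of Example 2: producer ticks while the consumer is stalled, then the consumer drains. */
public class PrefetchOverflowModel {

    public static List<String> replay(int prefetch, int ticksWhileStalled) {
        Queue<Long> buffer = new ArrayDeque<>(); // stands in for the publishOn queue
        List<String> log = new ArrayList<>();
        long requested = prefetch;               // the single initial request
        long count = 0;
        String error = null;

        // Phase 1: interval keeps ticking; nothing is consumed, so demand is never replenished.
        for (int i = 0; i < ticksWhileStalled && error == null; i++) {
            if (requested != 0) {
                buffer.add(count++);
                requested--;
            } else {
                error = "overflow at tick " + count; // interval cancels itself at this point
            }
        }

        // Phase 2: the subscriber wakes up, drains the buffered elements, then sees the error.
        Long v;
        while ((v = buffer.poll()) != null) {
            log.add("receive:" + v);
        }
        if (error != null) {
            log.add(error);
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> log = replay(256, 1000);
        System.out.println(log.size());              // prints 257
        System.out.println(log.get(log.size() - 2)); // prints receive:255
        System.out.println(log.get(log.size() - 1)); // prints overflow at tick 256
    }
}
```

The last two lines of the model's log line up with the end of the real output: receive:255 followed by the overflow on tick 256.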

Summary

Reactor itself is not tied to any threading model: most operators run on whatever thread delivers the signal, and only time-based operators such as interval and delayElements bring in a scheduler thread by default. Reactor extends the observer design pattern with a push + backpressure model. Calling subscribe triggers request, which asks the publisher to start pushing data; the publisher then pushes data through onNext until it completes or is cancelled. In Example 1, the blocked thread stalls interval's onNext. In Example 2, interval cancels itself when demand runs out, which terminates the flux with an error.