Category : reactive-programming

Order This article mainly studies the reactive streams and observer mode in java. reactive streams Reactive programming paradigm is an asynchronous programming paradigm, which mainly involves data flow and propagation of changes. It can be seen as an extension of observer design pattern. Iterator in java is based on pull model, that is, subscribers use ..

Read more

Order This article mainly talks about two abstract classes Mono and Flux of Publisher interface of reactive streams. Publisher reactive-streams-1.0.1-sources.jar! /org/reactivestreams/Publisher.java /** * A {@link Publisher} is a provider of a potentially unbounded number of sequenced elements, publishing them according to * the demand received from its {@link Subscriber}(s). * <p> * A {@link Publisher} ..

Read more

Order This article mainly studies the doOn method of reactive streams Publisher. DoOn series method Let’s take Flux as an example.reactor-core-3.1.2.RELEASE-sources.jar! /reactor/core/publisher/Flux.java doOnSubscribe /** * Add behavior (side-effect) triggered when the {@link Flux} is subscribed. * <p> * <img class=”marble” src=”https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/doonsubscribe.png” alt=””> * <p> * @param onSubscribe the callback to call on {@link Subscriber#onSubscribe} * ..

Read more

Order This paper mainly studies the backpressure of reactive streams. The difference between reactive streams and traditional streams @Test public void testShowReactiveStreams() throws InterruptedException { Flux.interval(Duration.ofMillis(1000)) .take(500) .subscribe(e -> LOGGER.info(“get {}”,e)); Thread.sleep(5*60*1000); } Examples of output are as follows: 18:52:34.118 [main] DEBUG reactor.util.Loggers$LoggerFactory – Using Slf4j logging framework 18:52:35.157 [parallel-2] INFO com.example.demo.FluxTest – get 0 ..

Read more

Order This article mainly studies the schedulers of reactive streams. Background By default, Mono and Flux both run on the main thread, sometimes blocking the main thread. You can set schedulers to run on other threads. Original output When publishOn and subscribeOn are not used, the output is as follows 11:26:10.668 [main] DEBUG reactor.util.Loggers$LoggerFactory – ..

Read more

Order This paper mainly studies the parallel operation mode of flux of reactive streams. Purpose In some IO operations, such as reading files and accessing databases, asynchronous threads are usually recommended to run in parallel mode to improve performance. Example @Test public void testParallelRunOn(){ Flux.range(1, 1000) .log() .parallel(8) .runOn(Schedulers.parallel()) //parallel flux .sequential() //必须使用sequential来将这些异步线程的执行结果汇集成一个stream .map(e -> ..

Read more

Order This article mainly studies processors of reactive streams. Processors classification Processors are both Publisher and Subscriber. Processor have many implementations in project reactor, and their classification is roughly as follows: direct(DirectProcessor and UnicastProcessor) synchronous(EmitterProcessor and ReplayProcessor) asynchronous(TopicProcessor and WorkQueueProcessor) direct DirectProcessor It does not support the backpressure feature. if publisher publishes n data and ..

Read more

Order This article mainly shows some transform operations of reactive streams. mergeWith @Test public void testMerge(){ Flux<String> flux1 = Flux.interval(Duration.ofSeconds(1)) .take(3) .map(e -> “[flux1]:”+e); Flux<String> mergeFlux = Flux.interval(Duration.ofSeconds(1)) .delayElements(Duration.ofSeconds(1)) .take(3) .map(e -> “[flux2]:”+e) .mergeWith(flux1); mergeFlux.subscribe(e -> { LOGGER.info(“subscribe:{}”,e); }); mergeFlux.blockLast(); } Output instance 21:18:07.583 [main] DEBUG reactor.util.Loggers$LoggerFactory – Using Slf4j logging framework 21:18:08.618 [parallel-2] INFO ..

Read more