Category : reactor

Preface
This article mainly studies the difference between map and flatMap in Flux.
map
    @Test
    public void testMap() throws InterruptedException {
        Flux.just(1, 2, 3, 4)
            .log()
            .map(i -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return i * 2;
            })
            .subscribe(e -> LOGGER.info("get:{}", e));
    }
Here the map of the header is a pure ..


Preface
This article mainly studies the retry support in reactor-extra.
maven
    <dependency>
        <groupId>io.projectreactor.addons</groupId>
        <artifactId>reactor-extra</artifactId>
        <version>3.1.4.RELEASE</version>
    </dependency>
Example
    TcpClient client = TcpClient.create("localhost", 8888);
    client.newHandler((inbound, outbound) -> {
        return outbound.sendString(Mono.just("Hello World!"))
                .then(inbound.receive().asString().next().log().then());
    }).doOnError(e -> e.printStackTrace())
      .subscribe();
The TcpClient above reports an error immediately when the server is not started.
    io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8888
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at ..


Preface
This article mainly studies how to use WebFlux to improve the efficiency of data export.
Traditional export
    @GetMapping("/download-old")
    public ResponseEntity<Resource> downloadInOldWays() {
        return ResponseEntity.ok()
                .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=demo.xls")
                .header("Accept-Ranges", "bytes")
                .body(new ByteArrayResource(exportBytes(1000)));
    }
    public byte[] exportBytes(int dataRow) {
        StringBuilder output = new StringBuilder();
        output.append(ExcelUtil.startWorkbook());
        output.append(ExcelUtil.startSheet());
        output.append(ExcelUtil.startTable());
        output.append(ExcelUtil.writeTitleRow(Sets.newHashSet("title", "content")));
        IntStream.rangeClosed(1, dataRow).forEach(i -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) ..


Preface
This article mainly studies the mechanism of FluxInterval.
FluxInterval
reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxInterval.java
    /**
     * Periodically emits an ever increasing long value either via a ScheduledExecutorService
     * or a custom async callback function
     * @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
     */
    final class FluxInterval extends Flux<Long> {
        final Scheduler timedScheduler;
        final long initialDelay;
        final long period;
        final TimeUnit unit;
        ..


Preface
This article mainly studies the mechanism of FluxSink.
FluxSink
reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxSink.java
    /**
     * Wrapper API around a downstream Subscriber for emitting any number of
     * next signals followed by zero or one onError/onComplete.
     * <p>
     * @param <T> the value type
     */
    public interface FluxSink<T> {
        /**
         * @see Subscriber#onComplete()
         */
        void complete();
        /** ..


Preface
This article mainly studies how the WebClient of Spring 5 encapsulates the HttpClient of reactor-netty.
DefaultWebClientBuilder
spring-webflux-5.0.2.RELEASE-sources.jar!/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java
    @Override
    public WebClient build() {
        ExchangeFunction exchange = initExchangeFunction();
        ExchangeFunction filteredExchange = (this.filters != null ? this.filters.stream()
                .reduce(ExchangeFilterFunction::andThen)
                .map(filter -> filter.apply(exchange))
                .orElse(exchange) : exchange);
        return new DefaultWebClient(filteredExchange, initUriBuilderFactory(),
                unmodifiableCopy(this.defaultHeaders),
                unmodifiableCopy(this.defaultCookies),
                new DefaultWebClientBuilder(this));
    }
The build here ..


Preface
This article mainly studies the scenarios in which Flux can cause an OOM.
FluxSink.OverflowStrategy
reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxSink.java
    /**
     * Enumeration for backpressure handling.
     */
    enum OverflowStrategy {
        /**
         * Completely ignore downstream backpressure requests.
         * <p>
         * This may yield {@link IllegalStateException} when queues get full downstream.
         */
        IGNORE,
        /**
         * Signal an {@link IllegalStateException} when the downstream ..


Preface
This article mainly studies how HttpClient encapsulates TcpClient in reactor-netty.
maven
    <dependency>
        <groupId>io.projectreactor.ipc</groupId>
        <artifactId>reactor-netty</artifactId>
        <version>0.7.3.RELEASE</version>
    </dependency>
Example
    HttpClient client = HttpClient.create();
    Mono<HttpClientResponse> mono = client.get("http://baidu.com");
    //NOTE reactor.ipc.netty.http.client.MonoHttpClientResponse
    LOGGER.info("mono resp:{}", mono.getClass());
    mono.subscribe();
HttpClient.request
reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/http/client/HttpClient.java
    /**
     * Use the passed HTTP method to send to the given URL. When connection has been made,
     * ..


Preface
This article mainly studies the process of creating a TcpClient in reactor-netty.
maven
    <dependency>
        <groupId>io.projectreactor.ipc</groupId>
        <artifactId>reactor-netty</artifactId>
        <version>0.7.3.RELEASE</version>
    </dependency>
TcpClient
reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/tcp/TcpClient.java
    protected TcpClient(TcpClient.Builder builder) {
        ClientOptions.Builder<?> clientOptionsBuilder = ClientOptions.builder();
        if (Objects.nonNull(builder.options)) {
            builder.options.accept(clientOptionsBuilder);
        }
        if (!clientOptionsBuilder.isLoopAvailable()) {
            clientOptionsBuilder.loopResources(TcpResources.get());
        }
        if (!clientOptionsBuilder.isPoolAvailable() && !clientOptionsBuilder.isPoolDisabled()) {
            clientOptionsBuilder.poolResources(TcpResources.get());
        }
        this.options = clientOptionsBuilder.build();
    }
LoopResources and poolResources were actually ..


Preface
This article mainly studies the newHandler process of TcpClient in reactor-netty.
maven
    <dependency>
        <groupId>io.projectreactor.ipc</groupId>
        <artifactId>reactor-netty</artifactId>
        <version>0.7.3.RELEASE</version>
    </dependency>
TcpClient.newHandler
reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/tcp/TcpClient.java
    /**
     * @param handler
     * @param address
     * @param secure
     * @param onSetup
     *
     * @return a new Mono to connect on subscribe
     */
    protected Mono<NettyContext> newHandler(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends ..


Preface
This article mainly studies how the TcpClient of reactor-netty submits tasks to the eventLoop.
Example
    TcpClient client = TcpClient.create("localhost", 8888);
    LOGGER.info("client:{}", client.getClass());
    Mono<? extends NettyContext> handler = client.newHandler((inbound, outbound) -> {
        return outbound.sendString(Mono.just("Hello World!"))
                .then(inbound.receive().asString().next().log().then());
    });
    LOGGER.info("handler:{}", handler.getClass());
    //NOTE reactor.core.publisher.MonoCreate
    handler.subscribe();
TcpClient.newHandler
reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/tcp/TcpClient.java
    protected Mono<NettyContext> newHandler(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler,
            InetSocketAddress address, ..


Preface
This article mainly studies how to pass variables across asynchronous threads in Reactor.
The problem with ThreadLocal
In the traditional synchronous request/response model, it is very convenient to use ThreadLocal to pass context variables, which saves adding common variables, such as the currently logged-in user, to every method's parameter list. However, the business method may use async ..
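The ThreadLocal limitation described in this excerpt can be illustrated with a minimal, JDK-only sketch. It does not use Reactor and is not taken from the original article: the class name ThreadLocalDemo, the CURRENT_USER holder, and the value "alice" are hypothetical, and a plain single-thread executor stands in for whatever asynchronous mechanism the business method uses. It only shows that a value set on the calling thread is not visible on a worker thread unless it is captured and handed over explicitly.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalDemo {
    // Hypothetical context holder, e.g. for the currently logged-in user.
    static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    // Reads the ThreadLocal on a worker thread. The worker has its own
    // (empty) slot, so the value set by the caller is not visible there.
    static String readFromWorker() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = CURRENT_USER::get;
            return pool.submit(task).get();
        } finally {
            pool.shutdown();
        }
    }

    // Captures the value on the calling thread first and passes it to the
    // worker explicitly, which is what a context-passing mechanism must do.
    static String readFromWorkerWithCapture() throws Exception {
        final String captured = CURRENT_USER.get();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> captured;
            return pool.submit(task).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        CURRENT_USER.set("alice");
        System.out.println("same thread: " + CURRENT_USER.get());
        System.out.println("worker thread: " + readFromWorker());
        System.out.println("captured: " + readFromWorkerWithCapture());
        CURRENT_USER.remove();
    }
}
```

Running main prints "same thread: alice", then "worker thread: null", then "captured: alice". The explicit capture in the second method is only one possible workaround, shown here to make the problem concrete.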
