Returning application/stream+json from Spring WebFlux


Preface

This article looks at an example of returning application/stream+json from Spring WebFlux.

maven

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

controller

    /**
     * Try it with: curl -i localhost:8080/stream
     * @return an endless stream of prices, one emitted every 500 ms
     */
    @GetMapping(value = "/stream",produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Price> priceStream(){
        return Flux.interval(Duration.ofMillis(500))
                .map(l -> new Price(System.currentTimeMillis(),ThreadLocalRandom.current().nextInt(100, 125)))
                .log();
    }

Note that produces = MediaType.APPLICATION_STREAM_JSON_VALUE is set here.
If the media type is not application/stream+json, the caller cannot read the results as they are emitted; it blocks until the data stream ends or the request times out.
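The difference between the two media types is easiest to see on the wire. The following is a minimal plain-Java sketch of the two framings, not Spring's actual encoder; the class name JsonFraming and its methods are hypothetical and exist only for illustration. A JSON array is complete only once its closing bracket is written, whereas in the stream format every line is a complete JSON document on its own.

```java
import java.util.List;

// Plain-Java illustration of the two wire formats; this is not Spring's encoder.
final class JsonFraming {

    // application/json style: a single array that is only valid
    // once the last element and the closing bracket have been written.
    static String asJsonArray(List<String> jsonObjects) {
        return "[" + String.join(",", jsonObjects) + "]";
    }

    // application/stream+json style: each object is followed by a newline,
    // so a reader can parse every line as soon as it arrives.
    static String asStreamJson(List<String> jsonObjects) {
        StringBuilder sb = new StringBuilder();
        for (String json : jsonObjects) {
            sb.append(json).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> prices = List.of(
                "{\"timestamp\":1518097010208,\"value\":120.0}",
                "{\"timestamp\":1518097010708,\"value\":124.0}");
        System.out.println(asJsonArray(prices));
        System.out.print(asStreamJson(prices));
    }
}
```

Since Spring Framework 5.3, application/stream+json is deprecated in favour of application/x-ndjson, which uses the same newline-delimited framing.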

Output

  • Server log output
2018-02-08 21:36:49.701  INFO 1270 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : onSubscribe(FluxMap.MapSubscriber)
2018-02-08 21:36:49.702  INFO 1270 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : request(1)
2018-02-08 21:36:50.208  INFO 1270 --- [     parallel-2] reactor.Flux.Map.1                       : onNext(Price(timestamp=1518097010208, value=120.0))
2018-02-08 21:36:50.229  INFO 1270 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : request(31)
2018-02-08 21:36:50.708  INFO 1270 --- [     parallel-2] reactor.Flux.Map.1                       : onNext(Price(timestamp=1518097010708, value=124.0))
2018-02-08 21:36:51.208  INFO 1270 --- [     parallel-2] reactor.Flux.Map.1                       : onNext(Price(timestamp=1518097011208, value=119.0))
2018-02-08 21:36:51.707  INFO 1270 --- [     parallel-2] reactor.Flux.Map.1                       : onNext(Price(timestamp=1518097011707, value=120.0))
2018-02-08 21:36:52.207  INFO 1270 --- [     parallel-2] reactor.Flux.Map.1                       : onNext(Price(timestamp=1518097012207, value=109.0))
2018-02-08 21:36:52.707  INFO 1270 --- [     parallel-2] reactor.Flux.Map.1                       : onNext(Price(timestamp=1518097012707, value=101.0))
2018-02-08 21:36:53.208  INFO 1270 --- [     parallel-2] reactor.Flux.Map.1                       : onNext(Price(timestamp=1518097013208, value=114.0))
2018-02-08 21:36:53.707  INFO 1270 --- [     parallel-2] reactor.Flux.Map.1                       : onNext(Price(timestamp=1518097013707, value=113.0))
2018-02-08 21:36:54.206  INFO 1270 --- [     parallel-2] reactor.Flux.Map.1                       : onNext(Price(timestamp=1518097014206, value=105.0))
2018-02-08 21:36:54.708  INFO 1270 --- [     parallel-2] reactor.Flux.Map.1                       : onNext(Price(timestamp=1518097014708, value=103.0))
2018-02-08 21:36:55.208  INFO 1270 --- [     parallel-2] reactor.Flux.Map.1                       : onNext(Price(timestamp=1518097015207, value=123.0))
2018-02-08 21:36:55.212  INFO 1270 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : cancel()
  • Client output
curl -i localhost:8080/stream
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/stream+json;charset=UTF-8

{"timestamp":1518097010208,"value":120.0}
{"timestamp":1518097010708,"value":124.0}
{"timestamp":1518097011208,"value":119.0}
{"timestamp":1518097011707,"value":120.0}
{"timestamp":1518097012207,"value":109.0}
{"timestamp":1518097012707,"value":101.0}
{"timestamp":1518097013208,"value":114.0}
{"timestamp":1518097013707,"value":113.0}
{"timestamp":1518097014206,"value":105.0}
^C

As you can see, with application/stream+json the response is sent with transfer-encoding: chunked and each element arrives as its own JSON line, so the caller can print the output as it is produced.
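The same behaviour can be reproduced from Java instead of curl. The sketch below uses the JDK 11 java.net.http.HttpClient rather than Spring's WebClient, and assumes the controller above is running on localhost:8080; the class name StreamJsonClient and the helper firstObjects are hypothetical. It reads the body line by line and stops after five objects.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.stream.Stream;

final class StreamJsonClient {

    // Lazily keeps the first n non-blank lines; each line is one JSON object.
    static Stream<String> firstObjects(Stream<String> lines, int n) {
        return lines.filter(line -> !line.isBlank()).limit(n);
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8080/stream"))
                .header("Accept", "application/stream+json")
                .build();

        // ofLines() exposes the body as a lazy Stream<String>, so lines become
        // available as chunks arrive rather than after the response has ended.
        HttpResponse<Stream<String>> response =
                client.send(request, HttpResponse.BodyHandlers.ofLines());

        // Closing the stream releases the connection once enough lines were read.
        try (Stream<String> lines = response.body()) {
            firstObjects(lines, 5).forEach(System.out::println);
        }
    }
}
```

Because the endpoint never completes, the limit is what ends the read here; without it the loop would keep printing until the process is stopped, just like the curl session above.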

Paging

After moving to WebFlux, you may wonder how to handle the paging calls you used before. Reactive Streams treats data as a stream, so Spring Data's reactive repositories do not support returning Page; a query method can still accept a Pageable parameter, though.

public interface StocDao extends ReactiveCrudRepository<Stock, String> {

    Flux<Stock> findByName(String name,Pageable pageable);
}

Note that Flux<Stock> is returned here instead of Page<Stock>.
In effect, the total count is lost.
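To make the trade-off concrete, here is an in-memory analogy in plain Java. It is only a sketch of the idea, not Spring Data code; PagingSketch, slice and total are hypothetical names. A Flux-returning finder with a Pageable corresponds to the slice, while the total that Page would have carried has to come from a separate pass (against a real store, a separate count query such as the count() method that ReactiveCrudRepository provides, which returns Mono<Long>).

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class PagingSketch {

    // What a slice-only query returns: the requested page, without a total.
    static <T> List<T> slice(List<T> all, Predicate<T> filter, int page, int size) {
        return all.stream()
                .filter(filter)
                .skip((long) page * size)
                .limit(size)
                .collect(Collectors.toList());
    }

    // What Page<T> would additionally carry: the number of all matches,
    // which requires its own pass over the data.
    static <T> long total(List<T> all, Predicate<T> filter) {
        return all.stream().filter(filter).count();
    }

    public static void main(String[] args) {
        List<String> names = List.of("AAPL", "AMZN", "GOOG", "ADBE", "MSFT", "AMD");
        Predicate<String> startsWithA = name -> name.startsWith("A");
        System.out.println(slice(names, startsWithA, 1, 2)); // second page, two per page
        System.out.println(total(names, startsWithA));       // number of all matches
    }
}
```

If the caller needs a page count, the two results have to be requested and combined explicitly.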

Summary

To stream a Flux from WebFlux, the endpoint has to produce MediaType.APPLICATION_STREAM_JSON_VALUE, and the caller also needs to support this media type (WebClient does). Only then is the effect of reactive streams realized end to end.
