Talk about the streaming-media-types property of spring cloud gateway.

  springcloud

Order

This paper mainly studies the streaming-media-types attribute of spring cloud gateway.

Configuration

Configuration description

    {
      "sourceType": "org.springframework.cloud.gateway.config.GatewayProperties",
      "name": "spring.cloud.gateway.streaming-media-types",
      "type": "java.util.List<org.springframework.http.MediaType>"
    }

GatewayProperties

spring-cloud-gateway-core-2.0.0.RC2-sources.jar! /org/springframework/cloud/gateway/config/GatewayProperties.java

@ConfigurationProperties("spring.cloud.gateway")
@Validated
public class GatewayProperties {

    /**
     * List of Routes
     */
    @NotNull
    @Valid
    private List<RouteDefinition> routes = new ArrayList<>();

    /**
     * List of filter definitions that are applied to every route.
     */
    private List<FilterDefinition> defaultFilters = new ArrayList<>();

    private List<MediaType> streamingMediaTypes = Arrays.asList(MediaType.TEXT_EVENT_STREAM,
            MediaType.APPLICATION_STREAM_JSON);

    public List<RouteDefinition> getRoutes() {
        return routes;
    }

    public void setRoutes(List<RouteDefinition> routes) {
        this.routes = routes;
    }

    public List<FilterDefinition> getDefaultFilters() {
        return defaultFilters;
    }

    public void setDefaultFilters(List<FilterDefinition> defaultFilters) {
        this.defaultFilters = defaultFilters;
    }

    public List<MediaType> getStreamingMediaTypes() {
        return streamingMediaTypes;
    }

    public void setStreamingMediaTypes(List<MediaType> streamingMediaTypes) {
        this.streamingMediaTypes = streamingMediaTypes;
    }

    @Override
    public String toString() {
        return "GatewayProperties{" +
                "routes=" + routes +
                ", defaultFilters=" + defaultFilters +
                ", streamingMediaTypes=" + streamingMediaTypes +
                '}';
    }
}

Text _ event _ stream (text/event-stream)、MediaType.APPLICATION_STREAM_JSON(application/stream+json)

Use

GatewayAutoConfiguration

spring-cloud-gateway-core-2.0.0.RC2-sources.jar! /org/springframework/cloud/gateway/config/GatewayAutoConfiguration.java

    @Configuration
    @ConditionalOnClass(HttpClient.class)
    protected static class NettyConfiguration {
        @Bean
        @ConditionalOnMissingBean
        public HttpClient httpClient(@Qualifier("nettyClientOptions") Consumer<? super HttpClientOptions.Builder> options) {
            return HttpClient.create(options);
        }

        //......

        @Bean
        public HttpClientProperties httpClientProperties() {
            return new HttpClientProperties();
        }

        @Bean
        public NettyRoutingFilter routingFilter(HttpClient httpClient,
                                                ObjectProvider<List<HttpHeadersFilter>> headersFilters) {
            return new NettyRoutingFilter(httpClient, headersFilters);
        }

        @Bean
        public NettyWriteResponseFilter nettyWriteResponseFilter(GatewayProperties properties) {
            return new NettyWriteResponseFilter(properties.getStreamingMediaTypes());
        }

        @Bean
        public ReactorNettyWebSocketClient reactorNettyWebSocketClient(@Qualifier("nettyClientOptions") Consumer<? super HttpClientOptions.Builder> options) {
            return new ReactorNettyWebSocketClient(options);
        }
    }

NettyWriteResponseFilter here uses Properties. GetStreamingMediaTypes ()

NettyWriteResponseFilter

spring-cloud-gateway-core-2.0.0.RC2-sources.jar! /org/springframework/cloud/gateway/filter/NettyWriteResponseFilter.javac

public class NettyWriteResponseFilter implements GlobalFilter, Ordered {

    private static final Log log = LogFactory.getLog(NettyWriteResponseFilter.class);

    public static final int WRITE_RESPONSE_FILTER_ORDER = -1;

    private final List<MediaType> streamingMediaTypes;

    public NettyWriteResponseFilter(List<MediaType> streamingMediaTypes) {
        this.streamingMediaTypes = streamingMediaTypes;
    }

    @Override
    public int getOrder() {
        return WRITE_RESPONSE_FILTER_ORDER;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added
        // until the WebHandler is run
        return chain.filter(exchange).then(Mono.defer(() -> {
            HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR);

            if (clientResponse == null) {
                return Mono.empty();
            }
            log.trace("NettyWriteResponseFilter start");
            ServerHttpResponse response = exchange.getResponse();

            NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
            //TODO: what if it's not netty

            final Flux<NettyDataBuffer> body = clientResponse.receive()
                    .retain() //TODO: needed?
                    .map(factory::wrap);

            MediaType contentType = response.getHeaders().getContentType();
            return (isStreamingMediaType(contentType) ?
                    response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body));
        }));
    }

    //TODO: use framework if possible
    //TODO: port to WebClientWriteResponseFilter
    private boolean isStreamingMediaType(@Nullable MediaType contentType) {
        return (contentType != null && this.streamingMediaTypes.stream()
                        .anyMatch(contentType::isCompatibleWith));
    }

}

It can be seen that whether it is stream type is judged according to the isStreamingMediaType method. If yes, the writeAndFlushWith is adopted, otherwise, the WriteWithmethod is adopted.

ReactiveHttpOutputMessage

spring-web-5.0.6.RELEASE-sources.jar! /org/springframework/http/ReactiveHttpOutputMessage.java

/**
 * A "reactive" HTTP output message that accepts output as a {@link Publisher}.
 *
 * <p>Typically implemented by an HTTP request on the client-side or an
 * HTTP response on the server-side.
 *
 * @author Arjen Poutsma
 * @author Sebastien Deleuze
 * @since 5.0
 */
public interface ReactiveHttpOutputMessage extends HttpMessage {

    /**
     * Return a {@link DataBufferFactory} that can be used to create the body.
     * @return a buffer factory
     * @see #writeWith(Publisher)
     */
    DataBufferFactory bufferFactory();

    /**
     * Register an action to apply just before the HttpOutputMessage is committed.
     * <p><strong>Note:</strong> the supplied action must be properly deferred,
     * e.g. via {@link Mono#defer} or {@link Mono#fromRunnable}, to ensure it's
     * executed in the right order, relative to other actions.
     * @param action the action to apply
     */
    void beforeCommit(Supplier<? extends Mono<Void>> action);

    /**
     * Whether the HttpOutputMessage is committed.
     */
    boolean isCommitted();

    /**
     * Use the given {@link Publisher} to write the body of the message to the
     * underlying HTTP layer.
     * @param body the body content publisher
     * @return a {@link Mono} that indicates completion or error
     */

    Mono<Void> writeWith(Publisher<? extends DataBuffer> body);

    /**
     * Use the given {@link Publisher} of {@code Publishers} to write the body
     * of the HttpOutputMessage to the underlying HTTP layer, flushing after
     * each {@code Publisher<DataBuffer>}.
     * @param body the body content publisher
     * @return a {@link Mono} that indicates completion or error
     */
    Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body);

    /**
     * Indicate that message handling is complete, allowing for any cleanup or
     * end-of-processing tasks to be performed such as applying header changes
     * made via {@link #getHeaders()} to the underlying HTTP message (if not
     * applied already).
     * <p>This method should be automatically invoked at the end of message
     * processing so typically applications should not have to invoke it.
     * If invoked multiple times it should have no side effects.
     * @return a {@link Mono} that indicates completion or error
     */
    Mono<Void> setComplete();

}

As can be seen from the comments of the interface, writeWith is different from writeAndFlushWith’s parameter generics. one is Publisher <? Extends DataBuffer >, one is Publisher <? extends Publisher<? extends DataBuffer>>。 While writeAndFlushWith flush after every Publisher<DataBuffer > writes.

Summary

NettyWriteresponseFilter determines whether writeAndFlushWith or writeWith according to the type configured by Spring.Cloud.Gateway.Streaming-Media-Types. If it is the specified type, writeAndFlushWith is selected to write to Response. Text _ event _ stream (text/event-stream)、MediaType.APPLICATION_STREAM_JSON(application/stream+json) These two types.

doc