Order
This article mainly studies sentinel’s SentinelGatewayFilter
SentinelGatewayFilter
Sentinel-1.6.2/sentinel-adapter/sentinel-spring-cloud-gateway-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/gateway/sc/SentinelGatewayFilter.java
public class SentinelGatewayFilter implements GatewayFilter, GlobalFilter {
private final GatewayParamParser<ServerWebExchange> paramParser = new GatewayParamParser<>(
new ServerWebExchangeItemParser());
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
Mono<Void> asyncResult = chain.filter(exchange);
if (route != null) {
String routeId = route.getId();
Object[] params = paramParser.parseParameterFor(routeId, exchange,
r -> r.getResourceMode() == SentinelGatewayConstants.RESOURCE_MODE_ROUTE_ID);
String origin = Optional.ofNullable(GatewayCallbackManager.getRequestOriginParser())
.map(f -> f.apply(exchange))
.orElse("");
asyncResult = asyncResult.transform(
new SentinelReactorTransformer<>(new EntryConfig(routeId, EntryType.IN,
1, params, new ContextConfig(contextName(routeId), origin)))
);
}
Set<String> matchingApis = pickMatchingApiDefinitions(exchange);
for (String apiName : matchingApis) {
Object[] params = paramParser.parseParameterFor(apiName, exchange,
r -> r.getResourceMode() == SentinelGatewayConstants.RESOURCE_MODE_CUSTOM_API_NAME);
asyncResult = asyncResult.transform(
new SentinelReactorTransformer<>(new EntryConfig(apiName, EntryType.IN, 1, params))
);
}
return asyncResult;
}
private String contextName(String route) {
return SentinelGatewayConstants.GATEWAY_CONTEXT_ROUTE_PREFIX + route;
}
Set<String> pickMatchingApiDefinitions(ServerWebExchange exchange) {
return GatewayApiMatcherManager.getApiMatcherMap().values()
.stream()
.filter(m -> m.test(exchange))
.map(WebExchangeApiMatcher::getApiName)
.collect(Collectors.toSet());
}
}
- SentinelGatewayFilter implements GatewayFilter and GlobalFilter interfaces; Its filter method is mainly to obtain route information, and then transform asyncResult. SentinelReactorTransformer is used here.
SentinelReactorTransformer
Sentinel-1.6.2/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorTransformer.java
public class SentinelReactorTransformer<T> implements Function<Publisher<T>, Publisher<T>> {
private final EntryConfig entryConfig;
public SentinelReactorTransformer(String resourceName) {
this(new EntryConfig(resourceName));
}
public SentinelReactorTransformer(EntryConfig entryConfig) {
AssertUtil.notNull(entryConfig, "entryConfig cannot be null");
this.entryConfig = entryConfig;
}
@Override
public Publisher<T> apply(Publisher<T> publisher) {
if (publisher instanceof Mono) {
return new MonoSentinelOperator<>((Mono<T>) publisher, entryConfig);
}
if (publisher instanceof Flux) {
return new FluxSentinelOperator<>((Flux<T>) publisher, entryConfig);
}
throw new IllegalStateException("Publisher type is not supported: " + publisher.getClass().getCanonicalName());
}
}
- SentinelReactorTransformer used entryConfig to create MonoSentinelOperator Monosenello.
MonoSentinelOperator
Sentinel-1.6.2/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperator.java
public class MonoSentinelOperator<T> extends MonoOperator<T, T> {
private final EntryConfig entryConfig;
public MonoSentinelOperator(Mono<? extends T> source, EntryConfig entryConfig) {
super(source);
AssertUtil.notNull(entryConfig, "entryConfig cannot be null");
this.entryConfig = entryConfig;
}
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
source.subscribe(new SentinelReactorSubscriber<>(entryConfig, actual, true));
}
}
- When subscribing, MonoSentinelOperator uses sentinelreactor subscribe.
FluxSentinelOperator
Sentinel-1.6.2/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/FluxSentinelOperator.java
public class FluxSentinelOperator<T> extends FluxOperator<T, T> {
private final EntryConfig entryConfig;
public FluxSentinelOperator(Flux<? extends T> source, EntryConfig entryConfig) {
super(source);
AssertUtil.notNull(entryConfig, "entryConfig cannot be null");
this.entryConfig = entryConfig;
}
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
source.subscribe(new SentinelReactorSubscriber<>(entryConfig, actual, false));
}
}
- FluxSentinelOperator uses sentinelreactor subscribe when subscribing.
SentinelReactorSubscriber
Sentinel-1.6.2/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorSubscriber.java
public class SentinelReactorSubscriber<T> extends InheritableBaseSubscriber<T> {
private final EntryConfig entryConfig;
private final CoreSubscriber<? super T> actual;
private final boolean unary;
private volatile AsyncEntry currentEntry;
private final AtomicBoolean entryExited = new AtomicBoolean(false);
public SentinelReactorSubscriber(EntryConfig entryConfig,
CoreSubscriber<? super T> actual,
boolean unary) {
checkEntryConfig(entryConfig);
this.entryConfig = entryConfig;
this.actual = actual;
this.unary = unary;
}
private void checkEntryConfig(EntryConfig config) {
AssertUtil.notNull(config, "entryConfig cannot be null");
}
@Override
public Context currentContext() {
if (currentEntry == null || entryExited.get()) {
return actual.currentContext();
}
com.alibaba.csp.sentinel.context.Context sentinelContext = currentEntry.getAsyncContext();
if (sentinelContext == null) {
return actual.currentContext();
}
return actual.currentContext()
.put(SentinelReactorConstants.SENTINEL_CONTEXT_KEY, currentEntry.getAsyncContext());
}
private void doWithContextOrCurrent(Supplier<Optional<com.alibaba.csp.sentinel.context.Context>> contextSupplier,
Runnable f) {
Optional<com.alibaba.csp.sentinel.context.Context> contextOpt = contextSupplier.get();
if (!contextOpt.isPresent()) {
// Provided context is absent, use current context.
f.run();
} else {
// Run on provided context.
ContextUtil.runOnContext(contextOpt.get(), f);
}
}
private void entryWhenSubscribed() {
ContextConfig sentinelContextConfig = entryConfig.getContextConfig();
if (sentinelContextConfig != null) {
// If current we're already in a context, the context config won't work.
ContextUtil.enter(sentinelContextConfig.getContextName(), sentinelContextConfig.getOrigin());
}
try {
AsyncEntry entry = SphU.asyncEntry(entryConfig.getResourceName(), entryConfig.getEntryType(),
entryConfig.getAcquireCount(), entryConfig.getArgs());
this.currentEntry = entry;
actual.onSubscribe(this);
} catch (BlockException ex) {
// Mark as completed (exited) explicitly.
entryExited.set(true);
// Signal cancel and propagate the {@code BlockException}.
cancel();
actual.onSubscribe(this);
actual.onError(ex);
} finally {
if (sentinelContextConfig != null) {
ContextUtil.exit();
}
}
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
doWithContextOrCurrent(() -> currentContext().getOrEmpty(SentinelReactorConstants.SENTINEL_CONTEXT_KEY),
this::entryWhenSubscribed);
}
@Override
protected void hookOnNext(T value) {
if (isDisposed()) {
tryCompleteEntry();
return;
}
doWithContextOrCurrent(() -> Optional.ofNullable(currentEntry).map(AsyncEntry::getAsyncContext),
() -> actual.onNext(value));
if (unary) {
// For some cases of unary operator (Mono), we have to do this during onNext hook.
// e.g. this kind of order: onSubscribe() -> onNext() -> cancel() -> onComplete()
// the onComplete hook will not be executed so we'll need to complete the entry in advance.
tryCompleteEntry();
}
}
@Override
protected void hookOnComplete() {
tryCompleteEntry();
actual.onComplete();
}
@Override
protected boolean shouldCallErrorDropHook() {
// When flow control triggered or stream terminated, the incoming
// deprecated exceptions should be dropped implicitly, so we'll not call the `onErrorDropped` hook.
return !entryExited.get();
}
@Override
protected void hookOnError(Throwable t) {
if (currentEntry != null && currentEntry.getAsyncContext() != null) {
// Normal requests with non-BlockException will go through here.
Tracer.traceContext(t, 1, currentEntry.getAsyncContext());
}
tryCompleteEntry();
actual.onError(t);
}
@Override
protected void hookOnCancel() {
}
private boolean tryCompleteEntry() {
if (currentEntry != null && entryExited.compareAndSet(false, true)) {
currentEntry.exit(1, entryConfig.getArgs());
return true;
}
return false;
}
}
- SentinelReactorSubscriber Inherited InheritableBaseSubscriber (
Copy from reactor.core.publisher.basesubscriber, allowing subclasses to override onSubscribe, onNext, onError, onComplete methods.
) - HookOnSubscribe calls entryWhenSubscribed here. It will first execute ContextUtil.enter when sentinelContextConfig is not null, then create AsyncEntry with SphU.asyncEntry, and finally execute ContextUtil.exit (), when sentinelContextConfig is not null, in final.
- HookOnNext, hookOnComplete, and hookOnError all call tryCompleteEntry methods, which mainly try to exit AsyncEntry.
Summary
- SentinelGatewayFilter implements GatewayFilter and GlobalFilter interfaces; Its filter method is mainly to obtain route information, and then transform asyncResult. SentinelReactorTransformer is used here.
- SentinelReactorTransformer used entryConfig to create Monosenel Operator or Monosenel Operator; When they subscribe, they use SentinelReactorSubscriber.
- SentinelReactorSubscriber mainly call the entryWhenSubscribed method to create AsyncEntry when hookOnSubscribe, and call the tryCompleteEntry method when hookOnNext, hookOnComplete, and hookOnError to try to exit AsyncEntry.