Preface
This article examines Dubbo's Filter mechanism.
Filter
dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java
@SPI
public interface Filter {
/**
* Intercepts a call. This is the only abstract method of the interface, so every implementation must provide it.
*
* @param invoker the invoker handed to this filter
*                (NOTE(review): implementations are presumably expected to delegate via {@code invoker.invoke(invocation)} - confirm)
* @param invocation the invocation being processed
* @return the result of the call
* @throws RpcException as declared by the signature
*/
Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
/**
* Deprecated response hook. A filter itself should only be responsible for passing the invocation along;
* callbacks have been moved into {@link Listener}. The default implementation returns {@code appResponse} unchanged.
*
* @param appResponse the result produced by the call
* @param invoker the invoker the call went through
* @param invocation the invocation that produced {@code appResponse}
* @return the result to hand back; {@code appResponse} itself unless overridden
*/
@Deprecated
default Result onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
return appResponse;
}
/**
* Callback contract for filters: one hook for a response and one hook for an error.
*/
interface Listener {
// Response hook; receives the result together with the invoker and invocation it belongs to.
void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
// Error hook; receives the throwable together with the invoker and invocation it belongs to.
void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
}
}
- Filter defines the invoke and onResponse methods, and also declares a nested Listener interface, which in turn defines the onResponse and onError methods.
ProtocolFilterWrapper
dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java
/**
 * {@link Protocol} decorator that places the activated {@link Filter} chain in front of every
 * exported or referred invoker. URLs whose protocol equals {@code REGISTRY_PROTOCOL} are handed
 * to the wrapped protocol without any filtering.
 */
public class ProtocolFilterWrapper implements Protocol {

    /** The protocol all calls are ultimately delegated to. */
    private final Protocol protocol;

    /**
     * @param protocol the protocol to decorate
     * @throws IllegalArgumentException if {@code protocol} is {@code null}
     */
    public ProtocolFilterWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }

    /**
     * Looks up the filters activated for the invoker's URL and links them in front of the invoker.
     * The list is walked from its last element to its first, so the first filter ends up outermost.
     *
     * @param invoker the invoker to protect with filters
     * @param key     the URL parameter key passed to the extension lookup
     * @param group   the activation group passed to the extension lookup
     * @return a {@link CallbackRegistrationInvoker} wrapping the head of the chain
     */
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class)
                .getActivateExtension(invoker.getUrl(), key, group);

        Invoker<T> head = invoker;
        // An empty list simply skips the loop and leaves the original invoker as the head.
        for (int idx = filters.size() - 1; idx >= 0; idx--) {
            head = decorate(filters.get(idx), head, invoker);
        }
        return new CallbackRegistrationInvoker<>(head, filters);
    }

    /**
     * Creates one link of the chain: an invoker that routes {@code invoke} through {@code filter}
     * towards {@code next}, while every other method is answered by the original invoker.
     */
    private static <T> Invoker<T> decorate(final Filter filter, final Invoker<T> next, final Invoker<T> origin) {
        return new Invoker<T>() {
            @Override
            public Class<T> getInterface() {
                return origin.getInterface();
            }

            @Override
            public URL getUrl() {
                return origin.getUrl();
            }

            @Override
            public boolean isAvailable() {
                return origin.isAvailable();
            }

            @Override
            public Result invoke(Invocation invocation) throws RpcException {
                try {
                    return filter.invoke(next, invocation);
                } catch (Exception e) {
                    // Give a listenable filter the chance to observe the failure, then rethrow it untouched.
                    Filter.Listener listener = filter instanceof ListenableFilter
                            ? ((ListenableFilter) filter).listener()
                            : null;
                    if (listener != null) {
                        listener.onError(e, origin, invocation);
                    }
                    throw e;
                }
            }

            @Override
            public void destroy() {
                origin.destroy();
            }

            @Override
            public String toString() {
                return origin.toString();
            }
        };
    }

    @Override
    public int getDefaultPort() {
        return protocol.getDefaultPort();
    }

    /**
     * Exports the invoker; everything except registry URLs is first wrapped with the provider-side filters.
     */
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        boolean registryUrl = REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol());
        if (!registryUrl) {
            Invoker<T> filtered = buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER);
            return protocol.export(filtered);
        }
        return protocol.export(invoker);
    }

    /**
     * Refers the service; everything except registry URLs gets the consumer-side filters put in front.
     */
    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        boolean registryUrl = REGISTRY_PROTOCOL.equals(url.getProtocol());
        if (!registryUrl) {
            Invoker<T> referred = protocol.refer(type, url);
            return buildInvokerChain(referred, REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
        }
        return protocol.refer(type, url);
    }

    @Override
    public void destroy() {
        protocol.destroy();
    }

    /**
     * Outermost invoker of a filter chain. After invoking the chain it passes a function to
     * {@code thenApplyWithContext} on the returned result; that function walks the filters from
     * last to first and delivers the response to each of them.
     */
    static class CallbackRegistrationInvoker<T> implements Invoker<T> {

        private final Invoker<T> filterInvoker;
        private final List<Filter> filters;

        /**
         * @param filterInvoker head of the filter chain
         * @param filters       the filters whose response hooks are to be called
         */
        public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<Filter> filters) {
            this.filterInvoker = filterInvoker;
            this.filters = filters;
        }

        @Override
        public Result invoke(Invocation invocation) throws RpcException {
            Result asyncResult = filterInvoker.invoke(invocation);
            asyncResult.thenApplyWithContext(response -> {
                for (int idx = filters.size() - 1; idx >= 0; idx--) {
                    dispatchResponse(filters.get(idx), response, invocation);
                }
                return response;
            });
            return asyncResult;
        }

        /**
         * Delivers the response to a single filter: through its listener when the filter is a
         * {@link ListenableFilter} (skipped if the listener is {@code null}), otherwise through the
         * filter's own {@code onResponse}.
         */
        private void dispatchResponse(Filter filter, Result response, Invocation invocation) {
            if (!(filter instanceof ListenableFilter)) {
                filter.onResponse(response, filterInvoker, invocation);
                return;
            }
            Filter.Listener listener = ((ListenableFilter) filter).listener();
            if (listener != null) {
                listener.onResponse(response, filterInvoker, invocation);
            }
        }

        @Override
        public Class<T> getInterface() {
            return filterInvoker.getInterface();
        }

        @Override
        public URL getUrl() {
            return filterInvoker.getUrl();
        }

        @Override
        public boolean isAvailable() {
            return filterInvoker.isAvailable();
        }

        @Override
        public void destroy() {
            filterInvoker.destroy();
        }
    }
}
- ProtocolFilterWrapper implements the Protocol interface and defines a static nested class, CallbackRegistrationInvoker, which implements the Invoker interface. Its invoke method first calls the invoke method of filterInvoker to obtain asyncResult, and then registers a callback for the completion of the RPC call through thenApplyWithContext. In that callback, the filters are traversed one by one (from last to first) and the onResponse method of each filter is called.
Summary
- Filter defines the invoke and onResponse methods, and also declares a nested Listener interface, which in turn defines the onResponse and onError methods.
- The Result returned by the invoke method defined by Filter has an abstract implementation called AbstractResult, and AbstractResult has several subclasses, namely AppResponse and AsyncRpcResult (which replace the original RpcResult).
- ProtocolFilterWrapper defines a static nested class, CallbackRegistrationInvoker, which implements the Invoker interface. Its invoke method first calls the invoke method of filterInvoker to obtain asyncResult, and then registers a callback for the completion of the RPC call through thenApplyWithContext. In that callback, the filters are traversed one by one and the onResponse method of each filter is called.