A look at the Bulkhead in resilience4j

Preface

This article looks at the bulkhead implementation in resilience4j.

Bulkhead

resilience4j-bulkhead-0.13.0-sources.jar!/io/github/resilience4j/bulkhead/Bulkhead.java

/**
 *  A Bulkhead instance is thread-safe can be used to decorate multiple requests.
 *
 * A {@link Bulkhead} represent an entity limiting the amount of parallel operations. It does not assume nor does it mandate usage
 * of any particular concurrency and/or io model. These details are left for the client to manage. This bulkhead, depending on the
 * underlying concurrency/io model can be used to shed load, and, where it makes sense, limit resource use (i.e. limit amount of
 * threads/actors involved in a particular flow, etc).
 *
 * In order to execute an operation protected by this bulkhead, a permission must be obtained by calling {@link Bulkhead#isCallPermitted()}
 * If the bulkhead is full, no additional operations will be permitted to execute until space is available.
 *
 * Once the operation is complete, regardless of the result, client needs to call {@link Bulkhead#onComplete()} in order to maintain
 * integrity of internal bulkhead state.
 *
 */
public interface Bulkhead {

    /**
     * Dynamic bulkhead configuration change.
     * NOTE! New `maxWaitTime` duration won't affect threads that are currently waiting for permission.
     * @param newConfig new BulkheadConfig
     */
    void changeConfig(BulkheadConfig newConfig);

    /**
     * Attempts to acquire a permit, which allows an call to be executed.
     *
     * @return boolean whether a call should be executed
     */
    boolean isCallPermitted();

    /**
     * Records a completed call.
     */
    void onComplete();

    /**
     * Returns the name of this bulkhead.
     *
     * @return the name of this bulkhead
     */
    String getName();

    /**
     * Returns the BulkheadConfig of this Bulkhead.
     *
     * @return bulkhead config
     */
    BulkheadConfig getBulkheadConfig();

    /**
     * Get the Metrics of this Bulkhead.
     *
     * @return the Metrics of this Bulkhead
     */
    Metrics getMetrics();

    /**
     * Returns an EventPublisher which subscribes to the reactive stream of BulkheadEvent and
     * can be used to register event consumers.
     *
     * @return an EventPublisher
     */
    EventPublisher getEventPublisher();

    //......

    /**
     * Returns a callable which is decorated by a bulkhead.
     *
     * @param bulkhead the bulkhead
     * @param callable the original Callable
     * @param <T> the result type of callable
     *
     * @return a supplier which is decorated by a Bulkhead.
     */
    static <T> Callable<T> decorateCallable(Bulkhead bulkhead, Callable<T> callable){
        return () -> {
            BulkheadUtils.isCallPermitted(bulkhead);
            try {
                return callable.call();
            }
            finally {
                bulkhead.onComplete();
            }
        };
    }

    /**
     * Returns a supplier which is decorated by a bulkhead.
     *
     * @param bulkhead the bulkhead
     * @param supplier the original supplier
     * @param <T> the type of results supplied by this supplier
     *
     * @return a supplier which is decorated by a Bulkhead.
     */
    static <T> Supplier<T> decorateSupplier(Bulkhead bulkhead, Supplier<T> supplier){
        return () -> {
            BulkheadUtils.isCallPermitted(bulkhead);
            try {
                return supplier.get();
            }
            finally {
                bulkhead.onComplete();
            }
        };
    }

    interface Metrics {


        /**
         * Returns the number of parallel executions this bulkhead can support at this point in time.
         *
         * @return remaining bulkhead depth
         */
        int getAvailableConcurrentCalls();
    }

    /**
     * An EventPublisher which can be used to register event consumers.
     */
    interface EventPublisher extends io.github.resilience4j.core.EventPublisher<BulkheadEvent> {

        EventPublisher onCallRejected(EventConsumer<BulkheadOnCallRejectedEvent> eventConsumer);

        EventPublisher onCallPermitted(EventConsumer<BulkheadOnCallPermittedEvent> eventConsumer);

        EventPublisher onCallFinished(EventConsumer<BulkheadOnCallFinishedEvent> eventConsumer);
    }

    //......
}
  • The interface declares the two core methods, isCallPermitted and onComplete.
  • It also defines a family of static methods whose names begin with decorate. Each one calls BulkheadUtils.isCallPermitted(bulkhead) before invoking the wrapped operation and calls bulkhead.onComplete() afterwards; decorateCallable and decorateSupplier, shown above, do this in a finally block.
  • The decorate methods are decorateCheckedSupplier, decorateCompletionStage, decorateCheckedRunnable, decorateCallable, decorateSupplier, decorateConsumer, decorateCheckedConsumer, decorateRunnable, decorateFunction and decorateCheckedFunction.
  • The nested Metrics and EventPublisher interfaces are defined here as well.
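
To make the decorate pattern concrete, here is a minimal sketch that does not depend on resilience4j. SimpleBulkhead, semaphoreBacked and DecorateSketch are hypothetical names invented for this illustration; only the acquire / run / release-in-finally shape is taken from the source above.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

class DecorateSketch {

    /** Hypothetical stand-in for the two core Bulkhead methods; not the resilience4j type. */
    interface SimpleBulkhead {
        boolean isCallPermitted();
        void onComplete();
    }

    /** Same shape as Bulkhead.decorateSupplier: check the permit, run, always release. */
    static <T> Supplier<T> decorateSupplier(SimpleBulkhead bulkhead, Supplier<T> supplier) {
        return () -> {
            if (!bulkhead.isCallPermitted()) {
                throw new IllegalStateException("bulkhead is full");
            }
            try {
                return supplier.get();
            } finally {
                // Runs on success and on failure, so the permit is never leaked.
                bulkhead.onComplete();
            }
        };
    }

    /** Builds a SimpleBulkhead on top of the given semaphore (assumption: non-blocking acquire). */
    static SimpleBulkhead semaphoreBacked(Semaphore semaphore) {
        return new SimpleBulkhead() {
            @Override public boolean isCallPermitted() { return semaphore.tryAcquire(); }
            @Override public void onComplete() { semaphore.release(); }
        };
    }

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(1);
        Supplier<String> decorated = decorateSupplier(semaphoreBacked(semaphore), () -> "hello");
        System.out.println(decorated.get());              // prints "hello"
        System.out.println(semaphore.availablePermits()); // prints 1: the permit was returned
    }
}
```

Because the release sits in a finally block, a supplier that throws still gives its permit back, which is what keeps the bulkhead state consistent.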

BulkheadUtils.isCallPermitted

resilience4j-bulkhead-0.13.0-sources.jar!/io/github/resilience4j/bulkhead/utils/BulkheadUtils.java

public final class BulkheadUtils {

    public static void isCallPermitted(Bulkhead bulkhead) {
        if(!bulkhead.isCallPermitted()) {
            throw new BulkheadFullException(String.format("Bulkhead '%s' is full", bulkhead.getName()));
        }
    }
}

The method calls bulkhead.isCallPermitted(); if it returns false, a BulkheadFullException is thrown.
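
The rejection path can be sketched in isolation as follows. FullException and checkPermitted are hypothetical stand-ins for BulkheadFullException and BulkheadUtils.isCallPermitted, and a plain Semaphore plays the role of the bulkhead; this is an illustration of the idea, not the library code.

```java
import java.util.concurrent.Semaphore;

class PermitCheckSketch {

    /** Hypothetical stand-in for BulkheadFullException. */
    static class FullException extends RuntimeException {
        FullException(String message) {
            super(message);
        }
    }

    /** Turns a failed, non-blocking acquire into an exception, as BulkheadUtils does. */
    static void checkPermitted(String name, Semaphore semaphore) {
        if (!semaphore.tryAcquire()) {
            throw new FullException(String.format("Bulkhead '%s' is full", name));
        }
    }

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(1);
        checkPermitted("demo", semaphore);      // takes the only permit
        try {
            checkPermitted("demo", semaphore);  // no permit left, so this throws
        } catch (FullException e) {
            System.out.println(e.getMessage()); // prints "Bulkhead 'demo' is full"
        }
    }
}
```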

SemaphoreBulkhead

resilience4j-bulkhead-0.13.0-sources.jar!/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java

/**
 * A Bulkhead implementation based on a semaphore.
 */
public class SemaphoreBulkhead implements Bulkhead {

    private final String name;
    private final Semaphore semaphore;
    private final Object configChangesLock = new Object();
    private volatile BulkheadConfig config;
    private final BulkheadMetrics metrics;
    private final BulkheadEventProcessor eventProcessor;

    /**
     * Creates a bulkhead using a configuration supplied
     *
     * @param name           the name of this bulkhead
     * @param bulkheadConfig custom bulkhead configuration
     */
    public SemaphoreBulkhead(String name, BulkheadConfig bulkheadConfig) {
        this.name = name;
        this.config = bulkheadConfig != null ? bulkheadConfig
                : BulkheadConfig.ofDefaults();
        // init semaphore
        this.semaphore = new Semaphore(this.config.getMaxConcurrentCalls(), true);

        this.metrics = new BulkheadMetrics();
        this.eventProcessor = new BulkheadEventProcessor();
    }

    /**
     * Creates a bulkhead with a default config.
     *
     * @param name the name of this bulkhead
     */
    public SemaphoreBulkhead(String name) {
        this(name, BulkheadConfig.ofDefaults());
    }

    /**
     * Create a bulkhead using a configuration supplier
     *
     * @param name           the name of this bulkhead
     * @param configSupplier BulkheadConfig supplier
     */
    public SemaphoreBulkhead(String name, Supplier<BulkheadConfig> configSupplier) {
        this(name, configSupplier.get());
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void changeConfig(final BulkheadConfig newConfig) {
        synchronized (configChangesLock) {
            int delta =  newConfig.getMaxConcurrentCalls() - config.getMaxConcurrentCalls();
            if (delta < 0) {
                semaphore.acquireUninterruptibly(-delta);
            } else if (delta > 0) {
                semaphore.release(delta);
            }
            config = newConfig;
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isCallPermitted() {

        boolean callPermitted = tryEnterBulkhead();

        publishBulkheadEvent(
                () -> callPermitted ? new BulkheadOnCallPermittedEvent(name)
                        : new BulkheadOnCallRejectedEvent(name)
        );

        return callPermitted;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void onComplete() {
        semaphore.release();
        publishBulkheadEvent(() -> new BulkheadOnCallFinishedEvent(name));
    }

    boolean tryEnterBulkhead() {

        boolean callPermitted = false;
        long timeout = config.getMaxWaitTime();

        if (timeout == 0) {
            callPermitted = semaphore.tryAcquire();
        } else {
            try {
                callPermitted = semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ex) {
                callPermitted = false;
            }
        }
        return callPermitted;
    }

    private void publishBulkheadEvent(Supplier<BulkheadEvent> eventSupplier) {
        if (eventProcessor.hasConsumers()) {
            eventProcessor.consumeEvent(eventSupplier.get());
        }
    }

    private final class BulkheadMetrics implements Metrics {
        private BulkheadMetrics() {
        }

        @Override
        public int getAvailableConcurrentCalls() {
            return semaphore.availablePermits();
        }
    }

    //......
}
  • SemaphoreBulkhead is a Bulkhead implementation based on a semaphore.
  • The semaphore is created in fair mode with maxConcurrentCalls permits, a value taken from BulkheadConfig.
  • isCallPermitted calls tryEnterBulkhead and then publishes a BulkheadOnCallPermittedEvent if a permit was acquired, or a BulkheadOnCallRejectedEvent otherwise.
  • tryEnterBulkhead tries to acquire a permit from the semaphore. If the configured maxWaitTime is 0 it uses the non-blocking tryAcquire(); otherwise it waits up to maxWaitTime milliseconds, and an interrupt during the wait counts as a rejection.
  • onComplete releases the permit and then publishes a BulkheadOnCallFinishedEvent.
  • publishBulkheadEvent delegates to eventProcessor.consumeEvent, where eventProcessor is a BulkheadEventProcessor; the event object is only created when consumers are registered.
  • BulkheadMetrics implements getAvailableConcurrentCalls by returning semaphore.availablePermits().
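
The acquire-with-timeout logic and the permit adjustment done in changeConfig can be tried out with a small standalone class. ResizableLimiter and its method names are hypothetical; the sketch only mirrors the semaphore handling quoted above and, as a deliberate difference, restores the interrupt flag when the wait is interrupted.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class ResizableLimiter {
    private final Semaphore semaphore;
    private final Object lock = new Object();
    private final long maxWaitMillis;
    private int maxConcurrentCalls;   // guarded by lock

    ResizableLimiter(int maxConcurrentCalls, long maxWaitMillis) {
        this.maxConcurrentCalls = maxConcurrentCalls;
        this.maxWaitMillis = maxWaitMillis;
        this.semaphore = new Semaphore(maxConcurrentCalls, true); // fair, as in the source
    }

    /** Non-blocking when the wait time is 0, otherwise waits up to maxWaitMillis. */
    boolean tryEnter() {
        if (maxWaitMillis == 0) {
            return semaphore.tryAcquire();
        }
        try {
            return semaphore.tryAcquire(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Differs from the quoted source: keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void exit() {
        semaphore.release();
    }

    /** Shrinks by taking permits out of circulation, grows by releasing extra ones. */
    void resize(int newMax) {
        synchronized (lock) {
            int delta = newMax - maxConcurrentCalls;
            if (delta < 0) {
                semaphore.acquireUninterruptibly(-delta); // blocks until permits are free
            } else if (delta > 0) {
                semaphore.release(delta);
            }
            maxConcurrentCalls = newMax;
        }
    }

    int available() {
        return semaphore.availablePermits();
    }
}
```

Note that shrinking blocks until enough in-flight calls have finished, because the permits being removed must first be acquired.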

BulkheadEventProcessor

resilience4j-bulkhead-0.13.0-sources.jar!/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java

    private class BulkheadEventProcessor extends EventProcessor<BulkheadEvent> implements EventPublisher, EventConsumer<BulkheadEvent> {

        @Override
        public EventPublisher onCallPermitted(EventConsumer<BulkheadOnCallPermittedEvent> onCallPermittedEventConsumer) {
            registerConsumer(BulkheadOnCallPermittedEvent.class, onCallPermittedEventConsumer);
            return this;
        }

        @Override
        public EventPublisher onCallRejected(EventConsumer<BulkheadOnCallRejectedEvent> onCallRejectedEventConsumer) {
            registerConsumer(BulkheadOnCallRejectedEvent.class, onCallRejectedEventConsumer);
            return this;
        }

        @Override
        public EventPublisher onCallFinished(EventConsumer<BulkheadOnCallFinishedEvent> onCallFinishedEventConsumer) {
            registerConsumer(BulkheadOnCallFinishedEvent.class, onCallFinishedEventConsumer);
            return this;
        }

        @Override
        public void consumeEvent(BulkheadEvent event) {
            super.processEvent(event);
        }
    }
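  • BulkheadEventProcessor extends EventProcessor and implements the EventPublisher and EventConsumer interfaces. Each onCall... method registers a consumer for one event class, and consumeEvent forwards the event to processEvent.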
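
The register-by-event-class idea can be illustrated with a toy dispatcher. Everything here (Processor, register, process, the Event types) is hypothetical and only approximates the behaviour visible from the outside; it makes no claim about how resilience4j's EventProcessor is implemented internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class EventDispatchSketch {

    interface Event {}
    static class Permitted implements Event {}
    static class Rejected implements Event {}

    /** Toy dispatcher: consumers are stored per concrete event class. */
    static class Processor {
        private final Map<Class<?>, List<Consumer<Event>>> consumers = new ConcurrentHashMap<>();

        /** Registers a consumer for one event class and returns this for chaining. */
        <E extends Event> Processor register(Class<E> type, Consumer<? super E> consumer) {
            consumers.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>())
                     .add(event -> consumer.accept(type.cast(event)));
            return this;
        }

        boolean hasConsumers() {
            return !consumers.isEmpty();
        }

        /** Delivers the event to consumers registered for its exact class only. */
        void process(Event event) {
            List<Consumer<Event>> registered = consumers.get(event.getClass());
            if (registered != null) {
                registered.forEach(consumer -> consumer.accept(event));
            }
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        Processor processor = new Processor()
                .register(Permitted.class, e -> seen.add("permitted"))
                .register(Rejected.class, e -> seen.add("rejected"));
        processor.process(new Permitted());
        processor.process(new Rejected());
        processor.process(new Permitted());
        System.out.println(seen); // prints [permitted, rejected, permitted]
    }
}
```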

Summary

  • The resilience4j bulkhead essentially limits the number of concurrent calls to a method. SemaphoreBulkhead implements this with a semaphore holding maxConcurrentCalls permits: a permit is acquired before execution, a BulkheadFullException is thrown if none can be acquired, and the permit is released after execution.
  • Bulkhead defines a series of static methods beginning with decorate that wrap Callable, Runnable, Supplier and similar types, adding the acquire and release logic around the wrapped call.
