Talk about EsThreadPoolExecutor of Elasticsearch.

  elasticsearch

Order

This article mainly studies EsThreadPoolExecutor of Elasticsearch.

EsThreadPoolExecutor

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java

public class EsThreadPoolExecutor extends ThreadPoolExecutor {

    private final ThreadContext contextHolder;
    private volatile ShutdownListener listener;

    private final Object monitor = new Object();
    /**
     * Name used in error reporting.
     */
    private final String name;

    final String getName() {
        return name;
    }

    EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) {
        this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy(), contextHolder);
    }

    @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")
    EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
            ThreadContext contextHolder) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.name = name;
        this.contextHolder = contextHolder;
    }

    public void shutdown(ShutdownListener listener) {
        synchronized (monitor) {
            if (this.listener != null) {
                throw new IllegalStateException("Shutdown was already called on this thread pool");
            }
            if (isTerminated()) {
                listener.onTerminated();
            } else {
                this.listener = listener;
            }
        }
        shutdown();
    }

    @Override
    protected synchronized void terminated() {
        super.terminated();
        synchronized (monitor) {
            if (listener != null) {
                try {
                    listener.onTerminated();
                } finally {
                    listener = null;
                }
            }
        }
    }

    public interface ShutdownListener {
        void onTerminated();
    }

    @Override
    public void execute(Runnable command) {
        command = wrapRunnable(command);
        try {
            super.execute(command);
        } catch (EsRejectedExecutionException ex) {
            if (command instanceof AbstractRunnable) {
                // If we are an abstract runnable we can handle the rejection
                // directly and don't need to rethrow it.
                try {
                    ((AbstractRunnable) command).onRejection(ex);
                } finally {
                    ((AbstractRunnable) command).onAfter();

                }
            } else {
                throw ex;
            }
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        EsExecutors.rethrowErrors(unwrap(r));
        assert assertDefaultContext(r);
    }

    private boolean assertDefaultContext(Runnable r) {
        try {
            assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" +
                Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]";
        } catch (IllegalStateException ex) {
            // sometimes we execute on a closed context and isDefaultContext doen't bypass the ensureOpen checks
            // this must not trigger an exception here since we only assert if the default is restored and
            // we don't really care if we are closed
            if (contextHolder.isClosed() == false) {
                throw ex;
            }
        }
        return true;
    }

    /**
     * Returns a stream of all pending tasks. This is similar to {@link #getQueue()} but will expose the originally submitted
     * {@link Runnable} instances rather than potentially wrapped ones.
     */
    public Stream<Runnable> getTasks() {
        return this.getQueue().stream().map(this::unwrap);
    }

    @Override
    public final String toString() {
        StringBuilder b = new StringBuilder();
        b.append(getClass().getSimpleName()).append('[');
        b.append("name = ").append(name).append(", ");
        if (getQueue() instanceof SizeBlockingQueue) {
            @SuppressWarnings("rawtypes")
            SizeBlockingQueue queue = (SizeBlockingQueue) getQueue();
            b.append("queue capacity = ").append(queue.capacity()).append(", ");
        }
        appendThreadPoolExecutorDetails(b);
        /*
         * ThreadPoolExecutor has some nice information in its toString but we
         * can't get at it easily without just getting the toString.
         */
        b.append(super.toString()).append(']');
        return b.toString();
    }

    /**
     * Append details about this thread pool to the specified {@link StringBuilder}. All details should be appended as key/value pairs in
     * the form "%s = %s, "
     *
     * @param sb the {@link StringBuilder} to append to
     */
    protected void appendThreadPoolExecutorDetails(final StringBuilder sb) {

    }

    protected Runnable wrapRunnable(Runnable command) {
        return contextHolder.preserveContext(command);
    }

    protected Runnable unwrap(Runnable runnable) {
        return contextHolder.unwrap(runnable);
    }
}
  • EsThreadPoolExecutor inherits the ThreadPoolExecutor and provides two constructors. they require RejectedExecutionHandler to be of XRejectedExecutionHa ndler type. one of the constructors defaults to EsAbortPolicy, and they also require passing in the ThreadContext
  • It covers the terminated, execute, afterExecute methods, where the terminated method calls back the Listener. OnTerminated (); The execute method captures the EsRejectedExecutionException exception and calls back its onRejection and onAfter methods when command is of type AbstractRunnable; The afterExecute method executes the esexecutors.returnowerrors (unwrap (r)) method.
  • It provides wrapRunnable and unwrap methods, calling contextHolder.preserveContext and contextHolder.unwrap methods respectively.

XRejectedExecutionHandler

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/XRejectedExecutionHandler.java

public interface XRejectedExecutionHandler extends RejectedExecutionHandler {

    /**
     * The number of rejected executions.
     */
    long rejected();
}
  • The XRejectedExecutionHandler interface inherits the RejectedExecutionHandler interface, which defines the number of rejected returned by the rejected method; It has two implementation classes: EsAbortPolicy and ForceQueuePolicy

EsAbortPolicy

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java

public class EsAbortPolicy implements XRejectedExecutionHandler {
    private final CounterMetric rejected = new CounterMetric();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r instanceof AbstractRunnable) {
            if (((AbstractRunnable) r).isForceExecution()) {
                BlockingQueue<Runnable> queue = executor.getQueue();
                if (!(queue instanceof SizeBlockingQueue)) {
                    throw new IllegalStateException("forced execution, but expected a size queue");
                }
                try {
                    ((SizeBlockingQueue) queue).forcePut(r);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("forced execution, but got interrupted", e);
                }
                return;
            }
        }
        rejected.inc();
        throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
    }

    @Override
    public long rejected() {
        return rejected.count();
    }
}
  • EsAbortPolicy implements XRejectedExecutionHandler interface, which internally uses CounterMetric class to maintain the rejected quantity, while the rejected method directly returns the value; The rejectedExecution method determines whether isForceExecution is implemented for runnable of Abstractrunnable type and is SizeBlockingQueue, then calls the forcePut method of SizeBlockingQueue to force the Runnable again, followed by incrementing the rejected count.

ForceQueuePolicy

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

    static class ForceQueuePolicy implements XRejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                // force queue policy should only be used with a scaling queue
                assert executor.getQueue() instanceof ExecutorScalingQueue;
                executor.getQueue().put(r);
            } catch (final InterruptedException e) {
                // a scaling queue never blocks so a put to it can never be interrupted
                throw new AssertionError(e);
            }
        }

        @Override
        public long rejected() {
            return 0;
        }

    }
  • ForceQueuePolicy implements the XrejectedExecutionHandler interface. Its rejectedExecution method only re-enqueues ExecutorScalingQueue, while the Rejected method returns 0

AbstractRunnable

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRunnable.java

public abstract class AbstractRunnable implements Runnable {

    /**
     * Should the runnable force its execution in case it gets rejected?
     */
    public boolean isForceExecution() {
        return false;
    }

    @Override
    public final void run() {
        try {
            doRun();
        } catch (Exception t) {
            onFailure(t);
        } finally {
            onAfter();
        }
    }

    /**
     * This method is called in a finally block after successful execution
     * or on a rejection.
     */
    public void onAfter() {
        // nothing by default
    }

    /**
     * This method is invoked for all exception thrown by {@link #doRun()}
     */
    public abstract void onFailure(Exception e);

    /**
     * This should be executed if the thread-pool executing this action rejected the execution.
     * The default implementation forwards to {@link #onFailure(Exception)}
     */
    public void onRejection(Exception e) {
        onFailure(e);
    }

    /**
     * This method has the same semantics as {@link Runnable#run()}
     * @throws InterruptedException if the run method throws an InterruptedException
     */
    protected abstract void doRun() throws Exception;
}
  • AbstractRunnable declaration implements the Runnable interface, and its run method will call back the doRun, onFailure, and onAfter methods respectively. In addition, it also defines the isForceExecution method to determine whether force execution occurs when rejected.

Summary

  • EsThreadPoolExecutor inherits the ThreadPoolExecutor and provides two constructors. they require RejectedExecutionHandler to be of XRejectedExecutionHa ndler type. one of the constructors defaults to EsAbortPolicy, and they also require passing in the ThreadContext
  • It covers the terminated, execute, afterExecute methods, where the terminated method calls back the Listener. OnTerminated (); The execute method captures the EsRejectedExecutionException exception and calls back its onRejection and onAfter methods when command is of type AbstractRunnable; The afterExecute method executes the esexecutors.returnowerrors (unwrap (r)) method.
  • The XRejectedExecutionHandler interface inherits the RejectedExecutionHandler interface, which defines the number of rejected returned by the rejected method; It has two implementation classes: EsAbortPolicy and ForceQueuePolicy
  • EsAbortPolicy implements XRejectedExecutionHandler interface, which internally uses CounterMetric class to maintain the rejected quantity, while the rejected method directly returns the value; The rejectedExecution method determines whether isForceExecution is implemented for runnable of Abstractrunnable type and is SizeBlockingQueue, then calls the forcePut method of SizeBlockingQueue to force the Runnable again, followed by incrementing the rejected count.
  • ForceQueuePolicy implements the XrejectedExecutionHandler interface. Its rejectedExecution method only re-enqueues ExecutorScalingQueue, while the Rejected method returns 0
  • AbstractRunnable declaration implements the Runnable interface, and its run method will call back the doRun, onFailure, and onAfter methods respectively. In addition, it also defines the isForceExecution method to determine whether force execution occurs when rejected.

doc