A Look at Elasticsearch's SizeBlockingQueue


Preface

This article examines Elasticsearch's SizeBlockingQueue.

SizeBlockingQueue

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueue.java

public class SizeBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {

    private final BlockingQueue<E> queue;
    private final int capacity;

    private final AtomicInteger size = new AtomicInteger();

    public SizeBlockingQueue(BlockingQueue<E> queue, int capacity) {
        assert capacity >= 0;
        this.queue = queue;
        this.capacity = capacity;
    }

    @Override
    public int size() {
        return size.get();
    }

    public int capacity() {
        return this.capacity;
    }

    @Override
    public Iterator<E> iterator() {
        final Iterator<E> it = queue.iterator();
        return new Iterator<E>() {
            E current;

            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public E next() {
                current = it.next();
                return current;
            }

            @Override
            public void remove() {
                // note, we can't call #remove on the iterator because we need to know
                // if it was removed or not
                if (queue.remove(current)) {
                    size.decrementAndGet();
                }
            }
        };
    }

    @Override
    public E peek() {
        return queue.peek();
    }

    @Override
    public E poll() {
        E e = queue.poll();
        if (e != null) {
            size.decrementAndGet();
        }
        return e;
    }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = queue.poll(timeout, unit);
        if (e != null) {
            size.decrementAndGet();
        }
        return e;
    }

    @Override
    public boolean remove(Object o) {
        boolean v = queue.remove(o);
        if (v) {
            size.decrementAndGet();
        }
        return v;
    }

    /**
     * Forces adding an element to the queue, without doing size checks.
     */
    public void forcePut(E e) throws InterruptedException {
        size.incrementAndGet();
        try {
            queue.put(e);
        } catch (InterruptedException ie) {
            size.decrementAndGet();
            throw ie;
        }
    }


    @Override
    public boolean offer(E e) {
        while (true) {
            final int current = size.get();
            if (current >= capacity()) {
                return false;
            }
            if (size.compareAndSet(current, 1 + current)) {
                break;
            }
        }
        boolean offered = queue.offer(e);
        if (!offered) {
            size.decrementAndGet();
        }
        return offered;
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        // note, not used in ThreadPoolExecutor
        throw new IllegalStateException("offer with timeout not allowed on size queue");
    }

    @Override
    public void put(E e) throws InterruptedException {
        // note, not used in ThreadPoolExecutor
        throw new IllegalStateException("put not allowed on size queue");
    }

    @Override
    public E take() throws InterruptedException {
        E e;
        try {
            e = queue.take();
            size.decrementAndGet();
        } catch (InterruptedException ie) {
            throw ie;
        }
        return e;
    }

    @Override
    public int remainingCapacity() {
        return capacity() - size.get();
    }

    @Override
    public int drainTo(Collection<? super E> c) {
        int v = queue.drainTo(c);
        size.addAndGet(-v);
        return v;
    }

    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        int v = queue.drainTo(c, maxElements);
        size.addAndGet(-v);
        return v;
    }

    @Override
    public Object[] toArray() {
        return queue.toArray();
    }

    @Override
    public <T> T[] toArray(T[] a) {
        return (T[]) queue.toArray(a);
    }

    @Override
    public boolean contains(Object o) {
        return queue.contains(o);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        return queue.containsAll(c);
    }
}
  • SizeBlockingQueue extends AbstractQueue and implements the BlockingQueue interface. Its constructor takes a delegate BlockingQueue and a capacity.
  • SizeBlockingQueue keeps an AtomicInteger field named size to track the number of queued elements, and updates it in poll, remove, offer, take, drainTo and forcePut.
  • The offer method returns false immediately if the current size is greater than or equal to the capacity. Otherwise it reserves a slot with compareAndSet before delegating to the wrapped queue, and decrements the counter again if the delegate rejects the element. The put method and the timed offer method both throw IllegalStateException, while forcePut adds an element without any size check.
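
The reserve-then-delegate pattern in offer can be tried outside Elasticsearch. The following is a minimal sketch, not the Elasticsearch class: BoundedOfferSketch and its demo method are hypothetical names, and an unbounded LinkedTransferQueue is assumed as the delegate.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified illustration of the counting wrapper; not the Elasticsearch class.
final class BoundedOfferSketch<E> {
    // Assumption: an unbounded delegate, so the counter is the only limit.
    private final BlockingQueue<E> delegate = new LinkedTransferQueue<>();
    private final AtomicInteger size = new AtomicInteger();
    private final int capacity;

    BoundedOfferSketch(int capacity) {
        this.capacity = capacity;
    }

    boolean offer(E e) {
        while (true) {
            final int current = size.get();
            if (current >= capacity) {
                return false; // full: reject without blocking
            }
            if (size.compareAndSet(current, current + 1)) {
                break; // a slot is now reserved for this element
            }
            // another thread changed the counter first; re-read and retry
        }
        boolean offered = delegate.offer(e);
        if (!offered) {
            size.decrementAndGet(); // give the reserved slot back
        }
        return offered;
    }

    E poll() {
        E e = delegate.poll();
        if (e != null) {
            size.decrementAndGet();
        }
        return e;
    }

    int size() {
        return size.get();
    }

    // Fills a queue of capacity 2, shows the rejection, then frees one slot.
    static String demo() {
        BoundedOfferSketch<String> q = new BoundedOfferSketch<>(2);
        boolean a = q.offer("a");
        boolean b = q.offer("b");
        boolean c = q.offer("c"); // rejected, the counter is already at capacity
        q.poll();                 // frees one slot
        boolean d = q.offer("d");
        return a + "," + b + "," + c + "," + d + ",size=" + q.size();
    }
}
```

Calling BoundedOfferSketch.demo() returns "true,true,false,true,size=2". Because the slot is reserved in the counter before the element reaches the delegate, concurrent producers cannot push the count past the capacity, at the cost of size() briefly running ahead of the delegate's actual contents.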

ResizableBlockingQueue

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueue.java

final class ResizableBlockingQueue<E> extends SizeBlockingQueue<E> {

    private volatile int capacity;

    ResizableBlockingQueue(BlockingQueue<E> queue, int initialCapacity) {
        super(queue, initialCapacity);
        this.capacity = initialCapacity;
    }

    @Override
    public int capacity() {
        return this.capacity;
    }

    @Override
    public int remainingCapacity() {
        return Math.max(0, this.capacity());
    }

    /** Resize the limit for the queue, returning the new size limit */
    public synchronized int adjustCapacity(int optimalCapacity, int adjustmentAmount, int minCapacity, int maxCapacity) {
        assert adjustmentAmount > 0 : "adjustment amount should be a positive value";
        assert optimalCapacity >= 0 : "desired capacity cannot be negative";
        assert minCapacity >= 0 : "cannot have min capacity smaller than 0";
        assert maxCapacity >= minCapacity : "cannot have max capacity smaller than min capacity";

        if (optimalCapacity == capacity) {
            // Yahtzee!
            return this.capacity;
        }

        if (optimalCapacity > capacity + adjustmentAmount) {
            // adjust up
            final int newCapacity = Math.min(maxCapacity, capacity + adjustmentAmount);
            this.capacity = newCapacity;
            return newCapacity;
        } else if (optimalCapacity < capacity - adjustmentAmount) {
            // adjust down
            final int newCapacity = Math.max(minCapacity, capacity - adjustmentAmount);
            this.capacity = newCapacity;
            return newCapacity;
        } else {
            return this.capacity;
        }
    }
}
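  • ResizableBlockingQueue extends SizeBlockingQueue and keeps its own volatile capacity field. Its synchronized adjustCapacity method moves the capacity toward optimalCapacity by at most adjustmentAmount per call, bounded by minCapacity and maxCapacity. If optimalCapacity is within adjustmentAmount of the current capacity, the capacity is left unchanged.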
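
To make the stepping rule concrete, here is a small standalone sketch that copies only the branch logic of adjustCapacity. AdjustCapacitySketch, adjust and demo are hypothetical names, and the numbers are chosen purely for illustration.

```java
// Hypothetical standalone copy of the stepping rule, for illustration only.
final class AdjustCapacitySketch {
    private int capacity;

    AdjustCapacitySketch(int initialCapacity) {
        this.capacity = initialCapacity;
    }

    // Moves capacity one step toward optimal, staying inside [min, max].
    synchronized int adjust(int optimal, int step, int min, int max) {
        if (optimal > capacity + step) {
            capacity = Math.min(max, capacity + step); // adjust up
        } else if (optimal < capacity - step) {
            capacity = Math.max(min, capacity - step); // adjust down
        }
        // otherwise optimal is within one step of capacity: leave it alone
        return capacity;
    }

    static int[] demo() {
        AdjustCapacitySketch q = new AdjustCapacitySketch(100);
        int up = q.adjust(500, 50, 10, 1000);   // 500 > 100 + 50, so 100 -> 150
        int hold = q.adjust(180, 50, 10, 1000); // 180 lies in [100, 200], stays 150
        int down = q.adjust(20, 50, 10, 1000);  // 20 < 150 - 50, so 150 -> 100
        int floor = q.adjust(0, 50, 90, 1000);  // 100 - 50 = 50 is below min, so 90
        return new int[] {up, hold, down, floor};
    }
}
```

AdjustCapacitySketch.demo() returns {150, 150, 100, 90}. The capacity never jumps straight to the optimal value: it takes one fixed-size step per call, which smooths out a noisy optimalCapacity estimate.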

EsExecutors

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

public class EsExecutors {
    //......

    public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity,
                                                ThreadFactory threadFactory, ThreadContext contextHolder) {
        BlockingQueue<Runnable> queue;
        if (queueCapacity < 0) {
            queue = ConcurrentCollections.newBlockingQueue();
        } else {
            queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity);
        }
        return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
            queue, threadFactory, new EsAbortPolicy(), contextHolder);
    }

    public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize,
                                                         int maxQueueSize, int frameSize, TimeValue targetedResponseTime,
                                                         ThreadFactory threadFactory, ThreadContext contextHolder) {
        if (initialQueueCapacity <= 0) {
            throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " +
                            initialQueueCapacity);
        }
        ResizableBlockingQueue<Runnable> queue =
                new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), initialQueueCapacity);
        return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
                queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory,
                new EsAbortPolicy(), contextHolder);
    }

    //......
}
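  • The newFixed method of EsExecutors creates an EsThreadPoolExecutor backed by a SizeBlockingQueue when queueCapacity is non-negative (a negative value yields a plain unbounded queue), while newAutoQueueFixed creates a QueueResizingEsThreadPoolExecutor backed by a ResizableBlockingQueue.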
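
The comments "not used in ThreadPoolExecutor" on put and the timed offer make sense once you recall that ThreadPoolExecutor.execute enqueues work through the non-blocking offer method and hands the task to the rejection handler when offer returns false. The sketch below shows that behaviour with JDK classes only. It assumes an ArrayBlockingQueue of capacity 1 and the JDK AbortPolicy as stand-ins for SizeBlockingQueue and EsAbortPolicy, and FixedPoolRejectionSketch is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo built on JDK classes only; it does not use Elasticsearch code.
final class FixedPoolRejectionSketch {
    static boolean thirdTaskRejected() {
        CountDownLatch release = new CountDownLatch(1);
        // Each task parks until the latch opens, keeping the worker and the queue occupied.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        };
        // One thread, one queue slot, same core and max size as a fixed pool.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
        boolean rejected = false;
        try {
            pool.execute(blocker); // handed directly to the single worker thread
            pool.execute(blocker); // accepted by the queue via offer()
            pool.execute(blocker); // offer() returns false and no extra thread is allowed
        } catch (RejectedExecutionException ex) {
            rejected = true; // AbortPolicy reports the full queue as an exception
        } finally {
            release.countDown(); // let the parked tasks finish
            pool.shutdown();
        }
        return rejected;
    }
}
```

FixedPoolRejectionSketch.thirdTaskRejected() returns true. A queue used this way only needs a correct offer for the submission path, which is consistent with SizeBlockingQueue refusing put and the timed offer outright.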

Summary

  • SizeBlockingQueue extends AbstractQueue and implements the BlockingQueue interface. Its constructor takes a delegate BlockingQueue and a capacity. It tracks the number of queued elements in an AtomicInteger field named size, which it updates in poll, remove, offer, take and the other mutating methods. The offer method returns false immediately if the current size is greater than or equal to the capacity, and the put method throws IllegalStateException.
  • ResizableBlockingQueue extends SizeBlockingQueue and adds a synchronized adjustCapacity method that resizes the queue capacity step by step within the given minimum and maximum.
  • The newFixed method of EsExecutors creates an EsThreadPoolExecutor backed by a SizeBlockingQueue, while newAutoQueueFixed creates a QueueResizingEsThreadPoolExecutor backed by a ResizableBlockingQueue.
