Let’s talk about AtomicArray of Elasticsearch.

  elasticsearch

Order

This article mainly studies AtomicArray of Elasticsearch.

AtomicArray

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java

public class AtomicArray<E> {
    private final AtomicReferenceArray<E> array;
    private volatile List<E> nonNullList;

    public AtomicArray(int size) {
        array = new AtomicReferenceArray<>(size);
    }

    /**
     * The size of the expected results, including potential null values.
     */
    public int length() {
        return array.length();
    }

    /**
     * Sets the element at position {@code i} to the given value.
     *
     * @param i     the index
     * @param value the new value
     */
    public void set(int i, E value) {
        array.set(i, value);
        if (nonNullList != null) { // read first, lighter, and most times it will be null...
            nonNullList = null;
        }
    }

    public final void setOnce(int i, E value) {
        if (array.compareAndSet(i, null, value) == false) {
            throw new IllegalStateException("index [" + i + "] has already been set");
        }
        if (nonNullList != null) { // read first, lighter, and most times it will be null...
            nonNullList = null;
        }
    }

    /**
     * Gets the current value at position {@code i}.
     *
     * @param i the index
     * @return the current value
     */
    public E get(int i) {
        return array.get(i);
    }

    /**
     * Returns the it as a non null list.
     */
    public List<E> asList() {
        if (nonNullList == null) {
            if (array == null || array.length() == 0) {
                nonNullList = Collections.emptyList();
            } else {
                List<E> list = new ArrayList<>(array.length());
                for (int i = 0; i < array.length(); i++) {
                    E e = array.get(i);
                    if (e != null) {
                        list.add(e);
                    }
                }
                nonNullList = list;
            }
        }
        return nonNullList;
    }

    /**
     * Copies the content of the underlying atomic array to a normal one.
     */
    public E[] toArray(E[] a) {
        if (a.length != array.length()) {
            throw new ElasticsearchGenerationException("AtomicArrays can only be copied to arrays of the same size");
        }
        for (int i = 0; i < array.length(); i++) {
            a[i] = array.get(i);
        }
        return a;
    }
}
  • AtomicArray encapsulates AtomicReferenceArray, defines nonNullList, and provides asList method to convert to ArrayList; ; The setOnce method is implemented by using the compareAndSet method of AtomicReferenceArray. In addition, both set and setOnce will determine whether the nonNullList is null or not and reset it to null if it is not null.

GroupedActionListener

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java

public final class GroupedActionListener<T> implements ActionListener<T> {
    private final CountDown countDown;
    private final AtomicInteger pos = new AtomicInteger();
    private final AtomicArray<T> results;
    private final ActionListener<Collection<T>> delegate;
    private final Collection<T> defaults;
    private final AtomicReference<Exception> failure = new AtomicReference<>();

    /**
     * Creates a new listener
     * @param delegate the delegate listener
     * @param groupSize the group size
     */
    public GroupedActionListener(ActionListener<Collection<T>> delegate, int groupSize,
                                 Collection<T> defaults) {
        results = new AtomicArray<>(groupSize);
        countDown = new CountDown(groupSize);
        this.delegate = delegate;
        this.defaults = defaults;
    }

    @Override
    public void onResponse(T element) {
        results.setOnce(pos.incrementAndGet() - 1, element);
        if (countDown.countDown()) {
            if (failure.get() != null) {
                delegate.onFailure(failure.get());
            } else {
                List<T> collect = this.results.asList();
                collect.addAll(defaults);
                delegate.onResponse(Collections.unmodifiableList(collect));
            }
        }
    }

    @Override
    public void onFailure(Exception e) {
        if (failure.compareAndSet(null, e) == false) {
            failure.accumulateAndGet(e, (previous, current) -> {
                previous.addSuppressed(current);
                return previous;
            });
        }
        if (countDown.countDown()) {
            delegate.onFailure(failure.get());
        }
    }
}
  • GroupedActionListener’s constructor creates AtomicArray and CountDown based on groupSize
  • The onResponse method will call AtomicArray’s setOnce method to set the result, then judge whether countDown is complete, if so, whether there is failure, if there is, call delegate.onFailure, if there is no failure, call AtomicArray’s aslist method to obtain the result in the form of List, and finally call delegate.onResponse
  • The onFailure method will update failure. if the compareAndSet fails, it will use accumulateAndGet to update it. then it will determine whether countDown is complete or not, and call back delegate.onFailure if it is complete.

CountDown

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/CountDown.java

public final class CountDown {

    private final AtomicInteger countDown;
    private final int originalCount;

    public CountDown(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count must be greater or equal to 0 but was: " + count);
        }
        this.originalCount = count;
        this.countDown = new AtomicInteger(count);
    }

    /**
     * Decrements the count-down and returns <code>true</code> iff this call
     * reached zero otherwise <code>false</code>
     */
    public boolean countDown() {
        assert originalCount > 0;
        for (;;) {
            final int current = countDown.get();
            assert current >= 0;
            if (current == 0) {
                return false;
            }
            if (countDown.compareAndSet(current, current - 1)) {
                return current == 1;
            }
        }
    }

    /**
     * Fast forwards the count-down to zero and returns <code>true</code> iff
     * the count down reached zero with this fast forward call otherwise
     * <code>false</code>
     */
    public boolean fastForward() {
        assert originalCount > 0;
        assert countDown.get() >= 0;
        return countDown.getAndSet(0) > 0;
    }
    
    /**
     * Returns <code>true</code> iff the count-down has reached zero. Otherwise <code>false</code>
     */
    public boolean isCountedDown() {
        assert countDown.get() >= 0;
        return countDown.get() == 0;
    }
}
  • CountDown is a simple thread-safe non-blocking version of CountDownLatch, which provides the countDown method to use compareAndSet to decrement the value while returning whether countDown is complete (countDown.get() == 0); In addition, isCountedDown is also provided to query whether countDown is complete. There is also the fastForward method for setting countDown directly to 0

Summary

  • AtomicArray encapsulates AtomicReferenceArray, defines nonNullList, and provides asList method to convert to ArrayList; ; The setOnce method is implemented by using the compareAndSet method of AtomicReferenceArray. In addition, both set and setOnce will determine whether the nonNullList is null or not and reset it to null if it is not null.
  • GroupedActionListener’s constructor created AtomicArray and CountDown; based on groupSize; The onResponse method will call AtomicArray’s setOnce method to set the result, and then judge whether countDown has been completed. If it is completed, judge whether there is failure, call delegate.onFailure, if there is no failure, call AtomicArray’s aslist method to obtain the result in the form of List, and finally call delegate.onResponse; ; The onFailure method will update failure. if the compareAndSet fails, it will use accumulateAndGet to update it. then it will determine whether countDown is complete or not, and call back delegate.onFailure if it is complete.
  • CountDown is a simple thread-safe non-blocking version of CountDownLatch, which provides the countDown method to use compareAndSet to decrement the value while returning whether countDown is complete (countDown.get() == 0); In addition, isCountedDown is also provided to query whether countDown is complete. There is also the fastForward method for setting countDown directly to 0

doc