A Look at Elasticsearch's ReleasableLock

Preface

This article examines the ReleasableLock class in Elasticsearch.

ReleasableLock

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java

public class ReleasableLock implements Releasable {
    private final Lock lock;


    // a per-thread count indicating how many times the thread has entered the lock; only works if assertions are enabled
    private final ThreadLocal<Integer> holdingThreads;

    public ReleasableLock(Lock lock) {
        this.lock = lock;
        if (Assertions.ENABLED) {
            holdingThreads = new ThreadLocal<>();
        } else {
            holdingThreads = null;
        }
    }

    @Override
    public void close() {
        lock.unlock();
        assert removeCurrentThread();
    }


    public ReleasableLock acquire() throws EngineException {
        lock.lock();
        assert addCurrentThread();
        return this;
    }

    private boolean addCurrentThread() {
        final Integer current = holdingThreads.get();
        holdingThreads.set(current == null ? 1 : current + 1);
        return true;
    }

    private boolean removeCurrentThread() {
        final Integer count = holdingThreads.get();
        assert count != null && count > 0;
        if (count == 1) {
            holdingThreads.remove();
        } else {
            holdingThreads.set(count - 1);
        }
        return true;
    }

    public boolean isHeldByCurrentThread() {
        if (holdingThreads == null) {
            throw new UnsupportedOperationException("asserts must be enabled");
        }
        final Integer count = holdingThreads.get();
        return count != null && count > 0;
    }
}
  • ReleasableLock implements the Releasable interface and its close method. Its constructor takes a Lock; holdingThreads is initialized only when assertions are enabled and is null otherwise. The isHeldByCurrentThread method reports whether the calling thread currently holds the lock, and throws UnsupportedOperationException when assertions are disabled.
  • The acquire method first calls lock.lock() and then invokes addCurrentThread inside an assert statement, which increments the calling thread's hold count. Because the call sits inside an assert, it runs only when assertions are enabled.
  • The close method first calls lock.unlock() and then invokes removeCurrentThread inside an assert statement, which decrements the calling thread's hold count and removes the thread-local entry when the count was 1.
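
The acquire/close pairing described above can be sketched without any Elasticsearch dependency. The following is a minimal illustration, not the Elasticsearch class: the names SimpleReleasableLock and SimpleReleasableLockDemo are hypothetical, it implements AutoCloseable directly instead of Releasable, and it tracks the per-thread hold count unconditionally rather than only when assertions are enabled.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified stand-in for ReleasableLock (not the Elasticsearch class).
final class SimpleReleasableLock implements AutoCloseable {
    private final Lock lock;

    // Per-thread hold count. Assumption: tracked always, whereas the original
    // only tracks it when assertions are enabled.
    private final ThreadLocal<Integer> holdCount = ThreadLocal.withInitial(() -> 0);

    SimpleReleasableLock(Lock lock) {
        this.lock = lock;
    }

    // Lock first, then record the entry; returning this allows try-with-resources.
    SimpleReleasableLock acquire() {
        lock.lock();
        holdCount.set(holdCount.get() + 1);
        return this;
    }

    // Unlock first, then record the exit; drop the thread-local once the count is zero.
    @Override
    public void close() {
        lock.unlock();
        int remaining = holdCount.get() - 1;
        if (remaining == 0) {
            holdCount.remove();
        } else {
            holdCount.set(remaining);
        }
    }

    boolean isHeldByCurrentThread() {
        return holdCount.get() > 0;
    }
}

class SimpleReleasableLockDemo {
    public static void main(String[] args) {
        SimpleReleasableLock lock = new SimpleReleasableLock(new ReentrantLock());
        try (SimpleReleasableLock ignored = lock.acquire()) {
            System.out.println(lock.isHeldByCurrentThread()); // prints "true"
        }
        System.out.println(lock.isHeldByCurrentThread()); // prints "false"
    }
}
```

Tracking the count unconditionally keeps the sketch observable without the -ea JVM flag; the original avoids that bookkeeping cost in production by placing it behind assert.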

ReleasableLockTests

elasticsearch-7.0.1/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java

public class ReleasableLockTests extends ESTestCase {

    /**
     * Test that accounting on whether or not a thread holds a releasable lock is correct. Previously we had a bug where on a re-entrant
     * lock that if a thread entered the lock twice we would declare that it does not hold the lock after it exits its first entrance but
     * not its second entrance.
     *
     * @throws BrokenBarrierException if awaiting on the synchronization barrier breaks
     * @throws InterruptedException   if awaiting on the synchronization barrier is interrupted
     */
    public void testIsHeldByCurrentThread() throws BrokenBarrierException, InterruptedException {
        final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        final ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());
        final ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());

        final int numberOfThreads = scaledRandomIntBetween(1, 32);
        final int iterations = scaledRandomIntBetween(1, 32);
        final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
        final List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < numberOfThreads; i++) {
            final Thread thread = new Thread(() -> {
                try {
                    barrier.await();
                } catch (final BrokenBarrierException | InterruptedException e) {
                    throw new RuntimeException(e);
                }
                for (int j = 0; j < iterations; j++) {
                    if (randomBoolean()) {
                        acquire(readLock, writeLock);
                    } else {
                        acquire(writeLock, readLock);
                    }
                }
                try {
                    barrier.await();
                } catch (final BrokenBarrierException | InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
            threads.add(thread);
            thread.start();
        }

        barrier.await();
        barrier.await();
        for (final Thread thread : threads) {
            thread.join();
        }
    }

    private void acquire(final ReleasableLock lockToAcquire, final ReleasableLock otherLock) {
        try (@SuppressWarnings("unused") Releasable outer = lockToAcquire.acquire()) {
            assertTrue(lockToAcquire.isHeldByCurrentThread());
            assertFalse(otherLock.isHeldByCurrentThread());
            try (@SuppressWarnings("unused") Releasable inner = lockToAcquire.acquire()) {
                assertTrue(lockToAcquire.isHeldByCurrentThread());
                assertFalse(otherLock.isHeldByCurrentThread());
            }
            // previously there was a bug here and this would return false
            assertTrue(lockToAcquire.isHeldByCurrentThread());
            assertFalse(otherLock.isHeldByCurrentThread());
        }
        assertFalse(lockToAcquire.isHeldByCurrentThread());
        assertFalse(otherLock.isHeldByCurrentThread());
    }

}
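  • ReleasableLockTests starts a random number of threads, each of which repeatedly calls the acquire helper with the read lock and the write lock in random order. The helper acquires lockToAcquire twice (re-entrantly) and asserts at every step that lockToAcquire is held by the current thread while otherLock is not. After the inner acquisition is released the lock must still be reported as held, and after the outer one is released it must not.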
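
The re-entrancy problem that this test guards against can be illustrated with two small trackers. This is only a sketch under assumptions: FlagTracker and CountTracker are hypothetical names, and the flag-based version is a guess at the kind of bookkeeping that loses track of nested entries, not the actual earlier Elasticsearch code.

```java
// Hypothetical trackers for illustration only; neither is Elasticsearch code.

// Remembers only whether the thread is inside the lock, so the first exit clears it.
final class FlagTracker {
    private final ThreadLocal<Boolean> held = ThreadLocal.withInitial(() -> false);

    void enter() { held.set(true); }
    void exit() { held.set(false); }
    boolean isHeld() { return held.get(); }
}

// Counts entries per thread, so nested entries are released one at a time.
final class CountTracker {
    private final ThreadLocal<Integer> count = ThreadLocal.withInitial(() -> 0);

    void enter() { count.set(count.get() + 1); }
    void exit() { count.set(count.get() - 1); }
    boolean isHeld() { return count.get() > 0; }
}

class ReentrancyDemo {
    public static void main(String[] args) {
        FlagTracker flag = new FlagTracker();
        CountTracker count = new CountTracker();

        flag.enter(); count.enter(); // outer entrance
        flag.enter(); count.enter(); // inner (re-entrant) entrance
        flag.exit();  count.exit();  // leave the inner entrance only

        System.out.println("flag:  " + flag.isHeld());  // prints "flag:  false"
        System.out.println("count: " + count.isHeld()); // prints "count: true"
    }
}
```

After the inner exit the thread is still inside the outer entrance, so only the counting tracker gives the correct answer; this is the same check that the assertion following the inner try block in the test performs.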

Cache.CacheSegment

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/cache/Cache.java

    private static class CacheSegment<K, V> {
        // read/write lock protecting mutations to the segment
        ReadWriteLock segmentLock = new ReentrantReadWriteLock();

        ReleasableLock readLock = new ReleasableLock(segmentLock.readLock());
        ReleasableLock writeLock = new ReleasableLock(segmentLock.writeLock());

        Map<K, CompletableFuture<Entry<K, V>>> map = new HashMap<>();

        SegmentStats segmentStats = new SegmentStats();

        /**
         * get an entry from the segment; expired entries will be returned as null but not removed from the cache until the LRU list is
         * pruned or a manual {@link Cache#refresh()} is performed however a caller can take action using the provided callback
         *
         * @param key       the key of the entry to get from the cache
         * @param now       the access time of this entry
         * @param isExpired test if the entry is expired
         * @param onExpiration a callback if the entry associated to the key is expired
         * @return the entry if there was one, otherwise null
         */
        Entry<K, V> get(K key, long now, Predicate<Entry<K, V>> isExpired, Consumer<Entry<K, V>> onExpiration) {
            CompletableFuture<Entry<K, V>> future;
            try (ReleasableLock ignored = readLock.acquire()) {
                future = map.get(key);
            }
            if (future != null) {
                Entry<K, V> entry;
                try {
                    entry = future.get();
                } catch (ExecutionException e) {
                    assert future.isCompletedExceptionally();
                    segmentStats.miss();
                    return null;
                } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
                if (isExpired.test(entry)) {
                    segmentStats.miss();
                    onExpiration.accept(entry);
                    return null;
                } else {
                    segmentStats.hit();
                    entry.accessTime = now;
                    return entry;
                }
            } else {
                segmentStats.miss();
                return null;
            }
        }

        /**
         * put an entry into the segment
         *
         * @param key   the key of the entry to add to the cache
         * @param value the value of the entry to add to the cache
         * @param now   the access time of this entry
         * @return a tuple of the new entry and the existing entry, if there was one otherwise null
         */
        Tuple<Entry<K, V>, Entry<K, V>> put(K key, V value, long now) {
            Entry<K, V> entry = new Entry<>(key, value, now);
            Entry<K, V> existing = null;
            try (ReleasableLock ignored = writeLock.acquire()) {
                try {
                    CompletableFuture<Entry<K, V>> future = map.put(key, CompletableFuture.completedFuture(entry));
                    if (future != null) {
                        existing = future.handle((ok, ex) -> {
                            if (ok != null) {
                                return ok;
                            } else {
                                return null;
                            }
                        }).get();
                    }
                } catch (ExecutionException | InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            }
            return Tuple.tuple(entry, existing);
        }

        /**
         * remove an entry from the segment
         *
         * @param key       the key of the entry to remove from the cache
         * @param onRemoval a callback for the removed entry
         */
        void remove(K key, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {
            CompletableFuture<Entry<K, V>> future;
            try (ReleasableLock ignored = writeLock.acquire()) {
                future = map.remove(key);
            }
            if (future != null) {
                segmentStats.eviction();
                onRemoval.accept(future);
            }
        }

        /**
         * remove an entry from the segment iff the future is done and the value is equal to the
         * expected value
         *
         * @param key the key of the entry to remove from the cache
         * @param value the value expected to be associated with the key
         * @param onRemoval a callback for the removed entry
         */
        void remove(K key, V value, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {
            CompletableFuture<Entry<K, V>> future;
            boolean removed = false;
            try (ReleasableLock ignored = writeLock.acquire()) {
                future = map.get(key);
                try {
                    if (future != null) {
                        if (future.isDone()) {
                            Entry<K, V> entry = future.get();
                            if (Objects.equals(value, entry.value)) {
                                removed = map.remove(key, future);
                            }
                        }
                    }
                } catch (ExecutionException | InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            }

            if (future != null && removed) {
                segmentStats.eviction();
                onRemoval.accept(future);
            }
        }

        private static class SegmentStats {
            private final LongAdder hits = new LongAdder();
            private final LongAdder misses = new LongAdder();
            private final LongAdder evictions = new LongAdder();

            void hit() {
                hits.increment();
            }

            void miss() {
                misses.increment();
            }

            void eviction() {
                evictions.increment();
            }
        }
    }
  • CacheSegment wraps the read lock and write lock of a ReentrantReadWriteLock in two ReleasableLock instances, readLock and writeLock. Because ReleasableLock implements Releasable, which extends java.io.Closeable and therefore java.lang.AutoCloseable, it can be used directly in a try-with-resources statement, and the lock is released automatically when the block exits. The get method takes the read lock, while put and both remove methods take the write lock.
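
The locking pattern used by CacheSegment can be reduced to a few lines. The sketch below is a simplified assumption, not the Elasticsearch implementation: AutoLock and TinySegment are hypothetical names, and futures, expiry handling, and statistics are all left out so that only the read/write locking with try-with-resources remains.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical minimal wrapper standing in for ReleasableLock.
final class AutoLock implements AutoCloseable {
    private final Lock lock;

    AutoLock(Lock lock) {
        this.lock = lock;
    }

    AutoLock acquire() {
        lock.lock();
        return this;
    }

    @Override
    public void close() {
        lock.unlock();
    }
}

// Hypothetical, heavily simplified segment: a plain HashMap guarded by a read/write lock.
final class TinySegment<K, V> {
    private final ReadWriteLock segmentLock = new ReentrantReadWriteLock();
    private final AutoLock readLock = new AutoLock(segmentLock.readLock());
    private final AutoLock writeLock = new AutoLock(segmentLock.writeLock());
    private final Map<K, V> map = new HashMap<>();

    // Reads share the read lock, so concurrent lookups do not block each other.
    V get(K key) {
        try (AutoLock ignored = readLock.acquire()) {
            return map.get(key);
        }
    }

    // Mutations take the exclusive write lock; it is released even if the body throws.
    V put(K key, V value) {
        try (AutoLock ignored = writeLock.acquire()) {
            return map.put(key, value);
        }
    }

    V remove(K key) {
        try (AutoLock ignored = writeLock.acquire()) {
            return map.remove(key);
        }
    }
}

class TinySegmentDemo {
    public static void main(String[] args) {
        TinySegment<String, Integer> segment = new TinySegment<>();
        segment.put("a", 1);
        System.out.println(segment.get("a"));    // prints "1"
        System.out.println(segment.remove("a")); // prints "1"
        System.out.println(segment.get("a"));    // prints "null"
    }
}
```

Compared with an explicit lock()/try/finally/unlock() sequence, the try-with-resources form makes it hard to forget the unlock, which is the main convenience of wrapping a Lock in a closeable type.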

Summary

  • ReleasableLock implements the Releasable interface and its close method. Its constructor takes a Lock, and holdingThreads is initialized only when assertions are enabled. The isHeldByCurrentThread method reports whether the calling thread currently holds the lock.
  • The acquire method calls lock.lock() and then, inside an assert, addCurrentThread, which increments the calling thread's hold count.
  • The close method calls lock.unlock() and then, inside an assert, removeCurrentThread, which decrements the calling thread's hold count.
  • Because Releasable extends java.io.Closeable and therefore java.lang.AutoCloseable, a ReleasableLock can be used directly in a try-with-resources statement, which releases the lock automatically when the block exits.
