Talk about RunOnce of Elasticsearch.

  elasticsearch

Order

This article mainly studies the RunOnce of Elasticsearch.

RunOnce

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/RunOnce.java

public class RunOnce implements Runnable {

    private final Runnable delegate;
    private final AtomicBoolean hasRun;

    public RunOnce(final Runnable delegate) {
        this.delegate = Objects.requireNonNull(delegate);
        this.hasRun = new AtomicBoolean(false);
    }

    @Override
    public void run() {
        if (hasRun.compareAndSet(false, true)) {
            delegate.run();
        }
    }

    /**
     * {@code true} if the {@link RunOnce} has been executed once.
     */
    public boolean hasRun() {
        return hasRun.get();
    }
}
  • RunOnce implements the Runnable interface. Its constructor requires input of Runnable and constructs hasRun variable at the same time. The run method will first use compareAndSet to set hasRun from false to true, and if successful, execute the agent’s Runnable run method; The hasRun method returns the hasRun value.

Example

elasticsearch-7.0.1/server/src/test/java/org/elasticsearch/common/util/concurrent/RunOnceTests.java

public class RunOnceTests extends ESTestCase {

    public void testRunOnce() {
        final AtomicInteger counter = new AtomicInteger(0);
        final RunOnce runOnce = new RunOnce(counter::incrementAndGet);
        assertFalse(runOnce.hasRun());

        runOnce.run();
        assertTrue(runOnce.hasRun());
        assertEquals(1, counter.get());

        runOnce.run();
        assertTrue(runOnce.hasRun());
        assertEquals(1, counter.get());
    }

    public void testRunOnceConcurrently() throws InterruptedException {
        final AtomicInteger counter = new AtomicInteger(0);
        final RunOnce runOnce = new RunOnce(counter::incrementAndGet);

        final Thread[] threads = new Thread[between(3, 10)];
        final CountDownLatch latch = new CountDownLatch(1);
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                runOnce.run();
            });
            threads[i].start();
        }

        latch.countDown();
        for (Thread thread : threads) {
            thread.join();
        }
        assertTrue(runOnce.hasRun());
        assertEquals(1, counter.get());
    }

    public void testRunOnceWithAbstractRunnable() {
        final AtomicInteger onRun = new AtomicInteger(0);
        final AtomicInteger onFailure = new AtomicInteger(0);
        final AtomicInteger onAfter = new AtomicInteger(0);

        final RunOnce runOnce = new RunOnce(new AbstractRunnable() {
            @Override
            protected void doRun() throws Exception {
                onRun.incrementAndGet();
                throw new RuntimeException("failure");
            }

            @Override
            public void onFailure(Exception e) {
                onFailure.incrementAndGet();
            }

            @Override
            public void onAfter() {
                onAfter.incrementAndGet();
            }
        });

        final int iterations = randomIntBetween(1, 10);
        for (int i = 0; i < iterations; i++) {
            runOnce.run();
            assertEquals(1, onRun.get());
            assertEquals(1, onFailure.get());
            assertEquals(1, onAfter.get());
            assertTrue(runOnce.hasRun());
        }
    }
}
  • The testRunOnce method verified the scenario in which runOnce was executed multiple times in sequence; The testrunOnceConcurrently method verifies the scenario of executing RunOnce multiple times concurrently. TestNonceWithAbstractrunnable verified the scenario using AbstractRunnable as Runnable.

Summary

RunOnce implements the Runnable interface. Its constructor requires input of Runnable and constructs hasRun variable at the same time. The run method will first use compareAndSet to set hasRun from false to true, and if successful, execute the agent’s Runnable run method; The hasRun method returns the hasRun value.

doc