Design of High Performance SPSC Lockless Queue


Overview

This article walks through the design of a single-producer / single-consumer (SPSC) lock-free queue step by step and discusses several principles behind a high-performance SPSC lock-free queue:

  • Single write principle

  • Use lazySet instead of volatile set

  • Use bit operations instead of modulo operations

  • Avoid false sharing

  • Reduce cache coherency conflicts

1. Single Writer Principle

If only one thread ever writes to a resource, things are easier than you might think. The scheme works, and the CPU wastes no cycles managing contention or context switching. Of course, multiple threads may still read the same data; the CPU can broadcast read-only copies of it to other cores through the cache-coherency subsystem. This has a cost, but it scales very well.
If multiple threads write the same resource at the same time, there is contention, and you need locks or optimistic concurrency control. Non-blocking single-threaded writing is faster than multi-threaded writing: it achieves high throughput and low latency, especially on multi-core machines where running one thread per core greatly increases the chance that the other cores are doing useful work in parallel.

Method                            Time (ms)
One Thread                        300
One Thread with Memory Barrier    4,700
One Thread with CAS               5,700
Two Threads with CAS              18,000
One Thread with Lock              10,000
Two Threads with Lock             118,000
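For a feel of where such numbers come from, here is a rough, hedged sketch of a comparable experiment: incrementing a 64-bit counter under each of the schemes named above. The 500-million iteration count and the methodology are assumptions, not the original benchmark, and a real measurement should use a harness such as JMH.

import java.util.concurrent.atomic.AtomicLong;

// Rough, single-class sketch only; results are indicative, not rigorous.
public class CounterCost {
    static final long ITERATIONS = 500_000_000L;

    static long plain;                       // one thread, no barrier
    static volatile long fenced;             // one thread, volatile write (memory barrier)
    static final AtomicLong cas = new AtomicLong();
    static long locked;
    static final Object lock = new Object();

    public static void main(String[] args) {
        long t = System.nanoTime();
        for (long i = 0; i < ITERATIONS; i++) plain++;
        report("one thread", t);

        t = System.nanoTime();
        for (long i = 0; i < ITERATIONS; i++) fenced++;
        report("one thread + memory barrier", t);

        t = System.nanoTime();
        for (long i = 0; i < ITERATIONS; i++) cas.incrementAndGet();
        report("one thread + CAS", t);

        t = System.nanoTime();
        for (long i = 0; i < ITERATIONS; i++) { synchronized (lock) { locked++; } }
        report("one thread + lock", t);

        // Keep the counters alive so the writes cannot be optimised away entirely.
        System.out.println(plain + fenced + cas.get() + locked);
    }

    private static void report(String name, long startNanos) {
        System.out.println(name + ": " + (System.nanoTime() - startNanos) / 1_000_000 + " ms");
    }
}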

The Disruptor separates concerns and truly realizes the single writer principle: its RingBuffer funnels multi-threaded producers down to a single-threaded consumer, and the shared resource is only ever written by that single thread.
Node.js, Erlang, the Actor model and SEDA all adopt single-writer designs, but most of them are built on top of queues, and the queues themselves break the single writer principle.

2. Replace volatile set with lazySet

  • lazySet is implemented with Unsafe.putOrderedObject (and its primitive counterparts); the write is preceded by a store-store barrier (a no-op, or very cheap, on current hardware) instead of the store-load barrier a volatile write requires.

  • The store-load barrier is the expensive one, and a volatile write always pays for it. A store-store barrier only guarantees ordering: in the sequence Store1; StoreStore; Store2, the data written by Store1 is guaranteed to be visible to other processors before Store2 and any subsequent stores. In other words, it guarantees the order in which writes become visible to others.

  • If only one thread writes, the store-load barrier is unnecessary; under the single writer principle, lazySet is equivalent to a volatile set.

  • This performance gain has a price, although a cheap one: the write does not become visible to other threads immediately. In practice other threads usually see it within a few nanoseconds, which is short enough that the cost is tolerable.

  • The same applies to Unsafe.putOrderedLong, Unsafe.putOrderedInt and related methods; putOrderedLong is roughly three times faster than a volatile long write.
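A minimal sketch of how this looks in practice, assuming a sequence counter owned by a single writer thread (the class is illustrative, not part of the queue code below):

import java.util.concurrent.atomic.AtomicLong;

// A counter published by exactly one writer thread. Under the single writer
// principle the ordered write in lazySet is enough; the hot path never pays
// for a store-load barrier.
public final class SequencePublisher {
    private final AtomicLong sequence = new AtomicLong(0);

    // Called only by the single producer thread.
    public void publish(long next) {
        sequence.lazySet(next);   // store-store barrier only
    }

    // May be called by any reader thread.
    public long current() {
        return sequence.get();    // volatile read
    }
}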

3. Use bit operations instead of modulo operations

For example, this version uses a modulo operation to wrap the index:

// head and tail are plain long fields of the queue; buffer is an E[].
public boolean offer(final E e) {
        if (null == e) {
            throw new NullPointerException("Null is not a valid element");
        }

        final long currentTail = tail;
        final long wrapPoint = currentTail - buffer.length;
        if (head <= wrapPoint) {
            return false;          // queue is full
        }

        buffer[(int) (currentTail % buffer.length)] = e;   // wrap the index with %
        tail = currentTail + 1;

        return true;
    }

And after switching to a bitwise AND with a mask (this version also uses AtomicLong counters and lazySet, as discussed above):

mask = capacity - 1;   // valid only when capacity is a power of two

public boolean offer(final E e) {
        if (null == e) {
            throw new NullPointerException("Null is not a valid element");
        }

        final long currentTail = tail.get();
        final long wrapPoint = currentTail - buffer.length;
        if (head.get() <= wrapPoint) {
            return false;          // queue is full
        }

        buffer[(int) currentTail & mask] = e;   // wrap the index with a bitwise AND
        tail.lazySet(currentTail + 1);          // single-writer publish

        return true;
    }
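The snippets above use fields (buffer, mask, capacity, and the AtomicLong head/tail counters) that are never declared in the article. A minimal sketch of how they might fit together, with illustrative names; note that the mask trick requires a power-of-two capacity:

import java.util.concurrent.atomic.AtomicLong;

// Hedged sketch of the assumed surrounding class; names and the constructor
// are illustrative only.
public final class OneToOneQueueFields<E> {
    private final int capacity;   // must be a power of two for the mask trick
    private final int mask;       // capacity - 1
    private final E[] buffer;
    private final AtomicLong tail = new AtomicLong(0);
    private final AtomicLong head = new AtomicLong(0);

    @SuppressWarnings("unchecked")
    public OneToOneQueueFields(final int capacity) {
        if (Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        this.capacity = capacity;
        this.mask = capacity - 1;
        this.buffer = (E[]) new Object[capacity];
    }
}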

Performance comparison

For a power of two such as 8, x % 8 == x & (8 - 1) for non-negative x, but the bitwise AND is cheaper. The following JMH benchmark compares the variants:

import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 20, time = 3, timeUnit = TimeUnit.SECONDS)
@Fork(1)
@State(Scope.Benchmark)
public class ModuloMaskTest {

    private static final int LENGTH = 16;
    int[] ints = new int[LENGTH];
    int mask = LENGTH - 1;
    int someIndex = 5;

    @Benchmark
    public int moduloLengthNoMask() {
        return someIndex % ints.length;
    }

    @Benchmark
    public int moduloLengthMask() {
        return someIndex & (ints.length - 1);
    }

    @Benchmark
    public int moduloConstantLengthNoMask() {
        return someIndex % LENGTH;
    }

    @Benchmark
    public int moduloMask() {
        return someIndex & mask;
    }

    @Benchmark
    public int consume() {
        return someIndex;
    }
    @Benchmark
    public void noop() {
    }

    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
                .include(".*" + ModuloMaskTest.class.getSimpleName() + ".*")
                .forks(1)
                .build();
        new Runner(opt).run();
    }
}

The results are as follows:

# Run complete. Total time: 00:07:34

Benchmark                                  Mode  Cnt  Score   Error  Units
ModuloMaskTest.consume                     avgt   20  3.099 ± 0.152  ns/op
ModuloMaskTest.moduloConstantLengthNoMask  avgt   20  3.430 ± 0.509  ns/op
ModuloMaskTest.moduloLengthMask            avgt   20  3.505 ± 0.058  ns/op
ModuloMaskTest.moduloLengthNoMask          avgt   20  6.490 ± 0.143  ns/op
ModuloMaskTest.moduloMask                  avgt   20  3.304 ± 0.159  ns/op
ModuloMaskTest.noop                        avgt   20  0.404 ± 0.010  ns/op

The non-constant % variant is the slowest at about 6.5 ns per op, while the masked variants all land around 3.3-3.5 ns, essentially the cost of just returning the index. (moduloConstantLengthNoMask is also fast, presumably because the JIT can strength-reduce % by a compile-time constant power of two.)

4. Avoid false sharing

L1/L2/L3 caches

When the CPU needs data, it looks in L1 first, then L2, then L3, and finally, if the data is in none of the caches, it goes to main memory. The farther out it has to go, the longer the access takes. So if you do something very frequently, make sure the data it needs stays in the L1 cache.

From CPU to                           Approx. CPU cycles    Approx. time
Main memory                           -                     ~60-80 ns
QPI bus transfer (between sockets)    -                     ~20 ns
L3 cache                              ~40-45 cycles         ~15 ns
L2 cache                              ~10 cycles            ~3 ns
L1 cache                              ~3-4 cycles           ~1 ns
Register                              1 cycle               -

Reading from main memory is nearly two orders of magnitude slower than reading from L1.

Definition

A cache is made up of cache lines. Each cache line is usually 64 bytes and effectively maps a block of addresses in main memory. A Java long is 8 bytes, so one cache line holds 8 long values.
Whenever the CPU pulls data from main memory, it loads the adjacent data into the same cache line as well.
When you access a long array, loading one element into the cache automatically brings in its 7 neighbours, so you can traverse the array very quickly; in fact, any data structure laid out in contiguous memory can be traversed quickly. The trouble starts when this cache-line behaviour works against you instead of for you, and that is what false sharing is about.
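A tiny, hedged sketch of that effect (the array size and the stride of 8 longs, i.e. one 64-byte line, are arbitrary choices for illustration): walking a large long[] sequentially fetches each cache line once, while visiting the same elements in eight strided passes fetches each line once per pass.

// Same total work, different cache-line behaviour.
public class CacheLineWalk {
    public static void main(String[] args) {
        final long[] data = new long[8 * 1024 * 1024];   // 64 MB, larger than a typical L3

        long t = System.nanoTime();
        long sum = 0;
        for (int i = 0; i < data.length; i++) {
            sum += data[i];                               // sequential: each line fetched once
        }
        System.out.println("sequential: " + (System.nanoTime() - t) / 1_000_000 + " ms (" + sum + ")");

        t = System.nanoTime();
        sum = 0;
        for (int off = 0; off < 8; off++) {               // 8 passes, stride of one cache line
            for (int i = off; i < data.length; i += 8) {
                sum += data[i];                           // each line fetched once per pass
            }
        }
        System.out.println("strided   : " + (System.nanoTime() - t) / 1_000_000 + " ms (" + sum + ")");
    }
}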


When multiple threads modify independent variables that happen to share the same cache line, they inadvertently hurt each other's performance; this is false sharing. Write contention on cache lines is the most important limiting factor for the scalability of parallel threads on SMP systems, and false sharing has been described as a silent performance killer.

[Figure 1: threads on core 1 and core 2 updating independent variables x and y that sit on the same cache line]

Figure 1 illustrates the problem. A thread running on core 1 wants to update variable x while a thread on core 2 wants to update variable y; unfortunately the two variables live on the same cache line. Each thread has to compete for ownership of the line in order to update it. If core 1 acquires ownership, the cache subsystem invalidates core 2's copy of the line; when core 2 then acquires ownership and performs its update, core 1's copy is invalidated in turn. This ping-pongs back and forth through the L3 cache and badly hurts performance. If the competing cores sit in different sockets, the traffic also has to cross the socket interconnect, making things even worse.

Solution

The usual fix for false sharing is padding: space fields (or array elements) out so that data accessed by different threads lands on different cache lines, trading space for time. Since JDK 8 there is a dedicated annotation, @Contended, which solves the problem more gracefully; note that for application classes it only takes effect when the JVM is started with -XX:-RestrictContended.

import sun.misc.Contended;   // JDK 8; moved to jdk.internal.vm.annotation in JDK 9+

@Contended
public class VolatileLong {
    public volatile long value = 0L;
}

public class FalseSharingJdk8 implements Runnable {
    public static int NUM_THREADS = 4; // change
    public final static long ITERATIONS = 500L * 1000L * 1000L;
    private final int arrayIndex;
    private static VolatileLong[] longs;

    public FalseSharingJdk8(final int arrayIndex) {
        this.arrayIndex = arrayIndex;
    }

    /**
     * Run with -XX:-RestrictContended so @Contended is honoured for non-JDK classes.
     * -XX:+PrintFieldLayout  -- only works on debug builds of the JDK
     * @param args
     * @throws Exception
     */
    public static void main(final String[] args) throws Exception {
        Thread.sleep(10000);
        System.out.println("starting....");
        if (args.length == 1) {
            NUM_THREADS = Integer.parseInt(args[0]);
        }

        longs = new VolatileLong[NUM_THREADS];
        for (int i = 0; i < longs.length; i++) {
            longs[i] = new VolatileLong();
        }
        final long start = System.nanoTime();
        runTest();
        System.out.println("duration = " + (System.nanoTime() - start));
    }

    private static void runTest() throws InterruptedException {
        Thread[] threads = new Thread[NUM_THREADS];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new FalseSharingJdk8(i));
        }
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
    }

    public void run() {
        long i = ITERATIONS + 1;
        while (0 != --i) {
            longs[arrayIndex].value = i;
        }
    }
}

If you don't use the annotation, you have to add the padding yourself:

public final static class ValuePadding {
        protected long p1, p2, p3, p4, p5, p6, p7;     // padding before the hot field
        protected volatile long value = 0L;
        protected long p9, p10, p11, p12, p13, p14;    // padding after the hot field
        protected long p15;
    }
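Manual padding relies on the JVM keeping the fields in roughly declaration order, which it is not obliged to do, so it is worth checking the actual layout. A small sketch using the OpenJDK JOL tool; it assumes org.openjdk.jol:jol-core is on the classpath and that ValuePadding is visible to this class:

import org.openjdk.jol.info.ClassLayout;

public class LayoutCheck {
    public static void main(String[] args) {
        // Prints the offset of every field, so you can see where `value` ends up
        // relative to the padding longs.
        System.out.println(ClassLayout.parseClass(ValuePadding.class).toPrintable());
    }
}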

5. Reduce cache coherency conflicts

As long as only one CPU core is working, everything is fine. With multiple cores, each with its own cache, a problem appears: what happens when the memory backing one core's cache line is modified by another core?
The cache coherency protocol solves this: it keeps the contents of the caches consistent, so that multiple caches behave as if there were only one. But that consistency has a cost, because every time the producer reads the consumer's counter (or vice versa) the cache line has to travel between cores. The version below reduces that traffic: the producer keeps a local copy of the last head value it saw (headCache) and only re-reads the consumer's head counter when the queue appears full; the consumer does the same with tailCache.

    private final AtomicLong tail = new AtomicLong(0);
    private final AtomicLong head = new AtomicLong(0);

    // value plus padding, so the whole object roughly fills a cache line.
    public static class PaddedLong {
        public long value = 0, p1, p2, p3, p4, p5, p6;
    }

    private final PaddedLong tailCache = new PaddedLong();   // touched only by the consumer thread
    private final PaddedLong headCache = new PaddedLong();   // touched only by the producer thread

    public boolean offer(final E e) {
        if (null == e) {
            throw new NullPointerException("Null is not a valid element");
        }

        final long currentTail = tail.get();
        final long wrapPoint = currentTail - capacity;
        if (headCache.value <= wrapPoint) {
            // Looks full based on the stale cached head; refresh from the shared counter.
            headCache.value = head.get();
            if (headCache.value <= wrapPoint) {
                return false;      // really full
            }
        }

        buffer[(int) currentTail & mask] = e;
        tail.lazySet(currentTail + 1);

        return true;
    }

    public E poll() {
        final long currentHead = head.get();
        if (currentHead >= tailCache.value) {
            // Looks empty based on the stale cached tail; refresh from the shared counter.
            tailCache.value = tail.get();
            if (currentHead >= tailCache.value) {
                return null;       // really empty
            }
        }

        final int index = (int) currentHead & mask;
        final E e = buffer[index];
        buffer[index] = null;
        head.lazySet(currentHead + 1);

        return e;
    }

Compare with the version that has no cached counters:

private final AtomicLong tail = new AtomicLong(0);
private final AtomicLong head = new AtomicLong(0);

public boolean offer(final E e) {
        if (null == e) {
            throw new NullPointerException("Null is not a valid element");
        }

        final long currentTail = tail.get();
        final long wrapPoint = currentTail - buffer.length;
        if (head.get() <= wrapPoint) {
            return false;
        }

        buffer[(int) currentTail & mask] = e;
        tail.lazySet(currentTail + 1);

        return true;
    }

    public E poll() {
        final long currentHead = head.get();
        if (currentHead >= tail.get()) {
            return null;
        }

        final int index = (int) currentHead & mask;
        final E e = buffer[index];
        buffer[index] = null;
        head.lazySet(currentHead + 1);

        return e;
    }
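For reference, here is a minimal, hedged sketch of the kind of throughput harness that could produce numbers like those below; the queue class name is taken from the results, but its constructor, the capacity, the message value 777, and the repetition counts are all assumptions rather than the original test:

// One producer thread, one consumer (the main thread), measuring ops/sec per run.
public class ThroughputTest {
    private static final int REPETITIONS = 50_000_000;
    private static final Integer MESSAGE = 777;

    public static void main(String[] args) throws Exception {
        final OneToOneConcurrentArrayQueue3<Integer> queue =
                new OneToOneConcurrentArrayQueue3<>(1024 * 1024);   // capacity: a power of two

        for (int run = 0; run < 5; run++) {
            Thread producer = new Thread(() -> {
                for (int i = 0; i < REPETITIONS; i++) {
                    while (!queue.offer(MESSAGE)) {
                        Thread.yield();              // queue full: back off
                    }
                }
            });

            long start = System.nanoTime();
            producer.start();

            Integer result = null;
            for (int i = 0; i < REPETITIONS; i++) {
                Integer v;
                while ((v = queue.poll()) == null) {
                    Thread.yield();                  // queue empty: back off
                }
                result = v;
            }
            producer.join();

            long opsPerSec = REPETITIONS * 1_000_000_000L / (System.nanoTime() - start);
            System.out.println(run + " - ops/sec=" + opsPerSec + " - result=" + result);
        }
    }
}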

Benchmark results

0 - ops/sec=56,689,539 - OneToOneConcurrentArrayQueue2 result=777
1 - ops/sec=33,578,974 - OneToOneConcurrentArrayQueue2 result=777
2 - ops/sec=54,105,692 - OneToOneConcurrentArrayQueue2 result=777
3 - ops/sec=84,290,815 - OneToOneConcurrentArrayQueue2 result=777
4 - ops/sec=79,851,727 - OneToOneConcurrentArrayQueue2 result=777
-----
0 - ops/sec=110,506,679 - OneToOneConcurrentArrayQueue3 result=777
1 - ops/sec=117,252,276 - OneToOneConcurrentArrayQueue3 result=777
2 - ops/sec=115,639,936 - OneToOneConcurrentArrayQueue3 result=777
3 - ops/sec=116,555,884 - OneToOneConcurrentArrayQueue3 result=777
4 - ops/sec=115,712,336 - OneToOneConcurrentArrayQueue3 result=777

Overall, the version with cached head/tail counters (OneToOneConcurrentArrayQueue3) shows a clear improvement, going from roughly 34-84 million ops/sec to a steady 110-117 million ops/sec.
