SynchronousQueue usage example


Preface

This article covers SynchronousQueue: what it is, how to use it, and where the JDK itself relies on it.

Definition

SynchronousQueue is not really a queue, because it maintains no storage space for elements. Unlike other queues, it instead maintains a set of threads that are waiting to add elements to the queue or remove elements from it.
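The lack of storage can be observed from a single thread. The following is a minimal sketch (the class and method names are illustrative, not from the original article): with no consumer waiting, the non-blocking offer fails at once, and the inspection methods always report an empty queue with zero capacity.

```java
import java.util.concurrent.SynchronousQueue;

class NoStorageDemo {

    // Describes what a lone thread observes on a SynchronousQueue.
    static String inspect() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        // No consumer is waiting in take(), so the element cannot be handed over.
        boolean accepted = queue.offer("task");
        return "offer=" + accepted
                + " size=" + queue.size()
                + " peek=" + queue.peek()
                + " remainingCapacity=" + queue.remainingCapacity();
    }

    public static void main(String[] args) {
        // prints "offer=false size=0 peek=null remainingCapacity=0"
        System.out.println(inspect());
    }
}
```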

To use a dishwashing analogy, it is like having no dish rack: each washed dish is handed directly to the next idle dryer. This way of implementing a queue may seem strange, but because work is handed over directly, the latency of moving data from producer to consumer is reduced. (With a traditional queue, the enqueue and dequeue operations must complete one after the other before a unit of work is delivered.)

Direct handoff also gives the producer more information about the state of the task. When the handoff is accepted, the producer knows that a consumer has taken responsibility for the task, rather than the task merely sitting in a queue somewhere. The difference is like handing a document to a colleague in person versus putting it in her mailbox and hoping she gets to it soon.

Because SynchronousQueue has no storage capacity, put and take block until another thread is ready to take part in the handoff. A synchronous queue is therefore suitable only when there are enough consumers that one is almost always ready to accept the handoff.
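The blocking behaviour can be sketched as follows. This is an illustrative example under simple assumptions (one producer thread, the main thread acting as consumer, arbitrary timeouts); the class name is hypothetical. It checks that the producer is still stuck in put before any take happens, that it finishes once the element is taken, and that a timed offer gives up when no consumer shows up.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class BlockingHandoffDemo {

    static String run() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put("job"); // blocks until a consumer takes the element
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            producer.start();
            producer.join(200); // no consumer yet, so put() cannot have returned
            boolean blockedBeforeTake = producer.isAlive();

            String received = queue.take(); // the handoff happens here
            producer.join(2000);
            boolean finishedAfterTake = !producer.isAlive();

            // With no consumer waiting, a timed offer gives up instead of blocking forever.
            boolean timedOffer = queue.offer("late", 100, TimeUnit.MILLISECONDS);

            return "blockedBeforeTake=" + blockedBeforeTake
                    + " received=" + received
                    + " finishedAfterTake=" + finishedAfterTake
                    + " timedOffer=" + timedOffer;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
    }

    public static void main(String[] args) {
        // prints "blockedBeforeTake=true received=job finishedAfterTake=true timedOffer=false"
        System.out.println(run());
    }
}
```

The timed offer is one way for a producer to cope when consumers are scarce: it can fall back to another strategy instead of waiting indefinitely.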

Example

import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueExample {

    static class SynchronousQueueProducer implements Runnable {

        protected BlockingQueue<String> blockingQueue;

        public SynchronousQueueProducer(BlockingQueue<String> queue) {
            this.blockingQueue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    String data = UUID.randomUUID().toString();
                    System.out.println("Put: " + data);
                    blockingQueue.put(data);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop instead of looping forever.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

    }

    static class SynchronousQueueConsumer implements Runnable {

        protected BlockingQueue<String> blockingQueue;

        public SynchronousQueueConsumer(BlockingQueue<String> queue) {
            this.blockingQueue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    String data = blockingQueue.take();
                    System.out.println(Thread.currentThread().getName()
                            + " take(): " + data);
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop instead of looping forever.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

    }

    public static void main(String[] args) {
        final BlockingQueue<String> synchronousQueue = new SynchronousQueue<String>();

        SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(
                synchronousQueue);
        new Thread(queueProducer).start();

        SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(
                synchronousQueue);
        new Thread(queueConsumer1).start();

        SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(
                synchronousQueue);
        new Thread(queueConsumer2).start();

    }
}

Because every put must pair with a take, the producer and the consumers proceed in lockstep: an element is handed over only when a consumer thread is waiting in take().

Application scenario

Executors.newCachedThreadPool()

    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * ThreadFactory to create new threads when needed.
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

When a task is submitted, ThreadPoolExecutor calls the non-blocking enqueue method (offer) of its work queue (an implementation of the BlockingQueue interface). With a SynchronousQueue as the work queue, that offer call fails (that is, the task is not stored in the work queue) whenever the pool has no idle thread waiting to take a task from the SynchronousQueue instance. ThreadPoolExecutor then creates a new worker thread to run the task that could not be queued, provided the pool has not yet reached its maximum size.
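This behaviour can be observed with a small experiment. The sketch below builds a pool with the same parameters as newCachedThreadPool and submits tasks that all block on a latch, so no worker is ever idle; the class and helper names are hypothetical. Each submission should therefore fail to enqueue and start a new thread, while the queue itself stays empty.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DirectHandoffPoolDemo {

    /** Submits {@code tasks} blocking jobs and returns {poolSize, queuedTasks}. */
    static int[] run(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                // Every earlier worker is busy, so offer() fails and a new thread is started.
                pool.execute(() -> awaitQuietly(release));
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let the workers finish
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] result = run(4);
        // prints "poolSize=4 queued=0"
        System.out.println("poolSize=" + result[0] + " queued=" + result[1]);
    }
}
```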

Therefore, with SynchronousQueue as the work queue, the queue itself places no limit on the number of tasks submitted for execution. In that case, the maximum pool size should be set to a reasonable finite value rather than Integer.MAX_VALUE; otherwise, the number of worker threads may keep growing until system resources are exhausted.

If the application would otherwise need a very large work queue and wants to avoid the problems an unbounded work queue can cause, consider SynchronousQueue, which buffers no tasks at all.
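Capping the pool size has a consequence worth seeing in code: once every thread is busy, there is nowhere to put an extra task, so the pool's rejection handler is invoked. The sketch below assumes the default handler (AbortPolicy, which throws RejectedExecutionException) and uses hypothetical class and helper names; it counts how many blocking tasks are accepted and how many are rejected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedHandoffPoolDemo {

    /** Returns {accepted, rejected} for {@code tasks} blocking jobs on a pool capped at {@code maxThreads}. */
    static int[] run(int maxThreads, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, maxThreads, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>()); // default handler: AbortPolicy
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> awaitQuietly(release));
                    accepted++;
                } catch (RejectedExecutionException e) {
                    // All threads are busy and the queue cannot hold the task.
                    rejected++;
                }
            }
            return new int[] { accepted, rejected };
        } finally {
            release.countDown(); // let the workers finish
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] result = run(2, 5);
        // prints "accepted=2 rejected=3"
        System.out.println("accepted=" + result[0] + " rejected=" + result[1]);
    }
}
```

A bounded pool with a SynchronousQueue therefore needs a deliberate rejection strategy. Passing ThreadPoolExecutor.CallerRunsPolicy to the constructor, for example, makes the submitting thread run the task itself, which slows submission down instead of failing.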

The purpose of using SynchronousQueue is to ensure that “a submitted task is handled by an idle thread if one exists; otherwise, a new thread is created to handle it.”
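The reuse half of that rule can be sketched as well. The example below (hypothetical class name) runs several tasks one after another on a cached thread pool and records which threads ran them. The short sleep is an assumption of the sketch: it gives the worker time to go back to waiting on the queue, and without it a second thread may occasionally be created.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ReuseIdleThreadDemo {

    /** Runs {@code tasks} jobs one after another and returns how many distinct threads ran them. */
    static int run(int tasks) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < tasks; i++) {
                // Wait for each task to finish before submitting the next one.
                pool.submit(() -> {
                    threadNames.add(Thread.currentThread().getName());
                }).get();
                Thread.sleep(200); // let the worker return to waiting on the queue
            }
            return threadNames.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Expected to print "distinctThreads=1": the idle worker takes each new task.
        System.out.println("distinctThreads=" + run(3));
    }
}
```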
