ConcurrentLinkedQueue usage instance

  java

Order

ConcurrentLinkedQueue is an unbounded thread-safe queue based on linked nodes. It uses a first-in-first-out rule to sort nodes. When we add an element, it is added to the tail of the queue. When we get an element, it returns the element at the head of the queue.

Contrast

queue Blocking or not Bounded or not Thread safety guarantee Applicable scenario Matters needing attention
ArrayBlockingQueue Blocking Bounded A global lock Production and consumption model, balancing processing speed on both sides
LinkedBlockingQueue Blocking Configurable Access uses 2 locks Production and consumption model, balancing processing speed on both sides Pay attention to memory overflow when unbounded
ConcurrentLinkedQueue Non-blocking Unbounded CAS A scene in which a global set is manipulated. Size () is to traverse the set once. Use caution.

Example

Because concurrency is unbounded, special attention should be paid to memory overflow when using it. Akka’s actor model, the default mailbox, is implemented with this.

object UnboundedMailbox {
  class MessageQueue extends ConcurrentLinkedQueue[Envelope] with UnboundedQueueBasedMessageQueue {
    final def queue: Queue[Envelope] = this
  }
}

MyUnboundedMailbox

public class MyUnboundedMailbox implements MailboxType,
  ProducesMessageQueue<MyUnboundedMailbox.MyMessageQueue> {

  // This is the MessageQueue implementation
  public static class MyMessageQueue implements MessageQueue,
      MyUnboundedMessageQueueSemantics {
    private final Queue<Envelope> queue =
      new ConcurrentLinkedQueue<Envelope>();

    // these must be implemented; queue used as example
    public void enqueue(ActorRef receiver, Envelope handle) {
      queue.offer(handle);
    }
    public Envelope dequeue() { return queue.poll(); }
    public int numberOfMessages() { return queue.size(); }
    public boolean hasMessages() { return !queue.isEmpty(); }
    public void cleanUp(ActorRef owner, MessageQueue deadLetters) {
      for (Envelope handle: queue) {
        deadLetters.enqueue(owner, handle);
      }
    }
  }

  // This constructor signature must exist, it will be called by Akka
  public MyUnboundedMailbox(ActorSystem.Settings settings, Config config) {
    // put your initialization code here
  }

  // The create method is called to create the MessageQueue
  public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
    return new MyMessageQueue();
  }
}

doc