Talk about SequenceProducerImpl of rocketmq

  mq

Order

This article mainly studies rocketmq’s SequenceProducerImpl

SequenceProducerImpl

io/openmessaging/rocketmq/producer/SequenceProducerImpl.java

public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer {

    private BlockingQueue<Message> msgCacheQueue;

    public SequenceProducerImpl(final KeyValue properties) {
        super(properties);
        this.msgCacheQueue = new LinkedBlockingQueue<>();
    }

    @Override
    public KeyValue properties() {
        return properties;
    }

    @Override
    public void send(final Message message) {
        checkMessageType(message);
        org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message);
        try {
            Validators.checkMessage(rmqMessage, this.rocketmqProducer);
        } catch (MQClientException e) {
            throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
        }
        msgCacheQueue.add(message);
    }

    @Override
    public void send(final Message message, final KeyValue properties) {
        send(message);
    }

    @Override
    public synchronized void commit() {
        List<Message> messages = new ArrayList<>();
        msgCacheQueue.drainTo(messages);

        List<org.apache.rocketmq.common.message.Message> rmqMessages = new ArrayList<>();

        for (Message message : messages) {
            rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message));
        }

        if (rmqMessages.size() == 0) {
            return;
        }

        try {
            SendResult sendResult = this.rocketmqProducer.send(rmqMessages);
            String[] msgIdArray = sendResult.getMsgId().split(",");
            for (int i = 0; i < messages.size(); i++) {
                Message message = messages.get(i);
                message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]);
            }
        } catch (Exception e) {
            throw checkProducerException("", "", e);
        }
    }

    @Override
    public synchronized void rollback() {
        msgCacheQueue.clear();
    }
}
  • The LinkedBlockingQueue is used, and the send method is actually called to add to the queue.
  • In addition, commit and rollback methods are provided, and synchronized is added to ensure thread safety for LinkedBlockingQueue operation.
  • During commit, the queue’s data is dragged to list and then sent in batch. Empty the entire LinkedBlockingQueue when rolling back.

Summary

Rocketmq’s SequenceProducerImpl is not a real method when sending the method, but is added to the queue. it is only send in batch when committing, and the queue is emptied when rollback. The semantics of the send method here is not very good. It can be changed to a name such as pending.

doc