Brief Introduction of lamport Bakery Algorithm

  java

Order

Lamport bakery algorithm is an algorithm to solve the mutual exclusion problem of multiple threads accessing a shared single-user resource concurrently. Invented by leslie lamport.

Algorithm analogy

Lamport likens this concurrency control algorithm to customers purchasing from bakeries very intuitively.

  • The bakery can only receive purchases from one customer at a time.
  • It is known that there are n customers who want to enter the bakery to make purchases and arrange them to register a check-in number at the front desk in order. The check-in number is increased by one at a time.
  • Customers enter the store in order of sign-in number from small to large.
  • Customers who have completed the purchase return their check-in number to 0 at the front desk. If customers who have completed the purchase want to enter the store again, they must queue up again.

In this analogy, the customer is equivalent to a thread, and entering the store to purchase is to enter the critical area to have exclusive access to the shared resource. Due to the characteristics of computer implementation, there are cases where two threads obtain the same check-in number. This is because the two threads apply for the queued check-in number almost at the same time and read the check-in number that has been sent out. The data read by the two threads are exactly the same. Then each thread finds the maximum value on the read data and adds 1 as its own queued check-in number.

For this reason, the algorithm stipulates that if the queue check-in numbers of two threads are equal, the one with the smaller thread id number has priority.

principle

Lamport timestamp principle is as follows:

  • Each event corresponds to a Lamport timestamp with an initial value of 0
  • If an event occurs within a node, the timestamp is incremented by 1
  • If the event is a sending event, the timestamp is incremented by 1 and the timestamp is added to the message
  • If the event is a received event, timestamp = Max (local timestamp, timestamp in message)+1

Five Principles

  • In order to request resources, process a sends a message (Tm:A) to all other processes and places this message in the process queue, Tm being the timestamp of the message
  • When process b receives process a’s (Tm:A) request, it will put it into its request queue and then send a timestamp confirmation message to a
  • In order to release resources, process a removes all (Tm:A) request messages, and then sends a time-stamped a release resource request message to all other processes
  • When process b receives a request from process a to release resources, it will remove any (Tm:A) resource request in the queue
  • Process A is assigned this resource when the following two conditions are met:

    • A) there is one (Tm:A) request, which is ranked first in the queue according to = > relation;
    • B)A received a message from all other processes with a timestamp greater than Tm

Code sample

private void processRevcMsg(Message m) throws InterruptedException {
        // 原理4 如果事件属于接收事件,时间戳 = Max(本地时间戳,消息中的时间戳) + 1
        clock.update(m.getTimestamp());
        lastSendMap.put(m.getFrom(), m);
        switch (m.getMsgType()) {
            case REQUEST_RES:
                // rule 2 当进程B接收到了进程A的(Tm:A)请求后,会把它放到自己的请求队列,然后发送一个带时间戳的确认消息给A
                addMessageToReqMap(m);
                Message ackMsg = new Message(pid, m.getMsgId(), MessageType.REQUEST_ACK, clock.time());
                // send ack to sender
                sendToTargetProcess(ackMsg,m.getFrom());
                break;
            case REQUEST_ACK:
                break;
            case RELEASE_RES:
                // rule 4 当进程B接收到进程A释放资源的请求,它会移除队列中任意的(Tm:A)的资源请求
                dropMessageFromReqMap(m);
                break;
            default:
                break;
        }
        tryToAcquireResource();
    }

    private void tryToAcquireResource() {
        synchronized (reqMap) {
            if(!reqMap.containsKey(pid) || reqMap.get(pid).isEmpty()){
                return ;
            }

            Message myMessage = reqMap.get(pid).get(0);
            int acceptCount = 1;

            // rule 5 当满足以下两个条件时,进程A会被分配该资源:a)有一个(Tm:A)的请求,按照=>关系排在队列第一位;b)A接收到了一个时间戳大于Tm的来自所有其他进程的消息

            // condition (ii) of rule 5
            // A接收到了一个来自所有其他进程的消息,而且时间戳大于Tm
            for (Map.Entry<Integer, Message> entry : lastSendMap.entrySet()) {
                if (entry.getKey() == pid) {
                    continue;
                }
                if (isFirstEarlier(myMessage, entry.getValue())) {
                    acceptCount++;
                }else{
                    return ;
                }
            }
            if (!coordinator.hasAcceptedAll(acceptCount)){
                return;
            }

            // condition (i) of rule 5
            // 有一个Tm:A的请求,按照=>关系排在队列第一位
            for (Map.Entry<Integer, List<Message>> entry : reqMap.entrySet()) {
                if (entry.getKey() != pid && !entry.getValue().isEmpty()) {
                    if (!isFirstEarlier(myMessage, entry.getValue().get(0))) {
                        return;
                    }
                }
            }

            // remove this request message
            final Message firstMsg = reqMap.get(pid).remove(0);
            workingPool.execute(new Runnable() {
                public void run() {
                    coordinator.acquire(firstMsg.getMsgId(), pid, firstMsg.getTimestamp());
                    // emulate owning resources for a long time
                    try {
                        Thread.sleep(50L);
                        // rule 3 为了释放资源,进程A移除所有(Tm:A)的请求消息,然后发送带时间戳的A释放资源请求消息给其他所有的进程程
                        coordinator.release(firstMsg.getMsgId(), pid, firstMsg.getTimestamp());
                        Message releaseMsg = new Message(pid, firstMsg.getMsgId(),MessageType.RELEASE_RES, clock.time());
                        sendToOtherProcesses(releaseMsg);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                }
            });
        }
    }

doc