Talk about replication

  architecture, distributed


This article mainly discusses the replication methods used by mainstream open source products.


Replication and partitioning/sharding are two essential capabilities of distributed systems. For details, see Replication, fragmentation, and routing.
For massive data, replication adds redundancy, which keeps the system available, and it also improves read throughput, because reads can be spread across replicas.
This article focuses on replication only, that is, it assumes each node is large enough to store a full copy of the data.

Replication types

Depending on whether there is a leader, and how many leaders there are, replication can be divided into:

  • single leader replication
    Master-slave replication: the leader synchronizes or notifies the followers. Only the leader accepts writes; followers serve reads but cannot be written to.
  • multi leader replication
    Multi-master replication: several leaders on different nodes accept writes at the same time, and each leader also acts as a follower of the others. It suits multi-data-center deployments, but resolving conflicts between concurrent writes in different data centers adds complexity.
  • leaderless replication
    There is no central node and no distinction between master and slave replicas. Any node can accept a request and then notify the other replicas to update.

Leader-based replication methods

For details, see Copy update policy. The main approaches are:

  • sync replication
    Synchronous replication guarantees strong consistency, but the leader must wait for every follower, so latency grows with the number of followers; for that reason it is rarely used.
  • async replication
    Asynchronous replication gives high write throughput, but reads from followers may be inconsistent (stale).
  • semi sync replication
    Semi-synchronous replication usually relies on a quorum mechanism: a write counts as successful once it has been acknowledged by the required number of nodes, and reads query several nodes concurrently to obtain consistent data.
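The three modes differ mainly in how many follower acknowledgements the leader waits for before reporting success. A minimal sketch, assuming followers are plain callables that return True when they ack; the names `replicate`, `always_ack` and `never_ack` are hypothetical:

```python
from typing import Callable, List

# Hypothetical follower model: applies an entry and returns True if it acks.
Follower = Callable[[str], bool]


def always_ack(entry: str) -> bool:
    return True  # a healthy follower


def never_ack(entry: str) -> bool:
    return False  # a follower that is down or too slow


def replicate(entry: str, followers: List[Follower], required_acks: int) -> bool:
    """Report the write as successful once `required_acks` followers have acked.

    required_acks == len(followers)   -> synchronous replication
    required_acks == 0                -> asynchronous replication
    anything in between (e.g. quorum) -> semi-synchronous replication
    """
    # For simplicity this contacts followers one by one and counts every ack;
    # a real leader sends in parallel and stops waiting after `required_acks`.
    acks = sum(1 for follower in followers if follower(entry))
    return acks >= required_acks


followers = [always_ack, always_ack, never_ack]
print(replicate("x=1", followers, required_acks=3))  # sync: False
print(replicate("x=1", followers, required_acks=2))  # semi-sync: True
print(replicate("x=1", followers, required_acks=0))  # async: True
```

With one follower down, synchronous replication cannot complete the write, while semi-synchronous and asynchronous replication still report success.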

Leaderless replication methods

Leaderless replication can use one of three topologies: ring, star/tree, or mesh.
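One way to compare the three topologies is to count how many forwarding rounds an update needs to reach every node. A minimal sketch, assuming each node simply forwards an update to its neighbors in the topology; `ring`, `star`, `mesh` and `rounds_to_reach_all` are hypothetical helpers:

```python
from collections import deque
from typing import Dict, List

Topology = Dict[int, List[int]]  # node -> nodes it forwards updates to


def ring(n: int) -> Topology:
    # Each node forwards to the next node around the ring.
    return {i: [(i + 1) % n] for i in range(n)}


def star(n: int) -> Topology:
    # Node 0 is the hub; leaves forward only to the hub, the hub to every leaf.
    graph: Topology = {0: list(range(1, n))}
    for i in range(1, n):
        graph[i] = [0]
    return graph


def mesh(n: int) -> Topology:
    # Every node forwards directly to every other node.
    return {i: [j for j in range(n) if j != i] for i in range(n)}


def rounds_to_reach_all(graph: Topology, origin: int) -> int:
    """Breadth-first propagation: forwarding rounds until every node
    reachable from `origin` has received an update written there."""
    dist = {origin: 0}
    queue = deque([origin])
    while queue:
        node = queue.popleft()
        for nxt in graph[node]:
            if nxt not in dist:
                dist[nxt] = dist[node] + 1
                queue.append(nxt)
    return max(dist.values())


for name, build in [("ring", ring), ("star", star), ("mesh", mesh)]:
    print(name, rounds_to_reach_all(build(5), origin=1))  # ring 4, star 2, mesh 1
```

The ring and star need fewer links per node, but a single failed node (any ring member, or the star's hub) can stop propagation; the mesh tolerates such failures at the cost of N-1 links per node.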

Replication implementation

Implementations fall mainly into the following categories:

  • statement/trigger-based replication
    Replication is implemented by replaying database statements or triggers. This has problems: nondeterministic functions such as now(), rand(), and seq() can make master and slave diverge, because the slave's results for now() or rand() differ from the master's. MySQL used statement-based replication before version 5.1; from 5.1 on, it can switch to row-based log replication when a statement is nondeterministic.
  • write-ahead-log replication (WAL)
    WAL is an efficient logging technique in databases. For disk-based databases, disk I/O is the main performance bottleneck. For the same amount of data, committing a transaction in a system that uses a WAL requires only about half the disk writes of a traditional rollback log, which greatly improves disk I/O efficiency and therefore database performance. For replication, the leader ships this same log to its followers, which replay it to build an identical copy of the data.

PostgreSQL uses this approach.

  • row-based-log replication (logical log)
    WAL is coupled to the database storage engine. A row-based log, also called a logical log, is independent of the storage engine and works in the style of change data capture, which makes it very convenient for synchronizing data between heterogeneous data sources.
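The nondeterminism problem, and why shipping row values avoids it, can be shown with a toy model. A minimal sketch, assuming two nodes whose random generators are seeded differently; `Node`, `statement_based` and `row_based` are hypothetical names, not MySQL internals:

```python
import random
import time
from dataclasses import dataclass, field
from typing import Dict, List

Row = Dict[str, float]


@dataclass
class Node:
    seed: int  # each server's random generator is in a different state
    rows: List[Row] = field(default_factory=list)

    def __post_init__(self) -> None:
        self.rng = random.Random(self.seed)

    def insert_now_rand(self) -> Row:
        # Models the statement: INSERT INTO t VALUES (now(), rand())
        # Both functions are evaluated locally on *this* node.
        row = {"ts": time.time(), "r": self.rng.random()}
        self.rows.append(row)
        return row


def statement_based(leader: Node, follower: Node) -> None:
    # Ship the statement: the follower re-executes it and re-evaluates now()/rand().
    leader.insert_now_rand()
    follower.insert_now_rand()


def row_based(leader: Node, follower: Node) -> None:
    # Ship the resulting row (logical log): the follower copies the values as-is.
    row = leader.insert_now_rand()
    follower.rows.append(dict(row))


leader, follower = Node(seed=1), Node(seed=2)
statement_based(leader, follower)
print(leader.rows == follower.rows)  # False: rand() returned different values

leader, follower = Node(seed=1), Node(seed=2)
row_based(leader, follower)
print(leader.rows == follower.rows)  # True: the follower stores the leader's values
```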

Problems caused by replication

Replication lag

  • Large synchronization gap
    For example, if MongoDB's oplog is too small to keep up with the write rate, old entries are discarded before a secondary has applied them. The master-slave delay then keeps growing until the replica can no longer synchronize.
  • Synchronizing newly added nodes
    For example, adding replicas during online scale-out requires copying data to the new node. This initial synchronization is usually a separate mechanism from the one used by healthy online nodes; the new node switches to normal incremental synchronization only after it has caught up.
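Both cases come down to whether the operations a follower still needs are retained on the leader. A minimal sketch, assuming a fixed-capacity log with integer sequence numbers (loosely modelled on the idea of a capped oplog); `CappedOplog` and `catch_up_mode` are hypothetical:

```python
from collections import deque
from typing import Deque, List, Tuple

Entry = Tuple[int, str]  # (sequence number, operation)


class CappedOplog:
    """Fixed-size operation log: once full, appending discards the oldest entry."""

    def __init__(self, capacity: int) -> None:
        self.entries: Deque[Entry] = deque(maxlen=capacity)
        self.next_seq = 0  # sequence number assigned to the next write

    def append(self, op: str) -> None:
        self.entries.append((self.next_seq, op))
        self.next_seq += 1

    def oldest_seq(self) -> int:
        # Oldest sequence number still retained (next_seq if the log is empty).
        return self.entries[0][0] if self.entries else self.next_seq

    def ops_since(self, seq: int) -> List[Entry]:
        return [entry for entry in self.entries if entry[0] >= seq]


def catch_up_mode(oplog: CappedOplog, follower_next_seq: int) -> str:
    """How a follower that still needs ops from `follower_next_seq` onward must sync."""
    if follower_next_seq < oplog.oldest_seq():
        # Some ops it needs were already overwritten: incremental sync is impossible.
        return "full"
    return "incremental"


log = CappedOplog(capacity=3)
for i in range(5):
    log.append(f"op{i}")  # ops 0 and 1 fall off the end

print(log.oldest_seq())       # 2
print(catch_up_mode(log, 3))  # incremental: ops 3 and 4 are still in the log
print(catch_up_mode(log, 1))  # full: op1 is gone (a lagging or brand-new node)
```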

Master-slave failover

In general, replication adds redundancy and is often used as a hot standby (serves queries) or warm standby (does not serve queries) for the master.

  • When the primary node goes down, one of the replicas must be chosen as the new primary.
  • When the old master recovers, the data divergence between the old master and the new master must be resolved.
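Both steps can be sketched in a few lines. A minimal sketch, assuming each replica exposes its log as an ordered list of write IDs and that the live replica with the longest log is promoted (a real system also needs an election or voting protocol); `Replica`, `elect_new_leader` and `rejoin_as_follower` are hypothetical:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Replica:
    name: str
    log: List[str]  # applied write operations, in order
    alive: bool = True


def elect_new_leader(replicas: List[Replica]) -> Replica:
    # Promote the live replica with the longest log, i.e. the one that loses the least data.
    candidates = [r for r in replicas if r.alive]
    if not candidates:
        raise RuntimeError("no live replica to promote")
    return max(candidates, key=lambda r: len(r.log))


def rejoin_as_follower(old_leader: Replica, new_leader: Replica) -> List[str]:
    """Resync a recovered old leader from the new leader.

    Writes the old leader accepted but never replicated are discarded here,
    one common (lossy) policy; they are returned so they can be logged.
    """
    common = 0
    while (common < min(len(old_leader.log), len(new_leader.log))
           and old_leader.log[common] == new_leader.log[common]):
        common += 1
    discarded = old_leader.log[common:]
    # Replace the log with a copy of the new leader's (divergent suffix dropped).
    old_leader.log = list(new_leader.log)
    old_leader.alive = True
    return discarded


a = Replica("A", ["w1", "w2", "w3"])  # old leader: w3 never reached any follower
b = Replica("B", ["w1", "w2"])
c = Replica("C", ["w1"])
a.alive = False                       # A crashes

new_leader = elect_new_leader([a, b, c])
print(new_leader.name)                    # B
new_leader.log.append("w4")               # B keeps accepting writes
print(rejoin_as_follower(a, new_leader))  # ['w3']
print(a.log)                              # ['w1', 'w2', 'w4']
```

Discarding the old leader's unreplicated writes is exactly the data loss that asynchronous replication risks; the alternative is to set them aside for manual or application-level reconciliation.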

Read consistency

Once replicas serve reads, read consistency becomes an issue. Besides strong consistency, there are several kinds of eventual consistency:

  • (1) Causal consistency
    If process A notifies process B after updating a data item, B's subsequent reads of that item return the value A wrote (or a newer one).
  • (2) Read your writes
    After process A updates a data item, it always sees the value it wrote (or a newer one).
  • (3) Session consistency
    Read-your-writes consistency is guaranteed within a session: after an update, the client always reads the latest value of that data within the same session.
  • (4) Monotonic read consistency
    Once a process has read a certain value of a data item, the system never returns an older value to that process in subsequent reads.
  • (5) Monotonic write consistency
    The system guarantees that writes from the same process are executed in the order they were issued.

For reads, the key guarantees are read-your-writes, causal reads (operations are seen in order), and monotonic reads (never reading older data).
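Read-your-writes and monotonic reads within a session can be enforced on the client side by remembering versions. A minimal sketch, assuming a single leader that assigns each key an increasing integer version and followers that may lag behind it; `Replica` and `Session` are hypothetical:

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class Replica:
    name: str
    data: Dict[str, Tuple[int, str]] = field(default_factory=dict)  # key -> (version, value)

    def get(self, key: str) -> Tuple[int, Optional[str]]:
        return self.data.get(key, (0, None))  # version 0 means "never written"


@dataclass
class Session:
    """Client-side session: remembers the highest version it has written or read per key."""
    seen: Dict[str, int] = field(default_factory=dict)

    def write(self, leader: Replica, key: str, value: str) -> None:
        version = leader.get(key)[0] + 1  # the single leader assigns increasing versions
        leader.data[key] = (version, value)
        self.seen[key] = version          # read-your-writes: never accept anything older

    def read(self, replicas: List[Replica], key: str) -> Optional[str]:
        needed = self.seen.get(key, 0)
        for replica in replicas:
            version, value = replica.get(key)
            if version >= needed:         # skip replicas that lag behind this session
                self.seen[key] = version  # monotonic reads: later reads cannot go back
                return value
        raise LookupError(f"no replica has caught up to version {needed} of {key!r}")


leader, lagging = Replica("leader"), Replica("follower")
alice = Session()
alice.write(leader, "x", "a")
print(alice.read([lagging, leader], "x"))  # a: the stale follower is skipped
print(Session().read([lagging], "x"))      # None: a new session accepts the stale follower
```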

Quorum (R/W/N) solution to read conflicts

Write quorum

Suppose a piece of data is replicated to three nodes. To keep it strongly consistent, not every node has to acknowledge a write; two of them (a majority) are enough. Then, if two conflicting writes occur, only one of them can be acknowledged by a majority. This is the write quorum. Expressed more formally, W > N/2: the number of nodes W that acknowledge a write must exceed half the number of replica nodes N, which is also called the replication factor.
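The rule W > N/2 works because any two majorities must share at least one node. A brute-force check over small clusters makes that concrete, assuming each node acknowledges at most one of two conflicting writes (the function name is hypothetical):

```python
from itertools import combinations


def write_quorums_always_overlap(n: int, w: int) -> bool:
    """True if every two sets of w nodes (out of n replicas) share a node,
    so two conflicting writes can never both gather w acknowledgements."""
    quorums = [set(q) for q in combinations(range(n), w)]
    return all(a & b for a in quorums for b in quorums)


for n in range(1, 7):
    for w in range(1, n + 1):
        # The brute-force check agrees with the rule W > N/2.
        assert write_quorums_always_overlap(n, w) == (w > n / 2)

print(write_quorums_always_overlap(3, 2))  # True: the three-node example
print(write_quorums_always_overlap(4, 2))  # False: {0, 1} and {2, 3} are disjoint
```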

Read quorum

The read quorum is how many nodes must be contacted to be sure of reading the latest data. If a write requires two nodes to acknowledge it (W=2), then we must contact at least two nodes to be sure of getting the latest data. If, however, some writes are acknowledged by only one node (W=1), we must contact all three nodes to be sure the data we read is up to date. In that case update conflicts may occur, because a write is not backed by enough nodes; but as long as data is read from enough nodes, such conflicts can always be detected. So even when writes are not strongly consistent, strongly consistent reads can still be achieved.


  • R
    The number of nodes contacted for a read
  • W
    The number of nodes that must acknowledge a write
  • N
    The replication factor

The three are related by an inequality: reads are guaranteed to be strongly consistent only when R + W > N, because then every set of nodes read from overlaps every set of nodes written to in at least one node.
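A minimal sketch of the rule, assuming every write carries a version number that increases with each update and that a reader resolves disagreements by taking the highest version (a common approach, not the only one); `quorum_write` and `quorum_read` are hypothetical:

```python
from typing import Dict, List, Optional, Tuple

Versioned = Tuple[int, str]  # (version, value)
Store = Dict[str, Versioned]


def quorum_write(replicas: List[Store], acked_by: List[int],
                 key: str, version: int, value: str) -> None:
    # Only the W replicas in `acked_by` receive and acknowledge the write.
    for i in acked_by:
        replicas[i][key] = (version, value)


def quorum_read(replicas: List[Store], asked: List[int], key: str) -> Optional[str]:
    # Query the R replicas in `asked` and keep the answer with the highest version.
    answers = [replicas[i][key] for i in asked if key in replicas[i]]
    return max(answers)[1] if answers else None


N = 3
replicas: List[Store] = [{"k": (1, "old")} for _ in range(N)]
quorum_write(replicas, acked_by=[0, 1], key="k", version=2, value="new")  # W = 2

print(quorum_read(replicas, asked=[1, 2], key="k"))  # R = 2, R + W > N: new
print(quorum_read(replicas, asked=[2], key="k"))     # R = 1, R + W = N: old (stale)
```

With R = 2 the read set always includes at least one of the two nodes that took the write; with R = 1 it may hit only the node that missed it.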

Replication overview of mainstream open source products

  • mysql
    Replication mode: master-slave, semi-synchronous.
    Implementation: MySQL 5.0 and earlier support only statement-based replication; 5.1+ switches to row-based log replication for nondeterministic statements.
    Other: master-slave delay handling.
  • kafka
    Replication mode: master-slave, ISR-based semi-synchronous.
    Implementation: the leader writes the message and replicates it to all followers in the ISR; the message is committed only after the ISR replicas have written it and returned an ack to the leader.
    Other: the producer can choose whether to wait for the ISR's ack.
  • elasticsearch
    Replication mode: master-slave, semi-synchronous; the default is replication=sync.
    Implementation: the consistency option accepts quorum, one, and all; the default is quorum.
    Other: translog, fsync, and refresh.
  • pg
    Replication mode: master-slave, asynchronous.
    Implementation: based on the write-ahead log.
    Other: archive and streaming methods.
  • redis
    Replication mode: master-slave, asynchronous.
    Implementation: incremental replication over the Redis protocol.
    Other: Sentinel for failover.
  • mongo
    Replication mode: master-slave, asynchronous (replica set).
    Implementation: persistent ring buffer (oplog); initial sync followed by steady-state sync.
    Other: an arbiter votes in the election of the primary.

As the overview shows, higher consistency requirements can be met with semi-synchronous mechanisms, usually based on a quorum, as in Elasticsearch, or on an ISR, as in Kafka; both are configurable.
The rest are essentially asynchronous. Newly added and recovering nodes are synchronized differently from healthy ones: a newly added node usually performs a full synchronization, while a node in the normal state uses incremental synchronization.

Kafka's ISR (in-sync replicas: the set of replicas that are in sync with the leader)

All replicas of a partition are collectively called the Assigned Replicas (AR). The ISR is a subset of the AR, and the leader maintains the ISR list. Followers replicate data from the leader with some delay; a follower whose lag exceeds a threshold is removed from the ISR and placed in the OSR (Out-of-Sync Replicas) list. Newly added followers are also placed in the OSR first. AR = ISR + OSR.

When a producer sends a message to a broker, the leader writes the message and replicates it to all followers. A message is committed only after it has been copied to all in-sync replicas, so replication latency is bounded by the slowest in-sync follower. It is therefore important to detect slow replicas quickly: if a follower lags too far behind or fails, the leader removes it from the ISR.

Thus, Kafka's replication is neither fully synchronous nor simply asynchronous. Synchronous replication requires all live followers to have replicated a message before it is committed, which greatly hurts throughput. In asynchronous replication, followers copy data from the leader asynchronously, and a message counts as committed as soon as the leader has written it to its log; if the followers have not finished replicating when the leader suddenly goes down, that data is lost. Kafka's ISR approach strikes a good balance between data loss and throughput.
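The ISR bookkeeping and the commit rule can be sketched per partition. A minimal sketch, assuming lag is measured in messages (recent Kafka versions actually use a time-based threshold, `replica.lag.time.max.ms`) and that a fetch copies everything up to the leader's log end; the `Partition` class and its methods are hypothetical, not Kafka's API. The high watermark is the smallest log end offset (LEO) across the ISR, so a message counts as committed once every in-sync replica has it:

```python
from dataclasses import dataclass, field
from typing import Dict, Set


@dataclass
class Partition:
    leader: str
    max_lag: int  # assumption: lag measured in messages, not time
    leo: Dict[str, int] = field(default_factory=dict)  # log end offset of each replica (AR)
    isr: Set[str] = field(default_factory=set)

    def __post_init__(self) -> None:
        self.leo.setdefault(self.leader, 0)
        self.isr.add(self.leader)  # the leader is always in its own ISR

    def add_follower(self, name: str) -> None:
        self.leo[name] = 0  # new followers start outside the ISR (in the OSR)

    def produce(self, count: int = 1) -> None:
        self.leo[self.leader] += count

    def fetch(self, follower: str) -> None:
        self.leo[follower] = self.leo[self.leader]  # copy everything up to the leader's LEO

    def update_isr(self) -> None:
        head = self.leo[self.leader]
        for replica, offset in self.leo.items():
            if head - offset > self.max_lag:
                self.isr.discard(replica)  # lagging too far: move to OSR
            else:
                self.isr.add(replica)      # caught up: join (or rejoin) the ISR

    def osr(self) -> Set[str]:
        return set(self.leo) - self.isr  # AR = ISR + OSR

    def high_watermark(self) -> int:
        # A message counts as committed once every ISR member has it.
        return min(self.leo[r] for r in self.isr)


p = Partition(leader="broker1", max_lag=2)
p.add_follower("broker2")
p.add_follower("broker3")
p.update_isr()
print(sorted(p.isr))                   # ['broker1', 'broker2', 'broker3']

p.produce(3)                           # leader appends offsets 0..2
p.fetch("broker2")                     # broker2 catches up; broker3 does not fetch
print(p.high_watermark())              # 0: commit is held back by the slowest ISR member
p.update_isr()                         # broker3 lags by 3 > max_lag, so it moves to OSR
print(sorted(p.isr), sorted(p.osr()))  # ['broker1', 'broker2'] ['broker3']
print(p.high_watermark())              # 3: all three messages are now committed
```

Shrinking the ISR is what lets the high watermark advance again without waiting for the slow follower, which is the throughput/durability balance described above.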

Replica consistency in Elasticsearch

Consistency in Elasticsearch has two main aspects:

  • Refresh issues caused by Lucene's indexing mechanism

Between Elasticsearch and the disk sits the filesystem cache. Documents in the in-memory indexing buffer are written to a new segment, but the new segment is first written to the filesystem cache (cheap) and only later flushed to disk (expensive). Once a file is in the cache, however, it can be opened and read like any other file.

In Elasticsearch, this lightweight process of writing and opening a new segment is called a refresh. By default, each shard is refreshed automatically once per second. This is why Elasticsearch is called near-real-time search: document changes are not visible to search immediately, but become visible within about one second.

This behavior can confuse new users: they index a document, search for it, and cannot find it. The fix is to perform a manual refresh with the refresh API.

refresh_interval can be updated dynamically on an existing index. In production, when building a large new index, you can disable automatic refresh first and re-enable it once you start using the index.
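The visibility gap can be reproduced with a toy model. A minimal sketch; `NearRealTimeIndex` is a hypothetical class, not Elasticsearch's API. In a real cluster the equivalent operations are the refresh API (`POST /<index>/_refresh`) and the `refresh_interval` index setting, where `-1` disables automatic refresh:

```python
from typing import Dict, List

Doc = Dict[str, str]


class NearRealTimeIndex:
    """Toy model: new documents sit in an in-memory buffer and become
    searchable only when a refresh turns the buffer into a new segment."""

    def __init__(self) -> None:
        self.buffer: List[Doc] = []          # indexed but not yet searchable
        self.segments: List[List[Doc]] = []  # searchable segments

    def index(self, doc: Doc) -> None:
        self.buffer.append(doc)

    def refresh(self) -> None:
        # Write and open a new segment from the buffer (no fsync modelled here).
        if self.buffer:
            self.segments.append(self.buffer)
            self.buffer = []

    def search(self, field: str, value: str) -> List[Doc]:
        return [d for seg in self.segments for d in seg if d.get(field) == value]


idx = NearRealTimeIndex()
idx.index({"id": "1", "title": "hello"})
print(idx.search("title", "hello"))  # []: not refreshed yet
idx.refresh()
print(idx.search("title", "hello"))  # [{'id': '1', 'title': 'hello'}]
```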

  • Replica consistency problems caused by sharding and replication (consistency: one, all, quorum)
    With replicas configured, a write request travels from the client to an Elasticsearch node, and the response travels back, as follows:

  • 1) The client sends the request to Node1 (it could equally be sent to any other node).
  • 2) Node1 uses the document's _id to determine that it belongs to shard 0. From the cluster state, it finds that the primary of shard 0 is on Node3, so it forwards the request there, and Node3 indexes the document. (The indexing process is described in detail in the previous blog post.)
  • 3) Node3 forwards the document in parallel to the replica shards of shard 0 on Node1 and Node2. Once the replica shards report that the write succeeded, Node3 reports success to Node1, the node that originally received the request, and Node1 returns success to the client.
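Steps 2 and 3, together with the consistency check, can be sketched as follows. A minimal sketch under stated assumptions: crc32 stands in for the Murmur3 hash Elasticsearch uses for routing, and the quorum formula int((primary + number_of_replicas) / 2) + 1 is the one documented for the older `consistency` parameter (Elasticsearch 5.0 replaced it with `wait_for_active_shards`). `pick_shard`, `ShardCopy` and `index_document` are hypothetical names:

```python
import zlib
from dataclasses import dataclass, field
from typing import Dict, List

Doc = Dict[str, str]


def pick_shard(doc_id: str, number_of_primary_shards: int) -> int:
    # Elasticsearch computes hash(_routing) % number_of_primary_shards, where
    # _routing defaults to _id. It uses Murmur3; crc32 is only a stand-in here.
    return zlib.crc32(doc_id.encode("utf-8")) % number_of_primary_shards


def required_active_copies(number_of_replicas: int, consistency: str) -> int:
    total = 1 + number_of_replicas  # primary + replicas
    if consistency == "one":
        return 1
    if consistency == "all":
        return total
    if consistency == "quorum":
        return total // 2 + 1  # int((primary + number_of_replicas) / 2) + 1
    raise ValueError(f"unknown consistency: {consistency}")


@dataclass
class ShardCopy:
    node: str
    available: bool = True
    docs: Dict[str, Doc] = field(default_factory=dict)


def index_document(doc_id: str, doc: Doc, primary: ShardCopy,
                   replicas: List[ShardCopy], consistency: str = "quorum") -> bool:
    # Before writing, the primary checks that enough shard copies are active.
    active = 1 + sum(1 for r in replicas if r.available)
    if active < required_active_copies(len(replicas), consistency):
        return False  # rejected: too few shard copies available
    primary.docs[doc_id] = doc    # step 2: the primary indexes the document
    for replica in replicas:      # step 3: forward to replicas (in parallel in ES)
        if replica.available:     # toy model: unavailable copies are skipped
            replica.docs[doc_id] = doc
    return True  # replicas succeeded; the coordinating node replies to the client


shard = pick_shard("doc-42", number_of_primary_shards=3)  # step 2: route by _id
print("shard", shard)
primary = ShardCopy("Node3")
replicas = [ShardCopy("Node1"), ShardCopy("Node2", available=False)]
print(index_document("doc-42", {"title": "hi"}, primary, replicas))  # True: 2 copies >= quorum 2
replicas[0].available = False
print(index_document("doc-43", {"title": "hi"}, primary, replicas))  # False: 1 copy < quorum 2
```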


Replication details differ from product to product, but the underlying principles are the same. Beyond the replication methods described above, a mature deployment must also pay close attention to replication-related failure scenarios.