Talk about CRDT


Preface

This article looks at CRDTs.

CRDT

CRDT is short for Conflict-free Replicated Data Type, sometimes also described as passive synchronisation. A CRDT is a data type that can be replicated across a network and resolves conflicts automatically, so that replicas converge to a consistent state. This makes it well suited to AP systems (in the CAP sense) that replicate data between partitions. Concrete implementations fall into several families: state-based (CvRDT), operation-based (CmRDT), delta-based and pure operation-based.

Consistency without consensus: guarantee convergence to the same value in spite of network delays, partitions and message reordering.

State-based (CvRDT)

  • CvRDT is short for Convergent Replicated Data Type, also described as active synchronisation. It is commonly used in file systems such as NFS, AFS and Coda, and in key-value stores such as Riak and Dynamo.
  • Replication works by shipping the state of the entire object, so a merge function must be defined to combine an incoming state with the local one.
  • The merge function must be associative, commutative and idempotent, and updates must only move the state monotonically upwards. Merges can then be retried safely and do not depend on delivery order.

Operation-based (CmRDT)

  • CmRDT is short for Commutative Replicated Data Type. It is commonly used in cooperative systems such as Bayou, Rover, IceCube and Telex.
  • Replication works by shipping operations: a prepare method generates the operation at the source replica, and an effect method applies the change described by an incoming operation to the local state.
  • The transport protocol must be reliable. If an operation may be delivered more than once, effect must be idempotent. There is also an ordering requirement: if delivery order cannot be guaranteed, effects must commute, so that applying them in any order yields the same result.
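The prepare/effect split can be illustrated with a minimal sketch of an operation-based counter. The names OpCounterSketch and AddOp are hypothetical and not taken from any library, and tagging each operation with a UUID to filter duplicates is just one possible way to make effect idempotent.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/** Hypothetical op-based counter: prepare builds an operation, effect applies it. */
final class OpCounterSketch {

    /** An operation with a unique id, so that duplicate deliveries can be detected. */
    static final class AddOp {
        final UUID id;
        final long amount;

        AddOp(UUID id, long amount) {
            this.id = id;
            this.amount = amount;
        }
    }

    private long value = 0;
    private final Set<UUID> applied = new HashSet<>(); // ids of operations already applied

    /** prepare: runs at the source replica only and does not change any state. */
    AddOp prepare(long amount) {
        return new AddOp(UUID.randomUUID(), amount);
    }

    /** effect: runs at every replica; the id check makes it idempotent, addition makes it commute. */
    void effect(AddOp op) {
        if (applied.add(op.id)) {
            value += op.amount;
        }
    }

    long value() {
        return value;
    }

    public static void main(String[] args) {
        OpCounterSketch a = new OpCounterSketch();
        OpCounterSketch b = new OpCounterSketch();
        AddOp op1 = a.prepare(5);
        AddOp op2 = b.prepare(-2);
        // Replica a applies op1 then op2; replica b applies op2, op1, and op1 again (a duplicate).
        a.effect(op1);
        a.effect(op2);
        b.effect(op2);
        b.effect(op1);
        b.effect(op1);
        System.out.println(a.value() + " " + b.value()); // prints "3 3"
    }
}
```

Both replicas end at the same value even though the operations arrive in different orders and one of them arrives twice.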

Delta-based

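The delta-based approach can be understood as an improvement that combines the state-based and operation-based approaches: it replicates through delta-mutators, which ship only the part of the state that changed rather than the whole object.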
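As a rough illustration, a delta-mutator for a grow-only counter might look like the sketch below. DeltaGCounterSketch is a hypothetical class built on java.util.HashMap; it is not the wurmloch-crdt API, and representing a delta as a small map is an assumption made for brevity.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical delta-based grow-only counter. */
final class DeltaGCounterSketch {
    private final String nodeId;
    private final Map<String, Long> counts = new HashMap<>(); // per-node counts

    DeltaGCounterSketch(String nodeId) {
        this.nodeId = nodeId;
    }

    /** Delta-mutator: updates the local state and returns only the entry that changed. */
    Map<String, Long> increment() {
        long next = counts.getOrDefault(nodeId, 0L) + 1;
        counts.put(nodeId, next);
        Map<String, Long> delta = new HashMap<>();
        delta.put(nodeId, next);
        return delta;
    }

    /** Joins a delta (or a full state) into the local state by taking the per-node max. */
    void join(Map<String, Long> delta) {
        delta.forEach((node, count) -> counts.merge(node, count, Long::max));
    }

    long value() {
        return counts.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        DeltaGCounterSketch a = new DeltaGCounterSketch("a");
        DeltaGCounterSketch b = new DeltaGCounterSketch("b");
        Map<String, Long> d1 = a.increment(); // {a=1}
        Map<String, Long> d2 = a.increment(); // {a=2}
        Map<String, Long> d3 = b.increment(); // {b=1}
        // Deltas reach b out of order and with a duplicate; the max-based join absorbs both.
        b.join(d2);
        b.join(d1);
        b.join(d2);
        a.join(d3);
        System.out.println(a.value() + " " + b.value()); // prints "3 3"
    }
}
```

Because the join is the same per-node max used for full states, a delta can be treated as a very small state, which is what lets this approach keep the state-based guarantees while sending less data.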

Pure operation-based

In general, the operation-based approach needs a prepare method to generate operations, which may introduce delay. In the pure operation-based approach, prepare does not generate operations by inspecting or comparing state; it simply returns the ready-made operation. This requires recording every operation applied to the object state.

Convergent Operations

For a CRDT to replicate without conflicts, the operations on the data structure must satisfy the following conditions:

  • Associative

(a+(b+c)=(a+b)+c), i.e. grouping has no effect

  • Commutative

(a+b=b+a), i.e. order has no effect

  • Idempotent

(a+a=a), i.e. duplication has no effect
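The three conditions can be spot-checked in code. The sketch below uses a hypothetical helper class, MergeLawsSketch, to test a candidate merge function on sample values; passing such checks is not a proof, but a failure is enough to rule a function out.

```java
import java.util.function.BinaryOperator;

/** Hypothetical helpers that spot-check the three merge laws on sample values. */
final class MergeLawsSketch {

    /** a + (b + c) == (a + b) + c */
    static <T> boolean associative(BinaryOperator<T> merge, T a, T b, T c) {
        return merge.apply(a, merge.apply(b, c)).equals(merge.apply(merge.apply(a, b), c));
    }

    /** a + b == b + a */
    static <T> boolean commutative(BinaryOperator<T> merge, T a, T b) {
        return merge.apply(a, b).equals(merge.apply(b, a));
    }

    /** a + a == a */
    static <T> boolean idempotent(BinaryOperator<T> merge, T a) {
        return merge.apply(a, a).equals(a);
    }

    public static void main(String[] args) {
        BinaryOperator<Long> max = Long::max;
        BinaryOperator<Long> sum = Long::sum;
        // max satisfies all three laws on these samples.
        System.out.println(associative(max, 1L, 2L, 3L)
                && commutative(max, 1L, 2L)
                && idempotent(max, 2L)); // prints "true"
        // Plain addition is associative and commutative but not idempotent: 2 + 2 != 2.
        System.out.println(idempotent(sum, 2L)); // prints "false"
    }
}
```

This is why the counters described later merge with max rather than by adding states together: a state that is delivered twice would otherwise be counted twice.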

Basic data types

The basic CRDT data types are Counters, Registers and Sets.

Counters

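  • Grow-only counter (G-Counter)

Each node increments only its own entry; states are merged by taking the per-node max, and the value is the sum of all entries.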

  • Positive-negative counter (PN-Counter)

Implemented with two G-Counters, one for increments and one for decrements; the final value is the sum of the increments minus the sum of the decrements.

Registers

A register has two operations: assign() and value().

  • Last-write-wins register (LWW-Register)

Each assign operation is tagged with a unique, ordered id, such as a timestamp or a vector clock, and states are merged by keeping the value with the max id.

  • Multi-value register (MV-Register)

Similar to G-Counter: each assignment adds a new version, and merging uses the max function over versions, so concurrent assignments are all retained.
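For the timestamp variant of the LWW-Register, a minimal sketch could look like this. LwwRegisterSketch is a hypothetical class; passing the timestamp in explicitly and breaking ties by node id are assumptions made so that the example is deterministic.

```java
/** Hypothetical LWW register keyed by a (timestamp, nodeId) pair. */
final class LwwRegisterSketch<T> {
    private T value;
    private long timestamp = Long.MIN_VALUE;
    private String writer = "";

    /** Local assignment; the caller supplies the timestamp (an assumption of this sketch). */
    void assign(T newValue, long ts, String nodeId) {
        merge(newValue, ts, nodeId);
    }

    /** Keeps whichever write has the larger (timestamp, nodeId) pair. */
    void merge(T otherValue, long otherTs, String otherWriter) {
        boolean newer = otherTs > timestamp
                || (otherTs == timestamp && otherWriter.compareTo(writer) > 0);
        if (newer) {
            value = otherValue;
            timestamp = otherTs;
            writer = otherWriter;
        }
    }

    T value() {
        return value;
    }

    public static void main(String[] args) {
        LwwRegisterSketch<String> r1 = new LwwRegisterSketch<>();
        LwwRegisterSketch<String> r2 = new LwwRegisterSketch<>();
        // The same two writes arrive in opposite orders at the two replicas.
        r1.merge("x", 10L, "a");
        r1.merge("y", 20L, "b");
        r2.merge("y", 20L, "b");
        r2.merge("x", 10L, "a");
        System.out.println(r1.value() + " " + r2.value()); // prints "y y"
    }
}
```

The write with the larger timestamp wins at both replicas regardless of arrival order, at the cost of silently discarding the other write.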

Sets

  • Grow-only set (G-Set)

Merged using set union.

  • Two-phase set (2P-Set)

Implemented with two G-Sets: an addSet for additions and a removeSet for removals. An element that has been removed cannot be added again.

  • Last-write-wins element set (LWW-element-Set)

Similar to 2P-Set, with an addSet and a removeSet, but each element carries a timestamp; for a given element, whichever of add and remove has the higher timestamp takes precedence.

  • Observed-remove set (OR-Set)

Similar to 2P-Set, with an addSet and a removeSet, but each added element carries a unique tag. A remove only affects the tags it has observed, so an add that is concurrent with a remove of the same element takes precedence.
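Of the sets above, the LWW-element Set is the one not covered by the library examples later in this article, so here is a minimal sketch of it. LwwElementSetSketch is a hypothetical class; supplying timestamps explicitly and letting remove win on equal timestamps are assumptions of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical LWW-element set: the latest timestamp per element decides membership. */
final class LwwElementSetSketch<E> {
    private final Map<E, Long> addTimes = new HashMap<>();    // latest add timestamp per element
    private final Map<E, Long> removeTimes = new HashMap<>(); // latest remove timestamp per element

    void add(E element, long ts) {
        addTimes.merge(element, ts, Long::max);
    }

    void remove(E element, long ts) {
        removeTimes.merge(element, ts, Long::max);
    }

    /** Present if the latest add is strictly newer than the latest remove (ties favour remove here). */
    boolean contains(E element) {
        Long added = addTimes.get(element);
        if (added == null) {
            return false;
        }
        Long removed = removeTimes.get(element);
        return removed == null || added > removed;
    }

    /** State-based merge: per-element max of both timestamp maps. */
    void merge(LwwElementSetSketch<E> other) {
        other.addTimes.forEach((e, ts) -> addTimes.merge(e, ts, Long::max));
        other.removeTimes.forEach((e, ts) -> removeTimes.merge(e, ts, Long::max));
    }

    public static void main(String[] args) {
        LwwElementSetSketch<String> a = new LwwElementSetSketch<>();
        LwwElementSetSketch<String> b = new LwwElementSetSketch<>();
        a.add("x", 1L);
        a.remove("x", 2L);
        b.add("x", 3L); // a later re-add, which a 2P-Set would not allow
        a.merge(b);
        b.merge(a);
        System.out.println(a.contains("x") + " " + b.contains("x")); // prints "true true"
    }
}
```

Unlike the 2P-Set, an element can come back after being removed, as long as the new add carries a higher timestamp than the remove.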

Other data types

Array

For arrays there is the Replicated Growable Array (RGA), which supports the addRight(v, a) operation.

Graph

A graph can be implemented on top of the Sets structures, but concurrent addEdge(u, v) and removeVertex(u) operations need to be handled.

Map

Map needs to handle concurrent put and rmv operations.

Example

The examples here use the wurmloch-crdt implementation.

GCounter

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/GCounter.java

public class GCounter extends AbstractCrdt<GCounter, GCounter.UpdateCommand> {

    // fields
    private Map<String, Long> entries = HashMap.empty();


    // constructor
    public GCounter(String nodeId, String crdtId) {
        super(nodeId, crdtId, BehaviorProcessor.create());
    }


    // crdt
    @Override
    protected Option<UpdateCommand> processCommand(UpdateCommand command) {
        final Map<String, Long> oldEntries = entries;
        entries = entries.merge(command.entries, Math::max);
        return entries.equals(oldEntries)? Option.none() : Option.of(new UpdateCommand(crdtId, entries));
    }


    // core functionality
    public long get() {
        return entries.values().sum().longValue();
    }

    public void increment() {
        increment(1L);
    }

    public void increment(long value) {
        if (value < 1L) {
            throw new IllegalArgumentException("Value needs to be a positive number.");
        }
        entries = entries.put(nodeId, entries.get(nodeId).getOrElse(0L) + value);
        commands.onNext(new UpdateCommand(
                crdtId,
                entries
        ));
    }

    //......
}
  • Here GCounter is backed by a HashMap. Its processCommand receives an UpdateCommand and merges it using the map's merge method, with Math::max as the BiFunction. The get() method sums entries.values() to produce the result.

PNCounter

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/PNCounter.java

public class PNCounter extends AbstractCrdt<PNCounter, PNCounter.UpdateCommand> {

    // fields
    private Map<String, Long> pEntries = HashMap.empty();
    private Map<String, Long> nEntries = HashMap.empty();


    // constructor
    public PNCounter(String nodeId, String crtdId) {
        super(nodeId, crtdId, BehaviorProcessor.create());
    }


    // crdt
    protected Option<UpdateCommand> processCommand(PNCounter.UpdateCommand command) {
        final Map<String, Long> oldPEntries = pEntries;
        final Map<String, Long> oldNEntries = nEntries;
        pEntries = pEntries.merge(command.pEntries, Math::max);
        nEntries = nEntries.merge(command.nEntries, Math::max);
        return pEntries.equals(oldPEntries) && nEntries.equals(oldNEntries)? Option.none()
                : Option.of(new UpdateCommand(crdtId, pEntries, nEntries));
    }


    // core functionality
    public long get() {
        return pEntries.values().sum().longValue() - nEntries.values().sum().longValue();
    }

    public void increment() {
        increment(1L);
    }

    public void increment(long value) {
        if (value < 1L) {
            throw new IllegalArgumentException("Value needs to be a positive number.");
        }
        pEntries = pEntries.put(nodeId, pEntries.get(nodeId).getOrElse(0L) + value);
        commands.onNext(new UpdateCommand(
                crdtId,
                pEntries,
                nEntries
        ));
    }

    public void decrement() {
        decrement(1L);
    }

    public void decrement(long value) {
        if (value < 1L) {
            throw new IllegalArgumentException("Value needs to be a positive number.");
        }
        nEntries = nEntries.put(nodeId, nEntries.get(nodeId).getOrElse(0L) + value);
        commands.onNext(new UpdateCommand(
                crdtId,
                pEntries,
                nEntries
        ));
    }

    //......
}
  • PNCounter is implemented here with two HashMaps: pEntries for increments and nEntries for decrements. processCommand merges pEntries and nEntries separately using the map's merge method, with Math::max as the BiFunction. The get() method returns the sum of pEntries.values() minus the sum of nEntries.values().

LWWRegister

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/LWWRegister.java

public class LWWRegister<T> extends AbstractCrdt<LWWRegister<T>, LWWRegister.SetCommand<T>> {

    // fields
    private T value;
    private StrictVectorClock clock;


    // constructor
    public LWWRegister(String nodeId, String crdtId) {
        super(nodeId, crdtId, BehaviorProcessor.create());
        this.clock = new StrictVectorClock(nodeId);
    }


    // crdt
    protected Option<SetCommand<T>> processCommand(SetCommand<T> command) {
        if (clock.compareTo(command.getClock()) < 0) {
            clock = clock.merge(command.getClock());
            doSet(command.getValue());
            return Option.of(command);
        }
        return Option.none();
    }


    // core functionality
    public T get() {
        return value;
    }

    public void set(T newValue) {
        if (! Objects.equals(value, newValue)) {
            doSet(newValue);
            commands.onNext(new SetCommand<>(
                    crdtId,
                    value,
                    clock
            ));
        }
    }


    // implementation
    private void doSet(T value) {
        this.value = value;
        clock = clock.increment();
    }

    //......
}
  • LWWRegister uses a StrictVectorClock here. Its processCommand receives a SetCommand; if the local clock is less than command.getClock(), it first merges the clocks and then calls doSet, which updates the value and advances the local clock via clock.increment().

MVRegister

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/MVRegister.java

public class MVRegister<T> extends AbstractCrdt<MVRegister<T>, MVRegister.SetCommand<T>> {

    // fields
    private Array<Entry<T>> entries = Array.empty();


    // constructor
    public MVRegister(String nodeId, String crdtId) {
        super(nodeId, crdtId, ReplayProcessor.create());
    }


    // crdt
    protected Option<SetCommand<T>> processCommand(SetCommand<T> command) {
        final Entry<T> newEntry = command.getEntry();
        if (!entries.exists(entry -> entry.getClock().compareTo(newEntry.getClock()) > 0
                || entry.getClock().equals(newEntry.getClock()))) {
            final Array<Entry<T>> newEntries = entries
                    .filter(entry -> entry.getClock().compareTo(newEntry.getClock()) == 0)
                    .append(newEntry);
            doSet(newEntries);
            return Option.of(command);
        }
        return Option.none();
    }


    // core functionality
    public Array<T> get() {
        return entries.map(Entry::getValue);
    }

    public void set(T newValue) {
        if (entries.size() != 1 || !Objects.equals(entries.head().getValue(), newValue)) {
            final Entry<T> newEntry = new Entry<>(newValue, incVV());
            doSet(Array.of(newEntry));
            commands.onNext(new SetCommand<>(
                    crdtId,
                    newEntry
            ));
        }
    }


    // implementation
    private void doSet(Array<Entry<T>> newEntries) {
        entries = newEntries;
    }

    private VectorClock incVV() {
        final Array<VectorClock> clocks = entries.map(Entry::getClock);
        final VectorClock mergedClock = clocks.reduceOption(VectorClock::merge).getOrElse(new VectorClock());
        return mergedClock.increment(nodeId);
    }

    //......
}
  • MVRegister uses an Array of entries here, each carrying a VectorClock. Its processCommand receives a SetCommand and only acts when no existing entry has a clock greater than or equal to newEntry.getClock(). In that case it builds newEntries by keeping the existing entries whose clock compares as 0 against newEntry.getClock() and appending newEntry, and finally calls doSet to assign the result to the local entries.

GSet

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/GSet.java

public class GSet<E> extends AbstractSet<E> implements Crdt<GSet<E>, GSet.AddCommand<E>> {

    // fields
    private final String crdtId;
    private final Set<E> elements = new HashSet<>();
    private final Processor<AddCommand<E>, AddCommand<E>> commands = ReplayProcessor.create();


    // constructor
    public GSet(String crdtId) {
        this.crdtId = Objects.requireNonNull(crdtId, "Id must not be null");
    }


    // crdt
    @Override
    public String getCrdtId() {
        return crdtId;
    }

    @Override
    public void subscribe(Subscriber<? super AddCommand<E>> subscriber) {
        commands.subscribe(subscriber);
    }

    @Override
    public void subscribeTo(Publisher<? extends AddCommand<E>> publisher) {
        Flowable.fromPublisher(publisher).onTerminateDetach().subscribe(command -> {
            final Option<AddCommand<E>> newCommand = processCommand(command);
            newCommand.peek(commands::onNext);
        });
    }

    private Option<AddCommand<E>> processCommand(AddCommand<E> command) {
        return doAdd(command.getElement())? Option.of(command) : Option.none();
    }


    // core functionality
    @Override
    public int size() {
        return elements.size();
    }

    @Override
    public Iterator<E> iterator() {
        return new GSetIterator();
    }

    @Override
    public boolean add(E element) {
        commands.onNext(new AddCommand<>(crdtId, element));
        return doAdd(element);
    }


    // implementation
    private synchronized boolean doAdd(E element) {
        return elements.add(element);
    }

    //......
}
  • Here GSet is backed by a Set. Its processCommand receives an AddCommand, and its doAdd method merges the element by calling the Set's add.

TwoPhaseSet

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/TwoPSet.java

public class TwoPSet<E> extends AbstractSet<E> implements Crdt<TwoPSet<E>, TwoPSet.TwoPSetCommand<E>> {

    // fields
    private final String crdtId;
    private final Set<E> elements = new HashSet<>();
    private final Set<E> tombstone = new HashSet<>();
    private final Processor<TwoPSetCommand<E>, TwoPSetCommand<E>> commands = ReplayProcessor.create();


    // constructor
    public TwoPSet(String crdtId) {
        this.crdtId = Objects.requireNonNull(crdtId, "CrdtId must not be null");
    }


    // crdt
    @Override
    public String getCrdtId() {
        return crdtId;
    }

    @Override
    public void subscribe(Subscriber<? super TwoPSetCommand<E>> subscriber) {
        commands.subscribe(subscriber);
    }

    @Override
    public void subscribeTo(Publisher<? extends TwoPSetCommand<E>> publisher) {
        Flowable.fromPublisher(publisher).onTerminateDetach().subscribe(command -> {
            final Option<TwoPSetCommand<E>> newCommand = processCommand(command);
            newCommand.peek(commands::onNext);

        });
    }

    private Option<TwoPSetCommand<E>> processCommand(TwoPSetCommand<E> command) {
        if (command instanceof TwoPSet.AddCommand) {
            return doAdd(((TwoPSet.AddCommand<E>) command).getElement())? Option.of(command) : Option.none();
        } else if (command instanceof TwoPSet.RemoveCommand) {
            return doRemove(((TwoPSet.RemoveCommand<E>) command).getElement())? Option.of(command) : Option.none();
        }
        return Option.none();
    }


    // core functionality
    @Override
    public int size() {
        return elements.size();
    }

    @Override
    public Iterator<E> iterator() {
        return new TwoPSetIterator();
    }

    @Override
    public boolean add(E value) {
        final boolean changed = doAdd(value);
        if (changed) {
            commands.onNext(new TwoPSet.AddCommand<>(crdtId, value));
        }
        return changed;
    }


    // implementation
    private boolean doAdd(E value) {
        return !tombstone.contains(value) && elements.add(value);
    }

    private boolean doRemove(E value) {
        return tombstone.add(value) | elements.remove(value);
    }

    //......
}
  • TwoPSet is implemented here with two Sets: elements for additions and tombstone for removals. Its processCommand method receives a TwoPSetCommand, which has two subclasses, TwoPSet.AddCommand and TwoPSet.RemoveCommand, handled by doAdd and doRemove respectively. doAdd adds the element to elements only if tombstone does not contain it; doRemove adds the element to tombstone and removes it from elements.

ORSet

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/ORSet.java

public class ORSet<E> extends AbstractSet<E> implements Crdt<ORSet<E>, ORSet.ORSetCommand<E>> /*, ObservableSet<E> */ {

    // fields
    private final String crdtId;
    private final Set<Element<E>> elements = new HashSet<>();
    private final Set<Element<E>> tombstone = new HashSet<>();
    private final Processor<ORSetCommand<E>, ORSetCommand<E>> commands = ReplayProcessor.create();


    // constructor
    public ORSet(String crdtId) {
        this.crdtId = Objects.requireNonNull(crdtId, "Id must not be null");
    }


    // crdt
    @Override
    public String getCrdtId() {
        return crdtId;
    }

    @Override
    public void subscribe(Subscriber<? super ORSetCommand<E>> subscriber) {
        commands.subscribe(subscriber);
    }

    @Override
    public void subscribeTo(Publisher<? extends ORSetCommand<E>> publisher) {
        Flowable.fromPublisher(publisher).onTerminateDetach().subscribe(command -> {
            final Option<ORSetCommand<E>> newCommand = processCommand(command);
            newCommand.peek(commands::onNext);

        });
    }

    private Option<ORSetCommand<E>> processCommand(ORSetCommand<E> command) {
        if (command instanceof AddCommand) {
            return doAdd(((AddCommand<E>) command).getElement())? Option.of(command) : Option.none();
        } else if (command instanceof RemoveCommand) {
            return doRemove(((RemoveCommand<E>) command).getElements())? Option.of(command) : Option.none();
        }
        return Option.none();
    }


    // core functionality
    @Override
    public int size() {
        return doElements().size();
    }

    @Override
    public Iterator<E> iterator() {
        return new ORSetIterator();
    }

    @Override
    public boolean add(E value) {
        final boolean contained = doContains(value);
        prepareAdd(value);
        return !contained;
    }


    // implementation
    private static <U> Predicate<Element<U>> matches(U value) {
        return element -> Objects.equals(value, element.getValue());
    }

    private synchronized boolean doContains(E value) {
        return elements.parallelStream().anyMatch(matches(value));
    }

    private synchronized Set<E> doElements() {
        return elements.parallelStream().map(Element::getValue).collect(Collectors.toSet());
    }

    private synchronized void prepareAdd(E value) {
        final Element<E> element = new Element<>(value, UUID.randomUUID());
        commands.onNext(new AddCommand<>(getCrdtId(), element));
        doAdd(element);
    }

    private synchronized boolean doAdd(Element<E> element) {
        return (elements.add(element) | elements.removeAll(tombstone)) && (!tombstone.contains(element));
    }

    private synchronized void prepareRemove(E value) {
        final Set<Element<E>> removes = elements.parallelStream().filter(matches(value)).collect(Collectors.toSet());
        commands.onNext(new RemoveCommand<>(getCrdtId(), removes));
        doRemove(removes);
    }

    private synchronized boolean doRemove(Collection<Element<E>> removes) {
        return elements.removeAll(removes) | tombstone.addAll(removes);
    }

    //......
}
  • Here ORSet is implemented with two Sets: elements for additions and tombstone for removals. Its processCommand method receives an ORSetCommand, which has two subclasses, ORSet.AddCommand and ORSet.RemoveCommand, handled by doAdd and doRemove respectively. A local add goes through prepareAdd, which creates an Element tagged with a UUID, publishes an AddCommand and then calls doAdd; doAdd adds the element to elements and removes everything in tombstone from elements. A local remove goes through prepareRemove, which collects the matching elements into the set removes, publishes a RemoveCommand and then calls doRemove; doRemove deletes that set from elements and adds it to tombstone.
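To see why tagging matters, the following self-contained sketch replays a concurrent add and remove of the same value on two replicas. OrSetSketch is a hypothetical, simplified class written with java.util collections only; it is not the wurmloch-crdt ORSet, and the applyAdd and applyRemove names are made up for this illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

/** Hypothetical observed-remove set: every add gets a fresh tag, removes target observed tags. */
final class OrSetSketch<E> {
    private final Map<UUID, E> elements = new HashMap<>(); // live tag -> value
    private final Set<UUID> tombstones = new HashSet<>();  // tags that have been removed

    /** Local add: creates a fresh tag, applies it locally and returns it for shipping. */
    UUID add(E value) {
        UUID tag = UUID.randomUUID();
        applyAdd(tag, value);
        return tag;
    }

    /** Applies a (possibly remote) add unless its tag was already removed. */
    void applyAdd(UUID tag, E value) {
        if (!tombstones.contains(tag)) {
            elements.put(tag, value);
        }
    }

    /** Local remove: collects the tags observed for the value, applies and returns them. */
    Set<UUID> remove(E value) {
        Set<UUID> observed = new HashSet<>();
        elements.forEach((tag, v) -> {
            if (Objects.equals(v, value)) {
                observed.add(tag);
            }
        });
        applyRemove(observed);
        return observed;
    }

    /** Applies a (possibly remote) remove of specific tags. */
    void applyRemove(Set<UUID> tags) {
        tombstones.addAll(tags);
        elements.keySet().removeAll(tags);
    }

    boolean contains(E value) {
        return elements.containsValue(value);
    }

    public static void main(String[] args) {
        OrSetSketch<String> a = new OrSetSketch<>();
        OrSetSketch<String> b = new OrSetSketch<>();
        UUID t1 = a.add("x");
        b.applyAdd(t1, "x");
        // Concurrently: a removes "x" (it has only observed t1) while b adds "x" again.
        Set<UUID> removed = a.remove("x");
        UUID t2 = b.add("x");
        // The two replicas exchange their operations.
        b.applyRemove(removed);
        a.applyAdd(t2, "x");
        System.out.println(a.contains("x") + " " + b.contains("x")); // prints "true true"
    }
}
```

The remove only covers tag t1, so the concurrently added tag t2 survives on both replicas, which is the add-wins behaviour that distinguishes the OR-Set from the 2P-Set.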

Summary

  • CRDT is short for Conflict-free Replicated Data Type, sometimes also described as passive synchronisation. Concrete implementations fall into state-based (CvRDT), operation-based (CmRDT), delta-based and pure operation-based families.
  • CvRDT is short for Convergent Replicated Data Type, also described as active synchronisation. It is commonly used in file systems such as NFS, AFS and Coda, and in key-value stores such as Riak and Dynamo.
  • For a CRDT to replicate without conflicts, the operations on the data structure must be convergent, that is, associative, commutative and idempotent. The basic CRDT data types are Counters (G-Counter, PN-Counter), Registers (LWW-Register, MV-Register) and Sets (G-Set, 2P-Set, LWW-element-Set, OR-Set).
