-
Notifications
You must be signed in to change notification settings - Fork 0
/
lww_element_dict.go
95 lines (87 loc) · 2.85 KB
/
lww_element_dict.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package crdt
// LWWElementDict is a last-write-wins element dictionary keyed by string.
// Additions and removals are tracked in two separate maps of timed values;
// a key is reported as present by Lookup when it has an addition that is not
// superseded by a strictly later removal.
type LWWElementDict struct {
// id identifies this replica; Merge compares ids to settle timestamp ties.
id int
// clock supplies the timestamp attached to every Add and Remove.
clock Clock
// adds maps each key to the timed value stored by Add or Merge.
adds map[string]TimedValue
// removes maps each key to the removal marker stored by Remove or Merge.
removes map[string]TimedValue
}
// NewLWWElementDict returns an empty dictionary for the replica identified by
// id. The given clock is stored and later used to timestamp operations. Both
// internal maps are allocated, so the result is immediately writable.
func NewLWWElementDict(id int, clock Clock) LWWElementDict {
	var dict LWWElementDict
	dict.id = id
	dict.clock = clock
	dict.adds = make(map[string]TimedValue)
	dict.removes = make(map[string]TimedValue)
	return dict
}
// Add inserts or updates the value stored under key, stamping it with a fresh
// timestamp taken from the dictionary's clock.
//
// The new entry is written only when the key has no addition yet, or when the
// existing addition compares strictly lower than the new one; otherwise the
// call leaves the dictionary unchanged. Timestamps are assumed unique, totally
// ordered, consistent with causal order, and monotonically increasing.
func (l LWWElementDict) Add(key string, value interface{}) {
	candidate := NewTimedValue(value, l.clock.Now())
	current, exists := l.adds[key]
	// Never let an older (or equal) entry overwrite what is already stored.
	if !exists || current.Compare(candidate) < 0 {
		l.adds[key] = candidate
	}
}
// Remove marks key as removed, stamping the removal with a fresh timestamp
// taken from the dictionary's clock.
//
// A key that Lookup does not report as present is left alone, and the clock is
// not consulted. The removal marker is written only when the key has no marker
// yet, or when the existing marker compares strictly lower than the new one.
// Timestamps are assumed unique, totally ordered, consistent with causal
// order, and monotonically increasing.
func (l LWWElementDict) Remove(key string) {
	if present := l.Lookup(key); !present {
		return // only elements that are currently present can be removed
	}
	tombstone := NewTimedValue(nil, l.clock.Now())
	// Never let an older (or equal) marker overwrite what is already stored.
	if previous, seen := l.removes[key]; !seen || previous.Compare(tombstone) < 0 {
		l.removes[key] = tombstone
	}
}
// Lookup reports whether key is currently present in the dictionary.
//
// A key is present when it has an addition and either no removal marker or a
// marker that does not compare strictly greater than the addition. Ties are
// therefore resolved in favour of the addition.
func (l LWWElementDict) Lookup(key string) bool {
	added, wasAdded := l.adds[key]
	if !wasAdded {
		return false
	}
	removed, wasRemoved := l.removes[key]
	// Biased towards addition: only a strictly later removal hides the key.
	return !wasRemoved || removed.Compare(added) <= 0
}
// Values returns the current contents of the dictionary: every key that
// Lookup reports as present, mapped to its stored value without the
// underlying add-and-remove timestamps.
//
// The returned map is freshly allocated on every call, so callers may modify
// it without affecting the dictionary.
func (l LWWElementDict) Values() map[string]interface{} {
	// At most every added key can be present, so size the result for that.
	values := make(map[string]interface{}, len(l.adds))
	for key, add := range l.adds {
		if l.Lookup(key) {
			values[key] = add.value
		}
	}
	return values
}
// Merge combines l with other and returns the result as a new dictionary,
// leaving both inputs untouched. The result keeps l's id and clock.
//
// For every key the entry with the highest timestamp is kept, separately for
// additions and removals. When two entries for a key compare equal, the entry
// of the node with the lowest identifier takes precedence (other's entry is
// taken when the identifiers are equal), which makes concurrent add-or-update
// operations deterministic. The merge of entries is intended to be
// commutative, associative, and idempotent.
func (l LWWElementDict) Merge(other LWWElementDict) LWWElementDict {
	// other wins timestamp ties exactly when its identifier is not higher.
	otherWinsTies := l.id >= other.id
	return LWWElementDict{
		id:      l.id,
		clock:   l.clock,
		adds:    merge(l.adds, other.adds, otherWinsTies),
		removes: merge(l.removes, other.removes, otherWinsTies),
	}
}
// merge returns a new map holding, for every key found in source or replica,
// the entry that wins the comparison between the two sides. Neither input map
// is modified.
//
// A key present on only one side is copied as is. When both sides have the
// key, the source entry is kept if it compares strictly greater than the
// replica entry; the replica entry is taken if it compares strictly greater.
// When the two compare equal, dominant decides: true takes the replica entry,
// false keeps the source entry.
func merge(source map[string]TimedValue, replica map[string]TimedValue, dominant bool) map[string]TimedValue {
	// The result holds at least every source key.
	result := make(map[string]TimedValue, len(source))
	for key, value := range source {
		result[key] = value
	}
	for key, replicaValue := range replica {
		if sourceValue, ok := source[key]; ok {
			// Compare once and reuse the outcome for both conditions.
			order := sourceValue.Compare(replicaValue)
			if order > 0 || (order == 0 && !dominant) {
				continue // the source entry stays
			}
		}
		result[key] = replicaValue
	}
	return result
}