Challenge #4: Grow-Only Counter
In the Grow-Only Counter challenge, we are given a sequentially consistent KV store, and we should
implement a counter which is eventually consistent. Sequentially consistent means if a client A performs write W1, it is guaranteed that
read R1 from client A will see the changes. But it is not guaranteed that client B read R2 after write W1 will see the changes.
This means that simply updating a key counter in the KV store will solve the challenge. The reason being that changes by client A is not
visible by client B which is required for an eventually consistent Grow-Only counter.
The key idea here is that when we do a read R, we should return the latest value of the counter which is seen by any of the clients.
This is possible by sending the read request R to all nodes and taking the max of all the counters returned by the nodes. Also when we update the counter
we should do a CAS(Compare and Swap).
The gist of the code can be seen below.
func main() {
n := maelstrom.NewNode()
kv := maelstrom.NewSeqKV(n)
ctx := context.Background()
n.Handle("add", func(msg maelstrom.Message) error {
....
updateCounter(kv, ctx, newValue)
....
return n.Reply(msg, resp)
})
// We introduce a new operation to do targeted counter reads from a node
n.Handle("readone", func(msg maelstrom.Message) error {
....
value, err := readCounter(kv, ctx)
....
return n.Reply(msg, resp)
})
n.Handle("read", func(msg maelstrom.Message) error {
....
req := make(map[string]any)
req["type"] = "readone"
// Get value from own replica
value, err := readCounter(kv, ctx)
for slices.Contains([]int{maelstrom.Timeout, maelstrom.TemporarilyUnavailable,
maelstrom.Crash, maelstrom.Abort, maelstrom.TxnConflict,
}, maelstrom.ErrorCode(err)) {
// We need to retry as failures are temporary
value, err = readCounter(kv, ctx)
}
// We read from all nodes and compute the maximum value
for _, v := range n.NodeIDs() {
ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
if v == n.ID() {
cancel()
continue
}
var j map[string]any
reply, err := n.SyncRPC(ctx, v, req)
....
if err := json.Unmarshal(reply.Body, &j); err != nil {
cancel()
continue
}
....
value = int(math.Max(j["value"].(float64), float64(value)))
cancel()
}
resp := make(map[string]any)
resp["type"] = "read_ok"
resp["value"] = value
return n.Reply(msg, resp)
})
if err := n.Run(); err != nil {
log.Fatal(err)
}
}
func readCounter(kv *maelstrom.KV, ctx context.Context) (int, error) {
v, err := kv.ReadInt(ctx, C)
return v, err
}
func updateCounter(kv *maelstrom.KV, ctx context.Context, delta int) error {
createIfNotExists := false
from := 0
readValue, err := kv.ReadInt(ctx, C)
if maelstrom.ErrorCode(err) == maelstrom.KeyDoesNotExist {
// Key doesn't exist, so create it
createIfNotExists = true
} else {
// Key exists, so try CAS on the current value
from = readValue
}
err = kv.CompareAndSwap(ctx, C, from, readValue+delta, createIfNotExists)
return err
}We can run it with the below command.
cd maelstrom-counter; go install; cd ..
maelstrom test -w g-counter --bin ~/go/bin/maelstrom-counter --node-count 3 --rate 100 --time-limit 20 --nemesis partitionAnd it works :D
Please take a look at the complete commit here.