Gossip Glomers #4: Grow-Only Counter

Golang distributed-systems

Challenge #4: Grow-Only Counter

In the Grow-Only Counter challenge, we are given a sequentially consistent KV store, and we should implement a counter which is eventually consistent. Sequentially consistent means that if client A performs write W1, a subsequent read R1 from client A is guaranteed to see that change. But it is not guaranteed that a read R2 by client B after write W1 will see it. This means that simply updating a counter key in the KV store will not solve the challenge, because changes made by client A may not be visible to client B, and that visibility is required for an eventually consistent Grow-Only counter.

The key idea here is that when we do a read R, we should return the latest value of the counter which has been seen by any of the clients. This is possible by sending the read request R to all nodes and taking the max of all the counters returned by the nodes. Also, when we update the counter, we should do a CAS (Compare and Swap) so that concurrent updates from different nodes do not overwrite each other.

The gist of the code can be seen below.

// main registers the "add", "readone" and "read" handlers on a Maelstrom node
// backed by the sequentially consistent KV store, then runs the node until
// n.Run returns. The "...." lines are elisions from the full program.
func main() {
	n := maelstrom.NewNode()
	kv := maelstrom.NewSeqKV(n)
	ctx := context.Background()

	// "add": applies the requested change to the shared counter and replies.
	n.Handle("add", func(msg maelstrom.Message) error {
		....
		// NOTE(review): the error returned by updateCounter is not captured
		// on this line; unless the elided code handles it, a failed CAS would
		// go unnoticed — confirm against the full source.
		updateCounter(kv, ctx, newValue)
        ....
		return n.Reply(msg, resp)
	})

	// "readone" is an internal operation (not part of the workload) that
	// returns the counter as seen by this single node; the "read" handler
	// below sends it to the other nodes.
	n.Handle("readone", func(msg maelstrom.Message) error {
		....
		value, err := readCounter(kv, ctx)
	    ....	
		return n.Reply(msg, resp)
	})

	// "read": replies with the maximum of this node's view of the counter and
	// the views reported by the other nodes via "readone".
	n.Handle("read", func(msg maelstrom.Message) error {
		....
		req := make(map[string]any)
		req["type"] = "readone"

		// Get value from own replica
		value, err := readCounter(kv, ctx)
		// Retry for as long as the error code is one of the listed codes.
		// Any other outcome (success, or a code not listed here such as
		// KeyDoesNotExist) ends the loop with whatever value readCounter
		// returned.
		// NOTE(review): the loop has no backoff and no upper bound.
		for slices.Contains([]int{maelstrom.Timeout, maelstrom.TemporarilyUnavailable,
			maelstrom.Crash, maelstrom.Abort, maelstrom.TxnConflict,
		}, maelstrom.ErrorCode(err)) {
			// We need to retry as failures are temporary
			value, err = readCounter(kv, ctx)
		}

		// We read from all nodes and compute the maximum value
		for _, v := range n.NodeIDs() {
			// Per-peer context with a 200ms deadline; it shadows the outer ctx
			// for this iteration only. cancel is called explicitly on each
			// visible exit path instead of being deferred, since a defer would
			// not run until the handler returns.
			ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
			if v == n.ID() {
				// Own value was already read above.
				cancel()
				continue
			}
			var j map[string]any
			reply, err := n.SyncRPC(ctx, v, req)
			....
			if err := json.Unmarshal(reply.Body, &j); err != nil {
				// Skip a peer whose reply body cannot be decoded.
				cancel()
				continue
			}
			....
			// encoding/json decodes JSON numbers into float64 when the target
			// is map[string]any, hence the float64 assertion.
			// NOTE(review): the assertion is unchecked and panics if "value"
			// is absent or not a number, unless the elided code above guards
			// against that — confirm against the full source.
			value = int(math.Max(j["value"].(float64), float64(value)))
			cancel()
		}

		resp := make(map[string]any)
		resp["type"] = "read_ok"
		resp["value"] = value
		return n.Reply(msg, resp)
	})

	if err := n.Run(); err != nil {
		log.Fatal(err)
	}
}

// readCounter looks up the integer stored under key C in the KV store and
// hands the store's value and error back to the caller unchanged.
func readCounter(kv *maelstrom.KV, ctx context.Context) (int, error) {
	return kv.ReadInt(ctx, C)
}

// updateCounter adds delta to the counter stored under key C using one read
// followed by one compare-and-swap.
//
// If the key does not exist yet, the counter is treated as 0 and the CAS is
// issued with create-if-not-exists set, so the first update creates the key.
// Any other read error (for example a timeout) is returned as-is rather than
// being swallowed, because no trustworthy current value is available to use
// as the CAS precondition.
//
// The CAS is attempted once. A non-nil error, including a failed precondition
// caused by a concurrent update, means the delta was NOT applied; the caller
// is responsible for retrying.
func updateCounter(kv *maelstrom.KV, ctx context.Context, delta int) error {
	current, err := kv.ReadInt(ctx, C)
	missing := maelstrom.ErrorCode(err) == maelstrom.KeyDoesNotExist
	if err != nil && !missing {
		// The read failed for a reason other than a missing key; a CAS from
		// the zero value returned alongside the error would be a guess.
		return err
	}
	if missing {
		// Start from an explicit zero instead of relying on what ReadInt
		// returns together with an error.
		current = 0
	}
	return kv.CompareAndSwap(ctx, C, current, current+delta, missing)
}

We can run it with the below command.

cd maelstrom-counter; go install; cd ..
maelstrom test -w g-counter --bin ~/go/bin/maelstrom-counter --node-count 3 --rate 100 --time-limit 20 --nemesis partition

And it works :D

Please take a look at the complete commit here.