Gossip Glomers #5: Kafka-Style Log

Golang distributed-systems

Challenge #5a: Single-Node Kafka-Style Log

In #5a, we need to build an append-only log system on a single node. You can see the semantics we need to support in the above link.

Since this is a single-node setup, we can use a map. The map's key is the key from the send request, and its value is a list of (offset, msg) pairs. The offset is not provided by the client, so we need to generate one that is monotonically increasing; on a single node, a simple counter protected by a mutex is enough. From the examples, offsets only need to be unique within a single key, which means we could reuse offsets across different keys if we wanted, but for simplicity we don't do that in this challenge.
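Before the handlers, it helps to see the backing types. The method snippets below only show their receivers, so here is a minimal sketch with the fields inferred from how they are used (the InMemoryOffsetStore appears in full further down):

// Counter-based unique ID generator (fields inferred from Next below)
type InMemoryUniqueIdGenerator struct {
	mu sync.Mutex
	id int
}

// In-memory log store: key -> list of [offset, msg] pairs (fields inferred from save/get below)
type InMemoryDataStore struct {
	mu    sync.RWMutex
	store map[string][][]int
}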

The single-node implementation is straightforward; the gist of it can be seen below.

// Our unique ID generator
func (i *InMemoryUniqueIdGenerator) Next() int {
    i.mu.Lock()
    defer i.mu.Unlock()
    i.id++
    return i.id
}

func (i *InMemoryDataStore) save(key string, offset int, value int) {
	// The lock protects against inconsistent concurrent updates
	i.mu.Lock()
	defer i.mu.Unlock()
	i.store[key] = append(i.store[key], []int{offset, value})
}

// Returns all (offset, value) pairs for a key whose offset is greater than or equal to the provided offset
func (i *InMemoryDataStore) get(key string, offset int) [][]int {
	i.mu.RLock()
	defer i.mu.RUnlock()
	vals := i.store[key]
	out := make([][]int, 0)
	// Since we store a list of all (offset, value) pairs, we iterate through the list
	// and keep the ones whose offset is greater than or equal to the provided offset
	for _, v := range vals {
		if v[0] >= offset {
			out = append(out, v)
		}
	}
	return out
}

type InMemoryOffsetStore struct {
    store map[string]int
    mu    sync.RWMutex
}

func (o *InMemoryOffsetStore) commit(key string, offset int) {
    o.mu.Lock()
    defer o.mu.Unlock()
    o.store[key] = offset
}

func (o *InMemoryOffsetStore) get(key string) int {
    o.mu.RLock()
    defer o.mu.RUnlock()
    return o.store[key]
}

func main() {
	n.Handle("send", func(msg maelstrom.Message) error {
        .....
        // Get a unique offset
		offset := logs.id.Next()
		
		// Store the message in the above offset
		logs.dataStore.save(body.Key, offset, body.Msg)
		result["offset"] = offset

		return n.Reply(msg, result)
	})

	n.Handle("poll", func(msg maelstrom.Message) error {
		.....
		result := make(map[string]any)
		result["type"] = "poll_ok"
		out := make(map[string][][]int)
		for k, v := range body.Offsets {
			out[k] = logs.dataStore.get(k, v)
		}
		result["msgs"] = out
		return n.Reply(msg, result)
	})

	n.Handle("commit_offsets", func(msg maelstrom.Message) error {
		.....
        // Offset store is responsible for storing the offsets for each key
		for k, v := range body.Offsets {
			logs.offsetStore.commit(k, v)
		}
		return n.Reply(msg, result)
	})

	n.Handle("list_committed_offsets", func(msg maelstrom.Message) error {
	    ....
        // Get offset for each key
		for _, v := range body.Keys {
			offset[v] = logs.offsetStore.get(v)
		}
		result["offsets"] = offset
		return n.Reply(msg, result)
	})
....

We can run it with the below command.

cd maelstrom-kafka-a; go install; cd ..
maelstrom test -w kafka --bin ~/go/bin/maelstrom-kafka --node-count 1 --concurrency 2n --time-limit 20 --rate 1000

And it works :D

Please take a look at the complete commit here.

Challenge #5b: Multi-Node Kafka-Style Log

In #5b, we need to turn the single-node implementation into a multi-node one. The concepts remain the same: we move the backing store from local memory to lin-kv, which provides linearizability over a key. If client A updates a key, client B will see that update, so lin-kv gives us guarantees similar to our in-memory map. And just as we used a mutex before, we can use compare-and-swap (CAS) updates to keep concurrent writes consistent. Our unique ID generator can also be backed by lin-kv and protected by CAS.
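For reference, the Go library exposes lin-kv through a KV client created from the node, and CompareAndSwap only succeeds if the stored value still matches what we read, which is what lets it stand in for the mutex. A minimal setup sketch (handler registration elided) looks like this:

func main() {
	// Create the node and a linearizable KV client backed by Maelstrom's lin-kv service.
	n := maelstrom.NewNode()
	kv := maelstrom.NewLinKV(n)

	// Handlers that read and CAS through kv are registered here (see the snippets below).
	_ = kv

	if err := n.Run(); err != nil {
		log.Fatal(err)
	}
}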

Let’s take a look at how we can use lin-kv and CAS to get similar semantics to single node.

type KVUniqueIdGenerator struct {
	kv *maelstrom.KV
}

// We keep a separate counter per key. This scales better because unrelated keys do not contend on the same CAS
func (i *KVUniqueIdGenerator) Next(ctx context.Context, key string) (int, error) {
	idKey := "id_" + key
	for {
		val, err := i.kv.ReadInt(ctx, idKey)
		if err != nil && maelstrom.ErrorCode(err) != maelstrom.KeyDoesNotExist {
			return 0, err
		}

		newVal := val + 1
		// CAS makes sure updates are consistent
		err = i.kv.CompareAndSwap(ctx, idKey, val, newVal, maelstrom.ErrorCode(err) == maelstrom.KeyDoesNotExist)
		if err == nil {
			return newVal, nil
		}

		if !slices.Contains([]int{maelstrom.PreconditionFailed, maelstrom.KeyDoesNotExist,
			maelstrom.Timeout, maelstrom.TemporarilyUnavailable, maelstrom.Crash, maelstrom.Abort, maelstrom.TxnConflict,
		}, maelstrom.ErrorCode(err)) {
			return 0, err
		}
	}
}

We just need to update send, poll, commit_offsets, and the stores behind them to use the lin-kv backing store with CAS. The gist of the required changes is below.

n.Handle("send", func(msg maelstrom.Message) error {
     .....
    // Generate Unique ID
    offset, err := logs.id.Next(ctx, body.Key)
    if err != nil {
        return err
    }
    // Use the above ID to store the data
    err = logs.dataStore.save(ctx, body.Key, offset, body.Msg)
    if err != nil {
        return err
    }
    result["offset"] = offset
    
    return n.Reply(msg, result)
})

type KVDataStore struct {
	kv *maelstrom.KV
}

func (i *KVDataStore) save(ctx context.Context, key string, offset int, value int) error {
	dataKey := "data_" + key
	// Retry in a loop so a failed CAS does not drop the message
	for {
		var data [][]int
		err := i.kv.ReadInto(ctx, dataKey, &data)
		if err != nil && maelstrom.ErrorCode(err) != maelstrom.KeyDoesNotExist {
			return err
		}

		// Keep the entries sorted by offset so reads can find everything at or after a given offset
		newData := append(data, []int{offset, value})
		sort.Slice(newData, func(i, j int) bool {
			return newData[i][0] < newData[j][0]
		})
		err = i.kv.CompareAndSwap(ctx, dataKey, data, newData, maelstrom.ErrorCode(err) == maelstrom.KeyDoesNotExist)
		if err == nil {
			return nil
		}
    .....
	}
}
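The offset store follows the same read-then-CAS pattern. Here is a minimal sketch of commit (the KVOffsetStore name and the "offset_" key prefix are assumptions for illustration, not necessarily the exact code in the commit):

type KVOffsetStore struct {
	kv *maelstrom.KV
}

func (o *KVOffsetStore) commit(ctx context.Context, key string, offset int) error {
	offsetKey := "offset_" + key
	for {
		// Read the current committed offset; a missing key just means nothing is committed yet.
		cur, err := o.kv.ReadInt(ctx, offsetKey)
		if err != nil && maelstrom.ErrorCode(err) != maelstrom.KeyDoesNotExist {
			return err
		}
		// CAS the new offset in, creating the key if it does not exist yet.
		casErr := o.kv.CompareAndSwap(ctx, offsetKey, cur, offset, maelstrom.ErrorCode(err) == maelstrom.KeyDoesNotExist)
		if casErr == nil {
			return nil
		}
		// On a conflict another commit raced us; re-read and try again.
		if maelstrom.ErrorCode(casErr) != maelstrom.PreconditionFailed {
			return casErr
		}
	}
}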

After similar changes, let’s run it with the below command.

cd maelstrom-kafka-b; go install; cd ..
maelstrom test -w kafka --bin ~/go/bin/maelstrom-kafka --node-count 2 --concurrency 2n --time-limit 20 --rate 1000

And it works :D

Please take a look at the complete commit here.

Challenge #5c: Efficient Kafka-Style Log

In #5c, we need to take a look at our existing implementation and improve the performance.

Let’s take a look at our msgs-per-op for our current solution.

 :net {:all {:send-count 1958345,
             :recv-count 1958345,
             :msg-count 1958345,
             :msgs-per-op 109.14864},
       :clients {:send-count 39107,
                 :recv-count 39107,
                 :msg-count 39107},
       :servers {:send-count 1919238,
                 :recv-count 1919238,
                 :msg-count 1919238,
                 :msgs-per-op 106.96901},

We see over 109 messages per operation, which is a lot. Looking at the CAS errors, there are more than 1,000 of them, and this seems to be the reason for the inflated message count: every CAS failure triggers expensive retries. One way to avoid CAS failures on a key is to make a single node responsible for that key, by hashing keys to nodes. This is also how systems like Kafka partition data across brokers. From the examples, the keys in our case are integers, so a simple modulo by the number of nodes gives us the owning node: key % num_nodes. For example, with two nodes n0 and n1, key "5" maps to n1 because 5 % 2 == 1. If node A receives a read or update for a key K that maps to node B, it forwards the request to node B.

To implement the change, we introduce a new message type called internal that handles these forwarded requests. The gist of the code is below.

func getResponsibleNode(key string, n *maelstrom.Node) string {
    nodes := n.NodeIDs()
    slices.Sort(nodes)
    keyInt, err := strconv.Atoi(key)
    if err != nil {
        return nodes[0]
    }
    return nodes[keyInt%len(nodes)]
}

func main() {
	n.Handle("send", func(msg maelstrom.Message) error {
		....
		if getResponsibleNode(body.Key, n) != n.ID() {
			resp, err := n.SyncRPC(ctx, getResponsibleNode(body.Key, n), map[string]any{
				"type":   "internal",
				"method": "send",
				"key":    body.Key,
				"msg":    body.Msg,
			})
			if err != nil {
				return err
			}
			return n.Reply(msg, resp.Body)
		}

		result := make(map[string]any)
		result["type"] = "send_ok"
		offset, err := logs.id.Next(ctx, body.Key)
		if err != nil {
			return err
		}
		err = logs.dataStore.save(ctx, body.Key, offset, body.Msg)
		if err != nil {
			return err
		}
		result["offset"] = offset

		return n.Reply(msg, result)
	})
	
	n.Handle("internal", func(msg maelstrom.Message) error {
		var body map[string]any
		if err := json.Unmarshal(msg.Body, &body); err != nil {
			return err
		}
		method := body["method"].(string)
		switch method {
		case "send":
			key := body["key"].(string)
			msgVal := int(body["msg"].(float64))
			offset, err := logs.id.Next(ctx, key)
			if err != nil {
				return err
			}
			err = logs.dataStore.save(ctx, key, offset, msgVal)
			if err != nil {
				return err
			}
			return n.Reply(msg, map[string]any{"type": "send_ok", "offset": offset})
		case "poll":
			key := body["key"].(string)
			offset := int(body["offset"].(float64))
			msgs, err := logs.dataStore.get(ctx, key, offset)
			if err != nil {
				return err
			}
			return n.Reply(msg, map[string]any{"type": "poll_ok", "msgs": msgs})
		case "commit_offsets":
			key := body["key"].(string)
			offset := int(body["offset"].(float64))
			err := logs.offsetStore.commit(ctx, key, offset)
			if err != nil {
				return err
			}
			return n.Reply(msg, map[string]any{"type": "commit_offsets_ok"})
		case "list_committed_offsets":
			key := body["key"].(string)
			off, err := logs.offsetStore.get(ctx, key)
			if err != nil {
				return err
			}
			return n.Reply(msg, map[string]any{"type": "list_committed_offsets_ok", "offset": off})
		}
		return nil
	})
	....

After similar changes, let’s run it with the below command.

cd maelstrom-kafka-c; go install; cd ..
maelstrom test -w kafka --bin ~/go/bin/maelstrom-kafka --node-count 2 --concurrency 2n --time-limit 20 --rate 1000

Let’s take a look at new metrics.

 :net {:all {:send-count 247320,
             :recv-count 247320,
             :msg-count 247320,
             :msgs-per-op 13.473524},
       :clients {:send-count 40000,
                 :recv-count 40000,
                 :msg-count 40000},
       :servers {:send-count 207320,
                 :recv-count 207320,
                 :msg-count 207320,
                 :msgs-per-op 11.294399},

We see that messages per operation dropped drastically, roughly 10x (from ~109 to ~13.5). The CAS conflicts also went from over 1,000 to fewer than 10, a ~100x improvement.

Please take a look at the complete commit here.