Challenge #5a: Single-Node Kafka-Style Log
In #5a, we need to build an append-only log system on a single node. You can see the semantics we need to support in the above link.
Since this is a single-node setup, we can use a map. The map's key is the key from the send request, and its value is a list of (offset, msg) pairs.
The offset is not provided by the client, so we have to generate one that is monotonically increasing.
On a single node, this can be done with a simple counter protected by a mutex. As the examples show, offsets only need to be
unique within a single key, which means we could reuse offsets across different keys if we wanted to; for simplicity we don't do that in this challenge and just use one global counter.
With a single node the implementation is straightforward. The gist of it can be seen below.
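The methods below hang off two small types. As a minimal sketch (field names are inferred from the code that follows, not necessarily the exact commit):
// Counter-based unique ID generator, protected by a mutex
type InMemoryUniqueIdGenerator struct {
    mu sync.Mutex
    id int
}
// Maps a key to its list of (offset, value) pairs
type InMemoryDataStore struct {
    mu    sync.RWMutex
    store map[string][][]int
}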
// Our unique ID generator
func (i *InMemoryUniqueIdGenerator) Next() int {
i.mu.Lock()
defer i.mu.Unlock()
i.id++
return i.id
}
func (i *InMemoryDataStore) save(key string, offset int, value int) {
// Lock protects against inconsistent updates
i.mu.Lock()
defer i.mu.Unlock()
i.store[key] = append(i.store[key], []int{offset, value})
}
// Provides all (offset, value) pairs for a key whose offset is greater than or equal to the given offset
func (i *InMemoryDataStore) get(key string, offset int) [][]int {
i.mu.RLock()
defer i.mu.RUnlock()
vals := i.store[key]
out := make([][]int, 0)
// Since we store a flat list of (offset, value) pairs, we iterate through it
// and keep entries whose offset is greater than or equal to the provided offset
for _, v := range vals {
if v[0] >= offset {
out = append(out, v)
}
}
return out
}
type InMemoryOffsetStore struct {
store map[string]int
mu sync.RWMutex
}
func (o *InMemoryOffsetStore) commit(key string, offset int) {
o.mu.Lock()
defer o.mu.Unlock()
o.store[key] = offset
}
func (o *InMemoryOffsetStore) get(key string) int {
o.mu.RLock()
defer o.mu.RUnlock()
return o.store[key]
}
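For reference, the handlers in main decode the Maelstrom message body into a small request struct. A rough sketch, assuming a single struct whose JSON tags mirror the workload's message fields (the actual commit may split or name these differently):
// Hypothetical request body shared by the handlers below
type requestBody struct {
    Key     string         `json:"key"`     // used by send
    Msg     int            `json:"msg"`     // used by send
    Offsets map[string]int `json:"offsets"` // used by poll and commit_offsets
    Keys    []string       `json:"keys"`    // used by list_committed_offsets
}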
func main() {
n.Handle("send", func(msg maelstrom.Message) error {
.....
// Get a unique offset
offset := logs.id.Next()
// Store the message in the above offset
logs.dataStore.save(body.Key, offset, body.Msg)
result["offset"] = offset
return n.Reply(msg, result)
})
n.Handle("poll", func(msg maelstrom.Message) error {
.....
result := make(map[string]any)
result["type"] = "poll_ok"
out := make(map[string][][]int)
for k, v := range body.Offsets {
out[k] = logs.dataStore.get(k, v)
}
result["msgs"] = out
return n.Reply(msg, result)
})
n.Handle("commit_offsets", func(msg maelstrom.Message) error {
.....
// Offset store is responsible for storing the offsets for each key
for k, v := range body.Offsets {
logs.offsetStore.commit(k, v)
}
return n.Reply(msg, result)
})
n.Handle("list_committed_offsets", func(msg maelstrom.Message) error {
....
// Get offset for each key
for _, v := range body.Keys {
offset[v] = logs.offsetStore.get(v)
}
result["offsets"] = offset
return n.Reply(msg, result)
})
....
We can run it with the below command.
cd maelstrom-kafka-a; go install; cd ..
maelstrom test -w kafka --bin ~/go/bin/maelstrom-kafka --node-count 1 --concurrency 2n --time-limit 20 --rate 1000
And it works :D
Please take a look at the complete commit here.
Challenge #5b: Multi-Node Kafka-Style Log
In #5b, we need to convert the single-node solution into a multi-node one. The concepts remain the same.
We move the backing store from local memory to lin-kv, which provides linearizable reads and writes per key.
This means that if client A updates a key, client B will also see client A's change, so it gives us guarantees similar to our in-memory map.
In place of the mutex, we can use compare-and-swap (CAS) updates to keep concurrent writes consistent. Our unique ID generator can also be backed by lin-kv and protected by CAS.
Let's take a look at how we can use lin-kv and CAS to get semantics similar to the single-node version.
type KVUniqueIdGenerator struct {
kv *maelstrom.KV
}
// We keep a separate counter per key; this scales better because it avoids CAS contention between keys
func (i *KVUniqueIdGenerator) Next(ctx context.Context, key string) (int, error) {
idKey := "id_" + key
for {
val, err := i.kv.ReadInt(ctx, idKey)
if err != nil && maelstrom.ErrorCode(err) != maelstrom.KeyDoesNotExist {
return 0, err
}
newVal := val + 1
// CAS makes sure updates are consistent
err = i.kv.CompareAndSwap(ctx, idKey, val, newVal, maelstrom.ErrorCode(err) == maelstrom.KeyDoesNotExist)
if err == nil {
return newVal, nil
}
if !slices.Contains([]int{maelstrom.PreconditionFailed, maelstrom.KeyDoesNotExist,
maelstrom.Timeout, maelstrom.TemporarilyUnavailable, maelstrom.Crash, maelstrom.Abort, maelstrom.TxnConflict,
}, maelstrom.ErrorCode(err)) {
return 0, err
}
}
}
We just need to update poll, save, commit_offsets, and the rest to use the lin-kv backing store with CAS.
The gist of the required changes is provided below.
n.Handle("send", func(msg maelstrom.Message) error {
.....
// Generate Unique ID
offset, err := logs.id.Next(ctx, body.Key)
if err != nil {
return err
}
// Use the above ID to store the data
err = logs.dataStore.save(ctx, body.Key, offset, body.Msg)
if err != nil {
return err
}
result["offset"] = offset
return n.Reply(msg, result)
})
type KVDataStore struct {
kv *maelstrom.KV
}
func (i *KVDataStore) save(ctx context.Context, key string, offset int, value int) error {
dataKey := "data_" + key
// Retry in a loop so a CAS conflict or transient error does not drop the message
for {
var data [][]int
err := i.kv.ReadInto(ctx, dataKey, &data)
if err != nil && maelstrom.ErrorCode(err) != maelstrom.KeyDoesNotExist {
return err
}
// Sort by offset so entries stay in offset order even when concurrent appends interleave
newData := append(data, []int{offset, value})
sort.Slice(newData, func(i, j int) bool {
return newData[i][0] < newData[j][0]
})
err = i.kv.CompareAndSwap(ctx, dataKey, data, newData, maelstrom.ErrorCode(err) == maelstrom.KeyDoesNotExist)
if err == nil {
return nil
}
.....
}
}
The committed-offsets store needs the same read-then-CAS treatment; a rough sketch follows. After similar changes, let's run it with the below command.
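As an illustration of that offset-store change, a lin-kv-backed version could look roughly like this. The KVOffsetStore name, the "offset_" key prefix, and the retry handling are assumptions for this sketch, not necessarily the code in the commit:
type KVOffsetStore struct {
    kv *maelstrom.KV
}
// commit stores the committed offset for a key, retrying on CAS conflicts
func (o *KVOffsetStore) commit(ctx context.Context, key string, offset int) error {
    offsetKey := "offset_" + key
    for {
        cur, err := o.kv.ReadInt(ctx, offsetKey)
        if err != nil && maelstrom.ErrorCode(err) != maelstrom.KeyDoesNotExist {
            return err
        }
        // Design choice for this sketch: never move a committed offset backwards
        if cur >= offset {
            return nil
        }
        err = o.kv.CompareAndSwap(ctx, offsetKey, cur, offset, maelstrom.ErrorCode(err) == maelstrom.KeyDoesNotExist)
        if err == nil {
            return nil
        }
        if maelstrom.ErrorCode(err) != maelstrom.PreconditionFailed {
            return err
        }
    }
}
// get returns the last committed offset for a key (zero if nothing was committed yet)
func (o *KVOffsetStore) get(ctx context.Context, key string) (int, error) {
    val, err := o.kv.ReadInt(ctx, "offset_"+key)
    if err != nil && maelstrom.ErrorCode(err) != maelstrom.KeyDoesNotExist {
        return 0, err
    }
    return val, nil
}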
cd maelstrom-kafka-b; go install; cd ..
maelstrom test -w kafka --bin ~/go/bin/maelstrom-kafka --node-count 2 --concurrency 2n --time-limit 20 --rate 1000
And it works :D
Please take a look at the complete commit here.
Challenge #5c: Efficient Kafka-Style Log
In #5c, we need to revisit the existing implementation and improve its performance.
Let's start with the msgs-per-op of the current solution.
:net {:all {:send-count 1958345,
:recv-count 1958345,
:msg-count 1958345,
:msgs-per-op 109.14864},
:clients {:send-count 39107,
:recv-count 39107,
:msg-count 39107},
:servers {:send-count 1919238,
:recv-count 1919238,
:msg-count 1919238,
:msgs-per-op 106.96901},
We are at over 109 messages per operation, which is a lot.
Looking at the CAS errors, there are over 1,000 of them.
This appears to be the main driver of the message count, since every CAS failure triggers a retry and each retry costs extra messages.
One way to drastically reduce CAS conflicts on a key is to make a single node responsible for that key.
We can do this by hash-partitioning keys across nodes, similar to how Kafka assigns keys to partitions.
Looking at the examples, the keys are integers in our case, so a simple modulo over the number of nodes gives us the owning node: key % num_nodes.
For example, with two nodes (n0, n1) and key "5", 5 % 2 = 1, so the key maps to n1 (after sorting the node IDs).
This means that if node A receives a read or update for key K that is mapped to node B, it forwards the request to node B.
To implement this, we also introduce a new action called internal to handle these forwarded requests.
The gist of the code is shown below.
func getResponsibleNode(key string, n *maelstrom.Node) string {
nodes := n.NodeIDs()
slices.Sort(nodes)
keyInt, err := strconv.Atoi(key)
if err != nil {
return nodes[0]
}
return nodes[keyInt%len(nodes)]
}
func main() {
n.Handle("send", func(msg maelstrom.Message) error {
....
if getResponsibleNode(body.Key, n) != n.ID() {
resp, err := n.SyncRPC(ctx, getResponsibleNode(body.Key, n), map[string]any{
"type": "internal",
"method": "send",
"key": body.Key,
"msg": body.Msg,
})
if err != nil {
return err
}
return n.Reply(msg, resp.Body)
}
result := make(map[string]any)
result["type"] = "send_ok"
offset, err := logs.id.Next(ctx, body.Key)
if err != nil {
return err
}
err = logs.dataStore.save(ctx, body.Key, offset, body.Msg)
if err != nil {
return err
}
result["offset"] = offset
return n.Reply(msg, result)
})
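// For completeness, the other client-facing handlers forward per key in the same way.
// A rough, illustrative sketch of the poll path (see the commit for the real code):
n.Handle("poll", func(msg maelstrom.Message) error {
    // ... decode body as in the send handler ...
    out := make(map[string][][]int)
    for k, v := range body.Offsets {
        if getResponsibleNode(k, n) == n.ID() {
            msgs, err := logs.dataStore.get(ctx, k, v)
            if err != nil {
                return err
            }
            out[k] = msgs
            continue
        }
        // Forward the per-key read to the node that owns the key
        resp, err := n.SyncRPC(ctx, getResponsibleNode(k, n), map[string]any{
            "type":   "internal",
            "method": "poll",
            "key":    k,
            "offset": v,
        })
        if err != nil {
            return err
        }
        var fwd struct {
            Msgs [][]int `json:"msgs"`
        }
        if err := json.Unmarshal(resp.Body, &fwd); err != nil {
            return err
        }
        out[k] = fwd.Msgs
    }
    return n.Reply(msg, map[string]any{"type": "poll_ok", "msgs": out})
})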
n.Handle("internal", func(msg maelstrom.Message) error {
var body map[string]any
if err := json.Unmarshal(msg.Body, &body); err != nil {
return err
}
method := body["method"].(string)
switch method {
case "send":
key := body["key"].(string)
msgVal := int(body["msg"].(float64))
offset, err := logs.id.Next(ctx, key)
if err != nil {
return err
}
err = logs.dataStore.save(ctx, key, offset, msgVal)
if err != nil {
return err
}
return n.Reply(msg, map[string]any{"type": "send_ok", "offset": offset})
case "poll":
key := body["key"].(string)
offset := int(body["offset"].(float64))
msgs, err := logs.dataStore.get(ctx, key, offset)
if err != nil {
return err
}
return n.Reply(msg, map[string]any{"type": "poll_ok", "msgs": msgs})
case "commit_offsets":
key := body["key"].(string)
offset := int(body["offset"].(float64))
err := logs.offsetStore.commit(ctx, key, offset)
if err != nil {
return err
}
return n.Reply(msg, map[string]any{"type": "commit_offsets_ok"})
case "list_committed_offsets":
key := body["key"].(string)
off, err := logs.offsetStore.get(ctx, key)
if err != nil {
return err
}
return n.Reply(msg, map[string]any{"type": "list_committed_offsets_ok", "offset": off})
}
return nil
})
....
After similar changes, let's run it with the below command.
cd maelstrom-kafka-c; go install; cd ..
maelstrom test -w kafka --bin ~/go/bin/maelstrom-kafka --node-count 2 --concurrency 2n --time-limit 20 --rate 1000
Let's take a look at the new metrics.
:net {:all {:send-count 247320,
:recv-count 247320,
:msg-count 247320,
:msgs-per-op 13.473524},
:clients {:send-count 40000,
:recv-count 40000,
:msg-count 40000},
:servers {:send-count 207320,
:recv-count 207320,
:msg-count 207320,
:msgs-per-op 11.294399},
We see that messages per operation dropped drastically, from ~109 to ~13.5 (roughly 8x). CAS conflicts are also down to fewer than 10, roughly a 100x improvement.
Please take a look at the complete commit here.