Challenge #3: Broadcast
The third challenge is to build a broadcast system based on Gossips. It is broken down in to multiple parts.
Challenge #3a: Single-Node Broadcast
The challenge is broadcast-a, which is to build a single-node broadcast system.
The task here is to store all messages a node has received in an array in the memory, when a broadcast message is received.
When a read message is received, it just needs to return the array. When a topology message is received, we just update our topology.
Let’s implement it in code as shown below.
type Topology struct {
store map[string]any
mu sync.Mutex
}
func (t *Topology) Update(m map[string]any) error {
t.mu.Lock()
t.store = m
t.mu.Unlock()
return nil
}
type KV struct {
store []int64
mu sync.Mutex
}
func (kv *KV) Put(v int64) error {
kv.mu.Lock()
kv.store = append(kv.store, v)
kv.mu.Unlock()
return nil
}
func (kv *KV) Get() ([]int64, error) {
kv.mu.Lock()
values := kv.store
kv.mu.Unlock()
return values, nil
}
func main() {
n := maelstrom.NewNode()
kv := KV{
store: make([]int64, 0),
mu: sync.Mutex{},
}
topology := Topology{
store: make(map[string]any),
mu: sync.Mutex{},
}
n.Handle("broadcast", func(msg maelstrom.Message) error {
var body map[string]any
if err := json.Unmarshal(msg.Body, &body); err != nil {
return err
}
// JSON number is of float type
err := kv.Put(int64(body["message"].(float64)))
if err != nil {
return err
}
return n.Reply(msg, map[string]string{"type": "broadcast_ok"})
})
n.Handle("read", func(msg maelstrom.Message) error {
var body map[string]any
if err := json.Unmarshal(msg.Body, &body); err != nil {
return err
}
vals, err := kv.Get()
if err != nil {
return err
}
return n.Reply(msg, map[string]any{"type": "read_ok", "messages": vals})
})
n.Handle("topology", func(msg maelstrom.Message) error {
var body map[string]any
if err := json.Unmarshal(msg.Body, &body); err != nil {
return err
}
err := topology.Update(body)
if err != nil {
return err
}
return n.Reply(msg, map[string]string{"type": "topology_ok"})
})
if err := n.Run(); err != nil {
log.Fatal(err)
}We can run it with the below command.
cd maelstrom-broadcast-a; go install; cd ..
maelstrom test -w broadcast --bin ~/go/bin/maelstrom-broadcast --node-count 1 --time-limit 20 --rate 10And it works :)
Please take a look at the complete commit here.
Challenge #3b: Multi-Node Broadcast
The challenge is broadcast-b, which is to build a multi-node broadcast system.
The assumption here is that if a node A is somehow (directly/indirectly) connected to node B, the updates from node A will reach node B eventually.
So our task here is to send each time, the message we received via broadcast message to all the nodes in our topology along with adding it to our array.
We only need minor changes to handling of our broadcast message compared to our previous implementation, which can be as follows.
n.Handle("broadcast", func(msg maelstrom.Message) error {
var body map[string]any
if err := json.Unmarshal(msg.Body, &body); err != nil {
return err
}
// JSON number is of float type
uniqueMessage := int64(body["message"].(float64))
if kv.Contains(uniqueMessage) {
// Message was already seen, skip it
return nil
}
err := kv.Put(uniqueMessage, uniqueMessage)
if err != nil {
return err
}
for _, o := range topology.store[n.ID()] {
_ = n.Send(o, body)
}
return n.Reply(msg, map[string]string{"type": "broadcast_ok"})
})We also need to make a minor change to our topology message handling, massaging the format of the message to a map to make accessing neighboring nodes easier.
func convertFullTopology(m map[string]any) map[string][]string {
newMap := make(map[string][]string)
for k, v := range m {
tmpSlice := make([]string, 0)
for _, vv := range v.([]interface{}) {
tmpSlice = append(tmpSlice, vv.(string))
}
newMap[k] = tmpSlice
}
return newMap
}We can run it with the below command.
cd maelstrom-broadcast-a; go install; cd ..
maelstrom test -w broadcast --bin ~/go/bin/maelstrom-broadcast --node-count 5 --time-limit 20 --rate 10And it works :)
Please take a look at the complete commit here.
Challenge #3c: Fault Tolerant Broadcast
The challenge is broadcast-c, which is to make the system work in the face of network partitions. When a network partition happens, nodes will not be able to communicate with each other. This means that our approach of sending the message we received to all other nodes in the topology won’t work, as they can’t communicate with each other. One solution would be to add a full sync of data, say every 2 seconds. Since all the messages are unique, this might work, even though it is very in-efficient. So let’s introduce a background goroutine to do this.
type BackgroundSyncer struct {
channel chan SendTo
node *maelstrom.Node
kv *KV
topology *FullTopology
}
func (b *BackgroundSyncer) enqueueMsg(v SendTo) {
b.channel <- v
}
func (b *BackgroundSyncer) run() {
for {
v := <-b.channel
err := b.node.Send(v.dest, v.body)
// only retry temporary errors
if slices.Contains([]int{maelstrom.Timeout, maelstrom.TemporarilyUnavailable,
maelstrom.Crash, maelstrom.Abort, maelstrom.TxnConflict,
}, maelstrom.ErrorCode(err)) {
b.channel <- v
}
}
}
// Do full run every 2 second
func (b *BackgroundSyncer) sync() {
for range time.Tick(time.Second * 2) {
allVals, _ := b.kv.Get()
for _, v := range allVals {
for _, o := range b.topology.store[b.node.ID()] {
b.enqueueMsg(SendTo{
dest: o,
body: map[string]any{"type": "broadcast", "message": v},
})
}
}
}
}We can start the background syncer by calling run(). And we can call sync() to send the full data every 2 seconds.
We can run it with the below command.
cd maelstrom-broadcast-c; go install; cd ..
maelstrom test -w broadcast --bin ~/go/bin/maelstrom-broadcast --node-count 5 --time-limit 20 --rate 10 --nemesis partitionAnd it works :)
Please take a look at the complete commit here.
Challenge #3d: Efficient Broadcast, Part I
3d is the part where the challenge gets a bit tricky. We need to apply a few tricks to pass this challenge. The challenge mentions that we can use our own topology, but I have kept the topology from Maelstrom. Let’s take a look at the constraints. * The delay in network is 100 ms * The message per operation should be less than 30 * Median latency <400 ms and Max latency <600 ms
This means we should have at most five hops to the furthest node. And less than three hopes to most of the nodes. To reach this, we first need to take a look at our topology, which is a 2D grid as shown below.
We can see that simply sending the messages to direct neighbors won’t work as the hops between two furthest away nodes
are eight hops. So let’s send messages directly to three hops away nodes. The assumption here is that the furthest away
nodes will get messages faster.
Another constraint is that we can have only 30 messages per operation. This means we should at most send a message only 30 times. With a topology like we have, it would be hard to achieve and will need smart routing decisions inside nodes. So rather than going that way, we can go with batching of messages. If we can batch say five messages and send it once, we can reduce the message count by 5X. To hit our latency requirements, we batch messages based on time. For example, we batch all messages for 100 ms and send it at once. This also means we can easily fine-tune latency and message per operation by adjusting our batching time.
Lastly, we also need to deal with network partitions. When we have a network partition, nodes on one partition can’t talk with
node on another partition. This means if we do a simple fire and forget Send(), it can happen other nodes will never receive those messages.
If we need to keep the fire and forget semantics, we will need a setup which will send the full copy of messages once in a while, so that
nodes can be reconciled, similar to our solution to #3c.
A different approach would be to send messages with acknowledgements like with SyncRPC(). In this case, we can retry if a message
didn’t get an acknowledgement after a given period of time. Together with batching, we can design our system in a way that we keep a
mapping from node to list of messages to send to that node. This means if a sending fails, next time we retry, we can also send all the new messages
that arrived while that message was being sent, making our system even more efficient.
Armed with this knowledge, let’s go the implementation. We will first take a look at the batch message sender.
func (b *BackgroundSyncer) sync() {
batch := make(map[string][]int64)
mu := sync.Mutex{}
// We will batch all requests in the last 165 milliseconds.
// This means we will have fewer messages
// The time can be fine-tuned w.r.t latency/message count tradeoff
ticker := time.NewTicker(165 * time.Millisecond)
go func() {
for {
select {
case v := <-b.channel:
// If it is a client, skip it
if strings.HasPrefix(v.dest, "c") {
continue
}
mu.Lock()
msg, ok := v.body.(map[string]any)
if ok {
if m, ok := msg["message"]; ok {
collectMessage(m, batch, v)
} else if msgs, ok := msg["messages"]; ok {
collectMessage(msgs, batch, v)
}
}
mu.Unlock()
}
}
}()
// Every 100 milliseconds, we will send and clear the batched messages
for range ticker.C {
mu.Lock()
for dest, messages := range batch {
if len(messages) == 0 {
continue
}
go func(dest string, messages []int64) {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
// We can reuse the same broadcast message to support taking a []int along with int.
// I just added a new message type called batch and their handling is the same.
_, err := b.node.SyncRPC(ctx, dest, map[string]any{
"type": "batch",
"messages": Unique(messages),
})
cancel()
if err != nil {
return
}
delete(batch, dest)
}(dest, messages)
}
mu.Unlock()
}
}
func collectMessage(m any, batch map[string][]int64, v SendTo) {
switch val := m.(type) {
case int64:
batch[v.dest] = append(batch[v.dest], val)
case []int64:
batch[v.dest] = append(batch[v.dest], val...)
case []interface{}:
for _, i := range val {
if f, ok := i.(float64); ok {
batch[v.dest] = append(batch[v.dest], int64(f))
} else if i64, ok := i.(int64); ok {
batch[v.dest] = append(batch[v.dest], i64)
}
}
}
}The code to send the messages to nodes three hops away can be as below.
// Save the message to local memory and send it to three hop neighbors
// We choose 3 hops neighbors to reduce the message propagation latency as the furthest node is eight hops away
func (b *BackgroundSyncer) saveAndPropagate(uniqueMessage int64, srcNode string) {
if b.kv.Contains(uniqueMessage) {
return
}
_ = b.kv.Put(uniqueMessage, uniqueMessage)
allTwoHopPaths := make([]string, 0, 20)
for _, o := range b.topology.Neighbors(b.node.ID()) {
allTwoHopPaths = append(allTwoHopPaths, b.topology.Neighbors(o)...)
}
allTwoHopPaths = Unique(allTwoHopPaths)
allThreeHopPaths := make([]string, 0, 20)
for _, o := range allTwoHopPaths {
allThreeHopPaths = append(allThreeHopPaths, b.topology.Neighbors(o)...)
}
allThreeHopPaths = Unique(allThreeHopPaths)
for _, o := range allThreeHopPaths {
if o == srcNode {
continue
}
if slices.Contains(b.topology.store[b.node.ID()], o) {
continue
}
if strings.HasPrefix(o, "c") {
continue
}
b.enqueueMsg(SendTo{
dest: o,
body: map[string]any{"type": "broadcast", "message": uniqueMessage},
})
}
dst, _ := strconv.Atoi(b.node.ID()[1:])
d := "n" + strconv.Itoa(24-dst)
b.enqueueMsg(SendTo{
dest: d,
body: map[string]any{"type": "broadcast", "message": uniqueMessage},
})
}Finally for completeness, the handling of batch and broadcast messages can be as below.
n.Handle("batch", func(msg maelstrom.Message) error {
var body map[string]any
if err := json.Unmarshal(msg.Body, &body); err != nil {
return err
}
messages := body["messages"].([]interface{})
for _, m := range messages {
uniqueMessage := int64(m.(float64))
bkSyncer.saveAndPropagate(uniqueMessage, msg.Src)
}
return nil
})
n.Handle("broadcast", func(msg maelstrom.Message) error {
var body map[string]any
if err := json.Unmarshal(msg.Body, &body); err != nil {
return err
}
uniqueMessage := int64(body["message"].(float64))
bkSyncer.saveAndPropagate(uniqueMessage, msg.Src)
return n.Reply(msg, map[string]string{"type": "broadcast_ok"})
})We can run it with the below command.
cd maelstrom-broadcast-d; go install; cd ..
maelstrom test -w broadcast --bin ~/go/bin/maelstrom-broadcast --node-count 25 --time-limit 20 --rate 100 --latency 100We can see the metrics as follows, which satisfies the constraints, along with network partitions :D
:servers {:send-count 29298,
:recv-count 29298,
:msg-count 29298,
:msgs-per-op 14.641679},
:stable-latencies {0 0,
0.5 386,
0.95 484,
0.99 493,
1 512}
Please take a look at the complete commit here.
Challenge #3e: Efficient Broadcast, Part II
For 3e, let’s take a look at the constraints.
- messages per operation should be below 20
- median latency <1 s and maximum latency <2 s
Taking at look at constraints, it seems that we just need to reuse the solution from 3d with a slight tuning of batching time. We can simply increase the batching time to 250 ms.
We can run it with the below command.
cd maelstrom-broadcast-e; go install; cd ..
maelstrom test -w broadcast --bin ~/go/bin/maelstrom-broadcast --node-count 25 --time-limit 20 --rate 100 --latency 100We can see the metrics as follows, which satisfies the constraints :D
:servers {:send-count 19380,
:recv-count 19244,
:msg-count 19380,
:msgs-per-op 9.704556},
:stable-latencies {0 0,
0.5 595,
0.95 736,
0.99 747,
1 842},
Please take a look at the complete commit here.