Challenge #6a: Single-Node, Totally Available Transactions

#6a is to build a single-node key-value store that implements transactions made up of micro-operations. Since there is only a single node, we can simply guard reads and writes to the store with a mutex to keep updates consistent. Since we only need to support weak consistency (the test below checks read uncommitted), we can also modify the store directly while applying micro-operations. We simply loop through the micro-operations and apply them in order, as can be seen in the code below.

type Store struct {
	mu   sync.RWMutex
	data map[int]int
}

// Directly change the store when processing each micro-operation.
func (s *Store) processTxns(txs [][3]any) [][3]any {
	// Index into txs when storing a read result: ranging by value copies each
	// [3]any array, so assigning to the copy would never reach the returned slice.
	for i, t := range txs {
		// read value from DB
		if t[0] == "r" {
			txs[i][2] = s.read(int(t[1].(float64)))
		}
		// write value to DB
		if t[0] == "w" {
			s.write(int(t[1].(float64)), int(t[2].(float64)))
		}
	}
	return txs
}

func (s *Store) read(k int) any {
	s.mu.RLock()
	var r any = nil
	if v, ok := s.data[k]; ok {
		r = v
	}
	s.mu.RUnlock()
	return r
}

func (s *Store) write(k, v int) {
	s.mu.Lock()
	s.data[k] = v
	s.mu.Unlock()
}
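To try the micro-operation loop without Maelstrom, here is a minimal, self-contained sketch. It assumes a transaction arrives as JSON such as [["r",1,null],["w",1,6]]; the names kv, apply and run are hypothetical and exist only for this illustration. It also shows why the code converts from float64: encoding/json decodes every JSON number stored in an any as float64.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// kv is a hypothetical stand-in for the Store above.
type kv struct {
	mu   sync.RWMutex
	data map[int]int
}

// apply runs each micro-operation in order and fills in read results.
func (s *kv) apply(txn [][3]any) [][3]any {
	for i, op := range txn {
		// JSON numbers decoded into `any` are always float64.
		key := int(op[1].(float64))
		switch op[0] {
		case "r":
			s.mu.RLock()
			if v, ok := s.data[key]; ok {
				txn[i][2] = v // write through the index, not the loop copy
			}
			s.mu.RUnlock()
		case "w":
			s.mu.Lock()
			s.data[key] = int(op[2].(float64))
			s.mu.Unlock()
		}
	}
	return txn
}

// run decodes a JSON transaction, applies it, and returns the JSON result.
func run(s *kv, raw string) (string, error) {
	var txn [][3]any
	if err := json.Unmarshal([]byte(raw), &txn); err != nil {
		return "", err
	}
	out, err := json.Marshal(s.apply(txn))
	return string(out), err
}

func main() {
	s := &kv{data: map[int]int{}}
	out, err := run(s, `[["r",1,null],["w",1,6],["r",1,null]]`)
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // [["r",1,null],["w",1,6],["r",1,6]]
}
```

The first read returns null because the key does not exist yet, while the second read sees the write made earlier in the same transaction.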

After similar changes, let’s run it with the command below.

cd maelstrom-txn-a; go install; cd ..
maelstrom test -w txn-rw-register --bin ~/go/bin/maelstrom-txn --node-count 1 --time-limit 20 --rate 1000 --concurrency 2n --consistency-models read-uncommitted --availability total

And it works :D

Please take a look at the complete commit here.

Challenge #6b: Totally-Available, Read Uncommitted Transactions

#6b is to extend the single-node store above to multiple nodes. This challenge also only needs weak consistency. Since it is very similar to #6c, let’s combine the solutions. In #6c, we need to support a stronger consistency model, read committed, which guarantees that transactions don’t see uncommitted changes from other transactions. This can be supported with a simple but inefficient solution: holding a mutex while processing each transaction. The mutex means only one transaction can run at a time on a node, so no uncommitted changes are ever visible to another transaction.
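To make the mutex argument concrete, here is a small sketch that is not taken from the solution itself. It assumes a hypothetical serialStore with one lock held for the whole transaction. A writer transaction first writes an intermediate value (-1) and then the final value, and a concurrent reader counts how often it observes the intermediate one.

```go
package main

import (
	"fmt"
	"sync"
)

// serialStore is a hypothetical store guarded by a single transaction lock.
type serialStore struct {
	mu   sync.Mutex
	data map[int]int
}

// txn runs fn while holding the lock, so no other transaction can observe
// the store between fn's individual reads and writes.
func (s *serialStore) txn(fn func(data map[int]int)) {
	s.mu.Lock()
	defer s.mu.Unlock()
	fn(s.data)
}

// intermediateReads runs a writer and a reader concurrently and counts how
// often the reader sees the value -1, which each writer transaction
// overwrites before it finishes.
func intermediateReads(rounds int) int {
	s := &serialStore{data: map[int]int{}}
	var wg sync.WaitGroup
	seen := 0
	wg.Add(2)
	go func() { // writer
		defer wg.Done()
		for i := 0; i < rounds; i++ {
			s.txn(func(d map[int]int) {
				d[1] = -1 // intermediate write
				d[1] = i  // final write
			})
		}
	}()
	go func() { // reader
		defer wg.Done()
		for i := 0; i < rounds; i++ {
			s.txn(func(d map[int]int) {
				if d[1] == -1 {
					seen++
				}
			})
		}
	}()
	wg.Wait()
	return seen
}

func main() {
	fmt.Println(intermediateReads(10000)) // 0
}
```

Because the reader can only run between whole writer transactions, it never sees -1. The price is that transactions on a node never run in parallel.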

Making the solution multi-node can be achieved by having each node broadcast every request it receives to all other nodes via an RPC call with a new message type, replicate. These RPC calls are made in the background, so a node doesn’t depend on other nodes to handle a request. This keeps every node available even when it can’t reach its peers.

The gist of the code can be seen below.

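func (s *Store) processTxns(txs [][3]any) [][3]any {
	// We acquire a lock so that only one transaction is handled at a time.
	// sync.RWMutex is not reentrant, so the map is accessed directly here:
	// calling s.read or s.write while holding s.mu would deadlock.
	s.mu.Lock()
	defer s.mu.Unlock()
	for i, t := range txs {
		key := int(t[1].(float64))
		if t[0] == "r" {
			// Store the result through the index; t is only a copy of txs[i].
			if v, ok := s.data[key]; ok {
				txs[i][2] = v
			}
		}
		if t[0] == "w" {
			s.data[key] = int(t[2].(float64))
		}
	}
	return txs
}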

func main() {
	// A new type to handle broadcasts from other nodes.
	n.Handle("replicate", func(msg maelstrom.Message) error {
		type Txn struct {
			Typ   string   `json:"type"`
			MsgId int      `json:"msg_id"`
			Txn   [][3]any `json:"txn"`
		}

		var body Txn
		if err := json.Unmarshal(msg.Body, &body); err != nil {
			return err
		}

		s.processTxns(body.Txn)
		body.Typ = "replicate_ok"

		return n.Reply(msg, body)
	})

	n.Handle("txn", func(msg maelstrom.Message) error {
		type Txn struct {
			Typ   string   `json:"type"`
			MsgId int      `json:"msg_id"`
			Txn   [][3]any `json:"txn"`
		}

		var body Txn
		if err := json.Unmarshal(msg.Body, &body); err != nil {
			return err
		}

		// process transaction
		s.processTxns(body.Txn)

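		// Replicate to all other nodes in the background so that the reply
		// to the client never waits on a peer.
		// Ideally, we can skip sending the reads.
		// Copy the body before starting the goroutine, because body.Typ is
		// modified below and reading it concurrently would be a data race.
		repl := body
		repl.Typ = "replicate"
		go func() {
			for _, node := range neighbourNodes {
				_, err := n.SyncRPC(ctx, node, repl)
				// Keep retrying while the failure is one of these error codes.
				for slices.Contains([]int{maelstrom.Timeout, maelstrom.TemporarilyUnavailable,
					maelstrom.Crash, maelstrom.Abort, maelstrom.TxnConflict,
				}, maelstrom.ErrorCode(err)) {
					_, err = n.SyncRPC(ctx, node, repl)
				}
			}
		}()
		body.Typ = "txn_ok"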

		return n.Reply(msg, body)
	})
	....
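The comment in the handler notes that the reads could ideally be skipped when replicating. One possible way to do that, sketched here with a hypothetical helper writesOnly that is not part of the original commit, is to filter the transaction down to its writes before sending it to the other nodes.

```go
package main

import "fmt"

// writesOnly returns a new slice holding only the write micro-operations
// of txn. It is a hypothetical helper, not part of the original solution.
func writesOnly(txn [][3]any) [][3]any {
	writes := make([][3]any, 0, len(txn))
	for _, op := range txn {
		if op[0] == "w" {
			writes = append(writes, op)
		}
	}
	return writes
}

func main() {
	// Numbers are float64, as they would be after JSON decoding.
	txn := [][3]any{{"r", 1.0, nil}, {"w", 1.0, 6.0}, {"w", 2.0, 9.0}}
	fmt.Println(writesOnly(txn)) // [[w 1 6] [w 2 9]]
}
```

If the filtered slice is empty, the transaction was read-only and there is nothing to replicate at all.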

Let’s run our solution with the command below.

cd maelstrom-txn-b; go install; cd ..
maelstrom test -w txn-rw-register --bin ~/go/bin/maelstrom-txn --node-count 2 --concurrency 2n --time-limit 20 --rate 1000 --consistency-models read-committed --availability total --nemesis partition

And it works :D

Please take a look at the complete commit here.

The end

With this, we have come to the end of the Gossip Glomers challenges.