September 17, 2022

A minimal distributed key-value database with Hashicorp's Raft library

When I wrote the "build a distributed PostgreSQL proof of concept" post, I first had to figure out how to use Hashicorp's Raft implementation.

I couldn't find any examples in the Hashicorp repo itself. The only example I did find was Philip O'Toole's hraftd. It's great! However, I have a hard time following multi-file examples in general.

So I built my own single-file example. It's not perfect but it helped me get started and may help you too. We'll walk through that code, ~260 lines of Go, in this post.

The key-value database will only be able to set keys, not delete them. But it will be able to overwrite existing entries. And it will expose this distributed store over an HTTP API.

Here's a sample interaction it will be able to support.

Terminal 1:

$ ./raft-example --node-id node1 --raft-port 2222 --http-port 8222

Terminal 2:

$ ./raft-example --node-id node2 --raft-port 2223 --http-port 8223

Terminal 3, tell 1 to have 2 follow it:

$ curl 'localhost:8222/join?followerAddr=localhost:2223&followerId=node2'

Terminal 3, now add a key:

$ curl -X POST 'localhost:8222/set' -d '{"key": "x", "value": "23"}' -H 'content-type: application/json'

Terminal 3, now get the key from either server:

$ curl 'localhost:8222/get?key=x'
{"data":"23"}
$ curl 'localhost:8223/get?key=x'
{"data":"23"}

Let's make it happen!

Eine kleine background

Raft is an algorithm for managing a replicated (basically append-only) log across a cluster of nodes. When you combine this with a state machine you get a stateful, distributed application. Log entries act as commands for the state machine. When a node in the Raft cluster crashes, it is brought back up to date by sending (also called "replaying") all commands in the log through the state machine.
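
To make that concrete before we touch the library, here is a tiny, purely illustrative sketch (not part of this post's code) of replaying a log of commands through a state machine:

type command struct {
    Key, Value string
}

// Replaying the same ordered log on every node produces the same state
// on every node. That determinism is the whole trick.
func replay(entries []command, state map[string]string) {
    for _, cmd := range entries {
        state[cmd.Key] = cmd.Value
    }
}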

This can be made more efficient by implementing an application-specific concept of state snapshots. But since snapshots are just an optimization, we'll skip them entirely to keep this application simple.

If you want the details, just read the Raft paper! It is surprisingly accessible, especially as a user.

Our app

In our distributed key-value application, commands will be a serialized struct with a key and a value. The state machine will take each struct and set the key to the value in memory. Thus after replaying the entire log (and continuing to apply future log entries), each node will have an in-memory key-value store that is up to date with all other nodes in the cluster.

Note that although each node's key-value store will only be in memory, it will be backed by the durable append-only log! So with Raft, each in-memory key-value store will still be durable.

Let's get things set up in a file, main.go.

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "log"
    "net"
    "net/http"
    "os"
    "path"
    "sync"
    "time"

    "github.com/hashicorp/raft"
    "github.com/hashicorp/raft-boltdb"
)

The state machine

The state machine acts on an in-memory key-value store.

type kvFsm struct {
    db *sync.Map
}

There are three operations this Raft library wants us to implement on our state machine struct.
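
Concretely, that means our struct must satisfy the library's raft.FSM interface, which looks roughly like this (paraphrased from the hashicorp/raft package; check the version you're on):

type FSM interface {
    // Apply is called once a log entry is committed.
    Apply(log *Log) interface{}

    // Snapshot returns a point-in-time snapshot of the FSM's state.
    Snapshot() (FSMSnapshot, error)

    // Restore replaces the FSM's state from a snapshot.
    Restore(snapshot io.ReadCloser) error
}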

Apply

The Apply operation is sent to basically-up-to-date nodes to keep them up to date. An Apply call is made for each new log entry the leader commits.

Each log message will contain a key and value to store in the in-memory key-value store.

type setPayload struct {
    Key   string
    Value string
}

func (kf *kvFsm) Apply(log *raft.Log) any {
    switch log.Type {
    case raft.LogCommand:
        var sp setPayload
        err := json.Unmarshal(log.Data, &sp)
        if err != nil {
            return fmt.Errorf("Could not parse payload: %s", err)
        }

        kf.db.Store(sp.Key, sp.Value)
    default:
        return fmt.Errorf("Unknown raft log type: %#v", log.Type)
    }

    return nil
}

Here we're reading a log in a custom format. Later on, in the HTTP server, we'll write the part that submits logs in this custom format.

The Raft library just cares that logs are (opaque) bytes. Whatever format works.
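
As a preview of the write side (the real version is in the HTTP handler below), submitting a command boils down to marshaling a setPayload and handing the bytes to raft.Apply. This helper is just my own sketch, not part of the ~260 lines:

// submitSet is a hypothetical helper: r is a *raft.Raft like the one
// setupRaft returns later in this post.
func submitSet(r *raft.Raft, key, value string) error {
    bs, err := json.Marshal(setPayload{Key: key, Value: value})
    if err != nil {
        return err
    }

    // The leader replicates these opaque bytes and every node eventually
    // sees them as log.Data inside kvFsm.Apply.
    return r.Apply(bs, 500*time.Millisecond).Error()
}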

Restore

The Restore operation reads all logs and applies them to the state machine.

It looks very similar to the Apply function we just wrote, except that it operates on an io.ReadCloser of serialized log data rather than the high-level raft.Log struct.

Most importantly, and unlike the Apply function, Restore must reset all local state.

func (kf *kvFsm) Restore(rc io.ReadCloser) error {
    // Must always restore from a clean state!!
    kf.db.Range(func(key any, _ any) bool {
        kf.db.Delete(key)
        return true
    })

    decoder := json.NewDecoder(rc)

    for decoder.More() {
        var sp setPayload
        err := decoder.Decode(&sp)
        if err != nil {
            return fmt.Errorf("Could not decode payload: %s", err)
        }

        kf.db.Store(sp.Key, sp.Value)
    }

    return rc.Close()
}

The io.ReadCloser represents the latest snapshot or the beginning of time if there are no snapshots.
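
If you want to poke at Restore in isolation, a throwaway sketch like this works (assumption: you add a strings import on top of the ones in main.go):

// restoreExample feeds Restore a hand-rolled stream of JSON payloads,
// standing in for the snapshot data the library would normally pass.
func restoreExample() error {
    kf := &kvFsm{db: &sync.Map{}}
    stream := strings.NewReader(`{"Key":"a","Value":"1"}{"Key":"b","Value":"2"}`)
    return kf.Restore(io.NopCloser(stream))
}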

Snapshot

We won't implement this. But to satisfy the Go interface we must have some empty functions.

type snapshotNoop struct{}

func (sn snapshotNoop) Persist(_ raft.SnapshotSink) error { return nil }
func (sn snapshotNoop) Release()                          {}

func (kf *kvFsm) Snapshot() (raft.FSMSnapshot, error) {
    return snapshotNoop{}, nil
}

I think this is a correct noop. If we implemented a real snapshot we'd serialize the current key-value state and write it to the raft.SnapshotSink with its Write() method. That sink, in turn, is what is passed (as an io.ReadCloser) to the Restore method above.

So it must be that when we do not call raft.SnapshotSink.Close(), as the docs suggest, no snapshot gets recorded.

Since we aren't implementing snapshots, the Raft library must be doing its own serialization, writing each message's bytes directly to some sink.

If I'm wrong, feel free to correct me.
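
For reference, if we did implement snapshots, Persist might look roughly like this. It's an untested sketch of my own (kvSnapshot is a hypothetical type, not part of this post's code), writing the same JSON stream that Restore above already knows how to read:

type kvSnapshot struct {
    // A point-in-time copy of the key-value pairs.
    entries []setPayload
}

func (s kvSnapshot) Persist(sink raft.SnapshotSink) error {
    encoder := json.NewEncoder(sink)
    for _, e := range s.entries {
        if err := encoder.Encode(e); err != nil {
            sink.Cancel()
            return err
        }
    }

    // Per the library docs, only Close marks the snapshot as complete.
    return sink.Close()
}

func (s kvSnapshot) Release() {}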

That's it for the state machine!

Raft node initialization

In order to start Raft on each node, we need a whole bunch of library initialization boilerplate.

Each Raft node needs a TCP port that it uses to communicate with other nodes in the same cluster.

Each node starts out in a single-node cluster where it is the leader. Only when told to (and given the address of another node) does it become part of a multi-node cluster.

Each node also needs a permanent store for the append-only log. The Hashicorp Raft library suggests boltdb.

func setupRaft(dir, nodeId, raftAddress string, kf *kvFsm) (*raft.Raft, error) {
    err := os.MkdirAll(dir, os.ModePerm)
    if err != nil {
        return nil, fmt.Errorf("Could not create data directory: %s", err)
    }

    store, err := raftboltdb.NewBoltStore(path.Join(dir, "bolt"))
    if err != nil {
        return nil, fmt.Errorf("Could not create bolt store: %s", err)
    }

    snapshots, err := raft.NewFileSnapshotStore(path.Join(dir, "snapshot"), 2, os.Stderr)
    if err != nil {
        return nil, fmt.Errorf("Could not create snapshot store: %s", err)
    }

    tcpAddr, err := net.ResolveTCPAddr("tcp", raftAddress)
    if err != nil {
        return nil, fmt.Errorf("Could not resolve address: %s", err)
    }

    transport, err := raft.NewTCPTransport(raftAddress, tcpAddr, 10, time.Second*10, os.Stderr)
    if err != nil {
        return nil, fmt.Errorf("Could not create tcp transport: %s", err)
    }

    raftCfg := raft.DefaultConfig()
    raftCfg.LocalID = raft.ServerID(nodeId)

    r, err := raft.NewRaft(raftCfg, kf, store, store, snapshots, transport)
    if err != nil {
        return nil, fmt.Errorf("Could not create raft instance: %s", err)
    }

    // Cluster consists of unjoined leaders. Picking a leader and
    // creating a real cluster is done manually after startup.
    r.BootstrapCluster(raft.Configuration{
        Servers: []raft.Server{
            {
                ID:      raft.ServerID(nodeId),
                Address: transport.LocalAddr(),
            },
        },
    })

    return r, nil
}

Now let's dig into how nodes learn about each other.

An HTTP API

This key-value store application will have an HTTP API serving two purposes:

  • Cluster management: telling a leader to add followers
  • Key-value storage: setting and getting keys

type httpServer struct {
    r  *raft.Raft
    db *sync.Map
}

Cluster management

In this library, the leader is told to add other nodes as its followers. (This feels backwards to me, but it is what it is!)

For this, the library requires the follower's node ID and its internal TCP address for Raft messages.

These will both be parameters we give each node later on when the node process is started.

func (hs httpServer) joinHandler(w http.ResponseWriter, r *http.Request) {
    followerId := r.URL.Query().Get("followerId")
    followerAddr := r.URL.Query().Get("followerAddr")

    if hs.r.State() != raft.Leader {
        // Set the error status code before writing any of the body.
        w.WriteHeader(http.StatusBadRequest)
        json.NewEncoder(w).Encode(struct {
            Error string `json:"error"`
        }{
            "Not the leader",
        })
        return
    }

    err := hs.r.AddVoter(raft.ServerID(followerId), raft.ServerAddress(followerAddr), 0, 0).Error()
    if err != nil {
        log.Printf("Failed to add follower: %s", err)
        http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
        return
    }

    w.WriteHeader(http.StatusOK)
}

Key-value storage

This part of the HTTP API exposes setting and getting.

Set

Setting is where, instead of modifying the local database directly, we pass a message to the Raft cluster to store a log that contains the key and value.

Since we read log messages in kvFsm.Apply and kvFsm.Restore as a JSON encoding of the setPayload struct we created, we must write log messages like so as well. Or, specifically in this case, we just expect that the user passes a JSON body that matches the setPayload struct.

Then we call Apply on the Raft instance with the log message to get this process going.

func (hs httpServer) setHandler(w http.ResponseWriter, r *http.Request) {
    defer r.Body.Close()
    bs, err := io.ReadAll(r.Body)
    if err != nil {
        log.Printf("Could not read key-value in http request: %s", err)
        http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
        return
    }

    future := hs.r.Apply(bs, 500*time.Millisecond)

    // Blocks until completion
    if err := future.Error(); err != nil {
        log.Printf("Could not write key-value: %s", err)
        http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
        return
    }

    e := future.Response()
    if e != nil {
        log.Printf("Could not write key-value, application: %s", e)
        http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
        return
    }

    w.WriteHeader(http.StatusOK)
}

I'm not completely sure if `future.Response()` is supposed to be called from inside the `future.Error()` error block. You can see the docs for yourself.

Get

If we wanted to be completely consistent we would need to pass a read message through to the Raft cluster and check its result for a key's value. We'd need to implement that read message in the state machine.

But if we don't care strongly about consistency for reads we can just read the local in-memory store, skipping the Raft cluster.

func (hs httpServer) getHandler(w http.ResponseWriter, r *http.Request) {
    key := r.URL.Query().Get("key")
    value, _ := hs.db.Load(key)
    if value == nil {
        value = ""
    }

    rsp := struct {
        Data string `json:"data"`
    }{value.(string)}
    err := json.NewEncoder(w).Encode(rsp)
    if err != nil {
        log.Printf("Could not encode key-value in http response: %s", err)
        http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
    }
}
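
As an aside on the consistency point above: if we did want stronger reads without passing a full read message through the log, a rough sketch (my own, using the library's VerifyLeader and Barrier futures; not part of this post's code) might look like:

// consistentGet only answers if this node is still the leader and has
// applied everything committed before the call. Still not a complete
// linearizable-read protocol, but much stronger than a bare local read.
func (hs httpServer) consistentGet(key string) (string, error) {
    if err := hs.r.VerifyLeader().Error(); err != nil {
        return "", err
    }

    // Barrier blocks until all preceding log entries have been applied.
    if err := hs.r.Barrier(500 * time.Millisecond).Error(); err != nil {
        return "", err
    }

    value, ok := hs.db.Load(key)
    if !ok {
        return "", nil
    }
    return value.(string), nil
}

Whether followers should forward reads to the leader or reject them outright is a design choice I'm skipping here.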

And that's it for the server!

Configuration

Let's throw together a quick helper for grabbing configuration from the CLI.

When the process is started, each node must be configured with a Raft-level TCP address, a Raft-level unique node ID, and an HTTP address (for our application).

type config struct {
    id       string
    httpPort string
    raftPort string
}

func getConfig() config {
    cfg := config{}
    // Note: os.Args[1:] shifts indices down by one, so the value that
    // follows the flag at os.Args[1:][i] is os.Args[i+2].
    for i, arg := range os.Args[1:] {
        if arg == "--node-id" {
            cfg.id = os.Args[i+2]
            continue
        }

        if arg == "--http-port" {
            cfg.httpPort = os.Args[i+2]
            continue
        }

        if arg == "--raft-port" {
            cfg.raftPort = os.Args[i+2]
            continue
        }
    }

    if cfg.id == "" {
        log.Fatal("Missing required parameter: --node-id")
    }

    if cfg.raftPort == "" {
        log.Fatal("Missing required parameter: --raft-port")
    }

    if cfg.httpPort == "" {
        log.Fatal("Missing required parameter: --http-port")
    }

    return cfg
}

And finally, the main() that brings it all together.

main

func main() {
    cfg := getConfig()

    db := &sync.Map{}
    kf := &kvFsm{db}

    dataDir := "data"
    err := os.MkdirAll(dataDir, os.ModePerm)
    if err != nil {
        log.Fatalf("Could not create data directory: %s", err)
    }

    r, err := setupRaft(path.Join(dataDir, "raft"+cfg.id), cfg.id, "localhost:"+cfg.raftPort, kf)
    if err != nil {
        log.Fatal(err)
    }

    hs := httpServer{r, db}

    http.HandleFunc("/set", hs.setHandler)
    http.HandleFunc("/get", hs.getHandler)
    http.HandleFunc("/join", hs.joinHandler)
    log.Fatal(http.ListenAndServe(":"+cfg.httpPort, nil))
}

Build it.

$ go mod init raft-example
$ go mod tidy
$ go build

And give it a shot. :)

Terminal 1:

$ ./raft-example --node-id node1 --raft-port 2222 --http-port 8222

Terminal 2:

$ ./raft-example --node-id node2 --raft-port 2223 --http-port 8223

Terminal 3, tell 1 to have 2 follow it:

$ curl 'localhost:8222/join?followerAddr=localhost:2223&followerId=node2'

Terminal 3, now add a key:

$ curl -X POST 'localhost:8222/set' -d '{"key": "x", "value": "23"}' -H 'content-type: application/json'

Terminal 3, now get the key from either server:

$ curl 'localhost:8222/get?key=x'
{"data":"23"}
$ curl 'localhost:8223/get?key=x'
{"data":"23"}

And we're golden!