What is CockroachDB under the hood? Take a look at its go.mod and notice a number of dependencies that do a lot of work: a PostgreSQL wire protocol implementation, a storage layer, a Raft implementation for distributed consensus. And, though not part of go.mod, it also builds on third-party code: PostgreSQL's grammar definition.
To be absurdly reductionist, CockroachDB is just the glue around these libraries. With that reductionist mindset, let's try building a distributed Postgres proof of concept ourselves! We'll use only four major external libraries: for parsing SQL, handling Postgres's wire protocol, handling Raft, and handling the storage of table metadata and rows themselves.
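Concretely, those four are pganalyze/pg_query_go for parsing SQL, jackc/pgproto3 for the Postgres wire protocol, hashicorp/raft for consensus, and bbolt for storage.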
For a not-reductionist understanding of the CockroachDB internals, I recommend following the excellent Cockroach Engineering blog and Jordan Lewis's Hacking CockroachDB Twitch stream.
By the end of this post, in around 600 lines of code, we'll have a distributed "Postgres implementation" that will accept writes (CREATE TABLE, INSERT) on the leader and accept reads (SELECT) on any node. All nodes will contain the same data.
Here is a sample interaction against the leader:
$ psql -h localhost -p 6000
psql (13.4, server 0.0.0)
Type "help" for help.
phil=> create table x (age int, name text);
CREATE ok
phil=> insert into x values(14, 'garry'), (20, 'ted');
could not interpret result from server: INSERT ok
INSERT ok
phil=> select name, age from x;
name | age
---------+-----
"garry" | 14
"ted" | 20
(2 rows)
And against a follower (note the different port):
$ psql -h 127.0.0.1 -p 6001
psql (13.4, server 0.0.0)
Type "help" for help.
phil=> select age, name from x;
age | name
-----+---------
20 | "ted"
14 | "garry"
(2 rows)
All code for this post is available on GitHub in the fondly named WaterbugDB repo.
Plan of attack
Influenced by Philip O'Toole's talk on rqlite at Hacker Nights, we'll have a Postgres wire protocol server in front. As it receives queries it will respond immediately to SELECTs. Otherwise, for CREATE TABLEs and INSERTs, it will send the entire query string to the Raft cluster. Each process that is part of the Raft cluster will implement the appropriate functions for handling Raft messages. In this case the messages will just be to create a table or insert data.
So every running process will run a Postgres wire protocol server, a Raft server, and an HTTP server that, as you'll see, is an implementation detail of how processes join the same Raft cluster.
Every running process will have its own directory for storing data.
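Sketched out, a single process looks something like this (the ports are the ones we'll use in the demo at the end):

one running waterbugdb process (node1, say)
  Postgres wire protocol server, port 6000   <- psql connects here
  Raft server, port 2222                     <- other nodes' Raft servers talk to this
  HTTP server, port 8222                     <- /add-follower endpoint for forming the cluster
  its own data directory                     <- bbolt database plus Raft's own storage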
Raft
There is likely a difference between Raft, the paper, and Raft, the implementations. When I refer to Raft in the rest of this post I'm going to be referring to an implementation.
And although CockroachDB uses etcd's Raft implementation, I didn't realize that when I started building this project. I used HashiCorp's Raft implementation.
Raft allows us to reliably keep multiple nodes in sync with a log of messages. Each node in the Raft cluster implements a finite state machine (FSM) with three operations: apply, snapshot, and restore. Our finite state machine will embed a postgres engine we'll build out after this to handle query execution.
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"path"
"strings"
"time"
"github.com/google/uuid"
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
"github.com/jackc/pgproto3/v2"
pgquery "github.com/pganalyze/pg_query_go/v2"
bolt "go.etcd.io/bbolt"
)
type pgFsm struct {
pe *pgEngine
}
From what I understand, the snapshot operation allows Raft to truncate logs. It is used in conjunction with restoring. On startup, if there is a snapshot, restore is called so you can load the snapshot. Afterwards, all logs not yet snapshotted are replayed through the apply operation.
To keep this implementation simple we'll just fail all snapshots so restore will never be called and all logs will be replayed every time on startup through the apply operation. This is of course inefficient but it keeps the code simpler.
When we write the startup code we'll need to delete the database so that these apply calls happen fresh.
type snapshotNoop struct{}
func (sn snapshotNoop) Persist(sink raft.SnapshotSink) error {
return sink.Cancel()
}
func (sn snapshotNoop) Release() {}
func (pf *pgFsm) Snapshot() (raft.FSMSnapshot, error) {
return snapshotNoop{}, nil
}
func (pf *pgFsm) Restore(rc io.ReadCloser) error {
return fmt.Errorf("Nothing to restore")
}
Finally, the apply operation receives a single message and applies it on the node. In this project the message will be a CREATE TABLE or INSERT query. So we'll parse the query and pass it to the Postgres engine for execution.
func (pf *pgFsm) Apply(log *raft.Log) interface{} {
switch log.Type {
case raft.LogCommand:
ast, err := pgquery.Parse(string(log.Data))
if err != nil {
panic(fmt.Errorf("Could not parse payload: %s", err))
}
err = pf.pe.execute(ast)
if err != nil {
panic(err)
}
default:
panic(fmt.Errorf("Unknown raft log type: %#v", log.Type))
}
return nil
}
Panicking here is actually the advised behavior.
Raft server
Now we can set up the actual Raft server and pass it an instance of this FSM. This is a bunch of boilerplate that would matter in a production install, but for us it boils down to telling Raft where to run and how to store its own internal data, including its all-important message log.
func setupRaft(dir, nodeId, raftAddress string, pf *pgFsm) (*raft.Raft, error) {
	err := os.MkdirAll(dir, os.ModePerm)
	if err != nil {
		return nil, fmt.Errorf("Could not create raft data directory: %s", err)
	}
store, err := raftboltdb.NewBoltStore(path.Join(dir, "bolt"))
if err != nil {
return nil, fmt.Errorf("Could not create bolt store: %s", err)
}
snapshots, err := raft.NewFileSnapshotStore(path.Join(dir, "snapshot"), 2, os.Stderr)
if err != nil {
return nil, fmt.Errorf("Could not create snapshot store: %s", err)
}
tcpAddr, err := net.ResolveTCPAddr("tcp", raftAddress)
if err != nil {
return nil, fmt.Errorf("Could not resolve address: %s", err)
}
transport, err := raft.NewTCPTransport(raftAddress, tcpAddr, 10, time.Second*10, os.Stderr)
if err != nil {
return nil, fmt.Errorf("Could not create tcp transport: %s", err)
}
raftCfg := raft.DefaultConfig()
raftCfg.LocalID = raft.ServerID(nodeId)
r, err := raft.NewRaft(raftCfg, pf, store, store, snapshots, transport)
if err != nil {
return nil, fmt.Errorf("Could not create raft instance: %s", err)
}
// Cluster consists of unjoined leaders. Picking a leader and
// creating a real cluster is done manually after startup.
r.BootstrapCluster(raft.Configuration{
Servers: []raft.Server{
{
ID: raft.ServerID(nodeId),
Address: transport.LocalAddr(),
},
},
})
return r, nil
}
Every instance of this process will run this and will start off as a leader in a new cluster. We'll expose an HTTP server that allows a leader to talk to other leaders to tell them to stop leading and follow it. This HTTP endpoint is how we'll get from N processes with N leaders and N clusters to N processes with 1 leader and 1 cluster.
That's basically it for the core Raft bits. So let's build out that HTTP server and follow endpoint.
HTTP follow endpoint
Our HTTP server will have just one endpoint that tells process (a) to contact another process (b) so that process (b) joins process (a)'s cluster.
The HTTP server will need process (a)'s Raft instance to be able to start this join action. And in order for Raft to know how to contact process (b), we'll need to tell it both process (b)'s unique Raft node id (we'll give it a unique id ourselves when we start the process) and process (b)'s Raft server address.
type httpServer struct {
r *raft.Raft
}
func (hs httpServer) addFollowerHandler(w http.ResponseWriter, r *http.Request) {
followerId := r.URL.Query().Get("id")
followerAddr := r.URL.Query().Get("addr")
	if hs.r.State() != raft.Leader {
		// Only the leader can add voters. Write the status code before
		// the body so we don't trigger a superfluous WriteHeader call.
		w.WriteHeader(http.StatusBadRequest)
		json.NewEncoder(w).Encode(struct {
			Error string `json:"error"`
		}{
			"Not the leader",
		})
		return
	}
err := hs.r.AddVoter(raft.ServerID(followerId), raft.ServerAddress(followerAddr), 0, 0).Error()
if err != nil {
log.Printf("Failed to add follower: %s", err)
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
w.WriteHeader(http.StatusOK)
}
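As a usage example, here's the exact call we'll make at the end of the post: with node1's HTTP server on port 8222 and node2's Raft server on port 2223, we tell node1 to add node2 as a follower.
$ curl 'localhost:8222/add-follower?addr=localhost:2223&id=node2'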
That's it! Let's move on to the query engine.
Query engine
The query engine is a wrapper around a storage layer. We'll bring in bbolt.
I originally built this with Cockroach's pebble but pebble has a transitive dependency on a C library that has function names that conflict with function names in the C library that pg_query_go wraps.
type pgEngine struct {
db *bolt.DB
bucketName []byte
}
func newPgEngine(db *bolt.DB) *pgEngine {
return &pgEngine{db, []byte("data")}
}
bbolt organizes data into buckets. Buckets might be a natural way to store table rows (one bucket per table) but to keep the implementation simple we'll put all table metadata and row data into a single `data` bucket.
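To make that layout concrete, after the CREATE TABLE and INSERT from the demo above, the data bucket will hold keys roughly like this (the row keys get a UUID suffix that we'll generate during INSERT below):
tables_x      -> {"Name":"x","ColumnNames":["age","name"],"ColumnTypes":["pg_catalog.int4","text"]}
rows_x_<uuid> -> [14,"garry"]
rows_x_<uuid> -> [20,"ted"]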
The entrypoint we called in the Raft apply implementation above was execute. It takes a parsed list of statements. We'll iterate over the statements, figure out the kind of each statement, and call out to a dedicated helper for each kind.
func (pe *pgEngine) execute(tree *pgquery.ParseResult) error {
for _, stmt := range tree.GetStmts() {
n := stmt.GetStmt()
if c := n.GetCreateStmt(); c != nil {
return pe.executeCreate(c)
}
if c := n.GetInsertStmt(); c != nil {
return pe.executeInsert(c)
}
if c := n.GetSelectStmt(); c != nil {
_, err := pe.executeSelect(c)
return err
}
return fmt.Errorf("Unknown statement type: %s", stmt)
}
return nil
}
The pg_query_go docs are not super helpful. I had to build a separate AST explorer program to make it easier to understand this parser.
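If you want to explore the AST yourself, a tiny throwaway program like this (a minimal sketch using pg_query_go's ParseToJSON helper) dumps the parse tree for a query so you can see which node and field names to reach for:

package main

import (
	"fmt"
	"log"

	pgquery "github.com/pganalyze/pg_query_go/v2"
)

func main() {
	// Dump the parse tree as JSON to discover node and field names
	// like CreateStmt, InsertStmt, SelectStmt, ResTarget, ColumnRef.
	tree, err := pgquery.ParseToJSON("select name, age from x")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(tree)
}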
Let's start with creating a table.
Create table
When a table is created, we'll need to store its metadata.
type tableDefinition struct {
Name string
ColumnNames []string
ColumnTypes []string
}
First we pull that metadata out of the AST.
func (pe *pgEngine) executeCreate(stmt *pgquery.CreateStmt) error {
tbl := tableDefinition{}
tbl.Name = stmt.Relation.Relname
for _, c := range stmt.TableElts {
cd := c.GetColumnDef()
tbl.ColumnNames = append(tbl.ColumnNames, cd.Colname)
// Names is namespaced. So `INT` is pg_catalog.int4. `BIGINT` is pg_catalog.int8.
var columnType string
for _, n := range cd.TypeName.Names {
if columnType != "" {
columnType += "."
}
columnType += n.GetString_().Str
}
tbl.ColumnTypes = append(tbl.ColumnTypes, columnType)
}
Now we need to store this in the storage layer. The easiest/dumbest way to do this is to serialize the metadata to JSON and store it with the key tables_${tableName}.
tableBytes, err := json.Marshal(tbl)
if err != nil {
return fmt.Errorf("Could not marshal table: %s", err)
}
err = pe.db.Update(func(tx *bolt.Tx) error {
bkt, err := tx.CreateBucketIfNotExists(pe.bucketName)
if err != nil {
return err
}
return bkt.Put([]byte("tables_"+tbl.Name), tableBytes)
})
if err != nil {
return fmt.Errorf("Could not set key-value: %s", err)
}
return nil
}
Next we'll build a helper to reverse that operation, pulling out table metadata from the storage layer by the table name:
func (pe *pgEngine) getTableDefinition(name string) (*tableDefinition, error) {
var tbl tableDefinition
err := pe.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(pe.bucketName)
if bkt == nil {
return fmt.Errorf("Table does not exist")
}
valBytes := bkt.Get([]byte("tables_" + name))
err := json.Unmarshal(valBytes, &tbl)
if err != nil {
return fmt.Errorf("Could not unmarshal table: %s", err)
}
return nil
})
return &tbl, err
}
That's it for our basic CREATE TABLE support! Let's do INSERT next.
Insert row
Our INSERT support will only handle literal/constant VALUES.
func (pe *pgEngine) executeInsert(stmt *pgquery.InsertStmt) error {
tblName := stmt.Relation.Relname
slct := stmt.GetSelectStmt().GetSelectStmt()
for _, values := range slct.ValuesLists {
var rowData []any
for _, value := range values.GetList().Items {
if c := value.GetAConst(); c != nil {
if s := c.Val.GetString_(); s != nil {
rowData = append(rowData, s.Str)
continue
}
if i := c.Val.GetInteger(); i != nil {
rowData = append(rowData, i.Ival)
continue
}
}
return fmt.Errorf("Unknown value type: %s", value)
}
It would be better to abstract this VALUES code into a helper so it could be used by SELECTs too, but out of laziness we'll just keep this here.
Next we need to write the row to the storage layer. We'll serialize the row data to JSON (inefficient because we know the row structure, but JSON is easy). We'll store the row with a prefix including the table name and we'll give its key a unique UUID. When we're iterating over rows in the table we'll be able to do a prefix scan that will recover just the rows in this table.
rowBytes, err := json.Marshal(rowData)
if err != nil {
return fmt.Errorf("Could not marshal row: %s", err)
}
id := uuid.New().String()
err = pe.db.Update(func(tx *bolt.Tx) error {
bkt, err := tx.CreateBucketIfNotExists(pe.bucketName)
if err != nil {
return err
}
return bkt.Put([]byte("rows_"+tblName+"_"+id), rowBytes)
})
if err != nil {
return fmt.Errorf("Could not store row: %s", err)
}
}
return nil
}
Finally we can move on to support SELECT!
Select rows
Unlike CREATE TABLE and INSERT, SELECT will need to return rows, column names, and, because the Postgres wire protocol wants it, column types.
type pgResult struct {
fieldNames []string
fieldTypes []string
rows [][]any
}
First we pull out the table name and the fields selected, looking up field types in the table metadata.
func (pe *pgEngine) executeSelect(stmt *pgquery.SelectStmt) (*pgResult, error) {
tblName := stmt.FromClause[0].GetRangeVar().Relname
tbl, err := pe.getTableDefinition(tblName)
if err != nil {
return nil, err
}
results := &pgResult{}
for _, c := range stmt.TargetList {
fieldName := c.GetResTarget().Val.GetColumnRef().Fields[0].GetString_().Str
results.fieldNames = append(results.fieldNames, fieldName)
fieldType := ""
for i, cn := range tbl.ColumnNames {
if cn == fieldName {
fieldType = tbl.ColumnTypes[i]
}
}
if fieldType == "" {
return nil, fmt.Errorf("Unknown field: %s", fieldName)
}
results.fieldTypes = append(results.fieldTypes, fieldType)
}
Finally, we do a prefix scan to grab all rows in the table from the storage layer.
prefix := []byte("rows_" + tblName + "_")
	err = pe.db.View(func(tx *bolt.Tx) error {
c := tx.Bucket(pe.bucketName).Cursor()
for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() {
var row []any
err = json.Unmarshal(v, &row)
if err != nil {
return fmt.Errorf("Unable to unmarshal row: %s", err)
}
var targetRow []any
for _, target := range results.fieldNames {
for i, field := range tbl.ColumnNames {
if target == field {
targetRow = append(targetRow, row[i])
}
}
}
results.rows = append(results.rows, targetRow)
}
return nil
})
	if err != nil {
		return nil, err
	}

	return results, nil
}
That's it for SELECT! The last function we'll implement is a helper for deleting all data in the storage layer. This will be called on startup before Raft logs are applied so the database always ends up in a consistent state.
func (pe *pgEngine) delete() error {
return pe.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(pe.bucketName)
if bkt != nil {
return tx.DeleteBucket(pe.bucketName)
}
return nil
})
}
And we're ready to move on to the final layer, the Postgres wire protocol.
Postgres wire protocol server
jackc/pgproto3 is an implementation of the Postgres wire protocol for Go. It allows us to implement a server that can respond to requests from Postgres clients like psql.
It works by wrapping a TCP connection. So we'll start by building a function that does the TCP serving loop.
func runPgServer(port string, db *bolt.DB, r *raft.Raft) {
ln, err := net.Listen("tcp", "localhost:"+port)
if err != nil {
log.Fatal(err)
}
for {
conn, err := ln.Accept()
if err != nil {
log.Fatal(err)
}
pc := pgConn{conn, db, r}
go pc.handle()
}
}
The pgConn instance needs access to the database directly so it can respond to SELECTs. And it needs the Raft instance for all other queries.
type pgConn struct {
conn net.Conn
db *bolt.DB
r *raft.Raft
}
The handle function we called above will grab the current message via the pgproto3 package and handle startup messages and regular messages.
func (pc pgConn) handle() {
pgc := pgproto3.NewBackend(pgproto3.NewChunkReader(pc.conn), pc.conn)
defer pc.conn.Close()
err := pc.handleStartupMessage(pgc)
if err != nil {
log.Println(err)
return
}
for {
err := pc.handleMessage(pgc)
if err != nil {
log.Println(err)
return
}
}
}
Startup messages include authorization and SSL checks. We'll allow anything in the former and respond "no" to the latter.
func (pc pgConn) handleStartupMessage(pgconn *pgproto3.Backend) error {
startupMessage, err := pgconn.ReceiveStartupMessage()
if err != nil {
return fmt.Errorf("Error receiving startup message: %s", err)
}
switch startupMessage.(type) {
case *pgproto3.StartupMessage:
buf := (&pgproto3.AuthenticationOk{}).Encode(nil)
buf = (&pgproto3.ReadyForQuery{TxStatus: 'I'}).Encode(buf)
_, err = pc.conn.Write(buf)
if err != nil {
return fmt.Errorf("Error sending ready for query: %s", err)
}
return nil
case *pgproto3.SSLRequest:
_, err = pc.conn.Write([]byte("N"))
if err != nil {
return fmt.Errorf("Error sending deny SSL request: %s", err)
}
return pc.handleStartupMessage(pgconn)
default:
return fmt.Errorf("Unknown startup message: %#v", startupMessage)
}
}
Within the main handleMessage logic we'll check the type of message.
func (pc pgConn) handleMessage(pgc *pgproto3.Backend) error {
msg, err := pgc.Receive()
if err != nil {
return fmt.Errorf("Error receiving message: %s", err)
}
switch t := msg.(type) {
case *pgproto3.Query:
// TODO
case *pgproto3.Terminate:
return nil
default:
return fmt.Errorf("Received message other than Query from client: %s", msg)
}
return nil
}
If the message is a query we'll parse it and respond immediately to SELECTs.
switch t := msg.(type) {
case *pgproto3.Query:
stmts, err := pgquery.Parse(t.String)
if err != nil {
return fmt.Errorf("Error parsing query: %s", err)
}
if len(stmts.GetStmts()) > 1 {
return fmt.Errorf("Only make one request at a time.")
}
stmt := stmts.GetStmts()[0]
// Handle SELECTs here
s := stmt.GetStmt().GetSelectStmt()
if s != nil {
pe := newPgEngine(pc.db)
res, err := pe.executeSelect(s)
if err != nil {
return err
}
pc.writePgResult(res)
return nil
}
(We'll implement that writePgResult helper shortly below.) Otherwise we'll add the query to the Raft log and return a basic response.
// Otherwise it's DDL/DML, raftify
future := pc.r.Apply([]byte(t.String), 500*time.Millisecond)
if err := future.Error(); err != nil {
return fmt.Errorf("Could not apply: %s", err)
}
e := future.Response()
if e != nil {
return fmt.Errorf("Could not apply (internal): %s", e)
}
pc.done(nil, strings.ToUpper(strings.Split(t.String, " ")[0])+" ok")
case *pgproto3.Terminate:
return nil
default:
return fmt.Errorf("Received message other than Query from client: %s", msg)
}
return nil
}
done is an important helper that tells the Postgres connection that the query is complete and the server is ready to receive another query. Without this response psql just hangs.
func (pc pgConn) done(buf []byte, msg string) {
buf = (&pgproto3.CommandComplete{CommandTag: []byte(msg)}).Encode(buf)
buf = (&pgproto3.ReadyForQuery{TxStatus: 'I'}).Encode(buf)
_, err := pc.conn.Write(buf)
if err != nil {
log.Printf("Failed to write query response: %s", err)
}
}
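As an aside, this is also where the odd "could not interpret result from server: INSERT ok" line in the demo comes from: psql expects a standard command tag like INSERT 0 2, and most likely can't parse our made-up INSERT ok tag, so it warns and prints the tag anyway.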
And now let's implement the writePgResult helper. This function needs to translate from our pgResult struct to the format required by pgproto3.
var dataTypeOIDMap = map[string]uint32{
"text": 25,
"pg_catalog.int4": 23,
}
func (pc pgConn) writePgResult(res *pgResult) {
rd := &pgproto3.RowDescription{}
for i, field := range res.fieldNames {
rd.Fields = append(rd.Fields, pgproto3.FieldDescription{
Name: []byte(field),
DataTypeOID: dataTypeOIDMap[res.fieldTypes[i]],
})
}
buf := rd.Encode(nil)
for _, row := range res.rows {
dr := &pgproto3.DataRow{}
for _, value := range row {
bs, err := json.Marshal(value)
if err != nil {
log.Printf("Failed to marshal cell: %s\n", err)
return
}
dr.Values = append(dr.Values, bs)
}
buf = dr.Encode(buf)
}
pc.done(buf, fmt.Sprintf("SELECT %d", len(res.rows)))
}
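A quick note on those DataTypeOID values: they are the object IDs Postgres assigns to types in its pg_type catalog, where text is 25 and int4 is 23. Any type we don't map here falls back to the map's zero value, 0, which isn't a valid type OID.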
And we're done with everything but func main()!
Main
On startup, each process must be assigned (by the parent process) a unique node id (any unique string is ok) and ports for the Raft server, Postgres server, and HTTP server. We'll build a short getConfig helper to grab these from arguments.
type config struct {
id string
httpPort string
raftPort string
pgPort string
}
func getConfig() config {
cfg := config{}
for i, arg := range os.Args[1:] {
if arg == "--node-id" {
cfg.id = os.Args[i+2]
i++
continue
}
if arg == "--http-port" {
cfg.httpPort = os.Args[i+2]
i++
continue
}
if arg == "--raft-port" {
cfg.raftPort = os.Args[i+2]
i++
continue
}
if arg == "--pg-port" {
cfg.pgPort = os.Args[i+2]
i++
continue
}
}
if cfg.id == "" {
log.Fatal("Missing required parameter: --node-id")
}
if cfg.raftPort == "" {
log.Fatal("Missing required parameter: --raft-port")
}
if cfg.httpPort == "" {
log.Fatal("Missing required parameter: --http-port")
}
if cfg.pgPort == "" {
log.Fatal("Missing required parameter: --pg-port")
}
return cfg
}
Now in main we'll grab the config and set up this process's database. All processes will put their data in a top-level data directory to make managing the directories easier. But within that directory each process will have its own unique directories for data storage, based on the unique node id.
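For example, after starting node1 (as we will below), the layout will look roughly like this, given the paths used in the code:
data/
  datanode1     <- bbolt database: table metadata and rows
  raftnode1/
    bolt        <- Raft's own log and stable store
    snapshot/   <- Raft's snapshot store (unused, since we cancel snapshots)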
func main() {
cfg := getConfig()
dataDir := "data"
err := os.MkdirAll(dataDir, os.ModePerm)
if err != nil {
log.Fatalf("Could not create data directory: %s", err)
}
db, err := bolt.Open(path.Join(dataDir, "/data"+cfg.id), 0600, nil)
if err != nil {
log.Fatalf("Could not open bolt db: %s", err)
}
defer db.Close()
We need to clean up the database.
pe := newPgEngine(db)
// Start off in clean state
pe.delete()
Set up the Raft server.
pf := &pgFsm{pe}
r, err := setupRaft(path.Join(dataDir, "raft"+cfg.id), cfg.id, "localhost:"+cfg.raftPort, pf)
if err != nil {
log.Fatal(err)
}
Set up the HTTP server.
hs := httpServer{r}
http.HandleFunc("/add-follower", hs.addFollowerHandler)
go func() {
err := http.ListenAndServe(":"+cfg.httpPort, nil)
if err != nil {
log.Fatal(err)
}
}()
And finally, kick off the Postgres server.
runPgServer(cfg.pgPort, db, r)
}
Finally. Finally. Finally done. Let's give it a go. :)
What hath god wrought
First, initialize the go module and then build the app.
$ go mod init waterbugdb
$ go mod tidy
$ go build
Now in terminal 1 start an instance of the database:
$ ./waterbugdb --node-id node1 --raft-port 2222 --http-port 8222 --pg-port 6000
Then in terminal 2 start another instance.
$ ./waterbugdb --node-id node2 --raft-port 2223 --http-port 8223 --pg-port 6001
And in terminal 3, tell node1 to have node2 follow it.
$ curl 'localhost:8222/add-follower?addr=localhost:2223&id=node2'
And then open psql against port 6000, the leader.
$ psql -h localhost -p 6000
psql (13.4, server 0.0.0)
Type "help" for help.
phil=> create table x (age int, name text);
CREATE ok
phil=> insert into x values(14, 'garry'), (20, 'ted');
could not interpret result from server: INSERT ok
INSERT ok
phil=> select name, age from x;
name | age
---------+-----
"garry" | 14
"ted" | 20
(2 rows)
Now kill node1 in terminal 1. Then start it up again. node2 will now be the leader. So exit psql in terminal 3 and enter it again pointed at node2, port 6001. Add new data.
$ psql -h 127.0.0.1 -p 6001
psql (13.4, server 0.0.0)
Type "help" for help.
phil=> insert into x values(19, 'ava'), (18, 'ming');
could not interpret result from server: INSERT ok
phil=> select age, name from x;
age | name
-----+---------
20 | "ted"
14 | "garry"
18 | "ming"
19 | "ava"
Exit psql in terminal 3 and start it up again against node1, port 6000.
$ psql -h 127.0.0.1 -p 6000
psql (13.4, server 0.0.0)
Type "help" for help.
phil=> select age, name from x;
age | name
-----+---------
20 | "ted"
14 | "garry"
18 | "ming"
19 | "ava"
(4 rows)
Nifty stuff.
Summary
So on the one hand this was a more complex post than my usual. Each process needed three servers running: two of those we managed directly, and the Raft server was managed by the Raft library.
On the other hand, we did this all in a really small amount of code. Yes, many edge cases went unhandled and a massive amount of SQL went unsupported. And yes, there are tons of inefficiencies, like using JSON, an unstructured format, when every table has a fixed structure. But hopefully now you have an idea of how a project like this could be structured. And there are the beginnings of a framework for filling in syntax/edge cases over time.
Additionally, the only problem we solved with consensus was replication, not sharding. This, and its more complicated cousin (cross-shard transactions), is truly the special sauce Cockroach brings.
Read more about building an intuition for sharding, replication, and distributed consensus [here](https://notes.eatonphil.com/2024-02-08-an-intuition-for-distributed-consensus-in-oltp-systems.html).