September 29, 2024

Build a serverless ACID database with this one neat trick (atomic PutIfAbsent)

Delta Lake is an open protocol for serverless ACID databases. Due to its simplicity, scalability, and the number of open-source implementations, it's quickly becoming the DuckDB of serverless transactional databases for analytics workloads. Iceberg is a contender too, and is similar in many ways. But since Delta Lake is simpler (simple != better) that's where we'll focus in this post.

Delta Lake has one of the most accessible database papers I've read (link). It's kind of like the movfuscator of databases.

Thanks to its simplicity, in this post we'll implement a Delta Lake-inspired serverless ACID database in 500 lines of Go code with zero dependencies. It will support creating tables, inserting rows into a table, and scanning all rows in a table. All while allowing concurrent readers and writers and achieving snapshot isolation.

There are other critical parts of Delta Lake we'll ignore: updating rows, deleting rows, checkpointing the transaction metadata log, compaction, and probably much more I'm not aware of. We must start somewhere.

All code for this post is available on GitHub.

Delta Lake basics

Delta Lake writes immutable data files to blob storage. It stores the names of new data files for a transaction in a metadata file. It handles concurrency (i.e. achieves snapshot isolation) with an atomic PutIfAbsent operation on the metadata file for the transaction.

This method of concurrency control works because the metadata files follow a naming scheme that includes the transaction id in the file name. When a new transaction starts, it finds all existing metadata files and picks its own transaction id by adding 1 to the largest transaction id it sees.

When a transaction goes to commit, writing the metadata file will fail if another transaction has already picked the same transaction id.

If a transaction does no writes and creates no tables, the transaction does not attempt to write any metadata file. Snapshot isolation!

Let's dig into the implementation.

Boilerplate

Let's give ourselves some nice assertion methods, a debug method, and a uuid generator. In main.go:

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "os"
    "path"
    "slices"
    "strings"
)

func assert(b bool, msg string) {
    if !b {
        panic(msg)
    }
}

func assertEq[C comparable](a C, b C, prefix string) {
    if a != b {
        panic(fmt.Sprintf("%s '%v' != '%v'", prefix, a, b))
    }
}

var DEBUG = slices.Contains(os.Args, "--debug")

func debug(a ...any) {
    if !DEBUG {
        return
    }

    args := append([]any{"[DEBUG]"}, a...)
    fmt.Println(args...)
}

// https://datatracker.ietf.org/doc/html/rfc4122#section-4.4
func uuidv4() string {
    f, err := os.Open("/dev/random")
    assert(err == nil, fmt.Sprintf("could not open /dev/random: %s", err))
    defer f.Close()

    buf := make([]byte, 16)
    n, err := f.Read(buf)
    assert(err == nil, fmt.Sprintf("could not read 16 bytes from /dev/random: %s", err))
    assert(n == len(buf), "expected 16 bytes from /dev/random")

    // Set bit 6 to 0
    buf[8] &= ^(byte(1) << 6)
    // Set bit 7 to 1
    buf[8] |= 1 << 7

    // Set version
    buf[6] &= ^(byte(1) << 4)
    buf[6] &= ^(byte(1) << 5)
    buf[6] |= 1 << 6
    buf[6] &= ^(byte(1) << 7)

    return fmt.Sprintf("%x-%x-%x-%x-%x",
        buf[:4],
        buf[4:6],
        buf[6:8],
        buf[8:10],
        buf[10:16])
}

Is that uuid method correct? Hopefully. Efficient? No. But it's preferable to avoid dependencies in pedagogical projects.

Moving on.

Blob storage requirements

As mentioned above, the basic requirement is that we support atomically writing some bytes to a location if the location doesn't already exist.

On top of that we also need the ability to list locations by prefix, and the ability to read the bytes at some location.

We'll diverge from Delta Lake in how we name files on disk. For one, we'll keep all files in the same directory with a fixed prefix for metadata and another table name prefix for each data file. This simplifies the implementation of listPrefix a bit.

So let's set up an interface to describe these requirements:

type objectStorage interface {
    // Must be atomic.
    putIfAbsent(name string, bytes []byte) error
    listPrefix(prefix string) ([]string, error)
    read(name string) ([]byte, error)
}

And this is literally all we need to get ACID transactions. That's crazy!

Atomic Put and cloud blob storage

We could implement the atomic putIfAbsent part of this interface in 2024 using conditional writes on S3. Or we could implement this interface with the If-None-Match header on Azure Cloud Storage. Or we could implement this interface with the x-goog-if-generation-match header on Google Cloud Storage.

Indeed a good exercise for the reader would be to implement this interface for other blob storage providers and see your serverless cloud database in action!

But the simplest method of all is to implement it on the filesystem, which is what we'll do next.

A filesystem blob store

If we had a server we could implement atomic putIfAbsent with a mutex. But we're serverless baby. Thankfully, POSIX supports atomic file creation if you open the file with O_CREAT | O_EXCL.

So we'll just open the file and write out all the bytes. If there is an error at any point, we must remove the file.

type fileObjectStorage struct {
    basedir string
}

func newFileObjectStorage(basedir string) *fileObjectStorage {
    return &fileObjectStorage{basedir}
}

func (fos *fileObjectStorage) putIfAbsent(name string, bytes []byte) error {
    filename := path.Join(fos.basedir, name)
    f, err := os.OpenFile(filename, os.O_WRONLY|os.O_EXCL|os.O_CREATE, 0644)
    if err != nil {
        return err
    }

    written := 0
    bufSize := 1024 * 16
    for written < len(bytes) {
        toWrite := min(written+bufSize, len(bytes))
        n, err := f.Write(bytes[written:toWrite])
        if err != nil {
            removeErr := os.Remove(filename)
            assert(removeErr == nil, "could not remove")
            return err
        }

        written += n
    }

    err = f.Sync()
    if err != nil {
        removeErr := os.Remove(filename)
        assert(removeErr == nil, "could not remove")
        return err
    }

    err = f.Close()
    if err != nil {
        removeErr := os.Remove(filename)
        assert(removeErr == nil, "could not remove")
        return err
    }

    return nil
}

listPrefix and read are minimal wrappers around filesystem APIs:

func (fos *fileObjectStorage) listPrefix(prefix string) ([]string, error) {
    dir := path.Join(fos.basedir)
    f, err := os.Open(dir)
    if err != nil {
        return nil, err
    }

    var files []string
    for err != io.EOF {
        var names []string
        names, err = f.Readdirnames(100)
        if err != nil && err != io.EOF {
            return nil, err
        }

        for _, n := range names {
            if prefix == "" || strings.HasPrefix(n, prefix) {
                files = append(files, n)
            }
        }
    }
    err = f.Close()
    return files, err
}

func (fos *fileObjectStorage) read(name string) ([]byte, error) {
    filename := path.Join(fos.basedir, name)
    return os.ReadFile(filename)
}

It is worth talking a bit about reading a directory though. Go doesn't provide a nice iterator API for us and I didn't want to implement this as callbacks with path/filepath.WalkDir.

We could use os.File.ReadDir but it allocates for all files in the directory. Sure, in a pedagogical project we don't worry about millions of files. But the ReadDir API, the error cases in particular, also isn't much simpler than Readdirnames.

What's more, even though we iterated through batches of directory entries, and did prefix filtering before accumulating, we still could have considered returning an iterator here ourselves. It seems possible and likely that the number of data files grows quite large in a production system. But I was lazy.

It would be nice if Go introduced an actual iterator API for reading a directory. :)

Delta Lake and stale reads

In any case the ACID properties of Delta Lake (and Iceberg) don't depend on being able to read up-to-date data.

This is because concurrent (or stale) transactions that write will fail on commit. And also because all files written (even metadata files) are immutable.

Since all data is immutable, we will always be able to read at least a consistent snapshot of data. But we will never be able to get SERIALIZABLE read-only transactions. This is just how Delta Lake and Iceberg work. And it is a similar or better consistency level to what any major SQL database gives you by default.

You'll see what I mean later on when we implement transaction commits.

Transaction boilerplate

Now that we've got a blob storage abstraction and a filesystem implementation of it, let's start sketching out what a client and what a transaction looks like.

In Delta Lake, a transaction consists of a list of actions. An action might be to define a table's schema, or to add a data file, or to remove a data file, etc. In this post we'll only implement the first two actions.

type DataobjectAction struct {
    Name  string
    Table string
}

type ChangeMetadataAction struct {
    Table   string
    Columns []string
}

// an enum, only one field will be non-nil
type Action struct {
    AddDataobject  *DataobjectAction
    ChangeMetadata *ChangeMetadataAction
    // TODO: Support object removal.
    // DeleteDataobject *DataobjectAction
}

These fields are all exported (i.e. capitalized, if you're not familiar with Go) because we will be writing them to disk when the transaction commits as the transaction's metadata.

In fact Actions and the transaction's id will be the only parts of the transaction we write to disk. Everything else will be in-memory state.

For our convenience we will track in memory a history of all previous actions, a mapping of table columns, and a mapping of unflushed data by table.

type transaction struct {
    Id int

    // Both are mapping table name to a list of actions on the table.
    previousActions map[string][]Action
    Actions         map[string][]Action

    // Mapping tables to column names.
    tables map[string][]string

    // Mapping table name to unflushed/in-memory rows. When rows
    // are flushed, the dataobject that contains them is added to
    // `tx.actions` above and `tx.unflushedDataPointer[table]` is
    // reset to `0`.
    unflushedData        map[string]*[DATAOBJECT_SIZE][]any
    unflushedDataPointer map[string]int
}

Only the current transaction will ever have transaction.previousActions filled out. transaction.tables will be populated when the transaction starts by reading through transaction.previousActions for ChangeMetadataActions, and we will also add onto it when we create a table in the current transaction.

We will append to transaction.Actions every time we write a new data file and every time we create a new table.

We will add rows to transaction.unflushedData for a table until transaction.unflushedDataPointer for that table reaches DATAOBJECT_SIZE upon which time we will write that data to disk and add a DataobjectAction entry to transaction.Actions.

Client boilerplate

A client will consistent of an objectStorage implementation and a possibly empty *transaction. Empty meaning there is no current transaction.

type client struct {
    os objectStorage
    // Current transaction, if any. Only one transaction per
    // client at a time. All reads and writes must be within a
    // transaction.
    tx *transaction
}

func newClient(os objectStorage) client {
    return client{os, nil}
}

var (
    errExistingTx  = fmt.Errorf("Existing Transaction")
    errNoTx        = fmt.Errorf("No Transaction")
    errTableExists = fmt.Errorf("Table Exists")
    errNoTable     = fmt.Errorf("No Such Table")
)

Client or database?

In a previous version of my code I named this client struct database. But that's misleading. There is no central database. There is just the client and the blob storage.

Clients work with transactions directly and only when attempting to commit does the blob storage abstraction let the client know if the transaction succeeded or not.

Starting a transaction

When we start a transaction, we will first read all existing transactions from disk and accumulate the actions from each prior transaction.

We will interpret ChangeMetadataActions and materialize them into a current view of all tables.

And we will assign a transaction ID to this transaction to be 1 greater than the largest existing transaction ID we see.

Again it doesn't matter if the listPrefix call we use returns an up-to-date list. Notably on blob storage there are few guarantees about LIST operations recency. The Delta Lake paper mentions this too.

Out-of-date transactions attempting to write will be caught when we go to commit the transaction. Out-of-date transactions attempting only to read will still read a consistent snapshot.

func (d *client) newTx() error {
    if d.tx != nil {
        return errExistingTx
    }

    logPrefix := "_log_"
    txLogFilenames, err := d.os.listPrefix(logPrefix)
    if err != nil {
        return err
    }

    tx := &transaction{}
    tx.previousActions = map[string][]Action{}
    tx.Actions = map[string][]Action{}
    tx.tables = map[string][]string{}
    tx.unflushedData = map[string]*[DATAOBJECT_SIZE][]any{}
    tx.unflushedDataPointer = map[string]int{}

    for _, txLogFilename := range txLogFilenames {
        bytes, err := d.os.read(txLogFilename)
        if err != nil {
            return err
        }

        var oldTx transaction
        err = json.Unmarshal(bytes, &oldTx)
        if err != nil {
            return err
        }
        // Transaction metadata files are sorted
        // lexicographically so that the most recent
        // transaction (i.e. the one with the largest
        // transaction id) will be last and tx.Id will end up
        // 1 greater than the most recent transaction ID we
        // see on disk.
        tx.Id = oldTx.Id + 1

        for table, actions := range oldTx.Actions {
            for _, action := range actions {
                if action.AddDataobject != nil {
                    tx.previousActions[table] = append(tx.previousActions[table], action)
                } else if action.ChangeMetadata != nil {
                    // Store the latest version of
                    // each table in memory for
                    // easy lookup.
                    mtd := action.ChangeMetadata
                    tx.tables[table] = mtd.Columns
                } else {
                    panic(fmt.Sprintf("unsupported action: %v", action))
                }
            }
        }
    }

    d.tx = tx
    return nil
}

And we're set.

Creating a table

When we create a table, we need to add a ChangeMetadataAction to the transactions Actions. And we also want to add the table info to the in-memory transaction.tables field.

We don't do any of this durably. The change here will be written to disk on commit (if the transaction succeeds).

func (d *client) createTable(table string, columns []string) error {
    if d.tx == nil {
        return errNoTx
    }

    if _, exists := d.tx.tables[table]; exists {
        return errTableExists
    }

    // Store it in the in-memory mapping.
    d.tx.tables[table] = columns

    // And also add it to the action history for future transactions.
    d.tx.Actions[table] = append(d.tx.Actions[table], Action{
        ChangeMetadata: &ChangeMetadataAction{
            Table:   table,
            Columns: columns,
        },
    })

    return nil
}

Easy peasy. Now for the fun part, writing data!

Writing a row

This is the next area where we'll diverge from Delta Lake. For the sake of zero dependencies we are going to store data in-memory as an array of array of any. And when we later write rows to disk we'll write them as JSON. A real Delta Lake implementation would store data in-memory in Apache Arrow format, and write to disk as Parquet.

In line with Delta Lake though we will buffer data in memory until we get 64K rows. When we get 64K rows for a particular table we will flush all those rows to disk. (When we go to commit a transaction we will flush any outstanding rows.)

func (d *client) writeRow(table string, row []any) error {
    if d.tx == nil {
        return errNoTx
    }

    if _, ok := d.tx.tables[table]; !ok {
        return errNoTable
    }

    // Try to find an unflushed/in-memory dataobject for this table
    pointer, ok := d.tx.unflushedDataPointer[table]
    if !ok {
        d.tx.unflushedDataPointer[table] = 0
        d.tx.unflushedData[table] = &[DATAOBJECT_SIZE][]any{}
    }

    if pointer == DATAOBJECT_SIZE {
        d.flushRows(table)
        pointer = 0
    }

    d.tx.unflushedData[table][pointer] = row
    d.tx.unflushedDataPointer[table]++
    return nil
}

Now let's implement flushing.

Flushing a data object

Recall that data objects in Delta Lake (and Iceberg) are immutable. Once we've got enough data to write a data object, we give it a unique name, write it to disk, and add a AddObjectAction to the transaction's list of Actions.

type dataobject struct {
    Table string
    Name  string
    Data  [DATAOBJECT_SIZE][]any
    Len   int
}

func (d *client) flushRows(table string) error {
    if d.tx == nil {
        return errNoTx
    }

    // First write out dataobject if there is anything to write out.
    pointer, exists := d.tx.unflushedDataPointer[table]
    if !exists || pointer == 0 {
        return nil
    }

    df := dataobject{
        Table: table,
        Name:  uuidv4(),
        Data:  *d.tx.unflushedData[table],
        Len:   pointer,
    }
    bytes, err := json.Marshal(df)
    if err != nil {
        return err
    }

    err = d.os.putIfAbsent(fmt.Sprintf("_table_%s_%s", table, df.Name), bytes)
    if err != nil {
        return err
    }

    // Then record the newly written data file.
    d.tx.Actions[table] = append(d.tx.Actions[table], Action{
        AddDataobject: &DataobjectAction{
            Table: table,
            Name:  df.Name,
        },
    })

    // Reset in-memory pointer.
    d.tx.unflushedDataPointer[table] = 0
    return nil
}

That's it for writing data! Let's now look at reading data.

Scanning a table

We're going to make scanning mildly more complicated than it needed to be in pedagogical code because we'll have client.scan() return an iterator rather than an array with all rows.

The scanIterator will first read from in-memory (unflushed) data. And then it will read through every data object for the table that is still a part of this transaction. We will know which data objects are still a part of this transaction by reading through all AddDataobject actions. A future version of this project would also eliminate data object files from the list by observing DeleteDataobject actions. But we don't do that in this post.

func (d *client) scan(table string) (*scanIterator, error) {
    if d.tx == nil {
        return nil, errNoTx
    }

    var dataobjects []string
    allActions := append(d.tx.previousActions[table], d.tx.Actions[table]...)
    for _, action := range allActions {
        if action.AddDataobject != nil {
            dataobjects = append(dataobjects, action.AddDataobject.Name)
        }
    }

    var unflushedRows [DATAOBJECT_SIZE][]any
    if data, ok := d.tx.unflushedData[table]; ok {
        unflushedRows = *data
    }

    return &scanIterator{
        unflushedRows:    unflushedRows,
        unflushedRowsLen: d.tx.unflushedDataPointer[table],
        d:                d,
        table:            table,
        dataobjects:      dataobjects,
    }, nil
}

The scanIterator needs to track where we are in in-memory rows, in data objects, and within a particular data object.

type scanIterator struct {
    d     *client
    table string

    // First we iterate through unflushed rows.
    unflushedRows       [DATAOBJECT_SIZE][]any
    unflushedRowsLen    int
    unflushedRowPointer int

    // Then we move through each dataobject.
    dataobjects        []string
    dataobjectsPointer int

    // And within each dataobject we iterate through rows.
    dataobject           *dataobject
    dataobjectRowPointer int
}

And the scanIterator will be driven by a next() method that goes through in-memory data first and then through what's on disk.

func (d *client) readDataobject(table, name string) (*dataobject, error) {
    bytes, err := d.os.read(fmt.Sprintf("_table_%s_%s", table, name))
    if err != nil {
        return nil, err
    }

    var do dataobject
    err = json.Unmarshal(bytes, &do)
    return &do, err
}

// returns (nil, nil) when done
func (si *scanIterator) next() ([]any, error) {
    // Iterate through in-memory rows first.
    if si.unflushedRowPointer < si.unflushedRowsLen {
        row := si.unflushedRows[si.unflushedRowPointer]
        si.unflushedRowPointer++
        return row, nil
    }

    // If we've gotten through all dataobjects on disk we're done.
    if si.dataobjectsPointer == len(si.dataobjects) {
        return nil, nil
    }

    if si.dataobject == nil {
        name := si.dataobjects[si.dataobjectsPointer]
        o, err := si.d.readDataobject(si.table, name)
        if err != nil {
            return nil, err
        }

        si.dataobject = o
    }

    if si.dataobjectRowPointer > si.dataobject.Len {
        si.dataobjectsPointer++
        si.dataobject = nil
        si.dataobjectRowPointer = 0
        return si.next()
    }

    row := si.dataobject.Data[si.dataobjectRowPointer]
    si.dataobjectRowPointer++
    return row, nil
}

That's it for scanning a table! The final piece of the puzzle is committing a transaction.

Committing a transaction

When we commit a transaction we must flush any remaining data. A read-only transaction (one which has no Actions) is immediately done. There is no concurrency check.

Otherwise we will serialize transaction state and attempt to atomically putIfAbsent.

The only way this will fail is if there is another concurrent writer.

func (d *client) commitTx() error {
    if d.tx == nil {
        return errNoTx
    }

    // Flush any outstanding data
    for table := range d.tx.tables {
        err := d.flushRows(table)
        if err != nil {
            d.tx = nil
            return err
        }
    }

    wrote := false
    for _, actions := range d.tx.Actions {
        if len(actions) > 0 {
            wrote = true
            break
        }
    }
    // Read-only transaction, no need to do a concurrency check.
    if !wrote {
        d.tx = nil
        return nil
    }

    filename := fmt.Sprintf("_log_%020d", d.tx.Id)
    // We won't store previous actions, they will be recovered on
    // new transactions. So unset them. Honestly not totally
    // clear why.
    d.tx.previousActions = nil
    bytes, err := json.Marshal(d.tx)
    if err != nil {
        d.tx = nil
        return err
    }

    err = d.os.putIfAbsent(filename, bytes)
    d.tx = nil
    return err
}

func main() {
    panic("unimplemented")
}

This is the crux of Delta Lake. It's simple. And honestly it's a bit shocking. Real Delta Lake does support automatic retries in some cases. But primarily you are limited to a single writer per table, even if the writers are writing non-conflicting rows. Iceberg is basically the same here, it's just how metadata is tracked that differs.

The Delta Lake and Iceberg folks apparently wanted to avoid FoundationDB (i.e. the Snowflake architecture, which is mentioned in the Delta Lake paper) so much that they'd give up row-level concurrency to be mostly serverless.

Is it worth it? Dunno. Delta Lake and Iceberg are getting massive adoption. Many very smart people have worked, and continue to work, on both. Moreover it is apparently what the market wants. Every database-like product is implementing, or is planning to implement, Delta Lake or Iceberg.

Trying it out

Let's add a test in main_test.go to see what happens with concurrent writers. Follow the comments and debug logs for details:

package main

import (
    "os"
    "testing"
)

func TestConcurrentTableWriters(t *testing.T) {
    dir, err := os.MkdirTemp("", "test-database")

    if err != nil {
        panic(err)
    }

    defer os.Remove(dir)

    fos := newFileObjectStorage(dir)
    c1Writer := newClient(fos)
    c2Writer := newClient(fos)

    // Have c2Writer start up a transaction.
    err = c2Writer.newTx()
    assertEq(err, nil, "could not start first c2 tx")
    debug("[c2] new tx")

    // But then have c1Writer start a transaction and commit it first.
    err = c1Writer.newTx()
    assertEq(err, nil, "could not start first c1 tx")
    debug("[c1] new tx")
    err = c1Writer.createTable("x", []string{"a", "b"})
    assertEq(err, nil, "could not create x")
    debug("[c1] Created table")
    err = c1Writer.writeRow("x", []any{"Joey", 1})
    assertEq(err, nil, "could not write first row")
    debug("[c1] Wrote row")
    err = c1Writer.writeRow("x", []any{"Yue", 2})
    assertEq(err, nil, "could not write second row")
    debug("[c1] Wrote row")
    err = c1Writer.commitTx()
    assertEq(err, nil, "could not commit tx")
    debug("[c1] Committed tx")

    // Now go back to c2 and write data.
    err = c2Writer.createTable("x", []string{"a", "b"})
    assertEq(err, nil, "could not create x")
    debug("[c2] Created table")
    err = c2Writer.writeRow("x", []any{"Holly", 1})
    assertEq(err, nil, "could not write first row")
    debug("[c2] Wrote row")

    err = c2Writer.commitTx()
    assert(err != nil, "concurrent commit must fail")
    debug("[c2] tx not committed")
}

Try it out:

$ go mod init otf
$ go mod tidy
$ go test -run TestConcurrentTableWriters -- --debug
[DEBUG] [c2] new tx
[DEBUG] [c1] new tx
[DEBUG] [c1] Created table
[DEBUG] [c1] Wrote row
[DEBUG] [c1] Wrote row
[DEBUG] [c1] Committed tx
[DEBUG] [c2] Created table
[DEBUG] [c2] Wrote row
[DEBUG] [c2] tx not committed
PASS
ok      otf 0.311s

That's pretty cool.

And what about a reader and concurrent writer? Observe that the reader always reads a snapshot. Follow the comments again for detail:

func TestConcurrentReaderWithWriterReadsSnapshot(t *testing.T) {
    dir, err := os.MkdirTemp("", "test-database")

    if err != nil {
        panic(err)
    }

    defer os.Remove(dir)

    fos := newFileObjectStorage(dir)
    c1Writer := newClient(fos)
    c2Reader := newClient(fos)

    // First create some data and commit the transaction.
    err = c1Writer.newTx()
    assertEq(err, nil, "could not start first c1 tx")
    debug("[c1Writer] Started tx")
    err = c1Writer.createTable("x", []string{"a", "b"})
    assertEq(err, nil, "could not create x")
    debug("[c1Writer] Created table")
    err = c1Writer.writeRow("x", []any{"Joey", 1})
    assertEq(err, nil, "could not write first row")
    debug("[c1Writer] Wrote row")
    err = c1Writer.writeRow("x", []any{"Yue", 2})
    assertEq(err, nil, "could not write second row")
    debug("[c1Writer] Wrote row")
    err = c1Writer.commitTx()
    assertEq(err, nil, "could not commit tx")
    debug("[c1Writer] Committed tx")

    // Now start a new transaction for more edits.
    err = c1Writer.newTx()
    assertEq(err, nil, "could not start second c1 tx")
    debug("[c1Writer] Starting new write tx")

    // Before we commit this second write-transaction, start a
    // read transaction.
    err = c2Reader.newTx()
    assertEq(err, nil, "could not start c2 tx")
    debug("[c2Reader] Started tx")

    // Write and commit rows in c1.
    err = c1Writer.writeRow("x", []any{"Ada", 3})
    assertEq(err, nil, "could not write third row")
    debug("[c1Writer] Wrote third row")

    // Scan x in read-only transaction
    it, err := c2Reader.scan("x")
    assertEq(err, nil, "could not scan x")
    debug("[c2Reader] Started scanning")
    seen := 0
    for {
        row, err := it.next()
        assertEq(err, nil, "could not iterate x scan")

        if row == nil {
            debug("[c2Reader] Done scanning")
            break
        }

        debug("[c2Reader] Got row in reader tx", row)
        if seen == 0 {
            assertEq(row[0], "Joey", "row mismatch in c1")
            assertEq(row[1], 1.0, "row mismatch in c1")
        } else {
            assertEq(row[0], "Yue", "row mismatch in c1")
            assertEq(row[1], 2.0, "row mismatch in c1")
        }

        seen++
    }
    assertEq(seen, 2, "expected two rows")

    // Scan x in c1 write transaction
    it, err = c1Writer.scan("x")
    assertEq(err, nil, "could not scan x in c1")
    debug("[c1Writer] Started scanning")
    seen = 0
    for {
        row, err := it.next()
        assertEq(err, nil, "could not iterate x scan in c1")

        if row == nil {
            debug("[c1Writer] Done scanning")
            break
        }

        debug("[c1Writer] Got row in tx", row)

        if seen == 0 {
            assertEq(row[0], "Ada", "row mismatch in c1")
            // Since this hasn't been serialized to JSON, it's still an int not a float.
            assertEq(row[1], 3, "row mismatch in c1")
        } else if seen == 1 {
            assertEq(row[0], "Joey", "row mismatch in c1")
            assertEq(row[1], 1.0, "row mismatch in c1")
        } else {
            assertEq(row[0], "Yue", "row mismatch in c1")
            assertEq(row[1], 2.0, "row mismatch in c1")
        }

        seen++
    }
    assertEq(seen, 3, "expected three rows")

    // Writer committing should succeed.
    err = c1Writer.commitTx()
    assertEq(err, nil, "could not commit second tx")
    debug("[c1Writer] Committed tx")

    // Reader committing should succeed.
    err = c2Reader.commitTx()
    assertEq(err, nil, "could not commit read-only tx")
    debug("[c2Reader] Committed tx")
}

Run it:

$ go test -run TestConcurrentReaderWithWriterReadsSnapshot -- --debug
[DEBUG] [c1Writer] Started tx
[DEBUG] [c1Writer] Created table
[DEBUG] [c1Writer] Wrote row
[DEBUG] [c1Writer] Wrote row
[DEBUG] [c1Writer] Committed tx
[DEBUG] [c1Writer] Starting new write tx
[DEBUG] [c2Reader] Started tx
[DEBUG] [c1Writer] Wrote third row
[DEBUG] [c2Reader] Started scanning
[DEBUG] [c2Reader] Got row in reader tx [Joey 1]
[DEBUG] [c2Reader] Got row in reader tx [Yue 2]
[DEBUG] [c2Reader] Done scanning
[DEBUG] [c1Writer] Started scanning
[DEBUG] [c1Writer] Got row in tx [Ada 3]
[DEBUG] [c1Writer] Got row in tx [Joey 1]
[DEBUG] [c1Writer] Got row in tx [Yue 2]
[DEBUG] [c1Writer] Done scanning
[DEBUG] [c1Writer] Committed tx
[DEBUG] [c2Reader] Committed tx
PASS
ok      otf 0.252s

Sweet.

What's next?

As mentioned, we didn't touch a lot of things. Handling updates and deletes, transaction log checkpoints, data object compaction, etc.

Take a close look at the Delta Lake paper and the Delta Lake Spec and see what you can do!