Delta Lake is an open protocol for serverless ACID databases. Due to its simplicity, scalability, and the number of open-source implementations, it's quickly becoming the DuckDB of serverless transactional databases for analytics workloads. Iceberg is a contender too, and is similar in many ways. But since Delta Lake is simpler (simple != better) that's where we'll focus in this post.
Delta Lake has one of the most accessible database papers I've read (link).
Thanks to its simplicity, in this post we'll implement a Delta Lake-inspired serverless ACID database in 500 lines of Go code with zero dependencies. It will support creating tables, inserting rows into a table, and scanning all rows in a table. All while allowing concurrent readers and writers and achieving snapshot isolation.
There are other critical parts of Delta Lake we'll ignore: updating rows, deleting rows, checkpointing the transaction metadata log, compaction, and probably much more I'm not aware of. We must start somewhere.
All code for this post is available on GitHub.
Delta Lake basics
Delta Lake writes immutable data files to blob storage. It stores the names of new data files for a transaction in a metadata file. It handles concurrency (i.e. achieves snapshot isolation) with an atomic PutIfAbsent operation on the metadata file for the transaction.
This method of concurrency control works because the metadata files follow a naming scheme that includes the transaction id in the file name. When a new transaction starts, it finds all existing metadata files and picks its own transaction id by adding 1 to the largest transaction id it sees.
When a transaction goes to commit, writing the metadata file will fail if another transaction has already picked the same transaction id.
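For example, using the zero-padded naming scheme we'll adopt later in this post, two racing writers might both land on the same transaction id:
// Both transactions saw metadata files for transactions 0 and 1 on
// storage, so both pick id 2 and both try to commit to the same name:
filename := fmt.Sprintf("_log_%020d", 2) // "_log_00000000000000000002"
// Whichever PutIfAbsent for that name happens first wins; the other
// transaction's commit fails and it must start over.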
If a transaction does no writes and creates no tables, the transaction does not attempt to write any metadata file. Snapshot isolation!
Let's dig into the implementation.
Boilerplate
Let's give ourselves some nice assertion methods, a debug method, and a uuid generator. In main.go:
package main
import (
"encoding/json"
"fmt"
"io"
"os"
"path"
"slices"
"strings"
)
func assert(b bool, msg string) {
if !b {
panic(msg)
}
}
func assertEq[C comparable](a C, b C, prefix string) {
if a != b {
panic(fmt.Sprintf("%s '%v' != '%v'", prefix, a, b))
}
}
var DEBUG = slices.Contains(os.Args, "--debug")
func debug(a ...any) {
if !DEBUG {
return
}
args := append([]any{"[DEBUG]"}, a...)
fmt.Println(args...)
}
// https://datatracker.ietf.org/doc/html/rfc4122#section-4.4
func uuidv4() string {
f, err := os.Open("/dev/random")
assert(err == nil, fmt.Sprintf("could not open /dev/random: %s", err))
defer f.Close()
buf := make([]byte, 16)
n, err := f.Read(buf)
assert(err == nil, fmt.Sprintf("could not read 16 bytes from /dev/random: %s", err))
assert(n == len(buf), "expected 16 bytes from /dev/random")
// Set bit 6 to 0
buf[8] &= ^(byte(1) << 6)
// Set bit 7 to 1
buf[8] |= 1 << 7
// Set version
buf[6] &= ^(byte(1) << 4)
buf[6] &= ^(byte(1) << 5)
buf[6] |= 1 << 6
buf[6] &= ^(byte(1) << 7)
return fmt.Sprintf("%x-%x-%x-%x-%x",
buf[:4],
buf[4:6],
buf[6:8],
buf[8:10],
buf[10:16])
}
Is that uuid method correct? Hopefully. Efficient? No. But it's preferable to avoid dependencies in pedagogical projects.
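If you want a quick sanity check, you could drop a test like this into the main_test.go we create at the end of the post. It only uses our own helpers and asserts that the version and variant nibbles are where RFC 4122 says they should be:
func TestUuidv4Shape(t *testing.T) {
	id := uuidv4()
	assertEq(len(id), 36, "uuid length")
	// The version nibble (first hex digit of the third group) must be 4.
	assertEq(id[14], byte('4'), "uuid version")
	// The variant nibble (first hex digit of the fourth group) must be 8, 9, a, or b.
	assert(id[19] == '8' || id[19] == '9' || id[19] == 'a' || id[19] == 'b', "uuid variant")
}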
Moving on.
Blob storage requirements
As mentioned above, the basic requirement is that we support atomically writing some bytes to a location if the location doesn't already exist.
On top of that we also need the ability to list locations by prefix, and the ability to read the bytes at some location.
We'll diverge from Delta Lake in how we name files on disk. For one,
we'll keep all files in the same directory with a fixed prefix for
metadata and another table name prefix for each data file. This
simplifies the implementation of listPrefix
a bit.
However, this also diverges from Delta Lake in that transactions
will represent all tables. In Delta Lake that is not so. Delta Lake
has a per-table transaction log. Only transactions that read and
write the same table in Delta Lake achieve snapshot isolation.
So let's set up an interface to describe these requirements:
type objectStorage interface {
// Must be atomic.
putIfAbsent(name string, bytes []byte) error
listPrefix(prefix string) ([]string, error)
read(name string) ([]byte, error)
}
And this is literally all we need to get ACID transactions. That's crazy!
Atomic Put and cloud blob storage
We could implement the atomic putIfAbsent part of this interface in 2024 using conditional writes on S3. Or we could implement this interface with the If-None-Match header on Azure Blob Storage. Or we could implement this interface with the x-goog-if-generation-match header on Google Cloud Storage.
Indeed a good exercise for the reader would be to implement this interface for other blob storage providers and see your serverless cloud database in action!
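For instance, here's a rough, untested sketch of what putIfAbsent might look like on S3, assuming the aws-sdk-go-v2 client and its IfNoneMatch support for conditional writes. The s3ObjectStorage type is hypothetical, and listPrefix and read would wrap ListObjectsV2 and GetObject:
import (
	"bytes"
	"context"
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)
type s3ObjectStorage struct {
	client *s3.Client
	bucket string
}
func (s *s3ObjectStorage) putIfAbsent(name string, data []byte) error {
	// If-None-Match: * asks S3 to reject the write if the key
	// already exists, which is exactly the atomicity we need.
	_, err := s.client.PutObject(context.Background(), &s3.PutObjectInput{
		Bucket:      aws.String(s.bucket),
		Key:         aws.String(name),
		Body:        bytes.NewReader(data),
		IfNoneMatch: aws.String("*"),
	})
	return err
}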
But the simplest method of all is to implement it on the filesystem, which is what we'll do next.
A filesystem blob store
If we had a server we could implement atomic putIfAbsent with a mutex. But we're serverless, baby. Thankfully, POSIX supports an atomic link operation that fails if the new name already exists.
So we'll create a temporary file and write out all the bytes. Then we link the temporary file to the permanent name we intended. For cleanliness (not correctness), if there is an error at any point, we'll remove the temporary file.
type fileObjectStorage struct {
basedir string
}
func newFileObjectStorage(basedir string) *fileObjectStorage {
return &fileObjectStorage{basedir}
}
func (fos *fileObjectStorage) putIfAbsent(name string, bytes []byte) error {
tmpfilename := path.Join(fos.basedir, uuidv4())
f, err := os.OpenFile(tmpfilename, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return err
}
written := 0
bufSize := 1024 * 16
for written < len(bytes) {
toWrite := min(written+bufSize, len(bytes))
n, err := f.Write(bytes[written:toWrite])
if err != nil {
removeErr := os.Remove(tmpfilename)
assert(removeErr == nil, "could not remove")
return err
}
written += n
}
err = f.Sync()
if err != nil {
removeErr := os.Remove(tmpfilename)
assert(removeErr == nil, "could not remove")
return err
}
err = f.Close()
if err != nil {
removeErr := os.Remove(tmpfilename)
assert(removeErr == nil, "could not remove")
return err
}
filename := path.Join(fos.basedir, name)
err = os.Link(tmpfilename, filename)
if err != nil {
removeErr := os.Remove(tmpfilename)
assert(removeErr == nil, "could not remove")
return err
}
return nil
}
yencabulator on HN pointed out that an earlier version of this post had a buggy implementation of putIfAbsent (one that attempted to manage atomicity solely via O_EXCL | O_CREAT) that would leave around potentially bad metadata files if the os.Remove call ever failed. The link approach works around that because the file is already fully and correctly written by the time we do the link.
listPrefix
and read
are minimal wrappers around filesystem APIs:
func (fos *fileObjectStorage) listPrefix(prefix string) ([]string, error) {
dir := path.Join(fos.basedir)
f, err := os.Open(dir)
if err != nil {
return nil, err
}
var files []string
for err != io.EOF {
var names []string
names, err = f.Readdirnames(100)
if err != nil && err != io.EOF {
return nil, err
}
for _, n := range names {
if prefix == "" || strings.HasPrefix(n, prefix) {
files = append(files, n)
}
}
}
err = f.Close()
return files, err
}
func (fos *fileObjectStorage) read(name string) ([]byte, error) {
filename := path.Join(fos.basedir, name)
return os.ReadFile(filename)
}
It is worth talking a bit about reading a directory though. Go doesn't provide a nice iterator API for us and I didn't want to implement this as callbacks with path/filepath.WalkDir. We could use os.File.ReadDir but it allocates for all files in the directory. Sure, in a pedagogical project we don't worry about millions of files. But the ReadDir API, the error cases in particular, also isn't much simpler than Readdirnames.
What's more, even though we iterated through batches of directory entries, and did prefix filtering before accumulating, we still could have considered returning an iterator here ourselves. It seems possible and likely that the number of data files grows quite large in a production system. But I was lazy.
It would be nice if Go introduced an actual iterator API for reading a directory. :)
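In the meantime, here is a sketch of how we could wrap Readdirnames in an iterator ourselves, assuming Go 1.23+ range-over-func and adding iter to main.go's imports:
func readDirNames(dir string) iter.Seq2[string, error] {
	return func(yield func(string, error) bool) {
		f, err := os.Open(dir)
		if err != nil {
			yield("", err)
			return
		}
		defer f.Close()
		for {
			// Read the directory in batches, yielding each
			// name (or an error) to the caller.
			names, err := f.Readdirnames(100)
			for _, name := range names {
				if !yield(name, nil) {
					return
				}
			}
			if err == io.EOF {
				return
			}
			if err != nil {
				yield("", err)
				return
			}
		}
	}
}
listPrefix could then just do for name, err := range readDirNames(dir) and filter by prefix.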
Delta Lake and stale reads
In any case the ACID properties of Delta Lake (and Iceberg) don't depend on being able to read up-to-date data.
This is because concurrent (or stale) transactions that write will fail on commit. And also because all files written (even metadata files) are immutable.
Since all data is immutable, we will always be able to read at least a consistent snapshot of data. But we will never be able to get SERIALIZABLE read-only transactions. This is just how Delta Lake and Iceberg work. And it is similar to, or better than, the consistency level any major SQL database gives you by default.
You'll see what I mean later on when we implement transaction commits.
Transaction boilerplate
Now that we've got a blob storage abstraction and a filesystem implementation of it, let's start sketching out what a client and what a transaction looks like.
In Delta Lake, a transaction consists of a list of actions. An action might be to define a table's schema, or to add a data file, or to remove a data file, etc. In this post we'll only implement the first two actions.
type DataobjectAction struct {
Name string
Table string
}
type ChangeMetadataAction struct {
Table string
Columns []string
}
// an enum, only one field will be non-nil
type Action struct {
AddDataobject *DataobjectAction
ChangeMetadata *ChangeMetadataAction
// TODO: Support object removal.
// DeleteDataobject *DataobjectAction
}
These fields are all exported (i.e. capitalized, if you're not familiar with Go) because we will be writing them to disk when the transaction commits as the transaction's metadata.
In fact, Actions and the transaction's id will be the only parts of the transaction we write to disk. Everything else will be in-memory state.
For our convenience we will track in memory a history of all previous actions, a mapping of table columns, and a mapping of unflushed data by table.
// How many rows we buffer in memory per table before flushing them to
// a data object on disk (64K rows, as described below).
const DATAOBJECT_SIZE = 64 * 1024
type transaction struct {
Id int
// Both are mapping table name to a list of actions on the table.
previousActions map[string][]Action
Actions map[string][]Action
// Mapping tables to column names.
tables map[string][]string
// Mapping table name to unflushed/in-memory rows. When rows
// are flushed, the dataobject that contains them is added to
// `tx.actions` above and `tx.unflushedDataPointer[table]` is
// reset to `0`.
unflushedData map[string]*[DATAOBJECT_SIZE][]any
unflushedDataPointer map[string]int
}
Only the current transaction will ever have transaction.previousActions filled out. transaction.tables will be populated when the transaction starts by reading through transaction.previousActions for ChangeMetadataActions, and we will also add onto it when we create a table in the current transaction.
We will append to transaction.Actions every time we write a new data file and every time we create a new table.
We will add rows to transaction.unflushedData for a table until transaction.unflushedDataPointer for that table reaches DATAOBJECT_SIZE, at which time we will write that data to disk and add a DataobjectAction entry to transaction.Actions.
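To make this concrete, a committed transaction that created table x and flushed one data object would serialize (in commitTx, later) to a metadata file shaped roughly like this, pretty-printed here, with an illustrative id and uuid. Only the exported Id and Actions fields survive json.Marshal:
{
  "Id": 1,
  "Actions": {
    "x": [
      {
        "AddDataobject": null,
        "ChangeMetadata": { "Table": "x", "Columns": ["a", "b"] }
      },
      {
        "AddDataobject": { "Name": "<uuid>", "Table": "x" },
        "ChangeMetadata": null
      }
    ]
  }
}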
Client boilerplate
A client
will consist of an objectStorage
implementation and a
possibly empty *transaction
. Empty meaning there is no current
transaction.
type client struct {
os objectStorage
// Current transaction, if any. Only one transaction per
// client at a time. All reads and writes must be within a
// transaction.
tx *transaction
}
func newClient(os objectStorage) client {
return client{os, nil}
}
var (
errExistingTx = fmt.Errorf("Existing Transaction")
errNoTx = fmt.Errorf("No Transaction")
errTableExists = fmt.Errorf("Table Exists")
errNoTable = fmt.Errorf("No Such Table")
)
Client or database?
In a previous version of my code I named this client
struct
database
. But that's misleading. There is no central database. There
is just the client and the blob storage.
Clients work with transactions directly and only when attempting to commit does the blob storage abstraction let the client know if the transaction succeeded or not.
Starting a transaction
When we start a transaction, we will first read all existing transactions from disk and accumulate the actions from each prior transaction.
We will interpret ChangeMetadataActions and materialize them into a current view of all tables.
And we will assign this transaction an ID that is 1 greater than the largest existing transaction ID we see.
Again, it doesn't matter if the listPrefix call we use returns an up-to-date list. Notably, blob storage makes few guarantees about the recency of LIST operations. The Delta Lake paper mentions this too.
Out-of-date transactions attempting to write will be caught when we go to commit the transaction. Out-of-date transactions attempting only to read will still read a consistent snapshot.
func (d *client) newTx() error {
if d.tx != nil {
return errExistingTx
}
logPrefix := "_log_"
txLogFilenames, err := d.os.listPrefix(logPrefix)
if err != nil {
return err
}
tx := &transaction{}
tx.previousActions = map[string][]Action{}
tx.Actions = map[string][]Action{}
tx.tables = map[string][]string{}
tx.unflushedData = map[string]*[DATAOBJECT_SIZE][]any{}
tx.unflushedDataPointer = map[string]int{}
for _, txLogFilename := range txLogFilenames {
bytes, err := d.os.read(txLogFilename)
if err != nil {
return err
}
var oldTx transaction
err = json.Unmarshal(bytes, &oldTx)
if err != nil {
return err
}
		// The zero-padded metadata file names sort
		// lexicographically by transaction id, but listPrefix
		// makes no ordering guarantees, so take the max: tx.Id
		// ends up 1 greater than the largest transaction ID we
		// see on disk.
		if oldTx.Id >= tx.Id {
			tx.Id = oldTx.Id + 1
		}
for table, actions := range oldTx.Actions {
for _, action := range actions {
if action.AddDataobject != nil {
tx.previousActions[table] = append(tx.previousActions[table], action)
} else if action.ChangeMetadata != nil {
// Store the latest version of
// each table in memory for
// easy lookup.
mtd := action.ChangeMetadata
tx.tables[table] = mtd.Columns
} else {
panic(fmt.Sprintf("unsupported action: %v", action))
}
}
}
}
d.tx = tx
return nil
}
And we're set.
Creating a table
When we create a table, we need to add a ChangeMetadataAction to the transaction's Actions. And we also want to add the table info to the in-memory transaction.tables field.
We don't do any of this durably. The change here will be written to disk on commit (if the transaction succeeds).
func (d *client) createTable(table string, columns []string) error {
if d.tx == nil {
return errNoTx
}
if _, exists := d.tx.tables[table]; exists {
return errTableExists
}
// Store it in the in-memory mapping.
d.tx.tables[table] = columns
// And also add it to the action history for future transactions.
d.tx.Actions[table] = append(d.tx.Actions[table], Action{
ChangeMetadata: &ChangeMetadataAction{
Table: table,
Columns: columns,
},
})
return nil
}
Easy peasy. Now for the fun part, writing data!
Writing a row
This is the next area where we'll diverge from Delta Lake. For the
sake of zero dependencies we are going to store data in-memory as an
array of array of any
. And when we later write rows to disk we'll
write them as JSON. A real Delta Lake implementation would store data
in-memory in Apache Arrow format, and write to disk as Parquet.
In line with Delta Lake though we will buffer data in memory until we get 64K rows. When we get 64K rows for a particular table we will flush all those rows to disk. (When we go to commit a transaction we will flush any outstanding rows.)
func (d *client) writeRow(table string, row []any) error {
if d.tx == nil {
return errNoTx
}
if _, ok := d.tx.tables[table]; !ok {
return errNoTable
}
// Try to find an unflushed/in-memory dataobject for this table
pointer, ok := d.tx.unflushedDataPointer[table]
if !ok {
d.tx.unflushedDataPointer[table] = 0
d.tx.unflushedData[table] = &[DATAOBJECT_SIZE][]any{}
}
	if pointer == DATAOBJECT_SIZE {
		// Flush the full buffer before accepting more rows.
		err := d.flushRows(table)
		if err != nil {
			return err
		}
		pointer = 0
	}
d.tx.unflushedData[table][pointer] = row
d.tx.unflushedDataPointer[table]++
return nil
}
Now let's implement flushing.
Flushing a data object
Recall that data objects in Delta Lake (and Iceberg) are immutable. Once we've got enough data to write a data object, we give it a unique name, write it to disk, and add an AddDataobject action to the transaction's list of Actions.
type dataobject struct {
Table string
Name string
Data [DATAOBJECT_SIZE][]any
Len int
}
func (d *client) flushRows(table string) error {
if d.tx == nil {
return errNoTx
}
// First write out dataobject if there is anything to write out.
pointer, exists := d.tx.unflushedDataPointer[table]
if !exists || pointer == 0 {
return nil
}
df := dataobject{
Table: table,
Name: uuidv4(),
Data: *d.tx.unflushedData[table],
Len: pointer,
}
bytes, err := json.Marshal(df)
if err != nil {
return err
}
err = d.os.putIfAbsent(fmt.Sprintf("_table_%s_%s", table, df.Name), bytes)
if err != nil {
return err
}
// Then record the newly written data file.
d.tx.Actions[table] = append(d.tx.Actions[table], Action{
AddDataobject: &DataobjectAction{
Table: table,
Name: df.Name,
},
})
// Reset in-memory pointer.
d.tx.unflushedDataPointer[table] = 0
return nil
}
That's it for writing data! Let's now look at reading data.
Scanning a table
We're going to make scanning mildly more complicated than it needs to be in pedagogical code because we'll have client.scan() return an iterator rather than an array with all rows.
The scanIterator
will first read from in-memory (unflushed)
data. And then it will read through every data object for the table
that is still a part of this transaction. We will know which data
objects are still a part of this transaction by reading through all
AddDataobject
actions. A future version of this project would also
eliminate data object files from the list by observing
DeleteDataobject
actions. But we don't do that in this post.
func (d *client) scan(table string) (*scanIterator, error) {
if d.tx == nil {
return nil, errNoTx
}
var dataobjects []string
allActions := append(d.tx.previousActions[table], d.tx.Actions[table]...)
for _, action := range allActions {
if action.AddDataobject != nil {
dataobjects = append(dataobjects, action.AddDataobject.Name)
}
}
var unflushedRows [DATAOBJECT_SIZE][]any
if data, ok := d.tx.unflushedData[table]; ok {
unflushedRows = *data
}
return &scanIterator{
unflushedRows: unflushedRows,
unflushedRowsLen: d.tx.unflushedDataPointer[table],
d: d,
table: table,
dataobjects: dataobjects,
}, nil
}
The scanIterator
needs to track where we are in in-memory rows, in
data objects, and within a particular data object.
type scanIterator struct {
d *client
table string
// First we iterate through unflushed rows.
unflushedRows [DATAOBJECT_SIZE][]any
unflushedRowsLen int
unflushedRowPointer int
// Then we move through each dataobject.
dataobjects []string
dataobjectsPointer int
// And within each dataobject we iterate through rows.
dataobject *dataobject
dataobjectRowPointer int
}
And the scanIterator
will be driven by a next()
method that goes
through in-memory data first and then through what's on disk.
func (d *client) readDataobject(table, name string) (*dataobject, error) {
bytes, err := d.os.read(fmt.Sprintf("_table_%s_%s", table, name))
if err != nil {
return nil, err
}
var do dataobject
err = json.Unmarshal(bytes, &do)
return &do, err
}
// returns (nil, nil) when done
func (si *scanIterator) next() ([]any, error) {
// Iterate through in-memory rows first.
if si.unflushedRowPointer < si.unflushedRowsLen {
row := si.unflushedRows[si.unflushedRowPointer]
si.unflushedRowPointer++
return row, nil
}
// If we've gotten through all dataobjects on disk we're done.
if si.dataobjectsPointer == len(si.dataobjects) {
return nil, nil
}
if si.dataobject == nil {
name := si.dataobjects[si.dataobjectsPointer]
o, err := si.d.readDataobject(si.table, name)
if err != nil {
return nil, err
}
si.dataobject = o
}
	if si.dataobjectRowPointer >= si.dataobject.Len {
si.dataobjectsPointer++
si.dataobject = nil
si.dataobjectRowPointer = 0
return si.next()
}
row := si.dataobject.Data[si.dataobjectRowPointer]
si.dataobjectRowPointer++
return row, nil
}
That's it for scanning a table! The final piece of the puzzle is committing a transaction.
Committing a transaction
When we commit a transaction we must flush any remaining data. A
read-only transaction (one which has no Actions
) is immediately
done. There is no concurrency check.
Otherwise we will serialize transaction state and attempt to
atomically putIfAbsent
.
The only way this will fail is if there is another concurrent writer.
func (d *client) commitTx() error {
if d.tx == nil {
return errNoTx
}
// Flush any outstanding data
for table := range d.tx.tables {
err := d.flushRows(table)
if err != nil {
d.tx = nil
return err
}
}
wrote := false
for _, actions := range d.tx.Actions {
if len(actions) > 0 {
wrote = true
break
}
}
// Read-only transaction, no need to do a concurrency check.
if !wrote {
d.tx = nil
return nil
}
filename := fmt.Sprintf("_log_%020d", d.tx.Id)
	// Note that previousActions is unexported, so json.Marshal
	// skips it anyway: previous actions are already recorded in
	// earlier metadata files and are recovered by newTx.
	// Unsetting it here is just tidying up.
	d.tx.previousActions = nil
bytes, err := json.Marshal(d.tx)
if err != nil {
d.tx = nil
return err
}
err = d.os.putIfAbsent(filename, bytes)
d.tx = nil
return err
}
func main() {
panic("unimplemented")
}
This is the crux of Delta Lake. It's simple. And honestly it's a bit shocking. Real Delta Lake does support automatic retries in some cases. But primarily you are limited to a single writer per table, even if the writers are writing non-conflicting rows. Iceberg is basically the same here; only how metadata is tracked differs.
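If you wanted retries on top of our client, a hypothetical helper could replay a transaction's writes in a fresh transaction until the commit sticks (commitWithRetry is made up for illustration, not part of our 500 lines):
func commitWithRetry(c *client, attempts int, writes func(*client) error) error {
	var err error
	for i := 0; i < attempts; i++ {
		// commitTx clears c.tx even when it fails, so we can
		// simply start a new transaction and replay.
		if err = c.newTx(); err != nil {
			return err
		}
		if err = writes(c); err != nil {
			return err
		}
		if err = c.commitTx(); err == nil {
			return nil
		}
	}
	return err
}
Each attempt re-reads the log in newTx, so the replayed writes land on top of whatever the winning transaction committed.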
As mentioned in another note above, our implementation is actually stricter than Delta Lake since it manages all table transaction logs together. This means you can get snapshot isolation across all tables (which Delta Lake doesn't support) but it will mean significantly more contention and failed write transactions.
The Delta Lake and Iceberg folks apparently wanted to avoid FoundationDB (i.e. the Snowflake architecture, which is mentioned in the Delta Lake paper) so much that they'd give up row-level concurrency to be mostly serverless.
Is it worth it? Dunno. Delta Lake and Iceberg are getting massive adoption. Many very smart people have worked, and continue to work, on both. Moreover it is apparently what the market wants. Every database-like product is implementing, or is planning to implement, Delta Lake or Iceberg.
Trying it out
Let's add a test in main_test.go
to see what happens with concurrent
writers. Follow the comments and debug logs for details:
package main
import (
"os"
"testing"
)
func TestConcurrentTableWriters(t *testing.T) {
dir, err := os.MkdirTemp("", "test-database")
if err != nil {
panic(err)
}
	defer os.RemoveAll(dir)
fos := newFileObjectStorage(dir)
c1Writer := newClient(fos)
c2Writer := newClient(fos)
// Have c2Writer start up a transaction.
err = c2Writer.newTx()
assertEq(err, nil, "could not start first c2 tx")
debug("[c2] new tx")
// But then have c1Writer start a transaction and commit it first.
err = c1Writer.newTx()
assertEq(err, nil, "could not start first c1 tx")
debug("[c1] new tx")
err = c1Writer.createTable("x", []string{"a", "b"})
assertEq(err, nil, "could not create x")
debug("[c1] Created table")
err = c1Writer.writeRow("x", []any{"Joey", 1})
assertEq(err, nil, "could not write first row")
debug("[c1] Wrote row")
err = c1Writer.writeRow("x", []any{"Yue", 2})
assertEq(err, nil, "could not write second row")
debug("[c1] Wrote row")
err = c1Writer.commitTx()
assertEq(err, nil, "could not commit tx")
debug("[c1] Committed tx")
// Now go back to c2 and write data.
err = c2Writer.createTable("x", []string{"a", "b"})
assertEq(err, nil, "could not create x")
debug("[c2] Created table")
err = c2Writer.writeRow("x", []any{"Holly", 1})
assertEq(err, nil, "could not write first row")
debug("[c2] Wrote row")
err = c2Writer.commitTx()
assert(err != nil, "concurrent commit must fail")
debug("[c2] tx not committed")
}
Try it out:
$ go mod init otf
$ go mod tidy
$ go test -run TestConcurrentTableWriters -- --debug
[DEBUG] [c2] new tx
[DEBUG] [c1] new tx
[DEBUG] [c1] Created table
[DEBUG] [c1] Wrote row
[DEBUG] [c1] Wrote row
[DEBUG] [c1] Committed tx
[DEBUG] [c2] Created table
[DEBUG] [c2] Wrote row
[DEBUG] [c2] tx not committed
PASS
ok otf 0.311s
That's pretty cool.
And what about a reader and concurrent writer? Observe that the reader always reads a snapshot. Follow the comments again for detail:
func TestConcurrentReaderWithWriterReadsSnapshot(t *testing.T) {
dir, err := os.MkdirTemp("", "test-database")
if err != nil {
panic(err)
}
	defer os.RemoveAll(dir)
fos := newFileObjectStorage(dir)
c1Writer := newClient(fos)
c2Reader := newClient(fos)
// First create some data and commit the transaction.
err = c1Writer.newTx()
assertEq(err, nil, "could not start first c1 tx")
debug("[c1Writer] Started tx")
err = c1Writer.createTable("x", []string{"a", "b"})
assertEq(err, nil, "could not create x")
debug("[c1Writer] Created table")
err = c1Writer.writeRow("x", []any{"Joey", 1})
assertEq(err, nil, "could not write first row")
debug("[c1Writer] Wrote row")
err = c1Writer.writeRow("x", []any{"Yue", 2})
assertEq(err, nil, "could not write second row")
debug("[c1Writer] Wrote row")
err = c1Writer.commitTx()
assertEq(err, nil, "could not commit tx")
debug("[c1Writer] Committed tx")
// Now start a new transaction for more edits.
err = c1Writer.newTx()
assertEq(err, nil, "could not start second c1 tx")
debug("[c1Writer] Starting new write tx")
// Before we commit this second write-transaction, start a
// read transaction.
err = c2Reader.newTx()
assertEq(err, nil, "could not start c2 tx")
debug("[c2Reader] Started tx")
// Write and commit rows in c1.
err = c1Writer.writeRow("x", []any{"Ada", 3})
assertEq(err, nil, "could not write third row")
debug("[c1Writer] Wrote third row")
// Scan x in read-only transaction
it, err := c2Reader.scan("x")
assertEq(err, nil, "could not scan x")
debug("[c2Reader] Started scanning")
seen := 0
for {
row, err := it.next()
assertEq(err, nil, "could not iterate x scan")
if row == nil {
debug("[c2Reader] Done scanning")
break
}
debug("[c2Reader] Got row in reader tx", row)
if seen == 0 {
assertEq(row[0], "Joey", "row mismatch in c1")
assertEq(row[1], 1.0, "row mismatch in c1")
} else {
assertEq(row[0], "Yue", "row mismatch in c1")
assertEq(row[1], 2.0, "row mismatch in c1")
}
seen++
}
assertEq(seen, 2, "expected two rows")
// Scan x in c1 write transaction
it, err = c1Writer.scan("x")
assertEq(err, nil, "could not scan x in c1")
debug("[c1Writer] Started scanning")
seen = 0
for {
row, err := it.next()
assertEq(err, nil, "could not iterate x scan in c1")
if row == nil {
debug("[c1Writer] Done scanning")
break
}
debug("[c1Writer] Got row in tx", row)
if seen == 0 {
assertEq(row[0], "Ada", "row mismatch in c1")
// Since this hasn't been serialized to JSON, it's still an int not a float.
assertEq(row[1], 3, "row mismatch in c1")
} else if seen == 1 {
assertEq(row[0], "Joey", "row mismatch in c1")
assertEq(row[1], 1.0, "row mismatch in c1")
} else {
assertEq(row[0], "Yue", "row mismatch in c1")
assertEq(row[1], 2.0, "row mismatch in c1")
}
seen++
}
assertEq(seen, 3, "expected three rows")
// Writer committing should succeed.
err = c1Writer.commitTx()
assertEq(err, nil, "could not commit second tx")
debug("[c1Writer] Committed tx")
// Reader committing should succeed.
err = c2Reader.commitTx()
assertEq(err, nil, "could not commit read-only tx")
debug("[c2Reader] Committed tx")
}
Run it:
$ go test -run TestConcurrentReaderWithWriterReadsSnapshot -- --debug
[DEBUG] [c1Writer] Started tx
[DEBUG] [c1Writer] Created table
[DEBUG] [c1Writer] Wrote row
[DEBUG] [c1Writer] Wrote row
[DEBUG] [c1Writer] Committed tx
[DEBUG] [c1Writer] Starting new write tx
[DEBUG] [c2Reader] Started tx
[DEBUG] [c1Writer] Wrote third row
[DEBUG] [c2Reader] Started scanning
[DEBUG] [c2Reader] Got row in reader tx [Joey 1]
[DEBUG] [c2Reader] Got row in reader tx [Yue 2]
[DEBUG] [c2Reader] Done scanning
[DEBUG] [c1Writer] Started scanning
[DEBUG] [c1Writer] Got row in tx [Ada 3]
[DEBUG] [c1Writer] Got row in tx [Joey 1]
[DEBUG] [c1Writer] Got row in tx [Yue 2]
[DEBUG] [c1Writer] Done scanning
[DEBUG] [c1Writer] Committed tx
[DEBUG] [c2Reader] Committed tx
PASS
ok otf 0.252s
Sweet.
What's next?
As mentioned, we didn't touch a lot of things. Handling updates and deletes, transaction log checkpoints, data object compaction, etc.
Take a close look at the Delta Lake paper and the Delta Lake Spec and see what you can do!