etcd 如何用 bbolt 存储数据

一点小感悟

最近再看一本稍微有点古老的书:《Linux 内核源代码情景分析》。之所以说古老是因为该书出版年份已久(18 年前的书),而且分析 2.4.0 版本的内核如今早已失去实用价值(不过内核很多基础设施几乎还是那个大的架子),但是该书比较有价值的是分析的方法:情景分析。所谓情景分析就是:针对一个具体的场景来剖析代码实现。其实说白了,就是读源码必须带着问题来读。如果不带着问题读代码,容易深陷繁琐细节的泥潭而无法跳出(相信这也是大多数人遇到的苦恼之一)。所以,当我们在阅读某个项目源码的时候,不妨带着几个问题去读,先给问题找到答案,然后再顺藤摸瓜,思考背后设计的原理,最后首尾呼应,一通百通

根据我的经验:大多数项目的基本设计思路都可以用自然语言简洁地总结出来,但落实到具体的代码就可能有很多细节,毕竟现实编码中必须考虑语言特性、边界条件、系统整合等各种因素,不可能做到像自然语言描述一样有较强的概括性(与之带来的就是精确性不够)。

我觉得:阅读源码,其实就是将现实中的代码映射成自然语言的过程。当你可以将几百行、几千行甚至几万行的代码用简洁的、高度概括的和清晰的自然语言描述出来,甚至也能够让你身边的「小黄鸭」听懂时,那么你就离真正理解项目的目标不远了。当然,这相当考验工程师的功力

几个 etcd 存储的问题

承接上文,给自己提这么几个问题:

  • etcd 是如何用 bbolt 来存储 key-value
  • etcd 如何保证数据读写的事务性

希望本文能够顺利回答这几个问题(本文代码基于 etcd 3.1.11 来分析,可能跟最新版本有所出入,但相信大逻辑应该变化不多)。阅读前请注意,本文为了减少心智负担,很多数据结构和方法都尽可能做了省略,只保留最核心的逻辑,而不考虑诸如错误处理这一类的分支逻辑

核心逻辑分析

核心代码均在 etcd/mvcc/backend/backend.goetcd/mvcc/backend/batch_tx.go

关键数据结构

对于用户层,看到的 backend 存储都必须符合 Backend 接口:

1
2
3
4
5
type Backend interface {
// 创建一个批量事务
BatchTx() BatchTx
...
}

其中 BatchTx 也是一个接口:

1
2
3
4
5
6
7
8
9
10
11
12
type BatchTx interface {
Lock()
Unlock()
UnsafeCreateBucket(name []byte)
UnsafePut(bucketName []byte, key []byte, value []byte)
UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
UnsafeDelete(bucketName []byte, key []byte)
UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
Commit()
CommitAndStop()
}

也就是说,对于更上层的应用(比如 etcd/mvcc/backend/kvstore.go),它们所使用的后端存储都是 Backend 类型,而存储类型支持的读写方法都通过 BatchTx 接口暴露。

仔细观察 BatchTx 类型,所有 Unsafe* 就是基于 bbolt 的读写接口的 wrapper,之所以叫 Unsafe,这是因为直接调用该接口结束后事务不一定能立刻提交,而是异步提交,所以此时数据并未真正落盘,仍处于不安全的状态。这些读写接口的第一个参数都是 bucketName。etcd 中的所有 key 都保存在名为 key 的 bucket 中,元数据则保存在名为 meta 的 bucket(即两个 bucket)。

UnsafePut()UnsafeSeqPut 区别只在于是否指定顺序写。如果是顺序写,则将 bbolt 中 bucket 的填充率(fill percent)设置为 90%,这在大部分都是 append-only 的操作中可有效暂缓 page 的分裂并减少存储空间(这部分细节后面专门聊 B+ Tree 和 bbolt 的时候再说,可以认为标记是否为顺序写可有效提升性能)。

Backend 接口由 backend 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type backend struct {
...
// 从启动到目前为止,已经提交的事务数
commits int64

mu sync.RWMutex

// BoltDB 实例
db *bolt.DB

// 提交批量读写事务的间隔时间,默认是 100ms
batchInterval time.Duration

// 每个批量读写事务能包含的最多的操作个数
// 当超过这个阈值之后,当前批量读写事务会自动提交
// 默认是 10000
batchLimit int

// BatchTx 接口的实现者
batchTx *batchTx
...
}

BatchTx 接口由 batchTx 实现:

1
2
3
4
5
6
7
8
9
10
11
12
type batchTx struct {
sync.Mutex

// bbolt 的事务
tx *bolt.Tx

// backend 对象
backend *backend

// 当前事务中执行的修改操作数,在当前读写事务提交时,该值将被重置为 0
pending int
}

从这里可以看到,两个具体的对象,是彼此都有对方的引用的。

初始化过程

上层会先用 Backend 的初始化接口创建 backend 对象,在创建 backend 对象的同时,batchTx 作为内部字段也将被创建,此时,我们就拥有了 backendbatchTx 的实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func newBackend(path string, d time.Duration, limit int) *backend {
// 调用 bbolt 的 Open 接口来打开一个数据库
db, err := bolt.Open(path, 0600, boltOpenOptions)
...

// d 是批量事务提交时间
b := &backend{
db: db,
batchInterval: d,
batchLimit: limit,
...
}

// 传入 backend,构造 batchTx 结构
b.batchTx = newBatchTx(b)

// 启动一个单独的 goroutine
// 定时提交当前的批量读写事务,并开启新的批量读写事务
go b.run()
return b
}

初始化的过程很简单,不过有几点需要注意

  • 调用 bolt.Open() 的时候所用的 boltOpenOptions

    在 Linux 场景下,boltOpenOptions 为:

    1
    2
    3
    4
    var boltOpenOptions = &bolt.Options{
    MmapFlags: syscall.MAP_POPULATE,
    InitialMmapSize: int(InitialMmapSize),
    }

    其中关注一下 MAP_POPULATE 这个 mmap() 的标志。MAP_POPULATE 是 linux 2.6.23 之后的一个特性:在文件映射的场景上对文件执行一个超前读取。这意味着后续对映射内容的访问不会因 page fault 而发生阻塞(即提前 load 到内存避免访问不到后产生缺页中断)。这是一个性能优化选项,如果内核不支持的话,将默认忽略。

  • b.run() 启动一个后台提交事务的 goroutine;

    可见下文分析。

异步批量提交事务

etcd 直接用 bolt.Tx手动控制事务的提交。其具体逻辑是:

  1. 执行 BatchTx 接口时候必须先获取锁 BatchTx.Lock()BatchTx.UnLock()
  2. 每次写操作都累积一个操作数,当累积的操作数量达到一定阈值的时候,执行 commit 动作提交一个事务,这动作发生在 BatchTx.UnLock() 阶段;
  3. 每间隔一段时间(默认是 100ms)批量提交事务。这部分逻辑是启动一个 goroutine 进行的,所以不会阻塞主逻辑,达到异步化的目的;

这么做的好处是显而易见:batch 化和异步操作来降低磁盘 IO 压力

每次进行写操作

  • BatchTx.UnsafeCreateBucket()
  • BatchTx.UnsafePut()
  • BatchTx.UnsafeSeqPut()
  • BatchTx.UnsafeDelete()

的时候,都会将 batchTx.pending 自增 1。batchTx.pending 代表了当前累积的操作数量,当 batchTx.pending 达到一定数值时,BatchTx.UnLock() 将触发 commit 动作(之所以要在 Unlock() 逻辑这是因为操作 BatchTx 要先获取锁,而操作结束后最后动作就是 Unlock,在释放锁的时候检查操作数量是否达到阈值从而达到每次都检查操作数量的目的):

1
2
3
4
5
6
7
8
func (t *batchTx) Unlock() {
// 检测当前事务的修改操作数量是否达到上限,如果是则提交事务,并重置 pending
if t.pending >= t.backend.batchLimit {
t.commit(false)
t.pending = 0
}
t.Mutex.Unlock()
}

定期执行 commit 动作的 goroutine 逻辑为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (b *backend) run() {
// 执行 run() 的协程停止后关闭 b.donec
// 关闭 b.done.c 将制造出一个关闭信号用以关闭数据库
defer close(b.donec)

// 创建 timer
t := time.NewTimer(b.batchInterval)

// 执行 run() 的协程停止后关闭 timer,避免资源泄漏
defer t.Stop()

// 主逻辑
for {
select {
// t.C 是一个定时的 timer 信号
// 如果收到 timer 信号
// 将跳出 select,然后在 for-loop 下重新进入 select
case <-t.C:
case <-b.stopc:
b.batchTx.CommitAndStop()
return
}

// 提交批量读写事务
b.batchTx.Commit()

// 重置定时器
t.Reset(b.batchInterval)
}
}

其实很简单:收到定时器信号 -> 执行 commit -> 重置定时器,就这样重复进行。

BatchTx.Commit() 接口的实现

这个接口其实是由 batchTx.commit() 实现,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (t *batchTx) commit(stop bool) {
...

// 如果 tx 不为 nil,即合法 tx
if t.tx != nil {
...

// backend.commits 保存了已经提交的事务数,此处加 1
atomic.AddInt64(&t.backend.commits, 1)

// 操作数量已经全部提交,重置变量
t.pending = 0
...
}

// stop 为 true:不开启新事务
// stop 为 false:创建一个新的事务
if stop {
return
}

// 获取 bbolt 的读锁,创建一个新的读写事务
t.backend.mu.RLock()
defer t.backend.mu.RUnlock()

t.tx, err = t.backend.db.Begin(true)
...

// 将数据库的大小保存在 backend.size 字段中
atomic.StoreInt64(&t.backend.size, t.tx.Size())
}

BatchTx.Commit()BatchTx.CommitAndStop() 都是用 batchTx.commit() 实现,差异就在于前者 stopfalse,后者为 true,即前者将开启一个新的 bbolt 事务,后者则不开启。

总结

通过上面的分析,其实 etcd 对 bbolt 的使用并不复杂(两个接口,两个具体的数据结构和异步批量提交事务)。不过,这种使用是不是有问题呢 ?比如在异步批量提交事务的逻辑中,如果在某个时间窗口,数据未落盘(还没有进行 commit 动作),那么是否会出现数据不一致的情况:写成功但是读出来是错误数据。这其实就是一个很经典的线性一致性读的问题。etcd 通过 ReadIndex 算法来保障线性一致性读,但这个算法在 mvcc 中又是如何体现的呢 ?我们在后续的文章中再聊。

祝大家读得愉快 !