The leveldb library provides a persistent key value store. Keys and values are arbitrary byte arrays. The keys are ordered within the key value store according to a user-specified comparator function.

架构

LevelDB的底层存储使用的是 LSM树,这是一种为了写优化的磁盘存储结构
相比于 传统的 B+树,其写性能会好很多,但是读性能会差不少
整体架构如下

图片来源,这里

主要组件介绍

  • LevelDB对外提供了一套API,应用层可以调用API,包括各种读、写、自定义比较器,各种优化参数等
  • MemTable,新的数据会放到这里,skip-list实现
  • Log,写MemTable之前会先写Log文件,通过append方式追加,完全顺序写入,跟 DB的 WAL 类似
  • Immutable Memtable,当MemTable容量达到上限后,会变成不可变的,后台线程会将其写入成 SST
  • SST,磁盘存储结构,分为 Level0 到 LevelN多层,其中 Level0是由 immutable生成的,其他层是由上一层和本层归并产生的
  • Manifest,记录 SST文件在不同Level的分布,单个SST文件的min、max key等元信息
  • Current,Manifest可能有多个,而LevelDB启动时会找到Current文件,它记录了当前Manifest文件名

读写交互过程

  1. 当 LevelDB 收到一个写入请求 put(k, v) ,会首先将其操作日志追加到日志文件(WAL)中,以备节点意外宕机恢复
  2. 写完 WAL 后,LevelDB 将该条 kv 插入内存中的查找结构:memtable
  3. 在 memtable 积累到一定程度后,会 rotate 为一个只读 memtable,即 immutable memtable;同时生成一个新的 memtable 以供写入
  4. 当内存有压力后,会将 immutable memtable 顺序写入文件系统,生成一个 level0 层的 sstable(sorted strings table) 文件。该过程称为一次 minor compaction
  5. 由于查询操作需要按层次遍历 memtable、immutable 和 sstable。当 sstable 文件生成的越来越多之后,查询性能必然越来越差,因此需要将不同的 sstable 进行归并,称为 major compaction

manifest 版本

  • 每个版本都会记录当前层中所有文件的元数据,如文件大小,min、max key等,还有一些 compaction信息
  • 每次压缩完,都会创建一个新版本,其规则为:versionNew = versionOld + versionEdit
  • versoinEdit 就是基于旧版本基础上,变化的内容
  • manifest 用来记录这些变化信息


图片来源,这里
如上是一个 manifest文件,包含了 3条 versionEdit记录,每条包括

  • 新增哪些 SST
  • 删除哪些 SST
  • 当前 compaction下标
  • 操作 seqNumber信息等

启动时,通过不断apply这些记录,能就得到上一次运行结束后的版本
一个数据库目录中会有多个 manifest 文件,而 current 就指定当前的 manifest文件

主要功能点介绍

基本操作

打开一个数据库

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
#include <cassert>
#include "leveldb/db.h"

leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true;
leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db);
assert(status.ok());
...

// 不存在则报错
options.error_if_exists = true;

// 状态检查
leveldb::Status s = ...;
if (!s.ok()) cerr << s.ToString() << endl;

// 关闭
delete db;

读写

1
2
3
4
std::string value;
leveldb::Status s = db->Get(leveldb::ReadOptions(), key1, &value);
if (s.ok()) s = db->Put(leveldb::WriteOptions(), key2, value);
if (s.ok()) s = db->Delete(leveldb::WriteOptions(), key1);

原子更新

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
#include "leveldb/write_batch.h"
...
std::string value;
leveldb::Status s = db->Get(leveldb::ReadOptions(), key1, &value);
if (s.ok()) {
  leveldb::WriteBatch batch;
  batch.Delete(key1);
  batch.Put(key2, value);
  s = db->Write(leveldb::WriteOptions(), &batch);
}

默认是异步写,也可以转为同步写
调用的是fsyncfdatasyncmsync
即使进程挂了,数据如果写入操作系统缓存了,数据也不会丢

1
2
3
leveldb::WriteOptions write_options;
write_options.sync = true;
db->Put(write_options, ...);

leveldb::DB 保证了原子性,被多个线程读写也是安全的

迭代

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
  cout << it->key().ToString() << ": "  << it->value().ToString() << endl;
}
assert(it->status().ok());  // Check for any errors found during the scan
delete it;

// 范围迭代
for (it->Seek(start);
   it->Valid() && it->key().ToString() < limit;
   it->Next()) {
  ...
}

// 反向迭代
for (it->SeekToLast(); it->Valid(); it->Prev()) {
  ...
}

高级特性

快照

1
2
3
4
5
6
7
leveldb::ReadOptions options;
options.snapshot = db->GetSnapshot();
... apply some updates to db ...
leveldb::Iterator* iter = db->NewIterator(options);
... read using iter to view the state when the snapshot was created ...
delete iter;
db->ReleaseSnapshot(options.snapshot);

Slice,只是对 std::string 的包装,包含 length 和指向外部字节数组的指针,所以Slice操作很轻量

1
2
3
4
leveldb::Slice s1 = "hello";

std::string str("world");
leveldb::Slice s2 = str;

自定义比较器,默认是按照字典排序的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
class TwoPartComparator : public leveldb::Comparator {
 public:
  // Three-way comparison function:
  //   if a < b: negative result
  //   if a > b: positive result
  //   else: zero result
  int Compare(const leveldb::Slice& a, const leveldb::Slice& b) const {
    int a1, a2, b1, b2;
    ParseKey(a, &a1, &a2);
    ParseKey(b, &b1, &b2);
    if (a1 < b1) return -1;
    if (a1 > b1) return +1;
    if (a2 < b2) return -1;
    if (a2 > b2) return +1;
    return 0;
  }

  // Ignore the following methods for now:
  const char* Name() const { return "TwoPartComparator"; }
  void FindShortestSeparator(std::string*, const leveldb::Slice&) const {}
  void FindShortSuccessor(std::string*) const {}
};

使用新的比较器创建数据库

1
2
3
4
5
6
7
TwoPartComparator cmp;
leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true;
options.comparator = &cmp;
leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db);
...

性能

相关配置参数在include/options.h.
BlockSize,默认为 4K,如果是大量 scan可以调大,甚至到 1M,如果都是小的点查,可以调小,比如 1K

压缩,默认是开启的,也可以关闭

1
2
3
leveldb::Options options;
options.compression = leveldb::kNoCompression;
... leveldb::DB::Open(options, name, ...) ....

缓存,内置了一个 LRU

1
2
3
4
5
6
7
8
9
#include "leveldb/cache.h"

leveldb::Options options;
options.block_cache = leveldb::NewLRUCache(100 * 1048576);  // 100MB cache
leveldb::DB* db;
leveldb::DB::Open(options, name, &db);
... use the db ...
delete db
delete options.block_cache;

scan的时候也可以关闭

1
2
3
4
5
6
7
leveldb::ReadOptions options;
options.fill_cache = false;
leveldb::Iterator* it = db->NewIterator(options);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
  ...
}
delete it;

Filter,内置了一个 bloom filter,用于减少读放大

1
2
3
4
5
6
7
leveldb::Options options;
options.filter_policy = NewBloomFilterPolicy(10);
leveldb::DB* db;
leveldb::DB::Open(options, "/tmp/testdb", &db);
... use the database ...
delete db;
delete options.filter_policy;

上面使用了 每个key 10个 bit
如果要自定义比较器,比如去掉后缀,并关联 bloom filter,可以这么写

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
class CustomFilterPolicy : public leveldb::FilterPolicy {
 private:
  leveldb::FilterPolicy* builtin_policy_;

 public:
  CustomFilterPolicy() : builtin_policy_(leveldb::NewBloomFilterPolicy(10)) {}
  ~CustomFilterPolicy() { delete builtin_policy_; }

  const char* Name() const { return "IgnoreTrailingSpacesFilter"; }

  void CreateFilter(const leveldb::Slice* keys, int n, std::string* dst) const {
    // Use builtin bloom filter code after removing trailing spaces
    std::vector<leveldb::Slice> trimmed(n);
    for (int i = 0; i < n; i++) {
      trimmed[i] = RemoveTrailingSpaces(keys[i]);
    }
    builtin_policy_->CreateFilter(trimmed.data(), n, dst);
  }
};

check sum,默认是关闭的,也可以打开

1
ReadOptions::verify_checksums

大致的 size 统计出[a, c)的数量,以及 [x, z) 的数量

1
2
3
4
5
leveldb::Range ranges[2];
ranges[0] = leveldb::Range("a", "c");
ranges[1] = leveldb::Range("x", "z");
uint64_t sizes[2];
db->GetApproximateSizes(ranges, 2, sizes);

兼容性

LevelDB提供了Env,可以兼容不同的环境
比如在一个受限的环境中,需要限制LevelDB,不能让其影响其他活跃应用

1
2
3
4
5
6
7
8
class SlowEnv : public leveldb::Env {
  ... implementation of the Env interface ...
};

SlowEnv env;
leveldb::Options options;
options.env = &env;
Status s = leveldb::DB::Open(options, ...);

移植,可以兼容不同的平台

参考