概述

SSDB 使用 LevelDB 存储数据,读写数据直接使用 LevelDB 接口实现。在网络时间处理方面使用读/写事件线程池提高处理性能,优先使用 epoll 作为事件处理器,否则降级为 select。

一 网络事件处理

在 SSDB 中将事件抽象为 Fdevent 类型,时间处理抽象为 Fdevents。在 Fdevents 中使用 vector 保存需要监听的全部 Fdevent 事件(events 中保存)、以及已经触发的 SSDB 指令事件(ready_events 中保存)。

事件处理循环读起来有些混乱,在整个循环的前部分将需要处理的事件放入 ready_events 中,后半部分将 ready_events 中的事件匹配对应的 Redis 指令处理函数,并将其放入读/写事件处理线程池中。在线程池中,会将指令的处理结果发送给客户端

  • [ssdb]/src/net/server.cpp:serve()

    事件处理主体

  • [ssdb]/src/net/server.cpp:serve()

    const Request *req = link->recv();:指令解析

    int result=this->proc(job);:将解析出来的指令作为任务添加到线程池中

    job->cmd = proc_map.get_proc(req->at(0));:设置指令对应的处理函数指针

二 指令处理

在读/写线程池中,实际的指令处理函数为 int ProcWorker::proc(ProcJob *job)。所有的函数指针在 SSDBServer 对象创建阶段进行注册(通过 reg_procs 函数进行注册)。根据注册函数表可以找到对应的处理函数,例如注册 REG_PROC(get, "rt") 找到处理函数为 proc_get,即可以跟踪指令处理。

1. set 指令处理

函数调用顺序:proc_set() => serv->ssdb->set() => binlogs->Put()/add_log()/commit()
从上面的调用流程可以看出数据先保存在 binlog 中(LevelDB)实现。

2. get 指令处理

函数调用顺序:proc_get() => serv->ssdb->get() => ldb->Get()
为什么在 set 指令中将数据保存在 binlog 中,而 get 指令却在 ldb 中进行数据查找?其实 binlog 底层使用 leveldb 进行数据存储,其使用的数据文件即 SSDBImpl::ldb 成员。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
SSDB* SSDB::open(const Options &opt, const std::string &dir){
SSDBImpl *ssdb = new SSDBImpl();
ssdb->options.max_file_size = 32 * 1048576; // leveldb 1.20
ssdb->options.create_if_missing = true;
ssdb->options.max_open_files = opt.max_open_files;
ssdb->options.filter_policy = leveldb::NewBloomFilterPolicy(10);
ssdb->options.block_cache = leveldb::NewLRUCache(opt.cache_size * 1048576);
ssdb->options.block_size = opt.block_size * 1024;
ssdb->options.write_buffer_size = opt.write_buffer_size * 1024 * 1024;
ssdb->options.compaction_speed = opt.compaction_speed;
if(opt.compression == "yes"){
ssdb->options.compression = leveldb::kSnappyCompression;
}else{
ssdb->options.compression = leveldb::kNoCompression;
}

leveldb::Status status;

status = leveldb::DB::Open(ssdb->options, dir, &ssdb->ldb);
if(!status.ok()){
log_error("open db failed: %s", status.ToString().c_str());
goto err;
}
// binlogs 底层使用 ldb 存储数据
ssdb->binlogs = new BinlogQueue(ssdb->ldb, opt.binlog, opt.binlog_capacity);

return ssdb;
err:
if(ssdb){
delete ssdb;
}
return NULL;
}