list
list
是最常使用的数据结构之一,由于rocksdb
只能支持kv这种数据类型,那么怎么使用rocksdb
来实现list
的功能?首先想到的就是,节点之间的数据通过特殊的分隔符来分割,比如使用二进制字符\001
来分割,这样实现的方式带来的问题就是,执行每个list
的命令几乎都要先把整个value读取出来,在value上执行完操作再写回硬盘,当list
元素较多的时候,效率会非常差。nemo
针对这种情况做了优化,每一个list
类型的key分为两种类型数据:meta
数据和node
数据。meta
数据标识了当前key有几个节点、标识头结点和尾节点的数据。node
数据只要存储每个元素的数据就可以,由于list
有顺序关系,所以也要存取前面和后面节点的位置信息。这样做的好处就是,大部分命令都不需要读取完整的节点数据就可以实现,减少了硬盘读写的数据量。
缺点就是几乎执行每个命令之前都需要先读取meta
数据,再访问node
数据,会增加磁盘的读写次数。
数据格式
下面四个表格说明了用来标识meta
和node
数据的key&value.
meta |
原始key字符串 |
L(1) |
key(var) |
由四部分组成,分别是链表长度
,链表首部序号
,链表尾部序号
,下一次可用序号
。
链表长度
这个字段可以在不用遍历所有node
节点就可以直接获取到链表长度
链表首部序号
这个字段可以直接获取到链表的第一个节点的编号,当需要在链表首部插入节点的时候,直接新建节点并将next指向原来的首部节点
链表尾部序号
和链表首部序号
作用类似,在需要用到链表尾部节点的时候直接定位
下一次可用序号
,这个数字是自增的,用来唯一标识一个节点,每次新建节点的时候都会增加
链表长度 |
链表首部序号 |
链表尾部序号 |
下一次可用序号 |
len(8) |
left(8) |
right(8) |
cur_seq(8) |
meta |
key长度 |
key字符串 |
序号 |
l(1) |
size(1) |
key(var) |
seq(8) |
前面序号 |
后面序号 |
value字符串 |
priv(8) |
next(8) |
value(var) |
举例
运行下面三个命令之后,会新建3个节点出来:
rpush test 123
rpush test 456
lpush test 789
meta |
原始key字符串 |
L(1) |
key(var) |
L |
test |
链表长度 |
链表首部序号 |
链表尾部序号 |
下一次可用序号 |
len(8) |
left(8) |
right(8) |
cur_seq(8) |
1 |
1 |
1 |
2 |
2 |
1 |
2 |
3 |
3 |
3 |
2 |
4 |
meta |
key长度 |
key字符串 |
seq |
l(1) |
size(1) |
key(var) |
seq(8) |
l |
4 |
test |
1 |
l |
4 |
test |
2 |
l |
4 |
test |
3 |
前面序号 |
后面序号 |
value字符串 |
priv(8) |
next(8) |
value(var) |
0 |
0 |
123 |
前面序号 |
后面序号 |
value字符串 |
priv(8) |
next(8) |
value(var) |
0 |
2 |
123 |
1 |
0 |
456 |
前面序号 |
后面序号 |
value字符串 |
priv(8) |
next(8) |
value(var) |
4 |
2 |
123 |
1 |
0 |
456 |
0 |
1 |
789 |
meta |
原始key字符串 |
L(1) |
key(var) |
//编码ListMetaKey
inline std::string EncodeLMetaKey(const rocksdb::Slice &key) {
std::string buf;
buf.append(1, DataType::kLMeta);
buf.append(key.data(), key.size());
return buf;
}
//解码ListMetaKey
inline int DecodeLMetaKey(const rocksdb::Slice &slice, std::string *key) {
Decoder decoder(slice.data(), slice.size());
if (decoder.Skip(1) == -1) {
return -1;
}
if (decoder.ReadData(key) == -1) {
return -1;
}
return 0;
}
链表长度 |
链表首部序号 |
链表尾部序号 |
下一次可用序号 |
len(8) |
left(8) |
right(8) |
cur_seq(8) |
struct ListMeta : public NemoMeta {
int64_t len;
int64_t left;
int64_t right;
int64_t cur_seq;
ListMeta() : len(0), left(0), right(0), cur_seq(1) {}
ListMeta(int64_t _len, int64_t _left, int64_t _right, int64_t cseq)
: len(_len), left(_left), right(_right), cur_seq(cseq) {}
virtual bool DecodeFrom(const std::string& raw_meta);
virtual bool EncodeTo(std::string& raw_meta);
virtual std::string ToString();
};
//解码函数
bool ListMeta::DecodeFrom(const std::string &meta_val) {
if (meta_val.size() != sizeof(int64_t) * 4) {
return false;
}
len = *((int64_t *)(meta_val.data()));
left = *((int64_t *)(meta_val.data() + sizeof(int64_t)));
right = *((int64_t *)(meta_val.data() + sizeof(int64_t) * 2));
cur_seq = *((int64_t *)(meta_val.data() + sizeof(int64_t) * 3));
return true;
}
//编码函数
bool ListMeta::EncodeTo(std::string& meta_val) {
meta_val.clear();
meta_val.append((char *)&len, sizeof(int64_t));
meta_val.append((char *)&left, sizeof(int64_t));
meta_val.append((char *)&right, sizeof(int64_t));
meta_val.append((char *)&cur_seq, sizeof(int64_t));
return true;
}
List data key
meta |
key长度 |
key字符串 |
序号 |
l(1) |
size(1) |
key(var) |
seq(8) |
inline std::string EncodeListKey(const rocksdb::Slice &key, const int64_t seq) {
std::string buf;
buf.append(1, DataType::kList);
buf.append(1, (uint8_t)key.size());
buf.append(key.data(), key.size());
buf.append((char *)&seq, sizeof(int64_t));
return buf;
}
inline int DecodeListKey(const rocksdb::Slice &slice, std::string *key, int64_t *seq) {
Decoder decoder(slice.data(), slice.size());
if (decoder.Skip(1) == -1) {
return -1;
}
if (decoder.ReadLenData(key) == -1) {
return -1;
}
if (decoder.ReadInt64(seq) == -1) {
return -1;
}
return 0;
}
List data value
前面序号 |
后面序号 |
value字符串 |
priv(8) |
next(8) |
value(var) |
inline void EncodeListVal(const std::string &raw_val, const int64_t priv, const int64_t next, std::string &en_val) {
en_val.clear();
en_val.append((char *)&priv, sizeof(int64_t));
en_val.append((char *)&next, sizeof(int64_t));
en_val.append(raw_val.data(), raw_val.size());
}
inline void DecodeListVal(const std::string &en_val, int64_t *priv, int64_t *next, std::string &raw_val) {
raw_val.clear();
*priv = *((int64_t *)en_val.data());
*next = *((int64_t *)(en_val.data() + sizeof(int64_t)));
raw_val = en_val.substr(sizeof(int64_t) * 2, en_val.size() - sizeof(int64_t) * 2);
}
代码实现
LIndex
- 功能:获取第N个节点的值
实现:
加行锁
根据传入的key,编码出list meta key
,调用Get
接口得到list meta value
,解码出来得到链表长度
,链表首部序号
,链表尾部序号
,下一次可用序号
判断传入的index是否合法,不合法返回错误
- index大于0
- 从头节点开始遍历,直到遇到这个节点的直接前缀节点停止,得到当前节点的
cur编号
(注意:这里的cur编号
不是序号)
- 根据
cur编号
就可以编码出list data key
,调用Get
得到list data value
- 解码出
list data value
中的value
返回
- index小于0
- 从尾节点开始遍历,直到遇到这个节点的直接前缀节点停止,得到当前节点的
cur编号
(注意:这里的cur编号
不是序号)
- 根据
cur编号
就可以编码出list data key
,调用Get
得到list data value
- 解码出
list data value
中的value
返回
Status Nemo::LIndex(const std::string &key, const int64_t index, std::string *val) {
Status s;
rocksdb::WriteBatch batch;
std::string meta_val;
ListMeta meta;
RecordLock l(&mutex_list_record_, key);
int64_t priv;
int64_t cur;
int64_t next;
std::string en_val;
std::string meta_key = EncodeLMetaKey(key);
s = list_db_->Get(rocksdb::ReadOptions(), meta_key, &meta_val);
if (s.ok()) {
if (meta.DecodeFrom(meta_val)) {
if (meta.len <= 0) {
return Status::NotFound("not found the key");
}
if (index >= meta.len || -index > meta.len ) {
return Status::Corruption("index out of range");
}
if (index >= 0) {
if (L2R(key, index, meta.left, &priv, &cur, &next) != 0) {
return Status::Corruption("error in iterate");
}
} else {
if (R2L(key, -index-1, meta.right, &priv, &cur, &next) != 0) {
return Status::Corruption("error in iterate");
}
}
std::string db_key = EncodeListKey(key, cur);
s = list_db_->Get(rocksdb::ReadOptions(), db_key, &en_val);
DecodeListVal(en_val, &priv, &next, *val);
return s;
} else {
return Status::Corruption("parse listmeta error");
}
} else if (s.IsNotFound()) {
return Status::NotFound("not found the key");
} else {
return Status::Corruption("get listmeta error");
}
}
LLen
- 功能:返回列表key的长度。
- 实现:
- 直接获取
list meta key
对应的list meta value
,把其中的链表长度解析出来返回即可。
Status Nemo::LLen(const std::string &key, int64_t *llen) {
Status s;
std::string meta_key = EncodeLMetaKey(key);
std::string meta_val;
s = list_db_->Get(rocksdb::ReadOptions(), meta_key, &meta_val);
if (s.ok()) {
if(meta_val.size() != sizeof(int64_t) * 4) {
return Status::Corruption("list meta error");
}
*llen = *((int64_t *)(meta_val.data()));
if (*llen <= 0) {
return Status::NotFound("not found the key");
}
} else {
*llen = 0;
}
return s;
}
LPush
- 功能:将一个value插入到列表key的表尾(最左边)。
- 实现:
- 加行锁
- 根据传入的key,编码出
list meta key
,调用Get
接口得到list meta value
,解码出来得到链表长度
,链表首部序号
,链表尾部序号
,下一次可用序号
- 判断
链表首部序号
是否大于0
- 如果大于0,说明当前链表有节点,则需要对首部节点的值进行修改,需要修改这个节点的直接前缀节点编号,因为要在它的前面插入节点
- 编码新节点的
list data key
- 新节点的key中使用的编号是
list meta value
中的序号
- 编码新节点的
list data value
- 新节点的value中
前面序号
就是0, 后面序号
就是当前首节点编号,这样链表的链接关系就生成了
- 修改
list meta value
链表长度
,链表首部序号
,链表尾部序号
,下一次可用序号
Status Nemo::LPush(const std::string &key, const std::string &val, int64_t *llen) {
if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) {
return Status::InvalidArgument("Invalid key length");
}
Status s;
rocksdb::WriteBatch batch;
ListMeta meta;
std::string meta_val;
int64_t priv;
int64_t next;
std::string en_val;
std::string raw_val;
std::string meta_key = EncodeLMetaKey(key);
RecordLock l(&mutex_list_record_, key);
s = list_db_->Get(rocksdb::ReadOptions(), meta_key, &meta_val);
if (s.ok()) {
if (meta.DecodeFrom(meta_val)) {
if (meta.left > 0) {
std::string l_key = EncodeListKey(key, meta.left);
s = list_db_->Get(rocksdb::ReadOptions(), l_key, &en_val);
if (!s.ok()) {
return Status::Corruption("get left error");
}
DecodeListVal(en_val, &priv, &next, raw_val);
EncodeListVal(raw_val, meta.cur_seq, next, en_val);
batch.Put(l_key, en_val);
}
std::string db_key = EncodeListKey(key, meta.cur_seq);
EncodeListVal(val, 0, meta.left, en_val);
batch.Put(db_key, en_val);
meta.len++;
meta.left = meta.cur_seq;
if (meta.right == 0) {
meta.right = meta.cur_seq;
}
meta.cur_seq++;
meta.EncodeTo(meta_val);
batch.Put(meta_key, meta_val);
s = list_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &batch);
*llen = meta.len;
return s;
} else {
return Status::Corruption("parse listmeta error");
}
} else if (s.IsNotFound()) {
ListMeta meta(1, 1, 1, 2); // | len | left | right | cur_seq |
meta_val.reserve(4 * sizeof(int64_t));
meta.EncodeTo(meta_val);
batch.Put(meta_key, meta_val);
EncodeListVal(val, 0, 0, en_val);
batch.Put(EncodeListKey(key, 1), en_val);
s = list_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &batch);
*llen = 1;
return s;
} else {
return Status::Corruption("get listmeta error");
}
}
LPop
- 功能:取出链表头部节点
- 实现:
- 加行锁
- 根据传入的key,编码出
list meta key
,调用Get
接口得到list meta value
,解码出来得到链表长度
,链表首部序号
,链表尾部序号
,下一次可用序号
- 根据
list meta value
中的首节点编号,编码list data key
,通过Get
接口得到首节点list data value
。
- 修改首节点的直接后继节点信息,设置返回值
- 修改
list meta value
- 调用
WriteWithOldKeyTTL
写回硬盘
Status Nemo::LPop(const std::string &key, std::string *val) {
if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) {
return Status::InvalidArgument("Invalid key length");
}
Status s;
rocksdb::WriteBatch batch;
ListMeta meta;
std::string meta_val;
int64_t priv = 0;
int64_t next = 0;
std::string en_val;
std::string raw_val;
std::string meta_key = EncodeLMetaKey(key);
RecordLock l(&mutex_list_record_, key);
s = list_db_->Get(rocksdb::ReadOptions(), meta_key, &meta_val);
if (s.ok()) {
if (meta.DecodeFrom(meta_val)) {
if (meta.len <= 0) {
return Status::NotFound("not found key");
}
std::string db_key = EncodeListKey(key, meta.left);
s = list_db_->Get(rocksdb::ReadOptions(), db_key, &en_val);
if (!s.ok()) {
return Status::Corruption("get meta.left error");
}
DecodeListVal(en_val, &priv, &next, *val);
meta.left = next;
if (next != 0) {
std::string l_key = EncodeListKey(key, next);
s = list_db_->Get(rocksdb::ReadOptions(), l_key, &en_val);
if (!s.ok()) {
return Status::Corruption("get next left error");
}
DecodeListVal(en_val, &priv, &next, raw_val);
EncodeListVal(raw_val, 0, next, en_val);
batch.Put(l_key, en_val);
}
--meta.len;
if (meta.len == 0) {
meta.right = 0;
meta.cur_seq = 1;
}
meta.EncodeTo(meta_val);
batch.Put(meta_key, meta_val);
batch.Delete(db_key);
s = list_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &batch);
return s;
} else {
return Status::Corruption("parse listmeta error");
}
} else if (s.IsNotFound()) {
return Status::NotFound("not found key");
} else {
return Status::Corruption("get listmeta error");
}
}
LPushx
- 功能:将值value插入到列表key的表头,当且仅当key存在并且是一个列表。
- 实现:
- 直接获取
list meta key
对应的list meta value
,把其中的链表长度解析出来判断是不是空链表
- 如果不是空链表,直接调用
LPush
命令
Status Nemo::LPushx(const std::string &key, const std::string &val, int64_t *llen) {
if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) {
return Status::InvalidArgument("Invalid key length");
}
Status s;
ListMeta meta;
std::string meta_val;
std::string meta_key = EncodeLMetaKey(key);
s = list_db_->Get(rocksdb::ReadOptions(), meta_key, &meta_val);
if (s.ok()) {
if (meta.DecodeFrom(meta_val)) {
if (meta.len == 0) {
*llen = 0;
return Status::NotFound("not found the key");
} else if (meta.len > 0) {
s = LPush(key, val, llen);
return s;
} else {
return Status::Corruption("get invalid listlen");
}
} else {
return Status::Corruption("parse listmeta error");
}
} else if (s.IsNotFound()) {
*llen = 0;
return Status::NotFound("not found the key");
} else {
return Status::Corruption("get listmeta error");
}
}
LRange
- 功能:返回列表key中指定区间内的元素,区间以偏移量start和stop指定,下标(index)参数start和stop都以0为底,也就是说,以0表示列表的第一个元素,以1表示列表的第二个元素,以此类推。你也可以使用负数下标,以-1表示列表的最后一个元素,-2表示列表的倒数第二个元素,以此类推。
- 实现:
TODO
LSet
- 功能:将列表key下标为index的元素的值设置为value。
- 实现:
TODO
LTrim
- 功能:对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区间之内的元素都将被删除。
- 实现:
TODO
RPush
- 功能:将一个value插入到列表key的表尾(最右边)。
- 实现:
TODO
RPop
- 功能:移除并返回列表key的尾元素
- 实现:
TODO
RPushx
- 功能:将值value插入到列表key的表尾,当且仅当key存在并且是一个列表。
- 实现:
TODO
RPopLPush
- 功能:命令 RPOPLPUSH 在一个原子时间内,执行以下两个动作:
- 将列表source中的最后一个元素(尾元素)弹出,并返回给客户端。
- 将source弹出的元素插入到列表destination ,作为destination列表的的头元素。
- 实现:
TODO
LInsert
- 功能:将值value插入到链表key当中,位于值特定value位置之前或之后。
- 实现:
- 加行锁
- 根据传入的key,编码出
list meta key
,调用Get
接口得到list meta value
,解码出来得到链表长度
,链表首部序号
,链表尾部序号
,下一次可用序号
- 从首节点开始遍历,直到遍历完链表尾部或者节点的数据和传入的节点value值相等
- 如果需要插入节点value之前
- 获取插入节点直接前缀节点,并修改其后面节点编号
- 修改当前节点的前面节点编号
- 如果需要插入节点value之后
- 获取插入节点直接后继节点,并修改其前面节点编号
- 修改当前节点后面节点编号
- 新建节点,设置正确的前面节点、后面节点编号、value值
WriteWithOldKeyTTL
批量写入
Status Nemo::LInsert(const std::string &key, Position pos, const std::string &pivot, const std::string &val, int64_t *llen) {
if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) {
return Status::InvalidArgument("Invalid key length");
}
Status s;
rocksdb::WriteBatch batch;
ListMeta meta;
std::string meta_val;
int64_t before_seq;
int64_t after_seq;
int64_t priv;
int64_t next;
std::string en_val;
std::string raw_val;
std::string meta_key = EncodeLMetaKey(key);
//MutexLock l(&mutex_list_);
RecordLock l(&mutex_list_record_, key);
s = list_db_->Get(rocksdb::ReadOptions(), meta_key, &meta_val);
if (s.ok()) {
if (meta.DecodeFrom(meta_val)) {
// traverse to find pivot
next = meta.left;
int64_t tmp_seq;
do {
tmp_seq = next;
std::string cur_listkey = EncodeListKey(key, tmp_seq);
s = list_db_->Get(rocksdb::ReadOptions(), cur_listkey, &en_val);
if (!s.ok()) {
return Status::Corruption("get listkey error");
}
DecodeListVal(en_val, &priv, &next, raw_val);
} while (next != 0 && raw_val != pivot);
if (raw_val == pivot) {
if (pos == AFTER) {
before_seq = tmp_seq;
after_seq = next;
} else {
before_seq = priv;
after_seq = tmp_seq;
}
// modify before node
if (before_seq != 0) {
std::string cur_listkey = EncodeListKey(key, before_seq);
s = list_db_->Get(rocksdb::ReadOptions(), cur_listkey, &en_val);
if (!s.ok()) {
return Status::Corruption("get listkey error");
}
DecodeListVal(en_val, &priv, &next, raw_val);
EncodeListVal(raw_val, priv, meta.cur_seq, en_val);
batch.Put(cur_listkey, en_val);
}
// modify after node
if (after_seq != 0) {
std::string cur_listkey = EncodeListKey(key, after_seq);
s = list_db_->Get(rocksdb::ReadOptions(), cur_listkey, &en_val);
if (!s.ok()) {
return Status::Corruption("get listkey error");
}
DecodeListVal(en_val, &priv, &next, raw_val);
EncodeListVal(raw_val, meta.cur_seq, next, en_val);
batch.Put(cur_listkey, en_val);
}
// add new node
std::string add_key = EncodeListKey(key, meta.cur_seq);
EncodeListVal(val, before_seq, after_seq, en_val);
batch.Put(add_key, en_val);
meta.len++;
if (before_seq == 0) {
meta.left = meta.cur_seq;
}
if (after_seq == 0) {
meta.right = meta.cur_seq;
}
++meta.cur_seq;
meta.EncodeTo(meta_val);
batch.Put(meta_key, meta_val);
s = list_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &batch);
*llen = meta.len;
return s;
} else {
*llen = -1;
return s;
}
} else {
return Status::Corruption("parse listmeta error");
}
} else if (s.IsNotFound()) {
*llen = 0;
return s;
} else {
*llen = 0;
return Status::Corruption("get key error");
}
}
LRem
- 功能:根据参数 count 的值,移除列表中与参数value相等的元素。
- count 的值可以是以下几种:
- count > 0 : 从表头开始向表尾搜索,移除与value相等的元素,数量为count 。
- count < 0 : 从表尾开始向表头搜索,移除与value相等的元素,数量为count 的绝对值。
- count = 0 : 移除表中所有与 value 相等的值。
- 实现:
TODO