list

list是最常使用的数据结构之一,由于rocksdb只能支持kv这种数据类型,那么怎么使用rocksdb来实现list的功能?首先想到的就是,节点之间的数据通过特殊的分隔符来分割,比如使用二进制字符\001来分割,这样实现的方式带来的问题就是,执行每个list的命令几乎都要先把整个value读取出来,在value上执行完操作再写回硬盘,当list元素较多的时候,效率会非常差。nemo针对这种情况做了优化,每一个list类型的key分为两种类型数据:meta数据和node数据。meta数据标识了当前key有几个节点、标识头结点和尾节点的数据。node数据只要存储每个元素的数据就可以,由于list有顺序关系,所以也要存取前面和后面节点的位置信息。这样做的好处就是,大部分命令都不需要读取完整的节点数据就可以实现,减少了硬盘读写的数据量。 缺点就是几乎执行每个命令之前都需要先读取meta数据,再访问node数据,会增加磁盘的读写次数。

数据格式

下面四个表格说明了用来标识metanode数据的key&value.

  • List meta key:
meta 原始key字符串
L(1) key(var)
  • List meta value:

由四部分组成,分别是链表长度,链表首部序号,链表尾部序号,下一次可用序号

  1. 链表长度这个字段可以在不用遍历所有node节点就可以直接获取到链表长度
  2. 链表首部序号这个字段可以直接获取到链表的第一个节点的编号,当需要在链表首部插入节点的时候,直接新建节点并将next指向原来的首部节点
  3. 链表尾部序号链表首部序号作用类似,在需要用到链表尾部节点的时候直接定位
  4. 下一次可用序号,这个数字是自增的,用来唯一标识一个节点,每次新建节点的时候都会增加
链表长度 链表首部序号 链表尾部序号 下一次可用序号
len(8) left(8) right(8) cur_seq(8)
  • List data key:
meta key长度 key字符串 序号
l(1) size(1) key(var) seq(8)
  • List data value:

    list的每个需要操作node节点的命令实现的过程中重点关注当前节点的直接前缀节点以及直接后继节点的序号的如何变化,实现的思路就已经很清楚了。

前面序号 后面序号 value字符串
priv(8) next(8) value(var)

举例

运行下面三个命令之后,会新建3个节点出来:

rpush test 123

rpush test 456

lpush test 789

  • List meta key
meta 原始key字符串
L(1) key(var)
L test
  • List meta value
链表长度 链表首部序号 链表尾部序号 下一次可用序号
len(8) left(8) right(8) cur_seq(8)
1 1 1 2
2 1 2 3
3 3 2 4
  • List data key
meta key长度 key字符串 seq
l(1) size(1) key(var) seq(8)
l 4 test 1
l 4 test 2
l 4 test 3
  • List data value
前面序号 后面序号 value字符串
priv(8) next(8) value(var)
0 0 123
前面序号 后面序号 value字符串
priv(8) next(8) value(var)
0 2 123
1 0 456
前面序号 后面序号 value字符串
priv(8) next(8) value(var)
4 2 123
1 0 456
0 1 789

List meta key

meta 原始key字符串
L(1) key(var)
//编码ListMetaKey
inline std::string EncodeLMetaKey(const rocksdb::Slice &key) {
    std::string buf;
    buf.append(1, DataType::kLMeta);
    buf.append(key.data(), key.size());
    return buf;
}

//解码ListMetaKey
inline int DecodeLMetaKey(const rocksdb::Slice &slice, std::string *key) {
    Decoder decoder(slice.data(), slice.size());
    if (decoder.Skip(1) == -1) {
        return -1;
    }
    if (decoder.ReadData(key) == -1) {
        return -1;
    }
    return 0;
}

List meta value

链表长度 链表首部序号 链表尾部序号 下一次可用序号
len(8) left(8) right(8) cur_seq(8)
struct ListMeta : public NemoMeta {
  int64_t len;
  int64_t left;
  int64_t right;
  int64_t cur_seq;
  ListMeta() : len(0), left(0), right(0), cur_seq(1) {}
  ListMeta(int64_t _len, int64_t _left, int64_t _right, int64_t cseq)
      : len(_len), left(_left), right(_right), cur_seq(cseq) {}
  virtual bool DecodeFrom(const std::string& raw_meta);
  virtual bool EncodeTo(std::string& raw_meta);
  virtual std::string ToString();
};

//解码函数
bool ListMeta::DecodeFrom(const std::string &meta_val) {
  if (meta_val.size() != sizeof(int64_t) * 4) {
    return false;
  }
  len = *((int64_t *)(meta_val.data()));
  left = *((int64_t *)(meta_val.data() + sizeof(int64_t)));
  right = *((int64_t *)(meta_val.data() + sizeof(int64_t) * 2));
  cur_seq = *((int64_t *)(meta_val.data() + sizeof(int64_t) * 3));
  return true;
}
//编码函数
bool ListMeta::EncodeTo(std::string& meta_val) {
  meta_val.clear();
  meta_val.append((char *)&len, sizeof(int64_t));
  meta_val.append((char *)&left, sizeof(int64_t));
  meta_val.append((char *)&right, sizeof(int64_t));
  meta_val.append((char *)&cur_seq, sizeof(int64_t));
  return true;
}

List data key

meta key长度 key字符串 序号
l(1) size(1) key(var) seq(8)
inline std::string EncodeListKey(const rocksdb::Slice &key, const int64_t seq) {
    std::string buf;
    buf.append(1, DataType::kList);
    buf.append(1, (uint8_t)key.size());
    buf.append(key.data(), key.size());
    buf.append((char *)&seq, sizeof(int64_t));
    return buf;
}

inline int DecodeListKey(const rocksdb::Slice &slice, std::string *key, int64_t *seq) {
    Decoder decoder(slice.data(), slice.size());
    if (decoder.Skip(1) == -1) {
        return -1;
    }
    if (decoder.ReadLenData(key) == -1) {
        return -1;
    }
    if (decoder.ReadInt64(seq) == -1) {
        return -1;
    }
    return 0;
}

List data value

前面序号 后面序号 value字符串
priv(8) next(8) value(var)
inline void EncodeListVal(const std::string &raw_val, const int64_t priv, const int64_t next, std::string &en_val) {
    en_val.clear();
    en_val.append((char *)&priv, sizeof(int64_t));
    en_val.append((char *)&next, sizeof(int64_t));
    en_val.append(raw_val.data(), raw_val.size());
}

inline void DecodeListVal(const std::string &en_val, int64_t *priv, int64_t *next, std::string &raw_val) {
    raw_val.clear();
    *priv = *((int64_t *)en_val.data());
    *next = *((int64_t *)(en_val.data() + sizeof(int64_t)));
    raw_val = en_val.substr(sizeof(int64_t) * 2, en_val.size() - sizeof(int64_t) * 2);
}

代码实现

LIndex

  • 功能:获取第N个节点的值
  • 实现:

    • 加行锁

    • 根据传入的key,编码出list meta key,调用Get接口得到list meta value,解码出来得到链表长度,链表首部序号,链表尾部序号,下一次可用序号

    • 判断传入的index是否合法,不合法返回错误

      • index大于0
        • 从头节点开始遍历,直到遇到这个节点的直接前缀节点停止,得到当前节点的cur编号(注意:这里的cur编号不是序号)
        • 根据cur编号就可以编码出list data key,调用Get得到list data value
        • 解码出list data value中的value返回
      • index小于0
        • 从尾节点开始遍历,直到遇到这个节点的直接前缀节点停止,得到当前节点的cur编号(注意:这里的cur编号不是序号)
        • 根据cur编号就可以编码出list data key,调用Get得到list data value
        • 解码出list data value中的value返回
Status Nemo::LIndex(const std::string &key, const int64_t index, std::string *val) {
    Status s;
    rocksdb::WriteBatch batch;
    std::string meta_val;
    ListMeta meta;
    RecordLock l(&mutex_list_record_, key);

    int64_t priv;
    int64_t cur;
    int64_t next;
    std::string en_val;
    std::string meta_key = EncodeLMetaKey(key);
    s = list_db_->Get(rocksdb::ReadOptions(), meta_key, &meta_val);
    if (s.ok()) {
        if (meta.DecodeFrom(meta_val)) {
            if (meta.len <= 0) {
                return Status::NotFound("not found the key");
            }
            if (index >= meta.len || -index > meta.len ) {
                return Status::Corruption("index out of range");
            }
            if (index >= 0) {
                if (L2R(key, index, meta.left, &priv, &cur, &next) != 0) {
                    return Status::Corruption("error in iterate");
                }
            } else {
                if (R2L(key, -index-1, meta.right, &priv, &cur, &next) != 0) {
                    return Status::Corruption("error in iterate");
                }
            }
            std::string db_key = EncodeListKey(key, cur);
            s = list_db_->Get(rocksdb::ReadOptions(), db_key, &en_val);
            DecodeListVal(en_val, &priv, &next, *val);
            return s;
        } else {
            return Status::Corruption("parse listmeta error");
        }
    } else if (s.IsNotFound()) {
        return Status::NotFound("not found the key");
    } else {
        return Status::Corruption("get listmeta error");
    }
}

LLen

  • 功能:返回列表key的长度。
  • 实现:
    • 直接获取list meta key对应的list meta value,把其中的链表长度解析出来返回即可。
Status Nemo::LLen(const std::string &key, int64_t *llen) {
    Status s;
    std::string meta_key = EncodeLMetaKey(key);
    std::string meta_val;
    s = list_db_->Get(rocksdb::ReadOptions(), meta_key, &meta_val);
    if (s.ok()) {
        if(meta_val.size() != sizeof(int64_t) * 4) {
            return Status::Corruption("list meta error");
        }
        *llen = *((int64_t *)(meta_val.data()));
        if (*llen <= 0) {
            return Status::NotFound("not found the key");
        }
    } else {
        *llen = 0;
    }
    return s;
}

LPush

  • 功能:将一个value插入到列表key的表尾(最左边)。
  • 实现:
    • 加行锁
    • 根据传入的key,编码出list meta key,调用Get接口得到list meta value,解码出来得到链表长度,链表首部序号,链表尾部序号,下一次可用序号
    • 判断链表首部序号是否大于0
      • 如果大于0,说明当前链表有节点,则需要对首部节点的值进行修改,需要修改这个节点的直接前缀节点编号,因为要在它的前面插入节点
      • 编码新节点的list data key
        • 新节点的key中使用的编号是list meta value中的序号
      • 编码新节点的list data value
        • 新节点的value中前面序号就是0, 后面序号就是当前首节点编号,这样链表的链接关系就生成了
      • 修改list meta value
        • 链表长度,链表首部序号,链表尾部序号,下一次可用序号
Status Nemo::LPush(const std::string &key, const std::string &val, int64_t *llen) {
    if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) {
       return Status::InvalidArgument("Invalid key length");
    }

    Status s;
    rocksdb::WriteBatch batch;
    ListMeta meta;
    std::string meta_val;

    int64_t priv;
    int64_t next;
    std::string en_val;
    std::string raw_val;
    std::string meta_key = EncodeLMetaKey(key);
    RecordLock l(&mutex_list_record_, key);

    s = list_db_->Get(rocksdb::ReadOptions(), meta_key, &meta_val);
    if (s.ok()) {
        if (meta.DecodeFrom(meta_val)) {
            if (meta.left > 0) {
                std::string l_key = EncodeListKey(key, meta.left);
                s = list_db_->Get(rocksdb::ReadOptions(), l_key, &en_val);
                if (!s.ok()) {
                    return Status::Corruption("get left error");
                }
                DecodeListVal(en_val, &priv, &next, raw_val);
                EncodeListVal(raw_val, meta.cur_seq, next, en_val);
                batch.Put(l_key, en_val);
            }

            std::string db_key = EncodeListKey(key, meta.cur_seq);
            EncodeListVal(val, 0, meta.left, en_val);
            batch.Put(db_key, en_val);

            meta.len++;
            meta.left = meta.cur_seq;
            if (meta.right == 0) {
                meta.right = meta.cur_seq;
            }
            meta.cur_seq++;
            meta.EncodeTo(meta_val);
            batch.Put(meta_key, meta_val);
            s = list_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &batch);
            *llen = meta.len;
            return s;
        } else {
            return Status::Corruption("parse listmeta error");
        }
    } else if (s.IsNotFound()) {
        ListMeta meta(1, 1, 1, 2); // | len | left | right | cur_seq |

        meta_val.reserve(4 * sizeof(int64_t));
        meta.EncodeTo(meta_val);

        batch.Put(meta_key, meta_val);
        EncodeListVal(val, 0, 0, en_val);
        batch.Put(EncodeListKey(key, 1), en_val);
        s = list_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &batch);
        *llen = 1;
        return s;
    } else {
        return Status::Corruption("get listmeta error");
    }
}

LPop

  • 功能:取出链表头部节点
  • 实现:
    • 加行锁
    • 根据传入的key,编码出list meta key,调用Get接口得到list meta value,解码出来得到链表长度,链表首部序号,链表尾部序号,下一次可用序号
    • 根据list meta value中的首节点编号,编码list data key,通过Get接口得到首节点list data value
    • 修改首节点的直接后继节点信息,设置返回值
    • 修改list meta value
    • 调用WriteWithOldKeyTTL写回硬盘
Status Nemo::LPop(const std::string &key, std::string *val) {
    if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) {
       return Status::InvalidArgument("Invalid key length");
    }
    Status s;
    rocksdb::WriteBatch batch;
    ListMeta meta;
    std::string meta_val;

    int64_t priv = 0;
    int64_t next = 0;
    std::string en_val;
    std::string raw_val;
    std::string meta_key = EncodeLMetaKey(key);
    RecordLock l(&mutex_list_record_, key);
    s = list_db_->Get(rocksdb::ReadOptions(), meta_key, &meta_val);
    if (s.ok()) {
        if (meta.DecodeFrom(meta_val)) {
            if (meta.len <= 0) {
                return Status::NotFound("not found key");
            }

            std::string db_key = EncodeListKey(key, meta.left);
            s = list_db_->Get(rocksdb::ReadOptions(), db_key, &en_val);
            if (!s.ok()) {
                return Status::Corruption("get meta.left error");
            }
            DecodeListVal(en_val, &priv, &next, *val);

            meta.left = next;
            if (next != 0) {
                std::string l_key = EncodeListKey(key, next);
                s = list_db_->Get(rocksdb::ReadOptions(), l_key, &en_val);
                if (!s.ok()) {
                    return Status::Corruption("get next left error");
                }
                DecodeListVal(en_val, &priv, &next, raw_val);
                EncodeListVal(raw_val, 0, next, en_val);
                batch.Put(l_key, en_val);
            }

            --meta.len;
            if (meta.len == 0) {
                meta.right = 0;
                meta.cur_seq = 1;
            }
            meta.EncodeTo(meta_val);
            batch.Put(meta_key, meta_val);
            batch.Delete(db_key);
            s = list_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &batch);
            return s;
        } else {
            return Status::Corruption("parse listmeta error");
        }
    } else if (s.IsNotFound()) {
        return Status::NotFound("not found key");
    } else {
        return Status::Corruption("get listmeta error");
    }
}

LPushx

  • 功能:将值value插入到列表key的表头,当且仅当key存在并且是一个列表。
  • 实现:
    • 直接获取list meta key对应的list meta value,把其中的链表长度解析出来判断是不是空链表
    • 如果不是空链表,直接调用LPush命令
Status Nemo::LPushx(const std::string &key, const std::string &val, int64_t *llen) {
    if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) {
       return Status::InvalidArgument("Invalid key length");
    }
    Status s;
    ListMeta meta;
    std::string meta_val;
    std::string meta_key = EncodeLMetaKey(key);
    s = list_db_->Get(rocksdb::ReadOptions(), meta_key, &meta_val);
    if (s.ok()) {
        if (meta.DecodeFrom(meta_val)) {
            if (meta.len == 0) {
                *llen = 0;
                return Status::NotFound("not found the key");
            } else if (meta.len > 0) {
                s = LPush(key, val, llen);
                return s;
            } else {
                return Status::Corruption("get invalid listlen");
            }
        } else {
            return Status::Corruption("parse listmeta error");
        }
    } else if (s.IsNotFound()) {
        *llen = 0;
        return Status::NotFound("not found the key");
    } else {
        return Status::Corruption("get listmeta error");
    }
}

LRange

  • 功能:返回列表key中指定区间内的元素,区间以偏移量start和stop指定,下标(index)参数start和stop都以0为底,也就是说,以0表示列表的第一个元素,以1表示列表的第二个元素,以此类推。你也可以使用负数下标,以-1表示列表的最后一个元素,-2表示列表的倒数第二个元素,以此类推。
  • 实现:
    TODO
    

LSet

  • 功能:将列表key下标为index的元素的值设置为value。
  • 实现:
    TODO
    

LTrim

  • 功能:对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区间之内的元素都将被删除。
  • 实现:
    TODO
    

RPush

  • 功能:将一个value插入到列表key的表尾(最右边)。
  • 实现:
    TODO
    

RPop

  • 功能:移除并返回列表key的尾元素
  • 实现:
    TODO
    

RPushx

  • 功能:将值value插入到列表key的表尾,当且仅当key存在并且是一个列表。
  • 实现:
    TODO
    

RPopLPush

  • 功能:命令 RPOPLPUSH 在一个原子时间内,执行以下两个动作:
    • 将列表source中的最后一个元素(尾元素)弹出,并返回给客户端。
    • 将source弹出的元素插入到列表destination ,作为destination列表的的头元素。
  • 实现:
    TODO
    

LInsert

  • 功能:将值value插入到链表key当中,位于值特定value位置之前或之后。
  • 实现:
    • 加行锁
    • 根据传入的key,编码出list meta key,调用Get接口得到list meta value,解码出来得到链表长度,链表首部序号,链表尾部序号,下一次可用序号
    • 从首节点开始遍历,直到遍历完链表尾部或者节点的数据和传入的节点value值相等
      • 如果需要插入节点value之前
        • 获取插入节点直接前缀节点,并修改其后面节点编号
        • 修改当前节点的前面节点编号
      • 如果需要插入节点value之后
        • 获取插入节点直接后继节点,并修改其前面节点编号
        • 修改当前节点后面节点编号
      • 新建节点,设置正确的前面节点、后面节点编号、value值
      • WriteWithOldKeyTTL批量写入
Status Nemo::LInsert(const std::string &key, Position pos, const std::string &pivot, const std::string &val, int64_t *llen) {
    if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) {
       return Status::InvalidArgument("Invalid key length");
    }

    Status s;
    rocksdb::WriteBatch batch;
    ListMeta meta;
    std::string meta_val;
    int64_t before_seq;
    int64_t after_seq;

    int64_t priv;
    int64_t next;
    std::string en_val;
    std::string raw_val;
    std::string meta_key = EncodeLMetaKey(key);
    //MutexLock l(&mutex_list_);
    RecordLock l(&mutex_list_record_, key);
    s = list_db_->Get(rocksdb::ReadOptions(), meta_key, &meta_val);
    if (s.ok()) {
        if (meta.DecodeFrom(meta_val)) {
            // traverse to find pivot
            next = meta.left;
            int64_t tmp_seq;
            do {
              tmp_seq = next;
              std::string cur_listkey = EncodeListKey(key, tmp_seq);
              s = list_db_->Get(rocksdb::ReadOptions(), cur_listkey, &en_val);
              if (!s.ok()) {
                return Status::Corruption("get listkey error");
              }
              DecodeListVal(en_val, &priv, &next, raw_val);
            } while (next != 0 && raw_val != pivot);

            if (raw_val == pivot) {
                if (pos == AFTER) {
                    before_seq = tmp_seq;
                    after_seq = next;
                } else {
                    before_seq = priv;
                    after_seq = tmp_seq;
                }

                // modify before node
                if (before_seq != 0) {
                    std::string cur_listkey = EncodeListKey(key, before_seq);
                    s = list_db_->Get(rocksdb::ReadOptions(), cur_listkey, &en_val);
                    if (!s.ok()) {
                        return Status::Corruption("get listkey error");
                    }
                    DecodeListVal(en_val, &priv, &next, raw_val);
                    EncodeListVal(raw_val, priv, meta.cur_seq, en_val);
                    batch.Put(cur_listkey, en_val);
                }
                // modify after node
                if (after_seq != 0) {
                    std::string cur_listkey = EncodeListKey(key, after_seq);
                    s = list_db_->Get(rocksdb::ReadOptions(), cur_listkey, &en_val);
                    if (!s.ok()) {
                        return Status::Corruption("get listkey error");
                    }
                    DecodeListVal(en_val, &priv, &next, raw_val);
                    EncodeListVal(raw_val, meta.cur_seq, next, en_val);
                    batch.Put(cur_listkey, en_val);
                }
                // add new node
                std::string add_key = EncodeListKey(key, meta.cur_seq);
                EncodeListVal(val, before_seq, after_seq, en_val);
                batch.Put(add_key, en_val);

                meta.len++;
                if (before_seq == 0) {
                    meta.left = meta.cur_seq;
                }
                if (after_seq == 0) {
                    meta.right = meta.cur_seq;
                }
                ++meta.cur_seq;

                meta.EncodeTo(meta_val);
                batch.Put(meta_key, meta_val);
                s = list_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &batch);
                *llen = meta.len;
                return s;
            } else {
                *llen = -1;
                return s;
            }
        } else {
            return Status::Corruption("parse listmeta error");
        }
    } else if (s.IsNotFound()) {
        *llen = 0;
        return s;
    } else {
        *llen = 0;
        return Status::Corruption("get key error");
    }
}

LRem

  • 功能:根据参数 count 的值,移除列表中与参数value相等的元素。
    • count 的值可以是以下几种:
    • count > 0 : 从表头开始向表尾搜索,移除与value相等的元素,数量为count 。
    • count < 0 : 从表尾开始向表头搜索,移除与value相等的元素,数量为count 的绝对值。
    • count = 0 : 移除表中所有与 value 相等的值。
  • 实现:
    TODO
    

results matching ""

    No results matching ""