zset

zsetset多一个score key,我的理解这个score key相当于为score增加一个列索引,当需要根据score精确查找、范围扫描的命令可以直接使用这个score key.例如:zrangebysocre(根据指定分数范围获取key)

数据格式

分别下面六个表格表示:

zset meta key:

meta 原始key字符串
S(1) key(var)

zset meta value:

数据成员个数
size(8)

zset member key:

meta key长度 key字符串 member
s(1) key size(1) key(var) member key(var)

zset member value:

value字符串
score

zset score key

meta key长度 key字符串 分数 member
y(1) key size(1) key(var) score(8) member key(var)

zset score value

value字符串
空字符串

举例

zadd test 100 member1

zadd test 99 member2

zadd test 100 member3

  • zset meta key:
meta 原始key字符串
S(1) key(var)
S test
  • zset meta value:
数据成员个数
size(8)
1
2
3
  • zset member key:
meta key长度 key字符串 member
s(1) key size(1) key(var) member key(var)
s 4 test member1
s 4 test member2
s 4 test member3
  • zset member value:
value字符串
100
99
100
  • zset score key:
meta key长度 key字符串 分数 member
y(1) key size(1) key(var) score(8) member key(var)
y 4 test 100 member1
y 4 test 99 member2
y 4 test 100 member3
  • zset score value:
value字符串
空字符串

zset meta key

inline std::string EncodeZSizeKey(const rocksdb::Slice key) {
    std::string buf;
    buf.append(1, DataType::kZSize);
    buf.append(key.data(), key.size());
    return buf;
}

inline int DecodeZSizeKey(const rocksdb::Slice &slice, std::string *size) {
    Decoder decoder(slice.data(), slice.size());
    if (decoder.Skip(1) == -1) {
        return -1;
    }
    if (decoder.ReadData(size) == -1) {
        return -1;
    }
    return 0;
}

zset meta value

typedef DefaultMeta ZSetMeta;

struct DefaultMeta : public NemoMeta {
  int64_t len;

  DefaultMeta() : len(0) {}
  explicit DefaultMeta(int64_t _len):len(_len) {}
  virtual bool DecodeFrom(const std::string& raw_meta) {
    if (raw_meta.size() != sizeof(uint64_t)) {
      return false;
    }
    len = *(int64_t *)raw_meta.data();
    return true;
  }
  virtual bool EncodeTo(std::string& raw_meta) {
    raw_meta.clear();
    raw_meta.append((char *)&len, sizeof(int64_t));
    return true;
  }
  virtual std::string ToString() {
    char buf[32];
    std::string res("Len : ");
    Int64ToStr(buf, 32, len);
    res.append(buf);
    return res;
  }
};

zset member key

inline std::string EncodeZScoreKey(const rocksdb::Slice &key, const rocksdb::Slice &member, const double score) {
    std::string buf;
    uint64_t new_score = EncodeScore(score);
    buf.append(1, DataType::kZScore);
    buf.append(1, (uint8_t)key.size());
    buf.append(key.data(), key.size());
    new_score = htobe64(new_score);
    buf.append((char *)&new_score, sizeof(int64_t));
    buf.append(member.data(), member.size());
    return buf;
}

inline int DecodeZScoreKey(const rocksdb::Slice &slice, std::string *key, std::string *member, double *score) {
    Decoder decoder(slice.data(), slice.size());
    if (decoder.Skip(1) == -1) {
        return -1;
    }
    if (decoder.ReadLenData(key) == -1) {
        return -1;
    }
    uint64_t iscore = 0;
    decoder.ReadUInt64(&iscore);
    //iscore = be64toh(iscore);
    *score = DecodeScore(iscore);
    if (decoder.ReadData(member) == -1) {
        return -1;
    }
    return 0;
}

zset member value

value字符串
score

zset score key

meta key长度 key字符串 分数 member
y(1) key size(1) key(var) score(8) member key(var)
inline std::string EncodeZScoreKey(const rocksdb::Slice &key, const rocksdb::Slice &member, const double score) {
    std::string buf;
    uint64_t new_score = EncodeScore(score);
    buf.append(1, DataType::kZScore);
    buf.append(1, (uint8_t)key.size());
    buf.append(key.data(), key.size());
    new_score = htobe64(new_score);
    buf.append((char *)&new_score, sizeof(int64_t));
    buf.append(member.data(), member.size());
    return buf;
}

inline int DecodeZScoreKey(const rocksdb::Slice &slice, std::string *key, std::string *member, double *score) {
    Decoder decoder(slice.data(), slice.size());
    if (decoder.Skip(1) == -1) {
        return -1;
    }
    if (decoder.ReadLenData(key) == -1) {
        return -1;
    }
    uint64_t iscore = 0;
    decoder.ReadUInt64(&iscore);
    //iscore = be64toh(iscore);
    *score = DecodeScore(iscore);
    if (decoder.ReadData(member) == -1) {
        return -1;
    }
    return 0;
}

代码实现

ZAdd

  • 功能:将一个member元素及其score值加入到有序集key当中
  • 实现:
    • 获取行锁
    • 调用DoZSet执行具体的写入操作,需要写入的kv,加到writebatch中
      • keymember编码出zset member key,调用Get接口获取zset member value
        • 如果zset member key存在
          • key,member, old_score编码出old zset member key,删除操作加入writebatch
          • key,member,score编码出zset score key加入writebatch
          • zset member key加入writebatch
          • 设置返回值1
        • 如果zset member key不存在
          • key,member,score编码出zset score key加入writebatch
          • zset member key加入writebatch
          • 设置返回值2
    • 判断返回值
      • 返回值2
        • 修改zset meta value,将member数量增加1
        • 调用WriteWithOldKeyTTL写入writebatch
      • 返回值1
        • 调用WriteWithOldKeyTTL写入writebatch
int Nemo::DoZSet(const std::string &key, const double score, const std::string &member, rocksdb::WriteBatch &writebatch) {
    Status s;
    std::string old_score;
    std::string score_key;
    std::string db_key = EncodeZSetKey(key, member);
    s = zset_db_->Get(rocksdb::ReadOptions(), db_key, &old_score);
    if (s.ok()) {
        double dval = *((double *)old_score.data());
        /* find the same value */ 
        if (fabs(dval - score) < eps) {
            return 0;
        } else {
          score_key = EncodeZScoreKey(key, member, dval);
          writebatch.Delete(score_key);

          score_key = EncodeZScoreKey(key, member, score);
          writebatch.Put(score_key, "");

          std::string buf;
          buf.append((char *)(&score), sizeof(double));
          writebatch.Put(db_key, buf);
          return 1;
        }
    } else if (s.IsNotFound()) {
        score_key = EncodeZScoreKey(key, member, score);
        writebatch.Put(score_key, "");

        std::string buf;
        buf.append((char *)(&score), sizeof(double));
        writebatch.Put(db_key, buf);
        return 2;
    } else {
        return -1;
    }
}

Status Nemo::ZAdd(const std::string &key, const double score, const std::string &member, int64_t *res) {
    if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) {
       return Status::InvalidArgument("Invalid key length");
    }
    Status s;
    if (score < ZSET_SCORE_MIN || score > ZSET_SCORE_MAX) {
       return Status::InvalidArgument("score overflow");
    }
    if (key.size() >= KEY_MAX_LENGTH) {
       return Status::InvalidArgument("Invalid key length");
    }
    rocksdb::WriteBatch batch;
    RecordLock l(&mutex_zset_record_, key);
    int ret = DoZSet(key, score, member, batch);
    if (ret == 2) {
        if (IncrZLen(key, 1, batch) == 0) {
            s = zset_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &batch);
            *res = 1;
            return s;
        } else {
            return Status::Corruption("incr zsize error");
        }
    } else if (ret == 1) {
        *res = 0;
        s = zset_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &batch);
        return s;
    } else if (ret == 0) {
        *res = 0;
        return Status::OK();
    } else {
        return Status::Corruption("zadd error");
    }
}

ZCard

  • 功能:返回有序集key的基数。
  • 实现:
    • key编码出zset meta key,调用Get获取zset meta value
      • 如果zset meta key存在
        • 返回zset meta value中的size
      • 如果zset meta key不存在
        • 返回0
int64_t Nemo::ZCard(const std::string &key) {
    Status s;
    std::string val;
    std::string size_key = EncodeZSizeKey(key);
    s = zset_db_->Get(rocksdb::ReadOptions(), size_key, &val);
    if (s.ok()) {
        if (val.size() != sizeof(uint64_t)) {
            return 0;
        }
        int64_t ret = *(int64_t *)val.data();
        return ret < 0? 0 : ret;
    } else if (s.IsNotFound()) {
        return 0;
    } else {
        return -1;
    }
}

ZCount

  • 功能:返回有序集key中, score值在min和max之间(默认包括score值等于min或max)的成员的数量。
  • 实现:
    • ZScan返回一个迭代器
      • key,`,begin编码zset score key`作为起点
    • 遍历迭代器,判断每个member的score是否在这个范围之内,如果在给定的score范围之内,返回值增加1
    • ReleaseSnapshot释放snapshot
ZIterator* Nemo::ZScan(const std::string &key, const double begin, const double end, uint64_t limit, bool use_snapshot) {
    double rel_begin = begin;
    double rel_end = end + eps;
    if (begin < ZSET_SCORE_MIN) {
      rel_begin = ZSET_SCORE_MIN;
    }
    if (end > ZSET_SCORE_MAX) {
      rel_end = ZSET_SCORE_MAX;
    }
    std::string key_start, key_end;
    key_start = EncodeZScoreKey(key, "", rel_begin);
    key_end = EncodeZScoreKey(key, "", rel_end);
    rocksdb::ReadOptions read_options;
    if (use_snapshot) {
        read_options.snapshot = zset_db_->GetSnapshot();
    }
    read_options.fill_cache = false;
    IteratorOptions iter_options(key_end, limit, read_options);
    rocksdb::Iterator *it = zset_db_->NewIterator(read_options);
    it->Seek(key_start);
    return new ZIterator(it, iter_options, key); 
}

int64_t Nemo::ZCount(const std::string &key, const double begin, const double end, bool is_lo, bool is_ro) {
    double b = is_lo ? begin + eps : begin;
    double e = is_ro ? end - eps : end;
    ZIterator* it = ZScan(key, b, e, -1, true);
    double s;
    int64_t n = 0;
    for (; it->Valid(); it->Next()) {
        s = it->score();
        if (s >= begin && s <= end ) {
            n++;
        } else {
            break;
        }
    }
    zset_db_->ReleaseSnapshot(it->read_options().snapshot);
    delete it;
    return n;
}

ZIncrby

  • 功能:为有序集key的成员member的score值加上增量increment
  • 实现:
    TODO
    

ZRange

  • 功能:返回有序集key中,指定排序位置区间内的成员。其中成员的位置按score值递增(从小到大)来排序。
  • 实现:
    • ZCard获取元素总数
    • 确定开始位置t_start、结束位置t_stop
    • key,`,ZSET_SCORE_MAX编码最大的zset score key`
      • 从后面开始迭代,直到到了t_stop位置,开始将迭代到的值放到返回结果里面
Status Nemo::ZRange(const std::string &key, const int64_t start, const int64_t stop, std::vector<SM> &sms) {
    if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) {
       return Status::InvalidArgument("Invalid key length");
    }
    int64_t t_size = ZCard(key);
    if (t_size >= 0) {
        int64_t t_start = start >= 0 ? start : t_size + start;
        int64_t t_stop = stop >= 0 ? stop : t_size + stop;
        if (t_start < 0) {
            t_start = 0;
        }
        if (t_stop > t_size - 1) {
            t_stop = t_size - 1;
        }
        if (t_start > t_stop || t_start > t_size - 1 || t_stop < 0) {
            return Status::OK();
        } else {
            int n = 0;
            ZIterator* iter = NULL;
            //member总数大于1000并且start大于size/2
            if (t_size > 1000 && t_start > t_size / 2) {
              std::string zscore_key_end = EncodeZScoreKey(key, "", ZSET_SCORE_MAX);
              std::string zscore_key_start = "";
              rocksdb::ReadOptions read_options;
              read_options.snapshot = zset_db_->GetSnapshot();
              read_options.fill_cache = false;
              IteratorOptions iter_options(zscore_key_start, -1, read_options, kBackward);
              rocksdb::Iterator* rocksdb_it = zset_db_->NewIterator(read_options);
              rocksdb_it->Seek(zscore_key_end);
              rocksdb_it->Prev();
              iter = new ZIterator(rocksdb_it, iter_options, key);
              n = t_size - 1;
              for (; n > t_stop && iter->Valid(); iter->Next(), n--);
              if (n != t_stop) {
                zset_db_->ReleaseSnapshot(iter->read_options().snapshot);
                delete iter;
                return Status::Corruption("ziterate error");
              }
              sms.resize(t_stop - t_start + 1);
              for (; n >= t_start && iter->Valid(); n--) {
                sms[n - t_start] = {iter->score(), iter->member()};
                iter->Next();
              }
            } else {
              iter = ZScan(key, ZSET_SCORE_MIN, ZSET_SCORE_MAX, -1, true);
              if (iter == NULL) {
                return Status::Corruption("zscan error");
              }
              n = 0;
              for (; n < t_start && iter->Valid(); iter->Next(), n++);

              if (n < t_start) {
                  zset_db_->ReleaseSnapshot(iter->read_options().snapshot);
                  delete iter;
                  return Status::Corruption("ziterate error");
              }
              for (; n <= t_stop && iter->Valid(); iter->Next(), n++) {
                  sms.push_back({iter->score(), iter->member()});
              }
            }
            zset_db_->ReleaseSnapshot(iter->read_options().snapshot);
            delete iter;
            return Status::OK();
        }
    } else {
        return Status::Corruption("get zsize error");
    }
}

ZUnionStore

  • 功能:计算给定的一个或多个有序集的并集,其中给定key的数量必须以numkeys参数指定,并将该并集(结果集)储存到destination 。默认情况下,结果集中某个成员的score值是所有给定集下该成员score值之和 。
  • 实现:

    Status Nemo::ZUnionStore(const std::string &destination, const int numkeys, const std::vector<std::string>& keys, const std::vector<double>& weights = std::vector<double>(), Aggregate agg = SUM, int64_t *res = 0) {
      std::map<std::string, double> mp_member_score;
      *res = 0;
      RecordLock l(&mutex_zset_record_, destination);
      for (int key_i = 0; key_i < numkeys; key_i++) {
        mutex_set_record_.Lock(keys[key_i]);
      }
      int weights_size = static_cast<int>(weights.size());
      for (int key_i = 0; key_i < numkeys; key_i++) {
          ZIterator *iter = ZScan(keys[key_i], ZSET_SCORE_MIN, ZSET_SCORE_MAX, -1);
          for (; iter->Valid(); iter->Next()) {
              std::string member = iter->member();
    
              double weight = 1;
              if (weights_size > key_i) {
                  weight = weights[key_i];
              }
    
              if (mp_member_score.find(member) == mp_member_score.end()) {
                  mp_member_score[member] = weight * iter->score();
              } else {
                  double score = mp_member_score[member];
                  switch (agg) {
                    case SUM: score += weight * iter->score(); break;
                    case MIN: score = std::min(score, weight * iter->score()); break;
                    case MAX: score = std::max(score, weight * iter->score()); break;
                  }
                  mp_member_score[member] = score;
              }
          }
          delete iter;
      }
    
      int64_t add_ret;
      int64_t rem_ret;
      std::map<std::string, double>::iterator it;
      Status status;
      status = ZRemrangebyrankNoLock(destination, 0, -1, &rem_ret);
      for (it = mp_member_score.begin(); it != mp_member_score.end(); it++) {
          status = ZAddNoLock(destination, it->second, it->first, &add_ret);
          if (!status.ok()) {
              break;
          }
      }
      for (int key_i = 0; key_i < numkeys; key_i++) {
        mutex_set_record_.Unlock(keys[key_i]);
      }
      *res = mp_member_score.size();
      return Status::OK();
    }s
    

ZInterStore

  • 功能:计算给定的一个或多个有序集的交集,其中给定key的数量必须以numkeys参数指定,并将该交集(结果集)储存到destination。默认情况下,结果集中某个成员的score值是所有给定集下该成员score值之和.
  • 实现:
    TODO
    

ZRangebyscore

  • 功能:返回有序集key中,所有score值介于min和max之间(包括等于min或max)的成员。有序集成员按score值递增(从小到大)次序排列。
  • 实现:
    • 实现起来简单,用key,`,start score,stop score编码zset score key`,获取迭代器,迭代就可以了
Status Nemo::ZRangebyscore(const std::string &key, const double mn, const double mx, std::vector<SM> &sms, bool is_lo, bool is_ro) {
    if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) {
       return Status::InvalidArgument("Invalid key length");
    }
    double start = is_lo ? mn + eps : mn;
    double stop = is_ro ? mx - eps : mx;
    ZIterator *iter = ZScan(key, start, stop, -1, true);
    for (; iter->Valid(); iter->Next()) {
        sms.push_back({iter->score(), iter->member()});
    }
    zset_db_->ReleaseSnapshot(iter->read_options().snapshot);
    delete iter;
    return Status::OK();
}

ZRem

  • 功能:移除有序集key中的一个或多个成员,不存在的成员将被忽略。当key存在但不是有序集类型时,返回一个错误。
  • 实现:
    • key,member编码zset member key获取zset member value
      • 如果存在
        • 删除zset member key
        • 删除zse score key
        • zset meta value中的member size减1
Status Nemo::ZRem(const std::string &key, const std::string &member, int64_t *res) {
    if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) {
       return Status::InvalidArgument("Invalid key length");
    }
    Status s;
    *res = 0;
    rocksdb::WriteBatch batch;
    std::string old_score;
    RecordLock l(&mutex_zset_record_, key);
    std::string db_key = EncodeZSetKey(key, member);
    s = zset_db_->Get(rocksdb::ReadOptions(), db_key, &old_score);
    if (s.ok()) {
      batch.Delete(db_key);
      double dscore = *((double *)old_score.data());
      std::string score_key = EncodeZScoreKey(key, member, dscore);
      batch.Delete(score_key);
      if (IncrZLen(key, -1, batch) == 0) {
        s = zset_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &batch);
        *res = 1;
        return s;
      } else {
        return Status::Corruption("incr zsize error");
      }
    } else {
      return s;
    }
}

ZRank

  • 功能:返回有序集key中成员member的排名。其中有序集成员按score值递增(从小到大)顺序排列。排名以0为底,也就是说,score值最小的成员排名为0
  • 实现:
    • key,member编码zset member key调用Get获取
      • 如果存在
        • key,`,ZSET_SCORE_MIN,ZSET_SCORE_MAX`得到迭代器
        • 遍历迭代器每一项,判断member是不是相等,不相等排名+1,相等停止迭代
        • 返回排名
Status Nemo::ZRank(const std::string &key, const std::string &member, int64_t *rank) {
    Status s;
    *rank = 0;
    std::string old_score;
    std::string db_key = EncodeZSetKey(key, member);
    s = zset_db_->Get(rocksdb::ReadOptions(), db_key, &old_score);
    int64_t count = 0;
    if (s.ok()) {
        ZIterator *iter = ZScan(key, ZSET_SCORE_MIN, ZSET_SCORE_MAX, -1, true);
        for (; iter->Valid() && iter->member().compare(member) != 0; iter->Next()) {
            count++;
        }
        if (iter->member().compare(member) == 0) {
            *rank = count;
        }
        zset_db_->ReleaseSnapshot(iter->read_options().snapshot);
        delete iter;
    }
    return s;
}

ZRevrank

  • 功能:返回有序集key中成员member的排名。其中有序集成员按score值递减(从大到小)排序。排名以0为底,也就是说,score值最大的成员排名为0。使用ZRANK命令可以获得成员按score值递增(从小到大)排列的排名。
  • 实现:
    TODO
    

ZScore

  • 功能:返回有序集key中,成员member的score值。如果member元素不是有序集key的成员,或key不存在,返回nil。
  • 实现:
Status Nemo::ZScore(const std::string &key, const std::string &member, double *score) {
    Status s;
    *score = 0;
    std::string str_score;

    std::string db_key = EncodeZSetKey(key, member);
    s = zset_db_->Get(rocksdb::ReadOptions(), db_key, &str_score);
    if (s.ok()) {
        *score = *((double *)(str_score.data()));
    }
    return s;
}

ZRangebylex

  • 功能:命令返回存储在键的排序集合在指定的范围元素
  • 实现:
    TODO
    

ZLexcount

  • 功能:命令在计算有序集合中指定字典区间内成员数量。
  • 实现:
    TODO
    

ZRemrangebylex

  • 功能:命令删除有序集合存储在由最小和最大指定的字典范围之间的所有键元素。
  • 实现:
    TODO
    

ZRemrangebyrank

  • 功能:移除有序集 key 中,指定排名(rank)区间内的所有成员。区间分别以下标参数 start 和 stop 指出,包含 start 和 stop 在内。下标参数 start 和 stop 都以0为底,也就是说,以0表示有序集第一个成员,以1表示有序集第二个成员,以此类推。你也可以使用负数下标,以-1表示最后一个成员,-2表示倒数第二个成员,以此类推。
  • 实现:
    TODO
    

ZRemrangebyscore

  • 功能:移除有序集key中,所有score值介于min和max之间(包括等于min或max)的成员。
  • 实现:
    TODO
    

results matching ""

    No results matching ""