zset
zset
比set
多一个score key
,我的理解这个score key
相当于为score
增加一个列索引,当需要根据score
精确查找、范围扫描的命令可以直接使用这个score key
.例如:zrangebysocre
(根据指定分数范围获取key)
数据格式
分别下面六个表格表示:
meta |
原始key字符串 |
S(1) |
key(var) |
zset member key:
meta |
key长度 |
key字符串 |
member |
s(1) |
key size(1) |
key(var) |
member key(var) |
zset member value:
zset score key
meta |
key长度 |
key字符串 |
分数 |
member |
y(1) |
key size(1) |
key(var) |
score(8) |
member key(var) |
zset score value
举例
zadd test 100 member1
zadd test 99 member2
zadd test 100 member3
meta |
原始key字符串 |
S(1) |
key(var) |
S |
test |
meta |
key长度 |
key字符串 |
member |
s(1) |
key size(1) |
key(var) |
member key(var) |
s |
4 |
test |
member1 |
s |
4 |
test |
member2 |
s |
4 |
test |
member3 |
meta |
key长度 |
key字符串 |
分数 |
member |
y(1) |
key size(1) |
key(var) |
score(8) |
member key(var) |
y |
4 |
test |
100 |
member1 |
y |
4 |
test |
99 |
member2 |
y |
4 |
test |
100 |
member3 |
inline std::string EncodeZSizeKey(const rocksdb::Slice key) {
std::string buf;
buf.append(1, DataType::kZSize);
buf.append(key.data(), key.size());
return buf;
}
inline int DecodeZSizeKey(const rocksdb::Slice &slice, std::string *size) {
Decoder decoder(slice.data(), slice.size());
if (decoder.Skip(1) == -1) {
return -1;
}
if (decoder.ReadData(size) == -1) {
return -1;
}
return 0;
}
typedef DefaultMeta ZSetMeta;
struct DefaultMeta : public NemoMeta {
int64_t len;
DefaultMeta() : len(0) {}
explicit DefaultMeta(int64_t _len):len(_len) {}
virtual bool DecodeFrom(const std::string& raw_meta) {
if (raw_meta.size() != sizeof(uint64_t)) {
return false;
}
len = *(int64_t *)raw_meta.data();
return true;
}
virtual bool EncodeTo(std::string& raw_meta) {
raw_meta.clear();
raw_meta.append((char *)&len, sizeof(int64_t));
return true;
}
virtual std::string ToString() {
char buf[32];
std::string res("Len : ");
Int64ToStr(buf, 32, len);
res.append(buf);
return res;
}
};
zset member key
inline std::string EncodeZScoreKey(const rocksdb::Slice &key, const rocksdb::Slice &member, const double score) {
std::string buf;
uint64_t new_score = EncodeScore(score);
buf.append(1, DataType::kZScore);
buf.append(1, (uint8_t)key.size());
buf.append(key.data(), key.size());
new_score = htobe64(new_score);
buf.append((char *)&new_score, sizeof(int64_t));
buf.append(member.data(), member.size());
return buf;
}
inline int DecodeZScoreKey(const rocksdb::Slice &slice, std::string *key, std::string *member, double *score) {
Decoder decoder(slice.data(), slice.size());
if (decoder.Skip(1) == -1) {
return -1;
}
if (decoder.ReadLenData(key) == -1) {
return -1;
}
uint64_t iscore = 0;
decoder.ReadUInt64(&iscore);
//iscore = be64toh(iscore);
*score = DecodeScore(iscore);
if (decoder.ReadData(member) == -1) {
return -1;
}
return 0;
}
zset member value
zset score key
meta |
key长度 |
key字符串 |
分数 |
member |
y(1) |
key size(1) |
key(var) |
score(8) |
member key(var) |
inline std::string EncodeZScoreKey(const rocksdb::Slice &key, const rocksdb::Slice &member, const double score) {
std::string buf;
uint64_t new_score = EncodeScore(score);
buf.append(1, DataType::kZScore);
buf.append(1, (uint8_t)key.size());
buf.append(key.data(), key.size());
new_score = htobe64(new_score);
buf.append((char *)&new_score, sizeof(int64_t));
buf.append(member.data(), member.size());
return buf;
}
inline int DecodeZScoreKey(const rocksdb::Slice &slice, std::string *key, std::string *member, double *score) {
Decoder decoder(slice.data(), slice.size());
if (decoder.Skip(1) == -1) {
return -1;
}
if (decoder.ReadLenData(key) == -1) {
return -1;
}
uint64_t iscore = 0;
decoder.ReadUInt64(&iscore);
//iscore = be64toh(iscore);
*score = DecodeScore(iscore);
if (decoder.ReadData(member) == -1) {
return -1;
}
return 0;
}
代码实现
ZAdd
- 功能:将一个member元素及其score值加入到有序集key当中
- 实现:
- 获取行锁
- 调用
DoZSet
执行具体的写入操作,需要写入的kv,加到writebatch中
- 用
key
和member
编码出zset member key
,调用Get
接口获取zset member value
- 如果
zset member key
存在
- 用
key
,member
, old_score
编码出old zset member key
,删除操作加入writebatch
- 用
key
,member
,score
编码出zset score key
加入writebatch
zset member key
加入writebatch
- 设置返回值1
- 如果
zset member key
不存在
- 用
key
,member
,score
编码出zset score key
加入writebatch
zset member key
加入writebatch
- 设置返回值2
- 判断返回值
- 返回值2
- 修改
zset meta value
,将member
数量增加1
- 调用
WriteWithOldKeyTTL
写入writebatch
- 返回值1
- 调用
WriteWithOldKeyTTL
写入writebatch
int Nemo::DoZSet(const std::string &key, const double score, const std::string &member, rocksdb::WriteBatch &writebatch) {
Status s;
std::string old_score;
std::string score_key;
std::string db_key = EncodeZSetKey(key, member);
s = zset_db_->Get(rocksdb::ReadOptions(), db_key, &old_score);
if (s.ok()) {
double dval = *((double *)old_score.data());
/* find the same value */
if (fabs(dval - score) < eps) {
return 0;
} else {
score_key = EncodeZScoreKey(key, member, dval);
writebatch.Delete(score_key);
score_key = EncodeZScoreKey(key, member, score);
writebatch.Put(score_key, "");
std::string buf;
buf.append((char *)(&score), sizeof(double));
writebatch.Put(db_key, buf);
return 1;
}
} else if (s.IsNotFound()) {
score_key = EncodeZScoreKey(key, member, score);
writebatch.Put(score_key, "");
std::string buf;
buf.append((char *)(&score), sizeof(double));
writebatch.Put(db_key, buf);
return 2;
} else {
return -1;
}
}
Status Nemo::ZAdd(const std::string &key, const double score, const std::string &member, int64_t *res) {
if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) {
return Status::InvalidArgument("Invalid key length");
}
Status s;
if (score < ZSET_SCORE_MIN || score > ZSET_SCORE_MAX) {
return Status::InvalidArgument("score overflow");
}
if (key.size() >= KEY_MAX_LENGTH) {
return Status::InvalidArgument("Invalid key length");
}
rocksdb::WriteBatch batch;
RecordLock l(&mutex_zset_record_, key);
int ret = DoZSet(key, score, member, batch);
if (ret == 2) {
if (IncrZLen(key, 1, batch) == 0) {
s = zset_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &batch);
*res = 1;
return s;
} else {
return Status::Corruption("incr zsize error");
}
} else if (ret == 1) {
*res = 0;
s = zset_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &batch);
return s;
} else if (ret == 0) {
*res = 0;
return Status::OK();
} else {
return Status::Corruption("zadd error");
}
}
ZCard
- 功能:返回有序集key的基数。
- 实现:
- 用
key
编码出zset meta key
,调用Get
获取zset meta value
- 如果
zset meta key
存在
- 如果
zset meta key
不存在
int64_t Nemo::ZCard(const std::string &key) {
Status s;
std::string val;
std::string size_key = EncodeZSizeKey(key);
s = zset_db_->Get(rocksdb::ReadOptions(), size_key, &val);
if (s.ok()) {
if (val.size() != sizeof(uint64_t)) {
return 0;
}
int64_t ret = *(int64_t *)val.data();
return ret < 0? 0 : ret;
} else if (s.IsNotFound()) {
return 0;
} else {
return -1;
}
}
ZCount
- 功能:返回有序集key中, score值在min和max之间(默认包括score值等于min或max)的成员的数量。
- 实现:
- 用
ZScan
返回一个迭代器
key
,`,
begin编码
zset score key`作为起点
- 遍历迭代器,判断每个member的score是否在这个范围之内,如果在给定的score范围之内,返回值增加1
ReleaseSnapshot
释放snapshot
ZIterator* Nemo::ZScan(const std::string &key, const double begin, const double end, uint64_t limit, bool use_snapshot) {
double rel_begin = begin;
double rel_end = end + eps;
if (begin < ZSET_SCORE_MIN) {
rel_begin = ZSET_SCORE_MIN;
}
if (end > ZSET_SCORE_MAX) {
rel_end = ZSET_SCORE_MAX;
}
std::string key_start, key_end;
key_start = EncodeZScoreKey(key, "", rel_begin);
key_end = EncodeZScoreKey(key, "", rel_end);
rocksdb::ReadOptions read_options;
if (use_snapshot) {
read_options.snapshot = zset_db_->GetSnapshot();
}
read_options.fill_cache = false;
IteratorOptions iter_options(key_end, limit, read_options);
rocksdb::Iterator *it = zset_db_->NewIterator(read_options);
it->Seek(key_start);
return new ZIterator(it, iter_options, key);
}
int64_t Nemo::ZCount(const std::string &key, const double begin, const double end, bool is_lo, bool is_ro) {
double b = is_lo ? begin + eps : begin;
double e = is_ro ? end - eps : end;
ZIterator* it = ZScan(key, b, e, -1, true);
double s;
int64_t n = 0;
for (; it->Valid(); it->Next()) {
s = it->score();
if (s >= begin && s <= end ) {
n++;
} else {
break;
}
}
zset_db_->ReleaseSnapshot(it->read_options().snapshot);
delete it;
return n;
}
ZIncrby
- 功能:为有序集key的成员member的score值加上增量increment
- 实现:
TODO
ZRange
- 功能:返回有序集key中,指定排序位置区间内的成员。其中成员的位置按score值递增(从小到大)来排序。
- 实现:
ZCard
获取元素总数
- 确定开始位置t_start、结束位置t_stop
- 用
key
,`,
ZSET_SCORE_MAX编码最大的
zset score key`
- 从后面开始迭代,直到到了t_stop位置,开始将迭代到的值放到返回结果里面
Status Nemo::ZRange(const std::string &key, const int64_t start, const int64_t stop, std::vector<SM> &sms) {
if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) {
return Status::InvalidArgument("Invalid key length");
}
int64_t t_size = ZCard(key);
if (t_size >= 0) {
int64_t t_start = start >= 0 ? start : t_size + start;
int64_t t_stop = stop >= 0 ? stop : t_size + stop;
if (t_start < 0) {
t_start = 0;
}
if (t_stop > t_size - 1) {
t_stop = t_size - 1;
}
if (t_start > t_stop || t_start > t_size - 1 || t_stop < 0) {
return Status::OK();
} else {
int n = 0;
ZIterator* iter = NULL;
//member总数大于1000并且start大于size/2
if (t_size > 1000 && t_start > t_size / 2) {
std::string zscore_key_end = EncodeZScoreKey(key, "", ZSET_SCORE_MAX);
std::string zscore_key_start = "";
rocksdb::ReadOptions read_options;
read_options.snapshot = zset_db_->GetSnapshot();
read_options.fill_cache = false;
IteratorOptions iter_options(zscore_key_start, -1, read_options, kBackward);
rocksdb::Iterator* rocksdb_it = zset_db_->NewIterator(read_options);
rocksdb_it->Seek(zscore_key_end);
rocksdb_it->Prev();
iter = new ZIterator(rocksdb_it, iter_options, key);
n = t_size - 1;
for (; n > t_stop && iter->Valid(); iter->Next(), n--);
if (n != t_stop) {
zset_db_->ReleaseSnapshot(iter->read_options().snapshot);
delete iter;
return Status::Corruption("ziterate error");
}
sms.resize(t_stop - t_start + 1);
for (; n >= t_start && iter->Valid(); n--) {
sms[n - t_start] = {iter->score(), iter->member()};
iter->Next();
}
} else {
iter = ZScan(key, ZSET_SCORE_MIN, ZSET_SCORE_MAX, -1, true);
if (iter == NULL) {
return Status::Corruption("zscan error");
}
n = 0;
for (; n < t_start && iter->Valid(); iter->Next(), n++);
if (n < t_start) {
zset_db_->ReleaseSnapshot(iter->read_options().snapshot);
delete iter;
return Status::Corruption("ziterate error");
}
for (; n <= t_stop && iter->Valid(); iter->Next(), n++) {
sms.push_back({iter->score(), iter->member()});
}
}
zset_db_->ReleaseSnapshot(iter->read_options().snapshot);
delete iter;
return Status::OK();
}
} else {
return Status::Corruption("get zsize error");
}
}
ZUnionStore
- 功能:计算给定的一个或多个有序集的并集,其中给定key的数量必须以numkeys参数指定,并将该并集(结果集)储存到destination 。默认情况下,结果集中某个成员的score值是所有给定集下该成员score值之和 。
实现:
Status Nemo::ZUnionStore(const std::string &destination, const int numkeys, const std::vector<std::string>& keys, const std::vector<double>& weights = std::vector<double>(), Aggregate agg = SUM, int64_t *res = 0) {
std::map<std::string, double> mp_member_score;
*res = 0;
RecordLock l(&mutex_zset_record_, destination);
for (int key_i = 0; key_i < numkeys; key_i++) {
mutex_set_record_.Lock(keys[key_i]);
}
int weights_size = static_cast<int>(weights.size());
for (int key_i = 0; key_i < numkeys; key_i++) {
ZIterator *iter = ZScan(keys[key_i], ZSET_SCORE_MIN, ZSET_SCORE_MAX, -1);
for (; iter->Valid(); iter->Next()) {
std::string member = iter->member();
double weight = 1;
if (weights_size > key_i) {
weight = weights[key_i];
}
if (mp_member_score.find(member) == mp_member_score.end()) {
mp_member_score[member] = weight * iter->score();
} else {
double score = mp_member_score[member];
switch (agg) {
case SUM: score += weight * iter->score(); break;
case MIN: score = std::min(score, weight * iter->score()); break;
case MAX: score = std::max(score, weight * iter->score()); break;
}
mp_member_score[member] = score;
}
}
delete iter;
}
int64_t add_ret;
int64_t rem_ret;
std::map<std::string, double>::iterator it;
Status status;
status = ZRemrangebyrankNoLock(destination, 0, -1, &rem_ret);
for (it = mp_member_score.begin(); it != mp_member_score.end(); it++) {
status = ZAddNoLock(destination, it->second, it->first, &add_ret);
if (!status.ok()) {
break;
}
}
for (int key_i = 0; key_i < numkeys; key_i++) {
mutex_set_record_.Unlock(keys[key_i]);
}
*res = mp_member_score.size();
return Status::OK();
}s
ZInterStore
- 功能:计算给定的一个或多个有序集的交集,其中给定key的数量必须以numkeys参数指定,并将该交集(结果集)储存到destination。默认情况下,结果集中某个成员的score值是所有给定集下该成员score值之和.
- 实现:
TODO
ZRangebyscore
- 功能:返回有序集key中,所有score值介于min和max之间(包括等于min或max)的成员。有序集成员按score值递增(从小到大)次序排列。
- 实现:
- 实现起来简单,用
key
,`,
start score,
stop score编码
zset score key`,获取迭代器,迭代就可以了
Status Nemo::ZRangebyscore(const std::string &key, const double mn, const double mx, std::vector<SM> &sms, bool is_lo, bool is_ro) {
if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) {
return Status::InvalidArgument("Invalid key length");
}
double start = is_lo ? mn + eps : mn;
double stop = is_ro ? mx - eps : mx;
ZIterator *iter = ZScan(key, start, stop, -1, true);
for (; iter->Valid(); iter->Next()) {
sms.push_back({iter->score(), iter->member()});
}
zset_db_->ReleaseSnapshot(iter->read_options().snapshot);
delete iter;
return Status::OK();
}
ZRem
- 功能:移除有序集key中的一个或多个成员,不存在的成员将被忽略。当key存在但不是有序集类型时,返回一个错误。
- 实现:
key
,member
编码zset member key
获取zset member value
- 如果存在
- 删除
zset member key
- 删除
zse score key
- 对
zset meta value
中的member size
减1
Status Nemo::ZRem(const std::string &key, const std::string &member, int64_t *res) {
if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) {
return Status::InvalidArgument("Invalid key length");
}
Status s;
*res = 0;
rocksdb::WriteBatch batch;
std::string old_score;
RecordLock l(&mutex_zset_record_, key);
std::string db_key = EncodeZSetKey(key, member);
s = zset_db_->Get(rocksdb::ReadOptions(), db_key, &old_score);
if (s.ok()) {
batch.Delete(db_key);
double dscore = *((double *)old_score.data());
std::string score_key = EncodeZScoreKey(key, member, dscore);
batch.Delete(score_key);
if (IncrZLen(key, -1, batch) == 0) {
s = zset_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &batch);
*res = 1;
return s;
} else {
return Status::Corruption("incr zsize error");
}
} else {
return s;
}
}
ZRank
- 功能:返回有序集key中成员member的排名。其中有序集成员按score值递增(从小到大)顺序排列。排名以0为底,也就是说,score值最小的成员排名为0
- 实现:
- 用
key
,member
编码zset member key
调用Get
获取
- 如果存在
- 用
key
,`,
ZSET_SCORE_MIN,
ZSET_SCORE_MAX`得到迭代器
- 遍历迭代器每一项,判断
member
是不是相等,不相等排名+1,相等停止迭代
- 返回排名
Status Nemo::ZRank(const std::string &key, const std::string &member, int64_t *rank) {
Status s;
*rank = 0;
std::string old_score;
std::string db_key = EncodeZSetKey(key, member);
s = zset_db_->Get(rocksdb::ReadOptions(), db_key, &old_score);
int64_t count = 0;
if (s.ok()) {
ZIterator *iter = ZScan(key, ZSET_SCORE_MIN, ZSET_SCORE_MAX, -1, true);
for (; iter->Valid() && iter->member().compare(member) != 0; iter->Next()) {
count++;
}
if (iter->member().compare(member) == 0) {
*rank = count;
}
zset_db_->ReleaseSnapshot(iter->read_options().snapshot);
delete iter;
}
return s;
}
ZRevrank
- 功能:返回有序集key中成员member的排名。其中有序集成员按score值递减(从大到小)排序。排名以0为底,也就是说,score值最大的成员排名为0。使用ZRANK命令可以获得成员按score值递增(从小到大)排列的排名。
- 实现:
TODO
ZScore
- 功能:返回有序集key中,成员member的score值。如果member元素不是有序集key的成员,或key不存在,返回nil。
- 实现:
Status Nemo::ZScore(const std::string &key, const std::string &member, double *score) {
Status s;
*score = 0;
std::string str_score;
std::string db_key = EncodeZSetKey(key, member);
s = zset_db_->Get(rocksdb::ReadOptions(), db_key, &str_score);
if (s.ok()) {
*score = *((double *)(str_score.data()));
}
return s;
}
ZRangebylex
- 功能:命令返回存储在键的排序集合在指定的范围元素
- 实现:
TODO
ZLexcount
- 功能:命令在计算有序集合中指定字典区间内成员数量。
- 实现:
TODO
ZRemrangebylex
- 功能:命令删除有序集合存储在由最小和最大指定的字典范围之间的所有键元素。
- 实现:
TODO
ZRemrangebyrank
- 功能:移除有序集 key 中,指定排名(rank)区间内的所有成员。区间分别以下标参数 start 和 stop 指出,包含 start 和 stop 在内。下标参数 start 和 stop 都以0为底,也就是说,以0表示有序集第一个成员,以1表示有序集第二个成员,以此类推。你也可以使用负数下标,以-1表示最后一个成员,-2表示倒数第二个成员,以此类推。
- 实现:
TODO
ZRemrangebyscore
- 功能:移除有序集key中,所有score值介于min和max之间(包括等于min或max)的成员。
- 实现:
TODO