set
set
实际上就是hash
的简化,实现起来基本上也是一样的,set
中的member
和hash
中的field
基本上一样,唯一不同的是set
中的member
并不会存字符串值。
数据格式
下面四个表格分别:
meta |
原始key字符串 |
S(1) |
key(var) |
meta |
key长度 |
key字符串 |
member |
s(1) |
key size(1) |
key(var) |
member key(var) |
举例
sadd test member1
sadd test member2
meta |
原始key字符串 |
S(1) |
key(var) |
S |
test |
meta |
key长度 |
key字符串 |
member |
s(1) |
key size(1) |
key(var) |
member key(var) |
s |
4 |
test |
member1 |
s |
4 |
test |
member2 |
meta |
原始key字符串 |
S(1) |
key(var) |
inline std::string EncodeSSizeKey(const rocksdb::Slice &key) {
std::string buf;
buf.append(1, DataType::kSSize);
buf.append(key.data(), key.size());
return buf;
}
inline int DecodeSSizeKey(const rocksdb::Slice &slice, std::string *size) {
Decoder decoder(slice.data(), slice.size());
if (decoder.Skip(1) == -1) {
return -1;
}
if (decoder.ReadData(size) == -1) {
return -1;
}
return 0;
}
typedef DefaultMeta SetMeta;
struct DefaultMeta : public NemoMeta {
int64_t len;
DefaultMeta() : len(0) {}
explicit DefaultMeta(int64_t _len):len(_len) {}
virtual bool DecodeFrom(const std::string& raw_meta) {
if (raw_meta.size() != sizeof(uint64_t)) {
return false;
}
len = *(int64_t *)raw_meta.data();
return true;
}
virtual bool EncodeTo(std::string& raw_meta) {
raw_meta.clear();
raw_meta.append((char *)&len, sizeof(int64_t));
return true;
}
virtual std::string ToString() {
char buf[32];
std::string res("Len : ");
Int64ToStr(buf, 32, len);
res.append(buf);
return res;
}
};
Set member key
meta |
key长度 |
key字符串 |
member |
s(1) |
key size(1) |
key(var) |
member key(var) |
inline std::string EncodeSetKey(const rocksdb::Slice &key, const rocksdb::Slice &member) {
std::string buf;
buf.append(1, DataType::kSet);
buf.append(1, (uint8_t)key.size());
buf.append(key.data(), key.size());
buf.append(member.data(), member.size());
return buf;
}
inline int DecodeSetKey(const rocksdb::Slice &slice, std::string *key, std::string *member) {
Decoder decoder(slice.data(), slice.size());
if (decoder.Skip(1) == -1) {
return -1;
}
if (decoder.ReadLenData(key) == -1) {
return -1;
}
if (decoder.ReadData(member) == -1) {
return -1;
}
return 0;
}
Set member value
代码实现
SAdd
- 功能:将一个member元素加入到集合key当中,已经存在于集合的member元素将被忽略。
- 实现:
- 获取行锁
- 根据传入的key,编码出
set member key
,调用Get
获取到set member value
- 如果
set member key
不存在
set meta value
中代表member数量增加1
- 如果
set member key
存在,什么都不做
int Nemo::IncrSSize(const std::string &key, int64_t incr, rocksdb::WriteBatch &writebatch) {
int64_t len = SCard(key);
if (len == -1) {
return -1;
}
std::string size_key = EncodeSSizeKey(key);
len += incr;
writebatch.Put(size_key, rocksdb::Slice((char *)&len, sizeof(int64_t)));
return 0;
}
Status Nemo::SAdd(const std::string &key, const std::string &member, int64_t *res) {
if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) {
return Status::InvalidArgument("Invalid key length");
}
Status s;
RecordLock l(&mutex_set_record_, key);
rocksdb::WriteBatch writebatch;
std::string set_key = EncodeSetKey(key, member);
std::string val;
s = set_db_->Get(rocksdb::ReadOptions(), set_key, &val);
if (s.IsNotFound()) { // not found
*res = 1;
if (IncrSSize(key, 1, writebatch) < 0) {
return Status::Corruption("incrSSize error");
}
writebatch.Put(set_key, rocksdb::Slice());
} else if (s.ok()) {
*res = 0;
} else {
return Status::Corruption("sadd check member error");
}
s = set_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &(writebatch));
return s;
}
SRem
- 功能:移除集合key中的一个member元素,不存在的member元素会被忽略
- 实现:
- 获取行锁
- 用key和member编码出
set memeber key
,调用Get
- 如果存在,设置返回结果为1
- 修改
set meta value
,member数量减1
- 如果不存在,设置返回结果为0
Status Nemo::SRem(const std::string &key, const std::string &member, int64_t *res) {
if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) {
return Status::InvalidArgument("Invalid key length");
}
Status s;
RecordLock l(&mutex_set_record_, key);
rocksdb::WriteBatch writebatch;
std::string set_key = EncodeSetKey(key, member);
std::string val;
s = set_db_->Get(rocksdb::ReadOptions(), set_key, &val);
if (s.ok()) {
*res = 1;
if (IncrSSize(key, -1, writebatch) < 0) {
return Status::Corruption("incrSSize error");
}
writebatch.Delete(set_key);
s = set_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &(writebatch));
} else if (s.IsNotFound()) {
*res = 0;
} else {
return Status::Corruption("srem check member error");
}
return s;
}
SCard
- 功能:返回有序集key的基数。
- 实现:
- 根据传入的key编码出
set meta key
,调用Get
接口获取到set meta value
- 如果
set meta key
存在
- 如果
set meta key
不存在
int64_t Nemo::SCard(const std::string &key) {
std::string size_key = EncodeSSizeKey(key);
std::string val;
Status s;
s = set_db_->Get(rocksdb::ReadOptions(), size_key, &val);
if (s.IsNotFound()) {
return 0;
} else if(!s.ok()) {
return -1;
} else {
if (val.size() != sizeof(int64_t)) {
return 0;
}
int64_t ret = *(int64_t *)val.data();
return ret < 0 ? 0 : ret;
}
}
SMembers
- 功能:返回集合key中的所有成员。不存在的key被视为空集合。
- 实现:
SIterator* Nemo::SScan(const std::string &key, uint64_t limit, bool use_snapshot) {
std::string set_key = EncodeSetKey(key, "");
rocksdb::ReadOptions read_options;
if (use_snapshot) {
read_options.snapshot = set_db_->GetSnapshot();
}
read_options.fill_cache = false;
rocksdb::Iterator *it = set_db_->NewIterator(read_options);
it->Seek(set_key);
IteratorOptions iter_options("", limit, read_options);
return new SIterator(it, iter_options, key);
}
Status Nemo::SMembers(const std::string &key, std::vector<std::string> &members) {
SIterator *iter = SScan(key, -1, true);
for (; iter->Valid(); iter->Next()) {
members.push_back(iter->member());
}
set_db_->ReleaseSnapshot(iter->read_options().snapshot);
delete iter;
return Status::OK();
}
SUnionStore
- 功能:这个命令类似于SUNION命令,但它将结果保存到destination集合,而不是简单地返回结果集。如果destination已经存在,则将其覆盖。
- 实现:
- 遍历keys集合
- 对每个key遍历所有member,加入临时map中
- 判断目的destination是否存在
- 不存在忽略
- 存在的话,遍历这个key所有的member,执行删除操作,同时修改
set meta value
- 遍历临时map中结果,分别添加到destination中
Status Nemo::SRemNoLock(const std::string &key, const std::string &member, int64_t *res) {
Status s;
rocksdb::WriteBatch writebatch;
std::string set_key = EncodeSetKey(key, member);
std::string val;
s = set_db_->Get(rocksdb::ReadOptions(), set_key, &val);
if (s.ok()) {
*res = 1;
if (IncrSSize(key, -1, writebatch) < 0) {
return Status::Corruption("incrSSize error");
}
writebatch.Delete(set_key);
s = set_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &(writebatch));
} else if (s.IsNotFound()) {
*res = 0;
} else {
return Status::Corruption("srem check member error");
}
return s;
}
Status Nemo::SUnionStore(const std::string &destination, const std::vector<std::string> &keys, int64_t *res) {
int numkey = keys.size();
if (numkey <= 0) {
return Status::InvalidArgument("invalid parameter, no keys");
}
std::map<std::string, int> member_result;
std::map<std::string, int>::iterator it;
for (int i = 0; i < numkey; i++) {
RecordLock l(&mutex_set_record_, keys[i]);
SIterator *iter = SScan(keys[i], -1, true);
for (; iter->Valid(); iter->Next()) {
member_result[iter->member()] = 1;
}
set_db_->ReleaseSnapshot(iter->read_options().snapshot);
delete iter;
}
// we delete the destination if it exists
Status s;
int64_t tmp_res;
RecordLock l(&mutex_set_record_, destination);
if (SCard(destination) > 0) {
SIterator *iter = SScan(destination, -1, true);
for (; iter->Valid(); iter->Next()) {
s = SRemNoLock(destination, iter->member(), &tmp_res);
if (!s.ok()) {
delete iter;
return s;
}
}
set_db_->ReleaseSnapshot(iter->read_options().snapshot);
delete iter;
}
for (it = member_result.begin(); it != member_result.end(); it++) {
s = SAddNoLock(destination, it->first, &tmp_res);
if (!s.ok()) {
return s;
}
}
*res = member_result.size();
return Status::OK();
}
SUnion
- 功能:返回一个集合的全部成员,该集合是所有给定集合的并集。不存在的key被视为空集。
- 实现
TODO
SInterStore
- 功能:这个命令类似于SINTER命令,但它将结果保存到destination集合,而不是简单地返回结果集。如果 destination 集合已经存在,则将其覆盖。
- 实现
TODO
SInter
- 功能:返回一个集合的全部成员,该集合是所有给定集合的交集。不存在的 key 被视为空集。
- 实现
TODO
SDiffStore
- 功能:这个命令的作用和SDIFF类似,但它将结果保存到destination集合,而不是简单地返回结果集。如果destination集合已经存在,则将其覆盖。
- 实现
TODO
SDiff
- 功能:返回一个集合的全部成员,该集合是所有给定集合之间的差集。不存在的key被视为空集。
- 实现
TODO
SPop
- 功能:移除并返回集合中的一个随机元素。
- 实现:
- 获取行锁
- 生成随机数k
- 获取迭代器,迭代k次,得到第k个member,删除迭代器
- 判断执行spop次数和耗时,满足一定条件需要Compact
- 删除member元素,修改
set meta value
Status Nemo::SRemNoLock(const std::string &key, const std::string &member, int64_t *res) {
Status s;
rocksdb::WriteBatch writebatch;
std::string set_key = EncodeSetKey(key, member);
std::string val;
s = set_db_->Get(rocksdb::ReadOptions(), set_key, &val);
if (s.ok()) {
*res = 1;
if (IncrSSize(key, -1, writebatch) < 0) {
return Status::Corruption("incrSSize error");
}
writebatch.Delete(set_key);
s = set_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &(writebatch));
} else if (s.IsNotFound()) {
*res = 0;
} else {
return Status::Corruption("srem check member error");
}
return s;
}
Status Nemo::SPop(const std::string &key, std::string &member) {
int card = SCard(key);
if (card <= 0) {
return Status::NotFound();
}
uint64_t start_us = NowMicros(), duration_us;
srand (start_us);
int k = rand() % (card < 100 ? card : 100 ) + 1;
RecordLock l(&mutex_set_record_, key);
SIterator *iter = SScan(key, -1, true);
for (int i = 0; i < k - 1; i++) {
iter->Next();
}
member = iter->member();
set_db_->ReleaseSnapshot(iter->read_options().snapshot);
delete iter;
duration_us = NowMicros() - start_us;
int64_t spop_count = AddAndGetSpopCount(key);
//执行超过一定删除次数
if (spop_count >= SPOP_COMPACT_THRESHOLD_COUNT) {
//执行Compact操作
//范围确定举例:bcc,转换为cdd
AddBGTask({DBType::kSET_DB, OPERATION::kDEL_KEY, key, ""});
ResetSpopCount(key);
//耗时比较长
} else if (duration_us > 1000000UL) {
//执行Compact操作
AddBGTask({DBType::kSET_DB, OPERATION::kDEL_KEY, key, ""});
}
int64_t res;
return SRemNoLock(key, member, &res);
}
SRandMember
- 功能:如果命令执行时,只提供了key参数,那么返回集合中的一个随机元素。
- 实现
TODO
SMove
- 功能:将member元素从source集合移动到destination集合。如果source集合不存在或不包含指定的member元素,则SMOVE命令不执行任何操作,仅返回0 。否则,member元素从source集合中被移除,并添加到destination集合中去。当destination集合已经包含member元素时,SMOVE命令只是简单地将source集合中的member元素删除。
- 实现
TODO