kv

kv数据结构不需要做额外的封装,只需要调用rocksdb提供的接口。

数据格式说明

下面两个表格分别说明KV存储到rocksdb的格式:

Key:

原始key字符串
key(var)

Value:

value字符串
value(var)

代码实现

set

  • 功能:设置key的value
  • 实现:
    • 如果设置了ttl,调用PutWithKeyTTL接口
    • 如果没有设置ttl,调用Put接口
// Stores `val` under `key` in the KV database.
// A positive `ttl` is forwarded to PutWithKeyTTL; a zero or negative `ttl`
// results in a plain Put.
// Returns the status reported by the underlying write.
Status Nemo::Set(const std::string &key, const std::string &val, const int32_t ttl) {
    const bool with_ttl = ttl > 0;
    if (with_ttl) {
        return kv_db_->PutWithKeyTTL(rocksdb::WriteOptions(), key, val, ttl);
    }
    return kv_db_->Put(rocksdb::WriteOptions(), key, val);
}

get

  • 功能:读取key的value
  • 实现:
    • 调用Get接口
// Reads the value stored under `key` into `*val`.
// The status of the underlying Get is returned unchanged.
Status Nemo::Get(const std::string &key, std::string *val) {
    return kv_db_->Get(rocksdb::ReadOptions(), key, val);
}

mset

  • 功能:批量设置key的value
  • 实现:
    • 调用WriteWithKeyTTL批量写入
      // Writes every key/value pair of `kvs` through a single WriteBatch.
      // The batch is submitted via WriteWithKeyTTL with a ttl argument of 0.
      // Returns the status of that batch write.
      Status Nemo::MSet(const std::vector<KV> &kvs) {
          rocksdb::WriteBatch pending;
          for (const KV &entry : kvs) {
              pending.Put(entry.key, entry.val);
          }
          return kv_db_->WriteWithKeyTTL(rocksdb::WriteOptions(), &pending, 0);
      }
      

kmdel

  • 功能:批量删除key
  • 实现:
    • 首先Get,key如果存在,删除
    • WriteWithKeyTTL批量删除
// Deletes the keys of `keys` that can currently be read and reports how many
// were found through `*count`.
// Each key is probed with Get; only keys whose Get succeeds are counted and
// queued for deletion. The queued deletes are submitted as one batch.
// Returns the status of the batch write (probe failures are not reported).
Status Nemo::KMDel(const std::vector<std::string> &keys, int64_t* count) {
    *count = 0;
    rocksdb::WriteBatch deletions;
    std::string scratch;
    for (const std::string &key : keys) {
        if (!kv_db_->Get(rocksdb::ReadOptions(), key, &scratch).ok()) {
            continue;
        }
        ++(*count);
        deletions.Delete(key);
    }
    return kv_db_->WriteWithKeyTTL(rocksdb::WriteOptions(), &deletions);
}

mget

  • 功能:批量获取key的值
  • 实现:
    • 每个key分别调用Get
// Looks up every key of `keys` individually and appends one KVS entry per key
// to `kvss`, in input order.
// Each entry carries the key, the value read (empty when nothing was read) and
// the status of that key's Get, so per-key failures are reported through the
// entries. The function itself always returns Status::OK().
Status Nemo::MGet(const std::vector<std::string> &keys, std::vector<KVS> &kvss) {
    for (const std::string &key : keys) {
        std::string fetched;
        Status lookup = kv_db_->Get(rocksdb::ReadOptions(), key, &fetched);
        kvss.push_back((KVS){key, fetched, lookup});
    }
    return Status::OK();
}

incrby

  • 功能:增加key的value
  • 实现:
    • Get到当前值,计算最新的值
      • 如果存在,需要将当前值转换成数字,然后与参数相加,校验范围,最终得到新的值
      • 如果不存在,最新值就是参数
    • 判断当前key的ttl
      • ttl存在,调用PutWithKeyTTL
      • ttl不存在,调用Put
// Adds `by` to the integer stored under `key` and writes the result back.
//
// - If the key does not exist, the new value is `by` itself.
// - If the stored value is not an integer, returns Corruption.
// - If the addition would leave the int64 range, returns InvalidArgument.
// - Any other read failure is reported as Corruption("Get error").
//
// On success `new_val` holds the decimal text that was written. The whole
// read-modify-write runs under the record lock for `key`.
Status Nemo::Incrby(const std::string &key, const int64_t by, std::string &new_val) {
    Status s;
    std::string val;
    RecordLock l(&mutex_kv_record_, key);
    s = kv_db_->Get(rocksdb::ReadOptions(), key, &val);
    if (s.IsNotFound()) {
        new_val = std::to_string(by);
    } else if (s.ok()) {
        int64_t ival;
        if (!StrToInt64(val.data(), val.size(), &ival)) {
            return Status::Corruption("value is not a integer");
        }
        // Range check done before the addition so the sum itself never overflows.
        if ((by >= 0 && LLONG_MAX - by < ival) || (by < 0 && LLONG_MIN - by > ival)) {
            return Status::InvalidArgument("Overflow");
        }
        new_val = std::to_string(ival + by);
    } else {
        return Status::Corruption("Get error");
    }

    // Initialised so that a KTTL call that fails without writing its output
    // falls back to a plain Put instead of reading an indeterminate value.
    int64_t ttl = 0;
    s = KTTL(key, &ttl);
    // NOTE(review): any non-zero ttl (including negative values) takes the TTL
    // path, and the KTTL status is overwritten below — confirm KTTL's contract.
    if (ttl) {
        s = kv_db_->PutWithKeyTTL(rocksdb::WriteOptions(), key, new_val, static_cast<int32_t>(ttl));
    } else {
        s = kv_db_->Put(rocksdb::WriteOptions(), key, new_val);
    }
    return s;
}

decrby

  • 功能:减少key的value
  • 实现:
    • Get到当前值,计算最新的值
      • 如果存在,需要将当前值转换成数字,然后减去参数,校验范围,最终得到新的值
      • 如果不存在,参数取负就是最新的值
    • 判断当前key的ttl
      • ttl存在,调用PutWithKeyTTL
      • ttl不存在,调用Put
// Subtracts `by` from the integer stored under `key` and writes the result back.
//
// - If the key does not exist, the new value is `-by`.
// - If the stored value is not an integer, returns Corruption.
// - If the result would leave the int64 range, returns InvalidArgument.
// - Any other read failure is reported as Corruption("Get error").
//
// On success `new_val` holds the decimal text that was written. The whole
// read-modify-write runs under the record lock for `key`.
Status Nemo::Decrby(const std::string &key, const int64_t by, std::string &new_val) {
    Status s;
    std::string val;
    RecordLock l(&mutex_kv_record_, key);
    s = kv_db_->Get(rocksdb::ReadOptions(), key, &val);
    if (s.IsNotFound()) {
        // -LLONG_MIN is not representable; negating it is signed overflow.
        if (by == LLONG_MIN) {
            return Status::InvalidArgument("Overflow");
        }
        new_val = std::to_string(-by);
    } else if (s.ok()) {
        int64_t ival;
        if (!StrToInt64(val.data(), val.size(), &ival)) {
            return Status::Corruption("value is not a integer");
        }
        // Range check done before the subtraction so it never overflows.
        if ((by >= 0 && LLONG_MIN + by > ival) || (by < 0 && LLONG_MAX + by < ival)) {
            return Status::InvalidArgument("Overflow");
        }
        new_val = std::to_string(ival - by);
    } else {
        return Status::Corruption("Get error");
    }

    // Initialised so that a KTTL call that fails without writing its output
    // falls back to a plain Put instead of reading an indeterminate value.
    int64_t ttl = 0;
    s = KTTL(key, &ttl);
    // NOTE(review): any non-zero ttl (including negative values) takes the TTL
    // path, and the KTTL status is overwritten below — confirm KTTL's contract.
    if (ttl) {
        s = kv_db_->PutWithKeyTTL(rocksdb::WriteOptions(), key, new_val, static_cast<int32_t>(ttl));
    } else {
        s = kv_db_->Put(rocksdb::WriteOptions(), key, new_val);
    }
    return s;
}

incrbyfloat

  • 功能:按浮点数增加key的value
  • 实现:
    • Get到当前值,计算最新的值
      • 如果存在,需要将当前值转换成数字,然后与参数相加,校验范围,最终得到新的值
      • 如果不存在,最新值就是参数
    • 对float形式做处理:
      • 例如:12.010需要处理成12.01
      • 例如:12.需要处理成12
    • 判断当前key的ttl
      • ttl存在,调用PutWithKeyTTL
      • ttl不存在,调用Put
// Adds the floating-point increment `by` to the number stored under `key` and
// writes the result back.
//
// - If the key does not exist, the result is `by` itself.
// - If the stored value is not a number, returns Corruption.
// - If the result is NaN or infinite, returns InvalidArgument. This is checked
//   on both branches, so a non-finite `by` is rejected for a missing key too.
// - Any other read failure is reported as Corruption("Get error").
//
// The result is formatted with std::to_string (fixed notation, six decimals)
// and then trimmed: trailing zeros are removed, and a trailing '.' as well,
// e.g. "12.010000" -> "12.01" and "12.000000" -> "12". On success `new_val`
// holds the text that was written.
Status Nemo::Incrbyfloat(const std::string &key, const double by, std::string &new_val) {
    Status s;
    std::string val;
    double result = by;
    RecordLock l(&mutex_kv_record_, key);
    s = kv_db_->Get(rocksdb::ReadOptions(), key, &val);
    if (s.ok()) {
        double dval;
        if (!StrToDouble(val.data(), val.size(), &dval)) {
            return Status::Corruption("value is not a float");
        }
        result = dval + by;
    } else if (!s.IsNotFound()) {
        return Status::Corruption("Get error");
    }
    if (isnan(result) || isinf(result)) {
        return Status::InvalidArgument("Overflow");
    }

    std::string res = std::to_string(result);
    // Drop trailing zeros; the '.' produced by to_string stops the scan.
    const size_t last_kept = res.find_last_not_of('0');
    if (last_kept != std::string::npos) {
        res.erase(last_kept + 1);
    }
    // A bare trailing '.' means the fraction was all zeros.
    if (!res.empty() && res[res.size() - 1] == '.') {
        res.erase(res.size() - 1);
    }
    new_val = res;

    // Initialised so that a KTTL call that fails without writing its output
    // falls back to a plain Put instead of reading an indeterminate value.
    int64_t ttl = 0;
    s = KTTL(key, &ttl);
    // NOTE(review): any non-zero ttl (including negative values) takes the TTL
    // path, and the KTTL status is overwritten below — confirm KTTL's contract.
    if (ttl) {
        s = kv_db_->PutWithKeyTTL(rocksdb::WriteOptions(), key, new_val, static_cast<int32_t>(ttl));
    } else {
        s = kv_db_->Put(rocksdb::WriteOptions(), key, new_val);
    }
    return s;
}

getset

  • 功能:设置key的新值,并返回原来的值
  • 实现:
    • Get当前值
    • Put新值
    • 返回原来的值
// Replaces the value of `key` with `new_val` and hands the previous value back
// through `*old_val`.
//
// `*old_val` is cleared first, so it stays empty when the key did not exist.
// A read failure other than NotFound is reported as Corruption("Get error")
// and nothing is written. Otherwise the status of the Put is returned.
// The new value is written with a plain Put (no TTL argument).
Status Nemo::GetSet(const std::string &key, const std::string &new_val, std::string *old_val) {
    *old_val = "";
    RecordLock l(&mutex_kv_record_, key);
    Status s = kv_db_->Get(rocksdb::ReadOptions(), key, old_val);
    if (!s.ok() && !s.IsNotFound()) {
        return Status::Corruption("Get error");
    }
    return kv_db_->Put(rocksdb::WriteOptions(), key, new_val);
}

append

  • 功能:在key的value末尾追加内容,并返回追加后的长度
  • 实现:
    • Get获取当前值
      • 原来不存在
      • 原来存在
    • 判断ttl
      • 如果有ttl,调用PutWithKeyTTL
      • 如果没有ttl,调用Put
// Appends `value` to the string stored under `key` and reports the resulting
// length through `*new_len`.
//
// If the key does not exist, `value` becomes the whole new value. A read
// failure other than NotFound is returned as-is with `*new_len` left at 0.
// `*new_len` is set from the computed value whether or not the write succeeds;
// the returned status is that of the write.
Status Nemo::Append(const std::string &key, const std::string &value, int64_t *new_len) {
    *new_len = 0;
    std::string new_val;
    RecordLock l(&mutex_kv_record_, key);
    Status s = kv_db_->Get(rocksdb::ReadOptions(), key, &new_val);
    if (s.ok()) {
        // Extend the value that was read in place; no second copy needed.
        new_val.append(value);
    } else if (s.IsNotFound()) {
        new_val = value;
    } else {
        return s;
    }

    // Initialised so that a KTTL call that fails without writing its output
    // falls back to a plain Put instead of reading an indeterminate value.
    int64_t ttl = 0;
    s = KTTL(key, &ttl);
    // NOTE(review): any non-zero ttl (including negative values) takes the TTL
    // path, and the KTTL status is overwritten below — confirm KTTL's contract.
    if (ttl) {
        s = kv_db_->PutWithKeyTTL(rocksdb::WriteOptions(), key, new_val, static_cast<int32_t>(ttl));
    } else {
        s = kv_db_->Put(rocksdb::WriteOptions(), key, new_val);
    }
    *new_len = new_val.size();
    return s;
}

setnx

  • 功能:不存在时才设置
  • 实现:
    • Get判断当前key是否存在
      • 不存在
        • 如果没有ttl
        • 如果有ttl
      • 存在直接忽略
// Sets `key` to `value` only when the key does not exist yet.
//
// `*ret` is 1 when the value was actually written and 0 otherwise (key already
// present, read error, or failed write). A positive `ttl` is forwarded to
// PutWithKeyTTL; otherwise a plain Put is used.
// Returns the status of the write when one was attempted, else the status of
// the existence check (OK when the key already exists).
Status Nemo::Setnx(const std::string &key, const std::string &value, int64_t *ret, const int32_t ttl) {
    *ret = 0;
    std::string val;
    RecordLock l(&mutex_kv_record_, key);
    Status s = kv_db_->Get(rocksdb::ReadOptions(), key, &val);
    if (!s.IsNotFound()) {
        return s;
    }
    if (ttl <= 0) {
        s = kv_db_->Put(rocksdb::WriteOptions(), key, value);
    } else {
        s = kv_db_->PutWithKeyTTL(rocksdb::WriteOptions(), key, value, ttl);
    }
    // Only report "set" when the write really went through.
    if (s.ok()) {
        *ret = 1;
    }
    return s;
}

setxx

  • 功能:key存在时才设置
  • 实现:
    • Get判断当前key是否存在
      • 存在
        • 如果没有ttl,调用Put
        • 如果有ttl,调用PutWithKeyTTL
      • 不存在直接忽略
// Sets `key` to `value` only when the key already exists.
//
// `*ret` is 1 when the value was actually written and 0 otherwise (key absent,
// read error, or failed write). A positive `ttl` is forwarded to
// PutWithKeyTTL; otherwise a plain Put is used.
// Returns the status of the write when one was attempted, else the status of
// the existence check (NotFound when the key is absent).
Status Nemo::Setxx(const std::string &key, const std::string &value, int64_t *ret, const int32_t ttl) {
    *ret = 0;
    std::string val;
    RecordLock l(&mutex_kv_record_, key);
    Status s = kv_db_->Get(rocksdb::ReadOptions(), key, &val);
    if (!s.ok()) {
        return s;
    }
    if (ttl <= 0) {
        s = kv_db_->Put(rocksdb::WriteOptions(), key, value);
    } else {
        s = kv_db_->PutWithKeyTTL(rocksdb::WriteOptions(), key, value, ttl);
    }
    // Only report "set" when the write really went through.
    if (s.ok()) {
        *ret = 1;
    }
    return s;
}

msetnx

  • 功能:所有key都不存在时才批量设置
  • 实现:
    • 逐个Get判断key是否存在,只要有一个key存在就放弃写入
    • 所有key都不存在时,调用WriteWithKeyTTL批量写入
// Writes all pairs of `kvs` in one batch, but only if none of the keys exists.
//
// `*ret` is 1 when the batch was written and 0 otherwise.
// - If any key already exists, nothing is written and OK is returned.
// - If probing a key fails with anything other than NotFound, nothing is
//   written and that error is returned, rather than treating the key as absent.
// - If the batch write fails, its status is returned and `*ret` is 0.
//
// NOTE(review): no lock is taken here, so the existence checks and the batch
// write are separate steps.
Status Nemo::MSetnx(const std::vector<KV> &kvs, int64_t *ret) {
    Status s;
    rocksdb::WriteBatch batch;
    std::string val;
    *ret = 0;
    for (const KV &kv : kvs) {
        s = kv_db_->Get(rocksdb::ReadOptions(), kv.key, &val);
        // Either the key exists (s is OK) or the probe itself failed: in both
        // cases the batch must not be applied.
        if (!s.IsNotFound()) {
            return s;
        }
        batch.Put(kv.key, kv.val);
    }
    s = kv_db_->WriteWithKeyTTL(rocksdb::WriteOptions(), &batch, 0);
    if (s.ok()) {
        *ret = 1;
    }
    return s;
}

getrange

  • 功能:获取value中[start, end]范围内的子串
  • 实现:
    • Get获取当前值
    • 负数下标按从末尾倒数换算,并将范围裁剪到value长度之内
    • 返回对应的子串
// Extracts the inclusive byte range [start, end] of the value stored under
// `key` into `substr`.
//
// Negative indices count from the end of the value (-1 is the last byte).
// `substr` is cleared first and stays empty when the key cannot be read or
// when the range selects nothing. The status of the Get is returned; an empty
// selection on an existing key still yields OK.
Status Nemo::Getrange(const std::string key, const int64_t start, const int64_t end, std::string &substr) {
    substr = "";
    std::string stored;
    Status s = kv_db_->Get(rocksdb::ReadOptions(), key, &stored);
    if (!s.ok()) {
        return s;
    }

    const int64_t total = stored.length();
    // Translate negative indices into offsets from the front.
    int64_t first = start;
    if (first < 0) {
        first += total;
    }
    int64_t last = end;
    if (last < 0) {
        last += total;
    }

    // Ranges that select nothing. A translated start of exactly 0 is exempt
    // from the last two tests and is handled by the clamping below.
    const bool starts_past_end = first > total - 1;
    const bool reversed = first != 0 && first > last;
    const bool ends_before_begin = first != 0 && last < 0;
    if (starts_past_end || reversed || ends_before_begin) {
        return Status::OK();
    }

    // Clamp into the value; the order of these three steps matters.
    if (first < 0) {
        first = 0;
    }
    if (last >= total) {
        last = total - 1;
    }
    if (first == 0 && last < 0) {
        last = 0;
    }
    substr = stored.substr(first, last - first + 1);
    return s;
}

setrange

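  • 功能:从offset开始用给定内容覆盖value
  • 实现:
    • Get获取当前值
      • 原来存在:offset超过原长度时用'\0'补齐,否则覆盖对应位置
      • 原来不存在:先用offset个'\0'补齐,再追加内容
    • 判断ttl
      • 如果有ttl,调用PutWithKeyTTL
      • 如果没有ttl,调用Put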
// Overwrites the value stored under `key` with `value`, starting at byte
// `offset`, and reports the resulting length through `*len`.
//
// - A missing key is treated as an empty value.
// - If the write region extends past the current end, the gap is filled with
//   '\0' bytes.
// - An empty `value` never shortens the stored value.
// - A negative `offset` returns Corruption("offset < 0").
// - If `offset + value.length()` exceeds 2^29 bytes, returns
//   Corruption("too big"). This is checked before any allocation and applies
//   to missing keys as well.
// - A read failure other than NotFound is returned as-is and nothing is
//   written.
//
// Otherwise the status of the write is returned.
Status Nemo::Setrange(const std::string key, const int64_t offset, const std::string &value, int64_t *len) {
    if (offset < 0) {
        return Status::Corruption("offset < 0");
    }
    // Upper bound on the end of the written region (512 MiB).
    const uint64_t kMaxEnd = static_cast<uint64_t>(1) << 29;
    const uint64_t begin = static_cast<uint64_t>(offset);
    // Written as two comparisons so begin + value.length() cannot wrap.
    if (begin > kMaxEnd || value.length() > kMaxEnd - begin) {
        return Status::Corruption("too big");
    }

    RecordLock l(&mutex_kv_record_, key);
    std::string new_val;
    Status s = kv_db_->Get(rocksdb::ReadOptions(), key, &new_val);
    if (s.IsNotFound()) {
        new_val.clear();
    } else if (!s.ok()) {
        // Do not clobber the key with an empty value after a failed read.
        return s;
    }

    const size_t pos = static_cast<size_t>(begin);
    const size_t end = pos + value.length();
    if (new_val.length() < end) {
        // Pads any gap between the old end and `offset` with '\0'.
        new_val.resize(end, '\0');
    }
    new_val.replace(pos, value.length(), value);
    *len = new_val.length();

    // Initialised so that a KTTL call that fails without writing its output
    // falls back to a plain Put instead of reading an indeterminate value.
    int64_t ttl = 0;
    s = KTTL(key, &ttl);
    // NOTE(review): any non-zero ttl (including negative values) takes the TTL
    // path, and the KTTL status is overwritten below — confirm KTTL's contract.
    if (ttl) {
        s = kv_db_->PutWithKeyTTL(rocksdb::WriteOptions(), key, new_val, static_cast<int32_t>(ttl));
    } else {
        s = kv_db_->Put(rocksdb::WriteOptions(), key, new_val);
    }
    return s;
}

strlen

  • 功能:获取value的长度
  • 实现:
    • 调用Get获取当前值,返回其长度;key不存在时长度为0
// Reports the length of the value stored under `key` through `*len`.
// A missing key yields a length of 0; on any other read failure `*len` is
// left untouched. The status of the lookup is returned in every case.
Status Nemo::Strlen(const std::string &key, int64_t *len) {
    std::string stored;
    Status s = Get(key, &stored);
    if (s.IsNotFound()) {
        *len = 0;
    } else if (s.ok()) {
        *len = stored.length();
    }
    return s;
}

results matching ""

    No results matching ""