PinkConn

重要的虚函数GetRequest,SendReply,如果需要支持不同的协议,只需要继承PinkConn,重载这两个函数即可

class PinkConn
{
public:
  PinkConn(const int fd, const std::string &ip_port);
  virtual ~PinkConn();
  /*
   * Set the fd to nonblock && set the flag_ the the fd flag
   */
  bool SetNonblock();
  int flags() { 
    return flags_; 
  };
  virtual ReadStatus GetRequest() = 0;
  virtual WriteStatus SendReply() = 0;
  void set_fd(const int fd) { 
    fd_ = fd; 
  }
  int fd() const {
    return fd_;
  }
  std::string ip_port() const {
    return ip_port_;
  }
  void set_is_reply(const bool is_reply) {
    is_reply_ = is_reply;
  }
  bool is_reply() const {
    return is_reply_;
  }
  void set_last_interaction(const struct timeval &now) {
    last_interaction_ = now;
  }
  struct timeval last_interaction() const {
    return last_interaction_;
  };
private:
  int fd_;
  std::string ip_port_;
  bool is_reply_;
  struct timeval last_interaction_;
  int flags_;
  /*
   * No allowed copy and copy assign operator
   */
  PinkConn(const PinkConn&);
  void operator=(const PinkConn&);
};

RedisConn

说明

支持redis协议的连接,redis消息都会有头标识,消息行还有就行里可能还有个数据块大小描述.首先Redis是以行来划分,每行以\r\n行结束。每一行都有一个消息头,消息头共分为5种分别如下:

  • (+) 表示一个正确的状态信息,具体信息是当前行+后面的字符
  • (-) 表示一个错误信息,具体信息是当前行-后面的字符
  • (*) 表示消息体总共有多少行,不包括当前行,*后面是具体的行数
  • ($) 表示下一行数据长度,不包括换行符长度\r\n,$后面则是对应的长度的数据
  • (:) 表示返回一个数值,:后面是相应的数字节符

    typedef std::vector<std::string> RedisCmdArgsType;
    class RedisConn: public PinkConn
    {
    public:
    RedisConn(const int fd, const std::string &ip_port);
    virtual ~RedisConn();
    void ResetClient();
    
    bool ExpandWbuf();
    uint32_t wbuf_size_;
    
    virtual ReadStatus GetRequest();
    virtual WriteStatus SendReply();
    virtual int DealMessage() = 0;
    ConnStatus connStatus_;
    private:
    int32_t last_read_pos_;
    int32_t next_parse_pos_;
    int32_t req_type_;
    int32_t multibulk_len_;
    int32_t bulk_len_;
    bool is_find_sep_;
    bool is_overtake_;
    /*
     * The Variable need by read the buf,
     * We allocate the memory when we start the server
     */
    char* rbuf_;
    uint32_t wbuf_pos_;
    
    ReadStatus ProcessInputBuffer();
    ReadStatus ProcessMultibulkBuffer();
    ReadStatus ProcessInlineBuffer();
    int32_t FindNextSeparators();
    int32_t GetNextNum(int32_t pos, int32_t *value);
    protected:
    char* wbuf_;
    uint32_t wbuf_len_;
    RedisCmdArgsType argv_;
    };
    

实现

ReadStatus RedisConn::GetRequest()
{
  ssize_t nread = 0;
  int32_t next_read_pos = (last_read_pos_ + 1) % REDIS_MAX_MESSAGE;
  int32_t read_len = 0;
  if (next_read_pos == next_parse_pos_ && !is_find_sep_) {
    //too big message, close client;
    //err_msg_ = "-ERR: Protocol error: too big mbulk count string\r\n";  
    return kParseError;
  } else if (next_read_pos >= next_parse_pos_) {
    read_len = REDIS_IOBUF_LEN < (REDIS_MAX_MESSAGE - next_read_pos) ? REDIS_IOBUF_LEN : (REDIS_MAX_MESSAGE - next_read_pos);
  } else if (next_read_pos < next_parse_pos_) {
    read_len = next_parse_pos_ - next_read_pos;
  } 

  nread = read(fd(), rbuf_ + next_read_pos, read_len);
  if (nread == -1) {
    if (errno == EAGAIN) {
      nread = 0;
      return kReadHalf; //HALF
    } else {
      // error happened, close client
      return kReadError;
    }
  } else if (nread == 0) {
    // client closed, close client
    return kReadClose;
  }

  if (nread) {
    last_read_pos_ = (last_read_pos_ + nread) % REDIS_MAX_MESSAGE;
    is_overtake_ = false;
  }
  ReadStatus ret = ProcessInputBuffer();
  if (ret == kFullError/*FULL_ERROR*/) {
    is_find_sep_ = false;
  }
  return ret; //OK || HALF || FULL_ERROR || PARSE_ERROR
}
ReadStatus RedisConn::ProcessInputBuffer() {
  ReadStatus ret;
  while (!is_overtake_) {
    if (!req_type_) {
      if (rbuf_[next_parse_pos_] == '*') {
        req_type_ = REDIS_REQ_MULTIBULK;
      } else {
        req_type_ = REDIS_REQ_INLINE;
      }
    }

    if (req_type_ == REDIS_REQ_INLINE) {
      ret = ProcessInlineBuffer();
      if (ret != kReadAll) {
        return ret;
      }
    } else if (req_type_ == REDIS_REQ_MULTIBULK) {
      ret = ProcessMultibulkBuffer();
      if (ret != kReadAll/*OK*/) { //FULL_ERROR || HALF || PARSE_ERROR
        return ret;
      }
    } else {
      //Unknown requeset type;
      return kParseError;
    }

    if (argv_.size() == 0) {
      ResetClient();
    } else {
      DealMessage(); 
    }
  }
  req_type_ = 0;
  next_parse_pos_ = 0;
  last_read_pos_ = -1;
  return kReadAll;/*OK*/
}

PbConn

说明

支持protobuf协议的连接

class PbConn: public PinkConn
{
public:
  PbConn(const int fd, const std::string &ip_port);
  ~PbConn();
  void InitPara();
  ReadStatus GetRequest();
  WriteStatus SendReply();
  virtual int DealMessage() = 0;
  /*
   * The Variable need by read the buf,
   * We allocate the memory when we start the server
   */
  uint32_t header_len_;
  char* rbuf_;
  uint32_t cur_pos_;
  uint32_t rbuf_len_;
  ConnStatus connStatus_;
  google::protobuf::Message *res_;
  char* wbuf_;
  uint32_t wbuf_len_;
  uint32_t wbuf_pos_;
private:
  virtual Status BuildObuf();
};

实现

// Msg is [ length(COMMAND_HEADER_LENGTH) | body(length bytes) ]
//   step 1. kHeader, we read COMMAND_HEADER_LENGTH bytes;
//   step 2. kPacket, we read header_len bytes;
ReadStatus PbConn::GetRequest()
{
  // TODO  cur_pos_ can be omitted
  while (true) {
    switch (connStatus_) {
      case kHeader: {
        ssize_t nread = read(fd(), rbuf_ + rbuf_len_, COMMAND_HEADER_LENGTH - rbuf_len_);
        if (nread == -1) {
          if (errno == EAGAIN) {
            return kReadHalf;
          } else {
            return kReadError;
          }
        } else if (nread == 0) {
          return kReadClose;
        } else {
          rbuf_len_ += nread;
          if (rbuf_len_ - cur_pos_ == COMMAND_HEADER_LENGTH) {
            uint32_t integer = 0;
            memcpy((char *)(&integer), rbuf_ + cur_pos_, sizeof(uint32_t));
            header_len_ = ntohl(integer);
            remain_packet_len_ = header_len_;
            cur_pos_ += COMMAND_HEADER_LENGTH;
            connStatus_ = kPacket;
          }
          log_info ("GetRequest kHeader header_len=%u cur_pos=%u rbuf_len=%u remain_packet_len_=%d nread=%d\n", header_len_, cur_pos_, rbuf_len_, remain_packet_len_, nread);
        }
        break;
      }
      case kPacket: {
        if (header_len_ >= PB_MAX_MESSAGE - COMMAND_HEADER_LENGTH) {
          return kFullError;
        } else {
          // read msg body
          ssize_t nread = read(fd(), rbuf_ + rbuf_len_, remain_packet_len_);
          if (nread == -1) {
            if (errno == EAGAIN) {
              return kReadHalf;
            } else {
              return kReadError;
            }
          } else if (nread == 0) {
            return kReadClose;
          }
          rbuf_len_ += nread;
          remain_packet_len_ -= nread;
          if (remain_packet_len_ == 0) {
            cur_pos_ = rbuf_len_;
            connStatus_ = kComplete;
          }
          log_info ("GetRequest kPacket header_len=%u cur_pos=%u rbuf_len=%u remain_packet_len_=%d nread=%d\n", header_len_, cur_pos_, rbuf_len_, remain_packet_len_, nread);
        }
        break;
      }
      case kComplete: {
        DealMessage();
        connStatus_ = kHeader;
        cur_pos_ = 0;
        rbuf_len_ = 0;
        return kReadAll;
      }
      // Add this switch case just for delete compile warning
      case kBuildObuf:
        break;
      case kWriteObuf:
        break;
    }
  }
  return kReadHalf;
}

WriteStatus PbConn::SendReply()
{
  BuildObuf();
  ssize_t nwritten = 0;
  while (wbuf_len_ > 0) {
    nwritten = write(fd(), wbuf_ + wbuf_pos_, wbuf_len_ - wbuf_pos_);
    if (nwritten <= 0) {
      break;
    }
    wbuf_pos_ += nwritten;
    if (wbuf_pos_ == wbuf_len_) {
      wbuf_len_ = 0;
      wbuf_pos_ = 0;
    }
  }
  if (nwritten == -1) {
    if (errno == EAGAIN) {
      return kWriteHalf;
    } else {
      // Here we should close the connection
      return kWriteError;
    }
  }
  if (wbuf_len_ == 0) {
    return kWriteAll;
  } else {
    return kWriteHalf;
  }

results matching ""

    No results matching ""