PinkConn
重要的虚函数是 GetRequest 和 SendReply。如果需要支持不同的协议,只需要继承 PinkConn 并重写(override)这两个函数即可。
/*
 * PinkConn: abstract base class for a single client connection.
 *
 * It stores the connection's file descriptor, the peer "ip:port" string,
 * a reply flag, the time of the last interaction and the fd flags.
 * A protocol is supported by deriving from this class and implementing
 * GetRequest() and SendReply().
 */
class PinkConn
{
public:
  PinkConn(const int fd, const std::string &ip_port);
  virtual ~PinkConn();

  /*
   * Set the fd to nonblocking mode && record the fd flags in flags_.
   * NOTE(review): the meaning of the bool result is defined by the
   * out-of-view implementation - presumably true on success; confirm.
   */
  bool SetNonblock();

  /* The stored fd flags (const so it can be used on a const connection). */
  int flags() const {
    return flags_;
  }

  /* Protocol-specific read step; implemented by each derived connection. */
  virtual ReadStatus GetRequest() = 0;
  /* Protocol-specific write step; implemented by each derived connection. */
  virtual WriteStatus SendReply() = 0;

  void set_fd(const int fd) {
    fd_ = fd;
  }
  int fd() const {
    return fd_;
  }

  /* Returns a copy of the peer address string. */
  std::string ip_port() const {
    return ip_port_;
  }

  void set_is_reply(const bool is_reply) {
    is_reply_ = is_reply;
  }
  bool is_reply() const {
    return is_reply_;
  }

  void set_last_interaction(const struct timeval &now) {
    last_interaction_ = now;
  }
  struct timeval last_interaction() const {
    return last_interaction_;
  }

private:
  int fd_;
  std::string ip_port_;
  bool is_reply_;
  struct timeval last_interaction_;
  int flags_;

  /*
   * Copy construction and copy assignment are not allowed:
   * declared private and intentionally left unimplemented.
   */
  PinkConn(const PinkConn&);
  void operator=(const PinkConn&);
};
RedisConn
说明
支持 Redis 协议的连接。Redis 消息都带有头标识,消息行里还可能带有数据块大小的描述。Redis 协议以行来划分,每行以 \r\n 结束。每一行都有一个消息头,消息头共分为 5 种,分别如下:
- (+) 表示一个正确的状态信息,具体信息是当前行+后面的字符
- (-) 表示一个错误信息,具体信息是当前行-后面的字符
- (*) 表示消息体总共有多少行,不包括当前行,*后面是具体的行数
- ($) 表示下一行数据的长度,不包括换行符 \r\n 的长度;$ 后面是长度值,下一行则是对应长度的数据
- (:) 表示返回一个数值,: 后面是相应的数字字符
// Container type of argv_ (one std::string per element).
typedef std::vector<std::string> RedisCmdArgsType;

/*
 * RedisConn: a PinkConn that speaks the Redis protocol.
 *
 * GetRequest() and SendReply() are provided here; a concrete server
 * connection derives from this class and implements DealMessage().
 */
class RedisConn: public PinkConn
{
public:
  RedisConn(const int fd, const std::string &ip_port);
  virtual ~RedisConn();

  void ResetClient();
  bool ExpandWbuf();

  uint32_t wbuf_size_;

  virtual ReadStatus GetRequest();
  virtual WriteStatus SendReply();

  // Implemented by the derived class.
  virtual int DealMessage() = 0;

  ConnStatus connStatus_;

private:
  // Index into rbuf_ of the most recently read byte; GetRequest() reads
  // the next chunk at (last_read_pos_ + 1) modulo REDIS_MAX_MESSAGE.
  int32_t last_read_pos_;
  // Index into rbuf_ that the parser examines next.
  int32_t next_parse_pos_;
  // 0 while undetermined, otherwise REDIS_REQ_INLINE / REDIS_REQ_MULTIBULK.
  int32_t req_type_;
  int32_t multibulk_len_;
  int32_t bulk_len_;
  bool is_find_sep_;
  bool is_overtake_;

  /*
   * Input buffer used while reading requests.
   * (Per the original note, its memory is allocated when the server
   * starts.)
   */
  char* rbuf_;
  uint32_t wbuf_pos_;

  ReadStatus ProcessInputBuffer();
  ReadStatus ProcessMultibulkBuffer();
  ReadStatus ProcessInlineBuffer();
  int32_t FindNextSeparators();
  int32_t GetNextNum(int32_t pos, int32_t *value);

protected:
  char* wbuf_;
  uint32_t wbuf_len_;
  RedisCmdArgsType argv_;
};
实现
/*
 * Read one chunk from the socket into rbuf_ and run the parser on it.
 *
 * rbuf_ is indexed modulo REDIS_MAX_MESSAGE: the new data is stored right
 * after last_read_pos_, and a single read never crosses the end of the
 * buffer nor runs past next_parse_pos_.
 *
 * Returns:
 *   kParseError - the write position has reached next_parse_pos_ while
 *                 is_find_sep_ is false (message too big)
 *   kReadHalf   - read() failed with EAGAIN
 *   kReadError  - read() failed with any other errno
 *   kReadClose  - read() returned 0
 *   otherwise   - whatever ProcessInputBuffer() returns
 */
ReadStatus RedisConn::GetRequest()
{
  const int32_t next_read_pos = (last_read_pos_ + 1) % REDIS_MAX_MESSAGE;

  if (next_read_pos == next_parse_pos_ && !is_find_sep_) {
    // too big message, close client
    return kParseError;
  }

  // Decide how many bytes may be read in one go.
  int32_t read_len = 0;
  if (next_read_pos >= next_parse_pos_) {
    // Free space runs to the end of the buffer; cap it at REDIS_IOBUF_LEN.
    read_len = REDIS_MAX_MESSAGE - next_read_pos;
    if (REDIS_IOBUF_LEN < read_len) {
      read_len = REDIS_IOBUF_LEN;
    }
  } else {
    // Free space ends where the parser still has unread data.
    read_len = next_parse_pos_ - next_read_pos;
  }

  const ssize_t nread = read(fd(), rbuf_ + next_read_pos, read_len);
  if (nread == 0) {
    // client closed, close client
    return kReadClose;
  }
  if (nread == -1) {
    // EAGAIN: nothing available right now; anything else: close client
    return (errno == EAGAIN) ? kReadHalf : kReadError;
  }

  // nread > 0: account for the new bytes before parsing.
  last_read_pos_ = (last_read_pos_ + nread) % REDIS_MAX_MESSAGE;
  is_overtake_ = false;

  const ReadStatus status = ProcessInputBuffer();
  if (status == kFullError) {
    is_find_sep_ = false;
  }
  return status;  // OK || HALF || FULL_ERROR || PARSE_ERROR
}
/*
 * Parse requests out of rbuf_ until is_overtake_ becomes true.
 *
 * For each request the type is detected once from the byte at
 * next_parse_pos_ ('*' selects multibulk, anything else inline) and the
 * matching parser is run. Any parser result other than kReadAll is
 * returned to the caller immediately. After a complete parse, an empty
 * argv_ resets the client, otherwise DealMessage() is invoked (its return
 * value is not inspected).
 *
 * When the loop ends, the request type and both buffer positions are
 * reset and kReadAll is returned.
 * NOTE(review): is_overtake_ is not modified in this function - presumably
 * the parsers or ResetClient() set it; confirm.
 */
ReadStatus RedisConn::ProcessInputBuffer() {
  while (!is_overtake_) {
    if (!req_type_) {
      req_type_ = (rbuf_[next_parse_pos_] == '*') ? REDIS_REQ_MULTIBULK
                                                  : REDIS_REQ_INLINE;
    }

    ReadStatus status;
    if (req_type_ == REDIS_REQ_INLINE) {
      status = ProcessInlineBuffer();
    } else if (req_type_ == REDIS_REQ_MULTIBULK) {
      status = ProcessMultibulkBuffer();
    } else {
      // Unknown request type
      return kParseError;
    }

    if (status != kReadAll) {
      // FULL_ERROR || HALF || PARSE_ERROR
      return status;
    }

    if (argv_.empty()) {
      ResetClient();
    } else {
      DealMessage();
    }
  }

  last_read_pos_ = -1;
  next_parse_pos_ = 0;
  req_type_ = 0;
  return kReadAll;
}
PbConn
说明
支持protobuf协议的连接
/*
 * PbConn: a PinkConn for length-prefixed protobuf messages.
 *
 * GetRequest() and SendReply() are provided here; a concrete connection
 * derives from this class and implements DealMessage().
 *
 * NOTE(review): PbConn::GetRequest() also uses a member named
 * remain_packet_len_, which is not declared in this excerpt - confirm
 * against the real header.
 */
class PbConn: public PinkConn
{
public:
  PbConn(const int fd, const std::string &ip_port);
  virtual ~PbConn();

  void InitPara();

  virtual ReadStatus GetRequest();
  virtual WriteStatus SendReply();

  // Implemented by the derived class.
  virtual int DealMessage() = 0;

  /*
   * State used while reading a request.
   * (Per the original note, the buffer memory is allocated when the
   * server starts.)
   */
  uint32_t header_len_;   // body length decoded from the message header
  char* rbuf_;            // input buffer
  uint32_t cur_pos_;      // position in rbuf_ used by GetRequest()
  uint32_t rbuf_len_;     // number of bytes currently stored in rbuf_

  ConnStatus connStatus_;

  google::protobuf::Message *res_;

  char* wbuf_;            // output buffer
  uint32_t wbuf_len_;     // number of bytes to send from wbuf_
  uint32_t wbuf_pos_;     // number of bytes of wbuf_ already sent

private:
  virtual Status BuildObuf();
};
实现
// Msg is [ length(COMMAND_HEADER_LENGTH) | body(length bytes) ]
// step 1. kHeader, we read COMMAND_HEADER_LENGTH bytes and decode the
//         big-endian body length into header_len_;
// step 2. kPacket, we read header_len_ bytes of body;
// step 3. kComplete, we call DealMessage() and reset for the next message.
//
// Returns:
//   kReadAll   - one complete message was read and passed to DealMessage()
//   kReadHalf  - read() reported EAGAIN, or the connection is currently in
//                a write state (kBuildObuf / kWriteObuf)
//   kReadClose - read() returned 0 while bytes were still expected
//   kReadError - read() failed with an errno other than EAGAIN
//   kFullError - the announced body length does not fit:
//                header_len_ >= PB_MAX_MESSAGE - COMMAND_HEADER_LENGTH
ReadStatus PbConn::GetRequest()
{
  // TODO cur_pos_ can be omitted
  while (true) {
    switch (connStatus_) {
      case kHeader: {
        // Ask only for the header bytes that are still missing, so a
        // header delivered in several pieces is assembled correctly.
        ssize_t nread = read(fd(), rbuf_ + rbuf_len_, COMMAND_HEADER_LENGTH - rbuf_len_);
        if (nread == -1) {
          if (errno == EAGAIN) {
            return kReadHalf;
          } else {
            return kReadError;
          }
        } else if (nread == 0) {
          return kReadClose;
        } else {
          rbuf_len_ += nread;
          if (rbuf_len_ - cur_pos_ == COMMAND_HEADER_LENGTH) {
            // Header complete: decode the network-order length.
            uint32_t integer = 0;
            memcpy(&integer, rbuf_ + cur_pos_, sizeof(uint32_t));
            header_len_ = ntohl(integer);
            remain_packet_len_ = header_len_;
            cur_pos_ += COMMAND_HEADER_LENGTH;
            connStatus_ = kPacket;
          }
          // nread is ssize_t; cast so the argument matches the %d conversion.
          log_info ("GetRequest kHeader header_len=%u cur_pos=%u rbuf_len=%u remain_packet_len_=%d nread=%d\n", header_len_, cur_pos_, rbuf_len_, remain_packet_len_, static_cast<int>(nread));
        }
        break;
      }
      case kPacket: {
        if (header_len_ >= PB_MAX_MESSAGE - COMMAND_HEADER_LENGTH) {
          return kFullError;
        }
        ssize_t nread = 0;
        // Only touch the socket when body bytes are still outstanding.
        // With a zero-length body, read(fd, buf, 0) would return 0 and be
        // mistaken for the peer closing the connection.
        if (remain_packet_len_ != 0) {
          // read msg body
          nread = read(fd(), rbuf_ + rbuf_len_, remain_packet_len_);
          if (nread == -1) {
            if (errno == EAGAIN) {
              return kReadHalf;
            } else {
              return kReadError;
            }
          } else if (nread == 0) {
            return kReadClose;
          }
          rbuf_len_ += nread;
          remain_packet_len_ -= nread;
        }
        if (remain_packet_len_ == 0) {
          cur_pos_ = rbuf_len_;
          connStatus_ = kComplete;
        }
        // nread is ssize_t; cast so the argument matches the %d conversion.
        log_info ("GetRequest kPacket header_len=%u cur_pos=%u rbuf_len=%u remain_packet_len_=%d nread=%d\n", header_len_, cur_pos_, rbuf_len_, remain_packet_len_, static_cast<int>(nread));
        break;
      }
      case kComplete: {
        DealMessage();
        connStatus_ = kHeader;
        cur_pos_ = 0;
        rbuf_len_ = 0;
        return kReadAll;
      }
      // Write-side states: there is nothing to read here. Return instead
      // of `break`, which would only leave the switch and make the
      // enclosing while (true) spin forever.
      case kBuildObuf:
      case kWriteObuf:
        return kReadHalf;
    }
  }
  // Not reached for the states handled above; kept as a defensive fallback.
  return kReadHalf;
}
WriteStatus PbConn::SendReply()
{
BuildObuf();
ssize_t nwritten = 0;
while (wbuf_len_ > 0) {
nwritten = write(fd(), wbuf_ + wbuf_pos_, wbuf_len_ - wbuf_pos_);
if (nwritten <= 0) {
break;
}
wbuf_pos_ += nwritten;
if (wbuf_pos_ == wbuf_len_) {
wbuf_len_ = 0;
wbuf_pos_ = 0;
}
}
if (nwritten == -1) {
if (errno == EAGAIN) {
return kWriteHalf;
} else {
// Here we should close the connection
return kWriteError;
}
}
if (wbuf_len_ == 0) {
return kWriteAll;
} else {
return kWriteHalf;
}