Worker Thread
初始化
创建一对pipe fd,一个用来读,一个用来写,主要用来于DispatchThread
通信,当DispatchThread
接收到新的socket,需要将这些socket分配给work thread,由work thread来处理这些socket山的事件,充分利用多核。
template <typename Conn>
class WorkerThread : public Thread
{
public:
WorkerThread(int cron_interval = 0):
Thread::Thread(cron_interval)
{
/*
* install the protobuf handler here
*/
log_info("WorkerThread construct");
pthread_rwlock_init(&rwlock_, NULL);
pink_epoll_ = new PinkEpoll();
int fds[2];
if (pipe(fds)) {
// LOG(FATAL) << "Can't create notify pipe";
log_err("Can't create notify pipe");
}
notify_receive_fd_ = fds[0];
notify_send_fd_ = fds[1];
pink_epoll_->PinkAddEvent(notify_receive_fd_, EPOLLIN | EPOLLERR | EPOLLHUP);
}
virtual ~WorkerThread() {
should_exit_ = true;
pthread_join(thread_id(), NULL);
delete(pink_epoll_);
}
virtual void CronHandle() {
}
/*
* The PbItem queue is the fd queue, receive from dispatch thread
*/
std::queue<PinkItem> conn_queue_;
int notify_receive_fd() {
return notify_receive_fd_;
}
int notify_send_fd() {
return notify_send_fd_;
}
PinkEpoll* pink_epoll() {
return pink_epoll_;
}
Mutex mutex_;
pthread_rwlock_t rwlock_;
std::map<int, void *> conns_;
private:
/*
* These two fd receive the notify from dispatch thread
*/
int notify_receive_fd_;
int notify_send_fd_;
/*
* The epoll handler
*/
PinkEpoll *pink_epoll_;
// clean conns
void cleanup()
{
RWLock l(&rwlock_, true);
Conn *in_conn;
std::map<int, void *>::iterator iter = conns_.begin();
for (; iter != conns_.end(); iter++) {
close(iter->first);
in_conn = static_cast<Conn *>(iter->second);
delete in_conn;
}
}
};
工作线程
- 当事件是由
notify_receive_fd_
这个socket触发EPOLLIN
事件- 从
notify_receive_fd_
读出1个字节,保证不会重复触发可读事件 - 从队列中取出
PinkItem
对象 - 创建
Conn
指针,设置非阻塞,放到变量conns_
这个map中,key是socket的fd
,value是Conn
指针 - 给这个fd添加
EPOLLIN
选项,创建事件变量,添加到EventLoop中,标识需要监控这个fd上的读事件
- 从
事件不是由
notify_receive_fd_
触发EPOLLIN
事件GetRequest
完成socket上数据的读取并判断状态,是否出错,如果出错直接关闭连接,只有kReadAll
,kReadHalf
这两个状态正常- 更新socket的最后活跃时间
- 调用
is_reply
判断是否有数据需要发送,如果有数据需要发送,需要对当前socket增加EPOLLOUT
选项,重新加入EventLoop
中
EPOLLOUT
事件SendReply
判断是否发送完kWriteAll
- 设置不需要再发送数据标志位
- 给当前socket增加
EPOLLIN
选项,重新加入EventLoop
中
kWriteHalf
- 下次继续发送
kWriteError
- 关闭socket
virtual void *ThreadMain() { int nfds; PinkFiredEvent *pfe = NULL; char bb[1]; PinkItem ti; Conn *in_conn = NULL; struct timeval when; gettimeofday(&when, NULL); struct timeval now = when; when.tv_sec += (cron_interval_ / 1000); when.tv_usec += ((cron_interval_ % 1000 ) * 1000); int timeout = cron_interval_; if (timeout <= 0) { timeout = PINK_CRON_INTERVAL; } while (!should_exit_) { if (cron_interval_ > 0 ) { gettimeofday(&now, NULL); if (when.tv_sec > now.tv_sec || (when.tv_sec == now.tv_sec && when.tv_usec > now.tv_usec)) { timeout = (when.tv_sec - now.tv_sec) * 1000 + (when.tv_usec - now.tv_usec) / 1000; } else { when.tv_sec = now.tv_sec + (cron_interval_ / 1000); when.tv_usec = now.tv_usec + ((cron_interval_ % 1000 ) * 1000); CronHandle(); timeout = cron_interval_; } } nfds = pink_epoll_->PinkPoll(timeout); for (int i = 0; i < nfds; i++) { pfe = (pink_epoll_->firedevent()) + i; log_info("pfe->fd_ %d pfe->mask_ %d", pfe->fd_, pfe->mask_); //判断是否是管道fd if (pfe->fd_ == notify_receive_fd_) { //管道可读事件,说明DispatchThread写入了数据,有新的连接需要处理 if (pfe->mask_ & EPOLLIN) { //读出一个字节数据 read(notify_receive_fd_, bb, 1); //连接队列中取出连接ti { MutexLock l(&mutex_); ti = conn_queue_.front(); conn_queue_.pop(); } //创建Conn Conn *tc = new Conn(ti.fd(), ti.ip_port(), this); //修改为非阻塞 tc->SetNonblock(); //保存fd->Conn指针映射关系 { RWLock l(&rwlock_, true); conns_[ti.fd()] = tc; } //监控这个client socket上的读事件 pink_epoll_->PinkAddEvent(ti.fd(), EPOLLIN); log_info("receive one fd %d", ti.fd()); } else { continue; } } else { //不是管道fd,则一定是client socket上的事件 in_conn = NULL; int should_close = 0; std::map<int, void *>::iterator iter = conns_.begin(); if (pfe == NULL) { continue; } //通过fd查Conn指针,一般来说应该是可以找到的,找不到直接关闭fd iter = conns_.find(pfe->fd_); if (iter == conns_.end()) { pink_epoll_->PinkDelEvent(pfe->fd_); continue; } //是否是读事件 if (pfe->mask_ & EPOLLIN) { in_conn = static_cast<Conn *>(iter->second); //读取数据,读完之后判断状态 ReadStatus getRes = in_conn->GetRequest(); in_conn->set_last_interaction(now); log_info("now: %d, %d", now.tv_sec, now.tv_usec); log_info("in_conn->is_reply() %d", in_conn->is_reply()); //数据包状态只能是kReadAll,kReadHalf,否则就直接关闭 if (getRes != kReadAll && getRes != kReadHalf) { // kReadError kReadClose kFullError kParseError should_close = 1; } else if (in_conn->is_reply()) { //是否给client socket回复数据,如果回复了,需要异步发送数据,关注写事件 pink_epoll_->PinkModEvent(pfe->fd_, 0, EPOLLOUT); } else { continue; } } //可写事件 if (pfe->mask_ & EPOLLOUT) { in_conn = static_cast<Conn *>(iter->second); log_info("in work thead SendReply before"); //写入数据,写完之后判断写入状态 WriteStatus write_status = in_conn->SendReply(); log_info("in work thead SendReply after"); //写完状态,需要重新关注读事件 if (write_status == kWriteAll) { in_conn->set_is_reply(false); pink_epoll_->PinkModEvent(pfe->fd_, 0, EPOLLIN); } else if (write_status == kWriteHalf) { continue; } else if (write_status == kWriteError) { should_close = 1; } } //判断socket是否出错,需要关闭 if ((pfe->mask_ & EPOLLERR) || (pfe->mask_ & EPOLLHUP) || should_close) { log_info("close pfe fd here"); { RWLock l(&rwlock_, true); pink_epoll_->PinkDelEvent(pfe->fd_); close(pfe->fd_); delete(in_conn); in_conn = NULL; conns_.erase(pfe->fd_); } } } } } cleanup(); return NULL; }