Worker Thread

初始化

创建一对pipe fd,一个用来读,一个用来写,主要用来于DispatchThread通信,当DispatchThread接收到新的socket,需要将这些socket分配给work thread,由work thread来处理这些socket山的事件,充分利用多核。

template <typename Conn>
class WorkerThread : public Thread
{
public:
  WorkerThread(int cron_interval = 0):
    Thread::Thread(cron_interval)
  {
    /*
     * install the protobuf handler here
     */
    log_info("WorkerThread construct");
    pthread_rwlock_init(&rwlock_, NULL);
    pink_epoll_ = new PinkEpoll();
    int fds[2];
    if (pipe(fds)) {
      // LOG(FATAL) << "Can't create notify pipe";
      log_err("Can't create notify pipe");
    }
    notify_receive_fd_ = fds[0];
    notify_send_fd_ = fds[1];
    pink_epoll_->PinkAddEvent(notify_receive_fd_, EPOLLIN | EPOLLERR | EPOLLHUP);
  }
  virtual ~WorkerThread() {
    should_exit_ = true;
    pthread_join(thread_id(), NULL);
    delete(pink_epoll_);
  }

  virtual void CronHandle() {
  }
  /*
   * The PbItem queue is the fd queue, receive from dispatch thread
   */
  std::queue<PinkItem> conn_queue_;
  int notify_receive_fd() { 
    return notify_receive_fd_; 
  }
  int notify_send_fd() { 
    return notify_send_fd_; 
  }
  PinkEpoll* pink_epoll() {
    return pink_epoll_;
  }
  Mutex mutex_;
  pthread_rwlock_t rwlock_;
  std::map<int, void *> conns_;
private:
  /*
   * These two fd receive the notify from dispatch thread
   */
  int notify_receive_fd_;
  int notify_send_fd_;
  /*
   * The epoll handler
   */
  PinkEpoll *pink_epoll_;
  // clean conns
  void cleanup()
  {
    RWLock l(&rwlock_, true);
    Conn *in_conn;
    std::map<int, void *>::iterator iter = conns_.begin();
    for (; iter != conns_.end(); iter++) {
         close(iter->first);
         in_conn = static_cast<Conn *>(iter->second);
         delete in_conn;
    }
  }
};

工作线程

  • 当事件是由notify_receive_fd_这个socket触发
    • EPOLLIN事件
      • notify_receive_fd_读出1个字节,保证不会重复触发可读事件
      • 从队列中取出PinkItem对象
      • 创建Conn指针,设置非阻塞,放到变量conns_这个map中,key是socket的fd,value是Conn指针
      • 给这个fd添加EPOLLIN选项,创建事件变量,添加到EventLoop中,标识需要监控这个fd上的读事件
  • 事件不是由notify_receive_fd_触发

    • EPOLLIN事件
      • GetRequest完成socket上数据的读取并判断状态,是否出错,如果出错直接关闭连接,只有kReadAll,kReadHalf这两个状态正常
      • 更新socket的最后活跃时间
      • 调用is_reply判断是否有数据需要发送,如果有数据需要发送,需要对当前socket增加EPOLLOUT选项,重新加入EventLoop
    • EPOLLOUT事件
      • SendReply判断是否发送完
        • kWriteAll
          • 设置不需要再发送数据标志位
          • 给当前socket增加EPOLLIN选项,重新加入EventLoop
        • kWriteHalf
          • 下次继续发送
        • kWriteError
          • 关闭socket
    virtual void *ThreadMain()
    {
     int nfds;
     PinkFiredEvent *pfe = NULL;
     char bb[1];
     PinkItem ti;
     Conn *in_conn = NULL;
    
     struct timeval when;
     gettimeofday(&when, NULL);
     struct timeval now = when;
    
     when.tv_sec += (cron_interval_ / 1000);
     when.tv_usec += ((cron_interval_ % 1000 ) * 1000);
     int timeout = cron_interval_;
     if (timeout <= 0) {
       timeout = PINK_CRON_INTERVAL;
     }
    
     while (!should_exit_) {
       if (cron_interval_ > 0 ) {
         gettimeofday(&now, NULL);
         if (when.tv_sec > now.tv_sec || (when.tv_sec == now.tv_sec && when.tv_usec > now.tv_usec)) {
           timeout = (when.tv_sec - now.tv_sec) * 1000 + (when.tv_usec - now.tv_usec) / 1000;
         } else {
           when.tv_sec = now.tv_sec + (cron_interval_ / 1000);
           when.tv_usec = now.tv_usec + ((cron_interval_ % 1000 ) * 1000);
           CronHandle();
           timeout = cron_interval_;
         }
       }
       nfds = pink_epoll_->PinkPoll(timeout);
       for (int i = 0; i < nfds; i++) {
         pfe = (pink_epoll_->firedevent()) + i;
         log_info("pfe->fd_ %d pfe->mask_ %d", pfe->fd_, pfe->mask_);
         //判断是否是管道fd
         if (pfe->fd_ == notify_receive_fd_) {
           //管道可读事件,说明DispatchThread写入了数据,有新的连接需要处理
           if (pfe->mask_ & EPOLLIN) {
             //读出一个字节数据
             read(notify_receive_fd_, bb, 1);
             //连接队列中取出连接ti
             {
               MutexLock l(&mutex_);
               ti = conn_queue_.front();
               conn_queue_.pop();
             }
             //创建Conn
             Conn *tc = new Conn(ti.fd(), ti.ip_port(), this);
             //修改为非阻塞
             tc->SetNonblock();
             //保存fd->Conn指针映射关系
             {
             RWLock l(&rwlock_, true);
             conns_[ti.fd()] = tc;
             }
             //监控这个client socket上的读事件
             pink_epoll_->PinkAddEvent(ti.fd(), EPOLLIN);
             log_info("receive one fd %d", ti.fd());
           } else {
             continue;
           }
         } else {
           //不是管道fd,则一定是client socket上的事件
           in_conn = NULL;
           int should_close = 0;
           std::map<int, void *>::iterator iter = conns_.begin();
           if (pfe == NULL) {
             continue;
           }
           //通过fd查Conn指针,一般来说应该是可以找到的,找不到直接关闭fd
           iter = conns_.find(pfe->fd_);
           if (iter == conns_.end()) {
             pink_epoll_->PinkDelEvent(pfe->fd_);
             continue;
           }
           //是否是读事件
           if (pfe->mask_ & EPOLLIN) {
             in_conn = static_cast<Conn *>(iter->second);
             //读取数据,读完之后判断状态
             ReadStatus getRes = in_conn->GetRequest();
             in_conn->set_last_interaction(now);
             log_info("now: %d, %d", now.tv_sec, now.tv_usec);
             log_info("in_conn->is_reply() %d", in_conn->is_reply());
             //数据包状态只能是kReadAll,kReadHalf,否则就直接关闭
             if (getRes != kReadAll && getRes != kReadHalf) {
               // kReadError kReadClose kFullError kParseError
               should_close = 1;
             } else if (in_conn->is_reply()) {
               //是否给client socket回复数据,如果回复了,需要异步发送数据,关注写事件
               pink_epoll_->PinkModEvent(pfe->fd_, 0, EPOLLOUT);
             } else {
               continue;
             }
           }
           //可写事件
           if (pfe->mask_ & EPOLLOUT) {
             in_conn = static_cast<Conn *>(iter->second);
             log_info("in work thead SendReply before");
             //写入数据,写完之后判断写入状态
             WriteStatus write_status = in_conn->SendReply();
             log_info("in work thead SendReply after");
             //写完状态,需要重新关注读事件
             if (write_status == kWriteAll) {
               in_conn->set_is_reply(false);
               pink_epoll_->PinkModEvent(pfe->fd_, 0, EPOLLIN);
             } else if (write_status == kWriteHalf) {
               continue;
             } else if (write_status == kWriteError) {
               should_close = 1;
             }
           }
           //判断socket是否出错,需要关闭
           if ((pfe->mask_  & EPOLLERR) || (pfe->mask_ & EPOLLHUP) || should_close) {
             log_info("close pfe fd here");
             {
             RWLock l(&rwlock_, true);
             pink_epoll_->PinkDelEvent(pfe->fd_);
             close(pfe->fd_);
             delete(in_conn);
             in_conn = NULL;
             conns_.erase(pfe->fd_);
             }
           }
         }
       }
     }
     cleanup();
     return NULL;
    }
    

results matching ""

    No results matching ""