Dispatch Thread

初始化

DispatchThread继承于Thread,初始化函数中需要提供监听的port,挂接的WorkerThread数量和指针数组,以及定时器触发周期。DispatchThread创建PinkEpoll,然后对监听的socket上增加EPOLLIN | EPOLLERR | EPOLLHUP事件监控,然后添加到PinkEpoll中。

  • 保存工作线程指针
  • 创建监听socket,在指定端口监听
  • 开始监听
  • 创建EventLoop
  • 将listen socket加入到EventLoop监控范围之内
  • 启动工作线程
template <typename T>
class DispatchThread : public Thread
{
public:
  // This type Dispatch thread just get Connection and then Dispatch the fd to
  // worker thead
  /**
   * @brief 
   *
   * @param port the port number
   * @param work_num
   * @param worker_thread the worker thred we define
   * @param cron_interval the cron job interval
   */
  DispatchThread(int port, int work_num, WorkerThread<T> **worker_thread, int cron_interval = 0) :
    Thread::Thread(cron_interval),
    work_num_(work_num)
  {
   //保存工作线程指针
    worker_thread_ = worker_thread;
    //创建监听socket,在指定端口监听
    server_socket_ = new ServerSocket(port);
    //开始监听
    server_socket_->Listen();
    // init epoll
    //创建EventLoop
    pink_epoll_ = new PinkEpoll();
    //将listen socket加入到EventLoop监控范围之内
    pink_epoll_->PinkAddEvent(server_socket_->sockfd(), EPOLLIN | EPOLLERR | EPOLLHUP);

    last_thread_ = 0;
    //启动工作线程
    for (int i = 0; i < work_num_; i++) {
      worker_thread_[i]->StartThread();
    }
  }

  int work_num() {
    return work_num_;
  }

  WorkerThread<T>** worker_thread() {
    return worker_thread_;
  }

  virtual ~DispatchThread() {
    should_exit_ = true;
    pthread_join(thread_id(), NULL);
    delete(pink_epoll_);
    server_socket_->Close();
    delete(server_socket_);
  }

  virtual void CronHandle() {
  }

  virtual bool AccessHandle(std::string& ip) {
    return true;
  }
private:
  /*
   * The tcp server port and address
   */
  ServerSocket *server_socket_;
  /*
   * The Epoll event handler
   */
  PinkEpoll *pink_epoll_;
  /*
   * Here we used auto poll to find the next work thread,
   * last_thread_ is the last work thread
   */
  int last_thread_;
  int work_num_;
  /*
   * This is the work threads
   */
  WorkerThread<T> **worker_thread_;
  // No copying allowed
  DispatchThread(const DispatchThread&);
  void operator=(const DispatchThread&);
};

EventLoop线程

EventLoop线程主要处理定时器事件,以及server socket上的事件。

  • 设置定时器的情况下,首先判断定时器是否超时,超时的时候执行CronHandle函数,重新计算下次超时时间,这个时间需要作为epoll_wait的其中一个参数
  • 处理io事件,DispatchThread线程只处理server socket上的事件

    • EPOLLIN事件
      • 调用accept函数得到client socket,调用AccessHandle来验证是否允许连接,如果允许连接则新建PinkItem对象,将这个对象压入,workerthreadconn_queue_中,在挑选workerthread的时候,按照轮询的策略来做,简单但是足够有效。
      • 通过写pipe的方式唤醒work_thread,work_thread新建的时候需要创建一对pipe fd,一个用来读,一个用来写。
    • EPOLLERR或者EPOLLHUP事件
      • 关闭server socket
  • 线程函数如下:

    virtual void *ThreadMain()
    {
      int nfds;
      PinkFiredEvent *pfe;
      Status s;
      struct sockaddr_in cliaddr;
      socklen_t clilen = sizeof(struct sockaddr);
      int fd, connfd;
    
      struct timeval when;
      struct timeval now;
      gettimeofday(&when, NULL);
    
      when.tv_sec += (cron_interval_ / 1000);
      when.tv_usec += ((cron_interval_ % 1000 ) * 1000);
      int timeout = cron_interval_;
      if (timeout <= 0) {
        timeout = PINK_CRON_INTERVAL;
      }
    
      std::string ip_port;
      char port_buf[32];
      char ip_addr[INET_ADDRSTRLEN] = "";
    
      while (!should_exit_) {
        //设置定时器的情况下
        if (cron_interval_ > 0 ) {
          gettimeofday(&now, NULL);
          //定时器未到期,设置下次超时时间
          if (when.tv_sec > now.tv_sec || (when.tv_sec == now.tv_sec && when.tv_usec > now.tv_usec)) {
            timeout = (when.tv_sec - now.tv_sec) * 1000 + (when.tv_usec - now.tv_usec) / 1000;
          } else {
            //定时器到期,执行CronHandle, 设置下次超时时间
            when.tv_sec = now.tv_sec + (cron_interval_ / 1000);
            when.tv_usec = now.tv_usec + ((cron_interval_ % 1000 ) * 1000);
            CronHandle();
            timeout = cron_interval_;
          }
        }
    
        nfds = pink_epoll_->PinkPoll(timeout);
        /*
         * we just have the listen socket fd in the epoll 
         */
        for (int i = 0; i < nfds; i++) {
          pfe = (pink_epoll_->firedevent()) + i;
          fd = pfe->fd_;
          log_info("come fd is %d\n", fd);
          //判断是否是listen socket
          if (fd == server_socket_->sockfd()) {
            //判断是否是读事件?
            if (pfe->mask_ & EPOLLIN) {
              //调用accept函数,返回client socket
              //重点看看accept函数出错异常情况处理?
              connfd = accept(server_socket_->sockfd(), (struct sockaddr *) &cliaddr, &clilen);
              log_info("Connect fd %d", connfd);
              //这个地方处理的有问题
              if (connfd == -1) {
                if (errno != EWOULDBLOCK) {
                  continue;
                }
              }
              //设置socket的FD_CLOEXEC选项
              fcntl(connfd, F_SETFD, fcntl(connfd, F_GETFD) | FD_CLOEXEC);
    
              ip_port = inet_ntop(AF_INET, &cliaddr.sin_addr, ip_addr, sizeof(ip_addr));
              //通过ip地址判断
              if (!AccessHandle(ip_port)) {
                close(connfd);
                continue;
              }
    
              ip_port.append(":");
              sprintf(port_buf, "%d", ntohs(cliaddr.sin_port));
              //组合成$ip:$port
              ip_port.append(port_buf);
    
              //加入工作线程连接队列
              std::queue<PinkItem> *q = &(worker_thread_[last_thread_]->conn_queue_);
              PinkItem ti(connfd, ip_port);
              {
                MutexLock l(&worker_thread_[last_thread_]->mutex_);
                q->push(ti);
              }
              //写入pipe,通知这个线程有新连接,需要处理
              write(worker_thread_[last_thread_]->notify_send_fd(), "", 1);
              last_thread_++;
              last_thread_ %= work_num_;
            //如果有错误就关闭
            } else if (pfe->mask_ & (EPOLLERR | EPOLLHUP)) {
              /*
               * this branch means there is error on the listen fd
               */
              log_info("close the fd here");
              close(fd);
            }
          } else {
            //不是listen socket上的事件忽略
            continue;
          }
        }
      }
      return NULL;
    }
    

results matching ""

    No results matching ""