Dispatch Thread
初始化
DispatchThread
继承于Thread
,初始化函数中需要提供监听的port
,挂接的WorkerThread
数量和指针数组,以及定时器触发周期。DispatchThread
创建PinkEpoll
,然后对监听的socket上增加EPOLLIN | EPOLLERR | EPOLLHUP
事件监控,然后添加到PinkEpoll
中。
- 保存工作线程指针
- 创建监听socket,在指定端口监听
- 开始监听
- 创建EventLoop
- 将listen socket加入到EventLoop监控范围之内
- 启动工作线程
template <typename T>
class DispatchThread : public Thread
{
public:
// This type Dispatch thread just get Connection and then Dispatch the fd to
// worker thead
/**
* @brief
*
* @param port the port number
* @param work_num
* @param worker_thread the worker thred we define
* @param cron_interval the cron job interval
*/
DispatchThread(int port, int work_num, WorkerThread<T> **worker_thread, int cron_interval = 0) :
Thread::Thread(cron_interval),
work_num_(work_num)
{
//保存工作线程指针
worker_thread_ = worker_thread;
//创建监听socket,在指定端口监听
server_socket_ = new ServerSocket(port);
//开始监听
server_socket_->Listen();
// init epoll
//创建EventLoop
pink_epoll_ = new PinkEpoll();
//将listen socket加入到EventLoop监控范围之内
pink_epoll_->PinkAddEvent(server_socket_->sockfd(), EPOLLIN | EPOLLERR | EPOLLHUP);
last_thread_ = 0;
//启动工作线程
for (int i = 0; i < work_num_; i++) {
worker_thread_[i]->StartThread();
}
}
int work_num() {
return work_num_;
}
WorkerThread<T>** worker_thread() {
return worker_thread_;
}
virtual ~DispatchThread() {
should_exit_ = true;
pthread_join(thread_id(), NULL);
delete(pink_epoll_);
server_socket_->Close();
delete(server_socket_);
}
virtual void CronHandle() {
}
virtual bool AccessHandle(std::string& ip) {
return true;
}
private:
/*
* The tcp server port and address
*/
ServerSocket *server_socket_;
/*
* The Epoll event handler
*/
PinkEpoll *pink_epoll_;
/*
* Here we used auto poll to find the next work thread,
* last_thread_ is the last work thread
*/
int last_thread_;
int work_num_;
/*
* This is the work threads
*/
WorkerThread<T> **worker_thread_;
// No copying allowed
DispatchThread(const DispatchThread&);
void operator=(const DispatchThread&);
};
EventLoop线程
EventLoop
线程主要处理定时器事件,以及server socket
上的事件。
- 设置定时器的情况下,首先判断定时器是否超时,超时的时候执行
CronHandle
函数,重新计算下次超时时间,这个时间需要作为epoll_wait
的其中一个参数 处理io事件,
DispatchThread
线程只处理server socket
上的事件EPOLLIN
事件- 调用
accept
函数得到client socket,调用AccessHandle
来验证是否允许连接,如果允许连接则新建PinkItem
对象,将这个对象压入,workerthread的conn_queue_
中,在挑选workerthread的时候,按照轮询的策略来做,简单但是足够有效。 - 通过写pipe的方式唤醒work_thread,work_thread新建的时候需要创建一对pipe fd,一个用来读,一个用来写。
- 调用
EPOLLERR
或者EPOLLHUP
事件- 关闭server socket
线程函数如下:
virtual void *ThreadMain() { int nfds; PinkFiredEvent *pfe; Status s; struct sockaddr_in cliaddr; socklen_t clilen = sizeof(struct sockaddr); int fd, connfd; struct timeval when; struct timeval now; gettimeofday(&when, NULL); when.tv_sec += (cron_interval_ / 1000); when.tv_usec += ((cron_interval_ % 1000 ) * 1000); int timeout = cron_interval_; if (timeout <= 0) { timeout = PINK_CRON_INTERVAL; } std::string ip_port; char port_buf[32]; char ip_addr[INET_ADDRSTRLEN] = ""; while (!should_exit_) { //设置定时器的情况下 if (cron_interval_ > 0 ) { gettimeofday(&now, NULL); //定时器未到期,设置下次超时时间 if (when.tv_sec > now.tv_sec || (when.tv_sec == now.tv_sec && when.tv_usec > now.tv_usec)) { timeout = (when.tv_sec - now.tv_sec) * 1000 + (when.tv_usec - now.tv_usec) / 1000; } else { //定时器到期,执行CronHandle, 设置下次超时时间 when.tv_sec = now.tv_sec + (cron_interval_ / 1000); when.tv_usec = now.tv_usec + ((cron_interval_ % 1000 ) * 1000); CronHandle(); timeout = cron_interval_; } } nfds = pink_epoll_->PinkPoll(timeout); /* * we just have the listen socket fd in the epoll */ for (int i = 0; i < nfds; i++) { pfe = (pink_epoll_->firedevent()) + i; fd = pfe->fd_; log_info("come fd is %d\n", fd); //判断是否是listen socket if (fd == server_socket_->sockfd()) { //判断是否是读事件? if (pfe->mask_ & EPOLLIN) { //调用accept函数,返回client socket //重点看看accept函数出错异常情况处理? connfd = accept(server_socket_->sockfd(), (struct sockaddr *) &cliaddr, &clilen); log_info("Connect fd %d", connfd); //这个地方处理的有问题 if (connfd == -1) { if (errno != EWOULDBLOCK) { continue; } } //设置socket的FD_CLOEXEC选项 fcntl(connfd, F_SETFD, fcntl(connfd, F_GETFD) | FD_CLOEXEC); ip_port = inet_ntop(AF_INET, &cliaddr.sin_addr, ip_addr, sizeof(ip_addr)); //通过ip地址判断 if (!AccessHandle(ip_port)) { close(connfd); continue; } ip_port.append(":"); sprintf(port_buf, "%d", ntohs(cliaddr.sin_port)); //组合成$ip:$port ip_port.append(port_buf); //加入工作线程连接队列 std::queue<PinkItem> *q = &(worker_thread_[last_thread_]->conn_queue_); PinkItem ti(connfd, ip_port); { MutexLock l(&worker_thread_[last_thread_]->mutex_); q->push(ti); } //写入pipe,通知这个线程有新连接,需要处理 write(worker_thread_[last_thread_]->notify_send_fd(), "", 1); last_thread_++; last_thread_ %= work_num_; //如果有错误就关闭 } else if (pfe->mask_ & (EPOLLERR | EPOLLHUP)) { /* * this branch means there is error on the listen fd */ log_info("close the fd here"); close(fd); } } else { //不是listen socket上的事件忽略 continue; } } } return NULL; }