C++实战——高并发服务器

文章代码来源:高并发Web服务器

简单介绍

服务器的工作实际上就是监听并且处理各种各样的事件。当服务器处于启动状态,就会不断用Epoll去监听,正在被监听的文件描述符事件。根据返回值和事件类型做出相应的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
while(!isClose_) {
if(timeoutMS_ > 0) {
timeMS = timer_->GetNextTick();
}
//一直使用 epoller_->Wait 去检测事件 返回值 有多少个
int eventCnt = epoller_->Wait(timeMS);
//遍历这些事件
for(int i = 0; i < eventCnt; i++) {
/* 处理事件 */
int fd = epoller_->GetEventFd(i);
uint32_t events = epoller_->GetEvents(i);
//如果是监听描述符 说明有监听进来
if(fd == listenFd_) {
DealListen_(); //处理事件监听
}
//出现了错误 关闭文件描述符
else if(events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) {
assert(users_.count(fd) > 0);
CloseConn_(&users_[fd]);
}
//非监听文件描述符有事件 处理读操作
else if(events & EPOLLIN) {
assert(users_.count(fd) > 0);
DealRead_(&users_[fd]);
}
//处理写操作
else if(events & EPOLLOUT) {
assert(users_.count(fd) > 0);
DealWrite_(&users_[fd]);
} else {//关闭连接
LOG_ERROR("Unexpected event");
}
}
}

这里面就挑选个人感觉比较重要的 两个时间的处理代码。

处理监听事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void WebServer::AddClient_(int fd, sockaddr_in addr) {
assert(fd > 0);
users_[fd].init(fd, addr);
if(timeoutMS_ > 0) {
timer_->add(fd, timeoutMS_, std::bind(&WebServer::CloseConn_, this, &users_[fd]));
}
epoller_->AddFd(fd, EPOLLIN | connEvent_);
SetFdNonblock(fd);
LOG_INFO("Client[%d] in!", users_[fd].GetFd());
}

void WebServer::DealListen_() {
struct sockaddr_in addr;
socklen_t len = sizeof(addr);
do {
int fd = accept(listenFd_, (struct sockaddr *)&addr, &len);
if(fd <= 0) { return;}
//超过最大数量 做相应处理
else if(HttpConn::userCount >= MAX_FD) {
SendError_(fd, "Server busy!");
LOG_WARN("Clients is full!");
return;
}
AddClient_(fd, addr);//添加客户端
} while(listenEvent_ & EPOLLET); //如果是ET模式 就必须一次性把事件读完
}

处理读写

处理读写事件主要工作相当于将读数据的任务发布给线程池。经过了很多封装,这里面要去源码去层层查看理解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void WebServer::DealRead_(HttpConn* client) {
assert(client);
ExtentTime_(client);
threadpool_->AddTask(std::bind(&WebServer::OnRead_, this, client));
}
void WebServer::DealWrite_(HttpConn* client) {
assert(client);
ExtentTime_(client);
threadpool_->AddTask(std::bind(&WebServer::OnWrite_, this, client));
}

void WebServer::OnRead_(HttpConn* client) {
assert(client);
int ret = -1;
int readErrno = 0;
ret = client->read(&readErrno); //读取客户端数据
if(ret <= 0 && readErrno != EAGAIN) {
CloseConn_(client);
return;
}
//业务逻辑处理
OnProcess(client);
}
void WebServer::OnWrite_(HttpConn* client) {
assert(client);
int ret = -1;
int writeErrno = 0;
ret = client->write(&writeErrno);
if(client->ToWriteBytes() == 0) {
/* 传输完成 */
if(client->IsKeepAlive()) {
//业务逻辑处理
OnProcess(client);
return;
}
}
else if(ret < 0) {
if(writeErrno == EAGAIN) {
/* 继续传输 */
epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLOUT);
return;
}
}
CloseConn_(client);
}

业务处理逻辑

主要解释上部分处理读写事件中的OnProcess的,这部分代码 线程池里的子线程处理 。注意这里并不是真正的处理逻辑,真正的处理逻辑是client->process(),他的返回值为真的话,说明处理完一个用户请求了,该做出相应了所以可以看到监听描述符的时间是EPOLLOUT。当监听到TCP缓冲区不满也就是可写,就会开始写的任务。否则就还是处理读(监听读使事件)。注意这里的读写是对于TCP缓冲区而言。

1
2
3
4
5
6
7
8
void WebServer::OnProcess(HttpConn* client) {
//如果client->process()为真 则说明处理完请求,要做出相应 所以事件是EPOLLOUT
if(client->process()) {
epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLOUT);
} else {
epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLIN);
}
}

(待更新。。。)