基于Reactor模式下的epoll多路复用服务器
创始人
2025-05-28 02:55:25
0

文章目录

  • 一、认识Reactor模式
    • 1.1 Reactor 模式的概念
    • 1.2 Reactor 模式的组件
    • 1.3 Reactor 模式的流程
    • 1.4 Reactor 模式的优点
  • 二、Reactor模式下的 epoll ET服务器
    • 2.1 总体设计思路
    • 2.2 Connection 类结构
    • 2.3 封装 socket 实现 Sock 类
    • 2.4 封装 epoll 实现 Epoller 类
      • 2.4.1 CreateEpoller函数
      • 2.4.2 AddEvent函数
      • 2.4.3 DelEvent函数
      • 2.4.4 ModEvent函数
      • 2.4.5 Loop函数
    • 2.5 SetNonBlock函数
    • 2.6 基于Reactor模式设计TcpServer类
      • 2.6.1 TcpServer类的结构
      • 2.6.2 回调函数
        • 2.6.2.1 Accepter函数
        • 2.6.2.2 TcpRecver函数
        • 2.6.2.3 TcpSender函数
        • 2.6.2.4 TcpExcepter函数
      • 2.6.3 AddConnection函数
      • 2.6.4 Dispatcher事件派发函数
      • 2.6.5 EnableReadWrite函数
  • 三、简单的业务处理
    • 3.1 简单协议定制
    • 3.2 业务处理函数
    • 3.4 运行服务器
  • 四、总结


一、认识Reactor模式

1.1 Reactor 模式的概念

Reactor模式称为反应器模式或应答者模式,是基于事件驱动的设计模式。Reactor模式是一种常用于网络编程的设计模式,它旨在提供一种高效且可扩展的方式来处理并发请求。该模式的核心思想是将请求的处理逻辑与输入输出分离开来,通过异步I/O和事件驱动的方式来处理请求。

1.2 Reactor 模式的组件

Reactor模式包含以下主要组件:

  1. Reactor:该组件负责处理事件循环并分发事件给对应的处理器。它使用一个或多个I/O线程来监听事件,并根据不同的事件类型将请求路由到不同的处理器。
  2. Handlers:处理器负责处理特定类型的请求。例如,HTTP请求可以由HTTP处理器处理,TCP请求可以由TCP处理器处理。每个处理器都包含了处理特定请求类型的逻辑,并且在事件触发时调用对应的处理函数来完成请求处理。
  3. Synchronous Event Demultiplexer:该组件用于等待并监视输入事件,例如来自客户端的连接请求。当事件发生时,它会将事件通知给Reactor组件。
  4. Asynchronous Event Demultiplexer:该组件用于等待并监视异步I/O操作的完成事件。当I/O操作完成时,它会将事件通知给Reactor组件。

1.3 Reactor 模式的流程

Reactor模式的基本流程如下:

  1. Reactor组件启动并开始监听输入事件。
  2. 当有输入事件发生时,Reactor组件会将事件通知给对应的处理器。
  3. 处理器使用异步I/O进行请求处理,当请求处理完成时,它会将响应数据写入输出缓冲区。
  4. Reactor组件检查输出缓冲区是否有数据需要写入客户端,如果有则进行输出操作。
  5. 重复步骤2-4,直到连接关闭或出现错误。

1.4 Reactor 模式的优点

Reactor模式具有以下优点:

  1. 高并发:Reactor模式采用异步I/O和事件驱动的方式,可以处理大量并发请求。

  2. 高性能:Reactor模式避免了每个请求都创建一个新的线程或进程的开销,从而提高了性能。

  3. 可扩展性:Reactor模式支持添加新的处理器来处理不同类型的请求,并且可以通过添加更多的I/O线程来处理更多的请求。

  4. 可维护性:Reactor模式将请求处理逻辑与输入输出分离开来,降低了系统的耦合度,从而使得系统更易于维护和扩展。

二、Reactor模式下的 epoll ET服务器

2.1 总体设计思路

在epoll ET服务器中,主要处理三个事件,那就是读、写、异常就绪。在服务器中设置了关于处理这些事件的回调函数。

  • 读就绪:如果是监听套接字的读就绪则调用Accepter函数获取连接,如果其他套接字的读就绪则调用TcpRecver读取客户端发来的消息。
  • 写就绪:如果写事件就绪就调用 TcpSender函数将待发送的数据写入到发生缓冲区中。
  • 异常就绪:如果程序运行中出现了异常,就调用TcpExcepter函数来关闭相应的套接字及释放其资源。

在该服务器中,将所有的文件描述符都交给epoll进行监管,如果发生了事件就绪,就通过Dispatcher事件派发函数派发事件给相应的回调函数进行处理。

2.2 Connection 类结构

因为每一个服务端与客户端的连接都有一个套接字、输入输出缓冲区、读写异常回调函数,因此将这个连接封装成一个Connection类。
Connection 类的结构如下:

class Connection
{
public:int _sock;// 当前sock对应的TcpServer对象TcpServer *_R;// 输入缓冲区std::string _inbuffer;// 输出缓冲区std::string _outbuffer;// 回调方法// 读取func_t _recver;// 发送func_t _sender;// 异常func_t _excepter;public:Connection(int sock, TcpServer *r) : _sock(sock), _R(r) {}~Connection() {}void SetRecver(func_t recver) { _recver = recver; }void SetSender(func_t sender) { _sender = sender; }void SetExcepter(func_t excepter) { _excepter = excepter; }
};
  • 该类中包含了一个_sock的成员变量,即当前连接的文件描述符。
  • 该类中包含了一个指向Reactor服务器的回指指针_R,便于在外部使用Connection对象调用Reactor中的成员函数。
  • 该类中包含了对应文件描述符的读写缓冲区_inbuffer_outbuffer,用于暂时缓存数据。
  • 该类中包含了_recver_sender_excepter三个回调函数对象,分别用于处理读、写、异常事件。
  • 在该类中还提供了SetRecverSetSenderSetExcepter三个设置读写异常的回调方法。其中func_t 函数对象的定义如下:
using func_t = std::function;

该函数对象的参数是一个Connection类型的指针,后续在进行回调的时候可以通过该指针访问Connection中的元素,返回值为整型。

2.3 封装 socket 实现 Sock 类

这里对socket进行了简单的封装,并且其中的方法全部设置为静态,便于使用。其结构如下:

#pragma once#include 
#include 
#include 
#include 
#include 
#include 
#include "Log.hpp"class Sock
{
public:static const int gbacklog = 3;static int Socket(){int listenSock = socket(PF_INET, SOCK_STREAM, 0);if (listenSock < 0){logMsg(FATAL, "socket create: %d:%s", errno, strerror(errno));exit(-1);}// 运行服务器快速重启int opt = 1;setsockopt(listenSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));return listenSock;}static void Bind(int sock, uint16_t port){struct sockaddr_in local;memset(&local, 0, sizeof local);local.sin_family = PF_INET;local.sin_addr.s_addr = INADDR_ANY;local.sin_port = htons(port);if (bind(sock, (const sockaddr *)&local, sizeof local) < 0){logMsg(FATAL, "socket bind: %d:%s", errno, strerror(errno));exit(-1);}}static void Listen(int sock){if (listen(sock, gbacklog) < 0){logMsg(FATAL, "socket listen: %d:%s", errno, strerror(errno));exit(-1);}}static int Accept(int sock, std::string *clientIp, uint16_t *clientPort){struct sockaddr_in peer;socklen_t len = sizeof(peer);int sock_fd = accept(sock, (sockaddr *)&peer, &len);if (sock_fd < 0){// logMsg(FATAL, "socket accept: %d:%s", errno, strerror(errno));return -1;}if (clientIp)*clientIp = inet_ntoa(peer.sin_addr);if (clientPort)*clientPort = ntohs(peer.sin_port);return sock_fd;}
};
  • 其中Socket函数用于创建监听套接字,并且其中设置了setsocketopt函数使得服务器进程在退出后能够快速重启。
  • Bind函数用于绑定监听套接字对应的网络信息。
  • Listen函数用于监听客户端的连接。
  • Accept函数用于获取客户端的连接请求。

2.4 封装 epoll 实现 Epoller 类

这里同样也对epoll进行简单的封装,同样将其成员函数设置为静态方便后续调用。关于epoll可见博主的另一篇文章:IO多路复用(select、poll、epoll网络编程)。

2.4.1 CreateEpoller函数

CreateEpoller函数用于创建epoll实例:

class Epoller{static int CreateEpoller(){int epfd = epoll_create(SIZE);if (epfd == -1){logMsg(FATAL, "epoll_create error: %d: %s", errno, strerror(errno));exit(-1);}return epfd;}
}

2.4.2 AddEvent函数

AddEvent函数用于添加文件描述sockepoll实例中对其进行监视:

class AddEvent{static bool AddEvent(int epfd, int sock, uint32_t event){epoll_event ev;ev.data.fd = sock;ev.events = event;int res = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);return res == 0;}
}

2.4.3 DelEvent函数

DelEvent函数函数用于将指定的文件描述符从epoll实例中删除:

class DelEvent{static bool DelEvent(int epfd, int sock){int res = epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr);return res == 0;}
}

2.4.4 ModEvent函数

ModEvent函数用于修改epoll实例监视的文件描述符的事件:

class ModEvent{static bool ModEvent(int epfd, int sock, uint32_t event){epoll_event ev;ev.data.fd = sock;ev.events = event;int res = epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ev);return res == 0;}
}

2.4.5 Loop函数

Loop函数用于从epoll中获取事件已经就绪的文件描述符:

class Loop
{static int Loop(int epfd, epoll_event revs[], int num){int n = epoll_wait(epfd, revs, num, -1);if(n == -1){logMsg(FATAL, "epoll_wait error: %d: %s", errno, strerror(errno));}return n;}
}

2.5 SetNonBlock函数

因为在epoll服务器中基本采用的是ET模式,而该模式要求程序使用非阻塞的IO操作方式,因此要将文件描述符设置为非阻塞,因此定义了SetNonBlock函数,将其封装在工具类Util中:

class Util
{
public:// 设置为非阻塞static void SetNonBlock(int fd){int _fd = fcntl(fd, F_GETFL);fcntl(fd, F_SETFL, _fd | O_NONBLOCK);}
};

2.6 基于Reactor模式设计TcpServer类

2.6.1 TcpServer类的结构

TcpServer类就是基于Reactor模式所设计的,其基本框架如下:

class TcpServer
{
public:TcpServer(callback_t cb, int port) : _listenSock(-1), _port(port), _epfd(-1), _cb(cb){_revs = new epoll_event[revs_num];// 网络_listenSock = Sock::Socket();Sock::Bind(_listenSock, _port);Sock::Listen(_listenSock);// 创建epoll实例_epfd = Epoller::CreateEpoller();// 添加listenSock到epoll, 并且建立与Connection之间的映射AddConnection(_listenSock, EPOLLIN | EPOLLET, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);}~TcpServer(){if (_listenSock != -1)close(_listenSock);if (_epfd != -1)close(_epfd);delete[] _revs;}public://添加文件描述符到epoll, 创建对应的Connection对象void AddConnection(int sockfd, uint32_t event, func_t recver, func_t sender, func_t excepter);// 用于处理连接请求就绪int Accepter(Connection *conn);// 普通IO的读、写、异常就绪处理函数int TcpRecver(Connection *conn);int TcpSender(Connection *conn);int TcpExcepter(Connection *conn);// 修改epoll对sock所关心的事件void EnableReadWrite(int sock, bool readable, bool writeable);// 派发就绪事件void Dispatcher();//判断unordered_map释放存在该sockbool IsExists(int sock);//用于运行服务器void Run();private:// 就绪事件列表的上限static const int revs_num = 64;// 监听的socketint _listenSock;// 监听的端口号int _port;// epoll实例int _epfd;// 就绪事件列表epoll_event *_revs;// sock 与 Connection之间的映射关系std::unordered_map _connections;// 处理业务请求的回调方法callback_t _cb;
};

该类中设置的成员变量有:

  • _listenSock:服务器监听的socket套接字。
  • _port:监听的端口号。
  • _epfd:epoll实例的文件描述符。
  • _revs:就绪事件列表,用于存储就绪的文件描述符epoll_event 的节点。
  • revs_num:就绪事件列表的最大数量。
  • _connections:所有文件描述符与其对应的Connection对象的映射关系集合。
  • _cb:处理业务请求的回调方法。

其中处理业务请求的回调方法_cb的类型是自定义的callback_t 回调函数,其定义如下:

using callback_t = std::function;

其中参数分别为Connection类型的指针和string类型的字符串,传入的Connection对象指针便于对相关文件描述符进行操作,而string字符串用于存储处理后的结果。

构造函数说明:
TcpServer类的构造函数中对成员变量进行初始化,包括socket套接字的创建、绑定和监听,创建epoll实例,以及将listenSock添加到epoll实例中进行监听和建立其与Connection对象之间的映射关系。由于listenSock关心的事件只有获取连接,因此在AddConnection函数中2传入的回调函数只有Accepter,其他两个不关心则传入nullptr即可。

2.6.2 回调函数

2.6.2.1 Accepter函数

Accepter函数用于2处理连接请求就绪,由于可能存在同时有大量的连接请求,因此在该函数的实现中使用了while循环:

int TcpServer::Accepter(Connection *conn)
{// 如果同时有大量的连接请求到了,就要使用循环进行处理while (true){std::string clientIp;uint16_t clientPort = 0;int sockfd = Sock::Accept(conn->_sock, &clientIp, &clientPort);if (sockfd < 0){if (errno == EINTR) // 信号中断continue;else if (errno == EAGAIN || errno == EWOULDBLOCK) // 发生阻塞break;else{logMsg(FATAL, "accpet error: %d: %s", errno, strerror(errno));return -1;}}logMsg(DEBUG, "get a new link: %d", sockfd);// 默认设置epoll只关心读事件,因为再最开始,写事件是就绪的,最后续代码运行过程中条件发生了改变,再修改epoll要关心的事件AddConnection(sockfd, EPOLLIN | EPOLLET, std::bind(&TcpServer::TcpRecver, this, std::placeholders::_1),std::bind(&TcpServer::TcpSender, this, std::placeholders::_1),std::bind(&TcpServer::TcpExcepter, this, std::placeholders::_1));}

在调用AddConnection函数传入回调函数参数时使用了C++11中的bind函数绑定相应的回调函数。

2.6.2.2 TcpRecver函数

TcpRecver函数用于处理普通文件描述符的读事件就绪,函数实现如下:

int TCPServer::TcpRecver(Connection *conn){// 由于ET模式下采取非阻塞,因此需要循环读取while (true){char buffer[1024];// 123\n -> 实际读取4个字节,因此要减 1ssize_t s = recv(conn->_sock, buffer, sizeof(buffer) - 1, 0);if (s > 0){buffer[s] = 0; // 去掉 \nconn->_inbuffer += buffer;}else if (s == 0){logMsg(DEBUG, "client quit");conn->_excepter(conn);break;}else{if (errno == EINTR)continue;else if (errno == EAGAIN || errno == EWOULDBLOCK)break;else{logMsg(WARNING, "recv error: %d: %s", errno, strerror(errno));conn->_excepter(conn);break;}}}// 本轮读取完成std::vector result;PackageSplit(conn->_inbuffer, &result);for (auto &message : result){std::cout << "message: " << message << " inbuffer: " << conn->_inbuffer << std::endl;_cb(conn, message);}return 0;}

由于采用了非阻塞的操作方法,为了保证读取报文的完整性,因此要进行循环式的不断读取,直到该轮读取完毕才退出,而且每次循环读取都要将读取的数据拼接的对应的Connection对象的输入缓冲区_inbuffer中。

其中:

  • EINTR:指操作被中断唤醒,需要重新读/写。
  • EAGAINEWOULDBLOCK:表明读取完毕,输入缓冲为空。
  • 如果在函数的调用过程中出现了错误,就调用相应的异常处理函数。

EAGAINEWOULDBLOCK的说明:

非阻塞模式(比如epollET模式下进行recv,对应的fd文件描述符设置为非阻塞)下调用了阻塞操作(可理解为已经将输入缓冲区的数据读取完毕),在该操作没有完成就返回这个错误,这个错误不会破坏socket的同步,因此不用管它,下次循环接着recv读取就可以。所以对非阻塞socket而言,EAGAIN不是一种错误。在VxWorksWindows上,EAGAIN的名字也叫做EWOULDBLOCK

2.6.2.3 TcpSender函数

TcpSender函数用于处理普通文件描述符的写事件就绪:

int TcpServer::TcpSender(Connection *conn)
{while (true){ssize_t n = send(conn->_sock, conn->_outbuffer.c_str(), conn->_outbuffer.size(), 0);if (n > 0){conn->_outbuffer.erase(0, n);}else{if (errno == EINTR)continue;else if (errno == EAGAIN || errno == EWOULDBLOCK)break;else{conn->_excepter(conn);logMsg(WARNING, "send error: %d: %s", errno, strerror(errno));break;}}}return 0;
}

该函数的设计思路基本上和TcpRecver一样。

2.6.2.4 TcpExcepter函数

TcpExcepter函数用于处理普通文件描述符的异常事件就绪:

int TcpServer::TcpExcepter(Connection *conn)
{if (!IsExists(conn->_sock))return -1;// 1. 从epoll中移除sockEpoller::DelEvent(_epfd, conn->_sock);logMsg(DEBUG, "remove sock from epoll!");// 2. 关闭sockclose(conn->_sock);logMsg(DEBUG, "close sock: %d!", conn->_sock);// 3. delete conndelete _connections[conn->_sock];logMsg(DEBUG, "delete connection!");// 4. map中移除sock_connections.erase(conn->_sock);logMsg(DEBUG, "rease connection from connections");return 0;}

在本服务器的实现中就是当某一个连接出现异常时,处理方式就是断开该连接。在异常处理中包含四个步骤:

  1. epoll实例中移除对该文件描述符的监管。
  2. 调用close函数关闭文件描述符。
  3. 使用delete释放 Connection 对象的资源。
  4. 把该文件描述符与其Connection对象的映射关系从unordered_map集合中移除。

2.6.3 AddConnection函数

该函数用于添加连接关系:

void TcpServer::AddConnection(int sockfd, uint32_t event, func_t recver, func_t sender, func_t excepter)
{// 1. 添加sock到epoll// 如果是ET模式,设置为非阻塞if (event & EPOLLET)Util::SetNonBlock(sockfd);Epoller::AddEvent(_epfd, sockfd, event);Connection *conn = new Connection(sockfd, this);conn->SetRecver(recver);conn->SetSender(sender);conn->SetExcepter(excepter);_connections.insert(std::make_pair(sockfd, conn));logMsg(DEBUG, "添加新连接到connections成功: %d", sockfd);
}

该函数的实现包括了:将文件描述符交给epoll实例进行监管,创建对应的Connection对象,建立该文件描述符与Connection之间的映射关系并将其添加到unordered_map集合中。

2.6.4 Dispatcher事件派发函数

该函数用于对就绪的文件描述符进行事件的派发:

void TcpServer::Dispatcher()
{int n = Epoller::Loop(_epfd, _revs, revs_num);for (int i = 0; i < n; ++i){int sock = _revs[i].data.fd;uint32_t revent = _revs[i].events;// 如果出现错误,将错误统一交给读写事件if (revent & EPOLLERR)revent |= (EPOLLIN | EPOLLOUT);if (revent & EPOLLHUP)revent |= (EPOLLIN | EPOLLOUT);// 读事件就绪if (revent & EPOLLIN){// 先判断map中是否有sock的映射关系,再判断回调函数是否被设置if (IsExists(sock) && _connections[sock]->_recver){_connections[sock]->_recver(_connections[sock]);}}// 写事件就绪if (revent & EPOLLOUT){// 先判断map中是否有sock的映射关系,再判断回调函数是否被设置if (IsExists(sock) && _connections[sock]->_sender){_connections[sock]->_sender(_connections[sock]);}}}
}

在该函数的实现中,首先调用Epoller中的Loop成员函数获取事件就绪的文件描述符,然后在遍历就绪的文件描述符进行事件的派发,如果其中的文件描述符出现了异常错误,则将其统一交给读写事件,因为在读写事件处理的函数中也调用了对异常的处理函数。

2.6.5 EnableReadWrite函数

该函数用于修改指定文件描述符中epoll实例中被关系的事件:

 void TcpServer::EnableReadWrite(int sock, bool readable, bool writeable){uint32_t event = 0;event |= (readable ? EPOLLIN : 0);event |= (writeable ? EPOLLOUT : 0);Epoller::ModEvent(_epfd, sock, event);}

因为在最初添加文件描述符到epoll中时只设置了关心读事件,原因是在刚建立连接时,写事件是就绪的,最后续代码运行过程中条件发生了改变,再对epoll要关心的事件进行修改。

三、简单的业务处理

这里实现一个简单的网络版本计算器,客户端连接服务器,以特定的格式向服务器发送运算请求,然后服务器响应运算结果给客户端。

3.1 简单协议定制


struct Request
{int x;int y;char op;
};struct Response
{int code;int result;
};bool Deserialize(std::string &in, Request *req)
{// 1 + 1, 2 * 4, 5 * 9, 6 *1std::size_t spaceOne = in.find(SPACE);if (std::string::npos == spaceOne)return false;std::size_t spaceTwo = in.rfind(SPACE);if (std::string::npos == spaceTwo)return false;std::string dataOne = in.substr(0, spaceOne);std::string dataTwo = in.substr(spaceTwo + SPACE_LEN);std::string oper = in.substr(spaceOne + SPACE_LEN, spaceTwo - (spaceOne + SPACE_LEN));if (oper.size() != 1)return false;// 转成内部成员req->x = atoi(dataOne.c_str());req->y = atoi(dataTwo.c_str());req->op = oper[0];return true;
}void Serialize(const Response &resp, std::string *out)
{// "exitCode_ result_"std::string ec = std::to_string(resp.code);std::string res = std::to_string(resp.result);*out = ec;*out += SPACE;*out += res;*out += CRLF;
}

3.2 业务处理函数

calculator函数用于简单的计算逻辑:

using service_t = std::function;static Response calculator(const Request &req)
{Response resp = {0, 0};switch (req.op){case '+':resp.result = req.x + req.y;break;case '-':resp.result = req.x - req.y;break;case '*':resp.result = req.x * req.y;break;case '/':{ // x_ / y_if (req.y == 0)resp.code = -1; // -1. 除0elseresp.result = req.x / req.y;}break;case '%':{ // x_ / y_if (req.y == 0)resp.code = -2; // -2. 模0elseresp.result = req.x % req.y;}break;default:resp.code = -3; // -3: 非法操作符break;}return resp;
}

main函数代码:

#include "TcpServer.hpp"
#include "Service.hpp"
#include using namespace std;static void usage(std::string process)
{cerr << "\nUsage: " << process << " port\n"<< endl;
}
int BeginHandler(Connection *conn, std::string &message, service_t service)
{// 我们能保证,message一定是一个完整的报文,因为我们已经对它进行了解码Request req;// 反序列化,进行处理的问题if (!Deserialize(message, &req)){// 写回错误消息return -1;// 可以直接关闭连接// conn->excepter_(conn);}// 业务逻辑Response resp = service(req);std::cout << req.x << " " << req.op << " " << req.y << std::endl;std::cout << resp.code << " " << resp.result << std::endl;// 序列化std::string sendstr;Serialize(resp, &sendstr);// 处理完毕的结果,发送回给clientconn->_outbuffer += sendstr;//std::cout << conn->_outbuffer << std::endl;// conn->_sender(conn);// if(conn->_outbuffer.empty()) conn->_R->EnableReadWrite(conn->_sock, true, false);// else conn->_R->EnableReadWrite(conn->_sock, true, true);// conn->R_->EnableReadWrite(conn->sock_, true, true);// conn->_sender()std::cout << "这里就是上次的业务逻辑啦 --- end" << std::endl;return 0;
}// 1 + 1X2 + 3X5 + 6X8 -> 1 + 1
int HandlerRequest(Connection *conn, std::string &message)
{std::cout << "HandlerRequest" << std::endl;return BeginHandler(conn, message, calculator);
}int main(int argc, char *argv[])
{if (argc != 2){usage(argv[0]);exit(0);}unique_ptr server(new TcpServer(HandlerRequest, atoi(argv[1])));server->Run();return 0;
}

3.4 运行服务器

首先启动服务器,可以看到3号文件描述符被添加到connections集合中,3号描述符就是监听的套接字描述符:

由于没有实现客户端程序,就使用telnet工具充当客户端:

此时可以发现新增了一个5号文件描述符的连接,因为4号描述符就是epoll实例的文件描述符,所以第一个客户端连接的文件描述符就是5号。

此时客户端向服务端发起指定格式的计算请求:

注意这里定制的协议就是每个计算请求以“X”作为分隔符,每个请求中的数字与运算符之间以空格隔开。

因为这里采用的是epoll多路复用的方式,虽然该服务器是单进程的,但是却可以同时为多个客户端提供服务:

当一个客户端退出时,其对应的文件描述符已经相关资源也会被释放,下一次客户端再进行连接的时候,就会使用空出来的这个文件描述符:

四、总结

当前服务器存在的弊端:
虽然当前服务器已经实现了多路复用的功能了,但是处理连接请求和业务逻辑等所有的工作都是由当前的服务器来完成的。况且当前的业务逻辑只是进行简单的运算,处理的连接数量也很少,因此对应服务器而言并没有什么压力。如果需要处理更加复杂的业务逻辑或者是同时面临大量的连接,那么此时服务器就会显得有些吃力了。

解决思路:
可再当前服务器的基础之上引入线程池,当读事件回调函数读取完相关的请求之后,就可以将响应的业务逻辑交给线程池进行处理,当线程池处理完毕之后,再将处理结果返回给服务器,由服务器组织响应给客户端程序。此时服务器就可以只关心连接请求,读写异常事件的处理,而将业务逻辑的处理交给线程池,就大大降低了服务器的负担,从而也能够处理更多的连接请求了。

线程池代码:

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include "Lock.hpp"
#include "Log.hpp"using namespace std;const uint32_t gDefaultNum = 5;//改造为单例模式:template 
class ThreadPool
{
private:ThreadPool(uint32_t threadNum = gDefaultNum): _isStart(false),_threadNum(threadNum){assert(threadNum > 0);pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}ThreadPool(const ThreadPool &) = delete;ThreadPool &operator=(const ThreadPool &) = delete;public:static ThreadPool *getInstance(){static Mutex mutex;if (nullptr == instance) //限定LockGuard的生命周期{LockGuard lockGurand(&mutex); //RAIIif (nullptr == instance){instance = new ThreadPool();}}return instance;}static void *threadRoutine(void *args){ThreadPool *ptp = static_cast *>(args);while (true){ptp->lockQueue();//判断当前任务队列有没有任务while (!ptp->hasTask()){//没有任务,当前线程等待ptp->waitTask();}//当前线程处理任务T t = ptp->pop();ptp->unlockQueue();Log() << "新线程完成任务:" << t.run() << endl;}}void start(){assert(!_isStart); //判断线程是否已经启动for (int i = 0; i < _threadNum; ++i){pthread_t tid;pthread_create(&tid, nullptr, threadRoutine, this);}_isStart = true;}//放任务void push(const T &in){lockQueue();_taskQueue.push(in);choiceThreadHandle();unlockQueue();}//消费任务T pop(){T tmp = _taskQueue.front();_taskQueue.pop();return tmp;}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:void lockQueue(){pthread_mutex_lock(&_mutex);}void unlockQueue(){pthread_mutex_unlock(&_mutex);}void waitTask(){pthread_cond_wait(&_cond, &_mutex);}void choiceThreadHandle(){pthread_cond_signal(&_cond);}bool hasTask(){return !_taskQueue.empty();}private:bool _isStart;          //判断线程池是否开启uint32_t _threadNum;    //线程池中线程数量queue _taskQueue;    //任务队列pthread_mutex_t _mutex; //保护任务队列的锁pthread_cond_t _cond;   //线程的条件变量static ThreadPool *instance;
};template 
ThreadPool *ThreadPool::instance = nullptr;

创建并启动线程池:

unique_ptr> tp(ThreadPool::getInstance());tp->start();

实现一个任务类Task:

#include 
#include 
#include "Protocol.hpp"
#include "TcpServer.hpp"class Task
{
public:Task(Request &req, Connection *conn) : _req(req), _conn(conn){}int operator()(){return run();}int run(){Response resp = {0, 0};switch (_req.op){case '+':resp.result = _req.x + _req.y;break;case '-':resp.result = _req.x - _req.y;break;case '*':resp.result = _req.x * _req.y;break;case '/':{if (_req.y == 0){resp.code = -1;}else{resp.result = _req.x / _req.y;}break;}case '%':{if (_req.y == 0){resp.code = -2;}else{resp.result = _req.x % _req.y;}break;}default:resp.code = -3; // -3: 非法操作符break;}std::cout << resp.code << ":" << resp.result << std::endl;std::string sendstr;Serialize(resp, &sendstr);_conn->_outbuffer += sendstr;if (_conn->_outbuffer.empty())_conn->_R->EnableReadWrite(_conn->_sock, true, false);else_conn->_R->EnableReadWrite(_conn->_sock, true, true);return resp.result;}private:Request _req;Connection *_conn;
};

运行展示:

相关内容

热门资讯

446061-19-4,DOT... DOTA-p-苯-氨基-四叔丁酯基本信息DOTA-p-苯-氨基-四叔丁酯,又称S-2-...
信息时代的必修课:冗余度(善用... 文章目录 引言I 冗余度1.1 冗余度的定义1.2 冗余度的好处1.3 信息冗余的问题1.4 善用信...
从头开始完成一个STM32例程 创建新项目 Project-> New,之后选择自己的开发板芯片 确定之后又跳到运行...
同事之间合作合同范本优选10... 同事之间合作合同范本 第一篇协议双方甲方:法人代表:《企业法人营业执照》注册号:乙方:法人代表:《企...
艺术作品授权合同范本(优选1... 艺术作品授权合同范本 第一篇作品名称:本人同意委托 作为办理该合作作品(以下简称本作品)出版事宜的代...
使用Spring Boot和C... 原理 Spring Boot是一个基于Spring框架的快速开发应用程序的框架,其提供...
水利合同范本优选14篇 水利合同范本 第一篇发包方:承包方:拟承包魏岗村下周组下周大塘清淤工程,接受了魏岗村下周组的投标,双...
宁德大型仓库租赁合同范本(通... 宁德大型仓库租赁合同范本 第一篇甲方:乙方:第一条:场地用途第二条:期限自年月日起至年月日止。第三条...
超越想象,博睿数据3D数字展厅... 历经多月精心打磨 博睿数据3D数字展厅正式上线 带来一个有温度、易操作、更全面的 线上形象展览平台 ...
承包小区保洁工程合同范本(通... 承包小区保洁工程合同范本 第一篇甲方:___________________乙方:_________...
垫资合同范本 垫资合同范本  现今社会公众的法律意识不断增强,合同的使用频率呈上升趋势,签订合同是减少和防止发生争...
直播中控合同范本共6篇 直播中控合同范本 第一篇1、甲方有义务按照本协议约定向乙方支付直播合作劳务费用,支付时间为每个月15...
法律意见书 导语:法律意见书是律师提供法律服务的一种综合性的书面文件,其内容包括向咨询者提供法律依据、法律建议以...
Stable Diffusio... Stable Diffusion是一个文本到图像的潜在扩散模型,由CompVis、St...
IBMMQ常用命令(七) 1. 队列管理器 crtmqm -q 名字            创建队列管理器 strmqm 名...
冰库买卖合同范本大全(精选2... 冰库买卖合同范本大全 第一篇甲方:________________乙方:______________...
代码随想录刷题-链表-删除链表... 删除链表的倒数第N个节点 本节对应代码随想录中:代码随想录,讲解视频&#...
盘锦企业培训合同范本40篇 盘锦企业培训合同范本 第一篇订立合同各方:培训单位:____学院(或学校),以下简称甲方;委托培训单...
正规房屋购买合同范本 正规房屋购买合同范本  随着法律观念的深入人心,能够利用到合同的场合越来越多,它也是实现专业化合作的...
河边茶园租赁合同范本推荐8篇 河边茶园租赁合同范本 第一篇甲方:_______________________乙方:_______...