使用 C++ 构建简易网络服务器
本文的目标是构建一个简易的网络通信服务器,服务器使用 C++ 实现、运行在 linux 之上。服务器采用 master-worker 的进程模型,利用 epoll 实现网络 IO 的多路复用,并利用线程池实现业务逻辑的并发处理。
所谓简易是指期望在最小化代码量的前提下,实现一个具备基本功能、可扩展的网络服务器框架。
进程模型¶
使用 master-worker 的进程模型。程序由 1 个 master 进程和若干个 worker 进程组成。其中:
- master 进程作为守护进程存在,不负责具体的业务,只负责管理。master 进程负责配置文件加载、创建监听端口和信号处理等。
- worker 进程作为 master 的子进程,继承 master 创建的监听 socket fd,以抢占式的方法接收来自客户端的连接请求,并负责维护自己的事件循环、实现完整的网络 I/O,每个 worker 另有自己的线程池,用于处理具体业务。
code 1
// server.cpp, main()
// 1. 加载配置
Config config;
// 2. 创建监听 socket
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd == -1) {
perror("socket");
return 1;
}
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(config.port);
int opt = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
if (bind(listen_fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
perror("bind");
return 1;
}
if (listen(listen_fd, 128) == -1) {
perror("listen");
return 1;
}
// 3. 创建 worker 进程
std::vector<pid_t> workers;
for (int i = 0; i < config.worker_num; ++i) {
pid_t pid = fork();
if (pid == 0) {
// worker 进程
WorkerProcess(listen_fd, config);
return 0;
} else if (pid > 0) {
// master 进程
workers.push_back(pid);
} else {
perror("fork");
}
}
// 4. master 进程等待子进程退出
while (wait(NULL) > 0);
return 0;
网络 I/O¶
处理连接请求¶
前文提到,master 进程负责创建监听窗口,即只负责 socket()、bind()、listen()。而接受来自客户端的连接的操作 accept() 是在各个 worker 中实现的。
由于现代 linux 内核对 accept() 的优化,使得我们不需要实现一个专门的分配器来手动地控制 worker 接受连接请求。只需要让每个 worker 尝试 accept() 即可。操作系统内核会维护一个等待队列,当所有的 worker 都在监听同一个 socket 时,内核将会根据特定的调度算法决定唤醒等待队列中的某一个 worker。
连接产生的新的 socket 使用非阻塞模式。使用水平触发的 epoll 复用网络 I/O。初始只向 epoll 注册 EPOLLIN,即只关注该 socket 的可读事件发生。
code 2
// server.cpp, in Worker class
void AcceptConnection() {
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
int client_fd = accept(this->listen_fd, (struct sockaddr*)&client_addr, &client_len);
if (client_fd > 0) {
int flags = fcntl(client_fd, F_GETFL, 0);
fcntl(client_fd, F_SETFL, flags | O_NONBLOCK);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = client_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {
perror("epoll_ctl: client_fd");
close(client_fd);
return;
}
this->connections[client_fd] = std::make_shared<Connection>(client_fd);
}
}
连接池¶
为每个 socket fd 分配的按 fd 检索的结构体,以存储与该连接有关的一切必要数据。其中比较重要的有:
- 每个连接都需要有自己的字节缓冲区,收发各一个,分别用于处理半包/粘包问题和一次写不完的问题。socket 和 worker 在此处交接数据。
- 每个连接都需要有自己的收发缓冲区,收发各一个,用于存放分割完整的完整电文体。worker 和业务线程在此处交接数据。
- 发缓冲区需要用互斥锁保护,由于 worker 线程和多个业务线程需要互斥访问该缓冲区。
这里使用两组四个缓冲区的方案,其目标是尽可能解耦各个功能模块。现在 worker 只负责搬运字节和协议转换,业务线程只负责处理逻辑对象。
以及可选项,此处举一例:
- 每个连接记录自己的最后活动时间,用来关闭长时间不活跃的连接
由于每个 worker 都独立地尝试 accept() ,因此每个 worker 都维护自己的连接池,如先前的代码已经涉及的那样。使用 std::make_shared<Connection> 来创建连接连接对象并生成智能指针作为连接池的元素,是因为连接对象在将来可能会被业务线程引用,如果业务线程还在处理连接时,worker 已经关闭了该连接,那么就会在业务线程中遇到悬空指针的问题。使用智能指针可以确保连接对象在没有任何引用时才被释放。
由于需要按名查找,使用哈希表实现连接表是比较合适的。
code 3
// Connection.h
struct Connection {
int fd;
// 字节流缓冲
std::string byte_read_buf; // 接收到的原始字节
std::string byte_write_buf; // 等待发送的原始字节
// 报文缓冲
std::queue<std::string> msg_read_queue; // 解析完成的报文体
std::queue<std::string> msg_write_queue; // 待发送的报文体
std::mutex msg_write_mtx; // msg_write_queue 互斥访问
time_t last_active;
Connection(int f) : fd(f) {
last_active = time(nullptr);
}
};
向 socket 读写¶
数据流:从 socket 到读字节缓冲区;或从写字节缓冲区到 socket。
从 socket 读:等待 EPOLLIN -> 循环读取数据 -> 获得 EAGAIN -> 结束。
向 socket 写:直接写入。此时会有两种可能:
case 1. 写入成功 -> 结束。
case 2. 获得 EAGAIN -> 注册 EPOLLOUT 事件 -> 等待 EPOLLOUT 事件 -> 向 socket 写数据 -> 写完则下一步,未写完则回到 case 2 起点处 -> 注销 EPOLLOUT 事件 -> 结束。
code 4
代码片段的注释中,使用数字 `1` ~ `7` 标记了读写数据的完整流程。// server.cpp, in Worker::Run(), main epoll loop
while (true) {
int nfds = epoll_wait(epoll_fd, events, 1024, 1000);
if (nfds == -1) {
if (errno == EINTR) continue;
perror("epoll_wait");
break;
}
for (int i = 0; i < nfds; ++i) {
int fd = events[i].data.fd;
if (fd == this->listen_fd) {
// case 1: 新连接
AcceptConnection();
} else if (fd == this->wakeup_fd) {
// case 2
// 4. 业务线程完成任务,唤醒 worker 主线程处理待发送数据
uint64_t val;
read(this->wakeup_fd, &val, sizeof(val));
for (auto& kv : this->connections) {
// 5. 将业务线程产生的响应拼装成报文并放到字节缓冲区
ProcessWriteQueue(kv.second);
// 6. 尝试发送字节缓冲区的数据
HandleWrite(kv.second);
}
} else {
// case 3: 已连接 socket 的读写事件
auto it = this->connections.find(fd);
if (it == this->connections.end()) continue;
std::shared_ptr<Connection> conn = it->second;
if (events[i].events & EPOLLIN) {
// 1. 尝试从 socket 读取数据到字节缓冲区;如果返回 false 则表示连接已关闭或出错,跳过后续处理
if (!HandleRead(conn)) continue;
// 2. 尝试切分报文放到报文缓冲区;如果返回 false 则表示报文协议解析出错,此时在解析函数内已经关闭连接,跳过后续处理
if (!ProcessReadBuffer(conn)) continue;
// 3. 尝试从报文缓冲区构造任务,交给业务线程并处理
ProcessTasks(conn);
}
if (events[i].events & EPOLLOUT) {
// 7. 这意味着一次写没有写完,当有 EPOLLOUT 事件时继续写
HandleWrite(conn);
}
}
}
// 每次循环结束后的时隙,清理不活跃连接
CleanInactiveConnections();
}
code 4a
// server.cpp, in Worker class
bool HandleRead(std::shared_ptr<Connection> conn) {
char buf[4096];
while (true) {
ssize_t n = read(conn->fd, buf, sizeof(buf));
if (n > 0) {
conn->byte_read_buf.append(buf, n);
conn->last_active = time(nullptr);
} else if (n == -1 && errno == EAGAIN) {
break;
} else {
CloseConnection(conn);
return false;
}
}
return true;
}
// server.cpp, in Worker class
void HandleWrite(std::shared_ptr<Connection> conn) {
if (conn->byte_write_buf.empty()) return;
ssize_t n = write(conn->fd, conn->byte_write_buf.data(), conn->byte_write_buf.size());
if (n > 0) {
conn->byte_write_buf.erase(0, n);
conn->last_active = time(nullptr);
}
struct epoll_event ev;
ev.data.fd = conn->fd;
if (!conn->byte_write_buf.empty()) {
ev.events = EPOLLIN | EPOLLOUT;
} else {
ev.events = EPOLLIN;
}
epoll_ctl(epoll_fd, EPOLL_CTL_MOD, conn->fd, &ev);
}
断开连接¶
worker 也需要负责处理连接的断开和断开连接。
被动断开:当 read 返回 0 时,表示对端关闭了连接。此时我们需要关闭 socket,从 epoll 中移除,并释放连接对象。HandleRead() 内部已经实现了被动断开逻辑。
主动断开:当服务器检测到错误、超时或业务逻辑要求断开时,主动发起关闭。需要从 epoll 删除监听、关闭 fd、从连接池移除并释放对象。
code 5
主动断开连接的操作可以用来实现自动清除不活跃的连接:在 epoll 循环的空闲间隙,遍历连接池,清理超时连接。
code 6
// server.cpp, in Worker class
void CleanInactiveConnections() {
time_t now = time(nullptr);
for (auto it = this->connections.begin(); it != this->connections.end(); ) {
if (difftime(now, it->second->last_active) > this->config.keep_alive_timeout / 1000) {
std::shared_ptr<Connection> conn = it->second;
it = this->connections.erase(it);
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, conn->fd, nullptr);
close(conn->fd);
} else {
++it;
}
}
}
应用层协议¶
报文格式定义¶
使用专门的类管理报文格式,这样可以将切分字节流的逻辑独立并集中管理,更具拓展性。定义一个类 Protocol,包含两个核心静态方法分别为:
TryDecode(buffer, &message): 尝试从缓冲区解析报文。返回状态:完整、不完整、错误。Encode(message, &buffer): 将业务消息序列化为字节流。
在 worker 中就只需要调用 Protocol::TryDecode() 和 Encode() 即可。
作为例子,使用固定包头长度的协议,定义报文头长度为 4B,以十进制 ASCII 码给出报文体的长度。
code 7
// Protocol.h
class Protocol {
public:
// TryDecode 返回 0 代表不完整;-1 代表错误;>0 代表完整包的长度
static int TryDecode(const std::string& buf, std::string& out_msg) {
if (buf.size() < 4) return 0;
try {
int len = std::stoi(buf.substr(0, 4));
if (buf.size() < 4 + (size_t)len) return 0;
out_msg = buf.substr(4, len);
return 4 + len;
} catch (...) { return -1; }
}
static void Encode(const std::string& msg, std::string& out_buf) {
char len_str[5];
snprintf(len_str, 5, "%04lu", msg.size());
out_buf.append(len_str);
out_buf.append(msg);
}
};
处理粘包/拆包¶
数据流:从字节缓冲区到报文缓冲区。
从接收字节流中提取出完整报文,将报文体部分放到接收报文的缓冲区,等待组装成任务并交由业务线程处理。
由于上文实现了从字节流中切分出报文的方法,而在从 socket 读数据的时候我们已经把所有的数据按字节为最小单元放在了缓冲区中,worker 中需要做的事情就只有调用该方法、检查返回值、稍做处理放到报文缓冲区。
code 8
// server.cpp, in Worker class
bool ProcessReadBuffer(std::shared_ptr<Connection> conn) {
std::string msg;
int len;
while (true) {
len = Protocol::TryDecode(conn->byte_read_buf, msg);
if (len > 0) {
conn->byte_read_buf.erase(0, len);
conn->msg_read_queue.push(msg);
} else if (len == 0) {
break;
} else {
CloseConnection(conn);
return false;
}
}
return true;
}
void ProcessTasks(std::shared_ptr<Connection> conn) {
while (!conn->msg_read_queue.empty()) {
std::string msg = conn->msg_read_queue.front();
conn->msg_read_queue.pop();
CreateTask(conn, msg);
}
}
组装报文¶
数据流:从报文缓冲区到字节缓冲区。
从发送报文缓冲区中提取出业务线程生成的报文体,组装成完整报文,放到发送字节流的缓冲区。
code 9
// server.cpp, in Worker class
void ProcessWriteQueue(std::shared_ptr<Connection> conn) {
std::lock_guard<std::mutex> lock(conn->msg_write_mtx);
while (!conn->msg_write_queue.empty()) {
std::string msg = conn->msg_write_queue.front();
conn->msg_write_queue.pop();
Protocol::Encode(msg, conn->byte_write_buf);
}
}
线程模型¶
目的:分出专门的线程来处理业务逻辑,避免业务逻辑阻塞网络 I/O。
数据流:从报文缓冲到业务线程或相反,其中用流动的数据是报文体。
每个 worker 进程维护一个线程池,线程池中的线程负责处理具体的业务逻辑。收到完整报文后从线程池中取出一个线程,构造业务任务并交给线程处理。处理完成后由该线程生成响应报文并返回。
构造任务队列¶
任务队列:通过维护一个 std::queue<std::function<void()>> 来实现,任务队列的每一个任务是一个 Lambda 表达式,表示期望让线程池中的线程执行的代码,形如:
auto task = [=]() {
std::string response = ProcessBusiness(msg);
SendResponse(client_fd, response);
};
由于任务队列被多个线程访问,因此需要使用互斥锁保护。
code 10
// server.cpp, in Worker class
void CreateTask(std::shared_ptr<Connection> conn, std::string msg) {
int wfd = this->wakeup_fd;
auto task = [conn, msg, wfd]() {
std::string response = ProcessBusiness(msg);
{
std::lock_guard<std::mutex> lock(conn->msg_write_mtx);
conn->msg_write_queue.push(response);
}
uint64_t val = 1;
write(wfd, &val, sizeof(val));
};
thread_pool.Push(task);
}
在 ProcessBusiness() 中实现业务任务。作为例子,业务处理需要实现的功能为:
- 如果报文体以
REVERSE:开头,则反转字符串。构造响应为REVERSE:+ 反转后的字符串; - 否则,计算报文体的总长度。构造响应为
LEN:+ 总长度。
code 11
线程池¶
线程池的主要目的是减少线程创建和销毁的开销。是一个生产者-消费者模型。
预先创建一组线程,让他们进入 Lambda 表达式内的 while() 循环,使用条件变量等待任务的到来,如果没有任务到来就阻塞等待,当有任务到来时唤醒线程。
线程池维护一个任务队列,当有新任务到来时,将任务放入队列并通知一个等待的线程。
code 12
// ThreadPool.h
class ThreadPool {
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
public:
ThreadPool(size_t threads) : stop(false) {
for (size_t i = 0; i < threads; ++i)
workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
if (this->stop && this->tasks.empty()) return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
});
}
void Push(std::function<void()> task) {
{
std::unique_lock<std::mutex> lock(queue_mutex);
tasks.push(task);
}
condition.notify_one();
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread &worker: workers) worker.join();
}
};
跨线程唤醒¶
一个 naive 的想法是,当业务线程处理完任务后,将响应放入连接的发送报文缓冲区,然后直接返回。Worker 在本次 epoll 循环结束前,检查是否有待发送的业务消息,然后将业务线程产生的响应拼装成报文并放到字节缓冲区。
然而在这个模型中,存在一个时序问题:
- worker 收到报文,将任务放入线程池。
- worker 继续循环,如果没有其他网络事件,它将阻塞在
epoll_wait中。 - 业务线程处理完任务,生成响应并放入
msg_write_queue。 - 此时问题出现:在这时 worker 正在沉睡,不知道有新数据需要发送。直到下一个网络事件到来时 worker 才会醒来检查队列,才能将响应发送出去。这会导致响应延迟。
为了解决这个问题,需要想办法让业务线程在完成任务后唤醒 worker。在 Linux 中可以使用 eventfd 来实现这种跨线程通知。eventfd 是一个由内核维护的 64 位计数器,当业务线程完成任务后,向 eventfd 写入一个数值,计数器增加。当计数器大于 0 时,eventfd 变为可读,因此可以让 worker 在 epoll 中注册 eventfd 的 EPOLLIN,就能够实现唤醒 worker。
在 worker 初始化时创建 wakeup_fd,并将其添加到 epoll 监听列表中。当业务线程完成任务后,向 wakeup_fd 写入一个数值。这样,worker 在 epoll_wait 中就会被唤醒,处理 wakeup_fd 的读事件,从而知道有新的响应需要发送。在上面的代码块中有两部分涉及 this->wakeup_fd :code 10 中展示了业务线程向 wakeup_fd 写入的过程;而在 code 4 中展示了 worker 的主线程处理 wakeup_fd 的读事件的过程。
总结与组装¶
对于某一个连接,数据流图如下:
socket socket
| [worker] read ^
v | [worker] write
byte_read_buf byte_write_buf
| [worker] parse bytestream ^
v | [worker] assemble message
msg_read_buf msg_write_buf
| [worker] create task ^
v | [business thread] notify by wakeup_fd
task lambda expr --------------------------/
[business thread] process
组装与构建¶
按下面的框架组装各个代码片段为四个文件:server.cpp、ThreadPool.h、Protocol.h、Connection.h。
注释中的 file: xxx 表示该代码片段所在的文件名,// ... code X ... 表示该处插入前文中标记为 code X 的代码片段。其他注释为代码说明。
// file: Protocol.h
#pragma once
#include <string>
#include <cstdio>
#include <stdexcept>
// 报文格式定义及协议编码解码
// ... code 7 ...
// file: Connection.h
#pragma once
#include <string>
#include <queue>
#include <mutex>
#include <ctime>
// 连接对象定义
// ... code 3 ...
// file: ThreadPool.h
#pragma once
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
// 线程池实现
// ... code 12 ...
// file: server.cpp
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <sys/eventfd.h>
#include <vector>
#include <unordered_map>
#include <iostream>
#include <cstring>
#include <algorithm>
#include <memory>
#include "ThreadPool.h"
#include "Protocol.h"
#include "Connection.h"
struct Config {
int worker_num = 4;
int port = 8002;
int keep_alive_timeout = 120000;
};
class Worker {
int listen_fd;
Config config;
int epoll_fd;
int wakeup_fd;
std::unordered_map<int, std::shared_ptr<Connection>> connections;
ThreadPool thread_pool;
public:
Worker(int lfd, const Config& cfg)
: listen_fd(lfd), config(cfg), thread_pool(4) {
wakeup_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (wakeup_fd == -1) {
perror("eventfd");
exit(EXIT_FAILURE);
}
}
void Run() {
epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}
struct epoll_event ev, events[1024];
ev.events = EPOLLIN;
ev.data.fd = listen_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) == -1) {
perror("epoll_ctl: listen_fd");
exit(EXIT_FAILURE);
}
ev.events = EPOLLIN;
ev.data.fd = wakeup_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, wakeup_fd, &ev) == -1) {
perror("epoll_ctl: wakeup_fd");
exit(EXIT_FAILURE);
}
// epoll 循环
// ... code 4 ...
}
private:
// 方法:接受新的连接请求
// ... code 2 ...
// 方法:从 socket 读取数据到接收的字节流缓冲区
// 方法:从发送的字节流缓冲区向 socket 写数据
// ... code 4a ...
// 方法:关闭连接
// ... code 5 ...
// 方法:从接收的字节流缓冲区中提取出完整报文,放到接收报文缓冲区
// 方法:从接收报文缓冲区中提取报文体
// ... code 8 ...
// 方法:根据接收的报文体,创建业务任务并交给线程池
// ... code 10 ...
// 方法:从发送报文缓冲区中提取出业务线程生成的报文体,组装成完整报文,放到发送字节流的缓冲区
// ... code 9 ...
// 方法:清理不活跃连接
// ... code 6 ...
};
void WorkerProcess(int listen_fd, const Config& config) {
Worker worker(listen_fd, config);
worker.Run();
}
// 业务逻辑
// ... code 11 ...
int main() {
// 主函数,包括 master 进程的全部逻辑和创建 worker 进程
// ... code 1 ...
}
确保 server.cpp、ThreadPool.h、Protocol.h、Connection.h 四个文件在同一目录下,使用以下 Makefile 进行编译:
CXX = g++
CXXFLAGS = -std=c++17 -Wall -pthread -O2
all: server
server: server.cpp ThreadPool.h Protocol.h Connection.h
$(CXX) $(CXXFLAGS) -o server server.cpp
clean:
rm -f server
执行 make 命令进行编译,这将生成可执行文件 server。
TODO
将编译任务分散到子目录,避免手动管理每个模块的构建。通过递归调用和通用规则,实现依赖自动生成和增量编译。