作者:lczllx
邮箱:2181719471@qq.com
GitHub: lczllx
开发环境:Ubuntu VS Code
编译器:g++
编程语言:C++
| 项目 | 规格 |
|---|---|
| 云服务器 | 4C8G 5Mbps |
| 操作系统 | Ubuntu 22.04.5 LTS |
| 编译器 | g++ 12.3.0 |
| 构建工具 | CMake 3.22.1 |
git clone https://github.com/lczllx/muduo.git
cd muduo
mkdir build && cd build
cmake .. -DCMAKE_BUILD_TYPE=Release
make -j$(nproc)
../bin/test # Echo 服务
../bin/http_server # HTTP 服务(/hello 等路由)
../bin/shortener_server # 短链接服务(需 MySQL + Redis)短链接服务依赖 MySQL 和 Redis,可通过环境变量配置:
export MYSQL_HOST=127.0.0.1 MYSQL_PORT=3306 MYSQL_USER=root MYSQL_PASS=shortener MYSQL_DB=shortener
export REDIS_HOST=127.0.0.1 REDIS_PORT=6379
../bin/shortener_serverDocker(仓库根目录 Dockerfile,Alpine 3.19 多阶段构建):
docker build -t lcz-muduo .
docker run -p 8080:8080 lcz-muduo压测复现(需要 wrk):
./scripts/bench/run_qps_wrk.sh # keep-alive 基准 QPS
./scripts/bench/run_qps_wrk.sh --100k # 十万长连接 QPS| 项目 | 数值 |
|---|---|
| 总行数 | 7,186 |
| 自己写的行数 | 5,571(不含空行/注释) |
| 核心库 (src+include) | 3,867 行 |
| HTTP+Shortener (net/) | 2,229 行 |
| 源文件数 | 45 |
| 单元测试 | 无(仅 example/test.cc 和 concurrent_test.cc 两个手工测试,共 299 行) |
| 测试覆盖率 | ~0% |
- 无单元测试:未引入 GTest,正确性仅靠手工 echo/HTTP 示例验证,回归依赖人工。
- MySQL 同步阻塞 I/O 线程:Shortener 的 shorten 和 redirect 两条路径中,Redis miss 回源均走
mysql_query()同步调用(单次 0.5–2ms),阻塞 Reactor I/O 线程,长尾延迟恶化。 - Redis Acquire() 阻塞 I/O 线程:池满时
Acquire()的_cond.wait()无超时,永久阻塞持锁的 Reactor I/O 线程;且池内连接断开后的redisReconnect()/redisConnect()是同步调用,可能阻塞数百毫秒。 - TMP_ 占位垃圾残留:两步写入(INSERT TMP_ → UPDATE)在进程崩溃时可能遗留 TMP_ 开头的垃圾行,DB 中无清理机制。
- _next_id 共用计数器:
NewConnection()和RunAfterInLoop()共用_next_id自增,虽然后者将 task ID 注册在 base loop 时间轮,前者将连接 ID 注册在 sub loop 时间轮,不会互相取消,但语义不清,后续增加定时器功能时容易踩坑。 - 单 Acceptor 瓶颈:单 listener 在极端短连接场景(>5 万 accept/s)可能成为瓶颈,多核下可改用
SO_REUSEPORT+ 多 Acceptor。 - 无背压机制:输出缓冲区无上限,慢客户端可撑爆服务端内存。
- HTTP 解析不完整:不支持 chunked 编码、Trailer 头、
Expect: 100-continue。 - 日志系统
vasprintf开销:每条日志调用vasprintf分配堆内存格式化,高频路径下 malloc/free 对性能有可见影响,可改用栈上固定缓冲区 +vsnprintf。 - 无安全机制:无 TLS、无连接数限制、无请求速率限制。
- 时间轮精度粗:1s tick,不适用于毫秒级超时。
项目编译时是基于cmake的,拉取后使用cmake编译即可
# 克隆项目
git clone https://github.com/lczllx/muduo.git
cd muduo
# 编译项目
mkdir build && cd build
cmake ..
make -j$(nproc)
# 运行示例(从build目录)
../bin/test # Echo 服务
../bin/http_server # HTTP 服务(含 /hello 等路由)
# 或者从项目根目录运行
./bin/test
./bin/http_server在项目源码的/autoinstall 目录里面有一个install.sh文件,直接用root权限运行即可编译和安装
写了一个简单的回显服务器用于测试,在项目源码的/example 目录下的test.cc
./scripts/bench/run_qps_wrk.sh- wrk keep-alive 压测(需先安装wrk)./scripts/bench/run_qps_wrk.sh --100k- 十万长连接下 QPS 压测(需 ulimit -n >= 110000)./scripts/bench/run_http_server_for_100k.sh- 十万并发下启动 HTTP 服务./scripts/bench/run_server_for_100k.sh- 十万并发下启动 Echo 服务./scripts/bench/check_limits.sh- 检查系统对十万并发的限制./scripts/bench/test_qps_cpu.sh- 1 万 / 十万长连接下 QPS + CPU 采样./scripts/bench/test_100k_qps_memory.sh- QPS + 内存(RSS/VSS)采样sudo ./scripts/bench/setup_100k_limits.sh- sysctl 调优(十万并发前建议执行)- 脚本说明汇总:
docs/BENCH_SCRIPTS.md
性能、路由与序列化等说明均放在 docs/,例如:PERFORMANCE_OPTIMIZATION.md、CPU_UTILIZATION_ANALYSIS.md、QPS_VARIATION_ANALYSIS.md、PERFORMANCE_TARGET.md、ROUTE_OPTIMIZATION_GUIDE.md、PROTOBUF_INTEGRATION_PLAN.md 等。
服务端与客户端为同内网两台 4C8G 云服务器(192.168.175.86 / 192.168.175.87),服务端独占 CPU,无本地 wrk 争抢。
| 场景 | 工具 | QPS | P50 | P99 | 带宽 | 说明 |
|---|---|---|---|---|---|---|
| 公网 c10000 | wrk -t4 | 2,838 | 33ms | 1.01s | 385 KB/s | 5M 公网带宽瓶颈 |
| 内网 c10000 keep-alive | wrk -t4 | 150,853 | 41ms | 1.49s | 20 MB/s | 基准 QPS |
| 内网 c20000 keep-alive | wrk -t4 | 134,645 | 41ms | 1.60s | 17.9 MB/s | 超时 9.4 万,已接近瓶颈 |
| 内网 c10000 短连接 | wrk -t4 -H "Connection: close" | 4,806 | 718ms | 1.83s | 718 KB/s | 每次新建 TCP,QPS 仅为 keep-alive 的 3.2% |
| 内网 6 万长连接驻留 | wrk -t4 c10000 | 149,955 | 19ms | 504ms | 19.9 MB/s | QPS 仅降 0.6%,连接数不构成吞吐瓶颈 |
关键结论:
- 内网 QPS 上限 15 万(/hello 接口,响应体 ~140B,WARN 日志级别)
- 公网 5M 带宽下 QPS 仅 2838,带宽是真实瓶颈,不是服务端
- 短连接 QPS 4806,TCP 三次握手+四次挥手开销占主导
- 6 万长连接驻留 + 1 万活跃连接,QPS 基本持平,epoll 轮询开销可忽略
- 单连接 RSS ~1.5KB(Connection + 2×1024 Buffer + 内核 socket 缓冲区映射)
2c2g-webench-c1000-3759qps(有同步异步日志) → 2c2g-webench-c1000-12532qps(优化正则表达式) → 2c2g-wrk-c10000-16379qps(工具切换 wrk keep-alive) → 4c8g-wrk-c10000-30000(关闭同步异步日志输出,日志类+Debug 日志限制等级) → 4c8g-wrk-c10000-40000(优化字符串预分配、http 状态码表改为静态、正则预编译,各加 ~10%) → 4c8g-wrk-c10000-115493(重新添加同步异步日志,提升至 WARN 等级,RSS 增量 300MB) → 4c8g 同局域网跨机器 wrk-c10000-150853(内网独立压测机,服务端独占 CPU)
云服务器(4c8g 5M 带宽,WARN 日志级别):
| 环境 | 接口 | 工具 | QPS | 内存/CPU | 说明 |
|---|---|---|---|---|---|
| 4c8g 云 | /hello | wrk 4线程 1万连接 | 115,493 | RSS: 96MB | 空载,WARN 日志 |
| 4c8g 云 | /hello | wrk 4线程 1万连接 | 110,154 | RSS: 547MB | 20 万长连接下,WARN 日志(QPS 仅降 4%) |
| 4c8g 云 | /hello | wrk 4线程 1万连接 | 40,352 | RSS: 212MB | 空载,DEBUG 日志(历史) |
| 4c8g 云 | /hello | wrk 4线程 1万连接 | 37,953 | CPU: 156% | 空载,DEBUG 日志 |
| 4c8g 云 | /hello | wrk 4线程 1万连接 | 34,330 | CPU: 146% | 20 万长连接下,DEBUG 日志 |
| 4c8g 云 | /hello | WebBench 1万 并发 | 29,799 | - | 短连接 |
| 4c8g 云 | /hello | WebBench 1000 并发 | 35,012 | - | 短连接 |
| 2c2g | /hello | WebBench 2000 | 12,500+ | - | 历史数据 |
- 普通 HTTP(无长连接): QPS 115,493,平均 RSS 96MB,峰值 RSS 156MB
- 20 万长连接下: QPS 110,154(仅降 4%),平均 RSS 547MB,峰值 RSS 608MB
- 维持 20 万连接内存增量: 308MB(156 → 464MB)
- 单连接内存开销: ~1.54KB(Connection 对象 + 收发 Buffer 预分配 2×1024 + 内核 socket 缓冲区映射)
日志级别影响: WARN 级别短路 DEBUG/INFO 日志,QPS 从 4 万提升至 11.5 万(约 2.9×)。
- wrk(推荐):默认 keep-alive,复用连接;4 核机建议
-t4 - WebBench:每请求新建连接(Connection: close),测的是建连+处理能力,QPS 会偏低
- 跨机器内网压测:客户端与服务端分离,服务端独占 CPU,避免 wrk 和服务端争抢
- 公网 vs 内网:公网 QPS 受限于 5M 带宽,内网实测带宽可达 20+ MB/s
- 1000 并发: 13+ MB/s
- 500 并发: 13+ MB/s
- 单连接: 47 MB/s(10MB 文件)
- 10 并发: 74 MB/s(总吞吐量)
基于 Reactor 构建的短链接服务,提供短码生成与 302 重定向跳转。
-
短码生成:Base62 编码自增 ID,MySQL
code字段utf8mb4_bin唯一索引防冲突。写入时先用TMP_<pid>_<seq>占位获取自增 ID,再 UPDATE 为 Base62(ID),多进程并发安全 -
缓存策略:Redis 热点缓存,Cache-Aside 模式(读未命中 → 回源 MySQL → 回填 Redis)。Redis 4 连接池化,Acquire/Release 仅锁队列操作,I/O 在临界区外执行
-
数据存储:MySQL 存长短链映射,连接池管理长连接
-
已知瓶颈:Redis miss 后同步调
mysql_query()会阻塞 Reactor I/O 线程(单次 0.5-2ms),后续可引入异步 MySQL 客户端解耦
Docker 多阶段构建(Alpine 3.19),Dockerfile 位于仓库根目录。
这是一个基于 C++11 开发的高性能网络库,主要是将陈硕的 muduo 网络库核心代码进行重写,将原来依赖boost库的地方都替换成了C++ 11语法,主要是为了学习、了解网络库的架构和组成,以及对网络编程的知识进行复习
在muduo库中采用的是reactor模型,Reactor模型是什么呢 Reactor:即非阻塞同步I/O模型,可以这么理解---应用程序向内核注册感兴趣的事件(可读,可写),内核在事件就绪时通知应用程序,应用程序自己执行实际的I/O操作 Proactor:异步I/O模型。应用程序发起I/O操作时,不仅注册事件,还提供缓冲区。事件来了,内核完成整个I/O操作(如读取数据到缓冲区)后,再通知应用程序
reactor模型在实际设计中大概有以下几个部分: Event:事件 Reactor:反应堆 Demultiplex:多路事件分发器 EventHandler:事件处理器
调用关系:
- EventHandler注册Event到Reactor
- Reactor将Event交给Demultiplex进行监控
- Demultiplex检测到Event就绪,通知Reactor
- Reactor将就绪的Event分发给对应的EventHandler处理
上面是一个reactor反应堆中所执行的流程,在muduo代码中的关系如下:
可以看到,EventLoop就是reactor,每一个都执行在一个线程上,形成one thread one loop的设计。每一个EventLoop中,有一个Poller和很多Channel,实现对多个连接的管理,Poller对应的就是Demultiplex(多路事件分发器),Channel对应的就是Event(事件)
但是作为支撑高并发的网络库,单线程往往不能达到想要的效果
因此muduo采用了和Nginx相似的操作,有一个base reactor通过accept组件负责处理新的客户端连接,并将新连接分派给各个从属 reactor,每个从属reactor是负责一个或者多个连接的读写等工作。
主要是在里面定义了定义回调函数类型和连接状态枚举(因为都是在类外,所以统一放到一个头文件中)
using TaskFunc = std::function<void()>- 任务函数using ReleaseFunc = std::function<void()>- 释放函数using PtrConnection = std::shared_ptr<Connection>- 连接指针enum ConneStatus- 连接状态枚举DISCONNECTED- 连接关闭状态CONNECTING- 连接建立状态CONNECTED- 通信状态DISCONNECTING- 待关闭状态
using ConnectedCallBack = std::function<void(const PtrConnection&)>- 连接建立回调using ClosedCallBack = std::function<void(const PtrConnection&)>- 连接关闭回调using MessageCallBack = std::function<void(const PtrConnection&, Buffer*)>- 消息回调using AnyEventCallBack = std::function<void(const PtrConnection&)>- 任意事件回调
这个类用于用户态的缓冲区,对接收发送的数据进行缓冲,有一个读偏移、一个写偏移和一个连续空间
read到write是可读数据,buffer.begin()到read+write是buffer.end()是可写空间---空间足够时会移动数据然后更新读写偏移,不够时会扩容
private:
uint64_t _read_idx- 读偏移uint64_t _write_idx- 写偏移std::vector<char> _buffer- 连续空间
public:
Buffer()- 构造函数char* Begin()- 空间起始位置char* GetWritePtr()- 获取写位置(内联)char* GetReadPtr()- 获取读位置(内联)uint64_t HeadIdleSize()- 缓冲区起始空间大小(内联)uint64_t TailIdleSize()- 缓冲区末尾空间大小(内联)size_t ReadableBytes()- 可读空间大小(内联)void MoveReadoffset(uint64_t len)- 读偏移向后移动void MoveWriteoffset(uint64_t len)- 写偏移向后移动void EnsureWritableBytes(uint64_t len)- 确保可写空间足够void Write(const void* data, uint64_t len)- 写入数据void WriteAndpush(const void* data, uint64_t len)- 写入char*并移动写偏移void Writestring(const std::string& data)- 写入stringvoid WritestringAndpush(const std::string& data)- 写入string并移动写偏移void WriteBuffer(Buffer& data)- 写入buffervoid WriteBufferAndpush(Buffer& data)- 写入buffer并移动写偏移void Read(void* buf, uint64_t len)- 读取len数据std::string ReadAsstring(uint64_t len)- 读取len数据并以string返回void ReadAndpop(void* buf, uint64_t len)- 读取len数据并移动读偏移std::string ReadAsstringandpop(uint64_t len)- 读取len数据并以string返回并移动读偏移char* FindcrLf()- 寻找换行字符std::string GetLine()- 获取一行std::string GetLineAndPop()- 获取一行并移动读偏移void clear()- 重置缓冲区状态
这是自定义的通用类型容器,保存不同类型的数据,控制协议处理的上下文来控制处理节奏,也可以换成c++17的any
private:
Base* _content- 父类指针,利用多态实现类型擦除,方便使用者的获取调用
内部类:
class Base- 基类virtual ~Base() = defaultvirtual const std::type_info &type() const = 0virtual Base *clone() const = 0
template<typename T> class Derived : public Base- 派生类T _value- 存储的值
public:
Any()- 默认构造函数~Any()- 析构函数template<typename T> Any(const T &value)- 模板构造函数Any(const Any &other)- 拷贝构造函数Any(Any &&other) noexcept- 移动构造函数template<typename T> Any &operator=(const T &val)- 赋值运算符Any &operator=(const Any &other)- 拷贝赋值运算符Any &swap(Any &other)- 交换bool has_value() const- 是否有值template<typename T> T *get()- 获取值(非const)template<typename T> const T *get() const- 获取值(const)const std::type_info &type() const- 获取类型信息void reset() noexcept- 重置
这是一个日志类,只是一个简单的类似日志宏功能的类,针对更加详细具体的日志实现在另外一个项目日志器中
private:
LogLevel _level- 日志级别std::string _message- 日志消息
enum LogLevelDEBUG = 0//调试日志输出INFO//正常日志输出WARN//警告日志输出ERROR//错误日志输出FATAL//致命错误日志输出
public:
Logger(LogLevel level)- 构造函数~Logger()- 析构函数template<typename... Args> void operator()(const char* format, Args... args)- 支持格式化输出(内联)void operator()(const char* msg)- 支持字符串参数void operator()(const std::string& msg)- 支持string参数
private:
static const char* LevelToString(LogLevel level)- 将日志级别转换为字符串static std::string GetCurrentTime()- 获取当前时间
#define L_DEBUG muduo::Logger(muduo::DEBUG)#define L_INFO muduo::Logger(muduo::INFO)#define L_WARN muduo::Logger(muduo::WARN)#define L_ERROR muduo::Logger(muduo::ERROR)#define L_FATAL muduo::Logger(muduo::FATAL)
使用宏定义,在使用时调用L_DEBUG()就可以进行打印
这是对所有描述符,包含监听套接字描述符、连接描述符、eventfd等进行管理的一个类,让对于描述符的监控事件在用户态更容易维护,以及触发事件后的操作流程更加清晰
private:
EventLoop* _loop- 关联的EventLoopint _fd- 文件描述符uint32_t _events- 当前需要监控的事件uint32_t _revents- 当前连接触发的事件EventCallback _read_cb- 可读回调EventCallback _write_cb- 可写回调EventCallback _error_cb- 错误回调EventCallback _close_cb- 连接断开回调EventCallback _event_cb- 任意事件回调
类型定义:
using EventCallback = std::function<void()>
public:
Channel(EventLoop *loop, int fd)- 构造函数int Fd() const- 获取描述符fd(内联)uint32_t Events()- 获取当前事件(内联)void SetREvent(uint32_t events)- 设置当前触发事件(内联)void SetReadCallback(const EventCallback& cb)- 设置读回调(内联)void SetWriteCallback(const EventCallback& cb)- 设置写回调(内联)void SetErrorCallback(const EventCallback& cb)- 设置错误回调(内联)void SetCloseCallback(const EventCallback& cb)- 设置关闭回调(内联)void SetEventCallback(const EventCallback& cb)- 设置任意事件回调(内联)bool ReadAble() const- 描述符是否可读(内联)bool WriteAble() const- 描述符是否可写(内联)void EnableRead()- 对描述符监控可读void EnableWrite()- 对描述符监控可写void DisableRead()- 解除可读事件监控void DisableWrite()- 解除可写事件监控void DisableAll()- 解除所有事件监控void Remove()- 移除监控void Update()- 更新监控void HandleEvent()- 事件处理,根据revents确定触发的事件,调用对应回调
这个类主要是对epoll进行封装的一个类,也就是底层的demultiplex(多路事件分发),里面有一个就绪事件数组和一个描述符与其对应channel指针的映射表
private:
int _epfd- epoll文件描述符struct epoll_event _evs[MAX_EPOLLEVENTS]- 就绪事件数组std::unordered_map<int, Channel *> _channels- 管理描述符和对应的channel
宏定义:
#define MAX_EPOLLEVENTS 1024
public:
Poller()- 构造函数void UpdateEvent(Channel *channel)- 更新事件监控void RemoveEvent(Channel *channel)- 移除事件监控void Poll(std::vector<Channel *> *active)- 开始监控,获取就绪channel
private:
void Update(Channel *channel, int op)- 根据具体操作类型更新epollbool HasChannel(Channel *channel)- 查找需要更新事件的描述符存不存在
功能:对通信连接进行管理的模块,对socket,buffer,channel的整合,由组件使用者设置各种回调
private:
uint64_t _conne_id- 连接唯一iduint64_t _timer_id- 定时器唯一id,这里使用_conne_idint _sockfd- 连接套接字描述符bool _enable_inactive_release- 是否启动非活跃连接销毁EventLoop* _loop- 连接关联的eventloopConneStatus _status- 连接状态Socket _socket- Socket对象Channel _conne_channel- Channel对象Buffer _in_buffer- 输入缓冲区Buffer _out_buffer- 输出缓冲区Any _context- 协议上下文管理ConnectedCallBack _connected_cb- 连接建立成功回调ClosedCallBack _closed_cb- 连接关闭回调MessageCallBack _message_cb- 有新数据接收成功的回调AnyEventCallBack _event_cb- 任意事件回调ClosedCallBack _server_closed_cb- 组件内的连接关闭回调
public:
Connection(EventLoop *loop, uint64_t cone_id, int sockfd)- 构造函数~Connection()- 析构函数int Fd() const- 获取管理的文件描述符(内联)int Id() const- 获取链接id(内联)bool Connected() const- 判断连接是否处于CONNECTED(内联)void SetContext(const Any &context)- 设置上下文(内联)Any *GetContext()- 获取上下文信息(内联)void SetConnectedCallBack(const ConnectedCallBack &connected_cb)- 设置连接建立回调(内联)void SetClosedCallBack(const ClosedCallBack &closed_cb)- 设置连接关闭回调(内联)void SetMessageCallBack(const MessageCallBack &message_cb)- 设置消息回调(内联)void SetAnyEventCallBack(const AnyEventCallBack &event_cb)- 设置任意事件回调(内联)void SetServerClosedCallBack(const ClosedCallBack &cb)- 设置服务器关闭回调(内联)void Established()- 连接获取后,所处的状态进行各种设置,设置事件回调,启动读监控,调用连接建立完成的回调void Send(const char *data, size_t len)- 发送数据,数据放到发送缓冲区void EnableInactiveRelease(int sec)- 启动非活跃销毁void CancelInactiveRelease()- 取消非活跃销毁void Shutdown()- 提供给使用者的关闭接口void Release()- 释放接口void Upgrade(const Any &context, const ConnectedCallBack &connected_cb, const ClosedCallBack &closed_cb, const MessageCallBack &message_cb, const AnyEventCallBack &event_cb)- 切换协议
private:
void HandleRead()- 描述符触发可读void HandleWrite()- 描述符触发可写void HandleClose()- 描述符触发关闭void HandleError()- 描述符触发错误void HandleEvent()- 描述符触发任意事件void EstablishedInLoop()- 在EventLoop线程中建立连接void ReleaseInLoop()- 在EventLoop线程中释放连接void ShutdownInLoop()- 在EventLoop线程中关闭连接void SendInLoop(Buffer buf)- 在EventLoop线程中发送数据void EnableInactiveReleaseInLoop(int sec)- 在EventLoop线程中启动非活跃销毁void CancelInactiveReleaseInLoop()- 在EventLoop线程中取消非活跃销毁void UpgradeInLoop(const Any &context, const ConnectedCallBack &connected_cb, const ClosedCallBack &closed_cb, const MessageCallBack &message_cb, const AnyEventCallBack &event_cb)- 在EventLoop线程中切换协议
每一个EventLoop对应一个线程,是Reactor模式的核心组件。它整合了Channel、Poller和TimingWheel,通过Poller监控文件描述符的I/O事件,通过Channel管理事件回调,通过TimingWheel实现连接定时销毁和定时任务执行
private:
std::thread::id _thread_id- 线程idint _eventfd- 事件通知描述符,唤醒io事件监控中有可能的阻塞Poller _poller- Poller对象std::unique_ptr<Channel> _event_channel- eventfd的Channelstd::vector<Tasks> _task- 任务队列std::mutex _mutex- 互斥锁TimingWheel _timerwheel- 定时器时间轮
eventfd说明:_eventfd用于在没有I/O事件处理,eventloop处于epoll_wait时,但是这个时候又有任务放入要执行,唤醒eventloop。
pipe/socketpair:需要两个文件描述符(读端+写端),eventfd 只需一个,更轻量,并且_eventfd是系统调用不会阻塞主线程,线程安全,8字节大小开销小
类型定义:
using Tasks = std::function<void()>
public:
EventLoop()- 构造函数void Start()- 启动eventloopvoid RunInLoop(const Tasks& t)- 判断要执行的任务是不是在当前线程,不是就压入任务池,是就执行void TasksInLoop(const Tasks& t)- 将操作加入任务池bool IsInLoop()- 判断当前线程是不是在Eventloop所在线程里面void UpdateEvent(Channel* channel)- 更新描述符的事件监控void RemoveEvent(Channel* channel)- 移除描述符的事件监控void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc& cb)- 添加定时器void TimerReflesh(uint64_t id)- 刷新定时器void TimerCancel(uint64_t id)- 取消定时bool HasTimer(uint64_t id)- 是否有这个定时器void AssertInLoop()- 断言在eventloop线程里面
private:
void RunAllTask()- 运行任务池所有任务static int CreateEventfd()- 创建eventfdvoid ReadEventfd()- 读取eventfdvoid WeakupEventfd()- 唤醒eventfd
EventLoop的核心是对epoll的封装,在epoll_create创建epoll实例、注册各个channel之后,EventLoop就处于epoll_wait阻塞状态等待I/O事件。如果此时没有I/O事件发生,但其他线程需要向EventLoop提交任务(如新连接操作、发送数据等),就通过eventfd写入数据触发可读事件,从而唤醒阻塞在epoll_wait的EventLoop线程,使其能够及时处理任务队列中的任务。
EventLoop和线程的封装类,负责创建一个线程并在该线程内创建EventLoop实例。通过互斥锁和条件变量实现同步,确保外部调用Getloop()时能够安全获取到已初始化的EventLoop指针,然后启动事件循环
private:
std::mutex _mutex- 互斥锁std::condition_variable _cond- 条件变量,结合互斥锁实现loop获取同步关系EventLoop* _loop- 在线程内实例化eventloop指针std::thread _thread- eventloop对应线程
public:
LoopThread()- 构造函数EventLoop* Getloop()- 获取EventLoop指针
private:
void ThreadEntry()- 实例化eventloop对象并启动eventloop
这里使用的是C++11的thread类,没有使用Linux的系统调用pthread。one thread one loop的关键在eventloop和thread的组合,达成一个线程一个eventloop。
void LoopThread::ThreadEntry() {
EventLoop loop;
{
std::unique_lock<std::mutex> lock(_mutex);
_loop = &loop;
_cond.notify_all();//loop实例化完唤醒可能的阻塞
}
loop.Start();//启动eventloop
}
EventLoop* LoopThread::Getloop() {
EventLoop* loop = nullptr;
{
std::unique_lock<std::mutex> lock(_mutex);
_cond.wait(lock, [&](){ return _loop != nullptr; });//阻塞至loop实例化完成
loop = _loop;
}
return loop;
}管理多个LoopThread的线程池类,实现主从Reactor模式。负责创建多个从属线程(每个线程对应一个EventLoop),并通过rr轮转方式分配EventLoop给新连接。当线程数为0时,返回主Reactor(base_loop)处理所有连接
private:
int _thread_cnt- 线程数量int _next_loop_idx- 下一个线程索引EventLoop* _base_loop- 主reactorstd::vector<EventLoop*> _loop- 用于分配eventloopstd::vector<std::unique_ptr<LoopThread>> _threads- 保存LoopThread
public:
LoopThreadPool(EventLoop* base_loop)- 构造函数void SetThreadCnt(int cnt)- 设置线程数量void Create()- 创建所有从属线程,为每个线程创建LoopThread对象,获取其EventLoop指针并保存到_loop数组中EventLoop* NextLoop()- 采用轮转(round-robin)方式分配EventLoop,当线程数为0时返回base_loop
属于主Reactor,对监听套接字进行管理的模块。当新连接到来时,监听套接字触发可读事件,Acceptor调用accept()获取新连接的fd,然后通过回调函数通知TcpServer创建Connection对象并分发到从属Reactor
private:
Socket _socket- 监听套接字EventLoop* _loop- 关联的EventLoopChannel _acpt_channel- 用于对监听套接字进行事件管理AcceptorFunc _acpt_cb- 新连接回调
public:
Acceptor(EventLoop* loop, int port)- 构造函数void SetAcceptorCallBack(const AcceptorFunc& acpt_cb)- 设置新连接回调void Listen()- 开始监听接口
private:
void HandleRead()- 处理新连接,调用accept()获取新连接fd,然后调用_acpt_cb回调int CreateServer(int port)- 创建监听套接字并绑定端口
主要是将对套接字的操作进行封装,简化外部操作
private:
int _sockfd- 套接字文件描述符
宏定义:
#define MAX_LISTEN 1024- 最大监听连接数
public:
Socket()- 默认构造函数Socket(int fd)- 使用已有fd构造~Socket()- 析构函数int Fd()- 获取文件描述符(内联)void Close()- 关闭套接字(内联)bool Create()- 创建套接字bool Bind(const std::string &ip, uint16_t port)- 绑定地址信息bool Listen(int backlog = MAX_LISTEN)- 开始监听bool Connect(const std::string &ip, uint16_t port)- 向服务器发起连接int Accept()- 获取新连接,返回描述符ssize_t Recv(void *buf, size_t len, int flag = 0)- 接收数据ssize_t NonBlockRecv(void *buf, size_t len)- 非阻塞接收数据ssize_t Send(const void *buf, size_t len, int flag = 0)- 发送数据ssize_t NonBlockSend(const void *buf, size_t len)- 非阻塞发送bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false)- 创建监听连接bool CreateClient(uint16_t port, const std::string &ip)- 创建客户端连接void ReuseAdderess()- 开启地址端口重用,防止缓冲区没有数据时阻塞void NoBlock()- 设置套接字为非阻塞
整合前面的所有模块,提供给组件使用者搭建服务器
private:
uint64_t _next_id- 连接id,目前是自增int _port- 服务器监听的端口int _timeout- 超时时间bool _enable_inactive_release- 是否启动非活跃连接销毁EventLoop _base_loop- 主reactorAcceptor _acceptor- 监听套接字的管理对象LoopThreadPool _pool- 从属线程池std::unordered_map<uint64_t, PtrConnection> _connections- 保存管理所有的连接对应的指针ConnectedCallBack _connected_cb- 使用者设置给connection模块的回调ClosedCallBack _closed_cb- 连接关闭回调MessageCallBack _message_cb- 消息回调AnyEventCallBack _event_cb- 任意事件回调ClosedCallBack _server_closed_cb- 服务器关闭回调
主要有acceptor,loopthreadpool,eventloop,connection,channel和一些回调。
public:
TcpServer(int port)- 构造函数void SetThreadCnt(int cnt)- 设置线程池数量void SetConnectedCallBack(const ConnectedCallBack& connected_cb)- 设置连接建立回调void SetClosedCallBack(const ClosedCallBack& closed_cb)- 设置连接关闭回调void SetMessageCallBack(const MessageCallBack& message_cb)- 设置消息回调void SetAnyEventCallBack(const AnyEventCallBack& event_cb)- 设置任意事件回调void SetServerClosedCallBack(const ClosedCallBack& cb)- 设置服务器关闭回调void EnableInactiveRelease(int timeout)- 启动非活跃连接销毁void RunAfter(const std::function<void()>& task, int delay)- delay秒过后执行任务一个taskvoid Start()- 启动
private:
void RemoveConnection(const PtrConnection& conne)- 移除连接信息void NewConnection(int fd)- 为新连接构造一个connection进行管理void RemoveConnectionInLoop(const PtrConnection& conne)- 在EventLoop线程中移除连接void RunAfterInLoop(const std::function<void()>& task, int delay)- 在EventLoop线程中添加定时任务,使用_next_id作为定时器ID(注意:可能与连接ID冲突,建议使用独立的ID生成器)
可以看到私有的几个函数和start都是很重要的:
void TcpServer::Start()
{
_pool.Create(); // 创建线程池的从属线程
_acceptor.SetAcceptorCallBack(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));//收到新连接的回调
_acceptor.Listen(); // 将监听套接字挂到baseloop上面开始事件监控
_base_loop.Start();//启动主Reactor事件循环
}主要是先创建线程池的从属线程,然后为acceptor绑定新连接到来的回调,然后开始监听新连接,最后启动主reactor的事件循环,等待事件的到来。
void TcpServer::NewConnection(int fd)
{
_next_id++;
PtrConnection conne(new Connection(_pool.NextLoop(), _next_id, fd));
conne->SetMessageCallBack(_message_cb);
conne->SetClosedCallBack(_closed_cb);
conne->SetConnectedCallBack(_connected_cb);
conne->SetAnyEventCallBack(_event_cb);
conne->SetServerClosedCallBack(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));
if (_enable_inactive_release)
conne->EnableInactiveRelease(_timeout);
conne->Established();
_connections.insert(std::make_pair(_next_id, conne));
}内部的操作已经被封装了,所以新连接回调只需要传递要绑定的从属reactor、连接id和描述符就可以创建新连接了,然后再设置各种回调,然后再看看有没有启动非活跃连接超时销毁,有就设置上,最后启动事件监控,将新连接管理起来。
Connection::Connection(EventLoop *loop, uint64_t cone_id, int sockfd)
: _conne_id(cone_id), _sockfd(sockfd), _enable_inactive_release(false), _loop(loop), _status(CONNECTING), _socket(sockfd), _conne_channel(loop, sockfd)
{
_conne_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));
_conne_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));
_conne_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));
_conne_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));
_conne_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));
}如果连接关闭就调用shutdown,然后自动调用RemoveConnection,移除连接在TcpServer里面的管理,然后底层自动调用另外的关闭回调,移除监控。
void TcpServer::RemoveConnectionInLoop(const PtrConnection &conne)
{
int id = conne->Id();
_connections.erase(id);
}
// 描述符触发挂断事件
void Connection::HandleClose()
{
if (_in_buffer.ReadableBytes() > 0 && _message_cb)
{
_message_cb(shared_from_this(), &_in_buffer);
}
return Release();
}
void Connection::ReleaseInLoop()
{
if (_status == DISCONNECTED)
return; // 防止重复释放
_status = DISCONNECTED; // 修改链接状态
_conne_channel.Remove(); // 移除监控
_socket.Close(); // 关闭描述符
if (_loop->HasTimer(_conne_id))
CancelInactiveReleaseInLoop(); // 如果有定时任务,取消
if (_closed_cb)
_closed_cb(shared_from_this()); // 调用关闭回调
if (_server_closed_cb)
_server_closed_cb(shared_from_this()); // 移除服务器内部管理的链接信息
}整个muduo的大概流程:
-
创建TcpServer时自动创建base_loop,作为主reactor,同时创建Acceptor和LoopThreadPool
-
设置从属线程数量,如果设置的线程数量为0,那主reactor不仅负责接收新连接还负责处理I/O事件,设置TcpServer的各种事件回调
-
调用Start()时,先创建从属线程池,然后设置Acceptor的新连接回调,接着Acceptor.listen()将监听套接字注册到主reactor的Poller,最后启动主reactor的事件循环
-
主reactor在事件循环中等待新连接到来,当监听套接字有可读事件时,Acceptor的Channel触发HandleRead回调
-
当新连接到来时,Acceptor调用accept()获取新连接的fd,然后调用TcpServer::NewConnection()创建Connection对象,在线程池获取一个从属reactor绑定,设置Connection的各种回调,调用Established()启动读事件监控,该连接的所有操作都在绑定的reactor里面进行
感谢陈硕的 muduo 网络库提供的设计灵感。
如果这个项目对你有帮助,请给它一个 Star!
🐛 发现问题?欢迎提交 Issue
💡 有改进建议?欢迎提交 Pull Request


