feat: add io_uring transport layer support #3328
0e65397 to 89b2765
|
|
||
| if (!IsWritable()) { errno = EAGAIN; return -1; } | ||
|
|
||
| const int fd = _socket->fd(); |
- The fd here is still the TCP socket fd, not the ring_fd, right?
- So io_uring is used here only for the data read/write APIs?
_inflight_writes (per-endpoint) ← keeps a single connection from piling up unfinished writes without bound
SQ capacity (256 slots) ← keeps the ring itself from overflowing
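The two limits described above can be sketched as a pair of bounded counters that a write must pass in order. This is only an illustration under assumptions: `BoundedCounter` and `TrySubmitWrite` are hypothetical names that do not appear in the PR, and the point at which each reservation is released in the real code (SQ slot on submit, in-flight count on CQE) is assumed rather than confirmed.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical helper: a counter that refuses to grow past a fixed cap.
class BoundedCounter {
public:
    explicit BoundedCounter(int cap) : cap_(cap), in_use_(0) {}

    // Reserve one unit; returns false once the cap is reached.
    bool TryAcquire() {
        int cur = in_use_.load(std::memory_order_relaxed);
        while (cur < cap_) {
            // On failure `cur` is reloaded, so the cap is re-checked.
            if (in_use_.compare_exchange_weak(cur, cur + 1,
                                              std::memory_order_acq_rel,
                                              std::memory_order_relaxed)) {
                return true;
            }
        }
        return false;
    }

    // Give back one previously reserved unit.
    void Release() { in_use_.fetch_sub(1, std::memory_order_acq_rel); }

    int in_use() const { return in_use_.load(std::memory_order_acquire); }

private:
    const int cap_;
    std::atomic<int> in_use_;
};

// Hypothetical submit path: first the per-endpoint limit (plays the role of
// _inflight_writes), then the ring-wide SQ limit (256 slots in the PR).
// If the ring is full, the endpoint reservation is rolled back.
inline bool TrySubmitWrite(BoundedCounter& endpoint_inflight,
                           BoundedCounter& sq_slots) {
    if (!endpoint_inflight.TryAcquire()) {
        return false;  // this connection already has too many pending writes
    }
    if (!sq_slots.TryAcquire()) {
        endpoint_inflight.Release();
        return false;  // the ring has no free submission slot
    }
    return true;
}
```

A caller that gets `false` would typically report EAGAIN and retry later, which matches the `IsWritable()` check in the hunk under review.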
89b2765 to d737faf
d737faf to 07c041d
Pull request overview
This PR adds an io_uring-backed transport (IouringTransport) to bRPC as an alternative to the existing TCP/RDMA transports, including a per-bthread-tag poller architecture, optional fixed-buffer (READ/WRITE_FIXED) support, and build-system integration for both CMake and Bazel. It also includes an io_uring example and Chinese documentation.
Changes:
- Introduce io_uring core implementation (endpoint, helper, transport, buffer pools) and register it in the transport factory / socket mode.
- Add build toggles and dependency wiring for liburing (CMake + Bazel), plus an io_uring echo example and docs.
- Minor compatibility adjustments (e.g., BLOCK_SIZE rename) to avoid macro collisions when including liburing headers.
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 12 comments.
| File | Description |
|---|---|
| WORKSPACE | Adds Bazel liburing repository definition (currently via hard-coded local path). |
| src/butil/single_threaded_pool.h | Renames a static constant to avoid BLOCK_SIZE macro conflicts. |
| src/bthread/types.h | Adjusts minimum bthread concurrency when RDMA/io_uring is compiled in. |
| src/brpc/transport_factory.cpp | Registers IouringTransport in the factory. |
| src/brpc/socket.h | Adds friend declarations for io_uring endpoint/transport. |
| src/brpc/socket_mode.h | Adds SOCKET_MODE_IOURING. |
| src/brpc/iouring/iouring_helper.h | Declares io_uring lifecycle APIs, polling modes, and poller handle. |
| src/brpc/iouring/iouring_helper.cpp | Implements global io_uring init, probing, and poller hook wiring. |
| src/brpc/iouring/iouring_endpoint.h | Defines IouringEndpoint and poller structures. |
| src/brpc/iouring/iouring_endpoint.cpp | Implements SQE submission, CQE polling, and per-tag poller loop. |
| src/brpc/iouring/iouring_block_pool.h | Declares fixed-buffer mempool + per-ring read-slot pool. |
| src/brpc/iouring/iouring_block_pool.cpp | Implements fixed-buffer mempool and read-slot pool. |
| src/brpc/iouring_transport.h | Declares IouringTransport. |
| src/brpc/iouring_transport.cpp | Implements IouringTransport read/write/wait logic and global init hook. |
| src/brpc/input_messenger.h | Adds friend declarations for io_uring integration. |
| example/iouring_echo_c++/server.cpp | Adds an io_uring echo server example with flags and initialization. |
| example/iouring_echo_c++/client.cpp | Adds a simple client for the io_uring echo server example. |
| example/iouring_echo_c++/echo.proto | Proto definitions for the echo example. |
| example/iouring_echo_c++/CMakeLists.txt | CMake wiring for building the io_uring echo example. |
| example/BUILD.bazel | Adds Bazel targets/defines for io_uring example(s). |
| docs/cn/iouring.md | Adds Chinese documentation for io_uring build/flags/architecture. |
| CMakeLists.txt | Adds WITH_IOURING option and liburing discovery/linking. |
| BUILD.bazel | Adds Bazel selects/deps/src handling for io_uring sources and liburing. |
| bazel/third_party/liburing/liburing.BUILD | Adds Bazel cc_library target for liburing sources/headers. |
| bazel/config/BUILD.bazel | Adds brpc_with_iouring config_setting. |
| # io_uring support via local liburing source tree. | ||
| # Enable with: --define=BRPC_WITH_IOURING=true | ||
| # Requires liburing checked out at /docker/root/projects/liburing | ||
| new_local_repository( | ||
| name = "com_github_axboe_liburing", | ||
| path = "/docker/root/projects/liburing", | ||
| build_file = "//bazel/third_party/liburing:liburing.BUILD", | ||
| ) |
| // When RDMA or io_uring is compiled in, their dedicated Poller threads handle | ||
| // I/O directly, so the bare minimum drops to 1 worker + epoll thread. | ||
| // Otherwise keep the traditional floor of 3 + epoll (1 epoll + 2 workers). | ||
| #if defined(BRPC_WITH_RDMA) || defined(BRPC_WITH_IOURING) | ||
| static const int BTHREAD_MIN_CONCURRENCY = 1 + BTHREAD_EPOLL_THREAD_NUM; | ||
| #else | ||
| static const int BTHREAD_MIN_CONCURRENCY = 3 + BTHREAD_EPOLL_THREAD_NUM; | ||
| #endif |
| if(WITH_IOURING) | ||
| list(APPEND DYNAMIC_LIB ${IOURING_LIB}) | ||
| set(BRPC_PRIVATE_LIBS "${BRPC_PRIVATE_LIBS} -luring") | ||
| endif() | ||
|
|
||
| set(BRPC_PRIVATE_LIBS "-lgflags -lprotobuf -lleveldb -lprotoc -lssl -lcrypto -ldl -lz") |
| if (mode == IouringPollingMode::NONE) { | ||
| // Interrupt-driven mode: block up to 1 ms waiting for a CQE. | ||
| // The 1 ms timeout keeps the Poller loop responsive to new | ||
| // connections arriving in op_queue without burning CPU when | ||
| // there is no I/O traffic. | ||
| struct io_uring_cqe* cqe = nullptr; | ||
| struct __kernel_timespec ts{0, 1000000}; // 1 ms | ||
| int r = io_uring_wait_cqe_timeout(&poller->ring, &cqe, &ts); | ||
| if (r == 0 && cqe) { | ||
| io_uring_cqe_seen(&poller->ring, cqe); | ||
| } | ||
|
|
| // Only process CQEs that bRPC submitted (bit 63 == 1). | ||
| // All other CQEs are user-submitted; leave them in the ring so the | ||
| // user callback (called right after PollCq returns) can | ||
| // drain and handle them. Users need no special tagging – bit 63 | ||
| // is never set in a canonical user-space pointer or small integer. | ||
| if (!(udata & kBrpcCqeTag)) { | ||
| continue; // do NOT call io_uring_cqe_seen() | ||
| } |
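The bit-63 filter in the hunk above can be illustrated with a small tagging helper. This is a sketch under assumptions: `kTagBit` stands in for `kBrpcCqeTag` (whose exact definition is not shown here), the three helper functions are hypothetical, and a 64-bit platform whose user-space addresses leave bit 63 clear is assumed.

```cpp
#include <cassert>
#include <cstdint>

static_assert(sizeof(std::uintptr_t) == 8, "sketch assumes 64-bit pointers");

// Stand-in for kBrpcCqeTag: the top bit of the 64-bit user_data field.
constexpr std::uint64_t kTagBit = 1ULL << 63;

// Mark a pointer as framework-owned before storing it in an SQE's user_data.
inline std::uint64_t TagUserData(const void* p) {
    return static_cast<std::uint64_t>(reinterpret_cast<std::uintptr_t>(p)) | kTagBit;
}

// True when a CQE's user_data carries the framework tag.
inline bool IsFrameworkCqe(std::uint64_t udata) {
    return (udata & kTagBit) != 0;
}

// Strip the tag to recover the original pointer.
inline void* UntagUserData(std::uint64_t udata) {
    return reinterpret_cast<void*>(static_cast<std::uintptr_t>(udata & ~kTagBit));
}
```

Untagged pointers and small integers fail the `IsFrameworkCqe` check, which is the property the comment relies on. A user who stores a negative integer or an all-ones sentinel in user_data would set bit 63 as well, so that case would need separate handling.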
| static const int kMaxTlsCacheNum = 4096; | ||
|
|
||
| IouringMemPool& IouringMemPool::Instance() { | ||
| static IouringMemPool inst; | ||
| return inst; | ||
| } | ||
|
|
||
| // --------------------------------------------------------------------------- | ||
| // Init | ||
| // --------------------------------------------------------------------------- | ||
| bool IouringMemPool::Init(size_t block_size) { | ||
| if (initialized_) { | ||
| LOG(WARNING) << "IouringMemPool already initialized"; | ||
| return true; |
| "butil::SetDefaultBlockSize() is called with this value at " | ||
| "startup so IOBuf and the registered slab are always in sync."); | ||
|
|
||
| DEFINE_int32(iouring_read_slot_num, 256, | ||
| "Initial read slots per Poller ring."); | ||
|
|
||
| DEFINE_int32(iouring_read_slot_max, 4096, |
| IouringReadSlot consumed_slot = ep->_read_slot; | ||
| ep->_read_slot = {}; | ||
|
|
||
| // Already on the Poller thread – no lock needed. | ||
| // Acquire the next slot before handing off consumed_slot so | ||
| // that back-to-back arrivals never stall waiting for a slot. | ||
| { | ||
| Poller* p = ep->GetPoller(); | ||
| if (p && p->slot_pool.initialized()) { | ||
| p->slot_pool.Acquire(&ep->_read_slot); | ||
| } | ||
| } | ||
|
|
||
| // Zero-copy: wrap slot memory – IouringMemPool is | ||
| // thread-safe so the destructor can run on any thread. | ||
| struct SlotDeleter { | ||
| static void destroy(void* ptr) { | ||
| IouringMemPool::Instance().Deallocate(ptr); | ||
| } | ||
| }; | ||
| butil::IOBuf tmp; | ||
| tmp.append_user_data(consumed_slot.buf, | ||
| static_cast<size_t>(res), | ||
| SlotDeleter::destroy); | ||
| m->_read_buf.append(std::move(tmp)); |
| LOG(WARNING) | ||
| << "io_uring_register_buffers failed for new " | ||
| "region (buf_index_base=" << buf_index_base | ||
| << "): " << berror(-r2) | ||
| << " – WRITE_FIXED will fall back to WRITEV."; | ||
| } |
| }) + select({ | ||
| "//bazel/config:brpc_with_iouring": glob([ | ||
| "src/brpc/iouring/*.cpp", | ||
| ]), | ||
| "//conditions:default": [], |
07c041d to f6b9ad4
Add a new io_uring-based transport layer (IouringTransport) as an
alternative to the existing TCP and RDMA transports, following the
same architectural patterns as the RDMA implementation.
Core implementation:
- src/brpc/iouring/iouring_endpoint.h/cpp: IouringEndpoint (SocketUser
subclass) that submits async read/write SQEs and reaps CQEs, with
optional SQPOLL polling mode.
- src/brpc/iouring/iouring_helper.h/cpp: global io_uring ring lifecycle
management, per-bthread-tag poller threads, and availability checks.
- src/brpc/iouring_transport.h/cpp: IouringTransport (Transport
interface) wiring Init/Release/Reset/Connect/CutFromIOBuf(List)/
WaitEpollOut/ProcessEvent/QueueMessage/Debug/ContextInitOrDie.
Build system integration:
- CMakeLists.txt: BRPC_WITH_IOURING option; find_package(liburing);
conditionally compile iouring sources and link -luring.
- BUILD.bazel / bazel/config/BUILD.bazel: brpc_with_iouring
config_setting; conditional srcs/defines/linkopts/deps.
- WORKSPACE: new_local_repository for @com_github_axboe_liburing.
- bazel/third_party/liburing/liburing.BUILD: cc_library target for
liburing.
Framework hooks:
- src/brpc/socket_mode.h: add SOCKET_MODE_IOURING enum value.
- src/brpc/transport_factory.cpp: register IouringTransport in
TransportFactory::Create().
- src/brpc/socket.h: friend declarations for IouringEndpoint /
IouringTransport.
- src/brpc/input_messenger.h: friend declarations for IouringEndpoint /
IouringTransport.
Bug fixes:
- src/butil/single_threaded_pool.h: rename static member BLOCK_SIZE to
POOL_BLOCK_SIZE to avoid conflict with the BLOCK_SIZE macro defined
by <linux/fs.h> (pulled in via liburing.h).
- src/brpc/iouring_transport.cpp: move DECLARE_bool(usercode_in_*)
inside namespace brpc{} to match the DEFINE_bool site in
event_dispatcher.cpp, fixing linker undefined-reference errors.
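The BLOCK_SIZE collision in the first fix can be reproduced in isolation. The sketch below is illustrative only: it defines the macro by hand instead of including <linux/fs.h>, and `TinyPool` is a hypothetical stand-in rather than the actual class in single_threaded_pool.h.

```cpp
#include <cassert>
#include <cstddef>

// Stand-in for the definition that <linux/fs.h> provides.
#define BLOCK_SIZE (1 << 10)

template <std::size_t ITEM_SIZE, std::size_t BLOCK_BYTES = 64>
class TinyPool {
public:
    // A member named BLOCK_SIZE would be macro-expanded into
    // `static constexpr std::size_t (1 << 10) = ...;` and fail to compile,
    // so the member uses a name the preprocessor does not touch.
    static constexpr std::size_t POOL_BLOCK_SIZE = BLOCK_BYTES;
    static constexpr std::size_t NITEM = POOL_BLOCK_SIZE / ITEM_SIZE;
};
```

Renaming the member keeps the header independent of include order, whereas an `#undef BLOCK_SIZE` would silently change what later code sees.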
Example and documentation:
- example/iouring_performance/: server, client, proto, CMakeLists.txt
mirroring the rdma_performance example; supports WITH_IOURING=1 make
flag.
- example/BUILD.bazel: Bazel targets for the new example.
- docs/cn/iouring.md: Chinese-language guide covering build, flags,
architecture and comparison with RDMA.
f6b9ad4 to 22db965
What problem does this PR solve?
Issue Number: #1650, #3212
Problem Summary:
What is changed and the side effects?
Changed:
Side effects:
Performance effects:
Breaking backward compatibility:
Check List: