From 683c33234fd7fe5416b77ca0c15031db6a2c7285 Mon Sep 17 00:00:00 2001 From: Khalil Estell Date: Sat, 27 Dec 2025 08:22:13 -0800 Subject: [PATCH] :recycle: extend async context blocking mechanisms and add comprehensive tests This commit significantly expands the async context API to support multiple blocking states and adds extensive test coverage. Key changes include: - Add new blocking methods: block_by_sync() with context pointer, and block_by_external() to support different synchronization patterns - Make blocking methods return std::suspend_always for improved coroutine suspension ergonomics - Add context_token class for safe context capture and unblocking operations - Enhance context state queries with safer std::holds_alternative checks - Add git-town configuration for workflow management - Add comprehensive test suite demonstrating resource synchronization patterns between coroutines, including mutex-like behavior and I/O blocking scenarios - Refactor test infrastructure with reusable test_scheduler implementation. These enhancements enable more sophisticated coroutine coordination patterns while maintaining the lightweight design of the async context system. --- git-town.toml | 8 ++ modules/async_context.cppm | 158 ++++++++++++++++----- test_package/main.cpp | 17 ++- tests/async.test.cpp | 278 ++++++++++++++++++++++++++++++++----- tests/main.test.cpp | 6 +- 5 files changed, 389 insertions(+), 78 deletions(-) create mode 100644 git-town.toml diff --git a/git-town.toml b/git-town.toml new file mode 100644 index 0000000..0d2956e --- /dev/null +++ b/git-town.toml @@ -0,0 +1,8 @@ +# See https://www.git-town.com/configuration-file for details + +[branches] +main = "main" + +[hosting] +forge-type = "github" +github-connector-type = "gh" diff --git a/modules/async_context.cppm b/modules/async_context.cppm index 5b143c3..65352c5 100644 --- a/modules/async_context.cppm +++ b/modules/async_context.cppm @@ -118,6 +118,8 @@ using sleep_duration = std::chrono::nanoseconds; export class scheduler { public: + using block_info = std::variant; + /** * @brief * @@ -142,7 +144,7 @@ public: */ void schedule(context& p_context, blocked_by p_block_state, - std::variant p_block_info) + block_info p_block_info) noexcept { return do_schedule(p_context, p_block_state, p_block_info); } @@ -164,10 +166,9 @@ public: } private: - virtual void do_schedule( - context& p_context, - blocked_by p_block_state, - std::variant p_block_info) = 0; + virtual void do_schedule(context& p_context, + blocked_by p_block_state, + block_info p_block_info) noexcept = 0; virtual std::pmr::memory_resource& do_get_allocator() noexcept = 0; }; @@ -196,25 +197,38 @@ public: m_stack = { allocator.allocate_object(p_stack_size), p_stack_size }; } - void unblock() + void unblock() noexcept { - transition_to(blocked_by::nothing, default_timeout); + transition_to(blocked_by::nothing); } + void unblock_without_notification() { m_state = blocked_by::nothing; } - void block_by_time(sleep_duration p_duration) + + std::suspend_always block_by_time(sleep_duration p_duration) { transition_to(blocked_by::time, p_duration); + return {}; } - void block_by_io(sleep_duration p_duration = default_timeout) + + std::suspend_always block_by_io(sleep_duration p_duration = default_timeout) { transition_to(blocked_by::io, p_duration); + return {}; } - void block_by_sync(sleep_duration p_duration = default_timeout) + + std::suspend_always block_by_sync(context* p_blocker) { - transition_to(blocked_by::sync, p_duration); + transition_to(blocked_by::sync, p_blocker); + return {}; + } + + std::suspend_always block_by_external() + { + transition_to(blocked_by::external, std::monostate{}); + return {}; } [[nodiscard]] constexpr std::coroutine_handle<> active_handle() const @@ -224,7 +238,10 @@ public: [[nodiscard]] auto state() const { - return std::get<1>(m_state); + if (std::holds_alternative(m_state)) { + return std::get(m_state); + } + return blocked_by::nothing; } constexpr void active_handle(std::coroutine_handle<> p_active_handle) @@ -265,10 +282,28 @@ public: constexpr auto last_allocation_size() { - return std::get(m_state); + if (std::holds_alternative(m_state)) { + return std::get(m_state); + } + return 0uz; } - void transition_to(blocked_by p_new_state, sleep_duration p_info) + ~context() + { + using poly_allocator = std::pmr::polymorphic_allocator; + auto allocator = poly_allocator(&m_scheduler->get_allocator()); + allocator.deallocate_object(m_stack.data(), m_stack.size()); + }; + +private: + friend class promise_base; + template + friend class future_promise_type; + + using context_state = std::variant; + + void transition_to(blocked_by p_new_state, + scheduler::block_info p_info = std::monostate{}) noexcept { m_state = p_new_state; m_scheduler->schedule(*this, p_new_state, p_info); @@ -296,17 +331,6 @@ public: m_state = p_exception; } - ~context() - { - using poly_allocator = std::pmr::polymorphic_allocator; - auto allocator = poly_allocator(&m_scheduler->get_allocator()); - allocator.deallocate_object(m_stack.data(), m_stack.size()); - }; - -private: - friend class promise_base; - using context_state = std::variant; - // Should stay within a standard cache-line of 64 bytes (8 words) mem::strong_ptr m_scheduler; // word 1-2 std::coroutine_handle<> m_active_handle = std::noop_coroutine(); // word 3 @@ -315,8 +339,82 @@ private: context_state m_state{ 0uz }; // word 7-8 }; -static_assert(sizeof(context) <= - std::hardware_constructive_interference_size * 2); +export class context_token +{ +public: + constexpr context_token() = default; + constexpr context_token(context& p_capture) noexcept + : m_context_address(std::bit_cast(&p_capture)) + { + } + constexpr context_token& operator=(context& p_capture) noexcept + { + m_context_address = std::bit_cast(&p_capture); + return *this; + } + constexpr context_token& operator=(nullptr_t) noexcept + { + m_context_address = 0U; + return *this; + } + + constexpr context_token(context_token const& p_capture) noexcept = default; + constexpr context_token& operator=(context_token const& p_capture) noexcept = + default; + constexpr context_token(context_token&& p_capture) noexcept = default; + constexpr context_token& operator=(context_token& p_capture) noexcept = + default; + + constexpr bool operator==(context& p_context) noexcept + { + return m_context_address == std::bit_cast(&p_context); + } + + [[nodiscard]] constexpr bool in_use() const noexcept + { + return m_context_address != 0U; + } + + [[nodiscard]] auto address() const noexcept + { + return m_context_address != 0U; + } + + [[nodiscard]] constexpr operator bool() const noexcept + { + return in_use(); + } + + constexpr void lease(context& p_capture) noexcept + { + m_context_address = std::bit_cast(&p_capture); + } + + constexpr std::suspend_always set_as_block_by_sync(context& p_capture) + { + if (in_use()) { + auto* address = std::bit_cast(m_context_address); + auto* inner_context = static_cast(address); + p_capture.block_by_sync(inner_context); + } + return {}; + } + + constexpr void unblock_and_clear() noexcept + { + if (in_use()) { + auto* address = std::bit_cast(m_context_address); + auto* inner_context = static_cast(address); + inner_context->unblock(); + m_context_address = 0U; + } + } + +private: + std::uintptr_t m_context_address = 0U; +}; + +static_assert(sizeof(context) <= std::hardware_constructive_interference_size); // ============================================================================= // @@ -398,8 +496,7 @@ public: constexpr auto await_transform( std::chrono::duration p_sleep_duration) noexcept { - m_context->block_by_time(p_sleep_duration); - return std::suspend_always{}; + return m_context->block_by_time(p_sleep_duration); } constexpr auto await_transform(pop_active_coroutine) noexcept @@ -624,8 +721,7 @@ public: constexpr void resume() const { - auto active = handle().promise().get_context().active_handle(); - active.resume(); + handle().promise().get_context().active_handle().resume(); } /** diff --git a/test_package/main.cpp b/test_package/main.cpp index 408bc71..2967b50 100644 --- a/test_package/main.cpp +++ b/test_package/main.cpp @@ -22,22 +22,21 @@ import async_context; -struct my_scheduler +struct test_scheduler : public async::scheduler - , mem::enable_strong_from_this + , mem::enable_strong_from_this { int sleep_count = 0; - my_scheduler(mem::strong_ptr_only_token) + test_scheduler(mem::strong_ptr_only_token) { } private: - void do_schedule( - [[maybe_unused]] async::context& p_context, - [[maybe_unused]] async::blocked_by p_block_state, - [[maybe_unused]] std::variant - p_block_info) override + void do_schedule([[maybe_unused]] async::context& p_context, + [[maybe_unused]] async::blocked_by p_block_state, + [[maybe_unused]] async::scheduler::block_info + p_block_info) noexcept override { if (std::holds_alternative(p_block_info)) { sleep_count++; @@ -64,7 +63,7 @@ async::future coro_double_delay(async::context&) int main() { auto scheduler = - mem::make_strong_ptr(std::pmr::new_delete_resource()); + mem::make_strong_ptr(std::pmr::new_delete_resource()); async::context my_context(scheduler, 1024); auto future_delay = coro_double_delay(my_context); diff --git a/tests/async.test.cpp b/tests/async.test.cpp index e47efe0..f743766 100644 --- a/tests/async.test.cpp +++ b/tests/async.test.cpp @@ -12,18 +12,100 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include #include #include -#include +#include +#include #include #include import async_context; +namespace async { +std::ostream& operator<<(std::ostream& out, blocked_by b) +{ + switch (b) { + case blocked_by::nothing: + return out << "nothing"; + case blocked_by::time: + return out << "time"; + case blocked_by::io: + return out << "io"; + case blocked_by::sync: + return out << "sync"; + case blocked_by::external: + return out << "external"; + default: + // For unknown values we print the numeric value + return out << "blocked_by(" << static_cast(b) << ')'; + } +} +} // namespace async + bool resumption_occurred = false; +struct test_scheduler + : public async::scheduler + , mem::enable_strong_from_this +{ + int sleep_count = 0; + async::context* sync_context = nullptr; + bool io_block = false; + + test_scheduler(mem::strong_ptr_only_token) + { + } + +private: + void do_schedule([[maybe_unused]] async::context& p_context, + [[maybe_unused]] async::blocked_by p_block_state, + [[maybe_unused]] async::scheduler::block_info + p_block_info) noexcept override + { + std::println("Scheduler called!", sleep_count); + + switch (p_block_state) { + case async::blocked_by::time: { + if (std::holds_alternative(p_block_info)) { + std::println("sleep for: {}", + std::get(p_block_info)); + sleep_count++; + std::println("Sleep count = {}!", sleep_count); + } + break; + } + case async::blocked_by::sync: { + if (std::holds_alternative(p_block_info)) { + auto* context = std::get(p_block_info); + std::println( + "Coroutine ({}) is blocked by syncronization with coroutine ({})", + static_cast(&p_context), + static_cast(context)); + sync_context = context; + } + break; + } + case async::blocked_by::io: { + io_block = true; + break; + } + case async::blocked_by::nothing: { + std::println("Context ({}) has been unblocked!", + static_cast(&p_context)); + break; + } + default: { + break; + } + } + } + + std::pmr::memory_resource& do_get_allocator() noexcept override + { + return *strong_from_this().get_allocator(); + } +}; async::future coro_print(async::context&) { @@ -36,44 +118,14 @@ async::future coro_print(async::context&) } namespace async { -boost::ut::suite<"async::context"> async_context_suite = []() { +void async_context_suite() +{ using namespace boost::ut; - ""_test = []() { - struct my_scheduler - : public async::scheduler - , mem::enable_strong_from_this - { - int sleep_count = 0; - - my_scheduler(mem::strong_ptr_only_token) - { - } - - private: - void do_schedule( - [[maybe_unused]] context& p_context, - [[maybe_unused]] blocked_by p_block_state, - [[maybe_unused]] std::variant - p_block_info) override - { - std::println("Scheduler called!", sleep_count); - if (std::holds_alternative(p_block_info)) { - std::println("sleep for: {}", - std::get(p_block_info)); - sleep_count++; - std::println("Sleep count = {}!", sleep_count); - } - } - - std::pmr::memory_resource& do_get_allocator() noexcept override - { - return *strong_from_this().get_allocator(); - } - }; + "coroutine with time-based blocking and sync_wait"_test = []() { // Setup auto scheduler = - mem::make_strong_ptr(std::pmr::new_delete_resource()); + mem::make_strong_ptr(std::pmr::new_delete_resource()); async::context my_context(scheduler, 1024); // Exercise @@ -85,5 +137,159 @@ boost::ut::suite<"async::context"> async_context_suite = []() { expect(that % future_print.done()); expect(that % 2 == scheduler->sleep_count); }; + + "block_by_io and block_by_sync notify scheduler correctly"_test = []() { + // Setup + auto scheduler = + mem::make_strong_ptr(std::pmr::new_delete_resource()); + async::context my_context(scheduler, 1024); + async::context my_context2(scheduler, 1024); + + resumption_occurred = false; + + auto test_coro = + [&my_context2](async::context& p_context) -> async::future { + using namespace std::chrono_literals; + std::println("Printed from a coroutine"); + co_await 100ns; + resumption_occurred = true; + co_await p_context.block_by_io(); + co_await p_context.block_by_sync(&my_context2); + co_return; + }; + + // Exercise + auto blocked_by_testing = test_coro(my_context); + expect(that % not resumption_occurred); + blocked_by_testing.sync_wait(); + + // Verify + expect(that % resumption_occurred); + expect(that % blocked_by_testing.done()); + expect(that % scheduler->io_block); + expect(that % &my_context2 == scheduler->sync_context); + }; + + "mutex-like resource synchronization between coroutines"_test = []() { + // Setup + auto scheduler = + mem::make_strong_ptr(std::pmr::new_delete_resource()); + async::context my_context1(scheduler, 1024); + async::context my_context2(scheduler, 1024); + + async::context_token io_in_use; + + auto single_resource = + [&](async::context& p_context) -> async::future { + using namespace std::chrono_literals; + + std::println("Executing 'single_resource' coroutine"); + while (io_in_use) { + std::println("Resource unavailable, blocked by {}", + io_in_use.address()); + co_await io_in_use.set_as_block_by_sync(p_context); + } + + // Block next coroutine from using this resource + io_in_use = p_context; + + // It cannot be assumed that the scheduler will not sync_wait() this + // coroutine, thus, a loop is required to sure that the async operation + // has actually completed. + while (io_in_use == p_context) { + std::println("Waiting on io complete flag, blocking by I/O"); + // Continually notify that this is blocked by IO + co_await p_context.block_by_io(); + } + + std::println("IO operation complete! Returning!"); + + co_return; + }; + + std::println("🧱 Future setup"); + auto access_first = single_resource(my_context1); + auto access_second = single_resource(my_context2); + + auto check_access_first_blocked_by = + [&](async::blocked_by p_state = async::blocked_by::io, + std::source_location const& p_location = + std::source_location::current()) { + expect(that % my_context1.state() == + access_first.handle().promise().get_context().state()) + << "line: " << p_location.line() << '\n'; + expect(that % static_cast(p_state) == + static_cast(my_context1.state())) + << "line: " << p_location.line() << '\n'; + ; + }; + + auto check_access_second_blocked_by = + [&](async::blocked_by p_state = async::blocked_by::nothing, + std::source_location const& p_location = + std::source_location::current()) { + expect(that % my_context2.state() == + access_second.handle().promise().get_context().state()) + << "line: " << p_location.line() << '\n'; + ; + expect(that % p_state == my_context2.state()) + << "line: " << p_location.line() << '\n'; + ; + }; + + // access_first will claim the resource and will return control, and be + // blocked by IO. + std::println("▶️ Resume 1st: 1"); + access_first.resume(); + + check_access_first_blocked_by(); + check_access_second_blocked_by(); + + std::println("▶️ Resume 1st: 2"); + access_first.resume(); + + check_access_first_blocked_by(); + check_access_second_blocked_by(); + + std::println("▶️ Resume 1st: 3"); + access_first.resume(); + + check_access_first_blocked_by(); + check_access_second_blocked_by(); + + std::println("▶️ Resume 2nd: 1"); + access_second.resume(); + + check_access_first_blocked_by(); + check_access_second_blocked_by(async::blocked_by::sync); + + io_in_use.unblock_and_clear(); + + check_access_first_blocked_by(async::blocked_by::nothing); + check_access_second_blocked_by(async::blocked_by::sync); + + std::println("▶️ Resume 2nd: 2"); + access_second.resume(); + + // Resuming access_second shouldn't change the state of anything + check_access_first_blocked_by(async::blocked_by::nothing); + check_access_second_blocked_by(async::blocked_by::io); + + std::println("▶️ Resume 1st: 4, this should finish the operation"); + access_first.resume(); + + expect(that % my_context1.state() == async::blocked_by::nothing); + expect(that % access_first.done()); + + check_access_second_blocked_by(async::blocked_by::io); + access_second.resume(); + check_access_second_blocked_by(async::blocked_by::io); + + io_in_use.unblock_and_clear(); + access_second.resume(); + + expect(that % my_context2.state() == async::blocked_by::nothing); + expect(that % access_second.done()); + }; }; } // namespace async diff --git a/tests/main.test.cpp b/tests/main.test.cpp index 445bbd2..9482d00 100644 --- a/tests/main.test.cpp +++ b/tests/main.test.cpp @@ -14,12 +14,14 @@ // export module libahl_unit_tests; -namespace hal { +namespace async { // Extern position dependant test go here. Refrain from using this whenever // possible. -} // namespace hal +extern void async_context_suite(); +} // namespace async int main() { // Position dependent test go below: + async::async_context_suite(); }