Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions git-town.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# See https://www.git-town.com/configuration-file for details

[branches]
main = "main"

[hosting]
forge-type = "github"
github-connector-type = "gh"
158 changes: 127 additions & 31 deletions modules/async_context.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ using sleep_duration = std::chrono::nanoseconds;
export class scheduler
{
public:
using block_info = std::variant<std::monostate, sleep_duration, context*>;

/**
* @brief
*
Expand All @@ -142,7 +144,7 @@ public:
*/
void schedule(context& p_context,
blocked_by p_block_state,
std::variant<sleep_duration, context*> p_block_info)
block_info p_block_info) noexcept
{
return do_schedule(p_context, p_block_state, p_block_info);
}
Expand All @@ -164,10 +166,9 @@ public:
}

private:
virtual void do_schedule(
context& p_context,
blocked_by p_block_state,
std::variant<sleep_duration, context*> p_block_info) = 0;
virtual void do_schedule(context& p_context,
blocked_by p_block_state,
block_info p_block_info) noexcept = 0;

virtual std::pmr::memory_resource& do_get_allocator() noexcept = 0;
};
Expand Down Expand Up @@ -196,25 +197,38 @@ public:
m_stack = { allocator.allocate_object<byte>(p_stack_size), p_stack_size };
}

void unblock()
void unblock() noexcept
{
transition_to(blocked_by::nothing, default_timeout);
transition_to(blocked_by::nothing);
}

void unblock_without_notification()
{
m_state = blocked_by::nothing;
}
void block_by_time(sleep_duration p_duration)

std::suspend_always block_by_time(sleep_duration p_duration)
{
transition_to(blocked_by::time, p_duration);
return {};
}
void block_by_io(sleep_duration p_duration = default_timeout)

std::suspend_always block_by_io(sleep_duration p_duration = default_timeout)
{
transition_to(blocked_by::io, p_duration);
return {};
}
void block_by_sync(sleep_duration p_duration = default_timeout)

std::suspend_always block_by_sync(context* p_blocker)
{
transition_to(blocked_by::sync, p_duration);
transition_to(blocked_by::sync, p_blocker);
return {};
}

std::suspend_always block_by_external()
{
transition_to(blocked_by::external, std::monostate{});
return {};
}

[[nodiscard]] constexpr std::coroutine_handle<> active_handle() const
Expand All @@ -224,7 +238,10 @@ public:

[[nodiscard]] auto state() const
{
return std::get<1>(m_state);
if (std::holds_alternative<blocked_by>(m_state)) {
return std::get<blocked_by>(m_state);
}
return blocked_by::nothing;
}

constexpr void active_handle(std::coroutine_handle<> p_active_handle)
Expand Down Expand Up @@ -265,10 +282,28 @@ public:

constexpr auto last_allocation_size()
{
return std::get<usize>(m_state);
if (std::holds_alternative<usize>(m_state)) {
return std::get<usize>(m_state);
}
return 0uz;
}

void transition_to(blocked_by p_new_state, sleep_duration p_info)
~context()
{
using poly_allocator = std::pmr::polymorphic_allocator<byte>;
auto allocator = poly_allocator(&m_scheduler->get_allocator());
allocator.deallocate_object<byte>(m_stack.data(), m_stack.size());
};

private:
friend class promise_base;
template<typename T>
friend class future_promise_type;

using context_state = std::variant<usize, blocked_by, std::exception_ptr>;

void transition_to(blocked_by p_new_state,
scheduler::block_info p_info = std::monostate{}) noexcept
{
m_state = p_new_state;
m_scheduler->schedule(*this, p_new_state, p_info);
Expand Down Expand Up @@ -296,17 +331,6 @@ public:
m_state = p_exception;
}

~context()
{
using poly_allocator = std::pmr::polymorphic_allocator<byte>;
auto allocator = poly_allocator(&m_scheduler->get_allocator());
allocator.deallocate_object<byte>(m_stack.data(), m_stack.size());
};

private:
friend class promise_base;
using context_state = std::variant<usize, blocked_by, std::exception_ptr>;

// Should stay within a standard cache-line of 64 bytes (8 words)
mem::strong_ptr<scheduler> m_scheduler; // word 1-2
std::coroutine_handle<> m_active_handle = std::noop_coroutine(); // word 3
Expand All @@ -315,8 +339,82 @@ private:
context_state m_state{ 0uz }; // word 7-8
};

static_assert(sizeof(context) <=
std::hardware_constructive_interference_size * 2);
export class context_token
{
public:
constexpr context_token() = default;
constexpr context_token(context& p_capture) noexcept
: m_context_address(std::bit_cast<std::uintptr_t>(&p_capture))
{
}
constexpr context_token& operator=(context& p_capture) noexcept
{
m_context_address = std::bit_cast<std::uintptr_t>(&p_capture);
return *this;
}
constexpr context_token& operator=(nullptr_t) noexcept
{
m_context_address = 0U;
return *this;
}

constexpr context_token(context_token const& p_capture) noexcept = default;
constexpr context_token& operator=(context_token const& p_capture) noexcept =
default;
constexpr context_token(context_token&& p_capture) noexcept = default;
constexpr context_token& operator=(context_token& p_capture) noexcept =
default;

constexpr bool operator==(context& p_context) noexcept
{
return m_context_address == std::bit_cast<std::uintptr_t>(&p_context);
}

[[nodiscard]] constexpr bool in_use() const noexcept
{
return m_context_address != 0U;
}

[[nodiscard]] auto address() const noexcept
{
return m_context_address != 0U;
}

[[nodiscard]] constexpr operator bool() const noexcept
{
return in_use();
}

constexpr void lease(context& p_capture) noexcept
{
m_context_address = std::bit_cast<std::uintptr_t>(&p_capture);
}

constexpr std::suspend_always set_as_block_by_sync(context& p_capture)
{
if (in_use()) {
auto* address = std::bit_cast<void*>(m_context_address);
auto* inner_context = static_cast<context*>(address);
p_capture.block_by_sync(inner_context);
}
return {};
}

constexpr void unblock_and_clear() noexcept
{
if (in_use()) {
auto* address = std::bit_cast<void*>(m_context_address);
auto* inner_context = static_cast<context*>(address);
inner_context->unblock();
m_context_address = 0U;
}
}

private:
std::uintptr_t m_context_address = 0U;
};

static_assert(sizeof(context) <= std::hardware_constructive_interference_size);

// =============================================================================
//
Expand Down Expand Up @@ -398,8 +496,7 @@ public:
constexpr auto await_transform(
std::chrono::duration<Rep, Ratio> p_sleep_duration) noexcept
{
m_context->block_by_time(p_sleep_duration);
return std::suspend_always{};
return m_context->block_by_time(p_sleep_duration);
}

constexpr auto await_transform(pop_active_coroutine) noexcept
Expand Down Expand Up @@ -624,8 +721,7 @@ public:

constexpr void resume() const
{
auto active = handle().promise().get_context().active_handle();
active.resume();
handle().promise().get_context().active_handle().resume();
}

/**
Expand Down
17 changes: 8 additions & 9 deletions test_package/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,21 @@

import async_context;

struct my_scheduler
struct test_scheduler
: public async::scheduler
, mem::enable_strong_from_this<my_scheduler>
, mem::enable_strong_from_this<test_scheduler>
{
int sleep_count = 0;

my_scheduler(mem::strong_ptr_only_token)
test_scheduler(mem::strong_ptr_only_token)
{
}

private:
void do_schedule(
[[maybe_unused]] async::context& p_context,
[[maybe_unused]] async::blocked_by p_block_state,
[[maybe_unused]] std::variant<std::chrono::nanoseconds, async::context*>
p_block_info) override
void do_schedule([[maybe_unused]] async::context& p_context,
[[maybe_unused]] async::blocked_by p_block_state,
[[maybe_unused]] async::scheduler::block_info
p_block_info) noexcept override
{
if (std::holds_alternative<std::chrono::nanoseconds>(p_block_info)) {
sleep_count++;
Expand All @@ -64,7 +63,7 @@ async::future<void> coro_double_delay(async::context&)
int main()
{
auto scheduler =
mem::make_strong_ptr<my_scheduler>(std::pmr::new_delete_resource());
mem::make_strong_ptr<test_scheduler>(std::pmr::new_delete_resource());
async::context my_context(scheduler, 1024);

auto future_delay = coro_double_delay(my_context);
Expand Down
Loading