Skip to content

Separate shuffler's communication into new interface#437

Merged
rapids-bot[bot] merged 88 commits into
rapidsai:mainfrom
pentschev:comms-interface
Nov 10, 2025
Merged

Separate shuffler's communication into new interface#437
rapids-bot[bot] merged 88 commits into
rapidsai:mainfrom
pentschev:comms-interface

Conversation

@pentschev

@pentschev pentschev commented Aug 14, 2025

Copy link
Copy Markdown
Member

Move communication operations from the Shuffler::ProgressThread into a new CommunicationInterface abstract class to handle all communication, with the use of a class Message. This approach has the benefit of making a more distinct and easier to understand separation of the communication routine from the shuffler making the progress operator considerably smaller (will be done in follow-up PR), simultaneously allowing extensions to the communication interface disjoint from the shuffler, for example, to allow implementing an Active Message-based communicator for the shuffler. It also generalizes the concept of a "message", instead of transferring chunks all the CommunicationInterface knows about are messages represented by the Message class that combines metadata, payload and src/dst rank.

Currently implement only a TagCommunicationInterface that implements exactly the behavior that is implemented in the shuffler progress, additionally removing the need for the ack message. Extending to support Active Messages will also be handled in a separate PR.

Removes also ack messages, closes #475, closes #535 .

@pentschev pentschev self-assigned this Aug 14, 2025
@pentschev pentschev added feature request New feature or request non-breaking Introduces a non-breaking change labels Aug 14, 2025
@copy-pr-bot

copy-pr-bot Bot commented Aug 14, 2025

Copy link
Copy Markdown

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.

Contributors can view more details about this message here.

@pentschev

Copy link
Copy Markdown
Member Author

/ok to test

@pentschev

Copy link
Copy Markdown
Member Author

/ok to test

@pentschev

Copy link
Copy Markdown
Member Author

/ok to test

@pentschev pentschev changed the title Separate shuffler's communication into separate interface Separate shuffler's communication into new interface Aug 15, 2025
@pentschev pentschev marked this pull request as ready for review September 24, 2025 20:47
@pentschev pentschev requested a review from a team as a code owner September 24, 2025 20:47
Comment on lines +131 to +143
std::vector<std::unique_ptr<MetadataPayloadExchange::Message>> received_messages;
for (int iter = 0; iter < 10 && received_messages.empty(); ++iter) {
auto messages = comm_interface->recv();
received_messages.insert(
received_messages.end(),
std::make_move_iterator(messages.begin()),
std::make_move_iterator(messages.end())
);

if (!received_messages.empty())
break;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this test, rank 0 sends to rank 1, and everyone else "does nothing".

But the test is not really following that flow.

I think we want something like:

if (comm->nranks() != 2) {
   GTEST_SKIP() << "Only for two ranks";
}

if (comm->rank() == 0) {
   send(message);
} else {
    auto received = ...;
    while (received.empty()) {
        std::ranges::move(comm_interface->recv(), std::back_inserter(received));
        std::this_thread::yield();
    }
    EXPECT_EQ(received.size(), 1);
    EXPECT_EQ(msg->peer_rank(), 0);
    ...
}

But now I am worried about rank-0 finishing the test with stuff still in flight.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, wait, rank-0 must also call recv repeatedly because that's the only thing that actually progresses the underlying tag communication.

This is very counter intuitive.

Comment on lines +107 to +111
completed_messages.insert(
completed_messages.end(),
std::make_move_iterator(completed_data.begin()),
std::make_move_iterator(completed_data.end())
);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::ranges::move(completed_data, std::back_inserter(completed_messages));

@wence- wence- left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, reading through the usage in the tests gives me some pause for thought.

It seems like the sender must call recv repeatedly to progress sends (even if they're never receiving anything). Do I have this right?

@pentschev

pentschev commented Oct 31, 2025

Copy link
Copy Markdown
Member Author

@wence- I have pushed multiple changes that do what you requested in your latest comments and what we've discussed offline. Here's a summary of the changes:

  1. Add a progress() method to allow progressing the state of MetadataPayloadExchange independent of recv()
  2. recv() now only returns messages that were previously received during progress()
  3. Address test issues
    1. Use a ring topology instead of the (also previously broken) root to all: send data to "next rank", receive data from "previous rank"
    2. Remove timeouts, that was an overly eager optimization and could also lead to incomplete processing as you noticed. If tests hang this will cause a deadlock but it is how most of our tests are currently written we can always revisit and add timeouts later if they begin to show signs of being problematic
  4. Fixed a bug with message ordering that was introduced with some of the changes made when addressing previous change requests in this PR

I hope this now addresses all the concerns, and more importantly does it all correctly.

One more thing: I just want to say I will start pushing against changes in the Tag communication flow in this PR, my initial idea was to start simple with porting over the Shuffler comms implementation, and while some of the changes requested previously were indeed important to make MetadataPayloadExchange better, others were deeply involved in changing the actual implementation details of the Tag code which led me to make mistakes that are not necessarily well-tested in this PR because the Shuffler is not currently using the changes here. This made me spend over an hour now to address item 4, since that change was not necessary from an interface perspective. So for any implementation changes that do not pertain to the interface but rather to the Tag API implementation let's do so in a follow-up PR where needed.

std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

if (comm->rank() == peer_rank) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition is never true.

std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

if (comm->rank() == peer_rank) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition is never true

@wence- wence- left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some small suggestions for fixing, but I think this now looks like a good interface


EXPECT_TRUE(comm_interface->is_idle());
comm_interface->send(std::move(messages));
EXPECT_FALSE(comm_interface->is_idle());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, we're idle here because no progress has been called yet?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, in more specific terms it means there's still unfinished work, calling progress() only may not immediately change state.

Comment thread cpp/tests/test_metadata_payload_exchange.cpp
Comment on lines +108 to +118
// Store all completed messages
received_messages_.insert(
received_messages_.end(),
std::make_move_iterator(completed_metadata_only.begin()),
std::make_move_iterator(completed_metadata_only.end())
);
received_messages_.insert(
received_messages_.end(),
std::make_move_iterator(completed_data.begin()),
std::make_move_iterator(completed_data.end())
);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::ranges::move(completed_metadata_only, std::back_inserter(received_messages_));
std::ranges::move(completed_data, std::back_inserter(received_messages_));

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in f3530ef.

Comment on lines +127 to +135
auto const t0 = Clock::now();

// Return all completed messages and clear the internal storage
auto messages = std::move(received_messages_);
received_messages_.clear();

statistics_->add_duration_stat("comms-interface-receive-messages", Clock::now() - t0);

return messages;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this makes sense to time now, since it's just a pointer move, so the timing will be orders of magnitude longer than move.

I think this whole function should be (no need for the clear() because moving from a vector clears it):

return std::move(received_messages_);

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, removed in bd3c42b and c20d981.

Comment on lines +70 to +100
/**
* @brief Get the destination rank for outgoing or source rank for incoming
* messages.
*
* @return The rank of the destination or source.
*/
[[nodiscard]] constexpr Rank peer_rank() const {
return peer_rank_;
}

/**
* @brief Get the serialized metadata for this message.
*
* This metadata is sent first to inform the receiver about the incoming message.
*
* @return The serialized metadata.
*/
[[nodiscard]] constexpr std::vector<std::uint8_t> const& metadata() const {
return metadata_;
}

/**
* @brief Release ownership of the metadata.
*
* This is typically called when transferring metadata to the communication layer.
*
* @return Metadata with ownership transferred.
*/
[[nodiscard]] std::vector<std::uint8_t> release_metadata() {
return std::move(metadata_);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move implementation to core.cpp?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved release_metadata in ff13c0a, but the others would require removing the constexpr qualifier, did you really mean to suggest that?

Comment on lines +57 to +68
/**
* @brief Construct a new Message.
*
* @param peer_rank Destination (outgoing) or source (incoming) rank.
* @param metadata Serialized metadata.
* @param data Data buffer (can be nullptr for metadata-only messages).
*/
Message(
Rank peer_rank,
std::vector<std::uint8_t> metadata,
std::unique_ptr<Buffer> data = nullptr
);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be valid to only send data?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why we can't allow it, it should be possible but the overhead will be the same as sending metadata, because we need to pack some "header" together with the metadata. I'll make the change to support it.

@pentschev pentschev Nov 7, 2025

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we can pass an empty vector, added a test for that in aa68e8c, does that suffice? I think it keeps the interface cleaner (instead of having something like std::optional) and delivers essentially the same result with minimal overhead. Note that there's always some sort of metadata/header required, so implementation-wise it's identical.

*/
Message(
Rank peer_rank,
std::vector<std::uint8_t> metadata,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accept by && reference?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in efa9d94.

@pentschev

Copy link
Copy Markdown
Member Author

Thanks all for the reviews, I'll go ahead and merge this now and continue the integration in a follow-up PR. Let me know if you see other issues we need to address.

@pentschev

Copy link
Copy Markdown
Member Author

/merge

@rapids-bot rapids-bot Bot merged commit dc5850d into rapidsai:main Nov 10, 2025
91 checks passed
@pentschev pentschev deleted the comms-interface branch December 12, 2025 20:02
rapids-bot Bot pushed a commit that referenced this pull request Apr 14, 2026
Follow-up on #437 by switching `Shuffler` to use the new `MetadataPayloadExchange`. This helps simplify the code in `Shuffler`, and simultaneously remove the need for an "ack" message, opening also the possiblity for using an Active Message-based communicator (TBD).

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)

URL: #649
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

feature request New feature or request non-breaking Introduces a non-breaking change

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Remove need for ack messages in shuffler implementation

4 participants