Separate shuffler's communication into new interface#437
Conversation
|
Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually. Contributors can view more details about this message here. |
|
/ok to test |
|
/ok to test |
|
/ok to test |
| std::vector<std::unique_ptr<MetadataPayloadExchange::Message>> received_messages; | ||
| for (int iter = 0; iter < 10 && received_messages.empty(); ++iter) { | ||
| auto messages = comm_interface->recv(); | ||
| received_messages.insert( | ||
| received_messages.end(), | ||
| std::make_move_iterator(messages.begin()), | ||
| std::make_move_iterator(messages.end()) | ||
| ); | ||
|
|
||
| if (!received_messages.empty()) | ||
| break; | ||
| std::this_thread::sleep_for(std::chrono::milliseconds(10)); | ||
| } |
There was a problem hiding this comment.
In this test, rank 0 sends to rank 1, and everyone else "does nothing".
But the test is not really following that flow.
I think we want something like:
if (comm->nranks() != 2) {
GTEST_SKIP() << "Only for two ranks";
}
if (comm->rank() == 0) {
send(message);
} else {
auto received = ...;
while (received.empty()) {
std::ranges::move(comm_interface->recv(), std::back_inserter(received));
std::this_thread::yield();
}
EXPECT_EQ(received.size(), 1);
EXPECT_EQ(msg->peer_rank(), 0);
...
}
But now I am worried about rank-0 finishing the test with stuff still in flight.
There was a problem hiding this comment.
Ah, wait, rank-0 must also call recv repeatedly because that's the only thing that actually progresses the underlying tag communication.
This is very counter intuitive.
| completed_messages.insert( | ||
| completed_messages.end(), | ||
| std::make_move_iterator(completed_data.begin()), | ||
| std::make_move_iterator(completed_data.end()) | ||
| ); |
There was a problem hiding this comment.
std::ranges::move(completed_data, std::back_inserter(completed_messages));
wence-
left a comment
There was a problem hiding this comment.
Sorry, reading through the usage in the tests gives me some pause for thought.
It seems like the sender must call recv repeatedly to progress sends (even if they're never receiving anything). Do I have this right?
|
@wence- I have pushed multiple changes that do what you requested in your latest comments and what we've discussed offline. Here's a summary of the changes:
I hope this now addresses all the concerns, and more importantly does it all correctly. One more thing: I just want to say I will start pushing against changes in the Tag communication flow in this PR, my initial idea was to start simple with porting over the |
| std::this_thread::sleep_for(std::chrono::milliseconds(10)); | ||
| } | ||
|
|
||
| if (comm->rank() == peer_rank) { |
There was a problem hiding this comment.
This condition is never true.
| std::this_thread::sleep_for(std::chrono::milliseconds(10)); | ||
| } | ||
|
|
||
| if (comm->rank() == peer_rank) { |
There was a problem hiding this comment.
This condition is never true
wence-
left a comment
There was a problem hiding this comment.
Some small suggestions for fixing, but I think this now looks like a good interface
|
|
||
| EXPECT_TRUE(comm_interface->is_idle()); | ||
| comm_interface->send(std::move(messages)); | ||
| EXPECT_FALSE(comm_interface->is_idle()); |
There was a problem hiding this comment.
OK, we're idle here because no progress has been called yet?
There was a problem hiding this comment.
Correct, in more specific terms it means there's still unfinished work, calling progress() only may not immediately change state.
| // Store all completed messages | ||
| received_messages_.insert( | ||
| received_messages_.end(), | ||
| std::make_move_iterator(completed_metadata_only.begin()), | ||
| std::make_move_iterator(completed_metadata_only.end()) | ||
| ); | ||
| received_messages_.insert( | ||
| received_messages_.end(), | ||
| std::make_move_iterator(completed_data.begin()), | ||
| std::make_move_iterator(completed_data.end()) | ||
| ); |
There was a problem hiding this comment.
std::ranges::move(completed_metadata_only, std::back_inserter(received_messages_));
std::ranges::move(completed_data, std::back_inserter(received_messages_));
| auto const t0 = Clock::now(); | ||
|
|
||
| // Return all completed messages and clear the internal storage | ||
| auto messages = std::move(received_messages_); | ||
| received_messages_.clear(); | ||
|
|
||
| statistics_->add_duration_stat("comms-interface-receive-messages", Clock::now() - t0); | ||
|
|
||
| return messages; |
There was a problem hiding this comment.
I don't think this makes sense to time now, since it's just a pointer move, so the timing will be orders of magnitude longer than move.
I think this whole function should be (no need for the clear() because moving from a vector clears it):
return std::move(received_messages_);
| /** | ||
| * @brief Get the destination rank for outgoing or source rank for incoming | ||
| * messages. | ||
| * | ||
| * @return The rank of the destination or source. | ||
| */ | ||
| [[nodiscard]] constexpr Rank peer_rank() const { | ||
| return peer_rank_; | ||
| } | ||
|
|
||
| /** | ||
| * @brief Get the serialized metadata for this message. | ||
| * | ||
| * This metadata is sent first to inform the receiver about the incoming message. | ||
| * | ||
| * @return The serialized metadata. | ||
| */ | ||
| [[nodiscard]] constexpr std::vector<std::uint8_t> const& metadata() const { | ||
| return metadata_; | ||
| } | ||
|
|
||
| /** | ||
| * @brief Release ownership of the metadata. | ||
| * | ||
| * This is typically called when transferring metadata to the communication layer. | ||
| * | ||
| * @return Metadata with ownership transferred. | ||
| */ | ||
| [[nodiscard]] std::vector<std::uint8_t> release_metadata() { | ||
| return std::move(metadata_); | ||
| } |
There was a problem hiding this comment.
Move implementation to core.cpp?
There was a problem hiding this comment.
Moved release_metadata in ff13c0a, but the others would require removing the constexpr qualifier, did you really mean to suggest that?
| /** | ||
| * @brief Construct a new Message. | ||
| * | ||
| * @param peer_rank Destination (outgoing) or source (incoming) rank. | ||
| * @param metadata Serialized metadata. | ||
| * @param data Data buffer (can be nullptr for metadata-only messages). | ||
| */ | ||
| Message( | ||
| Rank peer_rank, | ||
| std::vector<std::uint8_t> metadata, | ||
| std::unique_ptr<Buffer> data = nullptr | ||
| ); |
There was a problem hiding this comment.
Should it be valid to only send data?
There was a problem hiding this comment.
I don't see why we can't allow it, it should be possible but the overhead will be the same as sending metadata, because we need to pack some "header" together with the metadata. I'll make the change to support it.
There was a problem hiding this comment.
Actually, we can pass an empty vector, added a test for that in aa68e8c, does that suffice? I think it keeps the interface cleaner (instead of having something like std::optional) and delivers essentially the same result with minimal overhead. Note that there's always some sort of metadata/header required, so implementation-wise it's identical.
| */ | ||
| Message( | ||
| Rank peer_rank, | ||
| std::vector<std::uint8_t> metadata, |
|
Thanks all for the reviews, I'll go ahead and merge this now and continue the integration in a follow-up PR. Let me know if you see other issues we need to address. |
|
/merge |
Follow-up on #437 by switching `Shuffler` to use the new `MetadataPayloadExchange`. This helps simplify the code in `Shuffler`, and simultaneously remove the need for an "ack" message, opening also the possiblity for using an Active Message-based communicator (TBD). Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Mads R. B. Kristensen (https://github.com/madsbk) URL: #649
Move communication operations from the
Shuffler::ProgressThreadinto a newCommunicationInterfaceabstract class to handle all communication, with the use of a classMessage. This approach has the benefit of making a more distinct and easier to understand separation of the communication routine from the shuffler making the progress operator considerably smaller (will be done in follow-up PR), simultaneously allowing extensions to the communication interface disjoint from the shuffler, for example, to allow implementing an Active Message-based communicator for the shuffler. It also generalizes the concept of a "message", instead of transferring chunks all theCommunicationInterfaceknows about are messages represented by theMessageclass that combines metadata, payload and src/dst rank.Currently implement only a
TagCommunicationInterfacethat implements exactly the behavior that is implemented in the shuffler progress, additionally removing the need for the ack message. Extending to support Active Messages will also be handled in a separate PR.Removes also ack messages, closes #475, closes #535 .