Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions nexus/test-utils/src/nexus_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use omicron_common::api::internal::nexus::Certificate;
use omicron_sled_agent::sim;
use omicron_test_utils::dev;
use omicron_test_utils::dev::poll;
use omicron_test_utils::dev::poll::CondCheckError;
use omicron_test_utils::dev::poll::wait_for_watch_channel_condition;
use omicron_uuid_kinds::BlueprintUuid;
use omicron_uuid_kinds::SledUuid;
Expand Down Expand Up @@ -179,23 +178,20 @@ impl<N: NexusServer> ControlPlaneTestContext<N> {

match wait_for_watch_channel_condition(
&mut inv_rx,
async |inv| {
if inv.is_some() {
Ok(())
} else {
Err(CondCheckError::<()>::NotYet)
}
},
|inv| inv.is_some(),
timeout,
)
.await
{
Ok(()) => (),
Err(poll::Error::TimedOut(elapsed)) => {
Err(poll::WatchChannelError::TimedOut(elapsed)) => {
panic!("no inventory collection found within {elapsed:?}");
}
Err(poll::Error::PermanentError(())) => {
unreachable!("check can only fail via timeout")
Err(poll::WatchChannelError::SenderDropped) => {
panic!(
"inventory watch channel sender dropped before a \
collection was available"
);
}
}
}
Expand Down
9 changes: 1 addition & 8 deletions sled-agent/config-reconciler/src/internal_disks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,6 @@ mod tests {
use assert_matches::assert_matches;
use iddqd::id_ord_map;
use omicron_test_utils::dev;
use omicron_test_utils::dev::poll::CondCheckError;
use omicron_test_utils::dev::poll::wait_for_watch_channel_condition;
use omicron_uuid_kinds::InternalZpoolUuid;
use proptest::sample::size_range;
Expand Down Expand Up @@ -1254,13 +1253,7 @@ mod tests {
// disk is adopted.
wait_for_watch_channel_condition(
&mut disks_rx.errors_rx,
async |errors| {
if errors.is_empty() {
Ok(())
} else {
Err(CondCheckError::<()>::NotYet)
}
},
|errors| errors.is_empty(),
Duration::from_secs(30),
)
.await
Expand Down
19 changes: 7 additions & 12 deletions sled-agent/config-reconciler/src/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,6 @@ mod tests {
use omicron_common::disk::DiskIdentity;
use omicron_common::disk::OmicronPhysicalDiskConfig;
use omicron_test_utils::dev;
use omicron_test_utils::dev::poll::CondCheckError;
use omicron_test_utils::dev::poll::wait_for_watch_channel_condition;
use omicron_uuid_kinds::InternalZpoolUuid;
use omicron_uuid_kinds::MupdateOverrideUuid;
Expand Down Expand Up @@ -826,17 +825,15 @@ mod tests {
// `WaitingForInitialConfig` (if we didn't).
wait_for_watch_channel_condition(
&mut current_config_rx,
async |config| match config {
CurrentSledConfig::WaitingForInternalDisks => {
Err(CondCheckError::<()>::NotYet)
}
|config| match config {
CurrentSledConfig::WaitingForInternalDisks => false,
CurrentSledConfig::WaitingForInitialConfig => {
assert!(sled_config.is_none());
Ok(())
true
}
CurrentSledConfig::Ledgered(_) => {
assert!(sled_config.is_some());
Ok(())
true
}
},
Duration::from_secs(30),
Expand Down Expand Up @@ -931,11 +928,9 @@ mod tests {
// `WaitingForInitialConfig`.
wait_for_watch_channel_condition(
&mut current_config_rx,
async |config| match config {
CurrentSledConfig::WaitingForInternalDisks => {
Err(CondCheckError::<()>::NotYet)
}
CurrentSledConfig::WaitingForInitialConfig => Ok(()),
|config| match config {
CurrentSledConfig::WaitingForInternalDisks => false,
CurrentSledConfig::WaitingForInitialConfig => true,
CurrentSledConfig::Ledgered(config) => {
panic!("unexpected config found: {config:?}");
}
Expand Down
62 changes: 28 additions & 34 deletions test-utils/src/dev/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,45 +91,39 @@ where
}
}

/// Poll the contents of the given watch channel until the given closure
/// succeeds, returns a permanent error, or a given time has expired
/// Error returned by [`wait_for_watch_channel_condition()`].
#[derive(Debug, Error)]
pub enum WatchChannelError {
/// the condition was not satisfied before the timeout elapsed
#[error("timed out after {0:?}")]
TimedOut(Duration),

/// the watch channel's sender was dropped before the condition was
/// satisfied, so the value can never change again
#[error("watch channel sender dropped before the condition was satisfied")]
SenderDropped,
}

/// Wait until the contents of the given watch channel satisfy `cond`, or until
/// `timeout` elapses.
///
/// This function is similar to `wait_for_condition` above, and most of the same
/// caveats from it apply. The biggest difference is that instead of taking a
/// `poll_interval`, this function relies on the watch channel's `changed()`
/// notification to decide when to retry the given closure.
/// This relies on the watch channel's change notifications: `cond` is checked
/// against the current value, then re-checked each time the channel changes.
///
/// Note that `timeout` is not a bound on how long this function can take.
/// Rather, it's the time beyond which this function will stop trying to check
/// `cond`. If `cond` takes an arbitrary amount of time, this function will
/// too.
pub async fn wait_for_watch_channel_condition<T, O, E, Func>(
/// Returns [`WatchChannelError::TimedOut`] if `cond` is not satisfied within
/// `timeout`, or [`WatchChannelError::SenderDropped`] if the sender is dropped
/// first (after which the value can never change again).
pub async fn wait_for_watch_channel_condition<T, Func>(
rx: &mut watch::Receiver<T>,
mut cond: Func,
cond: Func,
timeout: Duration,
) -> Result<O, Error<E>>
) -> Result<(), WatchChannelError>
where
Func: AsyncFnMut(&T) -> Result<O, CondCheckError<E>>,
Func: FnMut(&T) -> bool,
{
let start = Instant::now();
let deadline = start + timeout;
loop {
let check = cond(&*rx.borrow_and_update()).await;
if let Ok(output) = check {
return Ok(output);
}

if let Err(CondCheckError::Failed(e)) = check {
return Err(Error::PermanentError(e));
}

tokio::select! {
_ = rx.changed() => continue,
// If we're already past the deadline, `duration_since` returns 0,
// so this sleep will be ready immediately.
_ = tokio::time::sleep(deadline.duration_since(start)) => {
return Err(Error::TimedOut(timeout));
}
}
match tokio::time::timeout(timeout, rx.wait_for(cond)).await {
Ok(Ok(_)) => Ok(()),
Ok(Err(_recv_error)) => Err(WatchChannelError::SenderDropped),
Err(_elapsed) => Err(WatchChannelError::TimedOut(timeout)),
}
}
Loading