Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rs/canonical_state/src/encoding/tests/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ fn with_stream_header_deltas(
QueueFull => header.reject_signals.queue_full_deltas = deltas,
OutOfMemory => header.reject_signals.out_of_memory_deltas = deltas,
Unknown => header.reject_signals.unknown_deltas = deltas,
EngineNotAllowed => header.reject_signals.engine_not_allowed_deltas = deltas,
}
header
}
Expand Down
5 changes: 5 additions & 0 deletions rs/canonical_state/src/encoding/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ pub struct RejectSignals {
pub out_of_memory_deltas: Vec<u64>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub unknown_deltas: Vec<u64>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub engine_not_allowed_deltas: Vec<u64>,
Comment on lines +70 to +71

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Damn, this just occurred to me now: whether we go with an explicit certification version or not, a canonical_state change requires a staged rollout: first deploy the new "variant" (to all subnets, in this case, since it affects stream structure) with the full logic required to handle it (in this case, to inflate the signal into a reject response, so not much); and only then deploy replica binaries that may produce it.

If we're to do this properly, the new certification version would be introduced with the field; and it would become the "current version" once we have code that can produce it. An explicit version would also allow us to rely on tests to tell us when it's safe to merge follow-up changes.

You should also make a copy of the original RejectSignals type (no engine_not_allowed_deltas field) under rs/canonical_state/encoding/old_types.rs (likely as RejectSignalsV25) and update encoding/tests/compatibility.rs and encoding/tests/test_fixtures.rs to also cover the new variant.

}

impl RejectSignals {
Expand All @@ -78,6 +80,7 @@ impl RejectSignals {
&& self.queue_full_deltas.is_empty()
&& self.out_of_memory_deltas.is_empty()
&& self.unknown_deltas.is_empty()
&& self.engine_not_allowed_deltas.is_empty()
}
}

Expand Down Expand Up @@ -332,6 +335,7 @@ impl From<(&VecDeque<RejectSignal>, StreamIndex, CertificationVersion)> for Reje
queue_full_deltas: deltas_for(RejectReason::QueueFull),
out_of_memory_deltas: deltas_for(RejectReason::OutOfMemory),
unknown_deltas: deltas_for(RejectReason::Unknown),
engine_not_allowed_deltas: deltas_for(RejectReason::EngineNotAllowed),
}
}
}
Expand All @@ -351,6 +355,7 @@ pub(crate) fn try_from_deltas(
(QueueFull, &reject_signals.queue_full_deltas),
(OutOfMemory, &reject_signals.out_of_memory_deltas),
(Unknown, &reject_signals.unknown_deltas),
(EngineNotAllowed, &reject_signals.engine_not_allowed_deltas),
] {
let mut stream_index = StreamIndex::new(signals_end);
for delta in deltas.iter().rev() {
Expand Down
79 changes: 37 additions & 42 deletions rs/messaging/src/message_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ const CRITICAL_ERROR_NO_CANISTER_ALLOCATION_RANGE: &str = "mr_empty_canister_all
const CRITICAL_ERROR_FAILED_TO_READ_REGISTRY: &str = "mr_failed_to_read_registry_error";
pub const CRITICAL_ERROR_NON_INCREASING_BATCH_TIME: &str = "mr_non_increasing_batch_time";
pub const CRITICAL_ERROR_INDUCT_RESPONSE_FAILED: &str = "mr_induct_response_failed";
pub const CRITICAL_ERROR_ENGINE_MESSAGE: &str = "mr_engine_message";
const CRITICAL_ERROR_ILLEGAL_NON_EMPTY_SUBNET_ADMINS: &str = "mr_illegal_non_empty_subnet_admins";

/// Records the timestamp when all messages before the given index (down to the
Expand Down Expand Up @@ -344,6 +345,10 @@ pub(crate) struct MessageRoutingMetrics {
/// Critical error counter (see [`MetricsRegistry::error_counter`]) tracking
/// failures to induct responses.
pub critical_error_induct_response_failed: IntCounter,
/// Critical error counter (see [`MetricsRegistry::error_counter`]) tracking
/// messages to/from CloudEngine subnets that carried cycles or were
/// guaranteed-response calls, which are not permitted on non-engine subnets.
pub critical_error_engine_message: IntCounter,
/// Critical error: a non-rental subnet has a non-empty subnet admins list.
critical_error_illegal_non_empty_subnet_admins: IntCounter,

Expand Down Expand Up @@ -487,6 +492,8 @@ impl MessageRoutingMetrics {
.error_counter(CRITICAL_ERROR_NON_INCREASING_BATCH_TIME),
critical_error_induct_response_failed: metrics_registry
.error_counter(CRITICAL_ERROR_INDUCT_RESPONSE_FAILED),
critical_error_engine_message: metrics_registry
.error_counter(CRITICAL_ERROR_ENGINE_MESSAGE),
critical_error_illegal_non_empty_subnet_admins: metrics_registry
.error_counter(CRITICAL_ERROR_ILLEGAL_NON_EMPTY_SUBNET_ADMINS),

Expand Down Expand Up @@ -1070,43 +1077,17 @@ impl<RegistryClient_: RegistryClient> BatchProcessorImpl<RegistryClient_> {
.map_err(|err| registry_error("routing table", None, err))?
.unwrap_or_default();

// Derive filtered subnets and routing table.
let (subnets, routing_table) = if own_subnet_type == SubnetType::CloudEngine {
// CloudEngine subnets only see themselves.
let subnets = all_subnets
.iter()
.filter(|(id, _)| **id == own_subnet_id)
.map(|(id, topo)| (*id, topo.clone()))
.collect();
let routing_table = full_routing_table
.iter()
.filter(|(_, id)| **id == own_subnet_id)
.map(|(range, id)| (*range, *id))
.collect::<BTreeMap<_, _>>()
.try_into()
.map_err(|err| {
Persistent(format!(
"'filtered routing table for CloudEngine subnet {}', err: {:?}",
own_subnet_id, err
))
})?;
(subnets, routing_table)
} else {
// Non-engine subnets see every subnet that is *not* a CloudEngine.
let subnets: BTreeMap<_, _> = all_subnets
.iter()
.filter(|(_, topo)| topo.subnet_type != SubnetType::CloudEngine)
.map(|(id, topo)| (*id, topo.clone()))
.collect();
let routing_table = full_routing_table
.iter()
.filter(|(_, id)| subnets.contains_key(id))
.map(|(range, id)| (*range, *id))
.collect::<BTreeMap<_, _>>()
.try_into()
.map_err(|err| Persistent(format!("'filtered routing table', err: {:?}", err)))?;
(subnets, routing_table)
};
// All subnets see the full topology, including CloudEngine subnets.
let subnets: BTreeMap<_, _> = all_subnets
.iter()
.map(|(id, topo)| (*id, topo.clone()))
.collect();
Comment thread
schneiderstefan marked this conversation as resolved.
let routing_table = full_routing_table
.iter()
.map(|(range, id)| (*range, *id))
.collect::<BTreeMap<_, _>>()
.try_into()
.map_err(|err| Persistent(format!("routing table err: {:?}", err)))?;
let canister_migrations = self
.registry
.get_canister_migrations(registry_version)
Expand All @@ -1120,16 +1101,30 @@ impl<RegistryClient_: RegistryClient> BatchProcessorImpl<RegistryClient_> {
.ok_or_else(|| not_found_error("NNS subnet ID", None))?;

// Look up the default subnet for `SetupInitialDKG`. The key may be
// unset (in which case `SetupInitialDKG` requests fall back to the
// calling subnet), or the configured subnet may not be visible from
// this subnet (e.g. if it has been filtered out above), in which case
// we also fall back to the calling subnet.
// unset, in which case `SetupInitialDKG` requests fall back to the
// calling subnet.
let default_initial_dkg_subnet_id = self
.registry
.get_default_initial_dkg_subnet_id(registry_version)
.map_err(|err| registry_error("default initial DKG subnet ID", None, err))?
.filter(|subnet_id| subnets.contains_key(subnet_id));

// A signing subnet is only usable from here if a chain-key request can
// reach it without crossing the CloudEngine boundary: requests carrying
// cycles or guaranteed-response calls are rejected at that boundary (see
// `StreamBuilderImpl`/`StreamHandlerImpl`), and chain-key requests are
// always both. The local subnet is always reachable (loopback); any other
// subnet is reachable only if neither end is a CloudEngine. Pruning such
// subnets here means routing returns a clean `ChainKeyError` (and the cost
// API an `UnknownKey`) instead of routing a doomed request to the boundary.
let chain_key_subnet_is_reachable = |id: &SubnetId| match subnets.get(id) {
None => false,
Some(topo) => {
*id == own_subnet_id
|| (own_subnet_type != SubnetType::CloudEngine
&& topo.subnet_type != SubnetType::CloudEngine)
Comment thread
alin-at-dfinity marked this conversation as resolved.
}
};
let chain_key_enabled_subnets: BTreeMap<_, _> = self
.registry
.get_chain_key_enabled_subnets(registry_version)
Expand All @@ -1139,7 +1134,7 @@ impl<RegistryClient_: RegistryClient> BatchProcessorImpl<RegistryClient_> {
.filter_map(|(key, subnet_ids)| {
let filtered: Vec<_> = subnet_ids
.into_iter()
.filter(|id| subnets.contains_key(id))
.filter(&chain_key_subnet_is_reachable)
.collect();
if filtered.is_empty() {
None
Expand Down
Loading
Loading