diff --git a/rs/canonical_state/src/encoding/tests/conversion.rs b/rs/canonical_state/src/encoding/tests/conversion.rs index 141b2f074010..c491f2d664e1 100644 --- a/rs/canonical_state/src/encoding/tests/conversion.rs +++ b/rs/canonical_state/src/encoding/tests/conversion.rs @@ -83,6 +83,7 @@ fn with_stream_header_deltas( QueueFull => header.reject_signals.queue_full_deltas = deltas, OutOfMemory => header.reject_signals.out_of_memory_deltas = deltas, Unknown => header.reject_signals.unknown_deltas = deltas, + EngineNotAllowed => header.reject_signals.engine_not_allowed_deltas = deltas, } header } diff --git a/rs/canonical_state/src/encoding/types.rs b/rs/canonical_state/src/encoding/types.rs index fa1c0c81e32c..4bb2e28a40ef 100644 --- a/rs/canonical_state/src/encoding/types.rs +++ b/rs/canonical_state/src/encoding/types.rs @@ -67,6 +67,8 @@ pub struct RejectSignals { pub out_of_memory_deltas: Vec, #[serde(default, skip_serializing_if = "Vec::is_empty")] pub unknown_deltas: Vec, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub engine_not_allowed_deltas: Vec, } impl RejectSignals { @@ -78,6 +80,7 @@ impl RejectSignals { && self.queue_full_deltas.is_empty() && self.out_of_memory_deltas.is_empty() && self.unknown_deltas.is_empty() + && self.engine_not_allowed_deltas.is_empty() } } @@ -332,6 +335,7 @@ impl From<(&VecDeque, StreamIndex, CertificationVersion)> for Reje queue_full_deltas: deltas_for(RejectReason::QueueFull), out_of_memory_deltas: deltas_for(RejectReason::OutOfMemory), unknown_deltas: deltas_for(RejectReason::Unknown), + engine_not_allowed_deltas: deltas_for(RejectReason::EngineNotAllowed), } } } @@ -351,6 +355,7 @@ pub(crate) fn try_from_deltas( (QueueFull, &reject_signals.queue_full_deltas), (OutOfMemory, &reject_signals.out_of_memory_deltas), (Unknown, &reject_signals.unknown_deltas), + (EngineNotAllowed, &reject_signals.engine_not_allowed_deltas), ] { let mut stream_index = StreamIndex::new(signals_end); for delta in deltas.iter().rev() { diff --git a/rs/messaging/src/message_routing.rs b/rs/messaging/src/message_routing.rs index e737a85ee358..abbacfe1a0bb 100644 --- a/rs/messaging/src/message_routing.rs +++ b/rs/messaging/src/message_routing.rs @@ -127,6 +127,7 @@ const CRITICAL_ERROR_NO_CANISTER_ALLOCATION_RANGE: &str = "mr_empty_canister_all const CRITICAL_ERROR_FAILED_TO_READ_REGISTRY: &str = "mr_failed_to_read_registry_error"; pub const CRITICAL_ERROR_NON_INCREASING_BATCH_TIME: &str = "mr_non_increasing_batch_time"; pub const CRITICAL_ERROR_INDUCT_RESPONSE_FAILED: &str = "mr_induct_response_failed"; +pub const CRITICAL_ERROR_ENGINE_MESSAGE: &str = "mr_engine_message"; const CRITICAL_ERROR_ILLEGAL_NON_EMPTY_SUBNET_ADMINS: &str = "mr_illegal_non_empty_subnet_admins"; /// Records the timestamp when all messages before the given index (down to the @@ -344,6 +345,10 @@ pub(crate) struct MessageRoutingMetrics { /// Critical error counter (see [`MetricsRegistry::error_counter`]) tracking /// failures to induct responses. pub critical_error_induct_response_failed: IntCounter, + /// Critical error counter (see [`MetricsRegistry::error_counter`]) tracking + /// messages to/from CloudEngine subnets that carried cycles or were + /// guaranteed-response calls, which are not permitted on non-engine subnets. + pub critical_error_engine_message: IntCounter, /// Critical error: a non-rental subnet has a non-empty subnet admins list. critical_error_illegal_non_empty_subnet_admins: IntCounter, @@ -487,6 +492,8 @@ impl MessageRoutingMetrics { .error_counter(CRITICAL_ERROR_NON_INCREASING_BATCH_TIME), critical_error_induct_response_failed: metrics_registry .error_counter(CRITICAL_ERROR_INDUCT_RESPONSE_FAILED), + critical_error_engine_message: metrics_registry + .error_counter(CRITICAL_ERROR_ENGINE_MESSAGE), critical_error_illegal_non_empty_subnet_admins: metrics_registry .error_counter(CRITICAL_ERROR_ILLEGAL_NON_EMPTY_SUBNET_ADMINS), @@ -1070,43 +1077,17 @@ impl BatchProcessorImpl { .map_err(|err| registry_error("routing table", None, err))? .unwrap_or_default(); - // Derive filtered subnets and routing table. - let (subnets, routing_table) = if own_subnet_type == SubnetType::CloudEngine { - // CloudEngine subnets only see themselves. - let subnets = all_subnets - .iter() - .filter(|(id, _)| **id == own_subnet_id) - .map(|(id, topo)| (*id, topo.clone())) - .collect(); - let routing_table = full_routing_table - .iter() - .filter(|(_, id)| **id == own_subnet_id) - .map(|(range, id)| (*range, *id)) - .collect::>() - .try_into() - .map_err(|err| { - Persistent(format!( - "'filtered routing table for CloudEngine subnet {}', err: {:?}", - own_subnet_id, err - )) - })?; - (subnets, routing_table) - } else { - // Non-engine subnets see every subnet that is *not* a CloudEngine. - let subnets: BTreeMap<_, _> = all_subnets - .iter() - .filter(|(_, topo)| topo.subnet_type != SubnetType::CloudEngine) - .map(|(id, topo)| (*id, topo.clone())) - .collect(); - let routing_table = full_routing_table - .iter() - .filter(|(_, id)| subnets.contains_key(id)) - .map(|(range, id)| (*range, *id)) - .collect::>() - .try_into() - .map_err(|err| Persistent(format!("'filtered routing table', err: {:?}", err)))?; - (subnets, routing_table) - }; + // All subnets see the full topology, including CloudEngine subnets. + let subnets: BTreeMap<_, _> = all_subnets + .iter() + .map(|(id, topo)| (*id, topo.clone())) + .collect(); + let routing_table = full_routing_table + .iter() + .map(|(range, id)| (*range, *id)) + .collect::>() + .try_into() + .map_err(|err| Persistent(format!("routing table err: {:?}", err)))?; let canister_migrations = self .registry .get_canister_migrations(registry_version) @@ -1120,16 +1101,30 @@ impl BatchProcessorImpl { .ok_or_else(|| not_found_error("NNS subnet ID", None))?; // Look up the default subnet for `SetupInitialDKG`. The key may be - // unset (in which case `SetupInitialDKG` requests fall back to the - // calling subnet), or the configured subnet may not be visible from - // this subnet (e.g. if it has been filtered out above), in which case - // we also fall back to the calling subnet. + // unset, in which case `SetupInitialDKG` requests fall back to the + // calling subnet. let default_initial_dkg_subnet_id = self .registry .get_default_initial_dkg_subnet_id(registry_version) .map_err(|err| registry_error("default initial DKG subnet ID", None, err))? .filter(|subnet_id| subnets.contains_key(subnet_id)); + // A signing subnet is only usable from here if a chain-key request can + // reach it without crossing the CloudEngine boundary: requests carrying + // cycles or guaranteed-response calls are rejected at that boundary (see + // `StreamBuilderImpl`/`StreamHandlerImpl`), and chain-key requests are + // always both. The local subnet is always reachable (loopback); any other + // subnet is reachable only if neither end is a CloudEngine. Pruning such + // subnets here means routing returns a clean `ChainKeyError` (and the cost + // API an `UnknownKey`) instead of routing a doomed request to the boundary. + let chain_key_subnet_is_reachable = |id: &SubnetId| match subnets.get(id) { + None => false, + Some(topo) => { + *id == own_subnet_id + || (own_subnet_type != SubnetType::CloudEngine + && topo.subnet_type != SubnetType::CloudEngine) + } + }; let chain_key_enabled_subnets: BTreeMap<_, _> = self .registry .get_chain_key_enabled_subnets(registry_version) @@ -1139,7 +1134,7 @@ impl BatchProcessorImpl { .filter_map(|(key, subnet_ids)| { let filtered: Vec<_> = subnet_ids .into_iter() - .filter(|id| subnets.contains_key(id)) + .filter(&chain_key_subnet_is_reachable) .collect(); if filtered.is_empty() { None diff --git a/rs/messaging/src/message_routing/tests.rs b/rs/messaging/src/message_routing/tests.rs index 6734ff421580..796993d9cff3 100644 --- a/rs/messaging/src/message_routing/tests.rs +++ b/rs/messaging/src/message_routing/tests.rs @@ -1670,8 +1670,8 @@ fn try_read_registry_succeeds_and_populates_subnet_admins() { rental_subnet_record_from_topo.subnet_admins, btreeset! {rental_subnet_admin.get()} ); - // CloudEngine subnets are filtered out of the topology on non-NNS subnets. - assert!(network_topology.subnets().get(&engine_subnet_id).is_none()); + // CloudEngine subnets are visible in the full topology on all subnets. + assert!(network_topology.subnets().get(&engine_subnet_id).is_some()); }); } @@ -1743,8 +1743,8 @@ fn try_read_registry_succeeds_and_resets_subnet_admins() { // Check that subnet admins are reset and a critical error is raised. let own_subnet_record_from_topo = network_topology.subnets().get(&own_subnet_id).unwrap(); assert_eq!(own_subnet_record_from_topo.subnet_admins, BTreeSet::new()); - // CloudEngine subnets are filtered out of the topology on non-NNS subnets. - assert!(network_topology.subnets().get(&engine_subnet_id).is_none()); + // CloudEngine subnets are visible in the full topology on all subnets. + assert!(network_topology.subnets().get(&engine_subnet_id).is_some()); let nns_subnet_record_from_topo = network_topology.subnets().get(&nns_subnet_id).unwrap(); assert_eq!(nns_subnet_record_from_topo.subnet_admins, BTreeSet::new()); // The critical error is still raised for all 3 subnets (before filtering). @@ -1815,57 +1815,49 @@ fn setup_three_subnet_registry() -> (Arc, SubnetId, SubnetId ) } -/// Tests that a CloudEngine subnet sees only itself in the resulting topology: -/// `subnets`, `routing_table`, `subnets_for_certification`, and `routing_table_for_certification` -/// all contain only the own subnet. +/// Tests that a CloudEngine subnet sees the full topology (all three subnets): +/// `subnets`, `routing_table`, `subnets_for_certification`, and +/// `routing_table_for_certification` all contain every subnet. #[test] -fn try_read_registry_engine_subnet_sees_only_itself() { +fn try_read_registry_engine_subnet_sees_full_topology() { with_test_replica_logger(|log| { - let (registry, _app_subnet_id, engine_subnet_id, _nns_subnet_id) = + let (registry, app_subnet_id, engine_subnet_id, nns_subnet_id) = setup_three_subnet_registry(); let network_topology = try_to_read_registry(registry, log, engine_subnet_id) .unwrap() .0; - // Filtered view: only the own engine subnet. - assert_eq!( - network_topology.subnets().keys().collect::>(), - vec![&engine_subnet_id], - ); - assert_eq!( - network_topology - .routing_table() - .iter() - .map(|(_, sid)| *sid) - .collect::>(), - BTreeSet::from([engine_subnet_id]), - ); + let subnet_keys: Vec<_> = network_topology.subnets().keys().copied().collect(); + assert!(subnet_keys.contains(&app_subnet_id)); + assert!(subnet_keys.contains(&engine_subnet_id)); + assert!(subnet_keys.contains(&nns_subnet_id)); - // Engine accessors also return only the own subnet (no full_topology on engines). + let rt_subnets: BTreeSet<_> = network_topology + .routing_table() + .iter() + .map(|(_, sid)| *sid) + .collect(); + assert!(rt_subnets.contains(&app_subnet_id)); + assert!(rt_subnets.contains(&engine_subnet_id)); + assert!(rt_subnets.contains(&nns_subnet_id)); + + // No full_topology on engine subnets; certification accessors fall back to subnets(). assert_eq!( - network_topology - .subnets_for_certification() - .keys() - .collect::>(), - vec![&engine_subnet_id], + network_topology.subnets_for_certification(), + network_topology.subnets(), ); assert_eq!( - network_topology - .routing_table_for_certification() - .iter() - .map(|(_, sid)| *sid) - .collect::>(), - BTreeSet::from([engine_subnet_id]), + network_topology.routing_table_for_certification(), + network_topology.routing_table(), ); }); } -/// Tests that an Application subnet filters out CloudEngine subnets from its -/// topology: `subnets`, `routing_table`, `subnets_for_certification`, and -/// `routing_table_for_certification` all exclude the engine subnet. +/// Tests that an Application subnet sees the full topology (all three subnets), +/// including CloudEngine subnets. #[test] -fn try_read_registry_application_subnet_filters_out_engines() { +fn try_read_registry_application_subnet_sees_full_topology() { with_test_replica_logger(|log| { let (registry, app_subnet_id, engine_subnet_id, nns_subnet_id) = setup_three_subnet_registry(); @@ -1874,11 +1866,11 @@ fn try_read_registry_application_subnet_filters_out_engines() { .unwrap() .0; - // Filtered view: app and NNS subnets are visible, engine is excluded. + // Full view: all three subnets are visible, including the engine. let subnet_keys: Vec<_> = network_topology.subnets().keys().copied().collect(); assert!(subnet_keys.contains(&app_subnet_id)); assert!(subnet_keys.contains(&nns_subnet_id)); - assert!(!subnet_keys.contains(&engine_subnet_id)); + assert!(subnet_keys.contains(&engine_subnet_id)); let rt_subnets: BTreeSet<_> = network_topology .routing_table() @@ -1887,9 +1879,9 @@ fn try_read_registry_application_subnet_filters_out_engines() { .collect(); assert!(rt_subnets.contains(&app_subnet_id)); assert!(rt_subnets.contains(&nns_subnet_id)); - assert!(!rt_subnets.contains(&engine_subnet_id)); + assert!(rt_subnets.contains(&engine_subnet_id)); - // Engine accessors also exclude the engine (no full_topology on non-NNS subnets). + // No full_topology on non-NNS subnets; certification accessors fall back to subnets(). assert_eq!( network_topology.subnets_for_certification(), network_topology.subnets(), @@ -1901,9 +1893,8 @@ fn try_read_registry_application_subnet_filters_out_engines() { }); } -/// Tests that the NNS subnet filters out CloudEngine subnets from `subnets()` -/// and `routing_table()`, but `subnets_for_certification()` and -/// `routing_table_for_certification()` include all subnets (via `full_topology`). +/// Tests that the NNS subnet sees all subnets in both `subnets()` and +/// `subnets_for_certification()` (via `full_topology`), including engines. #[test] fn try_read_registry_nns_subnet_has_full_topology_with_engines() { with_test_replica_logger(|log| { @@ -1914,31 +1905,163 @@ fn try_read_registry_nns_subnet_has_full_topology_with_engines() { .unwrap() .0; - // Filtered view: app and NNS subnets are visible, engine is excluded. + // subnets() includes all three subnets. let subnet_keys: Vec<_> = network_topology.subnets().keys().copied().collect(); assert!(subnet_keys.contains(&app_subnet_id)); assert!(subnet_keys.contains(&nns_subnet_id)); - assert!(!subnet_keys.contains(&engine_subnet_id)); + assert!(subnet_keys.contains(&engine_subnet_id)); - // subnets_for_certification includes all three subnets via full_topology. - let all_keys: Vec<_> = network_topology + // subnets_for_certification also includes all three (via full_topology). + let cert_keys: Vec<_> = network_topology .subnets_for_certification() .keys() .copied() .collect(); - assert!(all_keys.contains(&app_subnet_id)); - assert!(all_keys.contains(&engine_subnet_id)); - assert!(all_keys.contains(&nns_subnet_id)); + assert!(cert_keys.contains(&app_subnet_id)); + assert!(cert_keys.contains(&engine_subnet_id)); + assert!(cert_keys.contains(&nns_subnet_id)); // routing_table_for_certification includes ranges for all three subnets. - let rt_with_engines_subnets: BTreeSet<_> = network_topology + let rt_cert: BTreeSet<_> = network_topology .routing_table_for_certification() .iter() .map(|(_, sid)| *sid) .collect(); - assert!(rt_with_engines_subnets.contains(&app_subnet_id)); - assert!(rt_with_engines_subnets.contains(&engine_subnet_id)); - assert!(rt_with_engines_subnets.contains(&nns_subnet_id)); + assert!(rt_cert.contains(&app_subnet_id)); + assert!(rt_cert.contains(&engine_subnet_id)); + assert!(rt_cert.contains(&nns_subnet_id)); + }); +} + +/// Like `setup_three_subnet_registry`, but also enables chain keys: `shared_key` +/// on both the Application and CloudEngine subnets, and `engine_only_key` on the +/// CloudEngine subnet alone. +fn setup_three_subnet_registry_with_chain_keys() +-> (Arc, SubnetId, SubnetId, SubnetId) { + use Integrity::*; + + let dummy_transcript = dummy_transcript_for_tests(); + + let app_subnet_id = subnet_test_id(1); + let app_subnet_record = SubnetRecord { + subnet_type: SubnetType::Application, + ..Default::default() + }; + let engine_subnet_id = subnet_test_id(2); + let engine_subnet_record = SubnetRecord { + subnet_type: SubnetType::CloudEngine, + ..Default::default() + }; + let nns_subnet_id = subnet_test_id(3); + let nns_subnet_record = SubnetRecord { + subnet_type: SubnetType::System, + ..Default::default() + }; + + let mut routing_table = RoutingTable::new(); + routing_table_insert_subnet(&mut routing_table, app_subnet_id).unwrap(); + routing_table_insert_subnet(&mut routing_table, engine_subnet_id).unwrap(); + routing_table_insert_subnet(&mut routing_table, nns_subnet_id).unwrap(); + + let chain_key_enabled_subnets = btreemap! { + shared_chain_key() => Valid(vec![app_subnet_id, engine_subnet_id]), + engine_only_chain_key() => Valid(vec![engine_subnet_id]), + }; + + let fixture = RegistryFixture::new(); + fixture + .write_test_records(&TestRecords { + subnet_ids: Valid([app_subnet_id, engine_subnet_id, nns_subnet_id]), + subnet_records: [ + Valid(&app_subnet_record), + Valid(&engine_subnet_record), + Valid(&nns_subnet_record), + ], + ni_dkg_transcripts: [Valid(Some(&dummy_transcript)); 3], + nns_subnet_id: Valid(nns_subnet_id), + chain_key_enabled_subnets: &chain_key_enabled_subnets, + provisional_whitelist: Missing, + routing_table: Valid(&routing_table), + canister_migrations: Missing, + node_public_keys: &BTreeMap::default(), + api_boundary_node_records: &BTreeMap::default(), + node_records: &BTreeMap::default(), + }) + .unwrap(); + + ( + fixture.registry, + app_subnet_id, + engine_subnet_id, + nns_subnet_id, + ) +} + +fn shared_chain_key() -> MasterPublicKeyId { + MasterPublicKeyId::Ecdsa(EcdsaKeyId { + curve: EcdsaCurve::Secp256k1, + name: "shared_key".to_string(), + }) +} + +fn engine_only_chain_key() -> MasterPublicKeyId { + MasterPublicKeyId::Ecdsa(EcdsaKeyId { + curve: EcdsaCurve::Secp256k1, + name: "engine_only_key".to_string(), + }) +} + +/// A non-engine (Application) subnet must not list a CloudEngine subnet as a +/// signing subnet for a chain key, since a chain-key request to it would be +/// rejected at the engine boundary. A key held *only* on an engine is pruned +/// entirely. +#[test] +fn chain_key_enabled_subnets_prune_engine_signers_on_application_subnet() { + with_test_replica_logger(|log| { + let (registry, app_subnet_id, _engine_subnet_id, _nns_subnet_id) = + setup_three_subnet_registry_with_chain_keys(); + + let network_topology = try_to_read_registry(registry, log, app_subnet_id) + .unwrap() + .0; + + // The shared key keeps only the Application subnet; the engine is pruned. + assert_eq!( + network_topology.chain_key_enabled_subnets(&shared_chain_key()), + &[app_subnet_id], + ); + // The engine-only key is pruned to an empty list and thus dropped. + assert!( + network_topology + .chain_key_enabled_subnets(&engine_only_chain_key()) + .is_empty() + ); + }); +} + +/// A CloudEngine subnet may serve chain keys it holds itself (loopback never +/// crosses the boundary), but must not list a non-engine signing subnet, as a +/// chain-key request to it would be rejected at the engine boundary. +#[test] +fn chain_key_enabled_subnets_keep_only_loopback_on_engine_subnet() { + with_test_replica_logger(|log| { + let (registry, _app_subnet_id, engine_subnet_id, _nns_subnet_id) = + setup_three_subnet_registry_with_chain_keys(); + + let network_topology = try_to_read_registry(registry, log, engine_subnet_id) + .unwrap() + .0; + + // The shared key keeps only the engine itself; the Application subnet is pruned. + assert_eq!( + network_topology.chain_key_enabled_subnets(&shared_chain_key()), + &[engine_subnet_id], + ); + // The engine-only key is kept (loopback). + assert_eq!( + network_topology.chain_key_enabled_subnets(&engine_only_chain_key()), + &[engine_subnet_id], + ); }); } diff --git a/rs/messaging/src/routing/stream_builder.rs b/rs/messaging/src/routing/stream_builder.rs index 48c2dfa02d32..f3e24ffa9279 100644 --- a/rs/messaging/src/routing/stream_builder.rs +++ b/rs/messaging/src/routing/stream_builder.rs @@ -10,8 +10,8 @@ use ic_replicated_state::replicated_state::{ }; use ic_replicated_state::{ReplicatedState, Stream}; use ic_types::messages::{ - MAX_INTER_CANISTER_PAYLOAD_IN_BYTES, MAX_REJECT_MESSAGE_LEN_BYTES, Payload, RejectContext, - Request, RequestOrResponse, Response, StreamMessage, + MAX_INTER_CANISTER_PAYLOAD_IN_BYTES, MAX_REJECT_MESSAGE_LEN_BYTES, NO_DEADLINE, Payload, + RejectContext, Request, RequestOrResponse, Response, StreamMessage, }; use ic_types::{CountBytes, SubnetId}; use ic_types_cycles::{CompoundCycles, Cycles}; @@ -71,6 +71,7 @@ const LABEL_VALUE_TYPE_REFUND: &str = "refund"; const LABEL_VALUE_STATUS_SUCCESS: &str = "success"; const LABEL_VALUE_STATUS_CANISTER_NOT_FOUND: &str = "canister_not_found"; const LABEL_VALUE_STATUS_PAYLOAD_TOO_LARGE: &str = "payload_too_large"; +const LABEL_VALUE_STATUS_ENGINE_NOT_ALLOWED: &str = "engine_not_allowed"; const CRITICAL_ERROR_INFINITE_LOOP: &str = "mr_stream_builder_infinite_loop"; const CRITICAL_ERROR_PAYLOAD_TOO_LARGE: &str = "mr_stream_builder_payload_too_large"; @@ -423,6 +424,7 @@ impl StreamBuilderImpl { let mut streams = state.take_streams(); let network_topology = state.metadata.network_topology.clone(); + let own_subnet_type = state.metadata.own_subnet_type; // First, have up to `max_stream_messages / 2` refunds in each stream (including // already routed ones) while respecting stream message and byte limits. @@ -439,6 +441,8 @@ impl StreamBuilderImpl { let mut requests_to_reject = Vec::new(); let mut oversized_requests = Vec::new(); + let mut engine_requests_to_reject: Vec> = Vec::new(); + let mut engine_response_dropped_cycles = Cycles::zero(); let mut output_iter = state.output_into_iter(); let mut last_output_size = usize::MAX; @@ -487,9 +491,52 @@ impl StreamBuilderImpl { // We will route (or reject) the message, pop it. let mut msg = validated_next(&mut output_iter, &msg); + let is_engine_dst = !is_loopback_stream + && network_topology + .subnets() + .get(&dst_subnet_id) + .is_some_and(|t| t.subnet_type == SubnetType::CloudEngine); + let is_engine_src = + !is_loopback_stream && own_subnet_type == SubnetType::CloudEngine; + // Reject messages with oversized payloads, as they may // cause streams to permanently stall. match msg { + // Request at an engine boundary: reject if unbounded-wait or carries cycles. + RequestOrResponse::Request(ref req) + if (is_engine_dst || is_engine_src) + && (req.deadline == NO_DEADLINE + || req.payment > Cycles::zero()) => + { + self.observe_message_type_status( + LABEL_VALUE_TYPE_REQUEST, + LABEL_VALUE_STATUS_ENGINE_NOT_ALLOWED, + ); + let req = match msg { + RequestOrResponse::Request(req) => req, + _ => unreachable!(), + }; + engine_requests_to_reject.push(req); + } + + // Response that should not exist at an engine boundary: a + // guaranteed-response response, or one carrying cycles. Neither + // guaranteed-response calls nor cycles are allowed across the + // boundary, so such a response can only come from a buggy local + // canister. Drop it; any cycles are lost. Kept above the + // oversized-response arm, which truncates and routes; matching + // here first ensures such a response is dropped, not routed. + RequestOrResponse::Response(ref rep) + if (is_engine_dst || is_engine_src) + && (rep.deadline == NO_DEADLINE || rep.refund > Cycles::zero()) => + { + engine_response_dropped_cycles += rep.refund; + self.observe_message_type_status( + LABEL_VALUE_TYPE_RESPONSE, + LABEL_VALUE_STATUS_ENGINE_NOT_ALLOWED, + ); + } + // Remote request above the payload size limit. RequestOrResponse::Request(req) if dst_subnet_id != self.subnet_id @@ -616,6 +663,24 @@ impl StreamBuilderImpl { ); } + for req in engine_requests_to_reject { + self.reject_local_request( + &mut state, + &req, + RejectCode::SysFatal, + "Unbounded-wait calls and calls with cycles are not allowed to CloudEngine subnets" + .to_string(), + ); + } + + if engine_response_dropped_cycles > Cycles::zero() { + let own_cost_schedule = state.get_own_cost_schedule(); + state.observe_lost_cycles_due_to_dropped_messages(CompoundCycles::new( + engine_response_dropped_cycles, + own_cost_schedule, + )); + } + // Export the total number of enqueued messages and byte size, per stream. streams .iter() @@ -685,11 +750,27 @@ impl StreamBuilderImpl { ) { let mut cycles_lost = Cycles::zero(); let own_cost_schedule = state.get_own_cost_schedule(); + let own_is_engine = state.metadata.own_subnet_type == SubnetType::CloudEngine; state.take_refunds(|refund| { match network_topology.route(refund.recipient().get()) { Some(dst_subnet_id) => { - let stream = streams.entry(dst_subnet_id).or_default(); let is_loopback_stream = dst_subnet_id == self.subnet_id; + let is_engine_dst = !is_loopback_stream + && network_topology + .subnets() + .get(&dst_subnet_id) + .is_some_and(|t| t.subnet_type == SubnetType::CloudEngine); + let is_engine_src = !is_loopback_stream && own_is_engine; + if is_engine_dst || is_engine_src { + // Refund destined to cross the engine boundary: drop, cycles lost. + cycles_lost += refund.amount(); + self.observe_message_type_status( + LABEL_VALUE_TYPE_REFUND, + LABEL_VALUE_STATUS_ENGINE_NOT_ALLOWED, + ); + return true; + } + let stream = streams.entry(dst_subnet_id).or_default(); if is_loopback_stream || (stream.refund_count() < refund_limit && stream.messages().len() < self.max_stream_messages diff --git a/rs/messaging/src/routing/stream_builder/tests.rs b/rs/messaging/src/routing/stream_builder/tests.rs index 25d4922d4678..70ee4a235d1d 100644 --- a/rs/messaging/src/routing/stream_builder/tests.rs +++ b/rs/messaging/src/routing/stream_builder/tests.rs @@ -799,6 +799,260 @@ fn build_streams_with_best_effort_messages() { } } +/// Tests that a guaranteed-response request from a CloudEngine subnet (own subnet) to a +/// non-engine subnet is rejected with a synthetic reject response. +#[test] +fn build_streams_engine_src_rejects_gr_request() { + let local_canister_id = canister_test_id(0); + let remote_canister_id = canister_test_id(1); + with_test_replica_logger(|log| { + let msg = RequestBuilder::new() + .sender(local_canister_id) + .receiver(remote_canister_id) + .sender_reply_callback(CallbackId::from(1)) + .deadline(NO_DEADLINE) + .payment(Cycles::zero()) + .build(); + + let (stream_builder, mut provided_state, metrics_registry) = new_fixture(&log); + + provided_state.metadata.own_subnet_type = SubnetType::CloudEngine; + provided_state.metadata.network_topology.set_subnets(btreemap! { + LOCAL_SUBNET => SubnetTopology { subnet_type: SubnetType::CloudEngine, ..Default::default() }, + REMOTE_SUBNET => SubnetTopology { subnet_type: SubnetType::Application, ..Default::default() }, + }); + provided_state.metadata.network_topology.set_routing_table( + RoutingTable::try_from(btreemap! { + CanisterIdRange { start: local_canister_id, end: local_canister_id } => LOCAL_SUBNET, + CanisterIdRange { start: remote_canister_id, end: remote_canister_id } => REMOTE_SUBNET, + }) + .unwrap(), + ); + + let provided_canister_states = canister_states_with_outputs(vec![msg]); + provided_state.put_canister_states(provided_canister_states); + + let result_state = stream_builder.build_streams(provided_state); + + // No message in REMOTE_SUBNET stream. + assert!( + result_state + .streams() + .get(&REMOTE_SUBNET) + .is_none_or(|s| s.messages().is_empty()) + ); + + // A synthetic reject response was delivered back to the sender. + assert!( + result_state + .canister_state(&local_canister_id) + .unwrap() + .clone() + .pop_input() + .is_some() + ); + + assert_routed_messages_eq( + metric_vec(&[( + &[ + (LABEL_TYPE, LABEL_VALUE_TYPE_REQUEST), + (LABEL_STATUS, LABEL_VALUE_STATUS_ENGINE_NOT_ALLOWED), + ], + 1, + )]), + &metrics_registry, + ); + assert_eq_critical_errors(0, 0, &metrics_registry); + }); +} + +/// Tests that a best-effort request with cycles from a CloudEngine subnet (own subnet) to a +/// non-engine subnet is rejected with a synthetic reject response. +#[test] +fn build_streams_engine_src_rejects_cycles_request() { + let local_canister_id = canister_test_id(0); + let remote_canister_id = canister_test_id(1); + with_test_replica_logger(|log| { + let msg = RequestBuilder::new() + .sender(local_canister_id) + .receiver(remote_canister_id) + .sender_reply_callback(CallbackId::from(1)) + .deadline(SOME_DEADLINE) + .payment(Cycles::new(100)) + .build(); + + let (stream_builder, mut provided_state, metrics_registry) = new_fixture(&log); + + provided_state.metadata.own_subnet_type = SubnetType::CloudEngine; + provided_state.metadata.network_topology.set_subnets(btreemap! { + LOCAL_SUBNET => SubnetTopology { subnet_type: SubnetType::CloudEngine, ..Default::default() }, + REMOTE_SUBNET => SubnetTopology { subnet_type: SubnetType::Application, ..Default::default() }, + }); + provided_state.metadata.network_topology.set_routing_table( + RoutingTable::try_from(btreemap! { + CanisterIdRange { start: local_canister_id, end: local_canister_id } => LOCAL_SUBNET, + CanisterIdRange { start: remote_canister_id, end: remote_canister_id } => REMOTE_SUBNET, + }) + .unwrap(), + ); + + let provided_canister_states = canister_states_with_outputs(vec![msg]); + provided_state.put_canister_states(provided_canister_states); + + let result_state = stream_builder.build_streams(provided_state); + + // No message in REMOTE_SUBNET stream. + assert!( + result_state + .streams() + .get(&REMOTE_SUBNET) + .is_none_or(|s| s.messages().is_empty()) + ); + + // A synthetic reject response was delivered back to the sender. + assert!( + result_state + .canister_state(&local_canister_id) + .unwrap() + .clone() + .pop_input() + .is_some() + ); + + assert_routed_messages_eq( + metric_vec(&[( + &[ + (LABEL_TYPE, LABEL_VALUE_TYPE_REQUEST), + (LABEL_STATUS, LABEL_VALUE_STATUS_ENGINE_NOT_ALLOWED), + ], + 1, + )]), + &metrics_registry, + ); + assert_eq_critical_errors(0, 0, &metrics_registry); + }); +} + +/// Tests that a response with a cycles refund from a CloudEngine subnet (own subnet) to a +/// non-engine subnet is dropped (no synthetic reject, no stream entry). +#[test] +fn build_streams_engine_src_drops_cycles_response() { + let local_canister_id = canister_test_id(0); + let remote_canister_id = canister_test_id(1); + with_test_replica_logger(|log| { + let response = Arc::new(Response { + originator: remote_canister_id, + respondent: local_canister_id, + originator_reply_callback: CallbackId::from(1), + refund: Cycles::new(100), + response_payload: Payload::Data(vec![]), + deadline: NO_DEADLINE, + }); + + let (stream_builder, mut provided_state, metrics_registry) = new_fixture(&log); + + provided_state.metadata.own_subnet_type = SubnetType::CloudEngine; + provided_state.metadata.network_topology.set_subnets(btreemap! { + LOCAL_SUBNET => SubnetTopology { subnet_type: SubnetType::CloudEngine, ..Default::default() }, + REMOTE_SUBNET => SubnetTopology { subnet_type: SubnetType::Application, ..Default::default() }, + }); + provided_state.metadata.network_topology.set_routing_table( + RoutingTable::try_from(btreemap! { + CanisterIdRange { start: local_canister_id, end: local_canister_id } => LOCAL_SUBNET, + CanisterIdRange { start: remote_canister_id, end: remote_canister_id } => REMOTE_SUBNET, + }) + .unwrap(), + ); + + let provided_canister_states = + canister_states_with_outputs(vec![RequestOrResponse::Response(response)]); + provided_state.put_canister_states(provided_canister_states); + + let result_state = stream_builder.build_streams(provided_state); + + // No message in REMOTE_SUBNET stream (response was dropped). + assert!( + result_state + .streams() + .get(&REMOTE_SUBNET) + .is_none_or(|s| s.messages().is_empty()) + ); + + // No synthetic reject: responses are dropped silently. + let maybe_reject = result_state + .canister_state(&local_canister_id) + .unwrap() + .clone() + .pop_input(); + assert!(maybe_reject.is_none()); + + assert_routed_messages_eq( + metric_vec(&[( + &[ + (LABEL_TYPE, LABEL_VALUE_TYPE_RESPONSE), + (LABEL_STATUS, LABEL_VALUE_STATUS_ENGINE_NOT_ALLOWED), + ], + 1, + )]), + &metrics_registry, + ); + assert_eq_critical_errors(0, 0, &metrics_registry); + }); +} + +/// Tests that refunds destined to cross an engine boundary are dropped, in both +/// directions: +/// * engine → non-engine (own subnet is engine, recipient on a non-engine subnet) +/// * non-engine → engine (own subnet is non-engine, recipient on an engine subnet) +/// +/// In both cases, the refund must not appear in the destination stream. +#[test] +fn build_streams_drops_refunds_at_engine_boundary() { + let local_canister_id = canister_test_id(0); + let remote_canister_id = canister_test_id(1); + + for (own_subnet_type, remote_subnet_type) in [ + (SubnetType::CloudEngine, SubnetType::Application), + (SubnetType::Application, SubnetType::CloudEngine), + ] { + with_test_replica_logger(|log| { + let (stream_builder, mut provided_state, _) = new_fixture(&log); + + provided_state.metadata.own_subnet_type = own_subnet_type; + provided_state + .metadata + .network_topology + .set_subnets(btreemap! { + LOCAL_SUBNET => SubnetTopology { subnet_type: own_subnet_type, ..Default::default() }, + REMOTE_SUBNET => SubnetTopology { subnet_type: remote_subnet_type, ..Default::default() }, + }); + provided_state.metadata.network_topology.set_routing_table( + RoutingTable::try_from(btreemap! { + CanisterIdRange { start: local_canister_id, end: local_canister_id } => LOCAL_SUBNET, + CanisterIdRange { start: remote_canister_id, end: remote_canister_id } => REMOTE_SUBNET, + }) + .unwrap(), + ); + + // Add a refund destined for the canister on the other side of the engine boundary. + provided_state.add_refund(remote_canister_id, Cycles::new(100)); + + let result_state = stream_builder.build_streams(provided_state); + + // The refund must NOT have been routed into the REMOTE_SUBNET stream. + let routed_refunds = result_state + .streams() + .get(&REMOTE_SUBNET) + .map_or(0, |s| s.refund_count()); + assert_eq!( + 0, routed_refunds, + "Refund leaked across engine boundary (own_subnet_type={own_subnet_type:?}, \ + remote_subnet_type={remote_subnet_type:?})", + ); + }); + } +} + /// Given a stream with some (potentially zero) initial refunds and canister /// messages, tests that `build_streams()` respects the various limits when /// routing additional refunds and canister messages: diff --git a/rs/messaging/src/routing/stream_handler.rs b/rs/messaging/src/routing/stream_handler.rs index 6a5e5193d782..dca6cb1ea687 100644 --- a/rs/messaging/src/routing/stream_handler.rs +++ b/rs/messaging/src/routing/stream_handler.rs @@ -1,5 +1,6 @@ use crate::message_routing::{ - CRITICAL_ERROR_INDUCT_RESPONSE_FAILED, LatencyMetrics, MessageRoutingMetrics, + CRITICAL_ERROR_ENGINE_MESSAGE, CRITICAL_ERROR_INDUCT_RESPONSE_FAILED, LatencyMetrics, + MessageRoutingMetrics, }; use ic_base_types::NumBytes; use ic_config::execution_environment::Config as HypervisorConfig; @@ -12,18 +13,19 @@ use ic_interfaces::messaging::{ use ic_logger::{ReplicaLogger, debug, error, info, trace}; use ic_metrics::MetricsRegistry; use ic_metrics::buckets::{add_bucket, decimal_buckets}; +use ic_registry_subnet_type::SubnetType; use ic_replicated_state::metadata_state::{Stream, StreamMap}; use ic_replicated_state::replicated_state::{ LABEL_VALUE_QUEUE_FULL, MR_SYNTHETIC_REJECT_MESSAGE_MAX_LEN, ReplicatedStateMessageRouting, }; use ic_replicated_state::{ReplicatedState, StateError}; use ic_types::messages::{ - MAX_INTER_CANISTER_PAYLOAD_IN_BYTES_U64, MAX_RESPONSE_COUNT_BYTES, Payload, Refund, - RejectContext, Request, RequestOrResponse, Response, StreamMessage, + MAX_INTER_CANISTER_PAYLOAD_IN_BYTES_U64, MAX_RESPONSE_COUNT_BYTES, NO_DEADLINE, Payload, + Refund, RejectContext, Request, RequestOrResponse, Response, StreamMessage, }; use ic_types::xnet::{RejectReason, RejectSignal, StreamIndex, StreamIndexedQueue, StreamSlice}; use ic_types::{CanisterId, SubnetId}; -use ic_types_cycles::CompoundCycles; +use ic_types_cycles::{CompoundCycles, Cycles}; use prometheus::{Histogram, IntCounter, IntCounterVec, IntGaugeVec}; use std::cell::RefCell; use std::collections::{BTreeMap, VecDeque}; @@ -60,6 +62,10 @@ struct StreamHandlerMetrics { /// messages for canisters not hosted (now, or previously, according to /// `canister_migrations`) by this subnet. pub critical_error_receiver_subnet_mismatch: IntCounter, + /// Critical error counter (see [`MetricsRegistry::error_counter`]) tracking + /// messages received from a CloudEngine subnet that carried cycles or were + /// guaranteed-response calls, which are not permitted on non-engine subnets. + pub critical_error_engine_message: IntCounter, } const METRIC_INDUCTED_XNET_MESSAGES: &str = "mr_inducted_xnet_message_count"; @@ -135,6 +141,9 @@ impl StreamHandlerMetrics { metrics_registry.error_counter(CRITICAL_ERROR_SENDER_SUBNET_MISMATCH); let critical_error_receiver_subnet_mismatch = metrics_registry.error_counter(CRITICAL_ERROR_RECEIVER_SUBNET_MISMATCH); + let critical_error_engine_message = message_routing_metrics + .critical_error_engine_message + .clone(); // Initialize all `inducted_xnet_messages` counters with zero, so they are all // exported from process start (`IntCounterVec` is really a map). @@ -171,6 +180,7 @@ impl StreamHandlerMetrics { critical_error_induct_response_failed, critical_error_sender_subnet_mismatch, critical_error_receiver_subnet_mismatch, + critical_error_engine_message, } } } @@ -742,6 +752,30 @@ impl StreamHandlerImpl { available_guaranteed_response_memory: &mut i64, ) { let own_cost_schedule = state.get_own_cost_schedule(); + + // True if the remote subnet is a CloudEngine subnet (and this is not the loopback stream). + // Messages at the engine boundary require special handling: + // * Guaranteed-response requests are rejected with `EngineNotAllowed`. + // * Best-effort requests carrying cycles are dropped; their cycles are accounted as lost. + // * Best-effort requests without cycles are inducted normally. + // * Best-effort responses without cycles are inducted normally. + // * Responses that should not exist (guaranteed-response, or carrying cycles) are + // dropped and a critical error is raised; any cycles are accounted as lost. + // * `Refund` messages are dropped; their cycles are accounted as lost. + // Loopback messages are excluded: the engine subnet is itself CloudEngine, but its + // own-subnet loopback messages should always be inducted normally. + let is_engine_subnet = remote_subnet_id != self.subnet_id + && state + .metadata + .network_topology + .subnets() + .get(&remote_subnet_id) + .is_some_and(|t| t.subnet_type == SubnetType::CloudEngine); + let own_is_engine = state.metadata.own_subnet_type == SubnetType::CloudEngine; + // True when this message is at the engine boundary and is not a loopback message. + let is_at_engine_boundary = + is_engine_subnet || (own_is_engine && remote_subnet_id != self.subnet_id); + let (msg, msg_type) = match msg { StreamMessage::Request(req) => { (RequestOrResponse::Request(req), LABEL_VALUE_TYPE_REQUEST) @@ -750,6 +784,27 @@ impl StreamHandlerImpl { (RequestOrResponse::Response(rep), LABEL_VALUE_TYPE_RESPONSE) } StreamMessage::Refund(refund) => { + // Refunds bypass sender validation; apply the engine filters here. + if is_at_engine_boundary { + error!( + self.log, + "{}: Dropping refund from {}: {:?}", + CRITICAL_ERROR_ENGINE_MESSAGE, + remote_subnet_id, + refund, + ); + self.metrics.critical_error_engine_message.inc(); + self.observe_inducted_message_status( + LABEL_VALUE_TYPE_REFUND, + LABEL_VALUE_DROPPED, + ); + state.observe_lost_cycles_due_to_dropped_messages(CompoundCycles::new( + refund.amount(), + own_cost_schedule, + )); + stream.push_accept_signal(); + return; + } return self.induct_refund(&refund, state, stream); } }; @@ -762,6 +817,74 @@ impl StreamHandlerImpl { // on a canister's migration path. (SenderSubnet::Match, _) | (SenderSubnet::OnMigrationPath, RequestOrResponse::Response(_)) => { + // Apply the engine filter before inducting. See `is_at_engine_boundary`. + if is_at_engine_boundary { + match &msg { + // Guaranteed-response requests are never allowed at the engine boundary. + RequestOrResponse::Request(req) if req.deadline == NO_DEADLINE => { + error!( + self.log, + "{}: Rejecting guaranteed-response request at engine boundary (from {}): {:?}", + CRITICAL_ERROR_ENGINE_MESSAGE, + remote_subnet_id, + req, + ); + self.metrics.critical_error_engine_message.inc(); + self.observe_inducted_message_status(msg_type, LABEL_VALUE_DROPPED); + stream.push_reject_signal(RejectReason::EngineNotAllowed); + return; + } + // Best-effort requests with cycles: drop. + RequestOrResponse::Request(req) if req.payment > Cycles::zero() => { + error!( + self.log, + "{}: Dropping best-effort request with cycles at engine boundary (from {}): {:?}", + CRITICAL_ERROR_ENGINE_MESSAGE, + remote_subnet_id, + req, + ); + self.metrics.critical_error_engine_message.inc(); + self.observe_inducted_message_status(msg_type, LABEL_VALUE_DROPPED); + state.observe_lost_cycles_due_to_dropped_messages(CompoundCycles::new( + req.payment, + own_cost_schedule, + )); + stream.push_accept_signal(); + return; + } + // BE request with no cycles, or a BE response with no cycles: + // fall through and induct normally. A response that should not + // exist (GR or carrying cycles) is dropped just below. + _ => {} + } + // A response that should not exist at the engine boundary: a + // guaranteed-response response, or one carrying cycles. Neither is + // allowed across the boundary, so this can only come from a buggy or + // malicious peer. Drop it (any cycles are lost) and raise a critical + // error. + if let RequestOrResponse::Response(ref rep) = msg { + let is_gr = rep.deadline == NO_DEADLINE; + let has_cycles = rep.refund > Cycles::zero(); + if is_gr || has_cycles { + error!( + self.log, + "{}: Dropping engine-boundary response (from {}): {:?}", + CRITICAL_ERROR_ENGINE_MESSAGE, + remote_subnet_id, + rep, + ); + self.metrics.critical_error_engine_message.inc(); + self.observe_inducted_message_status(msg_type, LABEL_VALUE_DROPPED); + if has_cycles { + state.observe_lost_cycles_due_to_dropped_messages( + CompoundCycles::new(rep.refund, own_cost_schedule), + ); + } + stream.push_accept_signal(); + return; + } + } + } match self.induct_message_impl( msg, msg_type, @@ -797,6 +920,13 @@ impl StreamHandlerImpl { // Reject requests not originating from their sender's known host // subnet. Their senders are likely manually migrated canisters. + // + // NOTE: We intentionally do NOT relax this for engine-boundary traffic. + // Sender-subnet validation is the only anti-forgery check on responses; a + // malicious engine that forges `respondent` to a canister hosted on a + // different subnet would otherwise be able to inject responses matching + // outstanding callbacks. Failing closed here (drop responses, reject + // requests) is required for response authenticity. (SenderSubnet::Mismatch, RequestOrResponse::Request(_)) => { self.observe_inducted_message_status( msg_type, @@ -1210,6 +1340,10 @@ fn generate_reject_response_for(reason: RejectReason, request: &Request) -> Requ RejectCode::SysFatal, "Inducting request failed due to an unknown error".to_string(), ), + RejectReason::EngineNotAllowed => ( + RejectCode::SysFatal, + "Guaranteed-response calls from CloudEngine subnets are not allowed".to_string(), + ), }; generate_reject_response(request, code, message) } diff --git a/rs/messaging/src/routing/stream_handler/tests.rs b/rs/messaging/src/routing/stream_handler/tests.rs index 97863683f4b0..cb4603b011a2 100644 --- a/rs/messaging/src/routing/stream_handler/tests.rs +++ b/rs/messaging/src/routing/stream_handler/tests.rs @@ -10,7 +10,7 @@ use ic_metrics::MetricsRegistry; use ic_registry_routing_table::{CanisterIdRange, CanisterIdRanges, RoutingTable}; use ic_registry_subnet_type::SubnetType; use ic_replicated_state::{ - CanisterStatus, ReplicatedState, Stream, + CanisterStatus, ReplicatedState, Stream, SubnetTopology, metadata_state::{StreamMap, testing::NetworkTopologyTesting}, replicated_state::LABEL_VALUE_OUT_OF_MEMORY, testing::{ReplicatedStateTesting, StreamTesting, SystemStateTesting}, @@ -2811,6 +2811,69 @@ fn induct_stream_slices_with_refunds() { } } +/// Tests that a refund arriving in a slice across an engine boundary is dropped, +/// its cycles are observed as lost, and a critical error is raised. With subnet +/// types fixed at creation, an honest peer would never produce such a refund — +/// arrival here implies a malicious or buggy sender. +#[test] +fn induct_stream_slices_drops_refund_at_engine_boundary() { + for cost_schedule in [ + CanisterCyclesCostSchedule::Normal, + CanisterCyclesCostSchedule::Free, + ] { + with_test_setup( + btreemap![], + btreemap![REMOTE_SUBNET => StreamSliceConfig { + messages: vec![Refund(*LOCAL_CANISTER)], + ..StreamSliceConfig::default() + }], + |stream_handler, mut state, slices, metrics| { + // Mark REMOTE_SUBNET as a CloudEngine. + state.metadata.network_topology.subnets_mut().insert( + REMOTE_SUBNET, + SubnetTopology { + subnet_type: SubnetType::CloudEngine, + ..Default::default() + }, + ); + + // Expected state: a stream with one accept signal, no induction, cycles lost. + let refund = *refund_in_slice(slices.get(&REMOTE_SUBNET), 0); + let mut expected_state = state.clone(); + let expected_stream = stream_from_config(StreamConfig { + signals_end: 1, + ..StreamConfig::default() + }); + expected_state.with_streams(btreemap![REMOTE_SUBNET => expected_stream]); + expected_state.observe_lost_cycles_due_to_dropped_messages(CompoundCycles::new( + refund.amount(), + cost_schedule, + )); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + let inducted_state = stream_handler.induct_stream_slices( + state, + slices, + &mut available_guaranteed_response_memory, + ); + + assert_eq!(expected_state, inducted_state); + + metrics.assert_inducted_xnet_messages_eq(&[( + LABEL_VALUE_TYPE_REFUND, + LABEL_VALUE_DROPPED, + 1, + )]); + metrics.assert_eq_critical_errors(CriticalErrorCounts { + engine_message: 1, + ..CriticalErrorCounts::default() + }); + }, + ); + } +} + /// Tests that messages in the loopback stream and incoming slices are inducted /// (with signals added appropriately); and messages present in the initial /// state are garbage collected or rerouted as appropriate. @@ -3229,6 +3292,368 @@ fn process_stream_slices_with_invalid_messages() { ); } +/// Tests that a guaranteed-response request from a CloudEngine subnet, arriving at a +/// non-engine subnet, triggers a critical error and is rejected with a reject signal. +#[test] +fn induct_stream_slices_engine_src_guaranteed_response_request_critical_error() { + with_test_setup( + btreemap![], + btreemap![REMOTE_SUBNET => StreamSliceConfig { + messages: vec![Request(*REMOTE_CANISTER, *LOCAL_CANISTER)], + ..StreamSliceConfig::default() + }], + |stream_handler, mut state, slices, metrics| { + // Mark REMOTE_SUBNET as CloudEngine. + state.metadata.network_topology.subnets_mut().insert( + REMOTE_SUBNET, + SubnetTopology { + subnet_type: SubnetType::CloudEngine, + ..Default::default() + }, + ); + + // Expect a stream with a reject signal (EngineNotAllowed) for the GR request. + let mut expected_state = state.clone(); + let expected_stream = stream_from_config(StreamConfig { + signals_end: 1, + reject_signals: vec![RejectSignal::new(RejectReason::EngineNotAllowed, 0.into())], + ..StreamConfig::default() + }); + expected_state.with_streams(btreemap![REMOTE_SUBNET => expected_stream]); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + let inducted_state = stream_handler.induct_stream_slices( + state, + slices, + &mut available_guaranteed_response_memory, + ); + + assert_eq!(expected_state, inducted_state); + + metrics.assert_inducted_xnet_messages_eq(&[( + LABEL_VALUE_TYPE_REQUEST, + LABEL_VALUE_DROPPED, + 1, + )]); + metrics.assert_eq_critical_errors(CriticalErrorCounts { + engine_message: 1, + ..CriticalErrorCounts::default() + }); + }, + ); +} + +/// Tests that a best-effort request with no cycles from a CloudEngine subnet, arriving at a +/// non-engine subnet, is inducted successfully without triggering a critical error. +#[test] +fn induct_stream_slices_engine_src_best_effort_request_inducted() { + with_test_setup( + btreemap![], + btreemap![], + |stream_handler, mut state, _, metrics| { + // Mark REMOTE_SUBNET as CloudEngine. + state.metadata.network_topology.subnets_mut().insert( + REMOTE_SUBNET, + SubnetTopology { + subnet_type: SubnetType::CloudEngine, + ..Default::default() + }, + ); + + // Build a best-effort request (deadline != NO_DEADLINE, payment = 0). + let be_request = RequestBuilder::new() + .sender(*REMOTE_CANISTER) + .receiver(*LOCAL_CANISTER) + .sender_reply_callback(CallbackId::new(1)) + .deadline(CoarseTime::from_secs_since_unix_epoch(123)) + .payment(Cycles::zero()) + .build(); + let slice = stream_slice_from_config(StreamSliceConfig { + messages: vec![be_request.into()], + ..StreamSliceConfig::default() + }); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + stream_handler.induct_stream_slices( + state, + btreemap![REMOTE_SUBNET => slice], + &mut available_guaranteed_response_memory, + ); + + metrics.assert_inducted_xnet_messages_eq(&[( + LABEL_VALUE_TYPE_REQUEST, + LABEL_VALUE_SUCCESS, + 1, + )]); + metrics.assert_eq_critical_errors(CriticalErrorCounts::default()); + }, + ); +} + +/// Tests that a best-effort request carrying cycles from a CloudEngine subnet is dropped +/// at the engine boundary: cycles observed as lost, accept signal pushed, and a critical +/// error raised. Mirrors the sender-side test +/// `build_streams_engine_src_rejects_cycles_request` on the receiving side, which is +/// the security-critical filter against a malicious engine. +#[test] +fn induct_stream_slices_engine_src_best_effort_request_with_cycles_dropped() { + with_test_setup( + btreemap![], + btreemap![], + |stream_handler, mut state, _, metrics| { + // Mark REMOTE_SUBNET as CloudEngine. + state.metadata.network_topology.subnets_mut().insert( + REMOTE_SUBNET, + SubnetTopology { + subnet_type: SubnetType::CloudEngine, + ..Default::default() + }, + ); + + // Build a best-effort request carrying cycles + // (deadline != NO_DEADLINE, payment > 0). + let payment = Cycles::new(1_000); + let be_request = RequestBuilder::new() + .sender(*REMOTE_CANISTER) + .receiver(*LOCAL_CANISTER) + .sender_reply_callback(CallbackId::new(1)) + .deadline(CoarseTime::from_secs_since_unix_epoch(123)) + .payment(payment) + .build(); + let slice = stream_slice_from_config(StreamSliceConfig { + messages: vec![be_request.into()], + ..StreamSliceConfig::default() + }); + + // Expected: an outgoing stream to REMOTE_SUBNET with one accept signal; + // the request's cycles are observed as lost. + let mut expected_state = state.clone(); + let expected_stream = stream_from_config(StreamConfig { + signals_end: 1, + ..StreamConfig::default() + }); + expected_state.with_streams(btreemap![REMOTE_SUBNET => expected_stream]); + expected_state.observe_lost_cycles_due_to_dropped_messages(CompoundCycles::new( + payment, + CanisterCyclesCostSchedule::Normal, + )); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + let inducted_state = stream_handler.induct_stream_slices( + state, + btreemap![REMOTE_SUBNET => slice], + &mut available_guaranteed_response_memory, + ); + + assert_eq!(expected_state, inducted_state); + + metrics.assert_inducted_xnet_messages_eq(&[( + LABEL_VALUE_TYPE_REQUEST, + LABEL_VALUE_DROPPED, + 1, + )]); + metrics.assert_eq_critical_errors(CriticalErrorCounts { + engine_message: 1, + ..CriticalErrorCounts::default() + }); + }, + ); +} + +/// Tests that a best-effort response with no cycles from a CloudEngine subnet, arriving at a +/// non-engine subnet, is inducted successfully without triggering a critical error. +#[test] +fn induct_stream_slices_engine_src_best_effort_response_inducted() { + // Use a BestEffortResponse in the slice config so that the framework registers a callback + // for LOCAL_CANISTER and creates the necessary input queue reservation. + with_test_setup( + btreemap![], + btreemap![REMOTE_SUBNET => StreamSliceConfig { + messages: vec![BestEffortResponse( + *REMOTE_CANISTER, + *LOCAL_CANISTER, + CoarseTime::from_secs_since_unix_epoch(123), + )], + ..StreamSliceConfig::default() + }], + |stream_handler, mut state, _, metrics| { + // Mark REMOTE_SUBNET as CloudEngine. + state.metadata.network_topology.subnets_mut().insert( + REMOTE_SUBNET, + SubnetTopology { + subnet_type: SubnetType::CloudEngine, + ..Default::default() + }, + ); + + // Build a best-effort response with no cycles using the callback registered by setup. + let be_response = ResponseBuilder::new() + .respondent(*REMOTE_CANISTER) + .originator(*LOCAL_CANISTER) + .originator_reply_callback(CallbackId::new(1)) + .deadline(CoarseTime::from_secs_since_unix_epoch(123)) + .refund(Cycles::zero()) + .build(); + let slice = stream_slice_from_config(StreamSliceConfig { + messages: vec![be_response.into()], + ..StreamSliceConfig::default() + }); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + stream_handler.induct_stream_slices( + state, + btreemap![REMOTE_SUBNET => slice], + &mut available_guaranteed_response_memory, + ); + + metrics.assert_inducted_xnet_messages_eq(&[( + LABEL_VALUE_TYPE_RESPONSE, + LABEL_VALUE_SUCCESS, + 1, + )]); + metrics.assert_eq_critical_errors(CriticalErrorCounts::default()); + }, + ); +} + +/// Regression test for response forgery via an engine boundary. +/// +/// A malicious engine subnet sends a slice containing a response whose `respondent` +/// is hosted on a different (honest) subnet, matching an outstanding callback on the +/// victim canister. Without sender-subnet validation as a closed gate, the forged +/// response would be inducted, satisfying the callback with attacker-controlled data +/// before the genuine response from the real respondent could arrive. +/// +/// Expected: sender-subnet validation must fail closed even at the engine boundary; +/// the forged response is dropped, cycles are accounted for as lost, and the +/// `sender_subnet_mismatch` critical error is raised. +#[test] +fn induct_stream_slices_engine_boundary_drops_forged_response() { + with_test_setup( + btreemap![], + // Malicious engine sends a response forging `OTHER_LOCAL_CANISTER` as the + // respondent — that canister is hosted on LOCAL_SUBNET, not REMOTE_SUBNET. + // The originator is LOCAL_CANISTER (victim); the framework registers a + // matching callback so the response would be inducted if validation were + // bypassed. + btreemap![REMOTE_SUBNET => StreamSliceConfig { + messages: vec![Response(*OTHER_LOCAL_CANISTER, *LOCAL_CANISTER)], + ..StreamSliceConfig::default() + }], + |stream_handler, mut state, slices, metrics| { + // Mark REMOTE_SUBNET as a CloudEngine to put us at the engine boundary. + state.metadata.network_topology.subnets_mut().insert( + REMOTE_SUBNET, + SubnetTopology { + subnet_type: SubnetType::CloudEngine, + ..Default::default() + }, + ); + + // Expected state: response dropped (no induction), accept signal pushed, + // cycles attached to the forged response observed as lost. + let forged = response_in_slice(slices.get(&REMOTE_SUBNET), 0).clone(); + let mut expected_state = state.clone(); + let expected_stream = stream_from_config(StreamConfig { + signals_end: 1, + ..StreamConfig::default() + }); + expected_state.with_streams(btreemap![REMOTE_SUBNET => expected_stream]); + expected_state.observe_lost_cycles_due_to_dropped_messages(CompoundCycles::new( + forged.refund, + CanisterCyclesCostSchedule::Normal, + )); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + let inducted_state = stream_handler.induct_stream_slices( + state, + slices, + &mut available_guaranteed_response_memory, + ); + + assert_eq!(expected_state, inducted_state); + + metrics.assert_inducted_xnet_messages_eq(&[( + LABEL_VALUE_TYPE_RESPONSE, + LABEL_VALUE_SENDER_SUBNET_MISMATCH, + 1, + )]); + metrics.assert_eq_critical_errors(CriticalErrorCounts { + sender_subnet_mismatch: 1, + ..CriticalErrorCounts::default() + }); + }, + ); +} + +/// Tests that a response which should not exist at the engine boundary — here a +/// guaranteed-response response carrying cycles, from a CloudEngine subnet — is +/// dropped (not inducted), its cycles are observed as lost, an accept signal is +/// pushed, and the `engine_message` critical error is raised. The sender subnet +/// matches, so the response reaches the engine filter rather than the +/// sender-subnet-mismatch path. +#[test] +fn induct_stream_slices_engine_boundary_drops_response_that_should_not_exist() { + with_test_setup( + btreemap![], + // A guaranteed-response response (with cycles) whose respondent is genuinely + // hosted on REMOTE_SUBNET, so sender-subnet validation matches. + btreemap![REMOTE_SUBNET => StreamSliceConfig { + messages: vec![Response(*REMOTE_CANISTER, *LOCAL_CANISTER)], + ..StreamSliceConfig::default() + }], + |stream_handler, mut state, slices, metrics| { + // Mark REMOTE_SUBNET as a CloudEngine to put us at the engine boundary. + state.metadata.network_topology.subnets_mut().insert( + REMOTE_SUBNET, + SubnetTopology { + subnet_type: SubnetType::CloudEngine, + ..Default::default() + }, + ); + + // Expected state: response dropped (no induction), accept signal pushed, + // cycles attached to the dropped response observed as lost. + let dropped = response_in_slice(slices.get(&REMOTE_SUBNET), 0).clone(); + let mut expected_state = state.clone(); + let expected_stream = stream_from_config(StreamConfig { + signals_end: 1, + ..StreamConfig::default() + }); + expected_state.with_streams(btreemap![REMOTE_SUBNET => expected_stream]); + expected_state.observe_lost_cycles_due_to_dropped_messages(CompoundCycles::new( + dropped.refund, + CanisterCyclesCostSchedule::Normal, + )); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + let inducted_state = stream_handler.induct_stream_slices( + state, + slices, + &mut available_guaranteed_response_memory, + ); + + assert_eq!(expected_state, inducted_state); + + metrics.assert_inducted_xnet_messages_eq(&[( + LABEL_VALUE_TYPE_RESPONSE, + LABEL_VALUE_DROPPED, + 1, + )]); + metrics.assert_eq_critical_errors(CriticalErrorCounts { + engine_message: 1, + ..CriticalErrorCounts::default() + }); + }, + ); +} + /// Generates a test setup. For details see `with_test_setup_and_config()`. fn with_test_setup( stream_configs: BTreeMap>>, @@ -3835,7 +4260,11 @@ impl MetricsFixture { &CRITICAL_ERROR_RECEIVER_SUBNET_MISMATCH.to_string() )], counts.receiver_subnet_mismatch - ) + ), + ( + &[("error", &CRITICAL_ERROR_ENGINE_MESSAGE.to_string())], + counts.engine_message + ), ])), nonzero_values(fetch_int_counter_vec(&self.registry, "critical_errors")) ); @@ -3848,6 +4277,7 @@ struct CriticalErrorCounts { pub bad_reject_signal_for_response: u64, pub sender_subnet_mismatch: u64, pub receiver_subnet_mismatch: u64, + pub engine_message: u64, } /// Populates the given `state`'s canister migrations with a single entry, diff --git a/rs/protobuf/def/state/queues/v1/queues.proto b/rs/protobuf/def/state/queues/v1/queues.proto index c870b1333a54..2454f2ecabf3 100644 --- a/rs/protobuf/def/state/queues/v1/queues.proto +++ b/rs/protobuf/def/state/queues/v1/queues.proto @@ -32,6 +32,7 @@ enum RejectReason { REJECT_REASON_QUEUE_FULL = 5; REJECT_REASON_OUT_OF_MEMORY = 6; REJECT_REASON_UNKNOWN = 7; + REJECT_REASON_ENGINE_NOT_ALLOWED = 8; } message RejectSignal { diff --git a/rs/protobuf/src/gen/state/state.queues.v1.rs b/rs/protobuf/src/gen/state/state.queues.v1.rs index cfd87b780904..c1797fc65b8d 100644 --- a/rs/protobuf/src/gen/state/state.queues.v1.rs +++ b/rs/protobuf/src/gen/state/state.queues.v1.rs @@ -300,6 +300,7 @@ pub enum RejectReason { QueueFull = 5, OutOfMemory = 6, Unknown = 7, + EngineNotAllowed = 8, } impl RejectReason { /// String value of the enum field names used in the ProtoBuf definition. @@ -316,6 +317,7 @@ impl RejectReason { Self::QueueFull => "REJECT_REASON_QUEUE_FULL", Self::OutOfMemory => "REJECT_REASON_OUT_OF_MEMORY", Self::Unknown => "REJECT_REASON_UNKNOWN", + Self::EngineNotAllowed => "REJECT_REASON_ENGINE_NOT_ALLOWED", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -329,6 +331,7 @@ impl RejectReason { "REJECT_REASON_QUEUE_FULL" => Some(Self::QueueFull), "REJECT_REASON_OUT_OF_MEMORY" => Some(Self::OutOfMemory), "REJECT_REASON_UNKNOWN" => Some(Self::Unknown), + "REJECT_REASON_ENGINE_NOT_ALLOWED" => Some(Self::EngineNotAllowed), _ => None, } } diff --git a/rs/protobuf/src/gen/types/state.queues.v1.rs b/rs/protobuf/src/gen/types/state.queues.v1.rs index 08001487edfe..dab2108a24e5 100644 --- a/rs/protobuf/src/gen/types/state.queues.v1.rs +++ b/rs/protobuf/src/gen/types/state.queues.v1.rs @@ -300,6 +300,7 @@ pub enum RejectReason { QueueFull = 5, OutOfMemory = 6, Unknown = 7, + EngineNotAllowed = 8, } impl RejectReason { /// String value of the enum field names used in the ProtoBuf definition. @@ -316,6 +317,7 @@ impl RejectReason { Self::QueueFull => "REJECT_REASON_QUEUE_FULL", Self::OutOfMemory => "REJECT_REASON_OUT_OF_MEMORY", Self::Unknown => "REJECT_REASON_UNKNOWN", + Self::EngineNotAllowed => "REJECT_REASON_ENGINE_NOT_ALLOWED", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -329,6 +331,7 @@ impl RejectReason { "REJECT_REASON_QUEUE_FULL" => Some(Self::QueueFull), "REJECT_REASON_OUT_OF_MEMORY" => Some(Self::OutOfMemory), "REJECT_REASON_UNKNOWN" => Some(Self::Unknown), + "REJECT_REASON_ENGINE_NOT_ALLOWED" => Some(Self::EngineNotAllowed), _ => None, } } diff --git a/rs/replicated_state/src/metadata_state/tests.rs b/rs/replicated_state/src/metadata_state/tests.rs index 35a403c0d15d..6407dbca03e1 100644 --- a/rs/replicated_state/src/metadata_state/tests.rs +++ b/rs/replicated_state/src/metadata_state/tests.rs @@ -2287,7 +2287,7 @@ fn compatibility_for_reject_reason() { RejectReason::iter() .map(|reason| reason as i32) .collect::>(), - [1, 2, 3, 4, 5, 6, 7] + [1, 2, 3, 4, 5, 6, 7, 8] ); } diff --git a/rs/test_utilities/src/state_manager.rs b/rs/test_utilities/src/state_manager.rs index 5d4602b20eef..026893dcb60a 100644 --- a/rs/test_utilities/src/state_manager.rs +++ b/rs/test_utilities/src/state_manager.rs @@ -460,6 +460,7 @@ pub enum SerializableRejectReason { QueueFull = 5, OutOfMemory = 6, Unknown = 7, + EngineNotAllowed = 8, } impl From<&RejectReason> for SerializableRejectReason { @@ -472,6 +473,7 @@ impl From<&RejectReason> for SerializableRejectReason { RejectReason::QueueFull => Self::QueueFull, RejectReason::OutOfMemory => Self::OutOfMemory, RejectReason::Unknown => Self::Unknown, + RejectReason::EngineNotAllowed => Self::EngineNotAllowed, } } } @@ -486,6 +488,7 @@ impl From for RejectReason { SerializableRejectReason::QueueFull => RejectReason::QueueFull, SerializableRejectReason::OutOfMemory => RejectReason::OutOfMemory, SerializableRejectReason::Unknown => RejectReason::Unknown, + SerializableRejectReason::EngineNotAllowed => RejectReason::EngineNotAllowed, } } } diff --git a/rs/tests/message_routing/xnet/xnet_cloud_engine_isolation_test.rs b/rs/tests/message_routing/xnet/xnet_cloud_engine_isolation_test.rs index c35f92157505..0bb93c511fa9 100644 --- a/rs/tests/message_routing/xnet/xnet_cloud_engine_isolation_test.rs +++ b/rs/tests/message_routing/xnet/xnet_cloud_engine_isolation_test.rs @@ -1,10 +1,11 @@ /* tag::catalog[] -Title:: CloudEngine subnets are isolated from XNet traffic. +Title:: CloudEngine subnets enforce XNet filtering. -Goal:: Verify that a CloudEngine subnet cannot exchange XNet messages with -other subnets (including other CloudEngine subnets), while intra-subnet -(loopback) calls still work. Additionally verify that the state tree only -exposes the expected subnets via /subnet and /canister_ranges. +Goal:: Verify that a CloudEngine subnet only accepts best-effort, +no-cycles XNet messages (and rejects guaranteed-response calls or calls +carrying cycles), while intra-subnet (loopback) calls always work. +Additionally verify that the state tree exposes the full topology to all +subnets via /subnet and /canister_ranges. Runbook:: 0. Set up an IC with one System (NNS) subnet, one Application subnet, and @@ -15,19 +16,20 @@ Runbook:: other (intra-subnet). 4. Verify that XNet calls between NNS and Application subnets succeed (both directions). -5. Verify that XNet calls involving any CloudEngine subnet are rejected: - Application <-> CloudEngine, NNS <-> CloudEngine, and - CloudEngine 1 <-> CloudEngine 2. +5. For every pair crossing an engine boundary (Application <-> CloudEngine, + NNS <-> CloudEngine, CloudEngine 1 <-> CloudEngine 2): + a. Guaranteed-response (unbounded-wait) calls are rejected with SysFatal. + b. Best-effort (bounded-wait), no-cycles calls succeed. + c. Best-effort calls carrying cycles are rejected with SysFatal. 6. Via read_state, verify that /subnet//public_key and - /canister_ranges/ are present only for the expected subnets: - - NNS sees all four subnets (full topology). - - Application sees NNS and itself. - - Each CloudEngine sees only itself. + /canister_ranges/ are present for all subnets on every subnet + (full topology). Success:: All assertions pass: loopback works, non-CloudEngine XNet works, -all cross-subnet directions involving CloudEngine are rejected, and -the state tree matches the expected visibility. +guaranteed-response cross-engine calls are rejected, best-effort +no-cycles cross-engine calls succeed, and the state tree shows the +full topology on every subnet. end::catalog[] */ @@ -290,12 +292,19 @@ fn test(env: TestEnv) { ); info!(logger, "Application -> NNS succeeded."); - // ── XNet calls that should be rejected (CloudEngine involved) ─── - - // All cross-subnet calls involving a CloudEngine subnet must fail - // with DestinationInvalid. The on_reject handler replies with the - // reject code so we can verify the exact code. - let rejected_pairs: &[(&str, &UniversalCanister<'_>, &str, &UniversalCanister<'_>)] = &[ + // ── XNet calls crossing an engine boundary ────────────────────── + // + // For every pair crossing an engine boundary: + // - Guaranteed-response (unbounded-wait) calls must be rejected + // with SysFatal. + // - Best-effort (bounded-wait), no-cycles calls must succeed. + // - Best-effort calls carrying cycles must be rejected with SysFatal. + let engine_boundary_pairs: &[( + &str, + &UniversalCanister<'_>, + &str, + &UniversalCanister<'_>, + )] = &[ ("Application", &uc_app, "CloudEngine 1", &uc_ce_1a), ("CloudEngine 1", &uc_ce_1a, "Application", &uc_app), ("NNS", &uc_nns, "CloudEngine 1", &uc_ce_1a), @@ -306,10 +315,10 @@ fn test(env: TestEnv) { ("CloudEngine 2", &uc_ce_2a, "CloudEngine 1", &uc_ce_1a), ]; - for (src_name, src_uc, dst_name, dst_uc) in rejected_pairs { + for (src_name, src_uc, dst_name, dst_uc) in engine_boundary_pairs { info!( logger, - "Testing XNet call: {src_name} -> {dst_name} (should fail)..." + "Testing XNet call: {src_name} -> {dst_name} (guaranteed-response, should fail)..." ); let res = src_uc .update( @@ -321,8 +330,58 @@ fn test(env: TestEnv) { ), ) .await; - assert_xnet_rejected(res, RejectCode::DestinationInvalid); - info!(logger, "{src_name} -> {dst_name} correctly rejected."); + assert_xnet_rejected(res, RejectCode::SysFatal); + info!( + logger, + "{src_name} -> {dst_name} guaranteed-response correctly rejected." + ); + + info!( + logger, + "Testing XNet call: {src_name} -> {dst_name} (best-effort, should succeed)..." + ); + let res = src_uc + .update( + wasm().call_simple_with_cycles_and_best_effort_response( + dst_uc.canister_id(), + "update", + call_args() + .other_side(wasm().reply_data(&data)) + .on_reject(wasm().reject_message().reject()), + 0_u128, + 30_u32, + ), + ) + .await; + assert_eq!( + res.unwrap(), + data, + "{src_name} -> {dst_name} best-effort should succeed" + ); + info!(logger, "{src_name} -> {dst_name} best-effort succeeded."); + + info!( + logger, + "Testing XNet call: {src_name} -> {dst_name} (best-effort with cycles, should fail)..." + ); + let res = src_uc + .update( + wasm().call_simple_with_cycles_and_best_effort_response( + dst_uc.canister_id(), + "update", + call_args() + .other_side(wasm().reply_data(&data)) + .on_reject(wasm().reject_code().reply_int()), + 1_u128, + 30_u32, + ), + ) + .await; + assert_xnet_rejected(res, RejectCode::SysFatal); + info!( + logger, + "{src_name} -> {dst_name} best-effort with cycles correctly rejected." + ); } // ── Verify /subnet and /canister_ranges via read_state ────────── @@ -331,9 +390,7 @@ fn test(env: TestEnv) { // /canister_ranges/ for every known subnet ID and verify that // the expected ones are present and the rest absent. // - // NNS sees all subnets (full topology for the state tree). - // Application sees NNS and itself. - // Each CloudEngine sees only itself. + // All subnets see the full topology (all four subnets). info!( logger, "Verifying /subnet and /canister_ranges via read_state..." @@ -362,19 +419,19 @@ fn test(env: TestEnv) { "Application", &app_agent, app_subnet_id, - vec![nns_subnet_id, app_subnet_id], + vec![nns_subnet_id, app_subnet_id, ce_subnet_id_1, ce_subnet_id_2], ), ( "CloudEngine 1", &ce_agent_1, ce_subnet_id_1, - vec![ce_subnet_id_1], + vec![nns_subnet_id, app_subnet_id, ce_subnet_id_1, ce_subnet_id_2], ), ( "CloudEngine 2", &ce_agent_2, ce_subnet_id_2, - vec![ce_subnet_id_2], + vec![nns_subnet_id, app_subnet_id, ce_subnet_id_1, ce_subnet_id_2], ), ] { info!(logger, "Checking /subnet and /canister_ranges on {name}..."); diff --git a/rs/types/types/src/xnet.rs b/rs/types/types/src/xnet.rs index 30393a46e404..1c308e0b686d 100644 --- a/rs/types/types/src/xnet.rs +++ b/rs/types/types/src/xnet.rs @@ -211,6 +211,10 @@ pub enum RejectReason { /// `StateError` variants that shouldn't be possible to occur for requests. /// It is not expected that this reason will ever be used. Unknown = 7, + + /// Request rejected because the sending subnet is a CloudEngine subnet, + /// which is not allowed to send guaranteed-response calls to non-engine subnets. + EngineNotAllowed = 8, } impl RejectReason { @@ -231,6 +235,7 @@ impl From for pb_queues::RejectReason { RejectReason::QueueFull => Self::QueueFull, RejectReason::OutOfMemory => Self::OutOfMemory, RejectReason::Unknown => Self::Unknown, + RejectReason::EngineNotAllowed => Self::EngineNotAllowed, } } } @@ -250,6 +255,7 @@ impl TryFrom for RejectReason { pb_queues::RejectReason::QueueFull => Ok(Self::QueueFull), pb_queues::RejectReason::OutOfMemory => Ok(Self::OutOfMemory), pb_queues::RejectReason::Unknown => Ok(Self::Unknown), + pb_queues::RejectReason::EngineNotAllowed => Ok(Self::EngineNotAllowed), } } } diff --git a/rs/xnet/payload_builder/src/impl_tests.rs b/rs/xnet/payload_builder/src/impl_tests.rs index ee430598ace2..dc1136f8781b 100644 --- a/rs/xnet/payload_builder/src/impl_tests.rs +++ b/rs/xnet/payload_builder/src/impl_tests.rs @@ -88,7 +88,7 @@ async fn expected_stream_indices() { /// `expected_stream_indices` should not include `SUBNET_6` /// (registered as `CloudEngine` in `get_simple_registry_for_test`). #[tokio::test] -async fn expected_stream_indices_excludes_engine_subnets() { +async fn expected_stream_indices_includes_engine_subnets() { with_test_replica_logger(|log| { let state_manager = FakeStateManager::new(); let (payloads, _expected_indices) = get_xnet_state_for_testing(&state_manager); @@ -126,13 +126,13 @@ async fn expected_stream_indices_excludes_engine_subnets() { ) .unwrap(); - // SUBNET_6 is a CloudEngine and must be excluded. + // SUBNET_6 is a CloudEngine and must now be included. assert!( - !computed_indices.contains_key(&SUBNET_6), - "CloudEngine subnet SUBNET_6 should be excluded from expected stream indices" + computed_indices.contains_key(&SUBNET_6), + "CloudEngine subnet SUBNET_6 should be included in expected stream indices" ); - // Application subnets should still be present. + // Application subnets should also be present. for subnet in &[SUBNET_1, SUBNET_2, SUBNET_3, SUBNET_4] { assert!( computed_indices.contains_key(subnet), diff --git a/rs/xnet/payload_builder/src/lib.rs b/rs/xnet/payload_builder/src/lib.rs index 7a97c91aabc7..f7f31418d88b 100644 --- a/rs/xnet/payload_builder/src/lib.rs +++ b/rs/xnet/payload_builder/src/lib.rs @@ -489,16 +489,7 @@ impl XNetPayloadBuilderImpl { .map_err(Error::RegistryGetSubnetsFailed)? .unwrap_or_default(); - let subnet_ids: Vec<_> = all_subnet_ids - .into_iter() - .filter(|subnet_id| { - self.registry - .get_subnet_type(*subnet_id, validation_context.registry_version) - .is_ok_and(|maybe_t| { - maybe_t.is_some_and(|t| t != SubnetType::CloudEngine.into()) - }) - }) - .collect(); + let subnet_ids: Vec<_> = all_subnet_ids.into_iter().collect(); let expected_indices = subnet_ids .into_iter() @@ -641,18 +632,11 @@ impl XNetPayloadBuilderImpl { ); } - // Do not accept slices from CloudEngine subnets or subnets whose type - // cannot be determined. + // Do not accept slices from subnets whose type cannot be determined. match self .registry .get_subnet_type(subnet_id, validation_context.registry_version) { - Ok(Some(subnet_type)) if subnet_type == SubnetType::CloudEngine.into() => { - return SliceValidationResult::Invalid(format!( - "Slice from CloudEngine subnet {}", - subnet_id - )); - } Ok(Some(_)) => {} Ok(None) => { return SliceValidationResult::Invalid(format!( @@ -870,11 +854,6 @@ impl XNetPayloadBuilderImpl { })? .take(); - // CloudEngine subnets do not participate in XNet. - if state.metadata.own_subnet_type == SubnetType::CloudEngine { - return Ok((XNetPayload::default(), 0.into())); - } - // Build the payload based on indices computed from state + past payloads. let stream_positions = self.expected_stream_indices(validation_context, &state, past_payloads)?; @@ -1235,16 +1214,6 @@ impl XNetPayloadBuilder for XNetPayloadBuilderImpl { } }; - if state.metadata.own_subnet_type == SubnetType::CloudEngine - && !payload.stream_slices.is_empty() - { - return Err(ValidationError::InvalidArtifact( - InvalidXNetPayload::InvalidSlice( - "CloudEngine subnets do not accept XNet payloads".to_string(), - ), - )); - } - // For every slice in `payload`, check certification and gaps/duplicates. let mut new_stream_positions = Vec::new(); let mut payload_byte_size = 0; diff --git a/rs/xnet/payload_builder/src/tests.rs b/rs/xnet/payload_builder/src/tests.rs index 7ea7d010049c..033a518d8d45 100644 --- a/rs/xnet/payload_builder/src/tests.rs +++ b/rs/xnet/payload_builder/src/tests.rs @@ -522,31 +522,52 @@ impl PayloadBuilderTestFixture { } } -/// CloudEngine subnets must produce an empty XNet payload. +/// CloudEngine subnets now participate in XNet. At the unit level this checks +/// that `expected_stream_indices` returns entries for an engine's Application +/// peers (no filter excluding them), so the engine will attempt to pull from +/// them. The end-to-end behavior — that a CloudEngine subnet actually builds and +/// validates XNet payloads and completes a best-effort cross-subnet call (which +/// requires `get_xnet_payload`/`validate_xnet_payload` to no longer short-circuit +/// for engines) — is covered by `xnet_cloud_engine_isolation_test`. #[tokio::test] -async fn cloud_engine_get_xnet_payload_returns_empty() { +async fn cloud_engine_get_xnet_payload_participates() { with_test_replica_logger(|log| { let fixture = PayloadBuilderTestFixture::with_xnet_state_and_subnet_types( - 0, + 4, btreemap![], Some(SubnetType::CloudEngine), ); let xnet_payload_builder = fixture.new_xnet_payload_builder_impl(log); - let (payload, byte_size) = xnet_payload_builder.get_xnet_payload( - &fixture.validation_context, - &[], - PAYLOAD_BYTES_LIMIT, - ); + let state = fixture + .state_manager + .get_state_at(fixture.validation_context.certified_height) + .unwrap() + .take(); + let past_payloads = fixture.past_payloads(); + let stream_positions = xnet_payload_builder + .expected_stream_indices( + &fixture.validation_context, + state.as_ref(), + past_payloads.as_slice(), + ) + .unwrap(); - assert!(payload.stream_slices.is_empty()); - assert_eq!(byte_size, NumBytes::from(0)); + // The engine produces non-empty stream positions — it will try to pull + // from peers (Application subnets SUBNET_1..SUBNET_4 are all present). + assert!(!stream_positions.is_empty()); + for subnet in &[SUBNET_1, SUBNET_2, SUBNET_3, SUBNET_4] { + assert!( + stream_positions.contains_key(subnet), + "CloudEngine should pull from Application subnet {subnet:?}" + ); + } }); } -/// CloudEngine subnets must reject non-empty XNet payloads during validation. +/// CloudEngine subnets must accept non-empty XNet payloads during validation. #[tokio::test] -async fn cloud_engine_validate_rejects_non_empty_payload() { +async fn cloud_engine_validate_accepts_non_empty_payload() { with_test_replica_logger(|log| { let fixture = PayloadBuilderTestFixture::with_xnet_state_and_subnet_types( 0, @@ -555,48 +576,30 @@ async fn cloud_engine_validate_rejects_non_empty_payload() { ); let xnet_payload_builder = fixture.new_xnet_payload_builder_impl(log); - // A non-empty payload with a slice from SUBNET_1. - let slice = make_certified_stream_slice( - SUBNET_1, - StreamConfig { - message_begin: 0, - message_end: 2, - signal_end: 0, - }, - ); - let payload = XNetPayload { - stream_slices: btreemap![SUBNET_1 => slice], - }; - - assert_matches!( - xnet_payload_builder.validate_xnet_payload( - &payload, - &fixture.validation_context, - &[], - ), - Err(ValidationError::InvalidArtifact( - InvalidXNetPayload::InvalidSlice(msg) - )) if msg.contains("CloudEngine") + // Use a pre-built payload from the fixture — indices are consistent with + // the state, so validation passes for any subnet type including CloudEngine. + let payload = fixture.payloads.last().unwrap(); + assert!( + !payload.stream_slices.is_empty(), + "fixture payload must be non-empty for this test to be meaningful" ); - // An empty payload should still be accepted. - let empty_payload = XNetPayload::default(); - assert_eq!( - NumBytes::from(0), + assert!( xnet_payload_builder - .validate_xnet_payload(&empty_payload, &fixture.validation_context, &[]) - .unwrap() + .validate_xnet_payload(payload, &fixture.validation_context, &[]) + .is_ok(), + "CloudEngine subnet should accept non-empty XNet payloads" ); }); } -/// Non-CloudEngine subnets must reject slices originating from a CloudEngine subnet. +/// Non-CloudEngine subnets must accept slices originating from a CloudEngine subnet. #[tokio::test] -async fn validate_rejects_slice_from_cloud_engine_subnet() { +async fn validate_accepts_slice_from_cloud_engine_subnet() { with_test_replica_logger(|log| { let cloud_engine_subnet = SUBNET_1; let fixture = PayloadBuilderTestFixture::with_xnet_state_and_subnet_types( - 1, + 0, btreemap![cloud_engine_subnet => SubnetType::CloudEngine], None, ); @@ -604,28 +607,19 @@ async fn validate_rejects_slice_from_cloud_engine_subnet() { // This is an Application subnet validating an incoming payload. let xnet_payload_builder = fixture.new_xnet_payload_builder_impl(log); - // A payload containing a slice from the CloudEngine subnet. - let slice = make_certified_stream_slice( - cloud_engine_subnet, - StreamConfig { - message_begin: 0, - message_end: 2, - signal_end: 0, - }, + // Use a pre-built payload from the fixture. SUBNET_1 is the CloudEngine + // subnet; a slice from it must now be accepted by a non-engine subnet. + let payload = fixture.payloads.last().unwrap(); + assert!( + payload.stream_slices.contains_key(&cloud_engine_subnet), + "fixture payload must contain a slice from the CloudEngine subnet" ); - let payload = XNetPayload { - stream_slices: btreemap![cloud_engine_subnet => slice], - }; - assert_matches!( - xnet_payload_builder.validate_xnet_payload( - &payload, - &fixture.validation_context, - &[], - ), - Err(ValidationError::InvalidArtifact( - InvalidXNetPayload::InvalidSlice(msg) - )) if msg.contains("CloudEngine") + assert!( + xnet_payload_builder + .validate_xnet_payload(payload, &fixture.validation_context, &[]) + .is_ok(), + "Application subnet should accept slices from a CloudEngine subnet" ); }); } @@ -665,9 +659,9 @@ async fn validate_rejects_slice_from_unknown_subnet() { }); } -/// An Application subnet with a registered CloudEngine peer must not pull from it. +/// An Application subnet with a registered CloudEngine peer must pull from it. #[tokio::test] -async fn build_payload_skips_cloud_engine_subnet() { +async fn build_payload_includes_cloud_engine_subnet() { with_test_replica_logger(|log| { // Register 2 subnets: SUBNET_1 as CloudEngine, SUBNET_2 as Application. let fixture = PayloadBuilderTestFixture::with_xnet_state_and_subnet_types( @@ -692,8 +686,8 @@ async fn build_payload_skips_cloud_engine_subnet() { ) .unwrap(); - // SUBNET_2 (Application) is included, SUBNET_1 (CloudEngine) is not. + // Both SUBNET_1 (CloudEngine) and SUBNET_2 (Application) are included. + assert!(stream_positions.contains_key(&SUBNET_1)); assert!(stream_positions.contains_key(&SUBNET_2)); - assert!(!stream_positions.contains_key(&SUBNET_1)); }); }