From 35f20e076a073900e5765248bc8ae57aa0154f25 Mon Sep 17 00:00:00 2001 From: Stefan Schneider Date: Wed, 13 May 2026 08:31:39 +0000 Subject: [PATCH 1/4] feat: Allow xnet on engines This commit opens engines up to send and receive XNet messages with 2 restrictions on messages that involve engines (engine->subnet, subnet->engine, engine->engine, but not messages staying on the same engine): 1. Only bounded wait calls are allowed 2. Messages cannot contain attached cycles Messages that are not allowed will be rejected by the protocol and the canister receives an error. The implementation is in 3 places: 1. The StreamBuilder rejects these messages on the sending side 2. The XNet PayloadBuilder rejects these messages on the receiving side. No honest subnet would send any of these messages, but this protects from malicious subnets. 3. The NetworkTopology now contains all other subnets again. Previously, when engines were not able to do any XNet, the NetworkTopology would only contain other subnets if it could send messages to them. The exception to this is the list of ecdsa subnets, which does not contain other subnets that do not allow sending cycles to them. This is because the threshold ecdsa endpoints expect to be called with cycles attached. 
--- .../src/encoding/tests/conversion.rs | 1 + rs/canonical_state/src/encoding/types.rs | 5 + rs/messaging/src/message_routing.rs | 73 ++- rs/messaging/src/message_routing/tests.rs | 235 +++++++--- rs/messaging/src/routing/stream_builder.rs | 87 +++- .../src/routing/stream_builder/tests.rs | 254 ++++++++++ rs/messaging/src/routing/stream_handler.rs | 142 +++++- .../src/routing/stream_handler/tests.rs | 434 +++++++++++++++++- rs/protobuf/def/state/queues/v1/queues.proto | 1 + rs/protobuf/src/gen/state/state.queues.v1.rs | 3 + rs/protobuf/src/gen/types/state.queues.v1.rs | 3 + .../src/metadata_state/tests.rs | 2 +- rs/test_utilities/src/state_manager.rs | 3 + .../xnet/xnet_cloud_engine_isolation_test.rs | 117 +++-- rs/types/types/src/xnet.rs | 6 + rs/xnet/payload_builder/src/impl_tests.rs | 10 +- rs/xnet/payload_builder/src/lib.rs | 35 +- rs/xnet/payload_builder/src/tests.rs | 128 +++--- 18 files changed, 1300 insertions(+), 239 deletions(-) diff --git a/rs/canonical_state/src/encoding/tests/conversion.rs b/rs/canonical_state/src/encoding/tests/conversion.rs index 141b2f074010..c491f2d664e1 100644 --- a/rs/canonical_state/src/encoding/tests/conversion.rs +++ b/rs/canonical_state/src/encoding/tests/conversion.rs @@ -83,6 +83,7 @@ fn with_stream_header_deltas( QueueFull => header.reject_signals.queue_full_deltas = deltas, OutOfMemory => header.reject_signals.out_of_memory_deltas = deltas, Unknown => header.reject_signals.unknown_deltas = deltas, + EngineNotAllowed => header.reject_signals.engine_not_allowed_deltas = deltas, } header } diff --git a/rs/canonical_state/src/encoding/types.rs b/rs/canonical_state/src/encoding/types.rs index fa1c0c81e32c..4bb2e28a40ef 100644 --- a/rs/canonical_state/src/encoding/types.rs +++ b/rs/canonical_state/src/encoding/types.rs @@ -67,6 +67,8 @@ pub struct RejectSignals { pub out_of_memory_deltas: Vec, #[serde(default, skip_serializing_if = "Vec::is_empty")] pub unknown_deltas: Vec, + #[serde(default, skip_serializing_if = 
"Vec::is_empty")] + pub engine_not_allowed_deltas: Vec, } impl RejectSignals { @@ -78,6 +80,7 @@ impl RejectSignals { && self.queue_full_deltas.is_empty() && self.out_of_memory_deltas.is_empty() && self.unknown_deltas.is_empty() + && self.engine_not_allowed_deltas.is_empty() } } @@ -332,6 +335,7 @@ impl From<(&VecDeque, StreamIndex, CertificationVersion)> for Reje queue_full_deltas: deltas_for(RejectReason::QueueFull), out_of_memory_deltas: deltas_for(RejectReason::OutOfMemory), unknown_deltas: deltas_for(RejectReason::Unknown), + engine_not_allowed_deltas: deltas_for(RejectReason::EngineNotAllowed), } } } @@ -351,6 +355,7 @@ pub(crate) fn try_from_deltas( (QueueFull, &reject_signals.queue_full_deltas), (OutOfMemory, &reject_signals.out_of_memory_deltas), (Unknown, &reject_signals.unknown_deltas), + (EngineNotAllowed, &reject_signals.engine_not_allowed_deltas), ] { let mut stream_index = StreamIndex::new(signals_end); for delta in deltas.iter().rev() { diff --git a/rs/messaging/src/message_routing.rs b/rs/messaging/src/message_routing.rs index e737a85ee358..42adbe00d77c 100644 --- a/rs/messaging/src/message_routing.rs +++ b/rs/messaging/src/message_routing.rs @@ -127,6 +127,7 @@ const CRITICAL_ERROR_NO_CANISTER_ALLOCATION_RANGE: &str = "mr_empty_canister_all const CRITICAL_ERROR_FAILED_TO_READ_REGISTRY: &str = "mr_failed_to_read_registry_error"; pub const CRITICAL_ERROR_NON_INCREASING_BATCH_TIME: &str = "mr_non_increasing_batch_time"; pub const CRITICAL_ERROR_INDUCT_RESPONSE_FAILED: &str = "mr_induct_response_failed"; +pub const CRITICAL_ERROR_ENGINE_MESSAGE: &str = "mr_engine_message"; const CRITICAL_ERROR_ILLEGAL_NON_EMPTY_SUBNET_ADMINS: &str = "mr_illegal_non_empty_subnet_admins"; /// Records the timestamp when all messages before the given index (down to the @@ -344,6 +345,10 @@ pub(crate) struct MessageRoutingMetrics { /// Critical error counter (see [`MetricsRegistry::error_counter`]) tracking /// failures to induct responses. 
pub critical_error_induct_response_failed: IntCounter, + /// Critical error counter (see [`MetricsRegistry::error_counter`]) tracking + /// messages to/from CloudEngine subnets that carried cycles or were + /// guaranteed-response calls, which are not permitted across a CloudEngine boundary. + pub critical_error_engine_message: IntCounter, /// Critical error: a non-rental subnet has a non-empty subnet admins list. critical_error_illegal_non_empty_subnet_admins: IntCounter, @@ -487,6 +492,8 @@ impl MessageRoutingMetrics { .error_counter(CRITICAL_ERROR_NON_INCREASING_BATCH_TIME), critical_error_induct_response_failed: metrics_registry .error_counter(CRITICAL_ERROR_INDUCT_RESPONSE_FAILED), + critical_error_engine_message: metrics_registry + .error_counter(CRITICAL_ERROR_ENGINE_MESSAGE), critical_error_illegal_non_empty_subnet_admins: metrics_registry .error_counter(CRITICAL_ERROR_ILLEGAL_NON_EMPTY_SUBNET_ADMINS), @@ -1070,43 +1077,17 @@ impl BatchProcessorImpl { .map_err(|err| registry_error("routing table", None, err))? .unwrap_or_default(); - // Derive filtered subnets and routing table. - let (subnets, routing_table) = if own_subnet_type == SubnetType::CloudEngine { - // CloudEngine subnets only see themselves. - let subnets = all_subnets - .iter() - .filter(|(id, _)| **id == own_subnet_id) - .map(|(id, topo)| (*id, topo.clone())) - .collect(); - let routing_table = full_routing_table - .iter() - .filter(|(_, id)| **id == own_subnet_id) - .map(|(range, id)| (*range, *id)) - .collect::>() - .try_into() - .map_err(|err| { - Persistent(format!( - "'filtered routing table for CloudEngine subnet {}', err: {:?}", - own_subnet_id, err - )) - })?; - (subnets, routing_table) - } else { - // Non-engine subnets see every subnet that is *not* a CloudEngine. 
- let subnets: BTreeMap<_, _> = all_subnets - .iter() - .filter(|(_, topo)| topo.subnet_type != SubnetType::CloudEngine) - .map(|(id, topo)| (*id, topo.clone())) - .collect(); - let routing_table = full_routing_table - .iter() - .filter(|(_, id)| subnets.contains_key(id)) - .map(|(range, id)| (*range, *id)) - .collect::>() - .try_into() - .map_err(|err| Persistent(format!("'filtered routing table', err: {:?}", err)))?; - (subnets, routing_table) - }; + // All subnets see the full topology, including CloudEngine subnets. + let subnets: BTreeMap<_, _> = all_subnets + .iter() + .map(|(id, topo)| (*id, topo.clone())) + .collect(); + let routing_table = full_routing_table + .iter() + .map(|(range, id)| (*range, *id)) + .collect::>() + .try_into() + .map_err(|err| Persistent(format!("routing table err: {:?}", err)))?; let canister_migrations = self .registry .get_canister_migrations(registry_version) @@ -1130,6 +1111,22 @@ impl BatchProcessorImpl { .map_err(|err| registry_error("default initial DKG subnet ID", None, err))? .filter(|subnet_id| subnets.contains_key(subnet_id)); + // A signing subnet is only usable from here if a chain-key request can + // reach it without crossing the CloudEngine boundary: requests carrying + // cycles or guaranteed-response calls are rejected at that boundary (see + // `StreamBuilderImpl`/`StreamHandlerImpl`), and chain-key requests are + // always both. The local subnet is always reachable (loopback); any other + // subnet is reachable only if neither end is a CloudEngine. Pruning such + // subnets here means routing returns a clean `ChainKeyError` (and the cost + // API an `UnknownKey`) instead of routing a doomed request to the boundary. 
+ let chain_key_subnet_is_reachable = |id: &SubnetId| match subnets.get(id) { + None => false, + Some(topo) => { + *id == own_subnet_id + || (own_subnet_type != SubnetType::CloudEngine + && topo.subnet_type != SubnetType::CloudEngine) + } + }; let chain_key_enabled_subnets: BTreeMap<_, _> = self .registry .get_chain_key_enabled_subnets(registry_version) @@ -1139,7 +1136,7 @@ impl BatchProcessorImpl { .filter_map(|(key, subnet_ids)| { let filtered: Vec<_> = subnet_ids .into_iter() - .filter(|id| subnets.contains_key(id)) + .filter(&chain_key_subnet_is_reachable) .collect(); if filtered.is_empty() { None diff --git a/rs/messaging/src/message_routing/tests.rs b/rs/messaging/src/message_routing/tests.rs index 6734ff421580..796993d9cff3 100644 --- a/rs/messaging/src/message_routing/tests.rs +++ b/rs/messaging/src/message_routing/tests.rs @@ -1670,8 +1670,8 @@ fn try_read_registry_succeeds_and_populates_subnet_admins() { rental_subnet_record_from_topo.subnet_admins, btreeset! {rental_subnet_admin.get()} ); - // CloudEngine subnets are filtered out of the topology on non-NNS subnets. - assert!(network_topology.subnets().get(&engine_subnet_id).is_none()); + // CloudEngine subnets are visible in the full topology on all subnets. + assert!(network_topology.subnets().get(&engine_subnet_id).is_some()); }); } @@ -1743,8 +1743,8 @@ fn try_read_registry_succeeds_and_resets_subnet_admins() { // Check that subnet admins are reset and a critical error is raised. let own_subnet_record_from_topo = network_topology.subnets().get(&own_subnet_id).unwrap(); assert_eq!(own_subnet_record_from_topo.subnet_admins, BTreeSet::new()); - // CloudEngine subnets are filtered out of the topology on non-NNS subnets. - assert!(network_topology.subnets().get(&engine_subnet_id).is_none()); + // CloudEngine subnets are visible in the full topology on all subnets. 
+ assert!(network_topology.subnets().get(&engine_subnet_id).is_some()); let nns_subnet_record_from_topo = network_topology.subnets().get(&nns_subnet_id).unwrap(); assert_eq!(nns_subnet_record_from_topo.subnet_admins, BTreeSet::new()); // The critical error is still raised for all 3 subnets (before filtering). @@ -1815,57 +1815,49 @@ fn setup_three_subnet_registry() -> (Arc, SubnetId, SubnetId ) } -/// Tests that a CloudEngine subnet sees only itself in the resulting topology: -/// `subnets`, `routing_table`, `subnets_for_certification`, and `routing_table_for_certification` -/// all contain only the own subnet. +/// Tests that a CloudEngine subnet sees the full topology (all three subnets): +/// `subnets`, `routing_table`, `subnets_for_certification`, and +/// `routing_table_for_certification` all contain every subnet. #[test] -fn try_read_registry_engine_subnet_sees_only_itself() { +fn try_read_registry_engine_subnet_sees_full_topology() { with_test_replica_logger(|log| { - let (registry, _app_subnet_id, engine_subnet_id, _nns_subnet_id) = + let (registry, app_subnet_id, engine_subnet_id, nns_subnet_id) = setup_three_subnet_registry(); let network_topology = try_to_read_registry(registry, log, engine_subnet_id) .unwrap() .0; - // Filtered view: only the own engine subnet. - assert_eq!( - network_topology.subnets().keys().collect::>(), - vec![&engine_subnet_id], - ); - assert_eq!( - network_topology - .routing_table() - .iter() - .map(|(_, sid)| *sid) - .collect::>(), - BTreeSet::from([engine_subnet_id]), - ); + let subnet_keys: Vec<_> = network_topology.subnets().keys().copied().collect(); + assert!(subnet_keys.contains(&app_subnet_id)); + assert!(subnet_keys.contains(&engine_subnet_id)); + assert!(subnet_keys.contains(&nns_subnet_id)); - // Engine accessors also return only the own subnet (no full_topology on engines). 
+ let rt_subnets: BTreeSet<_> = network_topology + .routing_table() + .iter() + .map(|(_, sid)| *sid) + .collect(); + assert!(rt_subnets.contains(&app_subnet_id)); + assert!(rt_subnets.contains(&engine_subnet_id)); + assert!(rt_subnets.contains(&nns_subnet_id)); + + // No full_topology on engine subnets; certification accessors fall back to subnets(). assert_eq!( - network_topology - .subnets_for_certification() - .keys() - .collect::>(), - vec![&engine_subnet_id], + network_topology.subnets_for_certification(), + network_topology.subnets(), ); assert_eq!( - network_topology - .routing_table_for_certification() - .iter() - .map(|(_, sid)| *sid) - .collect::>(), - BTreeSet::from([engine_subnet_id]), + network_topology.routing_table_for_certification(), + network_topology.routing_table(), ); }); } -/// Tests that an Application subnet filters out CloudEngine subnets from its -/// topology: `subnets`, `routing_table`, `subnets_for_certification`, and -/// `routing_table_for_certification` all exclude the engine subnet. +/// Tests that an Application subnet sees the full topology (all three subnets), +/// including CloudEngine subnets. #[test] -fn try_read_registry_application_subnet_filters_out_engines() { +fn try_read_registry_application_subnet_sees_full_topology() { with_test_replica_logger(|log| { let (registry, app_subnet_id, engine_subnet_id, nns_subnet_id) = setup_three_subnet_registry(); @@ -1874,11 +1866,11 @@ fn try_read_registry_application_subnet_filters_out_engines() { .unwrap() .0; - // Filtered view: app and NNS subnets are visible, engine is excluded. + // Full view: all three subnets are visible, including the engine. 
let subnet_keys: Vec<_> = network_topology.subnets().keys().copied().collect(); assert!(subnet_keys.contains(&app_subnet_id)); assert!(subnet_keys.contains(&nns_subnet_id)); - assert!(!subnet_keys.contains(&engine_subnet_id)); + assert!(subnet_keys.contains(&engine_subnet_id)); let rt_subnets: BTreeSet<_> = network_topology .routing_table() @@ -1887,9 +1879,9 @@ fn try_read_registry_application_subnet_filters_out_engines() { .collect(); assert!(rt_subnets.contains(&app_subnet_id)); assert!(rt_subnets.contains(&nns_subnet_id)); - assert!(!rt_subnets.contains(&engine_subnet_id)); + assert!(rt_subnets.contains(&engine_subnet_id)); - // Engine accessors also exclude the engine (no full_topology on non-NNS subnets). + // No full_topology on non-NNS subnets; certification accessors fall back to subnets(). assert_eq!( network_topology.subnets_for_certification(), network_topology.subnets(), @@ -1901,9 +1893,8 @@ fn try_read_registry_application_subnet_filters_out_engines() { }); } -/// Tests that the NNS subnet filters out CloudEngine subnets from `subnets()` -/// and `routing_table()`, but `subnets_for_certification()` and -/// `routing_table_for_certification()` include all subnets (via `full_topology`). +/// Tests that the NNS subnet sees all subnets in both `subnets()` and +/// `subnets_for_certification()` (via `full_topology`), including engines. #[test] fn try_read_registry_nns_subnet_has_full_topology_with_engines() { with_test_replica_logger(|log| { @@ -1914,31 +1905,163 @@ fn try_read_registry_nns_subnet_has_full_topology_with_engines() { .unwrap() .0; - // Filtered view: app and NNS subnets are visible, engine is excluded. + // subnets() includes all three subnets. 
let subnet_keys: Vec<_> = network_topology.subnets().keys().copied().collect(); assert!(subnet_keys.contains(&app_subnet_id)); assert!(subnet_keys.contains(&nns_subnet_id)); - assert!(!subnet_keys.contains(&engine_subnet_id)); + assert!(subnet_keys.contains(&engine_subnet_id)); - // subnets_for_certification includes all three subnets via full_topology. - let all_keys: Vec<_> = network_topology + // subnets_for_certification also includes all three (via full_topology). + let cert_keys: Vec<_> = network_topology .subnets_for_certification() .keys() .copied() .collect(); - assert!(all_keys.contains(&app_subnet_id)); - assert!(all_keys.contains(&engine_subnet_id)); - assert!(all_keys.contains(&nns_subnet_id)); + assert!(cert_keys.contains(&app_subnet_id)); + assert!(cert_keys.contains(&engine_subnet_id)); + assert!(cert_keys.contains(&nns_subnet_id)); // routing_table_for_certification includes ranges for all three subnets. - let rt_with_engines_subnets: BTreeSet<_> = network_topology + let rt_cert: BTreeSet<_> = network_topology .routing_table_for_certification() .iter() .map(|(_, sid)| *sid) .collect(); - assert!(rt_with_engines_subnets.contains(&app_subnet_id)); - assert!(rt_with_engines_subnets.contains(&engine_subnet_id)); - assert!(rt_with_engines_subnets.contains(&nns_subnet_id)); + assert!(rt_cert.contains(&app_subnet_id)); + assert!(rt_cert.contains(&engine_subnet_id)); + assert!(rt_cert.contains(&nns_subnet_id)); + }); +} + +/// Like `setup_three_subnet_registry`, but also enables chain keys: `shared_key` +/// on both the Application and CloudEngine subnets, and `engine_only_key` on the +/// CloudEngine subnet alone. 
+fn setup_three_subnet_registry_with_chain_keys() +-> (Arc, SubnetId, SubnetId, SubnetId) { + use Integrity::*; + + let dummy_transcript = dummy_transcript_for_tests(); + + let app_subnet_id = subnet_test_id(1); + let app_subnet_record = SubnetRecord { + subnet_type: SubnetType::Application, + ..Default::default() + }; + let engine_subnet_id = subnet_test_id(2); + let engine_subnet_record = SubnetRecord { + subnet_type: SubnetType::CloudEngine, + ..Default::default() + }; + let nns_subnet_id = subnet_test_id(3); + let nns_subnet_record = SubnetRecord { + subnet_type: SubnetType::System, + ..Default::default() + }; + + let mut routing_table = RoutingTable::new(); + routing_table_insert_subnet(&mut routing_table, app_subnet_id).unwrap(); + routing_table_insert_subnet(&mut routing_table, engine_subnet_id).unwrap(); + routing_table_insert_subnet(&mut routing_table, nns_subnet_id).unwrap(); + + let chain_key_enabled_subnets = btreemap! { + shared_chain_key() => Valid(vec![app_subnet_id, engine_subnet_id]), + engine_only_chain_key() => Valid(vec![engine_subnet_id]), + }; + + let fixture = RegistryFixture::new(); + fixture + .write_test_records(&TestRecords { + subnet_ids: Valid([app_subnet_id, engine_subnet_id, nns_subnet_id]), + subnet_records: [ + Valid(&app_subnet_record), + Valid(&engine_subnet_record), + Valid(&nns_subnet_record), + ], + ni_dkg_transcripts: [Valid(Some(&dummy_transcript)); 3], + nns_subnet_id: Valid(nns_subnet_id), + chain_key_enabled_subnets: &chain_key_enabled_subnets, + provisional_whitelist: Missing, + routing_table: Valid(&routing_table), + canister_migrations: Missing, + node_public_keys: &BTreeMap::default(), + api_boundary_node_records: &BTreeMap::default(), + node_records: &BTreeMap::default(), + }) + .unwrap(); + + ( + fixture.registry, + app_subnet_id, + engine_subnet_id, + nns_subnet_id, + ) +} + +fn shared_chain_key() -> MasterPublicKeyId { + MasterPublicKeyId::Ecdsa(EcdsaKeyId { + curve: EcdsaCurve::Secp256k1, + name: 
"shared_key".to_string(), + }) +} + +fn engine_only_chain_key() -> MasterPublicKeyId { + MasterPublicKeyId::Ecdsa(EcdsaKeyId { + curve: EcdsaCurve::Secp256k1, + name: "engine_only_key".to_string(), + }) +} + +/// A non-engine (Application) subnet must not list a CloudEngine subnet as a +/// signing subnet for a chain key, since a chain-key request to it would be +/// rejected at the engine boundary. A key held *only* on an engine is pruned +/// entirely. +#[test] +fn chain_key_enabled_subnets_prune_engine_signers_on_application_subnet() { + with_test_replica_logger(|log| { + let (registry, app_subnet_id, _engine_subnet_id, _nns_subnet_id) = + setup_three_subnet_registry_with_chain_keys(); + + let network_topology = try_to_read_registry(registry, log, app_subnet_id) + .unwrap() + .0; + + // The shared key keeps only the Application subnet; the engine is pruned. + assert_eq!( + network_topology.chain_key_enabled_subnets(&shared_chain_key()), + &[app_subnet_id], + ); + // The engine-only key is pruned to an empty list and thus dropped. + assert!( + network_topology + .chain_key_enabled_subnets(&engine_only_chain_key()) + .is_empty() + ); + }); +} + +/// A CloudEngine subnet may serve chain keys it holds itself (loopback never +/// crosses the boundary), but must not list a non-engine signing subnet, as a +/// chain-key request to it would be rejected at the engine boundary. +#[test] +fn chain_key_enabled_subnets_keep_only_loopback_on_engine_subnet() { + with_test_replica_logger(|log| { + let (registry, _app_subnet_id, engine_subnet_id, _nns_subnet_id) = + setup_three_subnet_registry_with_chain_keys(); + + let network_topology = try_to_read_registry(registry, log, engine_subnet_id) + .unwrap() + .0; + + // The shared key keeps only the engine itself; the Application subnet is pruned. + assert_eq!( + network_topology.chain_key_enabled_subnets(&shared_chain_key()), + &[engine_subnet_id], + ); + // The engine-only key is kept (loopback). 
+ assert_eq!( + network_topology.chain_key_enabled_subnets(&engine_only_chain_key()), + &[engine_subnet_id], + ); }); } diff --git a/rs/messaging/src/routing/stream_builder.rs b/rs/messaging/src/routing/stream_builder.rs index 48c2dfa02d32..f3e24ffa9279 100644 --- a/rs/messaging/src/routing/stream_builder.rs +++ b/rs/messaging/src/routing/stream_builder.rs @@ -10,8 +10,8 @@ use ic_replicated_state::replicated_state::{ }; use ic_replicated_state::{ReplicatedState, Stream}; use ic_types::messages::{ - MAX_INTER_CANISTER_PAYLOAD_IN_BYTES, MAX_REJECT_MESSAGE_LEN_BYTES, Payload, RejectContext, - Request, RequestOrResponse, Response, StreamMessage, + MAX_INTER_CANISTER_PAYLOAD_IN_BYTES, MAX_REJECT_MESSAGE_LEN_BYTES, NO_DEADLINE, Payload, + RejectContext, Request, RequestOrResponse, Response, StreamMessage, }; use ic_types::{CountBytes, SubnetId}; use ic_types_cycles::{CompoundCycles, Cycles}; @@ -71,6 +71,7 @@ const LABEL_VALUE_TYPE_REFUND: &str = "refund"; const LABEL_VALUE_STATUS_SUCCESS: &str = "success"; const LABEL_VALUE_STATUS_CANISTER_NOT_FOUND: &str = "canister_not_found"; const LABEL_VALUE_STATUS_PAYLOAD_TOO_LARGE: &str = "payload_too_large"; +const LABEL_VALUE_STATUS_ENGINE_NOT_ALLOWED: &str = "engine_not_allowed"; const CRITICAL_ERROR_INFINITE_LOOP: &str = "mr_stream_builder_infinite_loop"; const CRITICAL_ERROR_PAYLOAD_TOO_LARGE: &str = "mr_stream_builder_payload_too_large"; @@ -423,6 +424,7 @@ impl StreamBuilderImpl { let mut streams = state.take_streams(); let network_topology = state.metadata.network_topology.clone(); + let own_subnet_type = state.metadata.own_subnet_type; // First, have up to `max_stream_messages / 2` refunds in each stream (including // already routed ones) while respecting stream message and byte limits. 
@@ -439,6 +441,8 @@ impl StreamBuilderImpl { let mut requests_to_reject = Vec::new(); let mut oversized_requests = Vec::new(); + let mut engine_requests_to_reject: Vec> = Vec::new(); + let mut engine_response_dropped_cycles = Cycles::zero(); let mut output_iter = state.output_into_iter(); let mut last_output_size = usize::MAX; @@ -487,9 +491,52 @@ impl StreamBuilderImpl { // We will route (or reject) the message, pop it. let mut msg = validated_next(&mut output_iter, &msg); + let is_engine_dst = !is_loopback_stream + && network_topology + .subnets() + .get(&dst_subnet_id) + .is_some_and(|t| t.subnet_type == SubnetType::CloudEngine); + let is_engine_src = + !is_loopback_stream && own_subnet_type == SubnetType::CloudEngine; + // Reject messages with oversized payloads, as they may // cause streams to permanently stall. match msg { + // Request at an engine boundary: reject if unbounded-wait or carries cycles. + RequestOrResponse::Request(ref req) + if (is_engine_dst || is_engine_src) + && (req.deadline == NO_DEADLINE + || req.payment > Cycles::zero()) => + { + self.observe_message_type_status( + LABEL_VALUE_TYPE_REQUEST, + LABEL_VALUE_STATUS_ENGINE_NOT_ALLOWED, + ); + let req = match msg { + RequestOrResponse::Request(req) => req, + _ => unreachable!(), + }; + engine_requests_to_reject.push(req); + } + + // Response that should not exist at an engine boundary: a + // guaranteed-response response, or one carrying cycles. Neither + // guaranteed-response calls nor cycles are allowed across the + // boundary, so such a response can only come from a buggy local + // canister. Drop it; any cycles are lost. Kept above the + // oversized-response arm, which truncates and routes; matching + // here first ensures such a response is dropped, not routed. 
+ RequestOrResponse::Response(ref rep) + if (is_engine_dst || is_engine_src) + && (rep.deadline == NO_DEADLINE || rep.refund > Cycles::zero()) => + { + engine_response_dropped_cycles += rep.refund; + self.observe_message_type_status( + LABEL_VALUE_TYPE_RESPONSE, + LABEL_VALUE_STATUS_ENGINE_NOT_ALLOWED, + ); + } + // Remote request above the payload size limit. RequestOrResponse::Request(req) if dst_subnet_id != self.subnet_id @@ -616,6 +663,24 @@ impl StreamBuilderImpl { ); } + for req in engine_requests_to_reject { + self.reject_local_request( + &mut state, + &req, + RejectCode::SysFatal, + "Unbounded-wait calls and calls with cycles are not allowed to CloudEngine subnets" + .to_string(), + ); + } + + if engine_response_dropped_cycles > Cycles::zero() { + let own_cost_schedule = state.get_own_cost_schedule(); + state.observe_lost_cycles_due_to_dropped_messages(CompoundCycles::new( + engine_response_dropped_cycles, + own_cost_schedule, + )); + } + // Export the total number of enqueued messages and byte size, per stream. streams .iter() @@ -685,11 +750,27 @@ impl StreamBuilderImpl { ) { let mut cycles_lost = Cycles::zero(); let own_cost_schedule = state.get_own_cost_schedule(); + let own_is_engine = state.metadata.own_subnet_type == SubnetType::CloudEngine; state.take_refunds(|refund| { match network_topology.route(refund.recipient().get()) { Some(dst_subnet_id) => { - let stream = streams.entry(dst_subnet_id).or_default(); let is_loopback_stream = dst_subnet_id == self.subnet_id; + let is_engine_dst = !is_loopback_stream + && network_topology + .subnets() + .get(&dst_subnet_id) + .is_some_and(|t| t.subnet_type == SubnetType::CloudEngine); + let is_engine_src = !is_loopback_stream && own_is_engine; + if is_engine_dst || is_engine_src { + // Refund destined to cross the engine boundary: drop, cycles lost. 
+ cycles_lost += refund.amount(); + self.observe_message_type_status( + LABEL_VALUE_TYPE_REFUND, + LABEL_VALUE_STATUS_ENGINE_NOT_ALLOWED, + ); + return true; + } + let stream = streams.entry(dst_subnet_id).or_default(); if is_loopback_stream || (stream.refund_count() < refund_limit && stream.messages().len() < self.max_stream_messages diff --git a/rs/messaging/src/routing/stream_builder/tests.rs b/rs/messaging/src/routing/stream_builder/tests.rs index 25d4922d4678..70ee4a235d1d 100644 --- a/rs/messaging/src/routing/stream_builder/tests.rs +++ b/rs/messaging/src/routing/stream_builder/tests.rs @@ -799,6 +799,260 @@ fn build_streams_with_best_effort_messages() { } } +/// Tests that a guaranteed-response request from a CloudEngine subnet (own subnet) to a +/// non-engine subnet is rejected with a synthetic reject response. +#[test] +fn build_streams_engine_src_rejects_gr_request() { + let local_canister_id = canister_test_id(0); + let remote_canister_id = canister_test_id(1); + with_test_replica_logger(|log| { + let msg = RequestBuilder::new() + .sender(local_canister_id) + .receiver(remote_canister_id) + .sender_reply_callback(CallbackId::from(1)) + .deadline(NO_DEADLINE) + .payment(Cycles::zero()) + .build(); + + let (stream_builder, mut provided_state, metrics_registry) = new_fixture(&log); + + provided_state.metadata.own_subnet_type = SubnetType::CloudEngine; + provided_state.metadata.network_topology.set_subnets(btreemap! { + LOCAL_SUBNET => SubnetTopology { subnet_type: SubnetType::CloudEngine, ..Default::default() }, + REMOTE_SUBNET => SubnetTopology { subnet_type: SubnetType::Application, ..Default::default() }, + }); + provided_state.metadata.network_topology.set_routing_table( + RoutingTable::try_from(btreemap! 
{ + CanisterIdRange { start: local_canister_id, end: local_canister_id } => LOCAL_SUBNET, + CanisterIdRange { start: remote_canister_id, end: remote_canister_id } => REMOTE_SUBNET, + }) + .unwrap(), + ); + + let provided_canister_states = canister_states_with_outputs(vec![msg]); + provided_state.put_canister_states(provided_canister_states); + + let result_state = stream_builder.build_streams(provided_state); + + // No message in REMOTE_SUBNET stream. + assert!( + result_state + .streams() + .get(&REMOTE_SUBNET) + .is_none_or(|s| s.messages().is_empty()) + ); + + // A synthetic reject response was delivered back to the sender. + assert!( + result_state + .canister_state(&local_canister_id) + .unwrap() + .clone() + .pop_input() + .is_some() + ); + + assert_routed_messages_eq( + metric_vec(&[( + &[ + (LABEL_TYPE, LABEL_VALUE_TYPE_REQUEST), + (LABEL_STATUS, LABEL_VALUE_STATUS_ENGINE_NOT_ALLOWED), + ], + 1, + )]), + &metrics_registry, + ); + assert_eq_critical_errors(0, 0, &metrics_registry); + }); +} + +/// Tests that a best-effort request with cycles from a CloudEngine subnet (own subnet) to a +/// non-engine subnet is rejected with a synthetic reject response. +#[test] +fn build_streams_engine_src_rejects_cycles_request() { + let local_canister_id = canister_test_id(0); + let remote_canister_id = canister_test_id(1); + with_test_replica_logger(|log| { + let msg = RequestBuilder::new() + .sender(local_canister_id) + .receiver(remote_canister_id) + .sender_reply_callback(CallbackId::from(1)) + .deadline(SOME_DEADLINE) + .payment(Cycles::new(100)) + .build(); + + let (stream_builder, mut provided_state, metrics_registry) = new_fixture(&log); + + provided_state.metadata.own_subnet_type = SubnetType::CloudEngine; + provided_state.metadata.network_topology.set_subnets(btreemap! 
{ + LOCAL_SUBNET => SubnetTopology { subnet_type: SubnetType::CloudEngine, ..Default::default() }, + REMOTE_SUBNET => SubnetTopology { subnet_type: SubnetType::Application, ..Default::default() }, + }); + provided_state.metadata.network_topology.set_routing_table( + RoutingTable::try_from(btreemap! { + CanisterIdRange { start: local_canister_id, end: local_canister_id } => LOCAL_SUBNET, + CanisterIdRange { start: remote_canister_id, end: remote_canister_id } => REMOTE_SUBNET, + }) + .unwrap(), + ); + + let provided_canister_states = canister_states_with_outputs(vec![msg]); + provided_state.put_canister_states(provided_canister_states); + + let result_state = stream_builder.build_streams(provided_state); + + // No message in REMOTE_SUBNET stream. + assert!( + result_state + .streams() + .get(&REMOTE_SUBNET) + .is_none_or(|s| s.messages().is_empty()) + ); + + // A synthetic reject response was delivered back to the sender. + assert!( + result_state + .canister_state(&local_canister_id) + .unwrap() + .clone() + .pop_input() + .is_some() + ); + + assert_routed_messages_eq( + metric_vec(&[( + &[ + (LABEL_TYPE, LABEL_VALUE_TYPE_REQUEST), + (LABEL_STATUS, LABEL_VALUE_STATUS_ENGINE_NOT_ALLOWED), + ], + 1, + )]), + &metrics_registry, + ); + assert_eq_critical_errors(0, 0, &metrics_registry); + }); +} + +/// Tests that a response with a cycles refund from a CloudEngine subnet (own subnet) to a +/// non-engine subnet is dropped (no synthetic reject, no stream entry). 
+#[test] +fn build_streams_engine_src_drops_cycles_response() { + let local_canister_id = canister_test_id(0); + let remote_canister_id = canister_test_id(1); + with_test_replica_logger(|log| { + let response = Arc::new(Response { + originator: remote_canister_id, + respondent: local_canister_id, + originator_reply_callback: CallbackId::from(1), + refund: Cycles::new(100), + response_payload: Payload::Data(vec![]), + deadline: NO_DEADLINE, + }); + + let (stream_builder, mut provided_state, metrics_registry) = new_fixture(&log); + + provided_state.metadata.own_subnet_type = SubnetType::CloudEngine; + provided_state.metadata.network_topology.set_subnets(btreemap! { + LOCAL_SUBNET => SubnetTopology { subnet_type: SubnetType::CloudEngine, ..Default::default() }, + REMOTE_SUBNET => SubnetTopology { subnet_type: SubnetType::Application, ..Default::default() }, + }); + provided_state.metadata.network_topology.set_routing_table( + RoutingTable::try_from(btreemap! { + CanisterIdRange { start: local_canister_id, end: local_canister_id } => LOCAL_SUBNET, + CanisterIdRange { start: remote_canister_id, end: remote_canister_id } => REMOTE_SUBNET, + }) + .unwrap(), + ); + + let provided_canister_states = + canister_states_with_outputs(vec![RequestOrResponse::Response(response)]); + provided_state.put_canister_states(provided_canister_states); + + let result_state = stream_builder.build_streams(provided_state); + + // No message in REMOTE_SUBNET stream (response was dropped). + assert!( + result_state + .streams() + .get(&REMOTE_SUBNET) + .is_none_or(|s| s.messages().is_empty()) + ); + + // No synthetic reject: responses are dropped silently. 
+ let maybe_reject = result_state + .canister_state(&local_canister_id) + .unwrap() + .clone() + .pop_input(); + assert!(maybe_reject.is_none()); + + assert_routed_messages_eq( + metric_vec(&[( + &[ + (LABEL_TYPE, LABEL_VALUE_TYPE_RESPONSE), + (LABEL_STATUS, LABEL_VALUE_STATUS_ENGINE_NOT_ALLOWED), + ], + 1, + )]), + &metrics_registry, + ); + assert_eq_critical_errors(0, 0, &metrics_registry); + }); +} + +/// Tests that refunds destined to cross an engine boundary are dropped, in both +/// directions: +/// * engine → non-engine (own subnet is engine, recipient on a non-engine subnet) +/// * non-engine → engine (own subnet is non-engine, recipient on an engine subnet) +/// +/// In both cases, the refund must not appear in the destination stream. +#[test] +fn build_streams_drops_refunds_at_engine_boundary() { + let local_canister_id = canister_test_id(0); + let remote_canister_id = canister_test_id(1); + + for (own_subnet_type, remote_subnet_type) in [ + (SubnetType::CloudEngine, SubnetType::Application), + (SubnetType::Application, SubnetType::CloudEngine), + ] { + with_test_replica_logger(|log| { + let (stream_builder, mut provided_state, _) = new_fixture(&log); + + provided_state.metadata.own_subnet_type = own_subnet_type; + provided_state + .metadata + .network_topology + .set_subnets(btreemap! { + LOCAL_SUBNET => SubnetTopology { subnet_type: own_subnet_type, ..Default::default() }, + REMOTE_SUBNET => SubnetTopology { subnet_type: remote_subnet_type, ..Default::default() }, + }); + provided_state.metadata.network_topology.set_routing_table( + RoutingTable::try_from(btreemap! { + CanisterIdRange { start: local_canister_id, end: local_canister_id } => LOCAL_SUBNET, + CanisterIdRange { start: remote_canister_id, end: remote_canister_id } => REMOTE_SUBNET, + }) + .unwrap(), + ); + + // Add a refund destined for the canister on the other side of the engine boundary. 
+ provided_state.add_refund(remote_canister_id, Cycles::new(100)); + + let result_state = stream_builder.build_streams(provided_state); + + // The refund must NOT have been routed into the REMOTE_SUBNET stream. + let routed_refunds = result_state + .streams() + .get(&REMOTE_SUBNET) + .map_or(0, |s| s.refund_count()); + assert_eq!( + 0, routed_refunds, + "Refund leaked across engine boundary (own_subnet_type={own_subnet_type:?}, \ + remote_subnet_type={remote_subnet_type:?})", + ); + }); + } +} + /// Given a stream with some (potentially zero) initial refunds and canister /// messages, tests that `build_streams()` respects the various limits when /// routing additional refunds and canister messages: diff --git a/rs/messaging/src/routing/stream_handler.rs b/rs/messaging/src/routing/stream_handler.rs index 6a5e5193d782..dca6cb1ea687 100644 --- a/rs/messaging/src/routing/stream_handler.rs +++ b/rs/messaging/src/routing/stream_handler.rs @@ -1,5 +1,6 @@ use crate::message_routing::{ - CRITICAL_ERROR_INDUCT_RESPONSE_FAILED, LatencyMetrics, MessageRoutingMetrics, + CRITICAL_ERROR_ENGINE_MESSAGE, CRITICAL_ERROR_INDUCT_RESPONSE_FAILED, LatencyMetrics, + MessageRoutingMetrics, }; use ic_base_types::NumBytes; use ic_config::execution_environment::Config as HypervisorConfig; @@ -12,18 +13,19 @@ use ic_interfaces::messaging::{ use ic_logger::{ReplicaLogger, debug, error, info, trace}; use ic_metrics::MetricsRegistry; use ic_metrics::buckets::{add_bucket, decimal_buckets}; +use ic_registry_subnet_type::SubnetType; use ic_replicated_state::metadata_state::{Stream, StreamMap}; use ic_replicated_state::replicated_state::{ LABEL_VALUE_QUEUE_FULL, MR_SYNTHETIC_REJECT_MESSAGE_MAX_LEN, ReplicatedStateMessageRouting, }; use ic_replicated_state::{ReplicatedState, StateError}; use ic_types::messages::{ - MAX_INTER_CANISTER_PAYLOAD_IN_BYTES_U64, MAX_RESPONSE_COUNT_BYTES, Payload, Refund, - RejectContext, Request, RequestOrResponse, Response, StreamMessage, + 
+ MAX_INTER_CANISTER_PAYLOAD_IN_BYTES_U64, MAX_RESPONSE_COUNT_BYTES, NO_DEADLINE, Payload, + Refund, RejectContext, Request, RequestOrResponse, Response, StreamMessage, }; use ic_types::xnet::{RejectReason, RejectSignal, StreamIndex, StreamIndexedQueue, StreamSlice}; use ic_types::{CanisterId, SubnetId}; -use ic_types_cycles::CompoundCycles; +use ic_types_cycles::{CompoundCycles, Cycles}; use prometheus::{Histogram, IntCounter, IntCounterVec, IntGaugeVec}; use std::cell::RefCell; use std::collections::{BTreeMap, VecDeque}; @@ -60,6 +62,10 @@ struct StreamHandlerMetrics { /// messages for canisters not hosted (now, or previously, according to /// `canister_migrations`) by this subnet. pub critical_error_receiver_subnet_mismatch: IntCounter, + /// Critical error counter (see [`MetricsRegistry::error_counter`]) tracking + /// refunds, guaranteed-response messages and messages carrying cycles received + /// across a CloudEngine boundary (the remote or the own subnet is a CloudEngine). + pub critical_error_engine_message: IntCounter, } const METRIC_INDUCTED_XNET_MESSAGES: &str = "mr_inducted_xnet_message_count"; @@ -135,6 +141,9 @@ impl StreamHandlerMetrics { metrics_registry.error_counter(CRITICAL_ERROR_SENDER_SUBNET_MISMATCH); let critical_error_receiver_subnet_mismatch = metrics_registry.error_counter(CRITICAL_ERROR_RECEIVER_SUBNET_MISMATCH); + let critical_error_engine_message = message_routing_metrics + .critical_error_engine_message + .clone(); // Initialize all `inducted_xnet_messages` counters with zero, so they are all // exported from process start (`IntCounterVec` is really a map). 
@@ -171,6 +180,7 @@ impl StreamHandlerMetrics { critical_error_induct_response_failed, critical_error_sender_subnet_mismatch, critical_error_receiver_subnet_mismatch, + critical_error_engine_message, } } } @@ -742,6 +752,30 @@ impl StreamHandlerImpl { available_guaranteed_response_memory: &mut i64, ) { let own_cost_schedule = state.get_own_cost_schedule(); + + // True if the remote subnet is a CloudEngine subnet (and this is not the loopback stream). + // Messages at the engine boundary require special handling: + // * Guaranteed-response requests are rejected with `EngineNotAllowed`. + // * Best-effort requests carrying cycles are dropped; their cycles are accounted as lost. + // * Best-effort requests without cycles are inducted normally. + // * Best-effort responses without cycles are inducted normally. + // * Responses that should not exist (guaranteed-response, or carrying cycles) are + // dropped and a critical error is raised; any cycles are accounted as lost. + // * `Refund` messages are dropped; their cycles are accounted as lost. + // Loopback messages are excluded: the engine subnet is itself CloudEngine, but its + // own-subnet loopback messages should always be inducted normally. + let is_engine_subnet = remote_subnet_id != self.subnet_id + && state + .metadata + .network_topology + .subnets() + .get(&remote_subnet_id) + .is_some_and(|t| t.subnet_type == SubnetType::CloudEngine); + let own_is_engine = state.metadata.own_subnet_type == SubnetType::CloudEngine; + // True when this message is at the engine boundary and is not a loopback message. 
+ let is_at_engine_boundary = + is_engine_subnet || (own_is_engine && remote_subnet_id != self.subnet_id); + let (msg, msg_type) = match msg { StreamMessage::Request(req) => { (RequestOrResponse::Request(req), LABEL_VALUE_TYPE_REQUEST) @@ -750,6 +784,27 @@ impl StreamHandlerImpl { (RequestOrResponse::Response(rep), LABEL_VALUE_TYPE_RESPONSE) } StreamMessage::Refund(refund) => { + // Refunds bypass sender validation; apply the engine filters here. + if is_at_engine_boundary { + error!( + self.log, + "{}: Dropping refund from {}: {:?}", + CRITICAL_ERROR_ENGINE_MESSAGE, + remote_subnet_id, + refund, + ); + self.metrics.critical_error_engine_message.inc(); + self.observe_inducted_message_status( + LABEL_VALUE_TYPE_REFUND, + LABEL_VALUE_DROPPED, + ); + state.observe_lost_cycles_due_to_dropped_messages(CompoundCycles::new( + refund.amount(), + own_cost_schedule, + )); + stream.push_accept_signal(); + return; + } return self.induct_refund(&refund, state, stream); } }; @@ -762,6 +817,74 @@ impl StreamHandlerImpl { // on a canister's migration path. (SenderSubnet::Match, _) | (SenderSubnet::OnMigrationPath, RequestOrResponse::Response(_)) => { + // Apply the engine filter before inducting. See `is_at_engine_boundary`. + if is_at_engine_boundary { + match &msg { + // Guaranteed-response requests are never allowed at the engine boundary. + RequestOrResponse::Request(req) if req.deadline == NO_DEADLINE => { + error!( + self.log, + "{}: Rejecting guaranteed-response request at engine boundary (from {}): {:?}", + CRITICAL_ERROR_ENGINE_MESSAGE, + remote_subnet_id, + req, + ); + self.metrics.critical_error_engine_message.inc(); + self.observe_inducted_message_status(msg_type, LABEL_VALUE_DROPPED); + stream.push_reject_signal(RejectReason::EngineNotAllowed); + return; + } + // Best-effort requests with cycles: drop. 
+ RequestOrResponse::Request(req) if req.payment > Cycles::zero() => { + error!( + self.log, + "{}: Dropping best-effort request with cycles at engine boundary (from {}): {:?}", + CRITICAL_ERROR_ENGINE_MESSAGE, + remote_subnet_id, + req, + ); + self.metrics.critical_error_engine_message.inc(); + self.observe_inducted_message_status(msg_type, LABEL_VALUE_DROPPED); + state.observe_lost_cycles_due_to_dropped_messages(CompoundCycles::new( + req.payment, + own_cost_schedule, + )); + stream.push_accept_signal(); + return; + } + // BE request with no cycles, or a BE response with no cycles: + // fall through and induct normally. A response that should not + // exist (GR or carrying cycles) is dropped just below. + _ => {} + } + // A response that should not exist at the engine boundary: a + // guaranteed-response response, or one carrying cycles. Neither is + // allowed across the boundary, so this can only come from a buggy or + // malicious peer. Drop it (any cycles are lost) and raise a critical + // error. + if let RequestOrResponse::Response(ref rep) = msg { + let is_gr = rep.deadline == NO_DEADLINE; + let has_cycles = rep.refund > Cycles::zero(); + if is_gr || has_cycles { + error!( + self.log, + "{}: Dropping engine-boundary response (from {}): {:?}", + CRITICAL_ERROR_ENGINE_MESSAGE, + remote_subnet_id, + rep, + ); + self.metrics.critical_error_engine_message.inc(); + self.observe_inducted_message_status(msg_type, LABEL_VALUE_DROPPED); + if has_cycles { + state.observe_lost_cycles_due_to_dropped_messages( + CompoundCycles::new(rep.refund, own_cost_schedule), + ); + } + stream.push_accept_signal(); + return; + } + } + } match self.induct_message_impl( msg, msg_type, @@ -797,6 +920,13 @@ impl StreamHandlerImpl { // Reject requests not originating from their sender's known host // subnet. Their senders are likely manually migrated canisters. + // + // NOTE: We intentionally do NOT relax this for engine-boundary traffic. 
+ // Sender-subnet validation is the only anti-forgery check on responses; a + // malicious engine that forges `respondent` to a canister hosted on a + // different subnet would otherwise be able to inject responses matching + // outstanding callbacks. Failing closed here (drop responses, reject + // requests) is required for response authenticity. (SenderSubnet::Mismatch, RequestOrResponse::Request(_)) => { self.observe_inducted_message_status( msg_type, @@ -1210,6 +1340,10 @@ fn generate_reject_response_for(reason: RejectReason, request: &Request) -> Requ RejectCode::SysFatal, "Inducting request failed due to an unknown error".to_string(), ), + RejectReason::EngineNotAllowed => ( + RejectCode::SysFatal, + "Guaranteed-response calls from CloudEngine subnets are not allowed".to_string(), + ), }; generate_reject_response(request, code, message) } diff --git a/rs/messaging/src/routing/stream_handler/tests.rs b/rs/messaging/src/routing/stream_handler/tests.rs index 97863683f4b0..cb4603b011a2 100644 --- a/rs/messaging/src/routing/stream_handler/tests.rs +++ b/rs/messaging/src/routing/stream_handler/tests.rs @@ -10,7 +10,7 @@ use ic_metrics::MetricsRegistry; use ic_registry_routing_table::{CanisterIdRange, CanisterIdRanges, RoutingTable}; use ic_registry_subnet_type::SubnetType; use ic_replicated_state::{ - CanisterStatus, ReplicatedState, Stream, + CanisterStatus, ReplicatedState, Stream, SubnetTopology, metadata_state::{StreamMap, testing::NetworkTopologyTesting}, replicated_state::LABEL_VALUE_OUT_OF_MEMORY, testing::{ReplicatedStateTesting, StreamTesting, SystemStateTesting}, @@ -2811,6 +2811,69 @@ fn induct_stream_slices_with_refunds() { } } +/// Tests that a refund arriving in a slice across an engine boundary is dropped, +/// its cycles are observed as lost, and a critical error is raised. With subnet +/// types fixed at creation, an honest peer would never produce such a refund — +/// arrival here implies a malicious or buggy sender. 
+#[test] +fn induct_stream_slices_drops_refund_at_engine_boundary() { + for cost_schedule in [ + CanisterCyclesCostSchedule::Normal, + CanisterCyclesCostSchedule::Free, + ] { + with_test_setup( + btreemap![], + btreemap![REMOTE_SUBNET => StreamSliceConfig { + messages: vec![Refund(*LOCAL_CANISTER)], + ..StreamSliceConfig::default() + }], + |stream_handler, mut state, slices, metrics| { + // Mark REMOTE_SUBNET as a CloudEngine. + state.metadata.network_topology.subnets_mut().insert( + REMOTE_SUBNET, + SubnetTopology { + subnet_type: SubnetType::CloudEngine, + ..Default::default() + }, + ); + + // Expected state: a stream with one accept signal, no induction, cycles lost. + let refund = *refund_in_slice(slices.get(&REMOTE_SUBNET), 0); + let mut expected_state = state.clone(); + let expected_stream = stream_from_config(StreamConfig { + signals_end: 1, + ..StreamConfig::default() + }); + expected_state.with_streams(btreemap![REMOTE_SUBNET => expected_stream]); + expected_state.observe_lost_cycles_due_to_dropped_messages(CompoundCycles::new( + refund.amount(), + cost_schedule, + )); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + let inducted_state = stream_handler.induct_stream_slices( + state, + slices, + &mut available_guaranteed_response_memory, + ); + + assert_eq!(expected_state, inducted_state); + + metrics.assert_inducted_xnet_messages_eq(&[( + LABEL_VALUE_TYPE_REFUND, + LABEL_VALUE_DROPPED, + 1, + )]); + metrics.assert_eq_critical_errors(CriticalErrorCounts { + engine_message: 1, + ..CriticalErrorCounts::default() + }); + }, + ); + } +} + /// Tests that messages in the loopback stream and incoming slices are inducted /// (with signals added appropriately); and messages present in the initial /// state are garbage collected or rerouted as appropriate. 
@@ -3229,6 +3292,368 @@ fn process_stream_slices_with_invalid_messages() { ); } +/// Tests that a guaranteed-response request from a CloudEngine subnet, arriving at a +/// non-engine subnet, triggers a critical error and is rejected with a reject signal. +#[test] +fn induct_stream_slices_engine_src_guaranteed_response_request_critical_error() { + with_test_setup( + btreemap![], + btreemap![REMOTE_SUBNET => StreamSliceConfig { + messages: vec![Request(*REMOTE_CANISTER, *LOCAL_CANISTER)], + ..StreamSliceConfig::default() + }], + |stream_handler, mut state, slices, metrics| { + // Mark REMOTE_SUBNET as CloudEngine. + state.metadata.network_topology.subnets_mut().insert( + REMOTE_SUBNET, + SubnetTopology { + subnet_type: SubnetType::CloudEngine, + ..Default::default() + }, + ); + + // Expect a stream with a reject signal (EngineNotAllowed) for the GR request. + let mut expected_state = state.clone(); + let expected_stream = stream_from_config(StreamConfig { + signals_end: 1, + reject_signals: vec![RejectSignal::new(RejectReason::EngineNotAllowed, 0.into())], + ..StreamConfig::default() + }); + expected_state.with_streams(btreemap![REMOTE_SUBNET => expected_stream]); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + let inducted_state = stream_handler.induct_stream_slices( + state, + slices, + &mut available_guaranteed_response_memory, + ); + + assert_eq!(expected_state, inducted_state); + + metrics.assert_inducted_xnet_messages_eq(&[( + LABEL_VALUE_TYPE_REQUEST, + LABEL_VALUE_DROPPED, + 1, + )]); + metrics.assert_eq_critical_errors(CriticalErrorCounts { + engine_message: 1, + ..CriticalErrorCounts::default() + }); + }, + ); +} + +/// Tests that a best-effort request with no cycles from a CloudEngine subnet, arriving at a +/// non-engine subnet, is inducted successfully without triggering a critical error. 
+#[test] +fn induct_stream_slices_engine_src_best_effort_request_inducted() { + with_test_setup( + btreemap![], + btreemap![], + |stream_handler, mut state, _, metrics| { + // Mark REMOTE_SUBNET as CloudEngine. + state.metadata.network_topology.subnets_mut().insert( + REMOTE_SUBNET, + SubnetTopology { + subnet_type: SubnetType::CloudEngine, + ..Default::default() + }, + ); + + // Build a best-effort request (deadline != NO_DEADLINE, payment = 0). + let be_request = RequestBuilder::new() + .sender(*REMOTE_CANISTER) + .receiver(*LOCAL_CANISTER) + .sender_reply_callback(CallbackId::new(1)) + .deadline(CoarseTime::from_secs_since_unix_epoch(123)) + .payment(Cycles::zero()) + .build(); + let slice = stream_slice_from_config(StreamSliceConfig { + messages: vec![be_request.into()], + ..StreamSliceConfig::default() + }); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + stream_handler.induct_stream_slices( + state, + btreemap![REMOTE_SUBNET => slice], + &mut available_guaranteed_response_memory, + ); + + metrics.assert_inducted_xnet_messages_eq(&[( + LABEL_VALUE_TYPE_REQUEST, + LABEL_VALUE_SUCCESS, + 1, + )]); + metrics.assert_eq_critical_errors(CriticalErrorCounts::default()); + }, + ); +} + +/// Tests that a best-effort request carrying cycles from a CloudEngine subnet is dropped +/// at the engine boundary: cycles observed as lost, accept signal pushed, and a critical +/// error raised. Mirrors the sender-side test +/// `build_streams_engine_src_rejects_cycles_request` on the receiving side, which is +/// the security-critical filter against a malicious engine. +#[test] +fn induct_stream_slices_engine_src_best_effort_request_with_cycles_dropped() { + with_test_setup( + btreemap![], + btreemap![], + |stream_handler, mut state, _, metrics| { + // Mark REMOTE_SUBNET as CloudEngine. 
+ state.metadata.network_topology.subnets_mut().insert( + REMOTE_SUBNET, + SubnetTopology { + subnet_type: SubnetType::CloudEngine, + ..Default::default() + }, + ); + + // Build a best-effort request carrying cycles + // (deadline != NO_DEADLINE, payment > 0). + let payment = Cycles::new(1_000); + let be_request = RequestBuilder::new() + .sender(*REMOTE_CANISTER) + .receiver(*LOCAL_CANISTER) + .sender_reply_callback(CallbackId::new(1)) + .deadline(CoarseTime::from_secs_since_unix_epoch(123)) + .payment(payment) + .build(); + let slice = stream_slice_from_config(StreamSliceConfig { + messages: vec![be_request.into()], + ..StreamSliceConfig::default() + }); + + // Expected: an outgoing stream to REMOTE_SUBNET with one accept signal; + // the request's cycles are observed as lost. + let mut expected_state = state.clone(); + let expected_stream = stream_from_config(StreamConfig { + signals_end: 1, + ..StreamConfig::default() + }); + expected_state.with_streams(btreemap![REMOTE_SUBNET => expected_stream]); + expected_state.observe_lost_cycles_due_to_dropped_messages(CompoundCycles::new( + payment, + CanisterCyclesCostSchedule::Normal, + )); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + let inducted_state = stream_handler.induct_stream_slices( + state, + btreemap![REMOTE_SUBNET => slice], + &mut available_guaranteed_response_memory, + ); + + assert_eq!(expected_state, inducted_state); + + metrics.assert_inducted_xnet_messages_eq(&[( + LABEL_VALUE_TYPE_REQUEST, + LABEL_VALUE_DROPPED, + 1, + )]); + metrics.assert_eq_critical_errors(CriticalErrorCounts { + engine_message: 1, + ..CriticalErrorCounts::default() + }); + }, + ); +} + +/// Tests that a best-effort response with no cycles from a CloudEngine subnet, arriving at a +/// non-engine subnet, is inducted successfully without triggering a critical error. 
+#[test] +fn induct_stream_slices_engine_src_best_effort_response_inducted() { + // Use a BestEffortResponse in the slice config so that the framework registers a callback + // for LOCAL_CANISTER and creates the necessary input queue reservation. + with_test_setup( + btreemap![], + btreemap![REMOTE_SUBNET => StreamSliceConfig { + messages: vec![BestEffortResponse( + *REMOTE_CANISTER, + *LOCAL_CANISTER, + CoarseTime::from_secs_since_unix_epoch(123), + )], + ..StreamSliceConfig::default() + }], + |stream_handler, mut state, _, metrics| { + // Mark REMOTE_SUBNET as CloudEngine. + state.metadata.network_topology.subnets_mut().insert( + REMOTE_SUBNET, + SubnetTopology { + subnet_type: SubnetType::CloudEngine, + ..Default::default() + }, + ); + + // Build a best-effort response with no cycles using the callback registered by setup. + let be_response = ResponseBuilder::new() + .respondent(*REMOTE_CANISTER) + .originator(*LOCAL_CANISTER) + .originator_reply_callback(CallbackId::new(1)) + .deadline(CoarseTime::from_secs_since_unix_epoch(123)) + .refund(Cycles::zero()) + .build(); + let slice = stream_slice_from_config(StreamSliceConfig { + messages: vec![be_response.into()], + ..StreamSliceConfig::default() + }); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + stream_handler.induct_stream_slices( + state, + btreemap![REMOTE_SUBNET => slice], + &mut available_guaranteed_response_memory, + ); + + metrics.assert_inducted_xnet_messages_eq(&[( + LABEL_VALUE_TYPE_RESPONSE, + LABEL_VALUE_SUCCESS, + 1, + )]); + metrics.assert_eq_critical_errors(CriticalErrorCounts::default()); + }, + ); +} + +/// Regression test for response forgery via an engine boundary. +/// +/// A malicious engine subnet sends a slice containing a response whose `respondent` +/// is hosted on a different (honest) subnet, matching an outstanding callback on the +/// victim canister. 
Without sender-subnet validation as a closed gate, the forged +/// response would be inducted, satisfying the callback with attacker-controlled data +/// before the genuine response from the real respondent could arrive. +/// +/// Expected: sender-subnet validation must fail closed even at the engine boundary; +/// the forged response is dropped, cycles are accounted for as lost, and the +/// `sender_subnet_mismatch` critical error is raised. +#[test] +fn induct_stream_slices_engine_boundary_drops_forged_response() { + with_test_setup( + btreemap![], + // Malicious engine sends a response forging `OTHER_LOCAL_CANISTER` as the + // respondent — that canister is hosted on LOCAL_SUBNET, not REMOTE_SUBNET. + // The originator is LOCAL_CANISTER (victim); the framework registers a + // matching callback so the response would be inducted if validation were + // bypassed. + btreemap![REMOTE_SUBNET => StreamSliceConfig { + messages: vec![Response(*OTHER_LOCAL_CANISTER, *LOCAL_CANISTER)], + ..StreamSliceConfig::default() + }], + |stream_handler, mut state, slices, metrics| { + // Mark REMOTE_SUBNET as a CloudEngine to put us at the engine boundary. + state.metadata.network_topology.subnets_mut().insert( + REMOTE_SUBNET, + SubnetTopology { + subnet_type: SubnetType::CloudEngine, + ..Default::default() + }, + ); + + // Expected state: response dropped (no induction), accept signal pushed, + // cycles attached to the forged response observed as lost. 
+ let forged = response_in_slice(slices.get(&REMOTE_SUBNET), 0).clone(); + let mut expected_state = state.clone(); + let expected_stream = stream_from_config(StreamConfig { + signals_end: 1, + ..StreamConfig::default() + }); + expected_state.with_streams(btreemap![REMOTE_SUBNET => expected_stream]); + expected_state.observe_lost_cycles_due_to_dropped_messages(CompoundCycles::new( + forged.refund, + CanisterCyclesCostSchedule::Normal, + )); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + let inducted_state = stream_handler.induct_stream_slices( + state, + slices, + &mut available_guaranteed_response_memory, + ); + + assert_eq!(expected_state, inducted_state); + + metrics.assert_inducted_xnet_messages_eq(&[( + LABEL_VALUE_TYPE_RESPONSE, + LABEL_VALUE_SENDER_SUBNET_MISMATCH, + 1, + )]); + metrics.assert_eq_critical_errors(CriticalErrorCounts { + sender_subnet_mismatch: 1, + ..CriticalErrorCounts::default() + }); + }, + ); +} + +/// Tests that a response which should not exist at the engine boundary — here a +/// guaranteed-response response carrying cycles, from a CloudEngine subnet — is +/// dropped (not inducted), its cycles are observed as lost, an accept signal is +/// pushed, and the `engine_message` critical error is raised. The sender subnet +/// matches, so the response reaches the engine filter rather than the +/// sender-subnet-mismatch path. +#[test] +fn induct_stream_slices_engine_boundary_drops_response_that_should_not_exist() { + with_test_setup( + btreemap![], + // A guaranteed-response response (with cycles) whose respondent is genuinely + // hosted on REMOTE_SUBNET, so sender-subnet validation matches. + btreemap![REMOTE_SUBNET => StreamSliceConfig { + messages: vec![Response(*REMOTE_CANISTER, *LOCAL_CANISTER)], + ..StreamSliceConfig::default() + }], + |stream_handler, mut state, slices, metrics| { + // Mark REMOTE_SUBNET as a CloudEngine to put us at the engine boundary. 
+ state.metadata.network_topology.subnets_mut().insert( + REMOTE_SUBNET, + SubnetTopology { + subnet_type: SubnetType::CloudEngine, + ..Default::default() + }, + ); + + // Expected state: response dropped (no induction), accept signal pushed, + // cycles attached to the dropped response observed as lost. + let dropped = response_in_slice(slices.get(&REMOTE_SUBNET), 0).clone(); + let mut expected_state = state.clone(); + let expected_stream = stream_from_config(StreamConfig { + signals_end: 1, + ..StreamConfig::default() + }); + expected_state.with_streams(btreemap![REMOTE_SUBNET => expected_stream]); + expected_state.observe_lost_cycles_due_to_dropped_messages(CompoundCycles::new( + dropped.refund, + CanisterCyclesCostSchedule::Normal, + )); + + let mut available_guaranteed_response_memory = + stream_handler.available_guaranteed_response_memory(&state); + let inducted_state = stream_handler.induct_stream_slices( + state, + slices, + &mut available_guaranteed_response_memory, + ); + + assert_eq!(expected_state, inducted_state); + + metrics.assert_inducted_xnet_messages_eq(&[( + LABEL_VALUE_TYPE_RESPONSE, + LABEL_VALUE_DROPPED, + 1, + )]); + metrics.assert_eq_critical_errors(CriticalErrorCounts { + engine_message: 1, + ..CriticalErrorCounts::default() + }); + }, + ); +} + /// Generates a test setup. For details see `with_test_setup_and_config()`. 
fn with_test_setup( stream_configs: BTreeMap>>, @@ -3835,7 +4260,11 @@ impl MetricsFixture { &CRITICAL_ERROR_RECEIVER_SUBNET_MISMATCH.to_string() )], counts.receiver_subnet_mismatch - ) + ), + ( + &[("error", &CRITICAL_ERROR_ENGINE_MESSAGE.to_string())], + counts.engine_message + ), ])), nonzero_values(fetch_int_counter_vec(&self.registry, "critical_errors")) ); @@ -3848,6 +4277,7 @@ struct CriticalErrorCounts { pub bad_reject_signal_for_response: u64, pub sender_subnet_mismatch: u64, pub receiver_subnet_mismatch: u64, + pub engine_message: u64, } /// Populates the given `state`'s canister migrations with a single entry, diff --git a/rs/protobuf/def/state/queues/v1/queues.proto b/rs/protobuf/def/state/queues/v1/queues.proto index c870b1333a54..2454f2ecabf3 100644 --- a/rs/protobuf/def/state/queues/v1/queues.proto +++ b/rs/protobuf/def/state/queues/v1/queues.proto @@ -32,6 +32,7 @@ enum RejectReason { REJECT_REASON_QUEUE_FULL = 5; REJECT_REASON_OUT_OF_MEMORY = 6; REJECT_REASON_UNKNOWN = 7; + REJECT_REASON_ENGINE_NOT_ALLOWED = 8; } message RejectSignal { diff --git a/rs/protobuf/src/gen/state/state.queues.v1.rs b/rs/protobuf/src/gen/state/state.queues.v1.rs index cfd87b780904..c1797fc65b8d 100644 --- a/rs/protobuf/src/gen/state/state.queues.v1.rs +++ b/rs/protobuf/src/gen/state/state.queues.v1.rs @@ -300,6 +300,7 @@ pub enum RejectReason { QueueFull = 5, OutOfMemory = 6, Unknown = 7, + EngineNotAllowed = 8, } impl RejectReason { /// String value of the enum field names used in the ProtoBuf definition. @@ -316,6 +317,7 @@ impl RejectReason { Self::QueueFull => "REJECT_REASON_QUEUE_FULL", Self::OutOfMemory => "REJECT_REASON_OUT_OF_MEMORY", Self::Unknown => "REJECT_REASON_UNKNOWN", + Self::EngineNotAllowed => "REJECT_REASON_ENGINE_NOT_ALLOWED", } } /// Creates an enum from field names used in the ProtoBuf definition. 
@@ -329,6 +331,7 @@ impl RejectReason { "REJECT_REASON_QUEUE_FULL" => Some(Self::QueueFull), "REJECT_REASON_OUT_OF_MEMORY" => Some(Self::OutOfMemory), "REJECT_REASON_UNKNOWN" => Some(Self::Unknown), + "REJECT_REASON_ENGINE_NOT_ALLOWED" => Some(Self::EngineNotAllowed), _ => None, } } diff --git a/rs/protobuf/src/gen/types/state.queues.v1.rs b/rs/protobuf/src/gen/types/state.queues.v1.rs index 08001487edfe..dab2108a24e5 100644 --- a/rs/protobuf/src/gen/types/state.queues.v1.rs +++ b/rs/protobuf/src/gen/types/state.queues.v1.rs @@ -300,6 +300,7 @@ pub enum RejectReason { QueueFull = 5, OutOfMemory = 6, Unknown = 7, + EngineNotAllowed = 8, } impl RejectReason { /// String value of the enum field names used in the ProtoBuf definition. @@ -316,6 +317,7 @@ impl RejectReason { Self::QueueFull => "REJECT_REASON_QUEUE_FULL", Self::OutOfMemory => "REJECT_REASON_OUT_OF_MEMORY", Self::Unknown => "REJECT_REASON_UNKNOWN", + Self::EngineNotAllowed => "REJECT_REASON_ENGINE_NOT_ALLOWED", } } /// Creates an enum from field names used in the ProtoBuf definition. 
@@ -329,6 +331,7 @@ impl RejectReason { "REJECT_REASON_QUEUE_FULL" => Some(Self::QueueFull), "REJECT_REASON_OUT_OF_MEMORY" => Some(Self::OutOfMemory), "REJECT_REASON_UNKNOWN" => Some(Self::Unknown), + "REJECT_REASON_ENGINE_NOT_ALLOWED" => Some(Self::EngineNotAllowed), _ => None, } } diff --git a/rs/replicated_state/src/metadata_state/tests.rs b/rs/replicated_state/src/metadata_state/tests.rs index 35a403c0d15d..6407dbca03e1 100644 --- a/rs/replicated_state/src/metadata_state/tests.rs +++ b/rs/replicated_state/src/metadata_state/tests.rs @@ -2287,7 +2287,7 @@ fn compatibility_for_reject_reason() { RejectReason::iter() .map(|reason| reason as i32) .collect::>(), - [1, 2, 3, 4, 5, 6, 7] + [1, 2, 3, 4, 5, 6, 7, 8] ); } diff --git a/rs/test_utilities/src/state_manager.rs b/rs/test_utilities/src/state_manager.rs index 5d4602b20eef..026893dcb60a 100644 --- a/rs/test_utilities/src/state_manager.rs +++ b/rs/test_utilities/src/state_manager.rs @@ -460,6 +460,7 @@ pub enum SerializableRejectReason { QueueFull = 5, OutOfMemory = 6, Unknown = 7, + EngineNotAllowed = 8, } impl From<&RejectReason> for SerializableRejectReason { @@ -472,6 +473,7 @@ impl From<&RejectReason> for SerializableRejectReason { RejectReason::QueueFull => Self::QueueFull, RejectReason::OutOfMemory => Self::OutOfMemory, RejectReason::Unknown => Self::Unknown, + RejectReason::EngineNotAllowed => Self::EngineNotAllowed, } } } @@ -486,6 +488,7 @@ impl From for RejectReason { SerializableRejectReason::QueueFull => RejectReason::QueueFull, SerializableRejectReason::OutOfMemory => RejectReason::OutOfMemory, SerializableRejectReason::Unknown => RejectReason::Unknown, + SerializableRejectReason::EngineNotAllowed => RejectReason::EngineNotAllowed, } } } diff --git a/rs/tests/message_routing/xnet/xnet_cloud_engine_isolation_test.rs b/rs/tests/message_routing/xnet/xnet_cloud_engine_isolation_test.rs index c35f92157505..0bb93c511fa9 100644 --- a/rs/tests/message_routing/xnet/xnet_cloud_engine_isolation_test.rs +++ 
b/rs/tests/message_routing/xnet/xnet_cloud_engine_isolation_test.rs @@ -1,10 +1,11 @@ /* tag::catalog[] -Title:: CloudEngine subnets are isolated from XNet traffic. +Title:: CloudEngine subnets enforce XNet filtering. -Goal:: Verify that a CloudEngine subnet cannot exchange XNet messages with -other subnets (including other CloudEngine subnets), while intra-subnet -(loopback) calls still work. Additionally verify that the state tree only -exposes the expected subnets via /subnet and /canister_ranges. +Goal:: Verify that a CloudEngine subnet only accepts best-effort, +no-cycles XNet messages (and rejects guaranteed-response calls or calls +carrying cycles), while intra-subnet (loopback) calls always work. +Additionally verify that the state tree exposes the full topology to all +subnets via /subnet and /canister_ranges. Runbook:: 0. Set up an IC with one System (NNS) subnet, one Application subnet, and @@ -15,19 +16,20 @@ Runbook:: other (intra-subnet). 4. Verify that XNet calls between NNS and Application subnets succeed (both directions). -5. Verify that XNet calls involving any CloudEngine subnet are rejected: - Application <-> CloudEngine, NNS <-> CloudEngine, and - CloudEngine 1 <-> CloudEngine 2. +5. For every pair crossing an engine boundary (Application <-> CloudEngine, + NNS <-> CloudEngine, CloudEngine 1 <-> CloudEngine 2): + a. Guaranteed-response (unbounded-wait) calls are rejected with SysFatal. + b. Best-effort (bounded-wait), no-cycles calls succeed. + c. Best-effort calls carrying cycles are rejected with SysFatal. 6. Via read_state, verify that /subnet//public_key and - /canister_ranges/ are present only for the expected subnets: - - NNS sees all four subnets (full topology). - - Application sees NNS and itself. - - Each CloudEngine sees only itself. + /canister_ranges/ are present for all subnets on every subnet + (full topology). 
Success:: All assertions pass: loopback works, non-CloudEngine XNet works, -all cross-subnet directions involving CloudEngine are rejected, and -the state tree matches the expected visibility. +guaranteed-response cross-engine calls are rejected, best-effort +no-cycles cross-engine calls succeed, and the state tree shows the +full topology on every subnet. end::catalog[] */ @@ -290,12 +292,19 @@ fn test(env: TestEnv) { ); info!(logger, "Application -> NNS succeeded."); - // ── XNet calls that should be rejected (CloudEngine involved) ─── - - // All cross-subnet calls involving a CloudEngine subnet must fail - // with DestinationInvalid. The on_reject handler replies with the - // reject code so we can verify the exact code. - let rejected_pairs: &[(&str, &UniversalCanister<'_>, &str, &UniversalCanister<'_>)] = &[ + // ── XNet calls crossing an engine boundary ────────────────────── + // + // For every pair crossing an engine boundary: + // - Guaranteed-response (unbounded-wait) calls must be rejected + // with SysFatal. + // - Best-effort (bounded-wait), no-cycles calls must succeed. + // - Best-effort calls carrying cycles must be rejected with SysFatal. + let engine_boundary_pairs: &[( + &str, + &UniversalCanister<'_>, + &str, + &UniversalCanister<'_>, + )] = &[ ("Application", &uc_app, "CloudEngine 1", &uc_ce_1a), ("CloudEngine 1", &uc_ce_1a, "Application", &uc_app), ("NNS", &uc_nns, "CloudEngine 1", &uc_ce_1a), @@ -306,10 +315,10 @@ fn test(env: TestEnv) { ("CloudEngine 2", &uc_ce_2a, "CloudEngine 1", &uc_ce_1a), ]; - for (src_name, src_uc, dst_name, dst_uc) in rejected_pairs { + for (src_name, src_uc, dst_name, dst_uc) in engine_boundary_pairs { info!( logger, - "Testing XNet call: {src_name} -> {dst_name} (should fail)..." + "Testing XNet call: {src_name} -> {dst_name} (guaranteed-response, should fail)..." 
); let res = src_uc .update( @@ -321,8 +330,58 @@ fn test(env: TestEnv) { ), ) .await; - assert_xnet_rejected(res, RejectCode::DestinationInvalid); - info!(logger, "{src_name} -> {dst_name} correctly rejected."); + assert_xnet_rejected(res, RejectCode::SysFatal); + info!( + logger, + "{src_name} -> {dst_name} guaranteed-response correctly rejected." + ); + + info!( + logger, + "Testing XNet call: {src_name} -> {dst_name} (best-effort, should succeed)..." + ); + let res = src_uc + .update( + wasm().call_simple_with_cycles_and_best_effort_response( + dst_uc.canister_id(), + "update", + call_args() + .other_side(wasm().reply_data(&data)) + .on_reject(wasm().reject_message().reject()), + 0_u128, + 30_u32, + ), + ) + .await; + assert_eq!( + res.unwrap(), + data, + "{src_name} -> {dst_name} best-effort should succeed" + ); + info!(logger, "{src_name} -> {dst_name} best-effort succeeded."); + + info!( + logger, + "Testing XNet call: {src_name} -> {dst_name} (best-effort with cycles, should fail)..." + ); + let res = src_uc + .update( + wasm().call_simple_with_cycles_and_best_effort_response( + dst_uc.canister_id(), + "update", + call_args() + .other_side(wasm().reply_data(&data)) + .on_reject(wasm().reject_code().reply_int()), + 1_u128, + 30_u32, + ), + ) + .await; + assert_xnet_rejected(res, RejectCode::SysFatal); + info!( + logger, + "{src_name} -> {dst_name} best-effort with cycles correctly rejected." + ); } // ── Verify /subnet and /canister_ranges via read_state ────────── @@ -331,9 +390,7 @@ fn test(env: TestEnv) { // /canister_ranges/ for every known subnet ID and verify that // the expected ones are present and the rest absent. // - // NNS sees all subnets (full topology for the state tree). - // Application sees NNS and itself. - // Each CloudEngine sees only itself. + // All subnets see the full topology (all four subnets). info!( logger, "Verifying /subnet and /canister_ranges via read_state..." 
@@ -362,19 +419,19 @@ fn test(env: TestEnv) { "Application", &app_agent, app_subnet_id, - vec![nns_subnet_id, app_subnet_id], + vec![nns_subnet_id, app_subnet_id, ce_subnet_id_1, ce_subnet_id_2], ), ( "CloudEngine 1", &ce_agent_1, ce_subnet_id_1, - vec![ce_subnet_id_1], + vec![nns_subnet_id, app_subnet_id, ce_subnet_id_1, ce_subnet_id_2], ), ( "CloudEngine 2", &ce_agent_2, ce_subnet_id_2, - vec![ce_subnet_id_2], + vec![nns_subnet_id, app_subnet_id, ce_subnet_id_1, ce_subnet_id_2], ), ] { info!(logger, "Checking /subnet and /canister_ranges on {name}..."); diff --git a/rs/types/types/src/xnet.rs b/rs/types/types/src/xnet.rs index 30393a46e404..1c308e0b686d 100644 --- a/rs/types/types/src/xnet.rs +++ b/rs/types/types/src/xnet.rs @@ -211,6 +211,10 @@ pub enum RejectReason { /// `StateError` variants that shouldn't be possible to occur for requests. /// It is not expected that this reason will ever be used. Unknown = 7, + + /// Request rejected because the sending and/or receiving subnet is a CloudEngine + /// subnet and the request is a guaranteed-response call or has cycles attached.
+ EngineNotAllowed = 8, } impl RejectReason { @@ -231,6 +235,7 @@ impl From for pb_queues::RejectReason { RejectReason::QueueFull => Self::QueueFull, RejectReason::OutOfMemory => Self::OutOfMemory, RejectReason::Unknown => Self::Unknown, + RejectReason::EngineNotAllowed => Self::EngineNotAllowed, } } } @@ -250,6 +255,7 @@ impl TryFrom for RejectReason { pb_queues::RejectReason::QueueFull => Ok(Self::QueueFull), pb_queues::RejectReason::OutOfMemory => Ok(Self::OutOfMemory), pb_queues::RejectReason::Unknown => Ok(Self::Unknown), + pb_queues::RejectReason::EngineNotAllowed => Ok(Self::EngineNotAllowed), } } } diff --git a/rs/xnet/payload_builder/src/impl_tests.rs b/rs/xnet/payload_builder/src/impl_tests.rs index ee430598ace2..dc1136f8781b 100644 --- a/rs/xnet/payload_builder/src/impl_tests.rs +++ b/rs/xnet/payload_builder/src/impl_tests.rs @@ -88,7 +88,7 @@ async fn expected_stream_indices() { -/// `expected_stream_indices` should not include `SUBNET_6` +/// `expected_stream_indices` should include `SUBNET_6` /// (registered as `CloudEngine` in `get_simple_registry_for_test`). #[tokio::test] -async fn expected_stream_indices_excludes_engine_subnets() { +async fn expected_stream_indices_includes_engine_subnets() { with_test_replica_logger(|log| { let state_manager = FakeStateManager::new(); let (payloads, _expected_indices) = get_xnet_state_for_testing(&state_manager); @@ -126,13 +126,13 @@ async fn expected_stream_indices_excludes_engine_subnets() { ) .unwrap(); - // SUBNET_6 is a CloudEngine and must be excluded. + // SUBNET_6 is a CloudEngine and must now be included. assert!( - !computed_indices.contains_key(&SUBNET_6), - "CloudEngine subnet SUBNET_6 should be excluded from expected stream indices" + computed_indices.contains_key(&SUBNET_6), + "CloudEngine subnet SUBNET_6 should be included in expected stream indices" ); - // Application subnets should still be present. + // Application subnets should also be present.
for subnet in &[SUBNET_1, SUBNET_2, SUBNET_3, SUBNET_4] { assert!( computed_indices.contains_key(subnet), diff --git a/rs/xnet/payload_builder/src/lib.rs b/rs/xnet/payload_builder/src/lib.rs index 7a97c91aabc7..f7f31418d88b 100644 --- a/rs/xnet/payload_builder/src/lib.rs +++ b/rs/xnet/payload_builder/src/lib.rs @@ -489,16 +489,7 @@ impl XNetPayloadBuilderImpl { .map_err(Error::RegistryGetSubnetsFailed)? .unwrap_or_default(); - let subnet_ids: Vec<_> = all_subnet_ids - .into_iter() - .filter(|subnet_id| { - self.registry - .get_subnet_type(*subnet_id, validation_context.registry_version) - .is_ok_and(|maybe_t| { - maybe_t.is_some_and(|t| t != SubnetType::CloudEngine.into()) - }) - }) - .collect(); + let subnet_ids: Vec<_> = all_subnet_ids.into_iter().collect(); let expected_indices = subnet_ids .into_iter() @@ -641,18 +632,11 @@ impl XNetPayloadBuilderImpl { ); } - // Do not accept slices from CloudEngine subnets or subnets whose type - // cannot be determined. + // Do not accept slices from subnets whose type cannot be determined. match self .registry .get_subnet_type(subnet_id, validation_context.registry_version) { - Ok(Some(subnet_type)) if subnet_type == SubnetType::CloudEngine.into() => { - return SliceValidationResult::Invalid(format!( - "Slice from CloudEngine subnet {}", - subnet_id - )); - } Ok(Some(_)) => {} Ok(None) => { return SliceValidationResult::Invalid(format!( @@ -870,11 +854,6 @@ impl XNetPayloadBuilderImpl { })? .take(); - // CloudEngine subnets do not participate in XNet. - if state.metadata.own_subnet_type == SubnetType::CloudEngine { - return Ok((XNetPayload::default(), 0.into())); - } - // Build the payload based on indices computed from state + past payloads. 
let stream_positions = self.expected_stream_indices(validation_context, &state, past_payloads)?; @@ -1235,16 +1214,6 @@ impl XNetPayloadBuilder for XNetPayloadBuilderImpl { } }; - if state.metadata.own_subnet_type == SubnetType::CloudEngine - && !payload.stream_slices.is_empty() - { - return Err(ValidationError::InvalidArtifact( - InvalidXNetPayload::InvalidSlice( - "CloudEngine subnets do not accept XNet payloads".to_string(), - ), - )); - } - // For every slice in `payload`, check certification and gaps/duplicates. let mut new_stream_positions = Vec::new(); let mut payload_byte_size = 0; diff --git a/rs/xnet/payload_builder/src/tests.rs b/rs/xnet/payload_builder/src/tests.rs index 7ea7d010049c..033a518d8d45 100644 --- a/rs/xnet/payload_builder/src/tests.rs +++ b/rs/xnet/payload_builder/src/tests.rs @@ -522,31 +522,52 @@ impl PayloadBuilderTestFixture { } } -/// CloudEngine subnets must produce an empty XNet payload. +/// CloudEngine subnets now participate in XNet. At the unit level this checks +/// that `expected_stream_indices` returns entries for an engine's Application +/// peers (no filter excluding them), so the engine will attempt to pull from +/// them. The end-to-end behavior — that a CloudEngine subnet actually builds and +/// validates XNet payloads and completes a best-effort cross-subnet call (which +/// requires `get_xnet_payload`/`validate_xnet_payload` to no longer short-circuit +/// for engines) — is covered by `xnet_cloud_engine_isolation_test`. 
#[tokio::test] -async fn cloud_engine_get_xnet_payload_returns_empty() { +async fn cloud_engine_get_xnet_payload_participates() { with_test_replica_logger(|log| { let fixture = PayloadBuilderTestFixture::with_xnet_state_and_subnet_types( - 0, + 4, btreemap![], Some(SubnetType::CloudEngine), ); let xnet_payload_builder = fixture.new_xnet_payload_builder_impl(log); - let (payload, byte_size) = xnet_payload_builder.get_xnet_payload( - &fixture.validation_context, - &[], - PAYLOAD_BYTES_LIMIT, - ); + let state = fixture + .state_manager + .get_state_at(fixture.validation_context.certified_height) + .unwrap() + .take(); + let past_payloads = fixture.past_payloads(); + let stream_positions = xnet_payload_builder + .expected_stream_indices( + &fixture.validation_context, + state.as_ref(), + past_payloads.as_slice(), + ) + .unwrap(); - assert!(payload.stream_slices.is_empty()); - assert_eq!(byte_size, NumBytes::from(0)); + // The engine produces non-empty stream positions — it will try to pull + // from peers (Application subnets SUBNET_1..SUBNET_4 are all present). + assert!(!stream_positions.is_empty()); + for subnet in &[SUBNET_1, SUBNET_2, SUBNET_3, SUBNET_4] { + assert!( + stream_positions.contains_key(subnet), + "CloudEngine should pull from Application subnet {subnet:?}" + ); + } }); } -/// CloudEngine subnets must reject non-empty XNet payloads during validation. +/// CloudEngine subnets must accept non-empty XNet payloads during validation. #[tokio::test] -async fn cloud_engine_validate_rejects_non_empty_payload() { +async fn cloud_engine_validate_accepts_non_empty_payload() { with_test_replica_logger(|log| { let fixture = PayloadBuilderTestFixture::with_xnet_state_and_subnet_types( 0, @@ -555,48 +576,30 @@ async fn cloud_engine_validate_rejects_non_empty_payload() { ); let xnet_payload_builder = fixture.new_xnet_payload_builder_impl(log); - // A non-empty payload with a slice from SUBNET_1. 
- let slice = make_certified_stream_slice( - SUBNET_1, - StreamConfig { - message_begin: 0, - message_end: 2, - signal_end: 0, - }, - ); - let payload = XNetPayload { - stream_slices: btreemap![SUBNET_1 => slice], - }; - - assert_matches!( - xnet_payload_builder.validate_xnet_payload( - &payload, - &fixture.validation_context, - &[], - ), - Err(ValidationError::InvalidArtifact( - InvalidXNetPayload::InvalidSlice(msg) - )) if msg.contains("CloudEngine") + // Use a pre-built payload from the fixture — indices are consistent with + // the state, so validation passes for any subnet type including CloudEngine. + let payload = fixture.payloads.last().unwrap(); + assert!( + !payload.stream_slices.is_empty(), + "fixture payload must be non-empty for this test to be meaningful" ); - // An empty payload should still be accepted. - let empty_payload = XNetPayload::default(); - assert_eq!( - NumBytes::from(0), + assert!( xnet_payload_builder - .validate_xnet_payload(&empty_payload, &fixture.validation_context, &[]) - .unwrap() + .validate_xnet_payload(payload, &fixture.validation_context, &[]) + .is_ok(), + "CloudEngine subnet should accept non-empty XNet payloads" ); }); } -/// Non-CloudEngine subnets must reject slices originating from a CloudEngine subnet. +/// Non-CloudEngine subnets must accept slices originating from a CloudEngine subnet. #[tokio::test] -async fn validate_rejects_slice_from_cloud_engine_subnet() { +async fn validate_accepts_slice_from_cloud_engine_subnet() { with_test_replica_logger(|log| { let cloud_engine_subnet = SUBNET_1; let fixture = PayloadBuilderTestFixture::with_xnet_state_and_subnet_types( - 1, + 0, btreemap![cloud_engine_subnet => SubnetType::CloudEngine], None, ); @@ -604,28 +607,19 @@ async fn validate_rejects_slice_from_cloud_engine_subnet() { // This is an Application subnet validating an incoming payload. let xnet_payload_builder = fixture.new_xnet_payload_builder_impl(log); - // A payload containing a slice from the CloudEngine subnet. 
- let slice = make_certified_stream_slice( - cloud_engine_subnet, - StreamConfig { - message_begin: 0, - message_end: 2, - signal_end: 0, - }, + // Use a pre-built payload from the fixture. SUBNET_1 is the CloudEngine + // subnet; a slice from it must now be accepted by a non-engine subnet. + let payload = fixture.payloads.last().unwrap(); + assert!( + payload.stream_slices.contains_key(&cloud_engine_subnet), + "fixture payload must contain a slice from the CloudEngine subnet" ); - let payload = XNetPayload { - stream_slices: btreemap![cloud_engine_subnet => slice], - }; - assert_matches!( - xnet_payload_builder.validate_xnet_payload( - &payload, - &fixture.validation_context, - &[], - ), - Err(ValidationError::InvalidArtifact( - InvalidXNetPayload::InvalidSlice(msg) - )) if msg.contains("CloudEngine") + assert!( + xnet_payload_builder + .validate_xnet_payload(payload, &fixture.validation_context, &[]) + .is_ok(), + "Application subnet should accept slices from a CloudEngine subnet" ); }); } @@ -665,9 +659,9 @@ async fn validate_rejects_slice_from_unknown_subnet() { }); } -/// An Application subnet with a registered CloudEngine peer must not pull from it. +/// An Application subnet with a registered CloudEngine peer must pull from it. #[tokio::test] -async fn build_payload_skips_cloud_engine_subnet() { +async fn build_payload_includes_cloud_engine_subnet() { with_test_replica_logger(|log| { // Register 2 subnets: SUBNET_1 as CloudEngine, SUBNET_2 as Application. let fixture = PayloadBuilderTestFixture::with_xnet_state_and_subnet_types( @@ -692,8 +686,8 @@ async fn build_payload_skips_cloud_engine_subnet() { ) .unwrap(); - // SUBNET_2 (Application) is included, SUBNET_1 (CloudEngine) is not. + // Both SUBNET_1 (CloudEngine) and SUBNET_2 (Application) are included. 
+ assert!(stream_positions.contains_key(&SUBNET_1)); assert!(stream_positions.contains_key(&SUBNET_2)); - assert!(!stream_positions.contains_key(&SUBNET_1)); }); } From 388214d6b1acbdf74efe2261e5980d6a5e0be4b8 Mon Sep 17 00:00:00 2001 From: Stefan Schneider <31004026+schneiderstefan@users.noreply.github.com> Date: Mon, 8 Jun 2026 14:15:21 +0200 Subject: [PATCH 2/4] Update rs/messaging/src/message_routing.rs Co-authored-by: Alin Sinpalean <58422065+alin-at-dfinity@users.noreply.github.com> --- rs/messaging/src/message_routing.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rs/messaging/src/message_routing.rs b/rs/messaging/src/message_routing.rs index 42adbe00d77c..104cd927037d 100644 --- a/rs/messaging/src/message_routing.rs +++ b/rs/messaging/src/message_routing.rs @@ -1104,7 +1104,9 @@ impl BatchProcessorImpl { // unset (in which case `SetupInitialDKG` requests fall back to the // calling subnet), or the configured subnet may not be visible from // this subnet (e.g. if it has been filtered out above), in which case - // we also fall back to the calling subnet. +// Look up the default subnet for `SetupInitialDKG`. The key may be +// unset, in which case `SetupInitialDKG` requests fall back to the +// calling subnet. 
let default_initial_dkg_subnet_id = self .registry .get_default_initial_dkg_subnet_id(registry_version) From 0309536f3322c5628ced1b8d1c47898f8b316342 Mon Sep 17 00:00:00 2001 From: IDX GitHub Automation Date: Mon, 8 Jun 2026 12:18:55 +0000 Subject: [PATCH 3/4] Automatically fixing code for linting and formatting issues --- rs/messaging/src/message_routing.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rs/messaging/src/message_routing.rs b/rs/messaging/src/message_routing.rs index 104cd927037d..00fa4a18d5a0 100644 --- a/rs/messaging/src/message_routing.rs +++ b/rs/messaging/src/message_routing.rs @@ -1104,9 +1104,9 @@ impl BatchProcessorImpl { // unset (in which case `SetupInitialDKG` requests fall back to the // calling subnet), or the configured subnet may not be visible from // this subnet (e.g. if it has been filtered out above), in which case -// Look up the default subnet for `SetupInitialDKG`. The key may be -// unset, in which case `SetupInitialDKG` requests fall back to the -// calling subnet. + // Look up the default subnet for `SetupInitialDKG`. The key may be + // unset, in which case `SetupInitialDKG` requests fall back to the + // calling subnet. let default_initial_dkg_subnet_id = self .registry .get_default_initial_dkg_subnet_id(registry_version) From fd8d5cf9f66574c683d488fd6323e8398488c35d Mon Sep 17 00:00:00 2001 From: Stefan Schneider Date: Mon, 8 Jun 2026 12:20:20 +0000 Subject: [PATCH 4/4] fix comment manually --- rs/messaging/src/message_routing.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/rs/messaging/src/message_routing.rs b/rs/messaging/src/message_routing.rs index 00fa4a18d5a0..abbacfe1a0bb 100644 --- a/rs/messaging/src/message_routing.rs +++ b/rs/messaging/src/message_routing.rs @@ -1100,10 +1100,6 @@ impl BatchProcessorImpl { .map_err(|err| registry_error("NNS subnet ID", None, err))? .ok_or_else(|| not_found_error("NNS subnet ID", None))?; - // Look up the default subnet for `SetupInitialDKG`. 
The key may be - // unset (in which case `SetupInitialDKG` requests fall back to the - // calling subnet), or the configured subnet may not be visible from - // this subnet (e.g. if it has been filtered out above), in which case // Look up the default subnet for `SetupInitialDKG`. The key may be // unset, in which case `SetupInitialDKG` requests fall back to the // calling subnet.