diff --git a/rs/execution_environment/src/lib.rs b/rs/execution_environment/src/lib.rs index 32291d777da9..2cf4cebc7480 100644 --- a/rs/execution_environment/src/lib.rs +++ b/rs/execution_environment/src/lib.rs @@ -39,7 +39,7 @@ use ic_interfaces::execution_environment::{ use ic_interfaces_state_manager::StateReader; use ic_logger::ReplicaLogger; use ic_metrics::MetricsRegistry; -use ic_query_stats::QueryStatsPayloadBuilderParams; +use ic_query_stats::{QueryStatsCollector, QueryStatsPayloadBuilderParams}; use ic_registry_subnet_type::SubnetType; use ic_replicated_state::page_map::PageAllocatorFileDescriptor; use ic_replicated_state::{CallOrigin, NetworkTopology, ReplicatedState}; @@ -132,6 +132,7 @@ impl ExecutionServices { query_stats_payload_builder, cycles_account_manager, execution_environment, + query_stats_collector, ) = setup_execution_helper( logger.clone(), metrics_registry, @@ -180,6 +181,7 @@ impl ExecutionServices { config.rate_limiting_of_instructions, config.log_memory_store_feature, Arc::clone(&fd_factory), + query_stats_collector, )); Self { @@ -228,6 +230,7 @@ pub struct ExecutionServicesForTesting { pub query_stats_payload_builder: QueryStatsPayloadBuilderParams, pub cycles_account_manager: Arc, pub execution_environment: Arc, + pub local_query_execution_stats: Arc, } impl ExecutionServicesForTesting { @@ -256,6 +259,7 @@ impl ExecutionServicesForTesting { query_stats_payload_builder, cycles_account_manager, execution_environment, + local_query_execution_stats, ) = setup_execution_helper( logger, metrics_registry, @@ -281,6 +285,7 @@ impl ExecutionServicesForTesting { query_stats_payload_builder, cycles_account_manager, execution_environment, + local_query_execution_stats, } } } @@ -308,6 +313,7 @@ fn setup_execution_helper( QueryStatsPayloadBuilderParams, Arc, Arc, + Arc, ) { let scheduler_config = subnet_config.scheduler_config; @@ -353,6 +359,7 @@ fn setup_execution_helper( let (query_stats_collector, query_stats_payload_builder) = ic_query_stats::init_query_stats(logger.clone(), &config, metrics_registry); + let query_stats_collector = Arc::new(query_stats_collector); let canister_manager_config: CanisterMgrConfig = CanisterMgrConfig::new( config.default_provisional_cycles_balance, @@ -405,7 +412,7 @@ fn setup_execution_helper( metrics_registry, scheduler_config.max_instructions_per_query_message, Arc::clone(&cycles_account_manager), - query_stats_collector, + Arc::clone(&query_stats_collector), ); let query_scheduler = QueryScheduler::new( @@ -433,5 +440,6 @@ fn setup_execution_helper( query_stats_payload_builder, cycles_account_manager, exec_env, + query_stats_collector, ) } diff --git a/rs/execution_environment/src/query_handler.rs b/rs/execution_environment/src/query_handler.rs index 8341aa108a53..c781be5931c3 100644 --- a/rs/execution_environment/src/query_handler.rs +++ b/rs/execution_environment/src/query_handler.rs @@ -119,7 +119,7 @@ pub struct InternalHttpQueryHandler { metrics: QueryHandlerMetrics, max_instructions_per_query: NumInstructions, cycles_account_manager: Arc, - local_query_execution_stats: QueryStatsCollector, + local_query_execution_stats: Arc, query_cache: query_cache::QueryCache, } @@ -133,7 +133,7 @@ impl InternalHttpQueryHandler { metrics_registry: &MetricsRegistry, max_instructions_per_query: NumInstructions, cycles_account_manager: Arc, - local_query_execution_stats: QueryStatsCollector, + local_query_execution_stats: Arc, ) -> Self { let query_cache_capacity = config.query_cache_capacity; let query_max_expiry_time = config.query_cache_max_expiry_time; @@ -344,7 +344,7 @@ impl InternalHttpQueryHandler { let query_stats_collector = if self.config.query_stats_aggregation == FlagStatus::Enabled && enable_query_stats_tracking { - Some(&self.local_query_execution_stats) + Some(self.local_query_execution_stats.as_ref()) } else { None }; diff --git a/rs/execution_environment/src/scheduler.rs b/rs/execution_environment/src/scheduler.rs index 476f2541be5f..c158f2e0c4f4 100644 --- a/rs/execution_environment/src/scheduler.rs +++ b/rs/execution_environment/src/scheduler.rs @@ -28,6 +28,7 @@ use ic_interfaces::execution_environment::{ use ic_logger::{ReplicaLogger, debug, error, fatal, info, new_logger, warn}; use ic_management_canister_types_private::{CanisterStatusType, Method as Ic00Method}; use ic_metrics::MetricsRegistry; +use ic_query_stats::QueryStatsCollector; use ic_registry_resource_limits::ResourceLimits; use ic_registry_subnet_type::SubnetType; use ic_replicated_state::SubnetSchedule; @@ -41,7 +42,7 @@ use ic_types::batch::ChainKeyData; use ic_types::ingress::{IngressState, IngressStatus}; use ic_types::messages::{Ingress, MessageId, NO_DEADLINE, Response, SubnetMessage}; use ic_types::{ - CanisterId, ComputeAllocation, DEFAULT_AGGREGATE_LOG_MEMORY_LIMIT, ExecutionRound, + CanisterId, ComputeAllocation, DEFAULT_AGGREGATE_LOG_MEMORY_LIMIT, ExecutionRound, Height, MemoryAllocation, NumBytes, NumInstructions, NumMessages, NumSlices, Randomness, ReplicaVersion, Time, }; @@ -156,6 +157,7 @@ pub(crate) struct SchedulerImpl { rate_limiting_of_instructions: FlagStatus, log_memory_store_feature: FlagStatus, fd_factory: Arc, + local_query_execution_stats: Arc, } impl SchedulerImpl { @@ -172,6 +174,7 @@ impl SchedulerImpl { rate_limiting_of_instructions: FlagStatus, log_memory_store_feature: FlagStatus, fd_factory: Arc, + local_query_execution_stats: Arc, ) -> Self { let scheduler_cores = config.scheduler_cores as u32; Self { @@ -187,6 +190,7 @@ impl SchedulerImpl { rate_limiting_of_instructions, log_memory_store_feature, fd_factory, + local_query_execution_stats, } } @@ -1133,6 +1137,11 @@ impl Scheduler for SchedulerImpl { // The goal is to ensure that we can track the performance of `execute_round` and its individual components. let root_measurement_scope = MeasurementScope::root(&self.metrics.round); + // Advance the query stats epoch unconditionally so that the previous epoch's + // stats are flushed to the payload builder even when no queries arrive. + self.local_query_execution_stats + .set_epoch_from_height(Height::from(current_round.get())); + if !state.metadata.logs_migrated { let _timer = self .metrics diff --git a/rs/execution_environment/src/scheduler/test_utilities.rs b/rs/execution_environment/src/scheduler/test_utilities.rs index 7d6cee133472..1fb8f59d7034 100644 --- a/rs/execution_environment/src/scheduler/test_utilities.rs +++ b/rs/execution_environment/src/scheduler/test_utilities.rs @@ -1124,6 +1124,7 @@ impl SchedulerTestBuilder { rate_limiting_of_instructions, config.log_memory_store_feature, Arc::new(TestPageAllocatorFileDescriptorImpl::new()), + execution_services.local_query_execution_stats, ); let state_metrics = ReplicatedStateMetrics::new(&self.metrics_registry); diff --git a/rs/pocket_ic_server/tests/test.rs b/rs/pocket_ic_server/tests/test.rs index 9be41ee5ae01..c06a47f29aa3 100644 --- a/rs/pocket_ic_server/tests/test.rs +++ b/rs/pocket_ic_server/tests/test.rs @@ -938,19 +938,23 @@ fn test_query_stats() { assert_eq!(query_stats.request_payload_bytes_total, zero); assert_eq!(query_stats.response_payload_bytes_total, zero); - // Execute 13 query calls (one per each app subnet node) on the counter canister in each of 4 query stats epochs. + // Execute 13 query calls (one per each app subnet node) on the counter canister in each of the first two query stats epochs. // Every single query call has different arguments so that query calls are not cached. + // Due to a delay in query stats aggregation, two more epochs need to pass before + // we observe the stats from an epoch. let mut n: u64 = 0; - for _ in 0..4 { - for _ in 0..13 { - pic.query_call( - canister_id, - Principal::anonymous(), - "read", - n.to_le_bytes().to_vec(), - ) - .unwrap(); - n += 1; + for i in 0..4 { + if i < 2 { + for _ in 0..13 { + pic.query_call( + canister_id, + Principal::anonymous(), + "read", + n.to_le_bytes().to_vec(), + ) + .unwrap(); + n += 1; + } } // Execute one epoch. for _ in 0..60 { @@ -958,7 +962,7 @@ fn test_query_stats() { } } - // Now the number of calls should be set to 26 (13 calls per epoch from 2 epochs) due to a delay in query stats aggregation. + // Now the number of calls should be set to 26 (13 calls per epoch from 2 epochs). let query_stats = pic.canister_status(canister_id, None).unwrap().query_stats; assert_eq!(query_stats.num_calls_total, candid::Nat::from(26_u64)); assert_ne!(query_stats.num_instructions_total, candid::Nat::from(0_u64)); diff --git a/rs/protobuf/def/state/stats/v1/stats.proto b/rs/protobuf/def/state/stats/v1/stats.proto index 2509f3d40d8d..2efd6174370e 100644 --- a/rs/protobuf/def/state/stats/v1/stats.proto +++ b/rs/protobuf/def/state/stats/v1/stats.proto @@ -10,6 +10,7 @@ message Stats { message QueryStats { optional uint64 highest_aggregated_epoch = 1; repeated QueryStatsInner query_stats = 2; + repeated EmptyEpochEntry empty_epochs = 3; } message QueryStatsInner { @@ -21,3 +22,8 @@ message QueryStatsInner { uint64 ingress_payload_size = 5; uint64 egress_payload_size = 6; } + +message EmptyEpochEntry { + types.v1.NodeId proposer = 1; + uint64 epoch = 2; +} diff --git a/rs/protobuf/src/gen/state/state.stats.v1.rs b/rs/protobuf/src/gen/state/state.stats.v1.rs index 22dde6976c22..5706a3ea306f 100644 --- a/rs/protobuf/src/gen/state/state.stats.v1.rs +++ b/rs/protobuf/src/gen/state/state.stats.v1.rs @@ -10,6 +10,8 @@ pub struct QueryStats { pub highest_aggregated_epoch: ::core::option::Option, #[prost(message, repeated, tag = "2")] pub query_stats: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "3")] + pub empty_epochs: ::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct QueryStatsInner { @@ -28,3 +30,10 @@ pub struct QueryStatsInner { #[prost(uint64, tag = "6")] pub egress_payload_size: u64, } +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct EmptyEpochEntry { + #[prost(message, optional, tag = "1")] + pub proposer: ::core::option::Option, + #[prost(uint64, tag = "2")] + pub epoch: u64, +} diff --git a/rs/query_stats/src/payload_builder.rs b/rs/query_stats/src/payload_builder.rs index e32a04eb636d..8d86aa9dc916 100644 --- a/rs/query_stats/src/payload_builder.rs +++ b/rs/query_stats/src/payload_builder.rs @@ -182,7 +182,7 @@ impl QueryStatsPayloadBuilderImpl { return vec![]; } - let Ok(previous_ids) = + let Ok((previous_ids, has_submitted)) = self.get_previous_ids(self.node_id, current_stats.epoch, past_payloads, context) else { return vec![]; @@ -196,7 +196,10 @@ impl QueryStatsPayloadBuilderImpl { .cloned() .collect::>(); - if messages.is_empty() { + // Skip if there is nothing new to report and we've already submitted for this epoch. + // If we have not yet submitted for this epoch, fall through to send an empty payload + // so the aggregator can see that this node has moved past this epoch. + if messages.is_empty() && has_submitted { return vec![]; } @@ -273,7 +276,7 @@ impl QueryStatsPayloadBuilderImpl { // Get the previous ids, that have been already reported by this node in the epoch // NOTE: This also checks that the epoch that is being reported has not been aggregated yet - let previous_ids = self.get_previous_ids( + let (previous_ids, _) = self.get_previous_ids( payload.proposer, payload.epoch, past_payloads, @@ -304,13 +307,15 @@ impl QueryStatsPayloadBuilderImpl { /// /// This function also returns an error, if we are requesting data on an epoch, /// that has been already aggregated. + /// Returns `(previous_canister_ids, has_submitted)` where `has_submitted` is true if this + /// node has already submitted any report (even empty) for the given epoch. fn get_previous_ids( &self, node_id: NodeId, epoch: QueryStatsEpoch, past_payloads: &[PastPayload], context: &ValidationContext, - ) -> Result, PayloadValidationError> { + ) -> Result<(BTreeSet, bool), PayloadValidationError> { // Get unaggregated stats from certified state let certified_height = context.certified_height; let state_stats = &match self.state_reader.get_state_at(certified_height) { @@ -363,41 +368,40 @@ impl QueryStatsPayloadBuilderImpl { } // Check the certified state for stats that we have already sent - if let Some(state_stats) = get_stats_for_node_id_and_epoch(state_stats, &node_id, &epoch) - .map(|record| record.keys()) - { - previous_ids.extend(state_stats); + let node_record = get_stats_for_node_id_and_epoch(state_stats, &node_id, &epoch); + let has_submitted_in_state = node_record.is_some(); + if let Some(record) = node_record { + previous_ids.extend(record.keys().copied()); } // Check past payloads for stats already sent + let matching_past_payloads: Vec<_> = past_payloads + .iter() + .filter_map(|past_payload| { + QueryStatsPayload::deserialize(past_payload.payload) + .inspect_err(|_| { + error!( + self.log, + "Failed to deserialize past payload, this is a bug" + ); + }) + .ok() + .flatten() + }) + .filter(|stats| stats.epoch == epoch && stats.proposer == node_id) + .collect(); + + let has_submitted_in_past = !matching_past_payloads.is_empty(); previous_ids.extend( - past_payloads - .iter() - // Deserialize the payload - .filter_map(|past_payload| { - QueryStatsPayload::deserialize(past_payload.payload) - .inspect_err(|_| { - error!( - self.log, - "Failed to deserialize past payload, this is a bug" - ); - }) - .ok() - .flatten() - }) - // Filter out payloads that have a different epoch or are sent from different node - .filter(|stats| stats.epoch == epoch && stats.proposer == node_id) - // Map payload to CanisterIds - .flat_map(|stats| { - stats - .stats - .iter() - .map(|stat| stat.canister_id) - .collect::>() - }), + matching_past_payloads + .into_iter() + .flat_map(|stats| stats.stats.into_iter().map(|stat| stat.canister_id)), ); - Ok(previous_ids) + Ok(( + previous_ids, + has_submitted_in_state || has_submitted_in_past, + )) } } diff --git a/rs/types/types/src/batch/execution_environment.rs b/rs/types/types/src/batch/execution_environment.rs index e58b22ffc349..fbbc471a8cff 100644 --- a/rs/types/types/src/batch/execution_environment.rs +++ b/rs/types/types/src/batch/execution_environment.rs @@ -28,7 +28,7 @@ use ic_protobuf::{ proxy::{ProxyDecodeError, try_from_option_field}, state::{ canister_state_bits::v1::{TotalQueryStats as TotalQueryStatsProto, Unsigned128}, - stats::v1::{QueryStats as QueryStatsProto, QueryStatsInner}, + stats::v1::{EmptyEpochEntry, QueryStats as QueryStatsProto, QueryStatsInner}, }, types::v1::{self as pb}, }; @@ -141,11 +141,17 @@ pub struct RawQueryStats { impl RawQueryStats { pub fn as_query_stats(&self) -> Option { - // Serialize BTreeMap as vector let mut query_stats = vec![]; + let mut empty_epochs = vec![]; for (node_id, inner) in &self.stats { for (epoch, inner) in inner { + if inner.is_empty() { + empty_epochs.push(EmptyEpochEntry { + proposer: Some(node_id_into_protobuf(*node_id)), + epoch: epoch.get(), + }); + } for (canister_id, stats) in inner { query_stats.push(QueryStatsInner { proposer: Some(node_id_into_protobuf(*node_id)), @@ -160,12 +166,16 @@ impl RawQueryStats { } } - if query_stats.is_empty() && self.highest_aggregated_epoch.is_none() { + if query_stats.is_empty() + && empty_epochs.is_empty() + && self.highest_aggregated_epoch.is_none() + { None } else { Some(QueryStatsProto { highest_aggregated_epoch: self.highest_aggregated_epoch.map(|epoch| epoch.get()), query_stats, + empty_epochs, }) } } @@ -184,7 +194,6 @@ impl TryFrom for RawQueryStats { let epoch = QueryStatsEpoch::new(entry.epoch); let canister: CanisterId = try_from_option_field(entry.canister, "QueryStatsInner::canister_id")?; - r.stats .entry(proposer) .or_default() @@ -201,6 +210,16 @@ impl TryFrom for RawQueryStats { ); } } + for entry in value.empty_epochs { + if let Ok(proposer) = node_id_try_from_option(entry.proposer) { + let epoch = QueryStatsEpoch::new(entry.epoch); + r.stats + .entry(proposer) + .or_default() + .entry(epoch) + .or_default(); + } + } Ok(r) } } @@ -224,10 +243,6 @@ impl QueryStatsPayload { /// This function will drop trailing stats to guarantee, that the /// payload will fit into the `byte_limit` pub fn serialize_with_limit(&self, byte_limit: NumBytes) -> Vec { - if self.stats.is_empty() { - return vec![]; - } - let mut buffer = vec![].limit(byte_limit.get() as usize); // Encode the metadata about the messages @@ -242,6 +257,11 @@ impl QueryStatsPayload { Err(_) => return vec![], } + // If there are no stats, return just epoch+proposer to signal epoch advancement. + if self.stats.is_empty() { + return buffer.into_inner(); + } + let mut num_stats_included = 0; for entry in &self.stats { if pb::CanisterQueryStats::from(entry)