Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions rs/execution_environment/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use ic_interfaces::execution_environment::{
use ic_interfaces_state_manager::StateReader;
use ic_logger::ReplicaLogger;
use ic_metrics::MetricsRegistry;
use ic_query_stats::QueryStatsPayloadBuilderParams;
use ic_query_stats::{QueryStatsCollector, QueryStatsPayloadBuilderParams};
use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::page_map::PageAllocatorFileDescriptor;
use ic_replicated_state::{CallOrigin, NetworkTopology, ReplicatedState};
Expand Down Expand Up @@ -132,6 +132,7 @@ impl ExecutionServices {
query_stats_payload_builder,
cycles_account_manager,
execution_environment,
query_stats_collector,
) = setup_execution_helper(
logger.clone(),
metrics_registry,
Expand Down Expand Up @@ -180,6 +181,7 @@ impl ExecutionServices {
config.rate_limiting_of_instructions,
config.log_memory_store_feature,
Arc::clone(&fd_factory),
query_stats_collector,
));

Self {
Expand Down Expand Up @@ -228,6 +230,7 @@ pub struct ExecutionServicesForTesting {
pub query_stats_payload_builder: QueryStatsPayloadBuilderParams,
pub cycles_account_manager: Arc<CyclesAccountManager>,
pub execution_environment: Arc<ExecutionEnvironment>,
pub local_query_execution_stats: Arc<QueryStatsCollector>,
}

impl ExecutionServicesForTesting {
Expand Down Expand Up @@ -256,6 +259,7 @@ impl ExecutionServicesForTesting {
query_stats_payload_builder,
cycles_account_manager,
execution_environment,
local_query_execution_stats,
) = setup_execution_helper(
logger,
metrics_registry,
Expand All @@ -281,6 +285,7 @@ impl ExecutionServicesForTesting {
query_stats_payload_builder,
cycles_account_manager,
execution_environment,
local_query_execution_stats,
}
}
}
Expand Down Expand Up @@ -308,6 +313,7 @@ fn setup_execution_helper(
QueryStatsPayloadBuilderParams,
Arc<CyclesAccountManager>,
Arc<ExecutionEnvironment>,
Arc<QueryStatsCollector>,
) {
let scheduler_config = subnet_config.scheduler_config;

Expand Down Expand Up @@ -353,6 +359,7 @@ fn setup_execution_helper(

let (query_stats_collector, query_stats_payload_builder) =
ic_query_stats::init_query_stats(logger.clone(), &config, metrics_registry);
let query_stats_collector = Arc::new(query_stats_collector);

let canister_manager_config: CanisterMgrConfig = CanisterMgrConfig::new(
config.default_provisional_cycles_balance,
Expand Down Expand Up @@ -405,7 +412,7 @@ fn setup_execution_helper(
metrics_registry,
scheduler_config.max_instructions_per_query_message,
Arc::clone(&cycles_account_manager),
query_stats_collector,
Arc::clone(&query_stats_collector),
);

let query_scheduler = QueryScheduler::new(
Expand Down Expand Up @@ -433,5 +440,6 @@ fn setup_execution_helper(
query_stats_payload_builder,
cycles_account_manager,
exec_env,
query_stats_collector,
)
}
6 changes: 3 additions & 3 deletions rs/execution_environment/src/query_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub struct InternalHttpQueryHandler {
metrics: QueryHandlerMetrics,
max_instructions_per_query: NumInstructions,
cycles_account_manager: Arc<CyclesAccountManager>,
local_query_execution_stats: QueryStatsCollector,
local_query_execution_stats: Arc<QueryStatsCollector>,
query_cache: query_cache::QueryCache,
}

Expand All @@ -133,7 +133,7 @@ impl InternalHttpQueryHandler {
metrics_registry: &MetricsRegistry,
max_instructions_per_query: NumInstructions,
cycles_account_manager: Arc<CyclesAccountManager>,
local_query_execution_stats: QueryStatsCollector,
local_query_execution_stats: Arc<QueryStatsCollector>,
) -> Self {
let query_cache_capacity = config.query_cache_capacity;
let query_max_expiry_time = config.query_cache_max_expiry_time;
Expand Down Expand Up @@ -344,7 +344,7 @@ impl InternalHttpQueryHandler {
let query_stats_collector = if self.config.query_stats_aggregation == FlagStatus::Enabled
&& enable_query_stats_tracking
{
Some(&self.local_query_execution_stats)
Some(self.local_query_execution_stats.as_ref())
} else {
None
};
Expand Down
11 changes: 10 additions & 1 deletion rs/execution_environment/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use ic_interfaces::execution_environment::{
use ic_logger::{ReplicaLogger, debug, error, fatal, info, new_logger, warn};
use ic_management_canister_types_private::{CanisterStatusType, Method as Ic00Method};
use ic_metrics::MetricsRegistry;
use ic_query_stats::QueryStatsCollector;
use ic_registry_resource_limits::ResourceLimits;
use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::SubnetSchedule;
Expand All @@ -41,7 +42,7 @@ use ic_types::batch::ChainKeyData;
use ic_types::ingress::{IngressState, IngressStatus};
use ic_types::messages::{Ingress, MessageId, NO_DEADLINE, Response, SubnetMessage};
use ic_types::{
CanisterId, ComputeAllocation, DEFAULT_AGGREGATE_LOG_MEMORY_LIMIT, ExecutionRound,
CanisterId, ComputeAllocation, DEFAULT_AGGREGATE_LOG_MEMORY_LIMIT, ExecutionRound, Height,
MemoryAllocation, NumBytes, NumInstructions, NumMessages, NumSlices, Randomness,
ReplicaVersion, Time,
};
Expand Down Expand Up @@ -156,6 +157,7 @@ pub(crate) struct SchedulerImpl {
rate_limiting_of_instructions: FlagStatus,
log_memory_store_feature: FlagStatus,
fd_factory: Arc<dyn PageAllocatorFileDescriptor>,
local_query_execution_stats: Arc<QueryStatsCollector>,
}

impl SchedulerImpl {
Expand All @@ -172,6 +174,7 @@ impl SchedulerImpl {
rate_limiting_of_instructions: FlagStatus,
log_memory_store_feature: FlagStatus,
fd_factory: Arc<dyn PageAllocatorFileDescriptor>,
local_query_execution_stats: Arc<QueryStatsCollector>,
) -> Self {
let scheduler_cores = config.scheduler_cores as u32;
Self {
Expand All @@ -187,6 +190,7 @@ impl SchedulerImpl {
rate_limiting_of_instructions,
log_memory_store_feature,
fd_factory,
local_query_execution_stats,
}
}

Expand Down Expand Up @@ -1133,6 +1137,11 @@ impl Scheduler for SchedulerImpl {
// The goal is to ensure that we can track the performance of `execute_round` and its individual components.
let root_measurement_scope = MeasurementScope::root(&self.metrics.round);

// Advance the query stats epoch unconditionally so that the previous epoch's
// stats are flushed to the payload builder even when no queries arrive.
self.local_query_execution_stats
.set_epoch_from_height(Height::from(current_round.get()));

if !state.metadata.logs_migrated {
let _timer = self
.metrics
Expand Down
1 change: 1 addition & 0 deletions rs/execution_environment/src/scheduler/test_utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,7 @@ impl SchedulerTestBuilder {
rate_limiting_of_instructions,
config.log_memory_store_feature,
Arc::new(TestPageAllocatorFileDescriptorImpl::new()),
execution_services.local_query_execution_stats,
);

let state_metrics = ReplicatedStateMetrics::new(&self.metrics_registry);
Expand Down
28 changes: 16 additions & 12 deletions rs/pocket_ic_server/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,27 +938,31 @@ fn test_query_stats() {
assert_eq!(query_stats.request_payload_bytes_total, zero);
assert_eq!(query_stats.response_payload_bytes_total, zero);

// Execute 13 query calls (one per each app subnet node) on the counter canister in each of 4 query stats epochs.
// Execute 13 query calls (one per each app subnet node) on the counter canister in each of the first two query stats epochs.
// Every single query call has different arguments so that query calls are not cached.
// Due to a delay in query stats aggregation, two more epochs need to pass before
// we observe the stats from an epoch.
let mut n: u64 = 0;
for _ in 0..4 {
for _ in 0..13 {
pic.query_call(
canister_id,
Principal::anonymous(),
"read",
n.to_le_bytes().to_vec(),
)
.unwrap();
n += 1;
for i in 0..4 {
if i < 2 {
for _ in 0..13 {
pic.query_call(
canister_id,
Principal::anonymous(),
"read",
n.to_le_bytes().to_vec(),
)
.unwrap();
n += 1;
}
}
// Execute one epoch.
for _ in 0..60 {
pic.tick();
}
}

// Now the number of calls should be set to 26 (13 calls per epoch from 2 epochs) due to a delay in query stats aggregation.
// Now the number of calls should be set to 26 (13 calls per epoch from 2 epochs).
let query_stats = pic.canister_status(canister_id, None).unwrap().query_stats;
assert_eq!(query_stats.num_calls_total, candid::Nat::from(26_u64));
assert_ne!(query_stats.num_instructions_total, candid::Nat::from(0_u64));
Expand Down
6 changes: 6 additions & 0 deletions rs/protobuf/def/state/stats/v1/stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ message Stats {
// Container for query statistics: an optional epoch marker, a list of
// QueryStatsInner records, and a list of EmptyEpochEntry records.
message QueryStats {
// NOTE(review): presumably the highest epoch whose stats have already been
// aggregated; unset when nothing has been aggregated yet — confirm.
optional uint64 highest_aggregated_epoch = 1;
// Individual statistics records; see QueryStatsInner for the per-record fields.
repeated QueryStatsInner query_stats = 2;
// (proposer, epoch) entries; see EmptyEpochEntry.
// NOTE(review): presumably tracks epochs for which a node reported without
// any per-canister stats — confirm against the aggregation code.
repeated EmptyEpochEntry empty_epochs = 3;
}

message QueryStatsInner {
Expand All @@ -21,3 +22,8 @@ message QueryStatsInner {
uint64 ingress_payload_size = 5;
uint64 egress_payload_size = 6;
}

// A (proposer, epoch) pair.
// NOTE(review): presumably records that `proposer` submitted a report for
// `epoch` that contained no per-canister stats — confirm against the
// query stats aggregation code.
message EmptyEpochEntry {
// Node the entry refers to.
types.v1.NodeId proposer = 1;
// Epoch number the entry refers to.
uint64 epoch = 2;
}
9 changes: 9 additions & 0 deletions rs/protobuf/src/gen/state/state.stats.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub struct QueryStats {
pub highest_aggregated_epoch: ::core::option::Option<u64>,
#[prost(message, repeated, tag = "2")]
pub query_stats: ::prost::alloc::vec::Vec<QueryStatsInner>,
#[prost(message, repeated, tag = "3")]
pub empty_epochs: ::prost::alloc::vec::Vec<EmptyEpochEntry>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct QueryStatsInner {
Expand All @@ -28,3 +30,10 @@ pub struct QueryStatsInner {
#[prost(uint64, tag = "6")]
pub egress_payload_size: u64,
}
/// A `(proposer, epoch)` pair, encoded as a prost message with `proposer` at
/// tag 1 and `epoch` at tag 2.
///
/// NOTE(review): presumably marks an epoch for which `proposer` submitted a
/// report without any per-canister stats — confirm against the aggregator.
/// NOTE(review): this file lives under `src/gen`, so it looks generated;
/// lasting documentation probably belongs in the `.proto` definition.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct EmptyEpochEntry {
/// Node the entry refers to; `None` when the field is absent on the wire.
#[prost(message, optional, tag = "1")]
pub proposer: ::core::option::Option<super::super::super::types::v1::NodeId>,
/// Epoch number the entry refers to.
#[prost(uint64, tag = "2")]
pub epoch: u64,
}
70 changes: 37 additions & 33 deletions rs/query_stats/src/payload_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl QueryStatsPayloadBuilderImpl {
return vec![];
}

let Ok(previous_ids) =
let Ok((previous_ids, has_submitted)) =
self.get_previous_ids(self.node_id, current_stats.epoch, past_payloads, context)
else {
return vec![];
Expand All @@ -196,7 +196,10 @@ impl QueryStatsPayloadBuilderImpl {
.cloned()
.collect::<Vec<_>>();

if messages.is_empty() {
// Skip if there is nothing new to report and we've already submitted for this epoch.
// If we have not yet submitted for this epoch, fall through to send an empty payload
// so the aggregator can see that this node has moved past this epoch.
if messages.is_empty() && has_submitted {
return vec![];
}
Comment on lines +199 to 204

Expand Down Expand Up @@ -273,7 +276,7 @@ impl QueryStatsPayloadBuilderImpl {

// Get the previous ids, that have been already reported by this node in the epoch
// NOTE: This also checks that the epoch that is being reported has not been aggregated yet
let previous_ids = self.get_previous_ids(
let (previous_ids, _) = self.get_previous_ids(
payload.proposer,
payload.epoch,
past_payloads,
Expand Down Expand Up @@ -304,13 +307,15 @@ impl QueryStatsPayloadBuilderImpl {
///
/// This function also returns an error, if we are requesting data on an epoch,
/// that has been already aggregated.
/// Returns `(previous_canister_ids, has_submitted)` where `has_submitted` is true if this
/// node has already submitted any report (even empty) for the given epoch.
fn get_previous_ids(
&self,
node_id: NodeId,
epoch: QueryStatsEpoch,
past_payloads: &[PastPayload],
context: &ValidationContext,
) -> Result<BTreeSet<CanisterId>, PayloadValidationError> {
) -> Result<(BTreeSet<CanisterId>, bool), PayloadValidationError> {
// Get unaggregated stats from certified state
let certified_height = context.certified_height;
let state_stats = &match self.state_reader.get_state_at(certified_height) {
Expand Down Expand Up @@ -363,41 +368,40 @@ impl QueryStatsPayloadBuilderImpl {
}

// Check the certified state for stats that we have already sent
if let Some(state_stats) = get_stats_for_node_id_and_epoch(state_stats, &node_id, &epoch)
.map(|record| record.keys())
{
previous_ids.extend(state_stats);
let node_record = get_stats_for_node_id_and_epoch(state_stats, &node_id, &epoch);
let has_submitted_in_state = node_record.is_some();
if let Some(record) = node_record {
previous_ids.extend(record.keys().copied());
}

// Check past payloads for stats already sent
let matching_past_payloads: Vec<_> = past_payloads
.iter()
.filter_map(|past_payload| {
QueryStatsPayload::deserialize(past_payload.payload)
.inspect_err(|_| {
error!(
self.log,
"Failed to deserialize past payload, this is a bug"
);
})
.ok()
.flatten()
})
.filter(|stats| stats.epoch == epoch && stats.proposer == node_id)
.collect();

let has_submitted_in_past = !matching_past_payloads.is_empty();
previous_ids.extend(
past_payloads
.iter()
// Deserialize the payload
.filter_map(|past_payload| {
QueryStatsPayload::deserialize(past_payload.payload)
.inspect_err(|_| {
error!(
self.log,
"Failed to deserialize past payload, this is a bug"
);
})
.ok()
.flatten()
})
// Filter out payloads that have a different epoch or are sent from different node
.filter(|stats| stats.epoch == epoch && stats.proposer == node_id)
// Map payload to CanisterIds
.flat_map(|stats| {
stats
.stats
.iter()
.map(|stat| stat.canister_id)
.collect::<Vec<CanisterId>>()
}),
matching_past_payloads
.into_iter()
.flat_map(|stats| stats.stats.into_iter().map(|stat| stat.canister_id)),
);

Ok(previous_ids)
Ok((
previous_ids,
has_submitted_in_state || has_submitted_in_past,
))
}
}

Expand Down
Loading
Loading