Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions rs/execution_environment/src/execution_environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,11 +693,11 @@ impl ExecutionEnvironment {
let cost_schedule = state.get_own_cost_schedule();

let mut msg = match msg {
SubnetMessage::Response(response) => {
SubnetMessage::ConsensusResponse(response) => {
let context = state
.metadata
.subnet_call_context_manager
.retrieve_context(response.originator_reply_callback, &self.log);
.retrieve_context(response.callback, &self.log);
return match context {
None => (state, ExecuteSubnetMessageResultType::Finished),
Some(context) => {
Expand All @@ -718,7 +718,7 @@ impl ExecutionEnvironment {
context.max_response_bytes,
registry_settings.subnet_size,
cost_schedule,
NumBytes::from(response.payload_size_bytes()),
response.payload.size_bytes(),
);

self.metrics.observe_http_outcall_price_change(
Expand All @@ -737,7 +737,7 @@ impl ExecutionEnvironment {
info!(
self.log,
"Canister Http request with payload_size {}, max_response_size {}, subnet_size {}, reply_callback_id {}, sender {}, process_id {}",
response.payload_size_bytes().get(),
response.payload.size_bytes().get(),
max_response_size,
registry_settings.subnet_size,
context.request.sender_reply_callback,
Expand All @@ -749,7 +749,7 @@ impl ExecutionEnvironment {
self.metrics.observe_subnet_message(
&request.method_name,
time_elapsed.as_secs_f64(),
&match &response.response_payload {
&match &response.payload {
Payload::Data(_) => Ok(()),
Payload::Reject(_) => Err(ErrorCode::CanisterRejectedMessage),
},
Expand All @@ -758,7 +758,7 @@ impl ExecutionEnvironment {
if let (
SubnetCallContext::SignWithThreshold(threshold_context),
Payload::Data(_),
) = (&context, &response.response_payload)
) = (&context, &response.payload)
{
*state
.metadata
Expand All @@ -768,13 +768,21 @@ impl ExecutionEnvironment {
.or_default() += 1;
}

// Refund the cycles left unspent upfront (`request.payment`)
// plus the per-replica refunds that the participating nodes
// signed over. For non-HTTP responses `refund_shares` is empty,
// so this preserves the previous behavior.
let refund_shares_total: Cycles =
response.refund_shares.iter().map(|(_, c)| *c).sum();
let refund = request.payment + refund_shares_total;

state.push_subnet_output_response(
Response {
originator: request.sender,
respondent: CanisterId::from(self.own_subnet_id),
originator_reply_callback: request.sender_reply_callback,
refund: request.payment,
response_payload: response.response_payload.clone(),
refund,
response_payload: response.payload.clone(),
deadline: request.deadline,
}
.into(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use ic_metrics::MetricsRegistry;
use ic_metrics::buckets::{decimal_buckets, decimal_buckets_with_zero};
use ic_replicated_state::metadata_state::subnet_call_context_manager::InstallCodeCallId;
use ic_types::CanisterId;
use ic_types::batch::ConsensusResponse;
use ic_types::canister_http::{CanisterHttpRequestContext, MAX_CANISTER_HTTP_RESPONSE_BYTES};
use ic_types::messages::Response;
use ic_types_cycles::NominalCycles;
use prometheus::{Histogram, HistogramVec, IntCounter};
use std::str::FromStr;
Expand Down Expand Up @@ -232,7 +232,7 @@ impl ExecutionEnvironmentMetrics {
pub(crate) fn observe_http_outcall_request(
&self,
context: &CanisterHttpRequestContext,
response: &Response,
response: &ConsensusResponse,
) {
self.http_outcalls_metrics
.request_size
Expand All @@ -250,7 +250,7 @@ impl ExecutionEnvironmentMetrics {

self.http_outcalls_metrics
.payload_size
.observe(response.payload_size_bytes().get() as f64);
.observe(response.payload.size_bytes().get() as f64);
}

pub(crate) fn observe_http_outcall_price_change(
Expand Down
23 changes: 5 additions & 18 deletions rs/execution_environment/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ use ic_replicated_state::{
};
use ic_types::batch::ChainKeyData;
use ic_types::ingress::{IngressState, IngressStatus};
use ic_types::messages::{Ingress, MessageId, NO_DEADLINE, Response, SubnetMessage};
use ic_types::messages::{Ingress, MessageId, SubnetMessage};
use ic_types::{
CanisterId, ComputeAllocation, DEFAULT_AGGREGATE_LOG_MEMORY_LIMIT, ExecutionRound,
MemoryAllocation, NumBytes, NumInstructions, NumMessages, NumSlices, Randomness,
ReplicaVersion, Time,
};
use ic_types_cycles::{CanisterCyclesCostSchedule, Cycles};
use ic_types_cycles::CanisterCyclesCostSchedule;
use more_asserts::{debug_assert_ge, debug_assert_le, debug_assert_lt};
use std::cell::RefCell;
use std::collections::BTreeSet;
Expand Down Expand Up @@ -1295,20 +1295,7 @@ impl Scheduler for SchedulerImpl {
// state. That can be changed in the future as we optimize scheduling.
while let Some(response) = state.consensus_queue.pop() {
let (new_state, _) = self.execute_subnet_message(
// Wrap the callback ID and payload into a Response, to make it easier for
// `execute_subnet_message()` to deal with. All other fields will be ignored by
// `execute_subnet_message()`.
SubnetMessage::Response(
Response {
originator: CanisterId::ic_00(),
respondent: CanisterId::ic_00(),
originator_reply_callback: response.callback,
refund: Cycles::zero(),
response_payload: response.payload,
deadline: NO_DEADLINE,
}
.into(),
),
SubnetMessage::ConsensusResponse(Arc::new(response)),
state,
&mut csprng,
current_round,
Expand Down Expand Up @@ -1844,7 +1831,7 @@ fn can_execute_subnet_msg(
let maybe_method = match msg {
SubnetMessage::Ingress(ingress) => Ic00Method::from_str(ingress.method_name.as_str()).ok(),
SubnetMessage::Request(request) => Ic00Method::from_str(request.method_name.as_str()).ok(),
SubnetMessage::Response { .. } => None,
SubnetMessage::ConsensusResponse { .. } => None,
};
let Some(method) = maybe_method else {
// If this is a response or the method name is not valid, execute the message.
Expand Down Expand Up @@ -1896,7 +1883,7 @@ fn get_instruction_limits_for_subnet_message(
// for install code in which case the default limits are overriden.
let default_limits = InstructionLimits::new(NumInstructions::new(0), NumInstructions::new(0));
let method_name = match &msg {
SubnetMessage::Response { .. } => {
SubnetMessage::ConsensusResponse { .. } => {
return default_limits;
}
SubnetMessage::Ingress(ingress) => &ingress.method_name,
Expand Down
86 changes: 70 additions & 16 deletions rs/https_outcalls/consensus/src/payload_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use ic_types::{
messages::{CallbackId, Payload, RejectContext},
registry::RegistryClientError,
};
use ic_types_cycles::Cycles;
use std::{
collections::{BTreeMap, HashSet},
sync::{Arc, RwLock},
Expand Down Expand Up @@ -929,6 +930,14 @@ impl IntoMessages<(Vec<ConsensusResponse>, CanisterHttpBatchStats)>
stats.single_signature_responses += 1;
}
stats.responses += 1;
// Carry each participating replica's signed refund share to
// Execution, which sums them up to refund the caller.
let refund_shares = response
.proof
.signatures
.iter()
.map(|(node_id, signature)| (*node_id, signature.payment_receipt.refund))
.collect();
ConsensusResponse::new(
response.content.id,
match response.content.content {
Expand All @@ -938,6 +947,7 @@ impl IntoMessages<(Vec<ConsensusResponse>, CanisterHttpBatchStats)>
}
},
)
.with_refund_shares(refund_shares)
});

let timeouts = messages.timeouts.iter().map(|callback| {
Expand Down Expand Up @@ -989,6 +999,31 @@ impl IntoMessages<(Vec<ConsensusResponse>, CanisterHttpBatchStats)>
}
}

/// Extracts the per-replica refund shares from a collection of response shares,
/// i.e. the refund each signing node returns out of its per-replica allowance.
fn refund_shares_from_shares(shares: &[CanisterHttpResponseShare]) -> Vec<(NodeId, Cycles)> {
shares
.iter()
.map(|share| (share.signature.signer, share.content.refund()))
.collect()
}

/// Extracts the per-replica refund shares from a collection of flexible
/// responses, each carrying a single-signer proof.
fn refund_shares_from_flexible(
responses: &[FlexibleCanisterHttpResponseWithProof],
) -> Vec<(NodeId, Cycles)> {
responses
.iter()
.map(|response| {
(
response.proof.signature.signer,
response.proof.content.refund(),
)
})
.collect()
}

/// Converts a [`FlexibleCanisterHttpResponses`] into a [`ConsensusResponse`].
///
/// Returns `None` if Candid decoding/encoding fails, which leads to skipping
Expand All @@ -997,6 +1032,7 @@ impl IntoMessages<(Vec<ConsensusResponse>, CanisterHttpBatchStats)>
fn flexible_ok_responses_into_consensus_response(
response_group: FlexibleCanisterHttpResponses,
) -> Option<ConsensusResponse> {
let refund_shares = refund_shares_from_flexible(&response_group.responses);
let payloads: Vec<_> = response_group
.responses
.into_iter()
Expand All @@ -1015,10 +1051,10 @@ fn flexible_ok_responses_into_consensus_response(

let bytes = Encode!(&FlexibleHttpRequestResult::Ok(payloads)).ok()?;

Some(ConsensusResponse::new(
response_group.callback_id,
Payload::Data(bytes),
))
Some(
ConsensusResponse::new(response_group.callback_id, Payload::Data(bytes))
.with_refund_shares(refund_shares),
)
}

/// Converts a [`FlexibleCanisterHttpError`] into a [`ConsensusResponse`]
Expand All @@ -1030,6 +1066,17 @@ fn flexible_error_into_consensus_response(
) -> Option<ConsensusResponse> {
let callback_id = error.callback_id();

// A timed-out flexible request has no signed shares, hence no refunds.
let refund_shares = match &error {
FlexibleCanisterHttpError::Timeout { .. } => vec![],
FlexibleCanisterHttpError::TooManyRejects {
reject_responses, ..
} => refund_shares_from_flexible(reject_responses),
FlexibleCanisterHttpError::ResponsesTooLarge {
all_seen_shares, ..
} => refund_shares_from_shares(all_seen_shares),
};

let err = match error {
FlexibleCanisterHttpError::Timeout { .. } => FlexibleHttpRequestErr {
global_error: Some(FlexibleHttpGlobalError::Timeout(candid::Reserved)),
Expand Down Expand Up @@ -1138,7 +1185,9 @@ fn flexible_error_into_consensus_response(

let bytes = Encode!(&FlexibleHttpRequestResult::Err(err)).ok()?;

Some(ConsensusResponse::new(callback_id, Payload::Data(bytes)))
Some(
ConsensusResponse::new(callback_id, Payload::Data(bytes)).with_refund_shares(refund_shares),
)
}

/// Turns a [`CanisterHttpResponseDivergence`] into a [`ConsensusResponse`] containing a rejection.
Expand All @@ -1162,6 +1211,8 @@ fn divergence_response_into_reject(
return None;
};

let refund_shares = refund_shares_from_shares(&response.shares);

// Count the different content hashes, that we have encountered in the divergence response
let mut hash_counts = BTreeMap::new();
response
Expand All @@ -1187,17 +1238,20 @@ fn divergence_response_into_reject(
.map(|(hash, count)| format!("[{}: {}]", hex::encode(hash), count))
.collect::<Vec<_>>();

Some(ConsensusResponse::new(
id,
Payload::Reject(RejectContext::new(
RejectCode::SysTransient,
format!(
"No consensus could be reached. Replicas had different responses. Details: request_id: {}, hashes: {}",
id,
hash_counts.join(", ")
),
)),
))
Some(
ConsensusResponse::new(
id,
Payload::Reject(RejectContext::new(
RejectCode::SysTransient,
format!(
"No consensus could be reached. Replicas had different responses. Details: request_id: {}, hashes: {}",
id,
hash_counts.join(", ")
),
)),
)
.with_refund_shares(refund_shares),
)
}

fn validation_failed(
Expand Down
8 changes: 4 additions & 4 deletions rs/test_utilities/execution_environment/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1549,7 +1549,7 @@ impl ExecutionTest {
let subnet_size = self.subnet_size();
let cost_schedule = self.cost_schedule();
let message = match message {
SubnetMessage::Response(_) => return NominalCycles::zero(),
SubnetMessage::ConsensusResponse(_) => return NominalCycles::zero(),
SubnetMessage::Request(request) => CanisterCall::Request(request),
SubnetMessage::Ingress(ingress) => CanisterCall::Ingress(ingress),
};
Expand Down Expand Up @@ -1661,7 +1661,7 @@ impl ExecutionTest {
let cycles_used = cycles_used_after - cycles_used_before;
if instructions_used.get() != 0 {
let method_name = match message {
SubnetMessage::Response(_) => None,
SubnetMessage::ConsensusResponse(_) => None,
SubnetMessage::Request(ref request) => Some(request.method_name.clone()),
SubnetMessage::Ingress(ref ingress) => Some(ingress.method_name.clone()),
};
Expand Down Expand Up @@ -3241,7 +3241,7 @@ pub fn get_output_messages(state: &mut ReplicatedState) -> Vec<(CanisterId, Requ

fn get_effective_canister_id(message: SubnetMessage) -> Option<CanisterId> {
match message {
SubnetMessage::Response(_) => None,
SubnetMessage::ConsensusResponse(_) => None,
SubnetMessage::Request(request) => request.extract_effective_canister_id(),
SubnetMessage::Ingress(ingress) => {
let signed_ingress_content = SignedIngressContent::new_for_testing(
Expand All @@ -3260,7 +3260,7 @@ fn get_effective_canister_id(message: SubnetMessage) -> Option<CanisterId> {

fn check_is_install_code(message: SubnetMessage) -> bool {
let message = match message {
SubnetMessage::Response(_) => return false,
SubnetMessage::ConsensusResponse(_) => return false,
SubnetMessage::Request(request) => CanisterCall::Request(request),
SubnetMessage::Ingress(ingress) => CanisterCall::Ingress(ingress),
};
Expand Down
Loading
Loading