Skip to content
1 change: 1 addition & 0 deletions rs/http_endpoints/fuzz/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ DEV_DEPENDENCIES = [
"//rs/crypto/tls_interfaces",
"//rs/crypto/tls_interfaces/mocks",
"//rs/crypto/tree_hash",
"//rs/http_endpoints/test_agent",
"//rs/interfaces/mocks",
"//rs/interfaces/state_manager",
"//rs/interfaces/state_manager/mocks",
Expand Down
65 changes: 50 additions & 15 deletions rs/http_endpoints/public/src/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ pub(crate) enum IngressError {
UserError(UserError),
}

pub(crate) enum EffectiveDestination {
Canister(CanisterId),
Subnet(SubnetId),
}

impl From<HttpError> for IngressError {
fn from(err: HttpError) -> Self {
IngressError::HttpError(err)
Expand Down Expand Up @@ -206,7 +211,7 @@ impl IngressValidator {
pub(crate) async fn validate_ingress_message(
self,
request: HttpRequestEnvelope<HttpCallContent>,
effective_canister_id: CanisterId,
effective_destination: EffectiveDestination,
) -> Result<IngressMessageSubmitter, IngressError> {
let Self {
log,
Expand Down Expand Up @@ -243,20 +248,50 @@ impl IngressValidator {
message: format!("Could not parse body as call message: {e}"),
})?;

// Reject requests where `canister_id` != `effective_canister_id` for non mgmt canister calls.
// This needs to be enforced because boundary nodes block access based on the `effective_canister_id`
// in the url and the replica processes the request based on the `canister_id`.
// If this is not enforced, a blocked canisters can still be accessed by specifying
// a non-blocked `effective_canister_id` and a blocked `canister_id`.
if msg.canister_id() != CanisterId::ic_00() && msg.canister_id() != effective_canister_id {
Err(HttpError {
status: StatusCode::BAD_REQUEST,
message: format!(
"Specified CanisterId {} does not match effective canister id in URL {}",
msg.canister_id(),
effective_canister_id
),
})?;
match effective_destination {
// Reject requests where `canister_id` != `effective_canister_id` for non mgmt canister calls.
// This needs to be enforced because boundary nodes block access based on the `effective_canister_id`
// in the url and the replica processes the request based on the `canister_id`.
// If this is not enforced, a blocked canisters can still be accessed by specifying
// a non-blocked `effective_canister_id` and a blocked `canister_id`.
EffectiveDestination::Canister(effective_canister_id) => {
if msg.canister_id() != CanisterId::ic_00()
&& msg.canister_id() != effective_canister_id
{
Err(HttpError {
status: StatusCode::BAD_REQUEST,
message: format!(
"Specified CanisterId {} does not match effective canister id in URL {}",
msg.canister_id(),
effective_canister_id
),
})?;
}
}
EffectiveDestination::Subnet(effective_subnet_id) => {
if effective_subnet_id != subnet_id {
Err(HttpError {
status: StatusCode::BAD_REQUEST,
message: format!(
"Specified SubnetId {} does not match the subnet id of this node {}",
effective_subnet_id, subnet_id
),
})?;
}
if msg.canister_id() != CanisterId::ic_00()
|| msg.method_name() != "create_canister"
{
Err(HttpError {
status: StatusCode::BAD_REQUEST,
message: format!(
"Subnet call endpoint only accepts calls to the management canister ({}) 'create_canister' method, got canister_id={} method_name='{}'",
CanisterId::ic_00(),
msg.canister_id(),
msg.method_name()
),
})?;
}
}
}

let message_id = msg.id();
Expand Down
7 changes: 5 additions & 2 deletions rs/http_endpoints/public/src/call/call_async.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Module that deals with requests to /api/v2/canister/.../call

use super::{IngressError, IngressValidator, IngressWatcherHandle};
use super::{EffectiveDestination, IngressError, IngressValidator, IngressWatcherHandle};
use crate::{
HttpError,
common::{Cbor, CborUserError, WithTimeout},
Expand Down Expand Up @@ -112,7 +112,10 @@ async fn handler(
let logger = ingress_validator.log.clone();

let ingress_submitter = ingress_validator
.validate_ingress_message(request, effective_canister_id)
.validate_ingress_message(
request,
EffectiveDestination::Canister(effective_canister_id),
)
.await?;

let message_id = ingress_submitter.message_id();
Expand Down
48 changes: 30 additions & 18 deletions rs/http_endpoints/public/src/call/call_sync.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Module that deals with requests to /api/{v3,v4}/canister/.../call.

use super::{
IngressError, IngressValidator,
EffectiveDestination, IngressError, IngressValidator,
ingress_watcher::{IngressWatcherHandle, SubscriptionError},
};
use crate::{
Expand Down Expand Up @@ -33,7 +33,7 @@ use ic_logger::{error, warn};
use ic_nns_delegation_manager::{CanisterRangesFilter, NNSDelegationReader};
use ic_replicated_state::ReplicatedState;
use ic_types::{
CanisterId,
CanisterId, PrincipalId, SubnetId,
consensus::certification::Certification,
messages::{Blob, Certificate, HttpCallContent, HttpRequestEnvelope, MessageId},
};
Expand All @@ -54,6 +54,8 @@ pub enum Version {
V3,
// Synchronous endpoint with the NNS delegation using the tree format of the canister ranges.
V4,
// Synchronous subnet endpoint with no canister ranges in the NNS delegation.
SubnetV4,
}

enum SyncCallResponse {
Expand Down Expand Up @@ -133,6 +135,7 @@ pub(crate) fn route(version: Version) -> &'static str {
match version {
Version::V3 => "/api/v3/canister/{effective_canister_id}/call",
Version::V4 => "/api/v4/canister/{effective_canister_id}/call",
Version::SubnetV4 => "/api/v4/subnet/{effective_subnet_id}/call",
}
}

Expand Down Expand Up @@ -184,9 +187,9 @@ pub fn new_service(
BoxCloneService::new(router.into_service())
}

/// Handles a call to /api/{v3,v4}/canister/../call
/// Handles a call to /api/{v3,v4}/canister/../call and /api/v4/subnet/../call
async fn call_sync(
axum::extract::Path(effective_canister_id): axum::extract::Path<CanisterId>,
axum::extract::Path(id): axum::extract::Path<PrincipalId>,
State(SynchronousCallHandlerState {
call_handler,
ingress_watcher_handle,
Expand All @@ -198,10 +201,30 @@ async fn call_sync(
}): State<SynchronousCallHandlerState>,
WithTimeout(Cbor(request)): WithTimeout<Cbor<HttpRequestEnvelope<HttpCallContent>>>,
) -> SyncCallResponse {
let (effective_destination, delegation_filter) = match version {
Version::V3 => {
let canister_id = CanisterId::unchecked_from_principal(id);
(
EffectiveDestination::Canister(canister_id),
CanisterRangesFilter::Flat,
)
}
Version::V4 => {
let canister_id = CanisterId::unchecked_from_principal(id);
(
EffectiveDestination::Canister(canister_id),
CanisterRangesFilter::Tree(canister_id),
)
}
Version::SubnetV4 => (
EffectiveDestination::Subnet(SubnetId::from(id)),
CanisterRangesFilter::None,
),
};
let log = call_handler.log.clone();

let ingress_submitter = match call_handler
.validate_ingress_message(request, effective_canister_id)
.validate_ingress_message(request, effective_destination)
.await
{
Ok(ingress_submitter) => ingress_submitter,
Expand All @@ -217,11 +240,6 @@ async fn call_sync(
tree_and_certificate_for_message(state_reader.clone(), message_id.clone()).await
&& let ParsedMessageStatus::Known(_) = parsed_message_status(&tree, &message_id)
{
let delegation_from_nns = match version {
Version::V3 => nns_delegation_reader.get_delegation(CanisterRangesFilter::Flat),
Version::V4 => nns_delegation_reader
.get_delegation(CanisterRangesFilter::Tree(effective_canister_id)),
};
let signature = certification.signed.signature.signature.get().0;

metrics
Expand All @@ -232,7 +250,7 @@ async fn call_sync(
return SyncCallResponse::Certificate(Certificate {
tree,
signature: Blob(signature),
delegation: delegation_from_nns,
delegation: nns_delegation_reader.get_delegation(delegation_filter),
});
};

Expand Down Expand Up @@ -335,18 +353,12 @@ async fn call_sync(
);
}

let delegation_from_nns = match version {
Version::V3 => nns_delegation_reader.get_delegation(CanisterRangesFilter::Flat),
Version::V4 => {
nns_delegation_reader.get_delegation(CanisterRangesFilter::Tree(effective_canister_id))
}
};
let signature = certification.signed.signature.signature.get().0;

SyncCallResponse::Certificate(Certificate {
tree,
signature: Blob(signature),
delegation: delegation_from_nns,
delegation: nns_delegation_reader.get_delegation(delegation_filter),
})
}

Expand Down
9 changes: 9 additions & 0 deletions rs/http_endpoints/public/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ struct HttpHandler {
call_v2_router: Router,
call_v3_router: Router,
call_v4_router: Router,
subnet_call_v4_router: Router,
query_v2_router: Router,
query_v3_router: Router,
catchup_router: Router,
Expand Down Expand Up @@ -328,6 +329,8 @@ pub fn start_server(
let call_v3_router = call_sync_router(call_sync::Version::V3);
let call_v4_router = call_sync_router(call_sync::Version::V4);

let subnet_call_v4_router = call_sync_router(call_sync::Version::SubnetV4);

let query_router = |version| {
QueryServiceBuilder::builder(
log.clone(),
Expand Down Expand Up @@ -420,6 +423,7 @@ pub fn start_server(
call_v2_router,
call_v3_router,
call_v4_router,
subnet_call_v4_router,
query_v2_router,
query_v3_router,
status_router,
Expand Down Expand Up @@ -607,6 +611,7 @@ fn make_router(
// TODO(CON-1574): see if there is any reasonable explicit concurrency limit we could use here.
.merge(http_handler.call_v3_router)
.merge(http_handler.call_v4_router)
.merge(http_handler.subnet_call_v4_router)
.merge(http_handler.query_v2_router.layer(service_builder(
GlobalConcurrencyLimitLayer::new(config.max_query_concurrent_requests),
)))
Expand Down Expand Up @@ -809,6 +814,10 @@ pub(crate) mod tests {
call_sync::route(call_sync::Version::V4),
axum::routing::post(dummy),
),
subnet_call_v4_router: Router::new().route(
call_sync::route(call_sync::Version::SubnetV4),
axum::routing::post(dummy),
),
query_v2_router: Router::new().route(
QueryService::route(query::Version::V2),
axum::routing::post(dummy_cbor),
Expand Down
32 changes: 31 additions & 1 deletion rs/http_endpoints/public/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use ic_replicated_state::{
};
use ic_test_utilities_types::ids::{node_test_id, subnet_test_id};
use ic_types::{
CryptoHashOfPartialState, Height, RegistryVersion,
CanisterId, CryptoHashOfPartialState, Height, RegistryVersion,
artifact::UnvalidatedArtifactMutation,
batch::RawQueryStats,
consensus::certification::{Certification, CertificationContent},
Expand Down Expand Up @@ -87,6 +87,36 @@ use tower_test::mock::Handle;
pub type IngressFilterHandle = Handle<IngressFilterInput, IngressFilterResponse>;
pub type QueryExecutionHandle = Handle<QueryExecutionInput, QueryExecutionResponse>;

/// Unified endpoint type used to parameterise tests that cover both the canister and subnet
/// synchronous call paths.
#[derive(Copy, Clone, Debug)]
pub enum UpdateEndpoint {
Canister(ic_http_endpoints_test_agent::Call),
Subnet(ic_http_endpoints_test_agent::CallSubnet),
}

impl UpdateEndpoint {
pub async fn call(
self,
addr: SocketAddr,
message: ic_http_endpoints_test_agent::IngressMessage,
) -> reqwest::Response {
match self {
UpdateEndpoint::Canister(c) => c.call(addr, message).await,
UpdateEndpoint::Subnet(s) => s.call(addr, message).await,
}
}

pub fn default_ingress_message(&self) -> ic_http_endpoints_test_agent::IngressMessage {
match self {
UpdateEndpoint::Canister(_) => ic_http_endpoints_test_agent::IngressMessage::default(),
UpdateEndpoint::Subnet(_) => ic_http_endpoints_test_agent::IngressMessage::default()
.with_canister_id(CanisterId::ic_00().get(), CanisterId::ic_00().get())
.with_method_name("create_canister".to_string()),
}
}
}

fn setup_query_execution_mock() -> (QueryExecutionService, QueryExecutionHandle) {
let (service, handle) = tower_test::mock::pair::<QueryExecutionInput, QueryExecutionResponse>();

Expand Down
28 changes: 16 additions & 12 deletions rs/http_endpoints/public/tests/load_shed_test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub mod common;

use crate::common::{
HttpEndpointBuilder, MockIngressPoolThrottler, default_certified_state_reader,
HttpEndpointBuilder, MockIngressPoolThrottler, UpdateEndpoint, default_certified_state_reader,
default_get_latest_state, default_read_certified_state, get_free_localhost_socket_addr,
};
use async_trait::async_trait;
Expand All @@ -13,8 +13,10 @@ use ic_crypto_tree_hash::{Label, Path};
use ic_http_endpoints_public::query;
use ic_http_endpoints_public::read_state;
use ic_http_endpoints_test_agent::{
self, Call, CanisterReadState, IngressMessage, Query, wait_for_status_healthy,
self, Call, CallSubnet, CanisterReadState, IngressMessage, Query, wait_for_status_healthy,
};
use ic_test_utilities_types::ids::subnet_test_id;

use ic_interfaces_state_manager_mocks::MockStateManager;
use ic_pprof::{Error, PprofCollector};
use ic_types::PrincipalId;
Expand Down Expand Up @@ -407,10 +409,11 @@ fn test_load_shedding_update_call() {

/// Test that the call endpoints load shed requests when the ingress pool is full.
#[rstest]
#[case::v2_endpoint(Call::V2)]
#[case::v3_endpoint(Call::V3)]
#[case::v4_endpoint(Call::V4)]
fn test_load_shedding_update_call_when_ingress_pool_is_full(#[case] endpoint: Call) {
#[case::v2_endpoint(UpdateEndpoint::Canister(Call::V2))]
#[case::v3_endpoint(UpdateEndpoint::Canister(Call::V3))]
#[case::v4_endpoint(UpdateEndpoint::Canister(Call::V4))]
#[case::v4_subnet_endpoint(UpdateEndpoint::Subnet(CallSubnet::V4(subnet_test_id(1).get())))]
fn test_load_shedding_update_call_when_ingress_pool_is_full(#[case] endpoint: UpdateEndpoint) {
use std::sync::RwLock;

let rt = Runtime::new().unwrap();
Expand Down Expand Up @@ -445,10 +448,11 @@ fn test_load_shedding_update_call_when_ingress_pool_is_full(#[case] endpoint: Ca

/// Test that the call endpoints load shed requests when the ingress channel is full.
#[rstest]
#[case::v2_endpoint(Call::V2)]
#[case::v3_endpoint(Call::V3)]
#[case::v4_endpoint(Call::V4)]
fn test_load_shedding_update_call_when_ingress_channel_is_full(#[case] endpoint: Call) {
#[case::v2_endpoint(UpdateEndpoint::Canister(Call::V2))]
#[case::v3_endpoint(UpdateEndpoint::Canister(Call::V3))]
#[case::v4_endpoint(UpdateEndpoint::Canister(Call::V4))]
#[case::v4_subnet_endpoint(UpdateEndpoint::Subnet(CallSubnet::V4(subnet_test_id(1).get())))]
fn test_load_shedding_update_call_when_ingress_channel_is_full(#[case] endpoint: UpdateEndpoint) {
let rt = Runtime::new().unwrap();

let addr = get_free_localhost_socket_addr();
Expand All @@ -473,7 +477,7 @@ fn test_load_shedding_update_call_when_ingress_channel_is_full(#[case] endpoint:
rt.block_on(async move {
wait_for_status_healthy(&addr).await.unwrap();
for _ in 0..capacity {
let message = Default::default();
let message = endpoint.default_ingress_message();
let call_response = endpoint.call(addr, message).await;
assert_eq!(
call_response.status(),
Expand All @@ -482,7 +486,7 @@ fn test_load_shedding_update_call_when_ingress_channel_is_full(#[case] endpoint:
call_response.text().await.unwrap()
);
}
let message = Default::default();
let message = endpoint.default_ingress_message();
let call_response = endpoint.call(addr, message).await;
assert_eq!(
call_response.status(),
Expand Down
Loading
Loading