diff --git a/CLAUDE.md b/CLAUDE.md index e64cdb0ad9..1641289e50 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -189,6 +189,12 @@ Data structures often include: - When talking about "Rivet Actors" make sure to capitalize "Rivet Actor" as a proper noun and lowercase "actor" as a generic noun +### Documentation Sync +When making changes to the engine or RivetKit, ensure the corresponding documentation is updated: +- **Limits changes** (e.g., max message sizes, timeouts): Update `website/src/content/docs/actors/limits.mdx` +- **Config changes** (e.g., new config options in `engine/packages/config/`): Update `website/src/content/docs/self-hosting/configuration.mdx` +- **RivetKit config changes** (e.g., `rivetkit-typescript/packages/rivetkit/src/registry/config/index.ts` or `rivetkit-typescript/packages/rivetkit/src/actor/config.ts`): Update `website/src/content/docs/actors/limits.mdx` if they affect limits/timeouts + ### Comments - Write comments as normal, complete sentences. Avoid fragmented structures with parentheticals and dashes like `// Spawn engine (if configured) - regardless of start kind`. Instead, write `// Spawn the engine if configured`. Especially avoid dashes (hyphens are OK). diff --git a/engine/artifacts/errors/guard.request_body_too_large.json b/engine/artifacts/errors/guard.request_body_too_large.json new file mode 100644 index 0000000000..332a89870e --- /dev/null +++ b/engine/artifacts/errors/guard.request_body_too_large.json @@ -0,0 +1,5 @@ +{ + "code": "request_body_too_large", + "group": "guard", + "message": "Request body too large." +} \ No newline at end of file diff --git a/engine/artifacts/errors/guard.response_body_too_large.json b/engine/artifacts/errors/guard.response_body_too_large.json new file mode 100644 index 0000000000..d324889d54 --- /dev/null +++ b/engine/artifacts/errors/guard.response_body_too_large.json @@ -0,0 +1,5 @@ +{ + "code": "response_body_too_large", + "group": "guard", + "message": "Response body too large." +} \ No newline at end of file diff --git a/engine/artifacts/openapi.json b/engine/artifacts/openapi.json index 2caed8bba2..915c463da5 100644 --- a/engine/artifacts/openapi.json +++ b/engine/artifacts/openapi.json @@ -11,7 +11,7 @@ "name": "Apache-2.0", "identifier": "Apache-2.0" }, - "version": "2.0.34-rc.1" + "version": "2.0.34-rc.2" }, "paths": { "/actors": { diff --git a/engine/packages/config/src/config/guard.rs b/engine/packages/config/src/config/guard.rs index a822370087..13df499908 100644 --- a/engine/packages/config/src/config/guard.rs +++ b/engine/packages/config/src/config/guard.rs @@ -11,6 +11,18 @@ pub struct Guard { pub port: Option<u16>, /// Enable & configure HTTPS pub https: Option, + /// Route cache TTL in milliseconds. + pub route_cache_ttl_ms: Option<u64>, + /// Proxy state cache TTL in milliseconds. + pub proxy_state_cache_ttl_ms: Option<u64>, + /// Time to keep TCP connection open after WebSocket close, in milliseconds. + pub websocket_close_linger_ms: Option<u64>, + /// Max incoming WebSocket message size in bytes. + pub websocket_max_message_size: Option<usize>, + /// Max outgoing WebSocket message size in bytes. + pub websocket_max_outgoing_message_size: Option<usize>, + /// Max HTTP request body size in bytes (first line of defense).
+ pub http_max_request_body_size: Option<usize>, } impl Guard { @@ -21,6 +33,31 @@ impl Guard { pub fn port(&self) -> u16 { self.port.unwrap_or(crate::defaults::ports::GUARD) } + + pub fn route_cache_ttl_ms(&self) -> u64 { + self.route_cache_ttl_ms.unwrap_or(10 * 60 * 1000) // 10 minutes + } + + pub fn proxy_state_cache_ttl_ms(&self) -> u64 { + self.proxy_state_cache_ttl_ms.unwrap_or(60 * 60 * 1000) // 1 hour + } + + pub fn websocket_close_linger_ms(&self) -> u64 { + self.websocket_close_linger_ms.unwrap_or(100) + } + + pub fn websocket_max_message_size(&self) -> usize { + self.websocket_max_message_size.unwrap_or(32 * 1024 * 1024) // 32 MiB + } + + pub fn websocket_max_outgoing_message_size(&self) -> usize { + self.websocket_max_outgoing_message_size + .unwrap_or(32 * 1024 * 1024) // 32 MiB + } + + pub fn http_max_request_body_size(&self) -> usize { + self.http_max_request_body_size.unwrap_or(256 * 1024 * 1024) // 256 MiB + } } #[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] diff --git a/engine/packages/config/src/config/pegboard.rs b/engine/packages/config/src/config/pegboard.rs index dc3767eecc..c6f2b61cc0 100644 --- a/engine/packages/config/src/config/pegboard.rs +++ b/engine/packages/config/src/config/pegboard.rs @@ -86,6 +86,76 @@ pub struct Pegboard { /// /// **Experimental** pub runner_pool_consecutive_successes_to_clear_error: Option, + + // === Gateway Settings === + /// WebSocket open/handshake timeout in milliseconds. + pub gateway_websocket_open_timeout_ms: Option<u64>, + /// Timeout for response to start in milliseconds. + pub gateway_response_start_timeout_ms: Option<u64>, + /// Ping interval for gateway updates in milliseconds. + pub gateway_update_ping_interval_ms: Option<u64>, + /// GC interval for in-flight requests in milliseconds. + pub gateway_gc_interval_ms: Option<u64>, + /// Tunnel ping timeout in milliseconds. + pub gateway_tunnel_ping_timeout_ms: Option<i64>, + /// Hibernating WebSocket message ack timeout in milliseconds. + pub gateway_hws_message_ack_timeout_ms: Option<u64>, + /// Max pending message buffer size for hibernating WebSockets in bytes. + pub gateway_hws_max_pending_size: Option<u64>, + /// Max HTTP request body size in bytes for requests to actors. + /// + /// Note: guard-core also enforces a larger limit (default 256 MiB) as a first line of defense. + /// See `Guard::http_max_request_body_size`. + pub gateway_http_max_request_body_size: Option<usize>, + /// Rate limit: number of requests allowed per period. + pub gateway_rate_limit_requests: Option<u64>, + /// Rate limit: period in seconds. + pub gateway_rate_limit_period_secs: Option<u64>, + /// Maximum concurrent in-flight requests per actor per IP. + pub gateway_max_in_flight: Option<usize>, + /// HTTP request timeout in seconds for actor traffic. + /// + /// This is the outer timeout for the entire request lifecycle. + /// Should be slightly longer than `gateway_response_start_timeout_ms` to provide a grace period. + pub gateway_actor_request_timeout_secs: Option<u64>, + /// HTTP request timeout in seconds for API traffic (api-public). + pub gateway_api_request_timeout_secs: Option<u64>, + /// Maximum retry attempts for failed requests. + pub gateway_retry_max_attempts: Option<u32>, + /// Initial retry interval in milliseconds (doubles with each attempt). + pub gateway_retry_initial_interval_ms: Option<u64>, + /// WebSocket proxy task timeout in seconds. + pub gateway_ws_proxy_timeout_secs: Option<u64>, + /// WebSocket connection attempt timeout in seconds. + pub gateway_ws_connect_timeout_secs: Option<u64>, + /// WebSocket send message timeout in seconds.
+ pub gateway_ws_send_timeout_secs: Option<u64>, + /// WebSocket flush timeout in seconds. + pub gateway_ws_flush_timeout_secs: Option<u64>, + + // === API Settings === + /// Rate limit for API traffic: number of requests allowed per period. + pub api_rate_limit_requests: Option<u64>, + /// Rate limit for API traffic: period in seconds. + pub api_rate_limit_period_secs: Option<u64>, + /// Maximum concurrent in-flight requests for API traffic. + pub api_max_in_flight: Option<usize>, + /// Maximum retry attempts for API traffic. + pub api_retry_max_attempts: Option<u32>, + /// Initial retry interval for API traffic in milliseconds. + pub api_retry_initial_interval_ms: Option<u64>, + /// Max HTTP request body size in bytes for API traffic. + pub api_max_http_request_body_size: Option<usize>, + + // === Runner Settings === + /// Max HTTP response body size in bytes from actors. + pub runner_http_max_response_body_size: Option<usize>, + /// Ping interval for runner updates in milliseconds. + pub runner_update_ping_interval_ms: Option<u64>, + /// GC interval for actor event demuxer in milliseconds. + pub runner_event_demuxer_gc_interval_ms: Option<u64>, + /// Max time since last seen before actor is considered stale, in milliseconds. + pub runner_event_demuxer_max_last_seen_ms: Option<u64>, } impl Pegboard { @@ -139,4 +209,131 @@ impl Pegboard { self.runner_pool_consecutive_successes_to_clear_error .unwrap_or(3) } + + // === Gateway Settings === + + pub fn gateway_websocket_open_timeout_ms(&self) -> u64 { + self.gateway_websocket_open_timeout_ms.unwrap_or(15_000) + } + + pub fn gateway_response_start_timeout_ms(&self) -> u64 { + self.gateway_response_start_timeout_ms + .unwrap_or(5 * 60 * 1000) // 5 minutes + } + + pub fn gateway_update_ping_interval_ms(&self) -> u64 { + self.gateway_update_ping_interval_ms.unwrap_or(3_000) + } + + pub fn gateway_gc_interval_ms(&self) -> u64 { + self.gateway_gc_interval_ms.unwrap_or(15_000) + } + + pub fn gateway_tunnel_ping_timeout_ms(&self) -> i64 { + self.gateway_tunnel_ping_timeout_ms.unwrap_or(30_000) + } + + pub fn gateway_hws_message_ack_timeout_ms(&self) -> u64 { + self.gateway_hws_message_ack_timeout_ms.unwrap_or(30_000) + } + + pub fn gateway_hws_max_pending_size(&self) -> u64 { + self.gateway_hws_max_pending_size + .unwrap_or(128 * 1024 * 1024) // 128 MiB + } + + pub fn gateway_http_max_request_body_size(&self) -> usize { + self.gateway_http_max_request_body_size + .unwrap_or(128 * 1024 * 1024) // 128 MiB + } + + pub fn gateway_rate_limit_requests(&self) -> u64 { + self.gateway_rate_limit_requests.unwrap_or(1200) + } + + pub fn gateway_rate_limit_period_secs(&self) -> u64 { + self.gateway_rate_limit_period_secs.unwrap_or(60) + } + + pub fn gateway_max_in_flight(&self) -> usize { + self.gateway_max_in_flight.unwrap_or(32) + } + + pub fn gateway_actor_request_timeout_secs(&self) -> u64 { + self.gateway_actor_request_timeout_secs.unwrap_or(6 * 60) // 6 minutes + } + + pub fn gateway_api_request_timeout_secs(&self) -> u64 { + self.gateway_api_request_timeout_secs.unwrap_or(60) // 1 minute + } + + pub fn gateway_retry_max_attempts(&self) -> u32 { + self.gateway_retry_max_attempts.unwrap_or(7) + } + + pub fn gateway_retry_initial_interval_ms(&self) -> u64 { + self.gateway_retry_initial_interval_ms.unwrap_or(150) + } + + pub fn gateway_ws_proxy_timeout_secs(&self) -> u64 { + self.gateway_ws_proxy_timeout_secs.unwrap_or(30) + } + + pub fn gateway_ws_connect_timeout_secs(&self) -> u64 { + self.gateway_ws_connect_timeout_secs.unwrap_or(5) + } + + pub fn gateway_ws_send_timeout_secs(&self) -> u64 { +
self.gateway_ws_send_timeout_secs.unwrap_or(5) + } + + pub fn gateway_ws_flush_timeout_secs(&self) -> u64 { + self.gateway_ws_flush_timeout_secs.unwrap_or(2) + } + + // === API Settings === + + pub fn api_rate_limit_requests(&self) -> u64 { + self.api_rate_limit_requests.unwrap_or(1200) + } + + pub fn api_rate_limit_period_secs(&self) -> u64 { + self.api_rate_limit_period_secs.unwrap_or(60) + } + + pub fn api_max_in_flight(&self) -> usize { + self.api_max_in_flight.unwrap_or(32) + } + + pub fn api_retry_max_attempts(&self) -> u32 { + self.api_retry_max_attempts.unwrap_or(3) + } + + pub fn api_retry_initial_interval_ms(&self) -> u64 { + self.api_retry_initial_interval_ms.unwrap_or(100) + } + + pub fn api_max_http_request_body_size(&self) -> usize { + self.api_max_http_request_body_size + .unwrap_or(256 * 1024 * 1024) // 256 MiB + } + + // === Runner Settings === + + pub fn runner_http_max_response_body_size(&self) -> usize { + self.runner_http_max_response_body_size + .unwrap_or(128 * 1024 * 1024) // 128 MiB + } + + pub fn runner_update_ping_interval_ms(&self) -> u64 { + self.runner_update_ping_interval_ms.unwrap_or(3_000) + } + + pub fn runner_event_demuxer_gc_interval_ms(&self) -> u64 { + self.runner_event_demuxer_gc_interval_ms.unwrap_or(30_000) + } + + pub fn runner_event_demuxer_max_last_seen_ms(&self) -> u64 { + self.runner_event_demuxer_max_last_seen_ms.unwrap_or(30_000) + } } diff --git a/engine/packages/guard-core/src/errors.rs b/engine/packages/guard-core/src/errors.rs index ebfa809da5..dbab1db1b0 100644 --- a/engine/packages/guard-core/src/errors.rs +++ b/engine/packages/guard-core/src/errors.rs @@ -111,3 +111,15 @@ pub struct WebSocketServiceTimeout; "WebSocket target changed, retry not possible." )] pub struct WebSocketTargetChanged; + +#[derive(RivetError, Serialize, Deserialize)] +#[error( + "guard", + "request_body_too_large", + "Request body too large.", + "Request body size {size} bytes exceeds maximum allowed {max_size} bytes." 
+)] +pub struct RequestBodyTooLarge { + pub size: usize, + pub max_size: usize, +} diff --git a/engine/packages/guard-core/src/proxy_service.rs b/engine/packages/guard-core/src/proxy_service.rs index b6eb2213a5..7d4d5dcd4d 100644 --- a/engine/packages/guard-core/src/proxy_service.rs +++ b/engine/packages/guard-core/src/proxy_service.rs @@ -1,7 +1,7 @@ use anyhow::{Context, Result, bail, ensure}; use bytes::Bytes; use futures_util::{SinkExt, StreamExt}; -use http_body_util::{BodyExt, Full}; +use http_body_util::Full; use hyper::{Request, Response, StatusCode, body::Incoming as BodyIncoming, header::HeaderName}; use hyper_tungstenite; use hyper_util::{client::legacy::Client, rt::TokioExecutor}; @@ -23,7 +23,7 @@ use tokio::sync::Mutex; use tokio::time::timeout; use tokio_tungstenite::tungstenite::{ client::IntoClientRequest, - protocol::{CloseFrame, frame::coding::CloseCode}, + protocol::{CloseFrame, WebSocketConfig, frame::coding::CloseCode}, }; use tracing::Instrument; use url::Url; @@ -42,10 +42,6 @@ const X_RIVET_TOKEN: HeaderName = HeaderName::from_static("x-rivet-token"); pub const X_FORWARDED_FOR: HeaderName = HeaderName::from_static("x-forwarded-for"); pub const X_RIVET_ERROR: HeaderName = HeaderName::from_static("x-rivet-error"); -const ROUTE_CACHE_TTL: Duration = Duration::from_secs(60 * 10); // 10 minutes -const PROXY_STATE_CACHE_TTL: Duration = Duration::from_secs(60 * 60); // 1 hour -const WEBSOCKET_CLOSE_LINGER: Duration = Duration::from_millis(100); // Keep TCP connection open briefly after WebSocket close - /// Response body type that can handle both streaming and buffered responses #[derive(Debug)] pub enum ResponseBody { @@ -178,6 +174,12 @@ pub struct MiddlewareConfig { pub max_in_flight: MaxInFlightConfig, pub retry: RetryConfig, pub timeout: TimeoutConfig, + /// Max incoming WebSocket message size in bytes. + pub max_incoming_ws_message_size: usize, + /// Max outgoing WebSocket message size in bytes. + pub max_outgoing_ws_message_size: usize, + /// Max HTTP request body size in bytes. + pub max_http_request_body_size: usize, } #[derive(Clone, Debug)] @@ -210,7 +212,7 @@ pub enum MiddlewareResponse { pub type MiddlewareFn = Arc< dyn for<'a> Fn( - &'a Id, + &'a Option<Id>, &'a hyper::HeaderMap, ) -> futures::future::BoxFuture<'a, Result<MiddlewareResponse>> + Send @@ -223,11 +225,11 @@ struct RouteCache { } impl RouteCache { - fn new() -> Self { + fn new(ttl: Duration) -> Self { Self { cache: Cache::builder() .max_capacity(10_000) - .time_to_live(ROUTE_CACHE_TTL) + .time_to_live(ttl) .build(), } } @@ -321,6 +323,11 @@ pub struct ProxyState { port_type: PortType, clickhouse_inserter: Option, tasks: Arc, + // Cached config values + websocket_close_linger: Duration, + /// Fallback WebSocket max message size (used for error proxy path). + /// Per-route values come from MiddlewareConfig.
+ websocket_max_message_size: usize, } impl ProxyState { @@ -332,27 +339,41 @@ impl ProxyState { port_type: PortType, clickhouse_inserter: Option, ) -> Self { + let guard_config = config.guard(); + let route_cache_ttl = Duration::from_millis(guard_config.route_cache_ttl_ms()); + let proxy_state_cache_ttl = Duration::from_millis(guard_config.proxy_state_cache_ttl_ms()); + let websocket_close_linger = + Duration::from_millis(guard_config.websocket_close_linger_ms()); + let websocket_max_message_size = guard_config.websocket_max_message_size(); + Self { config, routing_fn, cache_key_fn, middleware_fn, - route_cache: RouteCache::new(), + route_cache: RouteCache::new(route_cache_ttl), rate_limiters: Cache::builder() .max_capacity(10_000) - .time_to_live(PROXY_STATE_CACHE_TTL) + .time_to_live(proxy_state_cache_ttl) .build(), in_flight_counters: Cache::builder() .max_capacity(10_000) - .time_to_live(PROXY_STATE_CACHE_TTL) + .time_to_live(proxy_state_cache_ttl) .build(), in_flight_requests: Cache::builder().max_capacity(10_000_000).build(), port_type, clickhouse_inserter, tasks: TaskGroup::new(), + websocket_close_linger, + websocket_max_message_size, } } + /// Creates a WebSocket config for client connections. + fn client_websocket_config(max_message_size: usize) -> WebSocketConfig { + WebSocketConfig::default().max_message_size(Some(max_message_size)) + } + #[tracing::instrument(skip(self))] async fn resolve_route( &self, @@ -460,7 +481,7 @@ #[tracing::instrument(skip_all)] async fn get_middleware_config( &self, - actor_id: &Id, + actor_id: &Option<Id>, headers: &hyper::HeaderMap, ) -> Result<MiddlewareConfig> { // Call the middleware function with a timeout @@ -473,43 +494,64 @@ Ok(result) => match result? { MiddlewareResponse::Ok(config) => Ok(config), MiddlewareResponse::NotFound => { - // Default values if middleware not found for this actor - Ok(MiddlewareConfig { - rate_limit: RateLimitConfig { - requests: 100, // 100 requests - period: 60, // per 60 seconds - }, - max_in_flight: MaxInFlightConfig { - amount: 20, // 20 concurrent requests - }, - retry: RetryConfig { - max_attempts: 3, // 3 retry attempts - initial_interval: 100, // 100ms initial interval - }, - timeout: TimeoutConfig { - request_timeout: 30, // 30 seconds for requests - }, - }) + // Fallback to config-based defaults if middleware not found + Ok(self.default_middleware_config(actor_id.is_some())) } }, Err(_) => { - // Default values if middleware times out - Ok(MiddlewareConfig { - rate_limit: RateLimitConfig { - requests: 100, // 100 requests - period: 60, // per 60 seconds - }, - max_in_flight: MaxInFlightConfig { - amount: 20, // 20 concurrent requests - }, - retry: RetryConfig { - max_attempts: 3, // 3 retry attempts - initial_interval: 100, // 100ms initial interval - }, - timeout: TimeoutConfig { - request_timeout: 30, // 30 seconds for requests - }, - }) + // Fallback to config-based defaults if middleware times out + tracing::warn!("middleware timed out, using fallback config"); + Ok(self.default_middleware_config(actor_id.is_some())) + } } } + + /// Returns default middleware config based on whether this is actor traffic or API traffic.
+ fn default_middleware_config(&self, is_actor_traffic: bool) -> MiddlewareConfig { + let guard = self.config.guard(); + let pegboard = self.config.pegboard(); + + if is_actor_traffic { + // Actor traffic uses gateway_* settings + MiddlewareConfig { + rate_limit: RateLimitConfig { + requests: pegboard.gateway_rate_limit_requests(), + period: pegboard.gateway_rate_limit_period_secs(), + }, + max_in_flight: MaxInFlightConfig { + amount: pegboard.gateway_max_in_flight(), + }, + retry: RetryConfig { + max_attempts: pegboard.gateway_retry_max_attempts(), + initial_interval: pegboard.gateway_retry_initial_interval_ms(), + }, + timeout: TimeoutConfig { + request_timeout: pegboard.gateway_actor_request_timeout_secs(), + }, + max_incoming_ws_message_size: guard.websocket_max_message_size(), + max_outgoing_ws_message_size: guard.websocket_max_outgoing_message_size(), + max_http_request_body_size: pegboard.gateway_http_max_request_body_size(), + } + } else { + // API traffic uses api_* settings + MiddlewareConfig { + rate_limit: RateLimitConfig { + requests: pegboard.api_rate_limit_requests(), + period: pegboard.api_rate_limit_period_secs(), + }, + max_in_flight: MaxInFlightConfig { + amount: pegboard.api_max_in_flight(), + }, + retry: RetryConfig { + max_attempts: pegboard.api_retry_max_attempts(), + initial_interval: pegboard.api_retry_initial_interval_ms(), + }, + timeout: TimeoutConfig { + request_timeout: pegboard.gateway_api_request_timeout_secs(), + }, + max_incoming_ws_message_size: guard.websocket_max_message_size(), + max_outgoing_ws_message_size: guard.websocket_max_outgoing_message_size(), + max_http_request_body_size: pegboard.api_max_http_request_body_size(), } } } @@ -521,15 +563,15 @@ impl ProxyState { actor_id: &Option, headers: &hyper::HeaderMap, ) -> Result { - let Some(actor_id) = *actor_id else { + let Some(inner_actor_id) = *actor_id else { // No rate limiting when actor_id is None return Ok(true); }; // Get actor-specific middleware config - let middleware_config = self.get_middleware_config(&actor_id, headers).await?; + let middleware_config = self.get_middleware_config(actor_id, headers).await?; - let cache_key = (actor_id, ip_addr); + let cache_key = (inner_actor_id, ip_addr); // Get existing limiter or create a new one let limiter_arc = if let Some(existing_limiter) = self.rate_limiters.get(&cache_key).await { @@ -563,11 +605,11 @@ impl ProxyState { headers: &hyper::HeaderMap, ) -> Result> { // Check in-flight limit if actor_id is present - if let Some(actor_id) = *actor_id { + if let Some(inner_actor_id) = *actor_id { // Get actor-specific middleware config - let middleware_config = self.get_middleware_config(&actor_id, headers).await?; + let middleware_config = self.get_middleware_config(actor_id, headers).await?; - let cache_key = (actor_id, ip_addr); + let cache_key = (inner_actor_id, ip_addr); // Get existing counter or create a new one let counter_arc = if let Some(existing_counter) = @@ -842,32 +884,11 @@ impl ProxyService { client_ip: std::net::IpAddr, actor_id: Option, ) -> Result> { - // Get middleware config for this actor if it exists - let middleware_config = if let ResolveRouteOutput::Target(target) = &resolved_route - && let Some(actor_id) = &target.actor_id - { - self.state - .get_middleware_config(actor_id, req.headers()) - .await? 
- } else { - // Default middleware config for targets without actor_id - MiddlewareConfig { - rate_limit: RateLimitConfig { - requests: 100, // 100 requests - period: 60, // per 60 seconds - }, - max_in_flight: MaxInFlightConfig { - amount: 20, // 20 concurrent requests - }, - retry: RetryConfig { - max_attempts: 3, // 3 retry attempts - initial_interval: 100, // 100ms initial interval - }, - timeout: TimeoutConfig { - request_timeout: 30, // 30 seconds for requests - }, - } - }; + // Get middleware config (works for both actor and non-actor traffic) + let middleware_config = self + .state + .get_middleware_config(&actor_id, req.headers()) + .await?; let host = req .headers() @@ -906,6 +927,15 @@ impl ProxyService { } }; + // Check request body size limit (per-route configurable) + if req_body.len() > middleware_config.max_http_request_body_size { + return Err(errors::RequestBodyTooLarge { + size: req_body.len(), + max_size: middleware_config.max_http_request_body_size, + } + .build()); + } + // Set actual request body size in analytics request_context.client_request_body_bytes = Some(req_body.len() as u64); @@ -970,36 +1000,13 @@ impl ProxyService { let (parts, body) = resp.into_parts(); - // Check if this is a streaming response by examining headers - // let is_streaming = parts.headers.get("content-type") - // .and_then(|ct| ct.to_str().ok()) - // .map(|ct| ct.contains("text/event-stream") || ct.contains("application/stream")) - // .unwrap_or(false); - let is_streaming = true; + // Stream response through without buffering + // Response body size limits are enforced in pegboard-runner + tracing::debug!("Streaming response through"); + request_context.guard_response_body_bytes = None; - if is_streaming { - // For streaming responses, pass through the body without buffering - tracing::debug!("Detected streaming response, preserving stream"); - - // We can't easily calculate response size for streaming, so set it to None - request_context.guard_response_body_bytes = None; - - let streaming_body = ResponseBody::Incoming(body); - return Ok(Response::from_parts(parts, streaming_body)); - } else { - // For non-streaming responses, buffer as before - let body_bytes = match BodyExt::collect(body).await { - Ok(collected) => collected.to_bytes(), - Err(_) => Bytes::new(), - }; - - // Set actual response body size in analytics - request_context.guard_response_body_bytes = - Some(body_bytes.len() as u64); - - let full_body = ResponseBody::Full(Full::new(body_bytes)); - return Ok(Response::from_parts(parts, full_body)); - } + let streaming_body = ResponseBody::Incoming(body); + return Ok(Response::from_parts(parts, streaming_body)); } Err(err) => { if !err.is_connect() || attempts >= max_attempts { @@ -1083,6 +1090,16 @@ impl ProxyService { Bytes::new() } }; + + // Check request body size limit (per-route configurable) + if collected_body.len() > middleware_config.max_http_request_body_size { + return Err(errors::RequestBodyTooLarge { + size: collected_body.len(), + max_size: middleware_config.max_http_request_body_size, + } + .build()); + } + let req_collected = hyper::Request::from_parts( req_parts, Full::<Bytes>::new(collected_body.clone()), ); @@ -1215,34 +1232,11 @@ impl ProxyService { let req_method = req.method().clone(); let ray_id = req.extensions().get::().map(|x| x.ray_id); - // Get middleware config for this actor if it exists - let middleware_config = match &actor_id { - Some(actor_id) => { - self.state - .get_middleware_config(actor_id, &req_headers) - .await?
- } - None => { - // Default middleware config for targets without actor_id - tracing::debug!("Using default middleware config (no actor_id)"); - MiddlewareConfig { - rate_limit: RateLimitConfig { - requests: 100, // 100 requests - period: 60, // per 60 seconds - }, - max_in_flight: MaxInFlightConfig { - amount: 20, // 20 concurrent requests - }, - retry: RetryConfig { - max_attempts: 3, // 3 retry attempts - initial_interval: 100, // 100ms initial interval - }, - timeout: TimeoutConfig { - request_timeout: 30, // 30 seconds for requests - }, - } - } - }; + // Get middleware config (works for both actor and non-actor traffic) + let middleware_config = self + .state + .get_middleware_config(&actor_id, &req_headers) + .await?; // Set up retry with backoff from middleware config let max_attempts = middleware_config.retry.max_attempts; @@ -1258,20 +1252,23 @@ impl ProxyService { // Handle WebSocket upgrade properly with hyper_tungstenite tracing::debug!(%req_path, "Upgrading client connection to WebSocket"); - let (client_response, client_ws) = match hyper_tungstenite::upgrade(req, None) { - Ok(x) => { - tracing::debug!("Client WebSocket upgrade successful"); - x - } - Err(err) => { - tracing::error!(?err, "Failed to upgrade client WebSocket"); - return Err(errors::ConnectionError { - error_message: format!("Failed to upgrade client WebSocket: {}", err), - remote_addr: self.remote_addr.to_string(), + let client_ws_config = + ProxyState::client_websocket_config(middleware_config.max_incoming_ws_message_size); + let (client_response, client_ws) = + match hyper_tungstenite::upgrade(req, Some(client_ws_config)) { + Ok(x) => { + tracing::debug!("Client WebSocket upgrade successful"); + x } - .build()); - } - }; + Err(err) => { + tracing::error!(?err, "Failed to upgrade client WebSocket"); + return Err(errors::ConnectionError { + error_message: format!("Failed to upgrade client WebSocket: {}", err), + remote_addr: self.remote_addr.to_string(), + } + .build()); + } + }; // Log response status and headers tracing::debug!( @@ -1287,6 +1284,14 @@ impl ProxyService { // Clone needed values for the spawned task let state = self.state.clone(); let remote_addr = self.remote_addr; + let max_outgoing_ws_message_size = middleware_config.max_outgoing_ws_message_size; + + // Extract WebSocket timeout config values from pegboard config + let pegboard = self.state.config.pegboard(); + let ws_proxy_timeout_secs = pegboard.gateway_ws_proxy_timeout_secs(); + let ws_connect_timeout_secs = pegboard.gateway_ws_connect_timeout_secs(); + let ws_send_timeout_secs = pegboard.gateway_ws_send_timeout_secs(); + let ws_flush_timeout_secs = pegboard.gateway_ws_flush_timeout_secs(); // Spawn a new task to handle the WebSocket bidirectional communication match target { @@ -1295,7 +1300,7 @@ impl ProxyService { self.state.tasks.spawn( async move { // Set up a timeout for the entire operation - let timeout_duration = Duration::from_secs(30); // 30 seconds timeout + let timeout_duration = Duration::from_secs(ws_proxy_timeout_secs); tracing::debug!( "WebSocket proxy task started with {}s timeout", timeout_duration.as_secs() @@ -1399,9 +1404,15 @@ impl ProxyService { return; } + let upstream_ws_config = WebSocketConfig::default() + .max_message_size(Some(max_outgoing_ws_message_size)); match tokio::time::timeout( - Duration::from_secs(5), // 5 second timeout per connection attempt - tokio_tungstenite::connect_async(ws_request), + Duration::from_secs(ws_connect_timeout_secs), + tokio_tungstenite::connect_async_with_config( + ws_request, 
+ Some(upstream_ws_config), + false, + ), ) .await { @@ -1437,8 +1448,9 @@ impl ProxyService { } Err(_) => { tracing::debug!( - "WebSocket request attempt {} timed out after 5s", - attempts + "WebSocket request attempt {} timed out after {}s", + attempts, + ws_connect_timeout_secs ); } } @@ -1615,7 +1627,7 @@ impl ProxyService { // Send the message with a timeout tracing::trace!("Sending message to upstream server"); let send_result = tokio::time::timeout( - Duration::from_secs(5), + Duration::from_secs(ws_send_timeout_secs), sink.send(upstream_msg) ).await; @@ -1625,7 +1637,7 @@ impl ProxyService { // Flush the sink with a timeout tracing::trace!("Flushing upstream sink"); let flush_result = tokio::time::timeout( - Duration::from_secs(2), + Duration::from_secs(ws_flush_timeout_secs), sink.flush() ).await; @@ -1647,7 +1659,7 @@ impl ProxyService { break; }, Err(_) => { - tracing::trace!("Timeout sending message to upstream after 5s"); + tracing::trace!("Timeout sending message to upstream after {}s", ws_send_timeout_secs); let _ = shutdown_tx.send(true); break; } @@ -1766,7 +1778,7 @@ impl ProxyService { // Send the message with a timeout tracing::trace!("Sending message to client"); let send_result = tokio::time::timeout( - Duration::from_secs(5), + Duration::from_secs(ws_send_timeout_secs), sink.send(client_msg) ).await; @@ -1776,7 +1788,7 @@ impl ProxyService { // Flush the sink with a timeout tracing::trace!("Flushing client sink"); let flush_result = tokio::time::timeout( - Duration::from_secs(2), + Duration::from_secs(ws_flush_timeout_secs), sink.flush() ).await; @@ -1798,7 +1810,7 @@ impl ProxyService { break; }, Err(_) => { - tracing::trace!("Timeout sending message to client after 5s"); + tracing::trace!("Timeout sending message to client after {}s", ws_send_timeout_secs); let _ = shutdown_tx.send(true); break; } @@ -1934,7 +1946,7 @@ impl ProxyService { } // Keep TCP connection open briefly to allow client to process close - tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await; + tokio::time::sleep(state.websocket_close_linger).await; break; } @@ -2002,7 +2014,7 @@ impl ProxyService { ws_handle.flush().await?; // Keep TCP connection open briefly to allow client to process close - tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await; + tokio::time::sleep(state.websocket_close_linger).await; break; } else { @@ -2052,7 +2064,7 @@ impl ProxyService { ws_handle.flush().await?; // Keep TCP connection open briefly to allow client to process close - tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await; + tokio::time::sleep(state.websocket_close_linger).await; break; } @@ -2071,7 +2083,7 @@ impl ProxyService { ws_handle.flush().await?; // Keep TCP connection open briefly to allow client to process close - tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await; + tokio::time::sleep(state.websocket_close_linger).await; break; } @@ -2240,10 +2252,13 @@ impl ProxyService { // HTTP errors in a meaningful way resulting in unhelpful errors for the user if is_websocket { tracing::debug!("Upgrading client connection to WebSocket for error proxy"); - match hyper_tungstenite::upgrade(mock_req, None) { + let error_ws_config = + ProxyState::client_websocket_config(self.state.websocket_max_message_size); + match hyper_tungstenite::upgrade(mock_req, Some(error_ws_config)) { Ok((client_response, client_ws)) => { tracing::debug!("Client WebSocket upgrade for error proxy successful"); + let state = self.state.clone(); self.state.tasks.spawn( async move { let ws_handle = match WebSocketHandle::new(client_ws).await { @@ 
-2286,7 +2301,7 @@ impl ProxyService { } // Keep TCP connection open briefly to allow client to process close - tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await; + tokio::time::sleep(state.websocket_close_linger).await; } .instrument(tracing::info_span!("ws_error_proxy_task")), ); @@ -2532,6 +2547,7 @@ fn err_into_response(err: anyhow::Error) -> Result<Response<ResponseBody>> { ("guard", "service_unavailable") => StatusCode::SERVICE_UNAVAILABLE, ("guard", "actor_ready_timeout") => StatusCode::SERVICE_UNAVAILABLE, ("guard", "no_route") => StatusCode::NOT_FOUND, + ("guard", "request_body_too_large") => StatusCode::PAYLOAD_TOO_LARGE, _ => StatusCode::BAD_REQUEST, }; diff --git a/engine/packages/guard/src/middleware.rs b/engine/packages/guard/src/middleware.rs index 11564419d0..67ab00a8a4 100644 --- a/engine/packages/guard/src/middleware.rs +++ b/engine/packages/guard/src/middleware.rs @@ -12,31 +12,59 @@ use rivet_guard_core::{ /// Creates a middleware function that can use config and pools pub fn create_middleware_function(ctx: StandaloneCtx) -> MiddlewareFn { - Arc::new(move |_actor_id: &Id, _headers: &hyper::HeaderMap| { - let _ctx = ctx.clone(); + Arc::new(move |actor_id: &Option<Id>, _headers: &hyper::HeaderMap| { + let ctx = ctx.clone(); + let is_actor_traffic = actor_id.is_some(); Box::pin(async move { - // In a real implementation, you would look up actor-specific middleware settings - // For now, we'll just return a standard configuration + let guard = ctx.config().guard(); + let pegboard = ctx.config().pegboard(); - // Create middleware config based on the actor ID - // This could be fetched from a database in a real implementation - Ok(MiddlewareResponse::Ok(MiddlewareConfig { - rate_limit: RateLimitConfig { - requests: 100, // 100 requests - period: 60, // per 60 seconds - }, - max_in_flight: MaxInFlightConfig { - amount: 20, // 20 concurrent requests - }, - retry: RetryConfig { - max_attempts: 7, - initial_interval: 150, - }, - timeout: TimeoutConfig { - request_timeout: 30, // 30 seconds for requests - }, - })) + let config = if is_actor_traffic { + // Actor traffic uses gateway_* settings + MiddlewareConfig { + rate_limit: RateLimitConfig { + requests: pegboard.gateway_rate_limit_requests(), + period: pegboard.gateway_rate_limit_period_secs(), + }, + max_in_flight: MaxInFlightConfig { + amount: pegboard.gateway_max_in_flight(), + }, + retry: RetryConfig { + max_attempts: pegboard.gateway_retry_max_attempts(), + initial_interval: pegboard.gateway_retry_initial_interval_ms(), + }, + timeout: TimeoutConfig { + request_timeout: pegboard.gateway_actor_request_timeout_secs(), + }, + max_incoming_ws_message_size: guard.websocket_max_message_size(), + max_outgoing_ws_message_size: guard.websocket_max_outgoing_message_size(), + max_http_request_body_size: pegboard.gateway_http_max_request_body_size(), + } + } else { + // API traffic uses api_* settings + MiddlewareConfig { + rate_limit: RateLimitConfig { + requests: pegboard.api_rate_limit_requests(), + period: pegboard.api_rate_limit_period_secs(), + }, + max_in_flight: MaxInFlightConfig { + amount: pegboard.api_max_in_flight(), + }, + retry: RetryConfig { + max_attempts: pegboard.api_retry_max_attempts(), + initial_interval: pegboard.api_retry_initial_interval_ms(), + }, + timeout: TimeoutConfig { + request_timeout: pegboard.gateway_api_request_timeout_secs(), + }, + max_incoming_ws_message_size: guard.websocket_max_message_size(), + max_outgoing_ws_message_size: guard.websocket_max_outgoing_message_size(), + max_http_request_body_size: 
pegboard.api_max_http_request_body_size(), + } + }; + + Ok(MiddlewareResponse::Ok(config)) }) }) } diff --git a/engine/packages/pegboard-gateway/src/lib.rs b/engine/packages/pegboard-gateway/src/lib.rs index d0a5a14aa0..0712281821 100644 --- a/engine/packages/pegboard-gateway/src/lib.rs +++ b/engine/packages/pegboard-gateway/src/lib.rs @@ -33,10 +33,6 @@ pub mod shared_state; mod tunnel_to_ws_task; mod ws_to_tunnel_task; -const WEBSOCKET_OPEN_TIMEOUT: Duration = Duration::from_secs(15); -const RESPONSE_START_TIMEOUT: Duration = Duration::from_secs(15); -const UPDATE_PING_INTERVAL: Duration = Duration::from_secs(3); - #[derive(RivetError, Serialize, Deserialize)] #[error( "guard", @@ -45,6 +41,18 @@ const UPDATE_PING_INTERVAL: Duration = Duration::from_secs(3); )] pub struct WebsocketPendingLimitReached; +#[derive(RivetError, Serialize, Deserialize)] +#[error( + "guard", + "request_body_too_large", + "Request body too large.", + "Request body size {size} bytes exceeds maximum allowed {max_size} bytes." +)] +pub struct RequestBodyTooLarge { + pub size: usize, + pub max_size: usize, +} + #[derive(Debug)] enum LifecycleResult { ServerClose(protocol::mk2::ToServerWebSocketClose), @@ -154,6 +162,20 @@ impl CustomServeTrait for PegboardGateway { .context("failed to read body")? .to_bytes(); + // Check request body size limit for requests to actors + let max_request_body_size = self + .ctx + .config() + .pegboard() + .gateway_http_max_request_body_size(); + if body_bytes.len() > max_request_body_size { + return Err(RequestBodyTooLarge { + size: body_bytes.len(), + max_size: max_request_body_size, + } + .build()); + } + let udb = self.ctx.udb()?; let runner_id = self.runner_id; let (mut stopped_sub, runner_protocol_version) = tokio::try_join!( @@ -247,7 +269,13 @@ impl CustomServeTrait for PegboardGateway { Err(ServiceUnavailable.build()) }; - let response_start = tokio::time::timeout(RESPONSE_START_TIMEOUT, fut) + let response_start_timeout = Duration::from_millis( + self.ctx + .config() + .pegboard() + .gateway_response_start_timeout_ms(), + ); + let response_start = tokio::time::timeout(response_start_timeout, fut) .await .map_err(|_| { tracing::warn!("timed out waiting for response start from runner"); @@ -401,7 +429,13 @@ impl CustomServeTrait for PegboardGateway { Err(WebSocketServiceUnavailable.build()) }; - let open_msg = tokio::time::timeout(WEBSOCKET_OPEN_TIMEOUT, fut) + let websocket_open_timeout = Duration::from_millis( + self.ctx + .config() + .pegboard() + .gateway_websocket_open_timeout_ms(), + ); + let open_msg = tokio::time::timeout(websocket_open_timeout, fut) .await .map_err(|_| { tracing::warn!("timed out waiting for websocket open from runner"); @@ -444,10 +478,17 @@ impl CustomServeTrait for PegboardGateway { ws_rx, ws_to_tunnel_abort_rx, )); + let update_ping_interval = Duration::from_millis( + self.ctx + .config() + .pegboard() + .gateway_update_ping_interval_ms(), + ); let ping = tokio::spawn(ping_task::task( self.shared_state.clone(), request_id, ping_abort_rx, + update_ping_interval, )); let keepalive = if can_hibernate { Some(tokio::spawn(keepalive_task::task( diff --git a/engine/packages/pegboard-gateway/src/ping_task.rs b/engine/packages/pegboard-gateway/src/ping_task.rs index 01cf19618a..b35e1106a4 100644 --- a/engine/packages/pegboard-gateway/src/ping_task.rs +++ b/engine/packages/pegboard-gateway/src/ping_task.rs @@ -1,18 +1,20 @@ use anyhow::Result; use rivet_runner_protocol as protocol; +use std::time::Duration; use tokio::sync::watch; -use 
super::{LifecycleResult, UPDATE_PING_INTERVAL}; +use super::LifecycleResult; use crate::shared_state::SharedState; pub async fn task( shared_state: SharedState, request_id: protocol::RequestId, mut ping_abort_rx: watch::Receiver<()>, + update_ping_interval: Duration, ) -> Result { loop { tokio::select! { - _ = tokio::time::sleep(UPDATE_PING_INTERVAL) => {} + _ = tokio::time::sleep(update_ping_interval) => {} _ = ping_abort_rx.changed() => { return Ok(LifecycleResult::Aborted); } diff --git a/engine/packages/pegboard-gateway/src/shared_state.rs b/engine/packages/pegboard-gateway/src/shared_state.rs index 388c27e36a..93a717a813 100644 --- a/engine/packages/pegboard-gateway/src/shared_state.rs +++ b/engine/packages/pegboard-gateway/src/shared_state.rs @@ -16,11 +16,6 @@ use vbare::OwnedVersionedData; use crate::{WebsocketPendingLimitReached, metrics}; -const GC_INTERVAL: Duration = Duration::from_secs(15); -const TUNNEL_PING_TIMEOUT: i64 = util::duration::seconds(30); -const HWS_MESSAGE_ACK_TIMEOUT: Duration = Duration::from_secs(30); -const HWS_MAX_PENDING_MSGS_SIZE_PER_REQ: u64 = util::size::mebibytes(1); - pub struct InFlightRequestHandle { pub msg_rx: mpsc::Receiver, /// Used to check if the request handler has been dropped. @@ -67,6 +62,11 @@ pub struct SharedStateInner { receiver_subject: String, in_flight_requests: HashMap, hibernation_timeout: i64, + // Config values + gc_interval: Duration, + tunnel_ping_timeout: i64, + hws_message_ack_timeout: Duration, + hws_max_pending_size: u64, } #[derive(Clone)] @@ -79,12 +79,19 @@ impl SharedState { let receiver_subject = pegboard::pubsub_subjects::GatewayReceiverSubject::new(gateway_id).to_string(); + let pegboard_config = config.pegboard(); Self(Arc::new(SharedStateInner { ups, gateway_id, receiver_subject, in_flight_requests: HashMap::new(), - hibernation_timeout: config.pegboard().hibernating_request_eligible_threshold(), + hibernation_timeout: pegboard_config.hibernating_request_eligible_threshold(), + gc_interval: Duration::from_millis(pegboard_config.gateway_gc_interval_ms()), + tunnel_ping_timeout: pegboard_config.gateway_tunnel_ping_timeout_ms(), + hws_message_ack_timeout: Duration::from_millis( + pegboard_config.gateway_hws_message_ack_timeout_ms(), + ), + hws_max_pending_size: pegboard_config.gateway_hws_max_pending_size(), })) } @@ -208,7 +215,7 @@ impl SharedState { if let (Some(hs), true) = (&mut req.hibernation_state, is_ws_message) { hs.total_pending_ws_msgs_size += message_serialized.len() as u64; - if hs.total_pending_ws_msgs_size > HWS_MAX_PENDING_MSGS_SIZE_PER_REQ + if hs.total_pending_ws_msgs_size > self.hws_max_pending_size || hs.pending_ws_msgs.len() >= u16::MAX as usize { return Err(WebsocketPendingLimitReached {}.build()); @@ -250,7 +257,7 @@ impl SharedState { let now = util::timestamp::now(); // Verify ping timeout - if now.saturating_sub(req.last_pong) > TUNNEL_PING_TIMEOUT { + if now.saturating_sub(req.last_pong) > self.tunnel_ping_timeout { tracing::warn!("tunnel timeout"); return Err(WebSocketServiceTimeout.build()); } @@ -469,7 +476,7 @@ impl SharedState { #[tracing::instrument(skip_all)] async fn gc(&self) { - let mut interval = tokio::time::interval(GC_INTERVAL); + let mut interval = tokio::time::interval(self.gc_interval); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { @@ -534,7 +541,7 @@ impl SharedState { let reason = 'reason: { if let Some(hs) = &req.hibernation_state { if let Some(earliest_pending_ws_msg) = hs.pending_ws_msgs.first() { - if 
now.duration_since(earliest_pending_ws_msg.send_instant) > HWS_MESSAGE_ACK_TIMEOUT { + if now.duration_since(earliest_pending_ws_msg.send_instant) > self.hws_message_ack_timeout { break 'reason Some(MsgGcReason::WebSocketMessageNotAcked { first_msg_index: earliest_pending_ws_msg.message_index, last_msg_index: req.message_index, diff --git a/engine/packages/pegboard-runner/src/actor_event_demuxer.rs b/engine/packages/pegboard-runner/src/actor_event_demuxer.rs index 2b5c7c743f..47f5cda42c 100644 --- a/engine/packages/pegboard-runner/src/actor_event_demuxer.rs +++ b/engine/packages/pegboard-runner/src/actor_event_demuxer.rs @@ -7,9 +7,6 @@ use rivet_runner_protocol as protocol; use tokio::sync::mpsc; use tokio::task::JoinHandle; -const GC_INTERVAL: Duration = Duration::from_secs(30); -const MAX_LAST_SEEN: Duration = Duration::from_secs(30); - struct Channel { tx: mpsc::UnboundedSender, handle: JoinHandle<()>, @@ -21,15 +18,24 @@ pub struct ActorEventDemuxer { runner_id: Id, channels: HashMap, last_gc: Instant, + gc_interval: Duration, + max_last_seen: Duration, } impl ActorEventDemuxer { pub fn new(ctx: StandaloneCtx, runner_id: Id) -> Self { + let pegboard_config = ctx.config().pegboard(); + let gc_interval = + Duration::from_millis(pegboard_config.runner_event_demuxer_gc_interval_ms()); + let max_last_seen = + Duration::from_millis(pegboard_config.runner_event_demuxer_max_last_seen_ms()); Self { ctx, runner_id, channels: HashMap::new(), last_gc: Instant::now(), + gc_interval, + max_last_seen, } } @@ -64,11 +70,11 @@ impl ActorEventDemuxer { } // Run gc periodically - if self.last_gc.elapsed() > GC_INTERVAL { + if self.last_gc.elapsed() > self.gc_interval { self.last_gc = Instant::now(); self.channels.retain(|_, channel| { - let keep = channel.last_seen.elapsed() < MAX_LAST_SEEN; + let keep = channel.last_seen.elapsed() < self.max_last_seen; if !keep { // TODO: Verify aborting is safe here diff --git a/engine/packages/pegboard-runner/src/errors.rs b/engine/packages/pegboard-runner/src/errors.rs index 45c5d39144..0799858cf1 100644 --- a/engine/packages/pegboard-runner/src/errors.rs +++ b/engine/packages/pegboard-runner/src/errors.rs @@ -1,5 +1,17 @@ use rivet_error::*; -use serde::Serialize; +use serde::{Deserialize, Serialize}; + +#[derive(RivetError, Serialize, Deserialize)] +#[error( + "guard", + "response_body_too_large", + "Response body too large.", + "Response body size {size} bytes exceeds maximum allowed {max_size} bytes." 
+)] +pub struct ResponseBodyTooLarge { + pub size: usize, + pub max_size: usize, +} #[derive(RivetError, Debug)] #[error("ws")] diff --git a/engine/packages/pegboard-runner/src/lib.rs b/engine/packages/pegboard-runner/src/lib.rs index b2eeb5bd26..ed160599e9 100644 --- a/engine/packages/pegboard-runner/src/lib.rs +++ b/engine/packages/pegboard-runner/src/lib.rs @@ -23,8 +23,6 @@ mod tunnel_to_ws_task; mod utils; mod ws_to_tunnel_task; -const UPDATE_PING_INTERVAL: Duration = Duration::from_secs(3); - #[derive(Debug)] enum LifecycleResult { Closed, @@ -155,10 +153,17 @@ impl CustomServeTrait for PegboardRunnerWsCustomServe { )); // Update pings + let update_ping_interval = Duration::from_millis( + self.ctx + .config() + .pegboard() + .runner_update_ping_interval_ms(), + ); let ping = tokio::spawn(ping_task::task( self.ctx.clone(), conn.clone(), ping_abort_rx, + update_ping_interval, )); let tunnel_to_ws_abort_tx2 = tunnel_to_ws_abort_tx.clone(); let ws_to_tunnel_abort_tx2 = ws_to_tunnel_abort_tx.clone(); diff --git a/engine/packages/pegboard-runner/src/ping_task.rs b/engine/packages/pegboard-runner/src/ping_task.rs index 7f8f25b9b0..b4bb096321 100644 --- a/engine/packages/pegboard-runner/src/ping_task.rs +++ b/engine/packages/pegboard-runner/src/ping_task.rs @@ -3,10 +3,11 @@ use hyper_tungstenite::tungstenite::Message; use pegboard::ops::runner::update_alloc_idx::{Action, RunnerEligibility}; use rivet_runner_protocol::{self as protocol, versioned}; use std::sync::{Arc, atomic::Ordering}; +use std::time::Duration; use tokio::sync::watch; use vbare::OwnedVersionedData; -use crate::{LifecycleResult, UPDATE_PING_INTERVAL, conn::Conn}; +use crate::{LifecycleResult, conn::Conn}; /// Updates the ping of all runners requesting a ping update at once. #[tracing::instrument(skip_all)] @@ -14,10 +15,11 @@ pub async fn task( ctx: StandaloneCtx, conn: Arc, mut ping_abort_rx: watch::Receiver<()>, + update_ping_interval: Duration, ) -> Result { loop { tokio::select! 
{ - _ = tokio::time::sleep(UPDATE_PING_INTERVAL) => {} + _ = tokio::time::sleep(update_ping_interval) => {} _ = ping_abort_rx.changed() => { return Ok(LifecycleResult::Aborted); } diff --git a/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs b/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs index 6e86d8f8f1..862e96f8b1 100644 --- a/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs +++ b/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs @@ -800,6 +800,23 @@ async fn handle_tunnel_message_mk2( ctx: &StandaloneCtx, msg: protocol::mk2::ToServerTunnelMessage, ) -> Result<()> { + // Check response body size limit for HTTP responses + if let protocol::mk2::ToServerTunnelMessageKind::ToServerResponseStart(ref resp) = + msg.message_kind + { + if let Some(ref body) = resp.body { + let max_response_body_size = + ctx.config().pegboard().runner_http_max_response_body_size(); + if body.len() > max_response_body_size { + return Err(errors::ResponseBodyTooLarge { + size: body.len(), + max_size: max_response_body_size, + } + .build()); + } + } + } + // Publish message to UPS let gateway_reply_to = GatewayReceiverSubject::new(msg.message_id.gateway_id).to_string(); @@ -860,6 +877,21 @@ async fn handle_tunnel_message_mk1( return Ok(()); } + // Check response body size limit for HTTP responses + if let protocol::ToServerTunnelMessageKind::ToServerResponseStart(ref resp) = msg.message_kind { + if let Some(ref body) = resp.body { + let max_response_body_size = + ctx.config().pegboard().runner_http_max_response_body_size(); + if body.len() > max_response_body_size { + return Err(errors::ResponseBodyTooLarge { + size: body.len(), + max_size: max_response_body_size, + } + .build()); + } + } + } + // Publish message to UPS let gateway_reply_to = GatewayReceiverSubject::new(msg.message_id.gateway_id).to_string(); let msg_serialized = versioned::ToGateway::v3_to_v4(versioned::ToGateway::V3( diff --git a/engine/packages/test-deps/src/datacenter.rs b/engine/packages/test-deps/src/datacenter.rs index 3720e64eff..19971b826c 100644 --- a/engine/packages/test-deps/src/datacenter.rs +++ b/engine/packages/test-deps/src/datacenter.rs @@ -87,6 +87,7 @@ pub async fn setup_single_datacenter( host: None, port: Some(guard_port), https: None, + ..Default::default() }); // Use short timeouts for tests diff --git a/website/public/llms-full.txt b/website/public/llms-full.txt index 7e937d1221..741cc50c24 100644 --- a/website/public/llms-full.txt +++ b/website/public/llms-full.txt @@ -3196,6 +3196,78 @@ const counter = actor(), default counter; ``` +## Limits + +# Limits + +This page documents the limits for Rivet Actors, including WebSocket message sizes and connection limits. + +## WebSocket Limits + +### Message Size Limits + +WebSocket message sizes are controlled at two levels: the **Rivet Engine** (infrastructure level) and **RivetKit** (application level). + +#### Rivet Engine Limits (Infrastructure) + +These limits are configured on the Rivet Engine and apply to all WebSocket traffic. They are not directly configurable by application developers but can be adjusted by self-hosted operators. 
+ | Limit | Default | Description | +|-------|---------|-------------| +| Max incoming message size | 32 MiB | Maximum size of an incoming WebSocket message from clients | +| Max outgoing message size | 32 MiB | Maximum size of an outgoing WebSocket message sent to clients | +| Max pending buffer size (hibernating) | 128 MiB | Maximum buffered message size for hibernating WebSocket connections | +| Max pending messages (hibernating) | 65,535 | Maximum number of pending messages for hibernating WebSocket connections | + +For self-hosted deployments, these limits can be configured via the `guard.websocket_max_message_size`, `guard.websocket_max_outgoing_message_size`, and `pegboard.gateway_hws_max_pending_size` configuration options. See the [self-hosting configuration](/docs/self-hosting/configuration) documentation. + +#### RivetKit Limits (Application) + +These limits are configured in your RivetKit application and control the message sizes your actors can send and receive. They provide an additional layer of protection on top of the Rivet Engine limits. + +| Limit | Default | Configurable | Description | +|-------|---------|--------------|-------------| +| `maxIncomingMessageSize` | 64 KB | Yes | Maximum size of messages received by actors | +| `maxOutgoingMessageSize` | 1 MB | Yes | Maximum size of messages sent by actors | + +**Configuration Example:** + +```typescript +import { setup } from "rivetkit"; + +const rivet = setup({ + use: { /* ... */ }, + maxIncomingMessageSize: 1_048_576, // 1 MiB + maxOutgoingMessageSize: 10_485_760, // 10 MiB +}); +``` + +The RivetKit limits cannot exceed the Rivet Engine limits. If you set `maxIncomingMessageSize` higher than the engine's 32 MiB limit, messages will still be rejected at the engine level. + +### Connection Limits + +| Limit | Default | Description | +|-------|---------|-------------| +| WebSocket open timeout | 15 seconds | Time allowed for WebSocket handshake to complete | +| Response start timeout | 5 minutes | Time allowed for actor to begin responding | +| Tunnel ping timeout | 30 seconds | Time before a connection is considered dead if no pong received | +| Hibernation timeout | 90 seconds | Time before a hibernating request is considered disconnected | + +### Hibernating WebSocket Limits + +When a WebSocket connection is hibernating (actor is sleeping), messages are buffered until the actor wakes up: + +| Limit | Default | Description | +|-------|---------|-------------| +| Max pending buffer size | 128 MiB | Total size of all buffered messages per request | +| Max pending message count | 65,535 | Maximum number of buffered messages per request | +| Message ack timeout | 30 seconds | Time allowed for message acknowledgment before connection is closed | + +If these limits are exceeded, the WebSocket connection will be closed with an error. 
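+A minimal sketch of raising these limits on a self-hosted cluster, using the option names from the [self-hosting configuration](/docs/self-hosting/configuration) reference (the values below are illustrative, not the defaults):
+
+```typescript
+// Engine config fragment. Option names match the self-hosting
+// configuration reference; the values here are examples only.
+const config = {
+  guard: {
+    websocket_max_message_size: 64 * 1024 * 1024, // 64 MiB incoming
+    websocket_max_outgoing_message_size: 64 * 1024 * 1024, // 64 MiB outgoing
+  },
+  pegboard: {
+    gateway_hws_max_pending_size: 256 * 1024 * 1024, // 256 MiB hibernation buffer
+  },
+};
+```
+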
+ ## Comparison with Cloudflare Durable Objects + +For users familiar with Cloudflare Durable Objects, here's how Rivet's limits compare: + +| Limit | Rivet | Cloudflare Durable Objects | +|-------|-------|---------------------------| +| Max incoming WebSocket message | 32 MiB | 32 MiB | +| Max outgoing WebSocket message | 32 MiB | Not documented | +| Hibernation buffer | 128 MiB | Not documented | ## Metadata # Metadata @@ -6134,6 +6206,12 @@ interface RivetConfig ; // HTTP/HTTPS traffic handling service guard?: ; }; + route_cache_ttl_ms?: number; // Default: 600000 (10 minutes) + proxy_state_cache_ttl_ms?: number; // Default: 3600000 (1 hour) + websocket_close_linger_ms?: number; // Default: 100 + websocket_max_message_size?: number; // Default: 33554432 (32 MiB) + websocket_max_outgoing_message_size?: number; // Default: 33554432 (32 MiB) + http_max_request_body_size?: number; // Default: 268435456 (256 MiB) }; // Public API service configuration diff --git a/website/public/llms.txt b/website/public/llms.txt index f015de96d4..da4e966a03 100644 --- a/website/public/llms.txt +++ b/website/public/llms.txt @@ -87,6 +87,7 @@ https://rivet.dev/docs/actors/http-api https://rivet.dev/docs/actors/input https://rivet.dev/docs/actors/keys https://rivet.dev/docs/actors/lifecycle +https://rivet.dev/docs/actors/limits https://rivet.dev/docs/actors/metadata https://rivet.dev/docs/actors/quickstart https://rivet.dev/docs/actors/quickstart/backend diff --git a/website/src/content/docs/actors/limits.mdx b/website/src/content/docs/actors/limits.mdx new file mode 100644 index 0000000000..7dda7ee044 --- /dev/null +++ b/website/src/content/docs/actors/limits.mdx @@ -0,0 +1,107 @@ +# Limits + +This page documents the limits for Rivet Actors. + +There are two types of limits: + +- **Soft Limit**: Application-level limit, configurable in RivetKit. These cannot exceed the hard limit. +- **Hard Limit**: Infrastructure-level limit that cannot be configured from RivetKit. + +Soft limits can be configured in RivetKit by passing options to `setup`: + +```typescript +import { setup } from "rivetkit"; + +const rivet = setup({ + use: { /* ... */ }, + maxIncomingMessageSize: 1_048_576, + maxOutgoingMessageSize: 10_485_760, + // ... +}); +``` + +## Limits + +### WebSocket + +These limits affect actions that use `.connect()` and [low-level WebSockets](/docs/actors/websocket-handler). + +| Name | Soft Limit | Hard Limit | Description | +|------|------------|------------|-------------| +| Max incoming message size | 64 KB | 32 MiB | Maximum size of incoming WebSocket messages. Soft limit configurable via `maxIncomingMessageSize`. | +| Max outgoing message size | 1 MB | 32 MiB | Maximum size of outgoing WebSocket messages. Soft limit configurable via `maxOutgoingMessageSize`. | +| WebSocket open timeout | — | 15 seconds | Time allowed for WebSocket connection to be established, including `onBeforeConnect` and `createConnState` hooks. Connection is closed if exceeded. | +| Message ack timeout | — | 30 seconds | Time allowed for message acknowledgment before connection is closed. Only relevant in the case of a network issue and does not affect your application. | + +### Hibernating WebSocket + +Hibernating WebSockets allow actors to sleep while keeping client connections alive. All WebSocket limits above also apply to hibernating WebSockets. See [WebSocket Hibernation](/docs/actors/websocket-handler#web-socket-hibernation) for details. + +| Name | Soft Limit | Hard Limit | Description | +|------|------------|------------|-------------| +| Max pending buffer size | — | 128 MiB | Total size of all buffered messages per connection while actor is sleeping. 
| + | Max pending message count | — | 65,535 | Maximum number of buffered messages per connection while actor is sleeping. | +| Hibernation timeout | — | 90 seconds | Maximum time an actor has to wake up before the client is disconnected. Only relevant if something is wrong with starting actors. | + +### HTTP + +These limits affect actions that do not use `.connect()` and [low-level HTTP requests](/docs/actors/request-handler). + +| Name | Soft Limit | Hard Limit | Description | +|------|------------|------------|-------------| +| Max request body size | — | 128 MiB | Maximum size of HTTP request bodies. | +| Max response body size | — | 128 MiB | Maximum size of HTTP response bodies. | +| Request timeout | — | 5 minutes | Maximum time for an actor to start responding to a request. The overall request lifecycle is capped at 6 minutes. | + +### Networking + +| Name | Soft Limit | Hard Limit | Description | +|------|------------|------------|-------------| +| Connection ping timeout | — | 30 seconds | Connection is closed if a ping is not acknowledged within this time. Applies to both HTTP and WebSocket. Only relevant in the case of a network issue and does not affect your application. | + +### Actor KV Storage + +These limits apply to the low-level KV storage interface powering Rivet Actors. They likely do not affect your application, but are documented for completeness. + +| Name | Soft Limit | Hard Limit | Description | +|------|------------|------------|-------------| +| Max key size | — | 2 KB | Maximum size of a single key. | +| Max value size | — | 128 KB | Maximum size of a single value. | +| Max keys per operation | — | 128 | Maximum number of keys in a single get/put/delete operation. | +| Max batch put payload size | — | 976 KB | Maximum total size of all key-value pairs in a single batch put operation. | +| Max storage size per actor | — | 1 GiB | Maximum total storage size for a single actor. | +| List default limit | — | 16,384 | Default maximum number of keys returned by a list operation. | + +### Actor Input + +See [Actor Input](/docs/actors/input) for details. + +| Name | Soft Limit | Hard Limit | Description | +|------|------------|------------|-------------| +| Max actor input size | — | 4 MiB | Maximum size of the input passed when creating an actor. | +| Max connection params size | — | 4 KB | Maximum size of connection parameters passed when connecting to an actor. | +| Max actor key size | — | 128 bytes | Maximum size of an actor key (used for actor addressing). | + +### Rate Limiting + +| Name | Soft Limit | Hard Limit | Description | +|------|------------|------------|-------------| +| Rate limit | — | 1200 requests/minute | Default rate limit per actor per IP address with a 1 minute time bucket. | +| Max in-flight requests | — | 32 | Default maximum concurrent requests to an actor per IP address. | + +### Timeouts + +| Name | Soft Limit | Hard Limit | Description | +|------|------------|------------|-------------| +| Action timeout | 60 seconds | — | Timeout for RPC actions. Configurable via `actionTimeout`. | +| Create vars timeout | 5 seconds | — | Timeout for `createVars` hook. Configurable via `createVarsTimeout`. | +| Create conn state timeout | 5 seconds | — | Timeout for `createConnState` hook. Configurable via `createConnStateTimeout`. | +| On connect timeout | 5 seconds | — | Timeout for `onConnect` hook. Configurable via `onConnectTimeout`. | +| On sleep timeout | 5 seconds | — | Timeout for `onSleep` hook. Configurable via `onSleepTimeout`. | +| On destroy timeout | 5 seconds | — | Timeout for `onDestroy` hook. 
Configurable via `onDestroyTimeout`. | +| Wait until timeout | 15 seconds | — | Max time to wait for `waitUntil` background promises during shutdown. Configurable via `waitUntilTimeout`. | + +## Increasing Limits + +These limits are sane defaults designed to protect your application from exploits and accidental runaway bugs. If you have a use case that requires different limits, [contact us](https://rivet.dev/contact) to discuss your requirements. + diff --git a/website/src/content/docs/self-hosting/configuration.mdx b/website/src/content/docs/self-hosting/configuration.mdx index b886b425cf..bcba40c638 100644 --- a/website/src/content/docs/self-hosting/configuration.mdx +++ b/website/src/content/docs/self-hosting/configuration.mdx @@ -49,8 +49,8 @@ interface RivetConfig { // HTTP/HTTPS traffic handling service guard?: { - host?: string; // Default: "::" (IPv6 unspecified) - port?: number; // Default: 6420 + host?: string; // Default: "::" (IPv6 unspecified) + port?: number; // Default: 6420 https?: { port: number; tls: { @@ -60,6 +60,12 @@ interface RivetConfig { api_key_path: string; }; }; + route_cache_ttl_ms?: number; // Default: 600000 (10 minutes) + proxy_state_cache_ttl_ms?: number; // Default: 3600000 (1 hour) + websocket_close_linger_ms?: number; // Default: 100 + websocket_max_message_size?: number; // Default: 33554432 (32 MiB) + websocket_max_outgoing_message_size?: number; // Default: 33554432 (32 MiB) + http_max_request_body_size?: number; // Default: 268435456 (256 MiB) }; // Public API service configuration @@ -88,6 +95,38 @@ interface RivetConfig { serverless_retry_reset_duration?: number; // Default: 600000 (ms) serverless_backoff_max_exponent?: number; // Default: 8 pool_desired_max_override?: number; + // Gateway settings + gateway_websocket_open_timeout_ms?: number; // Default: 15000 (ms) + gateway_response_start_timeout_ms?: number; // Default: 300000 (5 minutes) + gateway_update_ping_interval_ms?: number; // Default: 3000 (ms) + gateway_gc_interval_ms?: number; // Default: 15000 (ms) + gateway_tunnel_ping_timeout_ms?: number; // Default: 30000 (ms) + gateway_hws_message_ack_timeout_ms?: number; // Default: 30000 (ms) + gateway_hws_max_pending_size?: number; // Default: 134217728 (128 MiB) + gateway_http_max_request_body_size?: number; // Default: 134217728 (128 MiB) + gateway_rate_limit_requests?: number; // Default: 1200 + gateway_rate_limit_period_secs?: number; // Default: 60 (seconds) + gateway_max_in_flight?: number; // Default: 32 + gateway_actor_request_timeout_secs?: number; // Default: 360 (6 minutes) + gateway_api_request_timeout_secs?: number; // Default: 60 (seconds) + gateway_retry_max_attempts?: number; // Default: 7 + gateway_retry_initial_interval_ms?: number; // Default: 150 (ms) + gateway_ws_proxy_timeout_secs?: number; // Default: 30 (seconds) + gateway_ws_connect_timeout_secs?: number; // Default: 5 (seconds) + gateway_ws_send_timeout_secs?: number; // Default: 5 (seconds) + gateway_ws_flush_timeout_secs?: number; // Default: 2 (seconds) + // API settings + api_rate_limit_requests?: number; // Default: 1200 + api_rate_limit_period_secs?: number; // Default: 60 (seconds) + api_max_in_flight?: number; // Default: 32 + api_retry_max_attempts?: number; // Default: 3 + api_retry_initial_interval_ms?: number; // Default: 100 (ms) + api_max_http_request_body_size?: number; // Default: 268435456 (256 MiB) + // Runner settings + runner_http_max_response_body_size?: number; // Default: 134217728 (128 MiB) + runner_update_ping_interval_ms?: number; // Default: 3000 (ms) + runner_event_demuxer_gc_interval_ms?: number; // Default: 30000 (ms) + runner_event_demuxer_max_last_seen_ms?: number; // Default: 30000 (ms) }; // Logging configuration diff --git a/website/src/sitemap/mod.ts b/website/src/sitemap/mod.ts index e8f450c96c..9d9ddb5568 100644 --- a/website/src/sitemap/mod.ts +++ b/website/src/sitemap/mod.ts @@ -261,6 +261,10 @@ export const sitemap = [ href: "/docs/general/cors", // icon: faShareNodes, }, + { + title: "Limits", + href: "/docs/actors/limits", + }, ], }, ],
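To see the new options in context, a minimal sketch of a `RivetConfig` fragment tuning the gateway and runner limits documented in `configuration.mdx` above. The object shape is assumed from the configuration reference; the values are illustrative, not the defaults:

```typescript
// Illustrative RivetConfig fragment exercising the new pegboard options.
// Values are examples; the defaults are listed in the configuration reference.
const config = {
  pegboard: {
    // Keep the outer actor request timeout slightly longer than the
    // response start timeout, as the config doc comments advise.
    gateway_response_start_timeout_ms: 2 * 60 * 1000, // 2 minutes
    gateway_actor_request_timeout_secs: 3 * 60, // 3 minutes
    gateway_http_max_request_body_size: 16 * 1024 * 1024, // 16 MiB
    runner_http_max_response_body_size: 16 * 1024 * 1024, // 16 MiB
  },
};
```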