-
Notifications
You must be signed in to change notification settings - Fork 20
feat: bound incoming request and add postgres service #76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| version: '3.8' | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks this here emits this warning on my machine with a fresh docker let's delete it. |
||
| services: | ||
| postgres: | ||
| image: postgres:15 | ||
| environment: | ||
| POSTGRES_DB: postgres | ||
| POSTGRES_USER: postgres | ||
| POSTGRES_PASSWORD: postgres | ||
| volumes: | ||
| - postgres-data:/var/lib/postgresql/data | ||
| ports: | ||
| - "5432:5432" | ||
| networks: | ||
| - app-network | ||
|
|
||
| volumes: | ||
| postgres-data: | ||
|
|
||
| networks: | ||
| app-network: | ||
| driver: bridge | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| use http_body_util::{BodyExt, Full}; | ||
| use http_body_util::{BodyExt, Full, Limited}; | ||
| use hyper::body::{Bytes, Incoming}; | ||
| use hyper::service::Service; | ||
| use hyper::{Request, Response, StatusCode}; | ||
|
|
@@ -18,15 +18,37 @@ use std::future::Future; | |
| use std::pin::Pin; | ||
| use std::sync::Arc; | ||
|
|
||
| const MAXIMUM_REQUEST_BODY_SIZE: usize = 1024 * 1024 * 1024; | ||
|
|
||
| #[derive(Clone)] | ||
| pub(crate) struct VssServiceConfig { | ||
| maximum_request_body_size: usize, | ||
| } | ||
|
|
||
| impl VssServiceConfig { | ||
| pub fn new(maximum_request_body_size: usize) -> Self { | ||
| Self { maximum_request_body_size: maximum_request_body_size.min(MAXIMUM_REQUEST_BODY_SIZE) } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of silently taking the min here, I would prefer we abort startup in case the user sets a value greater than 1 GiB to avoid surprises. |
||
| } | ||
| } | ||
|
|
||
| impl Default for VssServiceConfig { | ||
| fn default() -> Self { | ||
| Self { maximum_request_body_size: MAXIMUM_REQUEST_BODY_SIZE } | ||
| } | ||
| } | ||
|
|
||
| #[derive(Clone)] | ||
| pub struct VssService { | ||
| store: Arc<dyn KvStore>, | ||
| authorizer: Arc<dyn Authorizer>, | ||
| config: VssServiceConfig, | ||
| } | ||
|
|
||
| impl VssService { | ||
| pub(crate) fn new(store: Arc<dyn KvStore>, authorizer: Arc<dyn Authorizer>) -> Self { | ||
| Self { store, authorizer } | ||
| pub(crate) fn new( | ||
| store: Arc<dyn KvStore>, authorizer: Arc<dyn Authorizer>, config: VssServiceConfig, | ||
| ) -> Self { | ||
| Self { store, authorizer, config } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -41,22 +63,51 @@ impl Service<Request<Incoming>> for VssService { | |
| let store = Arc::clone(&self.store); | ||
| let authorizer = Arc::clone(&self.authorizer); | ||
| let path = req.uri().path().to_owned(); | ||
| let maximum_request_body_size = self.config.maximum_request_body_size; | ||
|
|
||
| Box::pin(async move { | ||
| let prefix_stripped_path = path.strip_prefix(BASE_PATH_PREFIX).unwrap_or_default(); | ||
|
|
||
| match prefix_stripped_path { | ||
| "/getObject" => { | ||
| handle_request(store, authorizer, req, handle_get_object_request).await | ||
| handle_request( | ||
| store, | ||
| authorizer, | ||
| req, | ||
| maximum_request_body_size, | ||
| handle_get_object_request, | ||
| ) | ||
| .await | ||
| }, | ||
| "/putObjects" => { | ||
| handle_request(store, authorizer, req, handle_put_object_request).await | ||
| handle_request( | ||
| store, | ||
| authorizer, | ||
| req, | ||
| maximum_request_body_size, | ||
| handle_put_object_request, | ||
| ) | ||
| .await | ||
| }, | ||
| "/deleteObject" => { | ||
| handle_request(store, authorizer, req, handle_delete_object_request).await | ||
| handle_request( | ||
| store, | ||
| authorizer, | ||
| req, | ||
| maximum_request_body_size, | ||
| handle_delete_object_request, | ||
| ) | ||
| .await | ||
| }, | ||
| "/listKeyVersions" => { | ||
| handle_request(store, authorizer, req, handle_list_object_request).await | ||
| handle_request( | ||
| store, | ||
| authorizer, | ||
| req, | ||
| maximum_request_body_size, | ||
| handle_list_object_request, | ||
| ) | ||
| .await | ||
| }, | ||
| _ => { | ||
| let error_msg = "Invalid request path.".as_bytes(); | ||
|
|
@@ -97,7 +148,7 @@ async fn handle_request< | |
| Fut: Future<Output = Result<R, VssError>> + Send, | ||
| >( | ||
| store: Arc<dyn KvStore>, authorizer: Arc<dyn Authorizer>, request: Request<Incoming>, | ||
| handler: F, | ||
| maximum_request_body_size: usize, handler: F, | ||
| ) -> Result<<VssService as Service<Request<Incoming>>>::Response, hyper::Error> { | ||
| let (parts, body) = request.into_parts(); | ||
| let headers_map = parts | ||
|
|
@@ -110,8 +161,17 @@ async fn handle_request< | |
| Ok(auth_response) => auth_response.user_token, | ||
| Err(e) => return Ok(build_error_response(e)), | ||
| }; | ||
| // TODO: we should bound the amount of data we read to avoid allocating too much memory. | ||
| let bytes = body.collect().await?.to_bytes(); | ||
|
|
||
| let limited_body = Limited::new(body, maximum_request_body_size); | ||
| let bytes = match limited_body.collect().await { | ||
| Ok(body) => body.to_bytes(), | ||
| Err(_) => { | ||
| return Ok(Response::builder() | ||
| .status(StatusCode::PAYLOAD_TOO_LARGE) | ||
| .body(Full::new(Bytes::from("Request body too large"))) | ||
| .unwrap()); | ||
| }, | ||
| }; | ||
| match T::decode(bytes) { | ||
| Ok(request) => match handler(store.clone(), user_token, request).await { | ||
| Ok(response) => Ok(Response::builder() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this tests something specific to the
PostgresBackendimplementation, let's put this test insideimpls/src/postgres_store.rs. This maximum body size of 1GiB is not part of the VSS protocol contract.