-
Notifications
You must be signed in to change notification settings - Fork 243
make max_concurrent_chunks for PutObjectStreamRequest configurable #464
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1598,8 +1598,14 @@ impl Bucket { | |
| s3_path: &str, | ||
| content_type: &str, | ||
| ) -> Result<PutStreamResponse, S3Error> { | ||
| self._put_object_stream_with_content_type_and_headers(reader, s3_path, content_type, None) | ||
| .await | ||
| self._put_object_stream_with_content_type_and_headers( | ||
| reader, | ||
| s3_path, | ||
| content_type, | ||
| None, | ||
| None, | ||
| ) | ||
| .await | ||
| } | ||
|
|
||
| /// Calculate the maximum number of concurrent chunks based on available memory. | ||
|
|
@@ -1642,6 +1648,7 @@ impl Bucket { | |
| s3_path: &str, | ||
| content_type: &str, | ||
| custom_headers: Option<http::HeaderMap>, | ||
| max_concurrent_chunks: Option<std::num::NonZeroUsize>, | ||
| ) -> Result<PutStreamResponse, S3Error> { | ||
| // If the file is smaller CHUNK_SIZE, just do a regular upload. | ||
| // Otherwise perform a multi-part upload. | ||
|
|
@@ -1675,9 +1682,117 @@ impl Bucket { | |
| let path = msg.key; | ||
| let upload_id = &msg.upload_id; | ||
|
|
||
| // Determine max concurrent chunks based on available memory | ||
| let max_concurrent_chunks = Self::calculate_max_concurrent_chunks(); | ||
| // use configured max_concurrent_chunks or determine max concurrent chunks based on available memory | ||
| let max_concurrent_chunks = max_concurrent_chunks.map_or_else( | ||
| Self::calculate_max_concurrent_chunks, | ||
| std::num::NonZeroUsize::get, | ||
| ); | ||
|
|
||
| let (total_size, mut etags) = if max_concurrent_chunks == 1 { | ||
| self._put_object_stream_chunks_sequential( | ||
| reader, | ||
| first_chunk, | ||
| &path, | ||
| upload_id, | ||
| content_type, | ||
| ) | ||
| .await? | ||
| } else { | ||
| self._put_object_stream_chunks_concurrent( | ||
| reader, | ||
| first_chunk, | ||
| &path, | ||
| upload_id, | ||
| content_type, | ||
| max_concurrent_chunks, | ||
| ) | ||
| .await? | ||
| }; | ||
|
|
||
| // Sort etags by part number to ensure correct order | ||
| etags.sort_by_key(|k| k.0); | ||
| let etags: Vec<String> = etags.into_iter().map(|(_, etag)| etag).collect(); | ||
|
|
||
| // Finish the upload | ||
| let inner_data = etags | ||
| .into_iter() | ||
| .enumerate() | ||
| .map(|(i, x)| Part { | ||
| etag: x, | ||
| part_number: i as u32 + 1, | ||
| }) | ||
| .collect::<Vec<Part>>(); | ||
| let response_data = self | ||
| .complete_multipart_upload(&path, &msg.upload_id, inner_data) | ||
| .await?; | ||
|
|
||
| Ok(PutStreamResponse::new( | ||
| response_data.status_code(), | ||
| total_size, | ||
| )) | ||
|
Comment on lines
+1725
to
+1732
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Return an error when multipart completion fails. The small-file branch converts non-2xx responses into 🚨 Suggested fix let response_data = self
.complete_multipart_upload(&path, &msg.upload_id, inner_data)
.await?;
+ let status_code = response_data.status_code();
+ if status_code >= 300 {
+ return Err(error_from_response_data(response_data)?);
+ }
Ok(PutStreamResponse::new(
- response_data.status_code(),
+ status_code,
total_size,
))🤖 Prompt for AI Agents |
||
| } | ||
|
|
||
| #[maybe_async::async_impl] | ||
| async fn _put_object_stream_chunks_sequential<R: AsyncRead + Unpin + ?Sized>( | ||
| &self, | ||
| reader: &mut R, | ||
| first_chunk: Vec<u8>, | ||
| path: &str, | ||
| upload_id: &str, | ||
| content_type: &str, | ||
| ) -> Result<(usize, Vec<(u32, String)>), S3Error> { | ||
| let mut chunk = first_chunk; | ||
| let mut part_number: u32 = 0; | ||
| let mut total_size = 0; | ||
| let mut etags = Vec::new(); | ||
|
|
||
| loop { | ||
| let chunk_len = chunk.len(); | ||
|
|
||
| if chunk_len == 0 { | ||
| break; | ||
| } | ||
|
|
||
| part_number += 1; | ||
| total_size += chunk_len; | ||
|
|
||
| let current_part = part_number; | ||
| let is_last_chunk = chunk_len < CHUNK_SIZE; | ||
|
|
||
| let response_data = self | ||
| .make_multipart_request(path, chunk, current_part, upload_id, content_type) | ||
| .await?; | ||
|
|
||
| if !(200..300).contains(&response_data.status_code()) { | ||
| // it chunk upload failed - abort the upload | ||
| return match self.abort_upload(path, upload_id).await { | ||
| Ok(_) => Err(error_from_response_data(response_data)?), | ||
| Err(error) => Err(error), | ||
| }; | ||
| } | ||
|
|
||
| etags.push((current_part, response_data.as_str()?.to_string())); | ||
|
|
||
| if is_last_chunk { | ||
| break; | ||
| } | ||
|
|
||
| chunk = crate::utils::read_chunk_async(reader).await?; | ||
| } | ||
|
|
||
| Ok((total_size, etags)) | ||
| } | ||
|
|
||
| #[maybe_async::async_impl] | ||
| async fn _put_object_stream_chunks_concurrent<R: AsyncRead + Unpin + ?Sized>( | ||
| &self, | ||
| reader: &mut R, | ||
| first_chunk: Vec<u8>, | ||
| path: &str, | ||
| upload_id: &str, | ||
| content_type: &str, | ||
| max_concurrent_chunks: usize, | ||
| ) -> Result<(usize, Vec<(u32, String)>), S3Error> { | ||
| // Use FuturesUnordered for bounded parallelism | ||
| use futures_util::FutureExt; | ||
| use futures_util::stream::{FuturesUnordered, StreamExt}; | ||
|
|
@@ -1697,21 +1812,10 @@ impl Bucket { | |
| reading_done = true; | ||
| } | ||
|
|
||
| let path_clone = path.clone(); | ||
| let upload_id_clone = upload_id.clone(); | ||
| let content_type_clone = content_type.to_string(); | ||
| let bucket_clone = self.clone(); | ||
|
|
||
| active_uploads.push( | ||
| async move { | ||
| let result = bucket_clone | ||
| .make_multipart_request( | ||
| &path_clone, | ||
| first_chunk, | ||
| 1, | ||
| &upload_id_clone, | ||
| &content_type_clone, | ||
| ) | ||
| let result = self | ||
| .make_multipart_request(path, first_chunk, 1, upload_id, content_type) | ||
| .await; | ||
| (1, result) | ||
| } | ||
|
|
@@ -1738,20 +1842,16 @@ impl Bucket { | |
| } | ||
|
|
||
| let current_part = part_number; | ||
| let path_clone = path.clone(); | ||
| let upload_id_clone = upload_id.clone(); | ||
| let content_type_clone = content_type.to_string(); | ||
| let bucket_clone = self.clone(); | ||
|
|
||
| active_uploads.push( | ||
| async move { | ||
| let result = bucket_clone | ||
| let result = self | ||
| .make_multipart_request( | ||
| &path_clone, | ||
| path, | ||
| chunk, | ||
| current_part, | ||
| &upload_id_clone, | ||
| &content_type_clone, | ||
| upload_id, | ||
| content_type, | ||
| ) | ||
| .await; | ||
| (current_part, result) | ||
|
|
@@ -1765,7 +1865,7 @@ impl Bucket { | |
| let response_data = result?; | ||
| if !(200..300).contains(&response_data.status_code()) { | ||
| // if chunk upload failed - abort the upload | ||
| match self.abort_upload(&path, upload_id).await { | ||
| match self.abort_upload(path, upload_id).await { | ||
| Ok(_) => { | ||
| return Err(error_from_response_data(response_data)?); | ||
| } | ||
|
|
@@ -1781,28 +1881,7 @@ impl Bucket { | |
| } | ||
| } | ||
|
|
||
| // Sort etags by part number to ensure correct order | ||
| etags.sort_by_key(|k| k.0); | ||
| let etags: Vec<String> = etags.into_iter().map(|(_, etag)| etag).collect(); | ||
|
|
||
| // Finish the upload | ||
| let inner_data = etags | ||
| .clone() | ||
| .into_iter() | ||
| .enumerate() | ||
| .map(|(i, x)| Part { | ||
| etag: x, | ||
| part_number: i as u32 + 1, | ||
| }) | ||
| .collect::<Vec<Part>>(); | ||
| let response_data = self | ||
| .complete_multipart_upload(&path, &msg.upload_id, inner_data) | ||
| .await?; | ||
|
|
||
| Ok(PutStreamResponse::new( | ||
| response_data.status_code(), | ||
| total_size, | ||
| )) | ||
| Ok((total_size, etags)) | ||
| } | ||
|
|
||
| #[maybe_async::sync_impl] | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cap the caller override before filling the upload queue.
Line 1686 uses the override verbatim, so
with_max_concurrent_chunks(10_000)will let the read loop buffer up to 10,000 full 8 MiB chunks before backpressure kicks in. That is enough to OOM the process and defeats the existing safety cap on the auto-detected path.🛡️ Suggested fix
🤖 Prompt for AI Agents