Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 127 additions & 48 deletions s3/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1598,8 +1598,14 @@ impl Bucket {
s3_path: &str,
content_type: &str,
) -> Result<PutStreamResponse, S3Error> {
self._put_object_stream_with_content_type_and_headers(reader, s3_path, content_type, None)
.await
self._put_object_stream_with_content_type_and_headers(
reader,
s3_path,
content_type,
None,
None,
)
.await
}

/// Calculate the maximum number of concurrent chunks based on available memory.
Expand Down Expand Up @@ -1642,6 +1648,7 @@ impl Bucket {
s3_path: &str,
content_type: &str,
custom_headers: Option<http::HeaderMap>,
max_concurrent_chunks: Option<std::num::NonZeroUsize>,
) -> Result<PutStreamResponse, S3Error> {
// If the file is smaller CHUNK_SIZE, just do a regular upload.
// Otherwise perform a multi-part upload.
Expand Down Expand Up @@ -1675,9 +1682,117 @@ impl Bucket {
let path = msg.key;
let upload_id = &msg.upload_id;

// Determine max concurrent chunks based on available memory
let max_concurrent_chunks = Self::calculate_max_concurrent_chunks();
// use configured max_concurrent_chunks or determine max concurrent chunks based on available memory
let max_concurrent_chunks = max_concurrent_chunks.map_or_else(
Self::calculate_max_concurrent_chunks,
std::num::NonZeroUsize::get,
);
Comment on lines +1685 to +1689
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Cap the caller override before filling the upload queue.

Line 1686 uses the override verbatim, so with_max_concurrent_chunks(10_000) will let the read loop buffer up to 10,000 full 8 MiB chunks before backpressure kicks in. That is enough to OOM the process and defeats the existing safety cap on the auto-detected path.

🛡️ Suggested fix
-        let max_concurrent_chunks = max_concurrent_chunks.map_or_else(
-            Self::calculate_max_concurrent_chunks,
-            std::num::NonZeroUsize::get,
-        );
+        let max_concurrent_chunks = max_concurrent_chunks
+            .map(std::num::NonZeroUsize::get)
+            .unwrap_or_else(Self::calculate_max_concurrent_chunks)
+            .min(100);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@s3/src/bucket.rs` around lines 1685 - 1689, The code currently uses the
caller-provided max_concurrent_chunks verbatim; change the mapping so the
override is capped by the auto-calculated safe cap: compute let safe_cap =
Self::calculate_max_concurrent_chunks(); then set max_concurrent_chunks =
max_concurrent_chunks.map_or_else(|| safe_cap, |n| std::cmp::min(n.get(),
safe_cap)); this uses the smaller of the caller override and
Self::calculate_max_concurrent_chunks() before the read loop/filling the upload
queue (references: the max_concurrent_chunks binding and
Self::calculate_max_concurrent_chunks).


let (total_size, mut etags) = if max_concurrent_chunks == 1 {
self._put_object_stream_chunks_sequential(
reader,
first_chunk,
&path,
upload_id,
content_type,
)
.await?
} else {
self._put_object_stream_chunks_concurrent(
reader,
first_chunk,
&path,
upload_id,
content_type,
max_concurrent_chunks,
)
.await?
};

// Sort etags by part number to ensure correct order
etags.sort_by_key(|k| k.0);
let etags: Vec<String> = etags.into_iter().map(|(_, etag)| etag).collect();

// Finish the upload
let inner_data = etags
.into_iter()
.enumerate()
.map(|(i, x)| Part {
etag: x,
part_number: i as u32 + 1,
})
.collect::<Vec<Part>>();
let response_data = self
.complete_multipart_upload(&path, &msg.upload_id, inner_data)
.await?;

Ok(PutStreamResponse::new(
response_data.status_code(),
total_size,
))
Comment on lines +1725 to +1732
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Return an error when multipart completion fails.

The small-file branch converts non-2xx responses into Err, but this multipart branch always returns Ok(PutStreamResponse) even if complete_multipart_upload comes back 4xx/5xx. That makes a failed upload look successful unless every caller manually inspects the status code.

🚨 Suggested fix
         let response_data = self
             .complete_multipart_upload(&path, &msg.upload_id, inner_data)
             .await?;
+        let status_code = response_data.status_code();
+        if status_code >= 300 {
+            return Err(error_from_response_data(response_data)?);
+        }

         Ok(PutStreamResponse::new(
-            response_data.status_code(),
+            status_code,
             total_size,
         ))
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@s3/src/bucket.rs` around lines 1725 - 1732, The multipart branch always
returns Ok(PutStreamResponse) even when complete_multipart_upload returned a
4xx/5xx; update the code around complete_multipart_upload/response_data to
mirror the small-file branch by checking response_data.status_code() (or
.is_success()) and returning an Err when it's not a 2xx, instead of always
wrapping into PutStreamResponse; use the existing error type/path used elsewhere
(e.g. the same error construction used in the small-file flow) and only return
Ok(PutStreamResponse::new(response_data.status_code(), total_size)) when the
status is successful.

}

#[maybe_async::async_impl]
async fn _put_object_stream_chunks_sequential<R: AsyncRead + Unpin + ?Sized>(
&self,
reader: &mut R,
first_chunk: Vec<u8>,
path: &str,
upload_id: &str,
content_type: &str,
) -> Result<(usize, Vec<(u32, String)>), S3Error> {
let mut chunk = first_chunk;
let mut part_number: u32 = 0;
let mut total_size = 0;
let mut etags = Vec::new();

loop {
let chunk_len = chunk.len();

if chunk_len == 0 {
break;
}

part_number += 1;
total_size += chunk_len;

let current_part = part_number;
let is_last_chunk = chunk_len < CHUNK_SIZE;

let response_data = self
.make_multipart_request(path, chunk, current_part, upload_id, content_type)
.await?;

if !(200..300).contains(&response_data.status_code()) {
// it chunk upload failed - abort the upload
return match self.abort_upload(path, upload_id).await {
Ok(_) => Err(error_from_response_data(response_data)?),
Err(error) => Err(error),
};
}

etags.push((current_part, response_data.as_str()?.to_string()));

if is_last_chunk {
break;
}

chunk = crate::utils::read_chunk_async(reader).await?;
}

Ok((total_size, etags))
}

#[maybe_async::async_impl]
async fn _put_object_stream_chunks_concurrent<R: AsyncRead + Unpin + ?Sized>(
&self,
reader: &mut R,
first_chunk: Vec<u8>,
path: &str,
upload_id: &str,
content_type: &str,
max_concurrent_chunks: usize,
) -> Result<(usize, Vec<(u32, String)>), S3Error> {
// Use FuturesUnordered for bounded parallelism
use futures_util::FutureExt;
use futures_util::stream::{FuturesUnordered, StreamExt};
Expand All @@ -1697,21 +1812,10 @@ impl Bucket {
reading_done = true;
}

let path_clone = path.clone();
let upload_id_clone = upload_id.clone();
let content_type_clone = content_type.to_string();
let bucket_clone = self.clone();

active_uploads.push(
async move {
let result = bucket_clone
.make_multipart_request(
&path_clone,
first_chunk,
1,
&upload_id_clone,
&content_type_clone,
)
let result = self
.make_multipart_request(path, first_chunk, 1, upload_id, content_type)
.await;
(1, result)
}
Expand All @@ -1738,20 +1842,16 @@ impl Bucket {
}

let current_part = part_number;
let path_clone = path.clone();
let upload_id_clone = upload_id.clone();
let content_type_clone = content_type.to_string();
let bucket_clone = self.clone();

active_uploads.push(
async move {
let result = bucket_clone
let result = self
.make_multipart_request(
&path_clone,
path,
chunk,
current_part,
&upload_id_clone,
&content_type_clone,
upload_id,
content_type,
)
.await;
(current_part, result)
Expand All @@ -1765,7 +1865,7 @@ impl Bucket {
let response_data = result?;
if !(200..300).contains(&response_data.status_code()) {
// if chunk upload failed - abort the upload
match self.abort_upload(&path, upload_id).await {
match self.abort_upload(path, upload_id).await {
Ok(_) => {
return Err(error_from_response_data(response_data)?);
}
Expand All @@ -1781,28 +1881,7 @@ impl Bucket {
}
}

// Sort etags by part number to ensure correct order
etags.sort_by_key(|k| k.0);
let etags: Vec<String> = etags.into_iter().map(|(_, etag)| etag).collect();

// Finish the upload
let inner_data = etags
.clone()
.into_iter()
.enumerate()
.map(|(i, x)| Part {
etag: x,
part_number: i as u32 + 1,
})
.collect::<Vec<Part>>();
let response_data = self
.complete_multipart_upload(&path, &msg.upload_id, inner_data)
.await?;

Ok(PutStreamResponse::new(
response_data.status_code(),
total_size,
))
Ok((total_size, etags))
}

#[maybe_async::sync_impl]
Expand Down
10 changes: 10 additions & 0 deletions s3/src/put_object_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ pub struct PutObjectStreamRequest<'a> {
path: String,
content_type: String,
custom_headers: HeaderMap,
max_concurrent_chunks: Option<std::num::NonZeroUsize>,
}

#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
Expand All @@ -219,6 +220,7 @@ impl<'a> PutObjectStreamRequest<'a> {
path: path.as_ref().to_string(),
content_type: "application/octet-stream".to_string(),
custom_headers: HeaderMap::new(),
max_concurrent_chunks: None,
}
}

Expand Down Expand Up @@ -286,6 +288,12 @@ impl<'a> PutObjectStreamRequest<'a> {
Ok(self)
}

/// Set the maximum number of concurrent chunks for multipart upload, setting it to 0 falls back to the default value based on available memory
pub fn with_max_concurrent_chunks(mut self, max: usize) -> Self {
self.max_concurrent_chunks = std::num::NonZeroUsize::new(max);
self
}

/// Execute the streaming PUT request
#[cfg(feature = "with-tokio")]
pub async fn execute_stream<R: AsyncRead + Unpin + ?Sized>(
Expand All @@ -304,6 +312,7 @@ impl<'a> PutObjectStreamRequest<'a> {
} else {
Some(self.custom_headers)
},
self.max_concurrent_chunks,
)
.await
}
Expand All @@ -323,6 +332,7 @@ impl<'a> PutObjectStreamRequest<'a> {
} else {
Some(self.custom_headers)
},
self.max_concurrent_chunks,
)
.await
}
Expand Down
Loading