make max_concurrent_chunks for PutObjectStreamRequest configurable#464
make max_concurrent_chunks for PutObjectStreamRequest configurable#464robinfriedli wants to merge 1 commit into
Conversation
- move concurrent chunk upload implementation to helper function _put_object_stream_chunks_concurrent - add helper function _put_object_stream_chunks_sequential for sequential chunk upload if max_concurrent_chunks is = 1
📝 WalkthroughWalkthroughThe PR adds optional concurrency control to multipart S3 uploads. A new ChangesConfigurable concurrency for multipart uploads
Sequence DiagramsequenceDiagram
participant Caller
participant PutObjectStreamRequest
participant _put_object_stream
participant ConcurrencyLogic
Caller->>PutObjectStreamRequest: with_max_concurrent_chunks(max)
PutObjectStreamRequest->>PutObjectStreamRequest: store NonZeroUsize
Caller->>PutObjectStreamRequest: execute_stream()
PutObjectStreamRequest->>_put_object_stream: call with max_concurrent_chunks arg
_put_object_stream->>ConcurrencyLogic: compute effective max or use override
ConcurrencyLogic->>_put_object_stream: return (total_size, etags)
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related issues
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Brownian Motion (Brass)Recommendation: Stop Summary: Configurable max_concurrent_chunks adds complexity without clear necessity. Highlights
Unknowns
Next actions
Reflection questions
|
|
Adds |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
s3/src/put_object_request.rs (1)
291-295: ⚡ Quick winAdd a rustdoc example for
with_max_concurrent_chunks.This is a new public builder method, but its docs still don't show how callers are expected to use the special values here, especially
0and1.📝 Suggested rustdoc snippet
+ /// # Example + /// ```no_run + /// # use anyhow::Result; + /// # use s3::bucket::Bucket; + /// # use s3::creds::Credentials; + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// # use tokio::fs::File; + /// # let bucket = Bucket::new("my-bucket", "us-east-1".parse()?, Credentials::default()?)?; + /// # let mut file = File::open("large-file.zip").await?; + /// let response = bucket + /// .put_object_stream_builder("/large-file.zip") + /// .with_max_concurrent_chunks(1) + /// .execute_stream(&mut file) + /// .await?; + /// # let _ = response; + /// # Ok(()) + /// # } + /// ``` /// Set the maximum number of concurrent chunks for multipart upload, setting it to 0 falls back to the default value based on available memory pub fn with_max_concurrent_chunks(mut self, max: usize) -> Self { self.max_concurrent_chunks = std::num::NonZeroUsize::new(max); self }As per coding guidelines, "All public APIs should have documentation examples".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@s3/src/put_object_request.rs` around lines 291 - 295, Add a rustdoc example to the public builder method with_max_concurrent_chunks showing typical usage and the special values (0 falls back to default, 1 forces single-chunk concurrency); update the doc comment for with_max_concurrent_chunks to include a no_run code snippet that builds a Bucket, opens a file, calls put_object_stream_builder(...).with_max_concurrent_chunks(1).execute_stream(&mut file).await and also mention/example of using .with_max_concurrent_chunks(0) to show the fallback behavior so callers know how to pass and interpret 0 and 1.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@s3/src/bucket.rs`:
- Around line 1725-1732: The multipart branch always returns
Ok(PutStreamResponse) even when complete_multipart_upload returned a 4xx/5xx;
update the code around complete_multipart_upload/response_data to mirror the
small-file branch by checking response_data.status_code() (or .is_success()) and
returning an Err when it's not a 2xx, instead of always wrapping into
PutStreamResponse; use the existing error type/path used elsewhere (e.g. the
same error construction used in the small-file flow) and only return
Ok(PutStreamResponse::new(response_data.status_code(), total_size)) when the
status is successful.
- Around line 1685-1689: The code currently uses the caller-provided
max_concurrent_chunks verbatim; change the mapping so the override is capped by
the auto-calculated safe cap: compute let safe_cap =
Self::calculate_max_concurrent_chunks(); then set max_concurrent_chunks =
max_concurrent_chunks.map_or_else(|| safe_cap, |n| std::cmp::min(n.get(),
safe_cap)); this uses the smaller of the caller override and
Self::calculate_max_concurrent_chunks() before the read loop/filling the upload
queue (references: the max_concurrent_chunks binding and
Self::calculate_max_concurrent_chunks).
---
Nitpick comments:
In `@s3/src/put_object_request.rs`:
- Around line 291-295: Add a rustdoc example to the public builder method
with_max_concurrent_chunks showing typical usage and the special values (0 falls
back to default, 1 forces single-chunk concurrency); update the doc comment for
with_max_concurrent_chunks to include a no_run code snippet that builds a
Bucket, opens a file, calls
put_object_stream_builder(...).with_max_concurrent_chunks(1).execute_stream(&mut
file).await and also mention/example of using .with_max_concurrent_chunks(0) to
show the fallback behavior so callers know how to pass and interpret 0 and 1.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 20098e04-2f6f-4b8e-984a-f4679d1ee0fa
📒 Files selected for processing (2)
s3/src/bucket.rss3/src/put_object_request.rs
| // use configured max_concurrent_chunks or determine max concurrent chunks based on available memory | ||
| let max_concurrent_chunks = max_concurrent_chunks.map_or_else( | ||
| Self::calculate_max_concurrent_chunks, | ||
| std::num::NonZeroUsize::get, | ||
| ); |
There was a problem hiding this comment.
Cap the caller override before filling the upload queue.
Line 1686 uses the override verbatim, so with_max_concurrent_chunks(10_000) will let the read loop buffer up to 10,000 full 8 MiB chunks before backpressure kicks in. That is enough to OOM the process and defeats the existing safety cap on the auto-detected path.
🛡️ Suggested fix
- let max_concurrent_chunks = max_concurrent_chunks.map_or_else(
- Self::calculate_max_concurrent_chunks,
- std::num::NonZeroUsize::get,
- );
+ let max_concurrent_chunks = max_concurrent_chunks
+ .map(std::num::NonZeroUsize::get)
+ .unwrap_or_else(Self::calculate_max_concurrent_chunks)
+ .min(100);🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@s3/src/bucket.rs` around lines 1685 - 1689, The code currently uses the
caller-provided max_concurrent_chunks verbatim; change the mapping so the
override is capped by the auto-calculated safe cap: compute let safe_cap =
Self::calculate_max_concurrent_chunks(); then set max_concurrent_chunks =
max_concurrent_chunks.map_or_else(|| safe_cap, |n| std::cmp::min(n.get(),
safe_cap)); this uses the smaller of the caller override and
Self::calculate_max_concurrent_chunks() before the read loop/filling the upload
queue (references: the max_concurrent_chunks binding and
Self::calculate_max_concurrent_chunks).
| let response_data = self | ||
| .complete_multipart_upload(&path, &msg.upload_id, inner_data) | ||
| .await?; | ||
|
|
||
| Ok(PutStreamResponse::new( | ||
| response_data.status_code(), | ||
| total_size, | ||
| )) |
There was a problem hiding this comment.
Return an error when multipart completion fails.
The small-file branch converts non-2xx responses into Err, but this multipart branch always returns Ok(PutStreamResponse) even if complete_multipart_upload comes back 4xx/5xx. That makes a failed upload look successful unless every caller manually inspects the status code.
🚨 Suggested fix
let response_data = self
.complete_multipart_upload(&path, &msg.upload_id, inner_data)
.await?;
+ let status_code = response_data.status_code();
+ if status_code >= 300 {
+ return Err(error_from_response_data(response_data)?);
+ }
Ok(PutStreamResponse::new(
- response_data.status_code(),
+ status_code,
total_size,
))🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@s3/src/bucket.rs` around lines 1725 - 1732, The multipart branch always
returns Ok(PutStreamResponse) even when complete_multipart_upload returned a
4xx/5xx; update the code around complete_multipart_upload/response_data to
mirror the small-file branch by checking response_data.status_code() (or
.is_success()) and returning an Err when it's not a 2xx, instead of always
wrapping into PutStreamResponse; use the existing error type/path used elsewhere
(e.g. the same error construction used in the small-file flow) and only return
Ok(PutStreamResponse::new(response_data.status_code(), total_size)) when the
status is successful.
This change is
Summary by CodeRabbit