Skip to content

make max_concurrent_chunks for PutObjectStreamRequest configurable#464

Open
robinfriedli wants to merge 1 commit into
durch:masterfrom
filebroker:feature/configurable-put-object-stream-concurrency
Open

make max_concurrent_chunks for PutObjectStreamRequest configurable#464
robinfriedli wants to merge 1 commit into
durch:masterfrom
filebroker:feature/configurable-put-object-stream-concurrency

Conversation

@robinfriedli
Copy link
Copy Markdown
Contributor

@robinfriedli robinfriedli commented May 16, 2026

  • move concurrent chunk upload implementation to helper function _put_object_stream_chunks_concurrent
  • add helper function _put_object_stream_chunks_sequential for sequential chunk upload if max_concurrent_chunks is = 1

This change is Reviewable

Summary by CodeRabbit

  • New Features
    • Added configurable concurrency control for multipart streaming uploads. Users can now specify a maximum number of concurrent chunks during upload operations.

Review Change Stack

- move concurrent chunk upload implementation to helper function
  _put_object_stream_chunks_concurrent
- add helper function _put_object_stream_chunks_sequential for
  sequential chunk upload if max_concurrent_chunks is = 1
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 16, 2026

📝 Walkthrough

Walkthrough

The PR adds optional concurrency control to multipart S3 uploads. A new max_concurrent_chunks builder method on PutObjectStreamRequest allows callers to override the default memory-based concurrency limit. The internal multipart handler computes an effective max, branches between sequential and concurrent paths, and now handles ETag sorting and completion outside the concurrent upload logic.

Changes

Configurable concurrency for multipart uploads

Layer / File(s) Summary
Request builder for concurrency control
s3/src/put_object_request.rs
PutObjectStreamRequest gains an optional max_concurrent_chunks field and public with_max_concurrent_chunks(max: usize) builder method. Both tokio and async-std execute_stream implementations pass this override to the internal bucket helper.
Multipart upload with configurable concurrency
s3/src/bucket.rs
_put_object_stream_with_content_type_and_headers now accepts max_concurrent_chunks: Option<NonZeroUsize>, computes an effective concurrency limit, branches between sequential and concurrent upload paths, and moves ETag sorting and multipart completion responsibility into the outer flow. Concurrent task submission was refactored to avoid cloning captured variables, and abort-on-failure calls were updated.

Sequence Diagram

sequenceDiagram
  participant Caller
  participant PutObjectStreamRequest
  participant _put_object_stream
  participant ConcurrencyLogic
  Caller->>PutObjectStreamRequest: with_max_concurrent_chunks(max)
  PutObjectStreamRequest->>PutObjectStreamRequest: store NonZeroUsize
  Caller->>PutObjectStreamRequest: execute_stream()
  PutObjectStreamRequest->>_put_object_stream: call with max_concurrent_chunks arg
  _put_object_stream->>ConcurrencyLogic: compute effective max or use override
  ConcurrencyLogic->>_put_object_stream: return (total_size, etags)
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related issues

Poem

🐰 Chunked uploads now obey the bunny's will,
With concurrency bending to each caller's skill—
No more memory's iron fist holds the reins alone,
When with_max_concurrent_chunks plants its seed and tone,
Sequential or swift, the choice is all your own!

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title accurately describes the main change: making max_concurrent_chunks configurable for PutObjectStreamRequest, which is the core objective reflected in both the code changes and PR description.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Tip

💬 Introducing Slack Agent: The best way for teams to turn conversations into code.

Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.

  • Generate code and open pull requests
  • Plan features and break down work
  • Investigate incidents and troubleshoot customer tickets together
  • Automate recurring tasks and respond to alerts with triggers
  • Summarize progress and report instantly

Built for teams:

  • Shared memory across your entire org—no repeating context
  • Per-thread sandboxes to safely plan and execute work
  • Governance built-in—scoped access, auditability, and budget controls

One agent for your entire SDLC. Right inside Slack.

👉 Get started


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@brownian-motion-v0
Copy link
Copy Markdown

Brownian Motion (Brass)

Recommendation: Stop

Summary: Configurable max_concurrent_chunks adds complexity without clear necessity.
Risk: Medium · Confidence: 80%

Highlights

  • Good separation of sequential and concurrent logic
  • Clear commit message

Unknowns

  • No linked issue
  • Unclear performance impact of new configuration

Next actions

  • Keep: existing chunk upload logic
  • Drop: max_concurrent_chunks configuration
  • Add: documentation on chunk upload behavior

Reflection questions

  • What core assumption underpins this PR's approach?
  • How does this change align with the project's longer-term goals?
  • Could there be a simpler way to achieve the primary objective here?

@robinfriedli
Copy link
Copy Markdown
Contributor Author

Adds max_concurrent_chunks configuration for #463

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
s3/src/put_object_request.rs (1)

291-295: ⚡ Quick win

Add a rustdoc example for with_max_concurrent_chunks.

This is a new public builder method, but its docs still don't show how callers are expected to use the special values here, especially 0 and 1.

📝 Suggested rustdoc snippet
+    /// # Example
+    /// ```no_run
+    /// # use anyhow::Result;
+    /// # use s3::bucket::Bucket;
+    /// # use s3::creds::Credentials;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// # use tokio::fs::File;
+    /// # let bucket = Bucket::new("my-bucket", "us-east-1".parse()?, Credentials::default()?)?;
+    /// # let mut file = File::open("large-file.zip").await?;
+    /// let response = bucket
+    ///     .put_object_stream_builder("/large-file.zip")
+    ///     .with_max_concurrent_chunks(1)
+    ///     .execute_stream(&mut file)
+    ///     .await?;
+    /// # let _ = response;
+    /// # Ok(())
+    /// # }
+    /// ```
     /// Set the maximum number of concurrent chunks for multipart upload, setting it to 0 falls back to the default value based on available memory
     pub fn with_max_concurrent_chunks(mut self, max: usize) -> Self {
         self.max_concurrent_chunks = std::num::NonZeroUsize::new(max);
         self
     }

As per coding guidelines, "All public APIs should have documentation examples".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@s3/src/put_object_request.rs` around lines 291 - 295, Add a rustdoc example
to the public builder method with_max_concurrent_chunks showing typical usage
and the special values (0 falls back to default, 1 forces single-chunk
concurrency); update the doc comment for with_max_concurrent_chunks to include a
no_run code snippet that builds a Bucket, opens a file, calls
put_object_stream_builder(...).with_max_concurrent_chunks(1).execute_stream(&mut
file).await and also mention/example of using .with_max_concurrent_chunks(0) to
show the fallback behavior so callers know how to pass and interpret 0 and 1.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@s3/src/bucket.rs`:
- Around line 1725-1732: The multipart branch always returns
Ok(PutStreamResponse) even when complete_multipart_upload returned a 4xx/5xx;
update the code around complete_multipart_upload/response_data to mirror the
small-file branch by checking response_data.status_code() (or .is_success()) and
returning an Err when it's not a 2xx, instead of always wrapping into
PutStreamResponse; use the existing error type/path used elsewhere (e.g. the
same error construction used in the small-file flow) and only return
Ok(PutStreamResponse::new(response_data.status_code(), total_size)) when the
status is successful.
- Around line 1685-1689: The code currently uses the caller-provided
max_concurrent_chunks verbatim; change the mapping so the override is capped by
the auto-calculated safe cap: compute let safe_cap =
Self::calculate_max_concurrent_chunks(); then set max_concurrent_chunks =
max_concurrent_chunks.map_or_else(|| safe_cap, |n| std::cmp::min(n.get(),
safe_cap)); this uses the smaller of the caller override and
Self::calculate_max_concurrent_chunks() before the read loop/filling the upload
queue (references: the max_concurrent_chunks binding and
Self::calculate_max_concurrent_chunks).

---

Nitpick comments:
In `@s3/src/put_object_request.rs`:
- Around line 291-295: Add a rustdoc example to the public builder method
with_max_concurrent_chunks showing typical usage and the special values (0 falls
back to default, 1 forces single-chunk concurrency); update the doc comment for
with_max_concurrent_chunks to include a no_run code snippet that builds a
Bucket, opens a file, calls
put_object_stream_builder(...).with_max_concurrent_chunks(1).execute_stream(&mut
file).await and also mention/example of using .with_max_concurrent_chunks(0) to
show the fallback behavior so callers know how to pass and interpret 0 and 1.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 20098e04-2f6f-4b8e-984a-f4679d1ee0fa

📥 Commits

Reviewing files that changed from the base of the PR and between b584ce7 and caa841c.

📒 Files selected for processing (2)
  • s3/src/bucket.rs
  • s3/src/put_object_request.rs

Comment thread s3/src/bucket.rs
Comment on lines +1685 to +1689
// use configured max_concurrent_chunks or determine max concurrent chunks based on available memory
let max_concurrent_chunks = max_concurrent_chunks.map_or_else(
Self::calculate_max_concurrent_chunks,
std::num::NonZeroUsize::get,
);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Cap the caller override before filling the upload queue.

Line 1686 uses the override verbatim, so with_max_concurrent_chunks(10_000) will let the read loop buffer up to 10,000 full 8 MiB chunks before backpressure kicks in. That is enough to OOM the process and defeats the existing safety cap on the auto-detected path.

🛡️ Suggested fix
-        let max_concurrent_chunks = max_concurrent_chunks.map_or_else(
-            Self::calculate_max_concurrent_chunks,
-            std::num::NonZeroUsize::get,
-        );
+        let max_concurrent_chunks = max_concurrent_chunks
+            .map(std::num::NonZeroUsize::get)
+            .unwrap_or_else(Self::calculate_max_concurrent_chunks)
+            .min(100);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@s3/src/bucket.rs` around lines 1685 - 1689, The code currently uses the
caller-provided max_concurrent_chunks verbatim; change the mapping so the
override is capped by the auto-calculated safe cap: compute let safe_cap =
Self::calculate_max_concurrent_chunks(); then set max_concurrent_chunks =
max_concurrent_chunks.map_or_else(|| safe_cap, |n| std::cmp::min(n.get(),
safe_cap)); this uses the smaller of the caller override and
Self::calculate_max_concurrent_chunks() before the read loop/filling the upload
queue (references: the max_concurrent_chunks binding and
Self::calculate_max_concurrent_chunks).

Comment thread s3/src/bucket.rs
Comment on lines +1725 to +1732
let response_data = self
.complete_multipart_upload(&path, &msg.upload_id, inner_data)
.await?;

Ok(PutStreamResponse::new(
response_data.status_code(),
total_size,
))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Return an error when multipart completion fails.

The small-file branch converts non-2xx responses into Err, but this multipart branch always returns Ok(PutStreamResponse) even if complete_multipart_upload comes back 4xx/5xx. That makes a failed upload look successful unless every caller manually inspects the status code.

🚨 Suggested fix
         let response_data = self
             .complete_multipart_upload(&path, &msg.upload_id, inner_data)
             .await?;
+        let status_code = response_data.status_code();
+        if status_code >= 300 {
+            return Err(error_from_response_data(response_data)?);
+        }

         Ok(PutStreamResponse::new(
-            response_data.status_code(),
+            status_code,
             total_size,
         ))
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@s3/src/bucket.rs` around lines 1725 - 1732, The multipart branch always
returns Ok(PutStreamResponse) even when complete_multipart_upload returned a
4xx/5xx; update the code around complete_multipart_upload/response_data to
mirror the small-file branch by checking response_data.status_code() (or
.is_success()) and returning an Err when it's not a 2xx, instead of always
wrapping into PutStreamResponse; use the existing error type/path used elsewhere
(e.g. the same error construction used in the small-file flow) and only return
Ok(PutStreamResponse::new(response_data.status_code(), total_size)) when the
status is successful.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant