Skip to content

Conversation

@ozangunalp
Copy link
Collaborator

Replenish upstream at inner stream completion even though we hit the concurrency limit

Replenish upstream at inner stream completion even though we hit the concurrency limit
@codecov
Copy link

codecov bot commented Dec 11, 2025

Codecov Report

❌ Patch coverage is 58.33333% with 5 lines in your changes missing coverage. Please review.
✅ Project coverage is 89.00%. Comparing base (a1998df) to head (6746983).
⚠️ Report is 45 commits behind head on main.

Files with missing lines Patch % Lines
...mallrye/mutiny/operators/multi/MultiFlatMapOp.java 37.50% 4 Missing and 1 partial ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##               main    #2031      +/-   ##
============================================
- Coverage     89.18%   89.00%   -0.19%     
+ Complexity     3120     3114       -6     
============================================
  Files           412      412              
  Lines         13295    13304       +9     
  Branches       1688     1690       +2     
============================================
- Hits          11857    11841      -16     
- Misses          812      825      +13     
- Partials        626      638      +12     
Files with missing lines Coverage Δ
...n/java/io/smallrye/mutiny/groups/MultiFlatten.java 100.00% <100.00%> (ø)
...mallrye/mutiny/operators/multi/MultiFlatMapOp.java 88.85% <37.50%> (-2.03%) ⬇️

... and 14 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@ozangunalp
Copy link
Collaborator Author

For better coverage some tests need to be duplicated for the lenient concurrency branch

@jponge
Copy link
Member

jponge commented Dec 11, 2025

I'm not worried about test coverage here

@jponge
Copy link
Member

jponge commented Jan 15, 2026

Hi @ozangunalp, getting back to this, what's your take on this PR? Also should more tests be added?

@ozangunalp
Copy link
Collaborator Author

Yes, next is to add more test to check that requests doesn't blow up in normal cases. I'll get back to this tomorrow.

@ozangunalp
Copy link
Collaborator Author

ozangunalp commented Jan 20, 2026

@jponge, I reviewed why I need this in smallrye/smallrye-reactive-messaging#3208

I think I can get around this by cancelling/completing the merged group with an idleness timeout: If the group doesn't receive any more items within the timeout, it can cancel the upstream and complete the group to free up a merge slot in flatMap and remove the groupedMulti from groupBy.

Later if another item is produced upstream for the same key, it'll re-create the group.
This way we can hold the merge concurrency in it's limit and avoid the deadlock by closing idle but open inner streams.

I'll ping you on the PR for you take a look.

So I think we can close this one.

@jponge
Copy link
Member

jponge commented Jan 20, 2026

Thanks @ozangunalp for looking into this! I'll close this one, and happy to have a look on smallrye-reactive-messaging if you want a review!

@jponge jponge closed this Jan 20, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants