The Flowable.timeout() operator signals TimeoutException when:
- EITHER the upstream is emitting too slowly (expected)
- OR the downstream is requesting items from the upstream too slowly (surprising)
Here's a toy example demonstrating TimeoutException during backpressure:
@Test
public void testTimeout() {
    Flowable
        .just("Hello, world!")
        .repeat(100)
        .timeout(1, TimeUnit.SECONDS)
        .blockingSubscribe(new Subscriber<>() {
            Subscription sub;

            // Simulates a slow consumer: requests n items only after a 5-second delay,
            // which is longer than the 1-second timeout above.
            private void slowRequest(final long n) {
                Schedulers.io().scheduleDirect(() -> sub.request(n), 5, TimeUnit.SECONDS);
            }

            @Override
            public void onSubscribe(final Subscription sub) {
                this.sub = sub;
                slowRequest(1);
            }

            @Override
            public void onNext(final String o) {
                System.out.println(o);
                slowRequest(1);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println(throwable);
            }

            @Override
            public void onComplete() {
                System.out.println("COMPLETE");
            }
        });
}

Result:
java.util.concurrent.TimeoutException: The source did not signal an event for 1 seconds and has been terminated.
The error message says that the source did not signal within the timeout, which is technically accurate, but the source was entirely capable of emitting as much as the downstream subscriber wanted.
Should the timeout() family of operators take into account whether the downstream has actually requested more items? I.e., should they avoid timing out while the upstream is only being held back by backpressure?
(Apologies if this has already been asked or if I have overlooked another operator with the desired behavior.)
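To make the question concrete, here is a minimal sketch of the bookkeeping a demand-aware timeout might use. It is only an illustration of the idea, not RxJava code: the class `DemandAwareTimeout` and its methods `onRequest`, `onItem`, and `isTimedOut` are hypothetical names, and timestamps are passed in explicitly as milliseconds so the logic can be followed without a scheduler.

```java
// Hypothetical helper, NOT part of RxJava: it only illustrates the idea that
// the timeout clock should run while the downstream has outstanding demand.
final class DemandAwareTimeout {
    private final long timeoutMillis;
    private long outstanding;   // items requested by downstream but not yet delivered
    private long waitingSince;  // time at which the current wait for an item began

    DemandAwareTimeout(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Downstream requested n more items at time nowMillis.
    synchronized void onRequest(long n, long nowMillis) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        if (outstanding == 0) {
            // Demand just appeared, so the upstream is only now "on the clock".
            waitingSince = nowMillis;
        }
        long sum = outstanding + n;
        outstanding = sum < 0 ? Long.MAX_VALUE : sum; // cap on overflow
    }

    // Upstream delivered one item at time nowMillis.
    synchronized void onItem(long nowMillis) {
        if (outstanding == 0) {
            throw new IllegalStateException("item delivered without demand");
        }
        if (outstanding != Long.MAX_VALUE) {
            outstanding--; // Long.MAX_VALUE is treated as unbounded demand
        }
        waitingSince = nowMillis; // restart the clock for the next item
    }

    // True only if demand is outstanding and the upstream has been silent too long.
    synchronized boolean isTimedOut(long nowMillis) {
        return outstanding > 0 && nowMillis - waitingSince > timeoutMillis;
    }
}
```

With a 1-second timeout and the 5-second request delay from the test above, `isTimedOut` stays false during the gaps between requests, because there is no outstanding demand in those gaps. It becomes true only if an item has been requested and the upstream then stays silent for more than a second.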