Flowable.timeout() signals TimeoutException during backpressure #7944

@davidmcote

The Flowable.timeout() operator signals TimeoutException in either of two cases:

  • the upstream emits items too slowly (expected), or
  • the downstream requests items from the upstream too slowly (surprising).

Here's a toy example demonstrating TimeoutException during backpressure:

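    // Imports assumed for this snippet (RxJava 3.x package names; @Test comes from JUnit).
    import io.reactivex.rxjava3.core.Flowable;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    import java.util.concurrent.TimeUnit;

    @Test
    public void testTimeout() {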
        Flowable
                .just("Hello, world!")
                .repeat(100)
                .timeout(1, TimeUnit.SECONDS)

                .blockingSubscribe(new Subscriber<>() {
                    Subscription sub;

                    private void slowRequest(final long n) {
                        Schedulers.io().scheduleDirect(() -> sub.request(n), 5, TimeUnit.SECONDS);
                    }

                    @Override
                    public void onSubscribe(final Subscription sub) {
                        this.sub = sub;
                        slowRequest(1);
                    }


                    @Override
                    public void onNext(final String o) {
                        System.out.println(o);
                        slowRequest(1);
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        System.out.println(throwable);
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("COMPLETE");
                    }
                });
    }

Result:

java.util.concurrent.TimeoutException: The source did not signal an event for 1 seconds and has been terminated.

The error message says that the source did not signal within the timeout, which is technically accurate. However, the source was entirely capable of emitting as fast as the downstream subscriber requested; it stayed silent only because nothing had been requested yet.

Should the timeout() family of operators take into account whether the downstream has actually requested more items? That is, should time spent waiting on backpressure not count toward the timeout?
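
To make the question concrete, here is a minimal sketch of the bookkeeping such a demand-aware timeout might use. It is not RxJava code: the class DemandAwareTimeoutClock and its methods are hypothetical names invented for illustration, and timestamps are passed in explicitly so the logic can be followed without real timers. The only idea it encodes is that the timeout clock runs while there is outstanding downstream demand and is paused otherwise.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of RxJava): decides whether a timeout should
// fire, counting only the time during which downstream demand is outstanding.
final class DemandAwareTimeoutClock {
    private final long timeoutNanos;
    private long outstanding;   // items requested by downstream but not yet delivered
    private long waitingSince;  // timestamp at which the current wait for an item began

    DemandAwareTimeoutClock(long timeout, TimeUnit unit) {
        this.timeoutNanos = unit.toNanos(timeout);
    }

    // Downstream called request(n) at time nowNanos.
    void onRequest(long n, long nowNanos) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        if (outstanding == 0) {
            waitingSince = nowNanos; // the clock starts only once demand exists
        }
        long sum = outstanding + n;
        outstanding = sum < 0 ? Long.MAX_VALUE : sum; // cap on overflow (unbounded demand)
    }

    // Upstream delivered one item at time nowNanos.
    void onItem(long nowNanos) {
        if (outstanding > 0 && outstanding != Long.MAX_VALUE) {
            outstanding--;
        }
        waitingSince = nowNanos; // restart the wait for the next item
    }

    // True only if downstream is waiting for an item and has waited longer than the timeout.
    boolean isTimedOut(long nowNanos) {
        return outstanding > 0 && nowNanos - waitingSince > timeoutNanos;
    }
}
```

With a 1 second timeout, as in the toy example above, a subscriber that first requests an item 5 seconds after subscribing would not trip this clock at the 4 second mark, because no demand is outstanding yet. The clock would report a timeout only if more than 1 second passed after a request without an item arriving.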

(Apologies if this has already been asked or if I have overlooked another operator with the desired behavior.)
