Skip to content

Commit e38ef50

Browse files
committed
Minor tweaks
1 parent cd1900c commit e38ef50

File tree

12 files changed

+674
-106
lines changed

12 files changed

+674
-106
lines changed

src/main/java/groovy/concurrent/AsyncStream.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,36 @@ public interface AsyncStream<T> extends AutoCloseable {
7575
default void close() {
7676
}
7777

78+
/**
79+
* Converts the given source to an {@code AsyncStream}.
80+
* <p>
81+
* If the source is already an {@code AsyncStream}, it is returned as-is.
82+
* Otherwise, the {@link AwaitableAdapterRegistry} is consulted to find a
83+
* suitable adapter. Built-in adapters handle {@link Iterable} and
84+
* {@link java.util.Iterator}; the auto-discovered {@code FlowPublisherAdapter}
85+
* handles {@link java.util.concurrent.Flow.Publisher}; third-party frameworks
86+
* can register additional adapters via the registry.
87+
* <p>
88+
* This is the recommended entry point for converting external collection or
89+
* reactive types to {@code AsyncStream}:
90+
* <pre>
91+
* AsyncStream&lt;String&gt; stream = AsyncStream.from(myList)
92+
* AsyncStream&lt;Integer&gt; stream2 = AsyncStream.from(myFlowPublisher)
93+
* </pre>
94+
*
95+
* @param source the source object; must not be {@code null}
96+
* @param <T> the element type
97+
* @return an async stream backed by the source
98+
* @throws IllegalArgumentException if {@code source} is {@code null}
99+
* or no adapter supports the source type
100+
* @see AwaitableAdapterRegistry#toAsyncStream(Object)
101+
* @since 6.0.0
102+
*/
103+
@SuppressWarnings("unchecked")
104+
static <T> AsyncStream<T> from(Object source) {
105+
return AwaitableAdapterRegistry.toAsyncStream(source);
106+
}
107+
78108
/**
79109
* Returns an empty {@code AsyncStream} that completes immediately.
80110
*/

src/main/java/groovy/concurrent/Awaitable.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,18 @@
5454
* {@code Promise.allSettled()}</li>
5555
* <li>{@link #delay(long) Awaitable.delay(ms)} — like
5656
* {@code Task.Delay()} / {@code setTimeout}</li>
57-
* <li>{@link #timeout(Object, long) Awaitable.timeout(task, ms)} — like
57+
* <li>{@link #orTimeoutMillis(Object, long) Awaitable.orTimeoutMillis(task, ms)} — like
5858
* Kotlin's {@code withTimeout} or a JavaScript promise raced against a timer</li>
59-
* <li>{@link #timeoutOr(Object, Object, long) Awaitable.timeoutOr(task, fallback, ms)} —
59+
* <li>{@link #completeOnTimeoutMillis(Object, Object, long)
60+
* Awaitable.completeOnTimeoutMillis(task, fallback, ms)} —
6061
* like a timeout with fallback/default value</li>
6162
* </ul>
6263
* <p>
63-
* <b>Static factories:</b>
64+
* <b>Static factories and conversion:</b>
6465
* <ul>
66+
* <li>{@link #from(Object) Awaitable.from(source)} — converts any supported
67+
* async type (CompletableFuture, CompletionStage, Future, Flow.Publisher, etc.)
68+
* to an {@code Awaitable}</li>
6569
* <li>{@link #of(Object) Awaitable.of(value)} — like
6670
* {@code Task.FromResult()} / {@code Promise.resolve()}</li>
6771
* <li>{@link #failed(Throwable) Awaitable.failed(error)} — like
@@ -297,6 +301,35 @@ default Awaitable<T> completeOnTimeout(T fallback, long duration, TimeUnit unit)
297301

298302
// ---- Static factories ----
299303

304+
/**
305+
* Converts the given source to an {@code Awaitable}.
306+
* <p>
307+
* If the source is already an {@code Awaitable}, it is returned as-is.
308+
* Otherwise, the {@link AwaitableAdapterRegistry} is consulted to find a
309+
* suitable adapter. Built-in adapters handle {@link CompletableFuture},
310+
* {@link java.util.concurrent.CompletionStage}, {@link java.util.concurrent.Future},
311+
* and {@link java.util.concurrent.Flow.Publisher}; third-party frameworks
312+
* can register additional adapters via the registry.
313+
* <p>
314+
* This is the recommended entry point for converting external async types
315+
* to {@code Awaitable}:
316+
* <pre>
317+
* Awaitable&lt;String&gt; aw = Awaitable.from(someCompletableFuture)
318+
* Awaitable&lt;Integer&gt; aw2 = Awaitable.from(someReactorMono)
319+
* </pre>
320+
*
321+
* @param source the source object; must not be {@code null}
322+
* @param <T> the result type
323+
* @return an awaitable backed by the source
324+
* @throws IllegalArgumentException if {@code source} is {@code null}
325+
* or no adapter supports the source type
326+
* @see AwaitableAdapterRegistry#toAwaitable(Object)
327+
* @since 6.0.0
328+
*/
329+
static <T> Awaitable<T> from(Object source) {
330+
return AwaitableAdapterRegistry.toAwaitable(source);
331+
}
332+
300333
/**
301334
* Returns an already-completed {@code Awaitable} with the given value.
302335
* Analogous to C#'s {@code Task.FromResult()} or JavaScript's

src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,18 @@ public static void setBlockingExecutor(Executor executor) {
111111
/**
112112
* Converts the given source to an {@link Awaitable}.
113113
* If the source is already an {@code Awaitable}, it is returned as-is.
114+
* <p>
115+
* <b>Tip:</b> user code should generally prefer {@link Awaitable#from(Object)},
116+
* which delegates to this method but is more discoverable from the
117+
* {@code Awaitable} type itself.
114118
*
115119
* @param source the source object; must not be {@code null}
116120
* @throws IllegalArgumentException if {@code source} is {@code null}
117121
* or no adapter supports the source type
122+
* @see Awaitable#from(Object)
118123
*/
119124
@SuppressWarnings("unchecked")
120-
public static <T> Awaitable<T> toAwaitable(Object source) {
125+
static <T> Awaitable<T> toAwaitable(Object source) {
121126
if (source == null) {
122127
throw new IllegalArgumentException("Cannot convert null to Awaitable");
123128
}
@@ -136,13 +141,18 @@ public static <T> Awaitable<T> toAwaitable(Object source) {
136141
/**
137142
* Converts the given source to an {@link AsyncStream}.
138143
* If the source is already an {@code AsyncStream}, it is returned as-is.
144+
* <p>
145+
* <b>Tip:</b> user code should generally prefer {@link AsyncStream#from(Object)},
146+
* which delegates to this method but is more discoverable from the
147+
* {@code AsyncStream} type itself.
139148
*
140149
* @param source the source object; must not be {@code null}
141150
* @throws IllegalArgumentException if {@code source} is {@code null}
142151
* or no adapter supports the source type
152+
* @see AsyncStream#from(Object)
143153
*/
144154
@SuppressWarnings("unchecked")
145-
public static <T> AsyncStream<T> toAsyncStream(Object source) {
155+
static <T> AsyncStream<T> toAsyncStream(Object source) {
146156
if (source == null) {
147157
throw new IllegalArgumentException("Cannot convert null to AsyncStream");
148158
}

src/main/java/groovy/transform/Async.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,9 @@
107107
* <ul>
108108
* <li>Cannot be applied to abstract methods</li>
109109
* <li>Cannot be applied to constructors</li>
110-
* <li>Cannot be applied to methods that already return {@code Awaitable}</li>
110+
* <li>Cannot be applied to methods that already return an async type
111+
* ({@code Awaitable}, {@code AsyncStream}, {@code CompletableFuture},
112+
* {@code CompletionStage}, or {@code Future})</li>
111113
* </ul>
112114
*
113115
* @see groovy.concurrent.Awaitable

src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import groovy.concurrent.AsyncStream;
2222
import groovy.concurrent.AwaitResult;
2323
import groovy.concurrent.Awaitable;
24-
import groovy.concurrent.AwaitableAdapterRegistry;
2524
import groovy.lang.Closure;
2625

2726
import java.lang.invoke.MethodHandle;
@@ -64,8 +63,9 @@
6463
* <ul>
6564
* <li><b>Async execution</b> — {@code executeAsync()}, {@code executeAsyncVoid()},
6665
* and {@code wrapAsync()} run closures on the configured executor</li>
67-
* <li><b>Await</b> — all {@code await()} overloads go through the
68-
* {@link AwaitableAdapterRegistry} so that third-party async types
66+
* <li><b>Await</b> — all {@code await()} overloads use
67+
* {@link groovy.concurrent.Awaitable#from(Object) Awaitable.from()} so
68+
* that third-party async types
6969
* (RxJava {@code Single}, Reactor {@code Mono}, etc.) are supported
7070
* transparently once an adapter is registered</li>
7171
* <li><b>Async generators</b> — {@code generateAsyncStream()} manages the
@@ -112,7 +112,6 @@
112112
*
113113
* @see groovy.concurrent.Awaitable
114114
* @see groovy.transform.Async
115-
* @see Awaitable
116115
* @since 6.0.0
117116
*/
118117
public class AsyncSupport {
@@ -267,8 +266,8 @@ public static <T> T await(Future<T> future) {
267266
}
268267

269268
/**
270-
* Awaits an arbitrary object by adapting it to {@link Awaitable} via the
271-
* {@link AwaitableAdapterRegistry}. This is the fallback overload called
269+
* Awaits an arbitrary object by adapting it to {@link Awaitable} via
270+
* {@link Awaitable#from(Object)}. This is the fallback overload called
272271
* by the {@code await} expression when the compile-time type is not one
273272
* of the other supported types. Returns {@code null} for a {@code null}
274273
* argument.
@@ -285,7 +284,7 @@ public static <T> T await(Object source) {
285284
if (source instanceof CompletionStage) return await((CompletionStage<T>) source);
286285
if (source instanceof Future) return await((Future<T>) source);
287286
if (source instanceof Closure) return awaitClosure((Closure<?>) source);
288-
return await(AwaitableAdapterRegistry.<T>toAwaitable(source));
287+
return await(Awaitable.<T>from(source));
289288
}
290289

291290
/**
@@ -543,7 +542,7 @@ private static CompletableFuture<?> toCompletableFuture(Object source) {
543542
if (source instanceof CompletableFuture<?> cf) return cf;
544543
if (source instanceof Awaitable<?> a) return a.toCompletableFuture();
545544
if (source instanceof CompletionStage<?> cs) return cs.toCompletableFuture();
546-
return AwaitableAdapterRegistry.toAwaitable(source).toCompletableFuture();
545+
return Awaitable.from(source).toCompletableFuture();
547546
}
548547

549548
// ---- non-blocking combinators (return Awaitable) --------------------
@@ -813,7 +812,7 @@ public static Awaitable<Void> delay(long duration, TimeUnit unit) {
813812
public static <T> AsyncStream<T> toAsyncStream(Object source) {
814813
if (source == null) return AsyncStream.empty();
815814
if (source instanceof AsyncStream) return (AsyncStream<T>) source;
816-
return AwaitableAdapterRegistry.toAsyncStream(source);
815+
return AsyncStream.from(source);
817816
}
818817

819818
/**

src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,13 @@
6464
* <ul>
6565
* <li>§2.5 — duplicate {@code onSubscribe} cancels the second subscription</li>
6666
* <li>§2.13 — {@code null} items in {@code onNext} are rejected immediately</li>
67-
* <li>Terminal signals ({@code onError}/{@code onComplete}) use
67+
* <li>All signals ({@code onNext}, {@code onError}, {@code onComplete}) use
6868
* blocking {@code put()} to guarantee delivery even under queue
6969
* contention</li>
70+
* <li>Back-pressure is enforced by requesting exactly one item after
71+
* each consumed element; demand is signalled <em>before</em> the
72+
* consumer's {@code moveNext()} awaitable completes, preventing
73+
* livelock when producer and consumer share the same thread pool</li>
7074
* </ul>
7175
*
7276
* @see groovy.concurrent.AwaitableAdapterRegistry
@@ -227,9 +231,16 @@ private static final class ErrorSignal {
227231
* providing a pull-based iteration interface over a push-based source.
228232
*
229233
* <p>Back-pressure is enforced by requesting exactly one item after
230-
* each consumed element. The internal bounded queue (capacity
231-
* {@value QUEUE_CAPACITY}) absorbs minor timing jitter between
232-
* producer and consumer.</p>
234+
* each consumed element. Demand is signalled <em>before</em> the
235+
* consumer's {@code moveNext()} awaitable completes, so the publisher
236+
* can begin producing the next value while the consumer processes the
237+
* current one — this prevents livelock when producer and consumer
238+
* share the same thread pool.</p>
239+
*
240+
* <p>The internal bounded queue (capacity {@value QUEUE_CAPACITY})
241+
* absorbs minor timing jitter between producer and consumer. All
242+
* signals use blocking {@code put()}, ensuring no items or terminal
243+
* events are silently dropped.</p>
233244
*
234245
* <p><b>Resource management:</b> When the consumer calls
235246
* {@link AsyncStream#close()} (e.g. via {@code break} in a
@@ -277,10 +288,16 @@ public void onNext(T item) {
277288
return;
278289
}
279290
if (!closedRef.get()) {
280-
// Use offer() for value signals — if the queue is full (publisher
281-
// misbehaving with respect to demand), the item is dropped rather
282-
// than blocking the publisher thread indefinitely.
283-
queue.offer(new ValueSignal<>(item));
291+
try {
292+
// Blocking put() guarantees the item reaches the consumer.
293+
// Since demand is capped at 1 (one request(1) per moveNext),
294+
// a well-behaved publisher will never overflow the queue; put()
295+
// still protects against misbehaving publishers by blocking
296+
// rather than silently dropping the value.
297+
queue.put(new ValueSignal<>(item));
298+
} catch (InterruptedException ie) {
299+
Thread.currentThread().interrupt();
300+
}
284301
}
285302
}
286303

@@ -326,10 +343,16 @@ public Awaitable<Boolean> moveNext() {
326343

327344
if (signal instanceof ValueSignal) {
328345
current = ((ValueSignal<T>) signal).value;
329-
cf.complete(Boolean.TRUE);
330-
// Request the next item — back-pressure: one-at-a-time
346+
// Signal demand for the next item BEFORE completing
347+
// the awaitable, so the publisher can begin producing
348+
// the next value while the consumer processes this one.
349+
// Ordering here is critical: if request(1) were called
350+
// after cf.complete(), the consumer could re-enter
351+
// moveNext() and block in take() before demand was
352+
// signalled, creating a livelock.
331353
Flow.Subscription sub = subRef.get();
332354
if (sub != null) sub.request(1);
355+
cf.complete(Boolean.TRUE);
333356
} else if (signal instanceof ErrorSignal) {
334357
streamClosed.set(true);
335358
cf.completeExceptionally(((ErrorSignal) signal).error);
@@ -366,10 +389,14 @@ public void close() {
366389
if (sub != null) sub.cancel();
367390
// Drain the queue and inject a sentinel to unblock a
368391
// concurrent moveNext() that may be blocked in take().
369-
// Using offer() after clear() performs a non-blocking,
370-
// best-effort delivery of the sentinel to any waiter.
392+
// Using blocking put() after clear() guarantees delivery;
393+
// since the queue is freshly cleared, put() will not block.
371394
queue.clear();
372-
queue.offer(COMPLETE_SENTINEL);
395+
try {
396+
queue.put(COMPLETE_SENTINEL);
397+
} catch (InterruptedException ie) {
398+
Thread.currentThread().interrupt();
399+
}
373400
}
374401
}
375402
};

src/spec/doc/core-async-await.adoc

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,10 @@ RxJava 3 via adapters, Vert.x, etc.).
610610
611611
The internal adapter uses a bounded buffer (capacity 256) to bridge the push-based `Publisher`
612612
model to Groovy's pull-based `AsyncStream` model, preventing unbounded memory growth with
613-
fast publishers while maintaining throughput.
613+
fast publishers while maintaining throughput. Back-pressure is enforced by requesting exactly
614+
one item at a time; demand for the next element is signalled _before_ the consumer's
615+
`moveNext()` awaitable completes, so the publisher can begin producing the next value while
616+
the consumer processes the current one.
614617
615618
=== Awaiting a Single Value
616619
@@ -638,6 +641,24 @@ This is conceptually similar to JavaScript async iterators' `return()`, C#'s
638641
[[adapter-registry]]
639642
== Adapter Registry
640643
644+
=== Converting External Types with `from()`
645+
646+
The `Awaitable.from(source)` and `AsyncStream.from(source)` static methods are the recommended
647+
entry points for converting external async types to Groovy's native abstractions. They delegate
648+
to the `AwaitableAdapterRegistry` internally but provide a more discoverable, user-friendly API:
649+
650+
[source,groovy]
651+
----
652+
include::../test/AsyncAwaitSpecTest.groovy[tags=from_conversion,indent=0]
653+
----
654+
655+
`Awaitable.from()` accepts any type supported by a registered adapter: `CompletableFuture`,
656+
`CompletionStage`, `Future`, `Flow.Publisher`, or any third-party type with a custom adapter.
657+
If the source is already an `Awaitable`, it is returned as-is. `AsyncStream.from()` works
658+
similarly for multi-value sources such as `Iterable`, `Iterator`, or `Flow.Publisher`.
659+
660+
=== Built-in Adapters
661+
641662
The `groovy.concurrent.AwaitableAdapterRegistry` allows extending `await` to support additional
642663
asynchronous types from third-party frameworks. The built-in adapter handles:
643664
@@ -959,7 +980,9 @@ writes (adapter registration) are rare, reads (every `await`) are frequent
959980
* `AsyncStreamGenerator` close state — `AtomicBoolean` + `AtomicReference<Thread>` for prompt,
960981
idempotent close/cancellation signalling
961982
* `Flow.Publisher` adaptation (`FlowPublisherAdapter`) — `AtomicReference<Subscription>` with CAS-guarded
962-
onSubscribe (§2.5 compliance), `AtomicBoolean` done-guard for single-value path, and close-aware queue signalling
983+
onSubscribe (§2.5 compliance), `AtomicBoolean` done-guard for single-value path, and close-aware
984+
queue signalling; all signals (`onNext`/`onError`/`onComplete`) use blocking `put()` for
985+
guaranteed delivery; demand is signalled before the consumer's awaitable completes to prevent livelock
963986
* Defer scopes — per-task `ArrayDeque`, confined to a single thread (no sharing)
964987
* `DELAY_SCHEDULER` — single daemon thread for non-blocking timer operations
965988
@@ -1131,7 +1154,8 @@ JavaScript, C#, Kotlin, and Swift, for developers familiar with those languages.
11311154
| task priority / executor chosen by runtime
11321155
11331156
| **Third-party support**
1134-
| `AwaitableAdapterRegistry` SPI
1157+
| `AwaitableAdapterRegistry` SPI +
1158+
`Awaitable.from()` / `AsyncStream.from()`
11351159
| _(native `thenable` protocol)_
11361160
| Custom awaiters via `GetAwaiter()`
11371161
| coroutine adapters / bridges
@@ -1195,6 +1219,9 @@ JavaScript, C#, Kotlin, and Swift, for developers familiar with those languages.
11951219
| Pre-computed result
11961220
| `Awaitable.of(value)` / `Awaitable.failed(error)`
11971221
1222+
| Type conversion
1223+
| `Awaitable.from(source)` / `AsyncStream.from(source)`
1224+
11981225
| Annotation form
11991226
| `@Async def methodName() { ... }`
12001227

0 commit comments

Comments
 (0)