GROOVY-9381: Add native async/await support#2387
Conversation
ec35b7d to
5f8acbb
Compare
There was a problem hiding this comment.
Pull request overview
This PR implements GROOVY-9381 by adding native async/await syntax to the Groovy language (including for await, yield return async generators, and defer), plus runtime support and extensive documentation/tests.
Changes:
- Extend the Groovy grammar + AST builder to parse/compile
async,await,for await,yield return, anddefer. - Add/extend runtime primitives (
Awaitable,AsyncStream, adapters/registry, promise + generator implementation) and@AsyncAST transformation support. - Add comprehensive tests and new spec documentation; add Reactor/RxJava test dependencies for adapter interop coverage.
Reviewed changes
Copilot reviewed 29 out of 30 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| versions.properties | Adds version pins for Reactor and RxJava3 used by new integration tests. |
| build.gradle | Adds Reactor/RxJava3 as testImplementation dependencies. |
| gradle/verification-metadata.xml | Adds verification metadata entries for new test dependencies. |
| src/antlr/GroovyLexer.g4 | Introduces lexer tokens for async, await, defer. |
| src/antlr/GroovyParser.g4 | Adds grammar support for async closure/lambda, await expressions, for await, yield return, and defer. |
| src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java | Lowers new syntax into runtime calls (e.g., AsyncSupport.await, stream conversion/cleanup, defer, yieldReturn). |
| src/main/java/org/codehaus/groovy/ast/ModifierNode.java | Treats ASYNC as a non-bytecode modifier (opcode 0). |
| src/main/java/org/codehaus/groovy/transform/AsyncTransformHelper.java | Centralizes AST construction + scanning/rewriting for async constructs. |
| src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java | Implements @Async transformation using shared helper logic (await rewrite, generator/defer handling). |
| src/main/java/groovy/transform/Async.java | Adds @Async annotation and documentation. |
| src/main/java/groovy/concurrent/* | Adds new async public API (Awaitable, AsyncStream, adapters/registry, AwaitResult). |
| src/main/java/org/apache/groovy/runtime/async/* | Adds runtime implementations (GroovyPromise, AsyncStreamGenerator) used by compiler output. |
| src/spec/doc/core-async-await.adoc | Adds user-facing language documentation/spec for async/await. |
| src/test/groovy/org/codehaus/groovy/transform/* | Adds extensive test coverage for async/await, generators, defer, adapters, and virtual threads. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| | VOLATILE | ||
| | DEF | ||
| | VAR | ||
| | ASYNC |
There was a problem hiding this comment.
Adding ASYNC to the general modifier rule means async can now appear anywhere modifiers are allowed (e.g., on fields, variables, classes) and will likely be silently ignored (since it maps to opcode 0). If async is intended to be a method-only modifier, consider restricting it in the grammar or adding a parse/semantic error when async is used in unsupported contexts, to avoid misleading code compiling without effect.
| | ASYNC |
| def customPool = Executors.newFixedThreadPool(2, { r -> | ||
| def t = new Thread(r) | ||
| t.setName("custom-async-" + t.getId()) | ||
| t | ||
| }) | ||
| Awaitable.setExecutor(customPool) |
There was a problem hiding this comment.
The custom fixed thread pool created here is never shut down. Since the thread factory creates non-daemon threads, this can leak threads and potentially hang the test JVM. Please ensure the executor is shut down (preferably in the existing finally block) after restoring the previous executor.
| Awaitable.setExecutor(Executors.newSingleThreadExecutor()) | ||
| assert Awaitable.getExecutor() != originalExecutor | ||
| // Reset to null — should restore default | ||
| Awaitable.setExecutor(null) | ||
| def restored = Awaitable.getExecutor() | ||
| assert restored != null | ||
| // Verify it works | ||
| def task = async { 42 }; def awaitable = task() | ||
| assert await(awaitable) == 42 | ||
| // Restore original | ||
| Awaitable.setExecutor(originalExecutor) |
There was a problem hiding this comment.
This test sets the global executor to a newSingleThreadExecutor but never shuts that executor down. Even though the executor is reset to null later, the underlying thread can remain alive and leak across the suite. Please shut down the created executor (e.g., keep a reference and shutdown in a finally block).
| Awaitable.setExecutor(Executors.newSingleThreadExecutor()) | |
| assert Awaitable.getExecutor() != originalExecutor | |
| // Reset to null — should restore default | |
| Awaitable.setExecutor(null) | |
| def restored = Awaitable.getExecutor() | |
| assert restored != null | |
| // Verify it works | |
| def task = async { 42 }; def awaitable = task() | |
| assert await(awaitable) == 42 | |
| // Restore original | |
| Awaitable.setExecutor(originalExecutor) | |
| def customExecutor = Executors.newSingleThreadExecutor() | |
| try { | |
| Awaitable.setExecutor(customExecutor) | |
| assert Awaitable.getExecutor() != originalExecutor | |
| // Reset to null — should restore default | |
| Awaitable.setExecutor(null) | |
| def restored = Awaitable.getExecutor() | |
| assert restored != null | |
| // Verify it works | |
| def task = async { 42 }; def awaitable = task() | |
| assert await(awaitable) == 42 | |
| } finally { | |
| // Restore original and shut down custom executor to avoid thread leaks | |
| Awaitable.setExecutor(originalExecutor) | |
| customExecutor.shutdown() | |
| } |
| static Executor myPool = Executors.newFixedThreadPool(1, { r -> | ||
| def t = new Thread(r) | ||
| t.setName("my-pool-thread") | ||
| t | ||
| }) | ||
|
|
There was a problem hiding this comment.
The static Executor created via Executors.newFixedThreadPool uses non-daemon threads and is never shut down. This can leak threads for the remainder of the test run and may prevent the JVM from exiting. Consider using daemon threads (as other tests do) and/or explicitly shutting down the pool at the end of the script.
c3808d6 to
1afcb33
Compare
7dde6e5 to
9124f63
Compare
9124f63 to
5df6414
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 29 out of 32 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| throw createParsingFailedException("for await requires enhanced for syntax: for await (item in source)", ctx); | ||
| } | ||
|
|
||
| ClassNode varType = this.visitType(enhCtrl.type()); |
There was a problem hiding this comment.
enhCtrl.type() is likely optional in enhanced-for syntax (e.g. for (item in src)), so calling visitType(enhCtrl.type()) can throw if the type node is absent. Align this with the existing enhanced-for handling by defaulting to an implicit/dynamic type when enhCtrl.type() is null (and still applying modifiers via variableModifiersOpt).
| ClassNode varType = this.visitType(enhCtrl.type()); | |
| ClassNode varType = (enhCtrl.type() != null ? this.visitType(enhCtrl.type()) : ClassHelper.DYNAMIC_TYPE); |
| private static <T> void completeFrom(CompletableFuture<T> cf, Future<T> future) { | ||
| try { | ||
| cf.complete(future.get()); | ||
| } catch (Exception e) { | ||
| cf.completeExceptionally(e.getCause() != null ? e.getCause() : e); | ||
| } | ||
| } |
There was a problem hiding this comment.
This hides interrupt semantics: if future.get() throws InterruptedException, the thread interrupt flag is not restored, and the adapted awaitable completes exceptionally with InterruptedException rather than making interruption/cancellation behavior consistent with the rest of the async runtime (which restores interrupts and commonly uses CancellationException with a cause). Handle InterruptedException explicitly (restore interrupt flag and complete exceptionally with a consistent cancellation/interrupt exception) and handle ExecutionException separately to unwrap its cause without conflating it with interruption.
| * Awaitable<Map> fetchProfile(long id) { | ||
| * return [name: "User$id"] | ||
| * } | ||
| * | ||
| * {@code @}Async | ||
| * Awaitable<List> fetchOrders(long id) { |
There was a problem hiding this comment.
The Javadoc example contradicts the transformation’s validation in AsyncASTTransformation, which rejects methods that already return Awaitable. Update the example to use a non-Awaitable return type for @Async methods (e.g., Map / List / def) so the documented usage matches the actual compiler behavior.
| * Awaitable<Map> fetchProfile(long id) { | |
| * return [name: "User$id"] | |
| * } | |
| * | |
| * {@code @}Async | |
| * Awaitable<List> fetchOrders(long id) { | |
| * Map fetchProfile(long id) { | |
| * return [name: "User$id"] | |
| * } | |
| * | |
| * {@code @}Async | |
| * List fetchOrders(long id) { |
| ClassNode originalReturnType = mNode.getReturnType(); | ||
| if (AWAITABLE_TYPE.getName().equals(originalReturnType.getName()) | ||
| || ASYNC_STREAM_TYPE.getName().equals(originalReturnType.getName()) | ||
| || "java.util.concurrent.CompletableFuture".equals(originalReturnType.getName())) { |
There was a problem hiding this comment.
The error message says “already returns an async type”, but the check only blocks Awaitable, AsyncStream, and CompletableFuture. Since CompletionStage/Future are also async types in this PR’s model (and are handled by await/adapters), allowing @Async on those return types can yield confusing nested async types (e.g., Awaitable<CompletionStage<T>>). Consider expanding this validation to include CompletionStage and Future (or adjust the error text/documentation to reflect the narrower restriction).
| || "java.util.concurrent.CompletableFuture".equals(originalReturnType.getName())) { | |
| || "java.util.concurrent.CompletableFuture".equals(originalReturnType.getName()) | |
| || "java.util.concurrent.CompletionStage".equals(originalReturnType.getName()) | |
| || "java.util.concurrent.Future".equals(originalReturnType.getName())) { |
| @Test | ||
| void testAwaitReactorMonoDeferred() { | ||
| assertScript REACTOR_PREAMBLE + ''' | ||
| def mono = Mono.fromSupplier { Thread.sleep(50); "deferred" } |
There was a problem hiding this comment.
Tests relying on small Thread.sleep(...) durations can be flaky on loaded CI agents (timing variance) while not materially improving coverage of the feature. Prefer using deterministic synchronization (e.g., latches) or removing the sleep if the test intent is only to validate await semantics on a deferred publisher.
| def mono = Mono.fromSupplier { Thread.sleep(50); "deferred" } | |
| def mono = Mono.fromSupplier { "deferred" } |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 29 out of 32 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| public Awaitable<Boolean> moveNext() { | ||
| if (closed.get()) { | ||
| return Awaitable.of(false); | ||
| } | ||
| Thread currentThread = Thread.currentThread(); | ||
| consumerThread.set(currentThread); | ||
| // Double-check after registration: if close() raced between the first | ||
| // closed check and consumerThread.set(), the consumer reference was not | ||
| // yet visible to close(), so no interrupt was delivered. Without this | ||
| // re-check the consumer would block in queue.take() indefinitely. | ||
| if (closed.get()) { | ||
| consumerThread.compareAndSet(currentThread, null); | ||
| return Awaitable.of(false); | ||
| } |
| private static <T> AsyncStream<T> publisherToAsyncStream(Flow.Publisher<T> publisher) { | ||
| BlockingQueue<Object> queue = new LinkedBlockingQueue<>(256); | ||
| AtomicReference<Flow.Subscription> subRef = new AtomicReference<>(); | ||
| AtomicBoolean closedRef = new AtomicBoolean(false); | ||
|
|
||
| publisher.subscribe(new Flow.Subscriber<T>() { | ||
| @Override | ||
| public void onSubscribe(Flow.Subscription s) { | ||
| if (!closedRef.get()) { | ||
| subRef.set(s); | ||
| s.request(1); | ||
| } else { | ||
| s.cancel(); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void onNext(T item) { | ||
| if (!closedRef.get()) { | ||
| queue.offer(new ValueSignal<>(item)); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void onError(Throwable t) { | ||
| if (!closedRef.get()) { | ||
| queue.offer(new ErrorSignal(t)); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void onComplete() { | ||
| if (!closedRef.get()) { | ||
| queue.offer(COMPLETE_SENTINEL); | ||
| } | ||
| } | ||
| }); |
| cf.completeExceptionally(t); | ||
| } | ||
|
|
||
| @Override | ||
| public void onComplete() { | ||
| if (!cf.isDone()) cf.complete(null); |
| if (AWAITABLE_TYPE.getName().equals(originalReturnType.getName()) | ||
| || ASYNC_STREAM_TYPE.getName().equals(originalReturnType.getName()) | ||
| || "java.util.concurrent.CompletableFuture".equals(originalReturnType.getName())) { | ||
| addError(MY_TYPE_NAME + " cannot be applied to a method that already returns an async type", mNode); |
a0115c5 to
88e960c
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 30 out of 34 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| Thread producer = Thread.start { | ||
| gen.attachProducer(Thread.currentThread()) | ||
| try { | ||
| gen.yield(1) | ||
| gen.yield(2) | ||
| gen.yield(3) | ||
| gen.complete() | ||
| } finally { | ||
| gen.detachProducer(Thread.currentThread()) | ||
| } | ||
| } |
| // Using put() after clear() guarantees delivery because | ||
| // the queue was just emptied and has capacity. |
| Thread.start { | ||
| Thread.sleep(50) | ||
| publisher.submit('hello') | ||
| publisher.close() | ||
| } |
6529bbe to
e38ef50
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 30 out of 34 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| // Inject @Async annotation for methods declared with the 'async' keyword modifier | ||
| if (modifierManager.containsAny(ASYNC)) { | ||
| methodNode.addAnnotation(new AnnotationNode(ClassHelper.make("groovy.transform.Async"))); | ||
| } |
| public ExpressionStatement visitDeferStmtAlt(final DeferStmtAltContext ctx) { | ||
| Expression action; | ||
| ExpressionStatement stmtExprStmt = (ExpressionStatement) this.visit(ctx.statementExpression()); | ||
| Expression expr = stmtExprStmt.getExpression(); | ||
| if (expr instanceof ClosureExpression) { | ||
| // Already a closure/lambda — use directly as the defer action | ||
| action = expr; | ||
| } else { | ||
| // Wrap the statement expression in a closure: { -> expr } | ||
| ClosureExpression wrapper = closureX(Parameter.EMPTY_ARRAY, block(stmtExprStmt)); | ||
| wrapper.setSourcePosition(stmtExprStmt); | ||
| action = wrapper; | ||
| } | ||
| // Emit: AsyncSupport.defer($__deferScope__, action) | ||
| Expression deferCall = AsyncTransformHelper.buildDeferCall(action); | ||
| return configureAST(new ExpressionStatement(deferCall), ctx); | ||
| } |
| while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) } | ||
| subscribed.countDown() |
98c29c0 to
aa4e567
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 30 out of 34 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| void attachProducer(Thread thread) { | ||
| producerThread.set(thread); | ||
| if (closed.get()) { | ||
| thread.interrupt(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Unregisters the given thread as the producer for this stream. | ||
| * Called from a {@code finally} block after the generator body | ||
| * completes (normally or exceptionally). | ||
| * | ||
| * @param thread the producer thread to unregister | ||
| */ | ||
| void detachProducer(Thread thread) { | ||
| producerThread.compareAndSet(thread, null); | ||
| } |
| @Override | ||
| public ExpressionStatement visitYieldReturnStmtAlt(final YieldReturnStmtAltContext ctx) { | ||
| Expression expr = (Expression) this.visit(ctx.expression()); | ||
| Expression yieldCall = AsyncTransformHelper.buildYieldReturnCall(expr); | ||
| return configureAST(new ExpressionStatement(yieldCall), ctx); | ||
| } |
| public void onComplete() { | ||
| // Publisher completed before emitting — resolve to null | ||
| if (done.compareAndSet(false, true)) { | ||
| cf.complete(null); |
| @Override | ||
| public void onError(Throwable t) { | ||
| // Cancel subscription eagerly to release upstream resources | ||
| Flow.Subscription sub = subRef.getAndSet(null); | ||
| if (sub != null) sub.cancel(); | ||
| try { | ||
| // Terminal signals use blocking put() to guarantee delivery — | ||
| // the consumer MUST see the error to propagate it correctly. | ||
| queue.put(new ErrorSignal(t)); | ||
| } catch (InterruptedException ie) { | ||
| Thread.currentThread().interrupt(); | ||
| // Blocking put() was interrupted. Fall back to non-blocking | ||
| // offer(). If that also fails (queue full), set streamClosed | ||
| // so the consumer's next moveNext() returns false instead of | ||
| // blocking indefinitely. | ||
| if (!queue.offer(new ErrorSignal(t))) { | ||
| streamClosed.set(true); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void onComplete() { | ||
| try { | ||
| // Blocking put() guarantees the consumer will see the sentinel, | ||
| // even if the queue was temporarily full from buffered values. | ||
| queue.put(COMPLETE_SENTINEL); | ||
| } catch (InterruptedException ie) { | ||
| Thread.currentThread().interrupt(); | ||
| // Same fallback as onError: try non-blocking offer, | ||
| // set streamClosed if that also fails. | ||
| if (!queue.offer(COMPLETE_SENTINEL)) { | ||
| streamClosed.set(true); | ||
| } | ||
| } | ||
| } |
| } | ||
| def future = task() | ||
| // Wait until the subscriber is registered with the publisher | ||
| while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) } |
| def publisher = new SubmissionPublisher<Integer>() | ||
| def future = new StreamConsumer().consumeAll(publisher) | ||
| Thread.start { | ||
| Thread.sleep(50) |
f043c2c to
d4f97d5
Compare
d1a6bb4 to
6106905
Compare
https://issues.apache.org/jira/browse/GROOVY-9381
See also: