|
1 | 1 | package gentle.async; |
2 | 2 |
|
| 3 | +import gentle.Result; |
3 | 4 | import lombok.AccessLevel; |
4 | 5 | import lombok.NonNull; |
5 | 6 | import lombok.RequiredArgsConstructor; |
6 | 7 |
|
| 8 | +import java.time.Duration; |
7 | 9 | import java.util.ArrayList; |
8 | 10 | import java.util.List; |
9 | | -import java.util.concurrent.Callable; |
10 | | -import java.util.concurrent.ExecutorService; |
11 | | -import java.util.concurrent.Executors; |
| 11 | +import java.util.concurrent.*; |
12 | 12 |
|
| 13 | +/** |
| 14 | + * A structured concurrency scope for managing asynchronous tasks. |
| 15 | + * <p> |
| 16 | + * The {@code Scope} allows creating and managing multiple asynchronous {@link Task tasks} in a single |
| 17 | + * logical scope. All tasks are automatically cancelled when the scope is closed. |
| 18 | + * |
| 19 | + * <p>Typical usage: |
| 20 | + * <pre>{@code |
| 21 | + * try (Scope scope = Scope.open()) { |
| 22 | + * Task<String> t1 = scope.async(() -> "Hello"); |
| 23 | + * Task<Integer> t2 = scope.async(() -> 42); |
| 24 | + * |
| 25 | + * System.out.println(t1.await()); |
| 26 | + * System.out.println(t2.await()); |
| 27 | + * } |
| 28 | + * }</pre> |
| 29 | + */ |
13 | 30 | @RequiredArgsConstructor(access = AccessLevel.PRIVATE) |
14 | 31 | public final class Scope implements AutoCloseable { |
| 32 | + private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(Utils.cores); |
| 33 | + |
| 34 | + /** ExecutorService used for running tasks in this scope. */ |
15 | 35 | private final ExecutorService executor; |
| 36 | + |
| 37 | + /** List of tasks created in this scope. */ |
16 | 38 | private final List<Task<?>> tasks = new ArrayList<>(); |
| 39 | + |
| 40 | + /** Indicates whether this scope has been closed. */ |
17 | 41 | private volatile boolean closed = false; |
18 | 42 |
|
| 43 | + /** |
| 44 | + * Opens a new scope with a cached thread pool. |
| 45 | + * |
| 46 | + * @return a new {@code Scope} |
| 47 | + */ |
19 | 48 | public static Scope open() { |
20 | 49 | return new Scope(Executors.newCachedThreadPool()); |
21 | 50 | } |
22 | 51 |
|
| 52 | + /** |
| 53 | + * Opens a new scope using a custom {@link ExecutorService}. |
| 54 | + * |
| 55 | + * @param executor the executor to run tasks |
| 56 | + * @return a new {@code Scope} |
| 57 | + */ |
23 | 58 | public static Scope open(@NonNull ExecutorService executor) { |
24 | 59 | return new Scope(executor); |
25 | 60 | } |
26 | 61 |
|
| 62 | + /** |
| 63 | + * Submits a new asynchronous {@link Task} to this scope. |
| 64 | + * <p> |
| 65 | + * The task is automatically cancelled when the scope is closed. |
| 66 | + * |
| 67 | + * @param supplier the task to execute |
| 68 | + * @param <T> the type of the task result |
| 69 | + * @return a {@link Task} representing the asynchronous computation |
| 70 | + * @throws IllegalStateException if the scope is already closed |
| 71 | + */ |
27 | 72 | public synchronized <T> Task<T> async(@NonNull Callable<T> supplier) { |
28 | 73 | if (closed) throw new IllegalStateException("Scope already closed"); |
29 | 74 | Task<T> task = new Task<>(executor.submit(supplier)); |
30 | 75 | tasks.add(task); |
31 | 76 | return task; |
32 | 77 | } |
33 | 78 |
|
| 79 | + /** |
| 80 | + * Submits a new asynchronous {@link Task} to this scope that will start after a specified delay. |
| 81 | + * <p> |
| 82 | + * The task is scheduled using a shared {@link ScheduledExecutorService}. When the delay elapses, |
| 83 | + * the {@code supplier} is executed and the result is completed in the returned {@link Task}. |
| 84 | + * The task is automatically cancelled if the scope is closed before it executes. |
| 85 | + * |
| 86 | + * <p>Example usage: |
| 87 | + * <pre>{@code |
| 88 | + * try (Scope scope = Scope.open()) { |
| 89 | + * Task<String> delayedTask = scope.delayed(Duration.ofSeconds(2), () -> "Hello after 2s"); |
| 90 | + * System.out.println(delayedTask.await()); |
| 91 | + * } |
| 92 | + * }</pre> |
| 93 | + * |
| 94 | + * @param delay the delay after which the task should execute |
| 95 | + * @param supplier the task to execute after the delay |
| 96 | + * @param <T> the type of the task result |
| 97 | + * @return a {@link Task} representing the delayed asynchronous computation |
| 98 | + * @throws IllegalStateException if the scope has already been closed |
| 99 | + */ |
| 100 | + public synchronized <T> Task<T> delayed(@NonNull Duration delay, @NonNull Callable<T> supplier) { |
| 101 | + if (closed) throw new IllegalStateException("Scope already closed"); |
| 102 | + |
| 103 | + CompletableFuture<T> future = new CompletableFuture<>(); |
| 104 | + ScheduledFuture<?> scheduled = scheduler.schedule(() -> { |
| 105 | + try { |
| 106 | + future.complete(supplier.call()); |
| 107 | + } catch (Throwable t) { |
| 108 | + future.completeExceptionally(t); |
| 109 | + } |
| 110 | + }, delay.toMillis(), TimeUnit.MILLISECONDS); |
| 111 | + |
| 112 | + Task<T> task = new Task<>(future) { |
| 113 | + @Override |
| 114 | + public void cancel() { |
| 115 | + scheduled.cancel(true); |
| 116 | + super.cancel(); |
| 117 | + } |
| 118 | + }; |
| 119 | + |
| 120 | + tasks.add(task); |
| 121 | + return task; |
| 122 | + } |
| 123 | + |
| 124 | + /** |
| 125 | + * Returns the number of tasks currently tracked by this scope. |
| 126 | + * |
| 127 | + * @return number of tasks |
| 128 | + */ |
34 | 129 | public synchronized int size() { |
35 | 130 | return tasks.size(); |
36 | 131 | } |
37 | 132 |
|
| 133 | + /** |
| 134 | + * Closes the scope. |
| 135 | + * <p> |
| 136 | + * Cancels all tasks and shuts down the executor. If multiple tasks throw exceptions during |
| 137 | + * cancellation, the first exception is rethrown and subsequent exceptions are suppressed. |
| 138 | + * |
| 139 | + * @throws Exception if any task cancellation or shutdown fails |
| 140 | + */ |
38 | 141 | @Override |
39 | 142 | public void close() throws Exception { |
40 | 143 | List<Throwable> errors = new ArrayList<>(); |
|
0 commit comments