Skip to content

Commit 513163f

Browse files
committed
implement issue 4971
1 parent 2e588f2 commit 513163f

File tree

5 files changed

+54
-18
lines changed

5 files changed

+54
-18
lines changed

vertx-core/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import io.vertx.core.streams.impl.InboundBuffer;
3232

3333
import java.io.IOException;
34+
import java.lang.invoke.MethodHandles;
35+
import java.lang.invoke.VarHandle;
3436
import java.nio.ByteBuffer;
3537
import java.nio.channels.AsynchronousFileChannel;
3638
import java.nio.channels.CompletionHandler;
@@ -43,14 +45,15 @@
4345
import java.nio.file.attribute.PosixFilePermissions;
4446
import java.util.HashSet;
4547
import java.util.Objects;
46-
import java.util.concurrent.atomic.AtomicBoolean;
4748
import java.util.concurrent.atomic.AtomicInteger;
4849

4950
/**
5051
* @author <a href="http://tfox.org">Tim Fox</a>
5152
*/
5253
public class AsyncFileImpl implements AsyncFile {
5354

55+
private static final VarHandle SEND_FAILURE = MethodHandles.arrayElementVarHandle(boolean[].class);
56+
5457
private static final Logger log = LoggerFactory.getLogger(AsyncFile.class);
5558

5659
public static final int DEFAULT_READ_BUFFER_SIZE = 8192;
@@ -332,7 +335,7 @@ private void handleException(Throwable t) {
332335

333336
private synchronized void doWrite(ByteBuffer[] buffers, long position, Handler<AsyncResult<Void>> handler) {
334337
AtomicInteger cnt = new AtomicInteger();
335-
AtomicBoolean sentFailure = new AtomicBoolean();
338+
boolean[] sentFailure = new boolean[1];
336339
for (ByteBuffer b: buffers) {
337340
int limit = b.limit();
338341
doWrite(b, position, limit, ar -> {
@@ -341,7 +344,7 @@ private synchronized void doWrite(ByteBuffer[] buffers, long position, Handler<A
341344
handler.handle(ar);
342345
}
343346
} else {
344-
if (sentFailure.compareAndSet(false, true)) {
347+
if (SEND_FAILURE.compareAndSet(sentFailure, 0, false, true)) {
345348
handler.handle(ar);
346349
}
347350
}

vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,15 @@
7373

7474
import java.io.IOException;
7575
import java.io.InputStream;
76+
import java.lang.invoke.MethodHandles;
77+
import java.lang.invoke.VarHandle;
7678
import java.lang.ref.Cleaner;
7779
import java.lang.ref.WeakReference;
7880
import java.lang.reflect.Method;
7981
import java.net.InetSocketAddress;
8082
import java.nio.charset.StandardCharsets;
8183
import java.util.*;
8284
import java.util.concurrent.*;
83-
import java.util.concurrent.atomic.AtomicBoolean;
8485
import java.util.concurrent.atomic.AtomicInteger;
8586
import java.util.concurrent.atomic.AtomicLong;
8687
import java.util.function.Function;
@@ -107,6 +108,16 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
107108
// https://github.com/eclipse-vertx/vert.x/issues/4611
108109

109110
private static final Logger log = LoggerFactory.getLogger(VertxImpl.class);
111+
private static final VarHandle INTERNAL_TIMER_HANDLER_DISPOSED;
112+
113+
static {
114+
try {
115+
INTERNAL_TIMER_HANDLER_DISPOSED = MethodHandles.lookup()
116+
.findVarHandle(InternalTimerHandler.class, "disposed", boolean.class);
117+
} catch (NoSuchFieldException | IllegalAccessException e) {
118+
throw new IllegalArgumentException(e);
119+
}
120+
}
110121

111122
static final Object[] EMPTY_CONTEXT_LOCALS = new Object[0];
112123
private static final String CLUSTER_MAP_NAME = "__vertx.haInfo";
@@ -1063,7 +1074,7 @@ class InternalTimerHandler implements Handler<Void>, Closeable, Runnable {
10631074
private final boolean periodic;
10641075
private final long id;
10651076
private final ContextInternal context;
1066-
private final AtomicBoolean disposed = new AtomicBoolean();
1077+
private volatile boolean disposed;
10671078
private volatile java.util.concurrent.Future<?> future;
10681079

10691080
InternalTimerHandler(long id, Handler<Long> runnable, boolean periodic, ContextInternal context) {
@@ -1084,10 +1095,10 @@ public void run() {
10841095

10851096
public void handle(Void v) {
10861097
if (periodic) {
1087-
if (!disposed.get()) {
1098+
if (!disposed) {
10881099
handler.handle(id);
10891100
}
1090-
} else if (disposed.compareAndSet(false, true)) {
1101+
} else if (compareAndSetDisposed()) {
10911102
timeouts.remove(id);
10921103
try {
10931104
handler.handle(id);
@@ -1109,7 +1120,7 @@ private boolean cancel() {
11091120
}
11101121

11111122
private boolean tryCancel() {
1112-
if (disposed.compareAndSet(false, true)) {
1123+
if (compareAndSetDisposed()) {
11131124
timeouts.remove(id);
11141125
future.cancel(false);
11151126
return true;
@@ -1123,6 +1134,10 @@ public void close(Completable<Void> completion) {
11231134
tryCancel();
11241135
completion.succeed();
11251136
}
1137+
1138+
private boolean compareAndSetDisposed() {
1139+
return INTERNAL_TIMER_HANDLER_DISPOSED.compareAndSet(this, false, true);
1140+
}
11261141
}
11271142

11281143
@Override

vertx-core/src/main/java/io/vertx/core/net/endpoint/impl/EndpointResolverImpl.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ private void close() {
128128
private class ManagedEndpoint extends ManagedResource {
129129

130130
private final Future<EndpointImpl> endpoint;
131-
private final AtomicBoolean disposed = new AtomicBoolean();
131+
private volatile boolean disposed;
132132

133133
public ManagedEndpoint(Future<EndpointImpl> endpoint) {
134134
super();
@@ -145,12 +145,20 @@ protected void cleanup() {
145145
@Override
146146
protected void checkExpired() {
147147
if (endpoint.succeeded() && keepAliveMillis > 0 && System.currentTimeMillis() - endpoint.result().lastAccessed.get() >= keepAliveMillis) {
148-
if (disposed.compareAndSet(false, true)) {
148+
if (markDisposed()) {
149149
decRefCount();
150150
}
151151
}
152152
}
153153

154+
private synchronized boolean markDisposed() {
155+
if (disposed) {
156+
return false;
157+
}
158+
disposed = true;
159+
return true;
160+
}
161+
154162
@Override
155163
public boolean incRefCount() {
156164
return super.incRefCount();
@@ -215,7 +223,7 @@ private ManagedEndpoint resolveAddress(A address) {
215223
}, fn);
216224
if (sFuture.created) {
217225
sFuture.fut.onFailure(err -> {
218-
if (sFuture.endpoint.disposed.compareAndSet(false, true)) {
226+
if (sFuture.endpoint.markDisposed()) {
219227
// We need to call decRefCount outside the withEndpoint method, hence we need
220228
// the Result class workaround
221229
sFuture.endpoint.decRefCount();

vertx-core/src/main/java/io/vertx/core/shareddata/impl/LocalAsyncLocks.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,28 @@
1616
import io.vertx.core.internal.ContextInternal;
1717
import io.vertx.core.shareddata.Lock;
1818

19+
import java.lang.invoke.MethodHandles;
20+
import java.lang.invoke.VarHandle;
1921
import java.util.ArrayList;
2022
import java.util.List;
2123
import java.util.concurrent.ConcurrentHashMap;
2224
import java.util.concurrent.ConcurrentMap;
23-
import java.util.concurrent.atomic.AtomicBoolean;
2425

2526
/**
2627
* @author Thomas Segismont
2728
*/
2829
public class LocalAsyncLocks {
2930

31+
private static final VarHandle ASYNC_LOCK_INVOKED;
32+
33+
static {
34+
try {
35+
ASYNC_LOCK_INVOKED = MethodHandles.lookup().findVarHandle(AsyncLock.class, "invoked", boolean.class);
36+
} catch (NoSuchFieldException | IllegalAccessException e) {
37+
throw new IllegalArgumentException(e);
38+
}
39+
}
40+
3041
private class LockWaiter {
3142

3243
final ContextInternal context;
@@ -77,15 +88,15 @@ void acquireLock() {
7788
private class AsyncLock implements LockInternal {
7889

7990
final String lockName;
80-
final AtomicBoolean invoked = new AtomicBoolean();
91+
private volatile boolean invoked;
8192

8293
AsyncLock(String lockName) {
8394
this.lockName = lockName;
8495
}
8596

8697
@Override
8798
public void release() {
88-
if (invoked.compareAndSet(false, true)) {
99+
if (ASYNC_LOCK_INVOKED.compareAndSet(this, false, true)) {
89100
nextWaiter(lockName);
90101
}
91102
}

vertx-core/src/main/java/io/vertx/core/shareddata/impl/LocalAsyncMapImpl.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.util.Set;
2525
import java.util.concurrent.ConcurrentHashMap;
2626
import java.util.concurrent.ConcurrentMap;
27-
import java.util.concurrent.atomic.AtomicBoolean;
2827

2928
import static java.util.concurrent.TimeUnit.*;
3029
import static java.util.stream.Collectors.*;
@@ -103,18 +102,18 @@ public Future<V> putIfAbsent(K k, V v, long ttl) {
103102
@Override
104103
public Future<Boolean> removeIfPresent(K k, V v) {
105104
ContextInternal ctx = vertx.getOrCreateContext();
106-
AtomicBoolean result = new AtomicBoolean();
105+
boolean[] result = new boolean[1];
107106
map.computeIfPresent(k, (key, holder) -> {
108107
if (holder.value.equals(v)) {
109-
result.compareAndSet(false, true);
108+
result[0] = true;
110109
if (holder.expires()) {
111110
vertx.cancelTimer(holder.timerId);
112111
}
113112
return null;
114113
}
115114
return holder;
116115
});
117-
return ctx.succeededFuture(result.get());
116+
return ctx.succeededFuture(result[0]);
118117
}
119118

120119
@Override

0 commit comments

Comments
 (0)