Skip to content

Commit 25b3821

Browse files
committed
Emit PgNotification on a duplicated context
Currently, notifications are emitted on the context captured when the connection is created. If a PgSubscriber was created while running on a DuplicatedContext, this would be context of emissions. This is wrong and could cause local data (e.g. tracing) to be still present when notification handlers are executed. This change consists in duplicating the connection context before emitting notifications. Signed-off-by: Thomas Segismont <[email protected]>
1 parent 529aee0 commit 25b3821

File tree

2 files changed

+57
-5
lines changed

2 files changed

+57
-5
lines changed

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
import io.vertx.pgclient.impl.codec.NoticeResponse;
2929
import io.vertx.pgclient.impl.codec.TxFailedEvent;
3030
import io.vertx.pgclient.spi.PgDriver;
31-
import io.vertx.sqlclient.spi.connection.Connection;
3231
import io.vertx.sqlclient.codec.SocketConnectionBase;
3332
import io.vertx.sqlclient.internal.SqlConnectionBase;
33+
import io.vertx.sqlclient.spi.connection.Connection;
3434

3535
public class PgConnectionImpl extends SqlConnectionBase<PgConnectionImpl> implements PgConnection {
3636

@@ -67,10 +67,11 @@ public void handleEvent(Object event) {
6767
Handler<PgNotification> handler = notificationHandler;
6868
if (handler != null) {
6969
Notification notification = (Notification) event;
70-
handler.handle(new PgNotification()
70+
PgNotification pgNotification = new PgNotification()
7171
.setChannel(notification.getChannel())
7272
.setProcessId(notification.getProcessId())
73-
.setPayload(notification.getPayload()));
73+
.setPayload(notification.getPayload());
74+
context.duplicate().emit(pgNotification, handler);
7475
}
7576
} else if (event instanceof NoticeResponse) {
7677
Handler<PgNotice> handler = noticeHandler;
@@ -94,8 +95,7 @@ public void handleEvent(Object event) {
9495
.setDataType(noticeEvent.getDataType())
9596
.setConstraint(noticeEvent.getConstraint());
9697
if (handler != null) {
97-
handler.handle(notice
98-
);
98+
context.duplicate().emit(notice, handler);
9999
} else {
100100
notice.log(SocketConnectionBase.logger);
101101
}

vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PubSubTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.vertx.core.Vertx;
2020
import io.vertx.core.buffer.Buffer;
21+
import io.vertx.core.internal.ContextInternal;
2122
import io.vertx.ext.unit.Async;
2223
import io.vertx.ext.unit.TestContext;
2324
import io.vertx.pgclient.PgConnectOptions;
@@ -32,9 +33,12 @@
3233

3334
import java.util.Arrays;
3435
import java.util.concurrent.CompletableFuture;
36+
import java.util.concurrent.ConcurrentHashMap;
3537
import java.util.concurrent.atomic.AtomicInteger;
3638
import java.util.concurrent.atomic.AtomicReference;
3739

40+
import static io.vertx.core.internal.ContextInternal.LOCAL_MAP;
41+
3842
public class PubSubTest extends PgTestBase {
3943

4044
Vertx vertx;
@@ -361,4 +365,52 @@ public void testNoticedRaised(TestContext ctx) {
361365
}));
362366
}));
363367
}
368+
369+
@Test
370+
public void testSubscriberEmitsOnDuplicatedContext(TestContext ctx) {
371+
String channelName = "test_channel";
372+
String quotedChannelName = "\"" + channelName + "\"";
373+
374+
// Create a duplicated context with local data BEFORE creating the subscriber
375+
ContextInternal originalContext = (ContextInternal) vertx.getOrCreateContext();
376+
ContextInternal duplicatedContext = originalContext.duplicate();
377+
duplicatedContext.getLocal(LOCAL_MAP, ConcurrentHashMap::new).put("foo", "bar");
378+
379+
Async connectLatch = ctx.async();
380+
381+
duplicatedContext.runOnContext(v -> {
382+
ctx.assertEquals("bar", ContextInternal.current().getLocal(LOCAL_MAP, ConcurrentHashMap::new).get("foo"));
383+
384+
subscriber = PgSubscriber.subscriber(vertx, options);
385+
subscriber.connect().onComplete(v2 -> connectLatch.complete());
386+
});
387+
388+
connectLatch.awaitSuccess(10000);
389+
390+
Async notificationLatch = ctx.async();
391+
392+
duplicatedContext.runOnContext(v -> {
393+
ctx.assertEquals("bar", ContextInternal.current().getLocal(LOCAL_MAP, ConcurrentHashMap::new).get("foo"));
394+
395+
PgChannel channel = subscriber.channel(channelName);
396+
channel.handler(notif -> {
397+
ContextInternal currentContext = ContextInternal.current();
398+
ctx.assertTrue(currentContext.isDuplicate());
399+
ctx.assertNotEquals(duplicatedContext, currentContext);
400+
ctx.assertNull(currentContext.getLocal(LOCAL_MAP, ConcurrentHashMap::new).get("foo"));
401+
ctx.assertEquals("msg", notif);
402+
notificationLatch.countDown();
403+
});
404+
});
405+
406+
vertx.setTimer(100, t -> {
407+
subscriber
408+
.actualConnection()
409+
.query("NOTIFY " + quotedChannelName + ", 'msg'")
410+
.execute()
411+
.onComplete(ctx.asyncAssertSuccess());
412+
});
413+
414+
notificationLatch.awaitSuccess(10000);
415+
}
364416
}

0 commit comments

Comments
 (0)