Skip to content

Commit 87725fe

Browse files
committed
Emit PgNotification on a duplicated context
Currently, notifications are emitted on the context captured when the connection is created. If a PgSubscriber was created while running on a DuplicatedContext, this would be context of emissions. This is wrong and could cause local data (e.g. tracing) to be still present when notification handlers are executed. This change consists in unwrapping the connection context at creation time, and then duplicating it before emitting notifications. Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
1 parent b88802e commit 87725fe

File tree

3 files changed

+66
-14
lines changed

3 files changed

+66
-14
lines changed

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
import io.vertx.pgclient.impl.codec.NoticeResponse;
2929
import io.vertx.pgclient.impl.codec.TxFailedEvent;
3030
import io.vertx.pgclient.spi.PgDriver;
31-
import io.vertx.sqlclient.spi.connection.Connection;
3231
import io.vertx.sqlclient.codec.SocketConnectionBase;
3332
import io.vertx.sqlclient.internal.SqlConnectionBase;
33+
import io.vertx.sqlclient.spi.connection.Connection;
3434

3535
public class PgConnectionImpl extends SqlConnectionBase<PgConnectionImpl> implements PgConnection {
3636

@@ -67,10 +67,11 @@ public void handleEvent(Object event) {
6767
Handler<PgNotification> handler = notificationHandler;
6868
if (handler != null) {
6969
Notification notification = (Notification) event;
70-
handler.handle(new PgNotification()
70+
PgNotification pgNotification = new PgNotification()
7171
.setChannel(notification.getChannel())
7272
.setProcessId(notification.getProcessId())
73-
.setPayload(notification.getPayload()));
73+
.setPayload(notification.getPayload());
74+
context.duplicate().emit(pgNotification, handler);
7475
}
7576
} else if (event instanceof NoticeResponse) {
7677
Handler<PgNotice> handler = noticeHandler;
@@ -94,8 +95,7 @@ public void handleEvent(Object event) {
9495
.setDataType(noticeEvent.getDataType())
9596
.setConstraint(noticeEvent.getConstraint());
9697
if (handler != null) {
97-
handler.handle(notice
98-
);
98+
context.duplicate().emit(notice, handler);
9999
} else {
100100
notice.log(SocketConnectionBase.logger);
101101
}

vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PubSubTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.vertx.core.Vertx;
2020
import io.vertx.core.buffer.Buffer;
21+
import io.vertx.core.internal.ContextInternal;
2122
import io.vertx.ext.unit.Async;
2223
import io.vertx.ext.unit.TestContext;
2324
import io.vertx.pgclient.PgConnectOptions;
@@ -32,9 +33,12 @@
3233

3334
import java.util.Arrays;
3435
import java.util.concurrent.CompletableFuture;
36+
import java.util.concurrent.ConcurrentHashMap;
3537
import java.util.concurrent.atomic.AtomicInteger;
3638
import java.util.concurrent.atomic.AtomicReference;
3739

40+
import static io.vertx.core.internal.ContextInternal.LOCAL_MAP;
41+
3842
public class PubSubTest extends PgTestBase {
3943

4044
Vertx vertx;
@@ -361,4 +365,52 @@ public void testNoticedRaised(TestContext ctx) {
361365
}));
362366
}));
363367
}
368+
369+
@Test
370+
public void testSubscriberEmitsOnDuplicatedContext(TestContext ctx) {
371+
String channelName = "test_channel";
372+
String quotedChannelName = "\"" + channelName + "\"";
373+
374+
// Create a duplicated context with local data BEFORE creating the subscriber
375+
ContextInternal originalContext = (ContextInternal) vertx.getOrCreateContext();
376+
ContextInternal duplicatedContext = originalContext.duplicate();
377+
duplicatedContext.getLocal(LOCAL_MAP, ConcurrentHashMap::new).put("foo", "bar");
378+
379+
Async connectLatch = ctx.async();
380+
381+
duplicatedContext.runOnContext(v -> {
382+
ctx.assertEquals("bar", ContextInternal.current().getLocal(LOCAL_MAP, ConcurrentHashMap::new).get("foo"));
383+
384+
subscriber = PgSubscriber.subscriber(vertx, options);
385+
subscriber.connect().onComplete(v2 -> connectLatch.complete());
386+
});
387+
388+
connectLatch.awaitSuccess(10000);
389+
390+
Async notificationLatch = ctx.async();
391+
392+
duplicatedContext.runOnContext(v -> {
393+
ctx.assertEquals("bar", ContextInternal.current().getLocal(LOCAL_MAP, ConcurrentHashMap::new).get("foo"));
394+
395+
PgChannel channel = subscriber.channel(channelName);
396+
channel.handler(notif -> {
397+
ContextInternal currentContext = ContextInternal.current();
398+
ctx.assertTrue(currentContext.isDuplicate());
399+
ctx.assertNotEquals(duplicatedContext, currentContext);
400+
ctx.assertNull(currentContext.getLocal(LOCAL_MAP, ConcurrentHashMap::new).get("foo"));
401+
ctx.assertEquals("msg", notif);
402+
notificationLatch.countDown();
403+
});
404+
});
405+
406+
vertx.setTimer(100, t -> {
407+
subscriber
408+
.actualConnection()
409+
.query("NOTIFY " + quotedChannelName + ", 'msg'")
410+
.execute()
411+
.onComplete(ctx.asyncAssertSuccess());
412+
});
413+
414+
notificationLatch.awaitSuccess(10000);
415+
}
364416
}

vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlConnectionBase.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,26 @@
1818
package io.vertx.sqlclient.internal;
1919

2020
import io.vertx.core.*;
21-
import io.vertx.core.spi.metrics.ClientMetrics;
22-
import io.vertx.core.spi.tracing.VertxTracer;
2321
import io.vertx.core.internal.ContextInternal;
2422
import io.vertx.core.internal.PromiseInternal;
23+
import io.vertx.core.spi.metrics.ClientMetrics;
24+
import io.vertx.core.spi.tracing.VertxTracer;
2525
import io.vertx.sqlclient.PrepareOptions;
2626
import io.vertx.sqlclient.PreparedStatement;
2727
import io.vertx.sqlclient.SqlConnection;
2828
import io.vertx.sqlclient.Transaction;
2929
import io.vertx.sqlclient.impl.PreparedStatementBase;
3030
import io.vertx.sqlclient.impl.TransactionImpl;
31-
import io.vertx.sqlclient.spi.connection.ConnectionContext;
32-
import io.vertx.sqlclient.spi.protocol.CommandBase;
33-
import io.vertx.sqlclient.spi.connection.Connection;
34-
import io.vertx.sqlclient.spi.protocol.PrepareStatementCommand;
35-
import io.vertx.sqlclient.spi.protocol.QueryCommandBase;
3631
import io.vertx.sqlclient.impl.pool.SqlConnectionPool;
3732
import io.vertx.sqlclient.impl.tracing.QueryReporter;
38-
import io.vertx.sqlclient.spi.connection.ConnectionFactory;
3933
import io.vertx.sqlclient.spi.DatabaseMetadata;
4034
import io.vertx.sqlclient.spi.Driver;
35+
import io.vertx.sqlclient.spi.connection.Connection;
36+
import io.vertx.sqlclient.spi.connection.ConnectionContext;
37+
import io.vertx.sqlclient.spi.connection.ConnectionFactory;
38+
import io.vertx.sqlclient.spi.protocol.CommandBase;
39+
import io.vertx.sqlclient.spi.protocol.PrepareStatementCommand;
40+
import io.vertx.sqlclient.spi.protocol.QueryCommandBase;
4141

4242
/**
4343
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
@@ -54,7 +54,7 @@ public class SqlConnectionBase<C extends SqlConnectionBase<C>> extends SqlClient
5454

5555
public SqlConnectionBase(ContextInternal context, ConnectionFactory factory, Connection conn, Driver driver) {
5656
super(driver);
57-
this.context = context;
57+
this.context = context.unwrap();
5858
this.factory = factory;
5959
this.conn = conn;
6060
}

0 commit comments

Comments
 (0)