Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ public <R> void execute(CommandBase<R> cmd, Completable<R> handler, long timeout
pooled.refresh();
lease.recycle();
});
}, t -> {
dequeueMetric(metric);
return Future.failedFuture(t);
}).onComplete(ar -> {
if (ar.succeeded()) {
handler.succeed(ar.result());
Expand Down Expand Up @@ -281,6 +284,7 @@ public void complete(Lease<PooledConnection> lease, Throwable failure) {
handle(lease);
}
} else {
dequeueMetric(metric);
handler.fail(failure);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,4 +340,65 @@ public void requestReset(Object requestMetric) {
}));
});
}

@Test
public void testGetConnectionFailure(TestContext ctx) throws Exception {
testConnectionFailure(ctx, true);
}

@Test
public void testPooledConnectionFailure(TestContext ctx) throws Exception {
testConnectionFailure(ctx, false);
}

private void testConnectionFailure(TestContext ctx, boolean useGetConnection) throws Exception {
AtomicInteger queueSize = new AtomicInteger();
List<Object> enqueueMetrics = Collections.synchronizedList(new ArrayList<>());
List<Object> dequeueMetrics = Collections.synchronizedList(new ArrayList<>());
poolMetrics = new PoolMetrics() {
@Override
public Object enqueue() {
Object metric = new Object();
enqueueMetrics.add(metric);
queueSize.incrementAndGet();
return metric;
}

@Override
public void dequeue(Object taskMetric) {
dequeueMetrics.add(taskMetric);
queueSize.decrementAndGet();
}

@Override
public Object begin() {
throw new IllegalStateException("Shouldn't be invoked");
}

@Override
public void end(Object usageMetric) {
throw new IllegalStateException("Shouldn't be invoked");
}
};
PoolOptions poolOptions = new PoolOptions().setMaxSize(1).setName("the-pool");
SqlConnectOptions connectOptions = connectOptions().setHost("does.not.exist.com");
Pool pool = poolBuilder().with(poolOptions).using(vertx).connectingTo(connectOptions).build();
int num = 16;
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < num; i++) {
Future<RowSet<Row>> future;
if (useGetConnection) {
future = pool.withConnection(sqlConn -> sqlConn.query("SELECT * FROM immutable WHERE id=1").execute());
} else {
future = pool.query("SELECT * FROM immutable WHERE id=1").execute();
}
futures.add(future);
}
Future.join(futures).otherwiseEmpty().await(20, SECONDS);
ctx.assertEquals(0, queueSize.get());
ctx.assertEquals(num, enqueueMetrics.size());
ctx.assertEquals(enqueueMetrics, dequeueMetrics);
ctx.assertEquals("sql", poolType);
ctx.assertEquals("the-pool", poolName);
}
}