Skip to content

Commit 923d56a

Browse files
authored
vertx_pool_queue_pending grows indefinitely when connection acquisition fails (#1626)
See #1625 We must invoke dequeueMetric in all cases, not just in case of timeout. Signed-off-by: Thomas Segismont <[email protected]>
1 parent 947cf72 commit 923d56a

File tree

2 files changed

+65
-0
lines changed

2 files changed

+65
-0
lines changed

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,9 @@ public <R> void execute(CommandBase<R> cmd, Completable<R> handler, long timeout
246246
pooled.refresh();
247247
lease.recycle();
248248
});
249+
}, t -> {
250+
dequeueMetric(metric);
251+
return Future.failedFuture(t);
249252
}).onComplete(ar -> {
250253
if (ar.succeeded()) {
251254
handler.succeed(ar.result());
@@ -284,6 +287,7 @@ public void complete(Lease<PooledConnection> lease, Throwable failure) {
284287
handle(lease);
285288
}
286289
} else {
290+
dequeueMetric(metric);
287291
handler.fail(failure);
288292
}
289293
}

vertx-sql-client/src/test/java/io/vertx/tests/sqlclient/tck/MetricsTestBase.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,4 +342,65 @@ public void requestReset(Object requestMetric) {
342342
}));
343343
});
344344
}
345+
346+
@Test
347+
public void testGetConnectionFailure(TestContext ctx) throws Exception {
348+
testConnectionFailure(ctx, true);
349+
}
350+
351+
@Test
352+
public void testPooledConnectionFailure(TestContext ctx) throws Exception {
353+
testConnectionFailure(ctx, false);
354+
}
355+
356+
private void testConnectionFailure(TestContext ctx, boolean useGetConnection) throws Exception {
357+
AtomicInteger queueSize = new AtomicInteger();
358+
List<Object> enqueueMetrics = Collections.synchronizedList(new ArrayList<>());
359+
List<Object> dequeueMetrics = Collections.synchronizedList(new ArrayList<>());
360+
poolMetrics = new PoolMetrics() {
361+
@Override
362+
public Object enqueue() {
363+
Object metric = new Object();
364+
enqueueMetrics.add(metric);
365+
queueSize.incrementAndGet();
366+
return metric;
367+
}
368+
369+
@Override
370+
public void dequeue(Object taskMetric) {
371+
dequeueMetrics.add(taskMetric);
372+
queueSize.decrementAndGet();
373+
}
374+
375+
@Override
376+
public Object begin() {
377+
throw new IllegalStateException("Shouldn't be invoked");
378+
}
379+
380+
@Override
381+
public void end(Object usageMetric) {
382+
throw new IllegalStateException("Shouldn't be invoked");
383+
}
384+
};
385+
PoolOptions poolOptions = new PoolOptions().setMaxSize(1).setName("the-pool");
386+
SqlConnectOptions connectOptions = connectOptions().setHost("does.not.exist.com");
387+
Pool pool = poolBuilder().with(poolOptions).using(vertx).connectingTo(connectOptions).build();
388+
int num = 16;
389+
List<Future<?>> futures = new ArrayList<>();
390+
for (int i = 0; i < num; i++) {
391+
Future<RowSet<Row>> future;
392+
if (useGetConnection) {
393+
future = pool.withConnection(sqlConn -> sqlConn.query("SELECT * FROM immutable WHERE id=1").execute());
394+
} else {
395+
future = pool.query("SELECT * FROM immutable WHERE id=1").execute();
396+
}
397+
futures.add(future);
398+
}
399+
Future.join(futures).otherwiseEmpty().await(20, SECONDS);
400+
ctx.assertEquals(0, queueSize.get());
401+
ctx.assertEquals(num, enqueueMetrics.size());
402+
ctx.assertEquals(enqueueMetrics, dequeueMetrics);
403+
ctx.assertEquals("sql", poolType);
404+
ctx.assertEquals("the-pool", poolName);
405+
}
345406
}

0 commit comments

Comments
 (0)