Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -284,20 +284,18 @@ private CursorSendEngine(String sfDir, long segmentSizeBytes, SegmentManager man
this.ring = ringInProgress;
this.watermark = watermarkInProgress;
} catch (Throwable t) {
// Order: ring first (releases mmap/fd), then manager (joins
// worker thread, but only if we started it AND we own it),
// then watermark (releases its own mmap/fd), then slot lock.
// Each in its own try/catch so a single failure doesn't
// strand later cleanups.
if (ringInProgress != null) {
// Stop an owned manager before freeing the ring and watermark it may
// touch, then release the slot lock. Each cleanup is in its own
// try/catch so a single failure doesn't strand later cleanups.
if (ownsManager && managerStarted) {
try {
ringInProgress.close();
manager.close();
} catch (Throwable ignored) {
}
}
if (ownsManager && managerStarted) {
if (ringInProgress != null) {
try {
manager.close();
ringInProgress.close();
} catch (Throwable ignored) {
}
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.questdb.client.cutlass.qwp.client.sf.cursor.AckWatermark;
import io.questdb.client.cutlass.qwp.client.sf.cursor.CursorSendEngine;
import io.questdb.client.cutlass.qwp.client.sf.cursor.MmapSegment;
import io.questdb.client.cutlass.qwp.client.sf.cursor.SegmentManager;
import io.questdb.client.cutlass.qwp.client.sf.cursor.SlotLock;
import io.questdb.client.std.Files;
import io.questdb.client.std.MemoryTag;
import io.questdb.client.std.ObjList;
Expand All @@ -37,6 +39,9 @@
import org.junit.Before;
import org.junit.Test;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Paths;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -190,6 +195,50 @@ public void testCloseIsIdempotent() throws Exception {
});
}

@Test
public void testConstructorFailureAfterOwnedManagerStartCleansResources() throws Exception {
TestUtils.assertMemoryLeak(() -> {
SegmentManager manager = new SegmentManager(4096);
poisonRegisterGeneration(manager);

Throwable thrown = invokeOwnedPrivateConstructorExpectingFailure(tmpDir, 4096, manager);
assertTrue("register sabotage should surface from constructor catch: " + thrown,
thrown instanceof NullPointerException);

assertNull("owned manager worker must be stopped by constructor catch",
workerThread(manager));
assertSlotCanBeReacquired(tmpDir);
});
}

@Test
public void testConstructorFailureWithSharedManagerReleasesSlotButKeepsManagerRunning() throws Exception {
TestUtils.assertMemoryLeak(() -> {
SegmentManager manager = new SegmentManager(4096);
try {
manager.start();
Thread originalWorker = workerThread(manager);
assertNotNull("shared manager must be running before constructor", originalWorker);
assertTrue("shared manager worker must be alive before constructor",
originalWorker.isAlive());

poisonRegisterGeneration(manager);
Throwable thrown = invokeSharedConstructorExpectingFailure(tmpDir, 4096, manager);
assertTrue("register sabotage should surface from constructor catch: " + thrown,
thrown instanceof NullPointerException);

Thread stillOwnedByCaller = workerThread(manager);
assertNotNull("constructor catch must not close caller-owned manager",
stillOwnedByCaller);
assertTrue("caller-owned manager worker must remain alive",
stillOwnedByCaller.isAlive());
assertSlotCanBeReacquired(tmpDir);
} finally {
manager.close();
}
});
}

@Test
public void testMemoryModeSkipsDirAndStillWorks() throws Exception {
TestUtils.assertMemoryLeak(() -> {
Expand Down Expand Up @@ -500,4 +549,52 @@ public void testWasRecoveredFromDiskTrueOnReopen() throws Exception {
}
});
}

private static void assertSlotCanBeReacquired(String sfDir) {
try (SlotLock ignored = SlotLock.acquire(sfDir)) {
// good
}
}

private static Throwable invokeOwnedPrivateConstructorExpectingFailure(
String sfDir, long segmentSizeBytes, SegmentManager manager) throws Exception {
Constructor<CursorSendEngine> ctor = CursorSendEngine.class.getDeclaredConstructor(
String.class, long.class, SegmentManager.class, boolean.class, long.class);
ctor.setAccessible(true);
try {
ctor.newInstance(sfDir, segmentSizeBytes, manager, true,
CursorSendEngine.DEFAULT_APPEND_DEADLINE_NANOS);
fail("expected constructor failure");
return null;
} catch (InvocationTargetException e) {
return e.getCause();
}
}

private static Throwable invokeSharedConstructorExpectingFailure(
String sfDir, long segmentSizeBytes, SegmentManager manager) {
try {
new CursorSendEngine(sfDir, segmentSizeBytes, manager);
fail("expected constructor failure");
return null;
} catch (Throwable t) {
return t;
}
}

private static void poisonRegisterGeneration(SegmentManager manager) throws Exception {
// register() advances fileGeneration before publishing the ring. Nulling
// it forces a deterministic constructor failure after the ring and
// watermark exist, without adding a production test hook.
Field f = SegmentManager.class.getDeclaredField("fileGeneration");
f.setAccessible(true);
f.set(manager, null);
}

private static Thread workerThread(SegmentManager manager) throws Exception {
Field f = SegmentManager.class.getDeclaredField("workerThread");
f.setAccessible(true);
return (Thread) f.get(manager);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import io.questdb.client.cutlass.qwp.client.sf.cursor.MmapSegment;
import io.questdb.client.cutlass.qwp.client.sf.cursor.SegmentManager;
import io.questdb.client.cutlass.qwp.client.sf.cursor.SegmentRing;
import io.questdb.client.std.bytes.DirectByteSink;
import io.questdb.client.std.Files;
import io.questdb.client.std.str.DirectUtf8Sink;
import io.questdb.client.test.tools.TestUtils;
import org.junit.After;
import org.junit.Assert;
Expand All @@ -36,6 +38,10 @@

import java.lang.reflect.Field;
import java.nio.file.Paths;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
* Concurrent regression for the {@code SegmentManager} worker race vs
Expand Down Expand Up @@ -130,6 +136,76 @@ public void testManagerDoesNotInstallSpareIntoClosedRing() throws Exception {
});
}

@Test(timeout = 15_000L)
public void testCloseDoesNotFreePathScratchWhenWorkerStillAlive() throws Exception {
TestUtils.assertMemoryLeak(() -> {
long segSize = MmapSegment.HEADER_SIZE + (MmapSegment.FRAME_HEADER_SIZE + 32);
String slot = tmpDir + "/timeout-slot";
Assert.assertEquals(0, Files.mkdir(slot, Files.DIR_MODE_DEFAULT));
MmapSegment initial = MmapSegment.create(slot + "/sf-initial.sfa", 0L, segSize);
SegmentRing ring = new SegmentRing(initial, segSize);
SegmentManager manager = new SegmentManager(segSize, TimeUnit.SECONDS.toNanos(60));
CountDownLatch workerBlocked = new CountDownLatch(1);
CountDownLatch releaseWorker = new CountDownLatch(1);
AtomicBoolean fired = new AtomicBoolean();
AtomicReference<Throwable> hookErr = new AtomicReference<>();
boolean managerClosed = false;
try {
manager.register(ring, slot);
manager.setBeforeInstallSyncHook(() -> {
if (!fired.compareAndSet(false, true)) return;
workerBlocked.countDown();
try {
if (!releaseWorker.await(10, TimeUnit.SECONDS)) {
hookErr.compareAndSet(null,
new AssertionError("timed out waiting for test to release worker"));
}
} catch (Throwable t) {
hookErr.compareAndSet(null, t);
}
});
manager.start();
Assert.assertTrue("worker did not reach install hook",
workerBlocked.await(5, TimeUnit.SECONDS));
Assert.assertTrue("precondition: path scratch should be allocated",
readPathScratchImpl(manager) != 0L);

// Exercise the same branch as a timed-out join without making
// the test sleep for 5 seconds: join() returns while the worker
// is still alive. close() must leave worker-owned native memory
// alone so the worker can resume safely.
Thread.currentThread().interrupt();
manager.close();
Assert.assertTrue("close should preserve interrupted status",
Thread.interrupted());
Thread worker = readWorkerThread(manager);
Assert.assertTrue("worker should still be tracked after incomplete close",
worker != null && worker.isAlive());
Assert.assertTrue("path scratch was freed while worker was still alive",
readPathScratchImpl(manager) != 0L);

releaseWorker.countDown();
manager.close();
managerClosed = true;
Assert.assertNull("successful close should clear workerThread",
readWorkerThread(manager));
Assert.assertEquals("successful close should free path scratch",
0L, readPathScratchImpl(manager));
if (hookErr.get() != null) {
throw new AssertionError("install hook failed", hookErr.get());
}
} finally {
manager.setBeforeInstallSyncHook(null);
releaseWorker.countDown();
if (!managerClosed) {
Thread.interrupted();
manager.close();
}
ring.close();
}
});
}

private static void cleanupRecursively(String dir) {
if (!Files.exists(dir)) return;
long find = Files.findFirst(dir);
Expand All @@ -152,4 +228,22 @@ private static void cleanupRecursively(String dir) {
Files.findClose(find);
}
}

private static long readPathScratchImpl(SegmentManager manager) throws Exception {
Field pathScratchF = SegmentManager.class.getDeclaredField("pathScratch");
pathScratchF.setAccessible(true);
DirectUtf8Sink pathScratch = (DirectUtf8Sink) pathScratchF.get(manager);
Field sinkF = DirectUtf8Sink.class.getDeclaredField("sink");
sinkF.setAccessible(true);
DirectByteSink sink = (DirectByteSink) sinkF.get(pathScratch);
Field implF = DirectByteSink.class.getDeclaredField("impl");
implF.setAccessible(true);
return implF.getLong(sink);
}

private static Thread readWorkerThread(SegmentManager manager) throws Exception {
Field workerThreadF = SegmentManager.class.getDeclaredField("workerThread");
workerThreadF.setAccessible(true);
return (Thread) workerThreadF.get(manager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void testInstallPathDoesNotCommitAfterDeregister() throws Exception {
CountDownLatch hookDone = new CountDownLatch(1);
AtomicBoolean fired = new AtomicBoolean();
AtomicReference<Throwable> hookErr = new AtomicReference<>();
setBeforeInstallSyncHook(mgr, () -> {
mgr.setBeforeInstallSyncHook(() -> {
if (!fired.compareAndSet(false, true)) return;
try {
mgr.deregister(ring);
Expand Down Expand Up @@ -170,7 +170,7 @@ public void testInstallPathDoesNotCommitAfterDeregister() throws Exception {
+ "under the same lock that covers deregister.",
0L, observed);
} finally {
setBeforeInstallSyncHook(mgr, null);
mgr.setBeforeInstallSyncHook(null);
try {
ring.close();
} catch (Throwable ignored) {
Expand Down Expand Up @@ -226,12 +226,6 @@ private static void rmDirRecursive(String dir) {
Files.remove(dir);
}

private static void setBeforeInstallSyncHook(SegmentManager mgr, Runnable hook) throws Exception {
Field f = SegmentManager.class.getDeclaredField("beforeInstallSyncHook");
f.setAccessible(true);
f.set(mgr, hook);
}

private static Thread workerThread(SegmentManager mgr) throws Exception {
Field f = SegmentManager.class.getDeclaredField("workerThread");
f.setAccessible(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
Expand Down Expand Up @@ -143,7 +144,7 @@ public void testTrimPathDoesNotDoubleSubtractAfterDeregister() throws Exception
AtomicBoolean fired = new AtomicBoolean();
CountDownLatch hookDone = new CountDownLatch(1);
AtomicReference<Throwable> hookErr = new AtomicReference<>();
setBeforeTrimSyncHook(mgr, () -> {
mgr.setBeforeTrimSyncHook(() -> {
if (!fired.compareAndSet(false, true)) return;
try {
mgr.deregister(ring);
Expand Down Expand Up @@ -185,8 +186,14 @@ public void testTrimPathDoesNotDoubleSubtractAfterDeregister() throws Exception
+ "`totalBytes -= sz` on a stillRegistered re-check "
+ "under the same lock that covers deregister.",
0L, observed);
assertFalse("stale SegmentManager snapshot skipped drainTrimmable() "
+ "after deregister and left a fully-acked sealed "
+ "segment on disk. The registration guard should "
+ "protect watermark/accounting only; trim ownership "
+ "transfer must still close and unlink " + activePath,
Files.exists(activePath));
} finally {
setBeforeTrimSyncHook(mgr, null);
mgr.setBeforeTrimSyncHook(null);
Unsafe.free(buf, 32, MemoryTag.NATIVE_DEFAULT);
try {
ring.close();
Expand Down Expand Up @@ -220,12 +227,6 @@ private static void awaitSpare(SegmentRing ring, String where) {
}
}

private static void setBeforeTrimSyncHook(SegmentManager mgr, Runnable hook) throws Exception {
Field f = SegmentManager.class.getDeclaredField("beforeTrimSyncHook");
f.setAccessible(true);
f.set(mgr, hook);
}

private static long readTotalBytes(SegmentManager mgr) throws Exception {
Field f = SegmentManager.class.getDeclaredField("totalBytes");
f.setAccessible(true);
Expand Down
Loading
Loading