From 52480328119a54b62540729fd089487cd3699ea0 Mon Sep 17 00:00:00 2001 From: Eugene Date: Fri, 12 Jun 2026 12:53:05 -0700 Subject: [PATCH 1/5] feat(file-service): cleanup stale uncommitted dataset uploads Add a configurable scheduled job that reclaims storage left behind by uploads that were never committed (issue #3681): - StagedFileCleanupJob, registered as a Dropwizard Managed lifecycle in FileService, runs every check interval on a single daemon thread - path 1: DATASET_UPLOAD_SESSION rows older than the retention period are removed after aborting their LakeFS multipart upload (abort before delete, so a crash between the two steps is retried next round; HTTP 404 on abort is treated as already-aborted) - path 2: LakeFS staged (uncommitted) objects whose mtime exceeds the retention period are reset via the existing branch-reset API; objects belonging to a non-expired upload session are excluded, staged deletions and orphaned repos are skipped - config: storage.cleanup.enabled / retention-hours (default 72, above the 24h presigned-URL expiry so only non-resumable sessions are removed) / interval-minutes (default 60), all env-overridable - every action is idempotent; per-item failures are logged and counted without aborting the batch; each round logs an audit summary - file-service test suites now fork one JVM per suite: each testcontainers suite boots its own LakeFS/MinIO/Postgres stack and mutates JVM-wide singletons, which broke whichever suite ran second Closes #3681 --- build.sbt | 12 +- common/config/src/main/resources/storage.conf | 16 + .../texera/amber/config/StorageConfig.scala | 10 + .../storage/util/LakeFSStorageClient.scala | 15 + .../apache/texera/service/FileService.scala | 13 + .../service/util/StagedFileCleanupJob.scala | 229 ++++++++ .../util/StagedFileCleanupJobSpec.scala | 489 ++++++++++++++++++ 7 files changed, 783 insertions(+), 1 deletion(-) create mode 100644 file-service/src/main/scala/org/apache/texera/service/util/StagedFileCleanupJob.scala create mode 100644 file-service/src/test/scala/org/apache/texera/service/util/StagedFileCleanupJobSpec.scala diff --git a/build.sbt b/build.sbt index 43f97d9dfff..2832643d821 100644 --- a/build.sbt +++ b/build.sbt @@ -120,7 +120,17 @@ lazy val FileService = (project in file("file-service")) "com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion, "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion, "org.glassfish.jersey.core" % "jersey-common" % "3.0.12" - ) + ), + // Each testcontainers-based suite starts its own LakeFS/MinIO/Postgres stack + // and mutates JVM-wide singletons (StorageConfig endpoints, LakeFS client), + // so every suite gets its own forked JVM; sbt runs forked groups one at a + // time by default (Tags.ForkedTestGroup limit), keeping the stacks serial. + Test / fork := true, + Test / forkOptions := (Test / forkOptions).value + .withWorkingDirectory((ThisBuild / baseDirectory).value), + Test / testGrouping := (Test / definedTests).value.map { suite => + Tests.Group(suite.name, Seq(suite), Tests.SubProcess((Test / forkOptions).value)) + } ) lazy val WorkflowOperator = (project in file("common/workflow-operator")).settings(asfLicensingSettingsWithVendored).dependsOn(WorkflowCore) diff --git a/common/config/src/main/resources/storage.conf b/common/config/src/main/resources/storage.conf index 12a9919e048..64f68d2a45e 100644 --- a/common/config/src/main/resources/storage.conf +++ b/common/config/src/main/resources/storage.conf @@ -133,6 +133,22 @@ storage { } } + # Automated cleanup of uploaded but uncommitted dataset files + cleanup { + enabled = true + enabled = ${?STORAGE_CLEANUP_ENABLED} + + # Uncommitted uploads older than this many hours are cleaned up. Keep this above the + # 24h presigned-URL expiry (S3StorageClient.PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS) so + # that only already-non-resumable upload sessions are deleted. + retention-hours = 72 + retention-hours = ${?STORAGE_CLEANUP_RETENTION_HOURS} + + # Delay between two consecutive cleanup rounds + interval-minutes = 60 + interval-minutes = ${?STORAGE_CLEANUP_INTERVAL_MINUTES} + } + # Configuration for Postgres, used for user system data & metadata storage jdbc { url = "jdbc:postgresql://localhost:5432/texera_db?currentSchema=texera_db,public" diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala index c88541cf1b4..7980811e086 100644 --- a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala +++ b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala @@ -85,6 +85,11 @@ object StorageConfig { conf.getString("storage.s3.multipart.part-size") ) + // Staged file cleanup specifics + val cleanupEnabled: Boolean = conf.getBoolean("storage.cleanup.enabled") + val cleanupRetentionHours: Int = conf.getInt("storage.cleanup.retention-hours") + val cleanupIntervalMinutes: Int = conf.getInt("storage.cleanup.interval-minutes") + // File storage configurations val fileStorageDirectoryPath: Path = Path @@ -128,6 +133,11 @@ object StorageConfig { val ENV_LAKEFS_BLOCK_STORAGE_TYPE = "STORAGE_LAKEFS_BLOCK_STORAGE_TYPE" val ENV_LAKEFS_BLOCK_STORAGE_BUCKET_NAME = "STORAGE_LAKEFS_BLOCK_STORAGE_BUCKET_NAME" + // Staged file cleanup + val ENV_CLEANUP_ENABLED = "STORAGE_CLEANUP_ENABLED" + val ENV_CLEANUP_RETENTION_HOURS = "STORAGE_CLEANUP_RETENTION_HOURS" + val ENV_CLEANUP_INTERVAL_MINUTES = "STORAGE_CLEANUP_INTERVAL_MINUTES" + // S3 val ENV_S3_ENDPOINT = "STORAGE_S3_ENDPOINT" val ENV_S3_REGION = "STORAGE_S3_REGION" diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala index 613255173e5..869a9eaa7f1 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala @@ -452,4 +452,19 @@ object LakeFSStorageClient { .getSizeBytes .longValue() } + + /** + * Gets the last-modified time of a staged (uncommitted) object on the main branch. + * + * @param repoName Repository name. + * @param filePath Path to the staged object in the repository. + * @return Last-modified time as Unix epoch seconds. + */ + def getStagedObjectMtime(repoName: String, filePath: String): Long = { + objectsApi + .statObject(repoName, branchName, filePath) + .execute() + .getMtime + .longValue() + } } diff --git a/file-service/src/main/scala/org/apache/texera/service/FileService.scala b/file-service/src/main/scala/org/apache/texera/service/FileService.scala index 88f76503780..7bd1213b2fe 100644 --- a/file-service/src/main/scala/org/apache/texera/service/FileService.scala +++ b/file-service/src/main/scala/org/apache/texera/service/FileService.scala @@ -44,6 +44,7 @@ import org.apache.texera.service.resource.{ } import org.apache.texera.service.util.S3StorageClient import org.apache.texera.service.util.LargeBinaryManager +import org.apache.texera.service.util.StagedFileCleanupJob import org.eclipse.jetty.server.session.SessionHandler import org.glassfish.jersey.server.filter.RolesAllowedDynamicFeature import java.nio.file.Path @@ -104,6 +105,18 @@ class FileService extends Application[FileServiceConfiguration] with LazyLogging // Route request logs through SLF4J, controlled by TEXERA_SERVICE_LOG_LEVEL RequestLoggingFilter.register(environment.getApplicationContext) + + // Periodically clean up uploaded but uncommitted (staged) dataset files + if (StorageConfig.cleanupEnabled) { + environment + .lifecycle() + .manage( + new StagedFileCleanupJob( + StorageConfig.cleanupRetentionHours, + StorageConfig.cleanupIntervalMinutes + ) + ) + } } } diff --git a/file-service/src/main/scala/org/apache/texera/service/util/StagedFileCleanupJob.scala b/file-service/src/main/scala/org/apache/texera/service/util/StagedFileCleanupJob.scala new file mode 100644 index 00000000000..ebe8c1b9248 --- /dev/null +++ b/file-service/src/main/scala/org/apache/texera/service/util/StagedFileCleanupJob.scala @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.service.util + +import com.typesafe.scalalogging.LazyLogging +import io.dropwizard.lifecycle.Managed +import io.lakefs.clients.sdk.ApiException +import io.lakefs.clients.sdk.model.Diff +import org.apache.texera.amber.core.storage.util.LakeFSStorageClient +import org.apache.texera.dao.SqlServer +import org.apache.texera.dao.jooq.generated.tables.Dataset.DATASET +import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSession.DATASET_UPLOAD_SESSION + +import java.time.OffsetDateTime +import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} +import scala.jdk.CollectionConverters._ + +/** + * Summary of one cleanup round. + * + * @param sessionsDeleted Number of abandoned upload session rows deleted. + * @param objectsReset Number of staged (uncommitted) objects reset in LakeFS. + * @param errors Number of failures encountered (each is retried next round). + */ +case class CleanupReport(sessionsDeleted: Int, objectsReset: Int, errors: Int) + +/** + * Periodically cleans up uploaded but uncommitted dataset files: + * 1. Aborts and deletes abandoned multipart upload sessions older than the retention window. + * 2. Resets staged (uncommitted) LakeFS objects older than the retention window, skipping + * objects that belong to still-active upload sessions. + * + * @param retentionHours Age (in hours) after which uncommitted uploads are cleaned up. + * @param intervalMinutes Delay (in minutes) between cleanup rounds. + */ +class StagedFileCleanupJob(retentionHours: Int, intervalMinutes: Int) + extends Managed + with LazyLogging { + + private var executor: ScheduledExecutorService = _ + + override def start(): Unit = { + executor = Executors.newSingleThreadScheduledExecutor((runnable: Runnable) => { + val thread = new Thread(runnable, "staged-file-cleanup") + thread.setDaemon(true) + thread + }) + executor.scheduleWithFixedDelay( + () => { + try { + runCleanupOnce() + } catch { + // An exception must never kill the schedule. + case t: Throwable => logger.error("Staged file cleanup round failed", t) + } + }, + // Small fixed initial delay so a restart doesn't postpone backlog cleanup by up to a + // full interval. + 1L, + intervalMinutes.toLong, + TimeUnit.MINUTES + ) + } + + override def stop(): Unit = { + if (executor != null) { + executor.shutdown() + } + } + + /** + * Runs a single cleanup round. Idempotent: rows/objects already cleaned up are not + * revisited, and failures are retried on the next round. + * + * @param now The reference time used to evaluate the retention window. + * @return Summary counts for this round. + */ + def runCleanupOnce(now: OffsetDateTime = OffsetDateTime.now()): CleanupReport = { + val cutoff = now.minusHours(retentionHours.toLong) + var sessionsDeleted = 0 + var objectsReset = 0 + var errors = 0 + + val ctx = SqlServer.getInstance().createDSLContext() + + // Map each dataset id to its LakeFS repository name (same mapping DatasetResource uses + // via dataset.getRepositoryName). + val repoNameByDid: Map[Integer, String] = ctx + .select(DATASET.DID, DATASET.REPOSITORY_NAME) + .from(DATASET) + .where(DATASET.REPOSITORY_NAME.isNotNull) + .fetch() + .asScala + .map(record => record.get(DATASET.DID) -> record.get(DATASET.REPOSITORY_NAME)) + .toMap + + // Path 1: abort and delete abandoned multipart upload sessions. + val expiredSessions = ctx + .selectFrom(DATASET_UPLOAD_SESSION) + .where(DATASET_UPLOAD_SESSION.CREATED_AT.lt(cutoff)) + .fetch() + .asScala + .toList + + expiredSessions.foreach { session => + try { + repoNameByDid.get(session.getDid) match { + case Some(repoName) => + try { + LakeFSStorageClient.abortPresignedMultipartUploads( + repoName, + session.getFilePath, + session.getUploadId, + session.getPhysicalAddress + ) + } catch { + // Already aborted (or never materialized): safe to delete the session row. + case e: ApiException if e.getCode == 404 => + logger.debug( + s"Multipart upload ${session.getUploadId} not found in LakeFS; " + + "treating as already aborted" + ) + } + case None => + // Dataset row gone or repository_name is NULL: the multipart lived in that + // repository's namespace, so there is nothing left to abort. + logger.debug( + s"No repository for dataset ${session.getDid}; " + + s"deleting orphan upload session ${session.getUploadId}" + ) + } + ctx + .deleteFrom(DATASET_UPLOAD_SESSION) + .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(session.getUploadId)) + .execute() + sessionsDeleted += 1 + } catch { + case t: Throwable => + logger.warn( + s"Failed to clean up upload session ${session.getUploadId} " + + s"(did=${session.getDid}, path=${session.getFilePath}); will retry next round", + t + ) + errors += 1 + } + } + + // File paths of still-active (non-expired) upload sessions, per dataset. Staged objects + // belonging to an active upload must never be reset. + val activePathsByDid: Map[Integer, Set[String]] = ctx + .select(DATASET_UPLOAD_SESSION.DID, DATASET_UPLOAD_SESSION.FILE_PATH) + .from(DATASET_UPLOAD_SESSION) + .where(DATASET_UPLOAD_SESSION.CREATED_AT.ge(cutoff)) + .fetch() + .asScala + .groupBy(record => record.get(DATASET_UPLOAD_SESSION.DID)) + .map { + case (did, records) => + did -> records.map(_.get(DATASET_UPLOAD_SESSION.FILE_PATH)).toSet + } + + // Path 2: reset staged (uncommitted) objects older than the retention window. + repoNameByDid.foreach { + case (did, repoName) => + try { + val activePaths = activePathsByDid.getOrElse(did, Set.empty) + val stagedObjects = LakeFSStorageClient.retrieveUncommittedObjects(repoName) + // diffBranch carries no mtime, so each candidate costs one extra statObject call + // (N+1). Unavoidable until LakeFS exposes timestamps in the diff API. + stagedObjects.foreach { diff => + val path = diff.getPath + val isObjectWrite = + diff.getType == Diff.TypeEnum.ADDED || diff.getType == Diff.TypeEnum.CHANGED + if (!isObjectWrite) { + // E.g. a staged deletion of a committed file: there is no object behind it and + // it consumes no storage, so leaving it is correct and cheap. + logger.debug(s"Skipping staged ${diff.getType} entry '$path' in '$repoName'") + } else if (!activePaths.contains(path)) { + try { + val mtime = LakeFSStorageClient.getStagedObjectMtime(repoName, path) + if (mtime < cutoff.toEpochSecond) { + LakeFSStorageClient.resetObjectUploadOrDeletion(repoName, path) + objectsReset += 1 + } + } catch { + case t: Throwable => + logger.warn( + s"Failed to clean up staged object '$path' in repo '$repoName'", + t + ) + errors += 1 + } + } + } + } catch { + // The dataset's LakeFS repository was deleted out-of-band (a supported state): + // nothing staged to clean up there. + case e: ApiException if e.getCode == 404 => + logger.debug(s"Repository '$repoName' not found in LakeFS; skipping") + case t: Throwable => + logger.warn(s"Failed to clean up staged objects in repo '$repoName'", t) + errors += 1 + } + } + + logger.info( + s"Staged file cleanup round finished: sessionsDeleted=$sessionsDeleted, " + + s"objectsReset=$objectsReset, errors=$errors" + ) + CleanupReport(sessionsDeleted, objectsReset, errors) + } +} diff --git a/file-service/src/test/scala/org/apache/texera/service/util/StagedFileCleanupJobSpec.scala b/file-service/src/test/scala/org/apache/texera/service/util/StagedFileCleanupJobSpec.scala new file mode 100644 index 00000000000..b8de6e004d8 --- /dev/null +++ b/file-service/src/test/scala/org/apache/texera/service/util/StagedFileCleanupJobSpec.scala @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.service.util + +import io.lakefs.clients.sdk.ApiException +import org.apache.texera.amber.core.storage.util.LakeFSStorageClient +import org.apache.texera.auth.SessionUser +import org.apache.texera.dao.MockTexeraDB +import org.apache.texera.dao.jooq.generated.enums.UserRoleEnum +import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSession.DATASET_UPLOAD_SESSION +import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATASET_UPLOAD_SESSION_PART +import org.apache.texera.dao.jooq.generated.tables.daos.{DatasetDao, UserDao} +import org.apache.texera.dao.jooq.generated.tables.pojos.{Dataset, User} +import org.apache.texera.service.MockLakeFS +import org.apache.texera.service.resource.DatasetResource +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} + +import java.io.ByteArrayInputStream +import java.net.URLEncoder +import java.nio.charset.StandardCharsets +import java.time.OffsetDateTime +import java.util.Optional + +/** + * Spec for [[StagedFileCleanupJob]] (issue #3681 — automated cleanup of uploaded but + * uncommitted files). + * + * Contract under test: + * - `runCleanupOnce(now)` deletes DATASET_UPLOAD_SESSION rows whose created_at is older + * than `retentionHours` relative to the injected `now` (aborting their LakeFS multipart + * first; part rows go away via ON DELETE CASCADE), + * - resets LakeFS staged (uncommitted) objects whose mtime exceeds retention, + * - skips staged objects that belong to a non-expired upload session, + * - never touches committed objects, + * - counts per-item failures in `errors` without aborting the batch, + * - is idempotent. + * + * Tests never sleep to age things: sessions are aged either by passing a future `now` + * (everything created "now" is then older than retention) or by writing an explicit + * created_at via jOOQ. Staged-object mtimes cannot be faked, so object-expiry tests pass a + * future `now`, and "fresh staged object" semantics under a future `now` are exercised via + * the session-protection rule (created_at moved next to the future `now`). + */ +class StagedFileCleanupJobSpec + extends AnyFlatSpec + with Matchers + with MockTexeraDB + with MockLakeFS + with BeforeAndAfterAll + with BeforeAndAfterEach { + + // --------------------------------------------------------------------------- + // Job configuration under test + // --------------------------------------------------------------------------- + private val RetentionHours = 24 + private val IntervalMinutes = 60 + + private lazy val job = new StagedFileCleanupJob(RetentionHours, IntervalMinutes) + + /** A `now` far enough in the future that anything created at real wall-clock time is expired. */ + private def farFuture: OffsetDateTime = + OffsetDateTime.now().plusHours(RetentionHours.toLong + 1L) + + // --------------------------------------------------------------------------- + // Fixtures (minimal copies of the DatasetResourceSpec idioms) + // --------------------------------------------------------------------------- + private val ownerUser: User = { + val user = new User + user.setName("cleanup_test_user") + user.setPassword("123") + user.setEmail("cleanup_test_user@test.com") + user.setRole(UserRoleEnum.ADMIN) + user + } + + private val repoName: String = s"cleanup-ds-${System.nanoTime()}" + + private val cleanupDataset: Dataset = { + val dataset = new Dataset + dataset.setName("cleanup-ds") + dataset.setRepositoryName(repoName) + dataset.setIsPublic(true) + dataset.setIsDownloadable(true) + dataset.setDescription("dataset for staged-file cleanup tests") + dataset + } + + /** Object committed once up-front; must survive every cleanup run (safety pin). */ + private val PinnedCommittedPath = "pinned/committed-pin.bin" + + private lazy val sessionUser = new SessionUser(ownerUser) + private lazy val datasetResource = new DatasetResource() + + private var lakeFsReady = false + + // --------------------------------------------------------------------------- + // Lifecycle + // --------------------------------------------------------------------------- + override protected def beforeAll(): Unit = { + super.beforeAll() + + initializeDBAndReplaceDSLContext() + + new UserDao(getDSLContext.configuration()).insert(ownerUser) + cleanupDataset.setOwnerUid(ownerUser.getUid) + new DatasetDao(getDSLContext.configuration()).insert(cleanupDataset) + } + + // Containers (MockLakeFS) only become reachable after the suite starts running, so all + // LakeFS setup happens lazily here rather than in beforeAll — same reason + // DatasetResourceSpec initializes its repo in beforeEach. + override protected def beforeEach(): Unit = { + super.beforeEach() + + if (!lakeFsReady) { + try LakeFSStorageClient.initRepo(repoName) + catch { + case e: ApiException if e.getCode == 409 => // already exists, fine + } + // Commit one object up-front: cleanup must NEVER touch committed objects. + LakeFSStorageClient.writeFileToRepo( + repoName, + PinnedCommittedPath, + new ByteArrayInputStream("pinned".getBytes(StandardCharsets.UTF_8)) + ) + LakeFSStorageClient.createCommit(repoName, "main", "pin committed object") + lakeFsReady = true + } + + // Clean slate so report counts are exact and independent of test order. + // (Deliberately NOT done via the job under test, to keep fixtures independent of it.) + getDSLContext.deleteFrom(DATASET_UPLOAD_SESSION).execute() + LakeFSStorageClient + .retrieveUncommittedObjects(repoName) + .foreach(diff => LakeFSStorageClient.resetObjectUploadOrDeletion(repoName, diff.getPath)) + } + + override protected def afterAll(): Unit = { + try shutdownDB() + finally super.afterAll() + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + private def urlEnc(raw: String): String = + URLEncoder.encode(raw, StandardCharsets.UTF_8.name()) + + private def uniquePath(prefix: String): String = + s"$prefix/${System.nanoTime()}.bin" + + /** Creates a real upload session (valid uploadId + physicalAddress) and returns its uploadId. */ + private def initSession(filePath: String): String = { + val resp = datasetResource.multipartUpload( + "init", + ownerUser.getEmail, + cleanupDataset.getName, + urlEnc(filePath), + Optional.of(java.lang.Long.valueOf(16L)), + Optional.of(java.lang.Long.valueOf(32L)), // single part + Optional.empty(), + sessionUser + ) + resp.getStatus shouldEqual 200 + val record = fetchSession(filePath) + record should not be null + record.getUploadId + } + + private def fetchSession(filePath: String) = + getDSLContext + .selectFrom(DATASET_UPLOAD_SESSION) + .where( + DATASET_UPLOAD_SESSION.UID + .eq(ownerUser.getUid) + .and(DATASET_UPLOAD_SESSION.DID.eq(cleanupDataset.getDid)) + .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath)) + ) + .fetchOne() + + private def countPartRows(uploadId: String): Int = + getDSLContext + .selectCount() + .from(DATASET_UPLOAD_SESSION_PART) + .where(DATASET_UPLOAD_SESSION_PART.UPLOAD_ID.eq(uploadId)) + .fetchOne(0, classOf[Int]) + + /** Pins a session's age precisely — the injectable-clock counterpart on the DB side. */ + private def setSessionCreatedAt(uploadId: String, createdAt: OffsetDateTime): Unit = + getDSLContext + .update(DATASET_UPLOAD_SESSION) + .set(DATASET_UPLOAD_SESSION.CREATED_AT, createdAt) + .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(uploadId)) + .execute() + + /** Uploads an object to the repo branch WITHOUT committing (a staged/uncommitted object). */ + private def stageObject(filePath: String, content: String = "staged-bytes"): Unit = + LakeFSStorageClient.writeFileToRepo( + repoName, + filePath, + new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)) + ) + + private def uncommittedPaths(): List[String] = + LakeFSStorageClient.retrieveUncommittedObjects(repoName).map(_.getPath) + + private def committedPaths(): List[String] = + LakeFSStorageClient.retrieveObjectsOfVersion(repoName, "main").map(_.getPath) + + /** Inserts a session row whose LakeFS multipart does not exist (forces a per-item failure). */ + private def insertBogusSession(filePath: String): Unit = + getDSLContext + .insertInto(DATASET_UPLOAD_SESSION) + .set(DATASET_UPLOAD_SESSION.DID, cleanupDataset.getDid) + .set(DATASET_UPLOAD_SESSION.UID, ownerUser.getUid) + .set(DATASET_UPLOAD_SESSION.FILE_PATH, filePath) + .set(DATASET_UPLOAD_SESSION.UPLOAD_ID, s"bogus-upload-${System.nanoTime()}") + .set( + DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, + "s3://nonexistent-bucket/nonexistent-key" + ) + .set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED, Int.box(1)) + .set(DATASET_UPLOAD_SESSION.FILE_SIZE_BYTES, java.lang.Long.valueOf(16L)) + .set(DATASET_UPLOAD_SESSION.PART_SIZE_BYTES, java.lang.Long.valueOf(32L)) + .execute() + + // =========================================================================== + // 1. Expired session is cleaned + // =========================================================================== + "StagedFileCleanupJob.runCleanupOnce" should "delete an expired upload session and its part rows" in { + val filePath = uniquePath("expired-session") + val uploadId = initSession(filePath) + countPartRows(uploadId) shouldEqual 1 // placeholder created at init + + val report = job.runCleanupOnce(farFuture) + + report.sessionsDeleted shouldEqual 1 + report.errors shouldEqual 0 + fetchSession(filePath) shouldBe null + countPartRows(uploadId) shouldEqual 0 // gone via ON DELETE CASCADE + } + + // =========================================================================== + // 2. Fresh session survives + // =========================================================================== + it should "keep a fresh (non-expired) upload session" in { + val filePath = uniquePath("fresh-session") + val uploadId = initSession(filePath) + + val report = job.runCleanupOnce(OffsetDateTime.now()) + + report.sessionsDeleted shouldEqual 0 + report.errors shouldEqual 0 + fetchSession(filePath) should not be null + countPartRows(uploadId) shouldEqual 1 + } + + // =========================================================================== + // 3. Expired staged object is reset (committed object untouched) + // =========================================================================== + it should "reset an expired staged object but never a committed one" in { + val stagedPath = uniquePath("expired-staged") + stageObject(stagedPath) + uncommittedPaths() should contain(stagedPath) + + val report = job.runCleanupOnce(farFuture) + + report.objectsReset should be >= 1 + report.errors shouldEqual 0 + uncommittedPaths() should not contain stagedPath + // safety pin: the committed object survives every cleanup + committedPaths() should contain(PinnedCommittedPath) + } + + // =========================================================================== + // 4. Fresh staged object survives + // =========================================================================== + it should "keep a freshly staged object when run with the real current time" in { + val stagedPath = uniquePath("fresh-staged") + stageObject(stagedPath) + + val report = job.runCleanupOnce(OffsetDateTime.now()) + + report.objectsReset shouldEqual 0 + report.errors shouldEqual 0 + uncommittedPaths() should contain(stagedPath) + } + + // =========================================================================== + // 5. Idempotence + // =========================================================================== + it should "be idempotent: a second run with the same now reports all zeros" in { + val sessionPath = uniquePath("idempotent-session") + val stagedPath = uniquePath("idempotent-staged") + initSession(sessionPath) + stageObject(stagedPath) + + val now = farFuture + + val first = job.runCleanupOnce(now) + first.sessionsDeleted shouldEqual 1 + first.objectsReset should be >= 1 + first.errors shouldEqual 0 + + val second = job.runCleanupOnce(now) + second.sessionsDeleted shouldEqual 0 + second.objectsReset shouldEqual 0 + second.errors shouldEqual 0 + } + + // =========================================================================== + // 6. Active upload is not touched while other items expire + // =========================================================================== + it should "not touch a non-expired session or its staged object while expiring other items" in { + val now = farFuture + + // Protected: a session that is fresh RELATIVE TO the injected now, with its staged + // file present on the branch. The skip rule must protect the object even though its + // real mtime is "older" than retention relative to the future now. + val protectedPath = uniquePath("active-upload") + val protectedUploadId = initSession(protectedPath) + stageObject(protectedPath) + setSessionCreatedAt(protectedUploadId, now.minusMinutes(5)) + + // Expirees: another session and an orphan staged object, both created at real now, + // i.e. older than retention relative to the future now. + val expiredSessionPath = uniquePath("expired-other-session") + initSession(expiredSessionPath) + val expiredStagedPath = uniquePath("expired-other-staged") + stageObject(expiredStagedPath) + + val report = job.runCleanupOnce(now) + + report.sessionsDeleted shouldEqual 1 + report.objectsReset shouldEqual 1 + report.errors shouldEqual 0 + + // survivors + fetchSession(protectedPath) should not be null + uncommittedPaths() should contain(protectedPath) + // expirees + fetchSession(expiredSessionPath) shouldBe null + uncommittedPaths() should not contain expiredStagedPath + } + + // =========================================================================== + // 7. Report counting on a mixed batch + // =========================================================================== + it should "report exact counts for a mix of expired and fresh items" in { + val now = farFuture + + // 2 expired sessions + val expired1 = uniquePath("mix-expired-1") + val expired2 = uniquePath("mix-expired-2") + initSession(expired1) + initSession(expired2) + + // 1 fresh session protecting 1 fresh staged object (fresh relative to the injected now) + val freshPath = uniquePath("mix-fresh") + val freshUploadId = initSession(freshPath) + stageObject(freshPath) + setSessionCreatedAt(freshUploadId, now.minusMinutes(5)) + + // 1 expired staged object with no session + val expiredStaged = uniquePath("mix-expired-staged") + stageObject(expiredStaged) + + val report = job.runCleanupOnce(now) + + report.sessionsDeleted shouldEqual 2 + report.objectsReset shouldEqual 1 + report.errors shouldEqual 0 + + fetchSession(expired1) shouldBe null + fetchSession(expired2) shouldBe null + fetchSession(freshPath) should not be null + uncommittedPaths() should contain(freshPath) + uncommittedPaths() should not contain expiredStaged + } + + // =========================================================================== + // 8. Retention boundary (precision via injectable clock + explicit created_at) + // =========================================================================== + it should "clean a session just past retention but keep one just inside it" in { + val now = OffsetDateTime.now() + val cutoff = now.minusHours(RetentionHours.toLong) + + val survivorPath = uniquePath("boundary-survivor") + val survivorUploadId = initSession(survivorPath) + setSessionCreatedAt(survivorUploadId, cutoff.plusMinutes(1)) // retention - epsilon + + val expiredPath = uniquePath("boundary-expired") + val expiredUploadId = initSession(expiredPath) + setSessionCreatedAt(expiredUploadId, cutoff.minusMinutes(1)) // retention + epsilon + + val report = job.runCleanupOnce(now) + + report.sessionsDeleted shouldEqual 1 + report.errors shouldEqual 0 + fetchSession(survivorPath) should not be null + fetchSession(expiredPath) shouldBe null + } + + // =========================================================================== + // 9. Committed objects are never touched (dedicated, with a fresh commit) + // =========================================================================== + it should "leave committed objects intact while resetting expired staged objects" in { + // Commit a new object in this test, alongside an expired staged object. + val committedPath = uniquePath("committed-safe") + stageObject(committedPath, content = "committed-bytes") + LakeFSStorageClient.createCommit(repoName, "main", "commit object that cleanup must keep") + + val expiredStaged = uniquePath("doomed-staged") + stageObject(expiredStaged) + + val report = job.runCleanupOnce(farFuture) + + report.errors shouldEqual 0 + uncommittedPaths() should not contain expiredStaged + committedPaths() should contain(committedPath) + committedPaths() should contain(PinnedCommittedPath) + } + + // =========================================================================== + // 10. Already-aborted multipart (LakeFS 404) is treated as cleaned, not an error + // =========================================================================== + it should "delete a session whose multipart was already aborted in LakeFS, with no error" in { + val filePath = uniquePath("already-aborted") + initSession(filePath) + val record = fetchSession(filePath) + // Abort the multipart out-of-band; the DB row stays behind (simulates a crash between + // LakeFS abort and row deletion, or a previous partially-failed cleanup round). + LakeFSStorageClient.abortPresignedMultipartUploads( + repoName, + filePath, + record.getUploadId, + record.getPhysicalAddress + ) + fetchSession(filePath) should not be null + + val report = job.runCleanupOnce(farFuture) + + report.sessionsDeleted shouldEqual 1 + report.errors shouldEqual 0 + fetchSession(filePath) shouldBe null + } + + // =========================================================================== + // 11. Per-item failures are counted and never abort the batch + // =========================================================================== + it should "count per-item failures in errors without aborting the rest of the batch" in { + // A session whose LakeFS multipart cannot be aborted (bogus uploadId / physical address). + val bogusPath = uniquePath("bogus-session") + insertBogusSession(bogusPath) // created_at defaults to now -> expired under farFuture + + // A real expired session and an expired staged object that MUST still be cleaned. + val realExpiredPath = uniquePath("real-expired-session") + initSession(realExpiredPath) + val expiredStaged = uniquePath("error-batch-staged") + stageObject(expiredStaged) + + val report = job.runCleanupOnce(farFuture) + + report.errors should be >= 1 + // The failure must not stop the rest of the batch: + fetchSession(realExpiredPath) shouldBe null + uncommittedPaths() should not contain expiredStaged + report.sessionsDeleted should be >= 1 + } +} From 820a0c07503a7394732ad9d139e44efefcf85658 Mon Sep 17 00:00:00 2001 From: Eugene Date: Sat, 13 Jun 2026 03:14:19 -0700 Subject: [PATCH 2/5] test(file-service): cover cleanup-job lifecycle, skip/error and diff-type branches Address review feedback (add more tests to guard the behavior) and the Codecov gaps in StagedFileCleanupJob. Adds 7 cases to StagedFileCleanupJobSpec covering branches the first 11 tests missed: - start()/stop() lifecycle, including stop() before start() (the executor-null guard); the started daemon executor is always torn down in a finally - session-cleanup path: an orphan session whose dataset has a null repository_name is deleted with no multipart abort attempted and no error - staged-object path: a staged deletion (a REMOVED diff from deleting a committed object on the branch without committing) is skipped, not reset, with no error - staged-object path: a dataset pointing at a non-existent LakeFS repo makes retrieveUncommittedObjects throw 404, which is caught and skipped without counting an error - staged-object path: a staged content change (a CHANGED diff from re-uploading to a previously committed path) is reset to the committed version, exercising the CHANGED half of the object-write check that ADDED-only tests missed - staged-object path across multiple datasets in one round: expired staged objects in two separate repos are both reset, and an active session in one dataset does not protect a same-named path in another, proving per-dataset keying Test-only; extra DATASET rows and repos are removed in a finally so the suite's single-dataset assumptions and per-test counts are unaffected. --- .../util/StagedFileCleanupJobSpec.scala | 227 ++++++++++++++++++ 1 file changed, 227 insertions(+) diff --git a/file-service/src/test/scala/org/apache/texera/service/util/StagedFileCleanupJobSpec.scala b/file-service/src/test/scala/org/apache/texera/service/util/StagedFileCleanupJobSpec.scala index b8de6e004d8..c522d800b74 100644 --- a/file-service/src/test/scala/org/apache/texera/service/util/StagedFileCleanupJobSpec.scala +++ b/file-service/src/test/scala/org/apache/texera/service/util/StagedFileCleanupJobSpec.scala @@ -486,4 +486,231 @@ class StagedFileCleanupJobSpec uncommittedPaths() should not contain expiredStaged report.sessionsDeleted should be >= 1 } + + // =========================================================================== + // 12. Lifecycle: stop() before start() is a no-op (executor == null guard) + // =========================================================================== + "StagedFileCleanupJob lifecycle" should "allow stop() before start() without throwing" in { + val lifecycleJob = new StagedFileCleanupJob(RetentionHours, IntervalMinutes) + noException should be thrownBy lifecycleJob.stop() + } + + // =========================================================================== + // 13. Lifecycle: start() then stop() schedules and tears down cleanly + // =========================================================================== + it should "start() then stop() without throwing" in { + // The scheduled task has a 1-minute initial delay, so its body never runs during this + // test; we are only covering the scheduling + teardown lines, not the lambda. + val lifecycleJob = new StagedFileCleanupJob(RetentionHours, IntervalMinutes) + try { + noException should be thrownBy lifecycleJob.start() + } finally { + // Always stop so a started daemon executor never leaks between tests. + lifecycleJob.stop() + } + } + + // =========================================================================== + // 14. F1: orphan session (dataset has NULL repository_name) — cleaned, no abort attempted + // =========================================================================== + it should "delete an orphan session whose dataset has a NULL repository_name, without error" in { + // A second dataset with NO repository_name: such a did never appears in repoNameByDid, + // so the cleanup hits the `case None` branch (no multipart abort) and still deletes the row. + val nullRepoDataset = new Dataset + nullRepoDataset.setName(s"null-repo-ds-${System.nanoTime()}") + nullRepoDataset.setRepositoryName(null) + nullRepoDataset.setIsPublic(true) + nullRepoDataset.setIsDownloadable(true) + nullRepoDataset.setDescription("dataset with no LakeFS repo for orphan-session test") + nullRepoDataset.setOwnerUid(ownerUser.getUid) + val datasetDao = new DatasetDao(getDSLContext.configuration()) + datasetDao.insert(nullRepoDataset) + + try { + val orphanUploadId = s"orphan-upload-${System.nanoTime()}" + getDSLContext + .insertInto(DATASET_UPLOAD_SESSION) + .set(DATASET_UPLOAD_SESSION.DID, nullRepoDataset.getDid) + .set(DATASET_UPLOAD_SESSION.UID, ownerUser.getUid) + .set(DATASET_UPLOAD_SESSION.FILE_PATH, "orphan/file.bin") + .set(DATASET_UPLOAD_SESSION.UPLOAD_ID, orphanUploadId) + .set(DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, "s3://whatever/orphan") + .set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED, Int.box(1)) + .set(DATASET_UPLOAD_SESSION.FILE_SIZE_BYTES, java.lang.Long.valueOf(16L)) + .set(DATASET_UPLOAD_SESSION.PART_SIZE_BYTES, java.lang.Long.valueOf(32L)) + .execute() + + val report = job.runCleanupOnce(farFuture) + + report.sessionsDeleted shouldEqual 1 + report.errors shouldEqual 0 + getDSLContext + .selectCount() + .from(DATASET_UPLOAD_SESSION) + .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(orphanUploadId)) + .fetchOne(0, classOf[Int]) shouldEqual 0 + } finally { + // Remove the extra dataset so later tests' repoNameByDid scan / counts are unaffected. + // (Its session row, if any survived, cascades away with the dataset.) + datasetDao.deleteById(nullRepoDataset.getDid) + } + } + + // =========================================================================== + // 15. F2: staged DELETION of a committed object is skipped (not reset, not an error) + // =========================================================================== + it should "skip a staged deletion (REMOVED diff) without resetting it or counting an error" in { + // Commit an object, then stage a deletion of it on main WITHOUT committing. The pending + // deletion surfaces as a Diff of type REMOVED, which has no object behind it. + val committedThenDeleted = uniquePath("staged-deletion") + stageObject(committedThenDeleted, content = "to-be-deleted") + LakeFSStorageClient.createCommit(repoName, "main", "commit object for staged-deletion test") + + // Stage the deletion (deleteObject targets the main branch but does not commit). + LakeFSStorageClient.deleteObject(repoName, committedThenDeleted) + + // Sanity: the pending change is a REMOVED diff for this path. + val uncommitted = LakeFSStorageClient.retrieveUncommittedObjects(repoName) + uncommitted.map(_.getPath) should contain(committedThenDeleted) + + val report = job.runCleanupOnce(farFuture) + + // The REMOVED entry is skipped: not counted in objectsReset and not an error. + report.objectsReset shouldEqual 0 + report.errors shouldEqual 0 + // The staged deletion is left intact (still pending, not reverted by cleanup). + LakeFSStorageClient + .retrieveUncommittedObjects(repoName) + .map(_.getPath) should contain(committedThenDeleted) + } + + // =========================================================================== + // 16. F2: a dataset pointing at a non-existent LakeFS repo (404) is skipped, no error + // =========================================================================== + it should "skip a dataset whose LakeFS repository does not exist, without error" in { + // A dataset row whose repository was never created in LakeFS. retrieveUncommittedObjects + // throws ApiException 404, which the job catches and skips. + val ghostDataset = new Dataset + ghostDataset.setName(s"ghost-ds-${System.nanoTime()}") + ghostDataset.setRepositoryName(s"ghost-repo-${System.nanoTime()}") + ghostDataset.setIsPublic(true) + ghostDataset.setIsDownloadable(true) + ghostDataset.setDescription("dataset pointing at a non-existent LakeFS repo") + ghostDataset.setOwnerUid(ownerUser.getUid) + val datasetDao = new DatasetDao(getDSLContext.configuration()) + datasetDao.insert(ghostDataset) + + try { + val report = job.runCleanupOnce(farFuture) + report.errors shouldEqual 0 + } finally { + // Remove the ghost dataset so the per-test clean-slate teardown (which only resets the + // suite repo) and later tests' repo scan are not affected by a repo that doesn't exist. + datasetDao.deleteById(ghostDataset.getDid) + } + } + + // =========================================================================== + // 17. F2: a staged CHANGED diff (content modification) is reset, not skipped + // =========================================================================== + it should "reset a staged content change (CHANGED diff) while keeping the committed version" in { + // Commit content A, then re-upload content B at the SAME path without committing. The + // pending modification surfaces as a Diff of type CHANGED (the `|| CHANGED` half of + // isObjectWrite), which must be treated as an object write and reset. + val changedPath = uniquePath("staged-changed") + stageObject(changedPath, content = "content-A") + LakeFSStorageClient.createCommit(repoName, "main", "commit content-A for CHANGED test") + + stageObject(changedPath, content = "content-B-modified") + + // Sanity: the path is now uncommitted (a CHANGED diff, not ADDED, since it was committed). + LakeFSStorageClient + .retrieveUncommittedObjects(repoName) + .find(_.getPath == changedPath) + .map(_.getType) shouldEqual Some(io.lakefs.clients.sdk.model.Diff.TypeEnum.CHANGED) + + val report = job.runCleanupOnce(farFuture) + + report.objectsReset should be >= 1 + report.errors shouldEqual 0 + // The staged change is reverted... + uncommittedPaths() should not contain changedPath + // ...and the committed version (content A) is intact and retrievable. + committedPaths() should contain(changedPath) + val committed = LakeFSStorageClient.getFileFromRepo(repoName, "main", changedPath) + new String( + java.nio.file.Files.readAllBytes(committed.toPath), + StandardCharsets.UTF_8 + ) shouldEqual + "content-A" + } + + // =========================================================================== + // 18. Multiple datasets/repos are cleaned in a single round (path-2 loop > 1 repo) + // =========================================================================== + it should "reset expired staged objects across multiple datasets, keyed per-dataset" in { + // A second dataset with its own LakeFS repo, initialized like the suite's. + val repo2 = s"cleanup-ds2-${System.nanoTime()}" + val dataset2 = new Dataset + dataset2.setName(s"cleanup-ds2-${System.nanoTime()}") + dataset2.setRepositoryName(repo2) + dataset2.setIsPublic(true) + dataset2.setIsDownloadable(true) + dataset2.setDescription("second dataset for multi-repo cleanup test") + dataset2.setOwnerUid(ownerUser.getUid) + val datasetDao = new DatasetDao(getDSLContext.configuration()) + datasetDao.insert(dataset2) + + try { + try LakeFSStorageClient.initRepo(repo2) + catch { + case e: ApiException if e.getCode == 409 => // already exists, fine + } + + val now = farFuture + + // One expired staged object in each repo -> both must be reset. + val expired1 = uniquePath("multi-expired-1") + stageObject(expired1) // suite repo + val expired2 = "multi-expired-2/obj.bin" + LakeFSStorageClient.writeFileToRepo( + repo2, + expired2, + new ByteArrayInputStream("staged-bytes".getBytes(StandardCharsets.UTF_8)) + ) + + // An active session in dataset1 (path P) must NOT protect a same-named path in dataset2: + // activePathsByDid is keyed per dataset. Stage P in repo2 with no active session there. + val sharedPath = "shared/same-path.bin" + val activeUploadId = initSession(sharedPath) // active session for dataset1 only + setSessionCreatedAt(activeUploadId, now.minusMinutes(5)) // fresh relative to `now` + stageObject(sharedPath) // staged in dataset1 -> protected + LakeFSStorageClient.writeFileToRepo( + repo2, + sharedPath, + new ByteArrayInputStream("staged-bytes".getBytes(StandardCharsets.UTF_8)) + ) // staged in dataset2 -> NOT protected (no session for dataset2) + + val report = job.runCleanupOnce(now) + + // Reset: expired1 (repo1), expired2 (repo2), and sharedPath in repo2 = 3. + report.objectsReset shouldEqual 3 + report.errors shouldEqual 0 + + // dataset1: expired object gone, but the active-session-protected path survives. + uncommittedPaths() should not contain expired1 + uncommittedPaths() should contain(sharedPath) + + // dataset2: both staged objects gone (the dataset1 active path did not protect them). + val repo2Uncommitted = LakeFSStorageClient.retrieveUncommittedObjects(repo2).map(_.getPath) + repo2Uncommitted should not contain expired2 + repo2Uncommitted should not contain sharedPath + } finally { + // Drop the extra dataset + repo so the suite's single-dataset assumptions hold and the + // per-test clean-slate (which only resets the suite repo) isn't affected. + datasetDao.deleteById(dataset2.getDid) + try LakeFSStorageClient.deleteRepo(repo2) + catch { case _: ApiException => /* best-effort cleanup */ } + } + } } From d00460a14b7bd1914afd94877f95e41e24bf64ad Mon Sep 17 00:00:00 2001 From: Eugene Gu Date: Sat, 13 Jun 2026 14:35:19 -0700 Subject: [PATCH 3/5] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Eugene Gu --- .../org/apache/texera/service/util/StagedFileCleanupJob.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/file-service/src/main/scala/org/apache/texera/service/util/StagedFileCleanupJob.scala b/file-service/src/main/scala/org/apache/texera/service/util/StagedFileCleanupJob.scala index ebe8c1b9248..6d6c555b670 100644 --- a/file-service/src/main/scala/org/apache/texera/service/util/StagedFileCleanupJob.scala +++ b/file-service/src/main/scala/org/apache/texera/service/util/StagedFileCleanupJob.scala @@ -54,6 +54,9 @@ class StagedFileCleanupJob(retentionHours: Int, intervalMinutes: Int) extends Managed with LazyLogging { + require(retentionHours > 0, s"retentionHours must be > 0 (got $retentionHours)") + require(intervalMinutes > 0, s"intervalMinutes must be > 0 (got $intervalMinutes)") + private var executor: ScheduledExecutorService = _ override def start(): Unit = { From 72252c2c84ec4ccba79416665d57d4740972c936 Mon Sep 17 00:00:00 2001 From: Eugene Date: Sun, 14 Jun 2026 05:04:45 -0700 Subject: [PATCH 4/5] fix(file-service): wrap session cleanup in a transaction and treat per-object 404 as a no-op Address review feedback on the staged-file cleanup job. - Path 1 (abandoned upload sessions): the session-row delete and the LakeFS multipart abort now run inside one DB transaction (SqlServer.withTransaction), deleting FIRST. If the abort then fails with a non-404 error, the transaction rolls back and the session row survives, so the next round retries instead of leaving an orphaned multipart with no tracking record. LakeFS is external and cannot truly enroll in a DB transaction, so a *successful* abort is not undone by a later failure -- but the abort is idempotent (re-aborting an already-aborted upload returns 404, treated as success), so partial state is never permanent. The transaction is per session (not per round) to avoid holding a DB connection open across many LakeFS HTTP calls. - Path 2 (staged objects): an ApiException 404 from the per-object stat/reset is now treated as a successful no-op (a concurrent commit/reset, or another cleanup round, already removed it), matching the job's idempotent design instead of logging a warning and counting an error. Adds failure-mode tests for cleaning a single item: - a non-404 abort failure rolls back the row delete (the row survives) and is counted, so it is retried; - a transiently-failing session is cleaned on a later round once the failure clears (self-heals, not stuck); - a failing item does not prevent a healthy item in the same round from being cleaned (the healthy row is deleted, the failing row is kept for retry). Together with existing tests these cover the named failure cases: no DB record (orphan/null-repo session), no file found (already-aborted / repo 404), and a generic/timeout LakeFS error (rolled back, retried). Also renames the test section comments from F1/F2 to "Path 1 (session cleanup)" / "Path 2 (staged objects)" to match the production-code naming. --- .../service/util/StagedFileCleanupJob.scala | 69 +++++++---- .../util/StagedFileCleanupJobSpec.scala | 115 +++++++++++++++++- 2 files changed, 153 insertions(+), 31 deletions(-) diff --git a/file-service/src/main/scala/org/apache/texera/service/util/StagedFileCleanupJob.scala b/file-service/src/main/scala/org/apache/texera/service/util/StagedFileCleanupJob.scala index 6d6c555b670..cb6f68c415d 100644 --- a/file-service/src/main/scala/org/apache/texera/service/util/StagedFileCleanupJob.scala +++ b/file-service/src/main/scala/org/apache/texera/service/util/StagedFileCleanupJob.scala @@ -124,35 +124,43 @@ class StagedFileCleanupJob(retentionHours: Int, intervalMinutes: Int) expiredSessions.foreach { session => try { - repoNameByDid.get(session.getDid) match { - case Some(repoName) => - try { - LakeFSStorageClient.abortPresignedMultipartUploads( - repoName, - session.getFilePath, - session.getUploadId, - session.getPhysicalAddress - ) - } catch { - // Already aborted (or never materialized): safe to delete the session row. - case e: ApiException if e.getCode == 404 => - logger.debug( - s"Multipart upload ${session.getUploadId} not found in LakeFS; " + - "treating as already aborted" + // Delete the row and abort the multipart in one transaction, deleting FIRST. LakeFS is + // external and cannot truly enroll in a DB transaction, but the abort is idempotent + // (re-aborting an already-aborted upload returns 404, treated as success below), so the + // only risk is the abort failing AFTER the delete is staged. By staging the delete first + // and letting a non-404 abort failure roll the whole transaction back, the session row + // survives and the next round retries — never leaving an orphaned multipart behind. + SqlServer.withTransaction(ctx) { txn => + txn + .deleteFrom(DATASET_UPLOAD_SESSION) + .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(session.getUploadId)) + .execute() + repoNameByDid.get(session.getDid) match { + case Some(repoName) => + try { + LakeFSStorageClient.abortPresignedMultipartUploads( + repoName, + session.getFilePath, + session.getUploadId, + session.getPhysicalAddress ) - } - case None => - // Dataset row gone or repository_name is NULL: the multipart lived in that - // repository's namespace, so there is nothing left to abort. - logger.debug( - s"No repository for dataset ${session.getDid}; " + - s"deleting orphan upload session ${session.getUploadId}" - ) + } catch { + // Already aborted (or never materialized): safe to delete the session row. + case e: ApiException if e.getCode == 404 => + logger.debug( + s"Multipart upload ${session.getUploadId} not found in LakeFS; " + + "treating as already aborted" + ) + } + case None => + // Dataset row gone or repository_name is NULL: the multipart lived in that + // repository's namespace, so there is nothing left to abort. + logger.debug( + s"No repository for dataset ${session.getDid}; " + + s"deleting orphan upload session ${session.getUploadId}" + ) + } } - ctx - .deleteFrom(DATASET_UPLOAD_SESSION) - .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(session.getUploadId)) - .execute() sessionsDeleted += 1 } catch { case t: Throwable => @@ -203,6 +211,13 @@ class StagedFileCleanupJob(retentionHours: Int, intervalMinutes: Int) objectsReset += 1 } } catch { + // Concurrently committed/reset, or already cleaned by another round: the + // object is gone, which is the desired end state for an idempotent job. + case e: ApiException if e.getCode == 404 => + logger.debug( + s"Staged object '$path' not found in repo '$repoName'; " + + "treating as already cleaned" + ) case t: Throwable => logger.warn( s"Failed to clean up staged object '$path' in repo '$repoName'", diff --git a/file-service/src/test/scala/org/apache/texera/service/util/StagedFileCleanupJobSpec.scala b/file-service/src/test/scala/org/apache/texera/service/util/StagedFileCleanupJobSpec.scala index c522d800b74..bd8fbc22c04 100644 --- a/file-service/src/test/scala/org/apache/texera/service/util/StagedFileCleanupJobSpec.scala +++ b/file-service/src/test/scala/org/apache/texera/service/util/StagedFileCleanupJobSpec.scala @@ -511,7 +511,7 @@ class StagedFileCleanupJobSpec } // =========================================================================== - // 14. F1: orphan session (dataset has NULL repository_name) — cleaned, no abort attempted + // 14. Path 1 (session cleanup): orphan session (dataset has NULL repository_name) — cleaned, no abort attempted // =========================================================================== it should "delete an orphan session whose dataset has a NULL repository_name, without error" in { // A second dataset with NO repository_name: such a did never appears in repoNameByDid, @@ -557,7 +557,7 @@ class StagedFileCleanupJobSpec } // =========================================================================== - // 15. F2: staged DELETION of a committed object is skipped (not reset, not an error) + // 15. Path 2 (staged objects): staged DELETION of a committed object is skipped (not reset, not an error) // =========================================================================== it should "skip a staged deletion (REMOVED diff) without resetting it or counting an error" in { // Commit an object, then stage a deletion of it on main WITHOUT committing. The pending @@ -585,7 +585,7 @@ class StagedFileCleanupJobSpec } // =========================================================================== - // 16. F2: a dataset pointing at a non-existent LakeFS repo (404) is skipped, no error + // 16. Path 2 (staged objects): a dataset pointing at a non-existent LakeFS repo (404) is skipped, no error // =========================================================================== it should "skip a dataset whose LakeFS repository does not exist, without error" in { // A dataset row whose repository was never created in LakeFS. retrieveUncommittedObjects @@ -611,7 +611,7 @@ class StagedFileCleanupJobSpec } // =========================================================================== - // 17. F2: a staged CHANGED diff (content modification) is reset, not skipped + // 17. Path 2 (staged objects): a staged CHANGED diff (content modification) is reset, not skipped // =========================================================================== it should "reset a staged content change (CHANGED diff) while keeping the committed version" in { // Commit content A, then re-upload content B at the SAME path without committing. The @@ -713,4 +713,111 @@ class StagedFileCleanupJobSpec catch { case _: ApiException => /* best-effort cleanup */ } } } + + // =========================================================================== + // 19. Path 1 (session cleanup): a non-404 abort failure rolls back the row delete (transactional) + // =========================================================================== + it should "roll back the session-row delete when the multipart abort fails (non-404)" in { + // Same failure mechanism as test 11: a bogus uploadId / physical address whose abort + // throws a NON-404 error. Unlike test 11, this asserts the DB row SURVIVES, proving the + // delete (staged first inside withTransaction) is rolled back rather than committed. + // A timeout or a 5xx from LakeFS takes this same non-404 -> rollback -> retry path. + val bogusPath = uniquePath("rollback-bogus-session") + insertBogusSession(bogusPath) // created_at defaults to now -> expired under farFuture + val bogusId = fetchSession(bogusPath).getUploadId + + try { + val report = job.runCleanupOnce(farFuture) + + report.errors shouldEqual 1 + report.sessionsDeleted shouldEqual 0 + // Key assertion vs. test 11: the delete was rolled back with the failed abort, so the + // row is still present and the next round will retry it. + fetchSession(bogusPath) should not be null + } finally { + // The row survives the rolled-back round, so remove it explicitly to keep later tests' + // exact counts independent (beforeEach also clears the table, but be explicit). + getDSLContext + .deleteFrom(DATASET_UPLOAD_SESSION) + .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(bogusId)) + .execute() + } + } + + // =========================================================================== + // 20. Path 1 (session cleanup): a transiently-failing session is cleaned on the NEXT round (self-heals) + // =========================================================================== + it should "clean a transiently-failing session on the next round (retried, not stuck)" in { + // Demonstrates Yicong's "cleaned in next round": round 1's abort fails (non-404) and rolls + // back, leaving the row; once the transient condition clears, a later round succeeds. + val filePath = uniquePath("transient-session") + insertBogusSession(filePath) // abort throws non-404 under the bogus physical address + val bogusId = fetchSession(filePath).getUploadId + + try { + // Round 1: abort fails -> transaction rolls back -> row survives, counted as an error. + val round1 = job.runCleanupOnce(farFuture) + round1.errors shouldEqual 1 + round1.sessionsDeleted shouldEqual 0 + fetchSession(filePath) should not be null + + // Clear the transient failure deterministically WITHOUT faking the client: replace the + // bogus row with a REAL session at the same logical path, then abort its multipart + // out-of-band (the test-10 mechanism). The next round's abort therefore returns 404, + // which the job treats as success. + getDSLContext + .deleteFrom(DATASET_UPLOAD_SESSION) + .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(bogusId)) + .execute() + initSession(filePath) + val healed = fetchSession(filePath) + LakeFSStorageClient.abortPresignedMultipartUploads( + repoName, + filePath, + healed.getUploadId, + healed.getPhysicalAddress + ) + + // Round 2: abort hits the already-aborted 404 (success) -> row is deleted, no error. + val round2 = job.runCleanupOnce(farFuture) + round2.errors shouldEqual 0 + round2.sessionsDeleted shouldEqual 1 + fetchSession(filePath) shouldBe null + } finally { + // Best-effort: remove any row that may survive an unexpected mid-test failure. + getDSLContext + .deleteFrom(DATASET_UPLOAD_SESSION) + .where(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath)) + .execute() + } + } + + // =========================================================================== + // 21. Path 1 (session cleanup): a failing item does not prevent a healthy item in the same round from cleaning + // =========================================================================== + it should "clean a healthy session in the same round where another item fails, keeping the failed row" in { + // Test 11 asserts the healthy item is cleaned and the batch continues, but does NOT assert + // that the failing row SURVIVES (rolled back). This pins both halves precisely. + val healthyPath = uniquePath("healthy-alongside-failing") + initSession(healthyPath) // abort succeeds -> row deleted + val bogusPath = uniquePath("failing-alongside-healthy") + insertBogusSession(bogusPath) // abort throws non-404 -> transaction rolls back + val bogusId = fetchSession(bogusPath).getUploadId + + try { + val report = job.runCleanupOnce(farFuture) + + report.sessionsDeleted shouldEqual 1 + report.errors shouldEqual 1 + // Healthy item cleaned despite the sibling failure... + fetchSession(healthyPath) shouldBe null + // ...and the failing item's row is rolled back (survives), ready to retry next round. + fetchSession(bogusPath) should not be null + } finally { + getDSLContext + .deleteFrom(DATASET_UPLOAD_SESSION) + .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(bogusId)) + .execute() + } + } } From 7619e3f9abd94da247caedc2f19c237236a35f28 Mon Sep 17 00:00:00 2001 From: Eugene Date: Wed, 17 Jun 2026 23:02:46 -0700 Subject: [PATCH 5/5] fix(file-service): harden staged file cleanup --- .../service/util/StagedFileCleanupJob.scala | 75 ++++++++++++------- .../util/StagedFileCleanupJobSpec.scala | 30 +++++++- 2 files changed, 78 insertions(+), 27 deletions(-) diff --git a/file-service/src/main/scala/org/apache/texera/service/util/StagedFileCleanupJob.scala b/file-service/src/main/scala/org/apache/texera/service/util/StagedFileCleanupJob.scala index cb6f68c415d..8eaf201eab2 100644 --- a/file-service/src/main/scala/org/apache/texera/service/util/StagedFileCleanupJob.scala +++ b/file-service/src/main/scala/org/apache/texera/service/util/StagedFileCleanupJob.scala @@ -27,6 +27,7 @@ import org.apache.texera.amber.core.storage.util.LakeFSStorageClient import org.apache.texera.dao.SqlServer import org.apache.texera.dao.jooq.generated.tables.Dataset.DATASET import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSession.DATASET_UPLOAD_SESSION +import org.jooq.DSLContext import java.time.OffsetDateTime import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} @@ -41,6 +42,10 @@ import scala.jdk.CollectionConverters._ */ case class CleanupReport(sessionsDeleted: Int, objectsReset: Int, errors: Int) +object StagedFileCleanupJob { + private[util] val DefaultSessionCleanupBatchSize = 500 +} + /** * Periodically cleans up uploaded but uncommitted dataset files: * 1. Aborts and deletes abandoned multipart upload sessions older than the retention window. @@ -50,12 +55,19 @@ case class CleanupReport(sessionsDeleted: Int, objectsReset: Int, errors: Int) * @param retentionHours Age (in hours) after which uncommitted uploads are cleaned up. * @param intervalMinutes Delay (in minutes) between cleanup rounds. */ -class StagedFileCleanupJob(retentionHours: Int, intervalMinutes: Int) - extends Managed +class StagedFileCleanupJob( + retentionHours: Int, + intervalMinutes: Int, + sessionCleanupBatchSize: Int = StagedFileCleanupJob.DefaultSessionCleanupBatchSize +) extends Managed with LazyLogging { require(retentionHours > 0, s"retentionHours must be > 0 (got $retentionHours)") require(intervalMinutes > 0, s"intervalMinutes must be > 0 (got $intervalMinutes)") + require( + sessionCleanupBatchSize > 0, + s"sessionCleanupBatchSize must be > 0 (got $sessionCleanupBatchSize)" + ) private var executor: ScheduledExecutorService = _ @@ -95,8 +107,9 @@ class StagedFileCleanupJob(retentionHours: Int, intervalMinutes: Int) * @param now The reference time used to evaluate the retention window. * @return Summary counts for this round. */ - def runCleanupOnce(now: OffsetDateTime = OffsetDateTime.now()): CleanupReport = { + private[util] def runCleanupOnce(now: OffsetDateTime = OffsetDateTime.now()): CleanupReport = { val cutoff = now.minusHours(retentionHours.toLong) + val cutoffEpochSecond = cutoff.toEpochSecond var sessionsDeleted = 0 var objectsReset = 0 var errors = 0 @@ -118,6 +131,8 @@ class StagedFileCleanupJob(retentionHours: Int, intervalMinutes: Int) val expiredSessions = ctx .selectFrom(DATASET_UPLOAD_SESSION) .where(DATASET_UPLOAD_SESSION.CREATED_AT.lt(cutoff)) + .orderBy(DATASET_UPLOAD_SESSION.CREATED_AT.asc()) + .limit(sessionCleanupBatchSize) .fetch() .asScala .toList @@ -173,28 +188,14 @@ class StagedFileCleanupJob(retentionHours: Int, intervalMinutes: Int) } } - // File paths of still-active (non-expired) upload sessions, per dataset. Staged objects - // belonging to an active upload must never be reset. - val activePathsByDid: Map[Integer, Set[String]] = ctx - .select(DATASET_UPLOAD_SESSION.DID, DATASET_UPLOAD_SESSION.FILE_PATH) - .from(DATASET_UPLOAD_SESSION) - .where(DATASET_UPLOAD_SESSION.CREATED_AT.ge(cutoff)) - .fetch() - .asScala - .groupBy(record => record.get(DATASET_UPLOAD_SESSION.DID)) - .map { - case (did, records) => - did -> records.map(_.get(DATASET_UPLOAD_SESSION.FILE_PATH)).toSet - } - // Path 2: reset staged (uncommitted) objects older than the retention window. repoNameByDid.foreach { case (did, repoName) => try { - val activePaths = activePathsByDid.getOrElse(did, Set.empty) val stagedObjects = LakeFSStorageClient.retrieveUncommittedObjects(repoName) - // diffBranch carries no mtime, so each candidate costs one extra statObject call - // (N+1). Unavoidable until LakeFS exposes timestamps in the diff API. + // diffBranch carries no mtime, so old write candidates need statObject calls. + // Re-check immediately before reset so a new upload to the same path is not judged + // by stale session/mtime reads from earlier in the cleanup round. stagedObjects.foreach { diff => val path = diff.getPath val isObjectWrite = @@ -203,12 +204,18 @@ class StagedFileCleanupJob(retentionHours: Int, intervalMinutes: Int) // E.g. a staged deletion of a committed file: there is no object behind it and // it consumes no storage, so leaving it is correct and cheap. logger.debug(s"Skipping staged ${diff.getType} entry '$path' in '$repoName'") - } else if (!activePaths.contains(path)) { + } else { try { val mtime = LakeFSStorageClient.getStagedObjectMtime(repoName, path) - if (mtime < cutoff.toEpochSecond) { - LakeFSStorageClient.resetObjectUploadOrDeletion(repoName, path) - objectsReset += 1 + if ( + mtime < cutoffEpochSecond && + !hasActiveUploadSession(ctx, did, path, cutoff) + ) { + val latestMtime = LakeFSStorageClient.getStagedObjectMtime(repoName, path) + if (latestMtime < cutoffEpochSecond) { + LakeFSStorageClient.resetObjectUploadOrDeletion(repoName, path) + objectsReset += 1 + } } } catch { // Concurrently committed/reset, or already cleaned by another round: the @@ -238,10 +245,28 @@ class StagedFileCleanupJob(retentionHours: Int, intervalMinutes: Int) } } - logger.info( + logger.debug( s"Staged file cleanup round finished: sessionsDeleted=$sessionsDeleted, " + s"objectsReset=$objectsReset, errors=$errors" ) CleanupReport(sessionsDeleted, objectsReset, errors) } + + private def hasActiveUploadSession( + ctx: DSLContext, + did: Integer, + path: String, + cutoff: OffsetDateTime + ): Boolean = + ctx.fetchExists( + ctx + .selectOne() + .from(DATASET_UPLOAD_SESSION) + .where( + DATASET_UPLOAD_SESSION.DID + .eq(did) + .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(path)) + .and(DATASET_UPLOAD_SESSION.CREATED_AT.ge(cutoff)) + ) + ) } diff --git a/file-service/src/test/scala/org/apache/texera/service/util/StagedFileCleanupJobSpec.scala b/file-service/src/test/scala/org/apache/texera/service/util/StagedFileCleanupJobSpec.scala index bd8fbc22c04..8e9404edcec 100644 --- a/file-service/src/test/scala/org/apache/texera/service/util/StagedFileCleanupJobSpec.scala +++ b/file-service/src/test/scala/org/apache/texera/service/util/StagedFileCleanupJobSpec.scala @@ -204,6 +204,13 @@ class StagedFileCleanupJobSpec .where(DATASET_UPLOAD_SESSION_PART.UPLOAD_ID.eq(uploadId)) .fetchOne(0, classOf[Int]) + private def countDatasetUploadSessions(): Int = + getDSLContext + .selectCount() + .from(DATASET_UPLOAD_SESSION) + .where(DATASET_UPLOAD_SESSION.DID.eq(cleanupDataset.getDid)) + .fetchOne(0, classOf[Int]) + /** Pins a session's age precisely — the injectable-clock counterpart on the DB side. */ private def setSessionCreatedAt(uploadId: String, createdAt: OffsetDateTime): Unit = getDSLContext @@ -327,6 +334,25 @@ class StagedFileCleanupJobSpec second.errors shouldEqual 0 } + it should "process only a bounded number of expired sessions per cleanup round" in { + val boundedJob = + new StagedFileCleanupJob(RetentionHours, IntervalMinutes, sessionCleanupBatchSize = 1) + + initSession(uniquePath("bounded-session-1")) + initSession(uniquePath("bounded-session-2")) + countDatasetUploadSessions() shouldEqual 2 + + val first = boundedJob.runCleanupOnce(farFuture) + first.sessionsDeleted shouldEqual 1 + first.errors shouldEqual 0 + countDatasetUploadSessions() shouldEqual 1 + + val second = boundedJob.runCleanupOnce(farFuture) + second.sessionsDeleted shouldEqual 1 + second.errors shouldEqual 0 + countDatasetUploadSessions() shouldEqual 0 + } + // =========================================================================== // 6. Active upload is not touched while other items expire // =========================================================================== @@ -679,8 +705,8 @@ class StagedFileCleanupJobSpec new ByteArrayInputStream("staged-bytes".getBytes(StandardCharsets.UTF_8)) ) - // An active session in dataset1 (path P) must NOT protect a same-named path in dataset2: - // activePathsByDid is keyed per dataset. Stage P in repo2 with no active session there. + // An active session in dataset1 (path P) must NOT protect a same-named path in dataset2. + // Stage P in repo2 with no active session there. val sharedPath = "shared/same-path.bin" val activeUploadId = initSession(sharedPath) // active session for dataset1 only setSessionCreatedAt(activeUploadId, now.minusMinutes(5)) // fresh relative to `now`