
feat(file-service): cleanup stale uncommitted dataset uploads#5643

Open
eugenegujing wants to merge 4 commits into apache:main from eugenegujing:feat/staged-file-cleanup

Conversation

@eugenegujing

Contributor

What changes were proposed in this PR?

Adds a configurable scheduled cleanup job to file-service that reclaims the storage held by uploads that were started (or staged) but never committed.

Problem. Files uploaded to a dataset sit in LakeFS staging until the user commits a version. LakeFS GC only manages committed data, and Texera has no background cleanup, so abandoned uploads accumulate forever in MinIO/S3, along with their DATASET_UPLOAD_SESSION rows. The only existing mitigations are the 24h presigned-URL expiry and the lazy re-init cleanup in initMultipartUpload, neither of which removes staged data for datasets nobody touches again.

Design.

FileService.run() ──(if enabled)──> Managed StagedFileCleanupJob
   every interval-minutes, on one daemon thread:
   ├─ path 1: DATASET_UPLOAD_SESSION rows with created_at < now − retention
   │          → abort LakeFS multipart, then delete row (parts cascade)
   ├─ path 2: per dataset repo, staged objects with mtime < now − retention
   │          → branch-reset via existing LakeFS API
   └─ audit log: sessions deleted / objects reset / errors
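
The two selection rules in the diagram above can be sketched as pure functions. This is a minimal sketch under stated assumptions, not the PR's implementation: `UploadSession`, `StagedObject`, and `CleanupSelection` are hypothetical names, and the real job reads these records from Postgres and LakeFS rather than from in-memory sequences.

```scala
import java.time.{Duration, Instant}

// Hypothetical, simplified stand-ins for a DATASET_UPLOAD_SESSION row
// and a staged (uncommitted) LakeFS object.
final case class UploadSession(datasetId: Int, filePath: String, createdAt: Instant)
final case class StagedObject(datasetId: Int, path: String, mtime: Instant)

object CleanupSelection {
  // Everything strictly older than this instant is eligible for cleanup.
  def cutoff(now: Instant, retention: Duration): Instant = now.minus(retention)

  // Path 1: sessions created before the cutoff.
  def expiredSessions(sessions: Seq[UploadSession], cutoff: Instant): Seq[UploadSession] =
    sessions.filter(_.createdAt.isBefore(cutoff))

  // Path 2: staged objects older than the cutoff, minus paths owned by a
  // non-expired session in the same dataset (the complement of path 1).
  def objectsToReset(
      staged: Seq[StagedObject],
      sessions: Seq[UploadSession],
      cutoff: Instant
  ): Seq[StagedObject] = {
    val protectedPaths: Set[(Int, String)] =
      sessions
        .filterNot(_.createdAt.isBefore(cutoff))
        .map(s => (s.datasetId, s.filePath))
        .toSet
    staged.filter(o =>
      o.mtime.isBefore(cutoff) && !protectedPaths.contains((o.datasetId, o.path))
    )
  }
}
```

Keying the protected set by `(datasetId, path)` means an active session in one dataset does not shield a same-named path in another dataset.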

Safety properties (each independently enforced):

| Guard | Mechanism |
|---|---|
| Committed data is untouchable | The job only calls staging-scoped APIs (diffBranch, branch reset, multipart abort); committed objects never appear in its inputs. |
| No race with an in-flight upload | Staged objects whose path belongs to a non-expired session are excluded (the exact complement of the deletion cutoff). The default retention of 72h is far above the 24h PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS, so deleted sessions are already non-resumable. |
| Crash-safe ordering | Multipart abort happens before the row delete; a crash in between leaves the row, which is retried next round (abort 404 = already aborted). |
| Idempotent & isolated | Every action is safely repeatable; per-item failures are logged, counted in the report, and never abort the batch; staged deletions and orphaned repos (a dataset row without a live LakeFS repo) are skipped instead of erroring every round. |
| Operator control | storage.cleanup.enabled / retention-hours / interval-minutes, each overridable via STORAGE_CLEANUP_* env vars; the whole feature can be disabled. |
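
The crash-safe ordering guard can be illustrated with a small sketch. It assumes two hypothetical callbacks, `abort` (the LakeFS multipart abort) and `deleteRow` (the session-row delete), and a hypothetical `HttpError` type; none of these names come from the PR.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical error type standing in for an HTTP failure from the storage API.
final case class HttpError(status: Int) extends RuntimeException(s"HTTP $status")

object AbortThenDelete {
  // Returns true when the session was fully cleaned, false when it must be retried.
  def cleanSession(abort: () => Unit, deleteRow: () => Unit): Boolean =
    Try(abort()) match {
      case Success(_) | Failure(HttpError(404)) =>
        // Abort succeeded, or the upload was already aborted: safe to drop the row.
        deleteRow()
        true
      case Failure(_) =>
        // Any other failure keeps the row, so the next round retries it.
        false
    }
}
```

Because the row is only deleted after the abort is known to have taken effect, a crash between the two steps leaves a row that the next round picks up again.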

Also in this PR: file-service test suites now fork one JVM per suite (build.sbt). Each testcontainers-based suite boots its own LakeFS/MinIO/Postgres stack and mutates JVM-wide singletons (StorageConfig endpoints), so sharing one JVM broke whichever container suite ran second; forking isolates the Docker client and singleton state. LakeFSStorageClient gains one helper (getStagedObjectMtime) because the diff API carries no timestamps.
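
For readers unfamiliar with per-suite forking, a common sbt 1.x idiom looks roughly like the fragment below. It is a sketch of the general technique, not the settings this PR actually adds to build.sbt, and in a multi-project build it would sit inside the relevant project's `.settings(...)`.

```scala
// build.sbt fragment (sbt 1.x); illustrative only.
Test / fork := true

// Put each test suite in its own group and run every group in a fresh forked JVM,
// so JVM-wide singletons mutated by one suite cannot leak into the next.
Test / testGrouping := (Test / definedTests).value.map { suite =>
  Tests.Group(
    name = suite.name,
    tests = Seq(suite),
    runPolicy = Tests.SubProcess(ForkOptions())
  )
}
```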

Feedback welcome on the default retention (72h ≈ covers a weekend; it is a config default, not a hard-coded value).

Note: since this change automatically deletes user-uploaded data, it touches a fairly high-risk area. I have been deliberately conservative (staging-scoped APIs only, generous retention, kill switch, audit logging), but I'm happy to discuss any part of the design — including making the feature opt-in (enabled = false by default) if that feels safer for a first release.

Any related issues, documentation, discussions?

Closes #3681.

How was this PR tested?

New StagedFileCleanupJobSpec (11 tests, real LakeFS/MinIO/Postgres via testcontainers + embedded Postgres for the DAO): expired session deleted with part-row cascade; fresh session survives; expired staged object reset while a committed object survives; fresh staged object survives; idempotent second run reports zeros; active upload and its staged file untouched; mixed-batch exact counts; ±1min boundary around the cutoff; out-of-band-aborted multipart treated as already-aborted (404 rule); per-item failure counted without aborting the batch.

sbt "FileService/test"

111 tests, 4 suites, all passed (includes the pre-existing DatasetResourceSpec 93 tests — no regression). Also ran sbt scalafixAll and sbt scalafmtAll (clean). Requires Docker for the testcontainers suites.

Was this PR authored or co-authored using generative AI tooling?

Co-authored by: Claude Code (Claude Fable 5)

Add a configurable scheduled job that reclaims storage left behind by
uploads that were never committed (issue apache#3681):

- StagedFileCleanupJob, registered as a Dropwizard Managed lifecycle in
  FileService, runs every check interval on a single daemon thread
- path 1: DATASET_UPLOAD_SESSION rows older than the retention period
  are removed after aborting their LakeFS multipart upload (abort
  before delete, so a crash between the two steps is retried next
  round; HTTP 404 on abort is treated as already-aborted)
- path 2: LakeFS staged (uncommitted) objects whose mtime exceeds the
  retention period are reset via the existing branch-reset API; objects
  belonging to a non-expired upload session are excluded, staged
  deletions and orphaned repos are skipped
- config: storage.cleanup.enabled / retention-hours (default 72, above
  the 24h presigned-URL expiry so only non-resumable sessions are
  removed) / interval-minutes (default 60), all env-overridable
- every action is idempotent; per-item failures are logged and counted
  without aborting the batch; each round logs an audit summary
- file-service test suites now fork one JVM per suite: each
  testcontainers suite boots its own LakeFS/MinIO/Postgres stack and
  mutates JVM-wide singletons, which broke whichever suite ran second

Closes apache#3681
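
The config bullet above can be pictured as defaults with environment overrides. The sketch below is an assumption-laden illustration: `CleanupConfig` and the exact variable names are hypothetical (the PR only states the `STORAGE_CLEANUP_*` prefix), and the real values are read from storage.conf through StorageConfig.

```scala
// Hypothetical holder for the three settings named in the commit message.
final case class CleanupConfig(enabled: Boolean, retentionHours: Long, intervalMinutes: Long)

object CleanupConfig {
  // Retention 72h and interval 60 min come from the commit message.
  // enabled = true is inferred from the PR's offer to make the feature opt-in.
  val Default: CleanupConfig =
    CleanupConfig(enabled = true, retentionHours = 72L, intervalMinutes = 60L)

  // Reads a positive number from the environment, falling back on bad or missing input.
  private def positive(env: Map[String, String], key: String, fallback: Long): Long =
    env.get(key).flatMap(_.toLongOption).filter(_ > 0).getOrElse(fallback)

  // The env var names below are assumed for illustration.
  def fromEnv(env: Map[String, String]): CleanupConfig =
    CleanupConfig(
      enabled = env
        .get("STORAGE_CLEANUP_ENABLED")
        .flatMap(_.toBooleanOption)
        .getOrElse(Default.enabled),
      retentionHours = positive(env, "STORAGE_CLEANUP_RETENTION_HOURS", Default.retentionHours),
      intervalMinutes = positive(env, "STORAGE_CLEANUP_INTERVAL_MINUTES", Default.intervalMinutes)
    )
}
```

Calling `CleanupConfig.fromEnv(sys.env)` would pick up overrides from the process environment; unparsable values fall back to the defaults rather than failing startup, which is a design choice of this sketch and not necessarily of the PR.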
@github-actions github-actions Bot added the feature, dependencies, common, and platform labels on Jun 12, 2026
@codecov-commenter

codecov-commenter commented Jun 12, 2026

Codecov Report

❌ Patch coverage is 69.09091% with 17 lines in your changes missing coverage. Please review.
✅ Project coverage is 52.50%. Comparing base (1572edf) to head (d00460a).
⚠️ Report is 27 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...org/apache/texera/amber/config/StorageConfig.scala | 0.00% | 6 Missing ⚠️ |
| .../scala/org/apache/texera/service/FileService.scala | 0.00% | 5 Missing ⚠️ |
| ...che/texera/service/util/StagedFileCleanupJob.scala | 90.47% | 2 Missing and 2 partials ⚠️ |
| .../amber/core/storage/util/LakeFSStorageClient.scala | 0.00% | 2 Missing ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #5643      +/-   ##
============================================
- Coverage     52.51%   52.50%   -0.01%     
- Complexity     2484     2550      +66     
============================================
  Files          1071     1086      +15     
  Lines         41363    41884     +521     
  Branches       4441     4482      +41     
============================================
+ Hits          21720    21990     +270     
- Misses        18371    18596     +225     
- Partials       1272     1298      +26     
| Flag | Coverage Δ | *Carryforward flag |
|---|---|---|
| access-control-service | 71.42% <ø> (+6.81%) ⬆️ | |
| agent-service | 34.36% <ø> (ø) | Carriedforward from 820a0c0 |
| amber | 52.65% <0.00%> (-0.66%) ⬇️ | |
| computing-unit-managing-service | 1.65% <ø> (ø) | |
| config-service | 56.71% <ø> (ø) | |
| file-service | 59.63% <80.85%> (+21.41%) ⬆️ | Carriedforward from 820a0c0 |
| frontend | 47.10% <ø> (ø) | Carriedforward from 820a0c0 |
| pyamber | 90.72% <ø> (ø) | Carriedforward from 820a0c0 |
| python | 90.75% <ø> (ø) | Carriedforward from 820a0c0 |
| workflow-compiling-service | 58.69% <ø> (ø) | |

*This pull request uses carry forward flags. Click here to find out more.

@github-actions

github-actions Bot commented Jun 12, 2026

Contributor

✅ No material benchmark regressions detected

🟢 15 better · 🔴 0 worse · ⚪ 0 noise (<±5%) · 0 without baseline

CI benchmark results are noisy; treat <±5% as noise unless repeated.

| config | throughput | MB/s | latency | max Δ latest / 7d |
|---|---|---|---|---|
| 🟢 bs=10 sw=10 sl=64 | 537 | 0.328 | 16,995/28,292/28,292 us | 🟢 -29.6% / 🟢 +30.7% |
| 🟢 bs=100 sw=10 sl=64 | 1,230 | 0.751 | 81,616/93,374/93,374 us | 🟢 -35.2% / 🟢 +37.2% |
| 🟢 bs=1000 sw=10 sl=64 | 1,434 | 0.875 | 699,210/730,446/730,446 us | 🟢 +28.3% / 🟢 +36.8% |

Baseline details

Latest main 4fd395b from 2026-06-13T21:15:43.559Z

| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 537 tuples/sec | 425.86 tuples/sec | 411.23 tuples/sec | +26.1% | +30.6% |
| bs=10 sw=10 sl=64 | MB/s | 0.328 MB/s | 0.26 MB/s | 0.251 MB/s | +26.2% | +30.7% |
| bs=10 sw=10 sl=64 | p50 | 16,995 us | 24,156 us | 23,773 us | -29.6% | -28.5% |
| bs=10 sw=10 sl=64 | p95 | 28,292 us | 30,243 us | 35,177 us | -6.5% | -19.6% |
| bs=10 sw=10 sl=64 | p99 | 28,292 us | 30,243 us | 35,177 us | -6.5% | -19.6% |
| bs=100 sw=10 sl=64 | throughput | 1,230 tuples/sec | 957.92 tuples/sec | 896.95 tuples/sec | +28.4% | +37.1% |
| bs=100 sw=10 sl=64 | MB/s | 0.751 MB/s | 0.585 MB/s | 0.547 MB/s | +28.4% | +37.2% |
| bs=100 sw=10 sl=64 | p50 | 81,616 us | 103,271 us | 111,596 us | -21.0% | -26.9% |
| bs=100 sw=10 sl=64 | p95 | 93,374 us | 144,102 us | 139,415 us | -35.2% | -33.0% |
| bs=100 sw=10 sl=64 | p99 | 93,374 us | 144,102 us | 139,415 us | -35.2% | -33.0% |
| bs=1000 sw=10 sl=64 | throughput | 1,434 tuples/sec | 1,118 tuples/sec | 1,048 tuples/sec | +28.3% | +36.8% |
| bs=1000 sw=10 sl=64 | MB/s | 0.875 MB/s | 0.682 MB/s | 0.64 MB/s | +28.3% | +36.8% |
| bs=1000 sw=10 sl=64 | p50 | 699,210 us | 892,725 us | 966,517 us | -21.7% | -27.7% |
| bs=1000 sw=10 sl=64 | p95 | 730,446 us | 943,636 us | 1,016,694 us | -22.6% | -28.2% |
| bs=1000 sw=10 sl=64 | p99 | 730,446 us | 943,636 us | 1,016,694 us | -22.6% | -28.2% |

Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,372.41,200,128000,537,0.328,16994.67,28291.91,28291.91
1,100,10,64,20,1626.38,2000,1280000,1230,0.751,81615.51,93374.03,93374.03
2,1000,10,64,20,13948.42,20000,12800000,1434,0.875,699209.92,730445.62,730445.62

@Yicong-Huang

Contributor

@eugenegujing thanks for the PR. Please add more tests to guard the behavior.

…type branches

Address review feedback (add more tests to guard the behavior) and the
Codecov gaps in StagedFileCleanupJob. Adds 7 cases to
StagedFileCleanupJobSpec covering branches the first 11 tests missed:

- start()/stop() lifecycle, including stop() before start() (the
  executor-null guard); the started daemon executor is always torn down
  in a finally
- session-cleanup path: an orphan session whose dataset has a null
  repository_name is deleted with no multipart abort attempted and no error
- staged-object path: a staged deletion (a REMOVED diff from deleting a
  committed object on the branch without committing) is skipped, not reset,
  with no error
- staged-object path: a dataset pointing at a non-existent LakeFS repo
  makes retrieveUncommittedObjects throw 404, which is caught and skipped
  without counting an error
- staged-object path: a staged content change (a CHANGED diff from
  re-uploading to a previously committed path) is reset to the committed
  version, exercising the CHANGED half of the object-write check that
  ADDED-only tests missed
- staged-object path across multiple datasets in one round: expired staged
  objects in two separate repos are both reset, and an active session in
  one dataset does not protect a same-named path in another, proving
  per-dataset keying

Test-only; extra DATASET rows and repos are removed in a finally so the
suite's single-dataset assumptions and per-test counts are unaffected.
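
The lifecycle behavior these tests guard (a single daemon thread, and a `stop()` that is safe before `start()`) can be sketched as follows. `PeriodicJob` is a hypothetical stand-in: it does not extend Dropwizard's `Managed` and is not the PR's StagedFileCleanupJob.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

// Minimal stand-in for a managed periodic job; names and constructor are illustrative.
final class PeriodicJob(intervalMillis: Long, task: () => Unit) {
  @volatile private var executor: ScheduledExecutorService = _

  def start(): Unit = {
    val factory: ThreadFactory = (r: Runnable) => {
      val t = new Thread(r, "staged-file-cleanup")
      t.setDaemon(true) // a daemon thread never blocks JVM shutdown
      t
    }
    executor = Executors.newSingleThreadScheduledExecutor(factory)
    // Swallow exceptions so one failed round does not cancel later rounds.
    val safeTask: Runnable = () =>
      try task()
      catch { case _: Exception => () }
    executor.scheduleWithFixedDelay(safeTask, 0L, intervalMillis, TimeUnit.MILLISECONDS)
    ()
  }

  // Safe to call before start(): the null guard makes it a no-op.
  def stop(): Unit =
    if (executor != null) {
      executor.shutdownNow()
      executor = null
    }

  def isRunning: Boolean = executor != null
}
```

Wrapping the task matters because `scheduleWithFixedDelay` suppresses all subsequent executions once a run throws.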
@eugenegujing eugenegujing force-pushed the feat/staged-file-cleanup branch from 4021691 to 820a0c0 on June 13, 2026 21:14
@eugenegujing

Contributor Author

@Yicong-Huang I have added 7 more tests for this. Could you please review it again?

Copilot AI left a comment

Contributor

Pull request overview

Adds a scheduled, configurable cleanup job in file-service to reclaim storage used by abandoned (staged but uncommitted) dataset uploads, plus supporting config, LakeFS client helper, and isolation for testcontainers-based test suites.

Changes:

  • Introduce StagedFileCleanupJob to delete expired DATASET_UPLOAD_SESSION rows (after multipart abort) and reset expired staged LakeFS objects while protecting active sessions.
  • Wire the job into FileService behind new storage.cleanup.* config (with env overrides), and add a LakeFS helper to read staged object mtime.
  • Add an extensive StagedFileCleanupJobSpec and fork FileService test suites into separate JVMs to avoid shared-singleton/testcontainers interference.

Reviewed changes

Copilot reviewed 6 out of 7 changed files in this pull request and generated 2 comments.

Show a summary per file
| File | Description |
|---|---|
| file-service/src/main/scala/org/apache/texera/service/util/StagedFileCleanupJob.scala | New managed scheduled job implementing staged-upload cleanup logic and reporting. |
| file-service/src/main/scala/org/apache/texera/service/FileService.scala | Registers the cleanup job with Dropwizard lifecycle when enabled. |
| common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala | Adds getStagedObjectMtime helper for cleanup's age-based filtering. |
| common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala | Exposes cleanup enable/retention/interval config values. |
| common/config/src/main/resources/storage.conf | Adds storage.cleanup configuration block with env var overrides and defaults. |
| file-service/src/test/scala/org/apache/texera/service/util/StagedFileCleanupJobSpec.scala | New integration-style spec covering session deletion, staged resets, idempotence, and failure handling. |
| build.sbt | Forks FileService test suites into separate JVMs and groups them to run serially. |

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Eugene Gu <eugenegujing@outlook.com>

@Yicong-Huang Yicong-Huang left a comment

Contributor

I have some concerns about the atomicity of the operation. Please see my inline comments for details.

…r-object 404 as a no-op

Address review feedback on the staged-file cleanup job.

- Path 1 (abandoned upload sessions): the session-row delete and the LakeFS
  multipart abort now run inside one DB transaction (SqlServer.withTransaction),
  deleting FIRST. If the abort then fails with a non-404 error, the transaction
  rolls back and the session row survives, so the next round retries instead of
  leaving an orphaned multipart with no tracking record. LakeFS is external and
  cannot truly enroll in a DB transaction, so a *successful* abort is not undone
  by a later failure -- but the abort is idempotent (re-aborting an already-aborted
  upload returns 404, treated as success), so partial state is never permanent.
  The transaction is per session (not per round) to avoid holding a DB connection
  open across many LakeFS HTTP calls.

- Path 2 (staged objects): an ApiException 404 from the per-object stat/reset is
  now treated as a successful no-op (a concurrent commit/reset, or another cleanup
  round, already removed it), matching the job's idempotent design instead of
  logging a warning and counting an error.
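
The delete-first-then-abort flow for Path 1 can be sketched with an in-memory stand-in for the database. Everything here is hypothetical: `SessionTable.withTransaction` is a toy snapshot-and-restore, not the PR's `SqlServer.withTransaction`, and `AbortFailed` stands in for a LakeFS API error.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical error type for a failed LakeFS call.
final case class AbortFailed(status: Int) extends RuntimeException(s"HTTP $status")

// In-memory stand-in for the session table.
final class SessionTable(initial: Set[String]) {
  private val rows = mutable.Set.from(initial)
  def contains(id: String): Boolean = rows.contains(id)

  // Runs `body`; restores the previous rows if it throws (a toy rollback).
  def withTransaction[A](body: mutable.Set[String] => A): A = {
    val snapshot = rows.toSet
    try body(rows)
    catch {
      case NonFatal(e) =>
        rows.clear()
        rows ++= snapshot
        throw e
    }
  }
}

object TransactionalCleanup {
  // Delete first, then abort; a non-404 abort failure rolls the delete back.
  def cleanSession(table: SessionTable, id: String, abort: String => Unit): Boolean =
    try {
      table.withTransaction { rows =>
        rows -= id
        try abort(id)
        catch { case AbortFailed(404) => () } // already aborted: treat as success
      }
      true
    } catch {
      case NonFatal(_) => false // the row survives, so the next round retries
    }
}
```

As the commit message notes, the rollback only protects the database side: an abort that already succeeded is not undone, which is acceptable here because re-aborting is idempotent.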

Adds failure-mode tests for cleaning a single item:
- a non-404 abort failure rolls back the row delete (the row survives) and is
  counted, so it is retried;
- a transiently-failing session is cleaned on a later round once the failure
  clears (self-heals, not stuck);
- a failing item does not prevent a healthy item in the same round from being
  cleaned (the healthy row is deleted, the failing row is kept for retry).
Together with existing tests these cover the named failure cases: no DB record
(orphan/null-repo session), no file found (already-aborted / repo 404), and a
generic/timeout LakeFS error (rolled back, retried).
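
The failure-isolation property exercised by these tests (one failing item is counted but never blocks the rest of the round) can be sketched as a fold over the batch. `RoundReport` and `RoundRunner` are illustrative names, not the PR's types.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical per-round summary; field names are illustrative.
final case class RoundReport(cleaned: Int, errors: Int)

object RoundRunner {
  // Applies `clean` to every item; a throwing item is counted and skipped,
  // so it never prevents the remaining items from being processed.
  def run[A](items: Seq[A])(clean: A => Unit): RoundReport =
    items.foldLeft(RoundReport(0, 0)) { (report, item) =>
      Try(clean(item)) match {
        case Success(_) => report.copy(cleaned = report.cleaned + 1)
        case Failure(_) => report.copy(errors = report.errors + 1)
      }
    }
}
```

A production version would also log each failure; the sketch only keeps the counts that end up in the audit summary.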

Also renames the test section comments from F1/F2 to "Path 1 (session cleanup)" /
"Path 2 (staged objects)" to match the production-code naming.
@eugenegujing

Contributor Author

@Yicong-Huang Thank you for the detailed review. It was really helpful. I've tried to address all three points in the latest commit. Here's what I did, please let me know if I've misunderstood anything.

On combining the steps in a transaction (comments 1 & 3): I've wrapped Path 1 so that, for each session, the row delete and the multipart abort run in one SqlServer.withTransaction block, with the delete staged first. My thinking is that if the abort then fails, the transaction rolls back and the row survives, so we shouldn't end up in the partial state you described (record gone but the LakeFS entry left behind); the session would simply be retried next round. Please correct me if there's a case I'm missing here.

One thing I wasn't sure how to fully solve: since LakeFS is an external service, I don't think it can really take part in a Postgres transaction (a DB rollback can't undo an abort that already went through), so I couldn't get true all-or-nothing across both systems. What I did instead was lean on ordering + idempotency. The DB write is gated on the abort, and the abort seems to be idempotent (re-aborting an already-aborted upload returns 404, which we treat as success), so in my understanding a successful abort followed by a later failure should self-heal on the next round. I also kept the transaction per-session rather than per-round, mainly so we don't hold a DB connection open across many LakeFS calls, but I'm happy to change the granularity if you'd prefer. For Path 2 (resetting uncommitted objects) there's no DB write, so I didn't see anything to wrap there; it also runs as an independent sweep that doesn't rely on the session records, which I think gives us a second chance to clean any orphaned object even if its row is already gone.

On testing the different step failures (comment 2): I added tests for each case you mentioned, checking the result is either rolled back or cleaned next round: no DB record (orphan / null-repository_name session), no file found (already-aborted 404 and repo-level 404), and a timeout / generic error (non-404 abort failure → delete rolled back, row survives, retried). I also added one for "cleaned on the next round" (a transient failure self-healing) and one for failure isolation (a failing item not blocking a healthy one in the same round). The only one I couldn't reproduce deterministically is the per-object 404 in Path 2. It's handled the same way in code, but I couldn't trigger it in a container test without faking the client, so I left a comment pointing to the equivalent repo-level 404 test instead. Let me know if you'd like me to cover that differently.

Thanks again for the review. I'm happy to adjust any of this.

@chenlica

Contributor

@carloea2 @xuang7 @aicam: please chime in, as you worked on a related project before.

Labels

common, dependencies, feature, platform

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Automated cleanup of uploaded but uncommitted files

5 participants