From 9a22c5fd1086e2f2b2c7f008ab671d8ef14dacce Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Wed, 10 Jun 2026 20:53:32 -0700 Subject: [PATCH 1/6] added a notification for when users don't follow the appropriate template in issues or prs --- .../workflows/template-compliance-warning.yml | 195 ++++++++++++++++++ 1 file changed, 195 insertions(+) create mode 100644 .github/workflows/template-compliance-warning.yml diff --git a/.github/workflows/template-compliance-warning.yml b/.github/workflows/template-compliance-warning.yml new file mode 100644 index 00000000000..6ca71616b66 --- /dev/null +++ b/.github/workflows/template-compliance-warning.yml @@ -0,0 +1,195 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Post a non-blocking warning when a pull request (or issue) is opened +# without following our template, and clear it automatically once the +# author fixes the description. +# +# Designed to be cheap on CI: +# * Single `github-script` job, no build, no full checkout (only a +# sparse-checkout of the one message .txt file), so a run is a few +# seconds of an ubuntu-latest runner. +# * Triggers only on `opened` / `edited`, never on `synchronize`, so +# it does NOT run on every push to a PR branch. +# * Skips drafts and bots, so WIP and automation don't get nagged. +# * Posts a single sticky comment (idempotency marker) that is +# UPDATED in place while the template is incomplete and DELETED once +# it is followed, so it never piles up duplicate comments. +# +# Issue templates here are GitHub form (`.yaml`) templates whose +# required fields are already enforced at submission time, so for issues +# this only catches a fully blank body (e.g. a blank issue). PR +# templates cannot be enforced by GitHub, which is the main case this +# covers. +# +# Uses `pull_request_target` so PRs from forks are still checked. +# A `pull_request` run from a fork gets a read-only token and could not +# comment. +name: Template compliance warning +on: + issues: + types: [opened, edited] + pull_request_target: + types: [opened, edited] + +permissions: + issues: write + pull-requests: write + +jobs: + check-template: + if: github.event.sender.type != 'Bot' + runs-on: ubuntu-latest + steps: + # Check out only the warning message template. `pull_request_target` + # and `issues` both resolve to the trusted base branch (never the + # fork head), so reading this file is safe. Keeping the wording in a + # .txt file means editing the message does not touch workflow logic. + - uses: actions/checkout@v5 + with: + persist-credentials: false + sparse-checkout: .github/template-compliance-warning.txt + sparse-checkout-cone-mode: false + - uses: actions/github-script@v8 + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + script: | + const isPR = context.eventName === 'pull_request_target'; + const subject = isPR + ? context.payload.pull_request + : context.payload.issue; + + // Drafts are work-in-progress; don't nag until they're ready. + if (isPR && subject.draft) { + core.info(`#${subject.number} is a draft; skipping.`); + return; + } + + const author = subject.user.login; + const issue_number = subject.number; + const kind = isPR ? 'pull request' : 'issue'; + const { owner, repo } = context.repo; + const body = subject.body || ''; + + // Strip HTML comments (the template's guidance) + // before judging whether a section actually has content. + const stripped = body.replace(//g, ''); + + // Build the list of problems with the description. Each entry + // is a user-facing bullet. An empty list means "compliant". + const problems = []; + + if (stripped.trim().length === 0) { + problems.push( + `The description is empty. Please open the ${kind} using ` + + `the provided template and fill it out.`, + ); + } else if (isPR) { + // PR template required sections (headings copied verbatim + // from .github/PULL_REQUEST_TEMPLATE). For each, capture the + // text from its heading to the next "### " heading (or end) + // and treat whitespace-only as not filled in. + const REQUIRED_SECTIONS = [ + 'What changes were proposed in this PR?', + 'How was this PR tested?', + 'Was this PR authored or co-authored using generative AI tooling?', + ]; + for (const heading of REQUIRED_SECTIONS) { + // Escape regex metacharacters in the heading text. + const esc = heading.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); + // The trailing `(?![\s\S])` is the end-of-string case (JS + // regex has no `\Z`); with the `m` flag a bare `$` would + // match every line end, not just the end of the body. + const re = new RegExp( + `^#{1,6}\\s*${esc}\\s*$([\\s\\S]*?)(?=^#{1,6}\\s|(?![\\s\\S]))`, + 'm', + ); + const m = stripped.match(re); + if (!m) { + problems.push( + `The **${heading}** section is missing; please keep ` + + `the template's headings.`, + ); + } else if (m[1].trim().length === 0) { + problems.push( + `The **${heading}** section is empty; please fill it in.`, + ); + } + } + } + + const MARKER = ''; + + // Find a previous warning comment from this workflow. + let existing = null; + try { + const comments = await github.paginate( + github.rest.issues.listComments, + { owner, repo, issue_number, per_page: 100 }, + ); + existing = comments.find((c) => (c.body || '').includes(MARKER)); + } catch (e) { + core.warning(`listComments on #${issue_number} failed: ${e.message}`); + // Without the comment list we can't safely de-dupe; bail to + // avoid posting a duplicate warning. + return; + } + + // Compliant now: remove any stale warning and stop. + if (problems.length === 0) { + core.info(`#${issue_number} follows the template.`); + if (existing) { + try { + await github.rest.issues.deleteComment({ + owner, repo, comment_id: existing.id, + }); + core.info(`Cleared resolved warning on #${issue_number}.`); + } catch (e) { + core.warning(`Failed to delete warning: ${e.message}`); + } + } + return; + } + + // Not compliant: render the message and post/update the sticky + // comment. + const fs = require('fs'); + const template = fs.readFileSync( + '.github/template-compliance-warning.txt', 'utf8', + ); + const details = problems.map((p) => `- ${p}`).join('\n'); + const message = MARKER + '\n' + template + .replaceAll('{{author}}', author) + .replaceAll('{{owner}}', owner) + .replaceAll('{{repo}}', repo) + .replaceAll('{{kind}}', kind) + .replaceAll('{{details}}', details); + + try { + if (existing) { + await github.rest.issues.updateComment({ + owner, repo, comment_id: existing.id, body: message, + }); + core.info(`Updated template warning on #${issue_number}.`); + } else { + await github.rest.issues.createComment({ + owner, repo, issue_number, body: message, + }); + core.info(`Posted template warning on #${issue_number}.`); + } + } catch (e) { + core.warning(`Failed to post warning on #${issue_number}: ${e.message}`); + } From f78721fee5a92427be66458c44427aa4986b3981 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Thu, 11 Jun 2026 02:15:52 -0700 Subject: [PATCH 2/6] changed issues to cover more templates --- .../workflows/template-compliance-warning.yml | 74 +++++++++++++------ 1 file changed, 53 insertions(+), 21 deletions(-) diff --git a/.github/workflows/template-compliance-warning.yml b/.github/workflows/template-compliance-warning.yml index 6ca71616b66..82b8e87cf26 100644 --- a/.github/workflows/template-compliance-warning.yml +++ b/.github/workflows/template-compliance-warning.yml @@ -29,11 +29,11 @@ # UPDATED in place while the template is incomplete and DELETED once # it is followed, so it never piles up duplicate comments. # -# Issue templates here are GitHub form (`.yaml`) templates whose -# required fields are already enforced at submission time, so for issues -# this only catches a fully blank body (e.g. a blank issue). PR -# templates cannot be enforced by GitHub, which is the main case this -# covers. +# Issues are matched to their template by GitHub issue type +# (Bug/Feature/Task) and checked against that template's `required: true` +# fields. Because every template sets a type, an issue with no recognized +# type is flagged outright as not using a template. PR templates cannot +# be enforced by GitHub, which is the main case this covers. # # Uses `pull_request_target` so PRs from forks are still checked. # A `pull_request` run from a fork gets a read-only token and could not @@ -88,26 +88,55 @@ jobs: // before judging whether a section actually has content. const stripped = body.replace(//g, ''); + // Pick the required sections for whichever template applies. + // PRs use the single PR template. Issues are matched by their + // GitHub issue type (set by the form template the author + // chose), so each issue is checked against the right + // template's fields. Only fields marked `required: true` in + // the templates are listed here. + const PR_SECTIONS = [ + 'What changes were proposed in this PR?', + 'How was this PR tested?', + 'Was this PR authored or co-authored using generative AI tooling?', + ]; + const ISSUE_SECTIONS = { + Bug: ['What happened?', 'How to reproduce?', 'Version/Branch'], + Feature: ['Feature Summary', 'Proposed Solution or Design'], + Task: ['Task Summary'], + }; + let requiredSections = null; + if (isPR) { + requiredSections = PR_SECTIONS; + } else { + const typeName = subject.type && subject.type.name; + requiredSections = ISSUE_SECTIONS[typeName] || null; + } + // Build the list of problems with the description. Each entry // is a user-facing bullet. An empty list means "compliant". const problems = []; - if (stripped.trim().length === 0) { + if (!isPR && !requiredSections) { + // All our issue templates set an issue type, so a missing or + // unrecognized type means no template was used (e.g. a blank + // issue). Flag it outright. + problems.push( + `This ${kind} doesn't appear to use one of our templates ` + + `(Bug, Feature, or Task). Please open it using a template ` + + `so the required details are captured.`, + ); + } else if (stripped.trim().length === 0) { problems.push( `The description is empty. Please open the ${kind} using ` + `the provided template and fill it out.`, ); - } else if (isPR) { - // PR template required sections (headings copied verbatim - // from .github/PULL_REQUEST_TEMPLATE). For each, capture the - // text from its heading to the next "### " heading (or end) - // and treat whitespace-only as not filled in. - const REQUIRED_SECTIONS = [ - 'What changes were proposed in this PR?', - 'How was this PR tested?', - 'Was this PR authored or co-authored using generative AI tooling?', - ]; - for (const heading of REQUIRED_SECTIONS) { + } else { + // PR, or an issue with a recognized type: check each required + // section. Capture the text from its heading to the next + // heading (or end of body), treating a blank value or + // GitHub's "_No response_" placeholder (shown for an empty + // field) as not filled in. + for (const heading of requiredSections) { // Escape regex metacharacters in the heading text. const esc = heading.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); // The trailing `(?![\s\S])` is the end-of-string case (JS @@ -123,10 +152,13 @@ jobs: `The **${heading}** section is missing; please keep ` + `the template's headings.`, ); - } else if (m[1].trim().length === 0) { - problems.push( - `The **${heading}** section is empty; please fill it in.`, - ); + } else { + const content = m[1].trim(); + if (content.length === 0 || content === '_No response_') { + problems.push( + `The **${heading}** section is empty; please fill it in.`, + ); + } } } } From 1eb49272d6afd0935bedf5c1b85e1ab3f6048ccb Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Fri, 12 Jun 2026 14:27:03 -0700 Subject: [PATCH 3/6] fix(file-service): retry LakeFS health check on startup --- .../storage/util/LakeFSStorageClient.scala | 31 +++++++++++++++---- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala index 613255173e5..d29d45facac 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala @@ -19,6 +19,7 @@ package org.apache.texera.amber.core.storage.util +import com.typesafe.scalalogging.LazyLogging import io.lakefs.clients.sdk._ import io.lakefs.clients.sdk.model.ResetCreation.TypeEnum import io.lakefs.clients.sdk.model._ @@ -33,11 +34,15 @@ import scala.jdk.CollectionConverters._ * LakeFSFileStorage provides high-level file storage operations using LakeFS, * similar to Git operations for version control and file management. */ -object LakeFSStorageClient { +object LakeFSStorageClient extends LazyLogging { // Maximum number of results per LakeFS API request (pagination page size) private val PageSize = 1000 + // Health-check retry settings: retry on failure before giving up. + private val HealthCheckMaxAttempts = 10 + private val HealthCheckRetryDelayMillis = 3000L + private lazy val apiClient: ApiClient = { val client = new ApiClient() client.setApiKey(StorageConfig.lakefsPassword) @@ -69,11 +74,25 @@ object LakeFSStorageClient { private val branchName: String = "main" def healthCheck(): Unit = { - try { - this.healthCheckApi.healthCheck().execute() - } catch { - case e: Exception => - throw new RuntimeException(s"Failed to connect to lake fs server: ${e.getMessage}") + var attempt = 1 + while (true) { + try { + this.healthCheckApi.healthCheck().execute() + return + } catch { + case e: Exception => + if (attempt >= HealthCheckMaxAttempts) { + throw new RuntimeException( + s"Failed to connect to lake fs server after $HealthCheckMaxAttempts attempts: ${e.getMessage}" + ) + } + logger.warn( + s"LakeFS not reachable (attempt $attempt/$HealthCheckMaxAttempts): ${e.getMessage}. " + + s"Retrying in ${HealthCheckRetryDelayMillis}ms..." + ) + Thread.sleep(HealthCheckRetryDelayMillis) + attempt += 1 + } } } From 668e5998f5cbb369b23efad98a1668e145d5677a Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Fri, 12 Jun 2026 19:37:58 -0700 Subject: [PATCH 4/6] remove unrelated changes --- .../workflows/template-compliance-warning.yml | 227 ------------------ 1 file changed, 227 deletions(-) delete mode 100644 .github/workflows/template-compliance-warning.yml diff --git a/.github/workflows/template-compliance-warning.yml b/.github/workflows/template-compliance-warning.yml deleted file mode 100644 index 82b8e87cf26..00000000000 --- a/.github/workflows/template-compliance-warning.yml +++ /dev/null @@ -1,227 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Post a non-blocking warning when a pull request (or issue) is opened -# without following our template, and clear it automatically once the -# author fixes the description. -# -# Designed to be cheap on CI: -# * Single `github-script` job, no build, no full checkout (only a -# sparse-checkout of the one message .txt file), so a run is a few -# seconds of an ubuntu-latest runner. -# * Triggers only on `opened` / `edited`, never on `synchronize`, so -# it does NOT run on every push to a PR branch. -# * Skips drafts and bots, so WIP and automation don't get nagged. -# * Posts a single sticky comment (idempotency marker) that is -# UPDATED in place while the template is incomplete and DELETED once -# it is followed, so it never piles up duplicate comments. -# -# Issues are matched to their template by GitHub issue type -# (Bug/Feature/Task) and checked against that template's `required: true` -# fields. Because every template sets a type, an issue with no recognized -# type is flagged outright as not using a template. PR templates cannot -# be enforced by GitHub, which is the main case this covers. -# -# Uses `pull_request_target` so PRs from forks are still checked. -# A `pull_request` run from a fork gets a read-only token and could not -# comment. -name: Template compliance warning -on: - issues: - types: [opened, edited] - pull_request_target: - types: [opened, edited] - -permissions: - issues: write - pull-requests: write - -jobs: - check-template: - if: github.event.sender.type != 'Bot' - runs-on: ubuntu-latest - steps: - # Check out only the warning message template. `pull_request_target` - # and `issues` both resolve to the trusted base branch (never the - # fork head), so reading this file is safe. Keeping the wording in a - # .txt file means editing the message does not touch workflow logic. - - uses: actions/checkout@v5 - with: - persist-credentials: false - sparse-checkout: .github/template-compliance-warning.txt - sparse-checkout-cone-mode: false - - uses: actions/github-script@v8 - with: - github-token: ${{ secrets.GITHUB_TOKEN }} - script: | - const isPR = context.eventName === 'pull_request_target'; - const subject = isPR - ? context.payload.pull_request - : context.payload.issue; - - // Drafts are work-in-progress; don't nag until they're ready. - if (isPR && subject.draft) { - core.info(`#${subject.number} is a draft; skipping.`); - return; - } - - const author = subject.user.login; - const issue_number = subject.number; - const kind = isPR ? 'pull request' : 'issue'; - const { owner, repo } = context.repo; - const body = subject.body || ''; - - // Strip HTML comments (the template's guidance) - // before judging whether a section actually has content. - const stripped = body.replace(//g, ''); - - // Pick the required sections for whichever template applies. - // PRs use the single PR template. Issues are matched by their - // GitHub issue type (set by the form template the author - // chose), so each issue is checked against the right - // template's fields. Only fields marked `required: true` in - // the templates are listed here. - const PR_SECTIONS = [ - 'What changes were proposed in this PR?', - 'How was this PR tested?', - 'Was this PR authored or co-authored using generative AI tooling?', - ]; - const ISSUE_SECTIONS = { - Bug: ['What happened?', 'How to reproduce?', 'Version/Branch'], - Feature: ['Feature Summary', 'Proposed Solution or Design'], - Task: ['Task Summary'], - }; - let requiredSections = null; - if (isPR) { - requiredSections = PR_SECTIONS; - } else { - const typeName = subject.type && subject.type.name; - requiredSections = ISSUE_SECTIONS[typeName] || null; - } - - // Build the list of problems with the description. Each entry - // is a user-facing bullet. An empty list means "compliant". - const problems = []; - - if (!isPR && !requiredSections) { - // All our issue templates set an issue type, so a missing or - // unrecognized type means no template was used (e.g. a blank - // issue). Flag it outright. - problems.push( - `This ${kind} doesn't appear to use one of our templates ` + - `(Bug, Feature, or Task). Please open it using a template ` + - `so the required details are captured.`, - ); - } else if (stripped.trim().length === 0) { - problems.push( - `The description is empty. Please open the ${kind} using ` + - `the provided template and fill it out.`, - ); - } else { - // PR, or an issue with a recognized type: check each required - // section. Capture the text from its heading to the next - // heading (or end of body), treating a blank value or - // GitHub's "_No response_" placeholder (shown for an empty - // field) as not filled in. - for (const heading of requiredSections) { - // Escape regex metacharacters in the heading text. - const esc = heading.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); - // The trailing `(?![\s\S])` is the end-of-string case (JS - // regex has no `\Z`); with the `m` flag a bare `$` would - // match every line end, not just the end of the body. - const re = new RegExp( - `^#{1,6}\\s*${esc}\\s*$([\\s\\S]*?)(?=^#{1,6}\\s|(?![\\s\\S]))`, - 'm', - ); - const m = stripped.match(re); - if (!m) { - problems.push( - `The **${heading}** section is missing; please keep ` + - `the template's headings.`, - ); - } else { - const content = m[1].trim(); - if (content.length === 0 || content === '_No response_') { - problems.push( - `The **${heading}** section is empty; please fill it in.`, - ); - } - } - } - } - - const MARKER = ''; - - // Find a previous warning comment from this workflow. - let existing = null; - try { - const comments = await github.paginate( - github.rest.issues.listComments, - { owner, repo, issue_number, per_page: 100 }, - ); - existing = comments.find((c) => (c.body || '').includes(MARKER)); - } catch (e) { - core.warning(`listComments on #${issue_number} failed: ${e.message}`); - // Without the comment list we can't safely de-dupe; bail to - // avoid posting a duplicate warning. - return; - } - - // Compliant now: remove any stale warning and stop. - if (problems.length === 0) { - core.info(`#${issue_number} follows the template.`); - if (existing) { - try { - await github.rest.issues.deleteComment({ - owner, repo, comment_id: existing.id, - }); - core.info(`Cleared resolved warning on #${issue_number}.`); - } catch (e) { - core.warning(`Failed to delete warning: ${e.message}`); - } - } - return; - } - - // Not compliant: render the message and post/update the sticky - // comment. - const fs = require('fs'); - const template = fs.readFileSync( - '.github/template-compliance-warning.txt', 'utf8', - ); - const details = problems.map((p) => `- ${p}`).join('\n'); - const message = MARKER + '\n' + template - .replaceAll('{{author}}', author) - .replaceAll('{{owner}}', owner) - .replaceAll('{{repo}}', repo) - .replaceAll('{{kind}}', kind) - .replaceAll('{{details}}', details); - - try { - if (existing) { - await github.rest.issues.updateComment({ - owner, repo, comment_id: existing.id, body: message, - }); - core.info(`Updated template warning on #${issue_number}.`); - } else { - await github.rest.issues.createComment({ - owner, repo, issue_number, body: message, - }); - core.info(`Posted template warning on #${issue_number}.`); - } - } catch (e) { - core.warning(`Failed to post warning on #${issue_number}: ${e.message}`); - } From c0a1e1a68e53a23d120cc536054646a4c4bb0250 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Fri, 12 Jun 2026 19:50:00 -0700 Subject: [PATCH 5/6] expand exception --- .../amber/core/storage/util/LakeFSStorageClient.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala index d29d45facac..bec77214e89 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala @@ -80,10 +80,15 @@ object LakeFSStorageClient extends LazyLogging { this.healthCheckApi.healthCheck().execute() return } catch { + case ie: InterruptedException => + // Restore the interrupt status and fail fast rather than retrying. + Thread.currentThread().interrupt() + throw new RuntimeException("Interrupted while waiting to retry lake fs health check", ie) case e: Exception => if (attempt >= HealthCheckMaxAttempts) { throw new RuntimeException( - s"Failed to connect to lake fs server after $HealthCheckMaxAttempts attempts: ${e.getMessage}" + s"Failed to connect to lake fs server after $HealthCheckMaxAttempts attempts: ${e.getMessage}", + e ) } logger.warn( From 01bfd9949b7d29c6d9bd2100cfed86bc8ca604a4 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Sat, 13 Jun 2026 19:17:54 -0700 Subject: [PATCH 6/6] fix(file-service): use exponential backoff for LakeFS health check + add tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the constant 3s × 10-attempt retry with exponential backoff: 5 attempts starting at 200ms (200, 400, 800, 1600ms), capping total wait at ~3s instead of ~30s. Extract the retry loop into a testable retryWithBackoff helper with an injectable sleep, and add LakeFSStorageClientSpec covering immediate success, retry-then-success, attempt exhaustion (preserving the original cause), and interrupt fail-fast. --- .../storage/util/LakeFSStorageClient.scala | 39 +++++++--- .../util/LakeFSStorageClientSpec.scala | 73 +++++++++++++++++++ 2 files changed, 103 insertions(+), 9 deletions(-) create mode 100644 common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClientSpec.scala diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala index bec77214e89..548579cde5f 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala @@ -39,9 +39,10 @@ object LakeFSStorageClient extends LazyLogging { // Maximum number of results per LakeFS API request (pagination page size) private val PageSize = 1000 - // Health-check retry settings: retry on failure before giving up. - private val HealthCheckMaxAttempts = 10 - private val HealthCheckRetryDelayMillis = 3000L + // Health-check retry settings: retry with exponential backoff before giving up. + // 5 attempts starting at 200ms (200, 400, 800, 1600ms) caps total wait at ~3s. + private val HealthCheckMaxAttempts = 5 + private val HealthCheckInitialDelayMillis = 200L private lazy val apiClient: ApiClient = { val client = new ApiClient() @@ -74,10 +75,29 @@ object LakeFSStorageClient extends LazyLogging { private val branchName: String = "main" def healthCheck(): Unit = { + retryWithBackoff(HealthCheckMaxAttempts, HealthCheckInitialDelayMillis) { + this.healthCheckApi.healthCheck().execute() + } + } + + /** + * Runs `operation`, retrying on failure with exponential backoff (the delay + * doubles after each failed attempt) until it succeeds or `maxAttempts` is + * reached. The final failure is rethrown with the last exception as its cause. + * If interrupted while waiting, restores the interrupt status and fails fast. + * + * `sleep` is injectable so the backoff can be exercised in tests without real waiting. + */ + private[util] def retryWithBackoff( + maxAttempts: Int, + initialDelayMillis: Long, + sleep: Long => Unit = Thread.sleep + )(operation: => Unit): Unit = { var attempt = 1 + var delayMillis = initialDelayMillis while (true) { try { - this.healthCheckApi.healthCheck().execute() + operation return } catch { case ie: InterruptedException => @@ -85,18 +105,19 @@ object LakeFSStorageClient extends LazyLogging { Thread.currentThread().interrupt() throw new RuntimeException("Interrupted while waiting to retry lake fs health check", ie) case e: Exception => - if (attempt >= HealthCheckMaxAttempts) { + if (attempt >= maxAttempts) { throw new RuntimeException( - s"Failed to connect to lake fs server after $HealthCheckMaxAttempts attempts: ${e.getMessage}", + s"Failed to connect to lake fs server after $maxAttempts attempts: ${e.getMessage}", e ) } logger.warn( - s"LakeFS not reachable (attempt $attempt/$HealthCheckMaxAttempts): ${e.getMessage}. " + - s"Retrying in ${HealthCheckRetryDelayMillis}ms..." + s"LakeFS not reachable (attempt $attempt/$maxAttempts): ${e.getMessage}. " + + s"Retrying in ${delayMillis}ms..." ) - Thread.sleep(HealthCheckRetryDelayMillis) + sleep(delayMillis) attempt += 1 + delayMillis *= 2 } } } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClientSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClientSpec.scala new file mode 100644 index 00000000000..56506cee1fd --- /dev/null +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClientSpec.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.core.storage.util + +import org.scalatest.flatspec.AnyFlatSpec + +import scala.collection.mutable.ListBuffer + +class LakeFSStorageClientSpec extends AnyFlatSpec { + + "retryWithBackoff" should "run the operation once and not sleep when it succeeds immediately" in { + var attempts = 0 + val delays = ListBuffer.empty[Long] + LakeFSStorageClient.retryWithBackoff(5, 200L, delays += _) { + attempts += 1 + } + assert(attempts == 1) + assert(delays.isEmpty) + } + + it should "retry until success and double the delay after each failed attempt" in { + var attempts = 0 + val delays = ListBuffer.empty[Long] + LakeFSStorageClient.retryWithBackoff(5, 200L, delays += _) { + attempts += 1 + if (attempts < 3) throw new RuntimeException("transient") + } + assert(attempts == 3) + assert(delays.toList == List(200L, 400L)) + } + + it should "give up after maxAttempts and preserve the last failure as the cause" in { + var attempts = 0 + val cause = new RuntimeException("still down") + val ex = intercept[RuntimeException] { + LakeFSStorageClient.retryWithBackoff(3, 200L, _ => ()) { + attempts += 1 + throw cause + } + } + assert(attempts == 3) + assert(ex.getMessage.contains("after 3 attempts")) + assert(ex.getCause eq cause) + } + + it should "fail fast and restore the interrupt status when interrupted" in { + val ex = intercept[RuntimeException] { + LakeFSStorageClient.retryWithBackoff(5, 200L, _ => ()) { + throw new InterruptedException("interrupted") + } + } + // Thread.interrupted() both reads and clears the flag, so the interrupt was restored. + assert(Thread.interrupted()) + assert(ex.getCause.isInstanceOf[InterruptedException]) + } +}