diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala index cb66ea3e4f3..b6aba648d75 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala @@ -19,6 +19,7 @@ package org.apache.texera.amber.core.storage.util +import com.typesafe.scalalogging.LazyLogging import io.lakefs.clients.sdk._ import io.lakefs.clients.sdk.model.ResetCreation.TypeEnum import io.lakefs.clients.sdk.model._ @@ -33,11 +34,16 @@ import scala.jdk.CollectionConverters._ * LakeFSFileStorage provides high-level file storage operations using LakeFS, * similar to Git operations for version control and file management. */ -object LakeFSStorageClient { +object LakeFSStorageClient extends LazyLogging { // Maximum number of results per LakeFS API request (pagination page size) private val PageSize = 1000 + // Health-check retry settings: retry with exponential backoff before giving up. + // 5 attempts starting at 200ms (200, 400, 800, 1600ms) caps total wait at ~3s. + private val HealthCheckMaxAttempts = 5 + private val HealthCheckInitialDelayMillis = 200L + private lazy val apiClient: ApiClient = { val client = new ApiClient() client.setApiKey(StorageConfig.lakefsPassword) @@ -69,11 +75,50 @@ object LakeFSStorageClient { private val branchName: String = "main" def healthCheck(): Unit = { - try { + retryWithBackoff(HealthCheckMaxAttempts, HealthCheckInitialDelayMillis) { this.healthCheckApi.healthCheck().execute() - } catch { - case e: Exception => - throw new RuntimeException(s"Failed to connect to lake fs server: ${e.getMessage}") + } + } + + /** + * Runs `operation`, retrying on failure with exponential backoff (the delay + * doubles after each failed attempt) until it succeeds or `maxAttempts` is + * reached. The final failure is rethrown with the last exception as its cause. + * If interrupted while waiting, restores the interrupt status and fails fast. + * + * `sleep` is injectable so the backoff can be exercised in tests without real waiting. + */ + private[util] def retryWithBackoff( + maxAttempts: Int, + initialDelayMillis: Long, + sleep: Long => Unit = Thread.sleep + )(operation: => Unit): Unit = { + var attempt = 1 + var delayMillis = initialDelayMillis + while (true) { + try { + operation + return + } catch { + case ie: InterruptedException => + // Restore the interrupt status and fail fast rather than retrying. + Thread.currentThread().interrupt() + throw new RuntimeException("Interrupted while waiting to retry lake fs health check", ie) + case e: Exception => + if (attempt >= maxAttempts) { + throw new RuntimeException( + s"Failed to connect to lake fs server after $maxAttempts attempts: ${e.getMessage}", + e + ) + } + logger.warn( + s"LakeFS not reachable (attempt $attempt/$maxAttempts): ${e.getMessage}. " + + s"Retrying in ${delayMillis}ms..." + ) + sleep(delayMillis) + attempt += 1 + delayMillis *= 2 + } } } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClientSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClientSpec.scala new file mode 100644 index 00000000000..56506cee1fd --- /dev/null +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClientSpec.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.core.storage.util + +import org.scalatest.flatspec.AnyFlatSpec + +import scala.collection.mutable.ListBuffer + +class LakeFSStorageClientSpec extends AnyFlatSpec { + + "retryWithBackoff" should "run the operation once and not sleep when it succeeds immediately" in { + var attempts = 0 + val delays = ListBuffer.empty[Long] + LakeFSStorageClient.retryWithBackoff(5, 200L, delays += _) { + attempts += 1 + } + assert(attempts == 1) + assert(delays.isEmpty) + } + + it should "retry until success and double the delay after each failed attempt" in { + var attempts = 0 + val delays = ListBuffer.empty[Long] + LakeFSStorageClient.retryWithBackoff(5, 200L, delays += _) { + attempts += 1 + if (attempts < 3) throw new RuntimeException("transient") + } + assert(attempts == 3) + assert(delays.toList == List(200L, 400L)) + } + + it should "give up after maxAttempts and preserve the last failure as the cause" in { + var attempts = 0 + val cause = new RuntimeException("still down") + val ex = intercept[RuntimeException] { + LakeFSStorageClient.retryWithBackoff(3, 200L, _ => ()) { + attempts += 1 + throw cause + } + } + assert(attempts == 3) + assert(ex.getMessage.contains("after 3 attempts")) + assert(ex.getCause eq cause) + } + + it should "fail fast and restore the interrupt status when interrupted" in { + val ex = intercept[RuntimeException] { + LakeFSStorageClient.retryWithBackoff(5, 200L, _ => ()) { + throw new InterruptedException("interrupted") + } + } + // Thread.interrupted() both reads and clears the flag, so the interrupt was restored. + assert(Thread.interrupted()) + assert(ex.getCause.isInstanceOf[InterruptedException]) + } +}