Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.texera.amber.core.storage.util

import com.typesafe.scalalogging.LazyLogging
import io.lakefs.clients.sdk._
import io.lakefs.clients.sdk.model.ResetCreation.TypeEnum
import io.lakefs.clients.sdk.model._
Expand All @@ -33,11 +34,16 @@ import scala.jdk.CollectionConverters._
* LakeFSFileStorage provides high-level file storage operations using LakeFS,
* similar to Git operations for version control and file management.
*/
object LakeFSStorageClient {
object LakeFSStorageClient extends LazyLogging {

// Maximum number of results per LakeFS API request (pagination page size)
private val PageSize = 1000

// Health-check retry settings: retry with exponential backoff before giving up.
// 5 attempts starting at 200ms (200, 400, 800, 1600ms) caps total wait at ~3s.
private val HealthCheckMaxAttempts = 5
private val HealthCheckInitialDelayMillis = 200L

private lazy val apiClient: ApiClient = {
val client = new ApiClient()
client.setApiKey(StorageConfig.lakefsPassword)
Expand Down Expand Up @@ -69,11 +75,50 @@ object LakeFSStorageClient {
private val branchName: String = "main"

def healthCheck(): Unit = {
try {
retryWithBackoff(HealthCheckMaxAttempts, HealthCheckInitialDelayMillis) {
this.healthCheckApi.healthCheck().execute()
} catch {
case e: Exception =>
throw new RuntimeException(s"Failed to connect to lake fs server: ${e.getMessage}")
}
}

/**
* Runs `operation`, retrying on failure with exponential backoff (the delay
* doubles after each failed attempt) until it succeeds or `maxAttempts` is
* reached. The final failure is rethrown with the last exception as its cause.
* If interrupted while waiting, restores the interrupt status and fails fast.
*
* `sleep` is injectable so the backoff can be exercised in tests without real waiting.
*/
private[util] def retryWithBackoff(
maxAttempts: Int,
initialDelayMillis: Long,
sleep: Long => Unit = Thread.sleep
)(operation: => Unit): Unit = {
var attempt = 1
var delayMillis = initialDelayMillis
while (true) {
try {
operation
return
} catch {
case ie: InterruptedException =>
// Restore the interrupt status and fail fast rather than retrying.
Thread.currentThread().interrupt()
throw new RuntimeException("Interrupted while waiting to retry lake fs health check", ie)
case e: Exception =>
if (attempt >= maxAttempts) {
throw new RuntimeException(
s"Failed to connect to lake fs server after $maxAttempts attempts: ${e.getMessage}",
e
)
}
logger.warn(
s"LakeFS not reachable (attempt $attempt/$maxAttempts): ${e.getMessage}. " +
s"Retrying in ${delayMillis}ms..."
)
sleep(delayMillis)
attempt += 1
delayMillis *= 2
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.core.storage.util

import org.scalatest.flatspec.AnyFlatSpec

import scala.collection.mutable.ListBuffer

class LakeFSStorageClientSpec extends AnyFlatSpec {

"retryWithBackoff" should "run the operation once and not sleep when it succeeds immediately" in {
var attempts = 0
val delays = ListBuffer.empty[Long]
LakeFSStorageClient.retryWithBackoff(5, 200L, delays += _) {
attempts += 1
}
assert(attempts == 1)
assert(delays.isEmpty)
}

it should "retry until success and double the delay after each failed attempt" in {
var attempts = 0
val delays = ListBuffer.empty[Long]
LakeFSStorageClient.retryWithBackoff(5, 200L, delays += _) {
attempts += 1
if (attempts < 3) throw new RuntimeException("transient")
}
assert(attempts == 3)
assert(delays.toList == List(200L, 400L))
}

it should "give up after maxAttempts and preserve the last failure as the cause" in {
var attempts = 0
val cause = new RuntimeException("still down")
val ex = intercept[RuntimeException] {
LakeFSStorageClient.retryWithBackoff(3, 200L, _ => ()) {
attempts += 1
throw cause
}
}
assert(attempts == 3)
assert(ex.getMessage.contains("after 3 attempts"))
assert(ex.getCause eq cause)
}

it should "fail fast and restore the interrupt status when interrupted" in {
val ex = intercept[RuntimeException] {
LakeFSStorageClient.retryWithBackoff(5, 200L, _ => ()) {
throw new InterruptedException("interrupted")
}
}
// Thread.interrupted() both reads and clears the flag, so the interrupt was restored.
assert(Thread.interrupted())
assert(ex.getCause.isInstanceOf[InterruptedException])
}
}
Loading