Skip to content

Commit 5af78e2

Browse files
eemariozhuzhurk
authored andcommitted
[FLINK-39173][runtime] Support application-level blob storage
1 parent dfd785c commit 5af78e2

33 files changed

+2714
-45
lines changed

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,12 @@
2727
import org.apache.flink.configuration.DeploymentOptions;
2828
import org.apache.flink.configuration.StateRecoveryOptions;
2929
import org.apache.flink.core.execution.RecoveryClaimMode;
30+
import org.apache.flink.runtime.blob.BlobClient;
31+
import org.apache.flink.runtime.blob.PermanentBlobKey;
32+
import org.apache.flink.runtime.client.ClientUtils;
3033
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
3134
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
35+
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
3236
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
3337
import org.apache.flink.runtime.rest.handler.HandlerRequest;
3438
import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -40,6 +44,8 @@
4044

4145
import javax.annotation.Nonnull;
4246

47+
import java.net.InetAddress;
48+
import java.net.InetSocketAddress;
4349
import java.nio.file.Path;
4450
import java.time.Duration;
4551
import java.util.Map;
@@ -103,7 +109,18 @@ public CompletableFuture<JarRunApplicationResponseBody> handleRequest(
103109
new PackagedProgramApplication(
104110
applicationId, program, effectiveConfiguration, false, true, false, false);
105111

106-
return gateway.submitApplication(application, timeout)
112+
// TODO upload user jar to blob in HA mode once application resource cleanup is supported
113+
final boolean isHaEnabled =
114+
HighAvailabilityMode.isHighAvailabilityModeActivated(configuration);
115+
CompletableFuture<PermanentBlobKey> jarUploadFuture =
116+
CompletableFuture.completedFuture(null);
117+
118+
return jarUploadFuture
119+
.thenCompose(
120+
blobKey -> {
121+
// TODO record blob key in the application for HA recovery
122+
return gateway.submitApplication(application, timeout);
123+
})
107124
.handle(
108125
(acknowledge, throwable) -> {
109126
if (throwable != null) {
@@ -117,6 +134,48 @@ public CompletableFuture<JarRunApplicationResponseBody> handleRequest(
117134
});
118135
}
119136

137+
private CompletableFuture<PermanentBlobKey> uploadJarFile(
138+
final DispatcherGateway gateway,
139+
final JarHandlerContext context,
140+
final ApplicationID applicationId) {
141+
CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
142+
CompletableFuture<InetAddress> blobServerAddressFuture =
143+
gateway.getBlobServerAddress(timeout);
144+
145+
return blobServerPortFuture
146+
.thenCombine(
147+
blobServerAddressFuture,
148+
(blobServerPort, blobServerAddress) ->
149+
new InetSocketAddress(
150+
blobServerAddress.getHostName(), blobServerPort))
151+
.thenApply(
152+
blobSocketAddress -> {
153+
try {
154+
org.apache.flink.core.fs.Path jarPath =
155+
new org.apache.flink.core.fs.Path(
156+
context.getJarFile().toString());
157+
PermanentBlobKey blobKey =
158+
ClientUtils.uploadUserJarForApplication(
159+
jarPath,
160+
applicationId,
161+
() ->
162+
new BlobClient(
163+
blobSocketAddress, configuration));
164+
log.info(
165+
"Uploaded user jar for application {} to blob server with blob key {}.",
166+
applicationId,
167+
blobKey);
168+
return blobKey;
169+
} catch (Exception e) {
170+
throw new CompletionException(
171+
new RestHandlerException(
172+
"Could not upload jar file to blob server.",
173+
HttpResponseStatus.INTERNAL_SERVER_ERROR,
174+
e));
175+
}
176+
});
177+
}
178+
120179
private SavepointRestoreSettings getSavepointRestoreSettings(
121180
final @Nonnull HandlerRequest<JarRunApplicationRequestBody> request,
122181
final Configuration effectiveConfiguration)

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,10 @@ JobID getJobId() {
238238
public Optional<ApplicationID> getApplicationId() {
239239
return Optional.ofNullable(applicationId);
240240
}
241+
242+
public Path getJarFile() {
243+
return jarFile;
244+
}
241245
}
242246

243247
private static List<URL> getClasspaths(Configuration configuration) {

flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.runtime.blob;
2020

21+
import org.apache.flink.api.common.ApplicationID;
2122
import org.apache.flink.api.common.JobID;
2223
import org.apache.flink.configuration.BlobServerOptions;
2324
import org.apache.flink.configuration.Configuration;
@@ -207,6 +208,94 @@ protected File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IO
207208
}
208209
}
209210

211+
/**
212+
* Returns local copy of the file for the BLOB with the given key.
213+
*
214+
* <p>The method will first attempt to serve the BLOB from its local cache. If the BLOB is not
215+
* in the cache, the method will try to download it from this cache's BLOB server via a
216+
* distributed BLOB store (if available) or direct end-to-end download.
217+
*
218+
* @param applicationId ID of the application this blob belongs to
219+
* @param blobKey The key of the desired BLOB.
220+
* @return file referring to the local storage location of the BLOB.
221+
* @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB
222+
* server.
223+
*/
224+
protected File getFileInternal(ApplicationID applicationId, BlobKey blobKey)
225+
throws IOException {
226+
checkArgument(blobKey != null, "BLOB key cannot be null.");
227+
228+
final File localFile =
229+
BlobUtils.getStorageLocation(storageDir.deref(), applicationId, blobKey);
230+
readWriteLock.readLock().lock();
231+
232+
try {
233+
if (localFile.exists()) {
234+
return localFile;
235+
}
236+
} finally {
237+
readWriteLock.readLock().unlock();
238+
}
239+
240+
// first try the distributed blob store (if available)
241+
// use a temporary file (thread-safe without locking)
242+
File incomingFile = createTemporaryFilename();
243+
try {
244+
try {
245+
if (blobView.get(applicationId, blobKey, incomingFile)) {
246+
// now move the temp file to our local cache atomically
247+
readWriteLock.writeLock().lock();
248+
try {
249+
BlobUtils.moveTempFileToStore(
250+
incomingFile, applicationId, blobKey, localFile, log, null);
251+
} finally {
252+
readWriteLock.writeLock().unlock();
253+
}
254+
255+
return localFile;
256+
}
257+
} catch (Exception e) {
258+
log.info(
259+
"Failed to copy from blob store. Downloading from BLOB server instead.", e);
260+
}
261+
262+
final InetSocketAddress currentServerAddress = serverAddress;
263+
264+
if (currentServerAddress != null) {
265+
// fallback: download from the BlobServer
266+
BlobClient.downloadFromBlobServer(
267+
applicationId,
268+
blobKey,
269+
incomingFile,
270+
currentServerAddress,
271+
blobClientConfig,
272+
numFetchRetries);
273+
274+
readWriteLock.writeLock().lock();
275+
try {
276+
BlobUtils.moveTempFileToStore(
277+
incomingFile, applicationId, blobKey, localFile, log, null);
278+
} finally {
279+
readWriteLock.writeLock().unlock();
280+
}
281+
} else {
282+
throw new IOException(
283+
"Cannot download from BlobServer, because the server address is unknown.");
284+
}
285+
286+
return localFile;
287+
} finally {
288+
// delete incomingFile from a failed download
289+
if (!incomingFile.delete() && incomingFile.exists()) {
290+
log.warn(
291+
"Could not delete the staging file {} for blob key {} and application {}.",
292+
incomingFile,
293+
blobKey,
294+
applicationId);
295+
}
296+
}
297+
}
298+
210299
/**
211300
* Returns the port the BLOB server is listening on.
212301
*

0 commit comments

Comments
 (0)