Skip to content

Commit 216d90a

Browse files
committed
[FLINK-38799][runtime] Rename LoadingWeight to TaskExecutionLoad
1 parent 4980860 commit 216d90a

27 files changed

+273
-234
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
2929
import org.apache.flink.runtime.jobmaster.SlotInfo;
3030
import org.apache.flink.runtime.jobmaster.SlotRequestId;
31-
import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
31+
import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
3232
import org.apache.flink.runtime.slots.ResourceRequirement;
3333
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
3434
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -66,13 +66,13 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem
6666
private static final class FulfilledAllocation {
6767
final AllocationID allocationID;
6868
final ResourceID taskExecutorID;
69-
final LoadingWeight loadingWeight;
69+
final TaskExecutionLoad taskExecutionLoad;
7070

71-
FulfilledAllocation(PhysicalSlot slot, LoadingWeight loadingWeight) {
71+
FulfilledAllocation(PhysicalSlot slot, TaskExecutionLoad taskExecutionLoad) {
7272
this.allocationID = Preconditions.checkNotNull(slot.getAllocationId());
7373
this.taskExecutorID =
7474
Preconditions.checkNotNull(slot.getTaskManagerLocation().getResourceID());
75-
this.loadingWeight = Preconditions.checkNotNull(loadingWeight);
75+
this.taskExecutionLoad = Preconditions.checkNotNull(taskExecutionLoad);
7676
}
7777

7878
@Override
@@ -83,7 +83,7 @@ public boolean equals(Object o) {
8383
FulfilledAllocation that = (FulfilledAllocation) o;
8484
return Objects.equals(allocationID, that.allocationID)
8585
&& Objects.equals(taskExecutorID, that.taskExecutorID)
86-
&& Objects.equals(loadingWeight, that.loadingWeight);
86+
&& Objects.equals(taskExecutionLoad, that.taskExecutionLoad);
8787
}
8888
}
8989

@@ -261,16 +261,16 @@ void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
261261
}
262262
}
263263

264-
private Map<ResourceID, LoadingWeight> getTaskExecutorsLoadingView() {
265-
final Map<ResourceID, LoadingWeight> result = new HashMap<>();
264+
private Map<ResourceID, TaskExecutionLoad> getTaskExecutorsLoadingView() {
265+
final Map<ResourceID, TaskExecutionLoad> result = new HashMap<>();
266266
Collection<FulfilledAllocation> fulfilledAllocations = fulfilledRequests.values();
267267
for (FulfilledAllocation allocation : fulfilledAllocations) {
268268
result.compute(
269269
allocation.taskExecutorID,
270270
(ignoredID, oldLoading) ->
271271
Objects.isNull(oldLoading)
272-
? allocation.loadingWeight
273-
: oldLoading.merge(allocation.loadingWeight));
272+
? allocation.taskExecutionLoad
273+
: oldLoading.merge(allocation.taskExecutionLoad));
274274
}
275275
return result;
276276
}
@@ -353,7 +353,8 @@ private PhysicalSlot reserveFreeSlot(AllocationID allocationId, PendingRequest p
353353
getDeclarativeSlotPool()
354354
.reserveFreeSlot(allocationId, pendingRequest.getResourceProfile());
355355
fulfilledRequests.put(
356-
slotRequestId, new FulfilledAllocation(slot, pendingRequest.getLoading()));
356+
slotRequestId,
357+
new FulfilledAllocation(slot, pendingRequest.getTaskExecutionLoad()));
357358
return slot;
358359
}
359360

flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import org.apache.flink.runtime.clusterframework.types.AllocationID;
2222
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
2323
import org.apache.flink.runtime.jobmaster.SlotRequestId;
24-
import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
25-
import org.apache.flink.runtime.scheduler.loading.WeightLoadable;
24+
import org.apache.flink.runtime.scheduler.taskexecload.HasTaskExecutionLoad;
25+
import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
2626
import org.apache.flink.util.Preconditions;
2727

2828
import javax.annotation.Nonnull;
@@ -32,13 +32,13 @@
3232
import java.util.Set;
3333
import java.util.concurrent.CompletableFuture;
3434

35-
public final class PendingRequest implements WeightLoadable {
35+
public final class PendingRequest implements HasTaskExecutionLoad {
3636

3737
private final SlotRequestId slotRequestId;
3838

3939
private final ResourceProfile resourceProfile;
4040

41-
private final LoadingWeight loadingWeight;
41+
private final TaskExecutionLoad taskExecutionLoad;
4242

4343
private final HashSet<AllocationID> preferredAllocations;
4444

@@ -51,12 +51,12 @@ public final class PendingRequest implements WeightLoadable {
5151
private PendingRequest(
5252
SlotRequestId slotRequestId,
5353
ResourceProfile resourceProfile,
54-
LoadingWeight loadingWeight,
54+
TaskExecutionLoad taskExecutionLoad,
5555
Collection<AllocationID> preferredAllocations,
5656
boolean isBatchRequest) {
5757
this.slotRequestId = slotRequestId;
5858
this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
59-
this.loadingWeight = Preconditions.checkNotNull(loadingWeight);
59+
this.taskExecutionLoad = Preconditions.checkNotNull(taskExecutionLoad);
6060
this.preferredAllocations = new HashSet<>(preferredAllocations);
6161
this.isBatchRequest = isBatchRequest;
6262
this.slotFuture = new CompletableFuture<>();
@@ -66,19 +66,19 @@ private PendingRequest(
6666
static PendingRequest createBatchRequest(
6767
SlotRequestId slotRequestId,
6868
ResourceProfile resourceProfile,
69-
LoadingWeight loadingWeight,
69+
TaskExecutionLoad taskExecutionLoad,
7070
Collection<AllocationID> preferredAllocations) {
7171
return new PendingRequest(
72-
slotRequestId, resourceProfile, loadingWeight, preferredAllocations, true);
72+
slotRequestId, resourceProfile, taskExecutionLoad, preferredAllocations, true);
7373
}
7474

7575
public static PendingRequest createNormalRequest(
7676
SlotRequestId slotRequestId,
7777
ResourceProfile resourceProfile,
78-
LoadingWeight loadingWeight,
78+
TaskExecutionLoad taskExecutionLoad,
7979
Collection<AllocationID> preferredAllocations) {
8080
return new PendingRequest(
81-
slotRequestId, resourceProfile, loadingWeight, preferredAllocations, false);
81+
slotRequestId, resourceProfile, taskExecutionLoad, preferredAllocations, false);
8282
}
8383

8484
SlotRequestId getSlotRequestId() {
@@ -134,8 +134,8 @@ public String toString() {
134134
+ slotRequestId
135135
+ ", resourceProfile="
136136
+ resourceProfile
137-
+ ", loadingWeight="
138-
+ loadingWeight
137+
+ ", taskExecutionLoad="
138+
+ taskExecutionLoad
139139
+ ", preferredAllocations="
140140
+ preferredAllocations
141141
+ ", isBatchRequest="
@@ -146,7 +146,7 @@ public String toString() {
146146
}
147147

148148
@Override
149-
public @Nonnull LoadingWeight getLoading() {
150-
return loadingWeight;
149+
public @Nonnull TaskExecutionLoad getTaskExecutionLoad() {
150+
return taskExecutionLoad;
151151
}
152152
}

flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,31 +21,31 @@
2121
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
2222
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
2323
import org.apache.flink.runtime.jobmaster.SlotRequestId;
24-
import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
25-
import org.apache.flink.runtime.scheduler.loading.WeightLoadable;
24+
import org.apache.flink.runtime.scheduler.taskexecload.HasTaskExecutionLoad;
25+
import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
2626

2727
import javax.annotation.Nonnull;
2828

2929
/** Represents a request for a physical slot. */
30-
public class PhysicalSlotRequest implements WeightLoadable {
30+
public class PhysicalSlotRequest implements HasTaskExecutionLoad {
3131

3232
private final SlotRequestId slotRequestId;
3333

3434
private final SlotProfile slotProfile;
3535

36-
private final LoadingWeight loadingWeight;
36+
private final TaskExecutionLoad taskExecutionLoad;
3737

3838
private final boolean slotWillBeOccupiedIndefinitely;
3939

4040
public PhysicalSlotRequest(
4141
final SlotRequestId slotRequestId,
4242
final SlotProfile slotProfile,
43-
final LoadingWeight loadingWeight,
43+
final TaskExecutionLoad taskExecutionLoad,
4444
final boolean slotWillBeOccupiedIndefinitely) {
4545

4646
this.slotRequestId = slotRequestId;
4747
this.slotProfile = slotProfile;
48-
this.loadingWeight = loadingWeight;
48+
this.taskExecutionLoad = taskExecutionLoad;
4949
this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
5050
}
5151

@@ -70,18 +70,18 @@ public PendingRequest toPendingRequest() {
7070
? PendingRequest.createNormalRequest(
7171
slotRequestId,
7272
slotProfile.getPhysicalSlotResourceProfile(),
73-
loadingWeight,
73+
taskExecutionLoad,
7474
slotProfile.getPreferredAllocations())
7575
: PendingRequest.createBatchRequest(
7676
slotRequestId,
7777
slotProfile.getPhysicalSlotResourceProfile(),
78-
loadingWeight,
78+
taskExecutionLoad,
7979
slotProfile.getPreferredAllocations());
8080
}
8181

8282
@Override
83-
public @Nonnull LoadingWeight getLoading() {
84-
return loadingWeight;
83+
public @Nonnull TaskExecutionLoad getTaskExecutionLoad() {
84+
return taskExecutionLoad;
8585
}
8686

8787
/** Result of a {@link PhysicalSlotRequest}. */

flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.flink.runtime.clusterframework.types.AllocationID;
2222
import org.apache.flink.runtime.clusterframework.types.ResourceID;
2323
import org.apache.flink.runtime.jobmaster.SlotRequestId;
24-
import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
24+
import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
2525
import org.apache.flink.util.Preconditions;
2626

2727
import javax.annotation.Nonnull;
@@ -58,7 +58,7 @@ public static RequestSlotMatchingStrategy create(RequestSlotMatchingStrategy rol
5858
public Collection<RequestSlotMatch> matchRequestsAndSlots(
5959
Collection<? extends PhysicalSlot> slots,
6060
Collection<PendingRequest> pendingRequests,
61-
Map<ResourceID, LoadingWeight> taskExecutorsLoadingWeight) {
61+
Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap) {
6262
final Collection<RequestSlotMatch> requestSlotMatches = new ArrayList<>();
6363

6464
final Map<AllocationID, PhysicalSlot> freeSlots =
@@ -97,11 +97,11 @@ public Collection<RequestSlotMatch> matchRequestsAndSlots(
9797
.getPreferredAllocations()
9898
.contains(freeSlot.getAllocationId())) {
9999
requestSlotMatches.add(RequestSlotMatch.createFor(pendingRequest, freeSlot));
100-
LoadingWeight deltaLoading = pendingRequest.getLoading();
101-
taskExecutorsLoadingWeight.compute(
100+
TaskExecutionLoad deltaLoad = pendingRequest.getTaskExecutionLoad();
101+
taskExecutionLoadMap.compute(
102102
freeSlot.getTaskManagerLocation().getResourceID(),
103103
(ignoredId, oldLoad) ->
104-
oldLoad == null ? deltaLoading : deltaLoading.merge(oldLoad));
104+
oldLoad == null ? deltaLoad : deltaLoad.merge(oldLoad));
105105
pendingRequestIterator.remove();
106106
freeSlotsIterator.remove();
107107
break;
@@ -113,7 +113,7 @@ public Collection<RequestSlotMatch> matchRequestsAndSlots(
113113
if (!freeSlots.isEmpty() && !unmatchedRequests.isEmpty()) {
114114
requestSlotMatches.addAll(
115115
rollbackStrategy.matchRequestsAndSlots(
116-
freeSlots.values(), unmatchedRequests, taskExecutorsLoadingWeight));
116+
freeSlots.values(), unmatchedRequests, taskExecutionLoadMap));
117117
}
118118

119119
return requestSlotMatches;

flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.apache.flink.runtime.jobmaster.slotpool;
2020

2121
import org.apache.flink.runtime.clusterframework.types.ResourceID;
22-
import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
22+
import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
2323

2424
import java.util.Collection;
2525
import java.util.Map;
@@ -32,13 +32,13 @@ public interface RequestSlotMatchingStrategy {
3232
*
3333
* @param slots slots to match
3434
* @param pendingRequests slot requests to match
35-
* @param taskExecutorsLoadingWeight current task executors loading weight information
35+
* @param taskExecutionLoadMap task execution load information by resource
3636
* @return resulting matches of this operation
3737
*/
3838
Collection<RequestSlotMatch> matchRequestsAndSlots(
3939
Collection<? extends PhysicalSlot> slots,
4040
Collection<PendingRequest> pendingRequests,
41-
Map<ResourceID, LoadingWeight> taskExecutorsLoadingWeight);
41+
Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap);
4242

4343
/** Result class representing matches. */
4444
final class RequestSlotMatch {

flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.apache.flink.runtime.jobmaster.slotpool;
2020

2121
import org.apache.flink.runtime.clusterframework.types.ResourceID;
22-
import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
22+
import org.apache.flink.runtime.scheduler.taskexecload.TaskExecutionLoad;
2323

2424
import java.util.ArrayList;
2525
import java.util.Collection;
@@ -38,7 +38,7 @@ public enum SimpleRequestSlotMatchingStrategy implements RequestSlotMatchingStra
3838
public Collection<RequestSlotMatch> matchRequestsAndSlots(
3939
Collection<? extends PhysicalSlot> slots,
4040
Collection<PendingRequest> pendingRequests,
41-
Map<ResourceID, LoadingWeight> taskExecutorsLoadingWeight) {
41+
Map<ResourceID, TaskExecutionLoad> taskExecutionLoadMap) {
4242
final Collection<RequestSlotMatch> resultingMatches = new ArrayList<>();
4343

4444
// if pendingRequests has a special order, then let's preserve it

0 commit comments

Comments
 (0)