Skip to content

Commit 36a8fd3

Browse files
committed
[FLINK-38718][runtime] Display the number of tasks at slots level on the detailed TaskManager page.
1 parent e42cdd1 commit 36a8fd3

File tree

14 files changed

+212
-16
lines changed

14 files changed

+212
-16
lines changed

docs/layouts/shortcodes/generated/rest_v1_dispatcher.html

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5441,6 +5441,9 @@
54415441
"jobId" : {
54425442
"type" : "any"
54435443
},
5444+
"numberOfTasks" : {
5445+
"type" : "integer"
5446+
},
54445447
"resource" : {
54455448
"type" : "object",
54465449
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"

flink-runtime-web/src/test/resources/rest_api_v1.snapshot

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4190,6 +4190,9 @@
41904190
"jobId" : {
41914191
"type" : "any"
41924192
},
4193+
"numberOfTasks" : {
4194+
"type" : "integer"
4195+
},
41934196
"resource" : {
41944197
"type" : "object",
41954198
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"

flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ export interface TaskManagerDetail {
3838

3939
export interface AllocatedSlot {
4040
jobId: string;
41+
numberOfTasks: number;
4142
resource: Resources;
4243
}
4344

flink-runtime-web/web-dashboard/src/app/pages/task-manager/metrics/task-manager-metrics.component.html

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@
396396
<tr>
397397
<th>#</th>
398398
<th>Job ID</th>
399+
<th>Tasks</th>
399400
<th>CPU (cores)</th>
400401
<th>Task Heap memory (MB)</th>
401402
<th>Task Off-Heap memory (MB)</th>
@@ -409,6 +410,7 @@
409410
<strong>{{ i | number }}</strong>
410411
</td>
411412
<td>{{ slot.jobId }}</td>
413+
<td>{{ slot.numberOfTasks }}</td>
412414
<td>{{ slot.resource.cpuCores | number }}</td>
413415
<td>{{ slot.resource.taskHeapMemory | number }}</td>
414416
<td>{{ slot.resource.taskOffHeapMemory | number }}</td>

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
import java.time.Duration;
4343
import java.util.HashSet;
44+
import java.util.Objects;
4445
import java.util.Optional;
4546
import java.util.Set;
4647
import java.util.concurrent.CompletableFuture;
@@ -295,16 +296,15 @@ public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {
295296
private boolean syncAllocatedSlotStatus(SlotStatus slotStatus, TaskManagerInfo taskManager) {
296297
final AllocationID allocationId = Preconditions.checkNotNull(slotStatus.getAllocationID());
297298
final JobID jobId = Preconditions.checkNotNull(slotStatus.getJobID());
299+
final int numberOfTasks = slotStatus.getNumberOfTasks();
298300
try (MdcUtils.MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
299301
final ResourceProfile resourceProfile =
300302
Preconditions.checkNotNull(slotStatus.getResourceProfile());
301303

302-
if (taskManager.getAllocatedSlots().containsKey(allocationId)) {
303-
if (taskManager.getAllocatedSlots().get(allocationId).getState()
304-
== SlotState.PENDING) {
304+
TaskManagerSlotInformation slot = taskManager.getAllocatedSlots().get(allocationId);
305+
if (Objects.nonNull(slot)) {
306+
if (slot.getState() == SlotState.PENDING) {
305307
// Allocation Complete
306-
final TaskManagerSlotInformation slot =
307-
taskManager.getAllocatedSlots().get(allocationId);
308308
pendingSlotAllocations.remove(slot.getAllocationId());
309309
taskManagerTracker.notifySlotStatus(
310310
slot.getAllocationId(),
@@ -313,6 +313,7 @@ private boolean syncAllocatedSlotStatus(SlotStatus slotStatus, TaskManagerInfo t
313313
slot.getResourceProfile(),
314314
SlotState.ALLOCATED);
315315
}
316+
slot.setNumberOfTasks(numberOfTasks);
316317
return true;
317318
} else {
318319
Preconditions.checkState(
@@ -325,6 +326,7 @@ private boolean syncAllocatedSlotStatus(SlotStatus slotStatus, TaskManagerInfo t
325326
resourceProfile,
326327
SlotState.ALLOCATED);
327328
resourceTracker.notifyAcquiredResource(jobId, resourceProfile);
329+
taskManager.getAllocatedSlots().get(allocationId).setNumberOfTasks(numberOfTasks);
328330
return false;
329331
}
330332
}

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -803,7 +803,12 @@ public Collection<SlotInfo> getAllocatedSlotsOf(InstanceID instanceID) {
803803
.map(Map::values)
804804
.orElse(Collections.emptyList())
805805
.stream()
806-
.map(slot -> new SlotInfo(slot.getJobId(), slot.getResourceProfile()))
806+
.map(
807+
slot ->
808+
new SlotInfo(
809+
slot.getJobId(),
810+
slot.getResourceProfile(),
811+
slot.getNumberOfTasks()))
807812
.collect(Collectors.toList());
808813
}
809814

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerSlot.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
2727
import org.apache.flink.util.Preconditions;
2828

29+
import javax.annotation.Nullable;
30+
2931
import static org.apache.flink.util.Preconditions.checkArgument;
3032
import static org.apache.flink.util.Preconditions.checkNotNull;
3133

@@ -51,6 +53,8 @@ public class FineGrainedTaskManagerSlot implements TaskManagerSlotInformation {
5153
/** Current state of this slot. Should be either PENDING or ALLOCATED. */
5254
private SlotState state;
5355

56+
private @Nullable Integer numberOfTasks = null;
57+
5458
public FineGrainedTaskManagerSlot(
5559
AllocationID allocationId,
5660
JobID jobId,
@@ -72,6 +76,17 @@ public ResourceProfile getResourceProfile() {
7276
return resourceProfile;
7377
}
7478

79+
@Override
80+
public void setNumberOfTasks(int numberOfTasks) {
81+
this.numberOfTasks = numberOfTasks;
82+
}
83+
84+
@Nullable
85+
@Override
86+
public Integer getNumberOfTasks() {
87+
return numberOfTasks;
88+
}
89+
7590
@Override
7691
public SlotState getState() {
7792
return state;

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotInformation.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,19 @@ default boolean isMatchingRequirement(ResourceProfile required) {
6060
* @return resource profile of this slot
6161
*/
6262
ResourceProfile getResourceProfile();
63+
64+
/**
65+
* Set the number of tasks located in the current slot.
66+
*
67+
* @param numberOfTasks The number of tasks at the current slot.
68+
*/
69+
void setNumberOfTasks(int numberOfTasks);
70+
71+
/**
72+
* Get the number of tasks located in the current slot.
73+
*
74+
* @return The number of tasks located in the current slot, or {@code null} if it has not been set.
75+
*/
76+
@Nullable
77+
Integer getNumberOfTasks();
6378
}

flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/SlotInfo.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
3333
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
3434

35+
import javax.annotation.Nullable;
36+
3537
import java.io.Serializable;
3638
import java.util.Objects;
3739

@@ -46,31 +48,44 @@ public class SlotInfo implements ResponseBody, Serializable {
4648

4749
public static final String FIELD_NAME_JOB_ID = "jobId";
4850

51+
public static final String FIELD_NAME_NUMBER_OF_TASKS = "numberOfTasks";
52+
4953
@JsonProperty(FIELD_NAME_RESOURCE)
5054
private final ResourceProfileInfo resource;
5155

5256
@JsonProperty(FIELD_NAME_JOB_ID)
5357
@JsonSerialize(using = JobIDSerializer.class)
5458
private final JobID jobId;
5559

60+
@JsonProperty(FIELD_NAME_NUMBER_OF_TASKS)
61+
private final @Nullable Integer numberOfTasks;
62+
5663
@JsonCreator
5764
public SlotInfo(
5865
@JsonDeserialize(using = JobIDDeserializer.class) @JsonProperty(FIELD_NAME_JOB_ID)
5966
JobID jobId,
60-
@JsonProperty(FIELD_NAME_RESOURCE) ResourceProfileInfo resource) {
67+
@JsonProperty(FIELD_NAME_RESOURCE) ResourceProfileInfo resource,
68+
@Nullable @JsonProperty(FIELD_NAME_NUMBER_OF_TASKS) Integer numberOfTasks) {
6169
this.jobId = Preconditions.checkNotNull(jobId);
6270
this.resource = Preconditions.checkNotNull(resource);
71+
this.numberOfTasks = numberOfTasks;
6372
}
6473

65-
public SlotInfo(JobID jobId, ResourceProfile resource) {
66-
this(jobId, ResourceProfileInfo.fromResourceProfile(resource));
74+
public SlotInfo(JobID jobId, ResourceProfile resource, @Nullable Integer numberOfTasks) {
75+
this(jobId, ResourceProfileInfo.fromResourceProfile(resource), numberOfTasks);
6776
}
6877

6978
@JsonIgnore
7079
public JobID getJobId() {
7180
return jobId;
7281
}
7382

83+
@Nullable
84+
@JsonIgnore
85+
public Integer getNumberOfTasks() {
86+
return numberOfTasks;
87+
}
88+
7489
@JsonIgnore
7590
public ResourceProfileInfo getResource() {
7691
return resource;
@@ -85,11 +100,13 @@ public boolean equals(Object o) {
85100
return false;
86101
}
87102
SlotInfo that = (SlotInfo) o;
88-
return Objects.equals(jobId, that.jobId) && Objects.equals(resource, that.resource);
103+
return Objects.equals(jobId, that.jobId)
104+
&& Objects.equals(resource, that.resource)
105+
&& Objects.equals(numberOfTasks, that.numberOfTasks);
89106
}
90107

91108
@Override
92109
public int hashCode() {
93-
return Objects.hash(jobId, resource);
110+
return Objects.hash(jobId, resource, numberOfTasks);
94111
}
95112
}

flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.runtime.taskexecutor;
2020

21+
import org.apache.flink.annotation.VisibleForTesting;
2122
import org.apache.flink.api.common.JobID;
2223
import org.apache.flink.runtime.clusterframework.types.AllocationID;
2324
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -49,19 +50,33 @@ public class SlotStatus implements Serializable {
4950
*/
5051
private final JobID jobID;
5152

53+
private final int numberOfTasks;
54+
55+
@VisibleForTesting
5256
public SlotStatus(SlotID slotID, ResourceProfile resourceProfile) {
53-
this(slotID, resourceProfile, null, null);
57+
this(slotID, resourceProfile, null, null, 0);
5458
}
5559

60+
@VisibleForTesting
5661
public SlotStatus(
5762
SlotID slotID,
5863
ResourceProfile resourceProfile,
5964
JobID jobID,
6065
AllocationID allocationID) {
66+
this(slotID, resourceProfile, jobID, allocationID, 0);
67+
}
68+
69+
public SlotStatus(
70+
SlotID slotID,
71+
ResourceProfile resourceProfile,
72+
JobID jobID,
73+
AllocationID allocationID,
74+
int numberOfTasks) {
6175
this.slotID = checkNotNull(slotID, "slotID cannot be null");
6276
this.resourceProfile = checkNotNull(resourceProfile, "profile cannot be null");
6377
this.allocationID = allocationID;
6478
this.jobID = jobID;
79+
this.numberOfTasks = numberOfTasks;
6580
}
6681

6782
/**
@@ -100,6 +115,10 @@ public JobID getJobID() {
100115
return jobID;
101116
}
102117

118+
public int getNumberOfTasks() {
119+
return numberOfTasks;
120+
}
121+
103122
@Override
104123
public boolean equals(Object o) {
105124
if (this == o) {
@@ -143,6 +162,8 @@ public String toString() {
143162
+ allocationID
144163
+ ", jobID="
145164
+ jobID
165+
+ ", numberOfTasks="
166+
+ numberOfTasks
146167
+ ", resourceProfile="
147168
+ resourceProfile
148169
+ '}';

0 commit comments

Comments
 (0)