Skip to content

Commit efbbb4e

Browse files
committed
[FLINK-38719][runtime] Display the number of tasks on the TaskManagers page.
1 parent 0c126de commit efbbb4e

File tree

18 files changed

+82
-1
lines changed

18 files changed

+82
-1
lines changed

docs/layouts/shortcodes/generated/rest_v1_dispatcher.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5240,6 +5240,9 @@
52405240
"freeSlots" : {
52415241
"type" : "integer"
52425242
},
5243+
"assignedTasks" : {
5244+
"type" : "integer"
5245+
},
52435246
"hardware" : {
52445247
"type" : "object",
52455248
"id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
@@ -5464,6 +5467,9 @@
54645467
"freeSlots" : {
54655468
"type" : "integer"
54665469
},
5470+
"assignedTasks" : {
5471+
"type" : "integer"
5472+
},
54675473
"hardware" : {
54685474
"type" : "object",
54695475
"id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",

flink-runtime-web/src/test/resources/rest_api_v1.snapshot

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3931,6 +3931,9 @@
39313931
"freeSlots" : {
39323932
"type" : "integer"
39333933
},
3934+
"assignedTasks" : {
3935+
"type" : "integer"
3936+
},
39343937
"totalResource" : {
39353938
"type" : "object",
39363939
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
@@ -4093,6 +4096,9 @@
40934096
"freeSlots" : {
40944097
"type" : "integer"
40954098
},
4099+
"assignedTasks" : {
4100+
"type" : "integer"
4101+
},
40964102
"totalResource" : {
40974103
"type" : "object",
40984104
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",

flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export interface TaskManagerDetail {
2727
timeSinceLastHeartbeat: number;
2828
slotsNumber: number;
2929
freeSlots: number;
30+
assignedTasks: number;
3031
hardware: Hardware;
3132
metrics: Metrics;
3233
memoryConfiguration: MemoryConfiguration;
@@ -68,6 +69,7 @@ export interface TaskManagersItem {
6869
timeSinceLastHeartbeat: number;
6970
slotsNumber: number;
7071
freeSlots: number;
72+
assignedTasks: number;
7173
hardware: Hardware;
7274
blocked?: boolean;
7375
}

flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.html

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
<th [nzSortFn]="sortHeartBeatFn" [nzWidth]="'160px'">Last Heartbeat</th>
3333
<th [nzSortFn]="sortSlotsNumberFn" [nzWidth]="'90px'">All Slots</th>
3434
<th [nzSortFn]="sortFreeSlotsFn" [nzWidth]="'100px'">Free Slots</th>
35+
<th [nzSortFn]="sortAssignedTasksFn" [nzWidth]="'100px'">Assigned Tasks</th>
3536
<th [nzSortFn]="sortCpuCoresFn" [nzWidth]="'110px'">CPU Cores</th>
3637
<th [nzSortFn]="sortPhysicalMemoryFn" [nzWidth]="'120px'">Physical MEM</th>
3738
<th [nzSortFn]="sortFreeMemoryFn" [nzWidth]="'130px'">JVM Heap Size</th>
@@ -53,6 +54,7 @@
5354
<td>{{ manager.timeSinceLastHeartbeat | date: 'yyyy-MM-dd HH:mm:ss' }}</td>
5455
<td>{{ manager.slotsNumber }}</td>
5556
<td>{{ manager.freeSlots }}</td>
57+
<td>{{ manager.assignedTasks }}</td>
5658
<td>{{ manager.hardware.cpuCores }}</td>
5759
<td [attr.title]="manager.hardware.physicalMemory + ' bytes'">
5860
{{ manager.hardware.physicalMemory | humanizeBytes }}

flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ export class TaskManagerListComponent implements OnInit, OnDestroy {
4949
public readonly sortHeartBeatFn = createSortFn(item => item.timeSinceLastHeartbeat);
5050
public readonly sortSlotsNumberFn = createSortFn(item => item.slotsNumber);
5151
public readonly sortFreeSlotsFn = createSortFn(item => item.freeSlots);
52+
public readonly sortAssignedTasksFn = createSortFn(item => item.assignedTasks);
5253
public readonly sortCpuCoresFn = createSortFn(item => item.hardware?.cpuCores);
5354
public readonly sortPhysicalMemoryFn = createSortFn(item => item.hardware?.physicalMemory);
5455
public readonly sortFreeMemoryFn = createSortFn(item => item.hardware?.freeMemory);

flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.html

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222
<flink-blocked-badge *ngIf="taskManagerDetail?.blocked"></flink-blocked-badge>
2323
</div>
2424
<nz-descriptions *ngIf="taskManagerDetail" nzBordered nzSize="small">
25-
<nz-descriptions-item [nzSpan]="2" nzTitle="Path">
25+
<nz-descriptions-item [nzSpan]="1" nzTitle="Path">
2626
{{ taskManagerDetail.path }}
2727
</nz-descriptions-item>
2828
<nz-descriptions-item [nzSpan]="1" nzTitle="Free/All Slots">
2929
{{ taskManagerDetail.freeSlots }} / {{ taskManagerDetail.slotsNumber }}
3030
</nz-descriptions-item>
31+
<nz-descriptions-item [nzSpan]="1" nzTitle="Assigned Tasks">
32+
{{ taskManagerDetail.assignedTasks }}
33+
</nz-descriptions-item>
3134
<nz-descriptions-item [nzSpan]="1" nzTitle="Last Heartbeat">
3235
{{ taskManagerDetail.timeSinceLastHeartbeat | date: 'yyyy-MM-dd HH:mm:ss' }}
3336
</nz-descriptions-item>

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,7 @@ public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Dur
687687
taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
688688
slotManager.getNumberRegisteredSlotsOf(taskExecutor.getInstanceID()),
689689
slotManager.getNumberFreeSlotsOf(taskExecutor.getInstanceID()),
690+
slotManager.getAssignedTasksOf(taskExecutor.getInstanceID()),
690691
slotManager.getRegisteredResourceOf(taskExecutor.getInstanceID()),
691692
slotManager.getFreeResourceOf(taskExecutor.getInstanceID()),
692693
taskExecutor.getHardwareDescription(),
@@ -717,6 +718,7 @@ public CompletableFuture<TaskManagerInfoWithSlots> requestTaskManagerDetailsInfo
717718
taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
718719
slotManager.getNumberRegisteredSlotsOf(instanceId),
719720
slotManager.getNumberFreeSlotsOf(instanceId),
721+
slotManager.getAssignedTasksOf(instanceId),
720722
slotManager.getRegisteredResourceOf(instanceId),
721723
slotManager.getFreeResourceOf(instanceId),
722724
taskExecutor.getHardwareDescription(),

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ClusterResourceStatisticsProvider.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
/** Provides statistics of cluster resources. */
2525
public interface ClusterResourceStatisticsProvider {
2626

27+
/** Get total number of tasks assigned to the current instance. slots. */
28+
int getAssignedTasks(InstanceID instanceId);
29+
2730
/** Get total number of registered slots. */
2831
int getNumberRegisteredSlots();
2932

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,11 @@ private Set<PendingTaskManagerId> allocateTaskManagersAccordingTo(
755755
// Legacy APIs
756756
// ---------------------------------------------------------------------------------------------
757757

758+
@Override
759+
public int getAssignedTasksOf(InstanceID instanceId) {
760+
return taskManagerTracker.getAssignedTasks(instanceId);
761+
}
762+
758763
@Override
759764
public int getNumberRegisteredSlots() {
760765
return taskManagerTracker.getNumberRegisteredSlots();

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.HashMap;
3636
import java.util.HashSet;
3737
import java.util.Map;
38+
import java.util.Objects;
3839
import java.util.Optional;
3940
import java.util.Set;
4041
import java.util.stream.Collectors;
@@ -315,6 +316,22 @@ public Collection<PendingTaskManager> getPendingTaskManagers() {
315316
Collections.emptySet()));
316317
}
317318

319+
@Override
320+
public int getAssignedTasks(InstanceID instanceId) {
321+
FineGrainedTaskManagerRegistration taskManagerRegistration =
322+
taskManagerRegistrations.get(instanceId);
323+
if (Objects.isNull(taskManagerRegistration)) {
324+
return 0;
325+
}
326+
int totalAssignedTasks = 0;
327+
for (TaskManagerSlotInformation slot :
328+
taskManagerRegistration.getAllocatedSlots().values()) {
329+
final int assignedTasks = slot.getAssignedTasks();
330+
totalAssignedTasks = totalAssignedTasks + assignedTasks;
331+
}
332+
return totalAssignedTasks;
333+
}
334+
318335
@Override
319336
public int getNumberRegisteredSlots() {
320337
return taskManagerRegistrations.values().stream()

0 commit comments

Comments
 (0)