Skip to content

Commit 7e0f3b2

Browse files
committed
[FLINK-38719][runtime] Display the number of tasks on the TaskManagers page.
1 parent 36a8fd3 commit 7e0f3b2

File tree

18 files changed

+102
-1
lines changed

18 files changed

+102
-1
lines changed

docs/layouts/shortcodes/generated/rest_v1_dispatcher.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5240,6 +5240,9 @@
52405240
"freeSlots" : {
52415241
"type" : "integer"
52425242
},
5243+
"numberOfTasks" : {
5244+
"type" : "integer"
5245+
},
52435246
"hardware" : {
52445247
"type" : "object",
52455248
"id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
@@ -5464,6 +5467,9 @@
54645467
"freeSlots" : {
54655468
"type" : "integer"
54665469
},
5470+
"numberOfTasks" : {
5471+
"type" : "integer"
5472+
},
54675473
"hardware" : {
54685474
"type" : "object",
54695475
"id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",

flink-runtime-web/src/test/resources/rest_api_v1.snapshot

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3931,6 +3931,9 @@
39313931
"freeSlots" : {
39323932
"type" : "integer"
39333933
},
3934+
"numberOfTasks" : {
3935+
"type" : "integer"
3936+
},
39343937
"totalResource" : {
39353938
"type" : "object",
39363939
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
@@ -4093,6 +4096,9 @@
40934096
"freeSlots" : {
40944097
"type" : "integer"
40954098
},
4099+
"numberOfTasks" : {
4100+
"type" : "integer"
4101+
},
40964102
"totalResource" : {
40974103
"type" : "object",
40984104
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",

flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export interface TaskManagerDetail {
2727
timeSinceLastHeartbeat: number;
2828
slotsNumber: number;
2929
freeSlots: number;
30+
numberOfTasks: number;
3031
hardware: Hardware;
3132
metrics: Metrics;
3233
memoryConfiguration: MemoryConfiguration;
@@ -68,6 +69,7 @@ export interface TaskManagersItem {
6869
timeSinceLastHeartbeat: number;
6970
slotsNumber: number;
7071
freeSlots: number;
72+
numberOfTasks: number;
7173
hardware: Hardware;
7274
blocked?: boolean;
7375
}

flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.html

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
<th [nzSortFn]="sortHeartBeatFn" [nzWidth]="'160px'">Last Heartbeat</th>
3333
<th [nzSortFn]="sortSlotsNumberFn" [nzWidth]="'90px'">All Slots</th>
3434
<th [nzSortFn]="sortFreeSlotsFn" [nzWidth]="'100px'">Free Slots</th>
35+
<th [nzSortFn]="sortTasksFn" [nzWidth]="'100px'">Tasks</th>
3536
<th [nzSortFn]="sortCpuCoresFn" [nzWidth]="'110px'">CPU Cores</th>
3637
<th [nzSortFn]="sortPhysicalMemoryFn" [nzWidth]="'120px'">Physical MEM</th>
3738
<th [nzSortFn]="sortFreeMemoryFn" [nzWidth]="'130px'">JVM Heap Size</th>
@@ -53,6 +54,7 @@
5354
<td>{{ manager.timeSinceLastHeartbeat | date: 'yyyy-MM-dd HH:mm:ss' }}</td>
5455
<td>{{ manager.slotsNumber }}</td>
5556
<td>{{ manager.freeSlots }}</td>
57+
<td>{{ manager.numberOfTasks }}</td>
5658
<td>{{ manager.hardware.cpuCores }}</td>
5759
<td [attr.title]="manager.hardware.physicalMemory + ' bytes'">
5860
{{ manager.hardware.physicalMemory | humanizeBytes }}

flink-runtime-web/web-dashboard/src/app/pages/task-manager/list/task-manager-list.component.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ export class TaskManagerListComponent implements OnInit, OnDestroy {
4949
public readonly sortHeartBeatFn = createSortFn(item => item.timeSinceLastHeartbeat);
5050
public readonly sortSlotsNumberFn = createSortFn(item => item.slotsNumber);
5151
public readonly sortFreeSlotsFn = createSortFn(item => item.freeSlots);
52+
public readonly sortTasksFn = createSortFn(item => item.numberOfTasks);
5253
public readonly sortCpuCoresFn = createSortFn(item => item.hardware?.cpuCores);
5354
public readonly sortPhysicalMemoryFn = createSortFn(item => item.hardware?.physicalMemory);
5455
public readonly sortFreeMemoryFn = createSortFn(item => item.hardware?.freeMemory);

flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.html

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222
<flink-blocked-badge *ngIf="taskManagerDetail?.blocked"></flink-blocked-badge>
2323
</div>
2424
<nz-descriptions *ngIf="taskManagerDetail" nzBordered nzSize="small">
25-
<nz-descriptions-item [nzSpan]="2" nzTitle="Path">
25+
<nz-descriptions-item [nzSpan]="1" nzTitle="Path">
2626
{{ taskManagerDetail.path }}
2727
</nz-descriptions-item>
2828
<nz-descriptions-item [nzSpan]="1" nzTitle="Free/All Slots">
2929
{{ taskManagerDetail.freeSlots }} / {{ taskManagerDetail.slotsNumber }}
3030
</nz-descriptions-item>
31+
<nz-descriptions-item [nzSpan]="1" nzTitle="Tasks">
32+
{{ taskManagerDetail.numberOfTasks }}
33+
</nz-descriptions-item>
3134
<nz-descriptions-item [nzSpan]="1" nzTitle="Last Heartbeat">
3235
{{ taskManagerDetail.timeSinceLastHeartbeat | date: 'yyyy-MM-dd HH:mm:ss' }}
3336
</nz-descriptions-item>

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,7 @@ public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Dur
687687
taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
688688
slotManager.getNumberRegisteredSlotsOf(taskExecutor.getInstanceID()),
689689
slotManager.getNumberFreeSlotsOf(taskExecutor.getInstanceID()),
690+
slotManager.getNumberOfTasksOf(taskExecutor.getInstanceID()),
690691
slotManager.getRegisteredResourceOf(taskExecutor.getInstanceID()),
691692
slotManager.getFreeResourceOf(taskExecutor.getInstanceID()),
692693
taskExecutor.getHardwareDescription(),
@@ -717,6 +718,7 @@ public CompletableFuture<TaskManagerInfoWithSlots> requestTaskManagerDetailsInfo
717718
taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
718719
slotManager.getNumberRegisteredSlotsOf(instanceId),
719720
slotManager.getNumberFreeSlotsOf(instanceId),
721+
slotManager.getNumberOfTasksOf(instanceId),
720722
slotManager.getRegisteredResourceOf(instanceId),
721723
slotManager.getFreeResourceOf(instanceId),
722724
taskExecutor.getHardwareDescription(),

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ClusterResourceStatisticsProvider.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,15 @@
2121
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
2222
import org.apache.flink.runtime.instance.InstanceID;
2323

24+
import javax.annotation.Nullable;
25+
2426
/** Provides statistics of cluster resources. */
2527
public interface ClusterResourceStatisticsProvider {
2628

29+
/** Get total number of tasks in the current instance. slots. */
30+
@Nullable
31+
Integer getNumberOfTasksOf(InstanceID instanceId);
32+
2733
/** Get total number of registered slots. */
2834
int getNumberRegisteredSlots();
2935

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,12 @@ private Set<PendingTaskManagerId> allocateTaskManagersAccordingTo(
755755
// Legacy APIs
756756
// ---------------------------------------------------------------------------------------------
757757

758+
@Nullable
759+
@Override
760+
public Integer getNumberOfTasksOf(InstanceID instanceId) {
761+
return taskManagerTracker.getNumberOfTasksOf(instanceId);
762+
}
763+
758764
@Override
759765
public int getNumberRegisteredSlots() {
760766
return taskManagerTracker.getNumberRegisteredSlots();

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,14 @@
3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
3232

33+
import javax.annotation.Nullable;
34+
3335
import java.util.Collection;
3436
import java.util.Collections;
3537
import java.util.HashMap;
3638
import java.util.HashSet;
3739
import java.util.Map;
40+
import java.util.Objects;
3841
import java.util.Optional;
3942
import java.util.Set;
4043
import java.util.stream.Collectors;
@@ -315,6 +318,28 @@ public Collection<PendingTaskManager> getPendingTaskManagers() {
315318
Collections.emptySet()));
316319
}
317320

321+
@Nullable
322+
@Override
323+
public Integer getNumberOfTasksOf(InstanceID instanceId) {
324+
FineGrainedTaskManagerRegistration taskManagerRegistration =
325+
taskManagerRegistrations.get(instanceId);
326+
if (Objects.isNull(taskManagerRegistration)) {
327+
return null;
328+
}
329+
Integer totalNumberOfTasks = null;
330+
for (TaskManagerSlotInformation slot :
331+
taskManagerRegistration.getAllocatedSlots().values()) {
332+
Integer numberOfTasks = slot.getNumberOfTasks();
333+
if (Objects.isNull(totalNumberOfTasks) && Objects.isNull(numberOfTasks)) {
334+
continue;
335+
}
336+
totalNumberOfTasks = Objects.isNull(totalNumberOfTasks) ? 0 : totalNumberOfTasks;
337+
numberOfTasks = Objects.isNull(numberOfTasks) ? 0 : numberOfTasks;
338+
totalNumberOfTasks = totalNumberOfTasks + numberOfTasks;
339+
}
340+
return totalNumberOfTasks;
341+
}
342+
318343
@Override
319344
public int getNumberRegisteredSlots() {
320345
return taskManagerRegistrations.values().stream()

0 commit comments

Comments
 (0)