Commit a4fa131

Merge pull request #6512 from kiryl-filatau:aws-5k

PiperOrigin-RevId: 895951188
2 parents: f9fc3cc + b4e6dc7

9 files changed: 410 additions, 250 deletions

perfkitbenchmarker/data/container/karpenter/nodepool.yaml.j2

Lines changed: 8 additions & 2 deletions
@@ -15,19 +15,25 @@ spec:
         - key: karpenter.sh/capacity-type
           operator: In
           values: ["on-demand"]
+        {%- if KARPENTER_INSTANCE_TYPES %}
+        - key: node.kubernetes.io/instance-type
+          operator: In
+          values: [{% for instance_type in KARPENTER_INSTANCE_TYPES %}"{{ instance_type }}"{% if not loop.last %}, {% endif %}{% endfor %}]
+        {%- else %}
         - key: karpenter.k8s.aws/instance-category
           operator: In
-          values: ["c", "m", "r"]
+          values: ["c", "m", "r", "t"]
         - key: karpenter.k8s.aws/instance-generation
           operator: Gt
           values: ["2"]
+        {%- endif %}
       nodeClassRef:
         group: karpenter.k8s.aws
         kind: EC2NodeClass
         name: default
       expireAfter: 720h # 30 * 24h = 720h
   limits:
-    cpu: 1000
+    cpu: {{ KARPENTER_NODEPOOL_CPU_LIMIT }}
   disruption:
     consolidationPolicy: WhenEmptyOrUnderutilized
     consolidateAfter: 1m
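
To see what the new Jinja loop produces, here is a minimal stand-in of mine (not part of the commit) that renders the values line the same way. It assumes only the jinja2 package; the fallback branch is trimmed to a single line, whereas the real template keeps the full instance-category/generation requirements.

# Minimal sketch: render the new instance-type requirement the way the
# template's {% for %} loop does. Requires the jinja2 package.
import jinja2

TEMPLATE = (
    '{%- if KARPENTER_INSTANCE_TYPES %}\n'
    'values: [{% for instance_type in KARPENTER_INSTANCE_TYPES %}'
    '"{{ instance_type }}"{% if not loop.last %}, {% endif %}{% endfor %}]\n'
    '{%- else %}\n'
    'values: ["c", "m", "r", "t"]\n'
    '{%- endif %}'
)

template = jinja2.Environment(
    undefined=jinja2.StrictUndefined
).from_string(TEMPLATE)

# Flag set, e.g. --eks_karpenter_nodepool_instance_types=m5.large,c5.xlarge:
print(template.render(KARPENTER_INSTANCE_TYPES=['m5.large', 'c5.xlarge']).strip())
# -> values: ["m5.large", "c5.xlarge"]

# Flag left empty: the {%- else %} branch keeps the generic requirements.
print(template.render(KARPENTER_INSTANCE_TYPES=[]).strip())
# -> values: ["c", "m", "r", "t"]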

perfkitbenchmarker/linux_benchmarks/kubernetes_scale_benchmark.py

Lines changed: 22 additions & 22 deletions
@@ -129,7 +129,9 @@ def Prepare(bm_spec: benchmark_spec.BenchmarkSpec):
   """Sets additional spec attributes."""
   bm_spec.always_call_cleanup = True
   assert bm_spec.container_cluster
-  _EnsureEksKarpenterGpuNodepool(bm_spec.container_cluster)
+  cluster = bm_spec.container_cluster
+  assert isinstance(cluster, kubernetes_cluster.KubernetesCluster)
+  _EnsureEksKarpenterGpuNodepool(cluster)


 def _GetScaleTimeout() -> int:
@@ -377,35 +379,33 @@ def GetStatusConditionsForResourceType(
     lastTransitionTime.
   """

-  jsonpath = (
-      r'{range .items[*]}'
-      # e.g. '"pod-name-1234": [<condition1>, ...],\n'
-      r'{"\""}{.metadata.name}{"\": "}{.status.conditions}{",\n"}'
-      r'{end}'
-  )
+  # Use full JSON output to avoid invalid JSON when manually building from
+  # jsonpath with many resources or on connection reset (truncated output).
+  # Avoid logging huge JSON: kubernetes_scale uses num_replicas;
+  # kubernetes_node_scale uses kubernetes_scale_num_nodes for the
+  # same code path (get pod/node -o json).
   stdout, _, _ = kubectl.RunKubectlCommand(
-      [
-          'get',
-          resource_type,
-          '-o',
-          'jsonpath=' + jsonpath,
-      ],
-      timeout=60 * 2,  # 2 minutes; should be a pretty fast call.
-      # Output can be quite large, so we'll conditionally suppress it.
-      suppress_logging=NUM_PODS.value > 20,
+      ['get', resource_type, '-o', 'json'],
+      timeout=60 * 5,  # 5 minutes for large clusters (e.g. 1000 pods)
+      suppress_logging=(
+          NUM_PODS.value > 20
+          or getattr(FLAGS, 'kubernetes_scale_num_nodes', 5) > 20
+      ),
   )
-
-  # Convert output to valid json and parse it
-  stdout = stdout.rstrip('\t\n\r ,')
-  stdout = '{' + stdout + '}'
-  name_to_conditions = json.loads(stdout)
+  data = json.loads(stdout)
+  name_to_conditions = {}
+  for item in data.get('items', []):
+    name = item.get('metadata', {}).get('name')
+    conditions = item.get('status', {}).get('conditions')
+    if name is not None and conditions is not None:
+      name_to_conditions[name] = conditions

   for key in resources_to_ignore:
     name_to_conditions.pop(key, None)

   results = []
   failures = []
-  for name in name_to_conditions.keys():
+  for name in name_to_conditions:
     for conditions in name_to_conditions[name]:
       if not KubernetesResourceStatusCondition.IsValid(conditions):
         failures.append(conditions)
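
Why the switch away from jsonpath matters is easier to see in isolation. The sketch below is my illustration, not commit code: the old path wrapped hand-concatenated fragments in braces, so a connection reset that truncates the stream produces unparseable JSON, while the new path parses one well-formed document and skips items that lack a name or conditions.

# Sketch: hand-assembled jsonpath output vs. parsing full `-o json` output.
import json

# Old approach: fragments like '"pod-1": [...],\n' were wrapped in braces.
# A truncated stream leaves an unterminated fragment, and parsing fails.
truncated = '"pod-1": [{"type": "Ready", "status": "True"}],\n"pod-2": [{"ty'
try:
  json.loads('{' + truncated.rstrip('\t\n\r ,') + '}')
except json.JSONDecodeError as e:
  print(f'old approach fails on truncation: {e}')

# New approach: one well-formed document from `kubectl get ... -o json`,
# with missing names/conditions skipped instead of crashing.
full_output = json.dumps({
    'items': [
        {'metadata': {'name': 'pod-1'},
         'status': {'conditions': [{'type': 'Ready', 'status': 'True'}]}},
        {'metadata': {'name': 'pod-2'}, 'status': {}},  # no conditions yet
    ]
})
name_to_conditions = {}
for item in json.loads(full_output).get('items', []):
  name = item.get('metadata', {}).get('name')
  conditions = item.get('status', {}).get('conditions')
  if name is not None and conditions is not None:
    name_to_conditions[name] = conditions
print(name_to_conditions)  # only pod-1; pod-2 is skipped, not fatal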

perfkitbenchmarker/providers/aws/elastic_kubernetes_service.py

Lines changed: 100 additions & 33 deletions
@@ -23,6 +23,7 @@
 from collections import abc
 import json
 import logging
+import math
 import re
 from typing import Any
 from urllib import parse
@@ -992,9 +993,31 @@ def _PostIngressNetworkingFixups(
         '[PKB][EKS] Allowed ALB SG %s -> node SGs on port %s', alb_sg, port
     )

+  @staticmethod
+  def _DefaultNodepoolInstanceTypes() -> list[str]:
+    """EC2 instance types for the default Karpenter NodePool manifest.
+
+    Controlled by --eks_karpenter_nodepool_instance_types.
+    Empty list: Jinja template keeps instance-category/generation rules.
+    """
+    return [
+        t.strip()
+        for t in FLAGS.eks_karpenter_nodepool_instance_types
+        if t.strip()
+    ]
+
   def _PostCreate(self):
     """Performs post-creation steps for the cluster."""
     super()._PostCreate()
+    # Karpenter controller resources: default 1/1Gi; scale up when
+    # node_scale target exceeds 1000 nodes.
+    num_nodes = getattr(FLAGS, 'kubernetes_scale_num_nodes', None)
+    if num_nodes is not None and num_nodes > 1000:
+      controller_cpu, controller_memory = 4, '16Gi'
+    elif num_nodes is not None and num_nodes >= 500:
+      controller_cpu, controller_memory = 2, '8Gi'
+    else:
+      controller_cpu, controller_memory = 1, '1Gi'
     vm_util.IssueCommand([
         'helm',
         'upgrade',
@@ -1013,13 +1036,13 @@ def _PostCreate(self):
         '--set',
         f'settings.interruptionQueue={self.name}',
         '--set',
-        'controller.resources.requests.cpu=1',
+        f'controller.resources.requests.cpu={controller_cpu}',
         '--set',
-        'controller.resources.requests.memory=1Gi',
+        f'controller.resources.requests.memory={controller_memory}',
         '--set',
-        'controller.resources.limits.cpu=1',
+        f'controller.resources.limits.cpu={controller_cpu}',
         '--set',
-        'controller.resources.limits.memory=1Gi',
+        f'controller.resources.limits.memory={controller_memory}',
         '--set',
         'logLevel=debug',
         '--wait',
@@ -1057,10 +1080,16 @@ def _PostCreate(self):
         'v'
         + full_version.strip().strip('"').split(f'{self.cluster_version}-v')[1]
     )
+    # NodePool CPU limit: benchmark target nodes * vCPU + 5%, min 1000.
+    num_nodes = getattr(FLAGS, 'kubernetes_scale_num_nodes', 5)
+    vcpu_per_node = FLAGS.eks_karpenter_limits_vcpu_per_node
+    cpu_limit = max(1000, math.ceil(num_nodes * vcpu_per_node * 1.05))
     kubernetes_commands.ApplyManifest(
         'container/karpenter/nodepool.yaml.j2',
         CLUSTER_NAME=self.name,
         ALIAS_VERSION=alias_version,
+        KARPENTER_NODEPOOL_CPU_LIMIT=cpu_limit,
+        KARPENTER_INSTANCE_TYPES=self._DefaultNodepoolInstanceTypes(),
     )

   def _Delete(self):
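
To make the new sizing arithmetic concrete, here is a worked example of mine using the same formulas as the hunks above; the 5000-node target is an assumption based on the 'aws-5k' branch name, not something the diff states.

# Worked example of the two sizing rules added in _PostCreate
# (formulas copied from the diff; standalone, no PKB imports).
import math

def karpenter_controller_resources(num_nodes):
  """Controller requests/limits: 1 CPU/1Gi default, scaled up for big runs."""
  if num_nodes is not None and num_nodes > 1000:
    return 4, '16Gi'
  if num_nodes is not None and num_nodes >= 500:
    return 2, '8Gi'
  return 1, '1Gi'

def nodepool_cpu_limit(num_nodes, vcpu_per_node):
  """NodePool limits.cpu: target vCPUs plus 5% headroom, floor of 1000."""
  return max(1000, math.ceil(num_nodes * vcpu_per_node * 1.05))

for n in (5, 500, 1001, 5000):
  print(n, karpenter_controller_resources(n), nodepool_cpu_limit(n, 2))
# 5    -> (1, '1Gi'),  limit 1000 (floor: ceil(5 * 2 * 1.05) is only 11)
# 500  -> (2, '8Gi'),  limit 1050
# 1001 -> (4, '16Gi'), limit 2103 (ceil of 2102.1)
# 5000 -> (4, '16Gi'), limit 10500
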
@@ -1093,16 +1122,33 @@ def _DeleteDependencies(self):
     # Start deleting the stack but likely to fail to delete this role.
     vm_util.IssueCommand(delete_stack_cmd)
     node_role = f'KarpenterNodeRole-{self.name}'
-    out, _, _ = vm_util.IssueCommand([
-        'aws',
-        'iam',
-        'list-instance-profiles-for-role',
-        '--role-name',
-        node_role,
-        '--region',
-        f'{self.region}',
-    ])
-    profiles_json = json.loads(out)
+    out, _, retcode = vm_util.IssueCommand(
+        [
+            'aws',
+            'iam',
+            'list-instance-profiles-for-role',
+            '--role-name',
+            node_role,
+            '--region',
+            f'{self.region}',
+        ],
+        suppress_failure=lambda stdout, stderr, rc: (
+            rc != 0
+            and (
+                'nosuchentity' in (stderr or '').lower()
+                or 'cannot be found' in (stderr or '').lower()
+            )
+        ),
+    )
+    if retcode == 0 and out.strip():
+      profiles_json = json.loads(out)
+    else:
+      logging.info(
+          'Karpenter node role %s not found or empty response; skipping'
+          ' instance profile cleanup',
+          node_role,
+      )
+      profiles_json = {'InstanceProfiles': []}
     for profile in profiles_json.get('InstanceProfiles', []):
       profile_name = profile['InstanceProfileName']
       vm_util.IssueCommand([
@@ -1149,21 +1195,21 @@ def _CleanupKarpenter(self):
     """Cleanup Karpenter managed nodes before cluster deletion."""
     logging.info('Cleaning up Karpenter nodes...')
     # Delete NodePool resources - this will trigger node termination
-    kubectl.RunKubectlCommand(
+    kubectl.RunRetryableKubectlCommand(
         [
            'delete',
            'nodepool,ec2nodeclass',
            '--all',
            '--timeout=120s',
        ],
+        timeout=300,
        suppress_failure=lambda stdout, stderr, retcode: (
            'no resources found' in stderr.lower()
            or 'not found' in stderr.lower()
-            or 'timed out waiting for the condition' in stderr.lower()
        ),
    )
    # Wait for all Karpenter nodes to be deleted
-    kubectl.RunKubectlCommand(
+    kubectl.RunRetryableKubectlCommand(
        [
            'wait',
            '--for=delete',
@@ -1172,11 +1218,13 @@ def _CleanupKarpenter(self):
            'karpenter.sh/nodepool',
            '--timeout=120s',
        ],
+        timeout=300,
        suppress_failure=lambda stdout, stderr, retcode: (
            'no matching resources found' in stderr.lower()
-            or 'timed out' in stderr.lower()
+            or 'no resources found' in stderr.lower()
        ),
    )
+
    # Force terminate remaining EC2 instances
    stdout, _, _ = vm_util.IssueCommand(
        [
@@ -1246,21 +1294,40 @@ def _CleanupKarpenter(self):
     if eni_ids:
       logging.info('Deleting %d orphaned network interfaces', len(eni_ids))
     for eni_id in eni_ids:
-      vm_util.IssueCommand(
-          [
-              'aws',
-              'ec2',
-              'delete-network-interface',
-              '--region',
-              self.region,
-              '--network-interface-id',
-              eni_id,
-          ],
-          suppress_failure=lambda stdout, stderr, retcode: (
-              'not found' in stderr.lower()
-              or 'does not exist' in stderr.lower()
-          ),
-      )
+      # Bind eni_id by default to avoid loop closure issues if
+      # this is refactored.
+      def _DeleteOneEni(eni_id=eni_id) -> None:
+        _, stderr, retcode = vm_util.IssueCommand(
+            [
+                'aws',
+                'ec2',
+                'delete-network-interface',
+                '--region',
+                self.region,
+                '--network-interface-id',
+                eni_id,
+            ],
+            raise_on_failure=False,
+        )
+        if retcode == 0:
+          return
+        stderr_lower = (stderr or '').lower()
+        # ENI already deleted (e.g. by another process or previous attempt).
+        if 'invalidnetworkinterfaceid.notfound' in stderr_lower:
+          return
+        # RequestLimitExceeded (throttle): retry via vm_util.Retry.
+        if 'requestlimitexceeded' in stderr_lower:
+          raise errors.Resource.RetryableDeletionError(stderr or '')
+        raise errors.VmUtil.IssueCommandError(
+            f'DeleteNetworkInterface failed: {stderr}'
+        )
+
+      # max_retries=5 yields 6 CLI attempts (tries > 5 on 6th failure).
+      vm_util.Retry(
+          poll_interval=10,
+          max_retries=5,
+          retryable_exceptions=(errors.Resource.RetryableDeletionError,),
+      )(_DeleteOneEni)()

   def _IsReady(self):
     """Returns True if cluster is running. Autopilot defaults to 0 nodes."""

perfkitbenchmarker/providers/aws/flags.py

Lines changed: 13 additions & 0 deletions
@@ -233,6 +233,19 @@
     'Whether to install AWS Load Balancer Controller in EKS Karpenter clusters'
     'Default value - do not install unless explicitly requested',
 )
+flags.DEFINE_integer(
+    'eks_karpenter_limits_vcpu_per_node',
+    2,
+    'Assumed vCPUs per provisioned node when computing Karpenter NodePool '
+    'limits.cpu on EKS (uses kubernetes_scale_num_nodes, this value, and 5% '
+    'headroom; minimum limit 1000). Raise for larger EC2 instance shapes.',
+)
+flags.DEFINE_list(
+    'eks_karpenter_nodepool_instance_types',
+    [],
+    'Comma-separated EC2 types for the Karpenter default NodePool (worker '
+    'nodes only). Empty keeps instance-category/generation in the template.',
+)
 AWS_CAPACITY_BLOCK_RESERVATION_ID = flags.DEFINE_string(
     'aws_capacity_block_reservation_id',
     None,
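
A quick sketch of mine showing how the new list flag feeds _DefaultNodepoolInstanceTypes: absl's DEFINE_list splits the argument on commas, and the strip()/filter pass in the diff tolerates stray spaces and empty entries such as a trailing comma.

# Sketch: parsing --eks_karpenter_nodepool_instance_types with the real
# absl.flags module, then cleaning it the way the diff does.
from absl import flags

FLAGS = flags.FLAGS
flags.DEFINE_list(
    'eks_karpenter_nodepool_instance_types',
    [],
    'Comma-separated EC2 types for the Karpenter default NodePool.',
)

# Simulate a command line with sloppy spacing and a trailing comma.
FLAGS(['prog', '--eks_karpenter_nodepool_instance_types=m5.large, c5.xlarge,'])

cleaned = [
    t.strip()
    for t in FLAGS.eks_karpenter_nodepool_instance_types
    if t.strip()
]
print(cleaned)  # ['m5.large', 'c5.xlarge']; the empty tail entry is dropped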

perfkitbenchmarker/resources/container_service/kubectl.py

Lines changed: 6 additions & 1 deletion
@@ -18,6 +18,10 @@
     'error sending request:',
     '(abnormal closure): unexpected EOF',
     'deadline exceeded',
+    # kubectl wait/delete timeouts and connection errors
+    # (retried in EKS cleanup)
+    'timed out',
+    'unable to connect to the server',
 ]


@@ -38,8 +42,9 @@ def _DetectTimeoutViaSuppressFailure(stdout, stderr, retcode):
   # Check for kubectl timeout. If found, treat it the same as a regular
   # timeout.
   if retcode != 0:
+    stderr_lower = stderr.lower()
     for error_substring in RETRYABLE_KUBECTL_ERRORS:
-      if error_substring in stderr:
+      if error_substring.lower() in stderr_lower:
         # Raise timeout error regardless of raise_on_failure - as the intended
         # semantics is to ignore expected errors caused by invoking the
         # command not errors from PKB infrastructure.
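
A tiny standalone check of mine for what the lowercase matching buys: kubectl capitalizes some messages (for example 'Unable to connect to the server: ...'), which an exact substring comparison against the lowercase pattern list would miss.

# Sketch of the case-insensitive check in _DetectTimeoutViaSuppressFailure.
RETRYABLE_KUBECTL_ERRORS = [
    'timed out',
    'unable to connect to the server',
]

def is_retryable(stderr):
  stderr_lower = stderr.lower()
  return any(e.lower() in stderr_lower for e in RETRYABLE_KUBECTL_ERRORS)

stderr = 'Unable to connect to the server: dial tcp: i/o timeout'
print(is_retryable(stderr))  # True with case folding
print(any(e in stderr for e in RETRYABLE_KUBECTL_ERRORS))  # old check: False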

perfkitbenchmarker/resources/container_service/kubernetes_cluster.py

Lines changed: 1 addition & 1 deletion
@@ -325,7 +325,7 @@ def _DeleteAllFromDefaultNamespace():
   run_cmd = ['delete', 'job', '--all', '-n', 'default']
   kubectl.RunRetryableKubectlCommand(run_cmd)

-  timeout = 60 * 20
+  timeout = 60 * 60  # 1 hour for kubectl delete all -n default (teardown)
   run_cmd = [
       'delete',
       'all',
